使用阿里云 DTS 同步 RDS 数据到 ClickHouse

在工作中,往往有一些需要对大批量数据进行分析的场景,依赖于传统关系型数据库(如 MySQL)的话,效率往往及其低下。 相比之下,一些 OLAP 数据库在统计分析等场景下表现优异,比如 ClickHouse。

比如:

  1. 扫描数据量:ClickHouse 采用列式存储,同一列的数据物理上连续存储,查询时只需读取目标列,而 MySQL 查询一列需要扫描 20 列数据(假设表有 20 列)
  2. 数据压缩:ClickHouse 存储的数据是经过压缩的,读取的时候可以减少 IO 的开销。
  3. 多核计算:ClickHouse 的查询任务可以自动拆分到多个 CPU 核心执行,充分利用现代硬件并行能力。

也就是说,不管是存储、读取还是计算,专门的 OLAP 数据库都比 OLTP 关系型数据库要高效。 正因如此,我们会选择使用一些 OLAP 数据库来处理一些数据分析的业务,比如 ClickHouse,这个时候如何将数据从 MySQL 同步到 ClickHouse 就成了一个亟需解决的问题。

如何将数据从 MySQL 同步到 ClickHouse

目前比较有效的在 MySQL 和 ClickHouse 之间同步数据的方式都是通过 binlog 捕获数据库变更,然后写入到 ClickHouse。比如有如下的一些方案:

  1. Debezium + Kafka:通过 Debezium 捕获 MySQL 的 Binlog 变更事件,经 Kafka 中转后写入 ClickHouse。
  2. Canal + Kafka:通过 Canal 捕获 MySQL 的 Binlog 变更事件,经 Kafka 中转后写入 ClickHouse。
  3. 使用阿里云 DTS(数据传输服务):DTS 支持 MySQL 到 ClickHouse 的实时同步。

但是直接通过捕获 MySQL 的 binlog 写入 ClickHouse,在运维层面复杂度还是偏高,如果使用的是阿里云的 RDS, 可以直接使用阿里云的 DTS 来进行 MySQL 到 ClickHouse 的同步,好处是我们不需要处理底层同步的一些细节问题。

DTS 里面包含了几种服务,本文只是使用它提供的数据订阅服务。

阿里云 DTS 数据订阅的工作原理

数据订阅支持实时拉取 RDS 实例的增量日志,用户可以通过 DTS 提供的 SDK 数据订阅服务端来订阅增量日志,同时可以根据业务需求,实现数据定制化消费。

在这个过程中,我们需要处理的就是 业务逻辑 部分,也就是获取到 RDS 的数据库变更之后,如何将这些变更写入到 ClickHouse。我们可以选择处理全部字段,或者只处理自己需要的某些字段。

如何使用阿里云 DTS 同步 MySQL 到 ClickHouse

具体文档可在阿里云文档查看:数据传输服务 - 快速入门 - 数据订阅操作指导

ClickHouse 安装配置(CentOS)

我们可以通过如下命令来安装 ClickHouse:

1
2
3
sudo yum install -y yum-utils
sudo yum-config-manager --add-repo https://packages.clickhouse.com/rpm/clickhouse.repo
sudo yum install -y clickhouse-server clickhouse-client

安装完毕后,通过下面的命令来启动 ClickHouse 服务:

1
2
3
sudo systemctl enable clickhouse-server
sudo systemctl start clickhouse-server
sudo systemctl status clickhouse-server

如果一切顺利,我们会看到如下输出:

1
2
3
4
5
6
7
● clickhouse-server.service - ClickHouse Server (analytic DBMS for big data)
Loaded: loaded (/usr/lib/systemd/system/clickhouse-server.service; enabled; vendor preset: disabled)
Active: active (running) since Mon 2025-03-17 09:19:03 CST; 2 weeks 1 days ago
Main PID: 2865 (clickhouse-serv)
Tasks: 822
Memory: 1.3G
CGroup: /system.slice/clickhouse-server.service

如果不是绿色的 active,则可以通过 journalctl -xe -u clickhouse-server 查看一下具体错误。

下面是通过 yum install 安装的 ClickHouse 的一些目录:

  • /var/log/clickhouse-server 日志
  • /var/run/clickhouse-server 运行时的一些文件
  • /etc/clickhouse-server 配置目录
  • /var/lib/clickhouse

配置文件位置:/etc/clickhouse-server/config.xml,我们可以通过修改这个配置文件来调整 ClickHouse 服务的相关参数。如:

  • listen_host 修改绑定的 ip 地址,如果需要给本机以外的人访问,可以修改为 0.0.0.0

连接到 ClickHouse

我们可以通过下面的命令来连接到 ClickHouse:

1
clickhouse-client --host 127.0.0.1 --port 9011

这跟 MySQL 的命令行类似,除了样子长得像以外,很多的 SQL 语句都是跟 MySQL 一样的。比如,进入交互式的命令行之后,我们可以通过 show databases; 来查看有哪些数据库:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
➜  ~ clickhouse-client --host 127.0.0.1 --port 9011
ClickHouse client version 25.2.1.3085 (official build).
Connecting to 127.0.0.1:9011 as user default.
Connected to ClickHouse server version 25.2.1.

Warnings:
* Linux transparent hugepages are set to "always". Check /sys/kernel/mm/transparent_hugepage/enabled

example.com :) show databases;

SHOW DATABASES

Query id: 27bcb695-878d-4f0d-8a6a-f3b07fa0d498

┌─name───────────────┐
1. │ INFORMATION_SCHEMA │
2. │ default │
3. │ information_schema │
4. │ system │
└────────────────────┘

4 rows in set. Elapsed: 0.005 sec.

注意:这里的 9011 端口是我本地的端口,不是默认的端口。

创建对应的 ClickHouse 数据库和表

现在,假设我们的 MySQL 中有一个名字为 order 的数据库,里面有一个 orders 的表。然后我们需要将其中的一些字段同步到 ClickHouse 中,首先我们需要创建对应的数据库和表。

  1. 创建数据库
1
create database order;
  1. 创建表
1
2
3
4
5
6
7
8
9
10
11
12
13
create table `orders`(
id UInt64,
company_id UInt64,
supplier_id UInt64,
order_total_fee Decimal(20, 4),
coupon_id Nullable(UInt64),
coupon_price Nullable(Decimal(20, 4)),
deleted_at Nullable(Datetime),
created_at Datetime,
PRIMARY KEY (created_at, id)
) ENGINE = MergeTree()
PARTITION BY toStartOfMonth(created_at)
ORDER BY (created_at, id);

初次同步数据到 ClickHouse

因为 DTS 只能捕获增量数据,初次同步的时候我们需要手动将 MySQL 中的全量数据导入到 ClickHouse。如果表的数据写入频繁的话,可能需要停服之后再进行此操作。

我们可以通过下面的命令来从 MySQL 直接导数据到 ClickHouse:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
INSERT INTO orders SELECT
id,
company_id,
supplier_id,
order_total_fee,
coupon_id,
coupon_price,
deleted_at,
created_at
FROM
mysql (
'127.0.0.1:3306',
'order',
'orders',
'账号',
'密码')

在实际执行中,将 MySQL 里面的信息替换为你自己的 MySQL 配置即可。在导入数据完成后,可以通过下面的命令来查看导入是否成功:

1
2
3
4
5
6
7
8
9
10
11
12
13
example.com :) select count(*) from orders;

SELECT count(*)
FROM orders

Query id: 227ce057-5949-4d33-b5ae-2f2ce7e96e38

┌─count()─┐
1. │ 25781 │
└─────────┘

1 row in set. Elapsed: 0.039 sec. Processed 25.78 thousand rows, 103.13 KB (668.04 thousand rows/s., 2.67 MB/s.)
Peak memory usage: 401.44 KiB.

如果 count 的值跟 MySQL 中的数据一致,则说明导入成功。

在这一步执行完成之后,就可以启动 DTS 的数据订阅服务了。后续的数据库变动会被捕获到,直到我们 ack 为止。

MySQL 跟 ClickHouse 的字段对应关系

太长了,自己复制到浏览器打开:

https://help.aliyun.com/zh/dts/user-guide/use-a-kafka-client-to-consume-tracked-data-2#section-woc-4pq-mes

编写数据订阅的代码(Python)

官方 demo 在:https://github.com/aliyun/aliyun-dts-subscribe-demo/tree/master/python

核心代码如下(比较标准的 Kafka 消费者代码):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
dts_record_schema = schema.load_schema(
"dts_record_avsc/com.alibaba.dts.formats.avro.Record.avsc"
)

def decode(msg_value):
message_bytes = io.BytesIO(msg_value)
data = schemaless_reader(message_bytes, dts_record_schema)
return data

if __name__ == "__main__":
try:
# Kafka Consumer 配置参数
topic_name = "cn_hangzhou_vpc_rm_bp1907x8zbo20z60u_dts_upgrade_from_old_version2"
auto_commit = False
# 消费组 ID
group_id = "dtse9gh4883283o991"
sasl_mechanism = "PLAIN"
security_protocol = "SASL_PLAINTEXT"
username = "xiaqiutest"
password = "DTStest1234"
bootstrap_servers = ["dts-cn-hangzhou.aliyuncs.com:18001"]

# 如果username不含有group_id,则更新username为username-group_id
if group_id not in username:
username = username + "-" + group_id

# 创建 KafkaConsumer 实例
consumerGroupHandler = KafkaConsumer(
topic_name,
enable_auto_commit=auto_commit,
group_id=group_id,
sasl_mechanism=sasl_mechanism,
security_protocol=security_protocol,
sasl_plain_username=username,
sasl_plain_password=password,
bootstrap_servers=bootstrap_servers,
)

print("start")
for msg in consumerGroupHandler:
record = decode(msg.value)
# import datetime

# sourceTimestamp = record.get("sourceTimestamp")
# formatted_date = datetime.datetime.fromtimestamp(sourceTimestamp).strftime(
# "%Y-%m-%d %H:%M:%S"
# )
# print(formatted_date)
print(record)

print("end")
except Exception as e:
print(e)
traceback.print_exc()

但是官方的 demo 实在是过于简陋,所以这里会针对里面一些比较关键的地方说明一下:

  • schema.load_schemaschemaless_readerpython 版本的 SDK 使用了 fastavro 来解析 Avro 格式的数据,所以需要用到 fastavro.schemaless_reader 来读取数据。
  • Kafka Consumer 配置参数在 DTS 订阅任务那里都可以看到,其中 username 和 password 是新建订阅任务的时候自己填写的。
  • record = decode(msg.value) 这一行会将 Kafka 中的数据解码成字典格式,这样就可以直接使用了。
  • record 里面包含的字段可以看 dts_record_avsc/com.alibaba.dts.formats.avro.Record.avsc 这里面的定义。
  • operation 字段表示的是操作类型,比如 INSERT, UPDATE, DELETE 等。我们可以根据这个字段来决定在 ClickHouse 中执行什么样的操作。
  • recordbeforeImages 是数据变更前的字段值,afterImages 是数据变更后的字段值。在执行 insert 操作的时候 beforeImages 是空的,在执行 delete 操作的时候 afterImages 是空的。update 的时候,两个都有值,分别是变动前后的字段变动。

官方 demo 的代码只是给出了基本的消费的代码,在实际开发中,我们还需要根据 operation 字段来决定在 ClickHouse 中执行什么样的操作。如,我们可以像下面这样做一些插入、删除的操作:

1
2
3
4
5
6
7
if op == 'INSERT':
self.insert(...)
if op == 'DELETE':
self.clickhouse_client.delete(...)
if op == 'UPDATE':
self.clickhouse_client.delete(...)
self.insert(...)

更具体代码自行实现了。需要注意的是,在插入之前,可能需要做一些数据类型转换的操作,否则可能在插入到 ClickHouse 的时候出现类型不匹配的问题。

部署(CentOS)

本文以 condasupervisor 来作为示例部署。

Python 环境(conda)

由于 Python 各版本的差异,所以更推荐使用 conda 来管理 Python 环境,我们可以通过以下命令来安装一下 miniconda

1
2
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash ~/Miniconda3-latest-Linux-x86_64.sh

安装到默认的 ~/miniconda3 目录即可。

conda 的常用命令:

1
2
/root/miniconda3/bin/conda env list # 列出所有环境
/root/miniconda3/bin/conda create --name dts python=3.11 # 创建 py 环境

更详细的内容可参考官方文档:https://www.anaconda.com/docs/getting-started/miniconda/install#macos-linux-installation

supervisor

1
2
3
4
5
6
7
8
9
10
[program:dts_consumer]
command=/root/miniconda3/envs/dts/bin/python main.py
numprocs=1
autostart=true
autorestart=true
startretries=3
user=root
redirect_stderr=true
directory=/home/www/dts-consumer
stdout_logfile=/var/log/dts_consumer.log

保存到服务器的 /etc/supervisord.d 目录,然后执行:

1
supervisorctl update

即可启动。

参考文档

  • DTS 常见问题 https://help.aliyun.com/zh/dts/support/faq#section-fle-1lu-lla
  • Kafka https://kafka.apache.org/intro
  • fastavro https://fastavro.readthedocs.io/en/latest/
  • 排查订阅任务问题 https://help.aliyun.com/zh/dts/user-guide/troubleshoot-issues-in-change-tracking-tasks
  • MySQL 字段类型与 dataTypeNumber 数值的对应关系 https://help.aliyun.com/zh/dts/user-guide/use-a-kafka-client-to-consume-tracked-data-2#section-woc-4pq-mes