消费 MySQL 系列订阅数据操作指导

最近更新时间:2024-09-24 18:02:21

我的收藏

操作场景

本操作适用于 MySQL/TDSQL-C MySQL/MariaDB/Percona/TDSQL MySQL 的数据订阅,数据订阅 Kafka 版(当前 Kafka Server 版本为V2.6.0)中,您可以通过0.11版本及以上的 Kafka 客户端 进行消费订阅数据,本文为您提供了 Java、Go、Python 语言的客户端消费 Demo 示例,方便您快速测试消费数据的流程,了解数据格式解析的方法。

注意事项

1. Demo 并不包含消费数据的用法演示,仅对数据做了打印处理,您需要在此基础上自行编写数据处理逻辑,您也可以使用其他语言的 Kafka 客户端消费并解析数据。
2. 目前不支持通过外网连接数据订阅的 Kafka 进行消费,只支持腾讯云内网的访问,并且订阅的数据库实例所属地域与数据消费的地域相同。
3. 数据格式选择 Avro、JSON 时,DTS 订阅对于大消息的处理存在如下限制。
2024年3月25日之前创建的任务,DTS 订阅处理单条消息有一定上限,当源库中的单行数据超过5MB时,订阅任务可能会报错。
2024年3月25日及之后创建的任务,当源库中的单行数据超过5MB时,订阅任务不会报错,使用最新的消费 Demo 即可获得大消息的消费数据。
4. 在订阅指定库/表对象(非源实例全部),并且采用 Kafka 单分区的场景中,DTS 解析增量数据后,仅将订阅对象的数据写入 Kafka Topic 中,其他非订阅对象的数据会转成空事务写入 Kafka Topic,所以在消费数据时会出现空事务。空事务的 Begin/Commit 消息中保留了事务的 GTID 信息,可以保证 GTID 的连续性和完整性,同时,在 MySQL/TDSQL-C MySQL 的消费 Demo 中,多个空事务也做了压缩处理以减少消息数量。
5. 为了保证数据可重入,DTS 订阅引入 Checkpoint 机制。消息写入 Kafka Topic 时,一般每10秒会插入一个 Checkpoint,用来标识数据同步的位点,在任务中断后再重启识别断点位置,实现断点续传。另外,消费端遇到 Checkpoint 消息会做一次 Kafka 消费位点提交,以便及时更新消费位点。
6. 数据格式选择 JSON 时,如果您使用过或者熟悉开源订阅工具 Canal,可以选择将这里消费出来的 JSON 格式数据转换成 Canal 工具兼容的数据格式,再进行后续处理,我们的 Demo 中已经提供了相关支持,在启动 Demo 的参数中添加参数 trans2canal 即可实现。目前该功能仅限 Java 语言支持。
7. 选择 ProtoBuf 和 JSON 数据格式时,对于源端 MySQL 的数值类型,DTS 写入 Kafka 时会转为字符串类型,其中 FLOAT 和 DOUBLE 类型在转为字符串时如果绝对值小于 1e-4 或大于等于 1e+6,那么结果将使用科学计数法表示。科学计数法以尾数乘以10的幂的形式表示浮点数,这样可以更清晰地表示其数量级和精确值,同时能够节省存储空间。如果在消费端用户需要转换为数值类型,请注意对科学记数法表示的浮点数字符串做特殊处理。
8. 订阅任务的数据格式选择 JSON 时,如果源库 MySQL 参数 binlog_row_image 设置为 minimal,因为 minimal 的实现为,只将有影响的列记录到 binlog,没有影响的列不记录,所以消费结果 UPDATE 语句中对于未更新字段的值是缺失的,DTS 为了与 NULL 值进行区分,使用 ##DTS_NA_VALUE## 表示未更新字段的值。
如果用户希望消费结果的 UPDATE 语句包含未更新字段的值,则需要将源端 MySQL 参数 binlog_row_image 设置为 full,然后重建订阅任务。
ProtoBuf 与 Avro 格式由于是二进制格式,不需要通过特殊字符串区分缺失字段值与 NULL 值,所以不涉及该问题。

Demo 下载

在配置订阅任务中,您可以选择不同的订阅数据格式,ProtoBuf、Avro 和 JSON。ProtoBuf 和 Avro 采用二进制格式,消费效率更高,JSON 采用轻量级的文本格式,更加简单易用。订阅任务中选择了哪种格式,这里就需要下载对应格式的 Demo。
如下 Demo 示例中已包含了对应的 Protobuf/Avro/JSON 协议文件,您无需另外下载。如果您选择自行下载 Protobuf 协议文件,请使用 Protobuf 3.X 版本进行代码生成,以便数据结构可以正确兼容。
Demo 中的逻辑讲解及关键参数说明,请参考 Demo 说明
Demo 语言
ProtoBuf(MySQL/MariaDB/TDSQL-C MySQL/Percona)
ProtoBuf(TDSQL MySQL)
Avro(MySQL/TDSQL-C MySQL)
Avro(MySQL/TDSQL-C MySQL)兼容阿里云订阅服务的数据格式
JSON(MySQL/TDSQL-C MySQL)
Go
地址
地址
地址
-
地址
Java
地址
地址
地址
地址
地址
Python
地址
地址
地址
-
地址

Java Demo 操作步骤

编译环境:Maven 或者 Gradle 包管理工具,JDK8。 运行环境:腾讯云服务器(需要与订阅实例相同地域,才能够访问到 Kafka 服务器的内网地址),安装 JRE8 。 操作步骤如下:
1. 创建新版数据订阅任务,详情请参见 数据订阅 Kafka 版
2. 创建一个或多个消费组,详情请参见 新增消费组
3. 下载 Java Demo ,然后解压该文件。
4. 进入解压后的目录,为方便使用,目录下分别放置了 Maven 模型文件、pom.xml 文件和 Gradle 相关配置文件,用户根据需要选用。
使用 Maven 进行打包:mvn clean package 。
使用 Gradle 进行打包:gradle fatJar 打包并包含所有依赖,或者 gradle jar 进行打包。
5. 运行 Demo。
使用 Maven 打包后,进入目标文件夹 target ,运行 java -jar sub_demo-1.0-SNAPSHOT-jar-with-dependencies.jar --brokers=xxx --topic=xxx --group=xxx --user=xxx --password=xxx --trans2sql
使用 Gradle 打包后,进入文件夹 build/libs ,运行 java -jar sub_demo-with-dependencies-1.0-SNAPSHOT.jar --brokers=xxx --topic=xxx --group=xxx --user=xxx --password=xxx --trans2sql
各参数详细说明如下:
参数
说明
brokers
数据订阅 Kafka 的内网访问地址,可在 订阅详情 页查看。
topic
数据订阅任务的订阅 topic,可在 订阅详情 页查看。
group
消费组名称。可在 消费管理 页查看。
user/password
消费账号和密码,可在 消费管理 页查看。
trans2sql
表示是否转换为 SQL 语句,携带该参数表示转换为 SQL 语句,不携带则不转换。
trans2canal
可选,仅对 JSON 数据格式支持,并限定 Java 语言。
数据格式选择 JSON 时,可以将 JSON 格式数据转换成 Canal 工具兼容的数据格式。携带该参数表示转换为 Canal 格式,不携带则不转换。
6. 观察消费情况。

用户也可以使用 IDE 进行编译打包,打包完成后,工程根目录 target 文件夹下的 sub_demo-1.0-SNAPSHOT-jar-with-dependencies 即为一个包含了所需依赖的可运行的 jar 包。

Golang Demo 操作步骤

编译环境:Golang 1.12 及以上版本,配置好 Go Module 环境。 运行环境:腾讯云服务器(需要与订阅实例相同地域,才能够访问到 Kafka 服务器的内网地址)。 操作步骤如下:
1. 创建新版数据订阅任务,详情请参见 数据订阅 Kafka 版
2. 创建一个或多个消费组,详情请参见 新增消费组
3. 下载 Golang Demo,然后解压该文件。
4. 进入解压后的目录,运行 go build -o subscribe ./main,生成可执行文件 subscribe。
5. 运行如下代码。/subscribe --brokers=xxx --topic=xxx --group=xxx --user=xxx --password=xxx --trans2sql=true 其中,brokers 为数据订阅 Kafka 的内网访问地址,topic 为数据订阅任务的订阅 topic,这两个可在 订阅详情 页查看,groupuserpassword 分别为消费组的名称、账号和密码,可在 消费管理 页查看,trans2sql 表示是否转换为 SQL 语句。
6. 观察消费情况。


Python3 Demo 操作步骤

编译运行环境:腾讯云服务器(需要与订阅实例相同地域,才能够访问到 Kafka 服务器的内网地址),安装 Python3,pip3(用于依赖包安装)。 使用 pip3 安装依赖包:
pip install flag
pip install kafka-python
pip install protobuf
操作步骤如下:
1. 创建新版数据订阅任务,详情请参见 数据订阅 Kafka 版
2. 创建一个或多个消费组,详情请参见 新增消费组
3. 下载 Python3 Demo ,然后解压该文件。
4. 运行如下代码:python main.py --brokers=xxx --topic=xxx --group=xxx --user=xxx --password=xxx --trans2sql=1 其中,brokers 为数据订阅 Kafka 的内网访问地址,topic 为数据订阅任务的订阅 topic,这两个可在 订阅详情 页查看,groupuserpassword 分别为消费组的名称、账号和密码,可在 消费管理 页查看,trans2sql 表示是否转换为 SQL 语句。
5. 观察消费情况。