操作场景
本操作适用于 MySQL/TDSQL-C MySQL/MariaDB/Percona/TDSQL MySQL 的数据订阅,数据订阅 Kafka 版(当前 Kafka Server 版本为V2.6.0)中,您可以通过0.11版本及以上的 Kafka 客户端 进行消费订阅数据,本文为您提供了 Java、Go、Python 语言的客户端消费 Demo 示例,方便您快速测试消费数据的流程,了解数据格式解析的方法。
注意事项
1. Demo 并不包含消费数据的用法演示,仅对数据做了打印处理,您需要在此基础上自行编写数据处理逻辑,您也可以使用其他语言的 Kafka 客户端消费并解析数据。
2. 目前不支持通过外网连接数据订阅的 Kafka 进行消费,只支持腾讯云内网的访问,并且订阅的数据库实例所属地域与数据消费的地域相同。
3. 数据格式选择 Avro、JSON 时,DTS 订阅对于大消息的处理存在如下限制。
2024年3月25日之前创建的任务,DTS 订阅处理单条消息有一定上限,当源库中的单行数据超过5MB时,订阅任务可能会报错。
2024年3月25日及之后创建的任务,当源库中的单行数据超过5MB时,订阅任务不会报错,使用最新的消费 Demo 即可获得大消息的消费数据。
4. 在订阅指定库/表对象(非源实例全部),并且采用 Kafka 单分区的场景中,DTS 解析增量数据后,仅将订阅对象的数据写入 Kafka Topic 中,其他非订阅对象的数据会转成空事务写入 Kafka Topic,所以在消费数据时会出现空事务。空事务的 Begin/Commit 消息中保留了事务的 GTID 信息,可以保证 GTID 的连续性和完整性,同时,在 MySQL/TDSQL-C MySQL 的消费 Demo 中,多个空事务也做了压缩处理以减少消息数量。
5. 为了保证数据可重入,DTS 订阅引入 Checkpoint 机制。消息写入 Kafka Topic 时,一般每10秒会插入一个 Checkpoint,用来标识数据同步的位点,在任务中断后再重启识别断点位置,实现断点续传。另外,消费端遇到 Checkpoint 消息会做一次 Kafka 消费位点提交,以便及时更新消费位点。
6. 数据格式选择 JSON 时,如果您使用过或者熟悉开源订阅工具 Canal,可以选择将这里消费出来的 JSON 格式数据转换成 Canal 工具兼容的数据格式,再进行后续处理,我们的 Demo 中已经提供了相关支持,在启动 Demo 的参数中添加参数 trans2canal 即可实现。目前该功能仅限 Java 语言支持。
7. 选择 ProtoBuf 和 JSON 数据格式时,对于源端 MySQL 的数值类型,DTS 写入 Kafka 时会转为字符串类型,其中 FLOAT 和 DOUBLE 类型在转为字符串时如果绝对值小于 1e-4 或大于等于 1e+6,那么结果将使用科学计数法表示。科学计数法以尾数乘以10的幂的形式表示浮点数,这样可以更清晰地表示其数量级和精确值,同时能够节省存储空间。如果在消费端用户需要转换为数值类型,请注意对科学记数法表示的浮点数字符串做特殊处理。
8. 订阅任务的数据格式选择 JSON 时,如果源库 MySQL 参数
binlog_row_image
设置为 minimal
,因为 minimal 的实现为,只将有影响的列记录到 binlog,没有影响的列不记录,所以消费结果 UPDATE 语句中对于未更新字段的值是缺失的,DTS 为了与 NULL 值进行区分,使用 ##DTS_NA_VALUE##
表示未更新字段的值。如果用户希望消费结果的 UPDATE 语句包含未更新字段的值,则需要将源端 MySQL 参数
binlog_row_image
设置为 full
,然后重建订阅任务。ProtoBuf 与 Avro 格式由于是二进制格式,不需要通过特殊字符串区分缺失字段值与 NULL 值,所以不涉及该问题。
Demo 下载
在配置订阅任务中,您可以选择不同的订阅数据格式,ProtoBuf、Avro 和 JSON。ProtoBuf 和 Avro 采用二进制格式,消费效率更高,JSON 采用轻量级的文本格式,更加简单易用。订阅任务中选择了哪种格式,这里就需要下载对应格式的 Demo。
如下 Demo 示例中已包含了对应的 Protobuf/Avro/JSON 协议文件,您无需另外下载。如果您选择自行下载 Protobuf 协议文件,请使用 Protobuf 3.X 版本进行代码生成,以便数据结构可以正确兼容。
Demo 中的逻辑讲解及关键参数说明,请参考 Demo 说明。
Java Demo 操作步骤
编译环境:Maven 或者 Gradle 包管理工具,JDK8。
运行环境:腾讯云服务器(需要与订阅实例相同地域,才能够访问到 Kafka 服务器的内网地址),安装 JRE8 。
操作步骤如下:
1. 创建新版数据订阅任务,详情请参见 数据订阅 Kafka 版。
2. 创建一个或多个消费组,详情请参见 新增消费组。
3. 下载 Java Demo ,然后解压该文件。
4. 进入解压后的目录,为方便使用,目录下分别放置了 Maven 模型文件、pom.xml 文件和 Gradle 相关配置文件,用户根据需要选用。
使用 Maven 进行打包:mvn clean package 。
使用 Gradle 进行打包:gradle fatJar 打包并包含所有依赖,或者 gradle jar 进行打包。
5. 运行 Demo。
使用 Maven 打包后,进入目标文件夹 target ,运行
java -jar sub_demo-1.0-SNAPSHOT-jar-with-dependencies.jar --brokers=xxx --topic=xxx --group=xxx --user=xxx --password=xxx --trans2sql
使用 Gradle 打包后,进入文件夹 build/libs ,运行
java -jar sub_demo-with-dependencies-1.0-SNAPSHOT.jar --brokers=xxx --topic=xxx --group=xxx --user=xxx --password=xxx --trans2sql
各参数详细说明如下:
6. 观察消费情况。
用户也可以使用 IDE 进行编译打包,打包完成后,工程根目录 target 文件夹下的 sub_demo-1.0-SNAPSHOT-jar-with-dependencies 即为一个包含了所需依赖的可运行的 jar 包。
Golang Demo 操作步骤
编译环境:Golang 1.12 及以上版本,配置好 Go Module 环境。
运行环境:腾讯云服务器(需要与订阅实例相同地域,才能够访问到 Kafka 服务器的内网地址)。
操作步骤如下:
1. 创建新版数据订阅任务,详情请参见 数据订阅 Kafka 版。
2. 创建一个或多个消费组,详情请参见 新增消费组。
3. 下载 Golang Demo,然后解压该文件。
4. 进入解压后的目录,运行
go build -o subscribe ./main
,生成可执行文件 subscribe。5. 运行如下代码。
/subscribe --brokers=xxx --topic=xxx --group=xxx --user=xxx --password=xxx --trans2sql=true
其中,brokers
为数据订阅 Kafka 的内网访问地址,topic
为数据订阅任务的订阅 topic,这两个可在 订阅详情 页查看,group
、user
、password
分别为消费组的名称、账号和密码,可在 消费管理 页查看,trans2sql
表示是否转换为 SQL 语句。6. 观察消费情况。
Python3 Demo 操作步骤
编译运行环境:腾讯云服务器(需要与订阅实例相同地域,才能够访问到 Kafka 服务器的内网地址),安装 Python3,pip3(用于依赖包安装)。
使用 pip3 安装依赖包:
pip install flagpip install kafka-pythonpip install protobuf
操作步骤如下:
1. 创建新版数据订阅任务,详情请参见 数据订阅 Kafka 版。
2. 创建一个或多个消费组,详情请参见 新增消费组。
3. 下载 Python3 Demo ,然后解压该文件。
4. 运行如下代码:
python main.py --brokers=xxx --topic=xxx --group=xxx --user=xxx --password=xxx --trans2sql=1
其中,brokers
为数据订阅 Kafka 的内网访问地址,topic
为数据订阅任务的订阅 topic,这两个可在 订阅详情 页查看,group
、user
、password
分别为消费组的名称、账号和密码,可在 消费管理 页查看,trans2sql
表示是否转换为 SQL 语句。5. 观察消费情况。