文档中心>数据集成>实践教程>MySQL + gh-ost 实时同步至 Kafka

MySQL + gh-ost 实时同步至 Kafka

最近更新时间:2023-12-28 08:54:41

我的收藏

业务场景

gh-ost 的业务场景是在 MySQL 中进行在线表结构变更,即 Online DDL,而不影响业务的正常运行。它可以解决传统的 alter table 或 create index 等命令导致的表锁、性能下降、同步延迟等问题。它适用于需要对表进行修改的场景,例如增加新列、添加索引、修改字段类型等。
本实践介绍使用 gh-ost 变更 MySQL 的表后,实时同步它的 DDL 变更记录到 Kafka。

gh-ost 使用前提

1. gh-ost 必须能访问 MySQL。
2. 如果 MySQL 是腾讯云的 CDB,在 gh-ost 执行命令的参数里面需要添加 `--aliyun-rds` 。
3. gh-ost 工具执行过程中,生成的临时表规则为 ^_(.*)_(gho|ghc|del)$,其中 (.*) 是变更表的名称,不支持自定义临时表的名称。
4. 其他 gh-ost 限制,可以参考:gh-ost/requirements-and-limitations.md at master · github/gh-ost · GitHub。

操作步骤

1. 创建 MySQL 实时同步到 Kafka 的整库同步任务。



2. 在任务配置中的数据来源设置高级设置开启同步 gh-ost 临时表功能。



3. 在任务配置中的运行设置消息处理策略选择需要的 DDL 消息处理策略,这里选择下发新增列,删除列,重命名列和修改列类型。配置完其余选项后,提交任务。



4. 使用 gh-ost 变更原始表。
4.1 查看表结构。
mysql> desc tb1;
+-------+--------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+--------------+------+-----+---------+-------+
| id | int | NO | PRI | NULL | |
| name | varchar(255) | YES | | NULL | |
+-------+--------------+------+-----+---------+-------+
2 rows in set (0.00 sec)
4.2 使用 gh-ost 工具在 源表 上增加一个字段。
gh-ost 工具下载:Releases · github/gh-ost · GitHub
下载后解压,执行下面命令,为 `tb1` 表添加 `c` 字段。
./gh-ost \\
--max-load=Threads_running=25 \\
--critical-load=Threads_running=1000 \\
--chunk-size=1000 \\
--throttle-control-replicas="" \\
--max-lag-millis=1500 \\
--user="root" \\ // 用户名
--password="test" \\ // 密码
--host=127.0.0.1 \\ // mysql 的ip
--allow-on-master \\
--database="databaseName" \\ // 需要变更表所在的数据库名称
--table="tb1" \\ // 需要变更的表名称
--verbose \\
--alter="engine=innodb" \\
--switch-to-rbr \\
--allow-master-master \\
--exact-rowcount \\
--concurrent-rowcount \\
--default-retries=120 \\
--panic-flag-file=/tmp/ghost.panic.flag \\
--default-retries=120 \\
--alter="add column c varchar(255);" \\ // alter 变更语句
--approve-renamed-columns \\
--initially-drop-ghost-table \\
--initially-drop-old-table \\
--ok-to-drop-table \\
--aliyun-rds \\ // 使用腾讯云CDB,需要添加这个参数
--execute
4.3 查看变更后的表结构。
mysql> desc tb1;
+-------+--------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+--------------+------+-----+---------+-------+
| id | int | NO | PRI | NULL | |
| name | varchar(255) | YES | | NULL | |
| c | varchar(255) | YES | | NULL | |
+-------+--------------+------+-----+---------+-------+
3 rows in set (0.00 sec)
5. 在 Kafka 上查看结果。
{"data":[],"es":1683519346931,"table":"tb1","type":"INSERT","database":"databaseName","ts":0,"sql":"alter table `databaseName`.`tb1` add column c varchar(255)","mysqlType":{"id":"INT(-1)","name":"VARCHAR(255)","c":"VARCHAR(255)"},"sqlType":{"id":4,"name":12,"c":12},"isDdl":true,"pkNames":["id"],"operation":{"type":"alterOperation","alterColumns":[{"alterType":"ADD_COLUMN","newColumn":{"name":"c","definition":["255"],"jdbcType":12,"isNullable":true,"defaultValue":"","comment":"","nullable":true}}],"operationType":"ALTER"},"incremental":true,"dataSourceName":"databaseName","ddl":true}
示例如下: