前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >debezium采集MySQL CDC指南

debezium采集MySQL CDC指南

作者头像
从大数据到人工智能
发布2023-10-18 14:48:39
6160
发布2023-10-18 14:48:39
举报
文章被收录于专栏:大数据-BigData

Debezium 是一个开源的分布式平台,用于捕获数据库的变更数据(Change Data Capture,CDC)。它支持多种数据库,包括 MySQL。下面我们详细说一下如何进行配置。

MySQL配置

创建用户

代码语言:javascript
复制
CREATE USER 'debezium_user'@'localhost' IDENTIFIED BY 'Pass-123-debezium_user';

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium_user' IDENTIFIED BY 'Pass-123-debezium_user';

flush privileges;

开启binlog

检查binlog是否开启

代码语言:javascript
复制
// for MySql 5.x
SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
FROM information_schema.global_variables WHERE variable_name='log_bin';
// for MySql 8.x
SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
FROM performance_schema.global_variables WHERE variable_name='log_bin';

在执行上述命令时如果出现如下报错:

代码语言:javascript
复制
ERROR 3167 (HY000): The 'INFORMATION_SCHEMA.GLOBAL_VARIABLES' feature is disabled; see the documentation for 'show_compatibility_56'

请先修改数据库配置,将show_compatibility_56设置为ON

设置完上述配置后,再次执行检查binlog是否开启的SQL,如果为 OFF,请使用以下属性配置 MySQL 服务器配置文件,如下表所述:

代码语言:javascript
复制
server-id         = 223344 # Querying variable is called server_id, e.g. SELECT variable_value FROM information_schema.global_variables WHERE variable_name='server_id';
log_bin           = mysql-bin
binlog_format     = ROW
binlog_row_image  = FULL
expire_logs_days  = 10

重启MySQL之后,通过再次检查 binlog 状态来确认您的更改:

代码语言:javascript
复制
// for MySql 5.x
mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
FROM information_schema.global_variables WHERE variable_name='log_bin';
// for MySql 8.x
mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
FROM performance_schema.global_variables WHERE variable_name='log_bin';

得到:

开启GTIDs

全局事务标识符 (GTID) 唯一标识集群内服务器上发生的事务。 虽然 Debezium MySQL 连接器不需要,但使用 GTID 可以简化复制,并使您能够更轻松地确认主服务器和副本服务器是否一致。

基本步骤:

代码语言:javascript
复制
set GLOBAL gtid_mode=OFF_PERMISSIVE;
set GLOBAL gtid_mode=ON_PERMISSIVE;
set GLOBAL gtid_mode=ON;

set GLOBAL enforce_gtid_consistency=ON;

查看修改:

代码语言:javascript
复制
show global variables like '%GTID%';

得到:

设置Session超时时间

代码语言:javascript
复制
set interactive_timeout=60;
set wait_timeout=60;

开启query log events

代码语言:javascript
复制
set binlog_rows_query_log_events=ON;

查看当前变量值:

代码语言:javascript
复制
show global variables where variable_name = 'binlog_row_value_options';

开始部署

在开始部署之前,确定你已经安装了kafka,并且配置了Debezium MySQL connector的kafka connect已经启动。

kafka安装可参考:

下面说一下kafka connect配置问题。

首先下载kafka二进制包,例如下属例子中,将其下载到/data/app目录下。

代码语言:javascript
复制
cd /data/app && wget https://archive.apache.org/dist/kafka/3.3.1/kafka_2.12-3.3.1.tgz
tar zxvf kafka_2.12-3.3.1.tgz
ln -s kafka_2.12-3.3.1 kafka

下载MySQL connector plug-in.

代码语言:javascript
复制
# 新建plugin目录
cd kafka && mkdir plugins
cd plugins && wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.7.Final/debezium-connector-mysql-1.9.7.Final-plugin.tar.gz
tar zxvf debezium-connector-mysql-1.9.7.Final-plugin.tar.gz

修改配置,设置kafka plugin目录

代码语言:javascript
复制
vim /data/app/kafka/config/connect-distributed.properties

# 设置

plugin.path=/data/app/kafka/plugins

接下来便可以启动kafka connect

代码语言:javascript
复制
bin/connect-distributed.sh config/connect-distributed.properties 

kafka connect默认启动的端口为8083

创建MySQL同步任务

在mysql中新建products 表

代码语言:javascript
复制
create database if not exists inventory;
CREATE TABLE IF NOT EXISTS inventory.products (
 id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
 name VARCHAR(255) NOT NULL,
 description VARCHAR(512),
 weight FLOAT
);

插入数据:

代码语言:javascript
复制
insert inventory.products values(1, 'tom', 'tall', 1.8);

创建同步任务

代码语言:javascript
复制
{
    "name": "inventory-connector", 
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector", 
        "database.hostname": "192.168.74.129", 
        "database.port": "3306", 
        "database.user": "debezium_user", 
        "database.password": "Pass-123-debezium_user", 
        "database.server.id": "223344", 
        "database.server.name": "fullfillment", 
        "database.include.list": "inventory", 
        "database.history.kafka.bootstrap.servers": "kafka:30092", 
        "database.history.kafka.topic": "dbhistory.fullfillment", 
        "include.schema.changes": "true" 
    }
}

可以看到kafka connect控制台输出:

kafka中查看数据

相关DDL

0 0 投票数

文章评分

本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://cloud.tencent.com/developer/article/2345732

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-07-12 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • MySQL配置
    • 创建用户
      • 开启binlog
        • 开启GTIDs
          • 设置Session超时时间
            • 开启query log events
            • 开始部署
              • 创建MySQL同步任务
                • kafka中查看数据
                相关产品与服务
                云数据库 MySQL
                腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档