Debezium 是一个开源的分布式平台,用于捕获数据库的变更数据(Change Data Capture,CDC)。它支持多种数据库,包括 MySQL。下面我们详细说一下如何进行配置。
CREATE USER 'debezium_user'@'localhost' IDENTIFIED BY 'Pass-123-debezium_user';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium_user' IDENTIFIED BY 'Pass-123-debezium_user';
flush privileges;
检查binlog是否开启
// for MySql 5.x
SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
FROM information_schema.global_variables WHERE variable_name='log_bin';
// for MySql 8.x
SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
FROM performance_schema.global_variables WHERE variable_name='log_bin';
在执行上述命令时如果出现如下报错:
ERROR 3167 (HY000): The 'INFORMATION_SCHEMA.GLOBAL_VARIABLES' feature is disabled; see the documentation for 'show_compatibility_56'
请先修改数据库配置,将show_compatibility_56设置为ON
设置完上述配置后,再次执行检查binlog是否开启的SQL,如果为 OFF,请使用以下属性配置 MySQL 服务器配置文件,如下表所述:
server-id = 223344 # Querying variable is called server_id, e.g. SELECT variable_value FROM information_schema.global_variables WHERE variable_name='server_id';
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
expire_logs_days = 10
重启MySQL之后,通过再次检查 binlog 状态来确认您的更改:
// for MySql 5.x
mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
FROM information_schema.global_variables WHERE variable_name='log_bin';
// for MySql 8.x
mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
FROM performance_schema.global_variables WHERE variable_name='log_bin';
得到:
全局事务标识符 (GTID) 唯一标识集群内服务器上发生的事务。 虽然 Debezium MySQL 连接器不需要,但使用 GTID 可以简化复制,并使您能够更轻松地确认主服务器和副本服务器是否一致。
基本步骤:
set GLOBAL gtid_mode=OFF_PERMISSIVE;
set GLOBAL gtid_mode=ON_PERMISSIVE;
set GLOBAL gtid_mode=ON;
set GLOBAL enforce_gtid_consistency=ON;
查看修改:
show global variables like '%GTID%';
得到:
set interactive_timeout=60;
set wait_timeout=60;
set binlog_rows_query_log_events=ON;
查看当前变量值:
show global variables where variable_name = 'binlog_row_value_options';
在开始部署之前,确定你已经安装了kafka,并且配置了Debezium MySQL connector的kafka connect已经启动。
kafka安装可参考:
下面说一下kafka connect配置问题。
首先下载kafka二进制包,例如下属例子中,将其下载到/data/app目录下。
cd /data/app && wget https://archive.apache.org/dist/kafka/3.3.1/kafka_2.12-3.3.1.tgz
tar zxvf kafka_2.12-3.3.1.tgz
ln -s kafka_2.12-3.3.1 kafka
# 新建plugin目录
cd kafka && mkdir plugins
cd plugins && wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.7.Final/debezium-connector-mysql-1.9.7.Final-plugin.tar.gz
tar zxvf debezium-connector-mysql-1.9.7.Final-plugin.tar.gz
修改配置,设置kafka plugin目录
vim /data/app/kafka/config/connect-distributed.properties
# 设置
plugin.path=/data/app/kafka/plugins
接下来便可以启动kafka connect
bin/connect-distributed.sh config/connect-distributed.properties
kafka connect默认启动的端口为8083
在mysql中新建products 表
create database if not exists inventory;
CREATE TABLE IF NOT EXISTS inventory.products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512),
weight FLOAT
);
插入数据:
insert inventory.products values(1, 'tom', 'tall', 1.8);
创建同步任务
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "192.168.74.129",
"database.port": "3306",
"database.user": "debezium_user",
"database.password": "Pass-123-debezium_user",
"database.server.id": "223344",
"database.server.name": "fullfillment",
"database.include.list": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:30092",
"database.history.kafka.topic": "dbhistory.fullfillment",
"include.schema.changes": "true"
}
}
可以看到kafka connect控制台输出:
相关DDL
0 0 投票数
文章评分
本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。