前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >从 MySQL 到 ClickHouse 实时数据同步 —— Debezium + Kafka 表引擎

从 MySQL 到 ClickHouse 实时数据同步 —— Debezium + Kafka 表引擎

作者头像
用户1148526
发布于 2024-04-26 02:47:04
发布于 2024-04-26 02:47:04
1.8K00
代码可运行
举报
文章被收录于专栏:Hadoop数据仓库Hadoop数据仓库
运行总次数:0
代码可运行

本文介绍从 MySQL 作为源到 ClickHouse 作为目标的整个过程。MySQL 数据库更改通过 Debezium 捕获,并作为事件发布在到 Kafka 上。ClickHouse 通过 Kafka 表引擎按部分顺序应用这些更改,实时并保持最终一致性。相关软件版本如下:

  • MySQL:8.0.16
  • ClickHouse:24.1.8
  • JDK:11.0.22
  • zookeeper:3.9.1
  • Kafka:3.7.0
  • debezium-connector-mysql:2.4.2

这种方案的优点之一是可以做到 ClickHouse 与 MySQL 的数据最终严格一致。

一、总体架构

代码语言:txt
AI代码解释
复制
    总体结构如下图所示。
代码语言:txt
AI代码解释
复制
    ClickHouse 是由四个实例构成的两分片、每分片两副本集群,票选和协调器使用 ClickHouse 自带的 keeper 组件。分片、副本、keeper 节点、Zookeeper集群、Kafaka集群、Debezium-Connector-MySQL 插件的部署如下表所示。

IP

主机名

实例角色

ClickHouse Keeper

Zookeeper

Kafka

Debezium Connector MySQL

172.18.4.126

node1

分片1副本1

*

172.18.4.188

node2

分片1副本2

*

*

*

*

172.18.4.71

node3

分片2副本1

*

*

*

*

172.18.4.86

node4

分片2副本2

*

*

*

二、安装配置 MySQL 主从复制

代码语言:txt
AI代码解释
复制
    配置好主从复制后,在主库创建测试库表及数据:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
-- 建库
create database test;

-- 建表
create table test.t1 (
  id bigint(20) not null auto_increment,
  remark varchar(32) default null comment '备注',
  createtime timestamp not null default current_timestamp comment '创建时间',
  primary key (id));

-- 插入三条测试数据
insert into test.t1 (remark) values ('第一行:row1'),('第二行:row2'),('第三行:row3');
commit;

三、安装配置 ClickHouse 集群

四、安装 JDK

五、安装配置 Zookeeper 集群

六、安装配置 Kafaka 集群

七、安装配置 Debezium-Connector-MySQL 插件

代码语言:txt
AI代码解释
复制
    在 node2 上执行以下步骤。

1. 创建插件目录

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
mkdir $KAFKA_HOME/plugins

2. 解压文件到插件目录

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
cd ~
# debezium-connector-mysql
unzip debezium-debezium-connector-mysql-2.4.2.zip -d $KAFKA_HOME/plugins/

3. 配置 Kafka Connector

(1)配置属性文件
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# 先备份
cp $KAFKA_HOME/config/connect-distributed.properties $KAFKA_HOME/config/connect-distributed.properties.bak
# 编辑 connect-distributed.properties 文件
vim $KAFKA_HOME/config/connect-distributed.properties
代码语言:txt
AI代码解释
复制
    内容如下:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
bootstrap.servers=node2:9092,node3:9092,node4:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=3
offset.flush.interval.ms=10000
plugin.path=/root/kafka_2.13-3.7.0/plugins
(2)分发到其它节点
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
scp $KAFKA_HOME/config/connect-distributed.properties node3:$KAFKA_HOME/config/
scp $KAFKA_HOME/config/connect-distributed.properties node4:$KAFKA_HOME/config/
scp -r $KAFKA_HOME/plugins node3:$KAFKA_HOME/
scp -r $KAFKA_HOME/plugins node4:$KAFKA_HOME/
(3)以 distributed 方式启动 Kafka connect
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties 
# 确认日志是否有 ERROR
grep ERROR ~/kafka_2.13-3.7.0/logs/connectDistributed.out
(4)确认 connector 插件和自动生成的 topic
代码语言:txt
AI代码解释
复制
    查看连接器插件:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
curl -X GET http://node2:8083/connector-plugins | jq
代码语言:txt
AI代码解释
复制
    从输出中可以看到,Kafka connect 已经识别到了 MySqlConnector source 插件:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[root@vvml-yz-hbase-test~]#curl -X GET http://node2:8083/connector-plugins | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   403  100   403    0     0   3820      0 --:--:-- --:--:-- --:--:--  3838
[
  {
    "class": "io.debezium.connector.mysql.MySqlConnector",
    "type": "source",
    "version": "2.4.2.Final"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "3.7.0"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "3.7.0"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "3.7.0"
  }
]
[root@vvml-yz-hbase-test~]#
代码语言:txt
AI代码解释
复制
    查看 topic:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092
代码语言:txt
AI代码解释
复制
    从输出中可以看到,Kafka connect 启动时自动创建了 connect-configs、connect-offsets、connect-status 三个 topic:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[root@vvml-yz-hbase-test~]#kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092
__consumer_offsets
connect-configs
connect-offsets
connect-status
[root@vvml-yz-hbase-test~]#

4. 创建 source connector

(1)Debezium 三个必要的配置说明
代码语言:txt
AI代码解释
复制
    Debezium 是一个众所周知的用于读取和解析 MySQL Binlog 的工具。它将 KafkaConnect 作为一个连接器进行集成,并对 Kafka 主题进行每一次更改。
  • 只记录后状态
代码语言:txt
AI代码解释
复制
    默认情况下,Debezium 会向 Kafka 发出每个操作的前状态和后状态的每条记录,这很难被 ClickHouse Kafka 表解析。此外,在执行删除操作的情况下(Clickhouse 同样无法解析),它会创建 tombstone 记录,即具有 Null 值的记录。下表展示了这个行为。

操作

操作前

操作后

附加记录

Create

Null

新纪录

-

Update

更新前的记录

更新后的记录

-

Delete

删除前的记录

Null

墓碑记录

代码语言:txt
AI代码解释
复制
    在 Debezium 配置中使用 ExtractNewRecod 转换器来处理此问题。由于有了这个选项,Debezium 只为创建/更新操作保留 after 状态,而忽略 before 状态。但缺点是,它删除了包含先前状态的 Delete 记录和墓碑记录,换句话说就是不再捕获删除操作。紧接着说明如何解决这个问题。
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  • 重写删除事件
代码语言:txt
AI代码解释
复制
    要捕获删除操作,必须添加如下所示的重写配置:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
"transforms.unwrap.delete.handling.mode":"rewrite"
代码语言:txt
AI代码解释
复制
    Debezium 使用此配置添加字段 \_\_deleted,对于 delete 操作为 true,对于其他操作为 false。因此,删除将包含以前的状态以及 \_\_deleted:true 字段。
  • 处理非主键更新
代码语言:txt
AI代码解释
复制
    在提供上述配置的情况下,更新记录(主键除外的每一列)会发出一个具有新状态的简单记录。通常在关系数据库系统中,更新后的记录会替换前一个记录,但在 ClickHouse 不行。出于性能考虑,ClickHouse 将行级更新变为多版本插入。在本示例中,MySQL 中的 test.t1 表以 id 列为主键,如果更新了 remark 列,在 ClikHouse 中,最终会得到重复的记录,这意味着 id 相同,但 remark 不同!
代码语言:txt
AI代码解释
复制
    幸运的是有办法应付这种情况。默认情况下,Debezium 会创建一个删除记录和一个创建记录,用于更新主键。因此,如果源更新 id,它会发出一个带有前一个 id 的删除记录和一个带有新 id 的创建记录。带有 \_\_deleted=ture 字段的前一个记录将替换 CH 中的 stall 记录。然后,可以在视图中过滤暗示删除的记录。可以使用以下选项将此行为扩展到其他列:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
"message.key.columns": "test.t1:id;test.t1:remark;test.t1:createtime"

注意:

代码语言:txt
AI代码解释
复制
     通过更改连接器的键列,Debezium 将这些列用作主键,而不是源表的默认主键。因此,与数据库的一条记录相关的不同操作可能最终会出现在 Kafka 中的其他分区。由于记录在不同分区中失去顺序,除非确保 ClickHouse 顺序键和 Debezium 消息键相同,否则可能会导致 Clikchouse 中的数据不一致。

经验法则如下:

  1. 根据想要的表结构来设计分区键和排序键。
  2. 提取分区和排序键的来源,假设它们是在物化过程中计算的。
  3. 合并所有这些列。
  4. 将步骤 3 的结果定义为 Debezium 连接器配置中的 message.column.keys。
  5. 检查 Clickhouse 排序键是否包含所有这些列。如果没有则添加它们。
代码语言:txt
AI代码解释
复制
    现在,通过将上述所有选项和常用选项放在一起,将拥有一个功能齐全的 Debezium 配置,能够处理 ClickHouse 所需的任何更改。
(2)创建源 mysql 配置文件
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# 编辑文件
vim $KAFKA_HOME/plugins/source-mysql.json
代码语言:txt
AI代码解释
复制
    内容如下:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
{
 "name": "mysql-source-connector",
 "config": {
     "connector.class": "io.debezium.connector.mysql.MySqlConnector",
     "database.hostname": "172.18.16.156",
     "database.port": "3307",
     "database.user": "dba",
     "database.password": "123456",
     "database.server.id": "1563307",
     "database.server.name": "dbserver1",
     "database.include.list": "test",
     "table.include.list": "test.t1",
     "topic.prefix": "mysql-clickhouse-test",
     "schema.history.internal.kafka.bootstrap.servers": "node2:9092,node3:9092,node4:9092",
     "schema.history.internal.kafka.topic": "schemahistory.mysql-clickhouse-test",
     "message.key.columns": "test.t1:id;test.t1:remark;test.t1:createtime",
     "transforms":"unwrap",
     "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
     "transforms.unwrap.delete.handling.mode": "rewrite"
     }
 }
(3)创建 mysql source connector
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# 创建 connector
curl -X POST -H 'Content-Type: application/json' -i 'http://node2:8083/connectors' -d @"/root/kafka_2.13-3.7.0/plugins/source-mysql.json"; echo
# 查看 connector 状态
curl -X GET http://node2:8083/connectors/mysql-source-connector/status | jq
# 查看 topic
kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092
代码语言:txt
AI代码解释
复制
    从输出中可以看到,mysql-source-connector 状态为 RUNNING,并自动创建了三个 topic:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[root@vvml-yz-hbase-test~]#curl -X POST -H 'Content-Type: application/json' -i 'http://node2:8083/connectors' -d @"/root/kafka_2.13-3.7.0/plugins/source-mysql.json"; echo
HTTP/1.1 201 Created
Date: Thu, 25 Apr 2024 03:47:26 GMT
Location: http://node2:8083/connectors/mysql-source-connector
Content-Type: application/json
Content-Length: 818
Server: Jetty(9.4.53.v20231009)

{"name":"mysql-source-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"172.18.16.156","database.port":"3307","database.user":"dba","database.password":"123456","database.server.id":"1563307","database.server.name":"dbserver1","database.include.list":"test","table.include.list":"test.t1","topic.prefix":"mysql-clickhouse-test","schema.history.internal.kafka.bootstrap.servers":"node2:9092,node3:9092,node4:9092","schema.history.internal.kafka.topic":"schemahistory.mysql-clickhouse-test","message.key.columns":"test.t1:id;test.t1:remark;test.t1:createtime","transforms":"unwrap","transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState","transforms.unwrap.delete.handling.mode":"rewrite","name":"mysql-source-connector"},"tasks":[],"type":"source"}
[root@vvml-yz-hbase-test~]#curl -X GET http://node2:8083/connectors/mysql-source-connector/status | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   182  100   182    0     0  24045      0 --:--:-- --:--:-- --:--:-- 26000
{
  "name": "mysql-source-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.18.4.188:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "172.18.4.188:8083"
    }
  ],
  "type": "source"
}
[root@vvml-yz-hbase-test~]#kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092
__consumer_offsets
connect-configs
connect-offsets
connect-status
mysql-clickhouse-test
mysql-clickhouse-test.test.t1
schemahistory.mysql-clickhouse-test
[root@vvml-yz-hbase-test~]#

八、在 ClickHouse 中创建库表、物化视图和视图

代码语言:txt
AI代码解释
复制
    ClickHouse 可以利用 Kafka 表引擎将 Kafka 记录放入一个表中。需要定义三个对象:Kafka 表、主表和消费者物化视图。

1. 建库

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
create database db2 on cluster cluster_2S_2R;

2. 创建 Kafka 表

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
CREATE TABLE db2.kafka_t1 on cluster cluster_2S_2R
(
    `id` Int64,
    `remark` Nullable(String),
    `createtime` String,
    `__deleted` String
)
ENGINE = Kafka('node2:9092,node3:9092,node4:9092', 'mysql-clickhouse-test.test.t1', 'clickhouse', 'JSONEachRow');

3. 创建主表

代码语言:txt
AI代码解释
复制
    主表具有源结构和 \_\_deleted 字段。这里使用的是 ReplicatedReplacingMergeTree,因为需要用已删除或更新的记录替换 stall 记录。
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
-- 创建本地表
CREATE TABLE db2.stream_t1 on cluster cluster_2S_2R
(
    `id` Int64,
    `remark` Nullable(String),
    `createtime` timestamp,
    `__deleted` String
)
ENGINE = ReplicatedReplacingMergeTree(
    '/clickhouse/tables/{shard}/db2/t1',
    '{replica}'
)
ORDER BY (id, createtime)
SETTINGS index_granularity = 8192;

-- 创建分布式表,以源表的主键 id 作为分片键,保证同一 id 的数据落在同一分片上
create table db2.t1_replica_all on cluster cluster_2S_2R
as db2.stream_t1
engine = Distributed(cluster_2S_2R, db2, stream_t1, id);

4. 创建消费者物化视图

代码语言:txt
AI代码解释
复制
    在创建物化视图前,先停止MySQL从库的复制。从库停止复制,不影响主库的正常使用,也就不会影响业务。此时从库的数据处于静止状态,不会产生变化,这使得获取存量数据变得轻而易举。然后创建物化视图时会自动将数据写入 db2.t1\_replica\_all 对应的本地表中。之后在 ClickHouse 集群中的任一实例上,都能从物化视图中查询到一致的 MySQL 存量数据。
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
-- MySQL 从库停止复制
stop slave;
代码语言:txt
AI代码解释
复制
    Kafka 表的每一条记录只读取一次,因为它的消费者组会改变偏移量,不能读取两次。因此,需要定义一个主表,并通过物化视图将每个 Kafka 表记录具化到它:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
-- 注意时间戳的处理
CREATE MATERIALIZED VIEW db2.consumer_t1 on cluster cluster_2S_2R
TO db2.t1_replica_all
(
    `id` Int64,
    `remark` Nullable(String),
    `createtime` timestamp,
    `__deleted` String
) AS
SELECT id, remark, addHours(toDateTime(substring(createtime,1,length(createtime)-1)),8) createtime, __deleted FROM db2.kafka_t1;

5. 创建视图

代码语言:txt
AI代码解释
复制
    最后需要过滤每个被删除的记录,并拥有最新的记录,以防不同的记录具有相同的排序键。可以定义一个简单的视图来隐式完成这项工作:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
CREATE VIEW db2.t1 on cluster cluster_2S_2R
(
    `id` Int64,
    `remark` Nullable(String),
    `createtime` String,
    `__deleted` String
) AS
SELECT *
FROM db2.consumer_t1
FINAL
WHERE __deleted = 'false';

6. 验证

代码语言:txt
AI代码解释
复制
    从 clickhouse 视图查询存量数据:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
vvml-yz-hbase-test.172.18.4.126 :) select * from db2.t1;

SELECT *
FROM db2.t1

Query id: 2a51fd5e-6b4f-4b78-b522-62b7be32535b

┌─id─┬─remark───────┬─createtime──────────┬─__deleted─┐
│  2 │ 第二行:row2 │ 2024-04-25 11:51:07false     │
└────┴──────────────┴─────────────────────┴───────────┘
┌─id─┬─remark───────┬─createtime──────────┬─__deleted─┐
│  1 │ 第一行:row1 │ 2024-04-25 11:51:07false     │
│  3 │ 第三行:row3 │ 2024-04-25 11:51:07false     │
└────┴──────────────┴─────────────────────┴───────────┘

3 rows in set. Elapsed: 0.007 sec. 

vvml-yz-hbase-test.172.18.4.126 :) 
代码语言:txt
AI代码解释
复制
    可以看到,存量数据已经与 MySQL 同步。
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
-- MySQL 主库修改数据
insert into test.t1 (remark) values ('第四行:row4');
update test.t1 set remark = '第五行:row5' where id = 4;
delete from test.t1 where id =1;
insert into test.t1 (remark) values ('第六行:row6');
 
-- MySQL 从库启动复制
start slave;
代码语言:txt
AI代码解释
复制
    此时 MySQL 的数据如下:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
mysql> select * from test.t1;
+----+------------------+---------------------+
| id | remark           | createtime          |
+----+------------------+---------------------+
|  2 | 第二行:row2     | 2024-04-25 11:51:07 |
|  3 | 第三行:row3     | 2024-04-25 11:51:07 |
|  4 | 第五行:row5     | 2024-04-25 11:56:29 |
|  5 | 第六行:row6     | 2024-04-25 11:56:29 |
+----+------------------+---------------------+
4 rows in set (0.00 sec)
代码语言:txt
AI代码解释
复制
    从 clickhouse 视图查询增量数据:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
vvml-yz-hbase-test.172.18.4.126 :) select * from db2.t1;

SELECT *
FROM db2.t1

Query id: b34bb37b-091b-490e-b55b-a0e9eedf5573

┌─id─┬─remark───────┬─createtime──────────┬─__deleted─┐
│  2 │ 第二行:row2 │ 2024-04-25 11:51:07false     │
└────┴──────────────┴─────────────────────┴───────────┘
┌─id─┬─remark───────┬─createtime──────────┬─__deleted─┐
│  4 │ 第五行:row5 │ 2024-04-25 11:56:29false     │
└────┴──────────────┴─────────────────────┴───────────┘
┌─id─┬─remark───────┬─createtime──────────┬─__deleted─┐
│  3 │ 第三行:row3 │ 2024-04-25 11:51:07false     │
└────┴──────────────┴─────────────────────┴───────────┘
┌─id─┬─remark───────┬─createtime──────────┬─__deleted─┐
│  5 │ 第六行:row6 │ 2024-04-25 11:56:29false     │
└────┴──────────────┴─────────────────────┴───────────┘

4 rows in set. Elapsed: 0.008 sec. 

vvml-yz-hbase-test.172.18.4.126 :) 
代码语言:txt
AI代码解释
复制
    可以看到,增量数据已经与 MySQL 同步,现在从 ClickHouse 视图查询的数据与 MySQL 一致。
代码语言:txt
AI代码解释
复制
    查看 Kafka 消费:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
kafka-consumer-groups.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --describe --group clickhouse
代码语言:txt
AI代码解释
复制
    输出如下:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[root@vvml-yz-hbase-test~]#kafka-consumer-groups.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --describe --group clickhouse

GROUP           TOPIC                         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                                                  HOST            CLIENT-ID
clickhouse      mysql-clickhouse-test.test.t1 0          8               8               0               ClickHouse-vvml-yz-hbase-test.172.18.4.126-db2-kafka_t1-26e6aa8e-1f08-4491-8af7-f1822f1a7e94 /172.18.4.126   ClickHouse-vvml-yz-hbase-test.172.18.4.126-db2-kafka_t1
[root@vvml-yz-hbase-test~]#
代码语言:txt
AI代码解释
复制
    可以看到,最后被消费的消息偏移量是8,MySQL 的存量、增量数据都已经通过 Kafka 消息同步到了 ClickHouse。

参考:

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-04-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
基于 HBase & Phoenix 构建实时数仓(5)—— 用 Kafka Connect 做实时数据同步
本篇演示安装配置 Kafka connect 插件实现 MySQL 到 Hbase 的实时数据同步。依赖环境见本专栏前面文章。相关软件版本如下:
用户1148526
2024/03/21
5850
基于 HBase & Phoenix 构建实时数仓(5)—— 用 Kafka Connect 做实时数据同步
从 MySQL 到 ClickHouse 实时数据同步 —— MaterializeMySQL + Materialized View
本篇演示使用 ClickHouse 的 MaterializeMySQL 数据库引擎和物化视图,实时将 MySQL 库表中的数据同步到 ClickHouse 的库表中。相关软件版本如下:
用户1148526
2024/04/20
4K0
从 MySQL 到 ClickHouse 实时数据同步 —— MaterializeMySQL + Materialized View
基于 HBase & Phoenix 构建实时数仓(4)—— Kafka 集群安装部署
Kafka 是一个完整的消息系统,常用于实时系统中的消息中转和数据持久化。Kafka 集群安装部署依赖于 Zookeeper,本专栏前面文章介绍了 Zookeeper 安装部署及运行,参见 “安装部署 Zookeeper 集群”。本篇继续介绍在相同主机环境下安装部署 Kafka 集群。
用户1148526
2024/03/12
2350
ClickHouse 集群部署(不需要 Zookeeper)
(1)安装 ClickHouse Server 和 ClickHouse Client
用户1148526
2024/04/18
8.3K2
基于 HBase & Phoenix 构建实时数仓(2)—— HBase 完全分布式安装
完全分布式 HBase 集群的运行依赖于 Zookeeper 和 Hadoop,在前一篇中已经详细介绍了他们的安装部署及运行,参见“基于 HBase & Phoenix 构建实时数仓(1)—— Hadoop HA 安装部署”。本篇继续介绍在相同主机环境下安装配置完全分布式 HBase 集群。
用户1148526
2024/03/09
4420
基于 HBase & Phoenix 构建实时数仓(2)—— HBase 完全分布式安装
基于 HBase & Phoenix 构建实时数仓(1)—— Hadoop HA 安装部署
172.18.4.126 node1 172.18.4.188 node2 172.18.4.71 node3 172.18.4.86 node4
用户1148526
2024/03/08
4470
基于 HBase & Phoenix 构建实时数仓(1)—— Hadoop HA 安装部署
Mysql实时数据变更事件捕获kafka confluent之debezium
如果你的后端应用数据存储使用的MySQL,项目中如果有这样的业务场景你会怎么做呢?
XING辋
2019/03/26
3.5K0
Mysql实时数据变更事件捕获kafka confluent之debezium
湖仓一体电商项目(三):3万字带你从头开始搭建12个大数据项目基础组件
​上篇已经大概讲述大数据组件版本和集群矩阵配置说明,有不清楚的同学,可以阅读上一篇
Lansonli
2022/07/31
1.3K0
湖仓一体电商项目(三):3万字带你从头开始搭建12个大数据项目基础组件
大数据Flink进阶(九):集群基础环境搭建
Flink可以运行在所有类unix环境中,例如:Linux,Mac OS 和Windows,一般企业中使用Flink基于的都是Linux环境,后期我们进行Flink搭建和其他框架整合也是基于linux环境,使用的是Centos7.6版本,JDK使用JDK8版本(Hive版本不支持JDK11,所以这里选择JDK8),本小节主要针对Flink集群使用到的基础环境进行配置,不再从零搭建Centos系统,另外对后续整合使用到的技术框架也一并进行搭建,如果你目前已经有对应的基础环境,可以忽略本小节,Linux及各个搭建组件使用版本如下表所示。
Lansonli
2023/03/30
1.3K0
大数据Flink进阶(九):集群基础环境搭建
大数据ClickHouse(十四):Integration系列表引擎
ClickHouse提供了许多与外部系统集成的方法,包括一些表引擎。这些表引擎与其他类型的表引擎类似,可以用于将外部数据导入到ClickHouse中,或者在ClickHouse中直接操作外部数据源。
Lansonli
2022/08/30
7170
大数据ClickHouse(十四):Integration系列表引擎
kafka 连接器实现 Mysql 数据同步 Elasticsearch
Mysql 作为传统的关系型数据库,主要面向 OLTP,性能优异,支持事务,但是在一些全文检索,复杂查询上面并不快。Elasticsearch 底层基于 Lucense 实现,天然分布式,采用倒排索引存储数据,全文检索效率很高,使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。
Se7en258
2021/05/18
2.7K0
kafka 连接器实现 Mysql 数据同步 Elasticsearch
基于 HBase & Phoenix 构建实时数仓(3)—— Phoenix 安装
继续上一篇,本篇介绍在同一环境中安装 Phoenix,并连接上篇部署的 HBase 集群。
用户1148526
2024/03/09
5170
Kafka——分布式的消息队列
Producers – 生产者 生产者将数据发布到他们选择的主题。生产者负责选择要分配给主题中哪个分区的消息 可以以循环方式完成此操作,仅是为了平衡负载,也可以根据某些语义分区功能(例如基于消息中的某些键)进行此操作。
时间静止不是简史
2020/07/24
1.4K0
Streaming Data Changes from MySQL to Elasticsearch
MySQL Binary Log包含了针对数据库执行DDL(Data Definition Language)和DML(Data Manipulation Language)操作的完整事件,其被广泛应用于数据复制和数据恢复场景。本文所分享的就是一种基于MySQL Binary Log特性实现增量数据近实时同步到Elasticsearch的一种技术。要想实现增量数据的同步,仅仅有binary log是不够的,我们还需要一款变更数据捕获(CDC,Change Data Capture)工具,可能大家很快就会想到阿里巴巴开源的Canal。没错,但本文今天给大家分享一款新的开源工具:Debezium。Debezium构建于Kafka之上,它为MySQL、MongoDB、PostgreSQL、Orcale和Cassandra等一众数据库量身打造了一套完全适配于Kafka Connect的source connector。首先,source connector会实时获取由INSERT、UPDATE和DELETE操作所触发的数据变更事件;然后,将其发送到Kafka topic中;最后,我们使用sink connector将topic中的数据变更事件同步到Elasticsearch中去,从而最终实现数据的近实时流转,如下图所示。
程序猿杜小头
2022/12/01
1.6K0
Streaming Data Changes from MySQL to Elasticsearch
大数据Kafka(三):Kafka的集群搭建以及shell启动命令脚本编写
为了方便将来进行一键启动、关闭Kafka,我们可以编写一个shell脚本来操作。将来只要执行一次该脚本就可以快速启动/关闭Kafka。
Lansonli
2021/10/11
3.5K0
大数据Kafka(三):Kafka的集群搭建以及shell启动命令脚本编写
Debezium kafka connector 运行报错1236
今天发现stag环境kafka connector运行报错( curl 172.18.1.1:8083/connectors/order-center-connector/status)
XING辋
2019/07/19
2.2K0
数据同步工具之FlinkCDC/Canal/Debezium对比
数据准实时复制(CDC)是目前行内实时数据需求大量使用的技术,随着国产化的需求,我们也逐步考虑基于开源产品进行准实时数据同步工具的相关开发,逐步实现对商业产品的替代。本文把市面上常见的几种开源产品,Canal、Debezium、Flink CDC 从原理和适用做了对比,供大家参考。
王知无-import_bigdata
2021/10/27
13.9K0
ClickHouse 实战笔记 第01期:Kafka 数据同步到 ClickHouse
这一期首先聊聊 Kafka 数据同步到 ClickHouse 的其中一个方案:通过 Kafka 引擎方式同步,下面进入实际操作过程(环境:CentOS7.4):
数据库交流
2022/04/25
3.3K0
ClickHouse 实战笔记 第01期:Kafka 数据同步到 ClickHouse
Debezium结合kafka connect实时捕获mysql变更事件写入elasticsearch实现搜索流程
本文将会实现一套完整的Debezium结合Kafka Connect实时捕获MySQL变更事件写入Elasticsearch并实现查询的流程.
XING辋
2019/03/26
7.5K4
Debezium结合kafka connect实时捕获mysql变更事件写入elasticsearch实现搜索流程
Spark Streaming + Canal + Kafka打造Mysql增量数据实时进行监测分析
Spark中的Spark Streaming可以用于实时流项目的开发,实时流项目的数据源除了可以来源于日志、文件、网络端口等,常常也有这种需求,那就是实时分析处理MySQL中的增量数据。
王知无-import_bigdata
2021/04/21
1.6K0
Spark Streaming + Canal + Kafka打造Mysql增量数据实时进行监测分析
推荐阅读
相关推荐
基于 HBase & Phoenix 构建实时数仓(5)—— 用 Kafka Connect 做实时数据同步
更多 >
LV.1
这个人很懒,什么都没有留下~
目录
  • 一、总体架构
  • 二、安装配置 MySQL 主从复制
  • 三、安装配置 ClickHouse 集群
  • 四、安装 JDK
  • 五、安装配置 Zookeeper 集群
  • 六、安装配置 Kafaka 集群
  • 七、安装配置 Debezium-Connector-MySQL 插件
    • 1. 创建插件目录
    • 2. 解压文件到插件目录
    • 3. 配置 Kafka Connector
      • (1)配置属性文件
      • (2)分发到其它节点
      • (3)以 distributed 方式启动 Kafka connect
      • (4)确认 connector 插件和自动生成的 topic
    • 4. 创建 source connector
      • (1)Debezium 三个必要的配置说明
      • (2)创建源 mysql 配置文件
      • (3)创建 mysql source connector
  • 八、在 ClickHouse 中创建库表、物化视图和视图
    • 1. 建库
    • 2. 创建 Kafka 表
    • 3. 创建主表
    • 4. 创建消费者物化视图
    • 5. 创建视图
    • 6. 验证
  • 参考:
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档