首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Kafka与Debezium连接

Debezium 是一个开源的分布式平台,用于捕获数据库的变更数据(CDC,Change Data Capture),并将这些变更数据流式传输到 Kafka 等消息系统中。Debezium 支持多种数据库,包括 MySQL、PostgreSQL、MongoDB 等。

以下是如何将 Kafka 与 Debezium 连接起来的步骤:

1. 安装 Kafka

首先,你需要安装并启动 Kafka。如果你还没有安装 Kafka,可以参考以下步骤:

  1. 下载 Kafka: wget https://downloads.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz tar -xzf kafka_2.13-3.0.0.tgz cd kafka_2.13-3.0.0
  2. 启动 ZooKeeper: bin/zookeeper-server-start.sh config/zookeeper.properties
  3. 启动 Kafka: bin/kafka-server-start.sh config/server.properties

2. 安装 Debezium Connector

Debezium 连接器可以作为 Kafka Connect 的插件运行。你需要下载并安装适用于你数据库的 Debezium 连接器。

  1. 下载 Debezium 连接器: wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.7.0.Final/debezium-connector-mysql-1.7.0.Final-plugin.tar.gz tar -xzf debezium-connector-mysql-1.7.0.Final-plugin.tar.gz
  2. 将连接器放到 Kafka Connect 的插件目录: mkdir -p kafka_2.13-3.0.0/plugins/debezium-connector-mysql mv debezium-connector-mysql/* kafka_2.13-3.0.0/plugins/debezium-connector-mysql/

3. 配置 Kafka Connect

Kafka Connect 是一个用于数据流的框架,支持将数据源连接到 Kafka。你需要配置 Kafka Connect 来使用 Debezium 连接器。

  1. 创建 Kafka Connect 配置文件 connect-distributed.properties: bootstrap.servers=localhost:9092 group.id=connect-cluster key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false value.converter.schemas.enable=false config.storage.topic=connect-configs offset.storage.topic=connect-offsets status.storage.topic=connect-status plugin.path=plugins
  2. 启动 Kafka Connect: bin/connect-distributed.sh config/connect-distributed.properties

4. 配置 Debezium 连接器

你需要创建一个 JSON 文件来配置 Debezium 连接器。以下是一个 MySQL 连接器的示例配置:

  1. 创建 register-mysql.json 文件: { "name": "mysql-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "localhost", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.include.list": "inventory", "database.history.kafka.bootstrap.servers": "localhost:9092", "database.history.kafka.topic": "schema-changes.inventory" } }
  2. 注册连接器: curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @register-mysql.json

5. 验证连接器

你可以通过 Kafka 消费者来验证连接器是否正常工作。

  1. 启动 Kafka 消费者: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dbserver1.inventory.customers --from-beginning

你应该能够看到来自 MySQL 数据库的变更数据流式传输到 Kafka 主题中。

页面内容是否对你有帮助?
有帮助
没帮助

相关·内容

Debezium教程翻译02:启动Docker,Debezium,Zookeeper,Kafka

使用Docker运行Debezium 运行Debezium涉及三个主要服务:Zookeeper、KafkaDebezium连接器服务。...debezium/kafka:0.8 提示 在本教程中,我们总是在Docker容器中连接Kafka,只要我们连接Kafka容器,就总是能够看到Kafka容器并与之通信。...如果我们想从Docker容器外部连接Kafka,那么我们就会希望Kafka通过Docker的主机地址告知外界,我们可以通过添加 -e ADVERTISED_HOST_NAME= 紧随其后的是在Linux...我们使用debezium/kafka镜像的0.8版本运行一个新的容器,并将kafka名称分配给这个容器。"...该命令将容器中的端口9092映射到Docker主机上的相同端口,以便容器外部的软件可以Kafka通信。

1.4K10

基于 Kafka Debezium 构建实时数据同步

支持; Snapshot Mode 可以将表中的现有数据全部导入 Kafka,并且全量数据增量数据形式一致,可以统一处理; 利用了 Kafka 的 Log Compaction 特性,变更数据可以实现...”不过期”永久保存; 利用了 Kafka Connect,自动拥有高可用开箱即用的调度接口; 社区活跃:Debezium 很年轻,面世不到1年,但它的 Gitter上每天都有百余条技术讨论,并且有两位...Redhat 全职工程师进行维护; 最终我们选择了 Debezium + Kafka 作为整套架构的基础组件,并以 Apache Avro 作为统一数据格式,下面我们将结合各个模块的目标设计阐释选型动机...这时,Debezium 独特的 Snapshot 功能就能帮上忙,它可以实现将现有数据作为一次”插入变更”捕捉到 Kafka 中,因此只要编写一次客户端就能一并处理全量数据后续的增量数据。...其中有一些上面没有涉及的点:我们使用 Kafka 的 MirrorMaker 解决了跨数据中心问题,使用 Kafka Connect 集群运行 Debezium 任务实现了高可用调度能力。

2.4K30
  • Kafka 连接器使用开发

    Kafka 连接器介绍 Kafka 连接器通常用来构建数据管道,一般有两种使用场景: 开始和结束的端点:例如,将 Kafka 中的数据导出到 HBase 数据库,或者把 Oracle 数据库中的数据导入...Kafka 连接器可以作为数据管道各个阶段的缓冲区,将消费者程序和生产者程序有效地进行解耦。 Kafka 连接器分为两种: Source 连接器:负责将数据导入 Kafka。...Sink 连接器:负责将数据从 Kafka 系统中导出。 连接器作为 Kafka 的一部分,是随着 Kafka 系统一起发布的,无须独立安装。...Kafka 连接器特性 Kafka 连接器包含以下特性: 1.是一种处理数据的通用框架,Kafka 连接器指定了一种标准,用来约束 Kafka 与其他系统的集成,简化了 Kafka 连接器的开发、部署和管理过程..."stdout" : filename; } } 打包部署 将编写好的连接器代码打成 JAR 包,放在每台 Kafka 的 libs目录下,然后重启 Kafka 集群 和 分布式模式连接器。

    2.3K30

    「首席看架构」CDC (捕获数据变化) Debezium 介绍

    Debezium构建在Apache Kafka之上,并提供Kafka连接兼容的连接器来监视特定的数据库管理系统。DebeziumKafka日志中记录数据更改的历史,您的应用程序将从这里使用它们。...Debezium架构 最常见的是,Debezium是通过Apache Kafka连接部署的。...Kafka Connect是一个用于实现和操作的框架和运行时 源连接器,如Debezium,它将数据摄取到Kafka和 接收连接器,它将数据从Kafka主题传播到其他系统。...如果需要,可以在Debezium的主题路由SMT的帮助下调整主题名称,例如,使用捕获的表名不同的主题名称,或者将多个表的更改转换为单个主题。...Debezium特性 Debezium是Apache Kafka Connect的一组源连接器,使用change data capture (CDC)从不同的数据库中获取更改。

    2.5K20

    Debezium使用指南

    Debezium是构建于Kafka之上的,将捕获的数据实时的采集到Kafka上 图片 Debezium监控MySQL 监控MySQL的前提是MySQL需要开启binlog日志哦 MySQL开启binlog.../debezium-connector-mysql 启动kafka connector 启动之前记得把debezium MySQL connector里面的jar包拷贝到kafka的libs目录下 /...注册连接器的方式也比较简单,kafka连接器发送post请求将配置信息放到请求体就可以了。...图片 Debezium Oracle Connector 的快照模式 snapshot.mode snapshot.mode 支持的参数配置,这个参数只在连接器在第一次启动时起作用 参数值 描述 initial...(默认) 连接器执行数据库的初始一致性快照,快照完成后,连接器开始为后续数据库更改流式传输事件记录。

    3.3K31

    Flink + Debezium CDC 实现原理及代码实战

    Debezium 构建在 Apache Kafka 之上,并提供 Kafka 连接器来监视特定的数据库。在介绍 Debezium 之前,我们要先了解一下什么是 Kafka Connect。...在上图中,中间的部分是 Kafka Broker,而 Kafka Connect 是单独的服务,需要下载 debezium-connector-mysql 连接器,解压到服务器指定的地方,然后在 connect-distribute.properties...中指定连接器的根路径,即可使用。...Debezium Server ? 这种模式中,需要配置不同的连接器,从源头处捕获数据的变化,序列化成指定的格式,发送到指定的系统中。...6 注册一个 Connector 去检测 mysql 数据库的变化 注册的话,需要往 Kafka Connect 的 rest api 发送一个 Post 请求,请求内容如下 其中: 1 是连接器的名字

    7.4K31

    基于Apache Hudi和Debezium构建CDC入湖管道

    Hudi 独特地提供了 Merge-On-Read[8] 写入器,使用 Spark 或 Flink 的典型数据湖写入器相比,该写入器可以显着降低摄取延迟[9]。...Apicurio)和 Debezium 连接器组成,Debezium 连接器不断轮询数据库中的更改日志,并将每个数据库行的更改写入 AVRO 消息到每个表的专用 Kafka 主题。...连接器 Strimzi[18] 是在 Kubernetes 集群上部署和管理 Kafka 连接器的推荐选项,或者可以选择使用 Confluent 托管的 Debezium 连接器[19]。...连接器的 Dockerfile 构建 docker 映像 debezium-kafka-connect FROM confluentinc/cp-kafka-connect:6.2.0 as cp RUN.../lib /opt/kafka/plugins/avro/ USER 1001 一旦部署了 Strimzi 运算符和 Kafka 连接器,我们就可以启动 Debezium 连接器。

    2.2K20

    使用 KafkaDebezium 和 Kubernetes 实现应用现代化的模式

    好消息是 Debezium 有针对不同数据库的连接器,所以它为我们完成了理解所有这些日志文件格式的艰巨工作。...Debezium 可以读取日志文件,并产生一个通用的抽象事件到消息系统中,如 Apache Kafka,其中会包含数据的变化。图 5 显示了 Debezium 连接器是如何作为各种数据库的接口的。...图 5:微服务架构中的 Debezium 连接Debezium 是使用最广泛的开源变更数据捕获(change data capture,CDC)项目,其多种连接器和特性使它非常适合 Strangler...组合使用 Apache KafkaDebezium:有很多证据可以表明,在进行应用程序的迁移和现代化的过程中,Debezium 和 Apache Kafka 能够很好地进行协作。...我们还可以将编排异步通信相结合,将协调服务参与服务的可用性,甚至 Kafka 的可用性解耦。这给了我们两全其美的结果:编排式以及参与服务之间异步、非阻塞、并行的通信,没有时间上的耦合。

    61020

    数据同步工具之FlinkCDCCanalDebezium对比

    Debezium最初设计成一个Kafka Connect 的Source Plugin,目前开发者虽致力于将其Kafka Connect解耦,但当前的代码实现还未变动。...扫描所有数据库的表,并且为每一个表产生一个和特定表相关的kafka topic创建事件(即为每一个表创建一个kafka topic)。 提交事务。 记录连接器成功完成快照任务时的连接器偏移量。...每个 Debezium Connector 都会与其源数据库建立连接: MySQL Connector 使用客户端库来访问 binlog。...除了 Kafka Broker 之外,Kafka Connect 也作为一个单独的服务运行。默认情况下,数据库表的变更会写入名称表名称对应的 Kafka Topic 中。...例如,您可以: 将记录路由到名称表名不同的 Topic 中 将多个表的变更事件记录流式传输到一个 Topic 中 变更事件记录在 Apache Kafka 中后,Kafka Connect 生态系统中的不同

    7.5K51
    领券