首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Debezium,Kafka-connect : postgres的更新没有显示为消息,只有插入显示

Debezium 和 Kafka Connect 是用于捕获和传输数据库更改事件的工具。如果你在使用 Debezium 和 Kafka Connect 时遇到 PostgreSQL 的更新操作没有生成消息,只有插入操作生成了消息,可能是以下几个原因:

基础概念

Debezium 是一个开源的分布式平台,用于捕获数据库更改数据并将其流式传输到 Kafka。Kafka Connect 是一个用于在 Kafka 和其他系统之间可扩展且可靠地传输数据的工具。

可能的原因

  1. 配置问题:Debezium 的配置可能没有正确设置以捕获更新事件。
  2. PostgreSQL 日志配置:PostgreSQL 的日志配置可能没有启用或配置不正确,导致更新操作没有被记录。
  3. Debezium 连接器配置:Kafka Connect 中的 Debezium 连接器配置可能没有正确设置以捕获更新事件。
  4. 数据库权限:Debezium 连接器使用的数据库用户可能没有足够的权限来捕获更新事件。

解决方法

  1. 检查 Debezium 配置: 确保 Debezium 连接器的配置文件中包含了捕获更新事件的设置。例如:
  2. 检查 Debezium 配置: 确保 Debezium 连接器的配置文件中包含了捕获更新事件的设置。例如:
  3. 检查 PostgreSQL 日志配置: 确保 PostgreSQL 的日志配置文件(通常是 postgresql.conf)中启用了日志记录,并且包含了更新操作的日志记录。例如:
  4. 检查 PostgreSQL 日志配置: 确保 PostgreSQL 的日志配置文件(通常是 postgresql.conf)中启用了日志记录,并且包含了更新操作的日志记录。例如:
  5. 检查数据库权限: 确保 Debezium 连接器使用的数据库用户具有足够的权限来捕获更新事件。例如:
  6. 检查数据库权限: 确保 Debezium 连接器使用的数据库用户具有足够的权限来捕获更新事件。例如:
  7. 检查 Kafka Connect 日志: 查看 Kafka Connect 的日志文件,确保没有错误信息提示为什么更新事件没有被捕获。

参考链接

通过以上步骤,你应该能够找到并解决 PostgreSQL 更新操作没有生成消息的问题。如果问题仍然存在,建议查看相关组件的详细日志,以便进一步诊断问题。

页面内容是否对你有帮助?
有帮助
没帮助

相关·内容

使用Kafka和ksqlDB构建和部署实时流处理ETL引擎

如果选择选项2,我们可以预见用例的一些问题;如果Elasticsearch确认更新较慢,可能会减慢我们的应用程序的速度,或者在出现不一致的情况下,我们如何重试插入一个事件或一组事件?...Apache Kafka:Kafka是Confluent平台的核心。它是一个基于开源的分布式事件流平台。这将是我们数据库事件(插入,更新和删除)的主要存储区域。...Kafka Connect:我们使用Kafka-connect从Debezium的Postgres连接器将数据提取到Kafka中,该连接器从Postgres WAL文件中获取事件。...有计划在没有ZooKeeper的情况下运行Kafka,但是目前,这是管理集群的必要条件。...;→即使有任何架构更新,我们的流也应该可以正常工作;→再次进行连接,以说明基础数据源或接收器的密码或版本更改。

2.7K20

存储 2000 亿个实体:Notion 的数据湖项目

• Notion 的更新密集型数据块数据的数据摄取和计算应该是快速的、可扩展的和具有成本效益的。 • 支持非规范化数据,这些数据可以解锁 AI 和搜索等关键功能。 下图显示了新数据湖的高级设计。...该过程的工作原理如下: • 使用 Debezium CDC 连接器将增量更新的数据从 Postgres 提取到 Kafka。...但是,Hudi 在 Notion 的更新繁重工作负载以及与 Debezium CDC 消息的本机集成中提供了更好的性能。...1 - CDC 连接器和 Kafka 他们为每个 Postgres 主机设置了一个 Debezium CDC 连接器,并将其部署在 AWS EKS 集群中。...作为参考,下图显示了使用 Debezium 和 Kafka 的 CDC 如何在高级别上工作。

13910
  • 基于Apache Hudi和Debezium构建CDC入湖管道

    总体设计 上面显示了使用 Apache Hudi 的端到端 CDC 摄取流的架构,第一个组件是 Debezium 部署,它由 Kafka 集群、schema registry(Confluent 或...Apicurio)和 Debezium 连接器组成,Debezium 连接器不断轮询数据库中的更改日志,并将每个数据库行的更改写入 AVRO 消息到每个表的专用 Kafka 主题。...第二个组件是 Hudi Deltastreamer[11],它为每个表从 Kafka 读取和处理传入的 Debezium 记录,并在云存储上的 Hudi 表中写入(更新)相应的行。...•记录键 - 表的 Hudi 记录键[15]应设置为上游数据库中表的主键。这可确保正确应用更新,因为记录键唯一地标识 Hudi 表中的一行。...下面显示了一个这样的命令实例,它适用于 Postgres 数据库。几个关键配置如下: •将源类设置为 PostgresDebeziumSource。

    2.2K20

    Robinhood基于Apache Hudi的下一代数据湖实践

    即使对于一个有数十亿行的表来说,一天只有几十万行的变化,摄取该表的完整快照也会导致读取和写入整个表。...Debezium 是一个构建在 Kafka Connect 之上的开源分布式变更数据捕获平台,Debezium 带有一个经过充分证明的一流 Postgres CDC 连接器。...Kafka 集成和一次性写入功能,与不可变数据不同,我们的 CDC 数据有相当大比例的更新和删除,Hudi Deltastreamer 利用其可插入的记录级索引在 Data Lake 表上执行快速高效的...如果 Debezium 卡住或无法跟上消耗 WAL 日志的速度,这可能会导致 WAL 日志文件累积并耗尽可用磁盘空间,Debezium 社区建议密切监视滞后消息,我们的 Debezium 负载测试也让我们对...管理 Postgres 模式更新 我们的业务是将表从在线 OLTP 世界复制到 Data Lake 世界,复制的数据不是不透明的,而是具有适当的模式,并且复制管道保证了将在线表模式转换为数据湖的模式的明确定义的行为

    1.4K20

    降本增效!Notion数据湖构建和扩展之路

    Notion 用户更新现有块(文本、标题、标题、项目符号列表、数据库行等)的频率远远高于添加新块的频率。这导致块数据主要是更新量大的 ~90% 的 Notion 更新插入是更新。...我们使用 Debezium CDC 连接器将增量更新的数据从 Postgres 摄取到 Kafka,然后使用 Apache Hudi(一个开源数据处理和存储框架)将这些更新从 Kafka 写入 S3。...最后我们选择了 Hudi,因为它具有出色的性能,可以处理大量更新的工作负载,并且具有开源特性以及与 Debezium CDC 消息的原生集成。...另一方面,当我们在 2022 年考虑 Iceberg 和 Delta Lake 时,它们并没有针对我们的更新繁重工作负载进行优化。...Iceberg 还缺乏一个能够理解 Debezium 消息的开箱即用的解决方案;Delta Lake 有一个但并不开源。

    14310

    十行代码构建基于 CDC 的实时更新物化视图

    全量更新 全量更新策略在每次更新时都会清除物化视图中现有的所有数据,并将最新的查询结果集重新插入。...这个过程可以理解为执行了 TRUNCATE TABLE 和 INSERT INTO SELECT 的组合操作。全量更新虽然简单直接,但在大数据量或高频更新的场景下,其效率和资源消耗可能成为一个问题。...虽然 Snowflake 没有提供 On Commit Refresh 的功能,但它可以通过自动刷新实现接近实时的数据更新。...:8083"}, "tasks":[{"id":0,"state":"RUNNING","worker_id":"kafka-connect:8083"}],"type":"source"} 如果连接器状态显示为...MySQL 数据库中的每次更改(插入、更新、删除)都会被 Debezium MySQL Connector 捕获并发送至 Kafka Broker。

    12010

    「首席看架构」CDC (捕获数据变化) Debezium 介绍

    下图显示了一个基于Debezium的CDC管道的架构: ? 除了Kafka代理本身之外,Kafka Connect是作为一个单独的服务来操作的。...部署了用于MySQL和Postgres的Debezium连接器来捕获这两个数据库的更改。...与其他方法如轮询或双写不同,基于日志的CDC由Debezium实现: 确保捕获所有数据更改 以非常低的延迟(例如,MySQL或Postgres的ms范围)生成更改事件,同时避免增加频繁轮询的CPU使用量...不需要更改数据模型(如“最后更新”列) 可以捕获删除 可以捕获旧记录状态和其他元数据,如事务id和引发查询(取决于数据库的功能和配置) 要了解更多关于基于日志的CDC的优点,请参阅本文。...不同的即时消息转换:例如,用于消息路由、提取新记录状态(关系连接器、MongoDB)和从事务性发件箱表中路由事件 有关所有受支持的数据库的列表,以及关于每个连接器的功能和配置选项的详细信息,请参阅连接器文档

    2.6K20

    Flink CDC 原理、实践和优化

    的数据流)看做是同一事物的两面,因此内部提供的 Upsert 消息结构(+I 表示新增、-U 表示记录更新前的值、+U 表示记录更新后的值,-D 表示删除)可以与 Debezium 等生成的变动记录一一对应...Debezium 某条 Upsert 消息的格式 上图表示 Debezium JSON 的一条更新(Update)消息,它表示上游已将 id=123 的数据更新,且字段内包含了更新前的旧值,以及更新后的新值...(op)) { // 如果是更新 (u) 消息 before.setRowKind(RowKind.UPDATE_BEFORE); // 把更新前的数据类型设置为撤回 (-U) after.setRowKind...对于插入 +I 和删除 D,都只需要一条消息即可;而对于更新,则涉及删除旧数据和写入新数据,因此需要 -U 和 +U 两条消息来对应。...上游 Debezium 崩溃导致写入重复数据,结果不准 Debezium 服务端发生异常并恢复后,由于可能没有及时记录崩溃前的现场,可能会退化为 At least once 模式,即同样的数据可能被发送多次

    4.6K52

    深入解读flink sql cdc的使用以及源码分析

    flink消费cdc数据 在以前的数据同步中,比如我们想实时获取数据库的数据,一般采用的架构就是采用第三方工具,比如canal、debezium等,实时采集数据库的变更日志,然后将数据发送到kafka等消息队列...data : 代表操作的数据。如果为'INSERT',则表示行的内容;如果为'UPDATE',则表示行的更新后的状态;如果为'DELETE',则表示删除前的状态。...还支持其他的数据库的同步,比如 PostgreSQL、Oracle等,目前debezium支持的序列化格式为 JSON 和 Apache Avro 。...postgres数据库,我们需要把connector替换成postgres-cdc,DDL中表的schema和数据库一一对应。...也就是说flink底层是采用了Debezium工具从mysql、postgres等数据库中获取的变更数据。

    5.6K30

    Flink CDC 原理、实践和优化

    的数据流)看做是同一事物的两面,因此内部提供的 Upsert 消息结构(+I 表示新增、-U 表示记录更新前的值、+U 表示记录更新后的值,-D 表示删除)可以与 Debezium 等生成的变动记录一一对应...[image.png] 上图表示 Debezium JSON 的一条更新(Update)消息,它表示上游已将 id=123 的数据更新,且字段内包含了更新前的旧值,以及更新后的新值。...(op)) { // 如果是更新 (u) 消息 before.setRowKind(RowKind.UPDATE_BEFORE); // 把更新前的数据类型设置为撤回 (-U) after.setRowKind...对于插入 +I 和删除 D,都只需要一条消息即可;而对于更新,则涉及删除旧数据和写入新数据,因此需要 -U 和 +U 两条消息来对应。...上游 Debezium 崩溃导致写入重复数据,结果不准 Debezium 服务端发生异常并恢复后,由于可能没有及时记录崩溃前的现场,可能会退化为 At least once 模式,即同样的数据可能被发送多次

    25.5K189

    《一文读懂腾讯云Flink CDC 原理、实践和优化》

    dynamic_tables.html),因此内部提供的 Upsert 消息结构(+I 表示新增、-U 表示记录更新前的值、+U 表示记录更新后的值,-D 表示删除)可以与 Debezium 等生成的变动记录一一对应...上图表示 Debezium JSON 的一条更新(Update)消息,它表示上游已将 id=123 的数据更新,且字段内包含了更新前的旧值,以及更新后的新值。...(op)) { // 如果是更新 (u) 消息 before.setRowKind(RowKind.UPDATE_BEFORE); // 把更新前的数据类型设置为撤回 (-U)...对于插入 +I 和删除 D,都只需要一条消息即可;而对于更新,则涉及删除旧数据和写入新数据,因此需要 -U 和 +U 两条消息来对应。...上游 Debezium 崩溃导致写入重复数据,结果不准 Debezium 服务端发生异常并恢复后,由于可能没有及时记录崩溃前的现场,可能会退化为 At least once 模式,即同样的数据可能被发送多次

    3K31

    降本百万!Notion 基于Apache Hudi构建LakeHouse

    Blocks 面临的挑战是它们所代表的数据规模:Notion 的数据倍增率为六个月到一年。这是令人震惊的,特别是考虑到 200 亿区块的起点。表 1 显示了增长率。...当团队努力寻找解决这些扩展难题的方法时,他们发现了一种可能提供线索的模式。他们注意到只有大约 1% 的块被更新插入(更新记录的操作,或者如果记录尚不存在则插入它)。...因此,与通常的情况一样,与表的大小相比,总更新插入量实际上相当小,如图 4 所示。...• 开箱即用的 Postgres 集成:Debezium 变更数据捕获 (CDC) 平台与 Postgres 和 Hudi 一起开箱即用,这一点至关重要,因为这显着加快了实施速度。...新的基础设施将数据从 Postgres 摄取到 Debezium CDC,该数据通过 Kafka 传输,然后馈送到 Hudi 以针对 Hudi 数据集进行批量增量更新,最后推送到下游到 Apache Spark

    19010

    【Apache Doris】Flink Doris Connector 整库同步使用指南

    而flink-connector-xx-cdc 只有该 connector 的代码,不包含其所需的依赖,提供 datastream 作业使用,用户需要自己管理所需的三方包依赖,有冲突的依赖需要自己做 exclude...选择的分片列需要保证不存在数据的更新操作(比如从 1 更新到 2),如果存在更新操作,则只能保证 At-Least-Once 语义。...③ --ignore-default-value "true"/"false"(since 1.5.0) 该参数主要是针对原表的schema 设置的default 值,但是插入的为null,如果不设置为...然而,对于 SQL Server 和 Db2 的情况,当表结构发生更改时,Debezium 生成的 JSON 日志中的 DDL 属性通常为 null。...例如,如果您的数据时区为 UTC+3,可以通过以下设置来调整: --mysql-conf debezium.date.format.timestamp.zone="UTC+3" 这样的设置确保数据同步过程中时间戳正确地反映了数据的原始时区

    47510

    如何使用 Kafka、MongoDB 和 Maxwell’s Daemon 构建 SQL 数据库的审计系统

    社区版可能会缺失这样的插件。以 MySQL 为例,审计日志插件只有企业版中才能使用。...b.为数据添加一个版本号,然后每次更新都会插入一条已递增版本号的数据。 c.写入到两个数据库表中,其中一张表包含最新的数据,另外一张表包含审计跟踪信息。...应用程序执行数据库写入、更新或删除操作。 SQL 数据库将会以 ROW 格式为这些操作生成 bin 日志。这是 SQL 数据库相关的配置。...localhost:9092 上述的命令会给我们显示一个提示,从中可以输入消息内容,然后点击回车键,以便于发送消息到 Kafka 中。...最终测试 最后,我们的环境搭建终于完成了。登录 MySQL 数据库并运行任意的插入、删除或更新命令。如果环境搭建正确的话,将会在 mongodb auditlog 数据库中看到相应的条目。

    1.1K30

    基于 Flink SQL CDC 的实时数据同步方案

    RowKind 里面包括了插入、更新前、更新后、删除,这样和数据库里面的 binlog 概念十分类似。...通过 Debezium 采集的 JSON 格式,包含了旧数据和新数据行以及原数据信息,op 的 u表示是 update 更新操作标识符,ts_ms 表示同步的时间戳。...通过 Debezium 订阅业务库 MySQL 的 Binlog 传输至 Kafka ,Flink 通过创建 Kafka 表指定 format 格式为 debezium-json ,然后通过 Flink...包含插入/更新/删除,只有付款的订单才能计算进入 GMV ,观察 GMV 值的变化。 ?...因为 group by 的结果是一个更新的结果,目前无法写入 append only 的消息队列中里面去。更新的结果写入 Kafka 中将在 1.12 版本中原生地支持。

    3.7K21

    如何使用发件箱模式实现微服务的 Saga 编排

    Debezium 是一个分布式的开源数据变更捕获平台,为使用发件箱模式的编排式 Saga 流提供了健壮和灵活的基础。 在转向微服务的时候,我们意识到的第一件事情就是单个服务都不是孤立存在的。...但是,好朋友是不会让自己的朋友进行双重写入的,发件箱模式提供了一个非常优雅的方式来解决这个问题: 图 2:安全地更新数据库并通过发件箱模式发送消息到 Kafka 我们不会在更新数据之后直接发送消息,而是让服务基于同一个事务执行正常的更新并将消息插入到数据库中一个特定的发件箱表中...只有在一个分区内部,才能确保消费者接收到消息的顺序与生产者发送消息的顺序完全一致。...因为代理没有接收到消息已经得到处理的确认信息,所以在一定的时间之后,它就会重复性地重发该消息,直到得到确认为止。...Debezium connector 在发送发件箱消息给 Kafka 之后就崩溃了,此时还没有在源数据库事务日志中提交偏移(offset)。

    66230

    Flink CDC 新一代数据集成框架

    依赖表中的更新时间字段,每次执行查询去捕获表中的最新数据 无法捕获的是删除事件,从而无法保证数据一致性问题 无法保障实时性,基于离线调度存在天然的延迟 基于日志的CDC 实时消费日志,流处理。...每条RowData都有一个元数据RowKind,包括4种类型,分别是插入、更新前镜像、更新后镜像、删除,这四种类型和数据库里面的binlog概念保持一致 而Debezium的数据结构,也有一个类似的元数据字段...即使机器或软件出现故 障,既没有重复数据,也不会丢数据。 幂等就是一个相同的操作,无论重复多少次,造成的效果和只操作一次相等。...这种方案中利用Kafka消息队列做消费解耦,binlog可以提供其他业务系统的应用,消费端可采用kafka Sink Connector或者自定义消费程序,但是由于原生Debezium中的Producer...与方案一的不同就是,采用了Flink通过创建Kafka表,指定format格式为debezium-json,然后通过Flink进行计算后或者直接插入到其他外部数据存储系统。

    3.2K31
    领券