我有一个CDC设置的Postgres DB。
我为Postgres DB部署了Kafka Debezium连接器1.8.0。
POST http://localhost:8083/connectors
身体:
{
"name": "postgres-kafkaconnector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "example.com",
"database.port": "5432",
"database.dbname": "my_db",
"database.user": "xxx",
"database.password": "xxx",
"database.server.name": "postgres_server",
"table.include.list": "public.products",
"plugin.name": "pgoutput"
}
}
我注意到一些奇怪的事情。
在同一表中,当我更新行时,一些行可以生成CDC,但其他行不能生成CDC。
这些行非常相似,除了id
和identifier
是不同的。
-- Updating this row can generate CDC
UPDATE public.products
SET identifier = 'GET /api/accounts2'
WHERE id = '90c21719-ce41-4523-8ad1-ed6b21ecfaf1';
-- Updating this row cannot generate CDC
UPDATE public.products
SET identifier = 'GET /api/notworking/accounts2'
WHERE id = '22f5ebf3-9594-493d-8aa6-649d9fbcefd2';
我检查了我的Kafka容器日志,也没有错误。
有什么想法吗?
发布于 2022-02-22 22:32:35
找到问题了!这是因为我的Kafka连接器postgres-kafkaconnector
最初指向DB (stage1),然后通过更新切换到另一个DB (stage2)。
"database.hostname": "example.com",
"database.port": "5432",
"database.dbname": "my_db",
"database.user": "xxx",
"database.password": "xxx",
但是,它们在最初部署的Kafka Connect I中使用了相同的配置属性:
config.storage.topic
offset.storage.topic
status.storage.topic
由于具有不同DB配置的连接器共享上述Kafka配置属性,而且数据库表模式是相同的,
由于分享同样的卡夫卡补偿,它变得一团糟。
一个简单的修复方法是在部署Kafka连接器在不同的DB上进行测试时,使用不同的名称,比如postgres-kafkaconnector-stage1
和postgres-kafkaconnector-stage2
,以避免卡夫卡主题偏移混乱。
https://stackoverflow.com/questions/71165403
复制相似问题