首先请将 Kafka 的 enable_auto_commit 参数设置为 false,以关闭自动 commit。生产者会在消息序列中的合适位置插入 Checkpoint 消息,消费者消费到 Checkpoint 消息后进行 commit,这样有利于保证消息的完整性。