Apache Kafka 是一个分布式流处理平台,主要用于实时数据的传输和处理。在现代的数据密集型应用中,事务性保证在数据传输和处理中的作用至关重要。本文将详细介绍 Kafka 的事务性支持,包括其基本概念、架构、使用方法以及相关代码示例和运行效果。
Kafka 的事务性支持在 0.11.0 版本中引入,目的是提供跨多个 topic 和 partition 的原子消息写入能力。这意味着事务消息要么全部写入成功,要么全部失败,从而确保数据的一致性和完整性。
Kafka 的事务特性主要用于以下场景:
Kafka 事务涉及三个主要组件:
在 Kafka 中,每个事务都有一个唯一的 Transactional ID,用于标识事务的生命周期。事务的状态通过 Broker 中的事务协调器(Transaction Coordinator)进行管理。
要使用 Kafka 事务性支持,首先需要配置生产者。下面是一个配置事务性生产者的示例:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class TransactionalProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(new ProducerRecord<>("my-topic", "key1", "value1")).get();
producer.send(new ProducerRecord<>("my-topic", "key2", "value2")).get();
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
producer.close();
throw e;
} catch (KafkaException e) {
producer.abortTransaction();
}
producer.close();
}
}
为了正确消费事务性消息,需要配置隔离级别(isolation.level)为“读已提交(read_committed)”:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.Collections;
import java.util.Properties;
public class TransactionalConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
当事务性生产者运行时,如果事务成功提交,我们可以看到以下输出:
Successfully sent message: key1, value1
Successfully sent message: key2, value2
如果事务失败并被回滚,我们将不会看到任何消息发送成功的日志。
事务性消费者只会读取已提交的事务消息。例如,如果我们发送了两条消息,但只提交了一条,那么消费者只会读取已提交的那条消息。
offset = 0, key = key1, value = value1
未提交的消息将不会被读取,从而确保数据的一致性。
Kafka 的事务性支持提供了一种确保消息一致性和完整性的方法,尤其适用于需要跨多个 topic 和 partition 进行原子写入的场景。通过配置事务性生产者和消费者,我们可以实现端到端的 Exactly Once 语义,防止消息丢失或重复消费。希望本文能帮助你更好地理解和使用 Kafka 的事务特性。