This article is based on kafka-clients version 1.1.0.
The diagram describes the rough flow of a user thread sending messages with KafkaProducer. Besides the user's main thread, KafkaProducer also starts a Sender thread, set as a daemon, when a new instance is created.
The red line in the diagram traces the call stack of the user's main thread. After send() is called to deliver a record, it goes through the flow analyzed below.
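For orientation, here is a minimal user-side sketch of the call being traced; the bootstrap address, topic name and the choice of StringSerializer are assumptions for illustration only.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // The constructor also starts the daemon Sender thread
        // ("kafka-producer-network-thread | $clientId").
        Producer<String, String> producer = new KafkaProducer<>(props);
        // send() runs on the user thread: wait for metadata, serialize, pick a partition,
        // append to the accumulator; the Sender thread does the actual network I/O.
        producer.send(new ProducerRecord<>("demo-topic", "key", "value"), (metadata, exception) -> {
            if (exception != null)
                exception.printStackTrace();
            else
                System.out.printf("sent to %s-%d@%d%n", metadata.topic(), metadata.partition(), metadata.offset());
        });
        producer.close();
    }
}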
Let's now walk through it in the code; some parts are omitted for clarity.
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
// Add the topic to metadata; if it already exists, refresh its expiry time
metadata.add(topic);
// Fetch the cluster metadata held in memory
Cluster cluster = metadata.fetch();
Integer partitionsCount = cluster.partitionCountForTopic(topic); // if found, return right away
if (partitionsCount != null && (partition == null || partition < partitionsCount))
return new ClusterAndWaitTime(cluster, 0);
long begin = time.milliseconds();
long remainingWaitMs = maxWaitMs;
long elapsed;
// Flag that a metadata update is needed, then wait until it completes or a timeout is thrown
do {
metadata.add(topic);
int version = metadata.requestUpdate();
sender.wakeup();
try {
metadata.awaitUpdate(version, remainingWaitMs);
} catch (TimeoutException ex) {
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
}
// Timeout and authorization checks omitted (elapsed and remainingWaitMs are updated here)
......
partitionsCount = cluster.partitionCountForTopic(topic);
} while (partitionsCount == null);
if (partition != null && partition >= partitionsCount) {
throw new KafkaException(
String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));
}
return new ClusterAndWaitTime(cluster, elapsed);
}
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
// parts omitted
//......
byte[] serializedKey;
try {
// Serialize the key
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer", cce);
}
byte[] serializedValue;
try {
// Serialize the value
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer", cce);
}
Header[] headers = record.headers().toArray();
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(), compressionType, serializedKey, serializedValue, headers);
// Estimate the serialized size; ensure it does not exceed max.request.size or the total memory size (buffer.memory)
ensureValidRecordSize(serializedSize);
}
// parts omitted
//......
}
The record is appended into a double-ended queue inside the accumulator's cache structure; the diagram explains this structure. Records are written into the batch at the tail of the corresponding deque, while the Sender thread pulls batches from the head each time.
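Before reading append(), here is a simplified stand-in for that cache structure; the declared field type mirrors RecordAccumulator in 1.1.0, but this class itself is only an illustration.
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.kafka.common.TopicPartition;

class AccumulatorSketch<B> {            // B stands in for the internal ProducerBatch type
    // One deque of batches per partition: records go into the batch at the tail,
    // the Sender thread drains finished batches from the head.
    private final ConcurrentMap<TopicPartition, Deque<B>> batches = new ConcurrentHashMap<>();

    // Mirrors the getOrCreateDeque() helper used by append() below
    Deque<B> getOrCreateDeque(TopicPartition tp) {
        return batches.computeIfAbsent(tp, k -> new ArrayDeque<>());
    }
}
The real field in RecordAccumulator is declared as a ConcurrentMap<TopicPartition, Deque<ProducerBatch>>; the append() method below operates on exactly this structure.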
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock) throws InterruptedException {
ByteBuffer buffer = null;
if (headers == null) headers = Record.EMPTY_HEADERS;
try {
// Get the deque (dq) for this TopicPartition, creating it if it does not exist
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {
// Try to append this record to the last batch already in the dq
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null)
return appendResult;
}
// The last batch in the dq cannot take this record, so allocate a new batch and add it to the tail of the dq
// Take a buffer from the BufferPool (size = max(batch.size, estimated size of this record))
buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null) {
// Another thread may have created a usable batch between the two lock acquisitions on dq; if tryAppend succeeds, return directly
return appendResult;
}
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
// Append the record into the batch that was just created
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
// Add the newly allocated batch to the tail of the dq
dq.addLast(batch);
incomplete.add(batch);
// Prevent the finally block from releasing the buffer we just used
buffer = null;
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
}
} finally {
if (buffer != null)
// If a batch allocated by another thread was used, release the buffer allocated by this thread
free.deallocate(buffer);
}
}
The KafkaProducer constructor starts a new thread to run the Sender task, so we will call it the sender thread; the actual thread name is "kafka-producer-network-thread | $clientId".
The Sender class implements Runnable, and the run() method it implements is simple: it keeps looping over the following steps until close() is called:
1. Idempotence/transaction-related bookkeeping (for example, obtaining a producer id).
2. sendProducerData(): drain ready batches from the accumulator and queue produce requests.
3. client.poll(): perform the actual network I/O and handle responses.
The idempotence-related implementation is covered later; here we focus on steps 2 and 3.
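The loop body can be sketched roughly as follows; the method names match the 1.1.0 Sender class, but this is a simplified paraphrase, not the verbatim implementation.
// Sender.run(): keep looping until close() is called, then drain what is left (drain logic omitted)
public void run() {
    while (running) {
        run(time.milliseconds());
    }
}

void run(long now) {
    if (transactionManager != null) {
        // Step 1: idempotence/transaction bookkeeping, e.g. make sure we have a producer id (PID)
        maybeWaitForProducerId();
    }
    long pollTimeout = sendProducerData(now); // Step 2: drain ready batches and queue produce requests
    client.poll(pollTimeout, now);            // Step 3: do the actual network I/O and handle responses
}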
private long sendProducerData(long now) {
// Fetch the locally cached cluster metadata
Cluster cluster = metadata.fetch();
// Find the nodes that lead partitions with data ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
// If the local metadata has no leader information for some topics, force a metadata update
if (!result.unknownLeaderTopics.isEmpty()) {
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);
this.metadata.requestUpdate();
}
// Remove nodes that are not ready to be sent to
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
// Check whether we can send to this node (this ensures the previous request send to the same node has completed)
if (!this.client.ready(node, now)) {
iter.remove();
// Get the remaining backoff time for nodes in the disconnected state
notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
}
}
// Drain the batches that are ready to send from the accumulator
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
this.maxRequestSize, now);
if (guaranteeMessageOrder) {
// If max.in.flight.requests.per.connection = 1, mute the partitions being sent
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}
// Pull timed-out batches from the accumulator (with max.in.flight.requests.per.connection > 1, expiry does not preserve callback order)
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
for (ProducerBatch expiredBatch : expiredBatches) {
// Invoke the callback with a timeout exception
failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException(), false);
//......
}
// If there is data to send and a node ready to receive it, pollTimeout is returned as 0; otherwise the timeout is determined by the nodes that currently cannot be sent to (e.g. lingering, backing off). In short:
// 1. If some partition has data ready to send, pollTimeout = 0.
// 2. Otherwise, if some partition has data in the accumulator but is not ready to send, pollTimeout = min{time until linger expires (for retried batches, time until retryBackoff expires), time until a disconnected node's reconnectBackoffMs expires}.
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
if (!result.readyNodes.isEmpty()) {
pollTimeout = 0;
}
// Build the batches into requests and hand them to the network client's send buffer
sendProduceRequests(batches, now);
return pollTimeout;
}
private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
// Iterate over the batches and send one request per node
for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
sendProduceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue());
}
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
if (batches.isEmpty())
return;
Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
//......
for (ProducerBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
MemoryRecords records = batch.records();
//......
produceRecordsByPartition.put(tp, records);
recordsByPartition.put(tp, batch);
}
// If the producer has transactions enabled, get the transactional id
String transactionalId = null;
if (transactionManager != null && transactionManager.isTransactional()) {
transactionalId = transactionManager.transactionalId();
}
// Build the requestBuilder from produceRecordsByPartition
ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
produceRecordsByPartition, transactionalId);
// Build the completion handler from recordsByPartition; callback.onComplete is invoked once client.poll receives the response.
RequestCompletionHandler callback = new RequestCompletionHandler() {
public void onComplete(ClientResponse response) {
handleProduceResponse(response, recordsByPartition, time.milliseconds());
}
};
String nodeId = Integer.toString(destination);
ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback);
// Call client.send to queue the request for sending
client.send(clientRequest, now);
log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}
This was covered in the previous article, so the Selector internals are not listed again here; we only need to look at the poll method. handleCompletedSends generates responses for requests that were sent successfully and do not expect an ack, and handleCompletedReceives generates responses for requests that were waiting for a reply. Finally, completeResponses processes all of them.
public List<ClientResponse> poll(long timeout, long now) {
if (!abortedSends.isEmpty()) { // if there are aborted sends, handle them directly without going through the selector's poll
List<ClientResponse> responses = new ArrayList<>();
handleAbortedSends(responses);
completeResponses(responses);
return responses;
}
// Decide whether metadata needs updating and, if so, send an update request (to the ready node with the fewest in-flight requests)
long metadataTimeout = metadataUpdater.maybeUpdate(now);
try {
// Perform one selector poll, blocking at most min(timeout, metadataTimeout, requestTimeoutMs)
this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}
long updatedNow = this.time.milliseconds();
// Initialize the response list
List<ClientResponse> responses = new ArrayList<>();
// For requests that do not expect a reply, generate a response and add it to the list
handleCompletedSends(responses, updatedNow);
// Generate responses for completed receives, in the send order recorded in inFlightRequests, and add them to the list
handleCompletedReceives(responses, updatedNow);
// Handle requests on disconnected connections and add their responses to the list
handleDisconnections(responses, updatedNow);
// Update node connection states in connectionStates
handleConnections();
// Send ApiVersions requests if needed
handleInitiateApiVersionRequests(updatedNow);
// Handle timed-out requests and add their responses to the list
handleTimedOutRequests(responses, updatedNow);
// Finish processing all responses
completeResponses(responses);
return responses;
}
private void completeResponses(List<ClientResponse> responses) {
for (ClientResponse response : responses) {
try {
// Invoke the callback carried by the response; on the producer side, see Sender.handleProduceResponse
response.onComplete();
} catch (Exception e) {
log.error("Uncaught error in request completion:", e);
}
}
}
In the end every response is handled in completeResponses, and the method it invokes is the callback.onComplete(response) we passed in when the request was built. So going back to handleProduceResponse in the Sender class shows the producer's logic after a broker reply is received.
private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, long now) {
//......
if (response.wasDisconnected()) {
// Handle batches whose connection was disconnected
for (ProducerBatch batch : batches.values())
completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now);
} else if (response.versionMismatch() != null) {
// Handle a protocol version mismatch
for (ProducerBatch batch : batches.values())
completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.UNSUPPORTED_VERSION), correlationId, now);
} else {
// If the response has a body, parse it and complete each batch
if (response.hasResponse()) {
ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
TopicPartition tp = entry.getKey();
ProduceResponse.PartitionResponse partResp = entry.getValue();
ProducerBatch batch = batches.get(tp);
completeBatch(batch, partResp, correlationId, now);
}
} else {
// If there is no response body, i.e. acks = 0, complete the batch directly
for (ProducerBatch batch : batches.values()) {
completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NONE), correlationId, now);
}
}
}
}
completeBatch is responsible for re-enqueueing a batch for retry (reenqueueBatch, when the error is retriable) and for triggering callbacks via batch.done (when the error is not retriable or the send succeeded).
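A rough sketch of that decision; helper names (canRetry, reenqueueBatch) exist in the Sender class, but the real completeBatch also deals with sequence numbers, metrics and error logging that are left out here.
// Simplified decision logic of Sender.completeBatch (not the verbatim code)
if (error != Errors.NONE && canRetry(batch, error)) {
    // Retriable error and retries left: put the batch back into the accumulator,
    // it will be drained and sent again later.
    reenqueueBatch(batch, now);
} else {
    // Success or a non-retriable error: complete the batch, which fires the user callbacks,
    // then give its buffer back to the pool.
    batch.done(response.baseOffset, response.logAppendTime, error.exception());
    this.accumulator.deallocate(batch);
}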
Setting "enable.idempotence" = true on the producer turns on the idempotent producer logic. This configuration exists so the producer can send messages with exactly-once semantics (only within a single producer instance; it cannot be guaranteed across a producer failover). Enabling the idempotent producer additionally requires "max.in.flight.requests.per.connection" <= 5, "retries" > 0, and "acks" = all.
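In configuration terms this looks roughly as follows; the constraints are spelled out explicitly here, although if you leave them unset the 1.1.0 client generally fills in compatible defaults and rejects explicitly conflicting values.
Properties props = new Properties();
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
// Constraints required by the idempotent producer, made explicit:
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");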
The TransactionManager class implements the idempotence logic. Before the sender sends actual messages it obtains a producer id (PID) from the broker, and every batch of messages sent to the broker is assigned a sequence number used for de-duplication. Kafka stores the sequence number together with the messages in the underlying log, so even if the partition's leader replica fails, the newly elected leader broker can still de-duplicate (which is why idempotence requires acks = all, i.e. an ack only after all replicas have written the data).
Each (PID, partition) pair has its own monotonically increasing sequence. When the broker receives a batch: if sequence <= old sequence, it is not written to the log and the client gets a DUPLICATE_SEQUENCE_NUMBER (or OutOfOrderSequence) error, which the client handles as a successful send; if sequence = old sequence + 1, it is written normally and success is returned; if sequence > old sequence + 1, the write is also rejected and the client gets OUT_OF_ORDER_SEQUENCE_NUMBER, in which case the idempotent producer re-enqueues the failed batches in sequence order and waits to resend them.
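The per-(PID, partition) check can be written as a small illustrative sketch; this is pseudo-broker logic for the rule just described, not actual broker source, and lastSeq/incomingSeq/appendToLog are made-up names.
// lastSeq: highest sequence already persisted for this (PID, partition)
// incomingSeq: sequence carried by the batch being written
if (incomingSeq <= lastSeq) {
    // Duplicate: do not write again; the client treats this error as a successful send
    return Errors.DUPLICATE_SEQUENCE_NUMBER;
} else if (incomingSeq == lastSeq + 1) {
    appendToLog(batch);                       // the expected next batch: write it normally
    return Errors.NONE;
} else {
    // A gap in the sequence: reject the write; the idempotent producer re-enqueues
    // the failed batches in sequence order and resends them
    return Errors.OUT_OF_ORDER_SEQUENCE_NUMBER;
}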
Here is an excerpt from the KAFKA-5494 issue:
Currently, the idempotent producer (and hence transactional producer) requires max.in.flight.requests.per.connection=1.
This was due to simplifying the implementation on the client and server. With some additional work, we can satisfy the idempotent guarantees even with any number of in flight requests. The changes on the client can be summarized as follows:
With the changes above, we would get the benefits of multiple inflights in normal cases. When there are failures, we automatically constrain to a single inflight until we get back in sequence.
With multiple inflights, we now have the possibility of getting duplicates for batches other than the last appended batch. In order to return the record metadata (including offset) of the duplicates inside the log, we would require a log scan at the tail to get the metadata at the tail. This can be optimized by caching the metadata for the last ‘n’ batches. For instance, if the default max.inflight is 5, we could cache the record metadata of the last 5 batches, and fall back to a scan if the duplicate is not within those 5.
The last paragraph makes it clear: the current Kafka broker implementation caches the last 5 batches to detect duplicates, and on the client side the inflightBatchesBySequence structure in TransactionManager is also initialized as a PriorityQueue with a cache length of 5. So the hard limit on max.in.flight.requests.per.connection is there for efficiency reasons.
Configuration | Meaning |
---|---|
acks | Acknowledgement mechanism for sends. 0: no acknowledgement; 1: only the partition leader must acknowledge; -1 or all: all replicas must acknowledge |
enable.idempotence | true: idempotent producer; false: non-idempotent producer |
retries | Maximum number of retries when a retriable exception occurs |
max.block.ms | Maximum time to block when the buffer is full or metadata is unavailable (KafkaProducer.send() & KafkaProducer.partitionsFor()) |
buffer.memory | Number of bytes available for buffering records; when full, send blocks. This does not cover all memory used by KafkaProducer: some additional memory is used for in-flight requests and compression |
request.timeout.ms | Request timeout. Before version 2.1.0 it also acted as the maximum retention time of a message in the producer; since 2.1.0 that meaning has been taken over by delivery.timeout.ms |
connections.max.idle.ms | Maximum idle time of a connection |
bootstrap.servers | Broker addresses used for the initial connection |
max.in.flight.requests.per.connection | Maximum number of unacknowledged requests on a single connection |
batch.size | Default batch size for messages sent by the producer |
linger.ms | Extra latency added while a partition's accumulated messages have not reached batch.size; if one broker leads several partitions, the partition that needs to send first wins |
max.request.size | Maximum size in bytes of a single request |
compression.type | Whether to compress request data; the options are none, gzip, snappy and lz4 |
Many more configuration options are not listed here; Kafka exposes a very large number of them, so before configuring a KafkaProducer you should first pin down your own requirements: whether possible message loss is tolerable, whether strict message ordering is required, whether duplicate data in the broker logs must be minimized, and whether some extra latency (linger batching, compression) is acceptable in exchange for throughput.
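As an example, a throughput-leaning configuration might trade a little latency and durability for batching and compression; the concrete values below are illustrative, not recommendations.
Properties props = new Properties();
props.put(ProducerConfig.LINGER_MS_CONFIG, "20");          // wait up to 20 ms to fill a batch
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "65536");      // 64 KB batches
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");  // compress whole batches
props.put(ProducerConfig.ACKS_CONFIG, "1");                // leader-only ack: faster, but less durable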
In our setup there is a binlog parser upstream of Kafka. To make sure no messages are lost end to end, the parser assigns every message a monotonically increasing id, and on ack it verifies that the acknowledged message carries the expected increasing id, throwing an exception otherwise.
Because we only need to preserve a specific binlog ordering (per table), messages are hashed to different partitions, so Kafka's callbacks are not necessarily invoked in the global upstream order. Synchronous Kafka sends, on the other hand, are too slow, so we implemented an ackBuffer to solve this (a usage sketch follows the interface below).
Implementation goals:
// Get the next element from the buffer; blocks (waiting for a signal) while the buffer is empty or the id to be taken out has not been marked yet
long get() throws InterruptedException;
// Mark the id at the given sequence
boolean mark(long markSequence) throws InterruptedException;
// Put an id in and return its sequence in the buffer (from which the array index can be computed); blocks (waiting for a signal) when the buffer is full
long put(long id) throws InterruptedException;
void setBufferSize(int bufferSize); // set the buffer size
void start(); // initialize the buffer
void stop(); // shut the buffer down: reset putSequence and getSequence, clear the buffer
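A hypothetical usage sketch of this interface: the wiring (ackBuffer, binlogId, ackUpstream and the separate ack thread) is assumed from the description above and is not our actual implementation.
// Producer side: reserve a slot before sending, mark it from the Kafka callback.
long seq = ackBuffer.put(binlogId);                 // blocks while the buffer is full
producer.send(record, (metadata, exception) -> {
    if (exception == null) {
        try {
            ackBuffer.mark(seq);                    // mark this slot as successfully sent
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    // failure handling (retry, alert, ...) omitted
});

// Separate ack thread: ids come out strictly in put() order, regardless of callback order.
long ackedId = ackBuffer.get();                     // blocks until the head slot has been marked
ackUpstream(ackedId);                               // confirm the increasing id to the binlog parser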
While developing our producer against kafka-clients 1.1.0 we found this configuration to be a real trap: normally a TimeoutException is a retriable exception, but in the actual code a TimeoutException can still surface in the send callback even when max retries is configured.
The reason is that request.timeout.ms is not only used to time out in-flight sends, it is also used to decide whether batches sitting in the RecordAccumulator have expired (acting like a maximum retention time for a message). One option carrying several meanings makes configuration inflexible and unintuitive for users, and for now we are forced to set request.timeout.ms to its maximum value.
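In configuration terms the current workaround amounts to something like the line below (an assumption of how it might be written, applicable to pre-2.1.0 clients only):
// Push request.timeout.ms to its maximum so batches in the accumulator are
// effectively never expired by this setting alone.
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));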
As a workaround hack, we detect send problems (timeouts) through the idle state of the connection to the broker; with this we can quickly notice a misbehaving machine in the cluster and switch brokers to resend.
Kafka has recently fixed this configuration problem as well (fix version: 2.1.0) by adding delivery.timeout.ms to configure the maximum total time for delivering a message.
TransactionManager generates an increasing sequence per destination partition and keeps it in a Map of TopicPartition -> Integer. If our producer runs for a long time without a redeploy, a single partition (due to our ordering-preservation logic, some tables flush data very frequently) can accumulate enough sends to overflow this Integer; that takes on the order of 2^31 (about 2.1 billion) records to one partition.
The latest Kafka version still implements this as an Integer, presumably because the broker has to store the value in the log and wants to save space. For now the only mitigation is to restart the producers after an alert or on a regular schedule; the issue does not cause data loss or reordering.
Related links
https://issues.apache.org/jira/browse/KAFKA-5494
https://kafka.apache.org/documentation/
https://issues.apache.org/jira/browse/KAFKA-5886
https://issues.apache.org/jira/browse/KAFKA-4515
https://issues.apache.org/jira/browse/KAFKA-5621
Author:
Wang Zhitian (pen name zwangbo), working at Pinduoduo. WeChat public account: zwangbo的技术杂文. Passionate about technology and sharing; feedback is welcome.