通过上一课时的介绍我们了解到,业务线程使用 KafkaProducer.send() 方法发送 message 的时候,会先将其写入RecordAccumulator 中进行缓冲,当 RecordAccumulator...本课时我们就重点来看一下 RecordAccumulator 这个缓冲区的结构。...首先,我们从上图中可以看出,RecordAccumulator 会由业务线程写入、Sender 线程读取,这是一个非常明显的生产者-消费者模式,所以我们需要保证 RecordAccumulator 是线程安全的...RecordAccumulator 、ProducerBatch、MemoryRecordsBuilder 这三个核心类的关系如下图所示: message 格式 既然我们准备深入 KafkaProducer...、ProducerBatch、BufferPool 等底层组件,以及 RecordAccumulator 的核心方法。
〇、前言 在上一篇文章《连Producer端的主线程模块运行原理都不清楚,就敢说自己精通Kafka》中,我们介绍了Main Thread的工作原理,那么在本篇文章中,我们继续介绍第二部分内容:RecordAccumulator...一、RecordAccumulator 在上文中,我们介绍了主线程(Main Thread)的执行流程,当我们使用KafkaProducer发送消息的时候,消息会经过拦截器(Interceptor)、序列化器...(Serializer)和分区器(Partitioner),最后会暂存到消息收集器(RecordAccumulator)中,那么,本节就来针对其进行介绍。...RecordAccumulator的主要作用是暂存Main Thread发送过来的消息,然后Sender Thread就可以从RecordAccumulator中批量的获取到消息,减少单个消息获取的请求次数...properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60*1000); 在RecordAccumulator中,我们通过getOrCreateDeque
①main线程中创建了一个双端队列RecordAccumulator,将消息发送给RecordAccumulator。...发送消息到RecordAccumulator:主线程将创建的消息发送到一个双端队列(deque)叫做RecordAccumulator。...Sender线程的主要工作如下: 从RecordAccumulator拉取消息:Sender线程定期轮询(poll)RecordAccumulator,检查是否有新消息需要发送。...RecordAccumulator: RecordAccumulator是Producer内部的一个共享变量,用于暂存即将发送到Kafka broker的消息。...发送原理小结 总结一下,Kafka的Producer采用异步发送消息的方式, 主线程负责创建和发送消息到RecordAccumulator, 而Sender线程负责从RecordAccumulator
主线程: 负责将消息发送到消息累加器(RecordAccumulator) . Sender线程: 负责将消息累加器(RecordAccumulator)中获取消息并发送到Broker....RecordAccumulator: 主要用来缓存消息, Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能 RecordAccumulator 缓存的大小可以通过生产者客户端参数 buffer.memory...主线程中发送过来的消息都会被加到 RecordAccumulator 的某个双端队列( Deque )中, RecordAccumulator 的内部为每个分区都维护了 个双端队列,队列中的内容就是 Producer...在 RecordAccumulator 的内部还有一个 BufferPool, 它主要用来实现 ByteBuffer 的复用,以实现缓存的高效利用 。
Kafka 生产者结构 Kafka Producer 主要有三个部分组成:主线程、Sender 线程、RecordAccumulator。...主线程:执行序列化、分区、拦截器处理等主要操作,并将消息缓存到 RecordAccumulator 中; Sender 线程:从 RecordAccumulator 中拉数据,发送到 broker; RecordAccumulator...:缓存主线程的消息,并提供给 Sender 线程; 生产者执行流程: 主线程处理数据,包括处理拦截器操作、序列化、分区; 主线程将数据发送给 RecordAccumulator; RecordAccumulator...ProducerRecord) 压缩成 (ProducerBatch),并以分区 (TopicPartition) - 消息队列 (Deque(ProducerBatch)) 的消息存储主线程发来的消息; Sender 从 RecordAccumulator
在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。...main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。..."batch.size", 16384);//批次大小 props.put("linger.ms", 1);//等待时间 props.put("buffer.memory", 33554432);//RecordAccumulator...16384);//批次大小 props.put("linger.ms", 1);//等待时间 props.put("buffer.memory", 33554432);//RecordAccumulator...16384);//批次大小 props.put("linger.ms", 1);//等待时间 props.put("buffer.memory", 33554432);//RecordAccumulator
,主要用于存放消息(KafkaProducer主线程往RecordAccumulator中写消息,Sender线程从RecordAccumulator中读消息并发送到Kafka中) 6.解析Broker...// 唤醒Sender线程 Sender.wakeup RecordAccumulator RecordAccumulator是消息队列用于缓存消息,根据TopicPartition对消息分组 重点看下...RecordAccumulator.applend追加消息的流程 // 记录进行applend的线程数 appendsInProgress.incrementAndGet(); // 根据TopicPartition...// 根据准备好的节点信息从缓冲区中获取topicPartion对应的Deque队列中取出ProducerBatch信息 RecordAccumulator.drain // 将消息转移到每个节点的生产请求队列中...通过上面的介绍,我们梳理出了Kafka生产消息的主要流程,涉及到主线程往RecordAccumulator中写入消息,同时后台的Sender线程从RecordAccumulator中获取消息,使用NIO
Kafka 生产者发送消息的执行流程如下:默认情况下,所有的消息会先缓存到 RecordAccumulator 缓存中,再由 Sender 线程拉取消息发送到 Kafka 服务器端,通过 RecordAccumulator...1.1 RecordAccumulator 缓存作用暂存消息:RecordAccumulator 是 Kafk a生产者中的一个关键组件,它充当了一个缓存的角色,用于暂存主线程(Main Thread)...批量发送:RecordAccumulator 通过批量收集消息,减少了单个消息发送的网络请求次数,从而提高了发送效率。...Sender 线程可以从 RecordAccumulator 中批量获取消息,一次性发送到 Kafka 集群,减少了网络传输的资源消耗。...Sender 线程会定期轮询 RecordAccumulator,检查是否有新消息需要发送。
RecordAccumulator 该类扮演的是一个队列的角色,将records追加到MemoryRecords实例中,用于发送到server端。 RecordBatch 一批准备发送的消息。...RecordAccumulator维护了一个ConcurrentMap> batches; RecordBatch维护了一个MemoryRecords...F),构建了一个RecordAccumulator。此时需要关注的两个配置是 batch.size:批量发送的大小。 linger.ms:超时发送的时间。...获取分区号 int partition = partition(record, serializedKey, serializedValue, metadata.fetch()); 3),将消息追加到RecordAccumulator...//将消息追加到RecordAccumulator RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp
在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。...Sender线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。...RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗提升性能。...RecordAccumulator缓存的大小可以通过生产者客户端参数buffer.memory的配置,默认值为32MB。...主线程中发送过来的消息都会被追回到RecordAccumulator的某个双端队列(Deque)中,在RecordAccumulator的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch
Sender线程 负责从RecordAccumulator中获取消息并发送到Kafka中。...❞ 「RecordAccumulator」 ❝ RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能。...RecordAccumulator缓存的大小可以通过生产者客户端参数buffer.memory进行配置,默认值是32MB。...❞ 「ProducerBatch」 ❝ Main线程发送过来的消息会被追加到RecordAccumulator的Deque(双端队列)中,在RecordAccumulator的内部每个Partition...❞ 源码分析及图解原理 RecordAccumulator 在RecordAccumulator中,最核心的参数就是: private final ConcurrentMap<TopicPartition
将 message 封装成 ProducerRecord 写入到 RecordAccumulator 中暂存,RecordAccumulator 对象中维护了多个队列,可以看做是 message 的缓冲区...调用 append()方法,将 message 写入到 RecordAccumulator 中暂存。...最后,唤醒 Sender 线程,后续就由 Sender 线程从 RecordAccumulator 中批量发送 message 到 kafka 集群。...我们前面介绍整个 KafkaProducer 流程的时候说过,RecordAccumulator 是一个缓冲区,主线程发送的 message 会先进入 RecordAccumulator,然后 Sender...我们看一下 KafkaProducer.doSend()方法中,有这么一个片段: // 尝试向RecordAccumulator中追加message RecordAccumulator.RecordAppendResult
再往下看,到了第一个重点:RecordAccumulator self...._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self...._batches = collections.defaultdict(collections.deque) RecordAccumulator类是啥呢?...里 RecordAccumulator的大小由参数buffer_memory控制,batch的大小由参数batch_size控制。...send方法实际只是往RecordAccumulator容器里塞消息,Sender线程则在后台不停轮训,符合条件就发送。
调用RecordAccumulator.append()方法,将处理后的数据扔进RecordAccumulator(缓存对象)的RecordAppendResult类属性中; 5....RecordAccumulator.append()方法首先将数据进行队列化放在Deque对象中,Deque包含多个ProducerBatch; 6....对象的RecordAccumulator.append()方法,返回RecordAppendResult对象; RecordAccumulator.RecordAppendResult result =...,RecordAccumulator持有RecordAppendResult对象,其future作为整个producer.send()方法的返回值; public final static class...通过getOrCreateDeque(tp)得到deque队列(持有ProducerBatch对象),ProducerBatch是最小的发送数据实体,RecordAccumulator计算字节数并分配本地资源
2、然后调用上一课时介绍的 RecordAccumulator.ready() 方法,了解 RecordAccumulator 的缓存情况,选出可以向哪些Node 节点发送请求。...expiredBatches() 方法用来获取 RecordAccumulator 中已经过期的 ProducerBatch 集合。...Selector 通过上面的小节,我们已经了解了 RecordAccumulator 中的数据转换成 ClientRequest 的流程。...如果 RecordAccumulator 中有部分数据,但发送条件没有 ready,则 timeout 为消息的超时时长。...如果 RecordAccumulator 没有数据,则 timeout 为 kafka 元数据过期的时长。
在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。...main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。...16384);//批次大小 props.put("linger.ms", 1);//等待时间 props.put("buffer.memory", 33554432);//RecordAccumulator...16384);//批次大小 props.put("linger.ms", 1);//等待时间 props.put("buffer.memory", 33554432);//RecordAccumulator
NetworkClient中的InFlightRequests、Sender中的inFlightBatches变量和RecordAccumulator中的IncompleteBatches,都用到了这个机制...incomplete 在RecordAccumulator::append中,内存被申请,该Batch被添加到incomplete。...在响应收到后,RecordAccumulator::deallocate被调用,移除对应的Batch,释放内存 ---- Sender::sendProduceRequest中定义的RequestCompletionHandler
2、RecordAccumulator:是一个记录收集器,用于收集客户端发送的消息,并将收集到的消息暂存到客户端缓存中。...流程详解:消息发送的过程中,涉及到两个线程协同工作:1、主线程首先将业务数据封装成ProducerRecord对象,2、之后调用send()方法将消息放入RecordAccumulator(消息收集器,...也可以理解为主线程与Sender线程直接的缓冲区)中暂存,3、Sender线程负责将消息信息构成请求,并最终执行网络I/O的线程,它从RecordAccumulator中取出消息并批量发送出去, 需要注意的是...hash,然后对分区数取模(Utils.murmur2(key) % numPartitions) ~ 3、如未指定且没key,则轮询发送给分区(低版本采用随机)5、临时缓存(存储) RecordAccumulator...采用了双端队列(Deque)数据结构来临时存储 目的:提高发送数据的吞吐量 确定消息发送的分区后,会在RecordAccumulator寻找对应的Deque,找不到对应的Deque则新建
RecordAccumulator accumulator 消息记录累积器,消息追加的入口(RecordAccumulator 的 append 方法)。...由于在讲解 Sender 发送流程中,大部分都是调用 RecordAccumulator 方法来实现其特定逻辑,故接下来重点对上述涉及到 RecordAccumulator 的方法进行一个详细剖析,加强对...2、RecordAccumulator 核心方法详解 2.1 RecordAccumulator 的 ready 方法详解 该方法主要就是根据缓存区中的消息,判断哪些分区已经达到发送条件。...2.2 RecordAccumulator 的 drain方法详解 RecordAccumulator#drain public Map> drain...RecordAccumulator#drainBatchesForOneNode private List drainBatchesForOneNode(Cluster cluster