在讲述Old Producer的开篇,我们展示过sync和async的代码,不妨这里在赘述一下:
这里可以看到sync的情况什么都不需要做,而async的情况就需要开启ProducerSendThread, 而在ProducerSendThread构造函数列表中的queue就是指客户端暂存消息的LinkedBlockingQueue。
注意在Scala语言中是没有break语句的,这点与Java不同,Scala的相当于每个case语句末尾都加上了一个break语句,所以读到相关源码的同学不要误以为是Java中语法的那种情况。
在async模式下,消息发送不是直接调用sync模式下的DefaultEventHandler的handle()方法,而是调用kafka.producer.Producer的asyncSend方法如下(只展示主要内容):
一长串Scala源码会不会看的一头雾水?这里来一步一步的分析一下:
3. tryToHandle(events)就是调用DefaultEventHandler类中的handle()方法,接下去的工作就和Sync模式的相同。
4. 如果在等待的时间内没有获取到相应的消息,那么无需等待 events.size >= batchSize条件的满足就可以发送消息。
在讲述Sync模式的时候笔者画过一份结构图,这里也来画一幅Async结构图来收尾,与Sync模式的类似,具体如下:
篇幅限制,更多内容将在下一篇中进行介绍,关注本微信公众号,了解更多细节。
领取专属 10元无门槛券
私享最新 技术干货