原生方式
无论是生产者还是消费者,引入的依赖都是kafka-clients,maven坐标如下:
生产者
kafka生产者对象就是KafkaProducer,构造方式如下:
消息
KafkaProducer构造好后,需要构造待发送的消息。kafka消息对象是ProducerRecord,根据源码可知,构造方式有多种:
创建消息:
下面构造一个最常用的ProducerRecord,只指定topic和value,由kafka去决定分区:
消费者
kafka消费者者对象就是KafkaConsumer,构造方式如下:
订阅并消费
集成spring方式
现在的项目一般标配了spring,通过spring集成kafka能够大大的方便业务开发。集成方式也比较简单,只需增加如下maven坐标:
生产者
spring集成kafka的生产者配置方式如下(部分属性配置通过properties解耦,用户使用时可以自定义):
发送消息
发送消息进行如下封装,封装后如果要发送kafka消息,只需一行代码即可,例如(obj就是要发送的消息对象):
消费者
spring集成kafka的消费者配置方式如下(部分属性配置通过properties解耦,用户使用时可以自定义):
消息处理
由上面的配置可以,指定的topic,其消息由OpenAccountKafkaListener处理,OpenAccountKafkaListener的核心源码如下:
显而易见,spring集成kafka后,消费端的简单的很多。另外,我们在没用使用spring集成kafka时可以拿到kafak消费者异步提交,也可以同步提交,但是集成spring后,如何实现呢?客官老爷们稍安勿躁,继续往下看。
消息处理(第二版)
深入发送消息
前面已经介绍了如何使用kafka生产者发送消息,以及如何用消费者接收消息,包括原生方式和spring集成方式,接下来我们跟踪源码看看消息在调用KafkaProducer中的send()后发送到kafka broker之前需要经过哪些处理。
拦截器
无论是同步调用send(),还是异步调用send()发送消息,最终都是调用下面的方法:
由这段代码可知,消息发送前第一步就是调用拦截器(如果有的话),拦截器可以对消息进行加工。后面会单独有一篇文章详细的分析拦截器。
接下来调用doSend()方法,源码如下:
获取集群信息
由doSend()方法源码可知,获取集群信息的源码就在waitOnMetadata()中,其源码如下:
通过这段源码分析可知,当我们构造KafkaProducer时指定的bootstrap.servers的值,不一定要和kafka集群信息完全一致,kafka-client可以通过参数bootstrap.servers指定的broker,然后从broker上获取到整个kafka集群元数据信息。但是即使是这样,参数bootstrap.servers也建议尽量完整。例如整个集群有3个broker,如果bootstrap.servers只指定了1个broker,那么当这个broker宕机后,虽然集群状态可用。但是
序列化
即经过拦截器链后另一个非常重要的操作:对key&value的序列化。核心代码是如下两行,对key的序列化,调用的方法是构造KafkaProducer时参数key.serializer指定的serializer,对value的序列化,调用的方法是构造KafkaProducer时参数value.serializer指定的serializer:
分区
接下来就是选择分区,核心代码如下:
总结
根据上面的分析可知,消息发送经过的几个重要过程按照先后顺序依次是:拦截器,获取元数据,序列化,选择分区。接下来的文章会一一详细分析这些必要重要的过程。
领取专属 10元无门槛券
私享最新 技术干货