先讲下前几天没有发文章,首先是因为我老爸住院,住了五天,跑上跑下的没时间整理这些文章哦。其次接下来要发的主题还没确定。昨天想了下kafka应该专门弄一期,所以接下来可能会有一个星期左右时间讲解kafka
好啦,昨天预热了下kafka基础知识,今天开始进入正文。今天讲kafka的消息生产者。
kafka本身制定了一套二进制通信协议,各种语言客户端可以根据这个二进制协议进行开发并且产生数据发送给kafka
除了java本身在0.9版本以后支持,其他语言的客户端下载地址https://cwiki.apache.org/confluence/display/KAFKA/Clients
接下来讲解的都是以java为客户端的生产者哦,下图是客户端和kafka服务器的工作流程
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");//必填
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");//必填
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//必填
props.put("acks", "1"); //0不理睬leader broker端处理结果;all或者-1表示leader broker写入本地日志同事还会等待ISR中的其他副本成功写入;1折中,leader broker写入就返回结果
props.put("retries", 3); //重试
props.put("batch.size", 16384); //默认16KB,就是上图的batch1、batch2
props.put("linger.ms", 1); //消息在缓冲池里延迟多久发送,原则是稍微长点让batch填满可以增加producer的吞吐量
props.put("compression.type", "lz4"); //有三种压缩GZIP
props.put("buffer.memory", 33554432); //消息缓冲池大小,默认32MB
props.put("max.request.ms", 10485760);//producer发送请求的大小
props.put("request.timeout.ms", 30000); //producer发送请求给broker后,broker在规定时间没返回结果就抛出请求超时异常
Producer producer = new KafkaProducer(props);
for(int i = 0; i
// producer.send(new ProducerRecord("test", Integer.toString(i), Integer.toString(i)));
producer.send(new ProducerRecord("test", "Hello"));
producer.close();
}
}
结合上述代码和流程图,生产者工作大致如此
分区可以自定义,消息序列化自定义、拦截器自定义(对消息发送到缓冲区前拦截以及broker反馈结果后拦截)等可百度,这里就不讲了,网上很多。类似spring boot配置
--------------------------------------------------------------------------------------------------------
因为producer发送消息,因为异步以及重试机制可能存在消息丢失或者消息乱序,这里有一个无消息丢失配置
produce端配置
acks=all or -1
retries=Integer.MAX_VALUE
max.in.flight.requests.per.connection=1 //防止消息乱序问题,设置1则producer在某个broker发送响应之前将无法再给该broker发送PRODUCE请求
使用带回调机制的send发送消息,即KafkaProducer.send(record,callback)
Callback逻辑中显式地立即关闭producer,使用close(0) //处理乱序问题,如果不这样producer会将未完成的消息发送出去而造成消息乱序
broker端配置
unclean.leader.election.enable=false //不允许非ISR中的副本被选举为leader
replication.factor=3 //三备份原则
--------------------------------------------------------------------------------------------------------
压缩 LZ4最好
Java多线程执行实例KafkaProducer
分区不多使用一个实例KafkaProducer,分区多使用多个KafkaProducer实例
领取专属 10元无门槛券
私享最新 技术干货