Apache Kafka是一款开源的消息引擎系统,也是一个分布式流处理平台。除此之外,Kafka还能够被用作分布式存储系统(极少)。
A. 常见的两种消息引擎系统传输协议(即用什么方式把消息传输出去)
B. Kafka在设计之初就旨在提供三个方面的特性:
C. 作为流处理平台,Kafka与其他主流大数据流式计算框架相比,优势有两点:
Kafka体系架构 = M个producer +N个broker +K个consumer+ZK集群
一个topic 可以拥有若干个partition(从 0 开始标识partition ),分布在不同的broker 上, 实现发布与订阅时负载均衡。producer 通过自定义的规则将消息发送到对应topic 下某个partition,以offset标识一条消息在一个partition的唯一性。
一个partition拥有多个replica,提高容灾能力。replica 包含两种类型:leader 副本(负责读写请求)、follower副本(负责同步leader副本消息,通过副本选举实现故障转移)。
partition在机器磁盘上以log 体现,采用顺序追加日志的方式添加新消息、实现高吞吐量。
Kafka类型 | 特点 | 场景 |
---|---|---|
Apache Kafka | 最“正宗”的 Kafka。 1. 迭代速度快,社区响应度高,更高的把控度;2. 只提供读写磁盘文件的连接器,与其他外部系统交互时需要自行编写代码实现; 3. 没有没有提供任何监控框架或工具,但是可以借助Kafka manage、kafka eagler等第三方框架进行监控 | 需要一个消息引擎系统亦或是简单的流处理应用场景,同时需要对系统有较大把控度 |
Confluent Kafka | Confluent Kafka 提供了一些 Apache Kafka 没有的高级特性,比如跨数据中心备份、Schema 注册中心以及跨数据中心备份和集群监控等。但相关文档资料不全,普及率较低(Schema 注册中心:集中管理 Kafka 消息格式;REST proxy :支持开放 HTTP 接口的方式访问) | 需要用到 Kafka 的一些高级特性 |
Cloudera/Hortonworks Kafka | 集成了目前主流的大数据框架,能够帮助用户实现从分布式存储、集群调度、流处理到机器学习、实时数据库等全方位的数据处理 | 需要快速地搭建消息引擎系统,或者需要搭建的是多框架构成的数据平台且 Kafka 只是其中一个组件 |
Kafka 版本命名规则正式从 4 位
Kafka 目前总共演进了 7 个大版本,分别是 0.7、0.8、0.9、0.10、0.11、1.0 和 2.0
0.7版本:只提供了最基本的消息队列功能
0.8版本:引入了副本机制,至此Kafka成为了一个真正意义上完备的分布式高可靠消息队列解决方案
0.9.0.0版本:增加了基础的安全认证/权限功能;使用Java重写了新版本消费者API;引入了Kafka Connect组件
0.10.0.0版本:引入Kafka Streams,正式升级成分布式流处理平台
0.11.0.0版本:提供了幂等性Producer API以及事务API;对Kafka消息格式做了重构
1.0和2.0版本:主要还是Kafka Streams的各种改进
操作系统、磁盘、磁盘容量和带宽
A. 操作系统
kafka支持Linux、Windows 和 macOS的服务端部署,但一般选择Linux作为服务器部署(I/O 模型、数据网络传输效率、社区支持度)
上 Kafka 客户端底层使用了 Java 的 selector,selector 在 Linux 上的实现机制是 epoll,而在 Windows 平台上的实现机制是 select。因此在这一点上将 Kafka 部署在 Linux 上是有优势的,因为能够获得更高效的 I/O 性能。
Kafka 需要在磁盘和网络间进行大量数据传输。Linux 部署 Kafka 能够享受到零拷贝技术(Zero Copy,当数据在磁盘和网络进行传输时避免昂贵的内核态数据拷贝从而实现快速的数据传输)所带来的快速数据传输特性。
B. 磁盘
C. 磁盘容量
新增消息数消息留存时间平均消息大小备份数是否启用压缩
D. 带宽
(1) Broker端参数
logdirs
和logdir
zookeeper.connect
。listeners
、advertised.lis- teners
和hostname/port
auto.createtopicsenable
、unclean.leaderelection.enable
和auto.leader.rebal- anceenable
log.retention.{hours minutes ms}
、log.retention.bytes
和message.maxbytes
。(2) Topic级别参数
retentionms
:规定了该Topic消息被保存的时长reten-tionbytes
:规定了要为该Topic预留多大的磁盘空间maxmessage.bytes
:决定了KafkaBroker能够正常接收该Topic的最大消息大小 (3) JVM参数
KAFKA_HEAP_OPTS
KAFKA_JVM_PERFORMANCE_OPTS
:(4) 操作系统参数
Kafka的主题(Topic)是承载真实数据的逻辑容器,主题之下还分为若干个分区,Kafka消息组织方式实际上是三级结构:主题-分区-消息。主题下的每条消息只会在某一个分区中,而不会在多个分区中被保存多份。
(1) 产生原因
(2) 分区策略
分区策略是指决定生产者将消息发送到那个分区的算法。
Kafka提供了默认的分区策略是 轮询,同时kafka也支持用户自己制定。
常见的分区策略:
(3) 小结
分区是实现负载均衡,系统伸缩性,进而实现Kafka高吞吐量的重要机制。在搭建时就应当仔细的规划生产者端的分区策略,避免数据倾斜,使得某些分区成为性能瓶颈,这样极易引发下游消费数据的性能下降。
压缩秉承了用时间换空间的经典trade-off思想,即用CPU的时间去换取磁盘空间或网络I/O传输量,Kafka的压缩算法也是出于这种目的。
(1) 如何压缩
了解Kafka如何压缩消息,首先要清楚Kafka的消息格式,目前kafka有两大类消息格式,社区称之为V1版本和V2版本。
A:共同点
B:不同点:引入V2的目的主要是针对V1版本的一些弊端做了修正
(2) 何时压缩
在kafka中可能发生压缩的地方:生产者端和Broker端
A:生产者端:配置compression.type参数即表示指定类型的压缩算法。
B:有两种情况会样Broker端也可能进行压缩
(3) 何时解压
通常情况下解压发生在消费者端。
A:这个流程是Producer发送的压缩消息到Broker,Broker原封不动的保存起来,当Consumer程序请求这部分消息时,Broker原样发出去,当下消息到的Consumer端后,由Consumer自行解压。
B:Consume之所以知道这些消息是用何种压缩算法的,是因为Kafka会将启用了哪种压缩算法封装到消息集合中,当Consumer读取到消息集合时,就会知道消息所使用的压缩算法。
(4) 压缩算法对比
在Kafka2.1.0版本之前,仅支持GZIP,Snappy和LZ4。2.1.0后还支持Zstandard算法(Facebook开源,能够提供超高压缩比)
A:一个压缩算法的优劣,有两个重要指标:压缩比和压缩/解压缩吞吐量,两者都是越高越好。
B:吞吐量:LZ4>Snappy>zstd和GZIP,压缩比:zstd>LZ4>GZIP>Snappy
(5) 最佳实践
(1) 什么是不丢失
Kafka只对“已提交”的消息(committed message)做有限度持久化保证。 A:“已提交”的定义:Kafka的若干个(可自定义配置为一个或全部)Broker成功接收到,并写入日志后即为以成功提交。 B:有限度的持久化保证:kafka的消息不丢失的前提是N个Broker中至少有一个存活。
(2) 消息丢失场景
A:生产者丢失消息: (1):Kafka Producer是异步发送消息,使用producer.send(msg)发送消息,可以立即得到响应,但不能确定是否真的发送成功。 (2):网络抖动,消息本身不合格都会导致Broker无法正常接收消息 解决:使用带有回调的producer.send(msg,callback),回调可以准确的告诉我们消息是否真的发送成功。
B:消费者丢失消息: (1)Consumer端的位移数据出现异常,导致消息被略过 解决:先消费消息,在更新位移记录(这个可能会导致重复消费问题) (2)多个Consumer实例同时消费,但部分实例消费失败,原因是每个确认消息是否成功消费,位移数据就已经被更新。 解决:如果是多线程异步处理消费消息,consumer,程序就不要开启自动提交位移,让应用程序手动提交。
(3) 最佳实践
A:使用带回调通知的方法,发送消息
B:Producer端设置相关参数:
C:Broker端设置相关参数:
D:Consumer端设置相关参数:
设置enable.auto.commit=false,表示关闭自动提交,使用手动提交位移方式。
(1) 什么是拦截器
拦截器:允许应用程序在不修改逻辑的情况下,动态地实现一组可插拔的事件处理逻辑链。它能够在主业务操作的前后多个时间点上插入对应的“拦截”逻辑。
Kafka 拦截器分为生产者拦截器和消费者拦截器。
生产者拦截器允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑;而消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑。这两种拦截器都支持链的方式,Kafka 会按照添加顺序依次执行拦截器逻辑。
(2) 典型使用场景
Kafka 拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计等多种功能在内的场景。
计算消息从 Producer 端到 Consumer 端平均的处理延时
A. 实现生产者拦截器:
public class AvgLatencyProducerInterceptor implements ProducerInterceptor<String, String> {
private Jedis jedis; // 省略Jedis初始化
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
//在发送消息前更新总的已发送消息数
jedis.incr("totalSentMessage");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
@Override
public void configure(Map<java.lang.String, ?> configs) {
}
}
B. 实现消费者拦截器:
public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> {
private Jedis jedis; //省略Jedis初始化
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
long lantency = 0L;
//在真正消费一批消息前首先更新它们的总延时
for (ConsumerRecord<String, String> record : records) {
lantency += (System.currentTimeMillis() - record.timestamp());
}
//累计得到这批消息总的端到端处理延时并更新到Redis中
jedis.incrBy("totalLatency", lantency);
long totalLatency = Long.parseLong(jedis.get("totalLatency"));
long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage"));
//总延时和总消息数
jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs));
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
Apache Kafka的所有通信都是基于TCP的。
(1) 为什采用TCP
多路复用请求:multiplexing request,是将两个或多个数据合并到底层—物理连接中的过程。TCP的多路复用请求会在一条物理连接上创建若干个虚拟连接,每个虚拟连接负责流转各自对应的数据流。 严格讲,TCP并不能多路复用,只是提供可靠的消息交付语义保证,如自动重传丢失的报文。
(2) 何时创建TCP连接
A. 在创建KafkaProducer实例时
B. 可能在更新元数据后,或在消息发送时
(3) 何时关闭TCP连接
Producer端关闭TCP连接的方式有两种:用户主动关闭、kafka自动关闭
kafka自动关闭时,TCP连接是在Broker端被关闭的,但这个连接请求是客户端发起的,对TCP而言这是被动的关闭,被动关闭会产生大量的CLOSE_WAIT连接。
(1) 消息交付可靠性
常见的可靠性保障有三种:
Kafka默认提供交付可靠性保障是至少一次。
Kafka消息交付可靠性保障以及精确处理一次语义通过两种机制来实现的:幂等性(Idempotence)和事务(Transaction)。
(2) 幂等性(Idempotence)
A. 含义及优点
“幂等”原是数学概念,指某些操作或函数能够被执行多次,但每次得到的结果都不变。
计算机领域的含义:
幂等性的优点:最大的优势是可以安全地重试任何冥等性操作,因为他们不会破坏系统状态
B. 幂等性Producer:
开启:设置props.put(“enable.idempotence”,true)或props.put(ProducerConfig.ENABLE_IDEMPOTENC_CONFIG,true)。
特征:开启后,Kafka自动做消息的重复去重。
实现思路:用空间换取时间,Broker端多保存一些字段,当Producer发送了具有相同字段值的消息后,Broker就可以知道这些消息重复,就将这些消息丢弃。
作用范围:
(3) 事务(Transaction)
A. 事务概念
B. 事务型Producer
开启:
特征:
关键事项:
Consumer Group :Kafka提供的可扩展且具有容错性的消息者机制。
(1) 重要特征
A:组内可以有多个消费者实例(Consumer Instance)。
B:消费者组的唯一标识被称为Group ID,组内的消费者共享这个公共的ID。
C:消费者组订阅主题,主题的每个分区只能被组内的一个消费者消费
D:消费者组机制,同时实现了消息队列模型和发布/订阅模型。
(2) 重要问题
A:消费组中的实例与分区的关系
消费者组中的实例个数,最好与订阅主题的分区数相同,否则多出的实例只会被闲置。一个分区只能被一个消费者实例订阅。
B:消费者组的位移管理方式
C:消费者组的重平衡
重平衡:本质上是一种协议,规定了消费者组下的每个消费者如何达成一致,来分配订阅topic下的每个分区。
触发条件:
影响:Rebalance 的设计是要求所有consumer实例共同参与,全部重新分配所有用分区。并且Rebalance的过程比较缓慢,这个过程消息消费会中止。
(1) 产生原因
A :老版本的Kafka会把位移信息保存在Zk中,当Consumer重启后,自动从Zk中读取位移信息。这种设计使Kafka Broker不需要保存位移数据,可减少Broker端需要持有的状态空间,有利于实现高伸缩性。
B :zk不适用于高频的写操作,在新版本中将消费者的位移数据作为一条条普通的Kafka消息,提交至内部主题(_consumer_offsets)中保存。实现高持久性和高频写操作。
(2) 特点
A :位移主题是一个普通主题,同样可以被手动创建,修改,删除。。
B :位移主题的消息格式是kafka定义的,不可以被手动修改,若修改格式不正确,kafka将会崩溃。
C :位移主题保存了三部分内容:Group ID,主题名,分区号。
(3) 创建
A :当Kafka集群中的第一个Consumer程序启动时,Kafka会自动创建位移主题。也可以手动创建
B :分区数依赖于Broker端的offsets.topic.num.partitions的取值,默认为50
C :副本数依赖于Broker端的offsets.topic.replication.factor的取值,默认为3
(4) 使用
A :当Kafka提交位移消息时会使用这个主题
B :位移提交得分方式有两种:手动和自动提交位移。
C :推荐使用手动提交位移,自动提交位移会存在问题:只有consumer一直启动设置,他就会无限期地向主题写入消息。
(5) 清理
A :Kafka使用Compact策略来删除位移主题中的过期消息,避免位移主题无限膨胀。
B :kafka提供专门的后台线程定期巡检待compcat的主题,查看是否存在满足条件的可删除数据。
(6) 注意事项
A :建议不要修改默认分区数,在kafka中有些许功能写死的是50个分区
B :建议不要使用自动提交模式,采用手动提交,避免消费者无限制的写入消息。
C :后台定期巡检线程叫Log Cleaner,若线上遇到位移主题无限膨胀占用过多磁盘,应该检查此线程的工作状态。
(1) 什么是重平衡
A :让一个Consumer Group下所有的consumer实例就如何消费订阅主题的所有分区达成共识的过程。
B :在重平衡过程中,所有Consumer实例共同参与,在协调者组件的帮助下,完成订阅分区的分配。
C :整个过程中,所有实例都不能消费任何消息,因此对Consumer的TPS影响很大
(2) 为什要避免重平衡
A :Rebalance影响Consumer端的TPS,因为重平衡过程中消费者不能消费消息
B :Rebalance很慢,如果有数百个消费者实例,整个过程耗时可能达到几个小时
C :Rebalance效率低,这个过程是全员参与,通常不考虑局部性原理,但局部性原理对系统性能提升特别重要。
D :真实的业务场景中,很多Rebalance都是计划外或是不必要的。
(3) 何时会触发重平衡
A :组成员数量发生变化
B :订阅主题数量发生变化
C :订阅主题分区数发生变化
(4) 要避免哪些重平衡
最常见的是消费者数发生变化触发的重平衡,其他的重平衡是不可避免的,但消费者数量变化是可避免的
A :Consumer实例增加
当启动一个配置相同的group.id值的consumer程序时,就是向这个组中增加一个消费者实例,这中情况一般是我们为了提升消费者端的TPS,是计划内的,所以也不用避免。
B :Consumer实例减少
按计划的减少消费者实例,同样不用避免,而计划外的减少触发的重平衡才是要关注的。
(5) 如何避免重平衡
在某些情况下,Consumer实例会被Coordinateor错误地认为“已停止”,进而被踢出Group。这种情况导致的重平衡是需要避免的。
A :Consumer实例不能及时的发送心跳请求
当消费者组完成重平衡后,每个Consumer实例都会定期地向Coordinator发送心跳请求,如这个心跳请求没有被及时发送,Coordinator就会认为该Consumer已经掉线,将其从组中移除,并开启新一轮重平衡。
解决:Consumer端参数设置优化
建议:session.timeout.ms=6s Heartbeat.interval.ms=2s
保证Consumer实例在判定为“dead”之前,能够发送至少3轮的心跳请求,即session.timeout.ms >=3 * heartbeat.interval.ms。
B :Consumer消费时间过长
消费者端处理了一个很重的消费逻辑,耗时较长,导致Consumer端应用程序两次调用poll方法的时间超出设置的最大时间间隔。
解决:将max.poll.interval.ms参数设置较大一些;优化消费者端业务逻辑,压缩消费耗时
C :GC影响
Consumer端的GC表现也会导致频繁的重平衡,频繁的Ful GC会导致长时间的断顿。
解决:JVM调优。
(1) 概念
概念区分
提交位移
(2) 作用及特点
提交位移的作用:
提交位移主要是为了表征Consumer的消费进度,这样当Consumer发生故障重启后,能够从kafka中读取之前提交的位移值,从相应的位置继续消费,避免从头在消费一遍。
位移提交的特点:位移提交的语义保障是由你来负责的,Kafka只会“无脑”地接受你提交的位移。位移提交错误,就会消息消费错误。
(2) 位移提交方式
从用户的角度讲,位移提交分为自动提交和手动提交;从Consumer端的角度而言,位移提交分为同步提交和异步提交。
手动提交最好是同步和异步结合使用,正常用异步提交,如果异步提交失败,用同步提交方式补偿提交。
更精细化的位移管理:使用commitSync(Map<TopicPartition,Offset>)和commitAsync(Map<TopicPartition,OffsetAndMetadata>)。
A :定义
所谓CommitFailedException,是指Consumer客户端在提交位移时出现了错误或异常,并且并不可恢复的严重异常。
B :产生原因
C :解决方案
(1) Kafka Java Consumer 单线程设计原理
在Kafka从0.10.1.0版本开始,KafkaConsumer就变成双线程设计即:用户主线程和心跳线程。
所以,新版本Consumer设计了单线程+轮询的机制。这种设计能够较好的实现非阻塞式的消息获取。
单线程设计优点:
(2) 多线程方案
KafkaConsumer类不是线程安全的(thread-safe)。所有的网络I/O处理都是发生在用户主线程中,所以不能在多线程中共享同一个KafkaConsumer实例,否则程序会抛ConcurrentModificationException异常。
方案 | 描述 | 优点 | 缺点 |
---|---|---|---|
方案一 | 消费者程序启动多个线程,每个线程维护专属的KafkaConsumer实例,负责完整的消息获取,消息处理流程。 | 方便实现,速度快,无线程间交互开销,易于维护分区的消息顺序 | 占用更多的系统资源,线程数受限于主题分区数,扩展性差。线程自己处理消息容易超时,进而引发Rebalance |
方案二 | 消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。获取消息的线程可以是多个,每个线程维护专属的KafkaConsumer实例,处理消息则交由特定的线程池来做。 | 可独立扩展消费获取线程数和worker线程数,伸缩性好 | 难以维护分区内的消息消费顺序,处理链路拉长,不易于位移提交管理,实现难度高。 |
(1) 何时创建
A :消费者和生产者不同,在创建KafkaConsumer实例时不会创建任何TCP连接。
原因:是因为生产者入口类KafkaProducer在构建实例时,会在后台启动一个Sender线程,这个线程是负责Socket连接创建的。
B :TCP连接是在调用KafkaConsumer.poll方法时被创建。在poll方法内部有3个时机创建TCP连接
消费者程序会创建3类TCP连接:
(2) 何时关闭
和生产者相似,消费者关闭Socket也分为主动关闭和Kafka自动关闭。
若消费者程序中使用了循环的方式来调用poll方法消息消息,以上的请求都会被定期的发送到Broker,所以这些socket连接上总是能保证有请求在发送,从而实现“长连接”的效果。
当第三类TCP连接成功创建后,消费者程序就会废弃第一类TCP连接,之后在定期请求元数据时,会改为使用第三类TCP连接。对于一个运行了一段时间的消费者程序来讲,只会有后面两种的TCP连接。
(1) 为什么要监控
A :对于Kafka消费者,最重要的事情就是监控它们的消费进度(消费的滞后程度)常称为:Consumer Lag
B :Lag的单位是消息数,他直接反映了一个消费者的运行情况。一个正常的消费者的Lag应当很小,设置为0。这表明消费者能够及时地消费生产者生产出来的消息。反之,一个消费者Lag值很大的话表明它无法跟上生产者的速度。
C :如果消费者速度无法匹及生产者的数据,极有可能导致它消费的数据已经不在操作系统的页缓存中了,那些数据就失去了享有Zero Copy技术的条件,不得不从磁盘中读取,进一步拉大了与生产者的差距。并且会越来大。
所以,在实际业务场景中必须时刻关注消费者的消费进度。一旦出现Lag逐步增加的趋势,就要立即定位问题,及时处理。
(2) 如何监控
(3) 方法分析
A :Kafka自带命令
$ bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker连接信息 > --describe --group <group 名称 >
<Kafka broker 连接信息 >:主机:端口
<group 名称 > :要监控的消费组的 group.id值
展示的信息:主题,分区,该消费者组最新消费消息的位移值(CURRENT-OFFSET值),每个分区当前最新生产的消息的位移值(LOG-END-OFFSET),LAG(前两者的差值),消费者实例ID,消费者连接Broker的主机名以及消费者的CLENT-ID信息。
B :Kafka Java Consumer API
C :Kafka JMX监控指标
使用Kafka默认提供 的JMX监控指标来监控消费者的Lag值。
(1) 概念
A. 副本机制:
副本机制(Replication):也称为备份机制,通常是指分布式在多台网络互连的机器上保存有相同的数据拷贝。
好处:提供数据冗余、提供高伸缩性、改善数据局部性(将数据放入与用户地理位置相近的地方)。
Kafka的副本机制,只实现了提供数据冗余的价值。
B. 副本
Kafka有主题的概念,每个主题又分为若干个分区。副本的概念是在分区层级下定义的,每个分区配置有若干个副本。
Kafka的副本(Replica),本质是一个只能追加写消息的提交日志。
根据Kafka副本机制的定义,同一个分区下的所有副本保存有相同的消息序列,这些副本分散保存在不同的Broker上,从而能够一定程度上避免部分Broker宕机带来的数据不可用。
(2) 副本角色
A :为解决分区下多个副本的内容一致性问题,常用方案就是采用基于领导者的副本机制。
B :在kafka中,副本分两类:领导者副本和追随者副本。每个分区在创建时都选举一个副本,称为领导者副本,其余的副本自动成为追随者副本。
C :Kafka的副本机制比其他分布式系统严格。Kafka的追随者副本不对外提供服务。所有的请求都要由领导者副本处理。追随者副本唯一的任务就是从领导者副本异步拉取消息,并写入到自己的提交日志中,从而实现与领导者副本的同步。
Kafka的追随者副本不对外提供服务的原因:
D :当领导者副本所在Broker宕机了,Kafka依托于Zookeeper提供的监控功能能够实时感知到,并立即开启新一轮的领导者选举,从追随者副本中选一个新的领导者。当老的Leader副本重启回来后,只能作为追随者副本加入到集群中。
基于领导者(Leader-based)的副本机制:
(3) Kafka副本机制的优点
A :方便实现“Read-your-writes”
B :方便实现单调读(Monotonic Reads)
(4) In-sync Replicas(ISR)同步副本
A :追随者副本定期的异步拉取领导者副本中的数据,这存在不能和Leader实时同步的风险。
B :Kafka引入了In-sync Replicas。ISR中的副本都是于Leader同步的副本,相反,不在ISR中的追随者副本就是被认为是与Leader不同步的。
C :Leader 副本天然就在ISR中,即ISR不只是追随者副本集合,他必然包括Leader副本。甚至某些情况下,ISR只有Leade这一个副本。
D :follower副本是否与leader同步的判断标准取决于Broker端参数 replica.lag.time.max.ms参数值。默认为10秒,只要一个Follower副本落后Leader副本的时间不连续超过10秒,那么Kafka就认为该Follower副本与leader是同步的,即使此时Follower副本中保存的消息明显小于Leader副本中的消息。
E :如果同步过程持续慢于Leader副本消息的写入速度,那么replica.lag.time.max.ms时间后,此Follower副本就会被认为是与Leader副本不同步的,因此不能再放入ISR中。此时,kafka会自动收缩ISR的进度,将该副本“踢出”ISR。ISR是一个动态调整的集合,而非静态不变的。
(5) Unclean 领导者选举(Unclean Leader Election)
A :ISR是可以动态调整的,所以会出现ISR为空的情况,由于Leader副本天然就在ISR中,如果ISR为空了,这说明Leader副本也挂掉了,Kafka需要重新选举一个新的Leader。
B :Kafka把所有不在ISR中的存活副本都会称为非同步副本。通常,非同步副本落后Leader太多,如果让这些副本做为新的Leader,就可能出现数据的丢失。在kafka中,选举这种副本的过程称为Unclean领导者选举。
C :Broker端参数unclean.leader.election.enable 控制是否允许Unclean领导者选举。开启Unclean领导者选举可能会造成数据丢失,但它使得分区Leader副本一直存在,不至于停止对外提供服务,因此提升了高可用性。禁止Unclean领导者选举的好处是在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。
Apache Kafka 自己定义了组请求协议,用于实现各种交互操作。常见有:
Kafka定义了很多类似的请求格式,所有的请求都是通过TCP网络以Socket的方式进行通讯的。
(1) Broker端的请求处理流程
A :常用请求处理方案
B :Kafka的方案:使用Reactor模式
(2) Kafka的请求处理方式
A :Reactor模式中,多个客户端发送请求到Reactor,Reactor中的请求分发线程Dispatcher会将不同的请求下发到多个工作线程
Acceptor线程只用于请求分发,不涉及具体逻辑处理,因此有很高的吞吐量。而工作线程可以根据实际业务处理需要任意增减,从而动态调节系统负载能力。
B :kakfa中,Broker端有个SocketServer组件,类似于Reactor模式中的Dispatcher,他也有对应的Acceptor线程和一个工作线程池,在kafka中,被称为网络线程池
Broker端参数num.network.threads,用于调整该网络线程池的线程数,默认为4,表示每台Broker启动时,会创建3个网络线程,专门处理客户端发送的请求。
C :Acceptor线程采用轮询的方式将入站请求公平的发送到所有网络线程中。
D :当网络线程接收到请求后,Kafka在这个环节又做了一层异步线程池的处理
E :IO线程池中的线程是执行请求逻辑的线程。Broker端参数num.io.threads控制了这个线程数,默认为8,表示每台Broker启动后自动创建8个IO线程处理请求。
F :请求队列是所有网络线程共享的,而响应队列则是每个网络线程专属的。原因在于Dispatcher只是用于请求分发而不负责响应回传,因此只能让每个网络线程自己发送Repsone给客户端,所有这些Response没必要放在一个公共的地方。
G :Purgatory组件,专门用来缓存延时请求(Delayed Requset)。
如设置了acks=all的PRODUCE请求,该请求要必须等待ISR中所有副本都接收了消息后才能返回,此时处理该请求的IO线程就必须瞪大其他Broker的写入结果。当请求不能立即处理时,他就会暂存在Purgatory中。待满足了完成条件,IO线程会继续处理该请求,并将Response放入到对应的网络线程的响应队列中
(3) Kafka对请求的处理特点
社区采取的方案是,同时创建两套完全样的组件,实现两类请求的分离。
(1) 重平衡的通知
A :重平衡过程通过消息者端的心跳线程(Heartbeat Thread)通知到其他消费者实例。
B :Kafka Java消费者需要定期地发送心跳请求到Broker端的协调者,以表明它还存活着。
C :重平衡的通知机制是通过心跳线程来完成的,当协调者决定开启新一轮重平衡后,他会将“REBALANCE_IN_PROGRESS”封装进心跳请求的响应中,发还给消费者实例。当消费者实例发现心跳响应中包含了”REBALANCE_IN_PROGRESS”,就能立即知道重平衡开始了。
D :消费者端的参数 heartbeat.interval.ms的真实用途是控制重平衡通知的频率。
(2) 消费者组状态机
Kafka设计了一套消费者组状态机(State Machine),帮助协调者完成整个重平衡流程。
A :kafka消费者组状态
(1)Empty:组内没有任何成员,但消费者组可能存在已提交的位移数据,而且这些位移尚未过期。
(2)Dead:组内没有任何成员,但组的元数据信息已经在协调者端被移除。协调者保存着当前向它注册过的所有组信息,所谓元数据就是类似于这些注册信息。
(3)PreparingRebalance:消费者组准备开启重平衡,此时所有成员都要重新请求加消费者组
(4)CompletingRebalance:消费者组下所有成员已经加入,各个成员正在等待分配方案。
(5)Stable:消费者组的稳定状态。该状态表明重平衡已经完成,组内成员能够正常消费数据了。
B :Kafka定期自动删除过期位移的条件:消费者处于Empty状态。
如果消费者组停了很长时间(超过7天),那么Kafka很可能就把该组的位移数据删除了。
(3) 消费者端重平衡流程
A :重平衡的完整流程需要消费者端和协调者组件共同参与才能完成。
B :在消费者端,重平衡分为两个步骤:
C :当组内成员加入组时,他会向协调者发送JoinGroup请求。在该请求中,每个成员都要将自己订阅的主题上报,这样协调者就能收集到所有成员的订阅信息。一旦收集了全部成员的JoinGroup请求后,协调者会从这些成员中选择一个担任这个消费者组的领导者。
D :通常情况下,第一个发送JoinGroup 请求的成员自动成为领导者。这里的领导者是具体的消费者实例,它既不是副本,也不是协调者。领导者消费者的任务是收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案。
E :选出领导者之后,协调者会把消费者组订阅信息封装进JoinGroup请求的响应中,然后发给领导者,由领导统一做出分配方案后,进入下一步:发送SyncGroup请求。
F :领导者向协调者发送SyncGroup请求,将刚刚做出的分配方案发给协调者。值得注意的是,其他成员也会向协调者发送SyncGroup请求,只是请求体中并没有实际内容。这一步的目的是让协调者接收分配方案,然后统一以SyncGroup 响应的方式发给所有成员,这样组内成员就都知道自己该消费哪些分区了。
(4) Broker端重平衡场景剖析
A :新成员入组
当协调者收到新的JoinGroup请求后,它会通过心跳请求响应的方式通知组内现有的所有成员,强制他们开启新一轮的重平衡。
B :组成员主动离组
消费者实例所在线程或进程调用close()方法主动通知协调者他要退出。这个场景涉及第三类请求:LeaveGroup请求。协调者收到LeaveGroup请求后,依然会以心跳响应的方式通知其他成员。
C :组成员崩溃离组
崩溃离组是指消费者实例出现严重故障,突然宕机导致的离组。崩溃离组是被动的,协调者通常需要等待一段时间才能感知,这段时间一般是由消费者端参数session.timeout.ms控制的。
D :重平衡时协调者对组内成员提交位移的处理
正常情况下,每个组内成员都会定期汇报位移给协调者。当重平衡开启时,协调者会给予成员一段缓冲时间,要求每个成员必须在这段时间内快速地上报自己的位移信息,然后在开启正常JoinGroup/SyncGroup请求发送。
(1) 简介
控制器组件(Controller),是Apache Kafka的核心组件。它的主要作用是Apache Zookeeper的帮助下管理和协调整个Kafka集群。 集群中任意一台Broker都能充当控制器的角色,但在运行过程中,只能有一个Broker成为控制器。
特点:控制器是重度依赖Zookeeper。
产生:控制器是被选出来的,Broker在启动时,会尝试去Zookeeper中创建/controller节点。Kafka当前选举控制器的规则是:第一个成功创建/controller节点的Broker会被指定为控制器。
(2) 功能
A :主题管理(创建,删除,增加分区)
当执行kafka-topics脚本时,大部分的后台工作都是控制器来完成的。
B :分区重分配
Kafka-reassign-partitions脚本提供的对已有主题分区进行细粒度的分配功能。
C :Preferred领导者选举
Preferred领导者选举主要是Kafka为了避免部分Broker负载过重而提供的一种换Leade的方案。
D :集群成员管理(新增Broker,Broker主动关闭,Broker宕机)
控制器组件会利用watch机制检查Zookeeper的/brokers/ids节点下的子节点数量变更。当有新Broker启动后,它会在/brokers下创建专属的znode节点。一旦创建完毕,Zookeeper会通过Watch机制将消息通知推送给控制器,这样,控制器就能自动地感知到这个变化。进而开启后续新增Broker作业。
侦测Broker存活性则是依赖于刚刚提到的另一个机制:临时节点。每个Broker启动后,会在/brokers/ids下创建一个临时的znode。当Broker宕机或主机关闭后,该Broker与Zookeeper的会话结束,这个znode会被自动删除。同理,Zookeeper的Watch机制将这一变更推送给控制器,这样控制器就能知道有Broker关闭或宕机了,从而进行善后。
E :数据服务
控制器上保存了最全的集群元数据信息,其他所有Broker会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据。
(3) 数据保存与故障转移
A. 控制器数据保存:控制器中保存的这些数据在Zookeeper中也保存了一份。每当控制器初始化时,它都会从Zookeeper上读取对应的元数据并填充到自己的缓存中。
B. 控制器故障转移(Failover):当运行中的控制器突然宕机或意外终止时,Kafka能够快速地感知到,并立即启用备用控制器来替代之前失败的控制器。
(4) 内部设计原理
A :控制器的多线程设计
控制器是多线程的设计,会在内部创建很多线程。如:
这些线程还会访问共享的控制器缓存数据,为了维护数据安全性,控制在代码中大量使用ReetrantLock同步机制,进一步拖慢了整个控制器的处理速度。
B :在0.11版对控制器的低沉设计进了重构
第一个改进(最大改进):把多线程的方案改成了单线程加事件对列的方案。
第二个改进:将之前同步操作Zookeeper全部改为异步操作。
Zookeeper本身的API提供了同步写和异步写两种方式。同步操作zk,在有大量主题分区发生变更时,Zookeeper容易成为系统的瓶颈。
(1) 概念
A :水位:多用于流式处理领域,如Spark Streaming 或Flink框架中都有水位的概念。
在教科书中关于水位定义:在即刻T,任意创建时间(Event Time)为T ’ ,且T ’ <= T的所有事件都已经到达或被观测到,那么T就被定义为水位。在《Streaming System》一书则是这样表述水位:水位是一个单调增加且表征最早未完成工作(oldest work not yet completed)的时间戳。
B :kafka的水位概念:kafka的水位不是时间戳,与时间无关。他是和位置信息绑定的,它是用消息位移来表征的。 Kafka源码使用的表述是高水位。在Kafka中也有低水位(Low Watermark),它是与Kafka删除消息相关的概念。
(2) 高水位作用
关键点:
(3) 高水位更新机制
在Leader副本所在Broker上,还保存了其他Follower副本的LEO值。而其他Broker上仅仅保存该分区的某个Follower副本。Kafka将Leader副本所在Broker上保存的这些Follower副本称为远程副本。
Kafka副本机制在运行过程中,会更新Broker1上Follower副本的高水位和LEO值,同时也会更新Broker0上Leader副本的高水位和LEO,以及所有远程副本的LEO。但它不会更新远程副本的高水位值。Broker0上保存这些远程副本的作用是帮助Leader副本确定其高水位,即分区高水位。
A. Leader副本上的更新机制
处理生产者请求的逻辑:
a. 写入消息到本地磁盘。
b. 更新分区高水位值
1. 获取Leader副本所在Broker端保存的所有远程副本LEO值{LEO-1,LEO-2,……,LEO-n}。
2. 获取Leader副本高水位值:currentHW。
3. 更新currentHW = max(currentHW ,min(leo-1,leo-2,……leo-n)).
处理follwer副本拉取消息的逻辑:
a. 读取磁盘(或页缓存)中的消息数据
b. 使用Follower副本发送请求中的位移值更新远程副本LEO值。
c. 更新分区高水位值(具体步骤与处理生产者请求的步骤相同)
B. Follower副本上的更新机制
从Leader拉取消息的处理逻辑:
a. 写入消息到本地磁盘
b. 更新LEO值
c. 更新高水位值
1. 获取Leader发送的高水位值:currentHW。
2. 获取步骤2中更新过的LEO值:currentLEO。
3. 更新高水位为min(currentHW,currentLEO)。
(4) Leader Epoch
Leader Epoch:用来规避因高水位更新错配导致的各种不一种问题。所谓Leader Epoch大致可以认为是Leader版本。
A :组成:由两部分数据组成。
B :Kafka Broker会在内存中为每个分区都缓存Leader Epoch数据,同时他还会定期地将这些信息持久化到一个checkpoint文件中。
(1) 创建主题
Kafka提供了自带的Kafka-topic脚本用于帮助用户创建主题。
bin/kafka-topic.sh --bootstarp-server broker_host:port --create –topic my_topic --partitions 1 --replication-factor 1
create 表明要创建主题的行为,而partitions和replication factor分别设置了主题的分区数以及每个分区下的副本数。
(2) 查询主题
查询所有主题的列表:
/bin/kafka-topic.sh --bootstrap-server broker_host:port --list
查询单个主题的详细数据:
/bin/kafka-topic.sh --bootstrap-server broker_host:port --describe --topic
(3) 修改主题
A :修改分区
/bin/kafka-topic.sh --bootstrap-server broker_host : port --alter --topic --partitions <新分区数>
区数一定要比原有分区数大。
B :修改主题级别参数:使用kafka-configs脚本修改对应的参数。
修改主题级别的max.message.bytes :
/bin/kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topic --entity-name --alter --add-config max.message.bytes=10485760
这个命令里使用的 –zookeeper,也可以使用 --bootstrap-server,只是他是用来设置动态参数的。
C :变更副本数
使用kafka-reassign-partitions 脚本,增加副本数
D :修改主题限速
这是指设置Leader副本和follower 副本使用的带宽。有时候,需要让某个主题的副本在执行副本同步机制时,不要消耗过多的带宽。 要做到这个需要先设置leader.replication.throttled.rate和follower.replication.throttled.rate
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name 0
E :主题分区迁移
同样是使用kafka-ressign-partitions脚本。
F :删除主题
/bin/kafka-topic.sh –bootstrap-server broker_host:port --delete --topic
删除主题的操作是异步的,执行完这条命令不代表主题立即就被删除了,它仅仅是被标记成“已删除”状态而已。Kafka会在后台默默地开启主题删除操作。
(4) 常见主题错误处理
A:主题删除失败
造成主题删除最常见的原因有两个:副本所在Broker宕机了;待删除主题的部分分区依然在执行迁移过程。
解决:
在执行最后一步时,要慎重,因为他可能造成大面积的分区Leader重选举。事实上,仅仅执行前两步也是可以的,只是Controller缓存中没有清空删除主题,不影响使用。
B:_consumer_offset占用太多的磁盘
如果发现这个主题占用了过多的磁盘空间,就要显示的使用jstack 命令查看kafka-log-cleaner-thread前缀线程状态。
一般配置的做法是,一次性在server.properties文件中配置好所有参数后,启动Broker。在需要变更任何参数时,必须要重启Broker。
在1.1.0版本中正是引入了动态Broker参数(Dynamic Broker Configs)。
概念:所谓动态,就是指修改参数值后,无需重启Broker就能立即生效,在server.properties中配置的参数称之为静态参数(Static Configs)。
(1) 分类
(2) 使用场景
(3) 配置存储
A :首先Kafka将动态Broker参数保存在Zookeeper中
B :/config/brokers znode才是真正保存动态Broker参数的地方,该znode下有两大类子节点:
C :参数的优先级别:per-broker参数 > cluster-wide参数 > static参数 > Kafka默认值。
(4) 如何配置
A :使用Kafka自带的Kafka自带的Kafka-configs脚本。
如果要设置cluster-wide范围的动态参数,需要显式指定entity-default。
B :较大几率被动态调整的参值
当需要实现重复消费历史数据的时候,就需要重设消费者组位移
(1) 重设位移的原因
A :Kafka和传统的消费引擎在设计上有很大区别,其中一个比较显著的区别是:Kafka的消息费者读取消息是可以重演的(replayable)
B :如RabbitMQ或ActiveMQ这样的传统消息中间件,他们处理和响应消息的方式是破坏性的(destructive),一旦消息被成功处理,就会被从Broker上删除。
C :Kafka,由于它是基于日志结构(log-based)的消息引擎,消费者在消费消息时,仅仅是从磁盘文件上读取数据,是只读操作,因此消费者不会删除消息数据。同时,由于位移数据是由消费者控制的,因此他能够很容易的修改位移值,实现重复消费历史数据的功能。
(2) 重设位移的策略
A :位移维度:根据位移值来重设。直接把消费者的位移值重设成我们给定的值。
B :时间维度:可以给定一个时间,让消费者吧位移调整成大于该时间的最小位移;亦可以给出一段时间间隔,如30分钟,然后让消费者直接将位移调回30分钟之前的位移值。
七种策略:
(3) 重设的方法
A :通过消费者API来实现
B :通过Kafka-consumer-groups命令行脚本来实现
消费者API注意事项:
总之:使用Java API的方式来实现重设策略的主要入口方法,就是seek方法。
2.2版本提供了30多个Shell脚本
(1) 引入原因
(2) 使用及功能
使用:需要在工程中显示的地增加依赖。
九大类功能:
(3) 工作原理
A :从设计上来看,AdminClient是一个双线程的设计:前端主线程和后端I/O线程。
B :AdminClient在内部大量使用生产者—消费者模型将请求生产和处理解耦
C :前端主线程会创建一个名为Call的请求对象实例。
该实例的有两个主要任务:
一旦创建好Call实例,前端主线程会将其放入到新请求队列(New Call Queue)中,此时,前端主线程的任务就算完成了。他只需要等待
结果返回即可。剩下的所有事情都是后端I/O线程的工作了。 D :后端I/O线程,该线程使用了3个队列(新请求队列、待发送请求队列、处理中请求队列)来承载不同时期的请求对象
使用3个队列的原因:
(4) 构造和销毁AdminClient实例
A :切记它的的完整路径是org.apche.kafka.clients.admin.AdminClient。
B :创建AdminClient实例和创建KafkaProducer或KafkaConsumer实例的方法是类似的,你需要手动构造一个Properties对象或Map对象,然后传给对应的方法。
(1) 监控维度
监控维度 | 指标 |
---|---|
主机监控 | 含义:指监控Kafka集群Broker所在的节点机器的性能。 常见的主机监控指标:机器负载、CPU使用率、内存使用率、磁盘I/O使用率、网络I/O使用率、TCP连接数、打开文件数和inode使用情况 |
JVM监控 | 3个指标:FullGC发生频率和时长、活跃对象大小和应用线程总数 |
集群监控 | 5个方法:查看Broker进程是否启动,端口是否建立;查看Broker端关键日志;查看Broker端关键线程的运行状态;查看Broker端的关键JMX指标;监控Kafka客户端 |
(4) 主流的Kafka监控框架
监控框架 | |
---|---|
JMXTool工具 | 可以实时查看KafkaJMX指标,不过只能应用于简单的监控场景 |
Kafka Manager | 作为一款强大的Kafka开源监控框架,它提供了丰富的实时监控指标以及适当的管理功能,非常适合一般的Kafka集群监控 |
Burrow | 目前提供的功能十分有限,但质量是很有保证的 |
JMXTrans+InfluxDB+Grafana | 可以在一套监控框架中同时监控企业的多个关键技术组件 |
ConfluentControlCenter | 目前已知的最强大的Kafka监控框架了。实时地监控 Kafka 集群,方便操作和搭建基于 Kafka 的实时流处理应用;提供了统一式的主题管理功 |
调优目标:高吞吐量、低延时。
优化漏斗:自上而下分为应用程序层、框架层、JVM层和操作系统层。层级越靠上,调优的效果越明显。
调优类型 | 建议 |
---|---|
操作系统 | 挂载文件系统时禁掉atime更新;选择ext4或XFS文件系统;swap空间的设置;页缓存大小 |
JVM(堆设置和GC收集器) | 将JVM 堆大小设置成 6~8GB;建议使用 G1 收集器,方便省事,比 CMS 收集器的优化难度小 |
Broker端 | 保持服务器端和客户端版本一致 |
应用层 | 要频繁地创建Producer和Consumer对象实例;用完及时关闭;合理利用多线程来改善性能 |
调优吞吐量(TPS)和延时:
调优 TPS:
参数列表 | |
---|---|
Broker端 | 适当增加num.replica.fetchers参数值,但不超过CPU核数 |
调优GC参数以避免经常性的Full GC | |
Producer端 | 适当增加batch.size参数值,比如从默认的16KB增加到512KB或1MB |
适当增加linger.ms参数值,比如10~100 | |
设置compression.type=lz4或zstd | |
设置acks=0或1 | |
设置retries=0 | |
如果多线程共享同一个Producer实例,则增加buffer.memory参数值 | |
Consumer端 | 采用多Consumer进程或线程同时消费数据 |
增加fetch.min.bytes参数值,比如设置成1KB或更大 |
调优延时:
参数列表 | |
---|---|
Broker端 | 适当设置num.replica.fetchers值 |
Producer端 | 设置linger.ms=0 |
不启用压缩,即设置compression.type=none | |
设置ackes=1 | |
Consumer端 | 设置fetch.min.bytes=1 |
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。