点击上方“服务端思维”,选择“设为星标”
回复”669“获取独家整理的精选资料集
回复”加群“加入全国服务端高端社群「后端圈」
作者 | incentchma
出品 | 腾讯技术工程
作者:vincentchma,腾讯 IEG 后台开发工程师
分布式消息队列中间件是是大型分布式系统中常见的中间件。消息队列主要解决应用耦合、异步消息、流量削锋等问题,具有高性能、高可用、可伸缩和最终一致性等特点。消息队列已经逐渐成为企业应用系统内部通信的核心手段,使用较多的消息队列有 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、Pulsar 等,此外,利用数据库(如 Redis、MySQL 等)也可实现消息队列的部分基本功能。
单机消息队列可以通过操作系统原生的进程间通信机制来实现,如消息队列、共享内存等。比如我们可以在共享内存中维护一个双端队列:
消息产出进程不停地往队列里添加消息,同时消息消费进程不断地从队尾有序地取出这些消息。添加消息的任务我们称为 producer,而取出并使用消息的任务,我们称之为 consumer。这种模式在早期单机多进程模式中比较常见, 比如 IO 进程把收到的网络请求存入本机 MQ,任务处理进程从本机 MQ 中读取任务并进行处理。
单机 MQ 易于实现,但是缺点也很明显:因为依赖于单机 OS 的 IPC 机制,所以无法实现分布式的消息传递,并且消息队列的容量也受限于单机资源。
即使用存储组件(如 Mysql 、 Redis 等)存储消息, 然后在消息的生产侧和消费侧实现消息的生产消费逻辑,从而实现 MQ 功能。以 Redis 为例, 可以使用 Redis 自带的 list 实现。Redis list 使用 lpush 命令,从队列左边插入数据;使用 rpop 命令,从队列右边取出数据。与单机 MQ 相比, 该方案至少满足了分布式, 但是仍然带有很多无法接受的缺陷。
针对上述缺点,redis 5.0 开始引入 stream 数据类型,它是专门设计成为消息队列的数据结构,借鉴了很多 kafka 的设计,但是随着很多分布式 MQ 组件的出现,仍然显得不够友好, 毕竟 Redis 天生就不是用来做消息转发的。
随着时代的发展,一个真正的消息队列,已经不仅仅是一个队列那么简单了,业务对 MQ 的吞吐量、扩展性、稳定性、可靠性等都提出了严苛的要求。因此,专用的分布式消息中间件开始大量出现。常见的有 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、Pulsar 等等。
消息队列本质上是一个消息的转发系统, 把一次 RPC 就可以直接完成的消息投递,转换成多次 RPC 间接完成,这其中包含两个关键环节:
1.消息转储;
2.消息投递:时机和对象;
基于此,消息队列的整体设计思路是:
MQ 组件要实现和生产者以及消费者进行通信功能, 这里涉及到 RPC 通信问题。消息队列的 RPC,和普通的 RPC 没有本质区别。对于负载均衡、服务发现、序列化协议等等问题都可以借助现有 RPC 框架来实现,避免重复造轮子。
存储可以做成很多方式。比如存储在内存里,存储在分布式 KV 里,存储在磁盘里,存储在数据库里等等。但归结起来,主要有持久化和非持久化两种。
持久化的形式能更大程度地保证消息的可靠性(如断电等不可抗外力),并且理论上能承载更大限度的消息堆积(外存的空间远大于内存)。但并不是每种消息都需要持久化存储。很多消息对于投递性能的要求大于可靠性的要求,且数量极大(如日志)。这时候,消息不落地直接暂存内存,尝试几次 failover,最终投递出去也未尝不可。常见的消息队列普遍两种形式都支持。
从速度来看,理论上,文件系统>分布式 KV(持久化)>分布式文件系统>数据库,而可靠性却相反。还是要从支持的业务场景出发作出最合理的选择。
MQ 的高可用,依赖于 RPC 和存储的高可用。通常 RPC 服务自身都具有服务自动发现,负载均衡等功能,保证了其高可用。存储的高可用, 例如 Kafka,使用分区加主备模式,保证每一个分区内的高可用性,也就是每一个分区至少要有一个备份且需要做数据的同步。
push 和 pull 模型各有利弊,两种模式也都有被市面上成熟的消息中间件选用。
1.慢消费
慢消费是 push 模型最大的致命伤,如果消费者的速度比发送者的速度慢很多,会出现两种恶劣的情况:
1.消息在 broker 的堆积。假设这些消息都是有用的无法丢弃的,消息就要一直在 broker 端保存。
2.broker 推送给 consumer 的消息 consumer 无法处理,此时 consumer 只能拒绝或者返回错误。
而 pull 模式下,consumer 可以按需消费,不用担心自己处理不了的消息来骚扰自己,而 broker 堆积消息也会相对简单,无需记录每一个要发送消息的状态,只需要维护所有消息的队列和偏移量就可以了。所以对于慢消费,消息量有限且到来的速度不均匀的情况,pull 模式比较合适。
2.消息延迟与忙等
这是 pull 模式最大的短板。由于主动权在消费方,消费方无法准确地决定何时去拉取最新的消息。如果一次 pull 取到消息了还可以继续去 pull,如果没有 pull 取到则需要等待一段时间重新 pull。
即消费者应该在什么时机消费消息。一般有以下三种方式:
至于如何选择,也要结合具体的业务场景来决定。比如,对及时性要求高的数据,可用采用方式 3 来完成。
不管是 JMS 规范中的 Topic/Queue,Kafka 里面的 Topic/Partition/ConsumerGroup,还是 AMQP(如 RabbitMQ)的 Exchange 等等, 都是为了维护消息的消费关系而抽象出来的概念。本质上,消息的消费无外乎点到点的一对一单播,或一对多广播。另外比较特殊的情况是组间广播、组内单播。比较通用的设计是,不同的组注册不同的订阅,支持组间广播。组内不同的机器,如果注册一个相同的 ID,则单播;如果注册不同的 ID(如 IP 地址+端口),则广播。
例如 pulsar 支持的订阅模型有:
通常会在公共存储上维护广播关系,如 config server、zookeeper 等。
常见的高级特性有可靠投递、消息丢失、消息重复、事务等等,他们并非是 MQ 必备的特性。由于这些特性可能是相互制约的,所以不可能完全兼顾。所以要依照业务的需求,来仔细衡量各种特性实现的成本、利弊,最终做出最为合理的设计。
如何保证消息完全不丢失?
直观的方案是,在任何不可靠操作之前,先将消息落地,然后操作。当失败或者不知道结果(比如超时)时,消息状态是待发送,定时任务不停轮询所有待发送消息,最终一定可以送达。但是,这样必然导致消息可能会重复,并且在异常情况下,消息延迟较大。
例如:
即对于任何不能确认消息已送达的情况,都要重推消息。但是,随着而来的问题就是消息重复。在消息重复和消息丢失之间,无法兼顾,要结合应用场景做出取舍。
当 broker 把消息投递给消费者后,消费者可以立即确认收到了消息。但是,有些情况消费者可能需要再次接收该消息(比如收到消息、但是处理失败),即消费者主动要求重发消息。所以,要允许消费者主动进行消费确认。
对于 push 模式,要求支持分区且单分区只支持一个消费者消费,并且消费者只有确认一个消息消费后才能 push 另外一个消息,还要发送者保证发送顺序唯一。
对于 pull 模式,比如 kafka 的做法:
但是这样也只是实现了消息的分区有序性,并不一定全局有序。总体而言,要求消息有序的 MQ 场景还是比较少的。
Kafka 是一个分布式发布订阅消息系统。它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用(如 Storm、Spark、Flink)。在大数据系统中,数据需要在各个子系统中高性能、低延迟的不停流转。传统的企业消息系统并不是非常适合大规模的数据处理,但 Kafka 出现了,它可以高效的处理实时消息和离线消息,降低编程复杂度,使得各个子系统可以快速高效的进行数据流转,Kafka 承担高速数据总线的作用。
kafka实现原理6
一个典型的 kafka 集群包含若干 Producer,若干个 Broker(kafka 支持水平扩展)、若干个 Consumer Group,以及一个 zookeeper 集群。Producer 使用 push 模式将消息发布到 broker。consumer 使用 pull 模式从 broker 订阅并消费消息。多个 broker 协同工作,producer 和 consumer 部署在各个业务逻辑中。kafka 通过 zookeeper 管理集群配置及服务协同。
这样就组成了一个高性能的分布式消息发布和订阅系统。Kafka 有一个细节是和其他 mq 中间件不同的点,producer 发送消息到 broker 的过程是 push,而 consumer 从 broker 消费消息的过程是 pull,主动去拉数据。而不是 broker 把数据主动发送给 consumer。
Producer 发送消息到 broker 时,会根据 Paritition 机制选择将其存储到哪一个 Partition。如果 Partition 机制设置合理,所有消息可以均匀分布到不同的 Partition 里,这样就实现了负载均衡。如果一个 Topic 对应一个文件,那这个文件所在的机器 I/O 将会成为这个 Topic 的性能瓶颈,而有了 Partition 后,不同的消息可以并行写入不同 broker 的不同 Partition 里,极大的提高了吞吐率。
如果把消息以随机的方式写入到磁盘,那么磁盘首先要做的就是寻址,也就是定位到数据所在的物理地址,在磁盘上就要找到对应柱面、磁头以及对应的扇区;这个过程相对内存来说会消耗大量时间,为了规避随机读写带来的时间消耗,kafka 采用顺序写的方式存储数据。
即使是顺序存取,但是频繁的 I/O 操作仍然会造成磁盘的性能瓶颈,所以 kafka 使用了页缓存和零拷贝技术。当进程准备读取磁盘上的文件内容时, 操作系统会先查看待读取的数据是否在页缓存中,如果存在则直接返回数据, 从而避免了对物理磁盘的 I/O 操作;
如果没有命中, 则操作系统会向磁盘发起读取请求并将读取的数据页存入页缓存, 之后再将数据返回给进程。一个进程需要将数据写入磁盘, 那么操作系统也会检测数据对应的页是否在页缓存中,如果不存在, 则会先在页缓存中添加相应的页, 最后将数据写入对应的页。被修改过后的页也就变成了脏页, 操作系统会在合适的时间把脏页中的数据写入磁盘, 以保持数据的 一 致性。
Kafka 中大量使用了页缓存, 这是 Kafka 实现高吞吐的重要因素之 一 。虽然消息都是先被写入页缓存,然后由操作系统负责具体的刷盘任务的, 但在 Kafka 中同样提供了同步刷盘及间断性强制刷盘(fsync),可以通过参数来控制。
同步刷盘能够保证消息的可靠性,避免因为宕机导致页缓存数据还未完成同步时造成的数据丢失。但是实际使用上,我们没必要去考虑这样的因素以及这种问题带来的损失,消息可靠性可以由多副本来解决,同步刷盘会带来性能的影响。
页缓存的好处:
零拷贝技术可以减少 CPU 的上下文切换和数据拷贝次数。
常规方式
应用程序一次常规的数据请求过程,发生了 4 次拷贝,2 次 DMA 和 2 次 CPU,而 CPU 发生了 4 次的切换。(DMA 简单理解就是,在进行 I/O 设备和内存的数据传输的时候,数据搬运的工作全部交给 DMA 控制器,而 CPU 不再参与任何与数据搬运相关的事情)
零拷贝的方式
通过零拷贝优化,CPU 只发生了 2 次的上下文切换和 3 次数据拷贝。
Kafka 允许进行批量发送消息,先将消息缓存在内存中,然后一次请求批量发送出去,这种策略将大大减少服务端的 I/O 次数。
Kafka 还支持对消息集合进行压缩,Producer 可以通过 GZIP 或 Snappy 格式对消息集合进行压缩,Producer 压缩之后,在 Consumer 需进行解压,虽然增加了 CPU 的工作,但在对大数据处理上,瓶颈在网络上而不是 CPU,所以这个成本很值得。
Producer 在发布消息到某个 Partition 时,先通过 ZooKeeper 找到该 Partition 的 Leader,然后无论该 Topic 的 Replication Factor 为多少,Producer 只将该消息发送到该 Partition 的 Leader。Leader 会将该消息写入其本地 Log。
每个 Follower 都从 Leader pull 数据。这种方式上,Follower 存储的数据顺序与 Leader 保持一致。Follower 在收到该消息后,向 Leader 发送 ACK, 并把消息写入其 Log。一旦 Leader 收到了 ISR 中的所有 Replica 的 ACK,该消息就被认为已经 commit 了,Leader 将增加 HW 并且向 Producer 发送 ACK。
为了提高性能,每个 Follower 在接收到数据后就立马向 Leader 发送 ACK,而非等到数据写入 Log 中。因此,对于已经 commit 的消息,Kafka 只能保证它被存于多个 Replica 的内存中,而不能保证它们被持久化到磁盘中,也就不能完全保证异常发生后该条消息一定能被 Consumer 消费。Consumer 读消息也是从 Leader 读取,只有被 commit 过的消息才会暴露给 Consumer。Kafka Replication 的数据流如下图所示:
对于 Kafka 而言,定义一个 Broker 是否“活着”包含两个条件:
Leader 会跟踪与其保持同步的 Replica 列表,该列表称为 ISR(即 in-sync Replica)。如果一个 Follower 宕机,或者落后太多,Leader 将把它从 ISR 中移除。这里所描述的“落后太多”指 Follower 复制的消息落后于 Leader 后的条数超过预定值或者 Follower 超过一定时间未向 Leader 发送 fetch 请求。Kafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。
完全同步复制要求所有能工作的 Follower 都复制完,这条消息才会被认为 commit,这种复制方式极大的影响了吞吐率(高吞吐率是 Kafka 非常重要的一个特性)。异步复制方式下,Follower 异步的从 Leader 复制数据,数据只要被 Leader 写入 log 就被认为已经 commit,这种情况下如果 Follower 都复制完都落后于 Leader,而如果 Leader 突然宕机,则会丢失数据。而 Kafka 的这种使用 ISR 的方式则很好的均衡了确保数据不丢失以及吞吐率。Follower 可以批量的从 Leader 复制数据,这样极大的提高复制性能(批量写磁盘),极大减少了 Follower 与 Leader 的差距。
一条消息只有被 ISR 里的所有 Follower 都从 Leader 复制过去才会被认为已提交。这样就避免了部分数据被写进了 Leader,还没来得及被任何 Follower 复制就宕机了,而造成数据丢失(Consumer 无法消费这些数据)。而对于 Producer 而言,它可以选择是否等待消息 commit。这种机制确保了只要 ISR 有一个或以上的 Follower,一条被 commit 的消息就不会丢失。
Leader 故障
leader 发生故障后,会从 ISR 中选出一个新的 leader,之后,为保证多个副本之间的数据一致性,其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader 同步数据。注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。
Kafka 在 ZooKeeper 中动态维护了一个 ISR(in-sync replicas),这个 ISR 里的所有 Replica 都跟上了 leader,只有 ISR 里的成员才有被选为 Leader 的可能。在这种模式下,对于 f+1 个 Replica,一个 Partition 能在保证不丢失已经 commit 的消息的前提下容忍 f 个 Replica 的失败。
LEO:每个副本最大的 offset。
HW:消费者能见到的最大的 offset,ISR 队列中最小的 LEO。
follower 发生故障后会被临时踢出 ISR 集合,待该 follower 恢复后,follower 会 读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步数据操作。等该 follower 的 LEO 大于等于该 partition 的 HW,即 follower 追上 leader 后,就可以重新加入 ISR 了。
由于 Broker 存储着特定分区的数据, 因此,不管是 Broker 还是分区的扩缩容,都是比较复杂的,属于典型的“有状态服务”扩缩容问题。接下来,我们看一下 Pulsar 是怎么针对 kafka 的不足进行优化的。
Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体。采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性。在消息领域,Pulsar 是第一个将存储计算分离云原生架构落地的开源项目。
在 kafka 的基础上,把数据存储功能从 Broker 中分离出来,Broker 仅面向生产者、消费者提供数据读写能力,但其自身并不存储数据。而在 Broker 层下面使用 Bookie 作为存储层,承担具体的数据存储职责。在 Pulsar 中,broker 的含义和 kafka 中的 broker 是一致的,就是一个运行的 Pulsar 实例, 提供多个分区的读写服务。由于 broker 层不在承担数据存储职责,使得 Broker 层成为无状态服务。这样一来,Broker 的扩缩容就变得非常简单。
相比之下,服务存储集于一体的 Kafka 就非常难以扩容。
1.在 Kafka 分区(Partition)概念的基础上,按照时间或大小,把分区切分成分片(Segment)。
2.同一个分区的分片,分散存储在集群中所有的 Bookie 节点上。
3.同一个分片,拥有多个副本,副本数量可以指定,存储于不同的 Bookie 节点。
和 Kafka 一样,Pulsar 也使用了顺序读写和零拷贝等技术来提高系统的性能。
此外,Pulsar 还设计了分层缓存机制,在服务层和存储层都做了分层缓存,来提高性能。
分片存储解决了分区容量受单节点存储空间限制的问题,当容量不够时,可以通过扩容 Bookie 节点的方式支撑更多的分区数据,也解决了分区数据倾斜问题,数据可以均匀的分配在 Bookie 节点上。
Broker 和 Bookie 灵活的容错以及无缝的扩容能力让 Apache Pulsar 具备非常高的可用性,实现了无限制的分区存储。
在 Pulsar 中 Broker 是无状态的,可以通过增加节点的方式实现快速扩容。当需要支持更多的消费者或生产者时,可以简单地添加更多的 Broker 节点来满足业务需求。Pulsar 支持自动的分区负载均衡,在 Broker 节点的资源使用率达到阈值时,会将负载迁移到负载较低的 Broker 节点。新增 Broker 节点时,分区也将在 Brokers 中做平衡迁移,一些分区的所有权会转移到新的 Broker 节点。
存储层的扩容,通过增加 Bookie 节点来实现。通过资源感知和数据放置策略,流量将自动切换到新的 Apache Bookie 中,整个过程不会涉及到不必要的数据搬迁。即扩容时,不会将旧数据从现有存储节点重新复制到新存储节点。
如图所示,起始状态有四个存储节点,Bookie1, Bookie2, Bookie3, Bookie4,以 Topic1-Part2 为例,当这个分区的最新的存储分片是 SegmentX 时,对存储层扩容,添加了新的 Bookie 节点,BookieX,BookieY,那么当存储分片滚动之后,新生成的存储分片, SegmentX+1,SegmentX+2,会优先选择新的 Bookie 节点(BookieX,BookieY)来保存数据。
如下图,假设当存储分片滚动到 SegmentX 时,Broker2 节点失败。此时生产者和消费者向其他的 Broker 发起请求,这个过程会触发分区的所有权转移,即将 Broker2 拥有的分区 Topic1-Part2 的所有权转移到其他的 Broker(Broker3)。
由于数据存储和数据服务分离,所以新 Broker 接管分区的所有权时,它不需要复制 Partiton 的数据。新的分区 Owner(Broker3)会产生一个新的分片 SegmentX+1, 如果有新数据到来,会存储在新的分片 Segment x+1 上,不会影响分区的可用性。
即当某个 Broker 实例故障时,整个集群的消息存储能力仍然完好。此时,集群只是丧失了特定分区的消息服务,只需要把这些分区的服务权限分配给其他 Broker 即可。
注意,和 Kafka 一样, Pulsar 的一个分区仍然只能由一个 Broker 提供服务,否则无法保证消息的分区有序性。
如下图,假设 Bookie 2 上的 Segment 4 损坏。Bookie Auditor 会检测到这个错误并进行复制修复。Bookie 中的副本修复是 Segment 级别的多对多快速修复,BookKeeper 可以从 Bookie 3 和 Bookie 4 读取 Segment 4 中的消息,并在 Bookie 1 处修复 Segment 4。如果是 Bookie 节点故障,这个 Bookie 节点上所有的 Segment 会按照上述方式复制到其他的 Bookie 节点。
所有的副本修复都在后台进行,对 Broker 和应用透明,Broker 会产生新的 Segment 来处理写入请求,不会影响分区的可用性。
基于上述的设计特点,Pulsar 提供了很多特性。
Pulsar 另外一个有吸引力的特性是提供了读写分离的能力,读写分离保证了在有大量滞后消费(磁盘 IO 会增加)时,不会影响服务的正常运行,尤其是不会影响到数据的写入。读写分离的能力由 Bookie 提供,简单说一下 Bookie 存储涉及到的概念:
Entry 的读写入过程下图所示,数据的写入流程:
数据的读取流程:
一般在进行 Bookie 的配置时,会将 Journal 和 Ledger 存储磁盘进行隔离,减少 Ledger 对于 Journal 写入的影响,并且推荐 Journal 使用性能较好的 SSD 磁盘,读写分离主要体现在:
所以,数据写入是主要是受 Journal 磁盘的负载影响,不会受 Ledger 磁盘的影响。另外,Segment 存储的多个副本都可以提供读取服务,相比于主从副本的设计,Apache Pulsar 可以提供更好的数据读取能力。
通过以上分析,Apache Pulsar 使用 Apache BookKeeper 作为数据存储,可以带来下列的收益:
Pulsar 提供了多种订阅方式来消费消息,分为三种类型:独占(Exclusive),故障切换(Failover)或共享(Share)。
当消费者断开连接时,所有传递给它但是未被确认(ack)的消息将被重新分配和组织,以便发送给该订阅上剩余的剩余消费者。
消息确认(ACK)的目的就是保证当发生故障后,消费者能够从上一次停止的地方恢复消费,保证既不会丢失消息,也不会重复处理已经确认(ACK)的消息。在 Pulsar 中,每个订阅中都使用一个专门的数据结构–游标(Cursor)来跟踪订阅中的每条消息的确认(ACK)状态。每当消费者在分区上确认消息时,游标都会更新。
Pulsar 提供两种消息确认方法:
上图说明了单条确认和累积确认的差异(灰色框中的消息被确认并且不会被重新传递)。对于累计确认,M12 之前的消息被标记为 Acked。对于单独进行 ACK,仅确认消息 M7 和 M12, 在消费者失败的情况下,除了 M7 和 M12 之外,其他所有消息将被重新传送。
— 本文结束 —