PhxQueue 是微信开源的一款基于 Paxos 协议实现的高可用、高吞吐和高可靠的分布式队列,保证At-Least-Once Delivery,在微信内部广泛支持微信支付、公众平台等多个重要业务。
Github开源地址:https://github.com/Tencent/phxqueue
请PhxQueue给一个Star !欢迎提出你的issue和PR!
消息队列作为成熟的异步通信模式,对比常用的同步通信模式,有如下优势:
微信初期使用的分布式队列(称为旧队列)是微信后台自研的重要组件,广泛应用在各种业务场景中,为业务提供解耦、缓存、异步化等能力。
旧队列以 Quorum NRW 作为同步机制,其中 N=3、W=R=2,刷盘方式采用异步刷盘,兼顾了性能和可用性。
随着业务发展,接入业务种类日益增多,旧队列逐渐显得力不从心,主要不足如下:
另外旧队列还存在出队去重、负载均衡等其他方面的问题亟需改善。上述种种促使了我们考虑新的方案。
Kafka 是大数据领域常用的消息队列,最初由 LinkedIn 采用 Scala 语言开发,用作 LinkedIn 的活动流追踪和运营系统数据处理管道的基础。
其高吞吐、自动容灾、出入队有序等特性,吸引了众多公司使用,在数据采集、传输场景中发挥着重要作用,详见Powerd By Kafka。
但我们充分调研了 Kafka,认为其在注重数据可靠性的场景下,有如下不足:
1. Kafka 性能与同步刷盘的矛盾
Kafka 在开启配置 log.flush.interval.messages=1,打开同步刷盘特性后,吞吐会急剧下降。
该现象由如下因素导致:
2. Kafka replica 同步设计上的不足
Kafka replica 同步设计概要:
Kafka Broker leader 会跟踪与其保持同步的 follower 列表,该列表称为ISR(即in-sync Replica)。如果一个 follower 宕机,或者落后太多,leader 将把它从ISR中移除。
该同步方式偏重于同步效率,但是在可用性方面表现略显不足:
也就是说,任意一个 Broker 故障时,读写成功率会在一段时间内降为0。
在同步复制场景下,需要等待所有节点返回ack。
通过对比 Kafka replica 与 Paxos 的表现,我们认为在同步方式上 Paxos 是更好的选择:
所以,我们基于旧队列,用 Paxos 协议改造了同步逻辑,并且进行了包括同步刷盘之内的多项优化,完成了 PhxQueue。
PhxQueue 目前在微信内部广泛支持微信支付、公众平台等多个重要业务,日均入队达千亿,分钟入队峰值达一亿。
其设计出发点是高数据可靠性,且不失高可用和高吞吐,同时支持多种常见队列特性。
PhxQueue支持的特性如下:
PhxQueue 由下列5个模块组成。
Store 作为队列存储,引入了 PhxPaxos 库,以 Paxos 协议作副本同步。只要多数派节点正常工作及互联,即可提供线性一致性读写服务。
为了提高数据可靠性,同步刷盘作为默认开启特性,且性能不亚于异步刷盘。
在可用性方面,Store 内有多个独立的 paxos group,每个 paxos group 仅 master 提供读写服务,平时 master 动态均匀分布在 Store 内各节点,均衡接入压力,节点出灾时自动切换 master 到其它可用节点。
Producer 作为消息生产者,根据 key 决定消息存储路由。相同 key 的消息默认路由到同一个队列中,保证出队顺序与入队顺序一致。
Consumer 作为消费者,以批量拉取的方式从 Store 拉消息,支持多协程方式批量处理消息。
Consumer 以服务框架的形式提供服务,使用者以实现回调的方式,根据不同主题(Topic),不同处理类型(Handler)定义具体的消息处理逻辑。
Scheduler 的作用是,收集 Consumer 全局负载信息, 对 Consumer 做容灾和负载均衡。当使用者没有这方面的需求时,可以省略部署 Scheduler,此时各 Consumer 根据配置权重决定与队列的处理关系。
部署 Scheduler 后,Scheduler leader 与所有 Conusmer 维持心跳,在收集 Consumer 的负载信息的同时,反向调整 Consumer 与队列的处理关系。
当 Scheduler leader 宕机了后,Scheduler 依赖下述分布式锁服务选举出新 leader,不可用期间仅影响 Consumer 的容灾和负载均衡,不影响 Consumer 的正常消费。
Lock 是一个分布式锁,其接口设计非常通用化,使用者可以选择将 Lock 独立部署,提供通用分布式锁服务。
Lock 在 PhxQueue 中的作用有如下两点:
(1). 为 Scheduler 选举 leader; (2). 防止多个 Consumer 同时处理一条队列。
Lock 同样也是可选择部署的模块:
若部署了 Scheduler,就必须部署 Lock 为 Scheduler 选举出 leader; 否则,若业务对重复消费不敏感,可选择不部署 Lock。
这里所指的重复消费场景是:若省略部署 Scheduler 的话,Consumer 需要通过读取配置得知可处理的队列集合;当队列有变更(如队列缩扩容)时,各 Consumer 机器上的配置改变有先有后,这时各 Consumer 在同一时间看到的配置状态可能不一样,导致一段时间内两个 Consumer 都认为自己该消费同一个队列,造成重复消费。Lock 的部署可以避免该场景下的重复消费。(注意,即使省略部署 Lock,该场景仅造成重复消费,而不会造成乱序消费)
PhxQueue Store 通过 PhxPaxos 协议进行副本复制。
PhxPaxos 的工程实现方式分为三层:app 层负责处理业务请求,paxos 层执行 paxos同步过程,状态机层更新业务状态。
其中,app 层发起 paxos 提议,paxos 层各节点通过 paxos 协议共同完成一个 paxos log 的确认,之后状态机以 paxos log 作为的输入作状态转移,更新业务的状态,最后返回状态转移结果给 app 层。
一致的状态机层,加上来自 paxos 层的一致输入,就产生一致的状态转移,从而保证多个节点强一致。
这里我们要基于 PhxPaxos 在状态机层实现一个队列,就需要作如下概念映射:
整体上队列状态机和 paxos 能很好地切合。
未经优化的 Paxos 协议并未解决同步刷盘的写放大问题。而且,其副本同步效率不如 Kafka。
原因是,Kafka 的副本同步是流式批量的,而 Paxos 协议是以 paxos log 为单位串行同步,每个 paxos log 的同步开销是 1个RTT + 1次刷盘。
在多DC部署的场景下,ping 时延可达4ms,这样会导致单个 paxos group 的理论最高 TPS 仅250。
我们采用多 paxos group 部署 以及 Group Commit 的方式来同时解决同步刷盘的写放大问题以及Paxos吞吐问题。
如上图, 我们部署多个paxos group,以 paxos group 作为 Group Commit 的单位,一个 paxos group 内对应多个queue,将多个queue在一段时间内入队的数据合并在一起,当等待耗时或积累数据数目达到阀值,才会触发一次Paxos同步和同步刷盘,等待期间前端阻塞。
与Kafka的Producer批量逻辑相比,在存储层以 Group Commit 进行批量合并的好处如下:
(1). 业务层无需关注如何组织请求进行批量; (2). 在存储层以 paxos group 为单位的聚合效果比上层聚合效果更好。
下面分别从设计、性能、存储层 failover 过程三方面对比 PhxQueue 与 Kafka。
PhxQueue 架构虽然与 Kafka 等常见分布式队列类似,但设计上仍有不少独特之处。为了能让对 Kafka 有一定了解的读者更方便地了解 PhxQueue,下面列出了两者的对比。
注:以下对比基于相同的数据可靠性场景:少数派节点失效,不会造成数据丢失,且整体依旧可用。
CPU: 64 x Intel(R) Xeon(R) CPU E5-2620 v3 @ 2.40GHz
Memory: 64 GB
Network: 10 Gigabit Ethernet
Disk: SSD Raid 10
Cluster Nodes: 3
Ping: 1ms
开启 Producer Batch:
关闭 Producer Batch:
以上场景,PhxQueue 瓶颈在 cpu,使用率达70% ~ 80%。
主要对比杀死存储层的一个节点后,对整体吞吐的影响。
表现:
将 replica.lag.time.max.ms 从 10s 调整为 60s(延长时间方便观察),然后 kill Broker 0,挑选3个 partition,观察 ISR 变化如下:
第一阶段(未 kill Broker 0):
Topic: test-dis-p100 Partition: 96 Leader: 0 Replicas: 0,1,2 Isr: 1,0,2
Topic: test-dis-p100 Partition: 97 Leader: 1 Replicas: 1,2,0 Isr: 1,0,2
Topic: test-dis-p100 Partition: 98 Leader: 2 Replicas: 2,0,1 Isr: 1,0,2
第二阶段(kill Broker 0 后持续8s):
Topic: test-dis-p100 Partition: 96 Leader: 0 Replicas: 0,1,2 Isr: 1,0,2
Topic: test-dis-p100 Partition: 97 Leader: 1 Replicas: 1,2,0 Isr: 1,0,2
Topic: test-dis-p100 Partition: 98 Leader: 2 Replicas: 2,0,1 Isr: 1,0,2
第三阶段(持续1分钟左右):
Topic: test-dis-p100 Partition: 96 Leader: 1 Replicas: 0,1,2 Isr: 2,1
Topic: test-dis-p100 Partition: 97 Leader: 1 Replicas: 1,2,0 Isr: 2,1,0
Topic: test-dis-p100 Partition: 98 Leader: 2 Replicas: 2,0,1 Isr: 2,1,0
第四阶段(至此入队成功率完全恢复):
Topic: test-dis-p100 Partition: 96 Leader: 1 Replicas: 0,1,2 Isr: 2,1
Topic: test-dis-p100 Partition: 97 Leader: 1 Replicas: 1,2,0 Isr: 2,1
Topic: test-dis-p100 Partition: 98 Leader: 2 Replicas: 2,0,1 Isr: 2,1
其中,第二/三阶段标红处对应的partition入队成功率受损:
而实际观察,第二/三阶段期间完全没吞吐,原因是压测工具不断报连接失败,停止了写入。
压测工具输出:
30551 records sent, 6107.8 records/sec (0.06 MB/sec), 1733.9 ms avg latency, 5042.0 max latency.
30620 records sent, 6117.9 records/sec (0.06 MB/sec), 1771.9 ms avg latency, 5076.0 max latency.
30723 records sent, 6123.8 records/sec (0.06 MB/sec), 1745.4 ms avg latency, 5009.0 max latency.
30716 records sent, 6127.3 records/sec (0.06 MB/sec), 1841.1 ms avg latency, 5299.0 max latency.
30674 records sent, 6133.6 records/sec (0.06 MB/sec), 1621.3 ms avg latency, 4644.0 max latency.
>>> kill Broker 0 here (入队成功率受损)>>>
10580 records sent, 123.4 records/sec (0.00 MB/sec), 1537.1 ms avg latency, 84236.0 max latency. <<---吞吐下降严重
11362 records sent, 132.3 records/sec (0.00 MB/sec), 1658.3 ms avg latency, 84232.0 max latency.
11367 records sent, 132.3 records/sec (0.00 MB/sec), 1582.4 ms avg latency, 84228.0 max latency.
11236 records sent, 130.9 records/sec (0.00 MB/sec), 1694.2 ms avg latency, 84240.0 max latency.
11406 records sent, 132.8 records/sec (0.00 MB/sec), 1650.5 ms avg latency, 84233.0 max latency.
压测工具连接Broker失败日志:
[2017-08-16 15:38:22,844] WARN Connection to node 0 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-16 15:38:22,859] WARN Connection to node 0 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
原因分析:
Kafka Broker leader 是通过 Controller 选举出来的,ISR 列表是 leader 维护的。
前者的的租约是 Controller 定义的,后者的租约是 Broker 配置 replica.lag.time.max.ms 指定的。
所以,第二阶段持续时间较短,是 Controller 的租约时间决定的,第三阶段持续时间较长,是 replica.lag.time.max.ms 决定的。
当 Broker 0 被 kill 时,前者影响本来 Broker 0 是 leader 的 1/3 partitions 的入队成功率,后者影响 Broker 0 作为 follower 的 2/3 partitions 的入队成功率。
表现:
测试过程:
将 Store master 租约时长从10s调整为60s(延长时间方便观察),然后kill store 0,观察某 Producer 入队成功率:
关闭换队列重试特性:
>>> kill store 0 here (入队成功率受损)>>>
-------------------------------------------
-- total: 192323
-- time(ms): 10015
-- qps: 19203.49
-- routine_sleep: 73.88%
-- retcode cnt percent
-- -1 22097 11.49 <<--- 失败:连接失败
-- 0 125905 65.47 <<--- 成功:仍有66%成功率
-- 10102 44321 23.05 <<--- 失败:提示需要重定向到 master
-- usetime(ms) cnt percent
-- < 1 0 0.00
-- < 2 0 0.00
-- < 5 610 0.32
-- < 10 7344 3.82
-- < 20 18937 9.85
-- < 50 36067 18.75
-- < 100 6971 3.62
-- < 200 20239 10.52
-- < 500 59059 30.71
-- < 1000 30601 15.91
-- >= 1000 12495 6.50
>>> (入队成功率完全恢复)>>>
-------------------------------------------
-- total: 198955
-- time(ms): 10001
-- qps: 19893.51
-- routine_sleep: 98.00%
-- retcode cnt percent
-- 0 198955 100.00 <<--- 成功:100%成功率
-- usetime(ms) cnt percent
-- < 1 0 0.00
-- < 2 2 0.00
-- < 5 5895 2.96
-- < 10 30830 15.50
-- < 20 65887 33.12
-- < 50 95403 47.95
-- < 100 753 0.38
-- < 200 185 0.09
-- < 500 0 0.00
-- < 1000 0 0.00
-- >= 1000 0 0.00
开启换队列重试特性:
>>> kill store 0 here (入队成功率受损)>>>
-------------------------------------------
-- total: 134752
-- time(ms): 10001
-- qps: 13473.85
-- routine_sleep: 77.43%
-- retcode cnt percent
-- -202 14 0.01 <<--- 失败:超时
-- -1 2712 2.01 <<--- 失败:连接失败
-- 0 127427 94.56 <<--- 成功:仍有94%成功率
-- 10102 4572 3.39 <<--- 失败:提示需要重定向到 master
-- 10105 27 0.02 <<--- 失败:master 未选举出来
-- usetime(ms) cnt percent
-- < 1 0 0.00
-- < 2 4 0.00
-- < 5 3284 2.44
-- < 10 10704 7.94
-- < 20 22109 16.41
-- < 50 32752 24.31
-- < 100 4541 3.37
-- < 200 4331 3.21
-- < 500 11265 8.36
-- < 1000 19706 14.62
-- >= 1000 26056 19.34
>>> (入队成功率完全恢复)>>>
-------------------------------------------
-- total: 198234
-- time(ms): 10014
-- qps: 19795.69
-- routine_sleep: 94.36%
-- retcode cnt percent
-- 0 198234 100.00 <<--- 成功:100%成功率
-- usetime(ms) cnt percent
-- < 1 0 0.00
-- < 2 0 0.00
-- < 5 3875 1.95
-- < 10 22978 11.59
-- < 20 53000 26.74
-- < 50 87575 44.18
-- < 100 6204 3.13
-- < 200 6468 3.26
-- < 500 11963 6.03
-- < 1000 5637 2.84
-- >= 1000 534 0.27
小结:
在存储层 failover 过程中,PhxQueue 和 Kafka 的入队成功率均有一定时长的下降,PhxQueue 的入队成功率在66% ~ 100%,Kafka 的入队成功率在0% ~ 33%; PhxQueue 开启换队列重试特性后,failover 过程中入队成功率保持在90+%; PhxQueue 和 Kafka 均能自动切换 master,最终入队成功率完全恢复。
PhxQueue 在存储层做了很多的努力:实现了 master 自动切换,且仍然保证线性一致,切换期间仍然高可用;保证了同步刷盘的吞吐,其性能不亚于异步刷盘。
另外实现了大部分队列实用特性,例如出入队顺序一致、多订阅、限速、消息重放等,适用于各种业务场景。
目前 PhxQueue 已在微信内部大规模使用,也正式开源。
我们将保持 PhxQueue 开源版本与内部版本的一致,欢迎读者试用并反馈意见。
Github开源地址:https://github.com/Tencent/phxqueue
请PhxQueue给一个Star ! 欢迎提出你的issue和PR
转载自【腾讯开源】公众号,腾讯官方开源资讯,期待您的关注。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。