在当今的技术环境中,多云架构几乎成为了企业的标配。这种架构为企业提供了更多的选择和议价能力,有助于避免对单一供应商的依赖。同时,多云架构还能提高系统的高可用性,降低因单点故障带来的风险。然而,随之而来的是复杂性的增加。例如,在多云部署的情景中,以 RocketMQ 为例,可能会出现 producer 和 consumer 分布在不同云集群的情况。在这种场景下,位于 B 云的 consumer 可能无法接收到 A 云中 producer 生成的消息。
因此,在多云环境中部署的 RocketMQ 需要一种特定的通信机制,以实现消息在不同云环境间的选择性投递。
希沃主要采用 Java 技术栈,而 RocketMQ 本身也是基于 Java 开发的。然而,为何我们选择使用 Rust 来编写 RocketMQ 的代理呢?首先,Java 在内存基础消耗方面较大,且其垃圾回收(GC)机制是一个显著的弱点。我们希望找到一种没有 GC、内存使用可控且对开发者友好的语言,以开发我们所有后续的高性能中间件。C++ 是一个很好的替代选择,我们内部有核心系统就是用 C++ 开发的,它具有超高性能和极低的内存占用。
然而,C++ 也存在其自身的问题,例如,如果不小心,很容易遇到内存相关的问题(如内存泄漏、野指针等),而且其包管理也不尽人意。因此,我们转向了 Rust。在编写了几个示例项目并验证其可行性后,我们陆续开发了基于 Rust 的多个工具,包括 RDB 分析工具、Zookeeper 代理、模拟 Redis Slave 以实现可靠的跨云同步,以及 JVM 内存 dump 分析工具。此次 RocketMQ 代理的开发,标志着 Rust 在我们的在线上环境中的正式应用。
为了在多云环境中有效地管理 RocketMQ 的消息传递,我们采用了一个直接而有效的方法。我们在 A 云中的 producer 端部署了一个称为 hyper-consumer 的组件。这个 hyper-consumer 负责订阅那些需要在 B 云中被消费 Topic 和TAGS。
接下来,hyper-consumer 执行的消费逻辑相当于它本身变身为 B 云中的一个 producer。它将在 A 云中消费的消息重新投递到 B 云。通过这种方式,hyper-consumer 充当了两个云环境之间的“中间商”,像一座桥梁一样实现了消息的跨云转运。
实际上,这个初步的想法仔细一琢磨是存在很多问题的:
这些问题的出现使得我们需要进一步深入思考和改进这一方案,以确保跨云消息传递的可靠性和效率。
在与业界大厂的交流中,得知他们在多云的架构设计中,为了更好地进行流量管控,他们几乎对每个中间件都部署了一个代理层。以 RocketMQ 为例,一些公有云服务提供商在其多云架构中,通过代理层实现了对 RocketMQ 4.X 和 RocketMQ 5.X 版本的兼容,主要是涉及到 4.X 版本的 Remoting 协议和 5.X 版本的 grpc/protobuf 协议)
基于这一思路,我们也可以采用类似的代理机制来解决跨云消息传递的问题。通过部署一个 proxy,不仅可以解决消息传递的问题,还可以带来其他额外的好处,比如:
于是有了下面这样的架构图
在这种架构中,我们会部署两个关键的 proxy:一个用于 NameServer,另一个用于 Broker。整个流程大致分为以下四个步骤:
GetRouteInfoByTopic
的请求进行特殊处理名词解释:在代理模型中,downstream 一般是指发起请求的客户端,upstream 一般是指被代理的服务端(类比 Nginx)
简单来说,proxy 无非就是作为中间层做两件事情:
在这个过程中,proxy 可以对来自 downstream 的请求做一定的修改、处理,也可以对来自 upstream 的响应做同样的处理。
协议模块作为最底层的模块,主要是实现 rocketmq 的 Remoting 协议(主要参考了 JAVA 的 4.X 版本),主要提供:
不过首先遇到的挑战就是,得手搓 RocketMQ 的协议解析,因为我们还在使用 4.x 版本 RocketMQ,官方的 RocketMQ Client 只支持 5.x 的协议,首先得手搓一个协议解析器。好在 RocketMQ 的协议够简单,又有 Java 版本的代码可以参考,于是很快就撸了一个协议的 codec。详细的协议介绍可参考「给 wireshark 写一个 RocketMQ 协议解析的 lua 插件」
RocketMQ 命令都以 RemotingCommand 结构体的传递,对应的 rust 结构体如下:
这里使用 tokio 来作为底层的网络通信框架,这里实现一个 tokio 的 PktDecoder
代理模块就是整个 proxy 最核心的模块,包含了所有的代理逻辑,根据 RocketMQ 4.X 的 Remoting 协议,结合 JAVA 的原生实现,实际上就是请求分发器(Handler)基于请求携带的 code 字段来区分本次请求的类型,根据类型找到对应的处理器(Processor)来处理本次请求。
talk is cheap, show you the code:(好的代码不需要注释🐶):
#[async_trait::async_trait]
pub trait Handler: Send + Sync {
async fn on_req(&self, proxy_name: &str, req: &mut RemotingCommand);
async fn on_resp(&self, req_code: i32, resp: &mut RemotingCommand);
}
#[async_trait::async_trait]
pub trait Processor: Send + Sync {
async fn process_request(&self, _: &str, _: &mut RemotingCommand) {
// default nothing to do
}
async fn process_response(&self, _: &mut RemotingCommand) {
// default nothing to do
}
}
前面起到过代理的类型分为下面这两种:
值得庆幸的是,namesrv 和 broker 的协议是一样的,其中目前比较关键的消息类型有下面这些:
对于SendMessage(10),这个应该是3.X版本的发送消息的code,目前业务大部分都是基于4.X版本,所以这个code算是已经弃用了,暂时可以不考虑
请求类型(code) | 代理类型 | 说明 | 代理逻辑 |
---|---|---|---|
GetRouteInfoByTopic(105) | namesrv-proxy | producer从namesrv端获取Topic路由信息的请求。 | 将upstream(namesrv)返回的路由信息中的broker地址修改为 broker-proxy 的地址,这个是让producer通过broker-proxy发送消息的关键。 |
SendMessageV2(310) | broker-proxy | producer 向broker发送消息的请求。 | 计算是否需要将本消息也投递到另外一个云上(多云投递消息)。 |
SendBatchMessage(320) | broker-proxy | producer 向broker发送 批量 消息的请求。 | 同上。 |
PullMessage(11) | broker-proxy | consumer从broker拉取信息的请求(实际上push、pull模式底层都是基于这个来实现的)。 | 根据consumer在多云的部署情况以及该Topic的消息是否需要重复消费的配置,决定返回什么消息让consumer消费。 |
这里还有小问题,broker proxy 收到一个到来的消息,该怎么判断这个消息是否需要投递到其他云上呢?其实非常简单,主要依据是目标云上有没有 在线的consumer。如果没有在线的 consumer,则不进行投递。
由于跨云网络的不稳定性和较高的时延,实测从阿里云到腾讯云,非同城(距离很近)也至少需要 6ms,这种网络环境下,直接同步进行消息投递不是一个好的选择。同时也不能在投递缓慢或者网络中断时,把所有消息全部缓存在内存中,因此,为了确保数据的安全和可靠传输,我们需要引入一个数据持久化层。
存储模块主要用于 RocketMQ 消息的持久化,可以在某些特定情况下(例如由于网络临时不可用导致消息暂时转发失败时)将消息持久化保存下来,这里选择了 RocksDB 作为持久化方案。RocksDB 因其高效和稳定的性能,已被许多大型开源数据库项目采用,比如 TiKV 等。
我们将尽可能使用一个固定大小 channel 缓存待投递的数据,一旦内存中的 channel 达到其容量上限,则将消息存储到 RocksDB 中,然后不断的 drain 其中的数据进行处理。
这样可以保证在跨云网络中断时,MQ 消息不会丢失。
服务模块是最顶层的功能模块,主要包括:
线程模型如下图:
我们通过 prometheus 暴露采集点,监控 RocksDB 的消息堆积、处理时延等,这里不展开。
根据实际情况,RocketMQ Proxy 是以 RocketMQ 集群为单位进行部署的,覆盖整个 RocketMQ 集群的 name server、broker,以单个 name server,2 个 broker 的 RocketMQ 集群为例:
在 JSON 规范中,键(key)必须是字符串:
RocketMQ 返回的 brokerDatas 中的 brokerAddrs 中的 broker 列表都是以整数作为 key。
可恶的 FastJson 居然纵容了这一行为,(如果你坚守一下你的底线,可能 RocketMQ 协议就不是这样了)
public class TopicRouteData extends RemotingSerializable {
private String orderTopicConf;
private List<QueueData> queueDatas;
private List<BrokerData> brokerDatas;
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
public class BrokerData implements Comparable<BrokerData> {
private String cluster;
private String brokerName;
// 这里的 key 是 Long
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
}
但是在 rust serde-json
看来,这个 json 是非法的,反序列时会失败,无奈只能去修改 serde-json
的源码。
通过上面的方式,临时 hack 解决了这一问题,但也不用想着合并到 serde 主分支了,作者估计大概率不会接受这种奇怪的 pr。
rust-rocksdb 库的 WriteBatchIterator trait 没有提供 put_cf
和 delete_cf
方法,导致当使用多 column family 时,无法遍历到数据。
RocksDB 提供了一个丰富的 C++ 接口,然而在 Rust 绑定的版本中,这些回调方法并没有被完全暴露,活脱脱一个阉割版。
于是继续修改 rust-rocksdb 的代码,解决了这个问题。
在第一版的压测过程中,发现即使给 broker-proxy 分配了 1G ~ 2G 的内存,有时还是会导致容器 OOM,即使不 OOM,内存还是涨得非常高,并且内存不会随着压测流量结束而降低。
这个措施纯属是“算命”,尝试将程序的内存分配器改为早期 Rust 默认使用的 jemalloc(可以提供更好的多核性能以及更好的避免内存碎片)。
Cargo.toml:
[target.'cfg(not(target_env = "msvc"))'.dependencies]
jemallocator = "0.5.4"
main.rs:
#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;
尝试将内存 dump 下来分析,在使用 jemalloc
的基础上,添加一个简单的 dump 接口(编译选项中需要设置 profile.release.debug
为 true):
const PROF_ACTIVE: &'static [u8] = b"prof.active\0";
const PROF_DUMP: &'static [u8] = b"prof.dump\0";
const PROFILE_OUTPUT: &'static [u8] = b"profile.out\0";
fn set_prof_active(active: bool) {
let name = PROF_ACTIVE.name();
name.write(active).expect("Should succeed to set prof");
}
#[get("/dump_profile")]
async fn dump_profile() -> impl Responder {
let name = PROF_DUMP.name();
name.write(PROFILE_OUTPUT).expect("Should succeed to dump profile");
HttpResponse::Ok().body("dump profile success")
}
查看内存 dump 的结果,发现了端倪:
内存中存在大量的 trace 和 span 相关的数据,看上去都是由 console_subscriber 引入的,这个是早期开发为了让 tokio 线程可视化方便调试而加入的组件,是非必须的,去掉之后,果然内存消耗只有 100M ~ 200M。
关于 console-subscriber 对内存的使用问题,可以参考 github 的 issue
早期的压测过程还发现了一个不太符合预期的问题:在给予比较大的压测流量的情况下,通过 top 查看到的 CPU 的变化的上下波动比较大,CPU 会经常出现空闲时间,变化也非常频繁,平均负载只有 100%~200%,预期的 CPU 表现应该是充分利用 CPU,避免在流量比较大的时候 CPU 出现空闲时间。
简单梳理了一下,猜测是消费线程比较少的问题,此时的消息消费线程模型大致为:
OriginConsumeThread.png
这里瓶颈可能出现在 Drainer 线程,目前只启用单个 Drainer 线程的原因是需要维护一个“消费位点”,如果多个线程去 fetch 消息的话,这个消费位点维护起来比较困难,那么,即使 Deliver 线程再多,没有待处理消息的话也无济于事,CPU 利用率不高。
RocketMQ 消息到达 Proxy 的时候同步入库是为了避免消息丢失,然而 Drainer 发送消息到队列后会马上更新消费位点,即使 Deliver 线程有失败重新保存到 RocksDB 的措施,也会有可能丢失内存中的消息。
又由于 Drainer 线程是批量从 RocksDB 中 fetch 消息,没有比较优雅的办法保证绝对不丢消息且不重复投递消息,除非一条同步投递并逐步更新消费位点,这个对性能影响太大了。
简单来说就是为了高性能,允许丢消息,被丢弃的消息数最大可能是 queue 的大小,如果确实需要保证不丢消息的话,还可以选择同步分发消息(经测试,性能下降较大,QPS 大概只有异步的 1/6)
那么,在允许宕机时内存中的消息丢失的情况下,提升 CPU 利用率可以从跳过 Drainer 线程入手:
经过上面的简单改造之后,CPU 利用率提升了。
基于实际生产环境的公有云:
proxy 的相关配置如下:
resources:
limits:
cpu: 4
memory: 512Mi
requests:
cpu: 1
memory: 100Mi
在较小消息、60000 QPS 的情况下,资源消耗大致为:
压测过程的 top 如下:
在较大消息、60000 QPS 的情况下,资源消耗大致为:
压测过程的 top 如下:
不得不说,rust 内存消耗真的是太优秀了,不像某ava,启动完一两 G 内存没了。