
在分布式系统中,消息中间件是实现异步解耦、流量削峰的核心组件。RocketMQ作为Apache顶级项目,凭借高吞吐、低延迟、高可用的特性,成为阿里等大厂核心业务的消息中枢。本文将从基础认知、架构设计、开发实战、核心原理四个维度,带你吃透RocketMQ。
RocketMQ前身为阿里自研的MetaQ(参考Kafka架构,Java开发),2012年开源,2017年9月成为Apache顶级项目,与ActiveMQ、Kafka、Pulsar同属Apache消息中间件生态。
目前分为社区开源版和阿里云商业版(Aliware MQ),本文基于4.7.1版本展开,支持Java、C/C++、Python、Go四种开发语言,可支撑双十一万亿级消息流转,TPS达几十万。
RocketMQ的架构由六大核心组件构成,各司其职实现消息的生产、路由、存储与消费。
组件 | 核心作用 | 关键特性 |
|---|---|---|
NameServer | 轻量级路由中心,管理Broker注册与发现 | AP架构,放弃强一致性;2. Broker每30s发心跳,10s探活,120s无心跳则剔除;3. 客户端30s拉取一次路由 |
Broker | 消息存储与转发的核心进程,单机支撑10万QPS | 支持主从副本,默认读写在Master,slaveReadEnable=true时Slave可参与读负载;2. 集中式存储所有Topic消息 |
Topic | 消息的逻辑分类(非物理存储单元) | 自动创建(autoCreateTopicEnable=true);2. 与生产/消费者为多对多关系 |
Producer | 消息生产者,仅向Master节点写数据 | 定时拉取路由,与Broker建立TCP长连接;2. 支持批量、顺序、事务、延迟等特殊消息 |
Consumer | 消息消费者,可连接Master和Slave | 支持Pull(长轮询)和Push(基于Pull封装)模式;2. 消费模式分集群(轮询)和广播(全量接收) |
MessageQueue | 逻辑分片(类似Kafka的Partition),实现消息负载与水平扩展 | 由writeQueueNums定数量,readQueueNums定消费线程;2. 读写队列建议一致,否则可能导致消息漏消费 |
早期MetaQ曾用ZooKeeper做服务注册,但RocketMQ仅需轻量级元数据管理:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>核心是指定生产组和NameServer地址,构造Message时必填Topic,可选Tag/Keys做消息过滤与索引:
DefaultMQProducer producer = new DefaultMQProducer("my_producer_group");
producer.setNamesrvAddr("192.168.8.147:9876");
producer.start();
// 构造消息并发送
Message msg = new Message("test_topic", "TagA", "OrderID001", "Hello RocketMQ".getBytes());
SendResult result = producer.send(msg);通过订阅Topic(支持Tag通配符),注册监听器并返回消费状态:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_group");
consumer.setNamesrvAddr("192.168.8.147:9876");
consumer.subscribe("test_topic", "*"); // 订阅所有Tag的消息
consumer.registerMessageListener((msgs, context) -> {
msgs.forEach(msg -> System.out.println(new String(msg.getBody())));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>在application.properties配置核心参数:
rocketmq.name-server=192.168.8.147:9876
rocketmq.producer.group=springboot_producer_group
rocketmq.producer.send-message-timeout=3000RocketMQTemplate,支持同步、异步、单向三种发送方式:@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendMsg() {
rocketMQTemplate.syncSend("springboot_topic:TagA", "Spring Boot集成RocketMQ");
}@RocketMQMessageListener注解监听Topic,指定消费模式:@RocketMQMessageListener(topic = "springboot_topic", consumerGroup = "springboot_consumer_group")
public class MsgConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
System.out.println("收到消息:" + msg);
}
}实现局部有序需满足三大条件:
consumeMode=ORDERLY),底层通过TreeMap排序offset并加锁保证有序。基于“半消息+二次确认+消息回查”实现分布式事务一致性:
开源版支持18级固定延迟(如level3对应10s,最高2h),商业版可自定义时间:
// 设置延迟等级
msg.setDelayTimeLevel(3);原理是Broker将延迟消息暂存到系统Topic,到期后投递到目标Topic。
RocketMQ采用“CommitLog集中存储+ConsumeQueue索引”的设计:
消费者启动/增减时,默认20s执行一次Rebalance,支持立即触发,提供6种分配策略(默认连续分配),队列数建议多于消费者数以实现均匀负载。
RECONSUME_LATER,消息进入%RETRY%重试队列,按延迟等级重试16次;%DLQ%死信队列,需人工介入处理。RocketMQ凭借轻量架构、高性能存储和丰富的消息机制,成为分布式系统的核心消息中间件。无论是基础的消息收发,还是复杂的分布式事务、顺序消息场景,都能提供稳定可靠的支撑。掌握其核心原理与实战技巧,可大幅提升分布式系统的异步通信能力。
