消息中间件一般主要用来做 异步处理、应用解耦、流量削峰、日志处理 等方面。
一个用户登陆网址注册,然后系统发短信跟邮件告知注册成功,一般有三种解决方法。
异步处理
比如有一个订单系统,还要一个库存系统,用户下订单后要调用库存系统来处理,直接调用话,库存系统出现问题咋办呢?
应用解耦
举办一个 秒杀活动,如何较好到设计?服务层直接接受瞬间高密度访问绝对不可以,起码要加入一个MQ来实现削峰。
流量削峰
用户通过 WebUI 访问发送请求到时候后端如何接受跟处理呢一般?
日志处理
消息中间件具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ、炙手可热的Kafka、阿里巴巴自主开发RocketMQ等。
对比 | ActiveMQ | RabbitMQ | RocketMQ | Kafka | ZeroMQ |
---|---|---|---|---|---|
单机吞吐量 | 比RabbitMQ低 | 2.6w/s(消息做持久化) | 11.6w/s | 17.3w/s | 29w/s |
开发语言 | Java | Erlang,基于AMQP协议 | Java | Scala/Java | C |
主要维护者 | Apache | Mozilla/Spring | Alibaba | Apache | iMatix |
成熟度 | 成熟 | 成熟 | 开源版本不够成熟 | 比较成熟 | 只有C/PHP等版本成熟 |
订阅形式 | 点对点(p2p)、广播(发布-订阅) | 提供了4种:direct, topic ,Headers和fanout。fanout就是广播模式 | 基于topic/messageTag以及按照消息类型、属性进行正则匹配的发布订阅模式 | 基于topic以及按照topic进行正则匹配的发布订阅模式 | 点对点(p2p) |
持久化 | 支持少量堆积 | 支持少量堆积 | 支持大量堆积 | 支持大量堆积 | 不支持 |
顺序消息 | 不支持 | 不支持 | 支持 | 支持 | 不支持 |
性能稳定性 | 好 | 好 | 一般 | 较差 | 很好 |
集群方式 | 支持简单集群模式,比如’主-备’,对高级集群模式支持不好 | 支持简单集群,'复制’模式,对高级集群模式支持不好 | 常用 多对’Master-Slave’ 模式,开源版本需手动切换Slave变成Master | 天然的‘Leader-Slave’无状态集群,每台服务器既是Master也是Slave | 不支持 |
管理界面 | 一般 | 较好 | 一般 | 无 | 无 |
RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种客户端,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
AMQP:
AdvancedMessageQueuingProtocol:高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
RabbitMQ模型
由 Exchange、Queue、RoutingKey 三个才能决定一个从 Exchange 到 Queue 的唯一的线路。
simple模式 生产者产生消息,将消息放入队列,消费者监听消息队列,如果队列中有消息就消费掉,消息被拿走后会自动从队列中删除,存在隐患需要消费者设置ACK确认,消费者处理完后要及时发送ack给队列,否则会造成内存溢出。
简单队列的不足:耦合性过高,生产者一一对应消费者,如果有多个消费者想消费队列中信息就无法实现了。
work工作模式 生产者将消息放入队列,消费者可以有多个。一般有两种模式:
fanout模式 类似公众号的订阅跟发布,属于 fanout 模式,不处理路由键。不需要指定routingKey,我们只需要把队列绑定到交换机, 消息就会被发送到所有到队列中:
direct:处理路由键,需要指定routingKey,此时生产者发送数据到MQ的时候会指定key,任务队列也会指定key,只有key一样消息才会被传送到队列中。如下图
direct模式 缺点:路由key必须要明确,无法实现规则性模糊匹配。
将路由键跟某个模式匹配,生产者会带 routingKey,但是消费者的MQ会带模糊routingKey:
topic模式
#
表示匹配 >=1个字符。*
表示匹配一个。如果需要指定模式一般是在消费者端设置,灵活性调节。
模式 | 生产者Queue | 生产者exchange | 生产者routingKey | 消费者exchange | 消费者queue | routingKey |
---|---|---|---|---|---|---|
Simple(简单模式少用) | 指定 | 不指定 | 不指定 | 不指定 | 指定 | 不指定 |
WorkQueue(多个消费者少用) | 指定 | 不指定 | 不指定 | 不指定 | 指定 | 不指定 |
fanout(publish/subscribe模式) | 不指定 | 指定 | 不指定 | 指定 | 指定 | 不指定 |
direct(路由模式) | 不指定 | 指定 | 指定 | 指定 | 指定 | 消费者 routingKey 精确指定多个 |
topic(主题模糊匹配) | 不指定 | 指定 | 指定 | 指定 | 指定 | 消费者 routingKey 进行模糊匹配 |
生成者生产消息后消息带有 routing Key,通过routing Key 消费者队列被绑定到交换器上,消息到达交换器根据交换器规则匹配,常见交换器如下:
信道是生产消费者与rabbit通信的渠道,生产者 publish 或是消费者 subscribe 一个队列都是通过信道来通信的。信道是建立在TCP连接上的虚拟连接,就是说 RabbitMQ 在一条TCP上建立成百上千个信道来达到多个线程处理,这个TCP被多个线程共享,每个线程对应一个信道,信道在RabbitMQ 都有唯一的ID来保证信道私有性,对应唯一的线程使用。用信道而不用 TCP 的原因是由于 TCP 连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。
消息丢失主要分为 生产者丢失消息、消息列表丢失消息、消费者丢失消息。
RabbitMQ 提供 transaction 和 confirm 模式来确保生产者不丢消息。
transaction 机制就是说:发送消息前,开启事务(channel.txSelect),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务channel.txCommit()。事务卡顿会导致后面无法发送,官方说加入事务机制MQ会降速250倍。
confirm(发送方确认模式)模式用的居多:一旦 channel 进入 confirm 模式,所有在该信道上发布的消息都将会被指派一个从1开始的唯一的ID,一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个包含消息的唯一ID 的 ACK给生产者,这就使得生产者知道消息已经正确到达目的队列了,如果 RabbitMQ 没能处理该消息,则会发送一个 Nack (not acknowledged) 消息给你,你可以进行重试操作。
发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。
处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和 confirm 机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个 Ack 信号。这样,如果消息持久化磁盘之前,RabbitMQ 挂了后生产者收不到Ack信号,生产者会自动重发。
通过如下持久化设置,即使 RabbitMQ 挂了重启后也能恢复数据。
关于持久化其实是个权衡问题,持久化可能会导致系统QPS下降,所以一般仅对关键消息作持久化处理(根据业务重要程度),且应该保证关键消息的持久化不会导致系统性能瓶颈。
消费者丢失消息:消费者丢数据一般是因为采用了自动确认消息模式,改为手动确认消息即可!
消费者在收到消息之后,处理消息之前,会自动回复RabbitMQ已收到消息;如果这时处理消息失败,就会丢失该消息。
解决方案:处理消息成功后,手动回复确认消息。消费者跟消息队列的连接不中断,RabbitMQ 给了 Consumer 足够长的时间来处理消息,保证数据的最终一致性。
注意点:
消息重复消费是各个MQ都会发生的常见问题之一,在一些比较敏感的场景下,重复消费会造成比较严重的后果,比如重复扣款等。
消息重复消费的场景大概可以分为 生产者端重复消费 和 消费者端重复消费,解决办法是是通过幂等性来保证重复消费的消息不对结果产生影响即可。
顺序性 必要性:生产者的信息是[插入、更新、删除],消费者执行顺序是[删除、插入、更新],这是跟预期不一致的。
出现消费乱序一般是如下两种情况:
多个消费者乱序
多线程乱序
一个队列保证前后
内存队列实现顺序
RabbitMQ 是基于主从(非分布式)做高可用性的。RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。
单机版的 就是 Demo 级别,生产系统一般没人用单机模式。
在 N 台机器上启动 N 个 RabbitMQ 实例。创建的 queue 只会放在一个 RabbitMQ 实例上,但每个MQ实例都 同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。消费时如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。让集群中多个节点来服务某个 queue 的读写操作来提高吞吐量。
RabbitMQ 的高可用模式,在镜像集群模式下,你创建的 queue无论元数据还是 queue 里的消息都会存在于多个实例上,每个 RabbitMQ 节点都有这个 queue 的全部数据的。写消息到 queue 的时候都会自动把消息同步到多个实例的 queue 上。RabbitMQ 有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。
死信 Dead Letter 是 RabbitMQ 中的一种消息机制,当消费消息时队列里的消息出现以下情况那么该消息将成为死信。死信消息会被RabbitMQ进行特殊处理,如果配置了 死信队列 信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃:
死信队列 并不是什么特殊的队列,只不过是绑定在死信交换机上的队列。死信交换机只不过是用来接受死信的普通交换机,所以可以为任何类型,比如Direct、Fanout、Topic。
适用场景:
在较为重要的业务队列中,确保未被正确消费的消息不被丢弃,在系统因为参数解析、数据校验、网咯拨打等导致异常后通过配置死信队列,可以让未正确处理的消息暂存到另一个队列中,待后续排查清楚问题后,编写相应的处理代码来处理死信消息。
死信消息的生命周期:
死信消息是 RabbitMQ 为我们做的一层保证,其实我们也可以不使用死信队列,而是在消息消费异常时,将消息主动投递到另一个交换机中,明白死信队列运行机制后就知道这些 Exchange 和 Queue 想怎样配合就能怎么配合。比如从死信队列拉取消息,然后发送邮件、短信、钉钉通知来通知开发人员关注。或者将消息重新投递到一个队列然后设置过期时间,来进行延时消费。
TTL(Time To Live) 是 RabbitMQ 中一个 消息 或 队列 的属性,如果一条消息设置了 TTL属性或者进入了有 TTL属性的队列,那么这条消息如果在TTL设置的时间内没有被消费,则会成为死信。如果同时配置了队列的TTL和消息的TTL,那么较小的那个值将会被使用。
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000); // ms
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());
区别:
延时队列中的元素则是希望被在指定时间得到取出和处理,所以延时队列中的元素是都是带时间属性的,通常来说是需要被处理的消息或者任务。一般用在如下场景:
保证顺序性
这种时候只能操作临时扩容,以更快的速度去消费数据了。具体操作步骤和思路如下:
消息挤压处理
在RabbitMQ 中有推模式跟拉模式,平时开发多为推模式。
一般是个开放题,考察有没有从架构角度整体构思和设计的思维以及能力。不求看过源码起码但的知道基本原理、核心组成部分、基本架构构成,然后参照一些开源的技术把一个系统设计出来的思路说一下就好(强行为下篇Kafka做铺垫)。