最近刚学完RabbitMQ,顺便整理了一下常用的面试题,用于总结跟回顾,也供各位大佬参考,如有不对的地方,欢迎指出哈!
举个例子:如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限制订单超过一万后不允许用户下单。使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好。
以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性。
有些服务间调用是异步的,例如 A 调用 B,B 需要花费很长时间执行,但是 A 需要知道 B 什么时候可以执行完,以前一般有两种方式:
A 过一段时间去调用 B 的查询 api 查询
A 提供一个 callback api, B 执行完之后调用 api 通知 A 服务。
这两种方式都不是很优雅,使用消息总线,可以很方便解决这个问题,A 调用 B 服务后,只需要监听 B 处理完成的消息,当 B 处理完成后,会发送一条消息给 MQ,MQ 会将此消息转发给 A 服务。这样 A 服务既不用循环调用 B 的查询 api,也不用提供 callback api。同样 B 服务也不用做这些操作。A 服务还能及时的得到异步处理成功的消息。
RabbitMQ
RabbitMQ
是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑RabbitMQ
是一个快递站,一个快递员帮你传递快件。RabbitMQ
与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据。
RabbitMQ
各组件的功能AMQP
实体服务。**Connection** **极大减少了操作系统建立** **TCP connection** **的开销**
RabbitMQ
常用的交换器常用类型有direct、topic、fanout、headers四种,后面详细介绍。RoutingKey
,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据RoutingKey
:路由键,生产者将消息发送给交换器的时候,会发送一个RoutingKey
,用来指定路由规则,这样交换器就知道把消息发送到哪个队列。路由键通常为一个“.”分割的字符串,例如“com.rabbitmq”
。不得不看一下经典的图了,如下:point_down:
AMQP 协议模型由三部分组成:生产者、消费者和服务端,执行流程如下:
可以认为是无限制,因为限制取决于机器的内存,但是消息过多会导致处理效率的下降。
根据 AMQP 协议规定,消息体的大小由 64-bit 的值来指定,所以你就可以知道到底能发多大的数据了
消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2同时监听同一个队列,消息被消费。C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize) 保证一条消息只能被一个消费者使用)。
PS:(在我的理解看来就是routing查询的一种模糊匹配,就类似sql的模糊查询方式)
拆分多个 queue(消息队列),每个 queue(消息队列) 一个 consumer(消费者),就是多一些 queue(消息队列)而已,确实是麻烦点;
或者就一个 queue (消息队列)但是对应一个 consumer(消费者),然后这个 consumer(消费者)内部用内存队列做排队,然后分发给底层不同的 worker 来处理。
生产者发送消息RabbitMQ Server 消息丢失
RabbitMQ Server中存储的消息丢失
RabbitMQ Server中存储的消息分发给消费者者丢失
发送过程中存在网络问题,导致消息没有发送成功
代码问题,导致消息没发送
消息没有持久化,服务器重启导致存储的消息丢失
消费端接收到相关消息之后,消费端还没来得及处理消息,消费端机器就宕机了
处理消息存在异常
针对上面的情况,确保消息不丢失
生产者发送消息RabbitMQ Server 消息丢失解决方案:
常用解决方案:发送方确认机制(publisher confirm)
开启AMQP的事务处理(不推荐)
RabbitMQ Server中存储的消息丢失解决方案:
消息回退:通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者
设置持久化:保证重启过程中,交换机和队列也是持久化的
RabbitMQ Server到消费者消息丢失解决方案:
手动ack确认机制
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置
basic.ack
的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。confirm 模式最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。
发布确认分为三种:
**单独发布确认**:这是一种简单的确认方式,它是一种**同步确认发布**的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,
waitForConfirmsOrDie(long)
这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。这种确认方式有一个最大的缺点就是:**发布速度特别的慢,**因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了。
**批量发布确认**:上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:**当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息**。当然这种方案仍然是同步的,也一样阻塞消息的发布。
**异步发布确认**:异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功
为什么不推荐呢,因为它是同步的,一条消息发送之后会使发送端阻塞,以等待RabbitMQ Server的回应,之后才能继续发送下一条消息,生产者生产消息的吞吐量和性能都会大大降低,这就跟单独发布确认一样。
如何使用:在生产者发送消息之前,通过channel.txSelect开启一个事务,接着发送消息, 如果消息投递server失败,进行事务回滚channel.txRollback,然后重新发送, 如果server收到消息,就提交事务channel.txCommit
第一种保证消息丢失,只能够保证发送方发送消息成功到达交换机,若此时服务器存在问题或者绑定的routingKey不正确,导致消息发送失败,那么消息最终也会丢失。
**采用消息回退:通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者**
设置持久化
源码:
mandatory参数
true:交换机无法将消息进行路由时,会将该消息返回给生产者
false:如果发现消息无法进行路由,则直接丢弃
public void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException {
this.delegate.basicPublish(exchange, routingKey, mandatory, props, body);
}
有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。
这时需要采用**备份交换机**了
备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,
就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。
具体代码请参考这篇:
上面我们的角度是站在生产者的方向,但是如果服务器重启了,此时交换机和队列都不存在了,消息存在也发送不了,这时需要把交换机和队列都持久化。
/\*\*
\* 生成一个队列
\* 1.队列名称
\* 2.队列里面的消息是否持久化 默认消息存储在内存中
\* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
\* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
\* 5.其他参数
\*/
channel.queueDeclare(QUEUE\_NAME, false, false, false, null);
默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答
//将自动应答关闭
boolean autoAck = false;
channel.basicConsume(TASK\_QUEUE\_NAME, autoAck, deliverCallback, consumerTag -> { });
由于 TCP 连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。RabbitMQ 使用**信道**的方式来传输数据。信道是建立在真实的 TCP 连接内的虚拟连接,且每条 TCP 连接上的信道数量没有限制。
支持。在消息生产时,MQ 内部针对每条生产者发送的消息生成一个 inner-msg-id,作为去重的依据(消息投递失败并重传),避免重复的消息进入队列。
在消息消费时,要求消息体中必须要有一个 bizId(对于同一业务全局唯一,如支付 ID、订单 ID、帖子 ID 等)作为去重的依据,避免同一条消息被重复消费。
消费端配置手动ACK确认机制
结合数据库进行状态标记处理
支持事务消息。前面在第9题中保证生产者不丢失消息,提到可以使用AMQP的事务,但是它是同步的,所以不怎么推荐使用
事务的实现主要是对信道(Channel)的设置,主要方法如下:
channel.txSelect() 声明启动事务模式
2.channel.txCommit() 提交事务
3.channel.txRollback()回滚事务
消息持久化,当然前提是队列必须持久化
声明队列必须设置持久化 durable 设置为 true.
消息推送投递模式必须设置持久化,deliveryMode 设置为 2(持久)。
消息已经到达持久化交换器。
消息已经到达持久化队列。
由于特定的**原因导致** **queue** **中的某些消息无法被消费**,这样的消息如果没有后续的处理,就变成了死信消息
消息 TTL 过期
队列达到最大长度(队列满了,无法再添加数据到 mq 中)
消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.
可以用于实现延迟队列
支持。延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
订单在十分钟之内未支付则自动取消
新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒
用户注册成功后,如果三天内没有登陆则进行短信提醒
用户发起退款,如果三天内没有得到处理则通知相关运营人员
预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
消息设置TTL
配置了死信队列
控制台页面:添加一个
x-max-priority
在这里插入图片描述生产者添加优先级,案例代码
public class Product {
private static final String QUEUE\_NAME = "hello";
public static void main(String[] args) throws Exception {
try(Channel channel = RabbitMQConfig.getChannel()){
//给消息赋予一个 priority 属性
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().priority(5).build();
for (int i = 1; i < 11; i++) {
String msg = "info" + i;
if(i==5){
channel.basicPublish("", QUEUE\_NAME, basicProperties, msg.getBytes());
}else{
channel.basicPublish("", QUEUE\_NAME, null, msg.getBytes());
}
System.out.println("发送消息完成:" + msg);
}
}
}
}
消费者队列中代码添加优先级
public class Consumer {
private static final String QUEUE\_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQConfig.getChannel();
//设置队列的最大优先级 最大可以设置到 255 官网推荐 1-10 如果设置太高比较吃内存和 CPU
Map<String, Object> map = new HashMap<>();
map.put("x-max-priority", 10);
channel.queueDeclare(QUEUE\_NAME, true, false, false, map);
System.out.println("消费者等待启动接收消息......");
DeliverCallback deliverCallback = (consumerTag, delivery) ->{
String receivedMessage = new String(delivery.getBody());
System.out.println("接收到消息:"+receivedMessage);
};
channel.basicConsume(QUEUE\_NAME, true, deliverCallback, (consumerTag) ->{
System.out.println("消费者无法消费消息时调用,如队列被删除");
});
}
}
队列可能会产生消息堆积
队列对性能(吞吐量)的要求不是非常高,例如TPS 1万以下的场景
希望队列有稳定的生产消费性能,不受内存影响而波动
方法:**临时扩容,快速处理积压的消息**
先修复 consumer 的问题,确保其恢复消费速度,然后将现有的 consumer 都停掉;
临时创建原先 N 倍数量的 queue ,然后写一个临时分发数据的消费者程序,将该程序部署上去消费队列中积压的数据,消费之后不做任何耗时处理,直接均匀轮询写入临时建立好的 N 倍数量的 queue 中;
接着,临时征用 N 倍的机器来部署 consumer,每个 consumer 消费一个临时 queue 的数据
等快速消费完积压数据之后,恢复原先部署架构 ,重新用原先的 consumer 机器消费消息。
这种做法相当于临时将 queue 资源和 consumer 资源扩大 N 倍,以正常 N 倍速度消费。
采用**“批量重导”**的方式,在流量低峰期,写一个程序,手动去查询丢失的那部分数据,然后将消息重新发送到mq里面,把丢失的数据重新补回来。
如果消息积压在RabbitMQ里,并且长时间都没处理掉,导致RabbitMQ都快写满了,这种情况肯定是临时扩容方案执行太慢;这种时候只好采用 **“丢弃+批量重导”** 的方式来解决了。首先,临时写个程序,连接到RabbitMQ里面消费数据,消费一个丢弃一个,快速消费掉积压的消息,降低RabbitMQ的压力,然后在流量低峰期时去手动查询重导丢失的这部分数据。
要考虑三点:伸缩性、持久化、可用性
伸缩性:需要扩容的时候可以快速扩容,增加吞吐量和容量;可以参考
kafaka
的设计理念,broker -> topic -> partition,每个partition放一个机器,就存一部分数据;资源不够了,给topic增加partition,然后做数据迁移,增加机器;持久化:也就是数据要不要写入磁盘,不写入吧,进程挂了,数据就丢失了,写入磁盘该如何高效写入呢?
kafaka
的思路:顺序读写,采用磁盘缓存(Page Cache)的策略,操作系统采用预读和后写的方式,对磁盘进行优化。预读:磁盘顺序读取的效率是很高的(不需要寻道时间,只需要很少的旋转时间)。而在读取磁盘某块数据时,同时会顺序读取相邻地址的数据加载到PageCache,这样在读取后续连续数据时,只需要从PageCache中读取数据,相当于内存读写,速度会特别快
后写:数据并不是直接写入到磁盘,而是默认先写入到Page Cache,再由Page Cache刷新到磁盘,刷新频率是由操作系统周期性的sync触发的(用户也可以手动调用sync触发刷新操作)。后写的方式大大减少对磁盘的总写入次数,提高写入效率
可用性:分布式系统的高可用几乎都是通过冗余实现的,Kafka同样如此。Kafka的消息存储到partition中,每个partition在其他的broker中都存在多个副本。对外通过主partition提供读写服务,当主partition所在的broker故障时,通过HA机制,将其他Broker上的某个副本partition会重新选举成主partition,继续对外提供服务。
面试专栏合集:
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。