前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >面试官: 如何保证MQ消息不丢失?

面试官: 如何保证MQ消息不丢失?

作者头像
可为编程
发布2024-05-30 13:10:43
1200
发布2024-05-30 13:10:43
举报

概述

MQ消息丢失的可能存在于方方面面,比如网络问题、MQ挂掉、服务器断电,都会导致消息丢失,那我们如何保障消息的可靠传输就成了很重要的问题。如果是你的话你会怎么回答这个问题呢?

在之前的工作模式中,我们会发现,所有的生产者在推送完消息后就结束或者执行其他任务,并不知晓消息是否发送成功。如果要保证消息的可靠性,需要对消息进行持久化处理。除了设置持久化相关代码外,我们还要保证消息是被推送到MQ中。正常情况下,如果消息经过交换机进入队列就可以完成消息的持久化,但如果消息在没有到达broker之前出现意外,那就造成消息丢失。

因此接下来会通过生产者丢失消息、如何解决生产者丢失消息、MQ丢失消息、如何解决MQ丢失消息、消费者丢失消息、如何解决消费者丢失消息等方面来具体阐述如何保障消息的可靠性。

关注公众号【可为编程】回复【面试】领取年度最新面试题大全!!!

生产者丢失消息

生产者Producer在发布消息给交换器Exchange的时候,突然发生网络故障,导致消息没有到交换器链接就中断,在传输的半路就搞丢了。

解决办法

因此我们有两种方式可以防止这种情况: 第一种:事务模式

代码语言:javascript
复制
channel.txSelect();//声明事务模式
channel.txCommit();//提交事务
chnnel.txRollback();//回滚事务

实现原理:

生产者发送消息时,会检查MQ是否接收到消息,(这里MQ是指交换机)收到消息就执行channel.txCommit() 提交事务,没有被MQ接收到会报错,这时候就可以通过回滚事务channel.txRollback()。然后重试发送消息,如果收到了消息,那么可以提交事务channel.txCommit()。但是这样吞吐量会降下来,因为太耗性能。

代码举例:

生产者Producer

代码语言:javascript
复制
//生产者Producer
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import cn.linkpower.util.MqConnectUtil;

public class Send {
  private static final String queue_name = "test_work_queue_tx";
  
  public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    //1、建立连接
    Connection mqConnection = RabbitMQUtil.getMqConnection();
    //2、建立信道(通道)
    Channel channel = mqConnection.createChannel();
    //3、声明队列 非持久化
    channel.queueDeclare(queue_name, false, false, false, null);
    
    //公平分发---
    //为了开启公平分发操作,在消息消费者发送确认收到的指示后,消息队列才会给这个消费者继续发送下一条消息。
    //此处的 1 表示 限制发送给每个消费者每次最大的消息数。
    channel.basicQos(1);
    
    try {
      //开启事务
      channel.txSelect();
      String string = "hello 可为 ";
      System.out.println("send msg = "+string);
      //发送消息
      channel.basicPublish("", queue_name, null, string.getBytes());
      关注公众号【可为编程】回复【面试】领取年度最新面试题大全!!!
      //模拟异常操作
      //int a = 10/0;
      //无异常  正常执行,则提交事务操作
      channel.txCommit();
    } catch (Exception e) {
      System.out.println("出现异常进行回滚操作");
      channel.txRollback();
    }
    
    //5、使用完毕后,需要及时的关闭流应用
    channel.close();
    mqConnection.close();
  }
}

消费者Consumer

代码语言:javascript
复制
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import cn.linkpower.util.MqConnectUtil;

/**
 * 消息消费者要实现 “公平分发” 的操作,需要关闭自动应答操作;<br>
 * 同时,在处理完消息后,需要向消息队列做“消费完成”的应答!<br>
 * 其实这里都可以自动实现,只要有手动确认模式就走手动,后面会讲。
 */
public class GetMsg1 {
  private final static String queue_name="test_work_queue_tx";
  public static void main(String[] args) throws IOException, TimeoutException {
    //1、建立连接
    Connection mqConnection = MqConnectUtil.getMqConnection();
    //2、获取信道
    final Channel channel = mqConnection.createChannel();
    //3、声明队列
    channel.queueDeclare(queue_name, false, false, false, null);
    关注公众号【可为编程】回复【面试】领取年度最新面试题大全!!!
    //公平分发---每次只分发一个消息
    channel.basicQos(1);
    
    //4、信访室接受消息
    DefaultConsumer consumer = new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
          throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" get msg new1 = " + message );
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }finally {
          System.out.println("get msg new1 done");
          //公平分发--- 消费完成后,需要做相关的回执信息
          channel.basicAck(envelope.getDeliveryTag(), false);
        }
      }
    };关注公众号【可为编程】回复【面试】领取年度最新面试题大全!!!
    //5、开启自动应答,默认开启
    //channel.basicConsume(queue_name,true, consumer);
    //公平分发--- 同时需要关闭自动“应答”,
    //RabbitMQ消费者手动应答后不需要再关闭自动应答,因为它们已经是两种不同的模式。
    channel.basicConsume(queue_name, false,consumer);
  }
}

分别进行正常测试和异常测试,发现当出现异常时,会进行回滚操作,此时的消息队列并不能接收到任何的消息。

缺点弊端

如果消息在接收到之前,消费者那边出现连接或者信道关闭,那么消息就丢失了;另一方面,这种模式消费者那边可以传递过载的消息,这里我们对消息进行了每次限制一条,如果没有对传递的消息数量进行限制,有可能使得消费者由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,消费者线程被操作系统杀死。

第二种 Confirm模式

通过设置生产者Channel为comfirm模式,该Channel上发布的所有消息都会被指派一个唯一ID(每次从1开始累加),当消息到达生产者指定的消息队列后,broker会返回一个确认给生产者(包含之前的ID),这样生产者就能知道哪条消息成功发送了。

实现原理:

所以一般来说,如果你要确保说写 RabbitMQ 的消息别丢,可以开启confirm模式,在生产者那里设置开启confirm模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个ack消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你一个nack接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。

代码实战:

代码语言:javascript
复制
第一步、配置文件中配置消息发送确认
spring.rabbitmq.publisher-confirms = true
#  这里注意
#  publisher-confirm-type: correlated
#  2.1之前采用publisher-confirms: true 之后采用publisher-confirm-type: correlated
#  none表示不启用消息确认机制;
#  correlated表示使用相关模式进行消息确认,即每个消息发送后都会收到一个确认消息;
#  simple表示使用简单模式进行消息确认,即批量发送消息后收到一个确认消息。

关注公众号【可为编程】回复【加群】进入面试技术交流群!!!

代码语言:javascript
复制
第二步、生产者实现RabbitTemplate.ConfirmCallback接口,重写方法

confirm(CorrelationData correlationData, boolean isSendSuccess, String error)
// 当然也可以通过注入的方式自定义confirm listener 看后面手动创建
// 这里是SpringBoot采用自动注入RabbitMQ的形式配置
@Component
public class CustomConfirmAndReturnCallback implements RabbitTemplate.ConfirmCallback{ 
    @Override
    public void confirm(CorrelationData correlationData, boolean isSendSuccess, String error) {
        .........
    }
}

关注公众号【可为编程】回复【面试】领取年度最新面试题大全!!!

代码语言:javascript
复制
//手动创建RabbitMQ对象并设置自定义confirm listener
Connection connection = new RabbitMQUtils(rabbitConnectionUrlDTO.getIp(), rabbitConnectionUrlDTO.getPort(), rabbitConnectionUrlDTO.getVirtualhost(), rabbitConnectionUrlDTO.getUsername(), rabbitConnectionUrlDTO.getPassword()).getConnection();
Channel channel = connection.createChannel();
channel.confirmSelect();//开启confirm模式通道
createProductConfirm(channel);

//MQ生产者Confirm模式
public void createProductConfirm(Channel channel){
    // 创建一个有序的集合 保存每一次的tag
    final SortedSet<Long> confirmSortedSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
    // 设置监听事件 异步监听每条消息的成功与失败
    channel.addConfirmListener(new ConfirmListener() {
        //成功时回调(异步的  此时表示没有问题的回调)
        //每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。
        public void handleAck(long deliveryTag, boolean multiple) throws IOException {
            if(multiple){
                //当发送多条消息时,他返回可能多条消息的接受情况,也可能返回单条消息的情况
                System.out.println("handleAck --- multiple == true");
                confirmSortedSet.headSet(deliveryTag+1).clear();
            }else{
                //单条消息
                System.out.println("handleAck --- multiple == false");
                confirmSortedSet.remove(deliveryTag);
            }
        }
        //失败时回调
        public void handleNack(long deliveryTag, boolean multiple) throws IOException {
            if(multiple){
                System.out.println("handleNack --- multiple == true");
                confirmSortedSet.headSet(deliveryTag+1).clear();
            }else{
                //单条消息
                System.out.println("handleNack --- multiple == false");
                confirmSortedSet.remove(deliveryTag);
            }
        }
    });
}

1、生产者将信道(channel)设置为confirm模式。

2、所有在该信道上发布的消息,都会指派一个起始为1且唯一的id,一旦消息被推送至匹配的队列之后,broker就会发送一个(携带id的)确认给生产者,使得生产者知道消息成功到达了目标队列中。

注意∶消息(message)被发布者(publisher)发送给交换机(exchange),然后交换机将收到的消息根据路由规则routingkey分发给绑定的队列(queue)。最后AMQP代理会将消息投递给订阅了此队列的消费者consumer,或者消费者按照需求自行获取。

3、如果消息和队列是可持久化(durable = true)的,那么确认消息会将消息写至磁盘后发出。

4、broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示应答这个序列号之前的所有消息,使其假装都已经得到了处理。批量应答会产生消息丢失的情况,所以要保障消息不丢失,应该用非批量应答multiple=false。后面讲消费者时也会讲到。

关注公众号【可为编程】回复【加群】进入面试技术交流群!!!

事务机制和 confirm机制最大的不同:

事务机制是同步的,你提交一个事务之后会阻塞在那儿,加了事务就不能再加confirm机制。

confirm机制是异步的,你送个消息之后就可以发送下一个消息,然后那个消息RabbitMQ 接收了之后会异步回调你一个接口通知你这个消息接收到了。所以一般在生产者这块避免数据丢失,都是用confirm机制的。

MQ中丢失消息

RabbitMQ 自己弄丢了数据,这个必须要开启 RabbitMQ消息的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢失。

解决办法:

设置持久化有两个关注点:

第一个是创建 queue 的时候将其设置为持久化

第二个是发送消息的时候将消息设置为持久化

第一种:元配置持久化

在创建交换机和队列的时候,将其设置为持久化,这样就算重启RabbitMQ或者突然断电,元数据信息也会从磁盘中进行读取,这样就可以保证RabbitMQ 持久化 queue 的元数据,但是不会持久化 queue 里的数据。

代码实战:

代码语言:javascript
复制
 // 1. 创建持久化交换器 如果不存在自动创建
channel.exchangeDeclare(rabbitConfigDTO.getExchange(), BuiltinExchangeType.TOPIC, true);
// 2. 创建持久化队列 如果不存在自动创建
channel.queueDeclare(rabbitConfigDTO.getQueue(), true, false, false, null);
channel.queueBind(rabbitConfigDTO.getQueue(), rabbitConfigDTO.getExchange(), rabbitConfigDTO.getRoutingkey());

第二种:消息持久化

发送消息的时候将消息的deliveryMode设置为2,或者是

MessageProperties.PERSISTENT_TEXT_PLAIN 就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。

关注公众号【可为编程】回复【加群】进入面试技术交流群!!!

代码实战

下面是采用springboot整合的rabbitTemplate来实现消息发送

代码语言:javascript
复制
 MessageProperties props = new MessageProperties();
   props.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 设置为持久化模式

   Message message = new Message(payload, props);
   rabbitTemplate.send(exchange, routingKey, message);

   // 或者直接传递Message对象,其中包含已设置持久化的MessageProperties
   rabbitTemplate.convertAndSend(exchange, routingKey, new Message(messageBody, props));
   // 或者使用MessagePostProcessor接口动态设置消息属性
   rabbitTemplate.convertAndSend(
       exchange,
       routingKey,
       messageBody,
       new MessagePostProcessor() {
           @Override
           public Message postProcessMessage(Message message) throws AmqpException {
               message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
               return message;
           }
       }
   );

下面是采用SpringBoot中通过new来创建连接来实现消息发送

代码语言:javascript
复制
 Connection connection = new RabbitMQUtils(rabbitConnectionUrlDTO.getIp(), rabbitConnectionUrlDTO.getPort(), rabbitConnectionUrlDTO.getVirtualhost(), rabbitConnectionUrlDTO.getUsername(), rabbitConnectionUrlDTO.getPassword()).getConnection();
 if (Objects.nonNull(connection)) {
      log.warn("共" + RABBIT_CONNECTION_URL_LIST.size() + "个连接!");
      Channel channel = connection.createChannel();
      //消息持久化到磁盘,避免因为突然断电或重启导致消息丢失
      channel.basicPublish(rabbitConnectionUrlDTO.getExchange(), rabbitConnectionUrlDTO.getRoutingkey(), MessageProperties.PERSISTENT_TEXT_PLAIN, JSONObject.toJSONString(map).getBytes(StandardCharsets.UTF_8));
      log.warn("连接RabbitMQ成功!" + "IP地址为" + rabbitConnectionUrlDTO.getIp() + "端口号为" + rabbitConnectionUrlDTO.getPort() + "交换器为" + rabbitConnectionUrlDTO.getExchange() + "队列名为" + rabbitConnectionUrlDTO.getQueue() + "路由键为" + rabbitConnectionUrlDTO.getRoutingkey());
}

关注公众号【可为编程】回复【加群】进入面试技术交流群!!!

消息持久化可以跟生产者那边的confirm机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者ack了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到ack,你也是可以自己重发的。注意,哪怕是你给RabbitMQ 开启了持久化机制,也有一种可能,就是这个消息写到了 RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据丢失。

消费端丢失消息

消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了, RabbitMQ 认为你都消费了,这数据就丢了。这个时候得用 RabbitMQ 提供的ack机制,简单来说,就是你关闭 RabbitMQ 的自动ack,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里ack一把。这样的话,如果你还没处理完,不就没有ack?那RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。如果消费者消费完没有设置basicAck,没有确认消息就会一直存在RabbitMQ当中。所以久而久之就会造成消息堆积,造成消息重复消费和阻塞RabbitMQ。

解决办法:

为了保证消息从队列可靠地到达消费者,RabbitMQ提供消息确认机制(message acknowledgment)。确认模式主要分为下面三种:

代码语言:javascript
复制
AcknowledgeMode.NONE:不确认
AcknowledgeMode.AUTO:自动确认
AcknowledgeMode.MANUAL:手动确认
代码语言:javascript
复制
注意:在springboot项目中通过在配置文件中指定消息确认的模式,如下指定手动确认模式:
代码语言:javascript
复制
spring.rabbitmq.listener.simple.acknowledge-mode = manual

如果成功消费消息,一般调用下面的代码用于确认消息成功处理完成

代码语言:javascript
复制
@Component
@RabbitListener(queues = {RabbitConfig.QUEUE_A, RabbitConfig.QUEUE_B})
public class MsgReceiver {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @RabbitHandler
    public void process(String msg, Channel channel, Message message) throws IOException {
        try {
            logger.info("【Consumer02成功接收到消息】>>> {}", msg);
            // 确认收到消息,只确认当前消费者的一个消息收到
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                logger.info("【Consumer02】消息已经回滚过,拒绝接收消息 :{}", msg);
                // 拒绝消息,并且不再重新进入队列
                //public void basicReject(long deliveryTag, boolean requeue)
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            } else {
                logger.info("【Consumer02】消息即将返回队列重新处理 :{}", msg);
                //设置消息重新回到队列处理
                // requeue表示是否重新回到队列,true重新入队
                //public void basicNack(long deliveryTag, boolean multiple, boolean requeue)
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
            e.printStackTrace();
        }
    }
}

关注公众号【可为编程】回复【面试】领取年度最新面试题大全!!

总结

结合代码实战和原理解读了RabbitMQ消息丢失以及对应的解决方案,正如上面图示所言,每一个角色都要兼顾才能够保障消息彻底的不丢失,这些只是MQ为我们提供的一些机制,在日常开发和维护中,网络问题,服务器问题,消息并发量问题处理不好都会导致消息丢失,因此我们要结合实际情况,尽量在编码的时候做万无一失。

JVM“裁员”也讲究算法

JVM垃圾回收也是在“裁员”

消息队列的使用场景综述

一篇文章搞懂Redis缓存雪崩与缓存击穿

还不懂缓存穿透?Redis缓存穿透深度剖析

Redis概述和安装

IOC容器创建bean实例的4种方式

由表及里分析Spring-IOC容器始末

Spring中的核心概念

关于高并发你必须知道的几个概念

线程的创建方式对比与线程池相关原理剖析

END

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-05-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 可为编程 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 第二种 Confirm模式
相关产品与服务
消息队列 CMQ
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档