前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【消息队列之rabbitmq】Rabbitmq之消息可靠性投递和ACK机制实战

【消息队列之rabbitmq】Rabbitmq之消息可靠性投递和ACK机制实战

作者头像
沁溪源
发布2022-01-13 12:33:31
1.2K0
发布2022-01-13 12:33:31
举报
文章被收录于专栏:沁溪源

目录

一、绪论

上篇文章介绍了rabbitmq的基本知识、交换机类型实战《【消息队列之rabbitmq】学习RabbitMQ必备品之一》 这篇文章主要围绕着消息确认机制为中心,展开实战;接触过消息中间件的伙伴都知道,消息会存在以下问题: 1、消息丢失问题和可靠性投递问题; 2、消息如何保证顺序消费; 3、消息如何保证幂等性问题,即重复消费问题等等… 本文主要以Rabbitmq消息中间件解决问题一的实践,其他问题小编会重新写文章总结;

故从业务代码设计层面,我们需要保证生产者发送消息可靠性投递到MQ中间件中,其次保证消费者可以从MQ中获取消息并消费成功;

二、生产者

从生产者角度控制消息的可靠性投递实践;rabbitmq提供了以下方式:事务机制和confirm机制; 其他的工具类等相关代码,请移步到《【消息队列之rabbitmq】学习RabbitMQ必备品之一》

2.1事务机制

  • 基础知识: 事务的实现主要是对信道(Channel)的设置,主要的方法有三个: 声明启动事务模式:channel.txSelect() 提交事务:channel.txComment() 回滚事务:channel.txRollback()
  • 实践代码 生产者端代码如下:
代码语言:javascript
复制
package com.itwx.mq.tx;

import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class ProviderTx {

    private static final String QUEUE_NAME = "test_tx_queue";

    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            // 获取到连接
            connection = ConnectionUtil.getConnection();
            // 获取通道
            channel = connection.createChannel();
            /**
             * 声明一个队列。
             * 参数一:队列名称
             * 参数二:是否持久化
             * 参数三:是否排外  如果排外则这个队列只允许有一个消费者
             * 参数四:是否自动删除队列,如果为true表示没有消息也没有消费者连接自动删除队列
             * 参数五:队列的附加属性
             * 注意:
             * 1.声明队列时,如果已经存在则放弃声明,如果不存在则会声明一个新队列;
             * 2.队列名可以任意取值,但需要与消息接收者一致。
             * 3.下面的代码可有可无,一定在发送消息前确认队列名称已经存在RabbitMQ中,否则消息会发送失败。
             */
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            // 启动事务,必须用txCommit()或者txRollback()回滚
            channel.txSelect();

            // 假设这里处理业务逻辑
            String message = "hello, tx message!";
            /**
             * 发送消息到MQ
             * 参数一:交换机名称,为""表示不用交换机
             * 参数二:为队列名称或者routingKey.当指定了交换机就是routingKey
             * 参数三:消息的属性信息
             * 参数四:消息内容的字节数组
             */
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

            /**
             * 提交事务之前,如果生产者发生异常,则消息会被回滚;
             * 但是事务此种模式,无法解决broker宕机问题,导致生产者误以为消息已经发送成功;
             */
            //todo 测试异常
            int i = 1/ 0;
            // 提交事务
            channel.txCommit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (channel != null) {
                    // 回滚。如果未异常会提交事务,此时回滚无影响
                    channel.txRollback();
                    channel.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (Exception e) {
                 }
        }
    }
}

代码中含有TODO注释,大家可以结合rabbitmq管理界面,自测生产者事务是否生效等等; 1、业务异常产生,消息回滚测试; 2、生产者无异常产生,测试消息是否发送成功; 缺点: 开始事务属于同步操作,消息发送成功后,生产者端处于阻塞状态,需要等待消息中间件接收消息的响应,降低生产者的吞吐量和性能;

2.2confirm模式

confirm主要存在以下三种方式: 方式一:channel.waitForConfirms()普通发送方确认模式(串行模式); 方式二:channel.waitForConfirmsOrDie()批量确认模式; 方式三:channel.addConfirmListener()异步监听发送方确认模式; 使用confirm模式,大家可以考虑一下如果消息发送失败之后,如何处理补偿机制重新发送?redis+定时任务

串行模式

串行模式:producer每发送一条消息后,调用waitForConfirms()方法,等待broker端confirm,如果服务器端返回false或者在超时时间内未返回,客户端进行消息重传;

1、启动生产者确认模式channel.confirmSelect(); 2、等待消息中间件响应结果channel.waitForConfirms(); 3、处理返回结果或者捕获异常,触发补偿任务;

  • 生产者代码
代码语言:javascript
复制
package com.itwx.mq.confirm;

import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;

public class ProviderConfirm {
    private static final String QUEUE_NAME = "test_one_confirm_queue";
    public static void main(String[] args) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取通道
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        // 启动发送者确认模式
        channel.confirmSelect();

        String message = "hello,message! confirmSelect";
        /**
         * 发送消息到MQ
         * 参数一:交换机名称,为""表示不用交换机
         * 参数二:为队列名称或者routingKey.当指定了交换机就是routingKey
         * 参数三:消息的属性信息
         * 参数四:消息内容的字节数组
         */
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

        // 阻塞线程,等待服务器返回响应。该方法可以指定一个等待时间,发送成功返回true,否则返回false
        boolean sendResult = channel.waitForConfirms();
        if (sendResult) {
            System.out.print("发送成功");
        }
        /**
         * 存在异常情况,需要补偿机制:
         * 1、消息发送失败,即返回false;
         * 2、channel.waitForConfirms 可能返回超时异常
         * 解决方案:重试几次发送或者利用redis+定时任务来完成补发
         */
        channel.close();
        connection.close();
    }
}

批量模式

批量模式:producer每发送一批消息后,调用waitForConfirmsOrDie()方法,而此种模式方法无返回值,只能根据异常进行判断。如果确认失败会抛出IOException和InterruptedException。源码如下:

代码语言:javascript
复制
void waitForConfirmsOrDie() throws IOException, InterruptedException;

此外注意,写测试demo时,由于存在消息延迟等现象,故发送消息结束之后,主线程休眠5000s或者更多,之后再关闭信道连接;

  • 生产者代码
代码语言:javascript
复制
package com.itwx.mq.confirm;

import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.util.concurrent.TimeUnit;


public class ProviderBatchConfirm {
    private static final String QUEUE_NAME = "test_batch_confirm_queue";
    public static void main(String[] args) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取通道
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        // 启动发送者确认模式
        channel.confirmSelect();

        String message = "hello,message! confirmSelect";
        /**
         * 发送消息到MQ
         * 参数一:交换机名称,为""表示不用交换机
         * 参数二:为队列名称或者routingKey.当指定了交换机就是routingKey
         * 参数三:消息的属性信息
         * 参数四:消息内容的字节数组
         */
        for (int i = 1; i<=5; i++) {
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("发送第" + i +"条消息成功");
        }
        // 阻塞线程,等待服务器返回响应。该方法可以指定一个等待时间。该方法无返回值,只能根据抛出的异常进行判断。
        try {
            channel.waitForConfirmsOrDie();
        } catch (Exception e) {
            e.printStackTrace();
        }

        TimeUnit.SECONDS.sleep(5000);
        //TODO,补偿机制只能依赖于捕获超时异常进行消息补发;
        channel.close();
        connection.close();
    }
}

异步模式

异步模式,开发者可以定义ConfirmListener实现类处理消息发送成功或者失败情况,重写handleNackhandleAck方法; handleNack():消息接收失败的通知方法,开发者可以在这里重新投递消息; handleAck():消息发送成功之前,需要把消息先存起来,比如用KV存储,接收到ack后删除;

  • 生产者代码
代码语言:javascript
复制
package com.itwx.mq.confirm;

import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class ProviderAsyncConfirm {
    private static final String QUEUE_NAME = "test_async_confirm_queue";
    public static void main(String[] args) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取通道
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        // 启动发送者确认模式
        channel.confirmSelect();

        String message = "hello,message! confirmSelect";
        /**
         * 发送消息到MQ
         * 参数一:交换机名称,为""表示不用交换机
         * 参数二:为队列名称或者routingKey.当指定了交换机就是routingKey
         * 参数三:消息的属性信息
         * 参数四:消息内容的字节数组
         */
        for (int i = 1; i<=5; i++) {
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("发送第" + i +"条消息成功");
        }
        //异步监听确认和未确认的消息
        channel.addConfirmListener(new ConfirmListener() {
            /**
             * 消息没有确认的回调方法
             * 参数一:没有确认的消息的编号
             * 参数二: 是否没有确认多个
             */
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                //消息接收失败的通知方法,用户可以在这里重新投递消息
                System.out.println(String.format("未确认消息,序号:%d,是否多个消息:%b", deliveryTag, multiple));
            }

            /**
             * 消息确认后回调
             * 参数一: 确认的消息的编号,从1开始递增
             * 参数二: 当前消息是否同时确认了多个
             * 消息确认有可能是批量确认的,是否批量确认在于返回的multiple的参数,此参数为bool值,如果true表示批量执行了deliveryTag这个值以前的所有消息,如果为false的话表示单条确认
             */
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                //发送端投递消息前,需要把消息先存起来,比如用KV存储,接收到ack后删除
                System.out.println(String.format("确认消息,序号:%d,是否多个消息:%b", deliveryTag, multiple));
            }
        });
        //主线程休眠,等待异步回调消息
        TimeUnit.SECONDS.sleep(10000);
        channel.close();
        connection.close();
    }

}

三、消费者

3.1手动ACK

如果触发手动ACK机制,需要改动以下东西:

  • 将自动ACK改为false;
代码语言:javascript
复制
/**
         * 1、设置成手动ACK,即使消费者已经获取了消息,但是未及时ACK回复生产者,然后消费者宕机,消息队列会认为该消费未被消息;故此种情况会存在重复消费的情况;
         * 2、设置成手动ACK,即使消费者发生异常或者宕机情况,保证消息不丢失;
         */
          /**
         * 参数明细:
         * 参数:String queue, boolean autoAck, Consumer callback
         * 1、queue 队列名称
         * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复
         * 3、callback,消费方法,当消费者接收到消息要执行的方法
         */
        channel.basicConsume(QUEUE_NAME, false, consumer);
  • 考虑以下情况: 1、若未设置手动ACK,消费者获取消息后,发生异常,会发生什么情况?(消息丢失) 2、若设置手动ACK,消费者发生异常,会发生什么情况?(未消费状态) 3、设置手动ACK,消费者宕机,未即使发送ACK确认回调,会发生什么情况?(已消费,未确认) (消息中间件会将消息标记为待确认状态,不会被重复消息);若再想消费该消息,重启消费者,消息中间件会将该消息标记为待消费状态(从unacked->ready)
  • 小编demo写了TODO测试用例,注意测试
  • 消费者代码
代码语言:javascript
复制
package com.itwx.mq.ack;

import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author wangxuan
 * @date 2021/12/15 11:44 上午
 * @describe
 */
public class ConsumerACK {
    private final static String QUEUE_NAME = "wx_test_queue";

    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
        Channel channel = connection.createChannel();
        // 声明队列
        /**
         * 参数明细
         * 参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
         * 1、queue 队列名称
         * 2、durable 是否持久化,如果持久化,mq重启后队列还在
         * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
         * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
         * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //实现消费方法
        DefaultConsumer consumer = new DefaultConsumer(channel){
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            /**
             * 当接收到消息后此方法将被调用
             * @param consumerTag  消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
             * @param envelope 信封,通过envelope
             * @param properties 消息属性
             * @param body 消息内容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                //TODO 手动抛异常,造成消息丢失现象
                //测试情况2
//                int i= 1 / 0;

                //交换机
                String exchange = envelope.getExchange();
                //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
                long deliveryTag = envelope.getDeliveryTag();
                // body 即消息体
                String msg = new String(body,"utf-8");
                System.out.println("consumer receive message:" + msg + ",messageId:" + deliveryTag + ",exchange name:" + exchange);
                /**
                 * 考虑以下情况:
                 * 1、若未设置手动ACK,消费者获取消息后,发生异常,会发生什么情况?(消息丢失)
                 * 2、若设置手动ACK,消费者发生异常,会发生什么情况?(未消费状态)
                 * 3、设置手动ACK,消费者宕机,未即使发送ACK确认回调,会发生什么情况?(已消费,未确认)
                 *          (消息中间件会将消息标记为待确认状态,不会被重复消息);若再想消费该消息,重启消费者,消息中间件会将该消息标记为待消费状态(从unacked->ready)
                 *
                 */
                //消息消费成功,手动ACK,
                //测试情况三,注释
//                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // 监听队列,第二个参数:是否自动进行消息确认。
        /**
         * 参数明细:
         * 参数:String queue, boolean autoAck, Consumer callback
         * 1、queue 队列名称
         * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复
         * 3、callback,消费方法,当消费者接收到消息要执行的方法
         */
        //设置成手动ACK,避免重要消息丢失
        /**
         * 1、设置成手动ACK,即使消费者已经获取了消息,但是未及时ACK回复生产者,然后消费者宕机,消息队列会认为该消费未被消息;故此种情况会存在重复消费的情况;
         * 2、设置成手动ACK,即使消费者发生异常或者宕机情况,保证消息不丢失;
         */
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

参考资料: RabbitMQ系列(四)RabbitMQ事务和Confirm发送方消息确认——深入解读

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022/01/04 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 目录
  • 一、绪论
  • 二、生产者
    • 2.1事务机制
      • 2.2confirm模式
        • 串行模式
        • 批量模式
        • 异步模式
    • 三、消费者
      • 3.1手动ACK
      相关产品与服务
      消息队列 CMQ 版
      消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档