前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >MQ回退消息 springboot

MQ回退消息 springboot

作者头像
一个风轻云淡
发布2022-11-13 11:19:42
发布2022-11-13 11:19:42
63600
代码可运行
举报
文章被收录于专栏:java学习javajava学习java
运行总次数:0
代码可运行

Mandatory参数

  在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过设置mandatory参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。  

 消息生产者:

代码语言:javascript
代码运行次数:0
复制
@Slf4j
@RestController
public class MessageProduce implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback{
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @PostConstruct
    private  void init()
    {
        rabbitTemplate.setConfirmCallback(this);
        /**
         * true:
         *       交换机无法将消息进行路由的时候,会将该消息返回给生产者
         * false:
         *       如果发现消息无法进行路由,则直接将消息扔掉
         */
        rabbitTemplate.setMandatory(true);
        //将回退消息交给谁处理
        rabbitTemplate.setReturnCallback(this);
    }
    @GetMapping("sendMessage/{message}")
    public void sendMessage(@PathVariable String message)
    {
        //让消息绑定一个id值
        CorrelationData correlationData1 = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("confirm.exchange","key1",message+"key1",correlationData1);
        rabbitTemplate.convertAndSend("confirm.exchange","key2",message+"key2",correlationData1);
        log.info("发送消息id位{}内容为{}",correlationData1.getId(),message+"key1");
        log.info("发送消息id位{}内容为{}",correlationData1.getId(),message+"key2");

    }
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id= correlationData!=null?correlationData.getId():"";
        if(b)
        {
            log.info("交换机收到消息确认成功;id{}",id);
        }
        else {
            log.error("消息id{}未成功投递到交换机,原因是:{}",id,s);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey)  {
            log.info("消息:{}被服务器退回,退回的原因是{},交换机是{}",
                    new String(message.getBody()),replyText,exchange,routingKey);
    }
}

回调接口

代码语言:javascript
代码运行次数:0
复制
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack ,String cause) {
        String id=correlationData!=null?correlationData.getId():"";
        if(ack)
        {
            log.info("交换机已经收到id为{}的消息",id);
        }
        else
        {
            log.info("交换机还未收到id未:{}的消息,原因是{}",cause);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        log.error("消息{},被交换机{}退回。退回的原因是:{},路由key为{}",new String(message.getBody()),
                exchange,replyText,routingKey);
    }
}

消费者

代码语言:javascript
代码运行次数:0
复制
@Component
@Slf4j
public class ConfirmConsumer {
    public static  final  String CONFIRM_QUEUE_NAME="confirm.queue";
    @RabbitListener(queues = CONFIRM_QUEUE_NAME)
    public void receiveMsg(Message message)
    {
        String s = new String(message.getBody());
        log.info("接收到队列confirm.queue消息:{}",s);
    }
}

http://localhost:8989/sendMessage/8888

结果:

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-08-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Mandatory参数
  •  消息生产者:
  • 回调接口
  • 消费者
  • 结果:
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档