首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >问答首页 >在RabbitMQ中指定日期后从特定队列取消订阅

在RabbitMQ中指定日期后从特定队列取消订阅
EN

Stack Overflow用户
提问于 2019-12-18 03:33:44
回答 1查看 349关注 0票数 1

我有一个-upon类,它是GUI中的一个特定操作--启动与RabbitMQ服务器的连接(使用pub/ server )并侦听新事件。

我想添加一个新特性,允许用户设置一个“结束时间”,以阻止我的应用程序监听新事件(停止从队列中消费而不关闭它)。

我试图使用basicCancel方法,但是我找不到一种方法使它对预定义的日期起作用。在我的订阅类中启动一个新线程,在到达给定日期时调用basicCancel是一个好主意,还是有更好的方法来做到这一点?

收听新事件

代码语言:javascript
运行
AI代码解释
复制
    private void listenToEvents(String queueName) {
        try {
              logger.info(" [*] Waiting for events. Subscribed to : " + queueName);

              Consumer consumer = new DefaultConsumer(channel) {

                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope,
                                             AMQP.BasicProperties properties, byte[] body) throws IOException {

                    TypeOfEvent event = null;

                    String message = new String(body);


                    // process the payload
                    InteractionEventManager eventManager = new InteractionEventManager();
                    event = eventManager.toCoreMonitorFormatObject(message);


                    if(event!=null){    
                        String latestEventOpnName = event.getType().getOperationMessage().getOperationName();

                        if(latestEventOpnName.equals("END_OF_PERIOD"))
                            event.getMessageArgs().getContext().setTimestamp(++latestEventTimeStamp);            

                        latestEventTimeStamp = event.getMessageArgs().getContext().getTimestamp();                                    
                        ndaec.receiveTypeOfEventObject(event);                  
                    }
                  }
                };

                channel.basicConsume(queueName, true, consumer);   
             //Should I add the basicCancel here?
        }
        catch (Exception e) {
            logger.info("The Monitor could not reach the EventBus. " +e.toString());
        }     

    }

初始化连接

代码语言:javascript
运行
AI代码解释
复制
  public String initiateConnection(Timestamp endTime) {

        Properties props = new Properties();
        try {
            props.load(new FileInputStream(everestHome+ "/monitoring-system/rabbit.properties"));
         }catch(IOException e){
             e.printStackTrace();
        }                       

        RabbitConfigure config = new RabbitConfigure(props,props.getProperty("queuName").trim());

        ConnectionFactory factory = new ConnectionFactory();

        exchangeTopic = new HashMap<String,String>();
        String exchangeMerged = config.getExchange();
        logger.info("Exchange=" + exchangeMerged);
        String[] couples = exchangeMerged.split(";");

        for(String couple : couples)
        {
            String[] infos = couple.split(":");
            if (infos.length == 2)
            {
                exchangeTopic.put(infos[0], infos[1]);
            }
            else
            {
                logger.error("Invalid Exchange Detail: " + couple);
            }
        }

        for(Entry<String, String> entry : exchangeTopic.entrySet()) {

            String exchange = entry.getKey();
            String topic = entry.getValue();

            factory.setHost(config.getHost());
            factory.setPort(Integer.parseInt(config.getPort()));
            factory.setUsername(config.getUsername());
            factory.setPassword(config.getPassword());

            try {
                connection1= factory.newConnection();
                channel = connection1.createChannel();
                channel.exchangeDeclare(exchange, EXCHANGE_TYPE);
                /*Map<String, Object> args = new HashMap<String, Object>();
                args.put("x-expires", endTime.getTime());*/
                channel.queueDeclare(config.getQueue(),false,false,false,null);
                channel.queueBind(config.getQueue(),exchange,topic);            
                logger.info("Connected to RabbitMQ.\n Exchange: " + exchange + " Topic: " + topic +"\n Queue Name is: "+ config.getQueue());
                return config.getQueue();
            } catch (IOException e) {
                logger.error(e.getMessage());
                e.printStackTrace();
            } catch (TimeoutException e) {
                logger.error(e.getMessage());
                e.printStackTrace();
            }
        }
        return null;
    }
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-12-20 03:59:24

您可以创建一个延迟的队列,设置离开的时间,这样您在那里推送的消息就会在您想要停止您的消费者时立即被死信记录下来。

然后,您必须将死信交换绑定到一个队列,该队列的使用者将在收到消息后立即停止另一个队列。

当您有RabbitMq时,永远不要使用线程,您可以使用延迟消息做很多有趣的事情!

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59391151

复制
相关文章
rabbitmq消息队列——"发布订阅"
三、”发布订阅” 上一节的练习中我们创建了一个工作队列。队列中的每条消息都会被发送至一个工作进程。这节,我们将做些完全不同的事情——我们将发送单个消息发送至多个消费者。这种模式就是广为人知的“发布订阅
用户1141560
2017/12/26
9840
rabbitmq消息队列——"发布订阅"
RabbitMQ发布订阅实战-实现延时重试队列
RabbitMQ是一款使用Erlang开发的开源消息队列。本文假设读者对RabbitMQ是什么已经有了基本的了解,如果你还不知道它是什么以及可以用来做什么,建议先从官网的 RabbitMQ Tutorials 入门教程开始学习。
用户2131907
2018/05/15
3.3K1
go rabbitmq 使用教程 ,go rabbitmq 简单队列,go rabbitmq work模式,go rabbitmq 订阅模式
使用Go的过程记录了全部的rabbitmq的go代码,方便自己下次Copy,go的资料比较少,seo估计很好做,流量速度过来。
高久峰
2023/06/18
2500
RabbitMQ六种队列模式之发布订阅模式
在上一章中,我们创建了一个工作队列,工作队列模式的设想是每一条消息只会被转发给一个消费者。本章将会讲解完全不一样的场景: 我们会把一个消息转发给多个消费者,这种模式称之为发布-订阅模式。
黎明大大
2021/03/09
2.2K0
rabbitmq发布订阅
对的,以前我们发送消息是直接由生产者将消息发送到队列,可是这种方式官方是不推荐的!
止术
2020/09/15
4920
rabbitmq主题订阅
上一篇文章讲述了关于直接连接交换机根据key找到对应队列的方式,实现特殊消息特殊队列消费的目的,但是事实上,生产环境下,对于消息的复杂性远不是这样就能够解决的!比如:你要监控有个用户的操作行为,用户的操作行为太多了 增删改查,如果一个一个的写难免会有遗漏,这个时候,我们可以用通配符 user.* 轻松解决!这就是mq的主题模式!
止术
2020/09/15
2030
RabbitMQ:订阅模型-消息订阅模式
订阅模型-消息订阅模式,也可以称为广播模式,生产者将消息发送到 Exchange,Exchange 再转发到与之绑定的 Queue中,每个消费者再到自己的 Queue 中取消息。
栗筝i
2022/12/28
1.6K0
RabbitMQ:订阅模型-消息订阅模式
RabbitMQ——镜像队列Master故障后的处理
默认情况下,镜像队列的master出现故障时,最老的mirror会被提升为新的master。如果新提升为master的这个mirror与原有的master并未完成数据的同步,那么就会出现数据的丢失,而实际应用中,出现数据丢失可能会导致出现严重后果。
陈猿解码
2023/02/28
5120
RabbitMQ——镜像队列Master故障后的处理
消息队列RabbitMQ核心:交换机(路由、主题、发布订阅)
在上一篇的学习中,使用创建了一个工作队列,我们假设的是工作队列背后,每个任务都恰好交给一个消费者(工作进程)。之前都是将消息发送到队列中,然后由消费者进行消费,其实在RabbitMQ有一个默认的交换机,在发消息时无需指定交换机。
百思不得小赵
2022/12/20
8700
消息队列RabbitMQ核心:交换机(路由、主题、发布订阅)
消息队列RabbitMQ核心:简单(Hello World)模式、队列(Work Queues)模式、发布订阅模式
消息队列RabbitMQ提供了六种工作模式:简单模式、work queues、发布订阅模式、路由模式、主题模式、发布确认模式。本文将介绍前三种工作模式。所有的案例代码都是使用Java语言实现。
百思不得小赵
2022/12/07
5720
消息队列RabbitMQ核心:简单(Hello World)模式、队列(Work Queues)模式、发布订阅模式
RabbitMQ在php中的使用----发布与订阅
安装教程 rabbitmq和php的amqp扩展教程网上有很多,大家可以自行查询,例如:Linux系统安装RabbitMQ及PHP安装amqp拓展库详细教程
美团骑手
2021/01/18
2K0
MQ界的“三兄弟”:Kafka、ZeroMQ和RabbitMQ,有何区别?该如何选择?
在现代的分布式系统和实时数据处理领域,消息中间件扮演着关键的角色,用于解决应用程序之间的通信和数据传递的挑战。在众多的消息中间件解决方案中,Kafka、ZeroMQ和RabbitMQ 是备受关注和广泛应用的代表性系统。它们各自具有独特的特点和优势,适用于不同的应用场景和需求。
网络技术联盟站
2023/07/22
12.5K0
MQ界的“三兄弟”:Kafka、ZeroMQ和RabbitMQ,有何区别?该如何选择?
RabbitMQ(三) ——发布订阅
RabbitMQ(三)——发布订阅 (原创内容,转载请注明来源,谢谢) 一、概述 RabbitMQ的发布订阅(Publish/Subscribe),其将生产者和消费者进一步解耦,生产者生
用户1327360
2018/03/07
1.2K0
RabbitMQ(三) ——发布订阅
【EventBus】EventBus 源码解析 ( 取消订阅 )
【EventBus】EventBus 使用示例 ( 最简单的 EventBus 示例 ) 示例中 , 在 MainActivity 中调用
韩曙亮
2023/03/29
5230
RabbitMQ死信队列在SpringBoot中的使用
nack()与reject()的区别是:reject()不支持批量拒绝,而nack()可以.
喜欢天文的pony站长
2020/06/26
1.5K0
RabbitMQ入门:发布/订阅(Publish/Subscribe)[通俗易懂]
遇到的实例都是一个消息只发送给一个消费者(工作者),他们的消息模型分别为(P代表生产者,C代表消费者,红色代表队列):
全栈程序员站长
2022/07/18
1.8K0
RabbitMQ入门:发布/订阅(Publish/Subscribe)[通俗易懂]
RabbitMQ死信队列在SpringBoot中的使用
nack()与reject()的区别是:reject()不支持批量拒绝,而nack()可以.
喜欢天文的pony站长
2020/06/29
1.2K0
RabbitMQ死信队列在SpringBoot中的使用
rabbitmq实例_rabbitmq创建队列
RabbitMQ是一个受欢迎的消息代理,通常用于应用程序之间或者程序的不同组件之间通过消息来进行集成。具有高可用高并发的优点,适合集群服务器。采用 Erlang实现, 对主要的编程语言都有客户端支持。
全栈程序员站长
2022/11/09
1.4K0
rabbitmq实例_rabbitmq创建队列
RabbitMQ入门-消息订阅模式
###消息派发 上篇《RabbitMQ入门-消息派发那些事儿》发布之后,收了不少反馈,其中问的最多的还是有关消息确认以及超时等场景的处理。 楼主,有遇到消费者后台进程不在,但consumer连接还在,当前消息是unacked状态,导致这个消息一直不被消费 队列在等待回复的时候,这个消息是怎么存放的?如果一直没有返回有超时么? ... 这里再对消息确认做以下补充 有关超时 RabbitMQ是没有超时概念的,如果一个消费者消费一条消息要花费很长时间,比如10分钟,那么这个过程会一直进行下去。除非你采用其他策略来
JackieZheng
2018/01/16
8920
RabbitMQ入门-消息订阅模式
点击加载更多

相似问题

从rabbitmq队列中仅获取一条消息并取消订阅

20

使用RabbitMQ订阅远程队列

24

在特定日期条带化取消订阅

33

在指定日期后取消条带订阅。

210

没有订阅者的RabbitMQ队列

10
添加站长 进交流群

领取专属 10元无门槛券

AI混元助手 在线答疑

扫码加入开发者社群
关注 腾讯云开发者公众号

洞察 腾讯核心技术

剖析业界实践案例

扫码关注腾讯云开发者公众号
领券
社区富文本编辑器全新改版!诚邀体验~
全新交互,全新视觉,新增快捷键、悬浮工具栏、高亮块等功能并同时优化现有功能,全面提升创作效率和体验
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文