我有一个-upon类,它是GUI中的一个特定操作--启动与RabbitMQ服务器的连接(使用pub/ server )并侦听新事件。
我想添加一个新特性,允许用户设置一个“结束时间”,以阻止我的应用程序监听新事件(停止从队列中消费而不关闭它)。
我试图使用basicCancel方法,但是我找不到一种方法使它对预定义的日期起作用。在我的订阅类中启动一个新线程,在到达给定日期时调用basicCancel是一个好主意,还是有更好的方法来做到这一点?
收听新事件
private void listenToEvents(String queueName) {
try {
logger.info(" [*] Waiting for events. Subscribed to : " + queueName);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
TypeOfEvent event = null;
String message = new String(body);
// process the payload
InteractionEventManager eventManager = new InteractionEventManager();
event = eventManager.toCoreMonitorFormatObject(message);
if(event!=null){
String latestEventOpnName = event.getType().getOperationMessage().getOperationName();
if(latestEventOpnName.equals("END_OF_PERIOD"))
event.getMessageArgs().getContext().setTimestamp(++latestEventTimeStamp);
latestEventTimeStamp = event.getMessageArgs().getContext().getTimestamp();
ndaec.receiveTypeOfEventObject(event);
}
}
};
channel.basicConsume(queueName, true, consumer);
//Should I add the basicCancel here?
}
catch (Exception e) {
logger.info("The Monitor could not reach the EventBus. " +e.toString());
}
}
初始化连接
public String initiateConnection(Timestamp endTime) {
Properties props = new Properties();
try {
props.load(new FileInputStream(everestHome+ "/monitoring-system/rabbit.properties"));
}catch(IOException e){
e.printStackTrace();
}
RabbitConfigure config = new RabbitConfigure(props,props.getProperty("queuName").trim());
ConnectionFactory factory = new ConnectionFactory();
exchangeTopic = new HashMap<String,String>();
String exchangeMerged = config.getExchange();
logger.info("Exchange=" + exchangeMerged);
String[] couples = exchangeMerged.split(";");
for(String couple : couples)
{
String[] infos = couple.split(":");
if (infos.length == 2)
{
exchangeTopic.put(infos[0], infos[1]);
}
else
{
logger.error("Invalid Exchange Detail: " + couple);
}
}
for(Entry<String, String> entry : exchangeTopic.entrySet()) {
String exchange = entry.getKey();
String topic = entry.getValue();
factory.setHost(config.getHost());
factory.setPort(Integer.parseInt(config.getPort()));
factory.setUsername(config.getUsername());
factory.setPassword(config.getPassword());
try {
connection1= factory.newConnection();
channel = connection1.createChannel();
channel.exchangeDeclare(exchange, EXCHANGE_TYPE);
/*Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", endTime.getTime());*/
channel.queueDeclare(config.getQueue(),false,false,false,null);
channel.queueBind(config.getQueue(),exchange,topic);
logger.info("Connected to RabbitMQ.\n Exchange: " + exchange + " Topic: " + topic +"\n Queue Name is: "+ config.getQueue());
return config.getQueue();
} catch (IOException e) {
logger.error(e.getMessage());
e.printStackTrace();
} catch (TimeoutException e) {
logger.error(e.getMessage());
e.printStackTrace();
}
}
return null;
}
发布于 2019-12-20 03:59:24
您可以创建一个延迟的队列,设置离开的时间,这样您在那里推送的消息就会在您想要停止您的消费者时立即被死信记录下来。
然后,您必须将死信交换绑定到一个队列,该队列的使用者将在收到消息后立即停止另一个队列。
当您有RabbitMq时,永远不要使用线程,您可以使用延迟消息做很多有趣的事情!
https://stackoverflow.com/questions/59391151
复制相似问题