RabbitMQ针对这个问题,提供了以下几个机制来解决:
本篇博客我们先讲解下生产者确认机制,剩余的机制后续单独写博客进行讲解。
要想保证消息不丢失,首先我们得保证生产者能成功的将消息发送到RabbitMQ服务器。
但在之前的示例中,当生产者将消息发送出去之后,消息到底有没有正确地到达服务器呢?如果不进行特殊配置,默认情况下发送消息的操作是不会返回任何消息给生产者的,也就是默认情况下生产者是不知道消息有没有正确的到达服务器。
从basicPublish方法的返回类型我们也能知晓:
public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException {
this.basicPublish(exchange, routingKey, false, props, body);
}
为了更好理解,我们将之前的生产者Producer类中的channel.queueDeclare(QUEUE_NAME, false, false, false, null);
注释:
package com.zwwhnly.springbootaction.rabbitmq.helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
// 设置 RabbitMQ 的主机名
factory.setHost("localhost");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 指定一个队列,不存在的话自动创建
//channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 发送消息
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
// 关闭频道和连接
channel.close();
connection.close();
}
}
此时运行代码,因为队列不存在,消息肯定没地方存储,但是程序却并未出错,也就是消息丢失了但是我们却并不知晓。
RabblitMQ针对这个问题,提供了两种解决方案:
RabblitMQ客户端中与事务机制相关的方法有以下3个:
新建事务生产者类TransactionProducer,代码如下:
package com.zwwhnly.springbootaction.rabbitmq.producerconfirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TransactionProducer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
// 设置 RabbitMQ 的主机名
factory.setHost("localhost");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 指定一个队列,不存在的话自动创建
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.txSelect();
// 发送消息
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
channel.txCommit();
System.out.println(" [x] Sent '" + message + "'");
// 关闭频道和连接
channel.close();
connection.close();
}
}
运行代码,发现队列新增成功,消息发送成功:
稍微修改下代码,看下异常机制的事务回滚:
try {
channel.txSelect();
// 发送消息
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
int result = 1 / 0;
channel.txCommit();
System.out.println(" [x] Sent '" + message + "'");
} catch (IOException e) {
e.printStackTrace();
channel.txRollback();
}
因为int result = 1 / 0;
肯定会触发java.lang.ArithmeticException异常,所以事务会回滚,消息发送失败:
如果要发送多条消息,可以将channel.basicPublish,channel.txCommit等方法放在循环体内,如下所示:
channel.txSelect();
int loopTimes = 10;
for (int i = 0; i < loopTimes; i++) {
try {
// 发送消息
String message = "Hello World!" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
channel.txCommit();
System.out.println(" [x] Sent '" + message + "'");
} catch (IOException e) {
e.printStackTrace();
channel.txRollback();
}
}
虽然事务能够解决消息发送方和RabbitMQ之间消息确认的问题,只有消息成功被RabbitMQ接收,事务才能提交成功,否则便可在捕获异常之后进行事务回滚。但是使用事务机制会“吸干”RabbitMQ的性能,因此建议使用下面讲到的发送方确认机制。
发送方确认机制是指生产者将信道设置成confirm(确认)模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到RabbitMQ服务器之后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这就使得生产者知晓消息已经正确到达了目的地了。
如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack(Basic.Nack)命令,生产者应用程序同样可以在回调方法中处理该nack指令。
如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出。
事务机制在一条消息发送之后会使发送端阻塞,以等待RabbitMQ的回应,之后才能继续发送下一条消息。
相比之下,发送方确认机制最大的好处在于它是异步的,一旦发布一条消息。生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认后,生产者应用程序便可以通过回调方法来处理该确认消息。
新建确认生产类NormalConfirmProducer,代码如下:
package com.zwwhnly.springbootaction.rabbitmq.producerconfirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class NormalConfirmProducer {
private final static String EXCHANGE_NAME = "normal-confirm-exchange";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
// 设置 RabbitMQ 的主机名
factory.setHost("localhost");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 创建一个Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
try {
channel.confirmSelect();
// 发送消息
String message = "normal confirm test";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
if (channel.waitForConfirms()) {
System.out.println("send message success");
} else {
System.out.println("send message failed");
// do something else...
}
} catch (InterruptedException e) {
e.printStackTrace();
}
// 关闭频道和连接
channel.close();
connection.close();
}
}
channel.confirmSelect();将信道设置成confirm模式。channel.waitForConfirms();等待发送消息的确认消息,如果发送成功,则返回ture,如果发送失败,则返回false。
如果要发送多条消息,可以将channel.basicPublish,channel.waitForConfirms等方法放在循环体内,如下所示:
channel.confirmSelect();
int loopTimes = 10;
for (int i = 0; i < loopTimes; i++) {
try {
// 发送消息
String message = "normal confirm test" + i;
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
if (channel.waitForConfirms()) {
System.out.println("send message success");
} else {
System.out.println("send message failed");
// do something else...
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
运行结果:
send message successsend message successsend message successsend message successsend message successsend message successsend message successsend message successsend message successsend message success
如果不开启信道的confirm模式,调用channel.waitForConfirms()会报错:
注意事项:
1)事务机制和publisher confirm机制是互斥的,不能共存。
如果企图将已开启事务模式的信道再设置为publisher confirm模式,RabbitMQ会报错:
channel.txSelect();
channel.confirmSelect();
如果企图将已开启publisher confirm模式的信道再设置为事务模式,RabbitMQ也会报错:
channel.confirmSelect();
channel.txSelect();
2)事务机制和publisher confirm机制确保的是消息能够正确地发送至RabbitMQ,这里的“发送至RabbitMQ”的含义是指消息被正确地发往至RabbitMQ的交换器,如果此交换器没有匹配的队列,那么消息也会丢失。所以在使用这两种机制的时候要确保所涉及的交换器能够有匹配的队列。
比如上面的NormalConfirmProducer类发送的消息,发送到了交换器normal-confirm-exchange,但是该交换器并没有绑定任何队列,从业务角度来讲,消息仍然是丢失了。
普通confirm模式是每发送一条消息后就调用channel.waitForConfirms()方法,之后等待服务端的确认,这实际上是一种串行同步等待的方式。因此相比于事务机制,性能提升的并不多。
批量confirm模式是每发送一批消息后,调用channel.waitForConfirms()方法,等待服务器的确认返回,因此相比于5.1中的普通confirm模式,性能更好。
但是不好的地方在于,如果出现返回Basic.Nack或者超时情况,生产者客户端需要将这一批次的消息全部重发,这样会带来明显的重复消息数量,如果消息经常丢失,批量confirm模式的性能应该是不升反降的。
代码示例:
package com.zwwhnly.springbootaction.rabbitmq.producerconfirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
public class BatchConfirmProducer {
private final static String EXCHANGE_NAME = "batch-confirm-exchange";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
// 设置 RabbitMQ 的主机名
factory.setHost("localhost");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 创建一个Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
int batchCount = 100;
int msgCount = 0;
BlockingQueue blockingQueue = new ArrayBlockingQueue(100);
try {
channel.confirmSelect();
while (msgCount <= batchCount) {
String message = "batch confirm test";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
// 将发送出去的消息存入缓存中,缓存可以是一个ArrayList或者BlockingQueue之类的
blockingQueue.add(message);
if (++msgCount >= batchCount) {
try {
if (channel.waitForConfirms()) {
// 将缓存中的消息清空
blockingQueue.clear();
} else {
// 将缓存中的消息重新发送
}
} catch (InterruptedException e) {
e.printStackTrace();
// 将缓存中的消息重新发送
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
// 关闭频道和连接
channel.close();
connection.close();
}
}
异步confirm模式是在生产者客户端添加ConfirmListener回调接口,重写接口的handAck()和handNack()方法,分别用来处理RabblitMQ回传的Basic.Ack和Basic.Nack。
这两个方法都有两个参数,第1个参数deliveryTag用来标记消息的唯一序列号,第2个参数multiple表示的是是否为多条确认,值为true代表是多个确认,值为false代表是单个确认。
示例代码:
package com.zwwhnly.springbootaction.rabbitmq.producerconfirm;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;
public class AsyncConfirmProducer {
private final static String EXCHANGE_NAME = "async-confirm-exchange";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
// 设置 RabbitMQ 的主机名
factory.setHost("localhost");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 创建一个Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
int batchCount = 100;
long msgCount = 1;
SortedSet<Long> confirmSet = new TreeSet<Long>();
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Ack,SeqNo:" + deliveryTag + ",multiple:" + multiple);
if (multiple) {
confirmSet.headSet(deliveryTag - 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Nack,SeqNo:" + deliveryTag + ",multiple:" + multiple);
if (multiple) {
confirmSet.headSet(deliveryTag - 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
// 注意这里需要添加处理消息重发的场景
}
});
// 演示发送100个消息
while (msgCount <= batchCount) {
long nextSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish(EXCHANGE_NAME, "", null, "async confirm test".getBytes());
confirmSet.add(nextSeqNo);
msgCount = nextSeqNo;
}
// 关闭频道和连接
channel.close();
connection.close();
}
}
运行结果:
Ack,SeqNo:1,multiple:falseAck,SeqNo:2,multiple:falseAck,SeqNo:3,multiple:falseAck,SeqNo:4,multiple:falseAck,SeqNo:5,multiple:falseAck,SeqNo:6,multiple:falseAck,SeqNo:7,multiple:falseAck,SeqNo:8,multiple:falseAck,SeqNo:9,multiple:falseAck,SeqNo:10,multiple:false
注意:多次运行,发现每次运行的输出结果是不一样的,说明RabbitMQ端回传给生产者的ack消息并不是以固定的批量大小回传的。
到目前为止,我们了解到4种模式(事务机制,普通confirm,批量confirm,异步confirm)可以实现生产者确认,让我们来对比下它们的性能,简单修改下以上示例代码中发送消息的数量,比如10000条,以下为4种模式的耗时:
发送10000条消息,事务机制耗时:2103发送10000条消息,普通confirm机制耗时:1483发送10000条消息,批量confirm机制耗时:281发送10000条消息,异步confirm机制耗时:214
可以看出,事务机制最慢,普通confirm机制虽有提升但是不多,批量confirm和异步confirm性能最好,大家可以根据自己喜好自行选择使用哪种机制,个人建议使用异步confirm机制。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有