RPC
(Remote Procedure Call
), 即远过程调用。它是一种通过网络从远程计算机上请求服务,而不需要了解底层网络的技术
Http
远程调用RabbitMQ
实现 RPC
通信的过程,大概是通过两个队列实现一个可回调的过程
reply_to
:回调队列的名称correlation_id
:不能重复,用来确保请求和响应是一对replyTo
字段,这个字段制定了一个回调队列,服务端处理之后,会把响应结果发送到这个队列replyTo
指定的回调队列correlationID
属性,以确保它是所期望的响应 大致流程:
replyTo
、CorrelationID
)correlationID
)replyTo
,设置 correlationID
)public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";
public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";
客户端代码主要流程如下:
RPC_REQUEST_QUEUE
,声明本次请求的唯一标志 correlationID
RPC_REQUEST_QUEUE
和 correlationID
配置到要发送的消息队列中channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
//发送请求(使用内置交换机)
String msg = "hello rpc...";
//设置请求的唯一标识
String correlationID = UUID.randomUUID().toString();
//设置请求的相关属性
AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
.correlationId(correlationID)
.replyTo(Constants.RPC_RESPONSE_QUEUE)
.build();
channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());
使用阻塞队列,来存储回调结果
// 接收响应
// 如果不阻塞,就会从上到下执行完了。所以要使用一个阻塞队列,完成同步机制
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
//逻辑是比对 correlationID 是否一致
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String respMsg = new String(body);
System.out.println("接收到回调信息:" + respMsg);
if (correlationID.equals(properties.getCorrelationId())) {
// 如果 correlationID 校验一致,说明就是我们要的响应
response.offer(respMsg);
}
}
};
channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);
// 获取回调的结果
String result = response.take(); //阻塞队列一直阻塞到这里,直到 response 有值了
System.out.println("[RPC Client 响应结果]: " + result);
package rabbitmq.rpc;
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.*;
/**
* RPC Client * 1. 发送请求
* 2. 接收响应
*/
public class RpcClient {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//3. 声明队列
channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
//4. 发送请求(使用内置交换机)
String msg = "hello rpc...";
//设置请求的唯一标识
String correlationID = UUID.randomUUID().toString();
//设置请求的相关属性
AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
.correlationId(correlationID)
.replyTo(Constants.RPC_RESPONSE_QUEUE)
.build();
channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());
//5. 接收响应
// 如果不阻塞,就会从上到下执行完了。所以要使用一个阻塞队列,完成同步机制
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
//逻辑是比对 correlationID 是否一致
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String respMsg = new String(body);
System.out.println("接收到回调信息:" + respMsg);
if (correlationID.equals(properties.getCorrelationId())) {
// 如果 correlationID 校验一致,说明就是我们要的响应
response.offer(respMsg);
}
}
};
channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);
String result = response.take(); //阻塞队列一直阻塞到这里,直到 response 有值了
System.out.println("[RPC Client 响应结果]: " + result);
}
}
服务端代码主要流程如下:
//设置一次只能接收一条消息
channel.basicQos(1);
如果不设置 basicQos
,RabbitMQ
会使用默认的 Qos
设置,其 prefetchCount
默认值为 0
prefetchCount
为 0
时,RabbitMQ
会根据内部实现和当前的网络状况等因素,可能会同时发送多条消息给消费者在 RPC
模式下,同上期望的是一对一的消息处理,即一个请求对应一个相应。消费者在处理完一个消息并确认之后,才会接收到下一条消息
接收消息,并做出相应处理
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String request = new String(body, "UTF-8");
System.out.println("接收到请求:" + request);
String responses = "针对 request:" + request + ",响应成功";
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
.correlationId(properties.getCorrelationId())
.build();
channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, responses.getBytes());
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);
RabbitMQ
消息确定机制
RabbitMQ
中,basicConsume
方法的 autoAck
参数用于指定消费者是否应该自动向消息对类确认消息 autoAck=true
):消息对类在将消息发送给消费者之后,会立即从内存中删除该消息。这意味着,如果消费者处理消息失败,消息将丢失,因为消息队列认为消息已经被成功消费autoAck=false
):消息队列在将消息发送给消费者之后,需要消费者显式地调用 basicAck
方法来确认消息。手动确认提供了更高的可靠性,确保消息不会意外丢失,适用于消息处理重要且需要确保每个消息都被正确处理的场景package rabbitmq.rpc;
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* RPC server * 1. 接收请求
* 2. 发送响应
*/
public class RpcServer {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//3. 接收请求
//设置一次只能接收一条消息
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String request = new String(body, "UTF-8");
System.out.println("接收到请求:" + request);
String responses = "针对 request:" + request + ",响应成功";
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
.correlationId(properties.getCorrelationId())
.build();
channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, responses.getBytes());
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);
}
}
运行服务端
接收到请求:hello rpc...