先决条件 本教程假定 RabbitMQ 已经安装,并运行在
localhost
标准端口(5672)。如果你使用不同的主机、端口或证书,则需要调整连接设置。 从哪里获得帮助 如果您在阅读本教程时遇到困难,可以通过邮件列表 联系我们。
在第 教程[2] 中,我们学习了如何使用工作队列在多个工作单元之间分配耗时任务。
但是如果我们想要运行一个在远程计算机上的函数并等待其结果呢?这将是另外一回事了。这种模式通常被称为 远程过程调用 或 RPC 。
在本篇教程中,我们将使用 RabbitMQ 构建一个 RPC 系统:一个客户端和一个可扩展的 RPC 服务器。由于我们没有什么耗时任务值得分发,那干脆就创建一个返回斐波那契数列的虚拟 RPC 服务吧。
为了说明如何使用 RPC 服务,我们将创建一个简单的客户端类。该类将暴露一个名为Call
的方法,用来发送 RPC 请求并且保持阻塞状态,直到接收到应答为止。
var rpcClient = new RPCClient();
Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);
rpcClient.Close();
关于 RPC 的说明 尽管 RPC 在计算机中是一种很常见的模式,但它经常受到批评。问题出现在当程序员不知道一个函数是本地调用还是一个耗时的 RPC 请求。这样的混淆,会导致系统不可预测,以及给调试增加不必要的复杂性。误用 RPC 可能会导致不可维护的混乱代码,而不是简化软件。 牢记这些限制,请考虑如下建议:
当有疑问的时候可以先避免使用 RPC。如果可以的话,考虑使用异步管道 - 而不是类似 RPC 的阻塞,其会将结果以异步的方式推送到下一个计算阶段。
一般来讲,基于 RabbitMQ 进行 RPC 通信是非常简单的,客户端发送一个请求消息,然后服务端用一个响应消息作为应答。为了能接收到响应,我们需要在发送请求过程中指定一个'callback'队列地址。
var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;
var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: "rpc_queue",
basicProperties: props,
body: messageBytes);
// ... then code to read a response message from the callback_queue ...
消息属性 AMQP 0-9-1 协议在消息中预定义了一个包含 14 个属性的集合,大多数属性很少使用,但以下情况除外:
Persistent
:将消息标记为持久的(值为2)或者瞬时的(其他值),可以参考 教程[2]。DeliveryMode
:熟悉 AMQP 协议的人可以选择此属性而不是熟悉协议的人可以选择使用此属性而不是Persistent
,它们控制的东西是一样的。ContentType
:用于描述编码的 mime 类型。例如,对于经常使用的 JSON 编码,将此属性设置为:application/json
是一种很好的做法。ReplyTo
:通常用于命名回调队列。CorrelationId
:用于将 RPC 响应与请求相关联。
在上面介绍的方法中,我们建议为每个 RPC 请求创建一个回调队列,但是这种方式效率低。幸运的是我们有一种更好的方式,那就是为每个客户端创建一个独立的回调队列。
这种方式会引出一个新的问题,在收到响应的回调队列中,它无法区分响应属于哪一个请求,此时便是CorrelationId
属性的所用之处。我们将为每个请求的CorrelationId
设置一个唯一值。之后当我们在回调队列接收到响应的时候,再去检查下这个属性是否和请求中的值匹配,如此一来,我们就可以把响应和请求关联起来了。如果出现一个未知的CorrelationId
值,我们可以安全的销毁这个消息,因为这个消息不属于我们的请求。
你可能会问,为什么我们应该忽略回调队列中的未知的消息,而不是用错误来标识失败呢?这是因为于服务器端可能存在竞争条件。虽然不太可能,但是 RPC 服务器可能在仅发送了响应消息而未发送消息确认的情况下挂掉,如果出现这种情况,RPC 服务器重启之后将会重新处理该请求。这就是为什么在客户端上我们必须优雅地处理重复的响应,并且理想情况下 RPC 应该是幂等的。
我们的 RPC 会是这样工作:
ReplyTo
(设置为回调队列)和CorrelationId
(为每个请求设置唯一值)。rpc_queue
队列。ReplyTo
属性设置的队列将带有结果的消息发送回客户端。CorrelationId
属性。如果它与请求中的值匹配,则返回对应用程序的响应。斐波纳契 任务:
private static int fib(int n)
{
if (n == 0 || n == 1) return n;
return fib(n - 1) + fib(n - 2);
}
我们宣布我们的斐波那契函数。并假定只允许有效的正整数输入。 (不要期望这个适用于大数字,它可能是最慢的递归实现)。
我们的 RPC 服务端代码 RPCServer.cs 看起来如下所示:
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
class RPCServer
{
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "rpc_queue", durable: false,
exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(0, 1, false);
var consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(queue: "rpc_queue",
autoAck: false, consumer: consumer);
Console.WriteLine(" [x] Awaiting RPC requests");
consumer.Received += (model, ea) =>
{
string response = null;
var body = ea.Body;
var props = ea.BasicProperties;
var replyProps = channel.CreateBasicProperties();
replyProps.CorrelationId = props.CorrelationId;
try
{
var message = Encoding.UTF8.GetString(body);
int n = int.Parse(message);
Console.WriteLine(" [.] fib({0})", message);
response = fib(n).ToString();
}
catch (Exception e)
{
Console.WriteLine(" [.] " + e.Message);
response = "";
}
finally
{
var responseBytes = Encoding.UTF8.GetBytes(response);
channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
basicProperties: replyProps, body: responseBytes);
channel.BasicAck(deliveryTag: ea.DeliveryTag,
multiple: false);
}
};
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
///
/// Assumes only valid positive integer input.
/// Don't expect this one to work for big numbers, and it's
/// probably the slowest recursive implementation possible.
///
private static int fib(int n)
{
if (n == 0 || n == 1)
{
return n;
}
return fib(n - 1) + fib(n - 2);
}
}
服务端代码非常简单:
channel.BasicQos
中的prefetchCount
值。BasicConsume
访问队列,然后注册一个交付处理程序,并在其中完成工作并发回响应。我们的 RPC 客户端 RPCClient.cs 代码:
using System;
using System.Collections.Concurrent;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
public class RpcClient
{
private readonly IConnection connection;
private readonly IModel channel;
private readonly string replyQueueName;
private readonly EventingBasicConsumer consumer;
private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
private readonly IBasicProperties props;
public RpcClient()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
connection = factory.CreateConnection();
channel = connection.CreateModel();
replyQueueName = channel.QueueDeclare().QueueName;
consumer = new EventingBasicConsumer(channel);
props = channel.CreateBasicProperties();
var correlationId = Guid.NewGuid().ToString();
props.CorrelationId = correlationId;
props.ReplyTo = replyQueueName;
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var response = Encoding.UTF8.GetString(body);
if (ea.BasicProperties.CorrelationId == correlationId)
{
respQueue.Add(response);
}
};
}
public string Call(string message)
{
var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(
exchange: "",
routingKey: "rpc_queue",
basicProperties: props,
body: messageBytes);
channel.BasicConsume(
consumer: consumer,
queue: replyQueueName,
autoAck: true);
return respQueue.Take(); ;
}
public void Close()
{
connection.Close();
}
}
public class Rpc
{
public static void Main()
{
var rpcClient = new RpcClient();
Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);
rpcClient.Close();
}
}
客户端代码稍微复杂一些:
Call
方法用来生成实际的 RPC 请求。CorrelationId
编号并保存它,while 循环会使用该值来捕获匹配的响应。ReplyTo
和CorrelationId
。CorrelationId
是否是我们正在寻找的那一个。如果是这样,它就会保存该响应。客户发出请求:
var rpcClient = new RPCClient();
Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);
rpcClient.Close();
现在是查看 RPCClient.cs 和 RPCServer.cs 的完整示例源代码(包括基本异常处理)的好时机哦。
像往常一样设置(请参见 教程[1]]):
我们的 RPC 服务现已准备就绪,现在可以启动服务端:
cd RPCServer
dotnet run
# => [x] Awaiting RPC requests
要请求斐波纳契数,请运行客户端:
cd RPCClient
dotnet run
# => [x] Requesting fib(30)
这里介绍的设计并不是 RPC 服务的唯一可能实现,但它仍具有一些重要优势:
QueueDeclare
一样同步调用。因此,对于单个 RPC 请求,RPC 客户端只需要一次网络往返。我们的代码很简单,也并没有尝试去解决更复杂(但很重要)的问题,比如就像:
如果您想进行实验,您可能会发现 管理 UI 对于查看队列非常有用。
本文翻译自 RabbitMQ 官方教程 C# 版本。如本文介绍内容与官方有所出入,请以官方最新内容为准。水平有限,翻译的不好请见谅,如有翻译错误还请指正。