前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >dotNet中通过依赖注入DI来管理RabbitMq.Client7.0的生命周期

dotNet中通过依赖注入DI来管理RabbitMq.Client7.0的生命周期

作者头像
郑子铭
发布2025-01-08 11:14:05
发布2025-01-08 11:14:05
8300
代码可运行
举报
运行总次数:0
代码可运行

在 RabbitMQ.Client 7.0.0 版本中, IModel 在 RabbitMQ.Client 7.0.0-alpha2 版本中已经被重命名,现在应该使用 IChannel 替代 IModel, IChannel 不再提供 CreateBasicProperties 方法。需要直接使用 BasicProperties 类来创建消息属性。

前言

关于RabbitMq的更多知识点在:

https://www.dotnetshare.com

下面是通过依赖注入(DI)来管理RabbitMQ客户端的生命周期

1. 安装RabbitMQ客户端库

首先,你需要安装RabbitMQ的.NET客户端库。这可以通过NuGet包管理器来完成:

代码语言:javascript
代码运行次数:0
复制
Install-Package RabbitMQ.Client

2. 配置RabbitMQ连接字符串

在你的appsettings.json文件中,添加RabbitMQ的连接配置:

代码语言:javascript
代码运行次数:0
复制
{
  "RabbitMQ": {
    "HostName": "localhost",
    "Port": 5672,
    "UserName": "guest",
    "Password": "guest"
  }
}

3. 创建RabbitMQ服务配置类

创建一个配置类来封装RabbitMQ的连接信息:

代码语言:javascript
代码运行次数:0
复制
public class RabbitMQOptions
{
    public string HostName { get; set; }
    public int Port { get; set; }
    public string UserName { get; set; }
    public string Password { get; set; }
}

4. 注册RabbitMQ服务

Startup.cs或程序启动时的配置方法中,注册RabbitMQ服务:

代码语言:javascript
代码运行次数:0
复制
// 绑定RabbitMQ配置
builder.Services.Configure<RabbitMQOptions>(builder.Configuration.GetSection("RabbitMQ"));

// 注册RabbitMQ连接工厂
builder.Services.AddSingleton<IRabbitMQConnection, RabbitMQConnection>(sp =>
{
    var options = sp.GetRequiredService<IOptions<RabbitMQOptions>>().Value;
    var factory = new ConnectionFactory() { HostName = options.HostName, Port = options.Port, UserName = options.UserName, Password = options.Password };
    return new RabbitMQConnection(factory);
}); 

// 添加RabbitMQService的服务注册
builder.Services.AddSingleton<RabbitMQService>();

5. 创建RabbitMQ连接和通道工厂

创建一个工厂类来管理RabbitMQ的连接和通道:

代码语言:javascript
代码运行次数:0
复制
    public interfaceIRabbitMQConnection : IDisposable
    {
        Task<IChannel> CreateChannel();
    }

    publicclassRabbitMQConnection : IRabbitMQConnection
    {
        privatereadonly ConnectionFactory _factory;
        privatereadonly IConnection _connection;
        privatebool _isDisposed;

        public RabbitMQConnection(ConnectionFactory factory)
        {
            _factory = factory ?? thrownew ArgumentNullException(nameof(factory));
            _connection = factory.CreateConnectionAsync().Result;
        }

        public async Task<IChannel> CreateChannel()
        {
            EnsureNotDisposed();
            returnawait _connection.CreateChannelAsync();
        }

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (_isDisposed) return;

            if (disposing)
            {
                // Free any other managed objects here.
            }

            // Free any unmanaged objects here.
            _connection.Dispose();

            _isDisposed = true;
        }

        ~RabbitMQConnection()
        {
            Dispose(false);
        }

        private void EnsureNotDisposed()
        {
            if (_isDisposed)
            {
                thrownew ObjectDisposedException(nameof(RabbitMQConnection));
            }
        }
    }

6. 使用RabbitMQ服务

在你的服务或消费者中,注入IRabbitMQConnection并使用它来创建模型(channel):

代码语言:javascript
代码运行次数:0
复制
using RabbitMQ.Client.Events;
using RabbitMQ.Client;
using System.Text.Json;
using System.Text;

publicclassRabbitMQService
{
    privatereadonly IRabbitMQConnection _connection;

    public RabbitMQService(IRabbitMQConnection connection)
    {
        _connection = connection ?? thrownew ArgumentNullException(nameof(connection));
    }


    public async Task SendAsync(string exchange, string routingKey, object message, bool mandatory = false, CancellationToken cancellationToken = default)
    {
         
            try
            {
                usingvar channel = _connection.CreateChannel();
                var mesjson = JsonSerializer.Serialize(message);
                Console.WriteLine("发送消息:" + mesjson);
                var body = Encoding.UTF8.GetBytes(mesjson);
                var properties = new RabbitMQ.Client.BasicProperties
                {
                    Persistent = true// 设置消息持久化
                };
                channel.BasicPublishAsync(exchange, routingKey, false, properties, body, cancellationToken);

            }
            catch (OperationCanceledException ex)
            {
                Console.WriteLine($"Operation was canceled: {ex.Message}");
                //throw; // Re-throw if you want to propagate the cancellation
            }
            catch (Exception ex)
            {
                Console.WriteLine($"An error occurred: {ex.Message}");
                //throw; // Re-throw if you want to propagate the error
            }  
    }

    public async Task ReceiveAsync(string queueName, Func<IChannel, byte[], Task> callback, CancellationToken cancellationToken = default)
    {
        var channel = _connection.CreateChannel();
        await channel.QueueDeclareAsync(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

        var consumer = new AsyncEventingBasicConsumer(channel);
        consumer.ReceivedAsync += async (model, ea) =>
        {
            var body = ea.Body.ToArray();
            try
            {
                // 直接传递 model 和 body 给 callback,不需要转换
                await callback(channel, body);
            }

            finally
            {
                //await channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken);
            }
        };
        await channel.BasicConsumeAsync(queue: queueName, autoAck: false, consumer: consumer, cancellationToken: cancellationToken);
        // Prevent the method from returning immediately
        await Task.Delay(-1, cancellationToken);
    }
}

7.生产端和消费端的使用

消费
代码语言:javascript
代码运行次数:0
复制
var app = builder.Build();


var rabbitMQService = app.Services.GetRequiredService<RabbitMQService>();
var cancellationTokenSource = new CancellationTokenSource();
var cancellationToken = cancellationTokenSource.Token;

// 启动消息接收
var receiveTask = rabbitMQService.ReceiveAsync("Test", async (channel, body) =>
{
    // 处理接收到的消息
    //string message = Encoding.UTF8.GetString(body);
    //Console.WriteLine($"收到消息 message: {message}");
    //// 确认消息
    //await channel.BasicAckAsync(deliveryTag: default, multiple: false, cancellationToken);

}, cancellationToken);
生产端
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-01-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 DotNet NB 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 1. 安装RabbitMQ客户端库
  • 2. 配置RabbitMQ连接字符串
  • 3. 创建RabbitMQ服务配置类
  • 4. 注册RabbitMQ服务
  • 5. 创建RabbitMQ连接和通道工厂
  • 6. 使用RabbitMQ服务
  • 7.生产端和消费端的使用
    • 消费
    • 生产端
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档