前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【BlogBook书】10、RabbitMQ:消息队列

【BlogBook书】10、RabbitMQ:消息队列

作者头像
老张的哲学
发布2024-02-22 15:47:22
1350
发布2024-02-22 15:47:22
举报
文章被收录于专栏:NetCore 从壹开始

框架中重点使用RabbitMQ和Kafka作为消息队列的中间件工具,本章节说明RabbitMQ的具体使用方式。

框架已经完整的封装了EventBus事件总线,如果想要使用更加完善的消息队列的方式,建议使用事件总线,当然也可以直接如下文进行使用。

一、相关的依赖注入配置

代码语言:javascript
复制
// 服务注册
builder.Services.AddRabbitMQSetup();

相关参数设置

代码语言:javascript
复制
"RabbitMQ": {
   "Enabled": true,
   "Connection": "xxxxx",
   "UserName": "admin",
   "Password": "admin",
   "Port": "5672",
   "RetryCount": 2
 },

二、使用方式

1、在任意位置做消息的发送

代码语言:javascript
复制
private readonly IRabbitMQPersistentConnection _persistentConnection;
 
 public ValuesController(IRabbitMQPersistentConnection persistentConnection)
 {
    _persistentConnection = persistentConnection;
 }


 /// <summary>
 /// 测试Rabbit消息队列发送
 /// </summary>
 [HttpGet]
 [AllowAnonymous]
 public IActionResult TestRabbitMqPublish()
 {
     if (!_persistentConnection.IsConnected)
     {
         _persistentConnection.TryConnect();
     }
     _persistentConnection.PublishMessage("Hello, RabbitMQ!", exchangeName: "blogcore", routingKey: "myRoutingKey");
     return Ok();
 }

2、在需要的地方进行消费

代码语言:javascript
复制
/// <summary>
 /// 测试Rabbit消息队列订阅
 /// </summary>
 [HttpGet]
 [AllowAnonymous]
 public IActionResult TestRabbitMqSubscribe()
 {
     if (!_persistentConnection.IsConnected)
     {
         _persistentConnection.TryConnect();
     }

     _persistentConnection.StartConsuming("myQueue");
     return Ok();
 }

3、具体的代码,请参考RabbitMQPersistentConnection.cs即可

代码语言:javascript
复制
 /// <summary>
 /// 发布消息
 /// </summary>
 /// <param name="message"></param>
 /// <param name="exchangeName"></param>
 /// <param name="routingKey"></param>
 public void PublishMessage(string message, string exchangeName, string routingKey)
 {
     using var channel = CreateModel();
     channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, true);
     var body = Encoding.UTF8.GetBytes(message);
     channel.BasicPublish(exchange: exchangeName, routingKey: routingKey, basicProperties: null, body: body);
 }

 /// <summary>
 /// 订阅消息
 /// </summary>
 /// <param name="queueName"></param>
 public void StartConsuming(string queueName)
 {
     using var channel = CreateModel();
     channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

     var consumer = new AsyncEventingBasicConsumer(channel);
     consumer.Received += new AsyncEventHandler<BasicDeliverEventArgs>(
         async (a, b) =>
         {
             var Headers = b.BasicProperties.Headers;
             var msgBody = b.Body.ToArray();
             var message = Encoding.UTF8.GetString(msgBody);
             await Task.CompletedTask;
             Console.WriteLine("Received message: {0}", message);

             //bool Dealresult = await Dealer(b.Exchange, b.RoutingKey, msgBody, Headers);
             //if (Dealresult) channel.BasicAck(b.DeliveryTag, false);
             //else channel.BasicNack(b.DeliveryTag, false, true);
         }
         );

     channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);

     Console.WriteLine("Consuming messages...");
 }

最终效果:

发布一条消息:

消费这个消息

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-02-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 NetCore 从壹开始 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
事件总线
腾讯云事件总线(EventBridge)是一款安全,稳定,高效的云上事件连接器,作为流数据和事件的自动收集、处理、分发管道,通过可视化的配置,实现事件源(例如:Kafka,审计,数据库等)和目标对象(例如:CLS,SCF等)的快速连接,当前 EventBridge 已接入 100+ 云上服务,助力分布式事件驱动架构的快速构建。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档