前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊RabbitMQ那一些事儿之一基础应用

聊聊RabbitMQ那一些事儿之一基础应用

作者头像
小小许
发布2020-06-20 11:18:41
3200
发布2020-06-20 11:18:41
举报
文章被收录于专栏:angularejs学习篇

聊聊RabbitMQ那一些事儿之一基础应用

  Hi,各位热爱技术的小伙伴您们好,今年的疫情害人啊,真心祝愿您和您的家人大家都平平安安,健健康康。年前到现在一直没有总结点东西,写点东西,不然久了自己感觉自己都要被废啦。这个周末花了一些时间来梳理了一下RabbitMQ的相关知识点。先来一个基础篇,先用起来。我也是一个边学习边梳理的过程,如果有什么梳理的不妥之处,多多指点,相互学习,谢谢!

  在使用前,我们首先第一件事情就是环境搭建。至于RabbitMQ的环境搭建,我就不在此啰嗦了,网上一搜一大堆,还没有搭建环境的小伙伴,可以网上找度娘哈,嘿嘿。

一、什么是MQ

  MQ简单的说就是队列,队列的特性就是先进先出。我们其实可以把队列理解为一个消息管道,通过消息管道实现消息传递。最终达到不同的进程间、不同服务间的通讯需要。

  在一个程序中,我们 可以通过MQ实现不同进程间的通讯。在不同程序/服务间,我们同样可以通过MQ来实现相互通讯,这也是本文的重点,这个时候就该今天的主角登场了。

二、RabbitMQ介绍

  RabbitMQ是一个开源的,在AMQP基础完整的,可复用的企业消息系统。我个人的简单的理解就是,实现消息的接收、存储、管理、分发。在操作系统支持上,支持主流的操作系统(Linux、Windows);在开发语言接口支持上,支持所有的主流开发语言;在性能上,支持消息持久化、集群化、高并发等等。

三、RabbitMQ关键词介绍

  Broker(Server):接受客户端连接,实现AMQP消息队列和路由功能的进程,我们可以把Broker叫做RabbitMQ服务器。

  Virtual Host:一个虚拟概念,其实简单的理解你可以认为是在逻辑上对MQ进行分区隔离,这样避免不同业务的MQ直接交叉感染。一个Virtual Host里面可以有若干个Exchange和Queue,主要用于权限控制,隔离应用。如应用程序A使用VhostA,应用程序B使用VhostB,那么我们在VhostA中只存放应用程序A的exchange,queue和消息,应用程序A的用户只能访问VhostA,不能访问VhostB中的数据。

  Exchange:接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有Direct、Fanout、Topic和Header四种,不同类型的Exchange路由规则是不一样的(这些以后会详细介绍)。

  Queue:消息队列,用于存储还未被消费者消费的消息,队列是先进先出的,默认情况下先存储的消息先被处理。

  Message:就是消息,由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等,Body是真正传输的数据,内容格式为byte[]。

  Connection:连接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP连接。

  Channel:信道,仅仅创建了客户端到Broker之间的连接Connection后,客户端还是不能发送消息的。需要在Connection的基础上创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令,一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的。

四、RabbitMQ三大角色介绍

  通过上面的一些简单介绍,我相信你对MQ有了一个初步的印象。也许你会云里雾里的,到底是怎么运行起来的啊,来一个实际点的。哈哈,不急,下面马上进入RabbitMQ跑起来阶段。其实要跑起来,我们还要简单介绍一下RabbitMQ重要的三个角色:生产者、服务器、消费者。

  生产者:也就是消息生产方,通过RabbitMQ提高的API,将消息推送到RabbitMQ服务器。

  服务器:RabbitMQ的服务中心,接收生产者生产的消息,并根据分发规则,将消息推送到对应的消费者。

  消费者:顾名思义,就是消息的最终接收处理者。

  这样一来,我相信大家脑海里面已经有一个画面了,生产者--生成消息-->服务器--转发-->消费者(最终处理消息)。这就是一个消息的整体流程和生命周期。

五、RabbitMQ跑起来

  通过上面的介绍,我们应该知道MQ的简单的消息交互的流程。有了这个基础,下面我们就分类来介绍一下三大角色的数据交付方式。整体上来说,数据交互方式上有以下5种方式(5种工作模式),在网上找了一张图,很方便的供大家参考。

  其实通过上面的图,我们会发现,前两种情况,消费者和生成者之间都是直接通过连接,后面三种情况,消费者和生产者直接有一层交换机(Exchange)。这样一来,我们可以从整体上分为两个大类:其一、消息直推队列;其二、消息推送给交换机,交换机根据路由规则转发至队列。

  其实在实际的工作中,第一大类,我们是不会使用到的,都是采用的第二大类来实现实际的项目开发需求。但是第一大类,能够很好的将我们先领我们入门,先简单的把程序跑起来。由于时间原因,今天我们也就先实现第一大类的两种情况,第二大类的,明后天在专门的文章来详细介绍。

简单模式:

简单模式就是只有一个生产者,一个消费者。这个很简单,下面用一个实际例子来说明。直接贴代码:

生产者代码:

代码语言:javascript
复制
/// <summary>
 /// 消息生成者
 /// </summary>
public class Program
{
    static void Main(string[] args)
    {
        // rabbitMQ链接对象
        var factory = new ConnectionFactory();
        // RabbitMQ服务在本地运行
        factory.HostName = "192.168.1.1";
        // RabbitMQ服务端口
        factory.Port = 5672;
        // 用户名
        factory.UserName = "guest";
        // 密码
        factory.Password = "guest";
        // 虚拟主机名称
        factory.VirtualHost = "/";

        // 队列名称
        string queueName = "hello";

        // 创建链接
        using (var connection = factory.CreateConnection())
        {
            // 创建通道
            using (var channel = connection.CreateModel())
            {
                // 创建一个名称为hello的消息队列--当然一步也可以通过RabbitMQ管理后台添加
                // 当已经存在该队列时,不会重复添加,但是如果已存在的队列和新建的队列存在属性差异时,会创建失败,会抛异常,所以在实际使用时,如果要通过程序创建队列,最好要捕捉异常,避免因为这样的问题而导致程序崩溃。
                channel.QueueDeclare(queueName, false, false, false, null);
                Console.WriteLine("我是生成者");

                while (true)
                {
                    Console.WriteLine("请输入你要发送的消息,并按Enter键结束");

                    // 接收用户输入的消息
                    string message = Console.ReadLine();
                    // 消息编码
                    var body = Encoding.UTF8.GetBytes(message);
                    // 向消息服务器推送消息
                    channel.BasicPublish("", queueName, null, body);

                    Console.WriteLine($"已发送 {System.DateTime.Now.ToString("HH:mm:ss")}: {message}");
                }
            }
        }
    }
}

  消费者代码:

代码语言:javascript
复制
 /// <summary>
 /// 消息消费者
 /// </summary>
 public class Program
 {
     static void Main(string[] args)
     {
         // rabbitMQ链接对象
         var factory = new ConnectionFactory();
         // RabbitMQ服务在本地运行
         factory.HostName = "192.168.1.1";
         // RabbitMQ服务端口
         factory.Port = 5672;
         // 用户名
         factory.UserName = "guest";
         // 密码
         factory.Password = "guest";
         // 虚拟主机名称
         factory.VirtualHost = "/";

         // 队列名称
         string queueName = "hello";

         // 创建链接
         using (var connection = factory.CreateConnection())
         {
             // 创建通道
             using (var channel = connection.CreateModel())
             {

                 // 创建一个名称为hello的消息队列--当然一步也可以通过RabbitMQ管理后台添加
                 // 当已经存在该队列时,不会重复添加,但是如果已存在的队列和新建的队列存在属性差异时,会创建失败,会抛异常,所以在实际使用时,如果要通过程序创建队列,最好要捕捉异常,避免因为这样的问题而导致程序崩溃。
                 channel.QueueDeclare(queueName, false, false, false, null);
                 Console.WriteLine("我是消费者");

                 // 创建一个消费者
                 var consumer = new EventingBasicConsumer(channel);
                 // 订阅对应的消息 autoAck:是否自动确认
                 channel.BasicConsume(queueName, autoAck:false, consumer);

                 consumer.Received += (model, ea) =>
                 {
                     var body = ea.Body;
                     var message = Encoding.UTF8.GetString(body);
                     Console.WriteLine($"已接收 {System.DateTime.Now.ToString("HH:mm:ss")}: {message}");

                     // 为了模拟推送过程,在此程序休息1分钟
                     Thread.Sleep(6000);
                     // 确认消费
                     channel.BasicAck(ea.DeliveryTag, false);
                 };
                 Console.ReadLine();
             }
         }
     }
 }

运行结果:

  通过实际的运行结果图,我们很清楚的知道,生产者的消息发生顺序,和消费者消费的顺序是一直的,这也就MQ的基本原理所在。

上面介绍了简单模式,下面我在来介绍一下比简单模式复杂一点的工作模式。

工作模式:

  我理解的简单模式,只是带我们入门,让我们明白MQ的运行效果是咋样的。但是在实际工作中,不可能只会有一个消费者,在实际的生产环境中生产者、消费者都可能会有多个存在,这也就是我们说的工作模式。那么,有多个生成的者的时候,不同的生产者之间又是怎么来消费消息的呢?下面我们先通过实践的例子来说明:

  具体的代码和上面的代码是一样的,我们可以直接开两个消费者就可以实现数据模拟,直接看运行结果:

  同上面的实际运行结果我们可以简单的得出以下结论:

  当一个队列有多个消费者时,在生成的实时消息时,消息队列服务器会轮询的均匀的分发给每一个消费者。

  哈哈哈,注意了,上面的结论我说的是实时消息哦,这里面就包含了一个坑,在实际的使用过程中要特别注意。那就是历史消息处理上,在实际项目使用过程中,我们经常会遇到,当消费者打开时,队列中已经有很多消息待消费,这个时候又该如何保证多个消费均匀分配消息呢?避免忙绿的消费者累死现象。其实很简单,只需在消费端加上如下一个配置即可:

代码语言:javascript
复制
代码语言:javascript
复制
 // 通过Qos设置每次接收消息的条数
 // 三个参数说明
 // prefetchSize:为预取的长度,一般设置为0即可,表示长度不限
 // prefetchCount:表示预取的条数,即发送的最大消息条数
 // global表示是否在Connection中全局设置,true表示Connetion下的所有channel都设置为这个配置。
 channel.BasicQos(prefetchSize: 0,
                  prefetchCount: 1,
                  global: false);

  上面的配置中,最关键的一个参数就是prefetchCount,当我们设置为1时,就是能够实现均匀的分发。下面分别对prefetchCount设置不同的值,来看看不同的效果:   实例一:将prefetchCount设置为10,并生成3条历史消息,然后同时打开两个消费者,看看3条消息的分发消费情况:

  通过图,我们得出,3条历史消息全部推送给了一个消费者,这样就导致了一个消费者累死,一个消费者闲的慌。   实例二:将prefetchCount设置为1,并生成4条历史消息,然后同时打开两个消费者,看看3条消息的分发消费情况:

  通过图,我们得出,4条历史消息平均的分发给了两个消费者,这也是我们想要的效果。   所以在实际工作中,一定要注意这一个细节,不然有可能导致在服务器重启时,有的服务器直接卡死现象。   好了,时间不早了,今天就先写到这,明天我们继续分享后面的几种模式。在分析完每一种模式后,我还好结合实际,封装一个dll出来,供大家参考,到时候也会直接把源码提出来。欢迎大家关注,持续交流。疫情无情,我们学习不能停。加油吧,每一个小伙伴!​

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-03-02 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 聊聊RabbitMQ那一些事儿之一基础应用
    • 一、什么是MQ
      • 二、RabbitMQ介绍
        • 三、RabbitMQ关键词介绍
          • 四、RabbitMQ三大角色介绍
            • 五、RabbitMQ跑起来
              • 简单模式:
              • 生产者代码:
              • 工作模式:
          相关产品与服务
          消息队列 CMQ 版
          消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档