前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ快速入门

RabbitMQ快速入门

作者头像
用户1216676
发布2018-12-26 15:02:38
4.5K0
发布2018-12-26 15:02:38
举报
文章被收录于专栏:熊二哥

最近一段项目实践中大量使用了基于RabbitMQ的消息中间件,也积累的一些经验和思考,特此成文,望大家不吝赐教。 本文包括RabbitMQ基本概念、进阶概念、实践与思考等三部分,着重强调相关概念基于RabbitMQ进行扩展开发的思路,并简要展示RabbitMQ客户端的编码,接下来通过一个思维导图来展示整体思路,红星表示重点部分。

1.基本概念

官方文档: http://www.rabbitmq.com/#getstarted

1.1.核心实体

进入详细介绍之前,先来看一张简化版的消息流转的模型图。

  • a.生产者Producer发送消息,消息分为消息标识和消息体(payLoad有效载荷)两部分,消息标识中包含Exchange交换器的名字和RoutingKey路由键信息。
  • b.由于交换器之前已经通过2个不同的BindingKey绑定键分别绑定了两个队列,因此交换器可以对比路由键和绑定键,之后将消息路由到匹配的队列中。
  • c.队列将消息推送到指定的一个或多个消费者中,多个消费者会选择最简单的RoundRobin轮训方式进行选择。

Exchange交换器

核心概念,可以简化理解为路由器,其不存储数据,其通常会和一个队列绑定,但也可以绑定到另一个交换器上。其包括4种类型的交换器类型,生产实践中主要使用可以精细管理的direct和topic两种。direct,路由规则为完全匹配;topic,支持完全匹配,也支持模糊匹配;fanout,会将消息转发到该交换器绑定的所有队列中;header,实际中无应用。

Queue队列

用于存储消息,和Kafka的消息模型完全不同,其会将消息存储在Topic中。因此在实现类似ConsumerGroup概念时差异很大,Kafka是可以回溯消息的,但Rabbit新绑定的队列的数据是空的,不能回溯。

Binding绑定

其通过绑定键将交换器和队列关联起来

RouteKey & BindingKey 路由键和绑定键

通常会将路由键和绑定键都称为路由键,其差异是路由键是包含在消息标识中的,而绑定键是用于在交换器和队列间建立绑定关系的,消息会通过它们的匹配情况进行路由。

1.2.通信

通信实体

包括Connection连接和Channel通道,连接通常对应一个基于TCP的Socket,建立Connection的关键参数包括用户名、密码、虚拟主机、主机地址和端口。一个连接可以建立多个Channel实例,推荐控制数量(比如10个),但Channel实例不能在线程间共享,应用程序需要为每一个线程开辟一个Channel。

AMQP协议

Java技术栈汇中,关于消息通信听到比较多的是JMS,而AMQP协议相对更加严格一些,其包括Module Layer,Session Layer, Transport Layer三个层次,业务开发主要接触到的是Module Layer,客户端可以通过Queue.Declare、Basic.Consume等命令进行操作。

1.3.虚拟主机与用户

vhost

虚拟主机,可以在逻辑上看做一台RabbitMQ服务器,其拥有自己的交换器、队列和绑定关系等。RabbitMQ对权限的管理就是基于vhost进行的,默认会创建一个全局的/虚拟主机,通常不推荐直接使用该vhost,而是需要自定义一个vhost便于管理。

User 对于某一个用户,通常包括3种类型的权限:read,允许读取队列数据;write,允许向队列发送数据;config,允许创建队列,如果客户端需要支持添加队列,需要添加该权限,否则会报无权限错误【踩过坑】。

2.进阶概念

2.1.交换器与队列增强

TTL过期时间

目前在两个不同的粒度设置消息的TTL,分别是队列粒度和消息粒度。由于RabbitMQ实际机制的原因,通常都选择的是队列粒度,对于队列粒度来说,队列头的消息一定是最先失效的,因此可以高效的判断和丢弃。而对于消息粒度,其需要在消息真正投递到消费者时进行判断,如果该消息之前的消息并没有失效,那么它将一直存活。

死信交换器DLX

全称为 Dead-Letter-Exchange,也是RabbitMQ扩展开发的核心概念,当一条消息在一个队列中变成死信之后,它能自动的被转发到一个交换器中,这个交换器就是DLX,很多地方称和这个交换器绑定的队列是死信队列, 我并不是完全认同。

消息变为死信的原因

消息被拒绝Nack,Reject,并且requeue参数为false(重点强调一下,生产实践中通常不能打开requeue,因为打开后队列中的消息就会出现乱序的情况,且性能很差);消息过期;队列达到最大长度。 DLX 也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定 ,实际上就是设置某个队列的属性。

代码语言:javascript
复制
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "dlx_exchange");
args.put("x-dead-letter-routing-key" , "dlx-routing-key");
channel.exchangeDeclare("dlx_exchange" , "direct"); //创建 DLX: 
channel.queueDeclare("normal_queue", false, false, false, args); //为队列normal_queue添加 DLX

命名规范:队列类型,[生产者.消费者.队列名后缀];Topic类型,[生产者.exchange.队列名后缀]

延迟队列

延迟队列存储的对象是对应的延迟消息,所谓"延迟消息"是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。使用延迟消息场景如在订单系统中,希望用户下单后30分钟内支付,否则取消订单。那么业务系统可以在下单后,发送延迟消息,到达指定时间后消费该消息来判断是否支持。该方式在数据量比较大的场景中比通过Job扫描数据表合适。

在AMQP协议中,或者RabbitMQ本身没有直接支持延迟队列的功能,但是可以通过前面 所介绍的DLX和TTL模拟出延迟队列的功能,这部分在实践与思考部分进行介绍。

持久化

交换器和队列元数据持久化和消息的持久化,消息的持久化可以直接使用MessageProperties.PERSISTENT_TEXT_PLAIN

2.2.生产者

生产者客户端的代码比较简洁,如下所示。

代码语言:javascript
复制
byte[] messageBodyBytes = "Hello , Xionger!".getBytes();
channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN,
        messageBodyBytes);

从高可用HA的角度不经要问,消息的生产者将消息发送出去之后,Broker是否收到消息。RabbitMQ针对这个问题提供了两种解决方案,分别是事务机制和发送方确认PublisherConfirm。发送者确认的实现继续细分为3种形式,包括单条同步、批量同步和异步方式。事务机制和单条同步确认方式的性能都比较差,通常只能达到2000QPS左右,因此通常推荐使用发送方确认的批量方式和异步方式,其QPS可以达到8000QPS以上。其中批量方式也存在一个隐患,即发送一批消息到服务端时,如果有一条消息失败,那么该批次所有消息都需要重试。因此目前生产实践中 ,使用的是异步方式,简化的代码实践如下所示。

代码语言:javascript
复制
SortedSet confirmSet = Sets.newTreeSet();
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        if (multiple) {
            confirmSet.headSet(deliveryTag - 1);
        } else {
            confirmSet.remove(deliveryTag);
        }
    }
    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        //omit
        //消息重新投递处理
    }
});

tip: 这部分在服务端ack时有一个优化,只会回传当前最大的标识,可以有效减少比对次数。

2.3.消费者

消费模式:拉模式,推模式,RabbitMQ推荐推模式,保持消息消费的有序性。

代码语言:javascript
复制
boolean autoAck = false;
channel.basicQos(64);//prefetchCount
channel.basicConsume(queueName, autoAck, "myConsumerTag",
        new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String routingKey = envelope.getRoutingKey();
                String contentType = properties.getContentType();
                long deliveryTag = envelope.getDeliveryTag();
                channel.basicAck(deliveryTag, false);
            }
        });

tip:

对于消息生产者,过去还有一个消息投递不可达被返回的概念,涉及mandatory和immediate两个参数,但其在生产实践中并不常用。

3.实践与思考

3.1.环境搭建

安装:Mac环境 brew install rabbitmq,非常简便

管理界面

  • unacked: 消费端没有Ack的数量
  • Publish: 推送消息的QPS
  • Deliver(manual ack): 手动Ack
  • durable: 持久化
  • Policy: 队列的规则
  • Mirrors: 镜像Broker

3.2.Client组件开发

在介绍了RabbitMQ主要知识后,扩展的分享一个简易的基于RabbitMQ消息中间件的思路。由于RabbitMQ是基于Erlang开发,虽然很棒但毕竟比较小众,Java技术栈的工程师一般很难去修改RabbitMQ的源码,因此通常只是通过构建一个合理的客户端SDK来支持业务开发。

生产者

生产者目标比较简单,需要实现健壮性强的的发送者确认机制【异步】和支持队列分片,队列分片可以给队列加上后缀标识,然后轮训处理即可。

消费者

消费者部分希望支持消费失败的重试机制、死信队列及其报警机制,以支持3次重试消费为例,整体思路如下图所示。【借助之前介绍的TTL和DLX】

3.3.场景思考

RabbitMQ最大的特点是成熟度高,管理功能全面,近似开箱即用,二次开发实现一个简单靠谱的客户端就足以满足大部分的场景,尤其对于初创企业、中小企业来说是一个非常棒的选择。

  • 高可用HA:源生支持镜像服务器、同步模型等机制
  • 高吞吐:可以通过队列分片的方式支持大量的QPS,比如单个队列推荐QPS4000以下,健康的水位在2000左右,那么就需要通过二次开发客户端来实现队列分片。

参考资料

[1]朱忠华.RabbitMQ实战指南[M].电子工业出版社:北京,2017.11:.

下期预告:深入理解MySQL索引机制 善良比聪明更重要--张小龙

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018-12-14 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.基本概念
    • 1.1.核心实体
      • 1.2.通信
        • 1.3.虚拟主机与用户
        • 2.进阶概念
          • 2.1.交换器与队列增强
            • 2.2.生产者
              • 2.3.消费者
              • 3.实践与思考
                • 3.1.环境搭建
                  • 3.2.Client组件开发
                    • 3.3.场景思考
                      • 参考资料
                  相关产品与服务
                  轻量应用服务器
                  轻量应用服务器(TencentCloud Lighthouse)是新一代开箱即用、面向轻量应用场景的云服务器产品,助力中小企业和开发者便捷高效的在云端构建网站、Web应用、小程序/小游戏、游戏服、电商应用、云盘/图床和开发测试环境,相比普通云服务器更加简单易用且更贴近应用,以套餐形式整体售卖云资源并提供高带宽流量包,将热门软件打包实现一键构建应用,提供极简上云体验。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档