今天学习一下消息队列,作为程序员,工作上对此再熟悉不过了,
当然也有用redis作为队列的。
完整代码,已经放到
Gitee:https://gitee.com/nateshao/rabbitmq
GitHub:https://github.com/nateshao/rabbitmq
“消息队列是实现应用程序和应用程序之间通信的中间件产品
消息队列(Message Queue)是一个在计算机科学中广泛使用的概念,主要用于异步通信、应用解耦、流量削峰等场景。
简单来说,它就像是一个快递小哥,把快递送到牛马打工人手中就去下个站点配送,牛马打工人什么时候下来拿快递,不需要管了。
以快递小哥为例,我们可以这样理解消息队列:
现在,我们来看看这个场景是如何工作的:
这个例子中,消息队列的优势主要体现在以下几个方面:
同步与异步的场景
同 步 | 异 步 |
---|---|
系统耦合度高 | 参与的各功能模块相对独立,耦合度低 |
并发压力持续向后续服务传导 | 借助消息队列实现流量削峰填谷 |
系统结构缺乏弹性,可扩展性差 | 各功能模块对接消息队列,系统功能扩展方便 |
响应时间长 | 快速响应 |
注 意
官网地址:https://www.rabbitmq.com/
RabbitMQ是一款基于AMQP、由Erlang语言开发的消息队列产品,2007年 Rabbit技术公司发布了它的1.0版本
Producer:消息的发送端,也可以称为消息的生产者
Consumer:消息的接收端,也可以称为消息的消费者
消息发送端或消息消费端到消息队列主体服务器之间的TCP连接
建立TCP连接需要三次握手,反复确认。
所以如果每一次访问RabbitMQ服务器都建立一个Connection开销会极大,效率低下。
所以Channel就是在一个已经建立的Connection中建立的逻辑连接。
如果应用程序支持多线程,那么每个线程创建一个单独的Channel进行通讯。每个Channel都有自己的id,Channel之间是完全隔离的。
核心点就是:实现Connection复用
采用docker安装,一条龙服务~
# 拉取镜像
docker pull rabbitmq:3.13-management
# -d 参数:后台运行 Docker 容器
# --name 参数:设置容器名称
# -p 参数:映射端口号,格式是“宿主机端口号:容器内端口号”。5672供客户端程序访问,15672供后台管理界面访问
# -v 参数:卷映射目录
# -e 参数:设置容器内的环境变量,这里我们设置了登录RabbitMQ管理后台的默认用户和密码
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-v rabbitmq-plugin:/plugins \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=123456 \
rabbitmq:3.13-management
访问后台管理界面:http://localhost:15672
使用上面创建Docker容器时指定的默认用户名、密码登录:
上面可以看到rabbitmp的可视化界面了
生产者发送消息,消费者接收消息,用最简单的方式实现
也可以看一下官网的demo:https://www.rabbitmq.com/tutorials/tutorial-one-java.html
1、创建工程,以Java为例子
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
</dependencies>
package com.nateshao.hello_world;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Author 千羽
* @公众号 程序员千羽
* @Date 2024/5/29 16:00
* @Version 1.0
*/
public class Consumer {
public static void main(String[] args) throws Exception {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("123456");
// 3. 创建连接 Connection
Connection connection = factory.newConnection();
// 4. 创建Channel
Channel channel = connection.createChannel();
// 5. 创建队列
// 如果没有一个名字叫simple_queue的队列,则会创建该队列,如果有则不会创建
// 参数1. queue:队列名称
// 参数2. durable:是否持久化。如果持久化,则当MQ重启之后还在
// 参数3. exclusive:是否独占。
// 参数4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
// 参数5. arguments:其它参数。
// channel.queueDeclare("simple_queue",true,false,false,null);
// 接收消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 回调方法,当收到消息后,会自动执行该方法
// 参数1. consumerTag:标识
// 参数2. envelope:获取一些信息,交换机,路由key...
// 参数3. properties:配置信息
// 参数4. body:数据
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:" + consumerTag);
System.out.println("Exchange:" + envelope.getExchange());
System.out.println("RoutingKey:" + envelope.getRoutingKey());
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
}
};
// 参数1. queue:队列名称
// 参数2. autoAck:是否自动确认,类似咱们发短信,发送成功会收到一个确认消息
// 参数3. callback:回调对象
// 消费者类似一个监听程序,主要是用来监听消息
channel.basicConsume("simple_queue", true, consumer);
}
}
package com.nateshao.hello_world;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @Author 千羽
* @公众号 程序员千羽
* @Date 2024/5/29 16:10
* @Version 1.0
*/
public class Producer {
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机地址
connectionFactory.setHost("127.0.0.1");
// 设置连接端口号:默认为 5672
connectionFactory.setPort(5672);
// 虚拟主机名称:默认为 /
connectionFactory.setVirtualHost("/");
// 设置连接用户名;默认为guest
connectionFactory.setUsername("guest");
// 设置连接密码;默认为guest
connectionFactory.setPassword("123456");
// 创建连接
Connection connection = connectionFactory.newConnection();
// 创建频道
Channel channel = connection.createChannel();
// 声明(创建)队列
// queue 参数1:队列名称
// durable 参数2:是否定义持久化队列,当 MQ 重启之后还在
// exclusive 参数3:是否独占本次连接。若独占,只能有一个消费者监听这个队列且 Connection 关闭时删除这个队列
// autoDelete 参数4:是否在不使用的时候自动删除队列,也就是在没有Consumer时自动删除
// arguments 参数5:队列其它参数
channel.queueDeclare("simple_queue", true, false, false, null);
// 要发送的信息
String message = "你好;千羽!";
// 参数1:交换机名称,如果没有指定则使用默认Default Exchange
// 参数2:路由key,简单模式可以传递队列名称
// 参数3:配置信息
// 参数4:消息内容
channel.basicPublish("", "simple_queue", null, message.getBytes());
System.out.println("已发送消息:" + message);
// 关闭资源
channel.close();
connection.close();
}
}
运行生产者,查看效果
控制台打印
已发送消息:你好;千羽!
Process finished with exit code 0
控制台打印
consumerTag:amq.ctag-bAgTsr5fp03WXRcIyph2sg
Exchange:
RoutingKey:simple_queue
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:你好;千羽!
查看后台管理界面
因为消息被消费掉了,所以RabbitMQ服务器上没有了:
创作不易,感谢大家支持!
参考链接:https://www.bilibili.com/video/BV1sw4m1U7Qe/