消息队列是一种在应用程序之间传递数据的通信机制,它基于 发布-订阅
模式,将消息发送者(发布者)和消息接收者(订阅者)解耦,使得它们可以独立地进行消息的发送和接收。
在消息队列中,消息发送者将消息发送到队列中,而消息接收者则从队列中获取消息进行处理。消息队列提供了一种异步的通信方式,即发送者发送消息后不需要等待接收者的回复,而可以立即继续执行其他操作。同时,消息队列还可以实现消息的持久化存储,确保消息在发送和接收过程中的可靠性。
消息队列的应用场景非常广泛,例如:
RabbitMQ是一个开源的消息队列中间件,它实现了高级消息队列协议),并提供了可靠的消息传递机制。
RabbitMQ使用Erlang语言编写,具有高度可靠、可扩展、灵活和可插拔的特性,被广泛应用于分布式系统、微服务架构、异步任务处理等场景。
RabbitMQ基于生产者和消费者模型工作。生产者将消息发送到RabbitMQ的交换机,然后交换机将消息路由到一个或多个队列,消费者从队列中获取消息并进行处理。
RabbitMQ 支持多种消息传递模式,同时还提供了消息的持久化、消息优先级、消息确认机制等特性,确保消息的可靠性和可靠传输。
RabbitMQ是一个成熟、可靠的消息队列中间件,提供了强大的消息传递机制和丰富的特性,被广泛应用于分布式系统和异步消息处理中。
RabbitMQ 提供了简单易用的 API 和管理界面,使得开发者可以快速上手并进行配置和管理,相比之下,Kafka 的配置和管理相对复杂一些。
RabbitMQ 支持多种消息传递模式和消息的路由选择机制,可以根据需求进行灵活的消息处理,而Kafka更适用于大规模的高吞吐量流式处理,通常使用发布-订阅模式。
RabbitMQ 具备持久化消息、消息确认机制等特性,可以确保消息的可靠传输,而 Kafka 通过多副本机制和消息日志的方式,提供了高度可靠性的消息传递。
RabbitMQ 可以通过设置队列的限流策略和消费者的消费速率来进行流量控制和削峰处理,能够保护消费者免受过多的消息推送,而Kafka则将消费者的消费速率控制交给消费者自身,在高并发场景下可能需要额外处理。
RabbitMQ 提供了多种编程语言的客户端,开发者可以根据自己的编程需求选择合适的客户端进行交互,而Kafka的客户端主要集中在Java语言上,对其他语言的支持相对较少。
RabbitMQ 拥有庞大的开源社区和丰富的生态系统,提供了丰富的插件和集成工具,方便开发者进行扩展和集成,Kafka的生态系统相对较小,但在大数据领域有广泛的应用和支持。
Erlang 是 RabbitMQ 消息服务的基础环境,就像 Java 的 JDK 一样,是必须安装的。
Erlang 官网下载地址:下载地址。
因为我们要把 RabbitMQ 服务装在服务器上,所以同学们可以在服务器上下载 Erlang 安装包,或者下载后手动上传至服务器。
下载完成后双击安装包,按照提示流程正常安装即可,截图如下所示。
变量名如下,变量值是安装的路径,如下图所示。
ERLANG_HOME
验证命令如下:
erl -v
RabbitMQ 需要在 Github 中下载,下载地址。
RabbitMQ 版本需要和 Erlang 对应,本文安装的是
3.9.5
版本。
安装流程如下图所示。
安装完成后,使用 cmd 窗口,进入 RabbitMQ 的 sbin 目录,如下图所示。
接着输入以下命令,完成初始化安装。
rabbitmq-plugins enable rabbitmq_management
打开浏览器,输入:
http://localhost:15672
账号密码都是:
guest
RabbitMQ 默认端口为15672,用户名和密码都为guest,是不允许外部访问的。
所以我们要添加新用户,实现外网访问,操作流程如下图所示。
点击添加后,输入新用户的账号和密码,如下图所示。
Virtual Host 需要允许添加的用户访问,如下图所示。
进入子界面后,选择用户后提交,如下图所示。
然后,我们就完成了外网访问的配置。
打开 IDEA 工具,新建项目,如下图所示。
新项目创建完成后,如下图所示。
首先,请在 pom.xml
中引入依赖,代码如下。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml 配置如下。
spring:
rabbitmq:
host: 118.126.82.167
port: 5672
username: zwz
password: 123456
listener:
simple:
retry:
enabled: true
max-attempts: 5
initial-interval: 2s
请同学们创建
SimpleProducer
工具类,代码如下。
package cn.zwz.send;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class SimpleProducer {
public static void main(String[] args) {
//1. 创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
//1.1 设置连接IP
connectionFactory.setHost("118.126.82.167");
//1.2 设置连接端口
connectionFactory.setPort(5672);
//1.3 设置用户名
connectionFactory.setUsername("zwz");
//1.4 设置密码
connectionFactory.setPassword("123456");
//1.5 设置虚拟访问节点,就是消息发送的目标路径
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//2. 创建连接Connection
connection = connectionFactory.newConnection("ZWZ-Connection");
//3. 通过连接获取通道Channel
channel = connection.createChannel();
//4. 通过通道创建交换机,声明队列,绑定关系,路由key,发送消息,接收消息
String queueName = "ZWZ-TOPIC";
/**
* channel.queueDeclare有5个参数
* params1: 队列的名称
* params2: 是否要持久化, false:非持久化 true:持久化
* params3: 排他性,是否独占队列
* params4: 是否自动删除,如果为true,队列会随着最后一个消费消费完后将队列自动删除,false:消息全部消费完后,队列保留
* params5: 携带的附加参数
*/
channel.queueDeclare(queueName, true, false, false, null);
//5. 消息内容
String message = "HELLO World!";
//6. 将消息发送到队列
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("消息发送成功");
} catch (Exception e) {
e.printStackTrace();
} finally {
//7. 关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
//8. 关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
发消息很简单,运行 main
函数即可。
发送成功后,后台可以接收到数据,如下图所示。
请同学们创建
SimpleConsumer
工具类,代码如下。
package cn.zwz.send;
import com.rabbitmq.client.*;
import java.io.IOException;
public class SimpleConsumer {
public static void work() {
//1. 创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
//1.1 设置连接IP
connectionFactory.setHost("118.126.82.167");
//1.2 设置连接端口
connectionFactory.setPort(5672);
//1.3 设置用户名
connectionFactory.setUsername("zwz");
//1.4 设置密码
connectionFactory.setPassword("123456");
//1.5 设置虚拟访问节点,就是消息发送的目标路径
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//2. 创建连接Connection
connection = connectionFactory.newConnection("ZWZ-Connection");
//3. 通过连接获取通道Channel
channel = connection.createChannel();
//4. 通过通道创建交换机,声明队列,绑定关系,路由key,发送消息,接收消息
String queueName = "ZWZ-TOPIC";
//5. 接收消息并消费消息
channel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("接收到的消息内容是:" + new String(message.getBody(), "UTF-8"));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("消息接收失败。。。");
}
});
System.out.println("开始接受消息。。。。");
//阻断程序
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
//7. 关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
//8. 关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
接着在启动类上配置运行,代码如下。
package cn.zwz.send;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SendApplication {
public static void main(String[] args) {
new SimpleConsumer().work();
SpringApplication.run(SendApplication.class, args);
}
}
请同学们运行 SpringBoot 启动类,然后再次发送消息,就可以看到消息内容了,如下图所示。
本文首先简单介绍了 RabbitMQ,然后和 Kafka 等热门消息队列进行对比,最后演示了 RabbitMQ 的完整安装配置整合流程,帮助零基础的小白入门 RabbitMQ 开发。