AMQP
(Advanced Message Queuing Protocol
)是一种高级消息队列协议
Exchange
),队列(Queue
)等AMQP
还定义了一个网络协议,允许客户端应用通过该协议与消息代理和 AMQP
模型进行交互通信RabbitMQ
是遵从 AMQP
协议的,换句话说,RabbitMQ
就是 AMQP
协议的 Erlang
的视线(当然 RabbitMQ
还支持 STOMP2
,MQTT2
等协议)
AMQP
的模型结构和 RabbitMQ
的模型结构是一样的在程序中加入相关依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.25.0</version>
</dependency>
生产消息,并把消息投放到队列中
建立连接,需要建立一个连接工厂,然后把信息都放进去,需要连接的时候,直接就从工厂里面拿就可以了
建立连接需要的信息
// 1. 建立连接,创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672); // 需要提前开放端口号
connectionFactory.setUsername("study"); // 账号
connectionFactory.setPassword("study"); // 密码
connectionFactory.setVirtualHost("coding"); // 虚拟主机
// 工厂建立好了之后,从里面拿出一个连接
Connection connection = connectionFactory.newConnection();
在建立好连接之后,就需要在连接里面创建一个信道,供消息传输
// 2. 开启信道
Channel channel = connection.createChannel();
可以直接使用内置的交换机,就不需要进行操作
声明队列需要用到一个 queueDeclare
方法
// 4. 声明队列
/**
* 声明队列使用的方法的参数
* queueDeclare(String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5)
* 参数说明:
* var1(queue): 队列名称
* var2(durable): 是否持久化
* var3(exclusive): 是否独占
* var4(autoDelete): 是否自动删除
* var5(arguments): 参数
*/
channel.queueDeclare("hello", true, false, false, null);
hello
的队列,就会创建;有则不创建durable
:是否持久化 true
:设置队列为持久化,持久化的队列会存盘,服务器重启之后,消息不会丢失exclusive
:是否独占,只能有一个消费者监听队列 Connection
关闭时,是否删除队列autoDelete
:是否自动删除,当没有 Consumer
的时候,自动删除掉发送消息的时候,会用到 basicPublish()
方法
// 5. 发送消息
/**
* basicPublish(String var1, String var2, AMQP.BasicProperties var3, byte[] var4) * 参数说明:
* var1(exchange): 交换机名称
* var2(routingKey): 内置交换机,routingKey和队列名称保持一致(发送到名字一样的队列去)
* var3(props): 配置
* var4(body): 消息
*/
for (int i = 0; i < 10; i++) {
String msg = "hello rabbitmq" + i;
channel.basicPublish("", "hello", null, msg.getBytes());
}
System.out.println("消息发送成功");
routingKey
要和队列名称一样,才可以路由到对应的队列上去显示地关闭 Channel
是一个好习惯,但这不是必须的,Connection
关闭的时候,Channel
也会自动关闭
// 6. 资源释放
channel.close();
connection.close();
此时我们运行程序,就能看到 RabbitMQ
界面客户端里面:
消费者接收消息,我们也要先建立连接。此处建立连接的思路和生成者的一样
// 1. 创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("coding");
connectionFactory.setUsername("study");
connectionFactory.setPassword("study");
// 取出连接
Connection connection = connectionFactory.newConnection();
创建信道,供 Brocker
和消费者之间进行通信
// 2. 创建 Channel
Channel channel = connection.createChannel();
这里用到的方法,和创建生产者中用到的方法一样—— queueDeclare()
// 3. 声明队列
channel.queueDeclare("hello", true, false, false, null);
hello
:队列的名字true
:持久化,队列会存盘,服务器重启之后,消息不会丢失false
:不是独占,不是只有一个消费者监听队列false
:不是自动删除,当没有 Consumer
的时候,也不会自动删除掉null
:没有其他参数这里会用到 basicConsume()
方法
// 4. 消费消息
/**
* basicConsume(String var1, boolean var2, Consumer var3) * 参数说明:
* var1(queue): 队列名称
* var2(autoAck): 是否自动确认
* var3(callback): 接收到消息后,执行的逻辑
*/
channel.basicConsume("hello", true, consumer);
Consumer
用于定义消息消费者的行为。当我们需要从 RabbitMQ
接收消息的时候,需要提供一个实现了 Consumer
接口的对象
DefaultConsumer consumer = new DefaultConsumer(channel){
// 从队列中收到消息,就会执行的方法
@Override
/**
* handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) * 参数说明:
* consumerTag: 消费者标签,通常是消费者在订阅队列时指定的
* envelope: 包含消息的封包信息,如队列名称,交换机等
* properties: 一些配置信息
* body: 消息的具体内容
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//TODO
System.out.println("接收到消息: " + new String(body));
}
};
DefaultConsumer
是 RabbitMQ
提供的一个默认消费者,实现了 Consumer
接口handleDelivery()
// 5. 释放资源
channel.close();
connection.close();
在释放资源之前可以让程序休眠两秒钟,等待回调函数执行完之后再释放,就可以看到打印出来的完整信息了
package rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ProducerDemo {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 建立连接,创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672); // 需要提前开放端口号
connectionFactory.setUsername("study"); // 账号
connectionFactory.setPassword("study"); // 密码
connectionFactory.setVirtualHost("coding"); // 虚拟主机
// 工厂建立好了之后,从里面拿出一个连接
Connection connection = connectionFactory.newConnection();
// 2. 开启信道
Channel channel = connection.createChannel();
// 3. 声明交换机 使用内置的交换机
// 4. 声明队列
/**
* 声明队列使用的方法的参数
* queueDeclare(String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5)
* 参数说明:
* var1(queue): 队列名称
* var2(durable): 可持久化
* var3(exclusive): 是否独占
* var4(autoDelete): 是否自动删除
* var5(arguments): 参数
*/
channel.queueDeclare("hello", true, false, false, null);
// 交换机要将信息传给对应的队列,所以他们之间存在绑定关系
// 这个绑定关系我们先省略掉,因为我们使用的是内置交换机,
// 5. 发送消息
/**
* basicPublish(String var1, String var2, AMQP.BasicProperties var3, byte[] var4) * 参数说明:
* var1(exchange): 交换机名称
* var2(routingKey): 内置交换机,routingKey和队列名称保持一致(发送到名字一样的队列去)
* var3(props): 配置
* var4(body): 消息
*/
for (int i = 0; i < 10; i++) {
String msg = "hello rabbitmq" + i;
channel.basicPublish("", "hello", null, msg.getBytes());
}
System.out.println("消息发送成功");
// 6. 资源释放
//channel.close();
//connection.close();
}
}
package rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerDemo {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 1. 创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("coding");
connectionFactory.setUsername("study");
connectionFactory.setPassword("study");
Connection connection = connectionFactory.newConnection();
// 2. 创建 Channel Channel channel = connection.createChannel();
// 3. 声明队列
channel.queueDeclare("hello", true, false, false, null);
// 4. 消费消息
/**
* basicConsume(String var1, boolean var2, Consumer var3) * 参数说明:
* var1(queue): 队列名称
* var2(autoAck): 是否自动确认
* var3(callback): 接收到消息后,执行的逻辑
*/
DefaultConsumer consumer = new DefaultConsumer(channel){
// 从队列中收到消息,就会执行的方法
@Override
/**
* handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) * 参数说明:
* consumerTag: 消费者标签,通常是消费者在订阅队列时指定的
* envelope: 包含消息的封包信息,如队列名称,交换机等
* properties: 一些配置信息
* body: 消息的具体内容
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//TODO
System.out.println("接收到消息: " + new String(body));
}
};
channel.basicConsume("hello", true, consumer);
// 等待程序执行完成
Thread.sleep(2000);
// 5. 释放资源
channel.close();
connection.close();
}
}
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有