RabbitMQ共有六种工作模式:简单模式(Simple)、工作队列模式(Work Queue)、发布订阅模(Publish/Subscribe)、路由模式(Routing)、通配符模式(Topics)、远程调用模式(RPC,不常用,课程不对此模式进行讲解)
首先我们讲解简单模式:
接下来我们使用JAVA代码操作RabbitMQ,让其按照简单模式进行工作。
特点:
JMS
由于MQ产品很多,操作方式各有不同,于是JAVA提供了一套规则——JMS,用于操作消息中间件。JMS即Java消息服务
(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件的API。JMS是JavaEE规范中的一种,类比JDBC。很多MQ产品都实现了JMS规范,例如ActiveMQ。RabbitMQ官方并没有实现JMS规范,但是开源社区有JMS的实现包。
首先先在服务器启动RabbitMQ服务,然后进入到管控台,如下图:
首先,我们需要引进依赖:
<!-- 添加RabbitMQ依赖 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.2</version>
</dependency>
* 参数1:队列名
* 参数2:是否持久化,true表示MQ重启后队列还在。
* 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
* 参数4:是否自动删除,true表示不再使用队列时自动删除队列
* 参数5:其他额外参数
5. 发送消息
* 参数1:交换机名,""表示默认交换机
* 参数2:路由键,简单模式就是队列名
* 参数3:其他额外参数
* 参数4:要传递的消息字节数组
6. 关闭信道和连接
具体代码如下图:
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建链接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
// 2.建立连接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
/**
* 4.创建队列,如果队列已存在,则使用该队列
* 参数1:队列名
* 参数2:是否持久化,true表示MQ重启后队列还在。
* 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
* 参数4:是否自动删除,true表示不再使用队列时自动删除队列
* 参数5:其他额外参数
*/
channel.queueDeclare("simple_queue",false,false,false,null);
// 5.发送消息
String message = "Hello, RabbitMQ!";
/**
* 参数1:交换机名,""表示默认交换机
* 参数2:路由键,简单模式就是队列名
* 参数3:其他额外参数
* 参数4:要传递的消息字节数组
*/
channel.basicPublish("","simple_queue",null,message.getBytes());
// 6.关闭信道和连接
channel.close();
connection.close();
System.out.println("--- 发送成功 ---");
}
}
运行成功之后控制台打印日志,并且在管控台也可以看得到对应的队列和发送的消息,如下图:
按照正常的程序,上面已经有生产者发送消息了,因此还需要消费者获取的,前三步都是和生产者一样,第四步就是需要进行监听队列:
* 参数1:监听的队列名
* 参数2:是否自动签收,如果设置为false,则需要手动确认消息已收到,否则MQ会一直发送消息
* 参数3:Consumer的实现类,重写该类方法表示接受到消息后如何消费
具体代码如下:
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建链接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
// 2.建立连接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
/**
* 4.监听队列
* 参数1:监听的队列名
* 参数2:是否自动签收,如果设置为false,则需要手动确认消息已收到,否则MQ会一直发送消息
* 参数3:Consumer的实现类,重写该类方法表示接受到消息后如何消费
*/
channel.basicConsume("simple_queue",true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException {
String message = new String(body,"UTF-8");
System.out.println("接受消息,消息为: "+message);
}
});
}
}
运行成功之后一直接收生产者发送的消息:如下图生产者已经发送了三次并且都成功接收:
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。