所有发送到 Direct Exchange 的消息, 都会被转发到 RoutingKey 所指定的 Queue。
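注意: Direct 模式可以直接使用 RabbitMQ 自带的默认 Exchange (default exchange); 此时不需要对 Exchange 进行任何 Binding 操作, 因为每个队列都会以队列名作为 RoutingKey 自动绑定到默认 Exchange。如果使用自己声明的 Direct Exchange (如下面的示例), 则仍需通过 queueBind 将队列绑定到该 Exchange。消息传递时 RoutingKey 必须完全匹配才会被队列接收, 否则该消息会被丢弃。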
RabbitMQHelper
package com.dance.redis.mq.rabbit;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
 * Static helper that centralises the RabbitMQ boilerplate shared by the demo
 * producers and consumers: opening the (single, lazily created) connection,
 * creating channels, declaring exchanges/queues and building a simple
 * manually-acknowledging consumer.
 *
 * <p>The broker address and credentials are hard-coded for the demo; move them
 * to external configuration before using this anywhere real.
 *
 * <p>This class does no synchronisation of its own; callers that share it
 * across threads must coordinate access to {@link #getChannel()} and
 * {@link #closeConnection()} themselves.
 */
public class RabbitMQHelper {

    private static final String HOST = "192.168.247.142";
    private static final int PORT = 5672;
    private static final String VIRTUAL_HOST = "/";
    private static final String USERNAME = "root";
    private static final String PASSWORD = "123456";

    /** Value passed to {@code ConnectionFactory.setAutomaticRecoveryEnabled}. */
    private static final boolean AUTOMATIC_RECOVERY_ENABLED = true;

    /** Value passed to {@code ConnectionFactory.setNetworkRecoveryInterval} (milliseconds per the client API). */
    private static final int NETWORK_RECOVERY_INTERVAL_MS = 3000;

    public static final String EXCHANGE_TYPE_DIRECT = "direct";
    public static final String EXCHANGE_TYPE_TOPIC = "topic";
    public static final String EXCHANGE_TYPE_FANOUT = "fanout";
    public static final String EXCHANGE_TYPE_HEADERS = "headers";

    /** Shared connection; {@code null} until first use and again after {@link #closeConnection()}. */
    private static Connection connection = null;

    /**
     * Creates a new channel on the shared connection, opening the connection
     * first if none is currently held.
     *
     * @return a channel created on the shared connection
     * @throws IOException      if the connection or channel cannot be created
     * @throws TimeoutException if establishing the connection times out
     */
    public static Channel getChannel() throws IOException, TimeoutException {
        if (null == connection) {
            // Create the connection factory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost(HOST);
            connectionFactory.setPort(PORT);
            connectionFactory.setVirtualHost(VIRTUAL_HOST);
            connectionFactory.setUsername(USERNAME);
            connectionFactory.setPassword(PASSWORD);
            connectionFactory.setAutomaticRecoveryEnabled(AUTOMATIC_RECOVERY_ENABLED);
            connectionFactory.setNetworkRecoveryInterval(NETWORK_RECOVERY_INTERVAL_MS);
            connection = connectionFactory.newConnection();
        }
        return connection.createChannel();
    }

    /**
     * Declares an exchange with the fixed flags used by the demos
     * (durable = true, autoDelete = false, internal = false, no extra arguments).
     *
     * @param channel      channel to issue the declaration on
     * @param exchangeName name of the exchange
     * @param exchangeType exchange type, e.g. {@link #EXCHANGE_TYPE_DIRECT}
     * @return the broker's declare-ok response
     * @throws IOException if the declaration fails
     */
    public static AMQP.Exchange.DeclareOk exchangeDeclare(Channel channel, String exchangeName, String exchangeType) throws IOException {
        return channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
    }

    /**
     * Declares a queue with the fixed flags used by the demos
     * (durable = false, exclusive = false, autoDelete = false, no extra arguments).
     *
     * @param channel   channel to issue the declaration on
     * @param queueName name of the queue
     * @return the broker's declare-ok response
     * @throws IOException if the declaration fails
     */
    public static AMQP.Queue.DeclareOk queueDeclare(Channel channel, String queueName) throws IOException {
        return channel.queueDeclare(queueName, false, false, false, null);
    }

    /**
     * Closes the shared connection, if one is held. Calling this when no
     * connection has been opened is a no-op.
     *
     * @throws IOException if closing the connection fails
     */
    public static void closeConnection() throws IOException {
        Connection current = connection;
        if (current == null) {
            return;
        }
        // Drop the reference first so a later getChannel() opens a fresh
        // connection instead of calling createChannel() on a closed one.
        connection = null;
        current.close();
    }

    /**
     * Builds a consumer that prints each message body, pauses for one second
     * and then acknowledges that single delivery on the given channel.
     *
     * @param channel channel the consumer is attached to and acknowledges on
     * @return the consumer
     */
    public static Consumer buildConsumer(Channel channel) {
        return new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                // Decode explicitly as UTF-8 rather than with the platform default charset.
                String text = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("recvive message:" + text);
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the calling thread can observe it;
                    // the message has already been handled, so it is still acked below.
                    Thread.currentThread().interrupt();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
    }
}
创建这个帮助类, 是为了减少一些通用代码的编写, 后续也会使用
消费者
package com.dance.redis.mq.rabbit.direct;
import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import java.util.concurrent.TimeUnit;
/**
 * Demo consumer for a direct exchange: declares the exchange and queue, binds
 * them with a routing key, consumes with manual acknowledgement for a fixed
 * time window and then releases the channel and connection.
 */
public class RabbitMQ4DirectExchangeConsumer {

    /**
     * Runs the consumer for 50 seconds.
     *
     * @param args unused
     * @throws Exception if any broker operation fails or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQHelper.getChannel();
        try {
            String exchangeName = "test_direct_exchange";
            String queueName = "test_direct_queue";
            String routingKey = "test_direct_routingKey";
            RabbitMQHelper.exchangeDeclare(channel, exchangeName, RabbitMQHelper.EXCHANGE_TYPE_DIRECT);
            RabbitMQHelper.queueDeclare(channel, queueName);
            channel.queueBind(queueName, exchangeName, routingKey);
            // Upper bound on the number of unacknowledged messages delivered to this client
            channel.basicQos(64);
            Consumer consumer = RabbitMQHelper.buildConsumer(channel);
            // autoAck = false: the consumer built by the helper acknowledges each delivery itself
            channel.basicConsume(queueName, false, consumer);
            // Give the delivery callbacks time to run before releasing resources
            TimeUnit.SECONDS.sleep(50);
        } finally {
            // Always release the channel and the connection, even if setup or consumption failed
            try {
                channel.close();
            } finally {
                RabbitMQHelper.closeConnection();
            }
        }
    }
}
生产者
package com.dance.redis.mq.rabbit.direct;
import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.Channel;
/**
 * Demo producer for a direct exchange: publishes a single text message to
 * {@code test_direct_exchange} with routing key {@code test_direct_routingKey}.
 * The exchange is not declared here, so it must already exist on the broker.
 */
public class RabbitMQ4DirectExchangeProducer {

    /**
     * Publishes one message and then releases the channel and connection.
     *
     * @param args unused
     * @throws Exception if any broker operation fails
     */
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQHelper.getChannel();
        try {
            String exchangeName = "test_direct_exchange";
            String routingKey = "test_direct_routingKey";
            String msg = "Hello World RabbitMQ 4 Direct Exchange Message ... ";
            // Encode explicitly as UTF-8 instead of relying on the platform default charset
            byte[] payload = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
            channel.basicPublish(exchangeName, routingKey, null, payload);
        } finally {
            // Release the channel and connection so they are not leaked after publishing
            try {
                channel.close();
            } finally {
                RabbitMQHelper.closeConnection();
            }
        }
    }
}
先启动消费者 (示例中由消费者负责声明 Exchange、Queue 并完成绑定, 生产者并不声明它们, 因此必须先启动消费者, 否则生产者发送的消息无法被路由到队列)
然后启动生产者
查看消费者
已经收到了生产者的消息