消息中间件可以理解成就是一个服务软件,保存信息的容器,比如生活中的快递云柜. 我们把数据放到消息中间件当中, 然后通知对应的服务进行获取 消息中间件是在消息的传输过程中保存信息的容器
消息中间件应用场景
同步技术
dubbo是一中同步技术, 实时性高, controller调用service项目, 调用就执行, 如果service项目中的代码没有执行完, controller里面的代码一致等待结果.
异步技术
mq消息中间件技术(jms) 是一种异步技术, 消息发送方, 将消息发送给消息服务器, 消息服务器未必立即处理.什么时候去处理, 主要看消息服务器是否繁忙, 消息进入服务器后会进入队列中, 先进先出.实时性不高.
jms的全称叫做Java message service (Java消息服务) jms是jdk底层定义的规范 各大厂商都是实现这个规范的技术
ActiveMQ:是apache的一个比较老牌的消息中间件, 它比较均衡, 既不是最安全的, 也不是最快的. RabbitMQ:是阿里巴巴的一个消息中间件, 更适合金融类业务, 它对数据的安全性比较高.能够保证数据不丢失. Kafka:Apache下的一个子项目。特点:高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;适合处理海量数据。
TextMessage: 一个字符串对象 MapMessage:key-value ObjectMessage:一个序列化的 Java 对象 BytesMessage:一个字节的数据流 StreamMessage:Java 原始值的数据流
一个发送方, 一个接收方. 也可以多个发送方, 一个接收方, 主要是接收方必须是第一个.
一个发送方, 多个接收方. 发送方也可以是多个, 主要看接收方, 接收方必须是多个
链接: https://pan.baidu.com/s/1B0ZW3_Z3xcamUCniNjd10Q 提取码: avr2
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.13.4</version>
</dependency>
</dependencies>
创建QueueProducer
public class QueueProducer {
public static void main(String[] args) throws Exception{
//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建队列对象, 指定发送的队列名称, 队列名称可以随意起名, 但是发送到哪里, 就要从哪里去接收
Queue queue = session.createQueue("test-queue");
//6.创建消息生产者
MessageProducer producer = session.createProducer(queue);
//7.创建消息
TextMessage textMessage = session.createTextMessage("Hello ActiveMQ");
//8.发送消息
producer.send(textMessage);
//9.关闭资源
producer.close();
session.close();
connection.close();
}
}
创建QueueConsumer
public class QueueConsumer {
public static void main(String[] args) throws Exception{
//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建队列对象
Queue queue = session.createQueue("test-queue");
//6.创建消息消费
MessageConsumer consumer = session.createConsumer(queue);
//7.监听消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
System.out.println("接收到消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//8.等待键盘输入
System.in.read();
//9.关闭资源
consumer.close();
session.close();
connection.close();
}
}
运行QueueProducer后运行QueueConsumer
TopicConsumer1
public class TopicConsumer1 {
public static void main(String[] args) throws Exception {
//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建主题对象
Topic topic = session.createTopic("test-topic");
//6.创建消息消费
MessageConsumer consumer = session.createConsumer(topic);
//7.监听消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
System.out.println("接收到消息:"+textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
//8.等待键盘输入
System.in.read();
//9.关闭资源
consumer.close();
session.close();
connection.close();
}
}
TopicConsumer2
public class TopicConsumer2 {
public static void main(String[] args) throws Exception {
//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建主题对象
Topic topic = session.createTopic("test-topic");
//6.创建消息消费
MessageConsumer consumer = session.createConsumer(topic);
//7.监听消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
System.out.println("接收到消息:"+textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
//8.等待键盘输入
System.in.read();
//9.关闭资源
consumer.close();
session.close();
connection.close();
}
}
TopicProducer
public class TopicProducer {
public static void main(String[] args) throws Exception {
//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建主题对象
Topic topic = session.createTopic("test-topic");
//6.创建消息生产者
MessageProducer producer = session.createProducer(topic);
//7.创建消息
TextMessage textMessage = session.createTextMessage("Hello Topic ActiveMQ");
//8.发送消息
producer.send(textMessage);
//9.关闭资源
producer.close();
session.close();
connection.close();
}
}