这篇文章就谈谈对mq各种问题的思考,以及不同的mq业务方案的解决,注意这篇文章为了解决在学习三大mq的一些问题,和不同mq差异导致的出现的不同的消息解决方案,这往往是很多人所忽视的,我教你!
通过mq中间件,实现系统组件之间的解耦,通过消息队列通信,减少组件的直接依赖关系,提高灵活性和可维护性。
比如数据库与es的同步,就可以通过mq进行同步,监听binlog,将更新任务发给mq,es监听mq,实现更新。
既解耦,又能慢慢的消费,减轻系统压力
消息队列处理异步任务,提高消息响应速度,并发处理能力,还能保证异步任务的持久化
比如短的死信队列完成延迟消息实现任务调度,rocket自带延迟消息。
通过mq完成不同语言的调用,避免跨语言调用的复杂度
通过mq完成日志收集的大量数据,做一个缓冲,当然这也算削峰的一种
这个问题就在于调用方是否关注这次任务的执行结果,MQ异步和线程池异步一样,都不会有结果反馈,
比如用户登录、权限校验等场景,就需要RPC
发送短信、邮箱,这样的就可以采用mq,当然也可采用线程池的异步,
mq与线程池异步对比
mq的好处就是不会有宕机丢失的风险,比如rabbitmq普通队列是默认开启持久化的,线程池异步的阻塞队列宕机导致消息丢失的风险是解决不了的。
特点是消息只能被一个消费者消费
通过负载均衡,把消费分摊到不同的消费者上,rabbitMQ的workQueue就采用的这种机制
这种模式适用于类似广播的任务,一个消息可以被多个消费者去监听
有主题机制的话,那么订阅该主题的都可以知道,没主题机制的话,那么所有消费者都知道消息
这种消费模式存在于Kafka和RocketMq。rabbitMq是不存在消费者 组模式的。
消费者组,订阅队列中的消息,不同的消费者组都会监听到这个消息,但是,只能被消费者组中的一个消费者消费
比如消息1,被消费者组a,消费者组b订阅,那么最终消费组a和消费者组b中的一个消费者才能消费消息,两个消费者组订阅该消息,那么该消息被消费了两次。
消息丢失
各种原因,网络问题,mq故障,需要保证可靠性传输
消息重复
保证幂等性
消息堆积
宕机、并发过高,等问题,导致堆积
延迟消息
这里知道是延迟消息,指的是消息的延迟,而不是任务调度使用的定时消息
处理过程中,存在延迟,无法满足实时性
版本兼容性
随着消息中间件的迭代,会面临一些功能剔除,或者版本不兼容,等问题,需要不断的关注对应社区活跃度
首先,选技术,首先要满足业务,以及业务未来拓展,三大mq基本都能完成,kafka在大数据领域毋庸置疑胜出
然后就是团队本身熟悉度,以及运维团队设施的搭建,还有学习成本,当然三大mq会一个学习其他两个是很快的
那么选型的话,大数据领域kafka就很强了,直接选它,其他领域,rocketmq和rabbitmq大差不差
Kafka严格意义上不是mq,是一个流处理平台,优点是支持多个生产者消费者,大规模的流数据处理很强大,
缺点是除了java外,还需要掌握Scala语言,运维难度比较大,类似mq,支持常规mq的功能
RocketMq好处是纯java语言,阿里开源,性能强劲,支持零拷贝技术,基于JMS实现
RabbitMq好处就是跨语言,架构简单,支持消费者的扩展,AMQP协议使得它可在多种客户端操作,
缺点是:Erlang开发,修改、阅读源码难度大。
总结来说,kafka大数据领域必然是选它,业务mq,rocketmq和rabbitMq都可以,注意RocketMQ是国产的,国外使用相对较少。
保证消息成功的从生产者到broker,ack确认,broker到消费者,最终完成消费
可靠性投递指的是从生产者到mq中的可靠性投递,
从rabbitMq来说的话,生产者到交换机,到队列,这里叫做可靠性投递
再到消费者,多个点都要保证,才能确定消息的可靠性传输
可靠性投递,也可以说是可靠性传输,消息投递了,肯定是要被消费的。
rabbitMq保证可靠性投递的话,
首先要保证持久化,基于磁盘,而不是内存,持久化又要从交换机持久化、队列持久化、消息持久化
交换机持久化、队列持久化,默认是关闭的,需要在编码中体现处理,消息持久化默认是开启的,
注意:消息持久化会写到磁盘,磁盘肯定是跟不上内存速度的,影响RabbitMq的性能,对于可靠性不是很高的常见,可以关闭持久化,来提高整体系统的吞吐量。
@Bean
public Exchange orderExchange() {
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
@Bean
public Queue orderQueue() {
return QueueBuilder.durable(QUEUE_NAME).build();
}
这两个是RabbitConfig中,我们可以这样做,指定让交换机和队列持久化
消息持久化默认开启,我们也可指定关闭掉
deliveryMode消息持久化配置,
- 消息在发送到队列后,会被保存在内存中如果RabbitMQ服务重启或崩溃,这些消息将会丢失。
- 适用场景:适用于一些临时性消息,对于丢失消息没有严格的要求,例如临时计算结果、通知等。持久化消息,deliveryMode = 2
- 消息在发送到队列后,会被保存在磁盘上,即使RabbitMQ服务重启或崩溃,这些消息仍然可以被恢复和投递
- 适用场景:适用于对消息的可靠性和持久化要求较高的场景,例如重要的业务消息、订单信息、日志记录等。
springboot3.x中,可以这样指定关闭消息持久化,这里对比一下
@SpringBootTest
public class RabbitMQTest {
@Autowired
private RabbitTemplate template;
/**
* 测试消息持久化机制和非持久化机制
*/
@Test
void testPersistentSend() {
template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new","消息1 PERSISTENT");
template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new","消息2 NON_PERSISTENT",msg->{
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
return msg;
});
}
}
消息不开启持久化,mq重启一定会消失吗?
这个问题就有点坑了,
非持久化,也有可能写入磁盘的,当内存紧张时,为了节省内存,会把一部分磁盘当作内存使用,操作系统的swap分区,linux中,free -m可以查看到swap分区,os的知识,嘿嘿。
生产者到交换机,消息丢失
通过comfirmCollback,ack机制保证生产者到交换机的可靠性投递,
生产者投递消息后,Brocker收到消息,会发送ACK确认收到消息,
开启消息确认机制会使整体效率变低,
吞吐量下降,因此不是非常重要的消息不建议开启
- `simple`:同步等待confirm结果,直到超时
- `correlated`:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
#新版,NONE值是禁用发布确认模式,是默认值,CORRELATED值是发布消息成功到交换器后会触发回调方法
# 开启发送确认机制,感知消息是否到达交换机
spring.rabbitmq.publisher-confirm-type=correlated
@Test
void testConfirmCallback() {
template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 配置
* @param ack 交换机是否收到消息,true是成功,false是失败
* @param cause 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm=====>");
System.out.println("confirm==== ack="+ack);
System.out.println("confirm==== cause="+cause);
//根据ACK状态做对应的消息更新操作 TODO
}
});
//模拟异常:修改投递的交换机名称
//template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME+"_error","order.new","任务 3");
template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new","任务 3");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
交换机需要转发到队列,如果转发失败,也会导致消息丢失
通过returnCallback,消息从交换机到队列失败时触发
交换机到队列有两种策略
@Test
void testReturnCallback() {
//交换机处理消息到路由失败,则会返回给生产者
//开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息
template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
int code = returned.getReplyCode();
System.out.println("code="+code);
System.out.println("returned="+returned.toString());
}
});
//模拟异常,修改路由key,拼接不存在的路由
template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"xxx.order.new","任务来啦11");
}
消息丢失也可能发生在消费者从brocker消费的过程,这里也需要确保被合理处理
SpringAMQP允许三种确认模式,
manual:手动ack
auto:自动ack
none:关闭ack
只有当brocker收到消费者的ack才会从队列中删除数据
ACK默认是自动确认的,若因故障没有正常消费,则会重新放到队列中。
手动确认这样配置
spring.rabbitmq.listener.simple.acknowledge-mode=manual
- 消费者代码手工消费
- deliveryTag,表示消息投递序号,每次消费消息或者消息重新投递后, deliveryTag都会增加
- basicNack和basicReject介绍
- basicReject一次只能拒绝接收一个消息,可以设置是否requeue。
- basicNack方法可以支持一次0个或多个消息的拒收,可以设置是否requeue
@RabbitHandler public void releaseCouponRecord(String body, Message message, Channel channel) throws IOException { long msgTag = message.getMessageProperties().getDeliveryTag(); System.out.println("msgTag="+msgTag); System.out.println("message="+message.toString()); System.out.println("body="+body); //进行手动确认 //消息投递序号 是否批量 channel.basicAck(msgTag,false); //拒收消息 //消息投递序号 是否批量 是否将消息回退到队列 channel.basicNack(msgTag,false,true); //拒收消息 (不支持批量拒收) //消息投递序号 是否将消息回退到队列 channel.basicReject(msgTag, true); }
消费者从brocker消费,ack机制确认消费,是有重复消费问题的,因为没法mq无法避免重复投递的问题,比如ack的时候网络原因,mq不知道已经消费了,就会重复的像消费者投递,
这就需要开发人员自行做好幂等处理,
常规通过唯一标识符,来区分是否处理过,比如常见的数据库唯一索引,来保证幂等问题
当然分库分表下,如果无法通过唯一标识路由到同一个库,那么就可能出现幂等问题,这就需要去重机制来保证幂等性,比如前置布隆过滤器,缓存、分布式锁等方式。
队列中消息不能被及时的消费,导致大量堆积在队列里面
rocketMq Kafka RabbitMq都会有这样的问题
产生消息堆积的可以从mq的生产消费模型去考虑,从生产者到消息中间件、再到消费者,都会发生堆积
消费者:消费者处理速度过慢,或者消费者故障、延迟,无法即使的处理消息,导致消息堆积
生产者:生产者产生速度过快,消费者无法即使处理
MQ消息队列:Mq服务器的性能不足,比如它所在的机器,cpu、内存、磁盘等超载,无法即使的处理消息,导致消息堆积
其他:其他方面也会有这样的问题, 比如网络故障,连接问题,消息在传递过程中过慢,从而导致消息堆积
业务方面,消息消费失败重试,不断的重试,没有设置重试次数,导致消息堆积!。
1、增加消费者的数量,提高消费的处理速度;(注意这个不通用,只适合RabbitMq)
需要注意不能一味的水平扩展消费者
因为其他关键链路性能是否抗的住大量的水平扩展,比如mysq、redis,详细见下方rabbitmq消息堆积解决方案
2、或者提高消费者的处理能力,比如通过并发处理、异步处理提高消费者吞吐量。
这个则要注意通过线程池、队列,把mq拉到程序的队列中,要承担对应的宕机导致消息丢失风险。
增加MQ的服务器资源,cpu、内存、磁盘,提高mq处理能力
也可以通过分区队列将消息分散到多个队列中,提高整体的处理能力。(这个则是Kafka、Rocket采用的)
控制队列容量,避免堆积过多,设置持久化策略。rabbitMQ的懒加载队列,兼顾了持久化和堆积上限
,设置监控系统,比如普罗米修斯,监控消息数量,消费者处理速度,队列状态等等,在堆积发生前,即使的告警,及时采取措施。
上面的策略是通用的一些解决方案,不同的MQ,生产消费模型是不一样的,导致需要针对不同mq的消息堆积解决方案不一样。
RabbitMq、Kafka、RocketMq发生消息堆积,分别该如何去解决?
这里先点一下,增加消费者数量,并不是通用的,只适合RabbitMq
RabbitMQ解决消息堆积,有两点,一是通过惰性队列提高堆积的上限,然后再通过WorkQueue增加消费者数量来提高消息的消费速度
通过惰性队列,利用磁盘而不是内存,扩大队列容积,提高堆积上限,
消息队列常规内存存储,16g 32g等等,但是,惰性队列尽可能的将消息存入磁盘中,在消费者消费到的适合才会被加载到内存
而普通队列:
尽可能的把消息存储在内存中,若开启持久化,在被写入磁盘的适合,内存中也会驻留一份备份
对比:惰性队列相较于普通队列,内存开销很小,因为只有读取消息的时候才会加载到内存,而普通队列,持久化功能仅仅是磁盘备份了一下,内存还是每条消息都占据内存,所以开销小,
缺点是io损耗,性能和容量受限于磁盘
官方说:100w条数据,每条大小1KB,普通队列消耗1GB,惰性队列只消耗1MB,对内存是非常非常友好,也很安全,解决消息堆积时可能会导致消息丢失的问题。
使用配置:
@Bean
public Queue lazyQueue() {
return QueueBuilder.durable("joseph.lazy.queue")
.lazy()
.build();
}
@Bean
public Binding orderLazyBinding(Queue lazyQueue, Exchange exchange) {
return BindingBuilder.bind(lazyQueue).to(exchange).with("lazy.order.#").noargs();
}
WorkQueue工作队列
WorkQueue工作队列可以让多个消费者绑定到同一个队列,共同消费消息,不同的消费者处于竞争关系
消息积压的时候,直接增加消费者数量,则是WorkQueue工作队列。
可以把消息分配给多个消费者,只要机器够,不断的增加消费者,就可以了,因为rabbitMQ的生产消费模型简单,直接采用WorkQueue工作模式
但是要注意!不能一味的增加消费者节点实例,底层数据库、redis缓存等其他链路可能扛不住这么多节点的能力,支持不了这么高的性能。
还可以调整并发的线程数,程序内部采用线程池和队列,先把mq中的消息拉到程序中,但是要注意宕机丢失消息的风险
rabbitmq的工作方式,只是给出一个规范,比如我们项目中通常使用topic模式和workqueue模式结合
前面讲mq常见消费模式中,讲到点对点模式、发布订阅模式、消费者组模式
rabbitMq是没有消费者组这个概念的,对于一个队列来说,绑定它的消费者即使有多个,消息也只能被一个消费者使用,
rabbitMq的工作模式的区分,依靠的是交换机和队列绑定消费者数量来做到的,实质上,就是利用的这两个特性
1,交换机
2、点对点消费模型
具体实现的话
不用交换机,producer--->queue--->consumer
通过不用交换机和约束只有一个consumer,实现了simple工作方式
采用交换机与多个queue,每个queue绑定一个消费者,实现rabbitmq的发布订阅模型
topic模式是路由模式的进阶,有通配符,在交换机和队列不直接绑定而是通过路由key,queue与consumer一对一,来完成
然后就是上面消息堆积采用的方案
workqueue核心在于给多个消费者绑定一个队列,队列与消费者一对多,**实现消费者的水平扩展,提高消费速度**给的模型是不采用交换机的!
但是它可以和其他方式结合,比如topic模式,直接扩展jvm实例,换个端口,新增节点,就可采用
总结来说,rabbitMq没有消费者组这个概念,更多的是采用点对点的消费模式,即消息只能被一个消费者消费,比如队列的消息,肯定是只能被与他绑定的一个消费者消费的,发布订阅模式采用的是通过交换机和增加队列的数量,完成的,*workqueue可以和其他的模式结合*
用workqueue解决消息堆积问题,不是不采用topic 了,而是两者结合
Kafka确切的来说不能叫做MQ,是一个分布式流处理平台,由Scala和java编写,也可以当作mq系统使用,但不是纯粹的消息队列
主要目的是打造一个高吞吐量的分布式流处理平台,处理网站中的各种流数据,记录用户行为
kafka本身是有高可用机制的,也就是replication副本机制
核心概念有:
生产者
brocker:
也就是部署kafka的节点
Topic:主题
每个topic有partion分区,副本,副本区分leader和follwer, 多个副本的话,选取一个当leader,死掉之后,剩下的follwer会 通过选举新的leader,这个选举之前是通过分布式协调组件zookeeper来选举的,但是从Kafka3.x开始,就不再需要Zookeeper了
partion:分区
kafka主句存储的基本单元,可以理解为rabbitmq的队列
注意:消费者组中消费者的数量必须小于或者等于Partion数量
replication:
副本
注意:副本的数量一定要小于等于brocker节点数量,否则会报错,因为副本就是通过冗余在brocker实现的
副本还有leader、follower这两个概念。
分区和副本是不同的概念,
需要注意的是,通过不同的Brocker实现的副本,那么不同的topic的leader可能会在不同的brocker里
比如,Brocker a Brocker b实现的副本
topic1的leader在Brocker a里面,topic2的leader在Brocker b里面,不同topic的leader是独立分布的
Kafka能保证同一个partion分区的多个replication副本**不在一个brocker里面**,不然副本同时会宕机就没有副本的意义了
Consumer Group:消费者组
同一个partion分区,只能被消费者组中一个消费者消费,如果有3个消费者组,那么消息将会被消费3次,3个消费者在不同的消费者组中
offset:
偏移量,记录consumer消费某个partion分区的位置.
kafka和rocket的消息堆积问题和rabbitMq处理方案是不一样的
这也是为什么带大家复习kafka的核心架构
在上面的复习中,有个核心要点:
某个topic被消费者组消费的时候,消费者组中的 消费者的数量要小于topic中partion分区数量
拓展: 生产者发送到brocker中,投到哪个分区?
问题目的是这样的:假设一个topic3个分区,某个消费者组有两个消费者,那么会有消费者监听消费两个分区
那么从生产者投放到topic中的时候,消息怎样分配不同的分区的?
那么如果消费者数量增加,想要和rabbitMq的workqueue一样,增加消费者来提高消费速度,就不行了,
当大于分区的时候,增加的消费者没有partion去监听消费,就会白白的增加一个节点,消费不了内容
好,
现在看
kafka消息堆积解决方案
1、增加消费者数量
但是有前提,topic的分区要远大于消费者组中消费者数量,看他们的情况和消息堆积的数量,方案限制大
2、 增加分区
通过增加分区来提高并吸毒,减少每个分区上的对计量 ,但是会引起消息到分区的重分配、以及消费者组的重平衡,导致重复消费的可能,注意幂等性保证
3、临时队列分发
将现有挤压的topic分发到令一个支持更多分区的临时topic,启动更多消费者临时的topic处理挤压的消息
这样处理, 让原先的分区不被业务消费,而是直接中转到新的topic
这三个方案核心就是通过提高消费者数量,关键就是扩展分区数量才能提高消费者数量
4、如果仅仅因为kafka brocker的瓶颈,那就增加broker数量,提高整个集群的消费能力
5、硬件资源不足,导致消费过慢,服务器的原因,那就扩展
6、还有就是提高消费者端处理逻辑,提高消费速度,例如调整拉去速度
所以,还得是先做好监控告警,防范,即使真堆积了,也有快速的解决思路
增加临时topic,指定高的分区数量
public static final String TOPIC_NAME_TEMP = "joseph-shop-order-v10-temp";
@Bean
public NewTopic topicTemp(){
return TopicBuilder.name(TOPIC_NAME_TEMP)
.partitions(15)
.compact()
.build();
}
修改原理消费者逻辑,转发到新的topic
@KafkaListener(topics = {KafkaConfig.TOPIC_NAME},groupId = "joseph-test-gp-v1")
public void onMessage(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){
// 打印出消息内容
System.out.println("消费:"+record.topic()+"-分区="+record.partition()+"-内容="+record.value());
String uuid = UUID.randomUUID().toString().replace("-","");
//分发临时队列
kafkaTemplate.send(KafkaConfig.TOPIC_NAME_TEMP, "joseph-key"+uuid, record.value());
ack.acknowledge();
}
启动新的临时消费者消费消息
@KafkaListener(topics = {KafkaConfig.TOPIC_NAME_TEMP},groupId = "joseph-test-gp-v1")
public void onMessage(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){
// 打印出消息内容
System.out.println("消费:"+record.topic()+"-分区="+record.partition()+"-内容="+record.value());
try {
System.out.println("模拟写数据库 IO慢的操作");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
ack.acknowledge();
}
这里就不带大家实操了,由于篇幅问题想看的话,会分享在我的mq专栏中,这里仅仅理论的分析一下kafka和rabbitmq的高可用
Kafka本身就是的副本机制,分布式流处理平台,本身就是高可用,之前分布式协调采用的Zookpeer,新版3.x采用的Kraft架构,不再依赖zk,而是采用了一致性算法raft,更高效的协调与选举controller。
Kafka3.x的使用,在这篇博客中
待补充:
至于rabbitMq,需要我们搭建集群来保证高可用,有普通集群与镜像集群。
详细见下面博客
待补充:
11111111111111
本文章从大局观纵览mq的问题,不拘泥于某个mq,因为kafka和rocketMq相似,加上小编也不太熟悉rocketMq,后续学习将分享再mq专栏中。
文章中rabbitmq工作方式的图片源于hollis的java八股文档。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。