前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >redis实现消息队列

redis实现消息队列

原创
作者头像
shigen
发布于 2023-09-07 23:59:40
发布于 2023-09-07 23:59:40
1.6K0
举报

背景

消息队列(Message Queue)是一种常见的软件架构模式,用于在分布式系统中传递和处理异步消息。它解耦了发送消息的应用程序和接收消息的应用程序之间的直接依赖关系,使得消息的发送者和接收者可以独立地演化和扩展。

消息队列的基本原理是发送者将消息发送到一个中间代理(即消息队列),然后接收者从该中间代理中消费消息。中间代理充当了消息的缓冲区,确保消息的可靠传递和持久化存储(根据需要),同时提供了高吞吐量、低延迟和可伸缩性。

消息队列的原理图
消息队列的原理图

相信在做分布式服务开发的时候,或多或少的使用到了消息队列,如主流的kafkarocketMQ。总结下来,消息队列的优点包括:

  • 异步通信:发送者和接收者之间的解耦,使得它们可以独立地操作和演化,无需实时等待回应。
  • 应用解耦:消息队列使不同的应用程序能够以独立的方式进行开发、部署和伸缩,降低了系统之间的耦合度。
  • 削峰填谷:消息队列可以作为缓冲区,处理突发的请求和高峰期的流量,从而减轻系统的压力。
  • 消息持久化:消息队列可以将消息持久化存储,确保在异常情况下不会丢失消息。
  • 可靠性和扩展性:消息队列提供了高可靠性和可伸缩性,通过多个消费者处理大量的消息。

总而言之,消息队列是一种强大的软件架构模式,通过解耦应用程序之间的依赖关系,提供了高可靠性、高吞吐量和可伸缩性的消息传递机制。它在构建分布式系统、处理异步任务和解决系统耦合等方面发挥着重要作用。

那今天的案例呢,没有使用到kafka rocketMQ, 而是继续我的专题redis

redis实现消息队列

list

list这种数据结构天然的支持消息队列,常用的命令如下:

命令

描述

LPUSH key value

在列表头部插入一个或多个值

RPUSH key value

在列表尾部插入一个或多个值

LPOP key

弹出并返回列表头部的一个值

RPOP key

弹出并返回列表尾部的一个值

LRANGE key start stop

获取列表中指定范围内的所有值

LLEN key

获取列表的长度

list实现消息队列的常用命令案例
list实现消息队列的常用命令案例

好的,这个shigenJava的代码实现以下:

  • 创建消息队列服务类redisMessageQueueService

主要的是三个方法,发送数据、消费数据和判断消息队列是否为空。

  • 消息处理类messProcessor

这个类或者说是组件主要是处理消息,这里简单的在控制台输出打印。

  • 系统的接口messageQueueController

其实就是通过接口的方式调用messageQueueServie,实现消息的发送和接受消费。

那最终的效果是什么样的呢》我本地使用的是curl进行的进一步的测试。

list实现的方式测试效果:

list实现的方式测试效果
list实现的方式测试效果

最后,总结一下list实现消息队列的优缺点:

优点:

  • 简单易用:Redis的List数据结构操作简单,易于理解和使用。
  • 支持多样化操作:List数据结构提供了丰富的操作方法,如插入、删除、获取范围等。

缺点:

消息队列的设计最重要的就是消息的防丢失问题

  • 缺乏消息确认机制:List方式没有内置的消息确认机制,当消费者处理消息失败或发生异常时,消息可能会丢失。
  • 不支持消息持久化:Redis的List数据结构默认存储在内存中,当Redis重启或宕机时,消息也会丢失。
  • 不适合高并发场景:在高并发情况下,List方式可能存在性能问题,因为LPUSH和BRPOP是单线程操作,无法充分利用多核CPU的优势。
  • 不适合多订阅者。现在的list是一对一的模式,不支持一对多的模式。

pub/sub模式

针对list一对一的模式,pub/sub可以实现一对多的模式。

常见的redis操作命令如下:

命令

描述

PUBLISH channel message

将消息 message 发送到指定的频道 channel

SUBSCRIBE channel channel ...

订阅一个或多个频道,接收这些频道中发布的消息

UNSUBSCRIBE [channel channel ...]

取消订阅一个或多个频道

PSUBSCRIBE pattern pattern ...

订阅一个或多个符合给定模式的频道

PUNSUBSCRIBE [pattern pattern ...]

取消订阅一个或多个符合给定模式的频道

PUBSUB subcommand [argument argument ...]

获取关于 Redis Pub/Sub 状态的信息

我们在控制台测试一下:

那具体的代码如何实现呢?这里依旧选取的是Java代码作为案例的设计。

  • 定义消息发布的接口并实现发送消息的操作MessagePublisherImpl
  • 消息订阅者messageSubscriberImpl
  • 配置类中加上redisMessageListenerContainer的bean
  • controller测试

服务运行,接口测试一下:

订阅多个topic的话,这样设置:

代码语言:java
AI代码解释
复制
container.addMessageListener(messageListener, new PatternTopic("pub_channel"));
// 监听多个topic
container.addMessageListener(messageListener, new PatternTopic("pub_channel1"));

ok,貌似这种方式也显得很nice,至少比list的实现方式更nice,那它能解决实际的问题吗?我们总结一下这种方式的优缺点:

优点:

  • 实现了多个消费者订阅同一个topic

缺点

  • 数据不可靠:Redis 的 pub/sub 模式没有任何持久化机制,如果发布的消息在订阅者还没有收到前发生宕机,那么这些消息将会丢失。因此,如果需要确保数据的可靠性和持久化,需要使用 Redis 的其他数据结构或者使用 Redis 的 AOF 或 RDB 持久化机制。
  • 消息不能防止重复消费:Redis 的 pub/sub 模式不支持消息的确认和回调机制,因此,当订阅者收到消息时,无法对其进行确认,也就无法防止重复消费

那有什么好的解决方式呢?stream应需求而生。

stream

Redis 的 Stream 是一个基于时间序列的数据结构,用于存储和处理消息。Stream 可以看作是一个由消息组成的日志,每个消息都有一个唯一的 ID(可以是时间戳或其他方式生成),并且可以对消息进行按照时间的顺序和优先级进行排序。

Stream 可以支持多个消费者,并且可以保证每个消费者只能消费一次。Stream 还可以在一个组内进行消费者间负载均衡,以提高系统的可扩展性和高可用性。

常用的API如下:

API

描述

XADD

向指定的 Stream 中添加一个条目(消息)XADD key ID field string field string ...

XDEL

从指定的 Stream 中删除一个或多个条目

XRANGE

获取指定范围内的条目

XREVRANGE

获取指定范围内的逆序条目

XLEN

获取 Stream 中的条目数量

XREAD

从一个或多个 Stream 中读取待处理的条目

XGROUP

创建、管理和操作消费者组

XACK

确认一个或多个已处理的条目

XCLAIM

批量方式对待处理的条目进行声明和处理

XPENDING

获取待处理的条目信息

XTRIM

删除指定范围之外的条目

XINFO

获取 Stream 的相关信息

参考文章:基于Redis的Stream类型的完美消息队列解决方案

添加和读取消息的命令测试如下:

shigen在敲命令的时候也觉得很繁琐,有点麻烦,还是期待Java代码的api去操作消息队列。

参考文章:redis灵魂拷问:如何使用stream实现消息队列 如何在Springboot中使用Redis5的Stream

  • 定义生产消息的messageProcuder

主要是用来实现消息的发送

  • 消息的接受messageReceiver

实现了消息的ack

  • 测试接口

测试中发现了如下错误:

使用stream并不适合用jedis作为连接池。因为我之前的案例都是基于jedis的,在这里果断的放弃了。

好了,以上就是《redis实现消息队列》的全部内容了。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
使用redis作为延迟队列方案对比
项目中经常需要做某个操作, 然后一定时间之后看这个操作的执行结果。 要么使用定时任务扫描, 要么使用延时队列(任务)来实现。 在主流的 MQ 中支持延时消息的有 RabbitMQ 和 RocketMQ, 如果没有使用这个两个 MQ, 譬如使用了 Kafka, 又想使用延时消息的功能可以使用 Redis来实现。
leobhao
2024/11/29
2820
使用redis作为延迟队列方案对比
Redis进阶-Redis 4种MQ 方案对比
我们知道redis 5.x版本,作者提供了stream这种基于radix tree 基数树的数据结构,解决使用Redis实现MQ“百花齐放”的乱象。
小小工匠
2021/08/17
1.4K0
Redis Stream实现消息队列中间件
在日常开发中,很多时候我们可能会使用队列实现异步任务的分发。例如用户下单的积分成长值增加、消息发送等等常见。这种场景可以使用Redis中的list数据类型来实现队列功能。但存在不足的几点:
兔云小新LM
2022/06/08
9050
Redis Stream实现消息队列中间件
Redis 中使用 list,streams,pub/sub 几种方式实现消息队列
使用 Redis 实现消息队列 普通的订阅 基于模式(pattern)的发布/订阅 看下源码实现 分析下源码实现 stream 的结构 streamCG 消费者组 streamConsumer 消费者结构 分析下源码实现 基于List的消息队列 基于 Streams 的消息队列 发布订阅 总结 参考 ◆使用 Redis 实现消息队列 Redis 中也是可以实现消息队列 不过谈到消息队列,我们会经常遇到下面的几个问题 1、消息如何防止丢失; 2、消息的重复发送如何处理; 3、消息的顺序性问题; 关于 mq
IT大咖说
2022/03/21
1.3K0
Redis高级特性之Pub/Sub与Stream
在Stream之前,Redis PUB/SUB亦可可实现消息的传递及广播,但消息不支持持久化,不记录消费端状态,并且“Fire and Forgot”,可靠性无法保证。
vitofliu
2019/07/03
4K0
Redis面试题之如何实现一个消息队列
使用Redis实现队列,一般会选择List这种数据类型。因为List数据类型,可以做到先进先出、先进后出的操作。这里的先进先出也就是队列思想,先进后出也就是堆栈思想。使用下面的两个命令,就可以实现一个队列功能:
兔云小新LM
2024/09/27
2390
Redis 中如何实现的消息队列?实现的方式有几种?
细心的你可能发现了,本系列课程中竟然出现了三个课时都是在说消息队列,第 10 课时讲了程序级别的消息队列以及延迟消息队列的实现,而第 15 课时讲了常见的消息队列中间件 RabbitMQ、Kafka 等,由此可见消息队列在整个 Java 技术体系中的重要程度。本课时我们将重点来看一下 Redis 是如何实现消息队列的。
码农架构
2021/02/23
8.7K1
Redis 中如何实现的消息队列?实现的方式有几种?
Redis实践:构建高效消息队列与深入解析BRPOP命令
Redis, 作为一种高性能的键值存储系统,通过提供丰富的数据结构和操作,被广泛应用于各种场景中,包括作为消息队列的实现工具。消息队列是一种在消息的发送者和接收者之间建立的、存储消息的容器,用于异步处理和传输数据,以及分离处理过程。下面列举了Redis中实现消息队列的一些关键功能和操作。
运维开发王义杰
2024/04/15
1.3K0
Redis实践:构建高效消息队列与深入解析BRPOP命令
redis灵魂拷问:如何使用stream实现消息队列
redis在很早之前就支持消息队列了,使用的是PUB/SUB功能来实现的。PUB/SUB有一个缺点就是消息不能持久化,如果redis发生宕机,或者客户端发生网络断开,历史消息就丢失了。
jinjunzhu
2021/01/05
3.2K0
redis灵魂拷问:如何使用stream实现消息队列
Redis实现消息队列的4种方案
Redis作为内存中的数据结构存储,常用作数据库、缓存和消息代理。它支持数据结构,如 字符串,散列,列表,集合,带有范围查询的排序集(sorted sets),位图(bitmaps),超级日志(hyperloglogs),具有半径查询和流的地理空间索引。Redis具有内置复制,Lua脚本,LRU驱逐,事务和不同级别的磁盘持久性,并通过Redis Sentinel和Redis Cluster自动分区。
allsmallpig
2021/02/25
2.7K0
Redis stream 用做消息队列完美吗
Redis Stream 是 Redis 5.0 版本中引入的一种新的数据结构,它用于实现简单但功能强大的消息传递模式。
勇哥java实战
2024/04/15
4580
Redis专题(四) ——Redis排序、消息队列、优化存储
Redis专题(四) ——Redis排序、消息队列、优化存储 (原创内容,转载请注明来源,谢谢) 一、排序 1、命令 SORTkey [ALPHA] [DESC] [LIMIT start end],对列表、集合和有序集合进行排序,当加上alpha参数后,则可以按照字典顺序排序,加上desc则倒序排序,加上limit则支持分页。 2、关键参数 by参数:by key:*->val,可以指定排序的标准,可以自己传入一个list,也可以指定某个列进行排序。
用户1327360
2018/03/07
2.9K0
Redis专题(四) ——Redis排序、消息队列、优化存储
Redis 消息队列思想
缺点:无法避免消息丢失(拿到消息后,消息就在队列删除了,如果宕机,消息相当于没了)、只支持1个消费者(不像广播模式,多个消费者支持消费)
收心
2022/08/23
3680
Redis Stream——作为消息队列的典型应用场景
Redis最新的大版本5.0已经RC1了,其中最重要的Feature莫过于 Redis Stream 了,关于Redis Stream的基本使用介绍和设计理念可以看我之前的一篇文章(Redis Stream简介)。 Redis Stream 本质上是在Redis内核上(非Redis Module)实现的一个消息发布订阅功能组件。相比于现有的 PUB/SUB 、 BLOCKED LIST ,其虽然也可以在简单的场景下作为消息队列来使用,但是 Redis Stream 无疑要完善很多。 Redis Stream 提供了消息的持久化和主备复制功能、新的RadixTree数据结构来支持更高效的内存使用和消息读取、甚至是类似于 Kafka 的 Consumer Group 功能。今天我们重点关注怎么在实际业务场景下去使用 Redis Stream 。
猿哥
2019/05/13
2K0
Redis Stream——作为消息队列的典型应用场景
把Redis当作队列来用,真的合适吗?
也些人则反对,认为 Redis 会「丢」数据,最好还是用「专业」的队列中间件更稳妥。
_Kaito
2021/04/28
7.6K1
把Redis当作队列来用,真的合适吗?
深入理解Redis的Pub/Sub模式
Pub/Sub(发布/订阅)是一种消息传递模式,它允许一个或多个订阅者监听一个特定的主题(频道),当有新的消息发布到该主题时,所有订阅者都会收到通知。
烟雨平生
2023/11/07
1.8K0
深入理解Redis的Pub/Sub模式
重学SpringBoot3-集成Redis(六)之消息队列
Redis 不仅是一个高效的缓存解决方案,也具备强大的消息队列功能。通过 Redis 的 发布/订阅(Pub/Sub) 机制,开发者可以轻松实现服务之间的通信和消息传递功能,而无需引入专门的消息队列工具。这篇文章将介绍如何通过 Spring Boot 3 和 Redis 实现消息队列的发布与订阅功能。
CoderJia
2024/10/18
4280
重学SpringBoot3-集成Redis(六)之消息队列
使用redis做消息队列
消息队列基于生产者-消费者模型,生产者负责生产消息并将其发送到队列中,消费者则从队列中获取消息并进行处理。这种模型能够实现异步通信、解耦系统和提高系统的可伸缩性。在Redis中,我们可以利用其数据结构以及相关命令来实现消息队列的功能。
GeekLiHua
2025/01/21
1630
Redis 学习笔记(六)Redis 如何实现消息队列
消息队列(Messeage Queue,MQ)是在分布式系统架构中常用的一种中间件技术,从字面表述看,是一个存储消息的队列,所以它一般用于给 MQ 中间的两个组件提供通信服务。
归思君
2023/10/16
4.4K0
Redis 学习笔记(六)Redis 如何实现消息队列
学习 Redis - 2. 场景操作
Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。
叉叉敌
2021/12/06
4050
学习 Redis - 2. 场景操作
相关推荐
使用redis作为延迟队列方案对比
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档