Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Redis之stream类型解读

Redis之stream类型解读

原创
作者头像
一个风轻云淡
发布于 2023-09-23 12:31:15
发布于 2023-09-23 12:31:15
44500
代码可运行
举报
文章被收录于专栏:java学习javajava学习java
运行总次数:0
代码可运行

​基本介绍

Redis stream(流)是一种数据结构,其作用类似于仅追加日志,但也实现了多个操作来克服典型仅追加日志的一些限制。其中包括O(1)时间的随机访问和复杂的消费策略,如消费者群体。 您可以使用流实时记录和同时联合事件。

Redis 为每个stream(流)条目生成一个唯一的 ID。可以在以后使用这些 ID 检索其关联的条目,或读取和处理流中的所有后续条目。

Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。stream 有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容。消息是持久化的,Redis 重启后,内容还在。

数据结构

Redis Stream 的结构如下所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容

消息

每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。XADD key ID field value[field value ...]

  • 消息 ID:消息 ID 的形式是 timestampInMillis-sequence,例如 1527846880572-5,它表示当前的消息在毫米时间戳 1527846880572 时产生,并且是该毫秒内产生的第5条消息。消息 ID 可以由服务器自动生成,也可以由客户端自己指定,但是形式必须是整数-整数,而且必须是后面加入的消息的 ID 要大于前面的消息 ID。
  • 消息内容:消息内容就是键值对,形如 hash 结构的键值对。
消费组

每个 Stream 都可以挂多个消费组(Consumer Group),每个消费组会有个游标last_delivered_id在 Stream 数组之上往前移动,表示当前消费组已经消费到哪条消息了。每个消费组都有一个 Stream 内唯一的名称,消费组不会自动创建,它需要单独的指令xgroup create进行创建,需要指定从 Stream 的某个消息ID开始消费,这个 ID 用来初始化 last_delivered_id 变量。每个消费组的状态都是独立的,相互不受影响。也就是说同一份 Stream 内部的消息会被每个消费组都消费到。同一个消费组可以挂接多个消费者(Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。每个消费者有一个组内唯一名称。

消费者

消费者内部会有一个状态变量pending_ids,维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息,但是还没有ack(Acknowledge character:确认字符)。如果客户端没有 ack,这个变量里面的消息 ID 就会越来越多,一旦某个消息被 ack,它就开始减少。这个 pending_ids 变量在 Redis 官方被称为 PEL,也就是 Pending Entries List,这是一个核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了而没被处理。

基本使用命令

概述

消息队列相关命令:

  • XADD - 添加消息到末尾
  • XTRIM - 对流进行修剪,限制长度
  • XDEL - 删除消息
  • XLEN - 获取流包含的元素数量,即消息长度
  • XRANGE - 获取消息列表,会自动过滤已经删除的消息
  • XREVRANGE - 反向获取消息列表,ID 从大到小
  • XREAD - 以阻塞或非阻塞方式获取消息列表

消费者组相关命令:

  • XGROUP CREATE - 创建消费者组
  • XREADGROUP GROUP - 读取消费者组中的消息
  • XACK - 将消息标记为"已处理"
  • XGROUP SETID - 为消费者组设置新的最后递送消息ID
  • XGROUP DELCONSUMER - 删除消费者
  • XGROUP DESTROY - 删除消费者组
  • XPENDING - 显示待处理消息的相关信息
  • XCLAIM - 转移消息的归属权
  • XINFO - 查看流和消费者组的相关信息;
  • XINFO GROUPS - 打印消费者组的信息;
  • XINFO STREAM - 打印流信息
xadd 命令

XADD 命令将指定的流条目追加到指定 key 的流中。如果 key 不存在,将使用流的条目自动创建 key。

一个条目是由一组键值对组成的,它基本上是一个小的字典。键值对以用户给定的顺序存储,并且读取流的命令(如 XRANGE 或者 XREAD)可以保证按照通过 XADD 添加的顺序返回。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
XADD key ID field value [field value ...]
  • key :队列名称,如果不存在就创建
  • ID :消息 id,我们使用 * 表示由 redis 生成,可以自定义,但是要自己保证递增性。
  • field value : 记录。
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
redis> XADD mystream * name Sara surname OConnor
"1601372323627-0"
redis> XADD mystream * field1 value1 field2 value2 field3 value3
"1601372323627-1"
redis> XLEN mystream
(integer) 2
redis> XRANGE mystream - +
1) 1) "1601372323627-0"
   2) 1) "name"
      2) "Sara"
      3) "surname"
      4) "OConnor"
2) 1) "1601372323627-1"
   2) 1) "field1"
      2) "value1"
      3) "field2"
      4) "value2"
      5) "field3"
      6) "value3"

返回值:该命令返回添加的条目的 ID。如果 ID 参数传的是*,那么 ID 是自动生成的,否则,命令仅返回用户在插入期间指定的相同的 ID。

xtrim 命令

XTRIM 将流裁剪为指定数量的项目,如有需要,将驱逐旧的项目(ID较小的项目)。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
XTRIM key MAXLEN [~] count
  • key :队列名称
  • MAXLEN :长度
  • count :数量
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D
"1601372434568-0"
127.0.0.1:6379> XTRIM mystream MAXLEN 2
(integer) 0
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1601372434568-0"
   2) 1) "field1"
      2) "A"
      3) "field2"
      4) "B"
      5) "field3"
      6) "C"
      7) "field4"
      8) "D"

返回值:返回从流中删除的条目数。

xdel 命令

从指定流中移除指定的条目,并返回成功删除的条目的数量。在传递的ID不存在的情况下,返回的数量可能与传递的ID数量不同。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
XDEL key ID[ID ...]
  • key:队列名称。
  • ID:消息 ID
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
> XADD mystream * a 1
1538561698944-0
> XADD mystream * b 2
1538561700640-0
> XADD mystream * c 3
1538561701744-0
> XDEL mystream 1538561700640-0
(integer) 1
127.0.0.1:6379> XRANGE mystream - +
1) 1) 1538561698944-0
   2) 1) "a"
      2) "1"
2) 1) 1538561701744-0
   2) 1) "c"
      2) "3"

返回值:删除的条目数量。

xlen 命令

返回流中的条目数。如果指定的key不存在,则此命令返回0,就好像该流为空。

与其他的Redis类型不同,零长度流是可能的,所以你应该调用TYPE或者EXISTS来检查一个key是否存在。一旦内部没有任何的条目(例如调用XDEL后),流不会被自动删除,因为可能还存在与其相关联的消费者组。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
redis> XADD mystream * item 1
"1601372563177-0"
redis> XADD mystream * item 2
"1601372563178-0"
redis> XADD mystream * item 3
"1601372563178-1"
redis> XLEN mystream
(integer) 3

返回值:流中包含的条目数量

xrange 命令

xrange命令返回流中满足给定ID范围的条目。范围由最小和最大ID指定。所有ID在指定的两个ID之间或与其中一个ID相等(闭合区间)的条目将会被返回。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
XRANGE key start end [COUNT count]
  • key :队列名
  • start :开始值, - 表示最小值
  • end :结束值, + 表示最大值
  • count :数量
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
redis> XADD writers * name Virginia surname Woolf
"1601372577811-0"
redis> XADD writers * name Jane surname Austen
"1601372577811-1"
redis> XADD writers * name Toni surname Morrison
"1601372577811-2"
redis> XADD writers * name Agatha surname Christie
"1601372577812-0"
redis> XADD writers * name Ngozi surname Adichie
"1601372577812-1"
redis> XLEN writers
(integer) 5
redis> XRANGE writers - + COUNT 2
1) 1) "1601372577811-0"
   2) 1) "name"
      2) "Virginia"
      3) "surname"
      4) "Woolf"
2) 1) "1601372577811-1"
   2) 1) "name"
      2) "Jane"
      3) "surname"
      4) "Austen"

返回值:该命令返回ID与指定范围匹配的条目。返回的条目是完整的,这意味着ID和所有组成条目的字段都将返回。此外,返回的条目及其字段和值的顺序与使用XADD添加它们的顺序完全一致。

xread 命令

从一个或者多个流中读取数据,仅返回ID大于调用者报告的最后接收ID的条目。此命令有一个阻塞选项,用于等待可用的项目,类似于BRPOP或者BZPOPMIN等等。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
XREAD[COUNT count][BLOCK milliseconds] STREAMS key[key ...]id[id ...]
  • count:数量。
  • milliseconds:可选,阻塞毫秒数,没有设置就是非阻塞模式。
  • key :队列名。
  • id:消息 ID。
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
redis> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"
   2) 1) 1) 1526984818136-0
         2) 1) "duration"
            2) "1532"
            3) "event-id"
            4) "5"
            5) "user-id"
            6) "7782813"
      2) 1) 1526999352406-0
         2) 1) "duration"
            2) "812"
            3) "event-id"
            4) "9"
            5) "user-id"
            6) "388234"
2) 1) "writers"
   2) 1) 1) 1526985676425-0
         2) 1) "name"
            2) "Virginia"
            3) "surname"
            4) "Woolf"
      2) 1) 1526985685298-0
         2) 1) "name"
            2) "Jane"
            3) "surname"
            4) "Austen"

返回值:该命令返回一个结果数组:返回数组的每个元素都是一个由两个元素组成的数组(键名和为该键报告的条目)。报告的条目是完整的流条目,具有ID以及所有字段和值的列表。返回的条目及其字段和值的顺序与使用XADD添加它们的顺序完全一致。 当使用BLOCK时,超时时将返回一个空回复(nil)。

xgroup 命令

使用 XGROUP CREATE 创建消费者组,语法格式:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
  • key :队列名称,如果不存在就创建
  • groupname :组名。
  • $ : 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略。

从头开始消费:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
XGROUP CREATE mystream consumer-group-name 0-0  

从尾部开始消费:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
XGROUP CREATE mystream consumer-group-name $
xreadgroup 命令

使用 XREADGROUP GROUP 读取消费组中的消息,语法格式:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group :消费组名
  • consumer :消费者名。
  • count : 读取数量。
  • milliseconds : 阻塞毫秒数。
  • key : 队列名。
  • ID : 消息 ID。
xack 命令

XACK命令用于从流的消费者组的待处理条目列表(简称PEL)中删除一条或多条消息。当一条消息交付到某个消费者时,它将被存储在PEL中等待处理,这通常出现在作为调用XREADGROUP命令的副作用,或者一个消费者通过调用XCLAIM命令接管消息的时候。

一旦消费者成功地处理完一条消息,它应该调用XACK,这样这个消息就不会被再次处理,且作为一个副作用,关于此消息的PEL条目也会被清除,从Redis服务器释放内存。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
XACK key group ID[ID ...]

返回值:该命令返回成功确认的消息数。某些消息ID可能不再是PEL的一部分(例如因为它们已经被确认),而且XACK不会把他们算到成功确认的数量中。

​​​​​我正在参与2023腾讯技术创作特训营第二期有奖征文,瓜分万元奖池和键盘手表

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
⑨【Stream】Redis流是什么?怎么用?: Stream [使用手册]
xadd key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|id field value [field value ...]
.29.
2023/11/26
4420
⑨【Stream】Redis流是什么?怎么用?: Stream [使用手册]
Redis消息队列 | Stream
在RedisV5.0之前, 如果想实现队列功能, 只能用list或者pub/sub实现, 但它们都有自己的缺点.
一个架构师
2022/06/27
1.4K0
Redis命令详解:Streams
Redis5.0迎来了一种新的数据结构Streams,没有了解过的同学可以先阅读前文,今天来介绍一下Streams相关的命令。
Jackeyzhe
2020/03/11
2.3K0
【Redis】四大特殊的数据类型之 Stream
我们都知道 Redis 提供了丰富的数据类型,特殊的有四种:BitMap、HyperLogLog、Geospatial、Stream。
sidiot
2023/08/31
6560
【Redis】四大特殊的数据类型之 Stream
Redis学习(二)
Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。
Vincent-yuan
2021/05/11
7350
Redis学习(二)
Redis stream 用做消息队列完美吗
Redis Stream 是 Redis 5.0 版本中引入的一种新的数据结构,它用于实现简单但功能强大的消息传递模式。
勇哥java实战
2024/04/15
4500
Redis(8)——发布/订阅与Stream
发布/ 订阅系统 是 Web 系统中比较常用的一个功能。简单点说就是 发布者发布消息,订阅者接受消息,这有点类似于我们的报纸/ 杂志社之类的: (借用前边的一张图)
我没有三颗心脏
2020/03/20
1.4K0
Redis(8)——发布/订阅与Stream
Redis数据结构:Stream类型全面解析
Redis Stream 是 Redis 5.0 版本引入的一种新的数据类型,它是一个持久化的、可查询的、可扩展的消息队列服务。
栗筝i
2023/10/16
9160
redis灵魂拷问:如何使用stream实现消息队列
redis在很早之前就支持消息队列了,使用的是PUB/SUB功能来实现的。PUB/SUB有一个缺点就是消息不能持久化,如果redis发生宕机,或者客户端发生网络断开,历史消息就丢失了。
jinjunzhu
2021/01/05
3.2K0
redis灵魂拷问:如何使用stream实现消息队列
深入剖析 Redis5.0 全新数据结构 Streams(消息队列的新选择)
Redis 5.0 全新的数据类型:streams,官方把它定义为:以更抽象的方式建模日志的数据结构。Redis的streams主要是一个append only的数据结构,至少在概念上它是一种在内存中表示的抽象数据类型,只不过它们实现了更强大的操作,以克服日志文件本身的限制。
芋道源码
2019/10/29
2.1K0
Redis5新特性Streams作消息队列
本文所使用 Redis 版本为 5.0.5 。如果使用更早的 5.x 版本,有些 API 使用效果,与本文中描述略有不同。
ytao
2020/06/04
6700
Redis5新特性Streams作消息队列
求不更学不动之Redis5.0新特性Stream尝鲜
Redis5.0最近被作者突然放出来了,增加了很多新的特色功能。而Redis5.0最大的新特性就是多出了一个数据结构Stream,它是一个新的强大的支持多播的可持久化的消息队列,作者坦言Redis Stream狠狠地借鉴了Kafka的设计。
老钱
2018/08/14
6480
[译] Redis Streams介绍
我大学的时候英语6级没过,因此但凡懂点英语的同学,如果你进到此页面,尽量去阅读原文,链接在下方原文地址.最次也要对照着原文阅读,以免我出了什么差错(这是不可避免的),坑了别的小伙伴.
呼延十
2019/07/01
2.1K0
【redis】 属于redis的 “消息队列”:redis stream(浅析)
这以前只知道redis有类似于消息队列的发布/订阅,还真不知道它居然悄咪咪的有“消息队列”呀哈。
看、未来
2021/12/27
1.3K0
【redis】 属于redis的 “消息队列”:redis stream(浅析)
Redis进阶-Stream多播的可持久化的消息队列
PubSub 的生产者传递过来一个消息,Redis 会直接找到相应的消费者传递过去。如果一个消费者都没有,那么消息直接丢弃。
小小工匠
2021/08/17
2.9K0
Redis基础教程(十六):Redis Stream
在现代分布式系统中,消息队列和事件驱动架构变得越来越重要,它们在异步处理、解耦服务组件、实现事件驱动的微服务等方面发挥着关键作用。Redis,作为一款多功能的开源数据结构存储系统,自4.0版本开始引入了Stream数据结构,为构建高效的消息队列和事件驱动系统提供了新的可能。本文将深入解析Redis Stream的特性、操作命令,并通过具体案例展示其在实际场景中的应用。
用户11147438
2024/07/12
6000
Redis Stream 实践
stream 是一个日志形式的存储结构,可以往里追加数据,每条数据都会生成一个时间戳ID,stream 也有便捷的读取数据的模型。
dys
2018/08/01
1.3K0
工具系列 | Redis Stream 类型的消息队列
Redis 5 新特性中,Streams 数据结构的引入,可以说它是在本次迭代中最大特性。它使本次 5.x 版本迭代中,Redis 作为消息队列使用时,得到更完善,更强大的原生支持,其中尤为明显的是持久化消息队列。
Tinywan
2021/02/25
1.5K0
工具系列 | Redis Stream 类型的消息队列
Redis中的Stream数据类型作为消息队列的尝试
Redis的List数据类型作为消息队列,已经比较合适了,但存在一些不足,比如只能独立消费,订阅发布又无法支持数据的持久化,相对前两者,Redis Stream作为消息队列的使用更为有优势。
Java_老男孩
2019/12/02
1.4K0
别再用 Redis List 实现消息队列了,Stream 专为队列而生
Stream 是 Redis 5.0 引入的一种专门为消息队列设计的数据类型,Stream 是一个包含 0 个或者多个元素的有序队列,这些元素根据 ID 的大小进行有序排列。
PHP开发工程师
2022/03/04
1.2K0
别再用 Redis List 实现消息队列了,Stream 专为队列而生
相关推荐
⑨【Stream】Redis流是什么?怎么用?: Stream [使用手册]
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验