前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Redis消息队列 | Stream

Redis消息队列 | Stream

作者头像
一个架构师
发布2022-06-27 15:18:12
1.4K0
发布2022-06-27 15:18:12
举报
文章被收录于专栏:从码农的全世界路过

在RedisV5.0之前, 如果想实现队列功能, 只能用list或者pub/sub实现, 但它们都有自己的缺点.

使用list方式, 缺少ack确认, 不能做广播, 不能分组消费;

使用pub/sub方式, 消息发布后, 客户端不能立刻接收就会丢失消息;

在RedisV5.0的时候, 提供了Stream类型实现队列功能. 其中包括: 生成消息ID, 消息确认, 分组消费等功能.

Stream有一个消息链表, 将所有加入的消息都串联起来, 每个消息都有一个唯一的ID和对应的内容. 消息是持久化的, Redis重启时, 消息不会丢失.

1

XADD

向指定队列中添加信息.

代码语言:javascript
复制
xadd key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]

key 队列名称, 如果不存在就创建新队列;

[MAXLEN|MINID [=|~] threshold [LIMIT count]] 队列的长度,最小ID等信息;

*|ID 消息ID,可以自定义或Redis 自动生成; 自定义生成时, 需要保证单调递增; 使用符号"*"表示由Redis生成; 按时间戳-序号规则生成, 其中时间戳是毫秒级的Redis服务器时间;

field value 消息内容, 1个或多个KV键值对;

代码语言:javascript
复制
127.0.0.1:6379> xadd mystream 1 k1 v1 k2 v2
"1-0"
127.0.0.1:6379> xadd mystream * k3 v3
"1622699291143-0"
# 批量添加
127.0.0.1:6379> multi 
OK
127.0.0.1:6379(TX)> xadd mystream * k4 v4
QUEUED
127.0.0.1:6379(TX)> xadd mystream * k5 v5
QUEUED
127.0.0.1:6379(TX)> exec
1) "1622700074168-0"
2) "1622700074168-1"

2

XRANGE 与 XREVRANGE

xrange按ID范围读取记录, 从数据 ID的小老到大的顺序读取; xrevrange则是反向读取, 按 ID从大到小的顺序读取.

代码语言:javascript
复制
xrange key start end [COUNT count]
127.0.0.1:6379> xrange mystream - + count 4
1) 1) "1-0"
   2) 1) "k1"
      2) "v1"
      3) "k2"
      4) "v2"
2) 1) "1622699291143-0"
   2) 1) "k3"
      2) "v3"
3) 1) "1622700074168-0"
   2) 1) "k4"
      2) "v4"
4) 1) "1622700074168-1"
   2) 1) "k5"
      2) "v5"
127.0.0.1:6379> 
127.0.0.1:6379> XREVRANGE mystream + - count 4
1) 1) "1622700074168-1"
   2) 1) "k5"
      2) "v5"
2) 1) "1622700074168-0"
   2) 1) "k4"
      2) "v4"
3) 1) "1622699291143-0"
   2) 1) "k3"
      2) "v3"
4) 1) "1-0"
   2) 1) "k1"
      2) "v1"
      3) "k2"
      4) "v2"
127.0.0.1:6379> 

3

XREAD

可以按阻塞和非阻塞两种方式读取消息信息.

代码语言:javascript
复制
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

[COUNT count] 用于限定获取的消息数量

[BLOCK milliseconds] 阻塞模式下的超时时间, 默认为非阻塞模式;

ID 指定信息ID开始读取; 0表示从第一条消息开始; 阻塞模式下, 表示最新的消息ID; 在非阻塞模式下无意义.

代码语言:javascript
复制
127.0.0.1:6379> xread count 1 block 100 streams mystream 0
1) 1) "mystream"
   2) 1) 1) "1-0"
         2) 1) "k1"
            2) "v1"
            3) "k2"
            4) "v2"
127.0.0.1:6379> xread count 1 block 100 streams mystream $
(nil)

4

XLEN

队列数据个数

代码语言:javascript
复制
127.0.0.1:6379> xlen mystream
(integer) 4

5

XTRIM

限制队列长度;

~ 表示可以不十分精确的限制队列N条, 可以多出一些, 但是不能少. 可以减轻 Redis服务准确计算队列长度的压力;

代码语言:javascript
复制
127.0.0.1:6379> XTRIM mystream MAXLEN ~ 1000

6

XDEL

删除指定ID消息

代码语言:javascript
复制
127.0.0.1:6379> xadd mystream * k6 v6
"1622702539939-0"
127.0.0.1:6379> xrange mystream 1622700074168-1 +
1) 1) "1622700074168-1"
   2) 1) "k5"
      2) "v5"
2) 1) "1622702539939-0"
   2) 1) "k6"
      2) "v6"
127.0.0.1:6379> xdel mystream 1622702539939-0
(integer) 1
127.0.0.1:6379> xrange mystream 1622700074168-1 +
1) 1) "1622700074168-1"
   2) 1) "k5"
      2) "v5"
127.0.0.1:6379>

7

XGROUP

队列消费分组的创建和删除, 以及消费者的创建和销毁.

Stream可以挂多个消费组, 各消费组之间互相独立,不受影响, 每个消费组都会有个游标last_delivered_id标识当前消费组的消费位置.

同一消费组中, 可以有多个消费者, 协同消费消息, 每读取一条消息, last_delivered_id都会下移一位, 同时会记录在PEL(pending_ids)中, 直到消息被ACK确认后, 才会从PEL中移除.

代码语言:javascript
复制
XGROUP [CREATE key groupname ID|$ [MKSTREAM]] [SETID key groupname ID|$] [DESTROY key groupname] [CREATECONSUMER key groupname consumername] [DELCONSUMER key groupnanme]

# 创建消费分组

代码语言:javascript
复制
127.0.0.1:6379> XGROUP create mystream mygroup 0
OK
127.0.0.1:6379> xinfo groups mystream
1) 1) "name"
   2) "mygroup"
   3) "consumers"
   4) (integer) 0
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "0-0"

# 设置开始处理消息ID

代码语言:javascript
复制
127.0.0.1:6379> XGROUP setid mystream mygroup 1622699291143-0
OK
127.0.0.1:6379> xinfo groups mystream
1) 1) "name"
   2) "mygroup"
   3) "consumers"
   4) (integer) 0
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "1622699291143-0"
127.0.0.1:6379>

删除消费分组

代码语言:javascript
复制
127.0.0.1:6379> xgroup destroy mystream mygroup 
(integer) 1
127.0.0.1:6379> xinfo groups mystream
(empty array)
127.0.0.1:6379>

创建指定分组中消费者

代码语言:javascript
复制
127.0.0.1:6379> XGROUP CREATECONSUMER mystream mygroup myconsumer
(integer) 1
127.0.0.1:6379> xinfo consumers mystream mygroup
1) 1) "name"
   2) "myconsumer"
   3) "pending"
   4) (integer) 0
   5) "idle"
   6) (integer) 41127
2) 1) "name"
   2) "myconsumer1"
   3) "pending"
   4) (integer) 0
   5) "idle"
   6) (integer) 1974

删除指定分组中消费者

代码语言:javascript
复制
127.0.0.1:6379> XGROUP delconsumer mystream mygroup myconsumer1
(integer) 0
127.0.0.1:6379> xinfo consumers mystream mygroup
1) 1) "name"
   2) "myconsumer"
   3) "pending"
   4) (integer) 0
   5) "idle"
   6) (integer) 110278
127.0.0.1:6379>

8

XREADGROUP

指定消费组内的消费者读取消息

代码语言:javascript
复制
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

[COUNT count] 每次获取消息的数量;

[BLOCK milliseconds] 阻塞模式和超时时间;

[NOACK] 不需要确认消息, 适用于不怎么重要的可以丢失的消息;

key [key ...] 指定Stream key;

ID [ID ...] 指定的消息 ID; > 指定读取所有未消费的消息, 其他值指定被挂起的消息;

查看当前Stream 消费组信息, 确认最后消费位置

代码语言:javascript
复制
127.0.0.1:6379> xinfo groups mystream
1) 1) "name"
   2) "mygroup"
   3) "consumers"
   4) (integer) 1
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "1622699291143-0"

查看Stream 数据

代码语言:javascript
复制
127.0.0.1:6379> xrange mystream - +
1) 1) "1-0"
   2) 1) "k1"
      2) "v1"
      3) "k2"
      4) "v2"
2) 1) "1622699291143-0"
   2) 1) "k3"
      2) "v3"
3) 1) "1622700074168-0"
   2) 1) "k4"
      2) "v4"
4) 1) "1622700074168-1"
   2) 1) "k5"
      2) "v5"

消费Stream队列中下一条消息

代码语言:javascript
复制
127.0.0.1:6379> XREADGROUP group mygroup myconsumer count 1 streams mystream >
1) 1) "mystream"
   2) 1) 1) "1622700074168-0"
         2) 1) "k4"
            2) "v4"
127.0.0.1:6379>

9

XPENDING

查看未进行确认的数据

代码语言:javascript
复制
127.0.0.1:6379> xpending mystream mygroup - +  10
1) 1) "1622700074168-0"
   2) "myconsumer"
   3) (integer) 25605849
   4) (integer) 1
127.0.0.1:6379>

10

XACK

确认当前消费分组处理完毕该消息

代码语言:javascript
复制
127.0.0.1:6379> xack mystream mygroup 1622700074168-0
(integer) 1
127.0.0.1:6379> xpending mystream mygroup - +  10
(empty array)
127.0.0.1:6379>

11

XCLAIM

可能因为服务异常等原因, 消息长时间未能得到消费或者确认, 可以通过 xclaim 命令读取指定 ID 或者空闲时间的数据, 重新消费数据, 并再次确认(xack)消息;

代码语言:javascript
复制
XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [FORCE] [JUSTID]

ID [ID ...] 消息的 ID;

[IDLE ms] 设置消息的空闲时间, 如果不提供, 默认为 0;

[TIME ms-unix-time] 和IDLE相同, unix 时间戳;

RETRYCOUNT 设置重试次数, 通常 xclaim 不会改变这个值, 它通常用于 xpending 命令, 用来发现一些长时间未被处理的消息;

FORCE 在 PEL 中创建待处理消息, 即使指定的 ID 尚未分配给客户端的PEL;

JUSTID 只返回认领的消息 ID 数组, 不返回实际消息;

代码语言:javascript
复制
127.0.0.1:6379> XREADGROUP group mygroup myconsumer count 1 streams mystream >
1) 1) "mystream"
   2) 1) 1) "1622700074168-1"
         2) 1) "k5"
            2) "v5"
127.0.0.1:6379> xpending mystream mygroup - +  10
1) 1) "1622700074168-1"
   2) "myconsumer"
   3) (integer) 3256
   4) (integer) 1
127.0.0.1:6379> XCLAIM mystream  mygroup myconsumer 10 1622700074168-1 
1) 1) "1622700074168-1"
   2) 1) "k5"
      2) "v5"

12

XINFO

Stream也提供了查看队列元数据的命令

代码语言:javascript
复制
127.0.0.1:6379> xinfo help
1) XINFO <subcommand> [<arg> [value] [opt] ...]. Subcommands are:
2) CONSUMERS <key> <groupname>
3)     Show consumers of <groupname>.
4) GROUPS <key>
5)     Show the stream consumer groups.
6) STREAM <key> [FULL [COUNT <count>]
7)     Show information about the stream.
8) HELP
9)     Prints this help.

查看队列基本信息, 包括消费分组, 以及对应的最后消费数据 ID 信息

代码语言:javascript
复制
127.0.0.1:6379> xinfo groups mystream
1) 1) "name"
   2) "mygroup1"
   3) "consumers"
   4) (integer) 0
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "0-0"

查看指定消费分组的消费者

代码语言:javascript
复制
127.0.0.1:6379> xinfo consumers mystream mygroup1
1) 1) "name"
   2) "myconsumer1"
   3) "pending"
   4) (integer) 0
   5) "idle"
   6) (integer) 19009

查看队列元信息

代码语言:javascript
复制
127.0.0.1:6379> xinfo stream mystream
 1) "length"
 2) (integer) 1
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "last-generated-id"
 8) "1622603679677-0"
 9) "groups"
10) (integer) 1
11) "first-entry"
12) 1) "1622603679677-0"
    2) 1) "k1"
       2) "v1"
13) "last-entry"
14) 1) "1622603679677-0"
    2) 1) "k1"
       2) "v1"

总结

以上, 就是 RedisV5.0版本的Stream的基本功能了.

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-06-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 从码农的全世界路过 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis®
腾讯云数据库 Redis®(TencentDB for Redis®)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档