前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >怎么用 Redis 快速实现一个延迟队列?

怎么用 Redis 快速实现一个延迟队列?

作者头像
用户1278550
发布于 2020-11-02 06:24:53
发布于 2020-11-02 06:24:53
75700
代码可运行
举报
文章被收录于专栏:idbaidba
运行总次数:0
代码可运行

在后端服务中,经常有这样一种场景,写数据库操作在异步队列中执行,且这个异步队列是多进程运行的,这时如果对同一资源进行写库操作,很有可能产生数据被覆盖等问题,于是就需要业务层在更新数据库之前进行加锁,这样保证在更改同一资源时,没有其他更新操作干涉,保证数据一致性

但如果在更新前对数据库更新加锁,那此时又来了新的更新数据库的请求,但这个更新操作不能丢弃掉,需要延迟执行,那这就需要添加到延迟队列中,延迟执行。

那么如何实现一个延迟队列?

利用RedisSortedSetString这两种结构,就可以轻松实现。

具体实现

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# coding: utf8

"""Delay Queue"""

import json
import time
import uuid

import redis


class DelayQueue(object):

    """延迟队列"""

    QUEUE_KEY = 'delay_queue'
    DATA_PREFIX = 'queue_data'

    def __init__(self, conf):
        host, port, db = conf['host'], conf['port'], conf['db']
        self.client = redis.Redis(host=host, port=port, db=db)

    def push(self, data):
        """push

        :param data: data
        """
        # 唯一ID
        task_id = str(uuid.uuid4())
        data_key = '{}_{}'.format(self.DATA_PREFIX, task_id)
        # save string
        self.client.set(data_key, json.dumps(data))
        # add zset(queue_key=>data_key,ts)
        self.client.zadd(self.QUEUE_KEY, data_key, int(time.time()))
        
    def pop(self, num=5, previous=3):
        """pop多条数据

        :param num: pop多少个
        :param previous: 获取多少秒前push的数据
        """
        # 取出previous秒之前push的数据
        until_ts = int(time.time()) - previous
        task_ids = self.client.zrangebyscore(
            self.QUEUE_KEY, 0, until_ts, start=0, num=num)
        if not task_ids:
            return []

        # 利用删除的原子性,防止并发获取重复数据
        pipe = self.client.pipeline()
        for task_id in task_ids:
            pipe.zrem(self.QUEUE_KEY, task_id)
        data_keys = [
            data_key
            for data_key, flag in zip(task_ids, pipe.execute())
            if flag
        ]
        if not data_keys:
            return []
        # load data
        data = [
            json.loads(item)
            for item in self.client.mget(data_keys)
        ]
        # delete string key
        self.client.delete(*data_keys)
        return data

实现思路

push

push数据时,执行如下几步:

  • 生成一个唯一key,这里使用uuid4生成(uuid4是根据随机数生成的,重复概率非常小)
  • 把数据序列化后存入这个唯一keyString结构中
  • 把这个唯一key加到SortedSet中,score是当前时间戳

这里利用SortedSet记录添加数据的时间,便于在获取时根据时间获取之前的数据,达到延迟的效果。 而真正的数据则存放在String结构中,等获取时先拿到数据的key再获取真正的数据。

这里可能有人会疑问,为什么不把真正的数据放到SortedSetname中?

  • 把数据放入name中可能会产生瞬间写入相同数据导致数据多条变一条的情况
  • 把数据序列化放到SortedSetname中有些过大,不太符合使用习惯
pop

pop是可以获取多条数据的,上面的代码默认是获取延迟队列中3秒前的5条数据,具体思路如下:

  • 计算previous秒前的时间戳,使用SortedSetzrangebysocre方法获取previous秒之前添加的唯一key
  • 如果SortedSet中有数据,则利用Redis删除的原子性,使用zrem依次删除SortedSet的元素,如果删除成功,则使用,防止多进程并发执行此方法,拿到相同的数据
  • 那到可用的唯一key,从String中获取真正的数据即可

这里最重要的是第二步,在拿出SortedSet的数据后,一定要防止其他进程并发获取到相同的数据,所以在这里使用zrem依次删除元素,保证只有删除成功的进程才能使用这条数据。

使用

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# coding: utf8

import time

from delay import DelayQueue

redis_conf = {'host': '127.0.0.1', 'port': 6379, 'db': 0}

# 构造延迟队列对象
queue = DelayQueue(redis_conf)

# push 20条数据
for i in range(20):
    item = {'user': 'user-{}'.format(i)}
    queue.push(item)
    
# 从延迟队列中马上获取10条数据
data = queue.pop(num=10)
# 刚添加的马上获取是获取不到的
assert len(data) == 0

# 休眠10秒
time.sleep(10)
# 从延迟队列中获取10条数据
data = queue.pop(num=10)
assert len(data) == 10

# 从延迟队列中获取截止到5秒之前添加的10条数据
data = queue.pop(num=10, previous=5)
assert len(data) == 10

使用就比较简单了,在实际使用过程中,每次在处理正常队列时,通过上面的方法获取一下延迟队列的数据,如果延迟队列中有数据,那么按照业务正常处理就可以了,这样就达到了数据延迟处理的效果。

来源 | http://kaito-kidd.com/2016/12/26/delay-queue-based-on-redis/

-The End-

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-10-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 yangyidba 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Redis学习笔记之延时队列
这些情况都可以使用延时队列来做,实现延时队列比较场景的有使用消息队列MQ来实现,比如RocketMQ等等,也可以使用Redis来实现,本博客主要介绍一下Redis实现延时队列
SmileNicky
2019/03/11
2.3K0
一口气说出 6种 延时队列的实现方案
五一期间原计划是写两篇文章,看一本技术类书籍,结果这五天由于自律性过于差,禁不住各种诱惑,我连电脑都没打开过,计划完美宣告失败。所以在这能看出和大佬之间的差距,人家没白没夜的更文,比你优秀的人比你更努力,难以望其项背,真是让我自愧不如。
程序员小富
2020/05/08
2.4K0
一口气说出 6种 延时队列的实现方案
模型上线不用愁,批量推理来加油
作为一个算法工程师,在日常工作中难免会碰到模型上线的问题。对于一些要求不高的场合,简单找一个web框架实现一下接口就能搞定:对于每个用户请求,调用模型得到结果再返回。但这种朴素的实现往往无法最大化利用GPU,对于性能要求比较高的场景应付起来就略显吃力。
朴素人工智能
2021/07/08
1.4K0
Redisson源码(二)延迟队列RDelayedQueue的使用及原理分析
在工作中,我们有时候会遇到这样的场景,比如下单之后超过30分钟未支付自动取消订单,还有就比如过期/生效通知等等,这些场景一般有两种方法解决: 第一种可以通过定时任务扫描符合条件的去执行,第二种就是提前通过消息队列发送延迟消息到期自动消费。 本文我要介绍的就是通过第二种方式来实现这种业务逻辑,只不过这次不是使用MQ而是直接使用的是Redission提供的RDelayedQueue延迟队列。
用户2031163
2024/03/02
1.9K0
Redisson源码(二)延迟队列RDelayedQueue的使用及原理分析
Go 语言实战:构建强大的延迟任务队列
延迟队列是一种数据结构,用于处理需要在未来某个特定时间执行的任务。这些任务被添加到队列中,并且指定了一个执行时间,只有到达指定的时间点时才能从队列中取出并执行。
frank.
2024/01/22
8040
Go 语言实战:构建强大的延迟任务队列
Dyno-queues 分布式延迟队列 之 辅助功能
本系列我们会以设计分布式延迟队列时重点考虑的模块为主线,穿插灌输一些消息队列的特性实现方法,通过分析Dyno-queues 分布式延迟队列的源码来具体看看设计实现一个分布式延迟队列的方方面面。
罗西的思考
2021/03/04
4600
Dyno-queues 分布式延迟队列 之 辅助功能
Redis 延迟队列实现(基于PHP)
Redis 可以利用 zset (有序列表)来实现,将消息序列化成一个字符串作为 zset的 value; 这个消息的到期处理时间作为 score,利用多个线程轮询 zset 获取到期的任务进行处理。 多线程是为了保证可用性,万一挂了一个线程还有其他线程可以继续处理; 因为有多个线程,所以需要考虑并发争抢任务,确保任务不会多次执行。
陈大剩博客
2023/03/06
9590
Redis 延迟队列实现(基于PHP)
分布式之延时任务方案解析
对上述的任务,我们给一个专业的名字来形容,那就是延时任务。那么这里就会产生一个问题,这个延时任务和定时任务的区别究竟在哪里呢?一共有如下几点区别
lyb-geek
2018/07/26
6850
分布式之延时任务方案解析
实战:常见的延时队列解决方案及代码实现,真的很全:MQ、Redis、JDK队列、Netty时间轮~
DelayedQueue是一个无界阻塞队列,内部有一个优先队列,当使用put方法添加元素到DelayQueue时,会塞一个延时条件,DelayedQueue会按照延时条件排序,最先过期的排在队首,只有元素过期了,才能从队首取出数据,取出数据的方法有take和poll
AI码师
2022/12/22
2.3K0
实战:常见的延时队列解决方案及代码实现,真的很全:MQ、Redis、JDK队列、Netty时间轮~
Redis的三种延迟队列 - Java技术债务
在现代分布式系统设计中,延迟队列作为一种重要的数据结构,广泛应用于消息延迟处理、任务调度、缓存失效、订单超时处理等场景。Redis,作为一个高性能的键值对存储系统,凭借其丰富的数据结构、原子操作、发布/订阅模式以及Lua脚本支持,成为了实现延迟队列的理想选择。
Java技术债务
2024/07/10
7540
Redis 延迟队列实现(基于PHP)
Redis 可以利用 zset (有序列表)来实现,将消息序列化成一个字符串作为 zset的 value; 这个消息的到期处理时间作为 score,利用多个线程轮询 zset 获取到期的任务进行处理。 多线程是为了保证可用性,万一挂了一个线程还有其他线程可以继续处理; 因为有多个线程,所以需要考虑并发争抢任务,确保任务不会多次执行。
高久峰
2023/09/18
4630
Redis 延迟队列实现(基于PHP)
Redis 如何实现延时任务队列
顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。
Tinywan
2024/05/11
8240
Redis 如何实现延时任务队列
面试官:怎么不用定时任务实现关闭订单?
点击上方“芋道源码”,选择“设为星标” 管她前浪,还是后浪? 能浪的浪,才是好浪! 每天 10:33 更新文章,每天掉亿点点头发... 源码精品专栏 原创 | Java 2021 超神之路,很肝~ 中文详细注释的开源项目 RPC 框架 Dubbo 源码解析 网络应用框架 Netty 源码解析 消息中间件 RocketMQ 源码解析 数据库中间件 Sharding-JDBC 和 MyCAT 源码解析 作业调度中间件 Elastic-Job 源码解析 分布式事务中间件 TCC-Transaction
芋道源码
2022/07/14
4070
面试官:怎么不用定时任务实现关闭订单?
Redis 实现延迟队列?深深被折服!!
地址:www.cnblogs.com/xiaowei123/p/13222710.html
架构师修炼
2020/08/27
6910
Redis 实现延迟队列?深深被折服!!
生成订单30分钟未支付,则自动取消,该怎么实现?
对上述的任务,我们给一个专业的名字来形容,那就是延时任务。那么这里就会产生一个问题,这个延时任务和定时任务的区别究竟在哪里呢?一共有如下几点区别
良月柒
2021/09/03
6710
用 Redis 做一个可靠的延迟队列
点击上方“芋道源码”,选择“设为星标” 管她前浪,还是后浪? 能浪的浪,才是好浪! 每天 10:33 更新文章,每天掉亿点点头发... 源码精品专栏 原创 | Java 2021 超神之路,很肝~ 中文详细注释的开源项目 RPC 框架 Dubbo 源码解析 网络应用框架 Netty 源码解析 消息中间件 RocketMQ 源码解析 数据库中间件 Sharding-JDBC 和 MyCAT 源码解析 作业调度中间件 Elastic-Job 源码解析 分布式事务中间件 TCC-Transaction
芋道源码
2022/07/19
4670
用 Redis 做一个可靠的延迟队列
013:Redis延时队列
我们平时习惯于使用 Rabbitmq 和 Kafka 作为消息队列中间件,来给应用程序之间增加 异步消息传递功能。这两个中间件都是专业的消息队列中间件,特性之多超出了大多数人的理解能力。
李玺
2021/11/22
2.3K0
013:Redis延时队列
田哥 手把手教 你用 Redis 做延迟消息队列
看到消息队列,我们肯定会想到各种MQ,比如:RabbitMQ,acivityMQ、RocketMQ、Kafka等。
田维常
2021/04/02
4K0
田哥 手把手教 你用 Redis 做延迟消息队列
面试官:生成订单 30 分钟未支付,则自动取消,该怎么实现?
对上述的任务,我们给一个专业的名字来形容,那就是延时任务。那么这里就会产生一个问题,这个延时任务和定时任务的区别究竟在哪里呢?一共有如下几点区别
路人甲Java
2023/08/29
3640
面试官:生成订单 30 分钟未支付,则自动取消,该怎么实现?
图文结合!Redis延迟队列golang高效实践
导语 | 本文主要讲述如何使用golang基于Redis实现延迟消息队列组件。希望对有需求的同学有所帮助。 一、背景 业务中经常会有这样的场景: 到期后自动执行指定操作; 查询某个任务是否完成,未完成等待一定时间再次查询; 回调通知,当回调失败时,等待后重试;等等还有其他很多类似的场景。 很多时候我们会直接通过一个本地定时器来帮我们完成这个任务。如果我们的系统是多实例分布式的,本地定时器就会面临很多问题,如:怎么保证重复处理的问题;统一管控的问题等等。面对本地定时器遇到的问题,我们可以使用分布式延迟
腾讯云开发者
2022/06/02
1.7K0
图文结合!Redis延迟队列golang高效实践
推荐阅读
相关推荐
Redis学习笔记之延时队列
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验