前段时间,我在内存中实现了一个简单异步通知框架。但由于没有持久化功能,应用重启就会导致数据丢失,且不支持分布式和集群。今天这篇笔记,引入了 Redis 来解决这些问题,以下是几点理由:
基于 Redis 实现延时队列也有几种方法,展开详细讲讲。
Redis 2.8.0 版本以后就具有了 键事件通知(注,还有个键空间通知,注意区别),基于 Pub/Sub 发布订阅实现,详见 官网。而我们正好可以利用这个特性,实现异步通知的延迟功能,数据流转如下:
异步结果通知实现——基于Redis实现,我这操作很可以
大概逻辑:当首次通知、或通知失败时,设置(重新设置)在 Redis 对应的 Key 的过期时间,Redis 会监听过期事件,发生事件时通知订阅者,订阅者接收到事件,做逻辑处理。下面看具体的实现。
首先,修改 Redis 端配置打开功能。由于该功能会消耗一些 CPU 性能,所以在配置文件中是 默认关闭 的。Ex表示打开 键过期事件通知,每当有过期键被删除时发送,订阅者能收到 接收到被执行事件的键的名字
notify-keyspace-events Ex
其次,想要在 SpringBoot 中,订阅到 Redis 的事件,也需要两个步骤: 1、继承 org.springframework.data.redis.listener.adapter.MessageListenerAdapter 类,创建自己的监听器
@Component
public class OrderExpireEventListener extends MessageListenerAdapter {
@Override
public void onMessage(Message message, byte[] pattern) {
byte[] body = message.getBody();
String msg = redisWrapper.getRedisTemplate().getStringSerializer().deserialize(body);
// do something...
// 假如通知失败,需要重新计算下次通知时间,设置 Redis
// 至于数据类型,String 即可
}
}
2、将创建的监听器,注册(委托设计模式)给 RedisMessageListenerContainer
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory factory,
OrderExpireEventListener adapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
container.addMessageListener(adapter, new PatternTopic("__keyevent@0__:expired"));
return container;
}
这里有个点需要注意下,那就是 Redis 的键设计。
代码中的 keyevent@0:expired 频道匹配意味着,编号为 0 的库中所有键过期时间都会被订阅到。而这个 Redis 可能不单单只有这个业务在使用,有可能存在其他的业务也在使用。总不可能来个任意的键都会需要去做过期处理。最好是有个通用的设计规则,对 Key 的含义分割。比如:产品固定前缀:业务:业务属性:业务唯一标识
app1:trans:notice:1615283234
代表:系统名为 app1 的 在交易模块 的 订单号为 1615283234 的通知业务的消息。当监听器解析 Key 失败时则说明是其他的键过期,不做处理。一旦解析成功,则对消息进行路由分发。
键搞定了,值就看业务情况而定。如果是通知的话,必须带上当前是第几次通知,根据这个再加上策略才能算出下次通知时间(该键的过期时间)。
一般简单的方法都存在多少的缺陷,这种方式也不例外。引用 Redis 官网的一段话:
Because Redis Pub/Sub is fire and forget currently there is no way to use this feature if your application demands reliable notification of events, that is, if your Pub/Sub client disconnects, and reconnects later, all the events delivered during the time the client was disconnected are lost
意思是说:Redis 目前发布订阅基于 发送即忘 策略,且没有 ACK 机制,意味着客户端重启掉线期间,消息会丢失。加上 Pub/Sub 消息 没有持久化机制,假如当订阅客户端由于网络原因没收到,想再次重试,这是没法实现的。
假如此时我还想跟内存队列那样子能够 对消息的延迟时间进行自动排序,该如何实现呢?除此之外,Pub/Sub 是广播机制,假如存在多个订阅者,那么就会 同时收到键过期的消息,此时又该如何处理 消息竞争 问题?
这时候我们要引入 Redis 的 Sorted Set 数据结构。关于这个数据结构简单来说是 支持排序的 Set,靠的是与之关联的浮点值,称为 score 来实现的。值得注意的是,这个排序并不是放进去的时候排,是拿出来的时候(联想到 性能 问题,后面有讲)。这里引用一段官网的话:
Moreover, elements in a sorted sets are taken in order (so they are not ordered on request, order is a peculiarity of the data structure used to represent sorted sets).
所以我们只需要将消息延迟执行的时间戳作为分数值,就能解决上文所说的排序问题,当然由于该结构是 Redis 的基本功能,自然也支持持久化,也就是解决了消息丢失问题。
大概设计如下:
异步结果通知实现——基于Redis实现,我这操作很可以
首先看看,消费者线程该如何实现(SpringBoot 环境下)
@Slf4j
@Component
public class ConsumerTask {
@Autowired
RedisTemplate<String, Object> redisTemplate;
// Sorted Set 队列键
private static String KEY = "TEST:ZSET";
@Scheduled(cron = "0/1 * * * * ?")
public void run() {
try {
this.doRun();
} catch (Exception e) {
log.error("消费异常", e);
}
}
private void doRun() {
// zrange 分数从小到大 zrevrange 分数从大到小
// 拿出最新的待处理消息
Set<ZSetOperations.TypedTuple<Object>> tuples =
redisTemplate.opsForZSet().rangeWithScores(KEY, 0, 0);
if (CollectionUtils.isEmpty(tuples)) {
log.info("队列无数据");
return ;
}
ZSetOperations.TypedTuple<Object> typedTuple = tuples.iterator().next();
if (typedTuple == null) {
log.info("队列无数据");
return ;
}
Double score = typedTuple.getScore();
Object value = typedTuple.getValue();
if (System.currentTimeMillis() < score) {
log.info("未到执行时间...");
return ;
}
Long zrem = redisTemplate.opsForZSet().remove(KEY, value);
if (Long.compare(1L, zrem) == 0) {
log.info("删除数据成功,开始处理,数据:{}", value.toString());
// do someting...
// 假如通知失败,需要重新计算通知时间(score 值)并在 Redis 设置(ZADD)该消息
}
else {
log.info("被其他的消费端抢占,不处理...");
}
}
}
跟之前的 推模式 相比,这次采用的是 拉模式,尽管在多个消费端可能同时拿到同一个消息,不过这里通过 Long zrem = redisTemplate.opsForZSet().remove(KEY, value) 这方法,利用了 rem 命令的原子性 解决了竞争问题,也就是说只会有一个客户端删除成功。
仔细观察的话,可以看到我们拿到的时间戳是 Long 类型的,但是 Spring 提供的 Sorted Set 操作 api 参数是 Double 类型
org.springframework.data.redis.core.ZSetOperations#add(K, V, double)
org.springframework.data.redis.core.ZSetOperations#rangeByScore(K, double, double)
那会不会有精确丢失问题?所以输出看下最大最小值
System.out.println(Long.MAX_VALUE); // 2 的 64 次方-1,19 个数位
System.out.println(Long.MIN_VALUE); // 负的 2 的 64 次方
System.out.println(Double.MAX_VALUE); // 2 的 1024 次方 -1,308 个数位
System.out.println(Double.MIN_VALUE); // 2 的 -1074次方
可以看到 Double 最大值远远大于 Long 类型,加上时间戳不会有负数,所以可以放心转换。
在这里不演示生产者代码,过于简单,就是调用 zadd 命令而已。这里也需要注意,如果是异步通知场景 zadd 的值必须带上这是第几次通知,就如前面的方案一样。
到此为止,第一种方案存在的问题在第二种方案全部解决了。下面看一种网上的比较多的实现方式。
跟上一种相比多了一个 List 数据结构。先来看下加入 List 之后的整个设计图
异步结果通知实现——基于Redis实现,我这操作很可以
不得不说刚开始看见这种方案时,是存在疑惑的。因为上面的 Sorted Set 已经实现了功能,为什么要引入 List 数据结构增加系统的复杂度?唯一能看到的好处就是 List 数据结构提供了 阻塞 操作?经过与同事讨论后,得出下面几点结论:
这里需要注意的一点是,搬运操作有多个命令一起完成,如下伪代码:
// 1、从 Sorted Set 中拿出 score 值在 前五秒 到 目前(包含现在)的所有元素
Date now = new Date();
Date fiveSecondBefore = DateUtils.addSeconds(now, -5);
Set<Object> objects = redisTemplate.opsForZSet()
.rangeByScore("Sorted Set:Key", fiveSecondBefore.getTime(), now.getTime());
if (CollectionUtils.isEmpty(objects)) {
return ;
}
// 2、将这些元素从 Sorted Set 中删除
Long removeResult = redisTemplate.opsForZSet().remove("Sorted Set:Key", objects);
if (Long.compare(removeResult, objects.size()) != 0) {
return ;
}
// 3、将这些元素放进 List
Long result = redisTemplate.opsForList().leftPushAll("List:Key", objects);
rangeByScore、remove、leftPushAll 这几个操作不具有原子性,可能在中途发生异常、宕机等情况,导致在搬运过程中丢失或重复搬运。 好在 Redis 提供了执行 lua 脚本功能,会保证同一脚本以原子性(atomic) 的方式执行,所以我们只需要原子性操作的多个步骤整合在自定义 lua 脚本中即可,如下:
local list_key = KEYS[1];
local sorted_set_key = KEYS[2];
local now = ARGV[1];
local sorted_set_size = redis.call('ZCARD', sorted_set_key)
if (tonumber(sorted_set_size) <= 0) then
return
end
local members = redis.call('ZRANGEBYSCORE', sorted_set_key, 0, tonumber(now));
if (next(members) == nil) then
return
end
for key,value in ipairs(members)
do
local zscore = redis.call('ZSCORE',sorted_set_key,value);
if (tonumber(now) < tonumber(zscore)) then
return zscore;
end
redis.call('ZREM', sorted_set_key, value);
redis.call('RPUSH', list_key, value);
end
local topmember = redis.call('ZRANGE', sorted_set_key, 0, 0);
local nextvalue = next(topmember);
if (nextvalue == nil) then
return
end
for k,v in ipairs(topmember)
do
return redis.call('ZSCORE', sorted_set_key, v);
end
下面是 SpringBoot 定时调用该 lua 脚本进行搬运的示例代码:
@Scheduled(cron = "0/1 * * * * ?")
public void run4() {
ClassPathResource resource = new ClassPathResource("sorted_set_to_list.lua");
String luaScript = FileUtils.readFileToString(resource.getFile());
DefaultRedisScript<String> redisScript = new DefaultRedisScript<>(luaScript, String.class);
//
List<String> keys = Lists.newArrayList("TEST:LIST", "TEST:ZSET");
String now = String.valueOf(System.currentTimeMillis());
// 注意这里的序列化器,需要换成 StringSerializer
// 替换的默认的 Jackson2JsonRedisSerializer
String executeResult = redisTemplate.execute(redisScript, redisTemplate.getStringSerializer(),
redisTemplate.getStringSerializer(), keys, now);
log.info("lua 脚本执行结果:{}", executeResult);
}
最后再来看看消费者该如何实现
@Component
@Slf4j
public class ListConsumer implements ApplicationListener<ContextRefreshedEvent> {
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
Executors.newSingleThreadExecutor().submit(new PopEventRunner());
}
private static class PopEventRunner implements Runnable {
@Override
public void run() {
RedisTemplate<String, Object> redisTemplate = (RedisTemplate<String, Object>) SpringUtil.getBean3("redisTemplate");
while (true) {
try {
Object leftPop = redisTemplate.opsForList().leftPop("TEST:LIST", Integer.MAX_VALUE, TimeUnit.SECONDS);
if (leftPop == null) {
continue ;
}
// do something...
// 当通知失败时,重新计算通知时间并设置(ZADD)Redis
} catch (Exception e) {
log.error("监听异常", e);
sleep(5); // 发生异常睡五秒
}
}
}
}
}
监听容器的刷新事件,创建监听单线程,无限循环阻塞监听队列。相对于前一种实现方案,该方案确实更加的贴合。但仍有优化的余地,比如:
相对前一篇内存实现,Redis 这种方式更加的可靠,且在允许一点时间的误差和牺牲一点消息可靠性下,不失为一种 性价比高 的选择。假如当前景就是不允许有这些损失,那还有什么解决方案吗?到时候我们再来讲终极杀招,使用 RabbitMQ 来实现。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有