前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Redis 发布订阅,小功能大用处,真没那么废材!

Redis 发布订阅,小功能大用处,真没那么废材!

作者头像
andyxh
发布于 2020-09-24 07:13:51
发布于 2020-09-24 07:13:51
61600
代码可运行
举报
文章被收录于专栏:程序通事程序通事
运行总次数:0
代码可运行

今天小黑哥来跟大家介绍一下 Redis 发布/订阅功能。

也许有的小伙伴对这个功能比较陌生,不太清楚这个功能是干什么的,没关系小黑哥先来举个例子。

假设我们有这么一个业务场景,在网站下单支付以后,需要通知库存服务进行发货处理。

上面业务实现不难,我们只要让库存服务提供给相关的给口,下单支付之后只要调用库存服务即可。

后面如果又有新的业务,比如说积分服务,他需要获取下单支付的结果,然后增加用户的积分。

这个实现也不难,让积分服务同样提供一个接口,下单支付之后只要调用库存服务即可。

如果就两个业务需要获取下单支付的结果,那也还好,程序改造也快。可是随着业务不断的发展,越来越多的新业务说是要下单支付的结果。

这时我们会发现上面这样的系统架构存在很多问题:

第一,下单支付业务与其他业务重度耦合,每当有个新业务需要支付结果,就需要改动下单支付的业务。

第二,如果调用业务过多,会导致下单支付接口响应时间变长。另外,如果有任一下游接口响应变慢,就会同步导致下单支付接口响应也变长。

第三,如果任一下游接口失败,可能导致数据不一致的情况。比如说下图,先调用 A,成功之后再调用 B,最后再调用 C。

如果在调用 B 接口的发生异常,此时可能就导致下单支付接口返回失败,但是此时 A 接口其实已经调用成功,这就代表它内部已经处理下单支付成功的结果。

这样就会导致 A,B,C 三个下游接口,A 获取成功获取支付结果,但是 B,C 没有拿到,导致三者系统数据不一致的情况。

其实我们仔细想一下,对于下单支付业务来讲,它其实不需要关心下游调用结果,只要有某种机制通知能通知到他们就可以了。

讲到这里,这就需要引入今天需要介绍发布订阅机制。

Redis 发布与订阅

Redis 提供了基于「发布/订阅」模式的消息机制,在这种模式下,消息发布者与订阅者不需要进行直接通信。

如上图所示,消息发布者只需要想指定的频道发布消息,订阅该频道的每个客户端都可以接受到到这个消息。

使用 Redis 发布订阅这种机制,对于上面业务,下单支付业务只需要向支付结果这个频道发送消息,其他下游业务订阅支付结果这个频道,就能收相应消息,然后做出业务处理即可。

这样就可以解耦系统上下游之间调用关系。

接下来我们来看下,我们来看下如何使用 Redis 发布订阅功能。

Redis 中提供了一组命令,可以用于发布消息,订阅频道,取消订阅以及按照模式订阅。

首先我们来看下如何发布一条消息,其实很简单只要使用 publish 指令:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
publish channel message

上图中,我们使用 publish 指令向 pay_result 这个频道发送了一条消息。我们可以看到 redis 向我们返回 0 ,这其实代表当前订阅者个数,由于此时没有订阅,所以返回结果为 0 。

接下来我们使用 subscribe 订阅一个或多个频道

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
subscribe channel [channel ...]

如上图所示,我们订阅 pay_result 这个频道,当有其他客户端往这个频道发送消息,

当前订阅者就会收到消息。

我们子在使用订阅命令,需要主要几点:

第一,客户端执行订阅指令之后,就会进入订阅状态,之后就只能接收 subscribepsubscribeunsubscribepunsubscribe 这四个命令。

第二,新订阅的客户端,是无法收到这个频道之前的消息,这是因为 Redis 并不会对发布的消息持久化的。

相比于很多专业 MQ,比如 kafka、rocketmq 来说, redis 发布订阅功能就显得有点简陋了。不过 redis 发布订阅功能胜在简单,如果当前场景可以容忍这些缺点,还是可以选择使用的。

除了上面的功能以外的,Redis 还支持模式匹配的订阅方式。简单来说,客户端可以订阅一个带 * 号的模式,如果某些频道的名字与这个模式匹配,那么当其他客户端发送给消息给这些频道时,订阅这个模式的客户端也将会到收到消息。

使用 Redis 订阅模式,我们需要使用一个新的指令 psubscribe

我们执行下面这个指令:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
psubscribe pay.*

那么一旦有其他客户端往 pay 开头的频道,比如 pay_resultpay_xxx,我们都可以收到消息。

如果需要取消订阅模式,我们需要使用相应punsubscribe 指令,比如取消上面订阅的模式:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
punsubscribe pay.*

Redis 客户端发布订阅使用方式

基于 Jedis 开发发布/订阅

聊完 Redis 发布订阅指令,我们来看下 Java Redis 客户端如何使用发布订阅。

下面的例子主要基于 Jedis,maven 版本为: <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.1.0</version> </dependency> 其他 Redis 客户端大同小异。

jedis 发布代码比较简单,只需要调用 Jedis 类的 publish 方法。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 生产环境千万不要这么使用哦,推荐使用 JedisPool 线程池的方式 
Jedis jedis = new Jedis("localhost", 6379);
jedis.auth("xxxxx");
jedis.publish("pay_result", "hello world");

订阅的代码就相对复杂了,我们需要继承 JedisPubSub 实现里面的相关方法,一旦有其他客户端往订阅的频道上发送消息,将会调用 JedisPubSub 相应的方法。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private static class MyListener extends JedisPubSub {
    @Override
    public void onMessage(String channel, String message) {
        System.out.println("收到订阅频道:" + channel + " 消息:" + message);

    }

    @Override
    public void onPMessage(String pattern, String channel, String message) {
        System.out.println("收到具体订阅频道:" + channel + "订阅模式:" + pattern + " 消息:" + message);
    }

}

其次我们需要调用 Jedis 类的 subscribe 方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Jedis jedis = new Jedis("localhost", 6379);
jedis.auth("xxx");
jedis.subscribe(new MyListener(), "pay_result");

当有其他客户端往 pay_result频道发送消息时,订阅将会收到消息。

不过需要注意的是,jedis#subscribe 是一个阻塞方法,调用之后将会阻塞主线程的,所以如果需要在正式项目使用需要使用异步线程运行,这里就不演示具体的代码了。

基于 Spring-Data-Redis 开发发布订阅

原生 jedis 发布订阅操作,相对来说还是有点复杂。现在我们很多应用已经基于 SpringBoot 开发,使用 spring-boot-starter-data-redis ,可以简化发布订阅开发。

首先我们需要引入相应的 startter 依赖:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>lettuce-core</artifactId>
            <groupId>io.lettuce</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
</dependency>

这里我们使用 Jedis 当做底层连接客户端,所以需要排除 lettuce,然后引入 Jedis 依赖。

然后我们需要创建一个消息接收类,里面需要有方法消费消息:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Slf4j
public class Receiver {
    private AtomicInteger counter = new AtomicInteger();

    public void receiveMessage(String message) {
        log.info("Received <" + message + ">");
        counter.incrementAndGet();
    }

    public int getCount() {
        return counter.get();
    }
}

接着我们只需要注入 Spring- Redis 相关 Bean,比如:

  • StringRedisTemplate,用来操作 Redis 命令
  • MessageListenerAdapter ,消息监听器,可以在这个类注入我们上面创建消息接受类 Receiver
  • RedisConnectionFactory, 创建 Redis 底层连接
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Configuration
public class MessageConfiguration {

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // 订阅指定频道使用 ChannelTopic
        // 订阅模式使用 PatternTopic
        container.addMessageListener(listenerAdapter, new ChannelTopic("pay_result"));

        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        // 注入 Receiver,指定类中的接受方法
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

    @Bean
    Receiver receiver() {
        return new Receiver();
    }

    @Bean
    StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
        return new StringRedisTemplate(connectionFactory);
    }

}

最后我们使用 StringRedisTemplate#convertAndSend 发送消息,同时 Receiver 将会收到一条消息。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@SpringBootApplication
public class MessagingRedisApplication {
    public static void main(String[] args) throws InterruptedException {

        ApplicationContext ctx = SpringApplication.run(MessagingRedisApplication.class, args);

        StringRedisTemplate template = ctx.getBean(StringRedisTemplate.class);
        Receiver receiver = ctx.getBean(Receiver.class);

        while (receiver.getCount() == 0) {
            template.convertAndSend("pay_result", "Hello from Redis!");
            Thread.sleep(500L);
        }

        System.exit(0);
    }
}

Redis 发布订阅实际应用

Redis Sentinel 节点发现

Redis Sentinel 是 Redis 一套高可用方案,可以在主节点故障的时候,自动将从节点提升为主节点,从而转移故障。

今天这里我们不详细解释 Redis Sentinel 详细原理,主要来看下 Redis Sentinel 如何使用发布订阅机制。

Redis Sentinel 节点主要使用发布订阅机制,实现新节点的发现,以及交换主节点的之间的状态。

如下所示,每一个 Sentinel 节点将会定时向 _sentinel_:hello 频道发送消息,并且每个 Sentinel 都会订阅这个节点。

这样一旦有节点往这个频道发送消息,其他节点就可以立刻收到消息。

这样一旦有的新节点加入,它往这个频道发送消息,其他节点收到之后,判断本地列表并没有这个节点,于是就可以当做新的节点加入本地节点列表。

除此之外,每次往这个频道发送消息内容可以包含节点的状态信息,这样可以作为后面 Sentinel 领导者选举的依据。

以上都是对于 Redis 服务端来讲,对于客户端来讲,我们也可以用到发布订阅机制。

Redis Sentinel 进行主节点故障转移,这个过程各个阶段会通过发布订阅对外提供。

对于我们客户端来讲,比较关心切换之后的主节点,这样我们及时切换主节点的连接(旧节点此时已故障,不能再接受操作指令),

客户端可以订阅 +switch-master频道,一旦 Redis Sentinel 结束了对主节点的故障转移就会发布主节点的的消息。

redission 分布式锁

redission 开源框架提供一些便捷操作 Redis 的方法,其中比较出名的 redission 基于 Redis 的实现分布式锁。

今天我们来看下 Redis 的实现分布式锁中如何使用 Redis 发布订阅机制,提高加锁的性能。

PS:redission 分布式锁实现原理,可以参考之前写过的文章:

  1. 可重入分布式锁的实现方式
  2. Redis 分布式锁,看似简单,其实真不简单

首先我们来看下 redission 加锁的方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Redisson redisson = ....
RLock redissonLock = redisson.getLock("xxxx");
redissonLock.lock();

RLock 继承自 Java 标准的 Lock 接口,调用 lock 方法,如果当前锁已被其他客户端获取,那么当前加锁的线程将会被阻塞,直到其他客户端释放这把锁。

这里其实有个问题,当前阻塞的线程如何感知分布式锁已被释放呢?

这里其实有两种实现方法:

第一钟,定时查询分布时锁的状态,一旦查到锁已被释放(Redis 中不存在这个键值),那么就去加锁。

实现伪码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
while (true) {
  boolean result=lock();
  if (!result) {
    Thread.sleep(N);
  }
}

这种方式实现起来起来简单,不过缺点也比较多。

如果定时任务时间过短,将会导致查询次数过多,其实这些都是无效查询。

如果定时任务休眠时间过长,那又会导致加锁时间过长,导致加锁性能不好。

那么第二种实现方案,就是采用服务通知的机制,当分布式锁被释放之后,客户端可以收到锁释放的消息,然后第一时间再去加锁。

这个服务通知的机制我们可以使用 Redis 发布订阅模式。

当线程加锁失败之后,线程将会订阅 redisson_lock__channel_xxx(xx 代表锁的名称) 频道,使用异步线程监听消息,然后利用 Java 中 Semaphore 使当前线程进入阻塞。

一旦其他客户端进行解锁,redission 就会往这个redisson_lock__channel_xxx 发送解锁消息。

等异步线程收到消息,将会调用 Semaphore 释放信号量,从而让当前被阻塞的线程唤醒去加锁。

ps:这里只是简单描述了 redission 加锁部分原理,出于篇幅,这里就不再消息解析源码。 感兴趣的小伙伴可以自己看下 redission 加锁的源码。

通过发布订阅机制,被阻塞的线程可以及时被唤醒,减少无效的空转的查询,有效的提高的加锁的效率。

ps: 这种方式,性能确实提高,但是实现起来的复杂度也很高,这部分源码有点东西,快看晕了。

总结

今天我们主要介绍 Redis 发布订阅功能,主要对应的 Redis 命令为:

  • subscribe channel [channel ...] 订阅一个或多个频道
  • unsubscribe channel 退订指定频道
  • publish channel message 发送消息
  • psubscribe pattern 订阅指定模式
  • punsubscribe pattern 退订指定模式

我们可以利用 Redis 发布订阅功能,实现的简单 MQ 功能,实现上下游的解耦。

不过需要注意了,由于 Redis 发布的消息不会被持久化,这就会导致新订阅的客户端将不会收到历史消息。

所以,如果当前的业务场景不能容忍这些缺点,那还是用专业 MQ 吧。

最后介绍了两个使用 Redis 发布订阅功能使用场景供大家参考。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-09-23 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
MySQL索引原理——B树
1、MyISAM是MySQL 5.5之前版本默认的存储引擎,从5.5之后,InnoDB开始成为MySQL默认的存储引擎。MyISAM和InnoDB都是使用B+树实现主键索引、唯一索引和非主键索引。
saintyyu
2021/11/22
6910
MySQL索引原理——B树
mysql之索引的工作机制
当db的量达到一定数量级之后,每次进行全表扫描效率就会很低,因此一个常见的方案是建立一些必要的索引作为优化手段,那么问题就来了:
一灰灰blog
2018/03/26
1.5K0
mysql之索引的工作机制
常见公司MySQL面试题全集
事务:事务是访问和更新数据库的程序执行的一个逻辑单元;事务中可能包含一个或多个sql语句,这些语句要么都执行,要么都不执行。作为一个关系型数据库,MySQL支持事务。
用户1685462
2021/07/16
4180
SQL,何必在忆之一(索引与执行计划篇)
B树(英语:B-tree)是一种自平衡的树,能够保持数据有序。这种数据结构能够让查找数据、顺序访问、插入数据及删除的动作,都在对数时间内完成。B树,概括来说是一个一般化的二叉查找树(binary search tree)一个节点可以拥有2个以上的子节点。与自平衡二叉查找树不同,B树适用于读写相对大的数据块的存储系统,例如磁盘。B树减少定位记录时所经历的中间过程,从而加快存取速度。B树这种数据结构可以用来描述外部存储。这种数据结构常被应用在数据库和文件系统的实现上。
PayneWu
2021/01/25
4630
数据库:MySQL相关知识整理,值得收藏!
选择:MyISAM相对简单,所以在效率上要优于InnoDB。如果系统插入和查询操作多,不需要事务和外键,选择MyISAM,如果需要频繁的更新、删除操作,或者需要事务、外键、行级锁的时候,选择InnoDB。
小明互联网技术分享社区
2022/02/17
5050
数据库:MySQL相关知识整理,值得收藏!
解决程序慢,要学会预测表容积,不能一味地加索引
索引是应用程序设计和开发的一个重要方面。如果索引过多,应用程序中的更新、删除等操作会变慢,性能会受到影响;如果索引过少,对查询性能又会产生影响。
CSDN技术头条
2018/07/30
1.1K0
解决程序慢,要学会预测表容积,不能一味地加索引
彻底搞懂MySQL的索引
MyISAM和InnoDB是MySQL最常用的两个存储引擎,本文将进行详尽的介绍和对比。对于MySQL其余几种存储引擎,请读者自行搜索学习。
全菜工程师小辉
2019/08/16
9270
MySQL索引的原理及使用
  上篇文章中学习了MySQL库的架构以及存储引擎,了解了基本索引(普通索引,唯一索引,主键索引),着重介绍了innerDB的存储方式以及内存模型,本篇文章和大家探讨一下MySQL库中索引的原理以及索引底层的数据结构。
会说话的丶猫
2020/08/06
9930
MySQL索引的原理及使用
SQL学习笔记五之MySQL索引原理与慢查询优化
一般的应用系统,读写比例在10:1左右,而且插入操作和一般的更新操作很少出现性能问题,在生产环境中,我们遇到最多的,也是最容易出问题的,还是一些复杂的查询操作,因此对查询语句的优化显然是重中之重。说起加速查询,就不得不提到索引了。
Jetpropelledsnake21
2019/02/15
9010
MySql知识体系总结(2021版)请收藏!!
MySQL的存储引擎架构将查询处理与数据的存储/提取相分离。下面是MySQL的逻辑架构图:
IT大咖说
2021/07/19
1.4K0
MySQL 索引原理 图文讲解
在数据库中,索引可以理解为是一种单独的,物理的对数据库表中的一列或者多列的值进行排序的一种存储结构。它的作用是能让我们快速检索到想要的数据,好比字典的目录,通过目录的页码能快速找到我们想查找的内容。
陈皮的JavaLib
2021/03/23
8960
MySQL 索引原理 图文讲解
MySQL底层存储B-Tree和B+Tree原理分析
互联网小阿祥
2023/05/28
3.2K0
MySQL底层存储B-Tree和B+Tree原理分析
Mysql索引
索引是什么了,查阅了官方文档。官方文档写了索引的作用和没有索引会带来全表扫描,非常费时间。 Indexes are used to find rows with specific column values quickly. Without an index, MySQL must begin with the first row and then read through the entire table to find the relevant rows. 简单的说索引是提高查询速度。这个很好理解,就像是以前的英文词典,找单词如果没有前面目录的话,效率很低,得全文找一遍。
mikelLam
2022/10/31
2.6K0
MySQL 索引
  我们用一个例子来逐渐引出啥是索引。话说大老板东哥有一天想体验一下快递小哥的生活,就去自家快递公司准备干活了,一进仓库看到一地的快递,兴冲冲的就问旁边的快递小哥 “这么多快递,我要找一个人的快递怎么办?”。快递小哥说 “你可以一件件找,直到找到你要的那件快递”,东哥一听脸顿时黑了 “淦!上十万件快递你要我一件件找,是想累死我,然后继承我的白条吗?” 说完一甩手扭头就会豪宅去了。   第二天,快递公司老板去找东哥说 “领导,我们已经改进了,再去指导指导呗”。东哥一听,哎呀!动作挺快,然后就又到快递公司了,问 “你们想出什么办法了吗”。快递小哥连忙回答 “我们给所有的快递都编了号,做了一个表格,只要从表格中找到编号就可以找到快递了”,东哥心想,我从上十万的名单里找出了编码,还要去上十万的快递里扒出快递,还是太累了就说 “我时间有限有没有更快的办法”。   快递公司老板一听,这还得了,大 BOOS 不满意了,得亏有备用方案,就说 “领导,我们还有个方案,我们做个快递柜,1 ~ 10 号快递放 0 号,10 ~ 20 放 1 号,依次类推,只要找到了快递编码,很快就可以找到快递了”。东哥一听,不错哈!这么干就快多了,但是我还要从上十万的表格中找出编码,难受啊!一脸的难受。快递公司老板冷汗直流,这是嫌找编码满了啊,该怎么办,BOOS 一怒,回家种地。这时一个程序员站住来说 “领导,我们还有个方案,我们把表格进行优化,按照姓名首字母来分类,就可以很快的找到指定的名字和编码”。东哥大喜,升职加薪!   从上面的例子可以推出,如果没有索引,必须遍历整个表,直到指定快递被找到为止;有了索引之后,即可在索引中查找。由于索引是经过某种算法优化过的,因而查找次数要少的多。可见,索引是用来定位的。官方来讲就是:索引是对数据库表中一列或多列的值进行排序的一种结构,使用索引可快速访问数据库表中的特定信息。
Demo_Null
2020/09/28
2K0
MySQL 索引
索引不是越多越好,理解索引结构原理,才有助于我们建立合适的索引!
MySQL支持诸多存储引擎,而各种存储引擎对索引的支持也各不相同,因此MySQL数据库支持多种索引类型,如BTree索引,哈希索引,全文索引等等。为了避免混乱,本文将只关注于BTree索引,因为这是平常使用MySQL时主要打交道的索引。
Java程序猿
2021/03/29
9690
玩转Mysql系列 - 第22篇:mysql索引原理详解
Mysql系列的目标是:通过这个系列从入门到全面掌握一个高级开发所需要的全部技能。
路人甲Java
2019/10/15
1K0
玩转Mysql系列 - 第22篇:mysql索引原理详解
mysql基础知识(6)
可以从几个维度去看这个问题,查询是否够快,效率是否稳定,存储数据多少,以及查找磁盘次数,为什么不是二叉树,为什么不是平衡二叉树,为什么不是B树,而偏偏是B+树呢?
恒辉信达
2024/11/20
930
【MySQL(2)| MySQL索引机制】
索引对于良好的性能非常关键,尤其是当表中的数据量越来越大时,索引对性能的影响愈发重要。
周三不加班
2019/09/03
1.1K0
【MySQL(2)| MySQL索引机制】
数据库索引结构知多少
前几天在看 2018 云栖大会,来自中科院计算所的陈世敏研究员在“数据库内核专场”做了一场《NVM在数据库领域的研究和探索 》的报告演讲。在30分钟的演讲中,其中有近10页PPT的内容和B+Tree这种索引有关。
东山絮柳仔
2021/03/23
6070
彻底理解 MySQL 的索引机制,终于不再因为 MySQL 优化而被面试官鄙视了
每当我们遇到数据库查询耗时过长,总会第一时间想到,在经常使用的条件上添加索引。我们知道索引会帮我们更快地查询到想要的数据,但是我们真的清楚究竟什么是索引,为什么索引能帮我们将查询时间缩短十倍百倍甚至更多吗?接下来请大家根据下文,一起深入索引的世界吧。
Java学习部落
2020/12/22
2.2K0
相关推荐
MySQL索引原理——B树
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验