前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Redis发布订阅模式

Redis发布订阅模式

原创
作者头像
ruochen
修改2021-11-23 08:12:40
修改2021-11-23 08:12:40
9960
举报
1、说明

Redis自身提供了发布/订阅(publish/subscribe)模式。实现方式大致流程如下图:

发布订阅三个角色:发布者,订阅者和Channel。

2、redis发布订阅命令

  • 发布者使用命令publish + channel + msg
代码语言:txt
复制
127.0.0.1:6379[1]> publish channel01 "important things"
代码语言:txt
复制
(integer) 0
代码语言:txt
复制
127.0.0.1:6379[1]> publish channel01 "important things"
代码语言:txt
复制
(integer) 1
代码语言:txt
复制
127.0.0.1:6379[1]> publish channel01 "hello girl"
代码语言:txt
复制
(integer) 2

注意:发布返回的是订阅者数量,发布的消息不会持久化,没有订阅者时候,发布消息会丢失,当在发布消息之后对channel进行订阅不会收到之前发布的消息。

  • 订阅者使用命令subscribe + channel
代码语言:txt
复制
127.0.0.1:6379[1]> subscribe channel01
代码语言:txt
复制
Reading messages... (press Ctrl-C to quit)
代码语言:txt
复制
1) "subscribe"
代码语言:txt
复制
2) "channel01"
代码语言:txt
复制
3) (integer) 1
代码语言:txt
复制
1) "message"
代码语言:txt
复制
2) "channel01"
代码语言:txt
复制
3) "important things"
代码语言:txt
复制
1) "message"
代码语言:txt
复制
2) "channel01"
代码语言:txt
复制
3) "hello somebody"

使用subscribe客户端进入订阅状态,该状态下不能使用与“发布/订阅”无关的其他命令。

3、RedisTemplate 实现发布订阅

  • 发布者,使用convertAndSend(channel , message)方法实现消息发布
代码语言:txt
复制
@RequestMapping("/pubSub")
代码语言:txt
复制
@RestController
代码语言:txt
复制
public class RedisComprehensive {
代码语言:txt
复制
    private static final String CHANNEL = "channel_01";
代码语言:txt
复制
    @Autowired
代码语言:txt
复制
    private RedisTemplate redisTemplate;
代码语言:txt
复制
    @GetMapping(value = "/publishMessage")
代码语言:txt
复制
    public void publishMessage(String message){
代码语言:txt
复制
        // 发布者
代码语言:txt
复制
        redisTemplate.convertAndSend(CHANNEL,message);
代码语言:txt
复制
    }
代码语言:txt
复制
}
  • 订阅者分析

(1)配置redis定义消息容器RedisMessageListenerContainer。

addMessageListener(MessageListenerAdapter,PatternTopic):

新增订阅频道及订阅者,订阅者必须有相关方法处理收到的消息。

setTopicSerializer(RedisSerializer) :对频道内容进行序列化解析

(2)配置适配器MessageListenerAdapter

MessageListenerAdapter(Object delegate, String defaultListenerMethod)

delegate消息订阅者类,defaultListenerMethod消息处理方法

【实现代码如下】:

代码语言:txt
复制
@Configuration
代码语言:txt
复制
public class RedisConfig {
代码语言:txt
复制
    private static final String CHANNEL = "channel_01";
代码语言:txt
复制
     /**
代码语言:txt
复制
     * @name: container
     * @description: Redis订阅消息监听容器
     * @param connectionFactory
     * @param adapter
     * @return: org.springframework.data.redis.listener.RedisMessageListenerContainer
    */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter adapter){
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // 可以添加多个MessageListener
        container.addMessageListener(adapter,new PatternTopic(CHANNEL));
        return container;
    }
代码语言:txt
复制
    /**
代码语言:txt
复制
     * @name: listenerAdapter
     * @description: 配置消息处理适配器
     * @param msgReceiveListener
     * @return: org.springframework.data.redis.listener.adapter.MessageListenerAdapter
     *
    */
    @Bean
    public MessageListenerAdapter listenerAdapter(MsgReceiveListener msgReceiveListener){
         // messageListenerAdapter 传入一个消息接受的处理器,利用反射的方式调用对应的处理方法
        return new MessageListenerAdapter(msgReceiveListener,"onMessage");
    }
}
代码语言:txt
复制
import lombok.extern.slf4j.Slf4j;
代码语言:txt
复制
import org.springframework.beans.factory.annotation.Autowired;
代码语言:txt
复制
import org.springframework.data.redis.connection.Message;
代码语言:txt
复制
import org.springframework.data.redis.connection.MessageListener;
代码语言:txt
复制
import org.springframework.data.redis.core.RedisTemplate;
代码语言:txt
复制
import org.springframework.data.redis.serializer.RedisSerializer;
代码语言:txt
复制
import org.springframework.stereotype.Component;
代码语言:txt
复制
@Component
代码语言:txt
复制
@Slf4j
代码语言:txt
复制
public class MsgReceiveListener implements MessageListener {
代码语言:txt
复制
    @Autowired
代码语言:txt
复制
    private RedisTemplate redisTemplate;
代码语言:txt
复制
    @Override
代码语言:txt
复制
    public void onMessage(Message message, byte[] pattern) {
代码语言:txt
复制
        RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
代码语言:txt
复制
        String body = serializer.deserialize(message.getBody());
代码语言:txt
复制
        String chanel = serializer.deserialize(message.getChannel());
代码语言:txt
复制
        log.info("接收的消息:{},使用的chanel:{}",body,chanel);
代码语言:txt
复制
    }
代码语言:txt
复制
}

执行结果:

代码语言:txt
复制
2021-06-24 15:06:18.436  INFO 28520 --- [nio-8081-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 13 ms
代码语言:txt
复制
2021-06-24 15:06:18.540  INFO 28520 --- [    container-2] c.l.r.listener.MsgReceiveListener        : 接收的消息:这是一个发布消息,使用的chanel:channel_01

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
作者已关闭评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、说明
  • 2、redis发布订阅命令
  • 3、RedisTemplate 实现发布订阅
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档