前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Redis:告诉我怎么顶住2000万QPS的压力

Redis:告诉我怎么顶住2000万QPS的压力

原创
作者头像
叫我阿柒啊
修改2024-07-02 16:49:29
2974
修改2024-07-02 16:49:29
举报

前言

在多年的SparkStreaming的大数据流处理开发中,除了Kafka,Redis是用的最多的组件。目前生产有多个redis集群,最大的32节点的集群的key已经达到40亿个,峰值2000万的QPS。

Redis在流处理开发中一共有两种应用场景:

  1. 离线更新的维表数据,用于增加流数据的维度信息
  2. 应用实时更新的状态数据

不管是哪种应用场景,最后在SparkStreaming中都需要与Redis进行交互,来完成get、set操作假如SparkStreaming中RDD的时间间隔为1min,那么这个窗口的数据在1min内计算完成才算是"不延迟"。当遇到计算延迟的情况时,如果不与Redis交互,增加core、memory计算资源,或者提高并行度会解决这个问题,

之前开发一个1亿/min数据量的SparkStreaming应用中,发现造成计算延迟原因可能是与Redis交互耗费了太多的时间,这时候再增加计算资源和提高并行度效果不大,所以这时候就要从应用与Redis交互上进行优化。

Redis性能优化

很多时候,我们无法对已部署的Redis服务进行优化,哪怕是有些默认参数不合理,也无法随心所欲地修改参数然后重启集群,进而影响生产。所以我们只能从应用侧的代码中,去代码优化

应用和Redis服务数据交互的过程大致如下:

连接池

在Java中,应用与Redis进行交互的连接类是Jedis。如果每次连接都要创建一个Jedis的话,那么每次TCP三次握手建立连接,关闭时再四次挥手断开连接,虽然内网的情况下速度够快,但是架不住数据量大,因此我们需要保持长连接。

代码语言:java
复制
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(10);
poolConfig.setMinIdle(5);
poolConfig.setMaxIdle(8);

JedisPool jedisPool = new JedisPool(poolConfig, "localhost", 6379);
Jedis jedis = jedisPool.getResource()

这样,JedisPool来完成Jedis的初始化,我们只需要调用getResource() 就能获取到Jedis,且将JedisPool对象赋值给Jedis的dataSource属性,用来表示Jedis对象是否是从JedisPool获取的。

当我们调用close() 关闭连接时,会对dataSource进行判断,如果为null,则直接断开Redis连接,如果是从连接池获取的,那么会将这个Jedis在放入到JedisPool中。

batch.size

经常和大数据打交道的都会很熟悉一个单词:batch.size。通常指的是两个组件在交互时,一次交互尽量处理一批数据,通过减少通信次数来提高处理效率。

kafka

例如在KafkaProduer中,参数batch.size用来控制一次向kafka发送数据的条数。

但问题又来了,假如batch.size设置为100,数据只有99条,那么就一直不发送吗。所以为了保证batch情况下的数据实时性,于是又定义了linger.ms来控制等待时间,batch.size与linger.ms只要满足一个,就将数据发送到kafka。

Flume

同样,在Flume的配置文件中,也经常看到batch size参数,通常来控制source向channel发送数据、或者sink从channel取出数据的条数。

那么,我们是否也可以将多个set、get操作打包成batch,一次性提交多条操作命令?

pipeline

Jedis提供了pipeline的操作模式,允许一次性执行多个操作命令,然后通过手动执行sync来发送到Redis,返回请求结果。接着上面连接池实现pipeline代码。

代码语言:java
复制
Jedis jedis = jedisPool.getResource()
Pipeline pipeline = jedis.pipelined();
Response<String> res1 = pipeline.get("aa");
Response<String> res2 = pipeline.get("bb");
// 这里也可以进行set、hmget等操作
pipeline.sync();
String s1 = res1.get();
String s2 = res2.get();

jedis.pipelined() 创建pipeline到执行sync发送redis之间,你可以执行多个不同redis命令。通过Jedis执行get操作,返回的是String。而使用pipeline执行,返回的是泛型为String的Response

Response数据回填

这个和NIO里面的Future差不多,通过上面代码也知道,pipeline执行完get(key)之后,不是立即发送到redis的,所以这时候只能返回一个Response对象,预留出一个字段来接收返回值,当执行sync()发送请求到redis之后,再将返回的结果一一回填到对应Response的字段上。

如图,只有sync之后,Response才会有数据,可以debug看一下。

Response debug

我使用docker,启动一个四主四从的redis cluster,对应端口从10001 - 10008。

使用redis-cli set两个key的数据。在cluster模式下,一定要加-c,否则就不会重定向到key所在slot的节点上,现在不懂slot无妨,因为接下来我会讲slot。

代码中使用Jedis连接其中的10001节点(不能使用JedisCluter,后面会讲).

代码语言:java
复制
 Jedis jedis = new Jedis("121.91.xxx.xx", 10001);
jedis.auth("1qaz@WSX");
Pipeline pipeline = jedis.pipelined();
Response<String> res1 = pipeline.get("aa");
pipeline.sync();
String s1 = res1.get();

然后使用pipeline获取key为aa的value值。在sync()处打上断点,debug启动。

可以看到,response的data属性此时为null。执行下一步,也就是执行完sync()之后,data就有值了,返回的应该是我们之前set的1,这里49是字符串1在ACSII码表中的编号。

所以,通常pipeline中执行一批(batch)的命令之后,再执行sync()将所有命令发送到redis,我在SparkStreaming的开发中,通常将batch设置为256或者512,也就是一次执行256或512个命令。

至于256还是512,可以根据返回value的大小、或者请求类型来评估。至此,两种应用层面对redis的优化已经讲完,

但是因为pipeline相当于和一个redis建立了一个通道,然后批量发送数据,所以只有jedis才有pipeline。具体什么意思呢,还是需要从redis常见的架构说起。

Redis常见架构

redis有多种运行模式,单点、哨兵、集群。目前在生产中,集群模式是用的最多的。而提到集群模式默认就是Redis自带cluster模式。但多年以来在大数据流处理开发中,我觉得最好用的cluster还是豌豆荚开源的Codis,不幸的是在很早之前已经停止了更新和维护。

我们上面提到pipeline只针对于Jedis,原因为何,还得先从Redis的存储架构说起。

在redis中,一共划分了16384个slot,每个key都会分布在其中的一个slot上。

单点模式

对于单点模式,16384个slot都在一个节点上,不论什么样的key都会在这个节点上,我们通过Jedis和这个节点建立连接即可。

代码语言:java
复制
Jedis jedis = new Jedis("ip", 10001);
Pipeline pipeline = jedis.pipelined();
Response<String> res1 = pipeline.get("aa");
Response<String> res2 = pipeline.get("bb");
pipeline.sync();

这里只需要记住一句话:Jedis只能连接单个redis服务

而对于cluster模式来说,集群中的每个节点均分slot,我们在存入一个key的时候,要考虑:key在哪个slot中?这个slot在哪个节点上? 所以,这是集群模式下使用pipeline要面临的问题。

codis

在codis的架构中,分为codis proxy代理层server服务层。 codis proyx让整个codis看起来就像一个"大的单点redis",同时proxy实现了redis协议,所以用户可以使用Jedis连接codis(大的单点redis)。

反观server服务层是由多个的redis实例组成的,16384个slot均匀分布在了各个redis实例上,但是server层在proxy之下,对于用户来说是透明不可见的。用户实际上通过Jedis连接的是proxy代理层,而proxy会将根据每个key计算出要对应的slot,然后再找到slot对应的redis节点,然后进行各种操作。

底层的这些工作都是codis proxy来做,我们只需要连接到codis proxy,然后默认16384个slot都在codis proxy上,根本不用去管底层的slot具体在哪个redis中。

代码语言:java
复制
val poolConfig = new JedisPoolConfig

val poolx: JedisResourcePool = RoundRobinJedisPool.create()
  .curatorClient(redisHost, 30000)
  .zkProxyDir("/jodis/" + database)
  .poolConfig(poolConfig)
  .timeoutMs(60000)
  .build()
  
val jedis: Jedis = poolx.getResource

所以,codis也是基于Jedis连接的,只不过连接的是codis proxy而非具体的redis。

redis cluster

与codis相同的是,redis cluster也是由多个的redis实例组成的,16384个slot均匀分布在了各个redis实例上。不同的是,redis cluster没有proxy代理层,server层是直接暴露给用户的。我们上面也说了,Jedis只能连接单个节点。

如果使用Jedis连接到redis cluster中的某一个节点,那么这个Jedis只能操作这个节点分布的slot的key。例如我在redis-cli中,在操作bb这个key时,可以看到redirect重定向到了10003节点,当我再操作aa这个key时,又重定向到了10001节点。

所以aa分布在10001节点的的slot中,bb存放在10003节点的slot中,可以使用Jedis进行测试:

代码语言:java
复制
Jedis jedis = new Jedis("121.91.xx.xxx", 10001);
jedis.auth("1qaz@WSX");
System.out.println(jedis.get("aa"));
System.out.println(jedis.get("bb"));

因为bb是在10003节点的slot上,而我Jedis连接的是10001节点,所以在获取bb的时候接直接报错。

因为Jedis只能操作一个节点的slot,所以才封装了JedisCluster来操作redis cluster的所有节。

通过JedisCluster连接到整个redis集群,然后执行set、get等命令。

代码语言:java
复制
HashSet<HostAndPort> jedisClusterNodes = new java.util.HashSet();
jedisClusterNodes.add(new HostAndPort("121.91.xx.xxx", 10001));
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(-1);
config.setMaxIdle(-1);
config.setMaxWaitMillis(1000);
JedisCluster cluster = new JedisCluster(jedisClusterNodes, 1000, 1000, 10, "1qaz@WSX", config);
cluster.get("aa");
cluster.get("bb");

但不论是codis还是redis cluster,最终目的都是连接到底层的多个redis服务。所以JedisCluster也实现了codis proxy的功能,这样才能将数据放到相应的redis上。

1. redis连接

JedisCluter存放数据时,底层使用的还是Jedis。

在初始化JedisCluster时,其connectionHandler属性的cache属性,会自动为所有的redis节点创建一个JedisPool,存放在nodes中,并且在slots属性中,将16384个slot与所在redis的JedisPool形成映射关系。如下图所示:

所以JedisCluster就拿到了每个slot与redis的关系与JedisPool,接下来JedisCluster再知道哪个key应该放在哪个slot上,就能直接获取对应slot所在redis的Jedis连接,将key放在redis中。

2. key的计算

JedisClustr每次在执行set、get等操作前,都会通过JedisClusterCRC16getSlot() ,通过计算出key所在的slot。

再调用getConnectionFromSlot() ,利用计算出来的slot,从cache中获取这个slot对应的JedisPool。

调用的是cache的getSlotPool() ,从1中生成的slots属性中,获取对应JedisPool。

最后一个key就通过JedisCluster的接口,一步步放入到了对应redis的slot中。

JedisCluster底层为集群中所有redis,都创建了一个JedisPool。但纵观整个JedisCluster,没有给用户暴露出来Jedis对象,也就是说JedisCluster面对的是cluster所有的节点,而非其中某一个节点。所以JedisCluster是无法使用pipeline的。

redis cluster的pipeline

我在SparkStreaming中,处理1亿/min数据使用redis cluster的时候,会因为与redis cluster交互过多而导致数据延迟,为此在刚从codis切换到redis cluster的时候,尝试了很多办法。

lettuce

redis cluster除了JedisCluster,也有一个开源的lettuce,其中的async异步类pipeline操作。

代码语言:java
复制
RedisURI uri = RedisURI.builder()
        .withHost("47.102.xxx.xxx")
        .withPassword("1qaz@WSX".toCharArray())
        .withPort(10001)
        .build();
RedisClusterClient client = RedisClusterClient.create(uri);
StatefulRedisClusterConnection<String, String> connect = client.connect();
RedisAdvancedClusterAsyncCommands<String, String> async = connect.async();
async.set("key1", "v1");
async.set("key2", "v2");
async.flushCommands();
Thread.sleep(1000 * 3);
connect.close();
client.shutdown();

但是实测之后,效率还是提不上去,有兴趣的朋友可以自己尝试一下。还有一个Redisson客户端,测试效果也不太理想。后来我还是自己实现了一个基于JedisCluster的pipeline客户端,现在生产在用,效率还是很快的。

自定义pipeline

上面讲了,JedisCluster的connectionHandler属性的cache属性的slots属性,建立了slot与JedisPool的关系,但是JedisCluster中没有对外暴露cache。

万幸的是,cache是protected,而不是private。

也就是说,JedisClusterConnectionHandler的子类可以获得cache,所以实现子类来获取slots。

利用slots的映射关系,我们也建立几个map映射,来实现针对于每个redis节点,我们都能获取到一个Jedis,来开启pipeline

  1. <JedisPool, Jedis>: 在打入一个key时,计算出key对应的slot,然后从slots中获取到对应的JedisPool,从JedisPool中取出一个Jedis,建立一个<JedisPool, Jedis>的映射,如果下次的key也在这个slot,就直接使用这个Jedis。保证一个redis实例只有一个Jedis,这样pipeline才有意义。
  2. <Jedis, Pipeline>:通过这个映射,保证一个Jedis只开启一次Pipeline,而且相同redis上的key都可以通过这个pipeline来操作。
  3. <Pipeline, num>:这个是用来设置sync的阈值,num是记录使用这个pipeline操作的次数,遍历,到达设定阈值的时候就执行sync,将pipeline的命令发送到redis中。

具体实现思路可以参考早期的文章:JedisCluster Pipeline的实现思路

在这个理论基础上,后来根据自己的实际开发需求,使用java + scala开发除了适配SparkStreaming版本的JedisCluster pipeline。

结语

这就是我个人在大数据开发中,对redis使用的一些经验之谈。感觉redis有好多要写的,除了上面的这些优化手段之外,例如hash结构中K/V的合理设计等等一系列手段。

最近,陪伴了七年之久的codis,在无法通过升级来修复漏洞的情况下,宣布下线。虽然很早就开始从codis向redis cluster靠拢,但是此刻也不免感叹一声"自古美人叹迟暮,不许英雄见白头"。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • Redis性能优化
    • 连接池
      • batch.size
        • kafka
        • Flume
      • pipeline
        • Response数据回填
        • Response debug
    • Redis常见架构
      • 单点模式
        • codis
          • redis cluster
            • 1. redis连接
            • 2. key的计算
        • redis cluster的pipeline
          • lettuce
            • 自定义pipeline
            • 结语
            相关产品与服务
            云数据库 Redis
            腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档