前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >3000字 | 一文讲透redis在大数据开发中的应用场景

3000字 | 一文讲透redis在大数据开发中的应用场景

原创
作者头像
叫我阿柒啊
发布2024-07-12 11:03:37
9251
发布2024-07-12 11:03:37
举报
文章被收录于专栏:大数据之路入门到放弃之路

前言

最近写了一篇关于redis的不同架构的文章:Redis:告诉我怎么顶住2000万QPS的压力,主要讲的是在日常开发中,通过哪些优化手段,来提升与redis的交互效率。

同时,文章也从SparkStreaming连接redis cluster的角度切入,讲述了如何通过自定义JedisCluster的pipeline,去减少与redis cluster交互次数,从而降低数据处理延时。

那么,在什么情况下,大数据的实时应用会用到redis呢?

redis应用场景

在SparkStreaming或者flink的实时应用中,redis通常有两种应用场景:

  1. 离线更新的维表数据,用于增加流数据的维度信息,通常按日/月周期进行更新
  2. 应用实时更新的状态数据

场景一中,实时应用只会对redis进行get、hget操作,而场景二是既要get操作、也要set操作。

维度数据

在我日常的数据接入和实时开发中,通常是从对端系统按照约定协议接入数据,然后实时写入生产的kafka,来完成实时业务场景的开发。通常,很多业务场景最终结果都是针对于用户的,但接入到Kafka的数据不可能覆盖所有用户信息,所以我们要从数据库中,按照用户在系统所有数据的唯一标识导入到redis中。

例如,我们在Kafka接入了用户访问某个应用的数据,这些数据可能有用户id、访问开始时间、访问时长、访问流量等字段。但是如果我们想要去分析,哪个年龄段的用户最喜欢这个应用。在采集用户访问应用时,是不可能携带年龄这个字段的。所以,就需要去数据库中查找用户信息表,从而查询用户年龄字段,然后导出放到redis中。

在上述整个离线维度数据的导入redis过程中,需要做以下工作:

1. KV设计

我们知道,在redis中的数据是以KV形式存在的,key是唯一的,所以通常使用用户id(手机号、身份证号)等用作key,然后根据业务场景,加一个前缀用作区分不同的业务,避免不用业务之间相同key互相覆盖。

而value通常使用hash类型,哪怕只有一个字段,我们也要考虑后期的字段扩展,所以,选择hash类型是一个明智的选择。

2. 维度数据导出

维度数据放在MPP中,所以我通常是先导出文件,然后再导入到redis中。如果不想开发程序的话,这里有一个相对于快速导入redis的方法。首先,就是在导出数据的时候,使用sql将数据导出为redis-cli中要执行命令的格式,例如:

代码语言:sql
复制
hset 1 age 10
hset 2 age 30
hset 3 age 14
3.导入redis

然后直接将文件重定向输入< 到redis-cli中:

代码语言:shell
复制
redis-cli [options]  < filename

实际操作如下:

可以看到通过重定向输入,将文件中的数据导入到了redis中。

4. 方案优化

上面的这种shell重定向的方式,使是我用的最多的,通常写一个shell脚本,挂在调度平台上,按照日调度进行更新就行了,这种方式最大的优点就是简单方便、开发成本低,我通常将7000w数据,7个value通常要执行8个小时左右,这种方式缺点还是速度过慢。

有时候我想很快的更新数据看程序结果,我就用我自己开发的Spark批处理程序,我将csv格式的文件放入到HDFS,然后在Spark中指定文件path,在Spark中使用pipeline的方式,将数据写入到redis中。

还是上面7000w的数据,大概1G左右,我们的HDFS一个block的大小为128M,所以这个文件就会自动被分割为8个block,Spark就会启动8个线程去处理。

代码语言:scala
复制
val conf = new SparkConf().setAppName("test").setMaster("local[*]")
val sc = new SparkContext(conf)
if (args.length != 5) {
  print("command arguments less than 5,please check again")
  sc.stop()
}
val lines = sc.textFile(args(0))
// 0 文件路径 1 IP 2 Port 3 password 4 操作
val clusterPipeline = sc.broadcast(JedisClusterPool(args(1), args(2).toInt, args(3), -1, -1, 3, 1000, 1000))
val operator = args(4)
lines
  .map(_.split(" "))
  .foreachPartition(p => {
    val poolPipelineMap = new ConcurrentHashMap[JedisPool, Pipeline](1024)
    val pipelineCount = new ConcurrentHashMap[Pipeline, Int](1024)
    val jedisList = new util.ArrayList[Jedis](1024)
    p.foreach(x => {
      // 自己封装的第一版JedisCluster的pipeline,后续又进行了优化
      val pipeline = clusterPipeline.value.initPipeline(x(0), poolPipelineMap, jedisList, pipelineCount, 512)
      if ("hmset".equals(operator)) {
        pipeline.hmset(x(0), clusterPipeline.value.file2Map(x))
      } else if ("set".equals(operator)) {
        pipeline.set(x(0), x(1))
      }
    })
    clusterPipeline.value.pipelineSync(pipelineCount)
    clusterPipeline.value.releaseConnection(jedisList)
  })

使用textFile读取HDFS上的文件,其中clusterPipeline是我基于JedisCluster封装的基于redis cluster的pipeline模式,最后7000w数据只需要1分钟左右就能导入到redis中。

当然,最后也是对clusterPipeline的操作进行了二次封装,这样在Spark中使用pipeline就更简单。

状态数据

除了上述的离线维度数据的应用场景外,redis还有实时状态数据更新的场景。通常用于状态更新的数据,是Kafka中原始的字段信息。就拿上面讲的用户访问应用的例子来讲,如果我们要记录一个用户一天之内访问应用的次数、或者用户一共访问了应用多长时间那该怎么实现?

我们知道,不论是SparkStreaming还是flink,流数据处理默认都是无状态的,每一条数据都是独立存在的,数据在计算过后生成结果就会被丢弃,而不会参与到下一次的计算中去。所以我们就要使得实时计算变得有状态(stateful)

1. mapWithState状态计算

SparkStreaming和flink都提供了状态计算,将状态数据保存在内存中

代码语言:scala
复制
val conf = new SparkConf()
val duration = args(3).toInt
val ssc = new StreamingContext(conf, Seconds(duration))
ssc.checkpoint("./tmp")

val streams: DStream[String] = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    ).map(_.value)
    
val result = streams.map(x => x.split("\t"))
      .filter(x => x(0).startsWith("1"))
      // 生成(k,v)元组
      .map(x => (x(0), Integer.parseInt(x(21)) + Integer.parseInt(x(22)) + Integer.parseInt(x(24))
        + Integer.parseInt(x(25)) + Integer.parseInt(x(30)) + Integer.parseInt(x(31))))
      .reduceByKey((x, y) => x + y)
       // 状态计算
      .mapWithState(StateSpec.function((k: String, v: Option[Int], state: State[Int]) => {
        val state_flow = state.getOption().getOrElse(0)
        var flag = 0
        var sum_flow = 0
        if (!state.isTimingOut()) {
          val flow = v.get
          sum_flow = state_flow + flow
          val state_flow_M = state_flow / (1024 * 1024)
          val sum_flow_M = sum_flow / (1024 * 1024)
          if (state_flow_M < 1024 && sum_flow_M >= 1024) {
            flag = 3
          } else if (state_flow_M < 500 && sum_flow_M >= 500) {
            flag = 2
          } else if (state_flow_M < 200 && sum_flow_M >= 200) {
            flag = 1
          }
          state.update(sum_flow)
        }
        Option((k, flag))
      }).timeout(Minutes(182))
      )

在SparkStreaming算子中,提供了mapWithState状态计算算子,将上游的数据格式化成(k,v)元组,然后进入mapWithState进行状态计算,state表示在状态数据中,根据k找出来的当前状态,然后通过update将自己计算后的最新状态更新进去。

这里我利用mapWithState实现了一个计数的demo:

程序的数据源是9999端口的socketStream,我向端口写入数据:

随后输出状态数据:

我是同时写入的前六行数据,所以这六条数据都在同一个RDD内,然后再写入最后一行数据,可以看到,在第三个RDD内,2的个数就变成了3。

2. redis实现状态计算

但是缺点就是如果程序故障,之前的状态数据可能会丢失(虽然有checkpoint)。而且key的过期时间在程序内还是挺难控制的,需要另外大量的逻辑代码实现。所以为了将状态数据与程序本身分离以及开发便捷性,所以通常使用redis。

代码语言:scala
复制
val result = p.toList.map(x => {
  val userID = x._1
  val flow = x._2
  val response_date = pipeline.hget(userID, "date")
  // 从redis中获取当前user的流量
  val response_flow_codis = pipeline.hget(userID, "flow")
  i += 2
  if (i % 512 ==0){
    pipeline.sync
  }
  (userID, flow, response_date, response_flow_codis)
})
pipeline.sync
i = 0
result.foreach(x => {
  val userID = x._1
  // 当前数据中的流量数据
  var flow = x._2
  val date = x._3.get
  // redis中历史的流量数据
  val flow_codis = x._4.get
  // 说明flow不是今天的,直接覆盖
  if (date == null || date != start_date.value) {
    pipeline.hset(userID, "flow", String.valueOf(flow))
    pipeline.hset(userID, "date", start_date.value)
  } else if (flow_codis != "NN" && date == start_date.value) {
    // 累加当前数据流量和redis历史流量,然后更新到redis
    flow = Integer.parseInt(flow_codis) + flow
    pipeline.hset("userID, "flow", String.valueOf(flow))
  }
})

上面的代码首先是从redis中获取当前user的历史流量,然后与当前数据中用户的流量累加,最后生成最新的流量数据,再更新到redis中。

3. 状态计算key设计

可以看到上面的代码中,我在redis中放入了一个date字段。为什么呢?

这个程序实现的是一个日流量累计的应用场景。也就是说,每天的流量从00:00开始,就要被置空。我遇到到很多开发者,在这种场景下将key设计为userID + yyyymmdd的格式,这样每个用户在每天都会在redis中生成一个新的key,这样就会导致key无限增长,所以为了解决这个问题,开发者又想出给key设置ttl过期时间,然后redis过期清理。

但是这种问题就导致了key一直是变动的,每次查询或者排查问题的时候,都需要先生成yyyymmdd格式的时间拼接。所以为了解决这个问题,我就增加了一个名为date的value,里面存放yyyymmdd格式的时间。

每次在访问redis key的时候,都先去访问date,与当前系统时间比较,判断flow是否为今天的flow,如果date与当前时间不一致(date != start_date.value),则说明flow的值不是今天的,只需要将当前这条数据的流量写入flow覆盖即可,然后将当前时间更新到date中。

结语

上面就是我结合个人实时开发的业务场景,对redis两种应用场景的一个阐述。总的来说,redis作为一个分布式缓存组件,在实时开发中表现出了其优异的性能,但是如何去用、如何用好,还需要开发者本身去思考。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • redis应用场景
    • 维度数据
      • 1. KV设计
      • 2. 维度数据导出
      • 3.导入redis
      • 4. 方案优化
    • 状态数据
      • 1. mapWithState状态计算
      • 2. redis实现状态计算
      • 3. 状态计算key设计
  • 结语
相关产品与服务
云数据库 Redis®
腾讯云数据库 Redis®(TencentDB for Redis®)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档