最近写了一篇关于redis的不同架构的文章:Redis:告诉我怎么顶住2000万QPS的压力,主要讲的是在日常开发中,通过哪些优化手段,来提升与redis的交互效率。
同时,文章也从SparkStreaming连接redis cluster的角度切入,讲述了如何通过自定义JedisCluster的pipeline,去减少与redis cluster交互次数,从而降低数据处理延时。
那么,在什么情况下,大数据的实时应用会用到redis呢?
在SparkStreaming或者flink的实时应用中,redis通常有两种应用场景:
场景一中,实时应用只会对redis进行get、hget操作,而场景二是既要get操作、也要set操作。
在我日常的数据接入和实时开发中,通常是从对端系统按照约定协议接入数据,然后实时写入生产的kafka,来完成实时业务场景的开发。通常,很多业务场景最终结果都是针对于用户的,但接入到Kafka的数据不可能覆盖所有用户信息,所以我们要从数据库中,按照用户在系统所有数据的唯一标识导入到redis中。
例如,我们在Kafka接入了用户访问某个应用的数据,这些数据可能有用户id、访问开始时间、访问时长、访问流量等字段。但是如果我们想要去分析,哪个年龄段的用户最喜欢这个应用。在采集用户访问应用时,是不可能携带年龄这个字段的。所以,就需要去数据库中查找用户信息表,从而查询用户年龄字段,然后导出放到redis中。
在上述整个离线维度数据的导入redis过程中,需要做以下工作:
我们知道,在redis中的数据是以KV形式存在的,key是唯一的,所以通常使用用户id(手机号、身份证号)等用作key,然后根据业务场景,加一个前缀用作区分不同的业务,避免不用业务之间相同key互相覆盖。
而value通常使用hash类型,哪怕只有一个字段,我们也要考虑后期的字段扩展,所以,选择hash类型是一个明智的选择。
维度数据放在MPP中,所以我通常是先导出文件,然后再导入到redis中。如果不想开发程序的话,这里有一个相对于快速导入redis的方法。首先,就是在导出数据的时候,使用sql将数据导出为redis-cli中要执行命令的格式,例如:
hset 1 age 10
hset 2 age 30
hset 3 age 14
然后直接将文件重定向输入< 到redis-cli中:
redis-cli [options] < filename
实际操作如下:
可以看到通过重定向输入,将文件中的数据导入到了redis中。
上面的这种shell重定向的方式,使是我用的最多的,通常写一个shell脚本,挂在调度平台上,按照日调度进行更新就行了,这种方式最大的优点就是简单方便、开发成本低,我通常将7000w数据,7个value通常要执行8个小时左右,这种方式缺点还是速度过慢。
有时候我想很快的更新数据看程序结果,我就用我自己开发的Spark批处理程序,我将csv格式的文件放入到HDFS,然后在Spark中指定文件path,在Spark中使用pipeline的方式,将数据写入到redis中。
还是上面7000w的数据,大概1G左右,我们的HDFS一个block的大小为128M,所以这个文件就会自动被分割为8个block,Spark就会启动8个线程去处理。
val conf = new SparkConf().setAppName("test").setMaster("local[*]")
val sc = new SparkContext(conf)
if (args.length != 5) {
print("command arguments less than 5,please check again")
sc.stop()
}
val lines = sc.textFile(args(0))
// 0 文件路径 1 IP 2 Port 3 password 4 操作
val clusterPipeline = sc.broadcast(JedisClusterPool(args(1), args(2).toInt, args(3), -1, -1, 3, 1000, 1000))
val operator = args(4)
lines
.map(_.split(" "))
.foreachPartition(p => {
val poolPipelineMap = new ConcurrentHashMap[JedisPool, Pipeline](1024)
val pipelineCount = new ConcurrentHashMap[Pipeline, Int](1024)
val jedisList = new util.ArrayList[Jedis](1024)
p.foreach(x => {
// 自己封装的第一版JedisCluster的pipeline,后续又进行了优化
val pipeline = clusterPipeline.value.initPipeline(x(0), poolPipelineMap, jedisList, pipelineCount, 512)
if ("hmset".equals(operator)) {
pipeline.hmset(x(0), clusterPipeline.value.file2Map(x))
} else if ("set".equals(operator)) {
pipeline.set(x(0), x(1))
}
})
clusterPipeline.value.pipelineSync(pipelineCount)
clusterPipeline.value.releaseConnection(jedisList)
})
使用textFile读取HDFS上的文件,其中clusterPipeline是我基于JedisCluster封装的基于redis cluster的pipeline模式,最后7000w数据只需要1分钟左右就能导入到redis中。
当然,最后也是对clusterPipeline的操作进行了二次封装,这样在Spark中使用pipeline就更简单。
除了上述的离线维度数据的应用场景外,redis还有实时状态数据更新的场景。通常用于状态更新的数据,是Kafka中原始的字段信息。就拿上面讲的用户访问应用的例子来讲,如果我们要记录一个用户一天之内访问应用的次数、或者用户一共访问了应用多长时间那该怎么实现?
我们知道,不论是SparkStreaming还是flink,流数据处理默认都是无状态的,每一条数据都是独立存在的,数据在计算过后生成结果就会被丢弃,而不会参与到下一次的计算中去。所以我们就要使得实时计算变得有状态(stateful)。
SparkStreaming和flink都提供了状态计算,将状态数据保存在内存中。
val conf = new SparkConf()
val duration = args(3).toInt
val ssc = new StreamingContext(conf, Seconds(duration))
ssc.checkpoint("./tmp")
val streams: DStream[String] = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
).map(_.value)
val result = streams.map(x => x.split("\t"))
.filter(x => x(0).startsWith("1"))
// 生成(k,v)元组
.map(x => (x(0), Integer.parseInt(x(21)) + Integer.parseInt(x(22)) + Integer.parseInt(x(24))
+ Integer.parseInt(x(25)) + Integer.parseInt(x(30)) + Integer.parseInt(x(31))))
.reduceByKey((x, y) => x + y)
// 状态计算
.mapWithState(StateSpec.function((k: String, v: Option[Int], state: State[Int]) => {
val state_flow = state.getOption().getOrElse(0)
var flag = 0
var sum_flow = 0
if (!state.isTimingOut()) {
val flow = v.get
sum_flow = state_flow + flow
val state_flow_M = state_flow / (1024 * 1024)
val sum_flow_M = sum_flow / (1024 * 1024)
if (state_flow_M < 1024 && sum_flow_M >= 1024) {
flag = 3
} else if (state_flow_M < 500 && sum_flow_M >= 500) {
flag = 2
} else if (state_flow_M < 200 && sum_flow_M >= 200) {
flag = 1
}
state.update(sum_flow)
}
Option((k, flag))
}).timeout(Minutes(182))
)
在SparkStreaming算子中,提供了mapWithState状态计算算子,将上游的数据格式化成(k,v)元组,然后进入mapWithState进行状态计算,state表示在状态数据中,根据k找出来的当前状态,然后通过update将自己计算后的最新状态更新进去。
这里我利用mapWithState实现了一个计数的demo:
程序的数据源是9999端口的socketStream,我向端口写入数据:
随后输出状态数据:
我是同时写入的前六行数据,所以这六条数据都在同一个RDD内,然后再写入最后一行数据,可以看到,在第三个RDD内,2的个数就变成了3。
但是缺点就是如果程序故障,之前的状态数据可能会丢失(虽然有checkpoint)。而且key的过期时间在程序内还是挺难控制的,需要另外大量的逻辑代码实现。所以为了将状态数据与程序本身分离以及开发便捷性,所以通常使用redis。
val result = p.toList.map(x => {
val userID = x._1
val flow = x._2
val response_date = pipeline.hget(userID, "date")
// 从redis中获取当前user的流量
val response_flow_codis = pipeline.hget(userID, "flow")
i += 2
if (i % 512 ==0){
pipeline.sync
}
(userID, flow, response_date, response_flow_codis)
})
pipeline.sync
i = 0
result.foreach(x => {
val userID = x._1
// 当前数据中的流量数据
var flow = x._2
val date = x._3.get
// redis中历史的流量数据
val flow_codis = x._4.get
// 说明flow不是今天的,直接覆盖
if (date == null || date != start_date.value) {
pipeline.hset(userID, "flow", String.valueOf(flow))
pipeline.hset(userID, "date", start_date.value)
} else if (flow_codis != "NN" && date == start_date.value) {
// 累加当前数据流量和redis历史流量,然后更新到redis
flow = Integer.parseInt(flow_codis) + flow
pipeline.hset("userID, "flow", String.valueOf(flow))
}
})
上面的代码首先是从redis中获取当前user的历史流量,然后与当前数据中用户的流量累加,最后生成最新的流量数据,再更新到redis中。
可以看到上面的代码中,我在redis中放入了一个date字段。为什么呢?
这个程序实现的是一个日流量累计的应用场景。也就是说,每天的流量从00:00开始,就要被置空。我遇到到很多开发者,在这种场景下将key设计为userID + yyyymmdd的格式,这样每个用户在每天都会在redis中生成一个新的key,这样就会导致key无限增长,所以为了解决这个问题,开发者又想出给key设置ttl过期时间,然后redis过期清理。
但是这种问题就导致了key一直是变动的,每次查询或者排查问题的时候,都需要先生成yyyymmdd格式的时间拼接。所以为了解决这个问题,我就增加了一个名为date的value,里面存放yyyymmdd格式的时间。
每次在访问redis key的时候,都先去访问date,与当前系统时间比较,判断flow是否为今天的flow,如果date与当前时间不一致(date != start_date.value),则说明flow的值不是今天的,只需要将当前这条数据的流量写入flow覆盖即可,然后将当前时间更新到date中。
上面就是我结合个人实时开发的业务场景,对redis两种应用场景的一个阐述。总的来说,redis作为一个分布式缓存组件,在实时开发中表现出了其优异的性能,但是如何去用、如何用好,还需要开发者本身去思考。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。