前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >updateStateByKey

updateStateByKey

作者头像
编程那点事
发布2023-02-25 16:00:48
2360
发布2023-02-25 16:00:48
举报
文章被收录于专栏:java编程那点事java编程那点事

updateStateByKey操作,可以让我们为每个key维护一份state,并持续不断的更新该state。 1、首先,要定义一个state,可以是任意的数据类型; 2、其次,要定义state更新函数——指定一个函数如何使用之前的state和新值来更新state。

对于每个batch,Spark都会为每个之前已经存在的key去应用一次state更新函数,无论这个key在batch中是否有新的数据。如果state更新函数返回none,那么key对应的state就会被删除。

当然,对于每个新出现的key,也会执行state更新函数。

注意,updateStateByKey操作,要求必须开启Checkpoint机制。

案例:基于缓存的实时wordcount程序(在实际业务场景中,这个是非常有用的)

代码语言:javascript
复制
/**
* 基于updateStateByKey算子实现缓存机制的实时wordcount程序
* @author Administrator
*
*/
public class UpdateStateByKeyWordCount {
​public static void main(String[] args) {
​​SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("UpdateStateByKeyWordCount");  
​​JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
​​// 第一点,如果要使用updateStateByKey算子,就必须设置一个checkpoint目录,开启checkpoint机制
​​// 这样的话才能把每个key对应的state除了在内存中有,那么是不是也要checkpoint一份
​​// 因为你要长期保存一份key的state的话,那么spark streaming是要求必须用checkpoint的,以便于在
​​// 内存数据丢失的时候,可以从checkpoint中恢复数据
// 开启checkpoint机制,很简单,只要调用jssc的checkpoint()方法,设置一个hdfs目录即可
​​jssc.checkpoint("hdfs://spark1:9000/wordcount_checkpoint");  
​​// 然后先实现基础的wordcount逻辑
​​JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
​​JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

​​​private static final long serialVersionUID = 1L;

​​​@Override
​​​public Iterable<String> call(String line) throws Exception {
​​​​return Arrays.asList(line.split(" "));  
​​​}
​​});

​​JavaPairDStream<String, Integer> pairs = words.mapToPair(

​​​​new PairFunction<String, String, Integer>() {

​​​​​private static final long serialVersionUID = 1L;

​​​​​@Override
​​​​​public Tuple2<String, Integer> call(String word) ​​​​​​​throws Exception {
​​​​​​return new Tuple2<String, Integer>(word, 1);
​​​​​}
​​​​});
​​// 到了这里,就不一样了,之前的话,是不是直接就是pairs.reduceByKey
​​// 然后,就可以得到每个时间段的batch对应的RDD,计算出来的单词计数
​​// 然后,可以打印出那个时间段的单词计数
​​// 但是,有个问题,你如果要统计每个单词的全局的计数呢?
​​// 就是说,统计出来,从程序启动开始,到现在为止,一个单词出现的次数,那么就之前的方式就不好实现
​​// 就必须基于redis这种缓存,或者是mysql这种db,来实现累加
​​// 但是,我们的updateStateByKey,就可以实现直接通过Spark维护一份每个单词的全局的统计次数
​​JavaPairDStream<String, Integer> wordCounts = pairs.updateStateByKey(

// 这里的Optional,相当于Scala中的样例类,就是Option,可以这么理解
​​​​// 它代表了一个值的存在状态,可能存在,也可能不存在
​​​​new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {

​​​​​private static final long serialVersionUID = 1L;

// 这里两个参数
​​​​​// 实际上,对于每个单词,每次batch计算的时候,都会调用这个函数
​​​​​// 第一个参数,values,相当于是这个batch中,这个key的新的值,可能有多个吧
​​​​​// 比如说一个hello,可能有2个1,(hello, 1) (hello, 1),那么传入的是(1,1)
​​​​​// 第二个参数,就是指的是这个key之前的状态,state,其中泛型的类型是你自己指定的
​​​​​@Override
​​​​​public Optional<Integer> call(List<Integer> values,​​​​​​​Optional<Integer> state) throws Exception {
​​​​​​// 首先定义一个全局的单词计数
​​​​​​Integer newValue = 0;
​​​​​​// 其次,判断,state是否存在,如果不存在,说明是一个key第一次出现
​​​​​​// 如果存在,说明这个key之前已经统计过全局的次数了
​​​​​​if(state.isPresent()) {
​​​​​​​newValue = state.get();
​​​​​​}
// 接着,将本次新出现的值,都累加到newValue上去,就是一个key目前的全局的统计
​​​​​​// 次数
​​​​​​for(Integer value : values) {
​​​​​​​newValue += value;
​​​​​​}
​​​​​​return Optional.of(newValue);  
​​​​​}
​​​​});
​​// 到这里为止,相当于是,每个batch过来是,计算到pairs DStream,就会执行全局的updateStateByKey
​​// 算子,updateStateByKey返回的JavaPairDStream,其实就代表了每个key的全局的计数
​​// 打印出来
​​wordCounts.print();
​​jssc.start();
​​jssc.awaitTermination();
​​jssc.close();
​}
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019-02-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
批量计算
批量计算(BatchCompute,Batch)是为有大数据计算业务的企业、科研单位等提供高性价比且易用的计算服务。批量计算 Batch 可以根据用户提供的批处理规模,智能地管理作业和调动其所需的最佳资源。有了 Batch 的帮助,您可以将精力集中在如何分析和处理数据结果上。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档