前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >5 spark入门键值对foldByKey

5 spark入门键值对foldByKey

作者头像
天涯泪小武
发布于 2019-01-17 04:00:38
发布于 2019-01-17 04:00:38
1.2K00
代码可运行
举报
文章被收录于专栏:SpringCloud专栏SpringCloud专栏
运行总次数:0
代码可运行

foldByKey函数是PairRDD<K, V>对V做合并处理,方法是这样的

可以看到,第一个参数是zeroValue,这个就是用来对原始的V做合并操作的,后面的参数是一个JFunction操作。

对于一个PairRDD,如Array(("A",0),("A",2),("B",1),("B",2),("C",1))

进行foldByKey(2)、并且function是x+y的操作时,运算过程是这样的,先将2去加上key为"A"的第一个元素的value,变成了("A", 2),然后拿这个初始化的结果再去执行"A"与后续元素,结果就是("A", 4)。对于key为"B"的结果就是("B", 5)

看代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @author wuweifeng wrote on 2018/4/18.
 */
public class Test {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        //spark对普通List的reduce操作
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        List<Tuple2<String, Integer>> data = new ArrayList<>();
        data.add(new Tuple2<>("A", 10));
        data.add(new Tuple2<>("A", 20));
        data.add(new Tuple2<>("B", 2));
        data.add(new Tuple2<>("B", 3));
        data.add(new Tuple2<>("C", 5));

        JavaPairRDD<String, Integer> originRDD = javaSparkContext.parallelizePairs(data);
        //初始值为2,那么就会将2先与第一个元素做一次Function操作,将结果再与下一个元素结合
        Map map = originRDD.foldByKey(2, new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 * v2;
            }
        }).collectAsMap();

        //{A=400, C=10, B=12}
        System.out.println(map);
    }
}

注意,zeroValue只与同一个key的第一个value进行计算,而不是对所有的value进行计算。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018年04月18日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
4 spark入门键值对聚合操作combineByKey
combineByKey是spark中一个核心的高级函数,其他多个键值对函数都是用它来实现的,如groupByKey,reduceByKey等等。
天涯泪小武
2019/01/17
1.8K0
2 Spark入门reduce、reduceByKey的操作
代码很简单,第一个就是将各个数累加。reduce顺序是1+2,得到3,然后3+3,得到6,然后6+4,依次进行。
天涯泪小武
2019/01/17
8140
6 spark入门键值对操作sortByKey、groupByKey、groupBy、cogroup
从名字就能看到,是将Key排序用的。如一个PariRDD-["A":1, "C":4, "B":3, "B":5],按Key排序的话就是A、B、C。注意,这个方法只是对Key进行排序,value不排序。
天涯泪小武
2019/01/17
2.4K0
1 Spark入门各种map的操作,java语言
Spark基本操作主要就是各种map、reduce,这一篇从各种map开始。由于scala不熟悉,而且语法太精简,虽然代码量少了,但是可读性差了不少,就还是用Java来操作。
天涯泪小武
2019/01/17
7410
3 Spark入门distinct、union、intersection,subtract,cartesian等数学运算
这一篇是一些简单的Spark操作,如去重、合并、取交集等,不管用不用的上,做个档案记录。
天涯泪小武
2019/01/17
1.1K0
spark RDD算子(八)之键值对关联操作
github: https://github.com/zhaikaishun/spark_tutorial/tree/master/src/main/java/com/spark/rdd_tutorial/tutorial8 先从spark-learning中的一张图大致了解其功能
天涯泪小武
2019/05/26
1.8K0
【Spark篇】---Spark中Action算子
Action类算子也是一类算子(函数)叫做行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序(就是我们编写的一个应用程序)中有几个Action类算子执行,就有几个job运行。
LhWorld哥陪你聊算法
2018/09/13
1K0
【Spark篇】---Spark中Action算子
Spark 第一个Spark程序WordCount
使用上述命令打包后,会在项目根目录下的target目录生成jar包。打完jar包后,我们可以使用spark-submit提交任务:
smartsi
2019/08/07
3600
【Spark篇】---Spark中Transformations转换算子
Spark中默认有两大类算子,Transformation(转换算子),懒执行。action算子,立即执行,有一个action算子 ,就有一个job。
LhWorld哥陪你聊算法
2018/09/13
6930
【Spark篇】---Spark中Transformations转换算子
【Spark篇】---Spark中transformations算子二
coalesce常用来减少分区,第二个参数是减少分区的过程中是否产生shuffle。
LhWorld哥陪你聊算法
2018/09/13
9900
【Spark篇】---Spark中transformations算子二
流计算与批处理的区别是什么?请举例说明。
流计算和批处理是两种不同的数据处理模型,它们在数据到达和处理方式上存在一些区别。下面我将通过一个具体的案例来说明流计算和批处理的区别。
GeekLiHua
2025/01/21
930
Spark入门第一步:WordCount之java版、Scala版
Spark入门第一步:WordCount之java版、Scala版 Spark入门系列,第一步,编写WordCount程序。 我们分别使用java和scala进行编写,从而比较二者的代码量 数据文件 通过读取下面的文件内容,统计每个单词出现的次数 java scala python android spark storm spout bolt kafka MQ elasticsearch logstash kibana hive hbase mysql oracle sqoop hadoop hdfs m
趣学程序-shaofeer
2019/10/29
1.8K0
大数据算法设计模式(1) - topN spark实现
topN算法,spark实现 package com.kangaroo.studio.algorithms.topn; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFuncti
用户1225216
2018/03/05
1.3K0
Spark Core入门1【Spark集群安装、高可用、任务执行流程、使用Scala/Java/Lambda编写Spark WordCount】
Spark是一种快速、通用、可扩展的大数据分析引擎,包含SparkSQL、Spark Streaming、GraphX、MLlib等子项目。
Java架构师必看
2021/05/14
1.5K0
Spark Core入门1【Spark集群安装、高可用、任务执行流程、使用Scala/Java/Lambda编写Spark WordCount】
Spark学习之WordCount(Java版)
一、pom.xml 添加spark-core依赖包 org.apache.spark spark-core_2.11 2.1.1 二、代码实现 package spark; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import or
静谧星空TEL
2021/04/27
3850
【Spark篇】---SparkStreaming算子操作transform和updateStateByKey
今天分享一篇SparkStreaming常用的算子transform和updateStateByKey。
LhWorld哥陪你聊算法
2018/09/13
1.2K0
【Spark篇】---SparkStreaming算子操作transform和updateStateByKey
【Spark篇】---SparkStream初始与应用
SparkStreaming是流式处理框架,是Spark API的扩展,支持可扩展、高吞吐量、容错的实时数据流处理,实时数据的来源可以是:Kafka, Flume, Twitter, ZeroMQ或者TCP sockets,并且可以使用高级功能的复杂算子来处理流数据。例如:map,reduce,join,window 。最终,处理后的数据可以存放在文件系统,数据库等,方便实时展现。
LhWorld哥陪你聊算法
2018/09/13
6360
【Spark篇】---SparkStream初始与应用
Win7 Eclipse 搭建spark java1.8(lambda)环境:WordCount helloworld例子
Win7 Eclipse 搭建spark java1.8(lambda)环境:WordCount helloworld例子 lambda表达式是java8给我们带来的一个重量的新特性,借用lambda表达式可以让我们的程序设计更加简洁。 package com; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; import java.util.Arrays; import java.util.List; public class WordCountLambda { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("WordCountLambda马克-to-win @ 马克java社区:").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> lines = sc.textFile("E://temp//input//friend.txt"); JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" "))); JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(word -> new Tuple2<>(word, 1)); JavaPairRDD<String, Integer> results = wordAndOne.reduceByKey((x, y) -> x + y); /* 下面一句也能工作。*/ // reduced.saveAsTextFile("E://temp//input//friend1.txt"); /*word:o1abc count:4 word:45 count:1 word:77 count:1*/ results.foreach(new VoidFunction<Tuple2<String,Integer>>() { public void call(Tuple2<String, Integer> tuple) throws Exception { System.out.println("word:" + tuple._1 + " count:" + tuple._2); } }); /*resultsPair is (o1abc,4) resultsPair is (45,1) resultsPair is (77,1)*/ List<Tuple2<String,Integer>> resultsPairs = results.collect(); for (Tuple2<String, Integer> resultsPair : resultsPairs) {
马克java社区
2019/07/19
4480
Win7 Eclipse 搭建spark java1.8(lambda)环境:WordCount helloworld例子
Spark中的Shuffle过程是什么?为什么它在性能上很关键?
在Spark中,Shuffle是指将数据重新分区的过程,通常在数据的重新分区和聚合操作中发生。Shuffle过程是Spark中性能关键的一部分,它对于作业的性能和可伸缩性有着重要的影响。
GeekLiHua
2025/01/21
1220
大数据算法设计模式(2) - 左外链接(leftOuterJoin) spark实现
左外链接(leftOuterJoin) spark实现 package com.kangaroo.studio.algorithms.join; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.
用户1225216
2018/03/05
7140
推荐阅读
相关推荐
4 spark入门键值对聚合操作combineByKey
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验