foldByKey函数是PairRDD<K, V>对V做合并处理,方法是这样的
可以看到,第一个参数是zeroValue,这个就是用来对原始的V做合并操作的,后面的参数是一个JFunction操作。
对于一个PairRDD,如Array(("A",0),("A",2),("B",1),("B",2),("C",1))
进行foldByKey(2)、并且function是x+y的操作时,运算过程是这样的,先将2去加上key为"A"的第一个元素的value,变成了("A", 2),然后拿这个初始化的结果再去执行"A"与后续元素,结果就是("A", 4)。对于key为"B"的结果就是("B", 5)
看代码:
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* @author wuweifeng wrote on 2018/4/18.
*/
public class Test {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
//spark对普通List的reduce操作
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
List<Tuple2<String, Integer>> data = new ArrayList<>();
data.add(new Tuple2<>("A", 10));
data.add(new Tuple2<>("A", 20));
data.add(new Tuple2<>("B", 2));
data.add(new Tuple2<>("B", 3));
data.add(new Tuple2<>("C", 5));
JavaPairRDD<String, Integer> originRDD = javaSparkContext.parallelizePairs(data);
//初始值为2,那么就会将2先与第一个元素做一次Function操作,将结果再与下一个元素结合
Map map = originRDD.foldByKey(2, new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 * v2;
}
}).collectAsMap();
//{A=400, C=10, B=12}
System.out.println(map);
}
}
注意,zeroValue只与同一个key的第一个value进行计算,而不是对所有的value进行计算。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有