org.apache.spark
spark-core_2.11
2.1.1
package spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
public class SparkWordCountForJava {
public static void main(String[] args) {
// 初始化spark , local[*]:以*核心数在本地运行
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SparkWordCountForJava");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD textFileRdd = jsc.textFile("C:\\Users\\com\\Desktop\\test.txt");
// 将数据按照切分规则分成一个个单词
JavaRDD flatMapRdd = textFileRdd.flatMap(new FlatMapFunction() {
public Iterator call(String s) throws Exception {
String[] splits = s.split("\t");
List list = Arrays.asList(splits);
return list.iterator();
}
});
// 每个单词作为key,value为1
JavaRDD> mapRdd = flatMapRdd.map(new Function>() {
public Tuple2 call(String s) throws Exception {
return new Tuple2(s, 1);
}
});
// 分组:相同 key 分为一组
JavaPairRDD>> groupByRdd = mapRdd.groupBy(new Function, String>() {
public String call(Tuple2 s) throws Exception {
return s._1;
}
});
// Lmbda 表达式写法 和 mapRdd 、 groupByRdd 值一样
JavaRDD> mapRdd1 = flatMapRdd.map(s -> new Tuple2(s, 1));
JavaPairRDD>> groupByRdd1 = mapRdd1.groupBy(s -> s._1);
// 相同key,value值累加
JavaPairRDD mapValuesRdd = groupByRdd.mapValues(new Function>, Integer>() {
public Integer call(Iterable> v1) throws Exception {
int sum = 0;
for(Tuple2 t:v1) {
sum += t._2;
}
return sum;
}
});
// 行动算子:collect,将数据拉取到driver端
List> list = mapValuesRdd.collect();
System.out.println(list);
}
}
小明 小绿 小黑
小红 小红 小白
小蓝 小蓝 小蓝
小黑 小白 小黑
小红 小红 小黄
小黑 小白 小绿
小红 小蓝 小蓝
小红 小红 小黄
小绿 小蓝 小蓝
小黑 小白 小蓝
[(小绿,3), (小白,4), ( ,9), (小蓝,8), (小黑,5), (小红,7), (小明,1), (小黄,2)]