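This post covers a few simple Spark operations, such as deduplication, union, and intersection. Whether or not they turn out to be useful, they are recorded here for reference.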
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import java.util.Arrays;
import java.util.List;
/**
 * Removes duplicate elements. Note that distinct() involves a shuffle, so it is an expensive operation.
 * @author wuweifeng wrote on 2018/4/16.
 */
public class TestDistinct {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        // Wrap the SparkContext so that a plain Java List can be turned into an RDD
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        List<Integer> data = Arrays.asList(1, 1, 2, 3, 4, 5);
        JavaRDD<Integer> originRDD = javaSparkContext.parallelize(data);
        // distinct() drops the duplicate 1
        List<Integer> results = originRDD.distinct().collect();
        System.out.println(results);
    }
}
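The result is [4, 1, 3, 5, 2]. The order differs from the input because distinct() shuffles the data, so the order of the output is not guaranteed.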
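Because the order of the collected list can vary, a check on the output is more robust if it sorts a copy first. The following is a minimal plain-Java sketch (no Spark dependency) of that idea. The class name `DistinctOrderSketch` and the helper `sortedCopy` are hypothetical names introduced for illustration, and the hard-coded list only stands in for the value returned by `collect()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration; not part of Spark.
final class DistinctOrderSketch {

    // Returns a sorted copy so the caller's list is left untouched.
    static List<Integer> sortedCopy(List<Integer> collected) {
        List<Integer> copy = new ArrayList<>(collected);
        Collections.sort(copy);
        return copy;
    }

    public static void main(String[] args) {
        // Stand-in for originRDD.distinct().collect()
        List<Integer> collected = Arrays.asList(4, 1, 3, 5, 2);
        System.out.println(sortedCopy(collected)); // prints [1, 2, 3, 4, 5]
    }
}
```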
union simply merges two RDDs into one. Duplicates are kept.
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import java.util.Arrays;
import java.util.List;
/**
 * Merges two RDDs.
 * @author wuweifeng wrote on 2018/4/16.
 */
public class TestUnion {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        // Wrap the SparkContext so that plain Java Lists can be turned into RDDs
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        List<Integer> one = Arrays.asList(1, 2, 3, 4, 5);
        List<Integer> two = Arrays.asList(1, 6, 7, 8, 9);
        JavaRDD<Integer> oneRDD = javaSparkContext.parallelize(one);
        JavaRDD<Integer> twoRDD = javaSparkContext.parallelize(two);
        // union() concatenates the two RDDs without removing duplicates
        List<Integer> results = oneRDD.union(twoRDD).collect();
        System.out.println(results);
    }
}
The result is [1, 2, 3, 4, 5, 1, 6, 7, 8, 9]
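Note that 1 appears twice in the output. If a set-style union is wanted, chaining `distinct()` after `union()` is one way to get it, at the cost of a shuffle. The plain-Java sketch below (no Spark dependency) models both behaviours on local lists. `UnionSketch`, `union`, and `unionDistinct` are hypothetical names used only for illustration, and the first-seen ordering of `unionDistinct` is a property of this sketch, not something Spark guarantees.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

// Hypothetical local model of union semantics; not Spark code.
final class UnionSketch {

    // Models union(): plain concatenation, duplicates are kept.
    static <T> List<T> union(List<T> a, List<T> b) {
        List<T> out = new ArrayList<>(a);
        out.addAll(b);
        return out;
    }

    // Models union() followed by distinct(): duplicates are removed.
    static <T> List<T> unionDistinct(List<T> a, List<T> b) {
        return new ArrayList<>(new LinkedHashSet<>(union(a, b)));
    }

    public static void main(String[] args) {
        List<Integer> one = Arrays.asList(1, 2, 3, 4, 5);
        List<Integer> two = Arrays.asList(1, 6, 7, 8, 9);
        System.out.println(union(one, two));         // prints [1, 2, 3, 4, 5, 1, 6, 7, 8, 9]
        System.out.println(unionDistinct(one, two)); // prints [1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```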
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import java.util.Arrays;
import java.util.List;
/**
 * Returns the intersection of two RDDs.
 * @author wuweifeng wrote on 2018/4/16.
 */
public class TestIntersection {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        // Wrap the SparkContext so that plain Java Lists can be turned into RDDs
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        List<Integer> one = Arrays.asList(1, 2, 3, 4, 5);
        List<Integer> two = Arrays.asList(1, 6, 7, 8, 9);
        JavaRDD<Integer> oneRDD = javaSparkContext.parallelize(one);
        JavaRDD<Integer> twoRDD = javaSparkContext.parallelize(two);
        // intersection() keeps only the elements present in both RDDs
        List<Integer> results = oneRDD.intersection(twoRDD).collect();
        System.out.println(results);
    }
}
The result is [1]
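Unlike union, the output of an intersection contains each common element only once, even when the inputs contain duplicates. The plain-Java sketch below (no Spark dependency) models that behaviour on local lists. `IntersectionSketch` and its `intersection` method are hypothetical names for illustration only, and the inputs with repeated values are made up for the demonstration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical local model of intersection semantics; not Spark code.
final class IntersectionSketch {

    // Returns the elements present in both lists, each at most once.
    static <T> Set<T> intersection(List<T> a, List<T> b) {
        Set<T> out = new LinkedHashSet<>(a); // removes duplicates from a
        out.retainAll(new HashSet<>(b));     // keeps only elements also in b
        return out;
    }

    public static void main(String[] args) {
        // Both inputs repeat the value 1; the result contains it once.
        System.out.println(intersection(Arrays.asList(1, 1, 2, 3), Arrays.asList(1, 1, 3, 4)));
    }
}
```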
RDD1.subtract(RDD2) returns the elements that appear in RDD1 but not in RDD2. The result is not deduplicated.
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import java.util.Arrays;
import java.util.List;
/**
 * Returns the elements of the first RDD that do not appear in the second.
 * @author wuweifeng wrote on 2018/4/16.
 */
public class TestSubtract {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        // Wrap the SparkContext so that plain Java Lists can be turned into RDDs
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        List<Integer> one = Arrays.asList(1, 2, 3, 4, 5);
        List<Integer> two = Arrays.asList(1, 6, 7, 8, 9);
        JavaRDD<Integer> oneRDD = javaSparkContext.parallelize(one);
        JavaRDD<Integer> twoRDD = javaSparkContext.parallelize(two);
        // subtract() removes from oneRDD every element that also appears in twoRDD
        List<Integer> results = oneRDD.subtract(twoRDD).collect();
        System.out.println(results);
    }
}
The result is [2, 3, 4, 5]
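The example above has no repeated values, so it does not show the "not deduplicated" part. The plain-Java sketch below (no Spark dependency) models subtract on local lists with an input that does contain a repeated value. `SubtractSketch` and its `subtract` method are hypothetical names for illustration, and the input order is preserved only because this is a local model.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical local model of subtract semantics; not Spark code.
final class SubtractSketch {

    // Keeps every element of a that does not occur in b, duplicates included.
    static <T> List<T> subtract(List<T> a, List<T> b) {
        Set<T> exclude = new HashSet<>(b);
        List<T> out = new ArrayList<>();
        for (T x : a) {
            if (!exclude.contains(x)) {
                out.add(x);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The repeated 2 survives because subtract does not deduplicate.
        System.out.println(subtract(Arrays.asList(1, 2, 2, 3), Arrays.asList(1))); // prints [2, 2, 3]
    }
}
```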
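The Cartesian product is the set of all pairwise combinations of elements from the two RDDs, which makes it very expensive: the result has |A| × |B| elements. For example, if A is ["a","b","c"] and B is ["1","2","3"], then A.cartesian(B) yields (a,1) (a,2) (a,3) (b,1) (b,2) (b,3) (c,1) (c,2) (c,3).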
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
/**
 * Returns the Cartesian product of two RDDs. This is a very expensive operation.
 * @author wuweifeng wrote on 2018/4/16.
 */
public class TestCartesian {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        // Wrap the SparkContext so that plain Java Lists can be turned into RDDs
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        List<Integer> one = Arrays.asList(1, 2, 3);
        List<Integer> two = Arrays.asList(1, 4, 5);
        JavaRDD<Integer> oneRDD = javaSparkContext.parallelize(one);
        JavaRDD<Integer> twoRDD = javaSparkContext.parallelize(two);
        // cartesian() pairs every element of oneRDD with every element of twoRDD
        List<Tuple2<Integer, Integer>> results = oneRDD.cartesian(twoRDD).collect();
        System.out.println(results);
    }
}
Note that the result is a list of key-value pairs (Tuple2):
[(1,1), (1,4), (1,5), (2,1), (2,4), (2,5), (3,1), (3,4), (3,5)]
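To make the cost concrete, the plain-Java sketch below (no Spark dependency) builds the same pairs with a nested loop: every element of the first list is paired with every element of the second, so the output size is the product of the two input sizes. `CartesianSketch` and its `cartesian` method are hypothetical names for illustration, and `Map.Entry` stands in for `Tuple2` only to keep the sketch free of Scala classes.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical local model of a Cartesian product; not Spark code.
final class CartesianSketch {

    // Pairs every element of a with every element of b: a.size() * b.size() results.
    static <A, B> List<Map.Entry<A, B>> cartesian(List<A> a, List<B> b) {
        List<Map.Entry<A, B>> out = new ArrayList<>(a.size() * b.size());
        for (A x : a) {
            for (B y : b) {
                out.add(new SimpleImmutableEntry<>(x, y));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, Integer>> pairs =
                cartesian(Arrays.asList(1, 2, 3), Arrays.asList(1, 4, 5));
        System.out.println(pairs.size()); // prints 9
    }
}
```

With two inputs of 10,000 elements each, the same loop would already produce 100 million pairs, which is why the operation is best reserved for small inputs.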