前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >Spark入门第一步:WordCount之java版、Scala版

Spark入门第一步:WordCount之java版、Scala版

作者头像
趣学程序-shaofeer
发布2019-10-29 16:36:20
发布2019-10-29 16:36:20
1.8K00
代码可运行
举报
文章被收录于专栏:upuptop的专栏upuptop的专栏
运行总次数:0
代码可运行

Spark入门第一步:WordCount之java版、Scala版

Spark入门系列,第一步,编写WordCount程序。

我们分别使用java和scala进行编写,从而比较二者的代码量

数据文件 通过读取下面的文件内容,统计每个单词出现的次数

代码语言:javascript
代码运行次数:0
运行
复制
java scala python android
spark storm spout bolt
kafka MQ
elasticsearch logstash kibana
hive hbase mysql oracle sqoop
hadoop hdfs map reduce
java scala python android
spark storm spout bolt
kafka MQ
java scala python android
spark storm spout bolt
kafka MQ
elasticsearch logstash kibana
hive hbase mysql oracle sqoop
hive hbase mysql oracle sqoop
hadoop hdfs map reduce

代码实现

•使用java代码进行编写

代码语言:javascript
代码运行次数:0
运行
复制
package top.wintp.java_spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import scala.Tuple2;
/**
 * @author: pyfysf
 * <p>
 * @qq: 337081267
 * <p>
 * @CSDN: http://blog.csdn.net/pyfysf
 * <p>
 * @blog: http://wintp.top
 * <p>
 * @email: pyfysf@163.com
 * <p>
 * @time: 2019/10/26
 */
public class SparkWordCount {
    public static void main(String[] args) {
        //    复杂模式
        //    创建SparkConf
        SparkConf conf = new SparkConf();
        conf.setAppName("spark_demo_java");
        conf.setMaster("local");

        //    创建javaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        //    读取文件
        JavaRDD<String> lines = sc.textFile("./data/words.txt");
        //    截取单词
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split("\\s+")).iterator();
            }
        });
        //    对单词进行计数
        JavaPairRDD<String, Integer> pairWord = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        });

        //    根据key进行计算
        JavaPairRDD<String, Integer> result = pairWord.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i, Integer i2) throws Exception {
                return i + i2;
            }
        });

        //打印结果
        result.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                System.out.println(stringIntegerTuple2);
            }
        });
        sc.stop();
    }
}

•利用lamda表达式简化java代码

代码语言:javascript
代码运行次数:0
运行
复制
package top.wintp.java_spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import scala.Tuple2;
/**
 * @author: pyfysf
 * <p>
 * @qq: 337081267
 * <p>
 * @CSDN: http://blog.csdn.net/pyfysf
 * <p>
 * @blog: http://wintp.top
 * <p>
 * @email: pyfysf@163.com
 * <p>
 * @time: 2019/10/26
 */
public class SparkWordCount {
    public static void main(String[] args) {
        //lamda表达式
        SparkConf conf = new SparkConf();
        conf.setAppName("spark_demo_java");
        conf.setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("./data/words.txt");
        JavaRDD<String> words = lines.flatMap((String line) -> Arrays.asList(line.split("\\s+")).iterator());
        JavaPairRDD<String, Integer> pairWords = words.mapToPair((String s) -> new Tuple2<>(s, 1));
        JavaPairRDD<String, Integer> result = pairWords.reduceByKey(Integer::sum);
        result.foreach((Tuple2<String, Integer> res) -> System.out.println(res));
        sc.stop();
    }
}

•使用scala代码编写

代码语言:javascript
代码运行次数:0
运行
复制
package top.wintp.scala_spark
import org.apache.spark.{SparkConf, SparkContext}
/**
 * @author: pyfysf
 *          <p>
 * @qq: 337081267
 *      <p>
 * @CSDN: http://blog.csdn.net/pyfysf
 *        <p>
 * @blog: http://wintp.top
 *        <p>
 * @email: pyfysf@163.com
 *         <p>
 * @time: 2019/10/26
 */
object SparkWordCount {
  def main(args: Array[String]): Unit = {
    //    完整版
    //    创建配置对象
    val conf = new SparkConf()
    //    设置运行模式
    conf.setMaster("local")
    //    设置任务名称
    conf.setAppName("sparkTest")
    //    创建SparkContext对象
    val sc = new SparkContext(conf)
    //    读取文件
    val lines = sc.textFile("./data/words.txt")
    //    切割文件
    val words = lines.flatMap((line: String) => {
      line.split("\\s+")
    })
    //    对word进行计数
    val pariWrod = words.map((tmp: String) => {
      new Tuple2(tmp, 1)
    })
    //    根据key进行聚合
    val result = pariWrod.reduceByKey((v1: Int, v2: Int) => {
      v1 + v2
    })
    //    输出结果
    result.foreach(println)
    //    释放资源
    sc.stop()
  }
}

•利用scala的特性简化代码

代码语言:javascript
代码运行次数:0
运行
复制
package top.wintp.scala_spark
import org.apache.spark.{SparkConf, SparkContext}
/**
 * @author: pyfysf
 *          <p>
 * @qq: 337081267
 *      <p>
 * @CSDN: http://blog.csdn.net/pyfysf
 *        <p>
 * @blog: http://wintp.top
 *        <p>
 * @email: pyfysf@163.com
 *         <p>
 * @time: 2019/10/26
 */
object SparkWordCount {
  def main(args: Array[String]): Unit = {
    //    简洁版
    val conf = new SparkConf().setAppName("sparkDemo").setMaster("local")
    val sc = new SparkContext(conf)
    val result = sc.textFile("./data/words.txt")
                    .flatMap(_.split("\\s+"))
                    .map((_, 1))
                    .reduceByKey(_ + _)
    result.foreach(println)
    sc.stop()
  }
}

建议大家对于java版和scala版本的这两种方式都要掌握。特别是scala的一行代码版本。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-10-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 趣学程序 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Spark入门第一步:WordCount之java版、Scala版
  • 代码实现
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档