1、概述
Spark Streaming是Spark API核心的扩展,支持可扩展,高吞吐量,实时数据流的容错流处理。数据可以从像kafka,flume,,Kinesis或TCP Socket许多来源摄入,并且可以使用与像高级别功能表达复杂的算法来处理map,reduce,join和window。最后,处理的数据可以推送到文件系统,数据库和实时(dashborad)仪表板。事实上,您可以将Spark的 机器学习(ML)和 图形处理(Graphx)算法应用于数据流。
park Streaming提供了一个高层次的抽象,称为离散流或DStream,它代表连续的数据流。DStream可以通过Kafka,Flume和Kinesis等来源的输入数据流创建,也可以通过在其他DStream上应用高级操作来创建。在内部,一个DStream被表示为一系列RDD。
2、快速示例
以下示例,及后面部分示例都将使用IDEA。同时请在本地安装好Maven环境,Maven版本3.x以上。
2.1、通过spark-submit提交任务步1:创建一个Maven项目添加spark-streaming的依赖
org.apache.spark
spark-streaming_2.11
2.1.2
步2:开发以下源代码
packagecn.spark.streaming
importorg.apache.spark.SparkConf
importorg.apache.spark.storage.StorageLevel
importorg.apache.spark.streaming.dstream.DStream
importorg.apache.spark.streaming.
/**
*收集时时从网络的传递来的数据进行分析
*/
objectDemo01_NetworkWordCount {
defmain(args: Array[String]): Unit = {
valconf: SparkConf =newSparkConf().setAppName("Streaming_NetworkWordCount");
valssc: StreamingContext =newStreamingContext(conf,Seconds(2));
//声明读取的网络地址和端口
vallines: DStream[String] = ssc.socketTextStream("hadoop201",9999, StorageLevel.MEMORY_AND_DISK);
//处理数据
valwordCount: DStream[(String, Int)] = lines.flatMap(line => line.split("\\s+")).map(word => (word,1)).reduceByKey(_ + _);
wordCount.map(kv => kv._1 +"\t"+ kv._2).print(10);//输出前10行
//启动
ssc.start();
ssc.awaitTermination();
}
}
步3:打包
使用IDEA打包
步4:运行测试
在Linux上安装nc软件,使用命令:
$ sudo yum install -y nc
(关于nc的参数,请自行baidu)
现在启动nc并绑定9999端口:
$nc -lk hadoop2019999
将打好的jar包上传到linux并使用spark-submit提交任务:
-------------------------------------------
Time: 1515499200000 ms
-------------------------------------------
默认情况下,每将收集数据时,都会输出一个时间,如上面所示。
现在在nc端输入一行字符串,并用空格分开:
Jack Mary Rose
查看spark端收集并显示的数据:
-------------------------------------------
Time: 1515499196000 ms
-------------------------------------------
Mary 1
Rose 1
Jack 1
要停止Spartstreaming和nc请按下CTRL+C即可。
2.2、本地调试
Spark-Stream也可以在IDEA中进行测试,只要被监听的服务器地址在同一个网内可见即可。
本地调试,必须要设置master(...)可以是local也可以是standalone的集群或是yarn。以下代码:
packagecn.spark.streaming
importorg.apache.log4j.
importorg.apache.spark.SparkConf
importorg.apache.spark.storage.StorageLevel
importorg.apache.spark.streaming.dstream.DStream
importorg.apache.spark.streaming.
/**
*收集时时从网络的传递来的数据进行分析
*/
objectDemo01_NetworkWordCount {
defmain(args: Array[String]): Unit = {
//设置日志的级别,否则会输出很多INFO的信息
Logger.getLogger("org").setLevel(Level.WARN);
valconf: SparkConf =newSparkConf().setAppName("Streaming_NetworkWordCount");
//在编程过程中必须要设置master,如果在通过spark-submit提交则此项目可以忽略
//通过--master ..参数设置即可
conf.setMaster("local[2]");
//声明StreamingContext设置每2秒执行一次数据读取
valssc: StreamingContext =newStreamingContext(conf,Seconds(2));
//声明读取的网络地址和端口
vallines: DStream[String] = ssc.socketTextStream("hadoop201",9999, StorageLevel.MEMORY_AND_DISK);
//处理数据
valwordCount: DStream[(String, Int)] =
lines.flatMap(line => line.split("\\s+")).map(word => (word,1)).reduceByKey(_ + _);
wordCount.map(kv => kv._1 +"\t"+ kv._2).print(10);//输出前10行
//启动
ssc.start();
ssc.awaitTermination();
}
}
然后直接以本地方式运行,由于在代码中设置了日志的级别为:WARN所以,很多INFO信息将不再显示:
查看IDEA的后台,请保证在Liunx上nc已经先启动,然后启动spartstreaming以后再在nc端输出一些测试的数据,以下是显示的结果:
-------------------------------------------
Time: 1515499824000 ms
-------------------------------------------
ALex1
Hello1
Mike1
Rose1
Jack2
查看4040端口UI界面,您将会发现大量已经完成的Job
2.3、运行在spark://集群上TODO3、基本概念3.1、初始化StreamingContext
要创建Spark Streaming Application,必须要创建StreamingContext对象。可以通过SparkConf对象来创建StreamingContext对象:
valconf: SparkConf =
newSparkConf().setAppName("Streaming_NetworkWordCount");
valssc: StreamingContext =newStreamingContext(conf,Seconds(2));
也可以通过SparkContext对象来创建StreamingContext对象:
valsc:SparkContext =newSparkContext(conf);
valssc2:StreamingContext =newStreamingContext(sc,Seconds(2));
3.2、定义上下文之后,您必须执行以下操作。
1.通过创建输入DStreams来定义输入源。
2.通过将转换和输出操作应用于DStream来定义流式计算。
3.开始接收数据并使用它进行处理streamingContext.start()。
4.等待处理停止(手动或由于任何错误)使用streamingContext.awaitTermination()。
5.处理可以手动停止使用streamingContext.stop()。
3.3、要记住的要点:
1.一旦StreamingContext已经开始,就不能建立或添加新的流式计算。
2.一旦StreamingContext被停止,它就不能被重新启动。
3.一个JVM中只能有一个StreamingContext同时处于活动状态。
4.停止StreamingContext的stop()也停止了SparkContext。要仅停止StreamingContext,请将可选参数的stop()调用设置stopSparkContext为false。
5.只要先前的StreamingContext在下一个StreamingContext创建之前停止(不停止SparkContext),就可以重新使用SparkContext来创建多个StreamingContext。
3.4、离散流(DStreams)
离散流或DStream是Spark Streaming提供的基本抽象。它表示连续的数据流,或者是从源接收的输入数据流,或者是通过转换输入流而生成的处理过的数据流。在内部,DStream由连续的RDD系列表示,这是Spark对不可变的分布式数据集的抽象(详见Spark Programming Guide)。DStream中的每个RDD都包含一定间隔的数据,如下图所示。
DStream上应用的任何操作都会转换为对基础RDD的操作。例如,在前面将线路流转换为字的示例中,该flatMap操作应用于linesDStream中的每个RDD,以生成DStream的wordsRDD。如下图所示。
这些基础的RDD转换由Spark引擎计算。DStream操作隐藏了大部分这些细节,并为开发人员提供了一个更高级别的API。
参考地址:
http://spark.apache.org/docs/latest/streaming-programming-guide.html
领取专属 10元无门槛券
私享最新 技术干货