前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >2021年大数据Spark(三十六):SparkStreaming实战案例一 WordCount

2021年大数据Spark(三十六):SparkStreaming实战案例一 WordCount

作者头像
Lansonli
发布2021-10-09 16:47:10
发布2021-10-09 16:47:10
49400
代码可运行
举报
文章被收录于专栏:Lansonli技术博客Lansonli技术博客
运行总次数:0
代码可运行

SparkStreaming实战案例一 WordCount

需求

从TCP Socket数据源实时消费数据,对每批次Batch数据进行词频统计WordCount,流程图如下:

准备工作

1.在node01上安装nc命令

nc是netcat的简称,原本是用来设置路由器,我们可以利用它向某个端口发送数据

代码语言:javascript
代码运行次数:0
复制
yum install -y nc

2.在node01启动客户端工具发送消息

代码语言:javascript
代码运行次数:0
复制
 nc -lk 9999

代码实现

http://spark.apache.org/docs/latest/streaming-programming-guide.html

从官方文档可知,提供两种方式构建StreamingContext实例对象,如下:

 第一种方式:构建SparkConf对象

 第二种方式:构建SparkContext对象

完整代码如下所示:

代码语言:javascript
代码运行次数:0
复制
package cn.itcast.streaming

import org.apache.commons.lang3.StringUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * 基于IDEA集成开发环境,编程实现从TCP Socket实时读取流式数据,对每批次中数据进行词频统计。
 */
object SparkStreamingDemo01_WordCount {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")
    //batchDuration the time interval at which streaming data will be divided into batches
    //流数据将被划分为批的时间间隔,就是每隔多久对流数据进行一次微批划分!
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))

    val inputDStream: ReceiverInputDStream[String] = ssc.socketTextStream("node1", 9999)

    val resultDStream: DStream[(String, Int)] = inputDStream
      .filter(StringUtils.isNotBlank(_))
      .flatMap(_.trim.split("\\s+"))
      .map((_, 1))
      .reduceByKey(_ + _)

    resultDStream.print(10)

    // 启动并等待程序停止
    // 对于流式应用来说,需要启动应用
    ssc.start()
    // 流式应用启动以后,正常情况一直运行(接收数据、处理数据和输出数据),除非人为终止程序或者程序异常停止
    ssc.awaitTermination()
    // 关闭流式应用(参数一:是否关闭SparkContext,参数二:是否优雅的关闭)
    ssc.stop(stopSparkContext = true, stopGracefully = true)
    //注意:
    //上面的代码可以做WordCount,但是只能对当前批次的数据进行累加!
  }
}

应用监控

运行上述词频统计案例,登录到WEB UI监控页面:http://localhost:4040/

查看相关监控信息。

 其一、Streaming流式应用概要信息

运行结果监控截图:

每批次Batch数据处理总时间TD = 批次调度延迟时间SD + 批次数据处理时间PT

 其二、性能衡量标准

SparkStreaming实时处理数据性能如何(是否可以实时处理数据)??如何衡量的呢??

需要满足:

每批次数据处理时间TD  <=  BatchInterval每批次时间间隔

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/04/21 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • SparkStreaming实战案例一 WordCount
    • 需求
      • 准备工作
    • 代码实现
      •  第一种方式:构建SparkConf对象
      •  第二种方式:构建SparkContext对象
      • 完整代码如下所示:
    • 应用监控
      •  其一、Streaming流式应用概要信息
      •  其二、性能衡量标准
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档