Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Spark实时(四):Strctured Streaming简单应用

Spark实时(四):Strctured Streaming简单应用

作者头像
Lansonli
发布于 2025-05-24 01:41:41
发布于 2025-05-24 01:41:41
8600
代码可运行
举报
文章被收录于专栏:Lansonli技术博客Lansonli技术博客
运行总次数:0
代码可运行

Strctured Streaming简单应用

一、Output Modes输出模式

Structured Streaming中结果输出时outputMode可以设置三种模式,三种默认区别如下:

  • Append Mode(默认模式):追加模式,只有自上次触发后追加到结果表中的新行才会被输出。只有select、where、map、flatmap、filter、join查询支持追加模式。
  • Complete Mode(完整模式):将整个更新的结果输出。仅可用于代码中有聚合查询情况,代码中没有聚合查询不能使用。
  • Update Mode(更新模式):自Spark2.1.1版本后可用,只有自上次触发后更新的行才会被输出。这种模式仅仅输出自上次触发以来发生更改的行。如果结果数据没有聚合操作那么相当于Append Mode。

二、​​​​​​​​​​​​​​Streaming Table API

在Spark3.1版本之后,我们可以通过DataStreamReader.table()方式实时读取流式表中的数据,使用DataStreamWriter.toTable()向表中实时写数据。

案例:读取Socket数据实时写入到Spark流表中,然后读取流表数据展示数据。

代码示例如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.lanson.structuredStreaming

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SparkSession}

object StreamTableAPI {
  def main(args: Array[String]): Unit = {
    //1.创建对象
    val spark: SparkSession = SparkSession.builder().master("local")
      .appName("StreamTableAPI")
      .config("spark.sql.shuffle.partitions", 1)
      .config("spark.sql.warehouse.dir", "./my-spark-warehouse")
      .getOrCreate()

    spark.sparkContext.setLogLevel("Error");
    import spark.implicits._

    //2.读取socket数据,注册流表
    val df: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node3")
      .option("port", 9999)
      .load()

    //3.对df进行转换
    val personinfo: DataFrame = df.as[String]
      .map(line => {
        val arr: Array[String] = line.split(",")
        (arr(0).toInt, arr(1), arr(2).toInt)
      }).toDF("id", "name", "age")

    //4.将以上personinfo 写入到流表中
    personinfo.writeStream
      .option("checkpointLocation","./checkpoint/dir1")
      .toTable("mytbl")

    import org.apache.spark.sql.functions._

    //5.读取mytbl 流表中的数据
    val query: StreamingQuery = spark.readStream
      .table("mytbl")
      .withColumn("new_age", col("age").plus(6))
      .select("id", "name", "age", "new_age")
      .writeStream
      .format("console")
      .start()

    query.awaitTermination()

  }
}

以上代码编写完成后启动,向控制台输入以下数据:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
1,zs,18
2,ls,19
3,ww,20
4,ml,21
5,tq,22

结果输入如下:

注意:以上代码执行时Spark中写出的表由Spark 参数”spark.sql.warehouse.dir”指定的路径临时维护数据,每次执行时,需要将该路径下的表数据清空。

三、​​​​​​​​​​​​​​Triggers

Structured Streaming Triggers 决定了流式数据被处理时是微批处理还是连续实时处理,以下是支持的Triggers:

实时处理,以下是支持的Triggers:

Trigger Type

描述

Unspecified(默认)

代码使用:Trigger.ProcessingTime(0L)。代码中没有明确指定触发类型则查询默认以微批模式执行,表示尽可能快的执行查询。

Fixed interval micro-batches(固定间隔批次)

代码使用:Trigger.ProcessingTime(long interval,TimeUnit timeUnit)查询将以微批模式处理,批次间隔根据用户指定的时间间隔决定 如果前一个微批处理时间在时间间隔内完成,则会等待间隔时间完成后再开始下一个微批处理如果前一个微批处理时间超过了时间间隔,那么下一个微批处理将在前一个微批处理完成后立即开始。如果没有新数据可用,则不会启动微批处理。

One-time micro-batch(仅一次性触发)

代码使用:Trigger.Once()只执行一个微批次查询所有可用数据,然后自动停止,适用于一次性作业。

Continuous with fixed checkpoint interval(以固定checkpoint interval连续处理(实验阶段))

代码使用:Trigger.Continuous(long interval,TimeUnit timeUnit)以固定的Checkpoint间隔(interval)连续处理。在这种模式下,连续处理引擎将每隔一定的间隔(interval)做一次checkpoint,可获得低至1ms的延迟。

  • 代码使用:Trigger.ProcessingTime(0L)。
  • 代码中没有明确指定触发类型则查询默认以微批模式执行,表示尽可能快的执行查询。

Fixed interval micro-batches(固定间隔批次)

  • 代码使用:Trigger.ProcessingTime(long interval,TimeUnit timeUnit)
  • 查询将以微批模式处理,批次间隔根据用户指定的时间间隔决定
  1. 如果前一个微批处理时间在时间间隔内完成,则会等待间隔时间完成后再开始下一个微批处理
  2. 如果前一个微批处理时间超过了时间间隔,那么下一个微批处理将在前一个微批处理完成后立即开始。
  3. 如果没有新数据可用,则不会启动微批处理。

One-time micro-batch(仅一次性触发)

  • 代码使用:Trigger.Once()
  • 只执行一个微批次查询所有可用数据,然后自动停止,适用于一次性作业。

Continuous with fixed checkpoint interval(以固定checkpoint interval连续处理(实验阶段))

  • 代码使用:Trigger.Continuous(long interval,TimeUnit timeUnit)
  • 以固定的Checkpoint间隔(interval)连续处理。在这种模式下,连续处理引擎将每隔一定的间隔(interval)做一次checkpoint,可获得低至1ms的延迟。

下面以读取Socket数据为例,Scala代码演示各个模式

1、​​​​​​​unspecified(默认模式)

代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//3.默认微批模式执行查询,尽快将结果写出到控制台
val query: StreamingQuery = frame.writeStream
  .format("console")
  .start()

query.awaitTermination()
2、​​​​​​​​​​​​​​Fixed interval micro-batches(固定间隔批次)

代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//3.用户指定固定间隔批次触发查询
    val query: StreamingQuery = frame.writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime("5 seconds"))
//      .trigger(Trigger.ProcessingTime(5,TimeUnit.SECONDS)
      .start()
    query.awaitTermination()

注意:这种固定间隔批次指的是第一批次处理完成,等待间隔时间,然后处理第二批次数据,依次类推。

3、 ​​​​​​​​​​​​​​One-time micro-batch (仅一次触发)

代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//4.仅一次触发执行
val query: StreamingQuery = frame.writeStream
  .format("console")
  .trigger(Trigger.Once())
  .start()
query.awaitTermination()
4、​​​​​​​​​​​​​​Continuous with fixed checkpoint interval(连续处理)

Continuous不再是周期性启动task的批量执行数,而是启动长期运行的task,而是不断一个一个数据进行处理,周期性的通过指定checkpoint来记录状态(如果不指定checkpoint目录,会将状态记录在Temp目录下),保证exactly-once语义,这样就可以实现低延迟。详细内容可以参照后续“Continuous处理”章节。

代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//3.Continuous 连续触发执行
val query: StreamingQuery = frame.writeStream
  .format("console")
  //每10ms 记录一次状态,而不是执行一次
  .trigger(Trigger.Continuous(10,TimeUnit.MILLISECONDS))
  .option("checkpointLocation","./checkpint/dir4")
  .start()
query.awaitTermination()
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-07-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Spark实时(六):Output Sinks案例演示
当我们对流式数据处理完成之后,可以将数据写出到Flie、Kafka、console控制台、memory内存,或者直接使用foreach做个性化处理。关于将数据结果写出到Kafka在StructuredStreaming与Kafka整合部分再详细描述。
Lansonli
2025/05/24
750
Spark实时(六):Output Sinks案例演示
Structured Streaming快速入门详解(8)
接着上一篇《Spark Streaming快速入门系列(7)》,这算是Spark的终结篇了,从Spark的入门到现在的Structured Streaming,相信很多人学完之后,应该对Spark摸索的差不多了,Spark是一个很重要的技术点,希望我的文章能给大家带来帮助。
刘浩的BigDataPath
2021/04/13
1.6K0
Structured Streaming快速入门详解(8)
2021年大数据Spark(四十七):Structured Streaming Sink 输出
在StructuredStreaming中定义好Result DataFrame/Dataset后,调用writeStream()返回DataStreamWriter对象,设置查询Query输出相关属性,启动流式应用运行,相关属性如下:
Lansonli
2021/10/11
1.1K0
Spark报错记录:Overloaded method foreachBatch with alternatives
Structured Streaming报错记录:Overloaded method foreachBatch with alternatives0. 写在前面1. 报错2. 代码及报错信息3. 原因及纠错4. 参考链接
WHYBIGDATA
2023/01/31
6530
Spark报错记录:Overloaded method foreachBatch with alternatives
初识Structured Streaming
我们可以通过交易数据接口以非常低的延迟获得全球各个比特币交易市场的每一笔比特币的成交价,成交额,交易时间。
lyhue1991
2021/01/29
4.5K0
初识Structured Streaming
Spark源码系列之spark2.2的StructuredStreaming使用及源码介绍
一,概述 Structured Streaming是一个可扩展和容错的流处理引擎,并且是构建于sparksql引擎之上。你可以用处理静态数据的方式去处理你的流计算。随着流数据的不断流入,Sparksql引擎会增量的连续不断的处理并且更新结果。可以使用DataSet/DataFrame的API进行 streaming aggregations, event-time windows, stream-to-batch joins等等。计算的执行也是基于优化后的sparksql引擎。通过checkpointing
Spark学习技巧
2018/01/30
2.5K0
Spark源码系列之spark2.2的StructuredStreaming使用及源码介绍
数据湖(十六):Structured Streaming实时写入Iceberg
目前Spark中Structured Streaming只支持实时向Iceberg中写入数据,不支持实时从Iceberg中读取数据,下面案例我们将使用Structured Streaming从Kafka中实时读取数据,然后将结果实时写入到Iceberg中。
Lansonli
2022/07/11
9310
数据湖(十六):Structured Streaming实时写入Iceberg
❤️Spark的关键技术回顾,持续更新!【推荐收藏加关注】❤️
集群环境:CDH版本是5.14.0这个版本 但由于spark对应的5.14.0的CDH版本的软件默认的版本是1.6.0同时阉割了SarkSQL,需要重新编译 原因: 因为Cloudera公司认为有了impala就不需要再使用sparkSQL的功能了,同时也是为了推广impala,所以直接阉割掉了sparkSQL的模块。 解决: 使用Apache的版本的spark来进行重新编译
Lansonli
2021/10/11
5330
Note_Spark_Day13:Structured Streaming(内置数据源、自定义Sink(2种方式)和集成Kafka)
此检查点位置必须是HDFS兼容文件系统中的路径,两种方式设置Checkpoint Location位置:
Maynor
2021/12/07
2.8K0
Note_Spark_Day13:Structured Streaming(内置数据源、自定义Sink(2种方式)和集成Kafka)
Spark笔记17-Structured Streaming
Structured Streaming将实时数据视为一张正在不断添加数据的表。
皮大大
2021/03/02
7180
Spark实时(五):InputSource数据源案例演示
在Spark2.0版本之后,DataFrame和Dataset可以表示静态有边界的数据,也可以表示无边界的流式数据。在Structured Streaming中我们可以使用SparkSession针对流式数据源创建对应的Dataset或者DataFrame,并可以像处理批数据一样使用各种Operators操作处理流式数据。
Lansonli
2025/05/24
970
Spark实时(五):InputSource数据源案例演示
Spark Structured Streaming + Kafka使用笔记
这篇博客将会记录Structured Streaming + Kafka的一些基本使用(Java 版)
大鹅
2020/10/29
3.6K0
Note_Spark_Day14:Structured Streaming(以结构化方式处理流式数据,底层分析引擎SparkSQL引擎)
连续处理(Continuous Processing)是“真正”的流处理,通过运行一个long-running的operator用来处理数据。
Maynor
2021/12/07
2.6K0
Note_Spark_Day14:Structured Streaming(以结构化方式处理流式数据,底层分析引擎SparkSQL引擎)
Spark流计算Structured Streaming实践总结
Structured Streaming是基于Spark SQL引擎的可扩展、可容错流计算引擎。用户可以向使用批计算一样的方式使用流计算。Spark SQL持续增量计算流数据输出结果。目前支持多种开发语言Scala、Java、Python、R等等。通过checkpoint和wal机制确保端到端exactly-once语义。
用户9421738
2024/08/30
3000
Spark流计算Structured Streaming实践总结
2021年大数据Spark(四十八):Structured Streaming 输出终端/位置
Structured Streaming 非常显式地提出了输入(Source)、执行(StreamExecution)、输出(Sink)的3个组件,并且在每个组件显式地做到fault-tolerant(容错),由此得到整个streaming程序的 end-to-end exactly-once guarantees。
Lansonli
2021/10/11
1.5K0
Structured Streaming教程(2) —— 常用输入与输出
Structured Streaming 提供了几种数据源的类型,可以方便的构造Steaming的DataFrame。默认提供下面几种类型:
用户1154259
2018/07/31
1.4K0
2021年大数据Spark(五十二):Structured Streaming 事件时间窗口分析
在SparkStreaming中窗口统计分析:Window Operation(设置窗口大小WindowInterval和滑动大小SlideInterval),按照Streaming 流式应用接收数据的时间进行窗口设计的,其实是不符合实际应用场景的。
Lansonli
2021/10/11
1.7K0
看了这篇博客,你还敢说不会Structured Streaming?
本篇博客,博主为大家带来的是关于Structured Streaming从入门到实战的一个攻略,希望感兴趣的朋友多多点赞支持!!
大数据梦想家
2021/01/27
1.7K0
看了这篇博客,你还敢说不会Structured Streaming?
Spark 2.0 Structured Streaming 分析
Spark 2.0 将流式计算也统一到DataFrame里去了,提出了Structured Streaming的概念,将数据源映射为一张无线长度的表,同时将流式计算的结果映射为另外一张表,完全以结构化的方式去操作流式数据,复用了其对象的Catalyst引擎。
用户2936994
2018/08/27
7990
Spark 2.0 Structured Streaming 分析
2021年大数据Spark(四十五):Structured Streaming Sources 输入源
从Spark 2.0至Spark 2.4版本,目前支持数据源有4种,其中Kafka 数据源使用作为广泛,其他数据源主要用于开发测试程序。
Lansonli
2021/10/11
1.4K0
推荐阅读
相关推荐
Spark实时(六):Output Sinks案例演示
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验