Structured Streaming中结果输出时outputMode可以设置三种模式,三种默认区别如下:
在Spark3.1版本之后,我们可以通过DataStreamReader.table()方式实时读取流式表中的数据,使用DataStreamWriter.toTable()向表中实时写数据。
案例:读取Socket数据实时写入到Spark流表中,然后读取流表数据展示数据。
代码示例如下:
package com.lanson.structuredStreaming
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SparkSession}
object StreamTableAPI {
def main(args: Array[String]): Unit = {
//1.创建对象
val spark: SparkSession = SparkSession.builder().master("local")
.appName("StreamTableAPI")
.config("spark.sql.shuffle.partitions", 1)
.config("spark.sql.warehouse.dir", "./my-spark-warehouse")
.getOrCreate()
spark.sparkContext.setLogLevel("Error");
import spark.implicits._
//2.读取socket数据,注册流表
val df: DataFrame = spark.readStream
.format("socket")
.option("host", "node3")
.option("port", 9999)
.load()
//3.对df进行转换
val personinfo: DataFrame = df.as[String]
.map(line => {
val arr: Array[String] = line.split(",")
(arr(0).toInt, arr(1), arr(2).toInt)
}).toDF("id", "name", "age")
//4.将以上personinfo 写入到流表中
personinfo.writeStream
.option("checkpointLocation","./checkpoint/dir1")
.toTable("mytbl")
import org.apache.spark.sql.functions._
//5.读取mytbl 流表中的数据
val query: StreamingQuery = spark.readStream
.table("mytbl")
.withColumn("new_age", col("age").plus(6))
.select("id", "name", "age", "new_age")
.writeStream
.format("console")
.start()
query.awaitTermination()
}
}
以上代码编写完成后启动,向控制台输入以下数据:
1,zs,18
2,ls,19
3,ww,20
4,ml,21
5,tq,22
结果输入如下:
注意:以上代码执行时Spark中写出的表由Spark 参数”spark.sql.warehouse.dir”指定的路径临时维护数据,每次执行时,需要将该路径下的表数据清空。
Structured Streaming Triggers 决定了流式数据被处理时是微批处理还是连续实时处理,以下是支持的Triggers:
实时处理,以下是支持的Triggers:
Trigger Type | 描述 |
---|---|
Unspecified(默认) | 代码使用:Trigger.ProcessingTime(0L)。代码中没有明确指定触发类型则查询默认以微批模式执行,表示尽可能快的执行查询。 |
Fixed interval micro-batches(固定间隔批次) | 代码使用:Trigger.ProcessingTime(long interval,TimeUnit timeUnit)查询将以微批模式处理,批次间隔根据用户指定的时间间隔决定 如果前一个微批处理时间在时间间隔内完成,则会等待间隔时间完成后再开始下一个微批处理如果前一个微批处理时间超过了时间间隔,那么下一个微批处理将在前一个微批处理完成后立即开始。如果没有新数据可用,则不会启动微批处理。 |
One-time micro-batch(仅一次性触发) | 代码使用:Trigger.Once()只执行一个微批次查询所有可用数据,然后自动停止,适用于一次性作业。 |
Continuous with fixed checkpoint interval(以固定checkpoint interval连续处理(实验阶段)) | 代码使用:Trigger.Continuous(long interval,TimeUnit timeUnit)以固定的Checkpoint间隔(interval)连续处理。在这种模式下,连续处理引擎将每隔一定的间隔(interval)做一次checkpoint,可获得低至1ms的延迟。 |
Fixed interval micro-batches(固定间隔批次)
One-time micro-batch(仅一次性触发)
Continuous with fixed checkpoint interval(以固定checkpoint interval连续处理(实验阶段))
下面以读取Socket数据为例,Scala代码演示各个模式
代码如下:
//3.默认微批模式执行查询,尽快将结果写出到控制台
val query: StreamingQuery = frame.writeStream
.format("console")
.start()
query.awaitTermination()
代码如下:
//3.用户指定固定间隔批次触发查询
val query: StreamingQuery = frame.writeStream
.format("console")
.trigger(Trigger.ProcessingTime("5 seconds"))
// .trigger(Trigger.ProcessingTime(5,TimeUnit.SECONDS)
.start()
query.awaitTermination()
注意:这种固定间隔批次指的是第一批次处理完成,等待间隔时间,然后处理第二批次数据,依次类推。
代码如下:
//4.仅一次触发执行
val query: StreamingQuery = frame.writeStream
.format("console")
.trigger(Trigger.Once())
.start()
query.awaitTermination()
Continuous不再是周期性启动task的批量执行数,而是启动长期运行的task,而是不断一个一个数据进行处理,周期性的通过指定checkpoint来记录状态(如果不指定checkpoint目录,会将状态记录在Temp目录下),保证exactly-once语义,这样就可以实现低延迟。详细内容可以参照后续“Continuous处理”章节。
代码如下:
//3.Continuous 连续触发执行
val query: StreamingQuery = frame.writeStream
.format("console")
//每10ms 记录一次状态,而不是执行一次
.trigger(Trigger.Continuous(10,TimeUnit.MILLISECONDS))
.option("checkpointLocation","./checkpint/dir4")
.start()
query.awaitTermination()
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有