本文已收录至Github,推荐阅读 👉 Java随想录
接前面上篇,此为中篇。
Flink是一个有状态的流式计算引擎,所以会将中间计算结果(状态)进行保存,默认保存到TaskManager的堆内存中,但是当task挂掉,那么这个task所对应的状态都会被清空,造成了数据丢失,无法保证结果的正确性,哪怕想要得到正确结果,所有数据都要重新计算一遍,效率很低。想要保证 At -least-once 和 Exactly-once,需要把数据状态持久化到更安全的存储介质中,Flink提供了堆内内存、堆外内存、HDFS、RocksDB等存储介质。
先来看下Flink提供的状态有哪些,Flink中状态分为两种类型:
基于KeyedStream上的状态,这个状态是跟特定的Key绑定,KeyedStream流上的每一个Key都对应一个State,每一个Operator可以启动多个Thread处理,但是相同Key的数据只能由同一个Thread处理,因此一个Keyed状态只能存在于某一个Thread中,一个Thread会有多个Keyed state。
Operator State与Key无关,而是与Operator绑定,整个Operator只对应一个State。比如:Flink中的Kafka Connector就使用了Operator State,它会在每个Connector实例中,保存该实例消费Topic的所有(partition, offset)映射。
Flink针对Keyed State提供了以下可以保存State的数据结构
<IN, OUT>
:保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState
相反的是, 聚合类型可能与添加到状态的元素的类型不同。使用 add(IN)
添加的元素会调用用户指定的 AggregateFunction
进行聚合。ReducingState
相反,聚合类型可能与添加到状态的元素类型不同。 使用add(T)
添加的元素会调用用户指定的 FoldFunction
折叠成聚合值。案例1:使用ValueState keyed state检查车辆是否发生了急加速
object ValueStateTest {
case class CarInfo(carId: String, speed: Long)
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.socketTextStream("node01", 8888)
stream.map(data => {
val arr = data.split(" ")
CarInfo(arr(0), arr(1).toLong)
}).keyBy(_.carId)
.map(new RichMapFunction[CarInfo, String]() {
//保存上一次车速
private var lastTempState: ValueState[Long] = _
override def open(parameters: Configuration): Unit = {
val lastTempStateDesc = new ValueStateDescriptor[Long]("lastTempState", createTypeInformation[Long])
lastTempState = getRuntimeContext.getState(lastTempStateDesc)
}
override def map(value: CarInfo): String = {
val lastSpeed = lastTempState.value()
this.lastTempState.update(value.speed)
if ((value.speed - lastSpeed).abs > 30 && lastSpeed != 0)
"over speed" + value.toString
else
value.carId
}
}).print()
env.execute()
}
}
案例2:使用 MapState 统计单词出现次数
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
//MapState 实现 WordCount
object KeyedStateTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromCollection(List("I love you","hello spark","hello flink","hello hadoop"))
val pairStream = stream.flatMap(_.split(" ")).map((_,1)).keyBy(_._1)
pairStream.map(new RichMapFunction[(String,Int),(String,Int)] {
private var map:MapState[String,Int] = _
override def open(parameters: Configuration): Unit = {
//定义map state存储的数据类型
val desc = new MapStateDescriptor[String,Int]("sum",createTypeInformation[String],createTypeInformation[Int])
//注册map state
map = getRuntimeContext.getMapState(desc)
}
override def map(value: (String, Int)): (String, Int) = {
val key = value._1
val v = value._2
if(map.contains(key)){
map.put(key,map.get(key) + 1)
}else{
map.put(key,1)
}
val iterator = map.keys().iterator()
while (iterator.hasNext){
val key = iterator.next()
println("word:" + key + "\t count:" + map.get(key))
}
value
}
}).setParallelism(3)
env.execute()
}
}
案例3:使用ReducingState统计每辆车的速度总和
import com.msb.state.ValueStateTest.CarInfo
import org.apache.flink.api.common.functions.{ReduceFunction, RichMapFunction}
import org.apache.flink.api.common.state.{ReducingState, ReducingStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
//统计每辆车的速度总和
object ReduceStateTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.socketTextStream("node01", 8888)
stream.map(data => {
val arr = data.split(" ")
CarInfo(arr(0), arr(1).toLong)
}).keyBy(_.carId)
.map(new RichMapFunction[CarInfo, CarInfo] {
private var reduceState: ReducingState[Long] = _
override def map(elem: CarInfo): CarInfo = {
reduceState.add(elem.speed)
println("carId:" + elem.carId + " speed count:" + reduceState.get())
elem
}
override def open(parameters: Configuration): Unit = {
val reduceDesc = new ReducingStateDescriptor[Long]("reduceSpeed", new ReduceFunction[Long] {
override def reduce(value1: Long, value2: Long): Long = value1 + value2
}, createTypeInformation[Long])
reduceState = getRuntimeContext.getReducingState(reduceDesc)
}
})
env.execute()
}
}
案例4:使用AggregatingState统计每辆车的速度总和
import com.msb.state.ValueStateTest.CarInfo
import org.apache.flink.api.common.functions.{AggregateFunction, ReduceFunction, RichMapFunction}
import org.apache.flink.api.common.state.{AggregatingState, AggregatingStateDescriptor, ReducingState, ReducingStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
//统计每辆车的速度总和
object ReduceStateTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.socketTextStream("node01", 8888)
stream.map(data => {
val arr = data.split(" ")
CarInfo(arr(0), arr(1).toLong)
}).keyBy(_.carId)
.map(new RichMapFunction[CarInfo, CarInfo] {
private var aggState: AggregatingState[Long,Long] = _
override def map(elem: CarInfo): CarInfo = {
aggState.add(elem.speed)
println("carId:" + elem.carId + " speed count:" + aggState.get())
elem
}
override def open(parameters: Configuration): Unit = {
val aggDesc = new AggregatingStateDescriptor[Long,Long,Long]("agg",new AggregateFunction[Long,Long,Long] {
//初始化累加器值
override def createAccumulator(): Long = 0
//往累加器中累加值
override def add(value: Long, acc: Long): Long = acc + value
//返回最终结果
override def getResult(accumulator: Long): Long = accumulator
//合并两个累加器值
override def merge(a: Long, b: Long): Long = a+b
},createTypeInformation[Long])
aggState = getRuntimeContext.getAggregatingState(aggDesc)
}
})
env.execute()
}
}
有状态流应用中的检查点(checkpoint),其实就是所有任务的状态在某个时间点的一个快照(一份拷贝)。简单来讲,就是一次“存盘”,让我们之前处理数据的进度不要丢掉。在一个流应用程序运行时,Flink 会定期保存检查点,在检查点中会记录每个算子的 id 和状态;如果发生故障,Flink 就会用最近一次成功保存的检查点来恢复应用的状态,重新启动处理流程,就如同“读档”一样。
默认情况下,检查点是被禁用的,需要在代码中手动开启。直接调用执行环境的enableCheckpointing()方法就可以开启检查点。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getEnvironment();
env.enableCheckpointing(1000);
这里传入的参数是检查点的间隔时间,单位为毫秒。
除了检查点之外,Flink 还提供了“保存点”(savepoint)的功能。保存点在原理和形式上跟检查点完全一样,也是状态持久化保存的一个快照;保存点与检查点最大的区别,就是触发的时机。检查点是由 Flink 自动管理的,定期创建,发生故障之后自动读取进行恢复,这是一个“自动存盘”的功能;而保存点不会自动创建,必须由用户明确地手动触发保存操作,所以就是“手动存盘”。因此两者尽管原理一致,但用途就有所差别了:检查点主要用来做故障恢复,是容错机制的核心;保存点则更加灵活,可以用来做有计划的手动备份和恢复。
检查点具体的持久化存储位置,取决于“检查点存储”(CheckpointStorage)的设置。默认情况下,检查点存储在 JobManager 的堆(heap)内存中。而对于大状态的持久化保存,Flink也提供了在其他存储位置进行保存的接口,这就是 CheckpointStorage。具体可以通过调用检查点配置的 setCheckpointStorage()来配置,需要传入一个CheckpointStorage 的实现类。Flink 主要提供了两种 CheckpointStorage:作业管理器的堆内存(JobManagerCheckpointStorage)和文件系统(FileSystemCheckpointStorage)。对于实际生产应用,我们一般会将 CheckpointStorage 配置为高可用的分布式文件系统(HDFS,S3 等)。
Flink中基于异步轻量级的分布式快照技术提供了Checkpoint容错机制,分布式快照可以将同一时间点Task/Operator的状态数据全局统一快照处理,包括上面提到的用户自定义使用的Keyed State和Operator State,当未来程序出现问题,可以基于保存的快照容错。
Flink会在输入的数据集上间隔性地生成checkpoint barrier,通过栅栏(barrier)将间隔时间段内的数据划分到相应的checkpoint中。当程序出现异常时,Operator就能够从上一次快照中恢复所有算子之前的状态,从而保证数据的一致性。例如在KafkaConsumer算子中维护offset状态,当系统出现问题无法从Kafka中消费数据时,可以将offset记录在状态中,当任务重新恢复时就能够从指定的偏移量开始消费数据。
默认情况Flink不开启检查点,用户需要在程序中通过调用方法配置和开启检查点,另外还可以调整其他相关参数
开启检查点并且指定检查点时间间隔为1000ms,根据实际情况自行选择,如果状态比较大,则建议适当增加该值
env.enableCheckpointing(1000)
选择exactly-once语义保证整个应用内端到端的数据一致性,这种情况比较适合于数据要求比较高,不允许出现丢数据或者数据重复,与此同时,Flink的性能也相对较弱,而at-least-once语义更适合于时廷和吞吐量要求非常高但对数据的一致性要求不高的场景。如下通过setCheckpointingMode()方法来设定语义模式,默认情况下使用的是exactly-once模式
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
超时时间指定了每次Checkpoint执行过程中的上限时间范围,一旦Checkpoint执行时间超过该阈值,Flink将会中断Checkpoint过程,并按照超时处理。该指标可以通过setCheckpointTimeout方法设定,默认为10分钟
env.getCheckpointConfig.setCheckpointTimeout(5 60 1000)
该参数主要目的是设定两个Checkpoint之间的最小时间间隔,防止Flink应用密集地触发Checkpoint操作,会占用了大量计算资源而影响到整个应用的性能
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(600)
在默认情况下只有一个检查点可以运行,根据用户指定的数量可以同时触发多个Checkpoint,进而提升Checkpoint整体的效率
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
设置为RETAIN_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会保留CheckPoint数据,以便根据实际需要恢复到指定的CheckPoint
设置为DELETE_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会删除CheckPoint数据,只有Job执行失败的时候才会保存CheckPoint
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
设置可以容忍的检查的失败数,超过这个数量则系统自动关闭和停止任务
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(1)
Savepoints 是检查点的一种特殊实现,底层实现其实也是使用Checkpoints的机制。Savepoints是用户以手工命令的方式触发Checkpoint,并将结果持久化到指定的存储路径中,其主要目的是帮助用户在升级和维护集群过程中保存系统中的状态数据,避免因为停机运维或者升级应用等正常终止应用的操作而导致系统无法恢复到原有的计算状态的情况,从而无法实现从端到端的 Excatly-Once 语义保证。
要使用Savepoints,需要按照以下步骤进行:
bin/flink savepoint <jobID> targetDirectory
其中,<jobID>
是您要保存状态的Flink作业的Job ID,[targetDirectory]
是可选的目标目录,用于保存Savepoint数据。如果没有提供targetDirectory
,Savepoint将会保存到Flink配置中所配置的状态后端中。
bin/flink run -s :savepointPath :runArgs
其中,savepointPath
是之前生成的Savepoint的路径,runArgs
是您提交作业时的其他参数。
在Flink中提供了StateBackend来存储和管理状态数据
Flink一共实现了三种类型的状态管理器:MemoryStateBackend、FsStateBackend、RocksDBStateBackend
基于内存的状态管理器将状态数据全部存储在JVM堆内存中。基于内存的状态管理具有非常快速和高效的特点,但也具有非常多的限制,最主要的就是内存的容量限制,一旦存储的状态数据过多就会导致系统内存溢出等问题,从而影响整个应用的正常运行。同时如果机器出现问题,整个主机内存中的状态数据都会丢失,进而无法恢复任务中的状态数据。因此从数据安全的角度建议用户尽可能地避免在生产环境中使用MemoryStateBackend。
Flink将MemoryStateBackend作为默认状态后端管理器
env.setStateBackend(new MemoryStateBackend(100*1024*1024))
注意:聚合类算子的状态会同步到JobManager内存中,因此对于聚合类算子比较多的应用会对JobManager的内存造成一定的压力,进而影响集群。
和MemoryStateBackend有所不同,FsStateBackend是基于文件系统的一种状态管理器,这里的文件系统可以是本地文件系统,也可以是HDFS分布式文件系统
env.setStateBackend(new FsStateBackend("path",true))
如果path是本地文件路径,其格式:file:///
如果path是HDFS文件路径,格式为:hdfs://
第二个参数代表是否异步保存状态数据到HDFS,异步方式能够尽可能避免checkpoint的过程中影响流式计算任务。FsStateBackend更适合任务量比较大的应用,例如:包含了时间范围非常长的窗口计算,或者状态比较大的场景。
RocksDBStateBackend是Flink中内置的第三方状态管理器,和前面的状态管理器不同,RocksDBStateBackend需要单独引入相关的依赖包到工程中。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.9.2</version>
</dependency>
env.setStateBackend(new RocksDBStateBackend("hdfs://"))
RocksDBStateBackend采用异步的方式进行状态数据的Snapshot,任务中的状态数据首先被写入本地RockDB中,这样在RockDB仅会存储正在进行计算的热数据,而需要进行CheckPoint的时候,会把本地的数据直接复制到远端的FileSystem中。
与FsStateBackend相比,RocksDBStateBackend在性能上要比FsStateBackend高一些,主要是因为借助于RocksDB在本地存储了最新热数据,然后通过异步的方式再同步到文件系统中,但RocksDBStateBackend和MemoryStateBackend相比性能就会较弱一些。RocksDB克服了State受内存限制的缺点,同时又能够持久化到远端文件系统中,推荐在生产中使用。
全局配置需要需改集群中的配置文件,修改flink-conf.yaml
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
state.backend: jobmanager
state.backend.rocksdb.checkpoint.transfer.thread.num: 1 同时操作RocksDB的线程数
state.backend.rocksdb.localdir: 本地path RocksDB存储状态数据的本地文件路径
在流处理中,我们往往需要面对的是连续不断、无休无止的无界流,不可能等到所有数据都到齐了才开始处理。所以聚合计算其实在实际应用中,我们往往更关心一段时间内数据的统计结果,比如在过去的 1 分钟内有多少用户点击了网页。在这种情况下,我们就可以定义一个窗口,收集最近一分钟内的所有用户点击数据,然后进行聚合统计,最终输出一个结果就可以了。
说白了窗口就是将无界流通过窗口切割成一个个的有界流,窗口是左开右闭的。
Flink中的窗口分为两类:基于时间的窗口(Time-based Window)和基于数量的窗口(Count-based Window)。
时间窗口中又包含了:滚动时间窗口(Tumbling Window)、滑动时间窗口(Sliding Window)、会话窗口(Session Window)。
计数窗口包含了:滚动计数窗口和滑动计数窗口。
时间窗口、计数窗口只是对窗口的一个大致划分。在具体应用时,还需要定义更加精细的规则,来控制数据应该划分到哪个窗口中去。不同的分配数据的方式,就可以由不同的功能应用。
根据分配数据的规则,窗口的具体实现可以分为 4 类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)、会话窗口(Session Window),以及全局窗口(Global Window)。
滚动窗口每个窗口的大小固定,且相邻两个窗口之间没有重叠。滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有窗口大小,我们可以定义一个长度为1小时的滚动时间窗口,那么每个小时就会进行一次统计;或者定义一个长度为10的滚动计数窗口,就会每10个数进行一次统计。
基于时间的滚动窗口:
DataStream<T> input = ...
// tumbling event-time windows
input
.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<window function> (...)
// tumbling processing-time windows
input
.keyBy(...)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<window function> (...)
在上面的代码中,我们使用了TumblingEventTimeWindows
和TumblingProcessingTimeWindows
来创建基于Event Time或Processing Time的滚动时间窗口。窗口的长度可以用org.apache.flink.streaming.api.windowing.time.Time
中的seconds
、minutes
、hours
和days
来设置。
基于计数的滚动窗口:
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class TumblingCountWindowExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Long> input = env.fromElements(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L);
input
.keyBy(value -> 1)
.countWindow(3)
.reduce(new ReduceFunction<Long>() {
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
}
})
.print();
env.execute();
}
}
在上面的代码中,我们使用了countWindow
方法来创建一个基于数量的滚动窗口,窗口大小为3个元素。当窗口中的元素数量达到3时,窗口就会触发计算。在这个例子中,我们使用了reduce
函数来对窗口中的元素进行求和。
滑动窗口的大小固定,但窗口之间不是首尾相接,而有部分重合。同样,滑动窗口也可以基于时间和计算定义。
滑动窗口的参数有两个:窗口大小和滑动步长。滑动步长是固定的。
基于时间的滑动窗口:
DataStream<T> input = ...
// sliding event-time windows
input
.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<window function> (...)
基于计数的滑动窗口:
DataStream<T> input = ...
input
.keyBy(...)
.countWindow(10, 5)
.<window function> (...)
countWindow
方法来创建一个基于计数的滑动窗口,窗口大小为10个元素,滑动步长为5个元素。当窗口中的元素数量达到10时,窗口就会触发计算。
会话窗口是Flink中一种基于时间的窗口类型,每个窗口的大小不固定,且相邻两个窗口之间没有重叠。“会话”终止的标志就是隔一段时间没有数据来:
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
DataStream<T> input = ...
input
.keyBy(...)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<window function> (...)
在上面的代码中,使用了EventTimeSessionWindows
来创建基于Event Time的会话窗口。withGap
方法用来设置会话窗口之间的间隔时间,当两个元素之间的时间差超过这个值时,它们就会被分配到不同的会话窗口中。
在Flink中,数据流可以按键分区(keyed)或非按键分区(non-keyed)。按键分区是指将数据流根据特定的键值进行分区,使得相同键值的元素被分配到同一个分区中。这样可以保证相同键值的元素由同一个worker实例处理。只有按键分区的数据流才能使用键分区状态和计时器。
非按键分区是指数据流没有根据特定的键值进行分区。这种情况下,数据流中的元素可以被任意分配到不同的分区中。
在定义窗口操作之前,首先需要确定,到底是基于按键分区(Keyed)来开窗,还是直接在没有按键分区的DataStream上开窗。也就是在调用窗口算子之前是否有keyBy操作。
按键分区窗口:
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class KeyedWindowExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Long> input = env.fromElements(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L);
input
.keyBy(value -> 1)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce(new ReduceFunction<Long>() {
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
}
})
.print();
env.execute();
}
}
在上面的代码中,使用了keyBy
方法来对数据流进行按键分区,然后使用window
方法来创建一个基于Event Time的滚动时间窗口。在这个例子中,我们使用了reduce
函数来对窗口中的元素进行求和。
非按键分区窗口:
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class NonKeyedWindowExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Long> input = env.fromElements(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L);
AllWindowedStream<Long, ?> windowedStream = input.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));
windowedStream.reduce(new ReduceFunction<Long>() {
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
}
}).print();
env.execute();
}
}
在上面的代码中,使用了windowAll
方法来对非按键分区的数据流进行窗口操作。windowAll
方法接受一个WindowAssigner
参数,用来指定窗口类型。然后使用了reduce
函数来对窗口中的元素进行求和。
按键分区窗口(Keyed Windows)经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流(logical streams),这就是KeyedStream。基于KeyedStream进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理。所以可以认为,每个key上都定义了一组窗口,各自独立地进行统计计算。
非按键分区(Non-Keyed Windows)如果没有进行keyBy,那么原始的DataStream就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1。所以在实际应用中一般不推荐使用这种方式
所谓的“窗口函数”(window functions),就是定义窗口如何进行计算的操作。
窗口函数根据处理的方式可以分为两类:增量聚合函数和全量聚合函数。
增量聚合函数每来一条数据就立即进行计算,中间保持着聚合状态;但是不立即输出结果。等到窗口到了结束时间需要输出计算结果的时候,取出之前聚合的状态直接输出。
常见的增量聚合的函数有:reduce(reduceFunction)、aggregate(aggregateFunction)、sum()、min()、max()。
下面是一个使用增量聚合函数的Java代码示例:
DataStream<Tuple2<String, Integer>> input = ...
input.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
})
.timeWindow(Time.seconds(5))
.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t0, Tuple2<String, Integer> t1) throws Exception {
return new Tuple2<>(t0.f0, t0.f1 + t1.f1);
}
});
这段代码首先使用keyBy
方法按照Tuple2中的第一个元素(f0)进行分组。然后,它定义了一个5秒的时间窗口,并使用reduce
方法对每个窗口内的数据进行聚合操作。在这个例子中,聚合操作是将具有相同key(即f0相同)的元素的第二个元素(f1)相加。最终,这段代码将输出一个包含每个key在每个5秒窗口内f1值之和的数据流。
另外还有一个常用的函数是聚合函数(AggregateFunction),ReduceFunction和AggregateFunction都是增量聚合函数,但它们之间有一些区别。AggregateFunction则更加灵活,ReduceFunction的输入类型、输出类型和中间状态类型必须相同,而AggregateFunction则允许这三种类型不同。
例如,如果我们希望计算一组数据的平均值,应该怎样做聚合呢?这时我们需要计算两个状态量:数据的总和(sum),以及数据的个数(count),而最终输出结果是两者的商(sum/count)。如果用ReduceFunction,那么我们应该先把数据转换成二元组 (sum, count)的形式,然后进行归约聚合,最后再将元组的两个元素相除转换得到最后的平均值。本来应该只是一个任务,可我们却需要 map-reduce-map 三步操作,这显然不够高效。而使用AggregateFunction则可以更加简单地实现这个需求。
下面是使用AggregateFunction计算平均值的代码示例:
DataStream<Tuple2<String, Double>> input = ...
input
.keyBy(new KeySelector<Tuple2<String, Double>, String>() {
@Override
public String getKey(Tuple2<String, Double> value) throws Exception {
return value.f0;
}
})
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(new AggregateFunction<Tuple2<String, Double>, Tuple2<Double, Integer>, Double>() {
@Override
public Tuple2<Double, Integer> createAccumulator() {
return new Tuple2<>(0.0, 0);
}
@Override
public Tuple2<Double, Integer> add(Tuple2<String, Double> value, Tuple2<Double, Integer> accumulator) {
return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1);
}
@Override
public Double getResult(Tuple2<Double, Integer> accumulator) {
return accumulator.f0 / accumulator.f1;
}
@Override
public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
});
这段代码首先使用keyBy
方法按照Tuple2中的第一个元素(f0)进行分组。然后,它定义了一个5秒的翻滚事件时间窗口,并使用aggregate
方法对每个窗口内的数据进行聚合操作。在这个例子中,聚合操作是计算具有相同key(即f0相同)的元素的第二个元素(f1)的平均值。最终,这段代码将输出一个包含每个key在每个5秒窗口内f1值平均值的数据流。
全量聚合函数(Full Window Functions)是指在整个窗口中的所有数据都准备好后才进行计算。Flink中的全窗口函数有两种:WindowFunction和ProcessWindowFunction。
与增量聚合函数不同,全窗口函数可以访问窗口中的所有数据,因此可以执行更复杂的计算。例如,可以计算窗口中数据的中位数,或者对窗口中的数据进行排序。
WindowFunction接收一个Iterable类型的输入,其中包含了窗口中所有的数据。ProcessWindowFunction则更加强大,它不仅可以访问窗口中的所有数据, 还可以获取到一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得 ProcessWindowFunction 更加灵活、功能更加丰富。WindowFunction作用可以被 ProcessWindowFunction 全覆盖。一般在实际应用,用 ProcessWindowFunction比较多,直接使用 ProcessWindowFunction 就可以了。
下面是使用WindowFunction计算窗口内数据总和的代码示例:
public class SumWindowFunction extends WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> {
@Override
public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
int sum = 0;
for (Tuple2<String, Integer> value : input) {
sum += value.f1;
}
out.collect(new Tuple2<>(key, sum));
}
}
DataStream<Tuple2<String, Integer>> input = ...
input.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
})
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new SumWindowFunction());
下面是一个使用ProcessWindowFunction统计网站1天UV的代码示例。在这个例子中,我们使用了状态来存储每个窗口中访问过网站的用户ID,以便在窗口结束时计算UV。此外,我们还使用了定时器,在窗口结束时触发计算UV的操作。我们还使用了context对象来获取窗口的开始时间和结束时间,并将它们输出到结果中:
public class UVProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, String>, Tuple3<String, Long, Integer>, String, TimeWindow> {
private ValueState<Set<String>> userIdState; // 状态,用来存储每个窗口中访问过网站的用户ID
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 初始化状态
ValueStateDescriptor<Set<String>> stateDescriptor = new ValueStateDescriptor<>("userIdState", new SetTypeInfo<>(Types.STRING));
userIdState = getRuntimeContext().getState(stateDescriptor);
}
@Override
public void process(String key, Context context, Iterable<Tuple2<String, String>> input, Collector<Tuple3<String, Long, Integer>> out) throws Exception {
Set<String> userIds = userIdState.value();
if (userIds == null) {
userIds = new HashSet<>();
}
for (Tuple2<String, String> value : input) {
userIds.add(value.f0); // 将用户ID添加到状态中
}
userIdState.update(userIds);
context.timerService().registerEventTimeTimer(context.window().getEnd()); // 注册定时器,在窗口结束时触发计算UV的操作
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple3<String, Long, Integer>> out) throws Exception {
super.onTimer(timestamp, ctx, out);
Set<String> userIds = userIdState.value();
if (userIds != null) {
long windowStart = ctx.window().getStart();
out.collect(new Tuple3<>(ctx.getCurrentKey(), windowStart, userIds.size())); // 计算UV并输出结果,包括窗口的开始时间和结束时间
userIdState.clear(); // 清空状态
}
}
}
DataStream<Tuple2<String, String>> input = ... // 输入数据流,其中第一个字段为用户ID,第二个字段为网站URL
input.keyBy(new KeySelector<Tuple2<String, String>, String>() {
@Override
public String getKey(Tuple2<String, String> value) throws Exception {
return value.f1; // 按照网站URL分组
}
})
.window(TumblingEventTimeWindows.of(Time.days(1))) // 设置窗口大小为1天
.process(new UVProcessWindowFunction());
全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。所以运行效率较低,很少直接单独使用,往往会和增量聚合函数结合在一起,共同实现窗口的处理计算。
增量聚合的优点:高效,输出更加实时。增量聚合相当于把计算量“均摊”到了窗口收集数据的过程中,自然就会比全窗口聚合更加高效、输出更加实时。
全窗口的优点:提供更多的信息,可以认为是更加“通用”的窗口操作。
它只负责收集数据、提供上下文相关信息,把所有的原材料都准备好,至于拿来做什么我们完全可以任意发挥。这就使得窗口计算更加灵活,功能更加强大。
在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink 的Window API 就给我们实现了这样的用法。
之前在调用 WindowedStream 的.reduce()和.aggregate()方法时,只是简单地直接传入了一个 ReduceFunction 或 AggregateFunction 进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是 WindowFunction 或者ProcessWindowFunction。
// ReduceFunction 与 WindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function)
// ReduceFunction 与 ProcessWindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function)
// AggregateFunction 与 WindowFunction 结合
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction, WindowFunction<V, R, K, W> windowFunction)
// AggregateFunction 与 ProcessWindowFunction 结合
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction, ProcessWindowFunction<V, R, K, W> windowFunction)
这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了 Iterable 类型的输入。一般情况下,这时的可迭代集合中就只有一个元素了。
下面我们举一个具体的实例来说明。在网站的各种统计指标中,一个很重要的统计指标就是热门的链接,想要得到热门的 url,前提是得到每个链接的“热门度”。一般情况下,可以用url 的浏览量(点击量)表示热门度。我们这里统计 10 秒钟的 url 浏览量,每 5 秒钟更新一次;另外为了更加清晰地展示,还应该把窗口的起始结束时间一起输出。我们可以定义滑动窗口,并结合增量聚合函数和全窗口函数来得到统计结果:
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
public class UrlCountViewExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.getConfig().setAutoWatermarkInterval(100);
SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
//乱序流的watermark生成
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
}));
stream.print("input");
//统计每个url的访问量
stream.keyBy(data -> data.url)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.aggregate(new UrlViewCountAgg(),new UrlViewCountResult())
.print();
env.execute();
}
//增量聚合,来一条数据 + 1
public static class UrlViewCountAgg implements AggregateFunction<Event,Long,Long>{
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(Event value, Long accumulator) {
return accumulator + 1;
}
@Override
public Long getResult(Long accumulator) {
return accumulator;
}
@Override
public Long merge(Long a, Long b) {
return null;
}
}
//包装窗口信息,输出UrlViewCount
public static class UrlViewCountResult extends ProcessWindowFunction<Long,UrlViewCount,String, TimeWindow>{
@Override
public void process(String s, Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {
Long start = context.window().getStart();
Long end = context.window().getEnd();
Long count = elements.iterator().next();
out.collect(new UrlViewCount(s,count,start,end));
}
}
}
为了方便处理,单独定义了一个POJO类,来表示输出结果的数据类型
public class UrlViewCount {
public String url;
public Long count;
public Long windowStart;
public Long windowEnd;
public UrlViewCount() {
}
public UrlViewCount(String url, Long count, Long windowStart, Long windowEnd) {
this.url = url;
this.count = count;
this.windowStart = windowStart;
this.windowEnd = windowEnd;
}
@Override
public String toString() {
return "UrlViewCount{" +
"url='" + url + '\'' +
", count=" + count +
", windowStart=" + new Timestamp(windowStart) +
", windowEnd=" + new Timestamp(windowEnd) +
'}';
}
}
代码中用一个 AggregateFunction 来实现增量聚合,每来一个数据就计数加一,得到的结果交给 ProcessWindowFunction,结合窗口信息包装成我们想要的 UrlViewCount,最终输出统计结果。
窗口处理的主体还是增量聚合,而引入全窗口函数又可以获取到更多的信息包装输出,这样的结合兼具了两种窗口函数的优势,在保证处理性能和实时性的同时支持了更加丰富的应用场景。
窗口重叠是指在使用滑动窗口时,多个窗口之间存在重叠部分。这意味着同一批数据可能会被多个窗口同时处理。
例如,假设我们有一个数据流,它包含了0到9的整数。我们定义了一个大小为5的滑动窗口,滑动距离为2。那么,我们将会得到以下三个窗口:
在这个例子中,窗口1和窗口2之间存在重叠部分,即2, 3, 4。同样,窗口2和窗口3之间也存在重叠部分,即4, 5, 6。
enableOptimizeWindowOverlap
方法是用来启用Flink的窗口重叠优化功能的。它可以减少计算重叠窗口时的计算量。
在我之前给出的代码示例中,我没有使用enableOptimizeWindowOverlap
方法来启用窗口重叠优化功能。这意味着Flink不会尝试优化计算重叠窗口时的计算量。
如果你想使用窗口重叠优化功能,你可以在你的代码中添加以下行:
env.getConfig().enableOptimizeWindowOverlap();
这将启用窗口重叠优化功能,Flink将尝试优化计算重叠窗口时的计算量。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。