Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Apache Flink:Keyed Window与Non-Keyed Window

Apache Flink:Keyed Window与Non-Keyed Window

作者头像
王知无-import_bigdata
修改于 2019-08-17 15:26:27
修改于 2019-08-17 15:26:27
1.5K0
举报

5万人关注的大数据成神之路,不来了解一下吗?

5万人关注的大数据成神之路,真的不来了解一下吗?

5万人关注的大数据成神之路,确定真的不来了解一下吗?

欢迎您关注《大数据成神之路》

Apache Flink中,Window操作在流式数据处理中是非常核心的一种抽象,它把一个无限流数据集分割成一个个有界的Window(或称为Bucket),然后就可以非常方便地定义作用于Window之上的各种计算操作。本文我们主要基于Apache Flink 1.4.0版本,说明Keyed Window与Non-Keyed Window的基本概念,然后分别对与其相关的WindowFunction与WindowAllFunction的类设计进行分析,最后通过编程实践来应用。

基本概念

Flink将Window分为两类,一类叫做Keyed Window,另一类叫做Non-Keyed Window。为了说明这两类Window的不同,我们看下Flink官网给出的,基于这两种类型的Window编写代码的结构说明。 基于Keyed Window进行编程,用户代码基本结构如下所示:

基于Non-Keyed Window进行编程,用户代码基本结构如下所示:

上面两种编程结构的区别在于: 从编程API上看,Keyed Window编程结构,可以直接对输入的stream按照Key进行操作,输入的stream中识别Key,即输入stream中的每个数据元素哪一部分是作为Key来关联这个数据元素的,这样就可以对stream中的数据元素基于Key进行相关计算操作,如keyBy,可以根据Key进行分组(相同的Key必然可以分到同一组中去)。如果输入的stream中没有Key,比如就是一条日志记录信息,那么无法对其进行keyBy操作。而对于Non-Keyed Window编程结构来说,无论输入的stream具有何种结构(比如是否具有Key),它都认为是无结构的,不能对其进行keyBy操作,而且如果使用Non-Keyed Window函数操作,就会对该stream进行分组(具体如何分组依赖于我们选择的WindowAssigner,它负责将stream中的每个数据元素指派到一个或多个Window中),指派到一个或多个Window中,然后后续应用到该stream上的计算都是对Window中的这些数据元素进行操作。

从计算上看,Keyed Window编程结构会将输入的stream转换成Keyed stream,逻辑上会对应多个Keyed stream,每个Keyed stream会独立进行计算,这就使得多个Task可以对Windowing操作进行并行处理,具有相同Key的数据元素会被发到同一个Task中进行处理。而对于Non-Keyed Window编程结构,Non-Keyed stream逻辑上将不能split成多个stream,所有的Windowing操作逻辑只能在一个Task中进行处理,也就是说计算并行度为1。

在实际编程过程中,我们可以看到DataStream的API也有对应的方法timeWindow()和timeWindowAll(),他们也分别对应着Keyed Window和Non-Keyed Window。

WindowFunction与AllWindowFunction

Flink中对输入stream进行Windowing操作后,将到达的数据元素指派到指定的Window中,或者基于EventTime/ProcessingTime,或者基于Count,或者混合EventTime/ProcessingTime/Count,来对数据元素进行分组。那么,在对分配的Window进行操作时,就需要使用Flink提供的函数(Function),而对于Window的操作,分别基于Keyed Window、Non-Keyed Window提供了WindowFunction、AllWindowFunction,通过实现特定的Window函数,能够访问Window相关的元数据,来满足实际应用需要。下面,我们从类设计的角度,来看下对应的继承层次结构:

  • Keyed Window对应的WindowFunction

Keyed Window对应的WindowFunction类图,如下所示:

通常,如果我们想要自定义处理Window中数据元素的处理逻辑,或者访问Window对应的元数据,可以继承自ProcessWindowFunction类来实现。我们看一下ProcessWindowFunction对应的类声明:

对Keyed stream的Window进行操作,上面泛型对应4个类型参数:

IN表示进入到该ProcessWindowFunction的数据元素的类型,例如stream中上一个操作的输出是包含两个String类型的元组,则IN类型对应为(String, String);

OUT表示该ProcessWindowFunction处理后的输出数据元素的类型,例如输出一个String和一个Long的元组,则OUT类型对应为(String, Long);

KEY有一点不同,需要注意,它并不是面向应用编程用户使用的,而且该值不会提供有意义的业务应用含义,在Keyed Window中它是用来跟踪该Window的,一般应用开发中只需要将其作为输出的Key即可,后面我们会有对应的编程实践;

W类型表示该ProcessWindowFunction作用的Window的类型,例如TimeWindow、GlobalWindow。 下面,我们看一下继承自ProcessWindowFunction需要实现的方法,方法签名如下所示:

进入到该Window,对应着其中一个Keyed stream。属于某个Window的数据元素都在elements这个集合中,我们可以对这些数据元素进行处理。通过context可以访问Window对应的元数据信息,比如TimeWindow的开始时间(start)和结束时间(end)。out是一个Collector,负责收集处理后的数据元素并发送到stream下游进行处理。

  • Non-Keyed Window对应的AllWindowFunction

Non-Keyed Window对应的WindowFunction类图,如下所示:

类似地,如果我们想要自定义处理Window中数据元素的处理逻辑,或者访问Window对应的元数据,可以继承自ProcessAllWindowFunction类来实现。我们看一下ProcessAllWindowFunction对应的类声明:

可以同ProcessWindowFunction对比一下,发现ProcessAllWindowFunction的泛型参数中没有了用来跟踪Window的KEY,因为Non-Keyed Window只在一个Task中进行处理,其它的OUT和W与前面ProcessWindowFunction类相同,不再累述。 继承自ProcessAllWindowFunction,需要实现的方法,如下所示:

该ProcessAllWindowFunction作用于原始输入的stream,所有的数据元素经过Windowing后,都会经过该方法进行处理,在该方法具体处理逻辑与ProcessWindowFunction.process()类似。

编程实践

现在,我们模拟这样一个场景:某个App开发商需要从多个渠道(Channel)推广App,需要通过日志来分析对应的用户行为(安装、打开、浏览、点击、购买、关闭、卸载),我们假设要实时(近实时)统计分析每个时间段内(如每隔5秒)来自不同渠道的用户的行为。 首先,创建一个模拟生成数据的SourceFunction,实现代码如下所示:

有了该数据源,我们就可以基于该SimulatedEventSource来构建Flink Streaming应用程序了。下面,也分别面向Keyed Window和Non-Keyed Window来编程实践,并比较它们不同之处。

  • Keyed Window编程

我们基于Sliding Window(WindowAssigner)来在stream上生成Window,Window大小size=5s,silde=1s,即每个Window计算5s之内的数据元素,每个1s启动一个Window(查看提交该Flink程序的命令行中指定的各个参数值)。同时,基于上面自定义实现的SimulatedEventSource作为输入数据源,创建Flink stream,然后后续就可以对stream进行各种操作了。

处理stream数据,我们希望能够获取到每个Window对应的起始时间和结束时间,然后输出基于Window(起始时间+结束时间)、渠道(Channel)、行为类型进行分组统计的结果,最后将结果数据实时写入到指定Kafka topic中。

我们实现的Flink程序类为SlidingWindowAnalytics,代码如下所示:

首先,对输入stream进行一个map操作,处理输出 ((渠道, 行为类型), 计数)。 其次,基于该结果进行一个keyBy操作,指定Key为(渠道, 行为类型),得到了多个Keyed stream。

接着,对每个Keyed stream应用Sliding Window操作,设置Sliding Window的size和slide值。

然后,因为我们想要获取到Window对应的起始时间和结束时间,所以需要对Windowing后的stream进行一个ProcessWindowFunction操作,这个是我们自定义实现的,在其中获取到Window起始时间和结束时间,并对Windowing的数据进行分组统计(groupBy),然后输出带有Window起始时间和结束时间,以及渠道、行为类型、统计计数这些信息,对应的实现类为MyReduceWindowFunction,代码如下所示:

上面对应于ProcessWindowFunction的泛型参数的值,分别为:IN=((String, String), Long)、OUT=((String, String, String, String), Long)、KEY=Tuple、W=TimeWindow,这样可以对照方法process()中的各个参数的类型来理解。上述代码中,elements中可能存在多个相同的Key的值,但是具有同一个Key的数据元素一定会在同一个Window中(即elements),我们需要对elements进行一个groupBy的内存计算操作,再对每个group中的数据进行汇总计数,输出为((Window开始时间, Window结束时间, 渠道, 行为类型), 累加计数值)。这样,即可有调用stream上的process方法,将该MyReduceWindowFunction实现的示例作为参数值传进去即可。 最后,通过map操作将结果格式化,输出保存到Kafka中。

运行上面我们实现的Flink程序,执行如下命令:

提交运行后,可以通过Flink Web Dashboard查看Job运行状态。可以在Kafka中查看最终结果数据,对应的输出数据示例如下所示:

通过结果可以看到,采用Sliding Window来指派Window,随着时间流逝各个Window之间存在重叠的现象,这正是我们最初想要的结果。

  • Non-Keyed Window编程

这里,我们基于Tumbling Window(WindowAssigner)来在stream上生成Non-Keyed Window。Tumbling Window也被称为固定时间窗口(Fixed Time Window),各个Window的时间长度相同,Window之间没有重叠。

我们想要达到的目标和前面类似,也希望获取到每个Window对应的起始时间和结束时间,所以需要实现一个ProcessWindowAllFunction,但因为是Non-Keyed Window,只有一个Task来负责对所有输入stream中的数据元素指派Window,这在编程实现中并没有感觉到有太大的差异。实现的Flink程序为TumblingWindowAllAnalytics,代码如下所示:

object TumblingWindowAllAnalytics {

var MAX_LAGGED_TIME = 5000L

def checkParams(params: ParameterTool) = {

if (params.getNumberOfParameters < 5) {

println("Missing parameters!\n"

+ "Usage: Windowing "

+ "--window-result-topic <windowed_result_topic> "

+ "--bootstrap.servers <kafka_brokers> "

+ "--zookeeper.connect <zk_quorum> "

+ "--window-all-lagged-millis <window_all_lagged_millis> "

+ "--window-all-size-millis <window_all_size_millis>")

System.exit(-1)

}

}

def main(args: Array[String]): Unit = {

val params = ParameterTool.fromArgs(args)

checkParams(params)

MAX_LAGGED_TIME = params.getLong("window-all-lagged-millis", MAX_LAGGED_TIME)

val windowAllSizeMillis = params.getRequired("window-all-size-millis").toLong

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val stream: DataStream[(String, String)] = env.addSource(new SimulatedEventSource)

// create a Kafka producer for Kafka 0.9.x

val kafkaProducer = new FlinkKafkaProducer09(

params.getRequired("window-result-topic"),

new SimpleStringSchema, params.getProperties

)

stream

.map(t => {

val channel = t._1

val eventFields = t._2.split("\t")

val ts = eventFields(0).toLong

val behaviorType = eventFields(3)

(ts, channel, behaviorType)

})

.assignTimestampsAndWatermarks(new TimestampExtractor(MAX_LAGGED_TIME))

.map(t => (t._2, t._3))

.timeWindowAll(Time.milliseconds(windowAllSizeMillis))

.process(new MyReduceWindowAllFunction())

.map(t => {

val key = t._1

val count = t._2

val windowStartTime = key._1

val windowEndTime = key._2

val channel = key._3

val behaviorType = key._4

Seq(windowStartTime, windowEndTime,

channel, behaviorType, count).mkString("\t")

})

.addSink(kafkaProducer)

env.execute(getClass.getSimpleName)

}

class TimestampExtractor(val maxLaggedTime: Long)

extends AssignerWithPeriodicWatermarks[(Long, String, String)] with Serializable {

var currentWatermarkTs = 0L

override def getCurrentWatermark: Watermark = {

if(currentWatermarkTs <= 0) {

new Watermark(Long.MinValue)

} else {

new Watermark(currentWatermarkTs - maxLaggedTime)

}

}

override def extractTimestamp(element: (Long, String, String),

previousElementTimestamp: Long): Long = {

val ts = element._1

Math.max(ts, currentWatermarkTs)

}

}

}

上面代码中,我们在输入stream开始处理时,调用DataStream的assignTimestampsAndWatermarks方法为stream中的每个数据元素指派时间戳,周期性地生成WaterMark来控制stream的处理进度(Progress),用来提取时间戳和生成WaterMark的实现参考实现类TimestampExtractor。有关WaterMark相关的内容,可以参考后面的参考链接中给出的介绍。

另外,我们实现了Flink的ProcessWindowAllFunction抽象类,对应实现类为MyReduceWindowAllFunction,用来处理每个Window中的数据,获取对应的Window的起始时间和结束时间,实现代码如下所示:

class MyReduceWindowAllFunction

extends ProcessAllWindowFunction[(String, String), ((String, String, String, String), Long), TimeWindow] {

override def process(context: Context,

elements: Iterable[(String, String)],

collector: Collector[((String, String, String, String), Long)]): Unit = {

val startTs = context.window.getStart

val endTs = context.window.getEnd

val elems = elements.map(t => {

((t._1, t._2), 1L)

})

for(group <- elems.groupBy(_._1)) {

val myKey = group._1

val myValue = group._2

var count = 0L

for(elem <- myValue) {

count += elem._2

}

val channel = myKey._1

val behaviorType = myKey._2

val outputKey = (formatTs(startTs), formatTs(endTs), channel, behaviorType)

collector.collect((outputKey, count))

}

}

private def formatTs(ts: Long) = {

val df = new SimpleDateFormat("yyyyMMddHHmmss")

df.format(new Date(ts))

}

}

与Keyed Window实现中的ProcessWindowFunction相比,这里没有了对应的泛型参数KEY,因为这种情况下只有一个Task处理stream输入的所有数据元素,ProcessAllWindowFunction的实现类对所有未进行groupBy(也无法进行,因为数据元素的Key未知)操作得到的Window中的数据元素进行处理,处理逻辑和前面基本相同。 提交Flink程序TumblingWindowAllAnalytics,执行如下命令行:

参考链接

https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html

https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101

https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102

— THE END —

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-08-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Flink最难知识点再解析 | 时间/窗口/水印/迟到数据处理
时间、窗口、水印、迟到数据这四个知识点几乎是Flink这个框架最难点。我之前发了很多文章来解释。很多同学仍然理解不了。
王知无-import_bigdata
2020/02/24
5.1K3
全网最详细4W字Flink入门笔记(中)
Flink是一个有状态的流式计算引擎,所以会将中间计算结果(状态)进行保存,默认保存到TaskManager的堆内存中,但是当task挂掉,那么这个task所对应的状态都会被清空,造成了数据丢失,无法保证结果的正确性,哪怕想要得到正确结果,所有数据都要重新计算一遍,效率很低。想要保证 At -least-once 和 Exactly-once,需要把数据状态持久化到更安全的存储介质中,Flink提供了堆内内存、堆外内存、HDFS、RocksDB等存储介质。
BookSea
2023/07/21
5850
Flink 中极其重要的 Time 与 Window 详细解析(深度好文,建议收藏)
流式:就是数据源源不断的流进来,也就是数据没有边界,但是我们计算的时候必须在一个有边界的范围内进行,所以这里面就有一个问题,边界怎么确定? 无非就两种方式,根据时间段或者数据量进行确定,根据时间段就是每隔多长时间就划分一个边界,根据数据量就是每来多少条数据划分一个边界,Flink 中就是这么划分边界的,本文会详细讲解。
五分钟学大数据
2021/01/25
1.6K0
Flink 中极其重要的 Time 与 Window 详细解析(深度好文,建议收藏)
全网最详细4W字Flink入门笔记(下)
Flink是一个有状态的流式计算引擎,所以会将中间计算结果(状态)进行保存,默认保存到TaskManager的堆内存中,但是当task挂掉,那么这个task所对应的状态都会被清空,造成了数据丢失,无法保证结果的正确性,哪怕想要得到正确结果,所有数据都要重新计算一遍,效率很低。想要保证 At -least-once 和 Exactly-once,需要把数据状态持久化到更安全的存储介质中,Flink提供了堆内内存、堆外内存、HDFS、RocksDB等存储介质。
BookSea
2023/10/16
1K0
全网最详细4W字Flink入门笔记(下)
Flink应用案例统计实现TopN的两种方式
窗口的计算处理,在实际应用中非常常见。对于一些比较复杂的需求,如果增量聚合函数 无法满足,我们就需要考虑使用窗口处理函数这样的“大招”了。 网站中一个非常经典的例子,就是实时统计一段时间内的热门 url。例如,需要统计最近 10 秒钟内最热门的两个 url 链接,并且每 5 秒钟更新一次。我们知道,这可以用一个滑动窗口 来实现,而“热门度”一般可以直接用访问量来表示。于是就需要开滑动窗口收集 url 的访问 数据,按照不同的 url 进行统计,而后汇总排序并最终输出前两名。这其实就是著名的“Top N” 问题。 很显然,简单的增量聚合可以得到 url 链接的访问量,但是后续的排序输出 Top N 就很难 实现了。所以接下来我们用窗口处理函数进行实现。
Maynor
2022/06/30
1.5K0
Flink应用案例统计实现TopN的两种方式
聊聊flink的window操作
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java
code4it
2019/01/01
2.9K0
聊聊flink的window操作
Flink处理函数实战之四:窗口处理
本文是《Flink处理函数实战》系列的第四篇,内容是学习以下两个窗口相关的处理函数:
程序员欣宸
2021/04/19
1.8K0
Flink处理函数实战之四:窗口处理
基于 flink 的电商用户行为数据分析【6】| APP市场推广统计
本篇是flink 的「电商用户行为数据分析」的第6篇文章,为大家带来的是市场营销商业指标统计分析之APP市场推广统计的内容,通过本期内容的学习,你同样能够学会处理一些特定场景领域下的方法。话不多说,我们直入正题!
大数据梦想家
2021/01/27
5350
基于 flink 的电商用户行为数据分析【6】|  APP市场推广统计
Flink窗口全解析:三种时间窗口、窗口处理函数使用及案例
我们经常需要在一个时间窗口维度上对数据进行聚合,窗口是流处理应用中经常需要解决的问题。Flink的窗口算子为我们提供了方便易用的API,我们可以将数据流切分成一个个窗口,对窗口内的数据进行处理。本文将介绍如何在Flink上进行窗口的计算。
PP鲁
2020/02/17
8.2K0
Flink窗口全解析:三种时间窗口、窗口处理函数使用及案例
【Flink】 WaterMark 详解
在设计上 Flink 认为数据是流式的,批处理只是流处理的特例。同时对数据分为有界数据和无界数据。
857技术社区
2022/05/17
1.3K0
【Flink】 WaterMark 详解
彻底搞清Flink中的Window(Flink版本1.8)
在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。
create17
2021/04/07
1.6K0
彻底搞清Flink中的Window(Flink版本1.8)
聊聊flink的Allowed Lateness
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/WindowedStream.java
code4it
2019/01/08
2K0
聊聊flink的Allowed Lateness
Flink实战(七) - Time & Windows编程
掌握Flink中三种常用的Time处理方式,掌握Flink中滚动窗口以及滑动窗口的使用,了解Flink中的watermark。
JavaEdge
2019/07/23
9640
Flink实战(七) - Time & Windows编程
5分钟Flink - 时间语义和Watermark
在 Flink 的流式处理中,绝大部分的业务都会使用 eventTime,一般只在 eventTime 无法使用时,才会被迫使用 ProcessingTime 或者 IngestionTime。
Python编程爱好者
2020/09/08
7270
5分钟Flink - 时间语义和Watermark
Flink window
我们经常需要在一个时间窗口维度上对数据进行聚合,窗口是流处理应用中经常需要解决的问题。Flink的窗口算子为我们提供了方便易用的API,我们可以将数据流切分成一个个窗口,对窗口内的数据进行处理
awwewwbbb
2022/05/30
1.8K0
Flink window
Flink 的窗口指定者和函数
窗口是处理无限流的核心。窗口拆分将流拆为有限数量数据的bucket,这样就可以应用计算。
前Thoughtworks-杨焱
2021/12/07
8710
一网打尽Flink中的时间、窗口和流Join
首先,我们会学习如何定义时间属性,时间戳和水位线。然后我们将会学习底层操作process function,它可以让我们访问时间戳和水位线,以及注册定时器事件。接下来,我们将会使用Flink的window API,它提供了通常使用的各种窗口类型的内置实现。我们将会学到如何进行用户自定义窗口操作符,以及窗口的核心功能:assigners(分配器)、triggers(触发器)和evictors(清理器)。最后,我们将讨论如何基于时间来做流的联结查询,以及处理迟到事件的策略。
王知无-import_bigdata
2021/09/22
1.9K0
全网最详细4W字Flink全面解析与实践(下)
Flink是一个有状态的流式计算引擎,所以会将中间计算结果(状态)进行保存,默认保存到TaskManager的堆内存中。
BookSea
2023/11/02
1.1K0
全网最详细4W字Flink全面解析与实践(下)
[白话解析] Flink的Watermark机制
对于Flink来说,Watermark是个很难绕过去的概念。本文将从整体的思路上来说,运用感性直觉的思考来帮大家梳理Watermark概念。
罗西的思考
2020/09/07
5.8K1
快速入门Flink (9) —— DataStream API 开发之【Time 与 Window】
Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中, 每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访问事件时间戳。
大数据梦想家
2021/01/21
1.1K0
相关推荐
Flink最难知识点再解析 | 时间/窗口/水印/迟到数据处理
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档