首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

如何使用Trigger和Evictor测试Flink全局窗口

基础概念

Flink全局窗口(Global Window):全局窗口是一种没有明确时间边界的窗口类型,所有事件都会被分配到同一个全局窗口中。全局窗口通常用于需要处理所有数据的场景,例如统计所有数据的总数。

Trigger:触发器定义了窗口何时应该进行计算和输出结果。对于全局窗口,由于没有明确的时间边界,需要自定义触发器来决定何时进行计算。

Evictor:驱逐器定义了在窗口计算之前应该移除哪些数据。它可以用于控制窗口中数据的数量或时间范围。

相关优势

  • 灵活性:通过自定义触发器和驱逐器,可以灵活地控制窗口的计算时机和数据范围。
  • 精确控制:可以根据具体需求定制窗口的行为,例如在特定条件下进行计算或移除旧数据。

类型

  • 自定义Trigger:可以根据数据的时间戳或其他条件来触发窗口计算。
  • 自定义Evictor:可以根据数据的时间戳或其他条件来移除窗口中的数据。

应用场景

  • 实时统计:例如统计所有数据的总数或平均值。
  • 异常检测:通过全局窗口和自定义触发器,可以在特定条件下检测异常数据。

示例代码

以下是一个使用自定义Trigger和Evictor测试Flink全局窗口的示例代码:

代码语言:txt
复制
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class GlobalWindowExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Event> events = env.fromElements(
                new Event(1, System.currentTimeMillis()),
                new Event(2, System.currentTimeMillis() + 1000),
                new Event(3, System.currentTimeMillis() + 2000)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

        events
                .windowAll(GlobalWindows.create())
                .trigger(new CustomTrigger())
                .evictor(new CustomEvictor())
                .reduce(new ReduceFunction<Event>() {
                    @Override
                    public Event reduce(Event value1, Event value2) throws Exception {
                        return new Event(value1.getId() + value2.getId(), value1.getTimestamp());
                    }
                })
                .print();

        env.execute("Global Window Example");
    }

    public static class Event {
        private int id;
        private long timestamp;

        public Event(int id, long timestamp) {
            this.id = id;
            this.timestamp = timestamp;
        }

        public int getId() {
            return id;
        }

        public long getTimestamp() {
            return timestamp;
        }

        @Override
        public String toString() {
            return "Event{" +
                    "id=" + id +
                    ", timestamp=" + timestamp +
                    '}';
        }
    }

    public static class CustomTrigger extends Trigger<Event, TimeWindow> {
        @Override
        public TriggerResult onElement(Event element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
            if (timestamp >= window.maxTimestamp() - 1000) {
                return TriggerResult.FIRE;
            }
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.FIRE;
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        }
    }

    public static class CustomEvictor implements Evictor<Event, TimeWindow> {
        @Override
        public void evictBefore(Iterable<Event> elements, int size, TimeWindow window, EvictorContext evictorContext) {
            // 移除窗口中时间戳小于当前时间减去2秒的数据
            long cutoff = System.currentTimeMillis() - 2000;
            elements.forEach(event -> {
                if (event.getTimestamp() < cutoff) {
                    elements.remove(event);
                }
            });
        }

        @Override
        public void evictAfter(Iterable<Event> elements, int size, TimeWindow window, EvictorContext evictorContext) {
        }
    }
}

参考链接

常见问题及解决方法

问题1:全局窗口计算结果不正确

原因:可能是由于触发器或驱逐器的逻辑不正确,导致窗口计算时机或数据范围不符合预期。

解决方法:仔细检查自定义触发器和驱逐器的逻辑,确保它们按照预期工作。可以通过打印日志或调试来验证触发器和驱逐器的行为。

问题2:窗口计算延迟过高

原因:可能是由于触发器的条件过于严格,导致窗口计算被频繁触发,或者驱逐器移除的数据过多,导致窗口中数据量过大。

解决方法:调整触发器的条件,使其在合适的时机触发计算。同时,合理设置驱逐器的逻辑,确保窗口中保留必要的数据。

通过以上方法,可以有效地测试和使用Flink全局窗口,并解决相关问题。

页面内容是否对你有帮助?
有帮助
没帮助

相关·内容

Flink window

该 function 决定如何计算窗口中的内容, 而 Trigger 决定何时窗口中的数据可以被 function 计算 也可以指定一个 Evictor ),在 trigger 触发之后,Evictor...在代码中,Flink 处理基于时间的窗口使用的是 TimeWindow, 它有查询开始结束 timestamp 以及返回窗口所能储存的最大 timestamp 的方法 maxTimestamp()...clear() 方法处理在对应窗口被移除时所需的逻辑。 Evictors Flink窗口模型允许在 WindowAssigner Trigger 之外指定可选的 Evictor。...Evictor 可以在 trigger 触发后、调用窗口函数之前或之后从窗口中删除元素 Flink 内置有三个 evictor: CountEvictor: 仅记录用户指定数量的元素,一旦窗口中的元素超过这个数量...而使用 ProcessWindowFunction 需要累积窗口中所有的元素 使用 Evictor 可以避免预聚合, 因为窗口中的所有数据必须先经过 evictor 才能进行计算 Reference

1.6K20
  • 彻底搞清Flink中的Window(Flink版本1.8)

    WindowAssigner负责将每个传入数据元分配给一个或多个窗口Flink带有预定义的窗口分配器,用于最常见的用例 即翻滚窗口, 滑动窗口,会话窗口全局窗口。...使用时,我们要设置SlideSize。Slide的大小决定了Flink以多大的频率来创建新的窗口,Slide较小,窗口的个数会很多。...Evictor 它剔除元素的时机是:在触发器触发之后,在窗口被处理(apply windowFunction)之前 Flink窗口模型允许在窗口分配器触发器之外指定一个可选的驱逐器(Evictor...对于CountWindow,我们可以直接使用已经定义好的Trigger:CountTrigger trigger(CountTrigger.of(2)) Evictor(可选) 驱逐者,即保留上一window...最简单的情况,如果业务不是特别复杂,仅仅是基于TimeCount,我们其实可以用系统定义好的WindowAssigner以及TriggerEvictor来实现不同的组合: window 出现数据倾斜

    1.4K40

    Flink DataStream多样化

    Global Windows:全局的window,默认永远不触发窗口,需要自定义Trigger来触发窗口 Evictor 在我们的WindowedStream中我们可以看到一个evictor方法,...TimeEvictor:设定一个阀值interval,删除窗口内小于最大时间戳(本窗口内)-interval的元素 Trigger 在我们的WindowedStream中我们可以看到一个trigger...方法,该方法主要用来判断是一个窗口是否需要被触发,每个WindowsAssigner都自带一个默认的TriggerTrigger的 定义如下: public abstract class Trigger...FIRE_AND_PURGE:触发窗口,然后销毁窗口 TimeWaterMark 之前我们已经说过在Flink中对Time进行了精细划分: EventTime:事件发生的时间 ProcessingTime...某个时间戳以前的数据我都收到了,由于我们的WaterMark也只是一个估计值,因此即使设置了WaterMark,也有可能收到之前的数据(这些数据称为late elements),Flink中可以使用以下方法来处理这些数据

    25210

    Flink Window&Time 原理

    Timestamp 的抽取 如果你指定 Flink 需要使用 EventTime,那么你就需要在 WatermarkStrategy 策略中通过 withTimestampAssigner 指定如何从你的事件中抽取出...实际上是全局并行度为1的窗口(即便你手动指定多并行度也是无效的) 一个完整的 WindowStream 的处理流程大概是这样的,数据经过 assigner 的挑选进入对应的窗口,经过 trigger...除此之外,滚动窗口还实现好了一个默认的 Trigger 触发器 EventTimeTrigger,也就是说使用滚动窗口默认不需要再指定触发器了,至于触发器是什么待会儿会介绍,这里只是需要知道它是有默认触发器实现的...通过使用 GlobalWindows 来指定使用全局窗口,需要注意的是:全局窗口没有默认的触发器,也就是数据默认永远不会触发。 所以,如果需要用到全局窗口,一定记得指定窗口触发器。...Evictors Flink窗口模型允许在 WindowAssigner Trigger 之外指定可选的 Evictor,在 trigger 触发后、调用窗口函数之前或之后从窗口中删除元素,我们也称它为剔除器

    57330

    Flink Watermark 机制及总结

    前言 Flink 水印机制,简而言之,就是在 Flink 使用 Event Time 的情况下,窗口处理事件乱序事件延迟的一种设计方案。...本文从基本的概念入手,来看下 Flink 水印机制的原理使用方式。...WindowAssigner 负责将每一个到来的元素分配给一个或者多个窗口(window), Flink 提供了一些常用的预定义的窗口分配器,即:滚动窗口、滑动窗口、会话窗口全局窗口。...3.窗口驱逐器(Evictor) Flink窗口模型允许指定一个除了 WindowAssigner Trigger 之外的可选参数 Evitor,这个可以通过调用 evitor(...)...先后介绍了 Time 的类型,Windows 的组成,Event Time Watermark 的使用场景方式,重点是 Watermark 的设计方案如何解决窗口处理事件乱序事件延迟的问题。

    1.8K00

    Flink深度学习流处理核心组件 Time&Window 深度解析

    WindowAssigner、Trigger Evictor;Window 中怎么处理乱序数据,乱序数据是否允许延迟,以及怎么处理迟到的数据;整个 Window 的数据流程,以及 Window 中怎么保证...下面我们重点来看 window(), evictor() trigger() 这几个方法。...,更详细的描述可以参考 org.apache.flink.streaming.api.windowing.evictors.Evictor 的 evicBefore evicAfter 两个方法。...中指定是允许延迟的最大时间(默认为 0),可以使用下面的代码进行设置 设置allowedLateness 之后,迟来的数据同样可以触发窗口,进行输出,利用 Flink 的 side output 机制...至此,Time、Window 相关的所有内容都已经讲解完毕,主要包括为什么要有 Window;Window 中的三个核心组件:WindowAssigner、Trigger Evictor;Window

    32620

    彻底搞清 Flink 中的 Window 机制

    ,则触发上个窗口的计算 三、WindowAPI 3.1 windowwindowAll 使用keyby的流,应该使用window方法 未使用keyby的流,应该调用windowAll方法 区别:...3.3 evictor evictor 主要用于做一些数据的自定义操作,可以在执行用户代码之前,也可以在执行 用户代码之后,更详细的描述可以参考org.apache.flink.streaming.api.windowing.evictors.Evictor...3.4 trigger trigger 用来判断一个窗口是否需要被触发,每个 WindowAssigner 都自带一个默认的trigger, 如果默认的 trigger 不能满足你的需求...PURGE 清空整个 window 的元素并销毁窗口 四、WindowAPI调用案例示例 4.1 基于时间的滚动滑动窗口 测试数据 信号灯编号通过该信号灯的车的数量 9,3 9,2 9,7 4,9...测试数据 信号灯编号通过该信号灯的车的数量 9,3 9,2 9,7 4,9 2,6 1,5 2,3 5,7 5,4 设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算 package

    1.1K40

    Flink Watermark 机制及总结

    作者:黄龙,腾讯 CSIG 高级工程师 Flink Watermark 前言 Flink 水印机制,简而言之,就是在 Flink 使用 Event Time 的情况下,窗口处理事件乱序事件延迟的一种设计方案...本文从基本的概念入手,来看下 Flink 水印机制的原理使用方式。...WindowAssigner 负责将每一个到来的元素分配给一个或者多个窗口(window), Flink 提供了一些常用的预定义的窗口分配器,即:滚动窗口、滑动窗口、会话窗口全局窗口。...3.窗口驱逐器(Evictor) Flink窗口模型允许指定一个除了 WindowAssigner Trigger 之外的可选参数 Evitor,这个可以通过调用 evitor(...)...先后介绍了 Time 的类型,Windows 的组成,Event Time Watermark 的使用场景方式,重点是 Watermark 的设计方案如何解决窗口处理事件乱序事件延迟的问题。

    1.4K30

    2021年大数据Flink(十八):Flink Window操作

    个数据统计最近xx个数据 ​​​​​​​按照slidesize分类 窗口有两个重要的属性: 窗口大小size滑动间隔slide,根据它们的大小关系可分为: tumbling-window:滚动窗口...--用的较少 注意:Flink还支持一个特殊的窗口:Session会话窗口,需要设置一个会话超时时间,如30s,则表示30s内没有数据到来,则触发上个窗口的计算 Window的API windowwindowAll...evictor--了解 evictor 主要用于做一些数据的自定义操作,可以在执行用户代码之前,也可以在执行 用户代码之后,更详细的描述可以参考org.apache.flink.streaming.api.windowing.evictors.Evictor...Flink 提供了如下三种通用的 evictor: * CountEvictor 保留指定数量的元素 * TimeEvictor 设定一个阈值 interval,删除所有不再 max_ts - interval...trigger--了解 trigger 用来判断一个窗口是否需要被触发,每个 WindowAssigner 都自带一个默认的trigger, 如果默认的 trigger 不能满足你的需求,则可以自定义一个类

    88410

    Flink 窗口之Window机制

    但是,Apache Flink 作为一个为生产环境而生的流处理器,具有易于使用并且表达能力很强的 API 来定义高级流分析程序。...剖析Flink窗口机制 Flink 的内置 Time Windows Count Windows 覆盖了各种常见的窗口用例。...WindowAssigner 将元素分配给一个或多个窗口,也可能会创建新的窗口窗口本身只是一系列元素的标识符,并且可以提供一些可选的元信息,例如,在使用 TimeWindow 时的开始结束时间。...请注意,在清除窗口之前,窗口会一值消耗内存。 触发 Trigger 时,可以将窗口元素列表提供给可选的 Evictor。...如果没有定义 Evictor,则 Trigger 直接将所有窗口元素交给窗口计算函数。 窗口计算函数接收一个窗口的元素(可能先由 Evictor 进行过滤),并为该窗口计算一个或多个结果元素。

    1.3K20

    写给大忙人看的 Flink Window原理

    Window 可以说是 Flink 中必不可少的 operator 之一,在很多场合都有很非凡的表现。今天呢,我们就一起来看一下 window 是如何实现的。...window operator evictor evictor 主要用于做一些数据的自定义操作,可以在执行用户代码之前,也可以在执行用户代码之后,更详细的描述可以参考 org.apache.flink.streaming.api.windowing.evictors.Evictor...的 evicBefore evicAfter 两个方法。...trigger trigger 用来判断一个窗口是否需要被触发,每个 WindowAssigner 都自带一个默认的 trigger,如果默认的 trigger 不能满足你的需求,则可以自定义一个类,...FIRE_AND_PURGE 触发窗口,然后销毁窗口 window code package org.apache.flink.streaming.connectors.kafka; import

    70720

    Flink学习笔记

    Flink 基础 Flink特性 流式计算是大数据计算的痛点,第1代实时计算引擎Storm对Exactly Once 语义窗口支持较弱,使用的场景有限且无法支持高吞吐计算;Spark Streaming...是必须要指定的属性; Window Assigner用来决定某个元素被分配到哪个/哪些窗口中去;Trigger触发器决定了一个窗口何时能够被计算或清除,每个窗口都会拥有一个自己的TriggerEvictor...驱逐者在Trigger触发之后,在窗口被处理之前,Evictor(如果有Evictor的话)会用来剔除窗口中不需要的元素,相当于一个filter。...一个窗口可以被重复计算多次知道它被 purge 了。在purge之前,窗口会一直占用着内存。 当Trigger fire了,窗口中的元素集合就会交给Evictor(如果指定了的话)。...机制,分布式快照可以将同一时间点的Task/Operator状态数据全局统一快照处理,包括Keyed StateOperator State Savepoints是检查点的一种特殊实现,底层使用CheckPoint

    93410
    领券