本篇主要演练使用Flink-Cep+Groovy+Aviator 来实现一个物联网监控规则中的一个场景案例,后续将会介绍如何实现规则动态变更。
技术背景简介
Flink-Cep 是flink中的高级library,用于进行复杂事件处理,例如某一类事件连续出现三次就触发告警,可以类比Siddhi、Esper;
Groovy 是一种动态脚本语言,可以让用户输入代码变成后台可执行代码,像刷题网站leetcode 应该就是用了这么类似的一个东西;
Aviator 用于执行求值表达式,例如求1>2的值,得到true,为什么用这个东西,也跟后续动态规则变更相关,接下来的案例也会具体介绍。
案例分析
物联网通常都是设备数据,比喻说设备的温度、耗电量等等,会有对设备的监控,例如求设备连续三个点的值大于10且三个点的求和值大于100,要求将这三个点发送到下游进行处理,首先看一下直接使用Flink-Cep api的实现:
在这里使用了一种变相的实现方式,先使用start的Pattern通过times(2) 与 consecutive 来限定连续两个点的值大于10,然后在使用一个next的Pattern, 限定输入数值大于10, 并且求得满足start-Pattern的数据值与当前点数值的和大于100, 最终就会输出我们需要的数据。
但是在实际中,特别是在面向C端用户或者是监控类的每个业务都有自己的监控阈值,因此规则会是一个不断动态变更的过程,通常会定义一个规则模板,模板里面的条件是可动态变更的。用户定义的Pattern在flink里面会被解析成为NFA(代表了一个匹配的流程),NFA生成是不可更改的,所以要想NFA可变,就要求Pattern可动态生成,然后去替换程序里面的NFA,所以我们就需要Groovy这样的脚本语言能够动态生成Pattern对象,对于规则里面的条件value.value>10, 对于规则配置来说就是一个条件表达式,要是条件表达式可执行可使用Aviator。
实现
基于上面的分析,现在思路已经非常清楚了,首先定义一个该场景下的规则模板,也就是Pattern模板是通过Groovy定义的:
在这里面的 _script_、_fieldName_、_sum_ 全部都是参数,需要做变量替换,比喻说
替换成为了
表示从流数据里面value字段要求其值大于10。
解析这个groovy脚本,执行其 getPattern 方法获取我们需要的规则定义对象:
现在重点看一下FilterCondition 定义,表示的一个自定义继承SimpleCondition的实现:
ParseValueFunction 表示的是一个Aviator自定义函数,就是上述提到的getValue函数,它的目的是解析流数据里面的具体字段数值,这里面就是解析value字段的值:
理解了这些之后,在看第二个Pattern条件where2实现就比较清楚了
至此一个简单的Flink-cep+Groovy+Aviator实现已经完成。
总结
本篇以一个简单的demo来介绍Flink-cep+Groovy+Aviator的实现流程,为后续介绍Flink-Cep如何实现动态规则变更打下基础,尽情期待。。。
领取专属 10元无门槛券
私享最新 技术干货