本文已收录至Github,推荐阅读 👉 Java随想录
接前面中篇,此为下篇。
在Spark中有DataFrame这样的关系型编程接口,因其强大且灵活的表达能力,能够让用户通过非常丰富的接口对数据进行处理,有效降低了用户的使用成本。Flink也提供了关系型编程接口Table API以及基于Table API的SQL API,让用户能够通过使用结构化编程接口高效地构建Flink应用。同时Table API以及SQL能够统一处理批量和实时计算业务,无须切换修改任何应用代码就能够基于同一套API编写流式应用和批量应用,从而达到真正意义的批流统一
在 Flink 1.8 架构里,如果用户需要同时流计算、批处理的场景下,用户需要维护两套业务代码,开发人员也要维护两套技术栈,非常不方便。 Flink 社区很早就设想过将批数据看作一个有界流数据,将批处理看作流计算的一个特例,从而实现流批统一,阿里巴巴的 Blink 团队在这方面做了大量的工作,已经实现了 Table API & SQL 层的流批统一。阿里巴巴已经将 Blink 开源回馈给 Flink 社区。
在 Flink 1.9 中,Table 模块迎来了核心架构的升级,引入了阿里巴巴Blink团队贡献的诸多功能,取名叫: Blink Planner。在使用Table API和SQL开发Flink应用之前,通过添加Maven的依赖配置到项目中,在本地工程中引入相应的依赖库,库中包含了Table API和SQL接口。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.9.1</version>
</dependency>
和DataStream API一样,Table API和SQL中具有相同的基本编程模型。首先需要构建对应的TableEnviroment创建关系型编程环境,才能够在程序中使用Table API和SQL来编写应用程序,另外Table API和SQL接口可以在应用中同时使用,Flink SQL基于Apache Calcite框架实现了SQL标准协议,是构建在Table API之上的更高级接口。
首先需要在环境中创建TableEnvironment对象,TableEnvironment中提供了注册内部表、执行Flink SQL语句、注册自定义函数等功能。根据应用类型的不同,TableEnvironment创建方式也有所不同,但是都是通过调用create()方法创建
流计算环境下创建TableEnviroment:
//创建流式计算的上下文环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//创建Table API的上下文环境
val tableEvn =StreamTableEnvironment.create(env)
Table API 顾名思义,就是基于“表”(Table)的一套 API,专门为处理表而设计的,它提供了关系型编程模型,可以用来处理结构化数据,支持表和视图的概念。在此基础上,Flink 还基于 Apache Calcite 实现了对 SQL 的支持。这样一来,我们就可以在 Flink 程序中直接写 SQL 来实现处理需求了,非常实用。
下面是一个简单的例子,它使用Java编写了一个Flink程序,该程序使用Table API从CSV文件中读取数据,然后执行简单的查询并将结果写入到另一个CSV文件中。
首先我们需要导入maven依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
</dependency>
代码示例如下:
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
public class TableAPIExample {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
DataSet<Tuple2<String, Integer>> data = env.readCsvFile("input.csv")
.includeFields("11")
.types(String.class, Integer.class);
Table table = tableEnv.fromDataSet(data, "name, age");
tableEnv.createTemporaryView("people", table);
Table result = tableEnv.sqlQuery("SELECT name, age FROM people WHERE age > 30");
DataSet<Tuple2<String, Integer>> output = tableEnv.toDataSet(result, Tuple2.class);
output.writeAsCsv("output.csv");
env.execute();
}
}
在这个例子中,使用readCsvFile
方法从CSV文件中读取数据,并使用includeFields
和types
方法指定要包含的字段和字段类型。接下来,使用fromDataSet
方法将数据集转换为表,并使用createTemporaryView
方法创建一个临时视图。然后,使用sqlQuery
方法执行SQL查询,并使用toDataSet
方法将结果转换为数据集。最后,使用writeAsCsv
方法将结果写入到CSV文件中,并使用execute
方法启动执行。
除了上面这种写法外,我们还有下面2种写法:
//这里每个方法的参数都是一个“表达式”(Expression),用方法调用的形式直观地说明
//“$”符号用来指定表中的一个字段。代码和直接执行SQL是等效的。
Table maryClickTable = eventTable.where($("user").isEqual("Alice")).select($("url"),$("user"))
//这其实是一种简略的写法,我们将 Table 对象名 eventTable 直接以字符串拼接的形式添加到 SQL 语句中,在解析时会自动注册一个同名的虚拟表到环境中,这样就省略了创建虚拟视图的步骤。
Table clickTable = tableEnvironment.sqlQuery("select url, user from " +eventTable);
在环境中注册之后,我们就可以在 SQL 中直接使用这张表进行查询转换了。
Table newTable = tableEnv.sqlQuery("SELECT ... FROM MyTable... ");
得到的 newTable 是一个中间转换结果,如果之后又希望直接使用这个表执行 SQL,又该怎么做呢?由于 newTable 是一个 Table 对象,并没有在表环境中注册;所以我们还需要将这个中间结果表注册到环境中,才能在 SQL 中使用:
tableEnv.createTemporaryView("NewTable", newTable);
这里的注册其实是创建了一个“虚拟表”(Virtual Table)。这个概念与 SQL 语法中的视图(View)非常类似,所以调用的方法也叫作创建“虚拟视图” (createTemporaryView)。
// 将表转换成数据流,并打印
tableEnv.toDataStream(aliceVisitTable).print();
// 将数据流转换成表。
// 另外,我们还可以在 fromDataStream()方法中增加参数,用来指定提取哪些属性作为表中的字段名,并可以任意指定位置:
Table eventTable2 = tableEnv.fromDataStream(eventStream, $("timestamp").as("ts"),$("url"));
在Flink中,动态表(Dynamic Tables)是一种特殊的表,它可以随时间变化。它们通常用于表示无限流数据,例如事件流或服务器日志。与静态表不同,动态表可以在运行时插入、更新和删除行。
动态表可以像静态的批处理表一样进行查询操作。由于数据在不断变化,因此基于它定义的 SQL 查询也不可能执行一次就得到最终结果。这样一来,我们对动态表的查询也就永远不会停止,一直在随着新数据的到来而继续执行。这样的查询就被称作持续查询(Continuous Query)。
下面是一个简单的例子,它使用Java编写了一个Flink程序,该程序使用Table API从Kafka主题中读取数据,然后执行持续查询并将结果写入到另一个Kafka主题中。
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class DynamicTableExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
tableEnv.executeSql("CREATE TABLE input (" +
" name STRING," +
" age INT" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'input-topic'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'format' = 'json'" +
")");
tableEnv.executeSql("CREATE TABLE output (" +
" name STRING," +
" age INT" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'output-topic'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'format' = 'json'" +
")");
Table result = tableEnv.sqlQuery("SELECT name, age FROM input WHERE age > 30");
tableEnv.toAppendStream(result, Row.class).print();
result.executeInsert("output");
env.execute();
}
}
在这个例子中,首先创建了一个StreamExecutionEnvironment
来设置执行环境,并使用StreamTableEnvironment.create
方法创建了一个StreamTableEnvironment
。然后,使用executeSql
方法创建了两个Kafka表:一个用于读取输入数据,另一个用于写入输出数据。接下来,使用sqlQuery
方法执行持续查询,并使用toAppendStream
方法将结果转换为数据流。最后,使用executeInsert
方法将结果写入到输出表中,并使用execute
方法启动执行。
在 Table API编写的 Flink 程序中,可以在创建表的时候用 WITH 子句指定连接器(connector),这样就可以连接到外部系统进行数据交互了。
其中最简单的当然就是连接到控制台打印输出:
CREATE TABLE ResultTable (
user STRING,
cnt BIGINT
WITH (
'connector' = 'print'
);
需要导入maven依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
创建一个连接到 Kafka 表,需要在 CREATE TABLE 的 DDL 中在 WITH 子句里指定连接器为 Kafka,并定义必要的配置参数。
CREATE TABLE KafkaTable (
`user` STRING,
`url` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'events',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
创建 JDBC 表的方法与前面Kafka 大同小异:
-- 创建一张连接到 MySQL 的 表
CREATE TABLE MyTable (
id BIGINT,
name STRING,
age INT,
status BOOLEAN,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydatabase',
'table-name' = 'users'
);
-- 将另一张表 T 的数据写入到 MyTable 表中
INSERT INTO MyTable
SELECT id, name, age, status FROM T;
在Flink中创建一张表有两种方法:
Table API中已经提供了TableSource从外部系统获取数据,例如常见的数据库、文件系统和Kafka消息队列等外部系统。
Flink允许用户从本地或者分布式文件系统中读取和写入数据,在Table API中可以通过CsvTableSource类来创建,只需指定相应的参数即可。但是文件格式必须是CSV格式的。其他文件格式也支持(在Flink还有Connector的来支持其他格式或者自定义TableSource)
//创建流式计算的上下文环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//创建Table API的上下文环境
val tableEvn = StreamTableEnvironment.create(env)
val source = new CsvTableSource("D:\\code\\StudyFlink\\data\\tableexamples"
, Array[String]("id", "name", "score")
, Array(Types.INT, Types.STRING, Types.DOUBLE)
)
//将source注册成一张表 别名:exampleTab
tableEvn.registerTableSource("exampleTab",source)
tableEvn.scan("exampleTab").printSchema()
代码最后不需要env.execute(),这并不是一个流式计算任务
前面已经知道Table API是构建在DataStream API和DataSet API之上的一层更高级的抽象,因此用户可以灵活地使用Table API将Table转换成DataStream或DataSet数据集,也可以将DataSteam或DataSet数据集转换成Table,这和Spark中的DataFrame和RDD的关系类似
Flink支持把自定义POJOs类的所有case类的属性名字变成字段名,也可以通过基于字段偏移位置和字段名称两种方式重新修改:
//导入table库中的隐式转换
import org.apache.flink.table.api.scala._
// 基于位置重新指定字段名称为"field1", "field2", "field3"
val table = tStreamEnv.fromDataStream(stream, 'field1, 'field2, 'field3)
// 将DataStream转换成Table,并且将字段名称重新成别名
val table: Table = tStreamEnv.fromDataStream(stream, 'rowtime as 'newTime, 'id as 'newId,'variable as 'newVariable)
注意:要导入隐式转换。如果使用as 修改字段,必须修改表中所有的字段。
在Table对象上使用select操作符查询需要获取的指定字段,也可以使用filter或where方法过滤字段和检索条件,将需要的数据检索出来。
object TableAPITest {
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(1)
//初始化Table API的上下文环境
val tableEvn =StreamTableEnvironment.create(streamEnv)
//导入隐式转换,建议写在这里,可以防止IDEA代码提示出错的问题
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
val data = streamEnv.socketTextStream("hadoop101",8888)
.map(line=>{
var arr =line.split(",")
new StationLog(arr(0).trim,arr(1).trim,arr(2).trim,arr(3).trim,arr(4).trim.toLong,arr(5).trim.toLong)
})
val table: Table = tableEvn.fromDataStream(data)
//查询
tableEvn.toAppendStream[Row](
table.select('sid,'callType as 'type,'callTime,'callOut))
.print()
//过滤查询
tableEvn.toAppendStream[Row](
table.filter('callType==="success") //filter
.where('callType==="success")) //where
.print()
tableEvn.execute("sql")
}
其中toAppendStream函数是吧Table对象转换成DataStream对象。
举例:我们统计每个基站的日志数量。
val table: Table = tableEvn.fromDataStream(data)
tableEvn.toRetractStream[Row](
table.groupBy('sid).select('sid, 'sid.count as 'logCount))
.filter(_._1==true) //返回的如果是true才是Insert的数据
.print()
在代码中可以看出,使用toAppendStream和toRetractStream方法将Table转换为DataStreamT数据集,T可以是Flink自定义的数据格式类型Row,也可以是用户指定的数据格式类型。在使用toRetractStream方法时,返回的数据类型结果为DataStream(Boolean,T),Boolean类型代表数据更新类型,True对应INSERT操作更新的数据,False对应DELETE操作更新的数据。
用户可以在Table API中自定义函数类,常见的抽象类和接口是:
案例:使用Table完成基于流的WordCount
object TableAPITest2 {
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(1)
//初始化Table API的上下文环境
val tableEvn =StreamTableEnvironment.create(streamEnv)
//导入隐式转换,建议写在这里,可以防止IDEA代码提示出错的问题
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
val stream: DataStream[String] = streamEnv.socketTextStream("hadoop101",8888)
val table: Table = tableEvn.fromDataStream(stream,'words)
var my_func =new MyFlatMapFunction()//自定义UDF
val result: Table = table.flatMap(my_func('words)).as('word, 'count)
.groupBy('word) //分组
.select('word, 'count.sum as 'c) //聚合
tableEvn.toRetractStream[Row](result)
.filter(_._1==true)
.print()
tableEvn.execute("table_api")
}
//自定义UDF
class MyFlatMapFunction extends TableFunction[Row]{
//定义类型
override def getResultType: TypeInformation[Row] = {
Types.ROW(Types.STRING, Types.INT)
}
//函数主体
def eval(str:String):Unit ={
str.trim.split(" ")
.foreach({word=>{
var row =new Row(2)
row.setField(0,word)
row.setField(1,1)
collect(row)
}})
}
}
}
Flink支持ProcessTime、EventTime和IngestionTime三种时间概念,针对每种时间概念,Flink Table API中使用Schema中单独的字段来表示时间属性,当时间字段被指定后,就可以在基于时间的操作算子中使用相应的时间属性。
在Table API中通过使用.rowtime来定义EventTime字段,在ProcessTime时间字段名后使用.proctime后缀来指定ProcessTime时间属性.
案例:统计最近5秒钟,每个基站的呼叫数量
object TableAPITest {
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//指定EventTime为时间语义
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv.setParallelism(1)
//初始化Table API的上下文环境
val tableEvn =StreamTableEnvironment.create(streamEnv)
//导入隐式转换,建议写在这里,可以防止IDEA代码提示出错的问题
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
val data = streamEnv.socketTextStream("hadoop101",8888)
.map(line=>{
var arr =line.split(",")
new StationLog(arr(0).trim,arr(1).trim,arr(2).trim,arr(3).trim,arr(4).trim.toLong,arr(5).trim.toLong)
})
.assignTimestampsAndWatermarks( //引入Watermark
new BoundedOutOfOrdernessTimestampExtractor[StationLog](Time.seconds(2)){//延迟2秒
override def extractTimestamp(element: StationLog) = {
element.callTime
}
})
//设置时间属性
val table: Table = tableEvn.fromDataStream(data,'sid,'callOut,'callIn,'callType,'callTime.rowtime)
//滚动Window ,第一种写法
val result: Table = table.window(Tumble over 5.second on 'callTime as 'window)
//第二种写法
val result: Table = table.window(Tumble.over("5.second").on("callTime").as("window"))
.groupBy('window, 'sid)
.select('sid, 'window.start, 'window.end, 'window.rowtime, 'sid.count)
//打印结果
tableEvn.toRetractStream[Row](result)
.filter(_._1==true)
.print()
tableEvn.execute("sql")
}
}
上面的案例是滚动窗口,如果是滑动窗口也是一样,代码如下:
//滑动窗口,窗口大小为:10秒,滑动步长为5秒 :第一种写法
table.window(Slide over 10.second every 5.second on 'callTime as 'window)
//滑动窗口第二种写法 table.window(Slide.over("10.second").every("5.second").on("callTime").as("window"))
企业中Flink SQL比Table API用的多。
Flink SQL 是 Apache Flink 提供的一种使用 SQL 查询和处理数据的方式。它允许用户通过 SQL 语句对数据流或批处理数据进行查询、转换和分析,无需编写复杂的代码。Flink SQL 提供了一种更直观、易于理解和使用的方式来处理数据,同时也可以与 Flink 的其他功能无缝集成。
Flink SQL 支持 ANSI SQL 标准,并提供了许多扩展和优化来适应流式处理和批处理场景。它能够处理无界数据流,具备事件时间和处理时间的语义,支持窗口、聚合、连接等常见的数据操作,还提供了丰富的内置函数和扩展插件机制。
下面是一个简单的 Flink SQL 代码示例,展示了如何使用 Flink SQL 对流式数据进行查询和转换。
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class FlinkSqlExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1); // 设置并行度为1,方便观察输出结果
// 创建 Kafka 数据源
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "flink-consumer");
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), properties);
DataStream<String> sourceStream = env.addSource(kafkaConsumer);
// 注册数据源表
env.createTemporaryView("source_table", sourceStream, "message");
// 执行 SQL 查询和转换
String query = "SELECT message, COUNT(*) AS count FROM source_table GROUP BY message";
DataStream<Result> resultStream = env.sqlQuery(query).map(value -> new Result(value.getField(0), value.getField(1)));
// 打印结果
resultStream.print();
env.execute("Flink SQL Example");
}
// 自定义结果类
public static class Result {
public String message;
public Long count;
public Result() {}
public Result(String message, Long count) {
this.message = message;
this.count = count;
}
@Override
public String toString() {
return "Result{" +
"message='" + message + '\'' +
", count=" + count +
'}';
}
}
}
在上述示例中,我们使用 Apache Kafka 作为数据源,并创建了一个消费者从名为 "input-topic" 的 Kafka 主题中读取数据。然后,我们将数据流注册为名为 "source_table" 的临时表。
接下来,我们使用 Flink SQL 执行 SQL 查询和转换。在这个例子中,我们查询 "source_table" 表,对 "message" 字段进行分组并计算每个消息出现的次数。查询结果会映射到自定义的 Result
类,并最终通过 print()
方法打印到标准输出。
最后,我们通过调用 env.execute()
方法来启动 Flink 作业的执行。
复杂事件处理(CEP)是一种基于流处理的技术,将系统数据看作不同类型的事件,通过分析事件之间的关系,建立不同的事件关系序列库,并利用过滤、关联、聚合等技术,最终由简单事件产生高级事件,并通过模式规则的方式对重要信息进行跟踪和分析,从实时数据中发掘有价值的信息。复杂事件处理主要应用于防范网络欺诈、设备故障检测、风险规避和智能营销等领域。Flink基于DataStrem API提供了FlinkCEP组件栈,专门用于对复杂事件的处理,帮助用户从流式数据中发掘有价值的信息。
CEP(Complex Event Processing)就是在无界事件流中检测事件模式,让我们掌握数据中重要的部分。flink CEP是在flink中实现的复杂事件处理库。
在使用FlinkCEP组件之前,需要将FlinkCEP的依赖库引入项目工程中。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_2.11</artifactId>
<version>1.9.1</version>
</dependency>
复杂事件中事件与事件之间包含多种类型关系,常见的有时序关系、聚合关系、层次关系、依赖关系及因果关系等。
Flink CEP中提供了Pattern API用于对输入流数据的复杂事件规则定义,并从事件流中抽取事件结果。包含四个步骤:
定义Pattern可以是单次执行模式,也可以是循环执行模式。单词执行模式一次只接受一个事件,循环执行模式可以接收一个或者多个事件。通常情况下,可以通过指定循环次数将单次执行模式变为循环执行模式。每种模式能够将多个条件组合应用到同一事件之上,条件组合可以通过where方法进行叠加。每个Pattern都是通过begin方法定义的
val start = Pattern.begin[Event]("start_pattern")
下一步通过Pattern.where()方法在Pattern上指定Condition,只有当Condition满足之后,当前的Pattern才会接受事件。
start.where(_.getCallType == "success")
对于已经创建好的Pattern,可以指定循环次数,形成循环执行的Pattern。
//指定循环触发4次
start.times(4);
//可以执行触发次数范围,让循环执行次数在该范围之内
start.times(2, 4);
start.times(4).optional();
start.times(2, 4).optional();
//触发2、3、4次,尽可能重复执行
start.times(2, 4).greedy();
//触发0、2、3、4次,尽可能重复执行
start.times(2, 4).optional().greedy();
// 触发一次或者多次
start.oneOrMore();
//触发一次或者多次,尽可能重复执行
start.oneOrMore().greedy();
// 触发0次或者多次
start.oneOrMore().optional();
// 触发0次或者多次,尽可能重复执行
start.oneOrMore().optional().greedy();
// 触发两次或者多次
start.timesOrMore(2);
// 触发两次或者多次,尽可能重复执行
start.timesOrMore(2).greedy();
// 不触发或者触发两次以上,尽可能重复执行
start.timesOrMore(2).optional().greedy();
每个模式都需要指定触发条件,作为事件进入到该模式是否接受的判断依据,当事件中的数值满足了条件时,便进行下一步操作。在FlinkCFP中通过pattern.where()、pattern.or()及pattern.until()方法来为Pattern指定条件,且Pattern条件有Simple Conditions及Combining Conditions等类型。
// 把通话成功的事件挑选出来
start.where(_.getCallType == "success")
// 把通话成功,或者通话时长大于10秒的事件挑选出来
val start = Pattern.begin[StationLog]("start_pattern")
.where(_.callType=="success")
.or(_.duration>10)
pattern.oneOrMore.until(_.callOut.startsWith("186"))
将相互独立的模式进行组合然后形成模式序列。模式序列基本的编写方式和独立模式一致,各个模式之间通过邻近条件进行连接即可,其中有严格邻近、宽松邻近、非确定宽松邻近三种邻近连接条件。
val strict: Pattern[Event] = start.next("middle").where(...)
val relaxed: Pattern[Event, _] = start.followedBy("middle").where(...)
val nonDetermin: Pattern[Event, _] = start.followedByAny("middle").where(...)
.notNext() —— 不想让某个事件严格紧邻前一个事件发生。
.notFollowedBy() —— 不想让某个事件在两个事件之间发生。
注意:
//指定模式在10秒内有效
pattern.within(Time.seconds(10));
调用 CEP.pattern(),给定输入流和模式,就能得到一个 PatternStream
//cep 做模式检测
val patternStream = CEP.pattern[EventLog](dataStream.keyBy(_.id),pattern)
得到PatternStream类型的数据集后,接下来数据获取都基于PatternStream进行。该数据集中包含了所有的匹配事件。目前在FlinkCEP中提供select和flatSelect两种方法从PatternStream提取事件结果事件。
通过Select Funciton抽取正常事件
可以通过在PatternStream的Select方法中传入自定义Select Funciton完成对匹配事件的转换与输出。其中Select Funciton的输入参数为Map[String, IterableIN],Map中的key为模式序列中的Pattern名称,Value为对应Pattern所接受的事件集合,格式为输入事件的数据类型。
def selectFunction(pattern : Map[String, Iterable[IN]]): OUT = {
//获取pattern中的startEvent
val startEvent = pattern.get("start_pattern").get.next
//获取Pattern中middleEvent
val middleEvent = pattern.get("middle").get.next
//返回结果
OUT(startEvent, middleEvent)}
通过Flat Select Funciton抽取正常事件
Flat Select Funciton和Select Function相似,不过Flat Select Funciton在每次调用可以返回任意数量的结果。因为Flat Select Funciton使用Collector作为返回结果的容器,可以将需要输出的事件都放置在Collector中返回。
def flatSelectFn(pattern : Map[String, Iterable[IN]], collector : Collector[OUT]) = { //获取pattern中startEvent
val startEvent = pattern.get("start_pattern").get.next
//获取Pattern中middleEvent
val middleEvent = pattern.get("middle").get.next
//并根据startEvent的Value数量进行返回
for (i <- 0 to startEvent.getValue) {
collector.collect(OUT(startEvent, middleEvent))
}}
通过Select Funciton抽取超时事件
如果模式中有within(time),那么就很有可能有超时的数据存在,通过PatternStream. Select方法分别获取超时事件和正常事件。首先需要创建OutputTag来标记超时事件,然后在PatternStream.select方法中使用OutputTag,就可以将超时事件从PatternStream中抽取出来。
// 通过CEP.pattern方法创建
PatternStream val patternStream: PatternStream[Event] = CEP.pattern(input, pattern) //创建OutputTag,并命名为timeout-output
val timeoutTag = OutputTag[String]("timeout-output")
//调用PatternStream select()并指定timeoutTag val result: SingleOutputStreamOperator[NormalEvent] = patternStream.select(timeoutTag){
//超时事件获取
(pattern: Map[String, Iterable[Event]], timestamp: Long) =>
TimeoutEvent()//返回异常事件
} {
//正常事件获取
pattern: Map[String, Iterable[Event]] =>
NormalEvent()
//返回正常事件
}
//调用getSideOutput方法,并指定timeoutTag将超时事件输出val timeoutResult: DataStream[TimeoutEvent] = result.getSideOutput(timeoutTag)
在大数据领域,大多数开源框架(Hadoop、Spark、Flink)都是基于JVM运行,但是JVM的内存管理机制往往存在着诸多类似OutOfMemoryError的问题,主要是因为创建过多的对象实例而超过JVM的最大堆内存限制,却没有被有效回收掉,这在很大程度上影响了系统的稳定性,尤其对于大数据应用,面对大量的数据对象产生,仅仅靠JVM所提供的各种垃圾回收机制很难解决内存溢出的问题。在开源框架中有很多框架都实现了自己的内存管理,例如Apache Spark的Tungsten项目,在一定程度上减轻了框架对JVM垃圾回收机制的依赖,从而更好地使用JVM来处理大规模数据集。
Flink也基于JVM实现了自己的内存管理,将JVM根据内存区分为Unmanned Heap、Flink Managed Heap、Network Buffers三个区域。在Flink内部对Flink Managed Heap进行管理,在启动集群的过程中直接将堆内存初始化成Memory Pages Pool,也就是将内存全部以二进制数组的方式占用,形成虚拟内存使用空间。新创建的对象都是以序列化成二进制数据的方式存储在内存页面池中,当完成计算后数据对象Flink就会将Page置空,而不是通过JVM进行垃圾回收,保证数据对象的创建永远不会超过JVM堆内存大小,也有效地避免了因为频繁GC导致的系统稳定性问题。
JobManager在Flink系统中主要承担管理集群资源、接收任务、调度Task、收集任务状态以及管理TaskManager的功能,JobManager本身并不直接参与数据的计算过程中,因此JobManager的内存配置项不是特别多,只要指定JobManager堆内存大小即可。
jobmanager.heap.size:设定JobManager堆内存大小,默认为1024MB。
TaskManager作为Flink集群中的工作节点,所有任务的计算逻辑均执行在TaskManager之上,因此对TaskManager内存配置显得尤为重要,可以通过以下参数配置对TaskManager进行优化和调整。
Flink将JVM堆内存切分为三个部分,其中一部分为Network Buffers内存。Network Buffers内存是Flink数据交互层的关键内存资源,主要目的是缓存分布式数据处理过程中的输入数据。。通常情况下,比较大的Network Buffers意味着更高的吞吐量。如果系统出现“Insufficient number of network buffers”的错误,一般是因为Network Buffers配置过低导致,因此,在这种情况下需要适当调整TaskManager上Network Buffers的内存大小,以使得系统能够达到相对较高的吞吐量。
目前Flink能够调整Network Buffer内存大小的方式有两种:一种是通过直接指定Network Buffers内存数量的方式,另外一种是通过配置内存比例的方式。
直接设定Nework Buffer数量需要通过如下公式计算得出:
NetworkBuffersNum = total-degree-of-parallelism \* intra-node-parallelism * n
其中total-degree-of-parallelism表示每个TaskManager的总并发数量,intra-node-parallelism表示每个TaskManager输入数据源的并发数量,n表示在预估计算过程中Repar-titioning或Broadcasting操作并行的数量。intra-node-parallelism通常情况下与Task-Manager的所占有的CPU数一致,且Repartitioning和Broadcating一般下不会超过4个并发。可以将计算公式转化如下:
NetworkBuffersNum = <slots-per-TM>^2 \* < TMs>* 4
其中slots-per-TM是每个TaskManager上分配的slots数量,TMs是TaskManager的总数量。对于一个含有20个TaskManager,每个TaskManager含有8个Slot的集群来说,总共需要的Network Buffer数量为8^2*204=5120个,因此集群中配置Network Buffer内存的大小约为160M较为合适。
计算完Network Buffer数量后,可以通过添加如下两个参数对Network Buffer内存进行配置。其中segment-size为每个Network Buffer的内存大小,默认为32KB,一般不需要修改,通过设定numberOfBuffers参数以达到计算出的内存大小要求。
从1.3版本开始,Flink就提供了通过指定内存比例的方式设置Network Buffer内存大小。
本篇文章就到这里,感谢阅读,如果本篇博客有任何错误和建议,欢迎给我留言指正。文章持续更新,可以关注公众号第一时间阅读。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。