DataStream API主要可为分为三个部分,DataSource模块、Transformation模块以及DataSink模块。
内置DataSource:文件数据源
读取类型(WatchType): 其中WatchType共分为两种模式——PROCESS_CONTINUOUSLY和PROCESS_ONCE模式。在PROCESS_CONTINUOUSLY模式下,一旦检测到文件内容发生变化,Flink会将该文件全部内容加载到Flink系统中进行处理。而在PROCESS_ONCE模式下,当文件内容发生变化时,只会将变化的数据读取至Flink中,在这种情况下数据只会被读取和处理一次
可继承RichSourceFunction实现自定义数据源
所有DataStream的转换操作可分为三类类型:
其中Single-DataStream操作定义了对单个DataStream数据集元素的处理逻辑,Multi-DataStream操作定义了对多个DataStream数据集元素的处理逻
Map [DataStream->DataStream]
FlatMap [DataStream->DataStream]: 该算子主要应用处理输入一个元素产生一个或者多个元素的计算场景,比较常见的是在经典例子WordCount中,将每一行的文本数据切割,生成单词序列对于输入DataStream[String]通过FlatMap函数进行处理,字符串数字按逗号切割,然后形成新的整数数据集。
Filter [DataStream->DataStream]
KeyBy [DataStream->KeyedStream]: 以下两种数据类型将不能使用KeyBy方法对数据集进行重分区: 用户使用POJOs类型数据,但是POJOs类中没有复写hashCode()方法,而是依赖于Object.hasCode(); 任何数据类型的数组结构。
Reduce [KeyedStream->DataStream]:主要目的是将输入的KeyedStream通过传入的用户自定义的ReduceFunction滚动地进行数据聚合处理,其中定义的ReduceFunciton必须满足运算结合律和交换律
Aggregations[KeyedStream->DataStream]: Aggregations是DataStream接口提供的聚合算子,根据指定的字段进行聚合操作,滚动地产生一系列数据聚合结果。其实是将Reduce算子中的函数进行了封装,封装的聚合操作有sum、min、minBy、max、maxBy等,这样就不需要用户自己定义Reduce函数
Union[DataStream ->DataStream]
Connect, CoMap,CoFlatMap[DataStream ->DataStream]:Connect算子主要是为了合并两种或者多种不同数据类型的数据集,合并后会保留原来数据集的数据类型
Split [DataStream->SplitStream]:Split算子是将一个DataStream数据集按照条件进行拆分,形成两个数据集的过程,也是union算子的逆向实现。每个接入的数据都会被路由到一个或者多个输出数据集中。如图4-6所示,将输入数据集根据颜色切分成两个数据集
Select [SplitStream ->DataStream]: split函数本身只是对输入数据集进行标记,并没有将数据集真正的实现切分,因此需要借助Select函数根据标记将数据切分成不同的数据集
Iterate[DataStream->IterativeStream->DataStream]: Iterate算子适合于迭代计算场景
物理分区(Physical Partitioning):
在流式计算框架 Flink 中,可以通过 Sink 进行存储操作。官方给出更推荐的说法是连接器 Connector, 第三方中间件作为连接器,既可以当成数据源,也能当成目的地,取决于实现的接口(SourceFunction/SinkFunction)
官方支持的连接器:
PrintSinkFunction:在日常开发中常使用,通过控制台输出结果进行验证数据是否跟自己预期的一致
自定义 SinkFunction:除了官方支持的 Connector 外,还提供了途径,让我们扩展存储方式,通过 addSink() 方法,添加自定义的 SinkFunction
自定义Sink实现:
Code demo:
public class SinkToMySQL extends RichSinkFunction<List<Student>> {
private PreparedStatement ps;
private Connection connection;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
connection = MyDruidUtils.getConnection();
String sql = "insert into student(name, age, address) values (?, ?, ?);";
ps = connection.prepareStatement(sql);
}
@Override
public void close() throws Exception {
super.close();
if (connection != null) {
connection.close();
}
if (ps != null) {
ps.close();
}
}
@Override
public void invoke(List<Student> value, Context context) throws Exception {
for (Student student : value) {
ps.setString(1, student.getName());
ps.setInt(2, student.getAge());
ps.setString(3, student.getAddress());
ps.addBatch();
}
int[] count = ps.executeBatch();
}
}
DataStream API主要分为三个部分组成:DataSource模块、Transformation模块以及DataSink模块,分别代表数据流处理的不同阶段,可以根据实际需要自定义Source和Sink模块。Transformation模块进行实际逻辑处理,Flink提供了相关的算子来进行数据的处理。