摘要处理函数(ProcessFunction)了。处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册“定时事件”。而且处理函数继承了 AbstractRichFunction 抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函数还可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方法,可以实现各种自定义的业务逻辑;同时也是整个 DataStream API 的底层基础。
在我们之前学习的API,不管事聚合、转换或者开窗操作都是基于DataStream进行操作的,我们统称DataSream API. 但是我们知道这些API无法访问时间戳或者当前事件的事件时间。 因此Flink还提供了更低层API让我们直面数据流的基本元素:数据事件、状态、及时间让我们对流有完全的控制权,我们称这一层接口叫“处理函数”(ProcessFunction)
处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册“定时事件”。 同样可以访问状态(state)和其他运行时信息。此外,处理函数还可以直接将数据输出到侧输出流(side output)中。 用法:stream.process(new MyProcessFunction()) 调用process方法传入一个 ProcessFunction 作为参数,用来定义处理逻辑。
Flink提供了8个不同的处理函数:
ProcessFunction
KeyedProcessFunction
ProcessWindowFunction
ProcessAllWindowFunction
CoProcessFunction
ProcessJoinFunction
BroadcastProcessFunction
KeyedBroadcastProcessFunction
注意:注意定时器timer 只能在keyed streams 上面使用。
处理函数提供了两个方法:
抽象方法:public abstract void processElement(I value, Context ctx, Collector<O> out)
每个元素进来都会调用一次 value输入的值,ctx上下文可以获取时间用来注册定时器,out用来输出
非抽象方法:public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)
timestamp时间戳,触发的时间如果是事件语义就是水位线
TimerService会以键(key)和时间戳为标准,对定时器进行去重;也就是说对于每个key和时间戳,最多只有一个定时器,如果注册了多次,onTimer()方法也将只被调用一次。 这样一来,我们在代码中就方便了很多,可以肆无忌惮地对一个key注册定时器,而不用担心重复定义——因为一个时间戳上的定时器只会触发一次。 利用这个特性,有时我们可以故意降低时间戳的精度,来减少定时器的数量,从而提高处理性能。比如我们可以在设置定时器时只保留整秒数,那么定时器的触发频率就是最多1秒一次。 这里注意定时器的时间戳必须是毫秒数,所以我们得到整秒之后还要乘以1000。定时器默认的区分精度是毫秒。
下面是一个KeyedProcessFunction的案例:10s温度连续上升就预警
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction
K表示分组的类型
I表示输入的类型
O表示输出的类型
package _8processFunction;
import dto.SensorReadingDTO;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import util.DateUtil;
//10s温度连续上升就预警
public class ProcessFunction_1_KeyedProcessFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> streamSource = env.socketTextStream("127.0.0.1", 10008);
DataStream<SensorReadingDTO> dataStream = streamSource
.map(new MapFunction<String, SensorReadingDTO>() {
@Override
public SensorReadingDTO map(String input) throws Exception {
if (StringUtils.isNotBlank(input)) {
String[] infoArray = input.split(",");
SensorReadingDTO sensorReadingDTO = new SensorReadingDTO();
sensorReadingDTO.setId(infoArray[0]);
sensorReadingDTO.setTimestamp(Long.valueOf(infoArray[1]) * 1000);
sensorReadingDTO.setTemperature(Double.valueOf(infoArray[2]));
sensorReadingDTO.setTimestampStr(DateUtil.format(sensorReadingDTO.getTimestamp()));
return sensorReadingDTO;
}
return null;
}
});
dataStream.keyBy(SensorReadingDTO::getId).process(new MyProcess(10000)).print();
env.execute();
}
public static class MyProcess extends KeyedProcessFunction<String, SensorReadingDTO, String> {
private Integer interval;
private ValueState<Double> tempValueState;
private ValueState<Long> timerTimestampValueState;//只是为了清除定时器的时候用
public MyProcess(Integer interval) {
this.interval = interval;
}
@Override
public void processElement(SensorReadingDTO sensorReadingDTO, Context context,
Collector<String> collector) throws Exception {
Double curTemp = sensorReadingDTO.getTemperature();
double lastTemp = tempValueState.value() != null ? tempValueState.value() : curTemp;
if (lastTemp > curTemp) {
//温度出现下降 重新计算,所以删除定时器(但是温度还要设置)
context.timerService().deleteProcessingTimeTimer(timerTimestampValueState.value());
timerTimestampValueState.clear();
} else if (lastTemp <= curTemp && timerTimestampValueState.value() == null) {
//温度上升,没有定时器要注册 如 温度 10 20 30 20的时候是不用注册定时器的
long warningTimestamp = context.timerService().currentProcessingTime() + interval;
context.timerService().registerProcessingTimeTimer(warningTimestamp);
timerTimestampValueState.update(warningTimestamp);
}
tempValueState.update(curTemp);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
throws Exception {
out.collect("传感器" + ctx.getCurrentKey() + "温度值连续" + interval + "ms上升");
timerTimestampValueState.clear();
}
@Override
public void open(Configuration parameters) throws Exception {
tempValueState = getRuntimeContext()
.getState(new ValueStateDescriptor("last-temp", Double.class));
timerTimestampValueState = getRuntimeContext()
.getState(new ValueStateDescriptor("timer_timestamp", Long.class));
}
@Override
public void close() throws Exception {
tempValueState.clear();
timerTimestampValueState.clear();
}
}
}