###测试环境 hadoop10伪分布式:flink hdfs
package day160616;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class CheckPointTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//------------------------checkpoint-start-------------------
env.enableCheckpointing(1000); //开启checkpoint,每隔1s执行checkpoint操作
//状态后端
env.setStateBackend(new FsStateBackend("hdfs://hadoop10:8020/flink-checkpoint"));
//取消作业时,不删除检查点目录
env.getCheckpointConfig()
.enableExternalizedCheckpoints(CheckpointConfig
.ExternalizedCheckpointCleanup
.RETAIN_ON_CANCELLATION);
//通过webui的cancel按钮,取消flink的job时,不删除HDFS的checkpoint目录
//------------------------checkpoint-end-----------------------
DataStreamSource<String> ds = env.socketTextStream("hadoop10", 9999);
// 将输入数据流转换为 Tuple2<String, Integer> 类型的流,其中第一个字段为城市名称,第二个字段为数字。
// 使用 map 函数将输入数据字符串解析为 Tuple2 对象,并按照城市名称进行分组。
KeyedStream<Tuple2<String, Integer>, String> ds2 = ds.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
String[] arr = value.split(",");
return Tuple2.of(arr[0], Integer.parseInt(arr[1]));
}
}).keyBy(v -> v.f0);
// 在分组后的流上应用 map 函数,使用 RichMapFunction 实现对每个元素的处理逻辑
ds2.map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
// 在 open 方法中,初始化一个 ValueState<Integer> 状态变量,用于存储当前城市的最大值。
ValueState<Integer> state;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Integer> valueStateDescriptor
= new ValueStateDescriptor<Integer>("vs1", Integer.class);
state = getRuntimeContext().getState(valueStateDescriptor);
}
/**
* 城市名称,数字
* 输入 输出
* 北京,20 北京,20
* 北京,30 北京,30
* 北京,10 北京,30
*/
@Override
public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
//1.获取状态中的值
//2.状态中有值
// 输入的值和状态的值进行比较大小,谁大返回谁
//3.状态中没有值
// 返回输入的值
//4.更新状态
// 在 map 方法中,首先获取状态中存储的旧值。然后,根据以下条件更新最大值:
// 如果状态中没有值或者输入值大于状态值,则将输入值设为最大值。
// 如果状态中已经有值且输入值小于或等于状态值,则保持状态值不变
Integer oldValue = state.value();
if (oldValue == null || oldValue < value.f1) {
oldValue = value.f1;
}
state.update(oldValue);
return Tuple2.of(value.f0, oldValue);//返回城市名称和最大值作为输出。
}
}).print();
//该代码实现了在流中根据城市名称进行分组,并计算每个城市的最大值。
// 在每个元素到达时,它会检查当前城市的最大值,并更新状态。
// 然后将城市名称和最大值作为输出打印出来。
env.execute();
}
}
将代码打包,上传至hadoop10 /opt/app/flink,jar;
执行命令运行jar包:
[root@hadoop10 app]# flink run -c day160616.CheckPointTest /opt/app/flink.jar Job has been submitted with JobID ee5811b41a5e8c5d7dd052ed78db14b4
;
在webui界面查看运行任务;
代码运行逻辑为:9999端口输入城市,温度
,代码将记录下当前城市的最高温度在stdout进行打印,如绿色框线示意。若下次的温度低于当前的最高温度,则继续输出曾经记录的最高温度。直到新的最高温度高于当前的最高温度,重新记录最高温度输出;
模拟取消任务,验证Checkpoint机制。现在取消该job,然后再重新运行[root@hadoop10 app]# flink run -c day160616.CheckPointTest /opt/app/flink.jar Job has been submitted with JobID 77861182fcb0c82677eabd40629a91ff
;
再次通过9999端口发送测试案例,发现state并没有将设置好的checkpoint镜像读出,而是又重新计算了当前城市的最大值。
关闭该作业,重新输入启动命令[root@hadoop10 app]# flink run -c day160616.CheckPointTest -s hdfs://hadoop10:8020/flink-checkpoint/77861182fcb0c82677eabd40629a91ff/chk-434 /opt/app/flink.jar Job has been submitted with JobID 6a5034e14fb103e594a266ca554e2558
,注意红色框线部分的checkpoint恢复镜像位置以及指令的填写;
再次进入webui的任务管理界面,并且通过9999端口发送新的测试案例。本次checkpoint测试成功,恢复了从上一轮取消的作业中记录的城市状态,zhoukou和zhengzhou的最大值,由绿色框线示意。