为了保证程序的容错恢复以及程序启动时其状态恢复,几乎所有的 Flink 实时任务都会开启 Checkpoint 或者触发 Savepoint 进行状态保存。为了使得用户更加理解这两点区别,本文结合 Flink 1.9 版本,重点讲述 Flink Checkpoint,Savepoint 相关概念以及注意事项,使得用户能够更好的开发实时任务。
首先,为什么会在文章开头对这两点进行介绍,因为有时候用户在开发实时任务时,会对这两点产生困惑,所以这里直接开门见山对这两点进行讲解。
Flink Checkpoint 是一种容错恢复机制。这种机制保证了实时程序运行时,即使突然遇到异常也能够进行自我恢复。Checkpoint 对于用户层面,是透明的,用户会感觉程序一直在运行。Flink Checkpoint 是 Flink 自身的系统行为,用户无法对其进行交互,用户可以在程序启动之前,设置好实时程序 Checkpoint 相关参数,当程序启动之后,剩下的就全交给 Flink 自行管理。当然在某些情况,比如 Flink On Yarn 模式,某个 Container 发生 OOM 异常,这种情况程序直接变成失败状态,此时 Flink 程序虽然开启 Checkpoint 也无法恢复,因为程序已经变成失败状态,所以此时可以借助外部参与启动程序,比如外部程序检测到实时任务失败时,从新对实时任务进行拉起。
Flink Savepoint 你可以把它当做在某个时间点程序状态全局镜像,以后程序在进行升级,或者修改并发度等情况,还能从保存的状态位继续启动恢复。Flink Savepoint 一般存储在 HDFS 上面,它需要用户主动进行触发。如果是用户自定义开发的实时程序,比如使用DataStream进行开发,建议为每个算子定义一个 uid,这样我们在修改作业时,即使导致程序拓扑图改变,由于相关算子 uid 没有变,那么这些算子还能够继续使用之前的状态,如果用户没有定义 uid , Flink 会为每个算子自动生成 uid,如果用户修改了程序,可能导致之前的状态程序不能再进行复用。
Checkpoint 和 Savepoint 差异对比:
Flink Checkpoint 机制保证 Flink 任务运行突然失败时,能够从最近 Checkpoint 进行状态恢复启动,进行错误容忍。它是一种自动容错机制,而不是具体的状态存储镜像。Flink Checkpoint 受 Chandy-Lamport 分布式快照启发,其内部使用分布式数据流轻量级异步快照。
Checkpoint 保存的状态在程序取消时,默认会进行清除。Checkpoint 状态保留策略有两种:
DELETE_ON_CANCELLATION,RETAIN_ON_CANCELLATION
DELETE_ON_CANCELLATION 表示当程序取消时,删除 Checkpoint 存储文件。 RETAIN_ON_CANCELLATION 表示当程序取消时,保存之前的 Checkpoint 存储文件 用户可以结合业务情况,设置 Checkpoint 保留模式:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/** 开启 checkpoint */
env.enableCheckpointing(10000);
/** 设置 checkpoint 保留策略,取消程序时,保留 checkpoint 状态文件 */
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
默认情况下,Flin k不会触发一次 Checkpoint 当系统有其他 Checkpoint 在进行时,也就是说 Checkpoint 默认的并发为1。针对 Flink DataStream 任务,程序需要经历从 StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图四个步骤,其中在 ExecutionGraph 构建时,会初始化 CheckpointCoordinator。ExecutionGraph通过ExecutionGraphBuilder.buildGraph方法构建,在构建完时,会调用 ExecutionGraph 的enableCheckpointing方法创建CheckpointCoordinator:
public void enableCheckpointing(
CheckpointCoordinatorConfiguration chkConfig,
List<ExecutionJobVertex> verticesToTrigger,
List<ExecutionJobVertex> verticesToWaitFor,
List<ExecutionJobVertex> verticesToCommitTo,
List<MasterTriggerRestoreHook<?>> masterHooks,
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore checkpointStore,
StateBackend checkpointStateBackend,
CheckpointStatsTracker statsTracker) {
// 前面部分代码省略....
// create the coordinator that triggers and commits checkpoints and holds the state
checkpointCoordinator = new CheckpointCoordinator(
jobInformation.getJobId(),
chkConfig,
tasksToTrigger,
tasksToWaitFor,
tasksToCommitTo,
checkpointIDCounter,
checkpointStore,
checkpointStateBackend,
ioExecutor,
SharedStateRegistry.DEFAULT_FACTORY,
failureManager);
// register the master hooks on the checkpoint coordinator
for (MasterTriggerRestoreHook<?> hook : masterHooks) {
if (!checkpointCoordinator.addMasterHook(hook)) {
LOG.warn("Trying to register multiple checkpoint hooks with the name: {}", hook.getIdentifier());
}
}
checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);
// 后面部分代码省略....
CheckpoinCoordinator 是 Flink 任务 Checkpoint 的关键,针对每一个 Flink 任务,都会初始化一个 CheckpointCoordinator 类,来触发 Flink 任务 Checkpoint。下面是 Flink 任务 Checkpoint 大致流程:
Flink 会定时在任务的 Source Task 触发 Barrier,Barrier是一种特殊的消息事件,会随着消息通道流入到下游的算子中。只有当最后 Sink 端的算子接收到 Barrier 并确认该次 Checkpoint 完成时,该次 Checkpoint 才算完成。所以在某些算子的 Task 有多个输入时,会存在 Barrier 对齐时间,我们可以在Web UI上面看到各个 Task 的Barrier 对齐时间
Flink Checkpoint 支持两种语义:Exactly Once 和 At least Once,默认的 Checkpoint 模式是 Exactly Once. Exactly Once 和 At least Once 具体是针对 Flink 状态而言。具体语义含义如下:
Exactly Once 含义是:保证每条数据对于 Flink 的状态结果只影响一次。打个比方,比如 WordCount程序,目前实时统计的 "hello" 这个单词数为5,同时这个结果在这次 Checkpoint 成功后,保存在了 HDFS。在下次 Checkpoint 之前, 又来2个 "hello" 单词,突然程序遇到外部异常容错自动回复,从最近的 Checkpoint 点开始恢复,那么会从单词数 5 这个状态开始恢复,Kafka 消费的数据点位还是状态 5 这个时候的点位开始计算,所以即使程序遇到外部异常自我恢复,也不会影响到 Flink 状态的结果。
At Least Once 含义是:每条数据对于 Flink 状态计算至少影响一次。比如在 WordCount 程序中,你统计到的某个单词的单词数可能会比真实的单词数要大,因为同一条消息,你可能将其计算多次。
Flink 中 Exactly Once 和 At Least Once 具体是针对 Flink 任务状态而言的,并不是 Flink 程序对其处理一次。举个例子,当前 Flink 任务正在做 Checkpoint,该次Checkpoint还么有完成,该次 Checkpoint 时间端的数据其实已经进入 Flink 程序处理,只是程序状态没有最终存储到远程存储。当程序突然遇到异常,进行容错恢复,那么就会从最新的 Checkpoint 进行状态恢复重启,上一部分还会进入 Flink 系统处理:
上图中表示,在进行 chk-5 Checkpoint 时,突然遇到程序异常,那么会从 chk-4 进行恢复,那么之前chk-5 处理的数据,会再次进行处理。
Exactly Once 和 At Least Once 具体在底层实现大致相同,具体差异表现在 Barrier 对齐方式处理:
如果是 Exactly Once 模式,某个算子的 Task 有多个输入通道时,当其中一个输入通道收到 Barrier 时,Flink Task 会阻塞处理该通道,其不会处理这些数据,但是会将这些数据存储到内部缓存中,一旦完成了所有输入通道的 Barrier 对齐,才会继续对这些数据进行消费处理。
对于 At least Once,同样针对某个算子的 Task 有多个输入通道的情况下,当某个输入通道接收到 Barrier 时,它不同于Exactly Once,At Least Once 会继续处理接受到的数据,即使没有完成所有输入通道 Barrier 对齐。所以使用At Least Once 不能保证数据对于状态计算只有一次影响。
state.backend.local-recovery
值为true
进行激活。下图是不设置 Checkpoint 最小时间间隔示例图,可以看到,系统一致在进行 Checkpoint,可能对运行的任务产生一定影响:
Flink Savepoint 作为实时任务的全局镜像,其在底层使用的代码和Checkpoint的代码是一样的,因为Savepoint可以看做 Checkpoint在特定时期的一个状态快照。
Flink 在触发Savepoint 或者 Checkpoint时,会根据这次触发的类型计算出在HDFS上面的目录:
如果类型是 Savepoint,那么 其 HDFS 上面的目录为:Savepoint 根目录+savepoint-jobid前六位+随机数字,具体如下格式:
Checkpoint 目录为 chk-checkpoint ID,具体格式如下:
一次 Savepoint 目录下面至少包括一个文件,既 _metadata
文件。当然如果实时任务某些算子有状态的话,那么在 这次 Savepoint目录下面会包含一个 _metadata
文件以及多个状态数据文件。_metadata
文件以绝对路径的形式指向状态文件的指针。
社区方面,在以前的 Flink 版本,当用户选择不同的状态存储,其底层状态存储的二进制格式都不相同。针对这种情况,目前 FLIP-41 对于 Keyed State 使用统一的二进制文件进行存储。这里的 Keyed State 主要是针对 Savepoint 的状态,Checkpoint 状态的存储可以根据具体的状态后端进行存储,允许状态存储底层格式的差异。对于 Savepoint 状态底层格式的统一,应用的状态可以在不同的状态后端进行迁移,更方便应用程序的恢复。重做与状态快照和恢复相关的抽象,当实现实现新状态后端时,可以降低开销,同时减少代码重复。
Flink Savepoint 触发方式目前有三种:
flink savepoint
命令触发 Savepoint,其是在程序运行期间触发 savepoint,flink cancel -s
命令,取消作业时,并触发 Savepoint./jobs/:jobid /savepoints
flink cancel -s
命令取消作业同时触发 Savepoint 时,会有一个问题,可能存在触发 Savepoint 失败。比如实时程序处于异常状态(比如 Checkpoint失败),而此时你停止作业,同时触发 Savepoint,这次 Savepoint 就会失败,这种情况会导致,在实时平台上面看到任务已经停止,但是实际实时作业在 Yarn 还在运行。针对这种情况,需要捕获触发 Savepoint 失败的异常,当抛出异常时,可以直接在 Yarn 上面 Kill 掉该任务。uid
,这样即使作业拓扑图变了,相关算子还是能够从之前的状态进行恢复,默认情况下,Flink 会为每个算子分配 uid
,这种情况下,当你改变了程序的某些逻辑时,可能导致算子的 uid
发生改变,那么之前的状态数据,就不能进行复用,程序在启动的时候,就会报错。本文没有过多的讲述源码,考虑大家的都能够读懂,其语言竟可能通俗一一点。如果有需要改进的地方,希望大家能够指出。后续我会不断的和大家一起大数据相关的技术,和大家一起交流学习。