Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >吾日三省吾身-深入理解Flink Checkpoint和Savepoint

吾日三省吾身-深入理解Flink Checkpoint和Savepoint

作者头像
LakeShen
发布于 2022-06-23 06:38:50
发布于 2022-06-23 06:38:50
1.1K00
代码可运行
举报
运行总次数:0
代码可运行

前言

为了保证程序的容错恢复以及程序启动时其状态恢复,几乎所有的 Flink 实时任务都会开启 Checkpoint 或者触发 Savepoint 进行状态保存。为了使得用户更加理解这两点区别,本文结合 Flink 1.9 版本,重点讲述 Flink Checkpoint,Savepoint 相关概念以及注意事项,使得用户能够更好的开发实时任务。

1. Checkpoint,Savepoint 异同

首先,为什么会在文章开头对这两点进行介绍,因为有时候用户在开发实时任务时,会对这两点产生困惑,所以这里直接开门见山对这两点进行讲解。

Flink Checkpoint 是一种容错恢复机制。这种机制保证了实时程序运行时,即使突然遇到异常也能够进行自我恢复。Checkpoint 对于用户层面,是透明的,用户会感觉程序一直在运行。Flink Checkpoint 是 Flink 自身的系统行为,用户无法对其进行交互,用户可以在程序启动之前,设置好实时程序 Checkpoint 相关参数,当程序启动之后,剩下的就全交给 Flink 自行管理。当然在某些情况,比如 Flink On Yarn 模式,某个 Container 发生 OOM 异常,这种情况程序直接变成失败状态,此时 Flink 程序虽然开启 Checkpoint 也无法恢复,因为程序已经变成失败状态,所以此时可以借助外部参与启动程序,比如外部程序检测到实时任务失败时,从新对实时任务进行拉起。

Flink Savepoint 你可以把它当做在某个时间点程序状态全局镜像,以后程序在进行升级,或者修改并发度等情况,还能从保存的状态位继续启动恢复。Flink Savepoint 一般存储在 HDFS 上面,它需要用户主动进行触发。如果是用户自定义开发的实时程序,比如使用DataStream进行开发,建议为每个算子定义一个 uid,这样我们在修改作业时,即使导致程序拓扑图改变,由于相关算子 uid 没有变,那么这些算子还能够继续使用之前的状态,如果用户没有定义 uid , Flink 会为每个算子自动生成 uid,如果用户修改了程序,可能导致之前的状态程序不能再进行复用。

Checkpoint 和 Savepoint 差异对比:

  1. 概念:Checkpoint 是 自动容错机制 ,Savepoint 程序全局状态镜像 。
  2. 目的:Checkpoint 是 程序自动容错,快速恢复 。Savepoint是 程序修改后继续从状态恢复,程序升级等。
  3. 用户交互:Checkpoint 是 Flink 系统行为 。Savepoint是用户触发。
  4. 状态文件保留策略:Checkpoint默认程序删除,可以设置CheckpointConfig中的参数进行保留 。Savepoint会一直保存,除非用户删除 。

2. Flink Checkpoint

2.1 Flink Checkpoint 原理

Flink Checkpoint 机制保证 Flink 任务运行突然失败时,能够从最近 Checkpoint 进行状态恢复启动,进行错误容忍。它是一种自动容错机制,而不是具体的状态存储镜像。Flink Checkpoint 受 Chandy-Lamport 分布式快照启发,其内部使用分布式数据流轻量级异步快照。

Checkpoint 保存的状态在程序取消时,默认会进行清除。Checkpoint 状态保留策略有两种:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
DELETE_ON_CANCELLATION,RETAIN_ON_CANCELLATION

DELETE_ON_CANCELLATION 表示当程序取消时,删除 Checkpoint 存储文件。 RETAIN_ON_CANCELLATION 表示当程序取消时,保存之前的 Checkpoint 存储文件 用户可以结合业务情况,设置 Checkpoint 保留模式:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/** 开启 checkpoint */
env.enableCheckpointing(10000);
/** 设置 checkpoint 保留策略,取消程序时,保留 checkpoint 状态文件 */
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

默认情况下,Flin k不会触发一次 Checkpoint 当系统有其他 Checkpoint 在进行时,也就是说 Checkpoint 默认的并发为1。针对 Flink DataStream 任务,程序需要经历从 StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图四个步骤,其中在 ExecutionGraph 构建时,会初始化 CheckpointCoordinator。ExecutionGraph通过ExecutionGraphBuilder.buildGraph方法构建,在构建完时,会调用 ExecutionGraph 的enableCheckpointing方法创建CheckpointCoordinator:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public void enableCheckpointing(
            CheckpointCoordinatorConfiguration chkConfig,
            List<ExecutionJobVertex> verticesToTrigger,
            List<ExecutionJobVertex> verticesToWaitFor,
            List<ExecutionJobVertex> verticesToCommitTo,
            List<MasterTriggerRestoreHook<?>> masterHooks,
            CheckpointIDCounter checkpointIDCounter,
            CompletedCheckpointStore checkpointStore,
            StateBackend checkpointStateBackend,
            CheckpointStatsTracker statsTracker) {
        // 前面部分代码省略....


        // create the coordinator that triggers and commits checkpoints and holds the state
        checkpointCoordinator = new CheckpointCoordinator(
            jobInformation.getJobId(),
            chkConfig,
            tasksToTrigger,
            tasksToWaitFor,
            tasksToCommitTo,
            checkpointIDCounter,
            checkpointStore,
            checkpointStateBackend,
            ioExecutor,
            SharedStateRegistry.DEFAULT_FACTORY,
            failureManager);


        // register the master hooks on the checkpoint coordinator
        for (MasterTriggerRestoreHook<?> hook : masterHooks) {
            if (!checkpointCoordinator.addMasterHook(hook)) {
                LOG.warn("Trying to register multiple checkpoint hooks with the name: {}", hook.getIdentifier());
            }
        }
checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);
    // 后面部分代码省略....

CheckpoinCoordinator 是 Flink 任务 Checkpoint 的关键,针对每一个 Flink 任务,都会初始化一个 CheckpointCoordinator 类,来触发 Flink 任务 Checkpoint。下面是 Flink 任务 Checkpoint 大致流程:

Flink 会定时在任务的 Source Task 触发 Barrier,Barrier是一种特殊的消息事件,会随着消息通道流入到下游的算子中。只有当最后 Sink 端的算子接收到 Barrier 并确认该次 Checkpoint 完成时,该次 Checkpoint 才算完成。所以在某些算子的 Task 有多个输入时,会存在 Barrier 对齐时间,我们可以在Web UI上面看到各个 Task 的Barrier 对齐时间

2.2 Flink Checkpoint 语义

Flink Checkpoint 支持两种语义:Exactly OnceAt least Once,默认的 Checkpoint 模式是 Exactly Once. Exactly Once 和 At least Once 具体是针对 Flink 状态而言。具体语义含义如下:

Exactly Once 含义是:保证每条数据对于 Flink 的状态结果只影响一次。打个比方,比如 WordCount程序,目前实时统计的 "hello" 这个单词数为5,同时这个结果在这次 Checkpoint 成功后,保存在了 HDFS。在下次 Checkpoint 之前, 又来2个 "hello" 单词,突然程序遇到外部异常容错自动回复,从最近的 Checkpoint 点开始恢复,那么会从单词数 5 这个状态开始恢复,Kafka 消费的数据点位还是状态 5 这个时候的点位开始计算,所以即使程序遇到外部异常自我恢复,也不会影响到 Flink 状态的结果。

At Least Once 含义是:每条数据对于 Flink 状态计算至少影响一次。比如在 WordCount 程序中,你统计到的某个单词的单词数可能会比真实的单词数要大,因为同一条消息,你可能将其计算多次。

Flink 中 Exactly Once 和 At Least Once 具体是针对 Flink 任务状态而言的,并不是 Flink 程序对其处理一次。举个例子,当前 Flink 任务正在做 Checkpoint,该次Checkpoint还么有完成,该次 Checkpoint 时间端的数据其实已经进入 Flink 程序处理,只是程序状态没有最终存储到远程存储。当程序突然遇到异常,进行容错恢复,那么就会从最新的 Checkpoint 进行状态恢复重启,上一部分还会进入 Flink 系统处理:

上图中表示,在进行 chk-5 Checkpoint 时,突然遇到程序异常,那么会从 chk-4 进行恢复,那么之前chk-5 处理的数据,会再次进行处理。

Exactly Once 和 At Least Once 具体在底层实现大致相同,具体差异表现在 Barrier 对齐方式处理:

如果是 Exactly Once 模式,某个算子的 Task 有多个输入通道时,当其中一个输入通道收到 Barrier 时,Flink Task 会阻塞处理该通道,其不会处理这些数据,但是会将这些数据存储到内部缓存中,一旦完成了所有输入通道的 Barrier 对齐,才会继续对这些数据进行消费处理。

对于 At least Once,同样针对某个算子的 Task 有多个输入通道的情况下,当某个输入通道接收到 Barrier 时,它不同于Exactly Once,At Least Once 会继续处理接受到的数据,即使没有完成所有输入通道 Barrier 对齐。所以使用At Least Once 不能保证数据对于状态计算只有一次影响。

2.3 Flink Checkpoint 参数配置及建议
  1. 当 Checkpoint 时间比设置的 Checkpoint 间隔时间要长时,可以设置 Checkpoint 间最小时间间隔 。这样在上次 Checkpoint 完成时,不会立马进行下一次 Checkpoint,而是会等待一个最小时间间隔,然后在进行该次 Checkpoint。否则,每次 Checkpoint 完成时,就会立马开始下一次 Checkpoint,系统会有很多资源消耗 Checkpoint。
  2. 如果Flink状态很大,在进行恢复时,需要从远程存储读取状态恢复,此时可能导致任务恢复很慢,可以设置 Flink Task 本地状态恢复。任务状态本地恢复默认没有开启,可以设置参数state.backend.local-recovery值为true进行激活。
  3. Checkpoint保存数,Checkpoint 保存数默认是1,也就是保存最新的 Checkpoint 文件,当进行状态恢复时,如果最新的Checkpoint文件不可用时(比如HDFS文件所有副本都损坏或者其他原因),那么状态恢复就会失败,如果设置 Checkpoint 保存数2,即使最新的Checkpoint恢复失败,那么Flink 会回滚到之前那一次Checkpoint进行恢复。考虑到这种情况,用户可以增加 Checkpoint 保存数。
  4. 建议设置的 Checkpoint 的间隔时间最好大于 Checkpoint 的完成时间。

下图是不设置 Checkpoint 最小时间间隔示例图,可以看到,系统一致在进行 Checkpoint,可能对运行的任务产生一定影响:

3. Flink Savepoint

3.1 Flink Savepoint 原理

Flink Savepoint 作为实时任务的全局镜像,其在底层使用的代码和Checkpoint的代码是一样的,因为Savepoint可以看做 Checkpoint在特定时期的一个状态快照。

Flink 在触发Savepoint 或者 Checkpoint时,会根据这次触发的类型计算出在HDFS上面的目录:

如果类型是 Savepoint,那么 其 HDFS 上面的目录为:Savepoint 根目录+savepoint-jobid前六位+随机数字,具体如下格式:

Checkpoint 目录为 chk-checkpoint ID,具体格式如下:

一次 Savepoint 目录下面至少包括一个文件,既 _metadata文件。当然如果实时任务某些算子有状态的话,那么在 这次 Savepoint目录下面会包含一个 _metadata 文件以及多个状态数据文件。_metadata文件以绝对路径的形式指向状态文件的指针。

社区方面,在以前的 Flink 版本,当用户选择不同的状态存储,其底层状态存储的二进制格式都不相同。针对这种情况,目前 FLIP-41 对于 Keyed State 使用统一的二进制文件进行存储。这里的 Keyed State 主要是针对 Savepoint 的状态,Checkpoint 状态的存储可以根据具体的状态后端进行存储,允许状态存储底层格式的差异。对于 Savepoint 状态底层格式的统一,应用的状态可以在不同的状态后端进行迁移,更方便应用程序的恢复。重做与状态快照和恢复相关的抽象,当实现实现新状态后端时,可以降低开销,同时减少代码重复。

3.2 Flink Savepoint 触发方式

Flink Savepoint 触发方式目前有三种:

  1. 使用 flink savepoint 命令触发 Savepoint,其是在程序运行期间触发 savepoint,
  2. 使用 flink cancel -s 命令,取消作业时,并触发 Savepoint.
  3. 使用 Rest API 触发 Savepoint,格式为:/jobs/:jobid /savepoints
3.3 Flink Savepoint 注意点
  1. 使用 flink cancel -s 命令取消作业同时触发 Savepoint 时,会有一个问题,可能存在触发 Savepoint 失败。比如实时程序处于异常状态(比如 Checkpoint失败),而此时你停止作业,同时触发 Savepoint,这次 Savepoint 就会失败,这种情况会导致,在实时平台上面看到任务已经停止,但是实际实时作业在 Yarn 还在运行。针对这种情况,需要捕获触发 Savepoint 失败的异常,当抛出异常时,可以直接在 Yarn 上面 Kill 掉该任务。
  2. 使用 DataStream 程序开发时,最好为每个算子分配 uid,这样即使作业拓扑图变了,相关算子还是能够从之前的状态进行恢复,默认情况下,Flink 会为每个算子分配 uid,这种情况下,当你改变了程序的某些逻辑时,可能导致算子的 uid 发生改变,那么之前的状态数据,就不能进行复用,程序在启动的时候,就会报错。
  3. 由于 Savepoint 是程序的全局状态,对于某些状态很大的实时任务,当我们触发 Savepoint,可能会对运行着的实时任务产生影响,个人建议如果对于状态过大的实时任务,触发 Savepoint 的时间,不要太过频繁。根据状态的大小,适当的设置触发时间。
  4. 当我们从 Savepoint 进行恢复时,需要检查这次 Savepoint 目录文件是否可用。可能存在你上次触发 Savepoint 没有成功,导致 HDFS 目录上面 Savepoint 文件不可用或者缺少数据文件等,这种情况下,如果在指定损坏的 Savepoint 的状态目录进行状态恢复,任务会启动不起来。

5. 总结

本文没有过多的讲述源码,考虑大家的都能够读懂,其语言竟可能通俗一一点。如果有需要改进的地方,希望大家能够指出。后续我会不断的和大家一起大数据相关的技术,和大家一起交流学习。

参考资料

  1. FLIP-41 Unify Binary format for Keyed State
  2. FlIP-47-Checkpoints vs Savepoints
  3. Apache Kafka Connector
  4. Flink Savepoints
  5. Flink Checkpoints
  6. Flink Checkpointing
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-08-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 LakeShen 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
JobManager 数据结构-史上最通俗易懂的Flink源代码深入分析教程
Flink中,JobManager内部维护了多个数据结构,用于存储和管理作业的元数据信息。以下是JobManager中常用的数据结构:
jack.yang
2025/04/05
730
JobManager 数据结构-史上最通俗易懂的Flink源代码深入分析教程
Flink Checkpoint 原理流程以及常见失败原因分析
目前有赞实时任务主要以 Flink 为主,为了保证实时任务的容错恢复以及停止重启时的状态恢复,几乎所有的实时任务都会开启 Checkpoint 或者触发 Savepoint 进行状态保存。由于 Savepoint 底层原理的实现和 Checkpoint 几乎一致,本文结合 Flink 1.9 版本,重点讲述 Flink Checkpoint 原理流程以及常见原因分析,让用户能够更好的理解 Flink Checkpoint,从而开发出更健壮的实时任务。
有赞coder
2020/08/24
9510
Flink Checkpoint 原理流程以及常见失败原因分析
Flink可靠性的基石-checkpoint机制详细解析
checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保 证应用流图状态的一致性。Flink的checkpoint机制原理来自“Chandy-Lamport algorithm”算法。
五分钟学大数据
2021/01/27
5.5K0
Flink可靠性的基石-checkpoint机制详细解析
Flink面试八股文(上万字面试必备宝典)
Flink是一个面向流处理和批处理的分布式数据计算引擎,能够基于同一个Flink运行,可以提供流处理和批处理两种类型的功能。 在 Flink 的世界观中,一切都是由流组成的,离线数据是有界的流;实时数据是一个没有界限的流:这就是所谓的有界流和无界流。
五分钟学大数据
2022/01/05
2.5K0
Flink面试八股文(上万字面试必备宝典)
全网最全系列 | Flink原理+知识点总结(4万字、41知识点,66张图)
Flink四大基石分别是:Time (时间)、Window(窗口)、State (状态)、Checkpoint(检查点)。
大数据老哥
2022/04/07
5.2K0
全网最全系列 | Flink原理+知识点总结(4万字、41知识点,66张图)
Flink重点难点:Flink任务综合调优(Checkpoint/反压/内存)
我们在Flink重点难点:状态(Checkpoint和Savepoint)容错与两阶段提交一文中对Flink的Checkpoint做过详细的介绍。
大数据真好玩
2021/11/23
7.1K0
Flink重点难点:Flink任务综合调优(Checkpoint/反压/内存)
Flink高频面试题,附答案解析
Checkpoint容错机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保证应用流图状态的一致性。Flink的Checkpoint机制原理来自“Chandy-Lamport algorithm”算法。
五分钟学大数据
2021/07/06
2.8K0
Flink高频面试题,附答案解析
面试被问到Flink的checkpoint问题,给问懵逼了....
Checkpoint 机制
木野归郎
2020/07/02
1K0
面试被问到Flink的checkpoint问题,给问懵逼了....
图解 Flink Checkpoint 原理及在 1.11 版本的优化
上次发文,提到了 Flink 可以非常高效的进行有状态流的计算,通过使用 Flink 内置的 Keyed State 和 Operator State,保存每个算子的状态。
kk大数据
2020/12/29
2.6K0
图解 Flink Checkpoint 原理及在 1.11 版本的优化
深入理解 Flink 容错机制
场景描述:作为分布式系统,尤其是对延迟敏感的实时计算引擎,Apache Flink 需要有强大的容错机制,以确保在出现机器故障或网络分区等不可预知的问题时可以快速自动恢复并依旧能产生准确的计算结果。
大数据真好玩
2019/10/15
2.2K0
Flink重点难点:状态(Checkpoint和Savepoint)容错与两阶段提交
在 Flink 的框架中,进行有状态的计算是 Flink 最重要的特性之一。所谓的状态,其实指的是 Flink 程序的中间计算结果。Flink 支持了不同类型的状态,并且针对状态的持久化还提供了专门的机制和状态管理器。
大数据真好玩
2021/11/23
2.5K0
Flink重点难点:状态(Checkpoint和Savepoint)容错与两阶段提交
一文搞懂 checkpoint 全过程
前面我们讲解了 一文搞懂 Flink 处理 Barrier 全过程 和 一文搞定 Flink Checkpoint Barrier 全流程 基本上都是跟 checkpoint 相关。这次我们就具体看一下 checkpoint 是如何发生的。
shengjk1
2020/07/06
1.3K0
Flink DataStream—— 状态(State)&检查点(Checkpoint)&保存点(Savepoint)原理
​ 最近一次项目当中需要将大量数据保存再Flink程序当中用作缓存数据一共后续数据使用,隧对最近使用到的状态、检查点、保存点等原理和使用进行一个总结
俺也想起舞
2021/04/25
4.5K0
Flink DataStream—— 状态(State)&检查点(Checkpoint)&保存点(Savepoint)原理
学习Flink,看这篇就够了
批处理在大数据世界有着悠久的历史。早期的大数据处理基本上是批处理的天下。批处理主要操作大容量的静态数据集,并在计算过程完成之后返回结果。所以批处理面对的数据集通常具有以下特征:
saintyyu
2021/11/22
3.2K1
学习Flink,看这篇就够了
Flink学习笔记(5) -- Flink 状态(State)管理与恢复
  我们前面写的word count的例子,没有包含状态管理。如果一个task在处理过程中挂掉了,那么它在内存中的状态都会丢失,所有的数据都需要重新计算。从容错和消息处理的语义上(at least once, exactly once),Flink引入了state和checkpoint。
挽风
2021/04/13
3.4K0
Flink学习笔记(5) -- Flink 状态(State)管理与恢复
五万字 | Flink知识体系保姆级总结
一、Flink简介 二、Flink 部署及启动 三、Flink 运行架构 四、Flink 算子大全 五、流处理中的 Time 与 Window 六、Flink 状态管理 七、Flink 容错 八、Flink SQL 九、Flink CEP 十、Flink CDC 十一、基于 Flink 构建全场景实时数仓 十二、Flink 大厂面试题
五分钟学大数据
2021/09/22
4.5K0
那个男人竟然不会Flink的CheckPoint机制
这里已经是Flink的第三篇原创啦。第一篇《Flink入门教程》讲解了Flink的基础和相关概念,第二篇《背压原理》讲解了什么是背压,在Flink背压大概的流程是怎么样的。
Java3y
2020/12/31
8760
那个男人竟然不会Flink的CheckPoint机制
Flink实战(10)-checkpoint容错保证
Checkpoint 是 自动容错恢复机制,Savepoint 某个时间点的全局状态镜像
JavaEdge
2024/08/06
1630
Flink实战(10)-checkpoint容错保证
Flink 实践之 Savepoint
保障 flink 作业在 配置迭代、flink 版本升级、蓝绿部署中的数据一致性,提高容错、降低恢复时间;
Flink 实战演练
2022/07/26
1.9K0
爆肝 3 月,3w 字、15 章节详解 Flink 状态管理!(建议收藏)
不多说了,本文从盘古开天辟地(状态是啥?)开始说 Flink State。如下为本文目录,诚意满满。
公众号:大数据羊说
2022/04/04
1.9K0
爆肝 3 月,3w 字、15 章节详解 Flink 状态管理!(建议收藏)
推荐阅读
相关推荐
JobManager 数据结构-史上最通俗易懂的Flink源代码深入分析教程
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验