上文中,我们一起了解了 一文搞定 Flink 消费消息的全流程,接下来呢,我们一起来看一下 checkpoint barrier 的全流程。
首先呢,Job 启动的时候,Flink 会 startCheckpointScheduler
public void startCheckpointScheduler() {
synchronized (lock) {
if (shutdown) {
throw new IllegalArgumentException("Checkpoint coordinator is shut down");
// make sure all prior timers are cancelled
periodicScheduling = true;
long initialDelay = ThreadLocalRandom.current().nextLong(
minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L);
// 定时任务
currentPeriodicTrigger = timer.scheduleAtFixedRate(
new ScheduledTrigger(), initialDelay, baseInterval, TimeUnit.MILLISECONDS);
通过定时任务来触发 checkpoint。 到 Task.triggerCheckpoint
// trigger operator chain task trigger checkpoint
public CompletableFuture<Acknowledge> triggerCheckpoint(
ExecutionAttemptID executionAttemptID,
long checkpointId,
long checkpointTimestamp,
CheckpointOptions checkpointOptions) {
log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
final Task task = taskSlotTable.getTask(executionAttemptID);
if (task != null) {
task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';
return FutureUtils.completedExceptionally(new CheckpointException(message));
到 Task.triggerCheckpointBarrier
// trigger operator chain trigger checkpoint 最终触发 triggerCheckpointBarrier
public void triggerCheckpointBarrier(
final long checkpointID,
long checkpointTimestamp,
final CheckpointOptions checkpointOptions) {
//实际上就是 StreamTask Task类实际上是将 checkpoint 委托给了具体的类去执行,而 StreamTask 也将委托给更具体的类,直到业务代码
// source ->flatMap
// invokable 实际上是 operator chain
final AbstractInvokable invokable = this.invokable;
final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);
if (executionState == ExecutionState.RUNNING && invokable != null) {
// build a local closure
final String taskName = taskNameWithSubtask;
final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
Runnable runnable = new Runnable() {
public void run() {
// set safety net from the task's context for checkpointing thread
LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
try {
// invokable 事实上就是 StreamTask Task 类实际上是将 checkpoint 委托给了更具体的类去执行,而 StreamTask 也将委托给更具体的类,直到业务代码
boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
if (!success) {
getJobID(), getExecutionId(), checkpointID,
new CheckpointDeclineTaskNotReadyException(taskName));
} catch (Throwable t) {
if (getExecutionState() == ExecutionState.RUNNING) {
failExternally(new Exception(
"Error while triggering checkpoint " + checkpointID + " for " +
taskNameWithSubtask, t));
} else {
LOG.debug("Encountered error while triggering checkpoint {} for " +
"{} ({}) while being not in state running.", checkpointID,
taskNameWithSubtask, executionId, t);
} finally {
executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
} else {
LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);
// send back a message that we did not do the checkpoint
checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
我们以 SourceStreamTask 为例,进入
public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
try {
// No alignment if we inject a checkpoint
CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
return performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
catch (Exception e) {
// propagate exceptions only if the task is still in "running" state
if (isRunning) {
throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() +
" for operator " + getName() + '.', e);
} else {
LOG.debug("Could not perform checkpoint {} for operator {} while the " +
"invokable was not in state running.", checkpointMetaData.getCheckpointId(), getName(), e);
return false;
// trigger opator chain 一路调用到这里,开始出现 barrier (实际上是定时任务 checkpoint 产生的)
private boolean performCheckpoint(
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions,
CheckpointMetrics checkpointMetrics) throws Exception {
LOG.debug("Starting checkpoint ({}) {} on task {}",
checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());
synchronized (lock) {
if (isRunning) {
// we can do a checkpoint
// All of the following steps happen as an atomic step from the perspective of barriers and
// records/watermarks/timers/callbacks.
// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
// checkpoint alignments
// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
// The pre-barrier work should be nothing or minimal in the common case.
// Step (2): Send the checkpoint barrier downstream
发送 barrier 到下游,下游的 operator 接收到本 barrier 就会触发其自身的 checkpoint
// Step (3): Take the state snapshot. This should be largely asynchronous, to not
// impact progress of the streaming topology
// 执行 checkoint source task chain(trigger task )是直接通过 triggerCheckpoint 来触发 checkpoint 的
// 而非 source task chain 是通过 processBarrier 来触发 checkpoint 的
checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
return true;
else {
// we cannot perform our checkpoint - let the downstream operators know that they
// should not wait for any input from this operator
// we cannot broadcast the cancellation markers on the 'operator chain', because it may not
// yet be created
final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
Exception exception = null;
for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter : recordWriters) {
try {
//类似于 barrier 的另一种消息
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(
new Exception("Could not send cancel checkpoint marker to downstream tasks.", e),
if (exception != null) {
throw exception;
return false;
整个 Flink Graph 首次出现 checkpoint barrier。 需要注意的是主动触发 checkpoint 的只有 trigger operator( 在生成 ExecutionGraph 时会生成 trigger operator,ack operator,confirm operator,这些task 本质上是 operator chain ) ,trigger operator 我们可以简单的理解成 streamSource operator。 换言之,streamSource operator 触发了 checkpoint,一直到把 checkpoint 广播到下游,最后做 checkpoint state ( StreamSource operator 的 state )。 具体是怎么广播到下游的,其实与普通消息的传递类似,可以参考 一文搞定 Flink 消费消息的全流程
然后下游的算子 比如 flatMap 在 OneInputStreamTask ( 以此为例 ) 中消费消息
protected void run() throws Exception {
// cache processor reference on the stack, to make the code more JIT friendly
final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
while (running && inputProcessor.processInput()) {
// all the work happens in the "processInput" method
接下来,直接到 BarrierBuffer (当设置 checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) 时 )
// 从 ResultSubPartition 中获取数据
public BufferOrEvent getNextNonBlocked() throws Exception {
while (true) {
// process buffered BufferOrEvents before grabbing new ones
Optional<BufferOrEvent> next;
if (currentBuffered == null) {
// 如果当前有堆积的 boe,直接从 InputGate 中获取,否则从缓存中获取(通过 BufferSpiller 缓存的数据)
// 通过 inputGate 中的 inputChannel 来获取 ResultSubPartition 中的数据
next = inputGate.getNextBufferOrEvent();
else {
next = Optional.ofNullable(currentBuffered.getNext());
if (!next.isPresent()) {
return getNextNonBlocked();
if (!next.isPresent()) {
if (!endOfStream) {
// end of input stream. stream continues with the buffered data
endOfStream = true;
return getNextNonBlocked();
else {
// final end of both input and buffered data
return null;
BufferOrEvent bufferOrEvent = next.get();
if (isBlocked(bufferOrEvent.getChannelIndex())) {
// if the channel is blocked, we just store the BufferOrEvent
// barrier 对齐
else if (bufferOrEvent.isBuffer()) {
return bufferOrEvent;
// 处理 barrier
else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
if (!endOfStream) {
// process barriers only if there is a chance of the checkpoint completing
//包括 source 其实都是在这里做的 checkpoint 只有通过 processInput 消费到才表示 barrier 经过了上游算子
processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
else {
if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
return bufferOrEvent;
接下来就是最为关键的逻辑 处理 barrier
// 一个 opertor 必须收到从每个 inputchannel 发过来的同一序号的 barrier 之后才能发起本节点的 checkpoint,
// 如果有的 channel 的数据处理的快了,那该 barrier 后的数据还需要缓存起来,
// 如果有的 inputchannel 被关闭了,那它就不会再发送 barrier 过来了
private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
final long barrierId = receivedBarrier.getId();
// fast path for single channel cases
if (totalNumberOfInputChannels == 1) {
if (barrierId > currentCheckpointId) {
// new checkpoint
currentCheckpointId = barrierId;
// 触发 checkpoint
// -- general code path for multiple input channels --
if (numBarriersReceived > 0) {
// this is only true if some alignment is already progress and was not canceled
if (barrierId == currentCheckpointId) {
// regular case
else if (barrierId > currentCheckpointId) {
// we did not complete the current checkpoint, another started before
LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
"Skipping current checkpoint.",
// let the task know we are not completing this
notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));
// abort the current checkpoint
// begin a the new checkpoint
beginNewAlignment(barrierId, channelIndex);
else {
// ignore trailing barrier from an earlier checkpoint (obsolete now)
else if (barrierId > currentCheckpointId) {
// first barrier of a new checkpoint
beginNewAlignment(barrierId, channelIndex);
else {
// either the current checkpoint was canceled (numBarriers == 0) or
// this barrier is from an old subsumed checkpoint
// check if we have all barriers - since canceled checkpoints always have zero barriers
// this can only happen on a non canceled checkpoint
// barrier 对齐之后才会触发 checkpoint
if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
// actually trigger checkpoint
if (LOG.isDebugEnabled()) {
LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.",
// 当收到全部的 barrier 之后,就会触发 notifyCheckpoint(),
// 该方法又会调用 StreamTask 的 triggerCheckpoint ,和之前的operator是一样的
最终 notifyCheckpoint 有会调用 StreamTask 的 performCheckpoint ,开始 flatMap 的 checkpoint barrier 一些列操作,比如广播 barrier,然后做自己的 checkpoint state。循环往复,直至最后。