在本文中,我们将深入探讨Flink新颖的检查点机制是如何工作的,以及它是如何取代旧架构以实现流容错和恢复。我们在各种类型的流处理应用程序上对Flink性能进行测试,并通过在Apache Storm(一种广泛使用的低延迟流处理器)上运行相同的实验来进行对比。
在流处理中保证高性能同时又要保证容错是比较困难的。在批处理中,当作业失败时,可以容易地重新运行作业的失败部分来重新计算丢失的结果。这在批处理中是可行的,因为文件可以从头到尾重放。但是在流处理中却不能这样处理。数据流是无穷无尽的,没有开始点和结束点。带有缓冲的数据流可以进行重放一小段数据,但从最开始重放数据流是不切实际的(流处理作业可能已经运行了数月)。此外,与仅具有输入和输出的批处理作业相比,流计算是有状态的。这意味着除了输出之外,系统还需要备份和恢复算子状态。由于这个问题比较复杂,因此在开源生态系统中有许多容错方法去尝试解决这个问题。
用于容错机制对整个框架的架构有比较深的影响。很难将不同的容错机制进行插件化来整合到现有框架中。因此,在我们选择一个流处理框架时,容错机制也非常重要。
下面我们去了解一下流处理架构的几种容错方法,从记录确认
到微批处理
,事务更新
和分布式快照
。我们将从以下几个维度讨论不同方法的优缺点,最终选出融合不同方法优点适合流处理程序的融合方法:
上面我们忽略了一个共同特征,即失败后的快速恢复,不是因为它不重要,而是因为(1)所有介绍的系统都能够基于完全并行进行恢复,以及(2)在有状态的应用程序中,状态恢复的瓶颈通常在于存储而不是计算框架。
虽然流处理已经在金融等行业中广泛使用多年,但最近流处理才成为大数据基础设施的一部分。开源框架的可用性一直在推动着流处理的发展。开源中第一个广泛使用的大规模流处理框架可能是Apache Storm。Storm使用上游备份和记录确认机制来保证在失败后重新处理消息。请注意,Storm不保证状态一致性,任何可变状态的处理都需要委托给用户处理(Storm的Trident API可以确保状态一致性,将在下一节中介绍)。
记录确认机制的工作方式如下:算子(Operator)处理的每条记录都会向前一个算子发回一个已经处理过的确认。拓扑的 Source 节点会保留它产生的所有元组的一个备份。直到 Source 中记录收到其所产生的到Sink的所有派生记录的确认之后,就可以删除上游备份的备份。当发生故障时,如果没有收到所有的确认,Source 记录就会重新发送。这种机制可以保证不会丢失数据,但很有可能导致重复处理记录(我们称之为At-Least-Once语义)。Storm 使用一种巧妙的机制来实现这种容错方式,每个数据源记录只需要几个字节的存储空间就可以跟踪确认。Twitter Heron 保持与 Storm 相同的确认机制,但提高了记录重放的效率(从而提高了恢复时间和整体吞吐量)。
纯记录确认体系结构,无论其性能如何,都无法提供Exactly-once语义保证,这给应用程序开发人员带来了删除重复数据的负担。对于某些应用程序而言,这可能是可以接受的,但对于其他应用程可能并不能接受。Storm的机制的其他问题还有吞吐量低和流量控制的问题,在出现背压的情况下,记录确认机制会导致上游节点错误地认为数据处理出现了故障(实际上仅仅是由于出现背压导致记录来不及处理,而无法发送确认)。这导致了基于微批处理的流式架构的发展。
Storm和先前的流处理系统不能满足一些对大规模应用程序至关重要的需求,特别是高吞吐量,快速并行恢复以及托管状态的Exactly-once语义。
容错流式架构的下一个发展阶段是微批处理或离散化流。这个想法非常简单:为了解决连续计算模型(处理和缓冲记录)所带来的记录级别同步的复杂性和开销,连续计算分解为一系列小的原子性的批处理作业(称为微批次)。每个微批次可能会成功或失败,如果发生故障,重新计算最近的微批次即可。
微批处理可以应用到现有引擎(有能力进行数据流计算)之上。例如,可以在批处理引擎(例如,Spark)之上应用微批处理以提供流功能(这是Spark Streaming背后的基本机制),也可以应用于流引擎之上(例如,Storm)提供 Exactly-once 语义保证和状态恢复(这是Storm Trident背后的基本机制)。在 Spark Streaming 中,每个微批次计算都是一个 Spark 作业,而在 Trident 中,每个微批次中的所有记录都会被合并为一个大型记录。
基于微批处理的系统可以实现上面列出的多个的要求(Exactly-once语义保证,高吞吐量),但也有不足之处:
微批处理模型的最大局限可能是它连接了两个不应连接的概念:应用程序定义的窗口大小和系统内部恢复间隔。假设一个程序(下面示例是Flink代码)每5秒聚合一次记录:
dataStream
.map(transformRecords)
.groupBy("sessionId")
.window(Time.of(5, TimeUnit.SECONDS))
.sum("price")
这些应用非常适合微批处理模型。系统累积5秒的数据,对它们求和,并在流上进行一些转换后进行聚合计算。下游应用程序可以直接消费上述5秒聚合后的结果,例如在仪表板上显示。但是,现在假设背压开始起作用(例如,由于计算密集型的 transformRecords 函数),或者 devops 团队决定通过将时间间隔增加到10秒来控制作业的吞吐量。然后,微批次大小变的不可控制(在出现背压情况下),或者直接变为10秒(第二种情况)。这意味着下游应用程序(例如,包含最近5秒统计的 Web 仪表板)读取的聚合结果是错误的,下游应用程序需要自己处理此问题。
微批处理可以实现高吞吐量和Exactly-Once语义保证,但是当前的实现是以抛弃低延迟,流量控制和纯流式编程模型为代价实现上述目标的。显而易见的问题是,是否有两全其美的办法:保持连续计算模型的所有优势,同时还能保证Exactly-Once语义并提供高吞吐量。后面讨论的后流式架构实现了这种组合,并将微批处理作为流式处理的基本模型。
通常,微批处理被认为是一次处理一条记录的替代方法。这是一种错误的认识:连续算子不需要一次只处理一条记录。实际上,所有精心设计的流处理系统(包括下面讨论的Flink和Google Dataflow)在通过网络传输之前都会缓冲许多记录,同时又具备连续的处理能力。
在保留连续算子模型(低延迟,背压容错,可变状态等)的优势的同时又保证Exactly-Once处理语义的一种强大而又优雅的方法是原子性地记录需要处理的数据并更新到状态中。失败后,可以从日志中重新恢复状态以及需要处理的记录。
例如,在Google Cloud Dataflow中实现了此概念。系统将计算抽象为一次部署并长期运行的连续算子的DAG。在Dataflow中,shuffle是流式传输的,中间结果不需要物化(译者注:数据的计算结果放在内存中)。这为低延迟提供了一种自然的流量控制机制,因为中间过程的缓冲可以缓解背压,直到反压到数据源(基于Pull模式的数据源,例如Kafka消费者可以处理这个问题)。该模型还提供了一个优雅的流编程模型,可以提供更丰富的窗口而不是简单的基于时间的窗口以及可以更新到长期可变的状态中。值得注意的是,流编程模型包含微批处理模型。
例如,下面Google Cloud Dataflow程序(请参阅此处)会创建一个会话窗口,如果某个key的事件没有在10分钟内到达,则会触发该会话窗口。在10分钟后到达的数据将会启动一个新窗口。
PCollection<String> items = ...;
PCollection<String> session_windowed_items = items.apply(
Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(10))))
这在流处理模型中很容易实现,但在微批处理模型中却很难实现,因为窗口不对应固定的微批量大小。
这种架构的容错工作原理如下。通过算子的每个中间记录与更新的状态以及后续产生的记录一起创建一个提交记录,该记录以原子性的方式追加到事务日志或插入到数据库中。在失败的情况下,重放部分数据库日志来恢复计算状态,以及重放丢失的记录。
Apache Samza遵循类似的方法,但只能提供At-Least-Once语义保证,因为它使用Apache Kafka作为后台存储。Kafka(现在)不提供事务编写器,因此对状态和后续产生的流记录的更新不能作为原子事务一起提交。
事务更新体系结构具有许多优点。事实上,它实现了我们在本文开头提出的所有需求。该体系结构的基础是能够频繁地写入具有高吞吐量的分布式容错存储系统中。分布式快照(在下一节中进行了解释)将拓扑的状态作为一个整体进行快照,从而减少了对分布式存储的写入量和频率。
提供 Exactly-Once 语义保证的问题实际上可以归结为确定当前流式计算所处的状态(包括正在处理中记录以及算子状态),然后生成该状态的一致性快照,并将快照存储在持久存储中。如果可以经常执行上述操作,那么从故障中恢复意味着仅从持久存储中恢复最新快照,并将流数据源(例如,Apache Kafka)回退到生成快照的时间点再次’重放’。Flink的分布式快照算法可以参阅本文; 在下文中,我们会给出一个简短的总结。
Flink的分布式快照算法基于Chandy和Lamport在1985年设计的一种算法,用于生成分布式系统当前状态的一致性快照(详细介绍请参阅此处),不会丢失信息且不会记录重复项。Flink使用的是Chandy Lamport算法的一个变种,定期生成正在运行的流拓扑的状态快照,并将这些快照存储到持久存储中(例如,存储到HDFS或内存中文件系统)。检查点的存储频率是可配置的。
这有点类似于微批处理方法,两个检查点之间的所有计算都作为一个原子整体,要么全部成功,要么全部失败。然而,只有这一点的相似之处。Chandy Lamport算法的一个重要特点是我们不必在流处理中按下’暂停’按钮(译者注:等待检查点完成之后)来调度下一个微批次。相反,常规数据处理一直运行,数据到达就会处理,而检查点发生在后台。引用原始论文:
全局状态检测算法会被设计在基础计算上:它们必须同时运行,但不能改变基础计算。
因此,这种架构融合了连续算子模型(低延迟,流量控制和真正的流编程模型),高吞吐量,Chandy-Lamport算法提供的的Exactly-Once语义保证的优点。除了备份有状态计算的状态(其他容错机制也需要这样做)之外,这种容错机制几乎没有其他开销。对于小状态(例如,计数或其他统计),备份开销通常可以忽略不计,而对于大状态,检查点间隔会在吞吐量和恢复时间之间进行权衡。
最重要的是,该架构将应用程序开发与流量控制和吞吐量控制分开。更改快照间隔对流作业的结果完全没有影响,因此下游应用程序可以放心地依赖于接收到的正确结果。
Flink的检查点机制基于流经算子和渠道的 ‘barrier’(认为是Chandy Lamport算法中的一种’标记’)来实现。Flink的检查点的描述改编自Flink文档。
‘Barrier’ 在 Source 节点中被注入到普通流数据中(例如,如果使用Apache Kafka作为源,’barrier’ 与偏移量对齐),并且作为数据流的一部分与数据流一起流过DAG。’barrier’ 将记录分为两组:当前快照的一部分(’barrier’ 表示检查点的开始),以及属于下一个快照的那些组。
‘Barrier’ 流向下游并在通过算子时触发状态快照。算子首先将所有流入的流分区的 ‘barrier’ 对齐(如果算子具有多个输入),并会缓存较快的分区数据(上游来源较快的流分区将被缓冲数据以等待来源较慢的流分区)。当算子从每个输入流中都收到 ‘barrier’ 时,会检查其状态(如果有)并写到持久存储中。一旦完成状态写检查,算子就将 ‘barrier’ 向下游转发。请注意,在此机制中,如果算子支持,则状态写检查既可以是异步(在写入状态时继续处理),也可以是增量(仅写入更改)。
一旦所有数据接收器(Sink)都收到 ‘barrier’,当前检查点就完成了。故障恢复意味着只需恢复最新的检查点状态,并从最新记录的 ‘barrier’ 对应的偏移量重放数据源。分布式快照在我们在本文开头所要达到的所有需求中得分很高。它们实现了高吞吐量的Exactly-Once语义保证,同时还保留了连续算子模型以及低延迟和自然流量控制。
下表总结了我们讨论的每个体系结构如何支持这些功能。
记录确认机制 | 微批次 | 事务更新 | 分布式快照 | |
---|---|---|---|---|
语义保证 | At Least Once | Exactly Once | Exactly One | Exactly One |
延迟 | 非常低 | 高 | 低(事务延迟) | 非常低 |
吞吐量 | 低 | 高 | 中到高(取决于分布式事务存储的吞吐量) | 高 |
计算模型 | 流式 | 微批次 | 流式 | 流式 |
容错开销 | 高 | 低 | 取决于分布式事务存储的吞吐量 | 低 |
流控制 | 有问题 | 有问题 | 自然 | 自然 |
应用程序逻辑与容错分离 | 部分(超时很重要) | 否(微批量大小会影响语义) | 是 | 是 |
为了说明Apache Flink的性能,我们进行了一系列实验来研究吞吐量,延迟以及容错机制的影响。下面所有实验都是在Google Compute Engine上进行,使用30个实例,每个实例包含4个内核和15 GB内存。所有Flink实验均使用截至7月24日的最新代码修订版进行,所有Storm实验均使用0.9.3版。可以在此处找到用于评估的所有代码。
我们在有30节点120个核的集群上测量Flink和Storm在两个不同程序上的吞吐量。第一个程序是并行流式grep任务,它在流中搜索包含与正则表达式匹配的字符串的事件。
Flink实现了每个核每秒150万个元素的连续吞吐量。这样集群的总吞吐量达到每秒1.82亿个元素。测试得到的Flink延迟为零,因为作业不涉及网络,也不涉及微批处理。当开启Flink容错机制,设置每5秒进行一次Checkpoint,我们只看到吞吐量的轻微下降(小于2%),没有引入任何延迟。
Storm集群在关闭记录确认机制的情况下(因此没有任何准确性保证)实现了每核每秒约82,000个元素的吞吐量,99%的处理延迟在10毫秒左右。集群的总吞吐量为每秒57万个元素。当启用记录确认机制(保证At-Least-Once语义)时,Storm的吞吐量降至每核每秒4700个元素,延迟也增加到30-120毫秒。接下来,我们配置了Storm Trident,其微批量大小为200,000个元组。Trident实现了每个核每秒75,000个元素的吞吐量(总吞吐量与关闭容错机制的Storm的大致相同)。然而,这是以3000毫秒的延迟(99%)为代价的。
我们可以看到Flink的吞吐量比Trident高出20倍以上,吞吐量比Storm高300倍。在保持高吞吐的情况下,Flink还保证延迟为零。我们还看到,不使用微批次处理模型,高吞吐量不会以延迟为代价。Flink还链接数据源和接收器任务形成任务链,从而仅在单个JVM内交换记录句柄。
我们还进行了如下实验,将核从40个扩展到120个。跟我们预期一样,所有框架都线性扩展,因为grep是一个易于并行处理的程序。现在让我们看一个不同的实验,它按键进行流分组,从而通过网络对流进行Shuffle。我们在30台机器的集群中运行此作业,其系统配置与以前相同。Flink实现了每核每秒大约720,000个事件的吞吐量,启动检查点后降至690,000。请注意,Flink在每个检查点都要备份算子的状态,而Storm则不支持。此示例中的状态相对较小(计数和摘要,每个检查点每个算子的大小小于1M)。具有At-Least-Once语义保证的Storm具有每核每秒约2,600个事件的吞吐量。
能够处理大规模事件是至关重要的。另一方面,在流处理中尤为重要的是延迟。对于欺诈检测或IT安全等应用程序,以毫秒为单位对事件进行处理意味着可以防止问题出现,而超过100毫秒的延迟通常意味着问题只能在问题发生之后才能发现,而这时候发现意义已经不大了。
当应用程序开发人员可以允许一定的延迟时,通常需要把延迟限制在一定范围内。我们测量流记录分组作业的几个延迟界限,该作业通过网络对数据进行Shuffle。下图显示了观察到的中位数延迟,以及第90百分位,第95百分位和第99百分位延迟(例如,50毫秒的第99百分位的延迟意味着99%的元素到达管道的末端不到50毫秒)。
在以最大吞吐量运行时,Flink的中位数延迟为26毫秒,第99百分位延迟为51毫秒,这意味着99%的延迟都低于51毫秒。打开Flink的检查点机制(启用Exact-Once语义保证)并没有增加可观察到的延迟。但此时,我们确实看到较高百分位数的延迟增加,观察到的延迟大约为150毫秒(译者注:没太搞懂)。出现延迟增加的原因是需要对齐流,算子等待接收所有输入的 ‘barrier’。Storm具有非常低的中位数延迟(1毫秒),并且第99百分位的延迟也是51毫秒。
对于大多数应用程序而言,让人感兴趣的是能够在可接受的延迟上维持高吞吐量,具体取决于特定应用程序的延迟要求。在Flink中,用户可以使用缓冲区超时时间(Buffer Timeout)来调整可接受的延迟。这是什么意思?Flink算子在将记录发送到下一个算子之前会暂存储在缓冲区中。通过指定缓冲区超时时间,例如10毫秒,我们可以告诉Flink在缓冲区满了时或者到达10毫秒时发送缓冲区数据。较低的缓冲区超时时间通常意味着较低的延迟,可能以吞吐量为代价。在上面的实验中,缓冲区超时时间设置为50毫秒,这解释了为什么99%的记录延迟在50毫秒以下。
下面说明了延迟如何影响Flink的吞吐量。因为较低的延迟保证意味着缓冲较少的数据,所以必然会产生一定的吞吐量成本。下图显示了不同缓冲区超时时间下的Flink吞吐量。该实验再次使用流记录分组作业。
如果指定缓冲区超时时间为零,流经算子的记录不会缓冲而是立即转发到下一个算子。在这个延迟优化设置中,Flink可以实现50%的元素延迟在0毫秒,以及99%的元素延迟在20毫秒以下。相应的吞吐量为每个核每秒24,500个事件。当我们增加缓冲区超时时间时,我们会看到延迟增加,吞吐量会同时增加,直到达到吞吐量峰值,缓冲区填充速度超过超时到期时间。缓冲区超时时间为50毫秒时,系统达到每个核每秒750,000个事件的吞吐量峰值,99%的处理延迟在50毫秒以下。
我们最后一个实验评估了检查点机制的正确性和恢复的开销。我们运行一个需要强一致性的流式程序,并定期杀死工作节点。
我们的测试程序受到网络安全/入侵检测等用例的启发,并使用规则来检查事件序列的有效性(例如,身份验证令牌,登录,服务交互)。该程序从Kafka并行读取事件流,并通过生成的实体(例如,IP地址或用户ID)对事件进行分组。对于每个事件,程序根据一些规则检测目前为止生成实体对应事件序列是否有效(例如,’服务交互’ 必须在 ‘登录’ 之前)。对于无效序列,程序会发布警报。如果没有Exactly-Once语义保证,发生故障时将不可避免地产生无效的事件序列并导致程序发布错误警报。
我们在一个30节点的集群中运行这个程序,其中 YARN chaos monkey
进程每5分钟杀死一个随机的YARN容器。我们保留备用 Worker(TaskManagers),这样系统可以在发生故障后立即取的新资源并继续运行,而无需等待YARN配置新容器。Flink将重新启动失败的 Worker 并在后台将其加入到集群,以确保备用Worker始终可用。
为了模拟的效果,我们使用并行数据生成器将事件推送到Kafka,这些生成器每个核的速度大约为每秒30,000个事件。下图显示了数据生成器的速率(红线),以及Flink作业从Kafka读取事件并使用规则验证事件序列的吞吐量(蓝线)。