Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Apache-Flink深度解析-State

Apache-Flink深度解析-State

作者头像
王知无-import_bigdata
修改于 2024-03-12 01:54:38
修改于 2024-03-12 01:54:38
1.4K00
代码可运行
举报
运行总次数:0
代码可运行

转载自:https://dwz.cn/xrMCqbk5

摘要:

实际问题 在流计算场景中,数据会源源不断的流入Apache Flink系统,每条数据进入Apache Flink系统都会触发计算。如果我们想进行一个Count聚合计算,那么每次触发计算是将历史上所有流入的数据重新新计算一次,还是每次计算都是在上一次计算结果之上进行增量计算呢?答案是肯定的,Apache Flink是基于上一次的计算结果进行增量计算的。

实际问题

在流计算场景中,数据会源源不断的流入Apache Flink系统,每条数据进入Apache Flink系统都会触发计算。如果我们想进行一个Count聚合计算,那么每次触发计算是将历史上所有流入的数据重新新计算一次,还是每次计算都是在上一次计算结果之上进行增量计算呢?答案是肯定的,Apache Flink是基于上一次的计算结果进行增量计算的。那么问题来了: "上一次的计算结果保存在哪里,保存在内存可以吗?",答案是否定的,如果保存在内存,在由于网络,硬件等原因造成某个计算节点失败的情况下,上一次计算结果会丢失,在节点恢复的时候,就需要将历史上所有数据(可能十几天,上百天的数据)重新计算一次,所以为了避免这种灾难性的问题发生,Apache Flink 会利用State存储计算结果。本篇将会为大家介绍Apache Flink State的相关内容。

什么是State

这个问题似乎有些"弱智"?不管问题的答案是否显而易见,但我还是想简单说一下在Apache Flink里面什么是State?State是指流计算过程中计算节点的中间计算结果或元数据属性,比如 在aggregation过程中要在state中记录中间聚合结果,比如 Apache Kafka 作为数据源时候,我们也要记录已经读取记录的offset,这些State数据在计算过程中会进行持久化(插入或更新)。所以Apache Flink中的State就是与时间相关的,Apache Flink任务的内部数据(计算数据和元数据属性)的快照。

为什么需要State

与批计算相比,State是流计算特有的,批计算没有failover机制,要么成功,要么重新计算。流计算在 大多数场景 下是增量计算,数据逐条处理(大多数场景),每次计算是在上一次计算结果之上进行处理的,这样的机制势必要将上一次的计算结果进行存储(生产模式要持久化),另外由于 机器,网络,脏数据等原因导致的程序错误,在重启job时候需要从成功的检查点(checkpoint,后面篇章会专门介绍)进行state的恢复。增量计算,Failover这些机制都需要state的支撑。

State 实现

Apache Flink内部有四种state的存储实现,具体如下:

  • 基于内存的HeapStateBackend - 在debug模式使用,不 建议在生产模式下应用;
  • 基于HDFS的FsStateBackend - 分布式文件持久化,每次读写都产生网络IO,整体性能不佳;
  • 基于RocksDB的RocksDBStateBackend - 本地文件+异步HDFS持久化;
  • 还有一个是基于Niagara(Alibaba内部实现)NiagaraStateBackend - 分布式持久化- 在Alibaba生产环境应用;

State 持久化逻辑

Apache Flink版本选择用RocksDB+HDFS的方式进行State的存储,State存储分两个阶段,首先本地存储到RocksDB,然后异步的同步到远程的HDFS。 这样而设计既消除了HeapStateBackend的局限(内存大小,机器坏掉丢失等),也减少了纯分布式存储的网络IO开销。

State 分类

Apache Flink 内部按照算子和数据分组角度将State划分为如下两类:

  • KeyedState - 这里面的key是我们在SQL语句中对应的GroupBy/PartitioneBy里面的字段,key的值就是groupby/PartitionBy字段组成的Row的字节数组,每一个key都有一个属于自己的State,key与key之间的State是不可见的;
  • OperatorState - Apache Flink内部的Source Connector的实现中就会用OperatorState来记录source数据读取的offset。

State 扩容重新分配

Apache Flink是一个大规模并行分布式系统,允许大规模的有状态流处理。 为了可伸缩性,Apache Flink作业在逻辑上被分解成operator graph,并且每个operator的执行被物理地分解成多个并行运算符实例。 从概念上讲,Apache Flink中的每个并行运算符实例都是一个独立的任务,可以在自己的机器上调度到网络连接的其他机器运行。

Apache Flink的DAG图中只有边相连的节点有网络通信,也就是整个DAG在垂直方向有网络IO,在水平方向如下图的stateful节点之间没有网络通信,这种模型也保证了每个operator实例维护一份自己的state,并且保存在本地磁盘(远程异步同步)。通过这种设计,任务的所有状态数据都是本地的,并且状态访问不需要任务之间的网络通信。 避免这种流量对于像Apache Flink这样的大规模并行分布式系统的可扩展性至关重要。

如上我们知道Apache Flink中State有OperatorState和KeyedState,那么在进行扩容时候(增加并发)State如何分配呢?比如:外部Source有5个partition,在Apache Flink上面由Srouce的1个并发扩容到2个并发,中间Stateful Operation 节点由2个并发并扩容的3个并发,如下图所示:

在Apache Flink中对不同类型的State有不同的扩容方法,接下来我们分别介绍。

OperatorState对扩容的处理

我们选取Apache Flink中某个具体Connector实现实例进行介绍,以MetaQ为例,MetaQ以topic方式订阅数据,每个topic会有N>0个分区,以上图为例,加上我们订阅的MetaQ的topic有5个分区,那么当我们source由1个并发调整为2个并发时候,State是怎么恢复的呢? state 恢复的方式与Source中OperatorState的存储结构有必然关系,我们先看MetaQSource的实现是如何存储State的。首先MetaQSource 实现了ListCheckpointed<T extends Serializable>,其中的T是Tuple2<InputSplit,Long>,我们在看ListCheckpointed接口的内部定义如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public interface ListCheckpointed<T extends Serializable>; {
List<T> snapshotState(long var1, long var3) throws Exception;

void restoreState(List&lt;T&gt; var1) throws Exception;
}

我们发现 snapshotState方法的返回值是一个List<T>,T是Tuple2<InputSplit,Long>,也就是snapshotState方法返回List<Tuple2<InputSplit,Long>>,这个类型说明state的存储是一个包含partiton和offset信息的列表,InputSplit代表一个分区,Long代表当前partition读取的offset。InputSplit有一个方法如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public interface InputSplit extends Serializable {
int getSplitNumber();
}

也就是说,InputSplit我们可以理解为是一个Partition索引,有了这个数据结构我们在看看上面图所示的case是如何工作的?当Source的并行度是1的时候,所有打partition数据都在同一个线程中读取,所有partition的state也在同一个state中维护,State存储信息格式如下:

如果我们现在将并发调整为2,那么我们5个分区的State将会在2个独立的任务(线程)中进行维护,在内部实现中我们有如下算法进行分配每个Task所处理和维护partition的State信息,如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
List<Integer> assignedPartitions = new LinkedList<>();
for (int i = 0; i < partitions; i++) {
if (i % consumerCount == consumerIndex) {
assignedPartitions.add(i);
}
}

这个求mod的算法,决定了每个并发所处理和维护partition的State信息,针对我们当前的case具体的存储情况如下:

那么到现在我们发现上面扩容后State得以很好的分配得益于OperatorState采用了List<T>的数据结构的设计。另外大家注意一个问题,相信大家已经发现上面分配partition的算法有一个限制,那就是Source的扩容(并发数)是否可以超过Source物理存储的partition数量呢?答案是否定的,不能。目前Apache Flink的做法是提前报错,即使不报错也是资源的浪费,因为超过partition数量的并发永远分配不到待管理的partition。

KeyedState对扩容的处理

对于KeyedState最容易想到的是hash(key) mod parallelism(operator) 方式分配state,就和OperatorState一样,这种分配方式大多数情况是恢复的state不是本地已有的state,需要一次网络拷贝,这种效率比较低,OperatorState采用这种简单的方式进行处理是因为OperatorState的state一般都比较小,网络拉取的成本很小,对于KeyedState往往很大,我们会有更好的选择,在Apache Flink中采用的是Key-Groups方式进行分配。

什么是Key-Groups

Key-Groups 是Apache Flink中对keyed state按照key进行分组的方式,每个key-group中会包含N>0个key,一个key-group是State分配的原子单位。在Apache Flink中关于Key-Group的对象是 KeyGroupRange, 如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class KeyGroupRange implements KeyGroupsList, Serializable {
...
...
private final int startKeyGroup;
private final int endKeyGroup;
...
...
}

KeyGroupRange两个重要的属性就是 startKeyGroup和endKeyGroup,定义了startKeyGroup和endKeyGroup属性后Operator上面的Key-Group的个数也就确定了。

什么决定Key-Groups的个数

key-group的数量在job启动前必须是确定的且运行中不能改变。由于key-group是state分配的原子单位,而每个operator并行实例至少包含一个key-group,因此operator的最大并行度不能超过设定的key-group的个数,那么在Apache Flink的内部实现上key-group的数量就是最大并行度的值。

GroupRange.of(0, maxParallelism)如何决定key属于哪个Key-Group 确定好GroupRange之后,如何决定每个Key属于哪个Key-Group呢?我们采取的是取mod的方式,在KeyGroupRangeAssignment中的assignToKeyGroup方法会将key划分到指定的key-group中,如下:

如上实现我们了解到分配Key到指定的key-group的逻辑是利用key的hashCode和maxParallelism进行取余操作来分配的。如下图当parallelism=2,maxParallelism=10的情况下流上key与key-group的对应关系如下图所示:

如上图key(a)的hashCode是97,与最大并发10取余后是7,被分配到了KG-7中,流上每个event都会分配到KG-0至KG-9其中一个Key-Group中。 每个Operator实例如何获取Key-Groups 了解了Key-Groups概念和如何分配每个Key到指定的Key-Groups之后,我们看看如何计算每个Operator实例所处理的Key-Groups。 在KeyGroupRangeAssignment的computeKeyGroupRangeForOperatorIndex方法描述了分配算法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
int maxParallelism,
int parallelism,
int operatorIndex) {
GroupRange splitRange = GroupRange.of(0, maxParallelism).getSplitRange(parallelism, operatorIndex);
int startGroup = splitRange.getStartGroup();
int endGroup = splitRange.getEndGroup();
return new KeyGroupRange(startGroup, endGroup - 1);
}

public GroupRange getSplitRange(int numSplits, int splitIndex) {
...
final int numGroupsPerSplit = getNumGroups() / numSplits;
final int numFatSplits = getNumGroups() % numSplits;

int startGroupForThisSplit;
int endGroupForThisSplit;
if (splitIndex &lt; numFatSplits) {
startGroupForThisSplit = getStartGroup() + splitIndex * (numGroupsPerSplit + 1);
endGroupForThisSplit = startGroupForThisSplit + numGroupsPerSplit + 1;
} else {
startGroupForThisSplit = getStartGroup() + splitIndex * numGroupsPerSplit + numFatSplits;
endGroupForThisSplit = startGroupForThisSplit + numGroupsPerSplit;
}
if (startGroupForThisSplit &gt;= endGroupForThisSplit) {
return GroupRange.emptyGroupRange();
} else {
return new GroupRange(startGroupForThisSplit, endGroupForThisSplit);
}
}

上面代码的核心逻辑是先计算每个Operator实例至少分配的Key-Group个数,将不能整除的部分N个,平均分给前N个实例。最终每个Operator实例管理的Key-Groups会在GroupRange中表示,本质是一个区间值;下面我们就上图的case,说明一下如何进行分配以及扩容后如何重新分配。 假设上面的Stateful Operation节点的最大并行度maxParallelism的值是10,也就是我们一共有10个Key-Group,当我们并发是2的时候和并发是3的时候分配的情况如下图:

如上算法我们发现在进行扩容时候,大部分state还是落到本地的,如Task0只有KG-4被分出去,其他的还是保持在本地。同时我们也发现,一个job如果修改了maxParallelism的值那么会直接影响到Key-Groups的数量和key的分配,也会打乱所有的Key-Group的分配,目前在Apache Flink系统中统一将maxParallelism的默认值调整到4096,最大程度的避免无法扩容的情况发生。

本篇简单介绍了Apache Flink中State的概念,并重点介绍了OperatorState和KeyedState在扩容时候的处理方式。Apache Flink State是支撑Apache Flink中failover,增量计算,Window等重要机制和功能的核心设施。后续介绍failover,增量计算,Window等相关篇章中也会涉及State的利用,当涉及到本篇没有覆盖的内容时候再补充介绍。

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Apache-Flink深度解析-State
摘要:实际问题 在流计算场景中,数据会源源不断的流入Apache Flink系统,每条数据进入Apache Flink系统都会触发计算。如果我们想进行一个Count聚合计算,那么每次触发计算是将历史上所有流入的数据重新新计算一次,还是每次计算都是在上一次计算结果之上进行增量计算呢?答案是肯定的,Apache Flink是基于上一次的计算结果进行增量计算的。
王知无-import_bigdata
2019/04/24
7090
Apache-Flink深度解析-State
Flink State 状态原理解析
State 用于记录 Flink 应用在运行过程中,算子的中间计算结果或者元数据信息。运行中的 Flink 应用如果需要上次计算结果进行处理的,则需要使用状态存储中间计算结果。如 Join、窗口聚合场景。
吴云涛
2024/03/12
4460
Flink State 状态原理解析
eBay:Flink的状态原理讲一下……
状态在 Flink 中叫作 State,用来保存中间计算结果或者缓存数据。根据是否需要保存中间结果,分为无状态计算和有状态计算。对于流计算而言,时间持续不断地产生,如果每次计算都是相互独立的,不依赖于上下游的事件,则是无状态计算。如果计算需要依赖于之前或者后续的事件,则是有状态计算。State 是实现有状态计算的下的 Exactly-Once 的基础。
857技术社区
2022/05/17
9310
eBay:Flink的状态原理讲一下……
云原生架构下B站Flink存算分离的改造实践
在当前整个行业及公司内部降本增效的大背景下,B站内部也在积极推进实时与在线业务资源的整合,往云原生架构迁移,统一资源池与调度,提升资源利用效率。不过面临的现实问题就是,不同业务场景下,资源的规格诉求不尽相同。在线的业务资源池,由于在线业务的属性,一般只具备很强的计算能力而基本不带存储以及io能力。Flink虽然是一个计算引擎,但是由于其stateful的特性,在很多计算场景下,对存储和io其实有比较强的诉求,因此实时的资源池,同时具备很强的存算能力。两种资源池的整合,必然面临兼容性问题,考虑到大数据整体的存算分离发展趋势,我们尝试对Flink进行存算分离的改造,核心工作就是statebackend的远程化。
从大数据到人工智能
2023/05/03
9590
云原生架构下B站Flink存算分离的改造实践
学习Flink,看这篇就够了
批处理在大数据世界有着悠久的历史。早期的大数据处理基本上是批处理的天下。批处理主要操作大容量的静态数据集,并在计算过程完成之后返回结果。所以批处理面对的数据集通常具有以下特征:
saintyyu
2021/11/22
3.2K1
学习Flink,看这篇就够了
深度解读!新一代大数据引擎Flink厉害在哪?(附实现原理细节)
导语 | 大数据计算分为离线计算和实时计算,其中离线计算就是我们通常说的批计算,代表技术是Hadoop MapReduce、Hive等;实时计算也被称作流计算,代表技术是Storm、Spark Streaming、Flink等。本文系统地介绍了流式计算的相关知识,并着重介绍了Flink的实现原理细节,便于大家快速地理解和掌握流式计算,并基于Flink完成业务开发。 一、流式计算和批处理 批处理在大数据世界有着悠久的历史。早期的大数据处理基本上是批处理的天下。批处理主要操作大容量的静态数据集,并在计算过
腾讯云开发者
2021/11/10
1.7K0
深入研究Apache Flink中的可缩放状态
•本来打算写一个flink源码分析的系列文章,但由于事情太多,又不太想输出低质量的文章,所以开始看一些好的flink相关博客,本文译自https://www.ververica.com/blog/apache-flink-at-mediamath-rescaling-stateful-applications ;•flink中state的划分和介绍;•flink 中operator state在什么时候会进行rescale以及如何进行rescale?;•flink 中keyed state的when and how?。
山行AI
2021/03/24
1.7K0
Apache-Flink深度解析-SQL概览
SQL是Structured Query Language的缩写,最初是由美国计算机科学家Donald D. Chamberlin和Raymond F. Boyce在20世纪70年代早期从 Early History of SQL 中了解关系模型后在IBM开发的。该版本最初称为[SEQUEL: A Structured English Query Language](结构化英语查询语言),旨在操纵和检索存储在IBM原始准关系数据库管理系统System R中的数据。SEQUEL后来改为SQL,因为“SEQUEL”是英国Hawker Siddeley飞机公司的商标。我们看看这款用于特技飞行的英国皇家空军豪客Siddeley Hawk T.1A (Looks great):
王知无-import_bigdata
2019/03/26
7780
ApacheFlink深度解析-FaultTolerance
本系列文章来自云栖社区,对Flink的解析兼具广度和深度,适合对Flink有一定研究的同学学习。
王知无-import_bigdata
2019/03/19
7520
ApacheFlink深度解析-FaultTolerance
Apache-Flink深度解析-概述
Apache Flink 的命脉 "命脉" 即生命与血脉,常喻极为重要的事物。系列的首篇,首篇的首段不聊Apache Flink的历史,不聊Apache Flink的架构,不聊Apache Flink的功能特性,我们用一句话聊聊什么是 Apache Flink 的命脉?我的答案是:Apache Flink 是以"批是流的特例"的认知进行系统设计的。
王知无-import_bigdata
2019/03/12
1.4K2
Apache-Flink-持续查询(ContinuousQueries)
摘要:实际问题 我们知道在流计算场景中,数据是源源不断的流入的,数据流永远不会结束,那么计算就永远不会结束,如果计算永远不会结束的话,那么计算结果何时输出呢?本篇将介绍Apache Flink利用持续查询来对流计算结果进行持续输出的实现原理。
王知无-import_bigdata
2019/04/08
1.6K0
Apache-Flink-持续查询(ContinuousQueries)
零基础学Flink:状态与容错
在上一篇《零基础学Flink:实时热销榜Top5(案例)》文档中我们介绍了如何计算实时热销榜。在案例的最后TopNHot类中,我们使用了状态类。
麒思妙想
2020/07/10
4480
2021年大数据Flink(二十六):​​​​​​​State代码示例
下图就 word count 的 sum 所使用的StreamGroupedReduce类为例讲解了如何在代码中使用 keyed state:
Lansonli
2021/10/09
7150
Apache-Flink-持续查询(ContinuousQueries)
我们知道在流计算场景中,数据是源源不断的流入的,数据流永远不会结束,那么计算就永远不会结束,如果计算永远不会结束的话,那么计算结果何时输出呢?本篇将介绍Apache Flink利用持续查询来对流计算结果进行持续输出的实现原理。
王知无-import_bigdata
2019/03/26
2.1K0
[源码解析] Flink UDAF 背后做了什么
本文涉及到Flink SQL UDAF,Window 状态管理等部分,希望能起到抛砖引玉的作用,让大家可以借此深入了解这个领域。
罗西的思考
2020/09/07
1.2K0
[源码解析] Flink UDAF 背后做了什么
Apache Flink 简单介绍和入门
Apache Flink 是一个==分布式大数据处理引擎==,可对==有限数据流和无限数据流==进行==有状态计算==。可部署在==各种集群环境==,对各种大小的数据规模进行快速计算。
星哥玩云
2022/07/27
5140
Apache Flink 简单介绍和入门
《基于Apache Flink的流处理》读书笔记
前段时间详细地阅读了 《Apache Flink的流处理》 这本书,作者是 Fabian Hueske&Vasiliki Kalavri,国内崔星灿翻译的,这本书非常详细、全面得介绍了Flink流处理,并且以气象数据的例子讲解其中的使用,我把其中一些比较重要的句子做了比较,并且分享给大家。有一些我不是很理解,需要以后慢慢去消化,我就不做详细的展开。
班班
2022/10/11
1.2K0
《基于Apache Flink的流处理》读书笔记
干货 | Flink Connector 深度解析
作者介绍:董亭亭,快手大数据架构实时计算引擎团队负责人。目前负责 Flink 引擎在快手内的研发、应用以及周边子系统建设。2013 年毕业于大连理工大学,曾就职于奇虎 360、58 集团。主要研究领域包括:分布式计算、调度系统、分布式存储等系统。
大数据真好玩
2019/09/12
2.6K0
干货 | Flink Connector 深度解析
独家 | 一文读懂Apache Flink技术
本文来自9月1日在成都举行的Apache Flink China Meetup,分享来自于云邪。
数据派THU
2018/12/04
9860
独家 | 一文读懂Apache Flink技术
Flink 极简教程: 架构及原理 Apache Flink® — Stateful Computations over Data Streams
Apache Flink 是一个分布式流计算引擎,用于在无边界和有边界数据流上进行有状态的计算。
一个会写诗的程序员
2022/01/04
3.4K0
Flink 极简教程: 架构及原理 Apache Flink® — Stateful Computations over Data Streams
相关推荐
Apache-Flink深度解析-State
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验