Flink 提供了不同的状态终端,可以指定状态的存储方式和位置。
状态可以存储在Java的堆内或堆外。根据你的状态终端,Flink 也可以管理应用程序的状态,这意味着 Flink 可以处理内存管理(可能会溢出到磁盘,如果有必要),以允许应用程序存储非常大的状态。默认情况下,配置文件 flink-conf.yaml 为所有Flink作业决定其状态终端。
但是,默认的状态终端配置也可以被每个作业的配置覆盖,如下所示。
Java版本:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(...);Scala版本:
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStateBackend(...)开箱即用,Flink 内置了如下状态终端:
MemoryStateBackendFsStateBackendRocksDBStateBackend如果没有配置,系统默认使用MemoryStateBackend。
MemoryStateBackend 将数据以对象的形式保存在 Java 堆上。键值对状态和窗口算子拥有保存值,触发器等的哈希表。
在进行检查点操作时,状态终端对状态进行快照,并将其作为检查点确认消息的一部分发送给 JobManager(master),并将存储在其堆上。
MemoryStateBackend 可以配置为使用异步快照。尽管我们强烈建议使用异步快照来避免阻塞管道,但请注意,这是一项新功能,目前默认情况下不会启用。要启用此功能,用户可以在实例化 MemoryStateBackend的构造函数中设置相应的布尔值 true,例如:
new MemoryStateBackend(MAX_MEM_STATE_SIZE, true);MemoryStateBackend 的使用限制:
MemoryStateBackend 的构造函数中增加。akka frame大小。JobManager 内存中。MemoryStateBackend 适用场景:
record-at-a-time 函数的作业(Map,FlatMap,Filter,…)。 Kafka消费者只需要很少的状态。FsStateBackend 使用文件系统URL(类型,地址,路径)进行配置,如 hdfs://namenode:40010/flink/checkpoints 或 file:///data/flink/checkpoints。
FsStateBackend 将正在使用的数据保存在 TaskManager 的内存中。在进行检查点操作时,将状态快照写入配置的文件系统文件和目录中。较小的元数据存储在 JobManager 的内存中(或者在高可用性模式下,存储在元数据检查点中)。
FsStateBackend 默认使用异步快照,以避免在写入状态检查点时阻塞处理管道。如果要禁用此功能,用户可以在实例化 FsStateBackend 的构造函数中将对应的布尔值设置为 false,例如:
new FsStateBackend(path,false);FsStateBackend 适用场景:
RocksDBStateBackend 使用文件系统URL(类型,地址,路径)进行配置,例如 hdfs://namenode:40010/flink/checkpoints 或 file:///data/flink/checkpoints。
RocksDBStateBackend 将 正在使用的数据保存在 RocksDB 数据库中,其位于 TaskManager 数据目录下(默认情况下)。进行检查点操作时,整个 RocksDB 数据库进行检查点操作存储到配置的文件系统和目录中。较小的元数据存储在 JobManager 的内存中(或者在高可用性模式下,存储在元数据检查点中)。
RocksDBStateBackend 总是执行异步快照。
RocksDBStateBackend 使用限制:
RocksDB 的JNI桥接API基于 byte [],每个键和每个值支持的最大大小为 2^31 个字节。重要的是在 RocksDB 中使用合并操作的状态(例如ListState)可以累积超过2^31字节,然后在下一次检索时会失败。目前这是 RocksDB JNI 的限制。RocksDBStateBackend 适用场景:
请注意,你可以保存的状态数量仅受可用磁盘空间的限制。与保存状态到内存的 FsStateBackend 相比,这可以保存非常大的状态。但是,这也意味着在这个状态终端下可以达到的最大吞吐量将会降低。
RocksDBStateBackend 是目前唯一个提供增量检查点的终端(见这里)。
如果你不指定,默认的状态终端是 jobmanager。如果你希望为集群中的所有作业建立不同的默认值,可以在 flink-conf.yaml 中定义一个新的默认状态终端来完成。默认的状态终端可以被每个作业的配置覆盖,如下所示。
作业状态终端在作业的 StreamExecutionEnvironment 上设置,如下例所示:
Java版本:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));Scala版本:
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"))可以使用配置键 state.backend 在 flink-conf.yaml 配置文件中配置默认状态终端。
配置的值可以是 jobmanager(MemoryStateBackend),filesystem(FsStateBackend),rocksdb(RocksDBStateBackend),或实现状态终端工厂 FsStateBackendFactory 类的全限定类名,例如 RocksDBStateBackend 的 org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory。
如果默认状态终端设置为 filesystem,state.backend.fs.checkpointdir 定义了检查点数据存储目录。
配置文件中的示例部分可能如下所示:
# The backend that will be used to store operator state checkpoints
state.backend: filesystem
# Directory for storing checkpoints
state.backend.fs.checkpointdir: hdfs://namenode:40010/flink/checkpoints备注: Flink版本:1.4
原文:https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/state_backends.html