前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink优化器与源码解析系列--算子Chain策略优化

Flink优化器与源码解析系列--算子Chain策略优化

作者头像
用户7600169
发布2022-04-25 15:52:03
1.2K0
发布2022-04-25 15:52:03
举报
文章被收录于专栏:BigDataplusBigDataplus

背景

Flink 任务是一个DAG图,由多个节点(Operator)组成,部分上下游的节点在运行时可以合成为一个节点,称为算子链Chain。Chain后的节点,总CPU为所有节点CPU的最大值,总内存为所有节点内存的总和。多节点合成一个节点可以有效的减少网络传输,降低成本。但如一个任务DAG过大,需根据实时情况对算子链Chain进行拆解操作。接下来对算子链三种策略进行说明、策略对应的使用方法、哪些算子可进行操作和在何处应用并举例讲解。

算子链Operators Chain优化

ChainingStrategy算子链策略

ChainingStrategy 是用来定义算子链接的策略。当一个算子和上游算子链接在一起,这意味着它们会运行在同一个线程。它们会合并为一个有多个运行步骤的算子。Operators Chain 是通过ChainingStrategy 算子链路策略枚举类设置的。源码如下:

代码语言:javascript
复制
public enum ChainingStrategy {
   ALWAYS,
   NEVER,
   HEAD
}

ChainingStrategy支持以下三种方式:

  • ALWAYS 算子会尽可能的链接在一起。为了优化性能,通常需要让算子尽可能链接在一起,同时增加并发度。
  • NEVER 算子将不会和上下游的算子链接在一起。
  • HEAD 算子将不会和上游的算子链接在一起,但是会和下游的算子链接在一起。

在Flink程序中,各算子Operators,如Filter、FlatMap、Map、Project、Sink、Source和Window等都是打开的即取值ALWAYS算子会尽可能的链接在一起。也可通过执行环境变量进行全局关闭,但不一般建议这样,如:

代码语言:javascript
复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.disableOperatorChaining();//设置为全局不可用

ChainingStrategy方式应用

算子链策略默认是全局开启即取值ALWAYS,但也可在单算子或多算子上应用其开启、关闭或局部开启。是通过两种方法disableChaining和startNewChain来设置的,singleOutputOperator的两种方法实现(其他算子也是如此)源码如下:

代码语言:javascript
复制
@PublicEvolving
public SingleOutputStreamOperator<T> disableChaining() {
   return setChainingStrategy(ChainingStrategy.NEVER);
}
@PublicEvolving
public SingleOutputStreamOperator<T> startNewChain() {
   return setChainingStrategy(ChainingStrategy.HEAD);
}

举例说明:

定义一个有限流dataStream,此流上应用三个map简单操作,从应用了startNewChain方法开始后两个map运行在同一线程。

代码语言:javascript
复制
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //env.disableOperatorChaining();//设置为全局不可用
    env.setParallelism(4);
    DataStream<String> dataStream = env.fromCollection(Arrays.asList("A","B","C","D","E","F","G","H","I","J","K","L","M","N"));
    DataStream<Tuple2<String,Integer>> dataStreamKV = dataStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> map(String value) throws Exception {
            return Tuple2.of(value,1);
        }
    }).startNewChain().map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
            return Tuple2.of(value.f0,value.f1*100);
        }
    }).map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
            return Tuple2.of(value.f0,value.f1-10);
        }
    });
    dataStreamKV.print();
    env.execute();
}

disableChaining对应的是ChainingStrategy.NEVER 算子将不会和上下游的算子链接在一起。方法应用同样如此,笔者不再赘述。

其他优化

在Flink程序运行过程中,并行度取决于每个TaskManager上的slot数量而决定的。slot插槽是指TaskManagere的并发执行能力,通常来说TaskManager有多少核CPU也就会有多少个slot。slot插槽可共享相同的JVM资源,同时对Flink提供维护的心跳等信息。详细可以参考Flink优化器与源码解析系列--内存模型详解里面slot与source资源相关内容。

总结

Operator Chain策略也是程序优化的一种方式。在Flink程序中,是全局默认开启的。多节点合成一个节点可以有效的减少网络传输,降低成本。实时情况对算子链Chain进行拆解操作,灵活运用。也需要配置其他参数进行优化如并行度等等。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-06-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 BigDataplus 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档