摘要本文介绍一下Flink一些基本概念并行度、slot及对应的组件
开发flink应用我们需要引入对应的maven依赖 flink-java、flink-streaming-java,以及 flink-clients(客户端,也可以省略)
<!-- 引入 Flink 相关依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
在属性中,我们定义了,这指代的是所依赖的 Scala 版本。这有一点 奇怪:Flink 底层是 Java,而且我们也只用 Java API,为什么还会依赖 Scala 呢?这是因为 Flink 的架构中使用了 Akka 来实现底层的分布式通信,而 Akka 是用 Scala 开发的。
执行环境 -> source -> 算子逻辑 -> sink
1、创建执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
flink在1.12版本之前的流处理和批处理提供了两套api,从1.12官方推荐使用DataStream API 然后在提交任务 指定是流处理还是批处理
$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar
3> (world,1)
2> (hello,1)
4> (flink,1)
2> (hello,2)
2> (hello,3)
1> (java,1)
前面的数据是指本地执行的不同线程,所以是乱序的,代表1~4代表了并行线程是4,并行度4,本地环境默认并行度是运行电脑的cpu个数
Flink组件 client(客户端) jobManager(作业管理器,相当master) taskManager(任务管理器,工作者,相当于worker)
jobmanager包含3三个组件 1、jobMaster:处理单独的job,和具体的job一一对应 2、resourceManager注意:这是Flink内置的资源管理器要跟跟其他平台的区分开 3、分发器:提供一个rest接口用来提交应用,并为每个新提交的作业启动一个新的jobmaster
“资源”,主要是指TaskManager的任务槽(task slots)。任务槽就是Flink集群中的资源调配单元,包含了机器用来执行计算的一组CPU和内存资源。每一个任务(Task)都需要分配到一个slot上执行。 slot是最小的调度单位,每一个 TaskManager 都包含了一定数量的任务槽(task slots)。slot 的数量限制了 TaskManager 能够并行处理的任务数量。
作业提交流程步骤: 1、客户端将程序通过分发器提供的rest接口,提交到jobmanager 2、分发器启动jobmaster,并将作业提交给jobmaster 3、jobmaster将jobGraph解析成可执行的executionGraph,得到所需的资源数量即slot的个数,然后向资源管理器请求资源 4、资源管理器判断当前是否有足够的资源,没有就启动新的taskManager 5、taskManager启动后向资源管理器注册自己的任务槽 6、资源管理器通知taskManager为新的作业提供slots 7、TaskManager 连接到对应的 JobMaster,提供 slots。 8、JobMaster 将需要执行的任务分发给 TaskManager。 9、TaskManager 执行任务,互相之间可以交换数据。
算子任务 source就是一个算子任务,sink也是,sum,map等都是
算子子任务 在 Flink 执行过程中,每一个算子(operator)可以包含一个或多个子任务(operator subtask), 这些子任务在不同的线程、不同的物理机或不同的容器中完全独立地执行。
同一个算子子任务只能在不同的slot执行,不同算子的任务可以共享任务槽
所以我们要算这个作业需要多少slot,只需要找到算子任务最大的并行度,即算子子任务的个数
算子链 一个数据流在算子之间传输数据的形式可以是一对一(one-to-one)的直通 (forwarding)模式入map、filter、flatMap 等算子都是这种 one-to-one,也可以是打乱的重分区(redistributing)模式,具体是哪一种形式,取决于算子的种类。 并行度相同的一对一(one to one)算子操作,可以直接链接在一起形成一个“大”的任务(task) 可以合并起来形成算子链一起共享一个slot 为什么这样设计?可以减少线程之间的切换,和基于缓存器的数据交换 ,减少延时,提高吞吐量
槽位slot
任务槽就是Flink集群中的资源调配单元,包含了机器用来执行计算的一组CPU和内存资源。每一个任务(Task)都需要分配到一个slot上执行。 slot是最小的调度单位,每一个 TaskManager 都包含了一定数量的任务槽(task slots)。slot 的数量限制了 TaskManager 能够并行处理的任务数量。
设置一个taskManager的slot数量 : taskmanager.numberOfTaskSlots: 8Slot 和并行度确实都跟程序的并行执行有关,但两者是完全不同的概念。 简单来说,taskslot 是 静 态 的 概 念 , 是 指 TaskManager 具 有 的 并 发 执 行 能 力 , 可 以 通 过 参 数taskmanager.numberOfTaskSlots 进行配置; 而并行度(parallelism)是动态概念,也就是TaskManager 运行程序时实际使用的并发能力,可以通过参数 parallelism.default 进行配置。 换句话说,并行度如果小于等于集群中可用 slot 的总数,程序是可以正常执行的,因为 slot 不一定要全部占用,有十分力气可以只用八分; 而如果并行度大于可用 slot 总数,导致超出了并行能力上限,那么心有余力不足,程序就只好等待资源管理器分配更多的资源了。
并行度 一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism)。
1、代码中设置,算子后面跟上并行度设置,优先级最高 stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2); 2、代码中设置,执行环境设置,这样所有的算子并行度都一样,优先级中 env.setParallelism(2); 3、如果代码中没设置,可以在提交作业的时候使用“-p”参数来设置,优先级低于代码设置,高于配置文件 3、配置文件设置,优先级最低 parallelism.default: 2
package _1wordcount;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
* 批处理 单词统计
*/
public class WordCount {
public static void main(String[] args) throws Exception {
//创建执行环境 这里是批处理的执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//流处理执行环境
//StreamExecutionEnvironment env = StreamExecutionEnvironment
// .getExecutionEnvironment();
DataSource<String> dataSource = env
.readTextFile("D:\\java\\code\\flink\\flinkdemo\\src\\main\\resources\\_1wordcount.txt");
//map 做数据转换,输入1个返回1个,就是做类型转换
//flatMap 打散,平坦,输入1个,可以返回0个、1个、N个,(如下面按空格分隔,返回多个单词)
//keyby用于流处理,groupBy用在批处理
//这里返回的是一个元祖是因为groupBy只能返回元祖,不然会报错
DataSet<Tuple2<String, Long>> dataSet = dataSource
.flatMap(new WordCountFlatMap()).groupBy(0).sum(1);
dataSet.print();
//输出结果
/*(flink,1)
(world,1)
(hello,3)
(java,1)*/
}
public static class WordCountFlatMap implements FlatMapFunction<String, Tuple2<String, Long>> {
//input是输入的字符串
//collector 用来输出
public void flatMap(String input, Collector<Tuple2<String, Long>> collector) throws Exception {
if (StringUtils.isNotBlank(input)) {
for (String word : input.split(" ")) {
if (word.length() < 1) {
continue;
}
collector.collect(new Tuple2<>(word, 1L));
}
}
}
}
}
package _1wordcount;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* 流处理单词统计
*/
public class WordCountStream {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment();
//使用nc测试
DataStreamSource<String> streamSource = env.socketTextStream("127.0.0.1", 10008);
streamSource.setParallelism(1);//设置并行度
/*DataStream里没有reduce和sum这类聚合操作的方法,因为Flink设计中,所有数据必须先分组才能做聚合操作。
先keyBy得到KeyedStream,然后调用其reduce、sum等聚合操作方法。(先分组后聚合)*/
DataStream<WordWithCount> dataStream = streamSource
.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String input, Collector<WordWithCount> collector) throws Exception {
for (String word : input.split(" ")) {
if (word.length() < 1) {
continue;
}
WordWithCount wordWithCount = new WordWithCount();
wordWithCount.setCount(1L);
wordWithCount.setWord(word);
collector.collect(wordWithCount);
}
}
}).keyBy("word").window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum("count");
dataStream.print();
//DataStream流式应用需要显示指定execute()方法运行程序,如果不调用则Flink流式程序不会执行
//对于DataSet API输出算子中已经包含了对execute()方法的调用,不需要显式调用execute()方法,否则程序会出异常。
env.execute("streaming word count");
/*3> WordWithCount{word='java', count=1}
5> WordWithCount{word='hello', count=1}
13> WordWithCount{word='flink', count=1}
5> WordWithCount{word='hello', count=1}
5> WordWithCount{word='hello', count=1}
13> WordWithCount{word='flink', count=1}
5> WordWithCount{word='hello', count=2}
13> WordWithCount{word='flink', count=2}
5> WordWithCount{word='hello', count=1}
13> WordWithCount{word='flink', count=1}
3> WordWithCount{word='java', count=1}
4> WordWithCount{word='f', count=1}
5> WordWithCount{word='hello', count=2}*/
}
public static class WordWithCount {
private String word;
private Long count;
@Override
public String toString() {
return "WordWithCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public Long getCount() {
return count;
}
public void setCount(Long count) {
this.count = count;
}
}
}