大家好,我是create17,见字如面。
在这个数据驱动的时代,掌握大数据技术成为了每一位开发者必不可少的技能。而在众多技术栈中,Flink无疑占据了重要的位置。作为一个高性能、可扩展的实时数据处理框架,Flink已经成为了很多企业和开发者的首选。但对于初学者来说,Flink的学习曲线可能会显得有些陡峭。因此,我们决定打造一系列通俗易懂的Flink学习文章,希望能帮助大家更快地掌握这一强大的技术。
那希望我接下来的分享给大家带来一些帮助和启发🤔
版本说明: Java:1.8 Flink:1.12.0
Apache Flink 是一个流处理框架,它允许用户以高吞吐量和低延迟的方式处理实时数据流。Flink 提供了强大的流处理能力,能够处理有界(批处理)和无界(流处理)的数据流。通过 Flink,开发者可以轻松实现复杂的数据处理和分析应用。
今天和大家一起学习 Flink 入门级 demo:WordCount。WordCount 简单来讲就是单词计数,是一般大数据计算框架(Hadoop、Spark、Flink)的入门学习案例,相当于编程语言(Java、Python)中的 HelloWorld 案例,适合刚开始了解 Flink 作业提交流程的同学。
WordCount 程序编写好以后,我们可以本地运行测试,也可以打成 jar 包,使用命令提交 Job 运行。本篇文章,这两种方式我们都试一下。好了,准备好了吗?我们开始吧。
我们使用 Java 语言来编写 WordCount 程序。首先创建 maven 工程,可运行下述代码创建工程:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.12.0 \
-DgroupId=org.myorg.quickstart \
-DartifactId=quickstart \
-Dversion=0.1 \
-Dpackage=org.myorg.quickstart \
-DinteractiveMode=false
你可以编辑上面的 groupId, artifactId, package 成你喜欢的路径。使用上面的参数,Maven 将自动为你创建如下所示的项目结构:
$ tree quickstart
quickstart
├── pom.xml
└── src
└── main
├── java
│ └── org.myorg.quickstart
│ ├── BatchJob.java
│ └── StreamingJob.java
└── resources
└── log4j.properties
我们的 pom.xml 文件已经包含了所需的 Flink 依赖,并且在 src/main/java 下有几个示例程序框架。接下来我们将开始编写第一个 Flink 程序。
pom.xml 文件的 flink 相关依赖有:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
在本次示例中,我们使用 socket 来模拟实时数据流,然后统计指定周期内每个单词出现的频次。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* @author create17
* @date 2024/03/17
*/
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
// 创建Flink任务运行的环境,实时数据流
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// DataStream 是 Flink 中做流处理的核心 API
// 使用换行符来分割从 socket 流中接收到的文本数据,每当它读取到一个换行符,就会将前面的文本作为一个单独的记录(字符串)
DataStream<String> text = env.socketTextStream("x.x.x.x", 9002, "\n");
DataStream<Tuple2<String, Integer>> wordCounts = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
// 根据空格截取字符串
for (String word : s.split("\\s")) {
collector.collect(new Tuple2<>(word, 1));
}
}
}).keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
@Override
public Object getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
}).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
// .sum(1)是一个转换操作,用于在一个keyed stream上进行聚合操作。这里的1是参数,表示在Tuple2<String, Integer>中要进行求和操作的字段索引,
// 由于Tuple是从0开始索引的,0表示第一个字段(这里是单词),1表示第二个字段(这里是整数计数)。
.sum(1);
// 将结果打印到控制台,如果需要有序,需要设置 parallelism 为 1
wordCounts.print().setParallelism(1);
// 重要
env.execute("Socket Window WordCount");
}
}
我们现在来逐步分析上述代码:
创建 StreamExecutionEnvironment:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
这行代码初始化了 Flink 的流执行环境,它是所有 Flink 程序的起点,用于设置执行参数和创建数据源。
接收 Socket 文本流:
DataStream<String> text = env.socketTextStream("x.x.x.x", 9002, "\n");
这行代码定义了数据源,从指定 IP 地址 (x.x.x.x) 和端口 (9002) 接收文本流,以换行符 (\n) 作为记录的分隔符。这里的 IP 地址应替换为实际的源地址。
text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {for (String word : s.split("\\s")) {
collector.collect(new Tuple2<>(word, 1));
}
}
})
这段代码将文本行切分成单词,并为每个单词生成一个 (单词, 1) 的元组。
.keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
@Override
public Object getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
})
keyBy 根据元组的第一个字段(f0,即单词)进行分组。
应用滚动窗口:
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
这行代码定义了一个基于处理时间的滚动窗口,窗口大小为 5 秒。每个窗口独立计算过去 5 秒内的数据。
单词计数累加:
.sum(1)
在每个窗口内,对分组后的单词计数 (1 表示元组的第二个字段) 进行求和。
打印结果并设置并行度:
wordCounts.print().setParallelism(1);
这行代码将计算结果输出到控制台,并将并行度设置为 1,以确保输出的顺序性。
执行 Flink 任务:
env.execute("Socket Window WordCount");
最后一行代码启动 Flink 流处理作业。execute 方法触发程序执行,"Socket Window WordCount" 是作业的名称。
nc 命令说明:
运行报错,提示: java.lang.NoClassDefFoundError: org/apache/flink/api/common/functions/FlatMapFunction
解决办法:
推荐解决方法2,idea 启动类配置 include depencies with "Provided" scope,配置如下图所示:
启动后,需要在 nc 监听的那里,输入文本:
每当程序读取到一个换行符(\n),就会将前面的文本作为一个单独的记录(字符串),然后将这单独记录根据空格切分统计单词数量。
输出如下图所示:
在这里我将 flink-1.12.0 源码包放到了 Linux 虚机上,配置好 Java 环境,然后配置 Flink 环境变量。编辑 /etc/profile,填写以下内容,并保存。
# Flink HOME
FLINK_HOME=/opt/flink-1.12.0
export PATH=$FLINK_HOME/bin:$PATH
然后再执行 source /etc/profile 即可。最后执行 flink --version 验证效果。
将打的 jar 包,放到 Linux 虚机上,然后运行命令:
# 因为配置了flink的环境变量,所以在任意目录下都可以执行flink命令
flink run -c org.myorg.quickstart.SocketWindowWordCount /tmp/quickstart-0.1.jar
提交成功后,我们可以访问 Flink Web UI,查看任务运行日志:
在 nc -l 9002 的命令窗口,造些数据,如下图所示:
查看 flink WordCount 程序输出日志: 因为程序里设置的 wordCounts.print(),是控制台输出,所以我们的统计结果在 Stdout 里面:
上面我们是将统计结果打印到控制台,现在我们将统计结果打印到文件中。 自动生成的这个 maven 项目,好像缺少了 slf4j-api 依赖,添加如下:
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version> <!-- Make sure to use the correct version -->
</dependency>
然后自定义 Sink 端,继承 SinkFunction:
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WordCountSink implements SinkFunction<Tuple2<String, Integer>> {
private static final Logger logger = LoggerFactory.getLogger(WordCountSink.class);
@Override
public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
logger.info("Word: {}, windows Count: {}", value.f0, value.f1);
}
}
然后返回到 SocketWindowWordCount.java 中,将 wordCounts.print().setParallelism(1); 注释掉,添加 wordCounts.addSink(new WordCountSink()).name("WordCount log Sink");就 OK 了。
提交 jar 包,运行如下:
本文主要介绍了 Apache Flink 这一流处理框架的基本使用,以及如何通过实现 WordCount 程序来学习 Flink 的基本编程模型。本文从创建 Maven 工程开始,详细介绍了如何编写、本地启动以及通过jar包运行 WordCount 程序,包括环境设置、数据源定义、数据转换、定义窗口、聚合操作和输出结果等关键步骤。
此外,还提到了如何将统计结果输出到文件中,以及解决运行中可能遇到的问题。
文档通过逐步分析代码和执行过程,帮助读者理解 Flink 程序的开发和运行流程,适合刚开始了解 Flink 作业提交流程的同学。