前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink 系列:Flink 入门不再难!3000字深入浅出 WordCount 实战及精解

Flink 系列:Flink 入门不再难!3000字深入浅出 WordCount 实战及精解

作者头像
create17
发布2024-04-15 17:02:45
4490
发布2024-04-15 17:02:45
举报
文章被收录于专栏:大数据实战演练

大家好,我是create17,见字如面。

在这个数据驱动的时代,掌握大数据技术成为了每一位开发者必不可少的技能。而在众多技术栈中,Flink无疑占据了重要的位置。作为一个高性能、可扩展的实时数据处理框架,Flink已经成为了很多企业和开发者的首选。但对于初学者来说,Flink的学习曲线可能会显得有些陡峭。因此,我们决定打造一系列通俗易懂的Flink学习文章,希望能帮助大家更快地掌握这一强大的技术。

那希望我接下来的分享给大家带来一些帮助和启发🤔

版本说明: Java:1.8 Flink:1.12.0

一、前言

Apache Flink 是一个流处理框架,它允许用户以高吞吐量和低延迟的方式处理实时数据流。Flink 提供了强大的流处理能力,能够处理有界(批处理)和无界(流处理)的数据流。通过 Flink,开发者可以轻松实现复杂的数据处理和分析应用。

今天和大家一起学习 Flink 入门级 demo:WordCount。WordCount 简单来讲就是单词计数,是一般大数据计算框架(Hadoop、Spark、Flink)的入门学习案例,相当于编程语言(Java、Python)中的 HelloWorld 案例,适合刚开始了解 Flink 作业提交流程的同学。

WordCount 程序编写好以后,我们可以本地运行测试,也可以打成 jar 包,使用命令提交 Job 运行。本篇文章,这两种方式我们都试一下。好了,准备好了吗?我们开始吧。

二、编写 WordCount 程序

1、创建 maven 工程

我们使用 Java 语言来编写 WordCount 程序。首先创建 maven 工程,可运行下述代码创建工程:

代码语言:javascript
复制
mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.12.0 \
    -DgroupId=org.myorg.quickstart \
    -DartifactId=quickstart \
    -Dversion=0.1 \
    -Dpackage=org.myorg.quickstart \
    -DinteractiveMode=false
  • mvn archetype:generate:这是 Maven 命令的一部分,用于触发 Maven 架构生成的目标。这个命令告诉 Maven 你想要生成一个新的项目,基于指定的架构模板。
  • -DarchetypeGroupId=org.apache.flink:这个参数指定了架构的 group ID。对于 Apache Flink 的快速开始模板,其 group ID 是 org.apache.flink。Group ID 是 Maven 项目的一部分,用于唯一标识项目所属的组织或项目组。
  • -DarchetypeArtifactId=flink-quickstart-java:这个参数指定了架构的 artifact ID,即模板的具体名称。对于 Flink 的快速开始 java 项目,artifact ID 是 flink-quickstart-java。Artifact ID 用于唯一标识一个项目或模块。
  • -DarchetypeVersion=1.12.0:指定了架构的版本号。对于你提供的命令,使用的 Flink 架构版本是 1.12.0。需要注意的是,可能存在多个版本的架构,每个版本可能会有不同的特性或结构。
  • -DgroupId=org.myorg.quickstart:这是你的项目的 group ID。在 Maven 中,group ID 用于唯一标识你的项目所属的组织或项目组。这里,它被设置为 org.myorg.quickstart。
  • -DartifactId=quickstart:这是你的项目的 artifact ID,它是你项目的唯一标识。在这个例子中,它被设置为 quickstart。
  • -Dversion=0.1:这个参数指定了你的项目版本。在这里,项目版本被设置为 0.1。
  • -Dpackage=org.myorg.quickstart:这个参数用于指定你的项目包的基本名称。在 Java 中,包名用于组织和管理类。这里,包名被设置为 org.myorg.quickstart。
  • -DinteractiveMode=false:这个参数用于告诉 Maven 不要进入交互模式。当设置为 false 时,Maven 会使用命令行提供的参数来生成项目,而不会在过程中询问用户输入。

你可以编辑上面的 groupId, artifactId, package 成你喜欢的路径。使用上面的参数,Maven 将自动为你创建如下所示的项目结构:

代码语言:javascript
复制
$ tree quickstart
quickstart
├── pom.xml
└── src
    └── main
        ├── java
        │   └── org.myorg.quickstart
        │       ├── BatchJob.java
        │       └── StreamingJob.java
        └── resources
            └── log4j.properties

我们的 pom.xml 文件已经包含了所需的 Flink 依赖,并且在 src/main/java 下有几个示例程序框架。接下来我们将开始编写第一个 Flink 程序。

pom.xml 文件的 flink 相关依赖有:

代码语言:javascript
复制
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-java</artifactId>
   <version>${flink.version}</version>
   <scope>provided</scope>
</dependency>
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
   <scope>provided</scope>
</dependency>
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-clients_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
   <scope>provided</scope>
</dependency>
2、编写 Socket WordCount 程序

在本次示例中,我们使用 socket 来模拟实时数据流,然后统计指定周期内每个单词出现的频次。

代码语言:javascript
复制
import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;


/**
 * @author create17
 * @date 2024/03/17
 */
public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {
        // 创建Flink任务运行的环境,实时数据流
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // DataStream 是 Flink 中做流处理的核心 API
        // 使用换行符来分割从 socket 流中接收到的文本数据,每当它读取到一个换行符,就会将前面的文本作为一个单独的记录(字符串)
        DataStream<String> text = env.socketTextStream("x.x.x.x", 9002, "\n");
        DataStream<Tuple2<String, Integer>> wordCounts = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                // 根据空格截取字符串
                for (String word : s.split("\\s")) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        }).keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
            @Override
            public Object getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        }).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                // .sum(1)是一个转换操作,用于在一个keyed stream上进行聚合操作。这里的1是参数,表示在Tuple2<String, Integer>中要进行求和操作的字段索引,
                // 由于Tuple是从0开始索引的,0表示第一个字段(这里是单词),1表示第二个字段(这里是整数计数)。
                .sum(1);

        // 将结果打印到控制台,如果需要有序,需要设置 parallelism 为 1
        wordCounts.print().setParallelism(1);
        // 重要
        env.execute("Socket Window WordCount");

    }

}

我们现在来逐步分析上述代码:

2.1 设置环境

创建 StreamExecutionEnvironment:

代码语言:javascript
复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

这行代码初始化了 Flink 的流执行环境,它是所有 Flink 程序的起点,用于设置执行参数和创建数据源。

2.2 数据源定义

接收 Socket 文本流:

代码语言:javascript
复制
DataStream<String> text = env.socketTextStream("x.x.x.x", 9002, "\n");

这行代码定义了数据源,从指定 IP 地址 (x.x.x.x) 和端口 (9002) 接收文本流,以换行符 (\n) 作为记录的分隔符。这里的 IP 地址应替换为实际的源地址。

2.3 数据转换
  • 使用 flatMap 操作进行单词切分:
代码语言:javascript
复制
text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
    @Override
    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {for (String word : s.split("\\s")) {
            collector.collect(new Tuple2<>(word, 1));
        }
    }
})

这段代码将文本行切分成单词,并为每个单词生成一个 (单词, 1) 的元组。

  • 按单词进行分组:
代码语言:javascript
复制
.keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
    @Override
    public Object getKey(Tuple2<String, Integer> value) throws Exception {
        return value.f0;
    }
})

keyBy 根据元组的第一个字段(f0,即单词)进行分组。

2.4 定义窗口

应用滚动窗口:

代码语言:javascript
复制
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

这行代码定义了一个基于处理时间的滚动窗口,窗口大小为 5 秒。每个窗口独立计算过去 5 秒内的数据。

2.5 聚合操作

单词计数累加:

代码语言:javascript
复制
.sum(1)

在每个窗口内,对分组后的单词计数 (1 表示元组的第二个字段) 进行求和。

2.6 输出结果

打印结果并设置并行度:

代码语言:javascript
复制
wordCounts.print().setParallelism(1);

这行代码将计算结果输出到控制台,并将并行度设置为 1,以确保输出的顺序性。

2.7 启动 Flink 程序

执行 Flink 任务:

代码语言:javascript
复制
env.execute("Socket Window WordCount");

最后一行代码启动 Flink 流处理作业。execute 方法触发程序执行,"Socket Window WordCount" 是作业的名称。

3、本地启动 WordCount 程序

3.1、首先使用 nc -l 开启监听 9002 端口:

nc 命令说明:

3.2、然后执行 SocketWindowWordCount 类的 main() 方法,本地启动。

运行报错,提示: java.lang.NoClassDefFoundError: org/apache/flink/api/common/functions/FlatMapFunction

解决办法:

推荐解决方法2,idea 启动类配置 include depencies with "Provided" scope,配置如下图所示:

启动后,需要在 nc 监听的那里,输入文本:

每当程序读取到一个换行符(\n),就会将前面的文本作为一个单独的记录(字符串),然后将这单独记录根据空格切分统计单词数量。

输出如下图所示:

4、Flink 运行 jar 包启动

4.1 部署源码

在这里我将 flink-1.12.0 源码包放到了 Linux 虚机上,配置好 Java 环境,然后配置 Flink 环境变量。编辑 /etc/profile,填写以下内容,并保存。

代码语言:javascript
复制
# Flink HOME
FLINK_HOME=/opt/flink-1.12.0
export PATH=$FLINK_HOME/bin:$PATH

然后再执行 source /etc/profile 即可。最后执行 flink --version 验证效果。

4.2 运行 WordCount 程序

将打的 jar 包,放到 Linux 虚机上,然后运行命令:

代码语言:javascript
复制
# 因为配置了flink的环境变量,所以在任意目录下都可以执行flink命令
flink run -c org.myorg.quickstart.SocketWindowWordCount /tmp/quickstart-0.1.jar
  • run:这个子命令用来提交和启动一个 Flink 作业。
  • -c:指定包名和类名,这个类是你要运行的 Flink 作业的逻辑。
  • /tmp/quickstart-0.1.jar:这是包含你 Flink 应用程序的 JAR 文件的路径。这个 JAR 文件应该包含了上述类及其依赖项。

提交成功后,我们可以访问 Flink Web UI,查看任务运行日志:

在 nc -l 9002 的命令窗口,造些数据,如下图所示:

查看 flink WordCount 程序输出日志: 因为程序里设置的 wordCounts.print(),是控制台输出,所以我们的统计结果在 Stdout 里面:

5、将统计结果打印到文件中

上面我们是将统计结果打印到控制台,现在我们将统计结果打印到文件中。 自动生成的这个 maven 项目,好像缺少了 slf4j-api 依赖,添加如下:

代码语言:javascript
复制
<dependency>
   <groupId>org.slf4j</groupId>
   <artifactId>slf4j-api</artifactId>
   <version>1.7.30</version> <!-- Make sure to use the correct version -->
</dependency>

然后自定义 Sink 端,继承 SinkFunction:

代码语言:javascript
复制
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class WordCountSink implements SinkFunction<Tuple2<String, Integer>> {

    private static final Logger logger = LoggerFactory.getLogger(WordCountSink.class);

    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        logger.info("Word: {}, windows Count: {}", value.f0, value.f1);
    }
}

然后返回到 SocketWindowWordCount.java 中,将 wordCounts.print().setParallelism(1); 注释掉,添加 wordCounts.addSink(new WordCountSink()).name("WordCount log Sink");就 OK 了。

提交 jar 包,运行如下:

三、总结

本文主要介绍了 Apache Flink 这一流处理框架的基本使用,以及如何通过实现 WordCount 程序来学习 Flink 的基本编程模型。本文从创建 Maven 工程开始,详细介绍了如何编写、本地启动以及通过jar包运行 WordCount 程序,包括环境设置、数据源定义、数据转换、定义窗口、聚合操作和输出结果等关键步骤。

此外,还提到了如何将统计结果输出到文件中,以及解决运行中可能遇到的问题。

文档通过逐步分析代码和执行过程,帮助读者理解 Flink 程序的开发和运行流程,适合刚开始了解 Flink 作业提交流程的同学。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-04-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据实战演练 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、前言
  • 二、编写 WordCount 程序
    • 1、创建 maven 工程
      • 2、编写 Socket WordCount 程序
        • 2.1 设置环境
          • 2.2 数据源定义
            • 2.3 数据转换
              • 2.4 定义窗口
                • 2.5 聚合操作
                  • 2.6 输出结果
                    • 2.7 启动 Flink 程序
                    • 3、本地启动 WordCount 程序
                      • 3.1、首先使用 nc -l 开启监听 9002 端口:
                        • 3.2、然后执行 SocketWindowWordCount 类的 main() 方法,本地启动。
                        • 4、Flink 运行 jar 包启动
                          • 4.1 部署源码
                            • 4.2 运行 WordCount 程序
                              • 5、将统计结果打印到文件中
                              • 三、总结
                              领券
                              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档