
FileSink 是 Flink 中的 Sink 函数,用于将处理后的数据输出到文件系统。它能够处理实时数据流,并提供灵活的配置选项,允许用户定义输出文件的格式、路径和写入策略。
FileSink 是 Apache Flink 中的一种 Sink 函数,用于将流处理的结果数据输出到文件系统。其原理涉及到 Flink 的数据流处理模型以及文件系统的操作。
总的来说,FileSink 的原理包括了对数据流的缓冲和批处理、数据分桶、写入策略配置、事务支持、故障恢复和文件系统操作等多个方面,通过这些机制的组合,可以实现高效可靠地将数据写入到文件系统中。
在Apache Flink中,FileSink是一种用于将数据写入文件的输出操作符。FileSink提供了一些滚动策略(Rolling Policy)的配置选项,这些策略用于控制如何滚动输出文件。滚动策略决定了何时创建新文件、如何确定文件名称以及何时关闭旧文件。以下是Flink中常见的滚动策略及其用途:
按时间滚动(Rolling Policy by Time): 这种策略会根据时间间隔滚动文件,通常是根据处理的时间戳或者是固定的时间窗口。
// 例如,每小时滚动一次的设置
RollingPolicy<T> rollingPolicy = DefaultRollingPolicy
.builder()
.withRolloverInterval(TimeUnit.HOURS.toMillis(1))
.build();按大小滚动(Rolling Policy by Size): 这种策略会根据文件大小滚动文件,通常用于限制每个文件的最大大小。
// 例如,每个文件大小限制为100 MB的设置
RollingPolicy<T> rollingPolicy = DefaultRollingPolicy
.builder()
.withMaxPartSize(1024 * 1024 * 100) // 100 MB
.build();按条件滚动(Rolling Policy by Condition): 这种策略根据特定条件滚动文件,比如当满足一定的事件数量或者是自定义的条件时滚动文件。
// 例如,每1000个事件滚动一次的设置
RollingPolicy<T> rollingPolicy = DefaultRollingPolicy
.builder()
.withMaxPartSize(1024 * 1024 * 100) // 100 MB
.withMaxPartSize(1000) // 每1000个事件滚动一次
.build();按快照滚动(Rolling Policy by checkPoint):主要是针对forBulkFormat列模式的滚动策略,比较特殊
CheckpointRollingPolicy<String,String> rollingPolicy = new CheckpointRollingPolicy<>() {
@Override
public boolean shouldRollOnEvent(PartFileInfo<String> partFileInfo, String s) throws IOException {
return false;
}
@Override
public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileInfo, long l) throws IOException {
return false;
}
};这些滚动策略可以根据具体的需求进行组合和定制。例如,你可以同时设置按时间和按大小滚动文件,以确保输出文件既满足时间要求又不会超过一定的大小限制。FileSink的滚动策略提供了灵活的配置选项,以满足各种输出场景的需求。
在Flink中,FileSink使用Bucket桶的概念来组织和管理文件, 所谓桶,即数据应该去往哪个文件夹 。
按照时间分桶(Time-based Formatting): Bucket可以按照时间进行格式化,这在基于时间的窗口操作中很有用。
// 按照每小时一个Bucket进行分桶
BucketAssigner<T, String> customBucketAssigner = new DateTimeBucketAssigner<>("yyyy-MM-dd--HH");按照其他条件分桶(Custom Formatting): 用户也可以自定义Bucket的格式化方式,根据特定的条件来组织Bucket。
// 自定义数据格式化
BucketAssigner<String, String> customBucketAssigner = new BucketAssigner<>() {
@Override
public String getBucketId(String element, Context context) {
// 获取当前时间戳(以秒为单位)
long timestamp = System.currentTimeMillis() / 1000;
// 将时间戳转换为 LocalDateTime 对象
LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(timestamp), ZoneId.systemDefault());
// 定义日期时间格式
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
// 格式化日期时间对象为指定格式的字符串
return formatter.format(dateTime);
}
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
};Flink的FileSink还支持在写入文件时对数据进行压缩,以减少存储空间和提高传输效率。
**forRowFormat 行模式下:**自定义内容限定于文件内部,想对文件进行压缩等操作,则很难办到; forBulkFormat 列模式下: **不仅可以对文件内部操作,也可以轻松做到对文件压缩等操作;
通过这些格式化和压缩的配置选项,FileSink提供了灵活性,使用户能够根据具体需求选择适当的方式来组织输出文件并控制文件大小。
不压缩(No Compression): 默认情况下,FileSink是不压缩数据的
// 不进行压缩
FileSink<String> sink = FileSink
.forRowFormat(outputPath, new SimpleStringEncoder<T>("UTF-8"))
.withBucketAssigner(customBucketAssigner)
.withRollingPolicy(rollingPolicy)
.withOutputFileConfig(outputFileConfig)
.build();Gzip 压缩: FileSink支持使用Gzip算法对输出文件进行压缩。
package com.aurora.demo.FileSink;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import java.io.IOException;
/**
* 描述:自定义列模式的文件压缩算法
*
* @author 浅夏的猫
* @version 1.0.0
* @date 2024-02-08 01:20:31
*/
public class CustomBulkWriterFactory implements BulkWriter.Factory<String> {
@Override
public BulkWriter<String> create(FSDataOutputStream out) throws IOException {
GzipCompressorOutputStream gzipOutput = new GzipCompressorOutputStream(out);
return new BulkWriter<>() {
@Override
public void addElement(String element) throws IOException {
gzipOutput.write(element.getBytes());
}
@Override
public void flush() throws IOException {
gzipOutput.flush();
}
@Override
public void finish() throws IOException {
gzipOutput.close();
}
};
}
}其他压缩算法: 除了Gzip,FileSink还支持其他压缩算法,直接看源码包,也可以得到,例如Snappy算法等,可以根据需求选择合适的压缩算法。
commons-compress-1.21.jar
META-INF
org
apache
commons
compress
archivers
changes
compressors
brotli
bzip2
deflate
deflate64
gzip
lz4
lz77support
lzma
lzw
pack200
snappy
xz
z
zstandard
CompressorException
CompressorInputStream
CompressorOutputStream
CompressorStreamFactory
CompressorStreamProvider
FileNameUtil
harmony
java
parallel
utils
MemoryLimitException
PasswordRequiredExceptionFinished 状态与 In-progress 状态的文件只能通过命名来区分
Flink 允许用户给 Part 文件名添加一个前缀和/或后缀。 使用 OutputFileConfig来完成上述功能。 例如,Sink 将在创建文件的文件名上添加前缀 “prefix” 和后缀 “.ext”,如下所示:
└── 2019-08-25--12
├── prefix-4005733d-a830-4323-8291-8866de98b582-0.ext
├── prefix-4005733d-a830-4323-8291-8866de98b582-1.ext.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
├── prefix-81fc4980-a6af-41c8-9937-9939408a734b-0.ext
└── prefix-81fc4980-a6af-41c8-9937-9939408a734b-1.ext.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11代码实现如图
OutputFileConfig outputFileConfig = OutputFileConfig.builder().withPartPrefix("Flink_").withPartSuffix(".dat").build();从 1.15 版本开始 FileSink 开始支持已经提交 pending 文件的合并,从而允许应用设置一个较小的时间周期并且避免生成大量的小文件。 尤其是当用户使用 bulk 格式 的时候: 这种格式要求用户必须在 checkpoint 的时候切换文件。
enableCompactionOnCheckpoint:表示相隔几个checkpoint做一次合并,默认1
setNumCompactThreads: 合并文件使用多少个线程,默认1
FileCompactor:合并算法
(1) IdenticalFileCompactor:直接复制一个文件的内容,到另一个文件,一次只能复制一个文件;
(2) ConcatFileCompactor:可以自定义两个文件直接的分割符,由构造方法传入。
(3) RecordWiseFileCompactor:自定义内容比较多
FileSink<String> fileSink = FileSink
//指定文件目录与文件写入编码格式
.forRowFormat(path, new SimpleStringEncoder<String>("UTF-8"))
//设置合并策略,
.enableCompact(FileCompactStrategy.Builder.newBuilder().enableCompactionOnCheckpoint(1).setNumCompactThreads(1).build(),new ConcatFileCompactor()).build();这一功能开启后,在文件转为 pending 状态与文件最终提交之间会进行文件合并。这些 pending 状态的文件将首先被提交为一个以 . 开头的 临时文件。这些文件随后将会按照用户指定的策略和合并方式进行合并并生成合并后的 pending 状态的文件。 然后这些文件将被发送给 Committer 并提交为正式文件,在这之后,原始的临时文件也会被删除掉。
当开启文件合并功能时,用户需要指定 FileCompactStrategy 与 FileCompactor 。
FileCompactStrategy 指定何时以及哪些文件将被合并。 目前有两个并行的条件:目标文件大小与间隔的 Checkpoint 数量。当目前缓存的文件的总大小达到指定的阈值,或自上次合并后经过的 Checkpoint 次数已经达到指定次数时, FileSink 将创建一个异步任务来合并当前缓存的文件。
FileCompactor 指定如何将给定的路径列表对应的文件进行合并将结果写入到文件中。 根据如何写文件,它可以分为两类:
CompactingFileWriter 的一个例子是 ConcatFileCompactor ,它直接将给定的文件进行合并并将结果写到输出流中。CompactingFileWriter 会逐条读出输入文件的记录用户,然后和FileWriter一样写入输出文件中。CompactingFileWriter 的一个例子是 RecordWiseFileCompactor ,它从给定的文件中读出记录并写出到 CompactingFileWriter 中。用户需要指定如何从原始文件中读出记录。注意事项1 一旦启用了文件合并功能,此后若需要再关闭,必须在构建FileSink时显式调用disableCompact方法。
注意事项2 如果启用了文件合并功能,文件可见的时间会被延长。
在分布式环境下,节点故障是常见的情况。FileSink 具备故障恢复机制,能够在节点故障后重新启动并继续写入数据,保证数据的完整性和可靠性。以下是一些关键的机制:
Apache Flink的FileSin(例如BucketingSink)主要用于将流处理应用程序的结果写入分布式文件系统。以下是FileSink实际应用场景的一些例子:
FileSink是一个常见的选择。例如,你可以将某个时间窗口内的聚合结果写入文件,以便进行后续的分析或离线处理。FileSink可以将这些结果以可查询的格式写入文件系统。这使得报表或分析结果对于离线查询、共享和长期存储变得更加方便。FileSink可以作为一种通用的输出端,将处理过的数据以文件形式输出。这使得不同系统之间的数据交换更加灵活,因为文件是一种通用的数据交换格式。在这些场景中,FileSink的配置选项(如文件路径、格式化选项、分区策略等)可以根据具体的需求进行调整,以满足不同应用的要求。请注意,实际的应用场景可能需要根据具体的业务需求和数据特性进行定制。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.xsy</groupId>
<artifactId>aurora_flink_connector_file</artifactId>
<version>1.0-SNAPSHOT</version>
<!--属性设置-->
<properties>
<!--java_JDK版本-->
<java.version>11</java.version>
<!--maven打包插件-->
<maven.plugin.version>3.8.1</maven.plugin.version>
<!--编译编码UTF-8-->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!--输出报告编码UTF-8-->
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<!--通用依赖-->
<dependencies>
<!-- flink读取Text File文件依赖 start-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>1.18.0</version>
</dependency>
<!-- flink读取Text File文件依赖 end-->
<!-- flink基础依赖 start -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.18.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.18.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.18.0</version>
</dependency>
<!-- flink基础依赖 end -->
</dependencies>
<!--编译打包-->
<build>
<finalName>${project.name}</finalName>
<!--资源文件打包-->
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.xml</include>
</includes>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>org.google.code.flindbugs:jar305</exclude>
<exclude>org.slf4j:*</exclude>
<excluder>org.apache.logging.log4j:*</excluder>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.aurora.KafkaStreamingJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<!--插件统一管理-->
<pluginManagement>
<plugins>
<!--maven打包插件-->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring.boot.version}</version>
<configuration>
<fork>true</fork>
<finalName>${project.build.finalName}</finalName>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
<!--编译打包插件-->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven.plugin.version}</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<encoding>UTF-8</encoding>
<compilerArgs>
<arg>-parameters</arg>
</compilerArgs>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.compactor.*;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.UUID;
/**
* 描述:flink集成FileSink,forRowFormat行模式
*
* @author 浅夏的猫
* @version 1.0.0
* @date 2024-02-07 16:11:50
*/
public class FileRowSinkStreaming {
public static void main(String[] args) throws Exception {
//=============1.分桶策略=========================================
// 自定义分桶策略
BucketAssigner<String, String> customBucketAssigner = new BucketAssigner<>() {
@Override
public String getBucketId(String element, Context context) {
// 获取当前时间戳(以秒为单位)
long timestamp = System.currentTimeMillis() / 1000;
// 将时间戳转换为 LocalDateTime 对象
LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(timestamp), ZoneId.systemDefault());
// 定义日期时间格式
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
// 格式化日期时间对象为指定格式的字符串
return formatter.format(dateTime);
}
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
};
// 默认基于时间的窗口分桶策略
// BucketAssigner<String, String> customBucketAssigner = new DateTimeBucketAssigner<>("yyyy-MM-dd--HH");
//=============2.滚动策略===============================================
DefaultRollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder()
//15分钟则会滚动
.withRolloverInterval(Duration.ofMinutes(15))
//从没接收延时5分钟之外的新纪录则滚动
.withInactivityInterval(Duration.ofMinutes(5))
//文件大小已经达到 1GB 则滚动
.withMaxPartSize(MemorySize.ofMebiBytes(1024))
.build();
//================3.文件命名策略==================================
OutputFileConfig outputFileConfig = OutputFileConfig.builder().withPartPrefix("Flink_").withPartSuffix(".dat").build();
//================4.合并策略======================================
FileCompactStrategy fileCompactStrategy = FileCompactStrategy.Builder.newBuilder().enableCompactionOnCheckpoint(1).setNumCompactThreads(1).build();
//合并算法,3种
//第1种:可以自定义两个文件直接的分割符,由构造方法传入
ConcatFileCompactor fileCompactor = new ConcatFileCompactor();
//第2种:直接复制一个文件的内容,到另一个文件,一次只能复制一个文件;
// IdenticalFileCompactor fileCompactor = new IdenticalFileCompactor();
//第3种:自定义内容比较多
// RecordWiseFileCompactor.Reader.Factory<String> stringFactory = new RecordWiseFileCompactor.Reader.Factory<>() {
// @Override
// public RecordWiseFileCompactor.Reader<String> createFor(Path path) throws IOException {
// //需自定义
// return null;
// }
// };
// RecordWiseFileCompactor fileCompactor = new RecordWiseFileCompactor(stringFactory);
// 创建FileSink
FileSink<String> fileSink = FileSink
//指定文件目录与文件写入编码格式
.forRowFormat(new Path("D:\\flink"), new SimpleStringEncoder<String>("UTF-8"))
//设置合并策略
.enableCompact(fileCompactStrategy, fileCompactor)
//分桶策略,不设置采用默认的分桶策略DateTimeBucketAssigner,基于时间的分配器,每小时产生一个桶,即yyyy-mm-dd-hh
.withBucketAssigner(customBucketAssigner)
//指定文件前后缀输出
.withOutputFileConfig(outputFileConfig)
//默认滚动策略,每隔多久把临时文件合并一次
// .withBucketCheckInterval(1000)
//自定义滚动策略,三个条件满足任意一个都会滚动
.withRollingPolicy(rollingPolicy).build();
// 创建 执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建简单批模式数据源
DataStreamSource<String> dataStreamSource = env.fromCollection(Arrays.asList("运维测试", "运维开发", "追风的少年"));
// 把数据源的全部数据写入到文件,注意一旦开启文件合并,则必须设置uid,否则直接启动报错
dataStreamSource.sinkTo(fileSink).uid("1");
// 执行任务
env.execute("FileRowSinkStreaming");
}
}import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import java.io.IOException;
/**
* 描述:自定义列模式的文件压缩算法
*
* @author 浅夏的猫
* @version 1.0.0
* @date 2024-02-08 01:20:31
*/
public class CustomBulkWriterFactory implements BulkWriter.Factory<String> {
@Override
public BulkWriter<String> create(FSDataOutputStream out) throws IOException {
GzipCompressorOutputStream gzipOutput = new GzipCompressorOutputStream(out);
return new BulkWriter<>() {
@Override
public void addElement(String element) throws IOException {
gzipOutput.write(element.getBytes());
}
@Override
public void flush() throws IOException {
gzipOutput.flush();
}
//注意:此方法不能关闭Factory传入的流,这是框架完成的事!!!否则程序直接俄报错
@Override
public void finish() throws IOException {
// gzipOutput.close();
}
};
}
}import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.compactor.ConcatFileCompactor;
import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
import org.apache.flink.connector.file.sink.compactor.IdenticalFileCompactor;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
/**
* 描述:flink集成FileSink,forBulkFormat列模式
*
* @author 浅夏的猫
* @version 1.0.0
* @date 2024-02-07 16:11:50
*/
public class FileBulkSinkStreaming {
public static void main(String[] args) throws Exception {
//=============1.分桶策略=========================================
// 自定义分桶策略
BucketAssigner<String, String> customBucketAssigner = new BucketAssigner<>() {
@Override
public String getBucketId(String element, Context context) {
// 获取当前时间戳(以秒为单位)
long timestamp = System.currentTimeMillis() / 1000;
// 将时间戳转换为 LocalDateTime 对象
LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(timestamp), ZoneId.systemDefault());
// 定义日期时间格式
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
// 格式化日期时间对象为指定格式的字符串
return formatter.format(dateTime);
}
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
};
// 默认基于时间的窗口分桶策略
// BucketAssigner<String, String> customBucketAssigner = new DateTimeBucketAssigner<>("yyyy-MM-dd--HH");
//=============2.滚动策略===============================================
CheckpointRollingPolicy<String,String> rollingPolicy = new CheckpointRollingPolicy<>() {
@Override
public boolean shouldRollOnEvent(PartFileInfo<String> partFileInfo, String s) throws IOException {
return false;
}
@Override
public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileInfo, long l) throws IOException {
return false;
}
};
//================3.文件命名策略==================================
OutputFileConfig outputFileConfig = OutputFileConfig.builder().withPartPrefix("Flink_").withPartSuffix(".dat").build();
//================4.合并策略======================================
FileCompactStrategy fileCompactStrategy = FileCompactStrategy.Builder.newBuilder().enableCompactionOnCheckpoint(1).setNumCompactThreads(1).build();
//合并算法,3种
//第1种:可以自定义两个文件直接的分割符,由构造方法传入
ConcatFileCompactor fileCompactor = new ConcatFileCompactor();
//第2种:直接复制一个文件的内容,到另一个文件,一次只能复制一个文件;
// IdenticalFileCompactor fileCompactor = new IdenticalFileCompactor();
//第3种:自定义内容比较多
// RecordWiseFileCompactor.Reader.Factory<String> stringFactory = new RecordWiseFileCompactor.Reader.Factory<>() {
// @Override
// public RecordWiseFileCompactor.Reader<String> createFor(Path path) throws IOException {
// //需自定义
// return null;
// }
// };
// RecordWiseFileCompactor fileCompactor = new RecordWiseFileCompactor(stringFactory);
// 创建FileSink
FileSink<String> fileSink = FileSink
//指定文件目录与文件写入编码格式
.forBulkFormat(new Path("D:\\flink"), new CustomBulkWriterFactory())
//设置合并策略
.enableCompact(fileCompactStrategy, fileCompactor)
//分桶策略,不设置采用默认的分桶策略DateTimeBucketAssigner,基于时间的分配器,每小时产生一个桶,即yyyy-mm-dd-hh
.withBucketAssigner(customBucketAssigner)
//指定文件前后缀输出
.withOutputFileConfig(outputFileConfig)
//默认滚动策略,每隔多久把临时文件合并一次
// .withBucketCheckInterval(1000)
//自定义滚动策略,三个条件满足任意一个都会滚动
.withRollingPolicy(rollingPolicy)
.build();
// 创建 执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建简单批模式数据源
DataStreamSource<String> dataStreamSource = env.fromCollection(Arrays.asList("运维测试", "运维开发", "追风的少年"));
// 把数据源的全部数据写入到文件,注意一旦开启文件合并,则必须设置uid,否则直接启动报错
dataStreamSource.sinkTo(fileSink).uid("2");
// 执行任务
env.execute("FileRowSinkStreaming");
}
}FileSink 是 Apache Flink 中实现数据写入文件的核心组件,通过详细介绍其基本概念、实现细节和示例代码,让大家对其有了更全面的认识。期望这些信息能帮助大家更好地理解和应用 FileSink,在实际项目中实现数据写入文件的需求,欢迎评论交流!!!