首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Flink DataStream API与Data Table API/SQL集成

Flink DataStream API与Data Table API/SQL集成

作者头像
从大数据到人工智能
发布于 2022-02-24 05:44:09
发布于 2022-02-24 05:44:09
4.5K00
代码可运行
举报
文章被收录于专栏:大数据-BigData大数据-BigData
运行总次数:0
代码可运行

在定义数据处理管道时,Table API 和 DataStream API 同样重要。

DataStream API 在一个相对较低级别的命令式编程 API 中提供了流处理的原语(即时间、状态和数据流管理)。 Table API 抽象了许多内部结构,并提供了结构化和声明性的 API。

两种 API 都可以处理有界和无界流。

处理历史数据时需要管理有界流。 无限流发生在可能首先用历史数据初始化的实时处理场景中。

为了高效执行,这两个 API 都以优化的批处理执行模式提供处理有界流。 但是,由于批处理只是流的一种特殊情况,因此也可以在常规流执行模式下运行有界流的管道。

目前,DataStream API 和 Table API 都提供了自己的方式来启用批处理执行模式。 在不久的将来,这将进一步统一。

一个 API 中的管道可以端到端定义,而不依赖于另一个 API。 但是,出于各种原因,混合使用这两种 API 可能会很有用:

  • 在 DataStream API 中实现主管道之前,使用表生态系统轻松访问目录或连接到外部系统。
  • 在 DataStream API 中实现主管道之前,访问一些用于无状态数据规范化和清理的 SQL 函数。
  • 如果 Table API 中不存在更底层的操作(例如自定义计时器处理),请不时切换到 DataStream API。

Flink 提供了特殊的桥接功能,使与 DataStream API 的集成尽可能顺畅。

在 DataStream 和 Table API 之间切换会增加一些转换开销。 例如,部分处理二进制数据的表运行时的内部数据结构(即 RowData)需要转换为对用户更友好的数据结构(即 Row)。 通常,可以忽略此开销,但为了完整起见,在此提及。

DataStream和Table之间的转换

Flink 在 Java 和 Scala 中提供了一个专门的 StreamTableEnvironment 用于与 DataStream API 集成。 这些环境使用其他方法扩展常规 TableEnvironment,并将 DataStream API 中使用的 StreamExecutionEnvironment 作为参数。

目前 StreamTableEnvironment 暂不支持开启批量执行模式。 尽管如此,有界流可以使用流式执行模式处理,但效率较低。 但是请注意,通用 TableEnvironment 可以在流式执行或优化的批处理执行模式下工作。

以下代码显示了如何在两个 API 之间来回切换的示例。 Table 的列名和类型自动派生自 DataStream 的 TypeInformation。 由于 DataStream API 本身不支持变更日志处理,因此代码在流到表和表到流的转换过程中假定仅附加/仅插入语义。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// create environments of both APIs
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// create a DataStream
DataStream<String> dataStream = env.fromElements("Alice", "Bob", "John");

// interpret the insert-only DataStream as a Table
Table inputTable = tableEnv.fromDataStream(dataStream);

// register the Table object as a view and query it
tableEnv.createTemporaryView("InputTable", inputTable);
Table resultTable = tableEnv.sqlQuery("SELECT UPPER(f0) FROM InputTable");

// interpret the insert-only Table as a DataStream again
DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);

// add a printing sink and execute in DataStream API
resultStream.print();
env.execute();

// prints:
// +I[Alice]
// +I[Bob]
// +I[John]

fromDataStream 和 toDataStream 的完整语义可以在下面的专用部分中找到。 特别是,本节讨论了如何使用更复杂和嵌套的类型来影响模式派生。 它还涵盖了使用事件时间和水印。

根据查询的类型,在许多情况下,生成的动态表是一个管道,它不仅在将表覆盖到 DataStream 时产生仅插入更改,而且还会产生撤回和其他类型的更新。 在表到流的转换过程中,这可能会导致类似于以下的异常

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Table sink 'Unregistered_DataStream_Sink_1' doesn't support consuming update changes [...].

在这种情况下,需要再次修改查询或切换到 toChangelogStream。

以下示例显示了如何转换更新表。 每个结果行都表示更改日志中的一个条目,该条目带有一个更改标志,可以通过对其调用 row.getKind() 进行查询。 在该示例中,Alice 的第二个分数在 (-U) 更改之前和 (+U) 更改之后创建更新。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// create environments of both APIs
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// create a DataStream
DataStream<Row> dataStream = env.fromElements(
    Row.of("Alice", 12),
    Row.of("Bob", 10),
    Row.of("Alice", 100));

// interpret the insert-only DataStream as a Table
Table inputTable = tableEnv.fromDataStream(dataStream).as("name", "score");

// register the Table object as a view and query it
// the query contains an aggregation that produces updates
tableEnv.createTemporaryView("InputTable", inputTable);
Table resultTable = tableEnv.sqlQuery(
    "SELECT name, SUM(score) FROM InputTable GROUP BY name");

// interpret the updating Table as a changelog DataStream
DataStream<Row> resultStream = tableEnv.toChangelogStream(resultTable);

// add a printing sink and execute in DataStream API
resultStream.print();
env.execute();

// prints:
// +I[Alice, 12]
// +I[Bob, 10]
// -U[Alice, 12]
// +U[Alice, 112]

fromChangelogStream 和 toChangelogStream 的完整语义可以在下面的专用部分中找到。 特别是,本节讨论了如何使用更复杂和嵌套的类型来影响模式派生。 它涵盖了使用事件时间和水印。 它讨论了如何为输入和输出流声明主键和更改日志模式。

依赖与导入

将 Table API 与 DataStream API 结合的项目需要添加以下桥接模块之一。 它们包括对 flink-table-api-java 或 flink-table-api-scala 的传递依赖以及相应的特定于语言的 DataStream API 模块。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.13.5</version>
  <scope>provided</scope>
</dependency>

配置

TableEnvironment 将采用来自传递的 StreamExecutionEnvironment 的所有配置选项。 但是,不能保证对 StreamExecutionEnvironment 的配置的进一步更改会在 StreamTableEnvironment 实例化后传播到它。 此外,不支持将选项从 Table API 反向传播到 DataStream API。

我们建议在切换到 Table API 之前尽早在 DataStream API 中设置所有配置选项。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import java.time.ZoneId;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// create Java DataStream API

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// set various configuration early

env.setMaxParallelism(256);

env.getConfig().addDefaultKryoSerializer(MyCustomType.class, CustomKryoSerializer.class);

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// then switch to Java Table API

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// set configuration early

tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Europe/Berlin"));

// start defining your pipelines in both APIs...

执行行为

这两个 API 都提供了执行管道的方法。 换句话说:如果需要,他们会编译一个作业图,该作业图将提交到集群并触发执行。 结果将流式传输到声明的接收器。

通常,这两个 API 都使用方法名称中的术语执行来标记此类行为。 但是,Table API 和 DataStream API 的执行行为略有不同。

DataStream API

DataStream API 的 StreamExecutionEnvironment 充当构建器模式来构建复杂的管道。 管道可能会分成多个分支,这些分支可能会或可能不会以接收器结束。

必须至少定义一个接收器。 否则,将引发以下异常:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.

StreamExecutionEnvironment.execute() 提交整个构建的管道并随后清除构建器。 换句话说:不再声明源和接收器,并且可以将新管道添加到构建器中。 因此,每个 DataStream 程序通常以调用 StreamExecutionEnvironment.execute() 结束。 或者,DataStream.executeAndCollect() 隐式定义了一个接收器,用于将结果流式传输到本地客户端,并且只执行当前分支。

Table API

在 Table API 中,仅在 StatementSet 中支持分支管道,其中每个分支都必须声明一个最终接收器。 TableEnvironment 和 StreamTableEnvironment 都不提供专用的通用 execute() 方法。 相反,它们提供了提交单个源到接收器管道或语句集的方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// execute with explicit sink
tableEnv.from("InputTable").executeInsert("OutputTable")

tableEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable")

tableEnv.createStatementSet()
    .addInsert("OutputTable", tableEnv.from("InputTable"))
    .addInsert("OutputTable2", tableEnv.from("InputTable"))
    .execute()

tableEnv.createStatementSet()
    .addInsertSql("INSERT INTO OutputTable SELECT * FROM InputTable")
    .addInsertSql("INSERT INTO OutputTable2 SELECT * FROM InputTable")
    .execute()

// execute with implicit local sink

tableEnv.from("InputTable").execute().print()

tableEnv.executeSql("SELECT * FROM InputTable").print()

为了结合这两种执行行为,对 StreamTableEnvironment.toDataStream 或 StreamTableEnvironment.toChangelogStream 的每次调用都将具体化(即编译)表 API 子管道并将其插入到 DataStream API 管道构建器中。 这意味着之后必须调用 StreamExecutionEnvironment.execute() 或 DataStream.executeAndCollect。 Table API 中的执行不会触发这些“外部部分”。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// (1)

// adds a branch with a printing sink to the StreamExecutionEnvironment
tableEnv.toDataStream(table).print()

// (2)

// executes a Table API end-to-end pipeline as a Flink job and prints locally,
// thus (1) has still not been executed
table.execute().print()

// executes the DataStream API pipeline with the sink defined in (1) as a
// Flink job, (2) was already running before
env.execute()

处理(仅插入)流

StreamTableEnvironment 提供以下方法来转换和转换为 DataStream API:

  • fromDataStream(DataStream):将仅插入更改和任意类型的流解释为表。默认情况下不传播事件时间和水印。
  • fromDataStream(DataStream, Schema):将仅插入更改和任意类型的流解释为表。可选模式允许丰富列数据类型并添加时间属性、水印策略、其他计算列或主键。
  • createTemporaryView(String, DataStream):在一个名称下注册流,以便在 SQL 中访问它。它是 createTemporaryView(String, fromDataStream(DataStream)) 的快捷方式。
  • createTemporaryView(String, DataStream, Schema):在一个名称下注册流,以便在 SQL 中访问它。它是 createTemporaryView(String, fromDataStream(DataStream, Schema)) 的快捷方式。
  • toDataStream(DataStream):将表转换为只插入更改的流。默认流记录类型是 org.apache.flink.types.Row。单个行时间属性列被写回到 DataStream API 的记录中。水印也被传播。
  • toDataStream(DataStream, AbstractDataType):将表转换为只插入更改的流。此方法接受一种数据类型来表达所需的流记录类型。规划器可能会插入隐式强制转换和重新排序列以将列映射到(可能是嵌套的)数据类型的字段。
  • toDataStream(DataStream, Class):toDataStream(DataStream, DataTypes.of(Class)) 的快捷方式,可以快速反射地创建所需的数据类型。

从 Table API 的角度来看,与 DataStream API 之间的转换类似于读取或写入已使用 SQL 中的 CREATE TABLE DDL 定义的虚拟表连接器。

virtual CREATE TABLE name (schema) WITH (options) 语句中的 schema 部分可以从 DataStream 的类型信息中自动派生、丰富或完全使用 org.apache.flink.table.api.Schema 手动定义。

虚拟 DataStream 表连接器为每一行公开以下元数据:

Key

Data Type

Description

R/W

rowtime

TIMESTAMP_LTZ(3) NOT NULL

Stream record’s timestamp.

R/W

虚拟 DataStream 表源实现 SupportsSourceWatermark,因此允许调用 SOURCE_WATERMARK() 内置函数作为水印策略,以采用来自 DataStream API 的水印。

fromDataStream 例子

下面的代码展示了如何将 fromDataStream 用于不同的场景。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import java.time.Instant;

// some example POJO
public static class User {
  public String name;

  public Integer score;

  public Instant event_time;

  // default constructor for DataStream API
  public User() {}

  // fully assigning constructor for Table API
  public User(String name, Integer score, Instant event_time) {
    this.name = name;
    this.score = score;
    this.event_time = event_time;
  }
}

// create a DataStream
DataStream<User> dataStream =
    env.fromElements(
        new User("Alice", 4, Instant.ofEpochMilli(1000)),
        new User("Bob", 6, Instant.ofEpochMilli(1001)),
        new User("Alice", 10, Instant.ofEpochMilli(1002)));

// === EXAMPLE 1 ===

// derive all physical columns automatically

Table table = tableEnv.fromDataStream(dataStream);
table.printSchema();
// prints:
// (
//  `name` STRING,
//  `score` INT,
//  `event_time` TIMESTAMP_LTZ(9)
// )

// === EXAMPLE 2 ===

// derive all physical columns automatically
// but add computed columns (in this case for creating a proctime attribute column)

Table table = tableEnv.fromDataStream(
    dataStream,
    Schema.newBuilder()
        .columnByExpression("proc_time", "PROCTIME()")
        .build());
table.printSchema();
// prints:
// (
//  `name` STRING,
//  `score` INT NOT NULL,
//  `event_time` TIMESTAMP_LTZ(9),
//  `proc_time` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME()
//)

// === EXAMPLE 3 ===

// derive all physical columns automatically
// but add computed columns (in this case for creating a rowtime attribute column)
// and a custom watermark strategy

Table table =
    tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
            .columnByExpression("rowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
            .watermark("rowtime", "rowtime - INTERVAL '10' SECOND")
            .build());
table.printSchema();
// prints:
// (
//  `name` STRING,
//  `score` INT,
//  `event_time` TIMESTAMP_LTZ(9),
//  `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* AS CAST(event_time AS TIMESTAMP_LTZ(3)),
//  WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS rowtime - INTERVAL '10' SECOND
// )

// === EXAMPLE 4 ===

// derive all physical columns automatically
// but access the stream record's timestamp for creating a rowtime attribute column
// also rely on the watermarks generated in the DataStream API

// we assume that a watermark strategy has been defined for `dataStream` before
// (not part of this example)
Table table =
    tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
            .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
            .watermark("rowtime", "SOURCE_WATERMARK()")
            .build());
table.printSchema();
// prints:
// (
//  `name` STRING,
//  `score` INT,
//  `event_time` TIMESTAMP_LTZ(9),
//  `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* METADATA,
//  WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS SOURCE_WATERMARK()
// )

// === EXAMPLE 5 ===

// define physical columns manually
// in this example,
//   - we can reduce the default precision of timestamps from 9 to 3
//   - we also project the columns and put `event_time` to the beginning

Table table =
    tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
            .column("event_time", "TIMESTAMP_LTZ(3)")
            .column("name", "STRING")
            .column("score", "INT")
            .watermark("event_time", "SOURCE_WATERMARK()")
            .build());
table.printSchema();
// prints:
// (
//  `event_time` TIMESTAMP_LTZ(3) *ROWTIME*,
//  `name` VARCHAR(200),
//  `score` INT
// )
// note: the watermark strategy is not shown due to the inserted column reordering projection

示例 1 说明了一个不需要基于时间的操作的简单用例。

示例 4 是最常见的用例,其中基于时间的操作(例如窗口或间隔连接)应该是管道的一部分。 示例 2 是这些基于时间的操作应该在处理时间内工作的最常见用例。

示例 5 完全依赖于用户的声明。 这对于将 DataStream API 中的泛型类型(在 Table API 中为 RAW)替换为适当的数据类型很有用。

由于 DataType 比 TypeInformation 更丰富,我们可以轻松启用不可变 POJO 和其他复杂的数据结构。 以下 Java 示例显示了可能的情况。 另请查看 DataStream API 的 Data Types & Serialization 页面以获取有关那里支持的类型的更多信息。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;

// the DataStream API does not support immutable POJOs yet,
// the class will result in a generic type that is a RAW type in Table API by default
public static class User {

    public final String name;

    public final Integer score;

    public User(String name, Integer score) {
        this.name = name;
        this.score = score;
    }
}

// create a DataStream
DataStream<User> dataStream = env.fromElements(
    new User("Alice", 4),
    new User("Bob", 6),
    new User("Alice", 10));

// since fields of a RAW type cannot be accessed, every stream record is treated as an atomic type
// leading to a table with a single column `f0`

Table table = tableEnv.fromDataStream(dataStream);
table.printSchema();
// prints:
// (
//  `f0` RAW('User', '...')
// )

// instead, declare a more useful data type for columns using the Table API's type system
// in a custom schema and rename the columns in a following `as` projection

Table table = tableEnv
    .fromDataStream(
        dataStream,
        Schema.newBuilder()
            .column("f0", DataTypes.of(User.class))
            .build())
    .as("user");
table.printSchema();
// prints:
// (
//  `user` *User<`name` STRING,`score` INT>*
// )

// data types can be extracted reflectively as above or explicitly defined

Table table3 = tableEnv
    .fromDataStream(
        dataStream,
        Schema.newBuilder()
            .column(
                "f0",
                DataTypes.STRUCTURED(
                    User.class,
                    DataTypes.FIELD("name", DataTypes.STRING()),
                    DataTypes.FIELD("score", DataTypes.INT())))
            .build())
    .as("user");
table.printSchema();
// prints:
// (
//  `user` *User<`name` STRING,`score` INT>*
// )

createTemporaryView 示例

DataStream 可以直接注册为视图(可能通过模式丰富)。

从 DataStream 创建的视图只能注册为临时视图。 由于它们的内联/匿名性质,无法将它们注册到永久目录中。

下面的代码展示了如何在不同的场景下使用 createTemporaryView。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

// create some DataStream
DataStream<Tuple2<Long, String>> dataStream = env.fromElements(
    Tuple2.of(12L, "Alice"),
    Tuple2.of(0L, "Bob"));

// === EXAMPLE 1 ===

// register the DataStream as view "MyView" in the current session
// all columns are derived automatically

tableEnv.createTemporaryView("MyView", dataStream);

tableEnv.from("MyView").printSchema();

// prints:
// (
//  `f0` BIGINT NOT NULL,
//  `f1` STRING
// )

// === EXAMPLE 2 ===

// register the DataStream as view "MyView" in the current session,
// provide a schema to adjust the columns similar to `fromDataStream`

// in this example, the derived NOT NULL information has been removed

tableEnv.createTemporaryView(
    "MyView",
    dataStream,
    Schema.newBuilder()
        .column("f0", "BIGINT")
        .column("f1", "STRING")
        .build());

tableEnv.from("MyView").printSchema();

// prints:
// (
//  `f0` BIGINT,
//  `f1` STRING
// )

// === EXAMPLE 3 ===

// use the Table API before creating the view if it is only about renaming columns

tableEnv.createTemporaryView(
    "MyView",
    tableEnv.fromDataStream(dataStream).as("id", "name"));

tableEnv.from("MyView").printSchema();

// prints:
// (
//  `id` BIGINT NOT NULL,
//  `name` STRING
// )

toDataStream 示例

下面的代码展示了如何在不同的场景中使用 toDataStream。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;
import java.time.Instant;

// POJO with mutable fields
// since no fully assigning constructor is defined, the field order
// is alphabetical [event_time, name, score]
public static class User {

    public String name;

    public Integer score;

    public Instant event_time;
}

tableEnv.executeSql(
    "CREATE TABLE GeneratedTable "
    + "("
    + "  name STRING,"
    + "  score INT,"
    + "  event_time TIMESTAMP_LTZ(3),"
    + "  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"
    + ")"
    + "WITH ('connector'='datagen')");

Table table = tableEnv.from("GeneratedTable");

// === EXAMPLE 1 ===

// use the default conversion to instances of Row

// since `event_time` is a single rowtime attribute, it is inserted into the DataStream
// metadata and watermarks are propagated

DataStream<Row> dataStream = tableEnv.toDataStream(table);

// === EXAMPLE 2 ===

// a data type is extracted from class `User`,
// the planner reorders fields and inserts implicit casts where possible to convert internal
// data structures to the desired structured type

// since `event_time` is a single rowtime attribute, it is inserted into the DataStream
// metadata and watermarks are propagated

DataStream<User> dataStream = tableEnv.toDataStream(table, User.class);

// data types can be extracted reflectively as above or explicitly defined

DataStream<User> dataStream =
    tableEnv.toDataStream(
        table,
        DataTypes.STRUCTURED(
            User.class,
            DataTypes.FIELD("name", DataTypes.STRING()),
            DataTypes.FIELD("score", DataTypes.INT()),
            DataTypes.FIELD("event_time", DataTypes.TIMESTAMP_LTZ(3))));

请注意,toDataStream 仅支持非更新表。 通常,基于时间的操作(例如窗口、间隔连接或 MATCH_RECOGNIZE 子句)非常适合与投影和过滤器等简单操作相邻的仅插入管道。

具有产生更新的操作的管道可以使用 toChangelogStream。

处理变更流

在内部,Flink 的表运行时是一个变更日志处理器。 概念页面描述了动态表和流如何相互关联。

StreamTableEnvironment 提供以下方法来公开这些变更数据捕获 (CDC) 功能:

  • fromChangelogStream(DataStream):将变更日志条目流解释为表格。流记录类型必须是 org.apache.flink.types.Row,因为它的 RowKind 标志是在运行时评估的。默认情况下不传播事件时间和水印。此方法需要一个包含各种更改的更改日志(在 org.apache.flink.types.RowKind 中枚举)作为默认的 ChangelogMode。
  • fromChangelogStream(DataStream, Schema):允许为 DataStream 定义类似于 fromDataStream(DataStream, Schema) 的模式。否则语义等于 fromChangelogStream(DataStream)。
  • fromChangelogStream(DataStream, Schema, ChangelogMode):完全控制如何将流解释为变更日志。传递的 ChangelogMode 有助于规划器区分仅插入、更新插入或收回行为。
  • toChangelogStream(Table):fromChangelogStream(DataStream)的逆操作。它生成一个包含 org.apache.flink.types.Row 实例的流,并在运行时为每条记录设置 RowKind 标志。该方法支持各种更新表。如果输入表包含单个行时间列,它将被传播到流记录的时间戳中。水印也将被传播。
  • toChangelogStream(Table, Schema):fromChangelogStream(DataStream, Schema)的逆操作。该方法可以丰富产生的列数据类型。如有必要,计划者可能会插入隐式强制转换。可以将行时间写为元数据列。
  • toChangelogStream(Table, Schema, ChangelogMode):完全控制如何将表转换为变更日志流。传递的 ChangelogMode 有助于规划器区分仅插入、更新插入或收回行为。

从 Table API 的角度来看,与 DataStream API 之间的转换类似于读取或写入已使用 SQL 中的 CREATE TABLE DDL 定义的虚拟表连接器。

因为 fromChangelogStream 的行为与 fromDataStream 类似,我们建议在继续之前阅读上一节

此虚拟连接器还支持读取和写入流记录的行时元数据。

虚拟表源实现 SupportsSourceWatermark

fromChangelogStream 使用示例

下面的代码展示了如何将 fromChangelogStream 用于不同的场景。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

// === EXAMPLE 1 ===

// interpret the stream as a retract stream

// create a changelog DataStream
DataStream<Row> dataStream =
    env.fromElements(
        Row.ofKind(RowKind.INSERT, "Alice", 12),
        Row.ofKind(RowKind.INSERT, "Bob", 5),
        Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
        Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));

// interpret the DataStream as a Table
Table table = tableEnv.fromChangelogStream(dataStream);

// register the table under a name and perform an aggregation
tableEnv.createTemporaryView("InputTable", table);
tableEnv
    .executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
    .print();

// prints:
// +----+--------------------------------+-------------+
// | op |                           name |       score |
// +----+--------------------------------+-------------+
// | +I |                            Bob |           5 |
// | +I |                          Alice |          12 |
// | -D |                          Alice |          12 |
// | +I |                          Alice |         100 |
// +----+--------------------------------+-------------+

// === EXAMPLE 2 ===

// interpret the stream as an upsert stream (without a need for UPDATE_BEFORE)

// create a changelog DataStream
DataStream<Row> dataStream =
    env.fromElements(
        Row.ofKind(RowKind.INSERT, "Alice", 12),
        Row.ofKind(RowKind.INSERT, "Bob", 5),
        Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));

// interpret the DataStream as a Table
Table table =
    tableEnv.fromChangelogStream(
        dataStream,
        Schema.newBuilder().primaryKey("f0").build(),
        ChangelogMode.upsert());

// register the table under a name and perform an aggregation
tableEnv.createTemporaryView("InputTable", table);
tableEnv
    .executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
    .print();

// prints:
// +----+--------------------------------+-------------+
// | op |                           name |       score |
// +----+--------------------------------+-------------+
// | +I |                            Bob |           5 |
// | +I |                          Alice |          12 |
// | -U |                          Alice |          12 |
// | +U |                          Alice |         100 |
// +----+--------------------------------+-------------+

示例 1 中显示的默认 ChangelogMode 对于大多数用例来说应该足够了,因为它接受各种更改。

但是,示例 2 显示了如何通过使用 upsert 模式将更新消息的数量减少 50% 来限制传入更改的种类以提高效率。 可以通过为 toChangelogStream 定义主键和 upsert 更改日志模式来减少结果消息的数量。

toChangelogStream使用示例

下面的代码展示了如何在不同的场景下使用 toChangelogStream。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import static org.apache.flink.table.api.Expressions.*;

// create Table with event-time
tableEnv.executeSql(
    "CREATE TABLE GeneratedTable "
    + "("
    + "  name STRING,"
    + "  score INT,"
    + "  event_time TIMESTAMP_LTZ(3),"
    + "  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"
    + ")"
    + "WITH ('connector'='datagen')");

Table table = tableEnv.from("GeneratedTable");

// === EXAMPLE 1 ===

// convert to DataStream in the simplest and most general way possible (no event-time)

Table simpleTable = tableEnv
    .fromValues(row("Alice", 12), row("Alice", 2), row("Bob", 12))
    .as("name", "score")
    .groupBy($("name"))
    .select($("name"), $("score").sum());

tableEnv
    .toChangelogStream(simpleTable)
    .executeAndCollect()
    .forEachRemaining(System.out::println);

// prints:
// +I[Bob, 12]
// +I[Alice, 12]
// -U[Alice, 12]
// +U[Alice, 14]

// === EXAMPLE 2 ===

// convert to DataStream in the simplest and most general way possible (with event-time)

DataStream<Row> dataStream = tableEnv.toChangelogStream(table);

// since `event_time` is a single time attribute in the schema, it is set as the
// stream record's timestamp by default; however, at the same time, it remains part of the Row

dataStream.process(
    new ProcessFunction<Row, Void>() {
        @Override
        public void processElement(Row row, Context ctx, Collector<Void> out) {

             // prints: [name, score, event_time]
             System.out.println(row.getFieldNames(true));

             // timestamp exists twice
             assert ctx.timestamp() == row.<Instant>getFieldAs("event_time").toEpochMilli();
        }
    });
env.execute();

// === EXAMPLE 3 ===

// convert to DataStream but write out the time attribute as a metadata column which means
// it is not part of the physical schema anymore

DataStream<Row> dataStream = tableEnv.toChangelogStream(
    table,
    Schema.newBuilder()
        .column("name", "STRING")
        .column("score", "INT")
        .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
        .build());

// the stream record's timestamp is defined by the metadata; it is not part of the Row

dataStream.process(
    new ProcessFunction<Row, Void>() {
        @Override
        public void processElement(Row row, Context ctx, Collector<Void> out) {

            // prints: [name, score]
            System.out.println(row.getFieldNames(true));

            // timestamp exists once
            System.out.println(ctx.timestamp());
        }
    });
env.execute();

// === EXAMPLE 4 ===

// for advanced users, it is also possible to use more internal data structures for efficiency

// note that this is only mentioned here for completeness because using internal data structures
// adds complexity and additional type handling

// however, converting a TIMESTAMP_LTZ column to `Long` or STRING to `byte[]` might be convenient,
// also structured types can be represented as `Row` if needed

DataStream<Row> dataStream = tableEnv.toChangelogStream(
    table,
    Schema.newBuilder()
        .column(
            "name",
            DataTypes.STRING().bridgedTo(StringData.class))
        .column(
            "score",
            DataTypes.INT())
        .column(
            "event_time",
            DataTypes.TIMESTAMP_LTZ(3).bridgedTo(Long.class))
        .build());

// leads to a stream of Row(name: StringData, score: Integer, event_time: Long)

有关示例 4 中的数据类型支持哪些转换的更多信息,请参阅 Table API 的数据类型页面。

toChangelogStream(Table).executeAndCollect() 的行为等同于调用 Table.execute().collect()。 但是, toChangelogStream(Table) 可能对测试更有用,因为它允许在 DataStream API 的后续 ProcessFunction 中访问生成的水印。

TypeInformation 和 DataType 之间的映射

DataStream API 使用 org.apache.flink.api.common.typeinfo.TypeInformation 的实例来描述在流中传输的记录类型。特别是,它定义了如何将记录从一个 DataStream 运算符序列化和反序列化到另一个。它还有助于将状态序列化为保存点和检查点。

Table API 使用自定义数据结构在内部表示记录,并向用户公开 org.apache.flink.table.types.DataType 以声明将数据结构转换为的外部格式,以便在源、接收器、UDF 或 DataStream 中使用API。

DataType 比 TypeInformation 更丰富,因为它还包含有关逻辑 SQL 类型的详细信息。因此,在转换过程中会隐式添加一些细节。

Table 的列名和类型自动派生自 DataStream 的 TypeInformation。使用 DataStream.getType() 检查是否已通过 DataStream API 的反射类型提取工具正确检测到类型信息。如果最外层记录的 TypeInformation 是 CompositeType,则在派生表的 schema 时将在第一级展平。

TypeInformation 转为 DataType

将 TypeInformation 转换为 DataType 时适用以下规则:

  • TypeInformation 的所有子类都映射到逻辑类型,包括与 Flink 内置序列化器对齐的可空性。
  • TupleTypeInfoBase 的子类被转换为行(对于 Row)或结构化类型(对于元组、POJO 和案例类)。
  • BigDecimal 默认转换为 DECIMAL(38, 18)。
  • PojoTypeInfo 字段的顺序由以所有字段作为参数的构造函数确定。 如果在转换过程中未找到,则字段顺序将按字母顺序排列。
  • 无法表示为列出的 org.apache.flink.table.api.DataTypes 之一的 GenericTypeInfo 和其他 TypeInformation 将被视为黑盒 RAW 类型。 当前会话配置用于实现原始类型的序列化程序。 届时将无法访问复合嵌套字段。
  • 有关完整的翻译逻辑,请参阅 TypeInfoDataTypeConverter。

使用 DataTypes.of(TypeInformation) 在自定义模式声明或 UDF 中调用上述逻辑。

DataType转为TypeInformation

表运行时将确保正确地将输出记录序列化到 DataStream API 的第一个运算符。

之后,需要考虑 DataStream API 的类型信息语义。

本文为从大数据人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://cloud.tencent.com/developer/article/1946396

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
(上)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)
Apache Flink 提供了两种关系型 API 用于统一流和批处理,Table 和 SQL API。
公众号:大数据羊说
2022/04/04
3.6K0
(上)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)
(5)Flink CEP SQL四种匹配模式效果演示
从匹配成功的事件序列中最后一个对应于patternItem的事件开始进行下一次匹配
NBI大数据
2022/08/24
5400
(5)Flink CEP SQL四种匹配模式效果演示
(2)Flink CEP SQL严格近邻代码演示-风控系统构建利器
上一篇我们对Flink CEP做了简单介绍,这一篇我们通过代码来演示一下Flink CEP SQL中的严格近邻效果:
NBI大数据
2022/08/13
5070
(2)Flink CEP SQL严格近邻代码演示-风控系统构建利器
2021年大数据Flink(三十二):​​​​​​​Table与SQL案例准备 API
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html#create-a-tableenvironment
Lansonli
2021/10/11
8770
flink sql 知其所以然(十九):Table 与 DataStream 的转转转(附源码)
废话不多说,咱们先直接上本文的目录和结论,小伙伴可以先看结论快速了解博主期望本文能给小伙伴们带来什么帮助:
公众号:大数据羊说
2022/04/04
3.1K0
flink sql 知其所以然(十九):Table 与 DataStream 的转转转(附源码)
​flink实战-flink streaming sql 初体验
SQL,Structured Query Language:结构化查询语言,作为一个通用、流行的查询语言,不仅仅是在传统的数据库,在大数据领域也变得越来越流行,hive、spark、kafka、flink等大数据组件都支持sql的查询,使用sql可以让一些不懂这些组件原理的人,轻松的来操作,大大的降低了使用的门槛,今天我们先来简单的讲讲在flink的流处理中如何使用sql.
大数据技术与应用实战
2020/09/15
2K0
Flink重点难点:Flink Table&SQL必知必会(二)
介绍了 Flink Table & SQL的一些核心概念,本部分将介绍 Flink 中窗口和函数。
王知无-import_bigdata
2021/09/22
2.2K0
2021年大数据Flink(三十六):​​​​​​​Table与SQL ​​​​​​案例三
使用Flink SQL来统计5秒内 每个用户的 订单总数、订单的最大金额、订单的最小金额
Lansonli
2021/10/11
4690
Flink重点难点:Flink Table&SQL必知必会(一)
Flink本身是批流统一的处理框架,所以Table API和SQL,就是批流统一的上层处理API。目前功能尚未完善,处于活跃的开发阶段。
王知无-import_bigdata
2021/09/22
2.3K0
Flink重点难点:维表关联理论和Join实战
数据流操作的另一个常见需求是对两条数据流中的事件进行联结(connect)或Join。Flink DataStream API中内置有两个可以根据时间条件对数据流进行Join的算子:基于间隔的Join和基于窗口的Join。本节我们会对它们进行介绍。
王知无-import_bigdata
2021/09/22
4.9K0
Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(三)SQL篇
哈喽各位,本章主要写的是FlinkSQL也是Flink章节的倒数第二篇了,最后还有一篇FlinkCEP,稍后会出,耐心关注哦!好了,进入正题!!!!
857技术社区
2023/02/23
3.8K0
Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(三)SQL篇
重要|Flink SQL与kafka整合的那些事儿
flink与kafka整合是很常见的一种实时处理场景,尤其是kafka 0.11版本以后生产者支持了事务,使得flink与kafka整合能实现完整的端到端的仅一次处理,虽然这样会有checkpoint周期的数据延迟,但是这个仅一次处理也是很诱人的。可见的端到端的使用案例估计就是前段时间oppo的案例分享吧。关注浪尖微信公众号(bigdatatip)输入 oppo 即可获得。
Spark学习技巧
2019/06/03
3.4K0
Flink Table API & SQL 基本操作
本文主要展示了 Table API 和 SQL 程序的常见结构,如何创建注册 Table,查询 Table,以及如何输出 Table。
smartsi
2022/04/17
3.7K0
Flink学习笔记(9)-Table API 和 Flink SQL
• Table API 是一套内嵌在 Java 和 Scala 语言中的查询API,它允许以非常直观的方式组合来自一些关系运算符的查询
挽风
2022/05/11
2.4K0
Flink学习笔记(9)-Table API 和 Flink SQL
零基础学Flink:Flink SQL(上)
前面几篇内容,我们结合案例来介绍了,两流Join,热销榜,以及状态容错,今天我们依旧基于这个数据,来说说Flink SQL,如果对原理有兴趣的同学,也可以移步到《Stream SQL 的执行原理与 Flink 的实现 》,去了解相关内容。
麒思妙想
2020/07/10
1.2K0
(6)Flink CEP SQL模拟账号短时间内异地登录风控预警
(1)通过将xxx平台用户登录时的登录日志发送到kafka(本文代码演示用的socket);
NBI大数据
2022/08/30
6870
(6)Flink CEP SQL模拟账号短时间内异地登录风控预警
干货 | 五千字长文带你快速入门FlinkSQL
最近几天因为工作比较忙,已经几天没有及时更新文章了,在这里先给小伙伴们说声抱歉…临近周末,再忙再累,我也要开始发力了。接下来的几天,菌哥将为大家带来关于FlinkSQL的教程,之后还会更新一些大数据实时数仓的内容,和一些热门的组件使用!希望小伙伴们能点个关注,第一时间关注技术干货!
大数据梦想家
2021/01/27
2.2K0
干货 | 五千字长文带你快速入门FlinkSQL
聊聊flink的Table API及SQL Programs
序 本文主要研究一下flink的Table API及SQL Programs flink-forward-sf-2017-timo-walther-table-sql-api-unified-apis-for-batch-and-stream-processing-8-638.jpg 实例 // for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment StreamExecutionEnvironm
code4it
2019/01/21
2.2K0
聊聊flink的Table API及SQL Programs
一篇文章让深入理解Flink SQL 时间特性
基于时间的操作(比如 Table API 和 SQL 中窗口操作),需要定义相关的时间语义和时间数据来源的信息。所以,Table 可以提供一个逻辑上的时间字段,用于在表处理程序中,指示时间和访问相应的时间戳。
大数据老哥
2021/02/04
1.8K0
一篇文章让深入理解Flink SQL 时间特性
Flink 的三种WordCount(文末领取Flink书籍)
今天是 Flink 从 0 到 1 系列的第 2 篇:《WordCount及FlinkSQL》。
Python编程爱好者
2022/09/21
1.1K0
Flink 的三种WordCount(文末领取Flink书籍)
推荐阅读
相关推荐
(上)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)
更多 >
交个朋友
加入[数据] 腾讯云技术交流站
获取数据实战干货 共享技术经验心得
加入数据技术工作实战群
获取实战干货 交流技术经验
加入[数据库] 腾讯云官方技术交流站
数据库问题秒解答 分享实践经验
换一批
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档