前言
Flink 1.12 版本在 20 年 12 月已经正式 Release,目前我们的 Flink SQL 作业的 Flink 引擎版本还是 1.10,本文主要用以评估 Flink 1.10 升级到 1.12 整体所能带来的预期收益,同时结合所需投入的成本,决定是否需要升级 Flink SQL 引擎版本到 1.12。本次升级所评估的收益包含 1.11 和 1.12 版本所带来的收益,如有理解错误,欢迎指出,一起交流。
一、 Flink SQL 语法更加简洁,提升实时作业开发效率
FLIP-122 提出了新的 Connector 属性 key, 具体参考 FLIP-122: New Connector Property Keys for New Factory 。FLIP-122 在 Flink 1.11 Released,Flink 1.11 SQL 语法会更加简洁,这能够提升实时用户开发作业的效率。
新的代码结构(Kafka Source 举例):
CREATE TABLE kafka_table (
...
) WITH (
'connector' = 'kafka-0.10',
'topic' = 'test-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'hello_world',
'format' = 'json',
'json.fail-on-missing-field' = 'false'
);
可以看到,新的 Flink SQL 语法,整体对于用户来说,更为简洁和直观,用户开发时,也会更为的方便。
二、Flink SQL 支持 Kafka Upsert Connector
FLIP-149 云邪提出了 upsert-kafka Connector,具体链接:https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+upsert-kafka+Connector。首先要理解 upsert 的含义:一条记录(有 主键),如果不存在,则插入,有则更新,全称:insert / update。Upsert-kafka connector 产生一个changelog 流,changelog 流中的数据记录可以理解为 UPSERT 流,也就是INSERT/UPDATE,因为具有相同键的任何现有行都会被覆盖。同样,空值可以用一种特殊的方式理解:带有空值的记录表示“删除”。
Upsert-kafka Connector 对于我们来说,解决最常用的场景是:从 Kafka Topic 按某类 Key 取最新数据,然后下游聚合,最后写入到外部存储。这种通用的实时开发流程一般是:上游为 mysql binglog -> Kafka 的数据同步任务,然后下游需要按照某类key 取最新数据进行聚合等等。
下面是 Flink 1.10 按照 a 取最新值,然后下游进行聚合的 SQL 代码,主要使用到了 last_value:
create table hello_world
(
a varchar
, b bigint
, c bigint
, d bigint
) with (
xxx
);
create view temp_hello as
select
a
, last_value( b ) as b
, last_value( c ) as c
, last_value( d ) as d
from
hello_world
group by
a;
create view temp_world as
select
sum(b) as sum_b
,sum(c) as sum_c
,sum(d) as sum_d
from temp_hello;
在 Flink 1.10 中,当前这类任务开发对于用户来说,还是不够友好,需要很多代码,同时也会造成 Flink SQL 冗长。Flink 1.12 SQL Connector 支持 Kafka Upsert Connector,这也是我们公司内部业务方对实时平台提出的需求。
Flink 1.12 支持了 Flink SQL Kafka upsert connector ,下面是使用 Flink 1.12 代码改写上述逻辑:
CREATE TABLE temp_hello (
a varchar
, b bigint
, c bigint
, d bigint
PRIMARY KEY (a) NOT ENFORCED
) WITH (
xx
);
create view temp_world as
select
sum(b) as sum_b
,sum(c) as sum_c
,sum(d) as sum_d
from temp_hello;
收益:便利用户有这种需要从 kafka 取最新记录操作的实时任务开发,比如这种 binlog -> kafka,然后用户聚合操作,这种场景还是非常多的,这能提升实时作业开发效率,同时 1.12 做了优化,性能会比单纯的 last_value 性能要好
三、Flink Yarn 作业 On k8s 的生产级别能力
之前我们内部 Flink Jar 作业已经全部 K8s 化,Flink SQL 作业由于是推广初期,还是在 Yarn 上面进行运行,为了将实时计算 Flink 全部 K8s 化(去 Yarn),所以我们 Flink SQL 作业也需要迁移到 K8s,目前 Flink 1.12 已经满足生产级别的 Flink k8s 功能,所以 Flink SQL K8s 化,打算直接使用社区的 On k8s 能力。
虽然和社区的人沟通,Flink 1.12 on k8s 没有什么问题,但是具体功能还是需要先 POC 验证一下,同时可能社区 Flink on k8s 的能力,可能会限制我们这边一些 k8s 功能使用,比如 hostpath volome 以及 Ingress 的使用,这里可能需要改底层源码来进行快速支持(社区有相关 JIRA 要做)。
(Flink 去 Yarn)Flink 1.12 on k8s 对于我们最主要的两个点:
最终实时集群 Flink 作业去 Yarn 化,同时为离线提供更多能够弹性扩缩资源,更好的降低成本。
四、Flink On Hive 能力(生产级别)
目前在有赞已经开始有部分实时业务方希望 Flink 能够支持 Hive,比如 Flink-Hive 近实时的数仓中间层【小时表可更快产出】,以及 Flink 实时任务和离线数据对比功能。而在 Flink 1.12 中,已经支持生产级别 Flink On Hive 任务运行(社区 Commiter 说),所以基于这次 Flink SQL 引擎版本升级,能够支持 Flink on hive 生产功能。
解决部分实时业务方, Flink On Hive 的业务需求,下面是 Flink 1.12 具体 Hive 相关功能:
五、Flink 基于 Savepoint 跨集群迁移能力
当前我们使用 Flink 版本是 1.10,而在Flink 1.11 以下,在任务 Savepoint Meta 文件里面,存储的这次 Savepoint 引用的状态文件路径都是 HDFS 全路径,所以在跨集群迁移时,Savepoint 是不能够进行复用的,所以一旦有集群迁移,Flink SQL 作业状态会丢失,如果有状态强相关的实时作业,可能会有故障风险。该问题已经在 1.11 已经修复,具体可以参考:FLINK-5763:Make savepoints self-contained and relocatable
(稳定性)Flink 作业有基于 Savepoint 跨集群不丢状态的恢复和迁移能力。
六、其他对我们有用收益
其他一些 Flink Bug Fix。