前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink SQL UDF重复调用问题解决方案

Flink SQL UDF重复调用问题解决方案

原创
作者头像
皮皮熊
发布2023-05-31 20:42:49
1.4K1
发布2023-05-31 20:42:49
举报
文章被收录于专栏:大数据与实时计算

Flink SQL UDF重复调用/执行问题

UDF重复调用问题

UDF重复调用的问题在某些情况下可能会对Flink SQL用户造成困扰,例如下面的SQL语句:

代码语言:SQL
复制
SELECT my_map['key1'] as key1, my_map['key2'] as key2, my_map['key3'] as key3
FROM (
  SELECT dump_json_to_map(col1) as my_map
  FROM T
)

dump_json_to_map会被执行3次。分析对应的graph日志开看,Flink会把我们的代码反向'优化'成类似如下sql:

代码语言:SQL
复制
SELECT dump_json_to_map(col1)['key1'] as key1, dump_json_to_map(col1)['key2'] as key2, dump_json_to_map(col1)['key3'] as key3 FROM T

会造成性能和正确性的问题:

  • UDF包含计算密集型的逻辑,整个作业的性能就会受到很大影响
  • UDF是有状态的UDF(如链接Redis等外部存储),则会导致重复计算,中间状态可能因为无法幂等的操作而被破坏,最终导致正确性出现问题flink 有状态udf 引起血案一

这个Flink社区已有对应的讨论,但是已知没有具体的后续,详见:FLINK-21573

解决方案

解决方案一

修改Flink内核源码,需要团队成员具备维护Flink内核的能力和权力。

参考这篇文章

思路摘要:

  • 复写udf的isDeterministic()方法
  • CodeGeneratorContext中添加可重用的UDF表达式及其result term的容器
  • ExprCodeGenerator入手(函数调用都属于RexCall),找到UDF代码生成的方法,即BridgingFunctionGenUtil#generateScalarFunctionCall(),if (isDeterministic)块内的代码实现了UDF表达式重用,即重用生成的第一个result term。

解决方案二(推荐)

来自好友kyle大佬的实战经验:增加一层透传专用的UDTF。

实现参考:

代码语言:java
复制
public class PassThroughUdtf extends TableFunction<String> {
    private static final long serialVersionUID = 1093578798410129502L;

    // 仅为示例,需要根据自己的场景修改入参和输出的数据类型
    public void eval(String param){
        collect(param);
    }
}

然后改造下原有SQL

代码语言:sql
复制
SELECT my_map['key1'] as key1, my_map['key2'] as key2, my_map['key3'] as key3
FROM (
  SELECT my_map 
  FROM T ,lateral table(passThrough(dump_json_to_map(col1))) as T(my_map)
)

增加PassThroughUdtf后对整体性能影响不大,就可以相对简单地解决UDF重复调用的问题。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • UDF重复调用问题
  • 解决方案
    • 解决方案一
      • 解决方案二(推荐)
      相关产品与服务
      云数据库 Redis®
      腾讯云数据库 Redis®(TencentDB for Redis®)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档