前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >产品经理提需求我不慌了,Doris自定义函数三剑客一把梭!

产品经理提需求我不慌了,Doris自定义函数三剑客一把梭!

作者头像
一臻数据
发布2024-12-24 16:25:34
发布2024-12-24 16:25:34
9300
代码可运行
举报
文章被收录于专栏:一臻数据一臻数据
运行总次数:0
代码可运行

数据工程师小K盯着屏幕发愁。一个看似简单的数据分析需求,却因为复杂的业务规则让他焦头烂额。 "标准SQL函数写不出来,难道要改需求?" 就在这时,他想起了导师曾经提到的Doris"自定义函数",好比打开了一扇魔法之门,让他发现数据分析的世界远比想象中更加精彩。 今天,就让我们一起走进这个神奇的领域,看看如何用自定义函数解锁数据分析的无限可能。

打开数据分析的无限可能

数据分析工程师小张最近接到一个棘手的数据需求 - 计算用户的消费等级。这个看似简单的需求背后,涉及复杂的业务规则:不同时期的消费金额要有不同的权重,还要考虑用户的活跃度、信用评分等多个维度。传统的SQL函数难以满足这种复杂的业务逻辑。

"要是能自己写个函数就好了。"小张嘀咕着。事实上,Doris早就为这种场景准备了完美的解决方案 - 自定义函数。

看完上面这个生动的思维导图,相信大家对Doris的自定义函数已经有了初步印象。Doris支持三类自定义函数:UDF(标量函数)、UDAF(聚合函数)和UDTF(表函数,Doris 3.0 版本开始支持)。它们就像是数据分析的瑞士军刀,能够满足各种复杂的业务场景:

  1. Java UDF 是较为常见的自定义标量函数 (Scalar Function),即每输入一行数据,就会有一行对应的结果输出,较为常见的有 ABS,LENGTH 等。值得一提的是对于用户来讲,Hive UDF 是可以直接迁移至 Doris 的。
  2. Java UDAF 即为自定义的聚合函数 (Aggregate Function),即在输入多行数据进行聚合后,仅输出一行对应的结果,较为常见的有 MIN,MAX,COUNT 等。
  3. JAVA UDTF 即为自定义的表函数 (Table Function),即每输一行数据,可以产生一行或多行的结果,在 Doris 中需要结合 Lateral View 使用可以达到行转列的效果,较为常见的有 EXPLODE,EXPLODE_SPLIT 等。

如上小张画了一张图,展示了Doris UDF的执行过程。当SQL查询包含自定义函数时,Doris会在BE节点的JVM实例中创建相应的函数容器来执行自定义逻辑。这种设计既保证了性能,又提供了足够的灵活性。

Doris自定义函数三剑客

UDF适合处理行级别的数据转换,比如清洗数据、格式转换等。小张最开始的用户等级计算就是典型的UDF场景。一个简单的Java UDF实现如下:

代码语言:javascript
代码运行次数:0
复制
public class UserLevelUDF extends UDF {
    public String evaluate(Double amount, Integer activity, Integer credit) {
        if (amount == null || activity == null || credit == null) {
            return "UNKNOWN";
        }
        double score = amount * 0.5 + activity * 0.3 + credit * 0.2;
        if (score >= 90) return "DIAMOND";
        if (score >= 80) return "GOLD";
        if (score >= 70) return "SILVER";
        return "BRONZE";
    }
}

UDAF则专注于数据聚合。假设我们要计算中位数,传统的SQL就很难实现。使用UDAF,这个需求就变得简单了:

代码语言:javascript
代码运行次数:0
复制
package org.apache.doris.udf.demo;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.logging.Logger;

/*UDAF 计算中位数*/
public class MedianUDAF {
Logger log = Logger.getLogger("MedianUDAF");

//状态存储
public static class State {
    //返回结果的精度
    int scale = 0;
    //是否是某一个 tablet 下的某个聚合条件下的数据第一次执行 add 方法
    boolean isFirst = true;
    //数据存储
    public StringBuilder stringBuilder;
}

//状态初始化
public State create() {
    State state = new State();
    //根据每个 tablet 下的聚合条件需要聚合的数据量大小,预先初始化,增加性能
    state.stringBuilder = new StringBuilder(1000);
    return state;
}


//处理执行单位处理各自 tablet 下的各自聚合条件下的每个数据
public void add(State state, Double val, int scale) throws IOException {
    if (val != null && state.isFirst) {
        state.stringBuilder.append(scale).append(",").append(val).append(",");
        state.isFirst = false;
    } else if (val != null) {
        state.stringBuilder.append(val).append(",");
    }
}

//处理数据完需要输出等待聚合
public void serialize(State state, DataOutputStream out) throws IOException {
    //目前暂时只提供 DataOutputStream,如果需要序列化对象可以考虑拼接字符串,转换 json,序列化成字节数组等方式
    //如果要序列化 State 对象,可能需要自己将 State 内部类实现序列化接口
    //最终都是要通过 DataOutputStream 传输
    out.writeUTF(state.stringBuilder.toString());
}

//获取处理数据执行单位输出的数据
public void deserialize(State state, DataInputStream in) throws IOException {
    String string = in.readUTF();
    state.scale = Integer.parseInt(String.valueOf(string.charAt(0)));
    StringBuilder stringBuilder = new StringBuilder(string.substring(2));
    state.stringBuilder = stringBuilder;
}

//聚合执行单位按照聚合条件合并某一个键下数据的处理结果 ,每个键第一次合并时,state1 参数是初始化的实例
public void merge(State state1, State state2) throws IOException {
    state1.scale = state2.scale;
    state1.stringBuilder.append(state2.stringBuilder.toString());
}

//对每个键合并后的数据进行并输出最终结果
public Double getValue(State state) throws IOException {
    String[] strings = state.stringBuilder.toString().split(",");
    double[] doubles = new double[strings.length + 1];
    doubles = Arrays.stream(strings).mapToDouble(Double::parseDouble).toArray();

    Arrays.sort(doubles);
    double n = doubles.length - 1;
    double index = n * 0.5;

    int low = (int) Math.floor(index);
    int high = (int) Math.ceil(index);

    double value = low == high ? (doubles[low] + doubles[high]) * 0.5 : doubles[high];

    BigDecimal decimal = new BigDecimal(value);
    return decimal.setScale(state.scale, BigDecimal.ROUND_HALF_UP).doubleValue();
}

//每个执行单位执行完都会执行
public void destroy(State state) {
}
}

UDTF即每输一行数据,可以产生一行或多行的结果。它和 UDF 函数一样,需要用户自主实现一个 evaluate 方法,但是 UDTF 函数的返回值必须是 Array 类型:

代码语言:javascript
代码运行次数:0
复制
public class UDTFStringTest {
    public ArrayList<String> evaluate(String value, String separator) {
        if (value == null || separator == null) {
            return null;
        } else {
            return new ArrayList<>(Arrays.asList(value.split(separator)));
        }
    }
}

性能优化与最佳实践

系统迁移是UDF的一大应用场景。许多企业在迁移到Doris时,会发现原系统中有些特殊的函数在Doris中并不存在。这时,我们可以创建同名的自定义函数,让业务代码无缝迁移。

数据分析场景则是UDF最闪耀的舞台。从简单的数据清洗到复杂的机器学习预测,UDF都能胜任。有了自定义函数,数据分析师就像拥有了自己的魔法工具箱,能够随心所欲地处理数据。

在UDF开发中,性能是个关键问题。通过在UDF中使用Static变量,我们可以有效减少资源消耗。比如加载一个大型字典文件:

代码语言:javascript
代码运行次数:0
复制
public class DictUDF {
    private static Map<String, String> dict = new HashMap<>();
    static {
        // 只加载一次字典文件
        loadDict();
    }
    public String evaluate(String key) {
        return dict.getOrDefault(key, "NOT_FOUND");
    }
}

这种静态加载的方式,让每个查询实例都能共享同一份数据,大大提升了性能。

正如厨师需要各种厨具才能烹饪美味佳肴,数据分析师也需要强大的工具来应对各种数据处理需求。Doris的自定义函数就是这样一套"数据厨具",让我们能够灵活应对各种数据分析场景。

下期,我们将一起探讨其它更有趣有用有价值的内容,敬请期待!

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-12-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 一臻数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 打开数据分析的无限可能
  • Doris自定义函数三剑客
  • 性能优化与最佳实践
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档