前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >Spark UDF函数迁移到StarRocks

Spark UDF函数迁移到StarRocks

原创
作者头像
码之有理
发布2024-12-09 11:49:58
发布2024-12-09 11:49:58
24600
代码可运行
举报
运行总次数:0
代码可运行

前言

StarRocks 提供了丰富的函数,方便您在日常数据查询和分析时使用。除了常见的函数分类,StarRocks 也支持 ARRAY、JSON、MAP、STRUCT 等半结构化函数,支持 Lambda 高阶函数。如果以上函数都不符合您的需求,您还可以自行编写 Java UDF 来满足业务需求。StarRocks 还提供 Hive Bitmap UDF 功能,您可以在 Hive 里计算生成 Bitmap 后,再导入 StarRocks;将 StarRocks 里生成的 Bitmap,导出到 Hive,方便其它系统使用。

Spark UDF 是一种强大的工具,允许开发者创建自定义函数以扩展 Spark SQL 的功能。然而,当需要将这些 UDF 迁移到 StarRocks 时,可能会遇到一些挑战,因为两个系统在架构和功能上有所不同。

StarRocks 的 UDF 支持

自 2.2.0 版本起,StarRocks 支持使用 Java 语言编写用户定义函数(User Defined Function,简称 UDF)。

自 3.0 版本起,StarRocks 支持 Global UDF,您只需要在相关的 SQL 语句(CREATE/SHOW/DROP)中加上 GLOBAL 关键字,该语句即可全局生效,无需逐个为每个数据库执行此语句。您可以根据业务场景开发自定义函数,扩展 StarRocks 的函数能力。

目前 StarRocks 支持的 UDF 包括用户自定义标量函数(Scalar UDF)、用户自定义聚合函数(User Defined Aggregation Function,UDAF)、用户自定义窗口函数(User Defined Window Function,UDWF)、用户自定义表格函数(User Defined Table Function,UDTF)。

官方文档:https://docs.starrocks.io/zh/docs/sql-reference/sql-functions/JAVA_UDF/

StarRocks 的FE节点支持开启UDF,默认关闭。在客户端可以通过如下命令查询是否开启,只有拥有 cluster_admin 角色的用户才可以执行集群管理相关命令。enable_udf参数是静态参数。

代码语言:javascript
代码运行次数:0
复制
ADMIN SHOW FRONTEND CONFIG LIKE "%udf%";

FE 参数分为动态参数和静态参数。

  • 动态参数可通过 SQL 命令进行在线配置和调整,方便快捷。需要注意通过 SQL 命令所做的动态设置在重启 FE 后会失效。如果想让设置长期生效,建议同时修改 fe.conf 文件。
  • 静态参数必须在 FE 配置文件 fe.conf 中进行配置和调整。调整完成后,需要重启 FE 使变更生效。

参数是否为动态参数可通过 ADMIN SHOW CONFIG 返回结果中的 IsMutable 列查看。TRUE 表示动态参数。

静态和动态参数均可通过 fe.conf 文件进行修改。

参考FE节点的所有配置:https://docs.starrocks.io/zh/docs/administration/management/FE_configuration/

开发并使用UDF

官方文档:https://docs.starrocks.io/zh/docs/sql-reference/sql-functions/JAVA_UDF/

此文档只以Scalar UDF函数为例,更多类型UDF函数参考官方文档。

开发Java UDF函数

SparkUDF函数示例,如下是一个提取字符串括号中ID的UDF工具类。

代码语言:javascript
代码运行次数:0
复制
package com.tencent.sparkpad.udf
//scalastyle:off
import org.apache.spark.sql.api.java.UDF1

class ExtractIdUDF extends UDF1[String, String] {
  override def call(targetStr: String): String = {
    if (targetStr == null || targetStr.isEmpty) {
      return "0"
    }

    val reg = "(?<=\\()[^\\(]*?(?=\\)$)".r
    val matchList = reg.findAllMatchIn(targetStr.trim).toList
    if (matchList.nonEmpty) {
      return matchList.last.toString()
    }
    "0"
  }

}

将SparkUDF转为Java UDF函数,新建一个普通的Java项目,配置pom.xml,在package下新建一个普通类,,evaluate 方法为Scalar UDF 调用入口,必须是 public 成员方法,函数不能加 static 关键字。

代码语言:javascript
代码运行次数:0
复制
package org.example;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ExtractIdUDF {

    public final String evaluate(String targetStr) {
        if (targetStr == null || targetStr.isEmpty()) {
            return "0";
        }

        String reg = "(?<=\\()[^\\(]*?(?=\\)$)";
        Pattern pattern = Pattern.compile(reg);
        Matcher matcher = pattern.matcher(targetStr.trim());

        String lastMatch = "0";
        while (matcher.find()) {
            lastMatch = matcher.group();
        }

        return lastMatch;
    }
}


 // 测试示例
// String testStr1 = "Example (123)";
// String testStr2 = "Another example (456)";
// String testStr3 = "No match here";

// System.out.println(call(testStr1)); // 输出: 123
// System.out.println(call(testStr2)); // 输出: 456
// System.out.println(call(testStr3)); // 输出: 0

Java和SQL中的类型映射关系参考如下:

<!--br {mso-data-placement:same-cell;}--> td {white-space:nowrap;border:1px solid #dee0e3;font-size:10pt;font-style:normal;font-weight:normal;vertical-align:middle;word-break:normal;word-wrap:normal;}

SQL TYPE

Java TYPE

BOOLEAN

java.lang.Boolean

TINYINT

java.lang.Byte

SMALLINT

java.lang.Short

INT

java.lang.Integer

BIGINT

java.lang.Long

FLOAT

java.lang.Float

DOUBLE

java.lang.Double

STRING/VARCHAR

java.lang.String

然后把项目打包,在targets目录下生成两个Jar包

代码语言:javascript
代码运行次数:0
复制
mvn package

target/xxx-1.0-SNAPSHOT-jar-with-dependencies.jar包上传到StarRocks可以访问的文件服务器,如nginx,nginx的示例配置如下:

代码语言:javascript
代码运行次数:0
复制
server {
    listen       8888 default_server;

    location / {
        root /data/file-server/;
        autoindex on;
        autoindex_exact_size off;
        autoindex_localtime on;
        charset utf-8;
    }
}

StarRocks创建UDF函数

创建UDF函数的语法如下:

代码语言:javascript
代码运行次数:0
复制
CREATE [GLOBAL][AGGREGATE | TABLE] FUNCTION function_name(arg_type [, ...])
RETURNS return_type
[PROPERTIES ("key" = "value" [, ...]) ]

创建示例中提取字符串中ID的UDF函数如下:

代码语言:javascript
代码运行次数:0
复制
CREATE FUNCTION ExtractIdUDF(string) 
RETURNS string
PROPERTIES (
    "symbol" = "org.example.ExtractIdUDF", 
    "type" = "StarrocksJar",
    "file" = "http://9.x.x.x:8888/xxx-1.0-SNAPSHOT-jar-with-dependencies.jar"
);

函数名ExtractIdUDF后代表参数的类型和个数,RETURNS 表示返回的类型。

PROPERTIES 的参数解释如下:

  • symbol:指定当前UDF函数的入口,一个Jar包中可以有多个UDF函数,根据实际情况修改。
  • type:类型StarrocksJar表示当前是通过Java编写的UDF函数。
  • file:指定Jar包的HTTP请求地址。
  • analytic:指定该函数是否可以用作分析函数,当所创建的函数是窗口函数,固定取值为 true。

默认创建的UDF函数仅在当前database可以使用,如果需要在其它database可以访问,需要创建全局UDF函数,即加上GLOBAL关键字CREATE GLOBAL FUNCTION xxx ...

管理UDF函数

查询UDF函数和查询全局UDF函数。

代码语言:javascript
代码运行次数:0
复制
SHOW FUNCTIONS;
SHOW GLOBAL FUNCTIONS;

删除UDF函数,需要指定函数名和参数,删除全局UDF函数时加上GLOBAL关键字

代码语言:javascript
代码运行次数:0
复制
DROP FUNCTION ExtractIdUDF(string) 
-- DROP GLOBAL FUNCTION ExtractIdUDF(string) 

使用UDF函数

调用自定义的ExtractIdUDF函数,返回字段:extractidudf('Hello(123)')的值为123。

代码语言:javascript
代码运行次数:0
复制
SELECT ExtractIdUDF("Hello(123)");

常见问题

StarRocks Java版本和Jar包编译Java版本不一致

Error Code: 1064. org/example/ExtractIdUDF has been compiled by a more recent version of the Java Runtime (class file version 65.0), this version of the Java Runtime only recognizes class file versions up to 55.0

这个错误信息表明你的UDF类 org.example.ExtractIdUDF是用比Starrocks环境支持的Java版本更高的版本编译的。错误中提到,你的类文件版本是65.0,这对应于Java 21,而你的Starrocks环境只支持到类文件版本55.0,即Java 11之前的版本(Java 11及以下)。因此需要降低编译的Java版本到Java11及以下,推荐使用Java1.8。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • StarRocks 的 UDF 支持
  • 开发并使用UDF
    • 开发Java UDF函数
    • StarRocks创建UDF函数
    • 管理UDF函数
    • 使用UDF函数
  • 常见问题
    • StarRocks Java版本和Jar包编译Java版本不一致
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档