首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >PySpark SQL中的用户定义聚合函数

PySpark SQL中的用户定义聚合函数
EN

Stack Overflow用户
提问于 2021-03-09 07:24:31
回答 2查看 603关注 0票数 4

如何在PySpark SQL中实现用户定义的聚合函数?

代码语言:javascript
运行
复制
pyspark version = 3.0.2
python version = 3.7.10

作为一个最小的例子,我想用一个UDAF替换AVG聚合函数:

代码语言:javascript
运行
复制
sc = SparkContext()
sql = SQLContext(sc)
df = sql.createDataFrame(
    pd.DataFrame({'id': [1, 1, 2, 2], 'value': [1, 2, 3, 4]}))
df.createTempView('df')
rv = sql.sql('SELECT id, AVG(value) FROM df GROUP BY id').toPandas()

rv将位于的位置:

代码语言:javascript
运行
复制
In [2]: rv
Out[2]:
   id  avg(value)
0   1         1.5
1   2         3.5

在查询中,如何使用UDAF替换AVG

例如,这不起作用。

代码语言:javascript
运行
复制
import numpy as np
def udf_avg(x):
    return np.mean(x)
sql.udf.register('udf_avg', udf_avg)
rv = sql.sql('SELECT id, udf_avg(value) FROM df GROUP BY id').toPandas()

其思想是在纯Python中实现UDAF,用于SQL聚合函数不支持的处理(例如,低通过滤器)。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2021-03-10 03:04:27

可以使用Pandas UDF,其中的定义与Spark 3.0Python 3.6+兼容。有关详细信息,请参阅issuedocumentation

Spark SQL中的完整实现:

代码语言:javascript
运行
复制
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    pd.DataFrame({'id': [1, 1, 2, 2], 'value': [1, 2, 3, 4]}))
df.createTempView('df')

@pandas_udf(DoubleType())
def avg_udf(s: pd.Series) -> float:
    return s.mean()
spark.udf.register('avg_udf', avg_udf)

rv = spark.sql('SELECT id, avg_udf(value) FROM df GROUP BY id').toPandas()

带返回值

代码语言:javascript
运行
复制
In [2]: rv
Out[2]:
   id  avg_udf(value)
0   1             1.5
1   2             3.5
票数 2
EN

Stack Overflow用户

发布于 2021-03-09 15:07:58

您可以使用带有GROUPED_AGG类型的Pandas UDF。它接收来自Spark的列作为Pandas Series,这样您就可以在列上调用Series.mean

代码语言:javascript
运行
复制
import pyspark.sql.functions as F

@F.pandas_udf('float', F.PandasUDFType.GROUPED_AGG)  
def avg_udf(s):
    return s.mean()

df2 = df.groupBy('id').agg(avg_udf('value'))

df2.show()
+---+--------------+
| id|avg_udf(value)|
+---+--------------+
|  1|           1.5|
|  2|           3.5|
+---+--------------+

注册它以在SQL中使用也是可能的:

代码语言:javascript
运行
复制
df.createTempView('df')
spark.udf.register('avg_udf', avg_udf)

df2 = spark.sql("select id, avg_udf(value) from df group by id")
df2.show()
+---+--------------+
| id|avg_udf(value)|
+---+--------------+
|  1|           1.5|
|  2|           3.5|
+---+--------------+
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/66538664

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档