窗口函数(Window functions)又称分析函数或开窗函数,它允许你在不改变原始行的情况下,对一组相关的行(称为“窗口”)进行计算和分析。与普通的聚合函数(如SUM、AVG等)不同,窗口函数不会将多行合并为一行,而是为每一行返回一个计算结果,同时保留原始行的详细信息。通常写法为func()over(),详细语法如下:
window_function
[ nulls_option ]
OVER
( [ { PARTITION | DISTRIBUTE } BY partition_col_name = partition_col_val ( [ , ... ] ) ]
{ ORDER | SORT } BY expression [ ASC | DESC ]
[ NULLS { FIRST | LAST } ]
[ , ... ]
[ window_frame ] )
上面函数对应下面几个部分
支持开窗的函数列表,支持开窗函数分为:排序函数(Ranking Functions)、分析函数(Analytic Functions)、聚合函数(Aggregate Functions)
排序函数 | 描述 | 函数具体介绍 |
---|---|---|
RANK | 计算一组值中某个值的排名。结果是在分区排序中,当前行之前或等于当前行的行数加一。该值将在序列中产生间隔。 | https://sparkfunctions.com/rank |
DENSE_RANK | 计算一组值中某个值的排名。结果是先前分配的排名值加一。与 rank 函数不同,dense_rank 不会在排名序列中产生间隔。 | https://sparkfunctions.com/dense_rank |
PERCENT_RANK | 计算一个值在一组值中的百分比排名 | https://sparkfunctions.com/percent_rank |
NTILE | 将每个窗口分区的行分成 n 个桶,范围从 1 到最多 n。 | https://sparkfunctions.com/ntile |
ROW_NUMBER | 根据窗口分区内行的排序,为每一行分配一个唯一的、连续的数字,从一开始。 | https://sparkfunctions.com/row_number |
分析函数 | 描述 | 具体使用方式 |
---|---|---|
CUME_DIST | 计算一个值在分区中相对于所有值的位置 | https://sparkfunctions.com/cume_dist |
LAG | lag(input[, offset[, default]]) - 返回当前行之前第 offset 行的 input 值 | https://sparkfunctions.com/lag |
LEAD | lead(input[, offset[, default]]) - 返回窗口中当前行之后第 offset 行的 input 值 | https://sparkfunctions.com/lead |
NTH_VALUE | nth_value(input[, offset]) - 返回窗口帧从开始处的第 offset 行的 input 值 | https://sparkfunctions.com/nth_value |
FIRST_VALUE | first_value(expr[, isIgnoreNull]) - 对于一组行,返回 expr 的第一个值 | https://sparkfunctions.com/first_value |
LAST_VALUE | last_value(expr[, isIgnoreNull]) - 对于一组行,返回 expr 的最后一个值 | https://sparkfunctions.com/last_value |
聚合函数 | 描述 | 具体介绍 |
---|---|---|
any | any(expr) - 如果 expr 中至少有一个值为真,则返回真 | https://sparkfunctions.com/any |
any_value | any_value(expr[, isIgnoreNull]) - 对于一组行,返回 expr 的某个值 | https://sparkfunctions.com/any_value |
approx_count_distinct | approx_count_distinct(expr[, relativeSD]) - 通过 HyperLogLog++ 返回估计的基数 | https://sparkfunctions.com/approx_count_distinct |
approx_percentile | approx_percentile(col, percentage [, accuracy]) - 返回数值或 ANSI 间隔列 col 的近似百分位数 | https://sparkfunctions.com/approx_percentile |
array_agg | array_agg(expr) - 收集并返回一个非唯一元素的列表 | https://sparkfunctions.com/array_agg |
bit_and | bit_and(expr) - 返回所有非空输入值的按位与(AND),如果没有非空值则返回 null | https://sparkfunctions.com/bit_and |
bit_or | bit_or(expr) - 返回所有非空输入值的按位或(OR),如果没有非空值则返回 null | https://sparkfunctions.com/bit_or |
bit_xor | bit_xor(expr) - 返回所有非空输入值的按位异或(XOR),如果没有非空值则返回 null | https://sparkfunctions.com/bit_xor |
bitmap_construct_agg | bitmap_construct_agg(child) - 返回一个位图,其中设置了来自子表达式所有值的位位置。子表达式很可能是 bitmap_bit_position()。 | https://sparkfunctions.com/bitmap_construct_agg |
bitmap_or_agg | bitmap_or_agg(child) - 返回一个位图,它是子表达式中所有位图的按位或(OR)结果。输入应该是从 bitmap_construct_agg() 创建的位图 | https://sparkfunctions.com/bitmap_or_agg |
bool_and | bool_and(expr) - 如果 expr 的所有值都为真,则返回真 | https://sparkfunctions.com/bool_and |
bool_or | bool_or(expr) - 如果 expr 中至少有一个值为真,则返回真 | https://sparkfunctions.com/bool_or |
collect_list | collect_list(expr) - 收集并返回一个非唯一元素的列表 | https://sparkfunctions.com/collect_list |
collect_set | collect_set(expr) - 收集并返回一个唯一元素的集合。 | https://sparkfunctions.com/collect_set |
corr | corr(expr1, expr2) - 返回一组数字对之间的皮尔逊相关系数 | https://sparkfunctions.com/corr |
count(*) | 返回检索到的总行数,包括包含 null 的行。 | https://sparkfunctions.com/count |
count(expr[, expr...]) | 返回表达式列表中所有表达式都不为 null 的行数 | https://sparkfunctions.com/count |
count(DISTINCT expr[, expr...]) | 返回表达式列表中唯一且非 null 的行数 | https://sparkfunctions.com/count |
count_if | count_if(expr) - 返回表达式中 TRUE 值的数量 | https://sparkfunctions.com/count_if |
count_min_sketch | count_min_sketch(col, eps, confidence, seed) - 返回给定列的计数-最小草图,使用指定的误差界限(eps)、置信度(confidence)和种子(seed) | https://sparkfunctions.com/count_min_sketch |
covar_pop | covar_pop(expr1, expr2) - 返回一组数字对的总体协方差。 | https://sparkfunctions.com/covar_pop |
covar_samp | covar_samp(expr1, expr2) - 返回一组数字对的样本协方差 | https://sparkfunctions.com/covar_samp |
every | every(expr) - 如果 expr 的所有值都为真,则返回真 | https://sparkfunctions.com/every |
first | first(expr[, isIgnoreNull]) - 对于一组行,返回 expr 的第一个值。如果 isIgnoreNull 为真,则只返回非空值 | https://sparkfunctions.com/first |
first_value | first_value(expr[, isIgnoreNull]) - 对于一组行,返回 expr 的第一个值。如果 isIgnoreNull 设置为真,则只返回非空值 | https://sparkfunctions.com/first_value |
grouping | grouping(col) - 表示在 GROUP BY 中指定的列是否被聚合,返回值 1 表示已聚合,返回值 0 表示未聚合 | https://sparkfunctions.com/grouping |
grouping_id | grouping_id([col1[, col2 ..]]) - 返回分组的级别,等同于 (grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + ... + grouping(cn),其中 n 是 GROUP BY 子句中列的数量,grouping 函数返回的是 0 或 1,表示相应列是否被聚合 | https://sparkfunctions.com/grouping_id |
histogram_numeric | histogram_numeric(expr, nb) - 使用 nb 个箱位对数值 'expr' 进行直方图计算 | https://sparkfunctions.com/histogram_numeric |
hll_sketch_agg | hll_sketch_agg(expr, lgConfigK) - 返回 HllSketch 的可更新二进制表示 | https://sparkfunctions.com/hll_sketch_agg |
hll_union_agg | hll_union_agg(expr, allowDifferentLgConfigK) - 返回估计的独特值数量 | https://sparkfunctions.com/hll_union_agg |
kurtosis | kurtosis(expr) - 根据一组值计算并返回峰态值 | https://sparkfunctions.com/kurtosis |
last | last(expr[, isIgnoreNull]) - 对于一组行,返回 expr 的最后一个值。如果 isIgnoreNull 设置为真,则只返回非空值 | https://sparkfunctions.com/last |
last_value | last_value(expr[, isIgnoreNull]) - 对于一组行,返回 expr 的最后一个值。如果 isIgnoreNull 设置为真,则只返回非空值 | https://sparkfunctions.com/last_value |
max | max(expr) - 返回 expr 的最大值 | https://sparkfunctions.com/max |
max_by | max_by(x, y) - 返回与 y 的最大值相关联的 x 值 | https://sparkfunctions.com/max_by |
mean | mean(expr) - 根据一组值计算并返回平均值 | https://sparkfunctions.com/mean |
median | median(col) - 返回数值或 ANSI interval col 的中位数 | https://sparkfunctions.com/median |
min | min(expr) - 返回 expr 的最小值 | https://sparkfunctions.com/min |
min_by | min_by(x, y) - 返回与 y 的最小值相关联的 x 值 | https://sparkfunctions.com/min_by |
mode | mode(col) - 返回列 col 中出现频率最高的值。忽略 NULL 值。如果所有值都是 NULL,或者没有行,则返回 NULL | https://sparkfunctions.com/mode |
percentile(col, percentage [, frequency]) | percentile(col, percentage [, frequency]) - 返回数值列 col 或 ANSI 间隔列 col 在给定百分比的确切百分位数值 | https://sparkfunctions.com/percentile |
percentile(col, array(percentage1 [, percentage2]...) [, frequency]) | 返回数值列 col 在给定的百分比(或多个百分比)的确切百分位数值数组。百分比数组中的每个值都必须在 0.0 和 1.0 之间。frequency 的值应该是正整数。 | https://sparkfunctions.com/percentile |
percentile_approx | percentile_approx(col, percentage [, accuracy]) - 返回数值列或 ANSI 间隔列 col 的近似百分位数,这是 col 值中排序后(从最小到最大)的最小值,使得不超过 percentage 指定的比例的 col 值小于或等于该值 | https://sparkfunctions.com/percentile_approx |
regr_avgx(y, x) | regr_avgx(y, x) - 返回组中非空值对的自变量的平均值,其中 y 是因变量,x 是自变量 | https://sparkfunctions.com/regr_avgx |
regr_avgy(y, x) | regr_avgy(y, x) - 返回组中非空值对的因变量的平均值,其中 y 是因变量,x 是自变量 | https://sparkfunctions.com/regr_avgy |
regr_count(y, x) | regr_count(y, x) - 返回组中非空数值对的数量,其中 y 是因变量,x 是自变量 | https://sparkfunctions.com/regr_count |
regr_intercept(y, x) | regr_intercept(y, x) - 返回在组中非空值对的单变量线性回归线的截距,其中 y 是因变量,x 是自变量。 | https://sparkfunctions.com/regr_intercept |
regr_r2(y, x) | regr_r2(y, x) - 返回组中非空对的确定系数,其中 y 是因变量,x 是自变量 | https://sparkfunctions.com/regr_r2 |
regr_slope(y, x) | regr_slope(y, x) - 返回组中非空值对的线性回归线的斜率,其中 y 是因变量,x 是自变量 | https://sparkfunctions.com/regr_slope |
regr_sxx(y, x) | regr_sxx(y, x) - 对于组中的非空值对,返回 REGR_COUNT(y, x) * VAR_POP(x),其中 y 是因变量,x 是自变量 | https://sparkfunctions.com/regr_sxx |
regr_sxy(y, x) | regr_sxy(y, x) - 对于组中的非空值对,返回 REGR_COUNT(y, x) * COVAR_POP(y, x),其中 y 是因变量,x 是自变量 | https://sparkfunctions.com/regr_sxy |
regr_syy(y, x) | regr_syy(y, x) - 对于组中的非空值对,返回 REGR_COUNT(y, x) * VAR_POP(y),其中 y 是因变量,x 是自变量 | https://sparkfunctions.com/regr_syy |
skewness | skewness(expr) - 返回根据一个组中的值计算出的偏度值 | https://sparkfunctions.com/skewness |
some | some(expr) - 如果expr中至少有一个值为真,则返回真 | https://sparkfunctions.com/some |
std | std(expr) - 返回根据一个组中的值计算出的样本标准差。 | https://sparkfunctions.com/std |
stddev | stddev(expr) - 返回根据一个组中的值计算出的样本标准差 | https://sparkfunctions.com/stddev |
stddev_pop | stddev_pop(expr) - 返回根据一个组中的值计算出的总体标准差 | https://sparkfunctions.com/stddev_pop |
stddev_samp(expr) | stddev_samp(expr) - 返回根据一个组中的值计算出的样本标准差 | https://sparkfunctions.com/stddev_samp |
sum | sum(expr) - 返回根据一个组中的值计算出的总和 | https://sparkfunctions.com/sum |
try_avg | try_avg(expr) - 从一组值中计算平均值,如果发生溢出,则结果为null | https://sparkfunctions.com/try_avg |
try_sum | try_sum(expr) - 从一组值中计算总和,如果发生溢出,则结果为null | https://sparkfunctions.com/try_sum |
var_pop | var_pop(expr) - 返回根据一个组中的值计算出的总体方差 | https://sparkfunctions.com/var_pop |
var_samp | var_samp(expr) - 返回根据一个组中的值计算出的样本方差 | https://sparkfunctions.com/var_samp |
variance | variance(expr) - 返回根据一个组中的值计算出的样本方差。 | https://sparkfunctions.com/variance |
[ nulls_option ] 指定在评估窗口函数时是否跳过空值。
RESPECT NULLS
表示不跳过空值IGNORE NULLS
表示跳过空值。 如果未指定,默认值为 RESPECT NULLS。 仅 LAG | LEAD | NTH_VALUE | FIRST_VALUE | LAST_VALUE函数可以使用IGNORE NULLS
聚合函数开窗可以排序也可以不排序。
举例: 样例数据
--建表语句
CREATE TABLE t_employees
(
name STRING,
dept STRING,
salary BIGINT,
age BIGINT
);
--插入数据
INSERT INTO t_employees
VALUES ("Lisa", "Sales", 10000, 35),
("Evan", "Sales", 32000, 38),
("Fred", "Engineering", 21000, 28),
("Alex", "Sales", 30000, 33),
("Tom", "Engineering", 23000, 33),
("Jane", "Marketing", 29000, 28),
("Helen", "Marketing", 29000, 40),
("Jeff", "Marketing", 35000, 38),
("Paul", "Engineering", 29000, 23),
("Chloe", "Engineering", 23000, 25);
--样例数据
+--------+--------------+---------+------+
| name | dept | salary | age |
+--------+--------------+---------+------+
| Lisa | Sales | 10000 | 35 |
| Evan | Sales | 32000 | 38 |
| Fred | Engineering | 21000 | 28 |
| Alex | Sales | 30000 | 33 |
| Tom | Engineering | 23000 | 33 |
| Jane | Marketing | 29000 | 28 |
| Helen | Marketing | 29000 | 40 |
| Jeff | Marketing | 35000 | 38 |
| Paul | Engineering | 29000 | 23 |
| Chloe | Engineering | 23000 | 25 |
+--------+--------------+---------+------+
sum()函数举例: 1.计算每个员工所在部门的部门总薪水
select
name,
dept,
salary,
age,
sum(salary)over(partition by dept) as dept_total_salary
from t_employees;
--执行结果
+--------+--------------+---------+------+--------------------+
| name | dept | salary | age | dept_total_salary |
+--------+--------------+---------+------+--------------------+
| Fred | Engineering | 21000 | 28 | 96000 |
| Tom | Engineering | 23000 | 33 | 96000 |
| Paul | Engineering | 29000 | 23 | 96000 |
| Chloe | Engineering | 23000 | 25 | 96000 |
| Jane | Marketing | 29000 | 28 | 93000 |
| Helen | Marketing | 29000 | 40 | 93000 |
| Jeff | Marketing | 35000 | 38 | 93000 |
| Lisa | Sales | 10000 | 35 | 72000 |
| Evan | Sales | 32000 | 38 | 72000 |
| Alex | Sales | 30000 | 33 | 72000 |
+--------+--------------+---------+------+--------------------+
注意: 这里dept_total_salayr列得出的结果接统计了部门全部员工的总薪资。
2.按照员工薪资排序,从低到高,每个部门截止到该员工的累积薪水是多少;(累积求和)
--不限定窗口框架
select
name,
dept,
salary,
age,
sum(salary)over(partition by dept order by salary asc) as dept_total_salary
from t_employees;
--查询结果
+--------+--------------+---------+------+--------------------+
| name | dept | salary | age | dept_total_salary |
+--------+--------------+---------+------+--------------------+
| Fred | Engineering | 21000 | 28 | 21000 |
| Tom | Engineering | 23000 | 33 | 67000 |
| Chloe | Engineering | 23000 | 25 | 67000 |
| Paul | Engineering | 29000 | 23 | 96000 |
| Jane | Marketing | 29000 | 28 | 58000 |
| Helen | Marketing | 29000 | 40 | 58000 |
| Jeff | Marketing | 35000 | 38 | 93000 |
| Lisa | Sales | 10000 | 35 | 10000 |
| Alex | Sales | 30000 | 33 | 40000 |
| Evan | Sales | 32000 | 38 | 72000 |
+--------+--------------+---------+------+--------------------+
--使用rows限定窗口框架
select name,
dept,
salary,
age,
sum(salary)
over (partition by dept order by salary asc rows BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as dept_total_salary
from t_employees;
--查询结果
+--------+--------------+---------+------+--------------------+
| name | dept | salary | age | dept_total_salary |
+--------+--------------+---------+------+--------------------+
| Fred | Engineering | 21000 | 28 | 21000 |
| Tom | Engineering | 23000 | 33 | 44000 |
| Chloe | Engineering | 23000 | 25 | 67000 |
| Paul | Engineering | 29000 | 23 | 96000 |
| Jane | Marketing | 29000 | 28 | 29000 |
| Helen | Marketing | 29000 | 40 | 58000 |
| Jeff | Marketing | 35000 | 38 | 93000 |
| Lisa | Sales | 10000 | 35 | 10000 |
| Alex | Sales | 30000 | 33 | 40000 |
| Evan | Sales | 32000 | 38 | 72000 |
+--------+--------------+---------+------+--------------------+
--使用range限定窗口框架
select name,
dept,
salary,
age,
sum(salary)
over (partition by dept order by salary asc range BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as dept_total_salary
from t_employees;
--查询结果
+--------+--------------+---------+------+--------------------+
| name | dept | salary | age | dept_total_salary |
+--------+--------------+---------+------+--------------------+
| Fred | Engineering | 21000 | 28 | 21000 |
| Tom | Engineering | 23000 | 33 | 67000 |
| Chloe | Engineering | 23000 | 25 | 67000 |
| Paul | Engineering | 29000 | 23 | 96000 |
| Jane | Marketing | 29000 | 28 | 58000 |
| Helen | Marketing | 29000 | 40 | 58000 |
| Jeff | Marketing | 35000 | 38 | 93000 |
| Lisa | Sales | 10000 | 35 | 10000 |
| Alex | Sales | 30000 | 33 | 40000 |
| Evan | Sales | 32000 | 38 | 72000 |
+--------+--------------+---------+------+--------------------+
注意:
值
,关注Tom所在行的结果,聚合了包含Chloe的薪水;要求: 取出每个部门薪水最低的员工记录,要求每个部门仅取出一行记录
分析: 为了保证每个部门仅取出一行记录,我们使用row_number函数来进行处理,具体语句和执行结果如下:
--执行SQL
select name,
dept,
salary,
age
from (select name,
dept,
salary,
age,
row_number() over (partition by dept order by salary asc) as rn
from t_employees) t
where rn = 1
--执行结果
+-------+--------------+---------+------+
| name | dept | salary | age |
+-------+--------------+---------+------+
| Fred | Engineering | 21000 | 28 |
| Jane | Marketing | 29000 | 28 |
| Lisa | Sales | 10000 | 35 |
+-------+--------------+---------+------+
注意: 关注Marketing部门的记录,取出的是Jane的记录。Jane确实是最低的,但是同时Hellen的薪资也是一样的。虽然当前满足了需求内容,但在实际生产中,发生流程重跑,则数据内容可能发生变化,数据校验出现前后不一致,较难排查。还会影响下游使用,例如使用结果数据计算最低薪水员工的平均年龄,数据重跑之后平均年龄发生变化。所以保证排序唯一十分重要;
可以在排序时指定空值是排在最前面还是最后面,测试数据中没有空值,仅写SQL了
--样例SQL
select name,
dept,
salary,
age,
sum(salary)
over (partition by dept order by salary asc nulls first range BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as dept_total_salary
from t_employees;
窗口框架指定窗口从哪一行开始以及在哪里结束 语法
{ RANGE | ROWS } { frame_start | BETWEEN frame_start AND frame_end }
框架指定方式分为range方式和rows方式,如果不指定默认为range方式,
frame_start
和frame_end
可以为以下内容
UNBOUNDED PRECEDING
: 从分区的第一行开始
offset PRECEDING
:从当前行之前的第 offset
行开始
CURRENT ROW
:当前行
offset FOLLOWING
:到当前行之后的第 offset
行结束
UNBOUNDED FOLLOWING
:到分区的最后一行结束
如果未指定 frame_end
,则默认值为 CURRENT ROW