
前方干货预警: 这可能是你能够找到的最清晰最系统的SQL入门和进阶教程。
程序员的日常工作避免不了处理大量数据。
处理数据的标准语言是SQL,常见的包括MySQL/HiveSQL/SparkSQL/Trino,它们的语法基本相差不大。(本教程以HiveSQL语法为主)
扎实的SQL功底可以让你在数据的海洋中畅游无阻。
下面这10个问题,可以小小地检查一下你的SQL基础是否扎实。
以上的这些问题,都可以在下面的这个HiveSQL教程中找到。enjoy!
公众号算法美食屋后台回复关键词:SQL,获取本文markdown源代码!
根据用途,SQL 语法可以分为一下几个部分。
注意:SQL 对大小写不敏感!
DDL(Data Definition Language)
关键词:
常用语句:
--创建分区表
DROP TABLE IF EXISTS basedb.sales;
CREATE TABLE IF NOT EXISTS basedb.sales
(sale_id INT,
product string COMMENT '产品名称',
amount double COMMENT '销量')
PARTITIONED BY (year INT, month INT)DML(Data Manipulatin Language)
关键词:
注意:insert插入数据到分区有两种方式, Insert overwrite 和 Insert into, 前者是覆盖分区,后者是追加数据。
常用语句:
-- 插入数据到指定分区(覆盖分区)
INSERT OVERWRITE TABLE basedb.sales PARTITION (year='2025', month='01')
SELECT id, amount, date
FROM tmpdb.raw_sales;
-- 动态插入数据到分区(追加数据)
SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
INSERT INTO TABLE basedb.sales PARTITION (year, month)
SELECT id, amount, date, year, month
FROM tmpdb.raw_sales;--数据查询select语句常用模版
SELECT ...
FROM 表1 JOIN 表2
ON 等值连接
WHERE 分组前过滤条件
GROUP BY 分组字段
HAVING 分组后过滤条件
ORDER BY 排序字段DCL(Data Control Language)
关键词:
常用语句:
-- 授权用户访问表权限
GRANTSELECT, INSERT, UPDATE, DELETEONTABLE basedb.sales TOUSER user1;
-- 撤销用户访问表权限
REVOKESELECT, INSERT, UPDATE, DELETEONTABLE basedb.sales FROMUSER user1;
-- 提交事务
COMMIT;
-- 回滚事务
ROLLBACK;按颗粒度从大到小的顺序,Hive数据存储单元被组织为:
数据库是一个名称空间,作用是避免表、视图、分区、列等的命名冲突。
数据库还可用于为用户或用户组实施安全性
具有相同模式的同质数据单元。
一个数据库里可以有多张表。
每个表可以有一个或多个分区键,用于确定数据的存储方式。
分区除了作为存储单元外,还允许用户有效地识别满足指定条件的行。
分区列(Partition columns)是虚拟列,它们不是数据本身的一部分,而是在加载时派生的。
每个分区中的数据又可以基于表的某一列的散列函数的值被划分为桶。
分桶表用于需要高效连接操作、数据抽样和均匀负载的场景,特别是在大数据集和复杂查询中。
分桶和分区的区别:
--创建同时有分区和分桶的表
CREATE TABLE orders (
order_id INT,
user_id INT,
amount DOUBLE,
order_date STRING
)
PARTITIONED BY (order_month STRING) -- 按月份分区
CLUSTERED BY (user_id) INTO 10 BUCKETS -- 按用户ID分桶,共10个桶
STORED AS ORC; -- 使用ORC存储格式,提升查询性能
-- BucketMapJoin 支持大表对大表的连接,且不用shuffle.
--1,创建桶表
CREATE TABLE large_table_A (
id INT,
value STRING
)
CLUSTERED BY (id) INTO8 BUCKETS
STORED AS ORC;
CREATE TABLE large_table_B (
id INT,
description STRING
)
CLUSTERED BY (id) INTO8 BUCKETS
STORED AS ORC;
INSERTover large_table_A
--2,启用 BucketMapJoin 优化
SET hive.optimize.bucketmapjoin=true;
SET hive.optimize.bucketmapjoin.sortedmerge=true;
-- 基于桶的连接查询
SELECT a.id, a.value, b.description
FROM large_table_A a
JOIN large_table_B b
ON a.id = b.id;hive 支持常用的基础数据类型和一些复杂类型。
Hive 支持隐式转换 和 显式转换。
(1) 隐式转换
例如:
-- 隐式将 TINYINT 转为 INT
SELECT CAST(1 AS TINYINT) + 100;
-- STRING 自动转为 DOUBLE
SELECT '3.14' + 2.0;(2) 显式转换
例如:
-- 数值转字符串
SELECT CAST(123.45 AS STRING);
-- 字符串转日期
SELECT CAST('2023-01-01' AS DATE);
-- 字符串转整数
SELECT CAST('100' AS INT);注意:显式转换失败时会返回 NULL。应确保源数据符合目标类型的格式要求。
SQL语句的执行顺序指的是在执行一个完整的SQL查询时,各个子句的处理顺序。
以下是SQL SELECT语句的执行顺序:
下面是一个范例
SELECT DISTINCT column1, column2
FROM table1
JOIN table2 ON table1.id = table2.id
WHERE condition
GROUP BY column1
HAVING condition
ORDER BY column1
LIMIT 10;
执行顺序为:
可以用 explain 关键字来查看SQL语句的执行计划,以便更好地理解SQL语句的执行顺序。
查询语句可以嵌入子查询。
常见的子查询形式是使用括号包裹起来的子查询。
但是这种方式嵌套很深,可读性很差。
hive 和 mysql 等主流关系型数据库的较高版本,都支持 with as 语法。
--嵌套的子查询写法
SELECT subquery.column1, subquery.column2, table2.column3
FROM (
SELECT column1, column2
FROM table1
WHERE condition
) AS subquery
JOIN table2 ON subquery.id = table2.id
WHERE another_condition
ORDER BY subquery.column1;-- with as 子查询写法
WITH subquery_name AS (
SELECT column1, column2
FROM table1
WHERE condition
) --注意这里没有分号,with as 不是一条独立的语句,而是作为一个子查询
SELECT subquery_name.column1, subquery_name.column2, table2.column3
FROM subquery_name
JOIN table2 ON subquery_name.id = table2.id
WHERE another_condition
ORDER BY subquery_name.column1;join 是根据 关键字段 横向连接 两个表。要求两个表具有相同的关键字段。
join有多种不同的类型。
最常用的join 就是 inner join。
inner join 只返回两个表中关联字段匹配的记录。
inner join 中的 inner可以省略,直接写作join。
SELECT a.*, b.*
FROM table_a a
JOIN table_b b
ON a.id = b.id;返回左表中的所有记录,以及右表中匹配的记录。如果右表没有匹配,则返回 NULL。
SELECT a.*, b.*
FROM table_a a
LEFT OUTER JOIN table_b b
ON a.id = b.id;与左外连接相反,返回右表中的所有记录及左表中匹配的记录。
SELECT a.*, b.*
FROM table_a a
RIGHT OUTER JOIN table_b b
ON a.id = b.id;返回两个表中所有的记录,当没有匹配时以 NULL 代替。
SELECT a.*, b.*
FROM table_a a
FULL OUTER JOIN table_b b
ON a.id = b.id;Hive 中的 LEFT SEMI JOIN 用于返回左表中在右表中存在匹配记录的所有行,但只返回左表的字段。
当只关心左表是否存在关联数据时非常高效。
SELECT a.*
FROM table_a a
LEFT SEMI JOIN table_b b
ON a.id = b.id;Hive 从 2.0 版本开始,正式支持 LEFT ANTI JOIN。
LEFT ANTI JOIN 用于返回左表中在右表中没有匹配记录的数据,也就是常说的 anti join.
--存在左表中,但是不在右表中
SELECT a.*
FROM table_a a
LEFT ANTI JOIN table_b b
ON a.join_key = b.join_key;如果是较低版本的hive,可以通过 left join 后判断右表字段是否为NULL 来达到相同效果。
-- 通过 LEFT JOIN 后判断右表字段是否为 NULL 来达到 anti join 的效果
SELECT a.*
FROM table_a a
LEFT JOIN table_b b
ON a.join_key = b.join_key
WHERE b.join_key IS NULL;返回两个表的笛卡尔积,即每个表的每一行都与另一个表的每一行组合在一起。
SELECT a.*, b.*
FROM table_a a
CROSS JOIN table_b b;join是横向连接,而union是纵向拼接。
union操作需要拼接的两个表具有完全相同的字段数量和字段数据类型(可以列名不一样,结果会取第一个表的列名)。
需要注意的是,在Hive中一般只支持UNION ALL,即直接连接各个查询的结果,而不会自动去除重复的记录。
这与SQL中有些数据库默认的UNION(去重)行为略有不同。
如果希望得到不重复的数据,通常需要在UNION ALL之后使用DISTINCT或其他去重方法。
SELECT column_a FROM table1
UNION ALL
SELECT column_a FROM table2;如果你希望去除重复行,可以再在外面嵌套一层去重逻辑。
SELECT DISTINCT column_a FROM (
SELECT column_a FROM table1
UNION ALL
SELECT column_a FROM table2
) t;
在实践中,UNION ALL 这个函数常常用来手工新建一些比较小的表,例如模型/数据版本控制表。
DROP TABLE IF EXISTS base_db.predict_data_version;
CREATE TABLE IF NOTEXISTS base_db.predict_data_version AS
select'V1.1'as predict_version, 0as used_version unionall
select'V1.5'as predict_version, 0as used_version unionall
select'V2.0'as predict_version, 0as used_version unionall
select'V2.1.0'as predict_version, 0as used_version unionall
select'V2.1.5'as predict_version, 0as used_version unionall
select'V3.0.0'as predict_version, 1as used_version;
--线上应用数据数据在Hive中,GROUP BY操作用于将数据分组,并对每个分组应用聚合函数。
常用的聚合函数包括:
可以在GROUP BY子句中使用HAVING子句来过滤分组。
HAVING子句类似于WHERE子句,但它只能用于分组后的过滤。
SELECT
region,
category,
COUNT(*) AS num_sales,
SUM(sales_amount) AS total_sales
FROM sales_data
GROUPBY region, category
HAVINGCOUNT(*) >10ANDSUM(sales_amount) >8000;
GROUP BY 操作对于 数据科学/数据分析 具有至关重要的意义。
从一定意义上说,数据科学的精髓 就是 统计量对比。
没有统计量,就无法抓住关键信息,就无法避免被淹没在充满噪声的样本数据的海洋中。
没有对比,就无法感知差异,就无法用雄辩的实验数据验证我们的逻辑假设。
数据科学常常是按照这样的模式进行的:
下面这个例子是针对 一个 销量预测实验数据 构造的统计量对比范例代码。
一方面在 不同版本之间对比,另一方面也在不同日期的预测之间对比。
-- 各个版本不同日期销量预测总量对比
WITH summed_predictions AS (
select predict_version, cal_dt, sum(num_pred) as num_pred_sum
from mkt_db.sales_predict_data
where cal_dt between'2024-12-05'and'2025-01-22'
and predict_version in ('V2.0','V3.0.0','V3.1.0')
groupby predict_version, cal_dt
)
select cal_dt,
SUM(CASEWHEN predict_version ='V2.0'THEN num_pred_sum ELSE0END) AS sum_V2_0,
SUM(CASEWHEN predict_version ='V3.0.0'THEN num_pred_sum ELSE0END) AS sum_V3_0_0,
SUM(CASEWHEN predict_version ='V3.1.0'THEN num_pred_sum ELSE0END) AS sum_V3_1_0
from summed_predictions
groupby cal_dt
orderby cal_dt;分窗操作 和 分组操作 具有一些相同点,也有不同点。
相同点是 它们 都要根据一些字段将数据分块(分窗/分组),并在分块范围内应用计算函数。
不同点是 分窗操作 在分窗上应用的是 窗口函数,不会改变返回结果的行数。
而 分组操作在分组上应用的是 聚合函数, 会把每个分组变成一行,减少了返回结果的行数。
分窗操作通过 over子句来实现。
基本语法如下
<聚合/窗口函数> OVER (
[PARTITION BY <分组列>]
[ORDER BY <排序列>]
[ROWS/RANGE BETWEEN <窗口范围>]
)PARTITION BY:指定分组列,将数据分成不同的组。ORDER BY:指定排序列,对每个分组内的数据进行排序。ROWS/RANGE BETWEEN:指定窗口范围,定义窗口的大小。其中ROWS BETWEEN是根据物理行来确定窗口范围。
RANGE BETWEEN是根据排序的值的来确定窗口范围。
需要注意的是 有没有指定窗口范围 会影响后续窗口函数取值的含义。
-- 未指定 窗口范围,计算的是全窗口内的总工资(每行都返回一样的值)
SELECT
emp_id,
salary,
SUM(salary) OVER (
PARTITIONBY department
) as window_sum
FROM
employees;
-- 指定 ROWS,计算的是窗口范围内的累积工资(每行返回的值是递增的)
SELECT
emp_id,
salary,
SUM(salary) OVER (
PARTITIONBY department
ORDERBY emp_id
ROWSBETWEEN UNBOUNDED PRECEDING ANDCURRENTROW
) as cumulative_sum
FROM
employees;分窗操作的常见用途包括:
-- 1. 计算每个类别的销售排名:
SELECT
category,
sales_date,
sales_amount,
RANK() OVER (
PARTITIONBY category
ORDERBY sales_amount DESC
) AS sales_rank
FROM sales_data;
-- 2. 计算每个类别的累计销售额
SELECT
category,
sales_date,
sales_amount,
SUM(sales_amount) OVER (
PARTITIONBY category
ORDERBY sales_date
ROWSBETWEEN UNBOUNDED PRECEDING ANDCURRENTROW
) AS cumulative_sales
FROM sales_data;
-- 3. 计算每个类别的移动平均销售额(计算前面6行和当前行的均值)
-- 如果 sales_date 列具有重复值(即同一天有多条记录),参与计算的依然是7行。
RANGEBETWEEN 是基于值的范围,所以它关注的是日期范围,而 ROWSBETWEEN 是基于行的范围,所以它关注的是行数。
SELECT
category,
sales_date,
sales_amount,
AVG(sales_amount) OVER (
PARTITIONBY category
ORDERBY sales_date
ROWSBETWEEN6 PRECEDING ANDCURRENTROW
) AS rolling_avg
FROM sales_data;
-- 3. 计算每个类别的移动平均销售额(计算过去6天和当前行的均值)
-- 如果 sales_date 列具有重复值(即同一天有多条记录),参与计算的可能会超过7行。
SELECT
category,
sales_date,
sales_amount,
AVG(sales_amount) OVER (
PARTITIONBY category
ORDERBY sales_date
RANGEBETWEENINTERVAL6DAY PRECEDING ANDCURRENTROW
) AS rolling_avg
FROM
sales_data;
-- (1) ROW_NUMBER(): 返回分区中每行的唯一行号,从1开始。
-- 假设salary取值: 8w, 7w, 5w, 5w, 3w, 1w, 1w
-- row_num结果取值:1, 2, 3, 4, 5, 6, 7
SELECT
emp_id,
salary,
ROW_NUMBER() OVER (
ORDERBY salary DESC
) as row_num
FROM
employees;
-- (2) RANK(): 返回分区中每行的排名,相同值的行会有相同的排名,排名中有跳跃。
-- 假设salary取值: 8w, 7w, 5w, 5w, 3w, 1w, 1w
-- rank的 结果取值: 1, 2, 3, 3, 5, 6, 6
SELECT
emp_id,
salary,
RANK() OVER (
ORDERBY salary DESC
) as rank
FROM
employees;
-- (3) DENSE_RANK(): 返回分区中每行的排名,相同值的行会有相同的排名,但排名不跳跃。
-- 假设salary取值: 8w, 7w, 5w, 5w, 3w, 1w, 1w
-- dense_rank取值: 1, 2, 3, 3, 4, 5, 5
SELECT
emp_id,
salary,
DENSE_RANK() OVER (
ORDERBY salary DESC
) as dense_rank
FROM
employees;
-- (4) LAG(): 返回当前行前面第 N 行的值,若不存在则返回默认值。
--例子:计算销量前后值的差异
SELECT
sales_date,
sales_amount,
LAG(sales_amount, 1, 0) OVER (ORDERBY sales_date) AS previous_sales,
sales_amount -LAG(sales_amount, 1, 0) OVER (ORDERBY sales_date) AS sales_difference
FROM
sales_data;
-- (5) LEAD(): 返回当前行后面第 N 行的值,若不存在则返回默认值。
--例子:联合使用LEAD和LAG来计算滚动平均值(3日滚动)。
SELECT
sales_date,
sales_amount,
(sales_amount +
LAG(sales_amount, 1, 0) OVER (ORDERBY sales_date) +
LEAD(sales_amount, 1, 0) OVER (ORDERBY sales_date)) /3AS moving_average
FROM
sales_data;在Hive中,实现条件分支逻辑的常用方法包括
下面是这些方法的具体范例。
-- 1,IF(condition, true_value, false_value)
SELECT
order_id,
amount,
IF(amount >100, 'High', 'Low') AS amount_category
FROM orders;
-- 2,CASE WHEN condition THEN result [WHEN ...] ELSE result END
SELECT
order_id,
amount,
CASE
WHEN amount >100THEN'High'
WHEN amount >50THEN'Medium'
ELSE'Low'
ENDAS amount_category
FROM orders;
-- 3,COALESCE(value1, value2, ..., valueN)
SELECT
order_id,
COALESCE(NULL, 'Default Value', amount) AS amount_value
FROM orders;
-- 4, NVL(value, default_value)
SELECT
order_id,
NVL(amount, 0) AS amount_value
FROM orders;
hive中常用的日期函数包括以下一些。
当前日期/时间
日期提取
日期格式化
日期算术
下面是是范例代码
SELECT
CURRENT_DATE() as today,
DATE_ADD(CURRENT_DATE(), 7) as next_week,
DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyyy-MM-dd HH:mm:ss') as formatted_time,
DATEDIFF('2024-12-31', CURRENT_DATE()) as days_until_now;Hive提供了丰富的字符串处理函数,下面是一些范例。
-- 字符串长度
SELECT LENGTH('Hello') -- 返回5
-- 字符串连接
SELECT CONCAT('Hello', ' ', 'World') -- 返回'Hello World'
-- 字符串拼接(带分隔符)
SELECT CONCAT_WS(',', 'Hello', 'World') -- 返回'Hello,World'
-- 大小写转换
SELECTUPPER('hello') -- 返回'HELLO'
SELECTLOWER('HELLO') -- 返回'hello'
-- 子字符串
SELECT SUBSTR('Hello World', 1, 5) -- 返回'Hello'
-- 去除空格
SELECTTRIM(' Hello ') -- 返回'Hello'
SELECT LTRIM(' Hello ') -- 从左侧去除空格
SELECT RTRIM(' Hello ') -- 从右侧去除空格
-- 查找子串位置
SELECT INSTR('Hello World', 'World') -- 返回7
-- 字符串替换
SELECT REGEXP_REPLACE('Hello World', 'World', 'Hive') -- 返回'Hello Hive'
-- 字符串分割
SELECT SPLIT('Hello,World,Hive', ',') -- 返回数组['Hello', 'World', 'Hive']
-- 获取字符串第一个出现位置
SELECT LOCATE('o', 'Hello World') -- 返回5
-- 重复字符串
SELECT REPEAT('Hi', 3) -- 返回'HiHiHi'
-- 字符串反转
SELECT REVERSE('Hello') -- 返回'olleH'
-- 从左/右填充字符
SELECT LPAD('Hi', 5, '*') -- 返回'***Hi'
SELECT RPAD('Hi', 5, '*') -- 返回'Hi***'
hive 性能优化 的前提是 需要理解 hive 引擎(mapreduce 或 spark)的 mapreduce原理。
mapreduce思想原理:分而治之,合并结果。
mapreduce = map + shuffle + reduce。
举个例子。
-- 统计每个部门的平均工资
SELECT dept_id, AVG(salary)
FROM employees
GROUP BY dept_id;处理过程:
就像在食堂:
有一些计算是只需要 map 操作的。例如: where条件过滤, 简单的select查询, union all 合并。
还有一些计算可能会触发 shuffle 和 reduce 操作。例如: join表连接,group by表分组,order by 排序,over 表分窗。
hive性能优化的瓶颈最容易发生在shuffle过程,其次是reduce过程,map较少成为性能瓶颈
主要是shuffle过程需要先将数据按照key排序,然后将排序的数据序列化后通过网络传输按照key发送给不同的reducer,然后reducer接收到数据后还要反序列化。排序,序列化和反序列化,以及网络传输都是非常耗时的。
reduce过程可能会因为某个key太多,一个reducer要处理的数据量太大,发生数据倾斜的性能瓶颈。
hive中有一些常用的配置,可以根据需要调整它们来优化hive的性能。
-- 谓词下推优化(将过滤条件下推到靠近源表)
SET hive.optimize.ppd=true; -- 启用谓词下推
SET hive.optimize.ppd.storage=true; -- 启用存储层谓词下推
-- 启用向量化
SET hive.vectorized.execution.enabled=true;
SET hive.vectorized.execution.reduce.enabled=true;
-- 启动并行执行 (每个worker多线程增加并行度)
SET hive.exec.parallel=true;
SET hive.exec.parallel.thread.number=16; -- 线程数根据资源情况设置
-- 启动数据压缩
SET hive.exec.compress.intermediate=true; --中间数据压缩
SET mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec; --压缩格式
SET hive.exec.compress.output=true; --最终数据压缩
SET mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec; --压缩格式
-- 合并小文件
SET hive.merge.mapfiles=true;
SET hive.merge.mapredfiles=true;
SET hive.merge.size.per.task=256000000; -- 256MB
SET hive.merge.smallfiles.avgsize=16000000; -- 16MB
-- 设置mapper处理的数据量
SET mapreduce.input.fileinputformat.split.maxsize=256000000; -- 256MB
SET mapreduce.input.fileinputformat.split.minsize=128000000; -- 128MB
--设置reducer数量
SET mapreduce.job.reduces=10; -- 根据具体任务调整
group by 可能会因为某个key对应的数据量太大,发生数据倾斜导致性能瓶颈。
可以通过以下一些方式优化。
map端预聚合,开启数据倾斜优化,以及 通过子查询打散数据。
--before(原始查询)
SELECT category_id, COUNT(*) as count
FROM sales_data
GROUPBY category_id;
--after(优化方案)
-- 方案1:开启Map端预聚合
SET hive.map.aggr=true;
SET hive.groupby.mapaggr.checkinterval=100000;
-- 方案2:开启倾斜数据优化(自动处理数据倾斜)
SET hive.groupby.skewindata=true;
-- 方案3:通过子查询打散数据(基本上等价于hive.groupby.skewindata的原理)
SELECT category_id, SUM(cnt) as count
FROM (
SELECT category_id, COUNT(*) as cnt
FROM sales_data
GROUPBY category_id, CAST(RAND()*100ASINT) -- 添加随机数打散
) t
GROUPBY category_id;join的原理从性能优化角度来说,有以下四种类型:
其中最常用的性能优化方法是把普通的ReduceJoin修改为 MapJoin 或者 SkewJoin。(最实用的优化,更改配置即可)
下面分别介绍这些Join方式的实现原理。
ReduceJoin
普通的join就是是ReduceJoin,两份数据都要shuffle,然后在reduce阶段完成join,遇到大表时大数据shuffle会比较耗时。
MapJoin:
MapJoin 过程完全在 Map 阶段完成,无需将数据发送到 Reduce 节点,彻底避免了普通Join的 Shuffle 和 Reduce 的开销。
MapJoin的 实现方式是将小表广播到所有Map任务的内存中,大表的数据按分片分配到各个Map 任务,逐行读取并与内存中的小表哈希表进行匹配实现join。
SkewJoin:
SkewJoin主要是为了自动解决数据join过程中可能出现的数据倾斜问题。
SkewJoin的实现方式是进行倾斜检测和数据分离,将倾斜key作mapjoin处理,非倾斜key普通join处理,最后合并数据。
BucketMapJoin:
BucketMapJoin主要针对两个采用相同的分桶方式分桶的表,并且连接使用的key恰好就是分桶key的情形。
BucketMapJoin无需shuffle,因为数据根据桶号已经预处理了,直接把相同桶号的两份数据放到同一个mapper上处理就可以了。
每个mapper会将两份数据中较小的那份数据做成哈希表放到内存中,然后跟较大的那份数据流式匹配完成join的过程。
此外,还有一些手动优化方法。
例如,倾斜值隔离处理 以及 加随机数打乱 等方法。
(谨慎使用,比较复杂,代码可读性差)
下面我们看join优化的一些范例代码。
-- before(原始查询)
-- 方案0:原始ReduceJoin,也就是普通Join
SELECT a.user_id, a.order_id, b.user_name
FROM orders a
JOIN users b ON a.user_id = b.user_id;
-- after(优化方案)
-- 方案1:开启MapJoin 优化(适用于小表Join大表,小表的默认阈值是25M)
SET hive.auto.convert.join=true;
SET hive.mapjoin.smalltable.filesize=25000000;
-- 方案2:开启 SkewJoin 优化(自动处理数据倾斜,非倾斜key常规处理,倾斜key使MapJoin)
SET hive.optimize.skewjoin=true;
SET hive.skewjoin.key=100000;
-- 方案3: 手动处理,倾斜值隔离处理 (常用于null导致的倾斜)
WITH non_null_users AS (
SELECT*
FROM orders
WHERE user_id ISNOT NULL
),
null_users AS (
SELECT*
FROM orders
WHERE user_id ISNULL
)
SELECT*
FROM (
-- 处理非NULL值
SELECT a.*, b.user_name
FROM non_null_users a
JOIN users b ON a.user_id = b.user_id
UNIONALL
-- 处理NULL值
SELECT a.*, NULLas user_name
FROM null_users a
) t;
-- 方案4:手动处理,对随机数打散(加盐)
--步骤①: 对订单表中倾斜的user_id进行打散处理
WITH orders_expanded AS (
SELECT
-- 对倾斜的user_id增加随机后缀 0-9
CASE
WHEN user_id ='hot_user'THEN CONCAT(user_id, '_', CAST(RAND()*10ASINT))
ELSE user_id
ENDAS new_user_id,
user_id as original_user_id, -- 保留原始user_id
order_id
FROM orders
),
--步骤②: 将用户表复制10份,每份对应一个随机后缀
users_expanded AS (
-- 处理热点用户数据
SELECT
CONCAT(user_id, '_', suffix) AS new_user_id,
user_id as original_user_id,
user_name
FROM users
CROSSJOIN (
SELECTCAST(id AS STRING) AS suffix
FROM (
SELECT0as id UNIONALLSELECT1UNIONALLSELECT2
UNIONALLSELECT3UNIONALLSELECT4UNIONALLSELECT5
UNIONALLSELECT6UNIONALLSELECT7UNIONALLSELECT8
UNIONALLSELECT9
) numbers
) suffixes
WHERE user_id ='hot_user'
UNIONALL
-- 处理非热点用户数据
SELECT
user_id AS new_user_id,
user_id as original_user_id,
user_name
FROM users
WHERE user_id !='hot_user'
)
--步骤③: 最终JOIN查询
SELECT
o.original_user_id AS user_id, -- 使用原始user_id
o.order_id,
u.user_name
FROM orders_expanded o
JOIN users_expanded u
ON o.new_user_id = u.new_user_id;
最后,如果两个表都使用相同的方式进行分桶,且分桶的key也是连接的key,可以使用BucketMapJoin。
-- 创建桶表
CREATE TABLE bucket_user_logs (
user_id INT,
log_time STRING,
action STRING
)
CLUSTERED BY (user_id) INTO4 BUCKETS
STORED AS ORC;
CREATE TABLE bucket_user_info (
user_id INT,
user_name STRING,
user_age INT
)
CLUSTERED BY (user_id) INTO4 BUCKETS
STORED AS ORC;
-- 启用动态分区和分桶表插入
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.enforce.bucketing=true;
-- 插入数据到分区分桶表
INSERT INTOTABLE bucket_user_logs
SELECT user_id, log_time, action
FROM user_logs;
INSERT INTOTABLE bucket_user_info
SELECT user_id, user_name, user_age
FROM user_info;
-- 启用 BucketMapJoin 优化
SET hive.optimize.bucketmapjoin=true;
SET hive.optimize.bucketmapjoin.sortedmerge=true;
-- 基于桶的连接查询
SELECT l.user_id, l.log_time, l.action, i.user_name, i.user_age
FROM bucket_user_logs l
JOIN bucket_user_info i
ON l.user_id = i.user_id;hive排序主要用到的有 ordered by 和 sort by.
此外,还有一些相关的用法,如 distributed by 和 clustered by.
下面是它们的对比:
排序常见的优化是取大表topK的排序优化,下面是范例代码。
-- 原始方案: 大表直接 ordered by 取 topK
SELECT sale_id, amount, product_id, sale_date
from sales
ORDERBY amount DESC -- 按销售额排序
LIMIT 100; -- 获取前100条记录
-- 优化方案: distribute by + sort by 后再 order by .
SELECT*
FROM (
SELECT sale_id, amount, product_id, sale_date
FROM sales
DISTRIBUTE BY RAND() -- 随机分布数据
SORT BY amount DESC -- 按销售额降序排序
LIMIT 100 -- 先限制前1000条记录
) t
ORDERBY amount DESC -- 最终按销售额降序排序
LIMIT 100; -- 获取前100条记录
、