首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >Spark 任务为什么会倾斜?数据倾斜产生原因与三种解决方案

Spark 任务为什么会倾斜?数据倾斜产生原因与三种解决方案

原创
作者头像
用魔法才能打败魔法
发布2025-12-05 17:32:28
发布2025-12-05 17:32:28
1380
举报

前言

写 Spark 的同学,应该都被数据倾斜折磨过。那种感觉就像:你把代码写的很完美,参数配得也算专业,结果一跑,Stage 卡在 99% 不动,UI 上只有一个 task 在疯狂燃烧 CPU,其余 199 个 task 都是秒跑。你想杀任务吧,又怕丢数据;不杀吧,看着那一个 task 占用着 50GB 数据在那里孤军奋战。

我第一次见到倾斜的时候还以为 Spark 出 bug 了。后来才明白:这是我太天真,不是 Spark 不行,是数据不均匀。

下面说的不是官方文档,而是我在生产环境踩过的真实坑。里面有很多“别人不会写,但你必须懂”的细节。


数据倾斜

如果要一句话形容倾斜,那一定是:

大部分 task 都秒跑,只有少数(通常 1~3 个)task 特别慢,慢到让你怀疑人生,stage 卡在 90%-99% 不动。

更经典的几个表现:

  • 某个 task 处理的数据量是其他 task 的 几十倍、几百倍
  • Shuffle read size 某一个 executor 爆了,比如别人 30MB,它一个 18GB。
  • Executor 内存疯狂上涨,甚至 OOM。
  • CPU usage:某台机器 400%,其他机器 15%。
  • GC 次数:某个 executor 每分钟 GC 200 次,其他十几个 executor 没啥压力。

Spark UI 上会看到一个 task 跑了 1600 秒,其他 task 只有 20 秒。这就是赤裸裸的数据倾斜。

2. 为什么业务会产生倾斜

这个我以前也没意识到,总想着优化 SQL、调参数、repartition。后来发现有些倾斜跟技术没关系,纯业务锅。

比如:

  • 某电商平台 80% 的订单在几个“爆品”上。
  • 某直播平台,某位主播几乎占了全平台 35% 的互动数据。
  • 某社交数据,userType='0' 的占比是其他类型的几十倍。
  • 某活动表,活动 ID 有个默认值 0,写入出现异常时全部落到 0。

这些业务不均匀导致 key 分布畸形,Spark 只能把拥有同样 key 的数据集中到某些 task 中,这就注定倾斜。

我曾遇到一个非常典型的坑:某个订单表里 user_id = 0 的记录占了整张表的 28%。这是因为历史兼容逻辑写错了,某些老订单 user_id 没写入,业务直接给默认 0。那一次真的把我给整不会了。

3. 哪些算子最容易倾斜

Spark 倾斜的高发地带有两个:join 和 groupByKey。

① join

尤其是:

  • 大表 join 大表
  • join key 分布极度不均匀
  • 某一边有大量某个热门 key(例如 0、null、默认值)

我遇到一次特别刺激的:一个任务 join 日志表和商品表,本来数据不大,但某个商品 ID 每天有 2.3 亿条曝光记录。结果整个 job 直接卡死。

这个问题你在 SQL 层根本看不出来,只有运行时才暴露。

② groupByKey

这个算子本身就不是很推荐用。

它会把同 key 的数据全聚集到同一个 executor,很容易爆。

我们后来一般用 reduceByKey 或者 aggregateByKey,至少能在 map 侧先聚合一部分,减少 shuffled 数据量。

4. 怎么确认已经发生倾斜

我一般看三样东西:

① stage 的 task time 分布

打开 Spark UI → Stages → 某个 Stage → Tasks

如果你看到:

  • 200 个 task 中 197 个都跑几十秒
  • 但有 3 个 task 跑了二十分钟

这肯定倾斜。

② shuffle read size

倾斜 task 的 shuffle read 是其他 task 的几十倍,比如:

  • 大部分 task read 30MB
  • 倾斜 task read 18GB

这个指标特别直接。

③ executor 的 CPU 和 GC

我们监控平台里可以看到:

  • 某个 executor CPU 300%
  • GC 每秒 8 次
  • 其他 executor CPU 10%

这就是倾斜。

5. 方案 1:加盐(salting)为什么有用?

我第一次用加盐的时候是从朋友口中学来的,他说,“把 key 复制成多个 key,不就可以分散了吗?”

道理其实很简单:

  • 原来 key=123 的所有数据被分到一个 partition
  • 加盐后变成 123_0, 123_1, ..., 123_9
  • 数据被随机分到多个 partition

示例代码:

代码语言:scala
复制
val salted = data.map(row => {
    val salt = scala.util.Random.nextInt(10)
    ((row.key + "_" + salt), row.value)
})

用在 join 上一般需要:

  1. 大表加盐(扩容 key)
  2. 小表扩展(复制 key)

例如:

代码语言:scala
复制
val bigSalted = big.map(r => ((r.key, Random.nextInt(10)), r.value))
val smallExpanded = small.flatMap(r => (0 until 10).map(i => ((r.key, i), r.value)))
bigSalted.join(smallExpanded)

这个方式特别灵,对热门 key 比如 null、0、'-1' 效果特别明显。缺点也明显:

  • 数据量变大
  • 代码看起来很脏
  • salt 数量不好调,少了不够,多了浪费

6. 方案 2:广播 join(Broadcast Hash Join)特别适合大表 join 小表

Spark 能自动广播小表,但自动广播有个阈值,比如:

代码语言:txt
复制
spark.sql.autoBroadcastJoinThreshold=10MB

很多公司把阈值调大,比如 50MB 或 100MB,但仍会遇到“某张维表 200MB,而大表 200GB”的情况。按照默认逻辑,这是 shuffle join,会产生倾斜。

但如果你强制广播小表:

代码语言:sql
复制
SELECT /*+ BROADCAST(dim) */ *
FROM fact f
JOIN dim d
ON f.key = d.key

Spark 会把小表加载到 executor 内存,用 Hash Join,一次性解决 shuffle 问题。性能差异巨大:

  • shuffle join:可能几十 GB shuffle
  • broadcast join:直接 0 shuffle

我见过一个任务,从 19 分钟 → 48 秒,就是靠 broadcast。

缺点:

  • 小表太大不行,内存扛不住
  • 小表必须能完整放进 executor

7. 方案 3:repartition

repartition 并不是万能的,但可以缓解某些倾斜。适用场景:

  • 数据本身不是极度倾斜,只是分区不合理
  • join 前两边的分区方式不一致
  • groupByKey 前手动调整分区让数据更均匀

示例:

代码语言:scala
复制
val newData = data.repartition(300, $"key")

我最常用的场景是:

  • join 双方 repartition 到同一个 key
  • group 之前 repartition 下,让 Spark 分区打散

但 repartition 不能解决单 key 超级热门的问题,这时候必须加盐或其他方法。


8. 数据预聚合

我认为这是所有优化里“最优雅”的方式,因为它不破坏业务逻辑,也不会让数据量膨胀。

案例:订单金额汇总

如果你直接写:

代码语言:scala
复制
rdd.map(x => (x.userId, x.amount))
   .reduceByKey(_ + _)

reduceByKey 会自动做 map-side combine,减少 shuffle 的量。

但很多人用 SparkSQL:

代码语言:sql
复制
SELECT user_id, SUM(amount)
FROM order
GROUP BY user_id

SparkSQL 没有强制 map-side aggregate,导致单个用户数据太多时会直接倾斜。你可以让 SparkSQL 做预聚合:

代码语言:txt
复制
spark.sql.shuffle.partitions=800
spark.sql.adaptive.enabled=true

或改用 RDD,或者临时用:

代码语言:sql
复制
SELECT user_id, SUM(amount) as amount
FROM (
    SELECT user_id, amount
    FROM order
    WHERE dt='2025-12-01'
) t
GROUP BY user_id

预聚合往往能让数据量直接下降 3~10 倍。

9. Spark SQL 自动优化

AQE(Adaptive Query Execution)确实很厉害,自动优化倾斜 join,比如:

  • 自动切换 broadcast
  • 自动合并小文件
  • 自动拆分 skew partition

但实际用下来,我觉得 AQE 有三个小问题:

小问题 1:阈值不好调

有时候某个 partition 5GB,Spark 判定为 skew,但业务层面不 skew,拆分反而慢。

小问题 2:AQE 依赖 stage 统计信息

统计信息不准时,AQE 的判断也会错。

小问题 3:极端热点 key

比如 key=null 占整张表 40%,AQE 也救不了。

所以 AQE 是锦上添花,不是救命的。

10. 监控 Spark 任务

很多新人排查 Spark 时不看 UI,这是非常大的误区。

我排查倾斜任务一般这么看:

看 Stage Timelines

可以看到 Stage 卡在哪里,比如 shuffle 写太慢、reduce 太慢。

看 Task Summary

特别是:

  • Duration
  • Shuffle Read
  • Shuffle Write
  • GC Time
  • Input Size

如果某个 task 的 GC Time = 300s,而其他只有 5s,那就是倾斜。

看 Executors 页面

我重点看:

  • executor CPU 使用率
  • Heap Memory
  • GC 次数
  • Task 运行数量

CPU 特别高的一般就是倾斜点。

结语

数据倾斜是业务增长带来的必然问题。我们能做的,就是认识它、接受它、优化它,从而解决在开发过程中遇到的问题。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 数据倾斜
  • 2. 为什么业务会产生倾斜
  • 3. 哪些算子最容易倾斜
    • ① join
    • ② groupByKey
  • 4. 怎么确认已经发生倾斜
    • ① stage 的 task time 分布
    • ② shuffle read size
    • ③ executor 的 CPU 和 GC
  • 5. 方案 1:加盐(salting)为什么有用?
  • 6. 方案 2:广播 join(Broadcast Hash Join)特别适合大表 join 小表
  • 7. 方案 3:repartition
  • 8. 数据预聚合
    • 案例:订单金额汇总
  • 9. Spark SQL 自动优化
    • 小问题 1:阈值不好调
    • 小问题 2:AQE 依赖 stage 统计信息
    • 小问题 3:极端热点 key
  • 10. 监控 Spark 任务
    • 看 Stage Timelines
    • 看 Task Summary
    • 看 Executors 页面
  • 结语
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档