
写 Spark 的同学,应该都被数据倾斜折磨过。那种感觉就像:你把代码写的很完美,参数配得也算专业,结果一跑,Stage 卡在 99% 不动,UI 上只有一个 task 在疯狂燃烧 CPU,其余 199 个 task 都是秒跑。你想杀任务吧,又怕丢数据;不杀吧,看着那一个 task 占用着 50GB 数据在那里孤军奋战。
我第一次见到倾斜的时候还以为 Spark 出 bug 了。后来才明白:这是我太天真,不是 Spark 不行,是数据不均匀。
下面说的不是官方文档,而是我在生产环境踩过的真实坑。里面有很多“别人不会写,但你必须懂”的细节。
如果要一句话形容倾斜,那一定是:
大部分 task 都秒跑,只有少数(通常 1~3 个)task 特别慢,慢到让你怀疑人生,stage 卡在 90%-99% 不动。
更经典的几个表现:
Spark UI 上会看到一个 task 跑了 1600 秒,其他 task 只有 20 秒。这就是赤裸裸的数据倾斜。
这个我以前也没意识到,总想着优化 SQL、调参数、repartition。后来发现有些倾斜跟技术没关系,纯业务锅。
比如:
这些业务不均匀导致 key 分布畸形,Spark 只能把拥有同样 key 的数据集中到某些 task 中,这就注定倾斜。
我曾遇到一个非常典型的坑:某个订单表里 user_id = 0 的记录占了整张表的 28%。这是因为历史兼容逻辑写错了,某些老订单 user_id 没写入,业务直接给默认 0。那一次真的把我给整不会了。
Spark 倾斜的高发地带有两个:join 和 groupByKey。
尤其是:
我遇到一次特别刺激的:一个任务 join 日志表和商品表,本来数据不大,但某个商品 ID 每天有 2.3 亿条曝光记录。结果整个 job 直接卡死。
这个问题你在 SQL 层根本看不出来,只有运行时才暴露。
这个算子本身就不是很推荐用。
它会把同 key 的数据全聚集到同一个 executor,很容易爆。
我们后来一般用 reduceByKey 或者 aggregateByKey,至少能在 map 侧先聚合一部分,减少 shuffled 数据量。
我一般看三样东西:
打开 Spark UI → Stages → 某个 Stage → Tasks
如果你看到:
这肯定倾斜。
倾斜 task 的 shuffle read 是其他 task 的几十倍,比如:
这个指标特别直接。
我们监控平台里可以看到:
这就是倾斜。
我第一次用加盐的时候是从朋友口中学来的,他说,“把 key 复制成多个 key,不就可以分散了吗?”
道理其实很简单:
key=123 的所有数据被分到一个 partition123_0, 123_1, ..., 123_9示例代码:
val salted = data.map(row => {
val salt = scala.util.Random.nextInt(10)
((row.key + "_" + salt), row.value)
})用在 join 上一般需要:
例如:
val bigSalted = big.map(r => ((r.key, Random.nextInt(10)), r.value))
val smallExpanded = small.flatMap(r => (0 until 10).map(i => ((r.key, i), r.value)))
bigSalted.join(smallExpanded)这个方式特别灵,对热门 key 比如 null、0、'-1' 效果特别明显。缺点也明显:
Spark 能自动广播小表,但自动广播有个阈值,比如:
spark.sql.autoBroadcastJoinThreshold=10MB很多公司把阈值调大,比如 50MB 或 100MB,但仍会遇到“某张维表 200MB,而大表 200GB”的情况。按照默认逻辑,这是 shuffle join,会产生倾斜。
但如果你强制广播小表:
SELECT /*+ BROADCAST(dim) */ *
FROM fact f
JOIN dim d
ON f.key = d.keySpark 会把小表加载到 executor 内存,用 Hash Join,一次性解决 shuffle 问题。性能差异巨大:
我见过一个任务,从 19 分钟 → 48 秒,就是靠 broadcast。
缺点:
repartition 并不是万能的,但可以缓解某些倾斜。适用场景:
示例:
val newData = data.repartition(300, $"key")我最常用的场景是:
但 repartition 不能解决单 key 超级热门的问题,这时候必须加盐或其他方法。
我认为这是所有优化里“最优雅”的方式,因为它不破坏业务逻辑,也不会让数据量膨胀。
如果你直接写:
rdd.map(x => (x.userId, x.amount))
.reduceByKey(_ + _)reduceByKey 会自动做 map-side combine,减少 shuffle 的量。
但很多人用 SparkSQL:
SELECT user_id, SUM(amount)
FROM order
GROUP BY user_idSparkSQL 没有强制 map-side aggregate,导致单个用户数据太多时会直接倾斜。你可以让 SparkSQL 做预聚合:
spark.sql.shuffle.partitions=800
spark.sql.adaptive.enabled=true或改用 RDD,或者临时用:
SELECT user_id, SUM(amount) as amount
FROM (
SELECT user_id, amount
FROM order
WHERE dt='2025-12-01'
) t
GROUP BY user_id预聚合往往能让数据量直接下降 3~10 倍。
AQE(Adaptive Query Execution)确实很厉害,自动优化倾斜 join,比如:
但实际用下来,我觉得 AQE 有三个小问题:
有时候某个 partition 5GB,Spark 判定为 skew,但业务层面不 skew,拆分反而慢。
统计信息不准时,AQE 的判断也会错。
比如 key=null 占整张表 40%,AQE 也救不了。
所以 AQE 是锦上添花,不是救命的。
很多新人排查 Spark 时不看 UI,这是非常大的误区。
我排查倾斜任务一般这么看:
可以看到 Stage 卡在哪里,比如 shuffle 写太慢、reduce 太慢。
特别是:
如果某个 task 的 GC Time = 300s,而其他只有 5s,那就是倾斜。
我重点看:
CPU 特别高的一般就是倾斜点。
数据倾斜是业务增长带来的必然问题。我们能做的,就是认识它、接受它、优化它,从而解决在开发过程中遇到的问题。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。