我们有一个非常大的,我们需要在它上执行一个groupBy操作。
我们试过了
df_gp=df.groupBy('some_column').count()
它花了很长时间(它已经运行了超过17小时,但没有结果)。
我也试过
df_gp=df.groupBy('some_column').agg(count)
但据我所知,这种行为是一样的。
关于更多情况:
我们正在Zeppelin上运行此操作(Version0.8.0),使用%Spark2。interpreter
查询Hive创建的)。
from pyspark_llap import HiveWarehouseSession
hive = HiveWarehouseSession.session(spark).build()
req=""" SELECT *
FROM table
where isodate='2020-07-27'
"""
df = hive.executeQuery(req)
count()
或cache()
在分钟以下)。我在不同的来源上读过关于Spark的groupBy
的文章,但是根据我收集到的here,Dataframes不需要在内存中加载或洗牌键,所以即使在大型数据仓库中,它也不会是一个问题。
我知道,在这么大的数据量上使用groupBy
可能需要一些时间,但这实在是太过分了。我想有些内存参数可能需要调优,或者我们执行groupBy操作的方式有问题吗?
编辑--我忘了--在groupBy
之前,在Dataframe上有一些UDF正在处理。我试过:
在大型Dataframe上的
groupBy
,没有UDFs :给出的结果在处理的数据same的样本上小于minutegroupBy
:与以前的相同
所以我们认为UDF是问题的真正原因,而不是groupBy
发布于 2020-08-27 05:10:04
几个神话先爆发
.groupBy('some_column').count()
和.groupBy('some_column').count()
是相同的groupBy
引起洗牌,这篇文章的意思是它只对必要的列数据进行洗牌(没有在groupBy或agg函数中使用的额外列)。我在不同的来源上读过关于Spark的groupBy的文章,但是从我在这里收集的资料来看,Dataframes不需要在内存中加载或洗牌键,所以即使在大型Dataframes上,它也不会成为一个问题。
现在来谈谈你的问题
如果更多的数据被重新洗牌,spark.sql.shuffle.partitions
设置为低(200个默认值),那么spark.sql.shuffle.partitions
可能需要时间。在这种情况下,1核心将有大量的杂乱数据来聚合
groupBy
中使用的列有数据倾斜,也可能需要很长时间,因为它将导致大量数据转到单个执行器核心。
解决方案
aggregate
spark.sql.shuffle.partitions
提高到一个更高的值(在我的经验中,应该在<amount_of_data_shuffled_in_gb>/100MB
附近,以确保一个核心获得大约100 MB的数据到,可以通过在数据(盐析) https://dzone.com/articles/why-your-spark-apps-are-slow-or-failing-part-ii-da中引入随机性来解决)。发布于 2020-08-27 00:24:42
由于底层的Hive查询,而不是由于groupBy
操作,它可能运行缓慢。正如您可能知道的那样,星星之火可以进行懒惰的评估,因此延迟可能来自上述任何一个。测试它的一种方法是对数据文件进行cache()
或在对其执行groupBy之前调用一个简单的groupBy。如果您看到同样的问题,这是因为蜂巢查询的执行,解决方案将在那里看起来不同。您还可以尝试从文件中读取数据,并查看在执行groupBy时是否注意到相同的执行时间。
https://stackoverflow.com/questions/63611463
复制