有两种数据格式。One - df_table_a是通过直接从Hive表读取创建的。另一个df_table_a_slow是通过在df_table_a之上应用UDF转换来创建的。
df_table_a = spark.sql('SELECT INT_COL, DATE_WEEK FROM table_a')
shift_date_udf = F.udf(lambda day: day - pd.offsets.Week(3, weekday=0), DateType())
df_table_a_slow = df_fast.withColumn('DATE_WEEK', shift_date_udf('DATE_WEEK'))然后是df_table_b dataframe,它也是通过直接从Hive表读取来创建的。
df_table_b = spark.sql('SELECT INT_COL, DATE_WEEK, OTHER_COLUMN FROM table_b')现在,我们将df_table_b加入到上面定义的两个数据框架中。
df_fast_join = df_table_a.join(df, on=['INT_COL', 'DATE_WEEK'], how='left')
df_slow_join = df_table_a_slow.join(df, on=['INT_COL', 'DATE_WEEK'], how='left')我想对这两个联接的执行时间进行计时,所以这里有一个函数来近似转换时间:
def time_transformation(df, subject):
start = time.time()
cnt = df.count()
end = time.time()
print(f'>>> {subject} - {cnt}', end - start)结果:
time_transformation(df_fast_join, 'fast join')
>> fast join - 75739267 37.43
time_transformation(df_slow_join, 'slow join')
>> slow join - 75739267 553.32UDF转换本身似乎不需要太多时间:
time_transformation(df_slow, 'df_slow')
>> df_slow - 75739267 0.25两个联接的执行计划有一行不同:
+- BatchEvalPython [<lambda>(DATE_WEEK#1)], [INT_COL#0, DATE_WEEK#1, pythonUDF0#843]问题:为什么将UDF应用于其中一个数据流会使连接速度降低10倍以上?怎么能修好呢?
发布于 2019-08-02 12:45:12
Python是如何工作的?
在执行器中处理行时,将其序列化,然后发送到Python解释器。然后在Python解释器中反序列化行,并在其上应用UDF。此UDF将对行进行更改。现在,Row将再次被序列化,并被发送回executor JVM,后者对其进行反序列化并进行进一步的工作。因此,这个额外的显着地减缓了每一行的处理速度,而,所以python是非常慢的。
如何加快速度?
如果您可以使用一些SQL操作符或Scala实现同样的目标,这将运行得更快。
发布于 2019-08-02 10:42:07
UDF函数速度慢。特别是当您使用python时: dataframes本质上是JVM对象,python是一次应用一行。每次应用这些代码时,代码都会被序列化。看看这里。从链接中可以读到:
这些用户定义的函数在时间上只运行一行,因此需要承受较高的序列化和调用开销.因此,许多数据管道在Java和Scala中定义UDF,然后从Python调用它们。
https://stackoverflow.com/questions/57324714
复制相似问题