This can be achieved with the following steps:

1. Union the two DataFrames (df1 and df2) with union(); both must have the same schema.
2. Group the combined DataFrame by id and take the maximum date per group as latest_date.
3. Left-join that per-id maximum back onto the combined DataFrame on id.
4. Add an is_latest_date column that is 'Yes' where a row's date equals its id's latest_date and 'No' otherwise.
The complete code example is as follows:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, max as max_

spark = SparkSession.builder.getOrCreate()

# Two sample DataFrames with the same schema (id, date)
df1 = spark.createDataFrame([(1, '2022-01-01'), (2, '2022-02-01')], ['id', 'date'])
df2 = spark.createDataFrame([(3, '2022-03-01'), (4, '2022-04-01')], ['id', 'date'])

# Combine the two DataFrames into one
union_df = df1.union(df2)

# Find the latest date per id; ISO-formatted date strings compare correctly as text
latest_date_df = union_df.groupBy('id').agg(max_('date').alias('latest_date'))

# Join the per-id maximum back and flag rows that carry the latest date
final_df = union_df.join(latest_date_df, on='id', how='left')
final_df = final_df.withColumn('is_latest_date', when(col('date') == col('latest_date'), 'Yes').otherwise('No'))

final_df.show()
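With this sample data each id appears exactly once, so every row is its own latest date and the show() output should look like the following (row order after the join may vary):

+--+----------+-----------+--------------+
|id|      date|latest_date|is_latest_date|
+--+----------+-----------+--------------+
| 1|2022-01-01| 2022-01-01|           Yes|
| 2|2022-02-01| 2022-02-01|           Yes|
| 3|2022-03-01| 2022-03-01|           Yes|
| 4|2022-04-01| 2022-04-01|           Yes|
+--+----------+-----------+--------------+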
This code unions the two Spark DataFrames (df1 and df2) into one, finds the latest date for each id, adds a new column flagging whether each row holds that latest date, and prints the result. Because every id in the sample data appears only once, every row is flagged 'Yes'; with several rows per id, only the row matching the per-id maximum date would be.
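As a side note, the same flag can be computed without the explicit groupBy-and-join round trip by using a window function. The sketch below is an equivalent alternative, not part of the original answer; it reuses the union_df variable from the code above:

from pyspark.sql import Window
from pyspark.sql.functions import col, when, max as max_

# A window covering all rows that share the same id; max over it
# yields the per-id latest date without a separate join
w = Window.partitionBy('id')
final_df = (union_df
            .withColumn('latest_date', max_('date').over(w))
            .withColumn('is_latest_date', when(col('date') == col('latest_date'), 'Yes').otherwise('No')))
final_df.show()

This avoids materializing an intermediate aggregated DataFrame, which can be convenient when the flag is one step in a longer chain of transformations.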