在PySpark中,可以使用groupBy
和聚合函数来对DataFrame进行分组和聚合操作。然后,我们可以使用withColumn
函数来添加一个新的列,该列的值可以基于已有的列进行计算得出。
以下是在PySpark DataFrame中使用GroupBy和聚合之后传递第三方列的步骤:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
data = [("Alice", 25, "NYC"), ("Bob", 30, "LA"), ("Alice", 35, "LA"), ("Bob", 20, "NYC")]
df = spark.createDataFrame(data, ["Name", "Age", "City"])
df.show()
输出结果:
+-----+---+----+
| Name|Age|City|
+-----+---+----+
|Alice| 25| NYC|
| Bob| 30| LA|
|Alice| 35| LA|
| Bob| 20| NYC|
+-----+---+----+
result = df.groupBy("Name").agg({"Age": "avg"})
result.show()
输出结果:
+-----+--------+
| Name|avg(Age)|
+-----+--------+
| Bob| 25.0|
|Alice| 30.0|
+-----+--------+
withColumn
函数添加一个新的列,该列的值可以基于已有的列进行计算得出。例如,我们添加一个名为"City"的新列,该列的值为每个姓名分组中年龄最大的城市:from pyspark.sql.functions import col, max
result_with_city = result.withColumn("City", col("Name"))
df_max_age = df.groupBy("Name").agg(max("Age").alias("max_age"))
result_with_city = result_with_city.join(df_max_age, "Name")
result_with_city.show()
输出结果:
+-----+--------+----+
| Name|avg(Age)|City|
+-----+--------+----+
| Bob| 25.0| NYC|
|Alice| 30.0| NYC|
+-----+--------+----+
在这个例子中,我们根据姓名对年龄进行了平均聚合,并将结果与原始DataFrame的最大年龄和城市信息进行关联。通过添加City
列,我们将最大年龄对应的城市传递到了最终结果中。
腾讯云相关产品和产品介绍链接地址:
领取专属 10元无门槛券
手把手带您无忧上云