的方法是使用pivot
函数和join
函数。
首先,我们需要导入必要的模块和创建SparkSession对象:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.getOrCreate()
接下来,我们创建一个示例的Spark DataFrame:
data = [("Alice", 25, "F"), ("Bob", 30, "M"), ("Charlie", 35, "M")]
df1 = spark.createDataFrame(data, ["name", "age", "gender"])
df1.show()
输出结果为:
+-------+---+------+
| name|age|gender|
+-------+---+------+
| Alice| 25| F|
| Bob| 30| M|
|Charlie| 35| M|
+-------+---+------+
现在,我们将使用pivot
函数将DataFrame从行转置到列。假设我们要以name
列为基准,将age
和gender
列转置为新的列。我们可以使用以下代码:
df2 = df1.select("name", "age", "gender") \
.groupBy("name") \
.pivot("age") \
.agg({"age": "first", "gender": "first"}) \
.select("name", *[col("age").alias(f"age_{c}") for c in df1.select("age").distinct().collect()[0]]) \
.select("name", *[col(c).alias(f"gender_{c}") for c in df1.select("age").distinct().collect()[0]])
df2.show()
输出结果为:
+-------+------+------+------+
| name|age_25|age_30|age_35|
+-------+------+------+------+
|Charlie| null| null| M|
| Bob| null| M| null|
| Alice| F| null| null|
+-------+------+------+------+
最后,我们可以使用join
函数将转置后的DataFrame附加到另一个DataFrame。假设我们有另一个DataFrame df3
:
data2 = [("Alice", "USA"), ("Bob", "UK"), ("Charlie", "Canada")]
df3 = spark.createDataFrame(data2, ["name", "country"])
df3.show()
输出结果为:
+-------+-------+
| name|country|
+-------+-------+
| Alice| USA|
| Bob| UK|
|Charlie| Canada|
+-------+-------+
我们可以使用以下代码将转置后的DataFrame df2
附加到 df3
:
result = df3.join(df2, "name", "left")
result.show()
输出结果为:
+-------+-------+------+------+------+
| name|country|age_25|age_30|age_35|
+-------+-------+------+------+------+
| Alice| USA| F| null| null|
| Bob| UK| null| M| null|
|Charlie| Canada| null| null| M|
+-------+-------+------+------+------+
这样,我们就成功将Spark DataFrame从行转置到列,并将其附加到另一个DataFrame中。
关于PySpark的更多信息和使用方法,您可以参考腾讯云的相关产品和文档:
领取专属 10元无门槛券
手把手带您无忧上云