本文中,云朵君将和大家一起学习如何从 PySpark DataFrame 编写 Parquet 文件并将 Parquet 文件读取到 DataFrame 并创建视图/表来执行 SQL 查询。还要学习在 SQL 的帮助下,如何对 Parquet 文件对数据进行分区和检索分区以提高性能。
Pyspark SQL 提供了将 Parquet 文件读入 DataFrame 和将 DataFrame 写入 Parquet 文件,DataFrameReader
和DataFrameWriter
对方法parquet()
分别用于读取和写入/创建 Parquet 文件。Parquet 文件与数据一起维护模式,因此它用于处理结构化文件。
下面是关于如何在 PySpark 中写入和读取 Parquet 文件的简单说明,我将在后面的部分中详细解释。
df.write.parquet("/tmp/out/people.parquet")
parDF1=spark.read.parquet("/temp/out/people.parquet")
之前,我详细讲解过,首先让我们了解一下什么是 Parquet 文件以及它相对于 CSV、JSON 等文本文件格式的优势。
Apache Parquet
文件是一种列式存储格式,适用于 Hadoop 生态系统中的任何项目,无论选择何种数据处理框架、数据模型或编程语言。
https://parquet.apache.org/
在查询列式存储时,它会非常快速地跳过不相关的数据,从而加快查询执行速度。因此,与面向行的数据库相比,聚合查询消耗的时间更少。
Parquet 能够支持高级嵌套数据结构,并支持高效的压缩选项和编码方案。
Pyspark SQL 支持读取和写入 Parquet 文件,自动捕获原始数据的模式,它还平均减少了 75% 的数据存储。Pyspark 默认在其库中支持 Parquet,因此我们不需要添加任何依赖库。
由于我们没有 Parquet 文件,我们从 DataFrame 编写 Parquet。首先,使用方法 spark.createDataFrame()
从数据列表创建一个 Pyspark DataFrame。
data =[("James ","","Smith","36636","M",3000),
("Michael ","Rose","","40288","M",4000),
("Robert ","","Williams","42114","M",4000),
("Maria ","Anne","Jones","39192","F",4000),
("Jen","Mary","Brown","","F",-1)]
columns=["firstname", "middlename",
"lastname", "dob",
"gender", "salary"]
df=spark.createDataFrame(data,columns)
在上面的示例中,它创建了一个 DataFrame,其中包含 firstname、middlename、lastname、dob、gender、salary
列。
现在通过调用DataFrameWriter类的parquet()
函数从PySpark DataFrame创建一个parquet文件。当将DataFrame写入parquet文件时,它会自动保留列名及其数据类型。Pyspark创建的每个分区文件都具有 .parquet
文件扩展名。
df.write.parquet("/PyDataStudio/output/people.parquet")
Pyspark 在 DataFrameReader 类中提供了一个parquet()
方法来将 Parquet 文件读入 dataframe。下面是一个将 Parquet 文件读取到 dataframe 的示例。
parDF=spark.read.parquet("/PyDataStudio/output/people.parquet")
使用 append
追加保存模式,可以将数据框追加到现有的 Parquet 文件中。如要覆盖使用 overwrite
覆盖保存模式。
df.write.mode('append') \
.parquet("/PyDataStudio/output/people.parquet")
df.write.mode('overwrite') \
.parquet("/PyDataStudio/output/people.parquet")
Pyspark Sql 提供在 Parquet 文件上创建临时视图以执行 sql 查询。在你的程序存在之前,这些视图都可用。
parqDF.createOrReplaceTempView("ParquetTable")
parkSQL = spark.sql("select * from ParquetTable where salary >= 4000 ")
现在来看看在 Parquet 文件上执行 SQL 查询。为了执行 sql 查询,我们不从 DataFrame 中创建,而是直接在 parquet 文件上创建一个临时视图或表。
spark.sql("CREATE TEMPORARY VIEW PERSON USING parquet OPTIONS (path \"/PyDataStudio/output/people.parquet\")")
spark.sql("SELECT * FROM PERSON").show()
在这里,我们从 people.parquet
文件创建了一个临时视图 PERSON
。这给出了以下结果。
+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname| dob|gender|salary|
+---------+----------+--------+-----+------+------+
| Robert | |Williams|42114| M| 4000|
| Maria | Anne| Jones|39192| F| 4000|
| Michael | Rose| |40288| M| 4000|
| James | | Smith|36636| M| 3000|
| Jen| Mary| Brown| | F| -1|
+---------+----------+--------+-----+------+------+
当我们对 PERSON
表执行特定查询时,它会扫描所有行并返回结果。这与传统的数据库查询执行类似。在 PySpark 中,我们可以通过使用 PySpark partitionBy()
方法对数据进行分区,以优化的方式改进查询执行。
df.write.partitionBy("gender","salary") \
.mode("overwrite") \
.parquet("/PyDataStudio/output/people2.parquet")
当检查 people2.parquet
文件时,它有两个分区 gender
和 salary
。
下面的示例解释了将分区 Parquet 文件读取到 gender=M
的 DataFrame 中。
parDF2=spark.read.parquet("/PyDataStudio/output/people2.parquet/gender=M")
parDF2.show(truncate=False)
上述示例的输出如下所示。
+---------+----------+--------+-----+------+
|firstname|middlename|lastname|dob |salary|
+---------+----------+--------+-----+------+
|Robert | |Williams|42114|4000 |
|Michael |Rose | |40288|4000 |
|James | |Smith |36636|3000 |
+---------+----------+--------+-----+------+
在这里,我在分区 Parquet 文件上创建一个表,并执行一个比没有分区的表执行得更快的查询,从而提高了性能。
spark.sql("CREATE TEMPORARY VIEW PERSON2 USING parquet OPTIONS (path \"/PyDataStudio/output/people2.parquet/gender=F\")")
spark.sql("SELECT * FROM PERSON2" ).show()
上述示例的输出如下所示。
+---------+----------+--------+-----+------+
|firstname|middlename|lastname| dob|salary|
+---------+----------+--------+-----+------+
| Maria | Anne| Jones|39192| 4000|
| Jen| Mary| Brown| | -1|
+---------+----------+--------+-----+------+
import pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("parquetFile").getOrCreate()
data =[("James ","","Smith","36636","M",3000),
("Michael ","Rose","","40288","M",4000),
("Robert ","","Williams","42114","M",4000),
("Maria ","Anne","Jones","39192","F",4000),
("Jen","Mary","Brown","","F",-1)]
columns=["firstname","middlename","lastname","dob","gender","salary"]
df=spark.createDataFrame(data,columns)
df.write.mode("overwrite").parquet("/PyDataStudio/output/people.parquet")
parDF1=spark.read.parquet("/PyDataStudio/output/people.parquet")
parDF1.createOrReplaceTempView("parquetTable")
parDF1.printSchema()
parDF1.show(truncate=False)
parkSQL = spark.sql("select * from ParquetTable where salary >= 4000 ")
parkSQL.show(truncate=False)
spark.sql("CREATE TEMPORARY VIEW PERSON USING parquet OPTIONS (path \"/PyDataStudio/output/people.parquet\")")
spark.sql("SELECT * FROM PERSON").show()
df.write.partitionBy("gender","salary").mode("overwrite").parquet("/PyDataStudio/output/people2.parquet")
parDF2=spark.read.parquet("/PyDataStudio/output/people2.parquet/gender=M")
parDF2.show(truncate=False)
spark.sql("CREATE TEMPORARY VIEW PERSON2 USING parquet OPTIONS (path \"/PyDataStudio/output/people2.parquet/gender=F\")")
spark.sql("SELECT * FROM PERSON2" ).show()