Loading [MathJax]/jax/input/TeX/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Flink与Spark读写parquet文件全解析

Flink与Spark读写parquet文件全解析

作者头像
从大数据到人工智能
发布于 2022-01-27 13:20:21
发布于 2022-01-27 13:20:21
6.3K20
代码可运行
举报
文章被收录于专栏:大数据-BigData大数据-BigData
运行总次数:0
代码可运行

Parquet介绍

Parquet 是一种开源文件格式,用于处理扁平列式存储数据格式,可供 Hadoop 生态系统中的任何项目使用。 Parquet 可以很好地处理大量复杂数据。它以其高性能的数据压缩和处理各种编码类型的能力而闻名。与基于行的文件(如 CSV 或 TSV 文件)相比,Apache Parquet 旨在实现高效且高性能的平面列式数据存储格式。

Parquet 使用记录粉碎和组装算法,该算法优于嵌套命名空间的简单展平。 Parquet 经过优化,可以批量处理复杂数据,并具有不同的方式来实现高效的数据压缩和编码类型。 这种方法最适合那些需要从大表中读取某些列的查询。 Parquet 只需读取所需的列,因此大大减少了 IO。 Parquet 的一些好处包括:

  • 与 CSV 等基于行的文件相比,Apache Parquet 等列式存储旨在提高效率。查询时,列式存储可以非常快速地跳过不相关的数据。因此,与面向行的数据库相比,聚合查询耗时更少。这种存储方式已转化为节省硬件并最大限度地减少访问数据的延迟。
  • Apache Parquet 是从头开始构建的。因此它能够支持高级嵌套数据结构。 Parquet 数据文件的布局针对处理大量数据的查询进行了优化,每个文件在千兆字节范围内。
  • Parquet 旨在支持灵活的压缩选项和高效的编码方案。由于每一列的数据类型非常相似,每一列的压缩很简单(这使得查询更快)。可以使用几种可用的编解码器之一来压缩数据;因此,可以对不同的数据文件进行不同的压缩。
  • Apache Parquet 最适用于交互式和无服务器技术,如 AWS Athena、Amazon Redshift Spectrum、Google BigQuery 和 Google Dataproc。

Parquet 和 CSV 的区别

CSV 是一种简单且广泛使用的格式,被 Excel、Google 表格等许多工具使用,许多其他工具都可以生成 CSV 文件。 即使 CSV 文件是数据处理管道的默认格式,它也有一些缺点:

  • Amazon Athena 和 Spectrum 将根据每次查询扫描的数据量收费。
  • 谷歌和亚马逊将根据存储在 GS/S3 上的数据量向您收费。
  • Google Dataproc 收费是基于时间的。

Parquet 帮助其用户将大型数据集的存储需求减少了至少三分之一,此外,它还大大缩短了扫描和反序列化时间,从而降低了总体成本。

Spark读写parquet文件

Spark SQL 支持读取和写入 Parquet 文件,自动捕获原始数据的模式,它还平均减少了 75% 的数据存储。 Spark 默认在其库中支持 Parquet,因此我们不需要添加任何依赖库。下面展示如何通过spark读写parquet文件。

本文使用spark版本为3.0.3,运行如下命令进入本地模式:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
bin/spark-shell

数据写入

首先通过Seq创建DataFrame,列名为“firstname”, “middlename”, “lastname”, “dob”, “gender”, “salary”

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val data = Seq(("James ","","Smith","36636","M",3000),
              ("Michael ","Rose","","40288","M",4000),
              ("Robert ","","Williams","42114","M",4000),
              ("Maria ","Anne","Jones","39192","F",4000),
              ("Jen","Mary","Brown","","F",-1))

val columns = Seq("firstname","middlename","lastname","dob","gender","salary")

import spark.sqlContext.implicits._
val df = data.toDF(columns:_*)

使用 DataFrameWriter 类的 parquet() 函数,我们可以将 Spark DataFrame 写入 Parquet 文件。在此示例中,我们将 DataFrame 写入“people.parquet”文件。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
df.write.parquet("/tmp/output/people.parquet")

查看文件

数据读取

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val parqDF = spark.read.parquet("/tmp/output/people.parquet")
parqDF.createOrReplaceTempView("ParquetTable")
spark.sql("select * from ParquetTable where salary >= 4000").explain()
val parkSQL = spark.sql("select * from ParquetTable where salary >= 4000 ")
parkSQL.show()

写入分区数据

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
df.write.partitionBy("gender","salary").parquet("/tmp/output/people2.parquet")
val parqDF2 = spark.read.parquet("/tmp/output/people2.parquet")
parqDF2.createOrReplaceTempView("ParquetTable2")
val df3 = spark.sql("select * from ParquetTable2  where gender='M' and salary >= 4000")
df3.explain()
df3.printSchema()
df3.show()
val parqDF3 = spark.read.parquet("/tmp/output/people2.parquet/gender=M")
parqDF3.show()

得到如下结果

Flink读写parquet文件

默认情况下,Flink包中未包含parquet相关jar包,所以需要针对特定版本下载flink-parquet文件。本文以flink-1.13.3为例,将文件下载到flink的lib目录下

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
cd lib/
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-parquet_2.12/1.13.3/flink-sql-parquet_2.12-1.13.3.jar

在完成下述测试之前,在本地启一个flink standalone集群环境。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
bin/start-cluster.sh

执行如下命令进入Flink SQL Client

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
bin/sql-client.sh

读取spark写入的parquet文件

在上一节中,我们通过spark写入了people数据到parquet文件中,现在我们在flink中创建table读取刚刚我们在spark中写入的parquet文件数据

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
create table people (
  firstname string,
  middlename string,
  lastname string,
  dob string,
  gender string,
  salary int
) with (
  'connector' = 'filesystem',
 'path' = '/tmp/output/people.parquet',
 'format' = 'parquet'
)

select * from people;

得到如下结果:

使用Flink写入数据到parquet文件

然后使用flink,往刚刚创建的table再写入数据:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
insert into people values('Tom', 'Mary', 'Ken', '21334', 'F', 5000);

在Flink UI查看执行结果

再次查询数据

可以查到我们刚刚新插入的数据。

参考文献:

  1. https://databricks.com/glossary/what-is-parquet
  2. https://sparkbyexamples.com/spark/spark-read-write-dataframe-parquet-example/
  3. https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/parquet/

本文为从大数据人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://cloud.tencent.com/developer/article/1940061

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
2 条评论
热度
最新
写的还可以
写的还可以
回复回复点赞举报
路过看看 ~~
路过看看 ~~
回复回复点赞举报
推荐阅读
编辑精选文章
换一批
​PySpark 读写 Parquet 文件到 DataFrame
本文中,云朵君将和大家一起学习如何从 PySpark DataFrame 编写 Parquet 文件并将 Parquet 文件读取到 DataFrame 并创建视图/表来执行 SQL 查询。还要学习在 SQL 的帮助下,如何对 Parquet 文件对数据进行分区和检索分区以提高性能。
数据STUDIO
2023/09/04
1.5K0
​PySpark 读写 Parquet 文件到 DataFrame
Spark SQL从入门到精通
熟悉spark sql的都知道,spark sql是从shark发展而来。Shark为了实现Hive兼容,在HQL方面重用了Hive中HQL的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从MR作业替换成了Spark作业(辅以内存列式存储等各种和Hive关系不大的优化);
Spark学习技巧
2019/05/09
1.2K0
Spark SQL从入门到精通
Spark SQL | Spark,从入门到精通
欢迎阅读美图数据技术团队的「Spark,从入门到精通」系列文章,本系列文章将由浅入深为大家介绍 Spark,从框架入门到底层架构的实现,相信总有一种姿势适合你。
美图数据技术团队
2019/04/19
2.1K0
Spark SQL | Spark,从入门到精通
Spark SQL的Parquet那些事儿.docx
Parquet是一种列式存储格式,很多种处理引擎都支持这种存储格式,也是sparksql的默认存储格式。Spark SQL支持灵活的读和写Parquet文件,并且对parquet文件的schema可以自动解析。当Spark SQL需要写成Parquet文件时,处于兼容的原因所有的列都被自动转化为了nullable。
Spark学习技巧
2019/05/14
1.2K0
sparkSQL实例_flink sql
1)input:json日志 2)ETL:根据IP解析出 省份,城市 3)stat: 地区分布指标计算, 满足条件的才算,满足条件的赋值为1,不满足的赋值为0 (如下图) 将统计结果写入MySQL中。 (就比如说这个广告请求要满足 requestmode=1 和 processnode =3 这两个条件)
全栈程序员站长
2022/11/17
8080
sparkSQL实例_flink sql
大数据技术Spark学习
Spark SQL 是 Spark 用来处理结构化数据的一个模块,它提供了一个编程抽象叫做 DataFrame,并且作为分布式 SQL 查询引擎的作用。 我们已经学习了 Hive,它是将 Hive SQL 转换成 MapReduce 然后提交到集群上执行,大大简化了编写 MapReduce 的程序的复杂性,由于 MapReduce 这种计算模型执行效率比较慢。所以 Spark SQL 的应运而生,它是将 Spark SQL 转换成 RDD,然后提交到集群执行,执行效率非常快!
黑泽君
2019/05/10
5.5K0
大数据技术Spark学习
Note_Spark_Day08:Spark SQL(Dataset是什么、外部数据源、UDF定义和分布式SQL引擎)
​ Spark 框架从最初的数据结构RDD、到SparkSQL中针对结构化数据封装的数据结构DataFrame, 最终使用Dataset数据集进行封装,发展流程如下。
Maynor
2021/12/07
4.2K0
Note_Spark_Day08:Spark SQL(Dataset是什么、外部数据源、UDF定义和分布式SQL引擎)
Spark SQL,DataFrame以及 Datasets 编程指南 - For 2.0
Spark SQL 是 Spark 用来处理结构化数据的一个模块。与基础的 Spark RDD API 不同,Spark SQL 提供了更多数据与要执行的计算的信息。在其实现中,会使用这些额外信息进行优化。可以使用 SQL 语句和 Dataset API 来与 Spark SQL 模块交互。无论你使用哪种语言或 API 来执行计算,都会使用相同的引擎。这让你可以选择你熟悉的语言(现支持 Scala、Java、R、Python)以及在不同场景下选择不同的方式来进行计算。
codingforfun
2018/08/24
4.3K0
【Spark手机流量日志处理】使用SparkSQL按月统计流量使用量最多的用户
其中,spark-sql_2.12是Spark SQL的核心依赖,spark-core_2.12是Spark的核心依赖。注意,版本号可以根据实际情况进行调整。
大数据小禅
2023/03/30
7080
【Spark手机流量日志处理】使用SparkSQL按月统计流量使用量最多的用户
Apache Spark 2.2.0 中文文档 - Spark SQL, DataFrames and Datasets Guide | ApacheCN
本文介绍了基于Spark的SQL编程的常用概念和技术。首先介绍了Spark的基本概念和架构,然后详细讲解了Spark的数据类型和SQL函数,最后列举了一些Spark在实际应用中的例子。
片刻
2018/01/05
26.7K0
Spark SQL
Hive是一个基于Hadoop 的数据仓库工具,提供了类似于关系数据库SQL的查询语言HiveQL,用户可以通过HiveQL语句快速实现简单的MapReduce统计,Hive 自身可以自动将HiveQL语句快速转换成MapReduce 任务进行运行。当用户向Hive输入一段命令或查询(即HiveQL 语句)时, Hive需要与Hadoop交互来完成该操作。该命令或查询首先进入到驱动模块,由驱动模块中的编译器进行解析编译,并由优化器对该操作进行优化计算,然后交给执行器去执行,执行器通常的任务是启动一个或多个MapReduce任务。如图所示描述了用户提交一段SQL查询后,Hive把sQL 语句转化成MapReduce任务进行执行的详细过程。
Francek Chen
2025/01/22
3620
Spark SQL
2021年大数据Spark(三十二):SparkSQL的External DataSource
在SparkSQL模块,提供一套完成API接口,用于方便读写外部数据源的的数据(从Spark 1.4版本提供),框架本身内置外部数据源:
Lansonli
2021/10/09
2.5K0
spark2 sql读取数据源编程学习样例1
问题导读 1.dataframe如何保存格式为parquet的文件? 2.在读取csv文件中,如何设置第一行为字段名? 3.dataframe保存为表如何指定buckete数目? 作为一个开发人员
用户1410343
2018/03/26
1.7K0
spark2 sql读取数据源编程学习样例1
SparkSQL快速入门系列(6)
上一篇《SparkCore快速入门系列(5)》,下面给大家更新一篇SparkSQL入门级的讲解。
刘浩的BigDataPath
2021/04/13
2.5K0
SparkSQL快速入门系列(6)
实战|使用Spark Streaming写入Hudi
传统数仓的组织架构是针对离线数据的OLAP(联机事务分析)需求设计的,常用的导入数据方式为采用sqoop或spark定时作业逐批将业务库数据导入数仓。随着数据分析对实时性要求的不断提高,按小时、甚至分钟级的数据同步越来越普遍。由此展开了基于spark/flink流处理机制的(准)实时同步系统的开发。
ApacheHudi
2021/04/13
2.3K0
一段有用的代码 | Flink读写parquet文件
Flink读parquet import org.apache.flink.core.fs.Path import org.apache.flink.formats.parquet.ParquetRowInputFormat import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala._ import org.apache.flink.ty
大数据真好玩
2021/09/18
2.8K0
0639-6.1.1-Spark读取由Impala创建的Parquet文件异常分析
2.通过CDH提供的parquet tool进行分析,参考《0631-6.2-如何确认一个Parquet文件是否被压缩》。
Fayson
2019/05/24
1.8K0
第三天:SparkSQL
Spark SQL是Spark用来处理结构化数据的一个模块,它提供了2个编程抽象:DataFrame和DataSet,并且作为分布式SQL查询引擎的作用。 我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduc的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所有Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!
sowhat1412
2020/11/05
13.6K0
第三天:SparkSQL
Spark2.x学习笔记:14、Spark SQL程序设计
程裕强
2018/01/02
5.2K0
Spark2.x学习笔记:14、Spark SQL程序设计
Spark SQL 快速入门系列(7) | SparkSQL如何实现与多数据源交互
  Spark SQL 的DataFrame接口支持操作多种数据源. 一个 DataFrame类型的对象可以像 RDD 那样操作(比如各种转换), 也可以用来创建临时表.
不温卜火
2020/10/28
1.4K0
Spark SQL 快速入门系列(7) |  SparkSQL如何实现与多数据源交互
相关推荐
​PySpark 读写 Parquet 文件到 DataFrame
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验