首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >将多个JSON文件合并为单个JSON和parquet文件

将多个JSON文件合并为单个JSON和parquet文件
EN

Stack Overflow用户
提问于 2020-05-01 16:44:02
回答 2查看 2.9K关注 0票数 2

带有10S JSON的源S3定位

  1. 所有JSON文件都需要合并为单个JSON文件。即非part-0000...文件
  2. 输出的单个JSON文件需要替换源S3位置上的所有这些文件
  3. 相同的JSON文件需要转换Parquet并保存到其他S3位置

除了下面有没有最好的选择,

  1. 将JSON文件加载到Dataframe中
  2. 将其保存在本地磁盘上
  3. 将合并的JSON文件上载到S3
  4. 使用AWS客户端API成功上传组合S3文件后,清除S3文件的其余部分
  5. 此操作与4并行运行。通过dataframes API将拼花文件保存到S3位置。

我对上面的设计有以下的疑问

  • 还有什么更有力的方法吗?
  • 我可以读和写到相同的S3位置和跳过第二步。
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-05-05 14:43:06

代码语言:javascript
运行
复制
spark.read
                  .json(sourcePath)
                  .coalesce(1)
                  .write
                  .mode(SaveMode.Overwrite)
                  .json(tempTarget1)

                val fs = FileSystem.get(new URI(s"s3a://$bucketName"), sc.hadoopConfiguration)

                val deleted = fs
                  .delete(new Path(sourcePath + File.separator), true)
                logger.info(s"S3 folder path deleted=${deleted} sparkUuid=$sparkUuid path=${sourcePath}")

                val renamed = fs
                  .rename(new Path(tempTarget1),new Path(sourcePath))

试过但失败了

  1. Dataframe缓存/持久化不起作用,因为每当我试图编写cachedDf.write时,都会返回检查S3文件,这是我在编写之前手动清理的。
  2. 将Dataframe直接写入相同的S3目录并不有效,因为Dataframe只覆盖已分区的文件,即以“part-00.”开头的文件。
票数 0
EN

Stack Overflow用户

发布于 2020-05-01 17:26:17

是的,跳过#2是可能的。使用SaveMode.Overwrite可以完成对同一个位置的写入,与您读取的位置相同。

当您第一次将json (即#1 )读取为dataframe时,如果您进行缓存,它将在内存中。在此之后,您可以做一个清理和组合所有json在一个与联合和存储在拼花文件在一个步骤。就像这个例子。

案例1:所有的jsons都在不同的文件夹中,您希望它们将最终的数据作为拼花存储在jsons所在的同一位置.

代码语言:javascript
运行
复制
val dfpath1 = spark.read.json("path1")
val dfpath2 =  spark.read.json("path2")
val dfpath3 =  spark.read.json("path3")

val df1 = cleanup1 function dfpath1 returns dataframe
val df2 = cleanup2 function dfpath2 returns dataframe
val df3 = cleanup3 function dfpath3 returns dataframe

val dfs = Seq(df1, df2, df3)
val finaldf = dfs.reduce(_ union _) // you should have same schema while doing union..

 
  finaldf.write.mode(SaveMode.Overwrite).parquet("final_file with samelocations json.parquet")

Case 2:所有的jsons都在同一个文件夹中,您希望它们将最终的数据作为多个拼图存储在同一根位置,而jsons就在其中.

在本例中,不需要将数据读取为多个数据格式,您可以给出根路径,其中jsons具有相同的模式。

代码语言:javascript
运行
复制
val dfpath1 = spark.read.json("rootpathofyourjsons with same schema")

// or you can give multiple paths spark.read.json("path1","path2","path3")
 // since it s supported by spark dataframe reader like this ...def json(paths: String*):
val finaldf = cleanup1 function returns  dataframe
finaldf.write.mode(SaveMode.Overwrite).parquet("final_file with sameroot locations json.parquet")

AFAIK,在这两种情况下,aws不再是必需的。

更新:注册表。文件找不到你面临的异常。请参阅下面的代码示例,说明如何做到这一点。我引用了你在这里给我看的那个例子

代码语言:javascript
运行
复制
import org.apache.spark.sql.functions._
  val df = Seq((1, 10), (2, 20), (3, 30)).toDS.toDF("sex", "date")

  df.show(false)

  df.repartition(1).write.format("parquet").mode("overwrite").save(".../temp") // save it
  val df1 = spark.read.format("parquet").load(".../temp") // read back again

 val df2 = df1.withColumn("cleanup" , lit("Quick silver want to cleanup")) // like you said you want to clean it.

  //BELOW 2 ARE IMPORTANT STEPS LIKE `cache` and `show` forcing a light action show(1) with out which FileNotFoundException will come.

  df2.cache // cache to avoid FileNotFoundException
  df2.show(2, false) // light action to avoid FileNotFoundException
   // or println(df2.count) // action

   df2.repartition(1).write.format("parquet").mode("overwrite").save(".../temp")
  println("quick silver saved in same directory where he read it from final records he saved after clean up are  ")
  df2.show(false)

结果:

代码语言:javascript
运行
复制
+---+----+
|sex|date|
+---+----+
|1  |10  |
|2  |20  |
|3  |30  |
+---+----+

+---+----+----------------------------+
|sex|date|cleanup                     |
+---+----+----------------------------+
|1  |10  |Quick silver want to cleanup|
|2  |20  |Quick silver want to cleanup|
+---+----+----------------------------+
only showing top 2 rows

quick silver saved in same directory where he read it from final records he saved after clean up are  
+---+----+----------------------------+
|sex|date|cleanup                     |
+---+----+----------------------------+
|1  |10  |Quick silver want to cleanup|
|2  |20  |Quick silver want to cleanup|
|3  |30  |Quick silver want to cleanup|
+---+----+----------------------------+

文件保存和读取的屏幕截图已清除,并再次保存:

注意:您需要实现案例1案例2,就像上面建议的更新一样.

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61546787

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档