我将使用Apache Spark来处理大文本文件,其中在处理周期中,将文本部分与来自大型SQL表的数据进行比较。
任务是:
1) Process files and break text into pieces
2) Compare pieces with database ones
当然,瓶颈将是SQL。我对Apache Spark完全陌生,虽然我确信Subtask #1是“他的人”,但我不能完全肯定,Subtask #2是否可以由火花(我的意思是说,以有效的方式处理)。
问题是Spark如何处理大SQL中的可迭代选择(也许,尽可能多地缓存?)在并行和分布式环境中?
我当然知道spark.sql.shuffle.partitions的配置,
但是例如,当我在只有200行的小型数据集上设置此配置300时,配置是无效的,实际分区数仅为2,
例如,我在300亿行的数据集上设置了这个配置3000,这个配置也是无效的,实际的分区数只有600,
我们看到,当我们在小数据集上设置一个大值分区配置时,该配置将是无效的,
所以我只想知道在SparkSQL中混洗时,Spark是如何决定下一阶段的分区数量的?或者如何强制此配置生效?
我的Spark SQL如下所示:
set spark.sql.shuffle.partitions=3000;
with base_data as
当我将相当大的数据集(即维基百科的档案)加载到火花数据格式中时,我收到了以下错误:
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: java.lang.NullPointerException
at org.apache.spark.ml.
累加器基本上是火花中的共享变量,将由执行器更新,但仅由驱动程序读取。spark中的Collect()是将所有数据从执行器中输入到驱动程序中。
所以,在这两种情况下,我最终得到的数据只在驱动程序中。那么,当我们使用累加器或collect()将一个大的RDD转换为一个列表时,性能有什么不同呢?
使用累加器将数据转换为列表的代码
val queryOutput = spark.sql(query)
val acc = spark.sparkContext.collectionAccumulator[Map[String,Any]]("JsonCollector")
val jsonS
我有一个相当大的数据集(100个million+记录和100个列),我正在用spark处理。我正在将数据读入spark数据集,并希望过滤此数据集并将其字段的子集映射到case类。
代码看起来有点类似,
case class Subset(name:String,age:Int)
case class Complete(name:String,field1:String,field2....,age:Int)
val ds = spark.read.format("csv").load("data.csv").as[Complete]
#approach 1