请回答有关阿帕奇火花的小问题。
我有一个非常简单的火花工作:(这里用Java编写,但适用于其他语言)
final SparkSession sparkSession = SparkSession.builder().getOrCreate();
final Dataset<Row> someVeryBigDataSet = sparkSession.read().format("org.apache.spark.sql.cassandra").options(properties).load();
final Dataset<Integer> integerDataSet = someVeryBigDataSet.map((MapFunction<Row, Integer>) row -> someSuperComplexAndHeavyComputationThatShouldBeDoneOnlyOnceToConvertRowToInteger(row), Encoders.INT());
final Dataset<Integer> goodIntegerDataSet = integerDataSet.filter((FilterFunction<Integer>) oneInteger -> oneInteger == 0);
final Dataset<Integer> badIntegerDataSet = integerDataSet.filter((FilterFunction<Integer>) oneInteger -> oneInteger != 0);
LOGGER.info("good integer dataset size and bad integer dataset size:\n" + goodIntegerDataSet.count() + " " + badIntegerDataSet.count());
sparkSession.stop();
这项工作非常简单:
。
问题是,我在步骤2中看到了map方法,对于数据库的每一行,被多次执行。
我的理论(请纠正我,如果我错了),它是第一次计算在第3行,在地图函数。
但是在第4行和第5行,在这两个过滤器函数中,当我们需要计数时,管道需要步骤2的结果再次。
由于map函数只运行一次,如何避免这一点请
谢谢
发布于 2021-09-04 08:17:32
创建.cache()
时可以使用integerDataSet
方法
final Dataset<Integer> integerDataSet = someVeryBigDataSet
.map((MapFunction<Row, Integer>) row -> someSuperComplexAndHeavyComputationThatShouldBeDoneOnlyOnceToConvertRowToInteger(row), Encoders.INT())
.cache();
它将将您的数据存储在内存中,或者如果空间不够,则保存在磁盘中,并且每次调用此数据time时,将加载持久化的数据,而无需重新计算。
缓存策略的更多细节:https://sparkbyexamples.com/spark/spark-dataframe-cache-and-persist-explained/
发布于 2021-09-04 18:03:29
持久化()和缓存()都是火花优化技术,用于存储数据,但cache()方法在默认情况下只存储内存中的数据(MEMORY_ONLY),而在持久化()方法中,开发人员可以将存储级别定义为内存中或磁盘中的数据。
#缓存DF以将数据存储在MEMORY_ONLY中
df.cache()
要检查数据文件是否缓存,我们可以使用df.is_cached或df.storageLevel.useMemory。这两个方法都将返回一个bool值为True或False。
#使用默认存储级别的持久化数据
df.persist()
#使用MEMORY_AND_DISK_2保存数据
df.persist(pyspark.StorageLevel.MEMORY_AND_DISK_2)
https://stackoverflow.com/questions/69052370
复制相似问题