首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Apache火花:如何“缓存”数据集,使其不再被重新计算以便下一次计算。

Apache火花:如何“缓存”数据集,使其不再被重新计算以便下一次计算。
EN

Stack Overflow用户
提问于 2021-09-04 04:15:59
回答 2查看 288关注 0票数 0

请回答有关阿帕奇火花的小问题。

我有一个非常简单的火花工作:(这里用Java编写,但适用于其他语言)

代码语言:javascript
运行
复制
final SparkSession     sparkSession       = SparkSession.builder().getOrCreate();
final Dataset<Row>     someVeryBigDataSet = sparkSession.read().format("org.apache.spark.sql.cassandra").options(properties).load();
final Dataset<Integer> integerDataSet     = someVeryBigDataSet.map((MapFunction<Row, Integer>) row -> someSuperComplexAndHeavyComputationThatShouldBeDoneOnlyOnceToConvertRowToInteger(row), Encoders.INT());
final Dataset<Integer> goodIntegerDataSet = integerDataSet.filter((FilterFunction<Integer>) oneInteger -> oneInteger == 0);
final Dataset<Integer> badIntegerDataSet  = integerDataSet.filter((FilterFunction<Integer>) oneInteger -> oneInteger != 0);
LOGGER.info("good integer dataset size and bad integer dataset size:\n" + goodIntegerDataSet.count() + " " + badIntegerDataSet.count());
sparkSession.stop();

这项工作非常简单:

  1. 从一些大数据表
  2. 中提取一个非常大的数据集,将每一行转换为整数。为此,我使用了非常大的计算量,此操作只应执行一次。
  3. 将好的整数结果与步骤2的坏整数分开,显示计数

问题是,我在步骤2中看到了map方法,对于数据库的每一行,被多次执行

我的理论(请纠正我,如果我错了),它是第一次计算在第3行,在地图函数。

但是在第4行和第5行,在这两个过滤器函数中,当我们需要计数时,管道需要步骤2的结果再次

由于map函数只运行一次,如何避免这一点请

谢谢

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2021-09-04 08:17:32

创建.cache()时可以使用integerDataSet方法

代码语言:javascript
运行
复制
final Dataset<Integer> integerDataSet = someVeryBigDataSet
  .map((MapFunction<Row, Integer>) row -> someSuperComplexAndHeavyComputationThatShouldBeDoneOnlyOnceToConvertRowToInteger(row), Encoders.INT())
  .cache();

它将将您的数据存储在内存中,或者如果空间不够,则保存在磁盘中,并且每次调用此数据time时,将加载持久化的数据,而无需重新计算。

缓存策略的更多细节:https://sparkbyexamples.com/spark/spark-dataframe-cache-and-persist-explained/

票数 1
EN

Stack Overflow用户

发布于 2021-09-04 18:03:29

持久化()和缓存()都是火花优化技术,用于存储数据,但cache()方法在默认情况下只存储内存中的数据(MEMORY_ONLY),而在持久化()方法中,开发人员可以将存储级别定义为内存中或磁盘中的数据。

#缓存DF以将数据存储在MEMORY_ONLY中

代码语言:javascript
运行
复制
df.cache()

要检查数据文件是否缓存,我们可以使用df.is_cached或df.storageLevel.useMemory。这两个方法都将返回一个bool值为True或False。

#使用默认存储级别的持久化数据

代码语言:javascript
运行
复制
df.persist()

#使用MEMORY_AND_DISK_2保存数据

代码语言:javascript
运行
复制
df.persist(pyspark.StorageLevel.MEMORY_AND_DISK_2)
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69052370

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档