前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >2021年大数据Spark(十七):Spark Core的RDD持久化

2021年大数据Spark(十七):Spark Core的RDD持久化

作者头像
Lansonli
发布2021-10-09 17:17:34
发布2021-10-09 17:17:34
42200
代码可运行
举报
文章被收录于专栏:Lansonli技术博客Lansonli技术博客
运行总次数:0
代码可运行

RDD 持久化

引入

在实际开发中某些RDD的计算或转换可能会比较耗费时间,如果这些RDD后续还会频繁的被使用到,那么可以将这些RDD进行持久化/缓存,这样下次再使用到的时候就不用再重新计算了,提高了程序运行的效率。

API

缓存/持久化函数

可以将RDD数据直接缓存到内存中,函数声明如下:

但是实际项目中,不会直接使用上述的缓存函数,RDD数据量往往很多,内存放不下的。在实际的项目中缓存RDD数据时,往往使用如下函数,依据具体的业务和数据量,指定缓存的级别

缓存/持久化级别

在Spark框架中对数据缓存可以指定不同的级别,对于开发来说至关重要,如下所示:

持久化级别

说明

MEMORY_ONLY(默认)

将RDD以非序列化的Java对象存储在JVM中。 如果没有足够的内存存储RDD,则某些分区将不会被缓存,每次需要时都会重新计算。 这是默认级别。

MEMORY_AND_DISK (开发中可以使用这个)

将RDD以非序列化的Java对象存储在JVM中。如果数据在内存中放不下,则溢写到磁盘上.需要时则会从磁盘上读取

MEMORY_ONLY_SER (Java and Scala)

将RDD以序列化的Java对象(每个分区一个字节数组)的方式存储.这通常比非序列化对象(deserialized objects)更具空间效率,特别是在使用快速序列化的情况下,但是这种方式读取数据会消耗更多的CPU。

MEMORY_AND_DISK_SER (Java and Scala)

与MEMORY_ONLY_SER类似,但如果数据在内存中放不下,则溢写到磁盘上,而不是每次需要重新计算它们。

DISK_ONLY

将RDD分区存储在磁盘上。

MEMORY_ONLY_2, MEMORY_AND_DISK_2等

与上面的储存级别相同,只不过将持久化数据存为两份,备份每个分区存储在两个集群节点上。

OFF_HEAP(实验中)

与MEMORY_ONLY_SER类似,但将数据存储在堆外内存中。 (即不是直接存储在JVM内存中) 如:Tachyon-分布式内存存储系统、Alluxio - Open Source Memory Speed Virtual Distributed Storage

实际项目中缓存数据时,往往选择MEMORY_AND_DISK

缓存函数与Transformation函数一样,都是Lazy操作,需要Action函数触发,通常使用count函数触发。

释放缓存/持久化

当缓存的RDD数据,不再被使用时,考虑释资源,使用如下函数:

此函数属于eager,立即执行。

代码演示

代码语言:javascript
代码运行次数:0
运行
复制
package cn.itcast.core

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

/**
 * RDD中缓存函数,将数据缓存到内存或磁盘、释放缓存
 */
object SparkCacheTest {
    def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf = new SparkConf()
          .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
          .setMaster("local[*]")
        val sc: SparkContext = new SparkContext(sparkConf)
        sc.setLogLevel("WARN")
        
        // 读取文本文件数据
        val inputRDD: RDD[String] = sc.textFile("data/input/words.txt", minPartitions = 2)

        // 缓存数据
        inputRDD.persist(StorageLevel.MEMORY_AND_DISK)
        // 使用Action函数触发缓存
        println(s"Count = ${inputRDD.count()}")
        println(s"Count = ${inputRDD.count()}")

        // 释放缓存
        inputRDD.unpersist()
        // 应用程序运行结束,关闭资源
        sc.stop()
    }
}

或使用spark-shell演示

代码语言:javascript
代码运行次数:0
运行
复制
// 启动集群和spark-shell

/export/servers/spark/sbin/start-all.sh



// 将一个RDD持久化,后续操作该RDD就可以直接从缓存中拿

val rdd1 = sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")

val rdd2 = rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_)//WordCount

rdd2.cache //缓存/持久化

rdd2.sortBy(_._2,false).collect//触发action,会去读取HDFS的文件,rdd2会真正执行缓存/持久化

rdd2.sortBy(_._2,false).collect//触发action,会去读缓存中的数据,执行速度会比之前快,因为rdd2已经持久化到内存中了

总结:何时使用缓存/持久化

在实际项目开发中,什么时候缓存RDD数据,最好呢???

 第一点:某个RDD被使用多次的时候,建议缓存此RDD数据

比如,从HDFS上读取网站行为日志数据,进行多维度的分析,最好缓存数据

第二点:某个RDD来之不易,并且使用不止一次,建议缓存此RDD数据

比如,从HBase表中读取历史订单数据,与从MySQL表中商品和用户维度信息数据,进行关联Join等聚合操作,获取RDD:etlRDD,后续的报表分析使用此RDD,此时建议缓存RDD数据

案例: etlRDD.persist(StoageLeval.MEMORY_AND_DISK_2)

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/04/14 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • RDD 持久化
    • 引入
    • API
      • 缓存/持久化函数
      • 缓存/持久化级别
      • 释放缓存/持久化
    • 代码演示
    • 总结:何时使用缓存/持久化
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档