在实际开发中某些RDD的计算或转换可能会比较耗费时间,如果这些RDD后续还会频繁的被使用到,那么可以将这些RDD进行持久化/缓存,这样下次再使用到的时候就不用再重新计算了,提高了程序运行的效率。
可以将RDD数据直接缓存到内存中,函数声明如下:
但是实际项目中,不会直接使用上述的缓存函数,RDD数据量往往很多,内存放不下的。在实际的项目中缓存RDD数据时,往往使用如下函数,依据具体的业务和数据量,指定缓存的级别
在Spark框架中对数据缓存可以指定不同的级别,对于开发来说至关重要,如下所示:
持久化级别 | 说明 |
---|---|
MEMORY_ONLY(默认) | 将RDD以非序列化的Java对象存储在JVM中。 如果没有足够的内存存储RDD,则某些分区将不会被缓存,每次需要时都会重新计算。 这是默认级别。 |
MEMORY_AND_DISK (开发中可以使用这个) | 将RDD以非序列化的Java对象存储在JVM中。如果数据在内存中放不下,则溢写到磁盘上.需要时则会从磁盘上读取 |
MEMORY_ONLY_SER (Java and Scala) | 将RDD以序列化的Java对象(每个分区一个字节数组)的方式存储.这通常比非序列化对象(deserialized objects)更具空间效率,特别是在使用快速序列化的情况下,但是这种方式读取数据会消耗更多的CPU。 |
MEMORY_AND_DISK_SER (Java and Scala) | 与MEMORY_ONLY_SER类似,但如果数据在内存中放不下,则溢写到磁盘上,而不是每次需要重新计算它们。 |
DISK_ONLY | 将RDD分区存储在磁盘上。 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2等 | 与上面的储存级别相同,只不过将持久化数据存为两份,备份每个分区存储在两个集群节点上。 |
OFF_HEAP(实验中) | 与MEMORY_ONLY_SER类似,但将数据存储在堆外内存中。 (即不是直接存储在JVM内存中) 如:Tachyon-分布式内存存储系统、Alluxio - Open Source Memory Speed Virtual Distributed Storage |
实际项目中缓存数据时,往往选择MEMORY_AND_DISK
缓存函数与Transformation函数一样,都是Lazy操作,需要Action函数触发,通常使用count函数触发。
当缓存的RDD数据,不再被使用时,考虑释资源,使用如下函数:
此函数属于eager,立即执行。
package cn.itcast.core
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
/**
* RDD中缓存函数,将数据缓存到内存或磁盘、释放缓存
*/
object SparkCacheTest {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[*]")
val sc: SparkContext = new SparkContext(sparkConf)
sc.setLogLevel("WARN")
// 读取文本文件数据
val inputRDD: RDD[String] = sc.textFile("data/input/words.txt", minPartitions = 2)
// 缓存数据
inputRDD.persist(StorageLevel.MEMORY_AND_DISK)
// 使用Action函数触发缓存
println(s"Count = ${inputRDD.count()}")
println(s"Count = ${inputRDD.count()}")
// 释放缓存
inputRDD.unpersist()
// 应用程序运行结束,关闭资源
sc.stop()
}
}
或使用spark-shell演示
// 启动集群和spark-shell
/export/servers/spark/sbin/start-all.sh
// 将一个RDD持久化,后续操作该RDD就可以直接从缓存中拿
val rdd1 = sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")
val rdd2 = rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_)//WordCount
rdd2.cache //缓存/持久化
rdd2.sortBy(_._2,false).collect//触发action,会去读取HDFS的文件,rdd2会真正执行缓存/持久化
rdd2.sortBy(_._2,false).collect//触发action,会去读缓存中的数据,执行速度会比之前快,因为rdd2已经持久化到内存中了
在实际项目开发中,什么时候缓存RDD数据,最好呢???
第一点:当某个RDD被使用多次的时候,建议缓存此RDD数据
比如,从HDFS上读取网站行为日志数据,进行多维度的分析,最好缓存数据
第二点:当某个RDD来之不易,并且使用不止一次,建议缓存此RDD数据
比如,从HBase表中读取历史订单数据,与从MySQL表中商品和用户维度信息数据,进行关联Join等聚合操作,获取RDD:etlRDD,后续的报表分析使用此RDD,此时建议缓存RDD数据
案例: etlRDD.persist(StoageLeval.MEMORY_AND_DISK_2)