欢迎关注我的微信公众号:FunnyBigData
作为打着 “内存计算” 旗号出道的 Spark,内存管理是其非常重要的模块。作为使用者,搞清楚 Spark 是如何管理内存的,对我们编码、调试及优化过程会有很大帮助。本文之所以取名为 "Spark 内存管理的前世今生" 是因为在 Spark 1.6 中引入了新的内存管理方案,而在之前一直使用旧方案。
刚刚提到自 1.6 版本引入了新的内存管理方案,但并不是说在 1.6 及之后的版本中不能使用旧的方案,而是默认使用新方案。我们可以通过设置 spark.memory.userLegacyMode
值来选择,该值为 false
表示使用新方案,true
表示使用旧方案,默认为 false
。该值是如何发挥作用的呢?如下:
val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
if (useLegacyMemoryManager) {
new StaticMemoryManager(conf, numUsableCores)
} else {
UnifiedMemoryManager(conf, numUsableCores)
}
根据 spark.memory.useLegacyMode
值的不同,会创建 MemoryManager 不同子类的实例:
false
:创建 UnifiedMemoryManager
类实例,为新的内存管理的实现true
:创建 StaticMemoryManager
类实例,为旧的内存管理的实现不管是在新方案中还是旧方案中,都根据内存的不同用途,都包含三大块。
storage 和 execution 内存都通过 MemoryManager 来申请和管理,而另一块内存则不受 MemoryManager 管理,主要有两个作用:
这块不受 MemoryManager 管理的内存,由系统预留以及 storage 和 execution 安全系数之外的内存组成,这个会在下文中详述。
接下来,让我们先来看看 “前世”
旧方案的内存结构如下图所示:
让我们结合上图做进一步说明:
execution 最大可用内存为 jvm space * spark.storage.memoryFraction * spark.storage.safetyFraction
,默认为 jvm space * 0.2 * 0.8
。
spark.shuffle.memoryFraction
很大程度上影响了 spill 的频率,如果 spill 过于频繁,可以适当增大 spark.shuffle.memoryFraction
的值,增加用于 shuffle 的内存,减少Spill的次数。这样一来为了避免内存溢出,可能需要减少 storage 的内存,即减小spark.storage.memoryFraction
的值,这样 RDD cache 的容量减少,在某些场景下可能会对性能造成影响。
由于 shuffle 数据的大小是估算出来的(这主要为了减少计算数据大小的时间消耗),会存在误差,当实际使用的内存比估算大的时候,这里 spark.shuffle.safetyFraction
用来作为一个保险系数,增加一定的误差缓冲,降低实际内存占用超过用户配置值的可能性。所以 execution 真是最大可用的内存为 0.2*0.8=0.16
。shuffle 时,一旦 execution 内存使用超过该比例,就会进行 spill。
storage 最大可用内存为 jvm space * spark.storage.memoryFraction * spark.storage.safetyFraction
,默认为 jvm space * 0.6 * 0.9
。
由于在 cache block 时大小也是估算的,所以也需要一个保险系数用来防止误差引起 OOM,即 spark.storage.safetyFraction
,所以真实能用来进行 memory cache block 的内存大小的比例为 0.6*0.9=0.54
。一旦 storage 使用内存超过该比例,将根据 StorageLevel 决定不缓存 block 还是 OOM 或是存储到磁盘。
storage 内存中有 spark.shuffle.unrollFraction
的部分是用来 unroll,即用于 “展开” 一个 partition 的数据,这部分默认为 0.2
系统预留的大小为:1 - spark.storage.memoryFraction - spark.shuffle.memoryFraction
,默认为 0.2。另一部分是 storage 和 execution 保险系数之外的内存大小,默认为 0.1。
旧方案最大的问题是 storage 和 execution 的内存大小都是固定的,不可改变,即使 execution 有大量的空闲内存且 storage 内存不足,storage 也无法使用 execution 的内存,只能进行 spill,反之亦然。所以,在很多情况下存在资源浪费。
另外,旧方案中,只有 execution 内存支持 off heap,storage 内存不支持 off heap。
上面我们提到旧方案的两个不足之处,在新方案中都得到了解决,即:
这两点将在后文中进一步展开,我们先来看看新方案中,默认的内存结构是怎样的?依旧分为三块(这里将 storage 和 execution 内存放在一起讲):
RESERVED_SYSTEM_MEMORY_BYTES
,即 300M,可以通过设置 spark.testing.reservedMemory
改变,一般只有测试的时候才会设置该配置,所以我们可以认为系统预留大小为 300M。另外,executor 的最小内存限制为系统预留内存的 1.5 倍,即 450M,若 executor 的总内存大小小于 450M,则会抛出异常(heap space - RESERVED_SYSTEM_MEMORY_BYTES)*(1 - spark.memory.fraction)
,默认为 (heap space - 300M)* 0.4
(heap space - 300) * spark.memory.fraction
,spark.memory.fraction
默认为 0.6。该值越小,发生 spill 和 block 踢除的频率就越高。其中: 由于新方案是 1.6 后默认的内存管理方案,也是目前绝大部分 spark 用户使用的方案,所以我们有必要更深入且详细的展开分析。
在最开始我们提到,新方案是由 UnifiedMemoryManager
实现的,我们先来看看该类的成员及方法,类图如下:
通过这个类图,我想告诉你这几点:
有了上面的这些基础知识,再来看看是怎么申请 storage 内存的。申请 storage 内存是通过调用
UnifiedMemoryManager#acquireStorageMemory(blockId: BlockId,
numBytes: Long,
memoryMode: MemoryMode): Boolean
更具体的说法应该是为某个 block(blockId 指定)以那种内存模式(on heap 或 off heap)申请多少字节(numBytes)的 storage 内存,该函数的主要流程如下图:
对于上图,还需要做一些补充来更好理解:
(jvm space - 300M)* spark.memory.fraction
,如果你还记得的话,这在文章最开始的时候有介绍spark.memory.offHeap.size
指定,由 execution 和 storage 共享计算要向 execution 借用多少内存的代码如下:
val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes)
为 execution 空闲内存和申请内存 size 的较小值,这说明了两点:
executionPool.memoryFree < numBytes
),更进一步说,成功借用到的内存加上 storage 原本空闲的内存之和有可能还是小于要申请的内存大小还有一点需要注意的是,借用是发生在相同 MemoryMode 的 storageMemoryPool 和 executionMemoryPool 之间,不能在不同的 MemoryMode 间进行借用
当 storage 空闲内存不足以分配申请的内存时,从上面的分析我们知道会向 execution 借用,借来后是不是就万事大吉了?当然······不是,前面也提到了即使借到了内存也可能还不够,这也是上图中红色圆框中问号的含义,在我们再进一步跟进到 StorageMemoryPool#acquireMemory(blockId: BlockId, numBytes: Long): Boolean
中一探究竟,该函数主要流程如下:
同样,对于上面这个流程图需要做一些说明:
val numBytesToFree = math.max(0, numAcquireBytes - memoryFree)
如上,要释放的内存大小为再从 execution 借用了内存,使得 storage 空闲内存增大 n(n>=0) 后,还比申请的内存少的那部分内存,若借用后 storage 空闲内存足以满足申请的大小,则 numBytesToFree 为 0,无需进行释放
释放的方式是踢除已缓存的 blocks,实现为 evictBlocksToFreeSpace(blockId: Option[BlockId], space: Long, memoryMode: MemoryMode): Long
,有以下几个原则:
首先会进行预踢除(所谓预踢除就是计算假设踢除该 block 能释放多少内存),预踢除的具体逻辑是:遍历一个已缓存到内存的 blocks 列表(该列表按照缓存的时间进行排列,约早缓存的在越前面),逐个计算预踢除符合原则的 block 是否满足以下条件之一:
若最终预踢除的结果是可以满足要提取的大小,则对预踢除中记录的要踢除的 blocks 进行真正的踢除。具体的方式是:如果从内存中踢除后,还具有其他 StorageLevel 或在其他节点有备份,依然保留该 block 信息;若无,则删除该 block 信息。最终,返回踢除的总大小(可能稍大于要踢除的大小)。
若最终预踢除的结果是无法满足要提取的大小,则不进行任何实质性的踢除,直接返回踢除size 为 0。需要再次提醒的是,只能踢除相同 MemoryMode 的 block。
以上,结合两幅流程图及相应的说明,相信你已经搞清楚如何申请 storage 内存了。我们再来看看 execution 内存是如何申请的
我们知道,申请 storage 内存是为了 cache 一个 numBytes 的 block,结果要么是申请成功、要么是申请失败,不存在申请到的内存数比 numBytes 少的情况,这是因为不能将 block 一部分放内存,一部分 spill 到磁盘。但申请 execution 内存则不同,申请 execution 内存是通过调用
UnifiedMemoryManager#acquireExecutionMemory(numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Long
来实现的,这里的 numBytes 是指至多 numBytes,最终申请的内存数比 numBytes 少也是成功的,比如在 shuffle write 的时候使用的时候,如果申请Å的内存不够,则进行 spill。
另一个特点是,申请 execution 时可能会一直阻塞,这是为了能确保每个 task 在进行 spill 之前都能占用至少 1/2N 的 execution pool 内存数(N 为 active tasks 数)。当然,这也不是能完全确保的,比如 tasks 数激增但老的 tasks 还没释放内存就不能满足。
接下来,我们来看看如何申请 execution 内存,流程图如下:
从上图可以看到,整个流程还是挺复杂的。首先,我先对上图中的一些环节进行进一步说明以帮助理解,最后再以简洁的语言来概括一下整个过程。
同样,不同的 MemoryMode 的情况是不同的,如下:
(heap space - 300M) * spark.memory.storageFraction
(heap space - 300M)
spark.memory.offHeap.size
maxOffHeapMemory * spark.memory.storageFraction
这一小节描述的内容非常重要,因为之后所有的流程都是基于此,看到后面的流程时,还记着会有 ON_HEAP 和 OFF_HEAP 两种情况
只有当 executionMemoryPool 的空闲内存不足以满足申请的 numBytes 时,该函数才会生效。那这个函数是怎么向 storage 借用内存的呢?流程如下:
以上就是整个 execution 向 storage 借用内存的过程,与 storage 向 execution 借用最大的不同是:execution 会踢除 storage 已经使用的向 execution 的内存,踢除的流程在文章的前面有描述。这是因为,这本来就是属于 execution 的内存并且通过踢除来实现归还实现上也不复杂
也就是流程图中的 maxMemoryPerTask 和 minMemoryPerTask 是如何计算的,如下:
val maxPoolSize = computeMaxExecutionPoolSize()
val maxMemoryPerTask = maxPoolSize / numActiveTasks
val minMemoryPerTask = poolSize / (2 * numActiveTasks)
maxPoolSize 为从 storage 借用了内存后,executionMemoryPool 的最大可用内存,maxMemoryPerTask 和 minMemoryPerTask 的计算方式也如代码所示。这样做是为了使得每个 task 使用的内存都能维持在 1/2*numActiveTasks ~ 1/numActiveTasks
范围内,使得在整体上能保持各个 task 资源占用比较均衡并且一定程度上允许需要更多资源的 task 在一定范围内能分配到更多资源,也照顾到了个性化的需求
首先要计算两个值:
(maxMemoryPerTask-已为该 task 分配的内存值)
中的较小值,如果 maxMemoryPerTask < 已为该 task 分配的内存值
,则直接为 0,也就是之前已经给该 task 分配的够多了所以,本次最终能分配的量也就是 toGrant,如果 toGrant 加上已经为该 task 分配的内存量之和
还小于 minMemoryPerTask 并且 toGrant 小于申请的量,则就会触发阻塞。否则,分配 toGrant 成功,函数返回。
阻塞释放的条件有两个,如下:
用简短的话描述整个过程如下:
首先,建议使用新模式,所以接下来的配置建议都是基于新模式的。
spark.memory.fraction
:如果 application spill 或踢除 block 发生的频率过高(可通过日志观察),可以适当调大该值,这样 execution 和 storage 的总可用内存变大,能有效减少发生 spill 和踢除 block 的频率spark.memory.storageFraction
:为 storage 占 storage、execution 内存总和的比例。虽然新方案中 storage 和 execution 之间可以发生内存借用,但总的来说,spark.memory.storageFraction
越大,运行过程中,storage 能用的内存就会越多。所以,如果你的 app 是更吃 storage 内存的,把这个值调大一点;如果是更吃 execution 内存的,把这个值调小一点spark.memory.offHeap.enabled
:堆外内存最大的好处就是可以避免 GC,如果你希望使用堆外内存,将该值置为 true 并设置堆外内存的大小,即设置 spark.memory.offHeap.size
,这是必须的另外,需要特别注意的是,堆外内存的大小不会算在 executor memory 中,也就是说加入你设置了 --executor memory 10G
和 spark.memory.offHeap.size=10G
,那总共可以使用 20G 内存,堆内和堆外分别 10G。
到这里,已经比较笼统的介绍了 Spark 内存管理的 “前世”,也比较细致的介绍了 “今生”。篇幅比较长,但没有一大段一大段的代码,应该还算比较好懂。如果看到这里,希望你多少能有所收获。
然后,请你在大致回顾下这篇文章,有没有觉得缺了点什么?是的,是缺了点东西,所谓 “内存管理” 怎么就没看到具体是怎么分配内存的呢?是怎么使用的堆外内存?storage 和 execution 的堆外内存使用方式会不会不同?execution 和 storage 又是怎么使用堆内内存的呢?以怎么样的数据结构呢?
如果你想搞清楚这些问题,关注公众号并回复 “内存管理下”