spark在1.6的时候就已经默认把内存管理变为UnifiedMemoryManager,如果需要用回StaticMemoryManager,可以通过设置spark.memory.useLegacyMode为true,就可以用回原来的模式,下图为默认时,不同区域的内存使用的划分
- 1. ReservedMemory :预留内存,默认为300M,如果系统内存SystemMemory小于1.5倍的ReservedMemory就会报错
- 2. UsableMemory :可用内存,计算方式为SystemMemory-ReservedMemory
- 3. MaxMemory :最大可以使用的内存,等于usableMemory x memoryFraction(spark.memory.fraction=0.6)
- 4. HeapStorageMemory :存储内存等于maxMemory x storageFraction(spark.memory.storageFraction默认0.5)
- 5 . HeapExecutionMemory :等于maxMemory – HeapStorageMemory
UnifiedMemoryManager
通过上面的简图内存划分为不同的区域,而MemoryManager主要用来管理execution和storage他们之间是如何共享内存,他有两个实现类UnifiedMemoryManager和StaticMemoryManager,接下来会重点查看UnifiedMemoryManager是如何进行内存的分配的。
MemoryManager作为抽象类,它在初始化的时候出事会初始化ON_HEAP,OFF_HEAP的两个StorageMemoryPool和ExecutionMemoryPool对象,用来管理不同模式下的内存区域。
StorageMemoryPool主要是用来记录storage的一个可调整内存池的大小,而ExecutionMemoryPool相对它复杂,因为它需要保证每个task能够合理的共享内存,如果这里有N个task,它会确保每个task在spill之前会有至少1/2N的内存,且最多不能超过1/N,因为N是动态变化的,当task的数量改变的时候,我们会需要重新计算1/2N和1/N。
从ExecutionMemoryPool获取分配需要调用acquireMemory方法,首先第一步会先判断这个task是否属于Active task,如果不属于,则把他放到memoryForTask这个map的数据结构当中,value则记录该task当前使用了多少内存。
1 | val numActiveTasks = memoryForTask.keys.size |
并在每次循环获取足够acquireMemory的numBytes之前,都会尝试的去回收storage从execution借去的内存
当获取到的内存不能够满足required大小的需求时,它就会阻塞等待,直到内存满足
1 | // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking; |
TaskMemoryManager
UnifiedMemoryManager是对内存的统一管理,而TaskMemoryManager则是管理每个独立Task的内存分配,TaskMemoryManager通过MemoryManager的acquireExecutionMemory接口进行内存申请,如果不能满足,则从consumers中挑选可以spill,进行内存释放,什么是consumer,其实我们可以用过日志打印直观的看出改Task使用了哪些consumer.
1 | log4j.logger.org.apache.spark.memory.TaskMemoryManager=DEBUG |
然后就可以清楚的在日志里面看到了
1 | 17/01/13 16:07:24 DEBUG TaskMemoryManager: Task 32 acquired 5.2 MB for org.apache.spark.util.collection.ExternalAppendOnlyMap@3bb49e9 |
对于consumer来说,他有不同的实现类,ShuffleExternalSorter就是其中一个consumer的实现类,当我们在ShuffleExternalSorter插入一条record时他就会调用acquireNewPageIfNecessary,尝试的从TaskMemoryManager获取一个MemoryBlock(即一个page)同时,会将这个page记录到自己的pageTable中,并得到对应的pageCursor偏移量。
Spark用MemoryLocation记录和追踪在off-heap或on-heap内存地址,在off-heap模式,内存可以直接通过64-bit长度的地址进行寻址,在In-heap模式中,内存由该对象的引用和64-bit的offset去寻址,MemoryBlock在MemoryLocation的基础上增加了pageNumber和对应数据的length。
简单的函数调用关系为:consumer.allocatePage -> TaskMemoryManager.allocatePage -> MemoryManager的MemoryAllocator.allocate
在HeapMemoryAllocator中,我们可以看到,我们会通过内存对齐产生一个long类型的数组,并通过这个数组构成一个MemoryBlock
1 | long[] array = new long[(int) ((size + 7) / 8)]; |
在得到对应的MemoryBlock之后,通过pageCursor偏移量(通过unsafe的arrayBaseOffset获取对象头的长度,我的是16)将数据写入到内存当中
1 | /** |
和copyMemory将数据复制到对应的内存位置当中,并每次对pageCursor进行和数据长度length相加,找到下次数据写入的位置
总结
通过阅读和理解代码,加深了spark对内存管理方面知识的理解