shuffle系统的入口。ShuffleManager在driver和executor中的sparkEnv中创建。在driver中注册shuffle,在executor中读取和写入数据。
registerShuffle:注册shuffle,返回shuffleHandle
unregisterShuffle:移除shuffle
shuffleBlockResolver:获取shuffleBlockResolver,用于处理shuffle和block之间的关系
getWriter:获取partition对应的writer,在executor的map task中调用
getReader、getReaderForRange:获取一段范围partition的reader,在executor的 reduce task中调用
是shuffleManager的唯一实现。
在基于sort的shuffle中,进入的消息会按照partition进行排序,最后输出一个单独的文件。
reducer会读取这个文件的一段区域数据。
当输出的文件太大了,不能全部放在内存中的时候,会spill在磁盘上生成排序的中间结果文件,这些中间文件会合并成一个最终文件输出。
Sort-based shuffle有两个方式:
序列化sort的优势
在序列化sort模式下,shuffle writer将进来的消息序列化后保存在一个数据结构中并排序。

根据不同场景选择对应的handle。优先顺序是BypassMergeSortShuffleHandle>SerializedShuffleHandle>BaseShuffleHandle
bypass条件:没有mapside,partition数量小于等于_SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD_
序列化handle条件:序列化类支持支持序列化对象的迁移,并且不使用mapSideCombine操作以及父RDD的分区数不大于 (1 << 24)
首先缓存此次的shuffle和map信息到taskIdMapsForShuffle_中_
根据shuffle对应的handle选择对应的writer.
BypassMergeSortShuffleHandle->BypassMergeSortShuffleWriter
SerializedShuffleHandle->UnsafeShuffleWriter
BaseShuffleHandle->SortShuffleWriter
taskIdMapsForShuffle移除对应的shuffle和shuffle对应map产生的文件
获取shuffle文件对应全部block地址,即blocksByAddress.
创建BlockStoreShuffleReader对象并返回.
主要是用来传递shuffle的参数,同时也是一个标记,标记选择哪个writer



抽象类,负责map任务输出消息.主要方法是write,有三个实现类
后面在单独分析。
特质,实现类可以根据mapId、reduceId、shuffleId来获取对应的block数据.
ShuffleBlockResolver的唯一实现类。
创建并维护逻辑块与物理文件位置之间的映射关系,针对来自同一map任务的shuffle块数据。
属于同一个map任务的shuffle块数据会被存储在一个整合的数据文件中。
而这些数据块在数据文件中的偏移量,则被单独存储在一个索引文件中。
.data是数据文件后缀
.index是索引文件后缀
获取数据文件。
生成ShuffleDataBlockId,调用的blockManager.diskBlockManager.getFile方法获取file
同getDataFile类似
生成ShuffleIndexBlockId,调用的blockManager.diskBlockManager.getFile方法获取file
根据shuffleId和mapId获取到data文件和index文件,然后删除
根据mapId、shuffleId获取对应的data文件和index文件。
检查data文件和index文件是否存在并且能够匹配上,直接返回。
不能匹配上,就生成新的index临时文件。再重命名生成新的index文件和data文件并返回。

假设shuffle有3个partition,对应数据大小分别是1000、1500、2500。
index文件,首行是0,后面都是partition数据的累加值,第二行是1000,第三行是1000+1500=2500,第三行是2500+2500=5000.
data文件是按照partition大小排序进行存储的。
校验data文件和index文件是否匹配,不匹配返回null,匹配返回partition大小的数组。
1.index文件大小是 (blocks + 1) * 8L
2.index文件第一行是 0
3.获取partition的大小写入lengths,lengths的汇总值等于data文件大小
满足上面三个条件,返回lengths,否则返回null
获取到shuffleId、mapId、startReduceId、endReduceId
获取到index文件
读取对应的startOffset和endOffset
使用data文件、startOffset、endOffset生成FileSegmentManagedBuffer并返回