之前有两篇文件分别分析了Map和Reduce阶段的流程,这篇文章把Map阶段的环形缓冲区单独拿出来进行分析,对环形缓冲区的数据结构和数据进入环形缓冲区然后溢写到磁盘的流程进行分析。
环形缓冲区数据结构
Map过程中环形缓冲区是指数据被map处理之后会先放入内存,内存中的这片区域就是环形缓冲区。
环形缓冲区是在MapTask.MapOutputBuffer
中定义的,相关的属性如下:
1 | // k/v accounting |
环形缓冲区其实是一个数组,数组中存放着key、value的序列化数据和key、value的元数据信息,key/value的元数据存储的格式是int类型,每个key/value对应一个元数据,元数据由4个int组成,第一个int存放value的起始位置,第二个存放key的起始位置,第三个存放partition,最后一个存放value的长度。
key/value序列化的数据和元数据在环形缓冲区中的存储是由equator分隔的,key/value按照索引递增的方向存储,meta则按照索引递减的方向存储,将其数组抽象为一个环形结构之后,以equator为界,key/value顺时针存储,meta逆时针存储。
初始化
环形缓冲区的结构在MapOutputBuffer.init
中创建。
1 | public void init(MapOutputCollector.Context context |
init是对环形缓冲区进行初始化构造,由mapreduce.task.io.sort.mb决定map中环形缓冲区的大小sortmb,默认是100M。
此缓冲区也用于存放meta,一个meta占用METASIZE(16byte),则其中用于存放数据的大小是maxMemUsage -= sortmb << 20 % METASIZE(由此可知最好设置sortmb转换为byte之后是16的整数倍),然后用maxMemUsage初始化kvbuffer字节数组和kvmeta整形数组,最后设置数组的一些标识信息。利用setEquator(0)
设置kvbuffer和kvmeta的分界线,初始化的时候以0为分界线,kvindex为aligned - METASIZE + kvbuffer.length,其位置在环形数组中相当于按照逆时针方向减去METASIZE,由kvindex设置kvstart = kvend = kvindex,由equator设置bufstart = bufend = bufindex = equator,还得设置bufvoid = kvbuffer.length,bufvoid用于标识用于存放数据的最大位置。
为了提高效率,当buffer占用达到阈值之后,会进行spill,这个阈值是由bufferRemaining进行检查的,bufferRemaining由softLimit = (int)(kvbuffer.length * spillper); bufferRemaining = softLimit;
进行初始化赋值,这里需要注意的是softLimit并不是sortmb*spillper,而是kvbuffer.length * spillper,当sortmb << 20是16的整数倍时,才可以认为softLimit是sortmb*spillper。
下面是setEquator的代码
1 | // setEquator(0)的代码如下 |
buffer初始化之后的抽象数据结构如下图所示:
写入buffer
Map通过NewOutputCollector.write
方法调用collector.collect
向buffer中写入数据,数据写入之前已在NewOutputCollector.write
中对要写入的数据进行逐条分区,下面看下collect
1 | // MapOutputBuffer.collect |
每次写入数据时,执行bufferRemaining -= METASIZE
之后,检查bufferRemaining,
如果大于0,直接将key/value序列化对和对应的meta写入buffer中,key/value是序列化之后写入的,key/value经过一些列的方法调用Serializer.serialize(key/value) -> WritableSerializer.serialize(key/value) -> BytesWritable.write(dataOut) -> DataOutputStream.write(bytes, 0, size) -> MapOutputBuffer.Buffer.write(b, off, len)
,最后由MapOutputBuffer.Buffer.write(b, off, len)
将数据写入kvbuffer中,write方法如下:
1 | public void write(byte b[], int off, int len) |
write方法将key/value写入kvbuffer中,如果bufindex+len超过了bufvoid,则将写入的内容分开存储,将一部分写入bufindex和bufvoid之间,然后重置bufindex,将剩余的部分写入,这里不区分key和value,写入key之后会在collect中判断bufindex < keystart
,当bufindex小时,则key被分开存储,执行bb.shiftBufferedKey()
,value则直接写入,不用判断是否被分开存储,key不能分开存储是因为要对key进行排序。
这里需要注意的是要写入的数据太长,并且kvinde==kvend,则抛出MapBufferTooSmallException异常,在collect中捕获,将此数据直接spill到磁盘spillSingleRecord
,也就是当单条记录过长时,不写buffer,直接写入磁盘。
下面看下bb.shiftBufferedKey()代码
1 | // BlockingBuffer.shiftBufferedKey |
shiftBufferedKey时,判断首部是否有足够的空间存放key,有没有足够的空间,则先将首部的部分key写入keytmp中,然后分两次写入,再次调用Buffer.write,如果有足够的空间,分两次copy,先将首部的部分key复制到headbytelen的位置,然后将末尾的部分key复制到首部,移动bufindex,重置bufferRemaining的值。
key/value写入之后,继续写入元数据信息并重置kvindex的值。
spill
一次写入buffer结束,当写入数据比较多,bufferRemaining小于等于0时,准备进行spill,首次spill,spillInProgress为false,此时查看bUsed = distanceTo(kvbidx, bufindex),此时bUsed >= softLimit 并且 (kvbend + METASIZE) % kvbuffer.length == equator - (equator % METASIZE)
,则进行spill,调用startSpill
1 | private void startSpill() { |
startSpill唤醒spill线程之后,进程spill操作,但此时map向buffer的写入操作并没有阻塞,需要重新边界equator和bufferRemaining的值,先来看下equator和bufferRemaining值的设定:
1 | // 根据已经写入的kv得出每个record的平均长度 |
因为equator是kvbuffer和kvmeta的分界线,为了更多的空间存储kv,则最多拿出distkvi的一半来存储meta,并且利用avgRec估算distkvi能存放多少个record和meta对,根据record和meta对的个数估算meta所占空间的大小,从distkvi/2和meta所占空间的大小中取最小值,又因为distkvi中最少得存放一个meta,所占空间为METASIZE,在选取kvindex时需要求aligned,aligned最多为METASIZE-1,总和上述因素,最终选取equator为(bufindex + Math.max(2 * METASIZE - 1, Math.min(distkvi / 2, distkvi / (METASIZE + avgRec) * METASIZE)))
。equator选取之后,设置bufmark = bufindex = newPos和kvindex,但此时并不设置bufstart、bufend和kvstart、kvend,因为这几个值要用来表示spill数据的边界。
spill之后,可用的空间减少了,则控制spill的bufferRemaining也应该重新设置,bufferRemaining取三个值的最小值减去2*METASIZE,三个值分别是meta可用占用的空间distanceTo(bufend, newPos)
,kv可用空间distanceTo(newPos, serBound)
和softLimit。这里为什么要减去2*METASIZE,一个是spill之前kvend到kvindex的距离,另一个是当时的kvindex空间????此时,已有一个record要写入buffer,需要从bufferRemaining中减去当前record的元数据占用的空间,即减去METASIZE,另一个METASIZE是在计算equator时,没有包括kvindex到kvend(spill之前)的这段METASIZE,所以要减去这个METASIZE。
接下来解析下SpillThread线程,查看其run方法:
1 | public void run() { |
run中主要是sortAndSpill
,
1 | private void sortAndSpill() throws IOException, ClassNotFoundException, |
sortAndSpill中,有mstart和mend得到一共有多少条record需要spill到磁盘,调用sorter.sort对meta进行排序,先对partition进行排序,然后按key排序,排序的结果只调整meta的顺序。
排序之后,判断是否有combiner,没有则直接将record写入磁盘,写入时是一个partition一个IndexRecord,如果有combiner,则将该partition的record写入kvIter,然后调用combinerRunner.combine执行combiner。
写入磁盘之后,将spillx.out对应的spillRec放入内存indexCacheList.add(spillRec),如果所占内存totalIndexCacheMemory超过了indexCacheMemoryLimit,则创建index文件,将此次及以后的spillRec写入index文件存入磁盘。
最后spill次数递增。sortAndSpill结束之后,回到run方法中,执行finally中的代码,对kvstart和bufstart赋值,kvstart = kvend
,bufstart = bufend
,设置spillInProgress的状态为false。
在spill的同时,map往buffer的写操作并没有停止,依然在调用collect,再次回到collect方法中,
1 | // MapOutputBuffer.collect |
有新的record需要写入buffer时,判断bufferRemaining -= METASIZE
,此时的bufferRemaining是在开始spill时被重置过的(此时的bufferRemaining应该比初始的softLimit要小),当bufferRemaining小于等于0时,进入if,此时spillInProgress的状态为false,进入if (!spillInProgress),startSpill时对kvend和bufend进行了重置,则此时(kvbend + METASIZE) % kvbuffer.length != equator - (equator % METASIZE)
,调用resetSpill()
,将kvstart、kvend和bufstart、bufend设置为上次startSpill时的位置。此时buffer已将一部分内容写入磁盘,有大量空余的空间,则对bufferRemaining进行重置,此次不spill。
bufferRemaining取值为Math.min(distanceTo(bufindex, kvbidx) - 2 * METASIZE, softLimit - bUsed) - METASIZE
最后一个METASIZE是当前record进入collect之后bufferRemaining减去的那个METASIZE,为什么要减去2*METASIZE,不知道。。。。。
1 | private void resetSpill() { |
当bufferRemaining再次小于等于0时,进行spill,这以后就都是套路了。环形缓冲区分析到此结束。