Spark中负责内存管理的组件是MemoryManager及其子类,真正负责内存分配和释放的组件是MemoryStore,负责磁盘块管理的组件是DiskBlockManager,由此可以 联想到,真正负责磁盘写入和释放的组件应该也有个类似于XxxStore的名字,没错啦,它就是今天要分析的DiskStore类。

DiskStore类的构造方法有三个参数,分别接受SparkConf、DiskBlockManager、SecurityManager这三个类的实例,前两个类在前面都已经分析过了,最后一个 SecurityManager从名字就能看出是负责提供安全支持的类,由SparkEnv负责实例化,且绝大多数的组件都要通过SparkEnv来访问SecurityManager。再来看看其 成员属性:

  • minMemoryMapBytes:内存映射(MemoryMap)读取文件的最小阈值,由spark.storage.memoryMapThreshold配置项指定,默认是2M,如果磁盘文件大小大 于这个值,就需要使用内存映射来读取;

  • maxMemoryMapBytes:内存映射(MemoryMap)读取文件的最大阈值,由spark.storage.memoryMapLimitForTests配置项指定,这是个测试参数,默认值是 Integer.MAX_VALUE - 15,之所以减去15是为了避免某些情况下JVM的实际内存比Integer.MAX_VALUE要小的问题;

  • blockSizes:ConcurrentHashMap类型,维护BlockId与其对应大小之间的映射;

其写入方法put()调用了提供的回调函数来写入特定的块数据,其具体逻辑是:首先根据blockId来检查块是否已经存入磁盘,如果还没有存入,则调用diskManager.getFile() 获取块对应的文件操作对象,然后打开文件获取文件的WritableByteChannel(这是java NIO中用于写入数据的通道),最后调用传入参数writeFunc函数,通过 WritableByteChannel将数据写入到文件,并将块ID及其对应的块大小放入到blockSizes映射中。

上面的put()方法中调用了openForWrite()方法来打开文件获取文件的WritableByteChannel,其实现是根据文件操作对象构造文件的输出流FileOutputStream, 然后获取其对应的唯一的FileChannel用于写入数据。如果在进行I/O操作的过程中需要加密,就需要使用CryptoStreamUtils.createWritableChannel()方法 对channel进行包装。

紧跟着put()方法的putBytes()方法在实现上调用了put()方法,它的参数除了BlockId外,还有一个ChunkedByteBuffer参数,这个参数内存放着需要写入的数据, 且在调用put()方法时会调用ChunkedByteBuffer.writeFully()方法,将数据以固定大小的分块写入(分块大小由spark.buffer.write.chunkSize配置项设置, 不能超过Integer.MAX_VALUE - 15,默认是64M)WritableByteChannel。

再来看看getBytes()方法,其实现也很简单,就是通过diskManager.getFile()获取到块对应的文件操作对象,然后获取到块大小,然后根据I/O操作的过程中是否 需要加密来返回不同的结果,如果需要加密返回的是EncryptedBlockData对象,否则返回的是DiskBlockData对象,它们都是BlockData的子类,BlockData就是 抽象了如何对块进行存储并提供了多种不同的方式来读取其中的数据的特征类。

DiskBlockData()是BlockData子类中比较常用到的,在这个类中有两个比较重要且实现稍微复杂一点的方法:toChunkedByteBuffer()和toByteBuffer()方法。 前者首先调用了Utils.tryWithResource()方法,这个方法的作用与java 1.7版本中增加的try-with-resource方法一样,用于自动关闭外部资源,来看一下被它 包起来的代码,它先调用同一个类中提供的open()方法将文件转化为文件输入流FileInputStream,并获取其FileChannel,再调用JavaUtils.readFully()方法 将从channel中获取到的数据写到ByteBuffer中,每一个ByteBuffer就是一个chunk,并将该chunk添加到ListBuffer类型的chunks中,组装成ChunkedByteBuffer。 后者则会先检查块大小是否小于maxMemoryMapBytes(这个的取值上面有看到),只有小于才会继续走下去,它同样调用了Utils.tryWithResource()方法,只是它 包起来的代码是再次检查块大小是否小于minMemoryMapBytes(这个的取值上面也有看到),如果条件满足,表明是小文件则采用与toChunkedByteBuffer()类似的 方式直接读取,否则调用FileChannelImpl.map()方法将数据直接映射到MappedByteBuffer也就是进程的虚拟内存中。

到此,已经分析完DiskStore究竟是如何写入和读取数据的了。