突然之间,原本就已经很肆虐的疫情变得更加疯狂,而我却还没来得及买N95的口罩。往年过年还需要走亲访友,今年却是哪里也不能去。不过对于我这种不太喜欢走亲戚的人倒也乐得清闲, 还多了许多时间来分析源码,不过还是希望疫情赶快过去。武汉,加油!!!

Spark源码阅读17:Block中对块的元数据信息BlockInfo进行分析时涉及到了锁的相关信息,包括读取块数据的加锁次数和写入数据时的写锁, 但是我们在看BlockData的实现类ByteBufferBlockData时,其底层的实际实现ChunkedByteBuffer类中并没有锁的代码,这说明锁的具体实现和管理另有别的类来完成, 这个类就是BlockInfoManager,字面理解是”块信息管理器”,其实主要是对块的锁进行管理。这也很好理解,锁的信息出现在BlockInfo,而且实际上锁属于块的元数据 信息,由BlockInfo委托给BlockInfoManager管理也属正常操作。

BlockInfoManager类与BlockInfo类在一个文件内,直接来看一下BlockInfoManage的里面的成员属性吧:

  • TaskAttemptId:Long类型的重新命名,表示Task的尝试ID;

  • infos:存储BlockId与BlockInfo的对应关系,用于遍历每一个独立的块的元数据信息;

  • writeLocksByTask:跟踪每一个Task持有写锁的块的集合,存储的是TaskAttemptId和该Task持有写锁的块之间的映射关系,一个Task一次可以获取多个块的写锁;

  • readLocksByTask:与writeLocksByTask类似,只不过它存储的是读锁;

在BlockInfoManager中,会调用registerTask()方法注册任务,并且这个方法必须在调用这个任务的所有其它BlockInfoManager方法之前进行调用,传入的参数 BlockInfo.NON_TASK_WRITER是一个特殊的taskAttemptId(值是-1024),用于标识块的写锁被非Task的线程所持有。currentTaskAttemptId()用于获取当前任务的 taskAttemptId,如果是被非Task的线程调用则会返回lockInfo.NON_TASK_WRITER,有点类似于get()方法。

lockForReading()是为Block加读锁,并会返回其元数据。其具体的执行流程是:根据传入的参数blockId从infos中找到对应的BlockInfo,判断其对应的writerTask是否为 BlockInfo.NO_WRITER(特殊的taskAttemptId标识Block的写锁没有被占有),如果是的话,就增加info.readerCount计数,并将BlockId加入到readLocksByTask映射 中,这表明加锁已经成功。否则如果加锁不成功,且如果blocking为true,则调用Object.wait()方法阻塞,直到该块的写锁被释放后被notify()/notifyAll()方法唤醒。 需要注意的是:一个Task能够给一个Block多次加读锁,在此情况下每一个锁都需要分别进行释放。

lockForWriting()与lockForReading()相对应,只是其给Block加的是写锁,其执行流程也比较类似。不同的是,其判断加锁成功的条件是既没有写锁也没有读锁(这说明Block 写锁与读锁、写锁与写锁之间是互斥的,读锁与读锁之间是可以共享的,且读锁可重入,写锁不可重入,当然这点对并发编程的读写锁比较熟悉的话应该是很清楚的规则了),并且会 设置info.writerTask的值为currentTaskAttemptId()方法返回的值,将BlockId加入到的则是writeLocksByTask映射。

lockNewBlockForWriting()用于尝试获取新块的合适的锁,采用的是第一个写入者优先的原则,即尝试获取blockId对应的读锁,如果能够获取到读锁则表明块已存在,也就是有其它 线程已经向这个块写入了数据,因此没必要再写直接返回false即读锁即可。反之,则将新块放入infos映射,并获取其写锁,返回true。

unlock()方法用于释放单个块的锁,它先获取taskAttemptId和BlockInfo信息,然后检查当前Task是否持有块的写锁,如果持有则将info.writerTask设置为BlockInfo.NO_WRITER, 也就是释放写锁。如果没有持有写锁,则将info.readerCount减1,也就是释放读锁。最后调用notifyAll()方法唤醒块上等待的所有线程,它会唤醒先前调用wait()进行等待的所有Task。

releaseAllLocksForTask()也是释放锁的方法,但是它释放的是当前taskAttemptId对应的所有锁,并返回所有blockIds,这个方法应该在Task结束时被调用。

downgradeLock()锁降级也是并发编程中涉及到的概念,它能将独占的写锁降级为共享的读锁,当一个线程在获取到写锁时再去获取读锁时就会释放写锁,也就是进行锁降级。但是从代码实现 上实际上是先释放了写锁,然后再重新获取读锁,结果一样但过程跟理解的还是有区别。

removeBlock()的逻辑是,当持有BlockInfo写锁的Task是当前Task时,从infos映射中删除blockId对应的BlockInfo,并释放BlockInfo上的所有读锁和写锁,并调用notifyAll()方法 唤醒块上等待的所有线程。