在Spark中,Executor需要定期向Driver发送心跳以表明自己仍然存活,而HeartbeatReceiver是由Driver持有并负责处理各个Executor的心跳,监控它们的运行状态。

在代码实现上,HeartbeatReceiver类继承了SparkListener抽象类,并实现了ThreadSafeRpcEndpoint特征,这表明其既是一个监听器,也是一个线程安全的RPC端点。 HeartbeatReceiver类有两个构造方法参数,分别是SparkContext和Clock特征的实现类SystemClock类,SystemClock提供了对系统时间System.currentTimeMillis() 的简单封装。在HeartbeatReceiver构造时,会同时将其加入到LiveListenerBus的Executor管理队列(executorManagement)中进行监听。

下面简单介绍下HeartbeatReceiver中的部分成员属性:

  • executorLastSeen:维护Executor ID与收到该Executor最后一次心跳的时间戳之间的映射关系;

  • slaveTimeoutMs:由spark.storage.blockManagerSlaveTimeoutMs配置项控制,表示的是Executor上的BlockManager的超时时间,默认是120秒;

  • executorTimeoutMs:由配置项spark.network.timeout配置项控制,表示的是Executor自身的超时时间,默认值是spark.storage.blockManagerSlaveTimeoutMs 配置的值;

  • timeoutIntervalMs:由配置项spark.storage.blockManagerTimeoutIntervalMs控制,表示的是检查Executor上BlockManager是否超时的时间间隔,默认是60秒;

  • checkTimeoutIntervalMs:由配置项spark.network.timeoutInterval控制,表示检查Executor是否超时的时间间隔,默认值是spark.storage.blockManagerTimeoutIntervalMs 配置的值;

  • timeoutCheckingTask:持有检查Executor是否超时的任务返回的ScheduledFuture对象;

  • eventLoopThread:单守护线程的调度线程池,名称为heartbeat-receiver-event-loop-thread,是整个HeartbeatReceiver的事件处理线程;

  • killExecutorThread:单守护线程的普通线程池,名称为kill-executor-thread,用于异步执行kill Executor的任务;

介绍了HeartbeatReceiver类的这么多属性,那么再看看它都有哪些方法:

  • onStart():HeartbeatReceiver是一个RPC端点,因此也实现了RpcEndpoint.onStart()方法,当RPC环境中的Dispatcher注册RPC端点时,将会调用该方法。 这个方法会让eventLoopThread以spark.network.timeoutInterval规定的时间间隔调度执行,并将ScheduledFuture对象返回给timeoutCheckingTask。而 eventLoopThread线程只做一件事情,那就是向HeartbeatReceiver自己发送ExpireDeadHosts消息,并等待回复。

  • onExecutorAdded()/onExecutorRemoved():HeartbeatReceiver作为一个监听器,实现了SparkListener.onExecutorAdded()与onExecutorRemoved() 方法,用来监听Executor的添加与移除。它们分别调用了addExecutor()和removeExecutor()方法,当监听到Executor的添加或移除时,HeartbeatReceiver就会 向自己发送一个带有Executor ID的ExecutorRegistered或ExecutorRemoved消息,并等待回复。

  • receiveAndReply():实现了RpcEndpoint.receiveAndReply()方法,对于不同的消息分别使用不同的逻辑进行处理:ExecutorRegistered消息是将Executor ID 与通过SystemClock获取的当前时间戳加入到executorLastSeen映射中,并回复true;ExecutorRemoved消息是从executorLastSeen映射中删除Executor ID对应的 条目,并回复true;TaskSchedulerIsSet消息的含义是TaskScheduler已经生成并准备好,在SparkContext初始化的过程中会发送此消息,在收到此消息后会令HeartbeatReceiver 也持有一份TaskScheduler实例,并回复true;ExpireDeadHosts消息的含义是清理那些由于太久没有发送心跳而超时的Executor,它会调用expiredDeadHosts()方法 并回复true;Heartbeat是Executor向Driver发送过来的心跳信号,在TaskScheduler不为空时,如果executorLastSeen映射中已经保存有Executor ID,就更新executorLastSeen 中对应Executor ID的时间戳,并向eventLoopThread线程提交执行TaskScheduler.executorHeartbeatReceived()方法(这个方法用于通知Master,使其知道BlockManager是存活的), 并回复HeartbeatResponse消息。executorHeartbeatReceived()方法会返回一个布尔值,表示Driver是否知道Executor持有的BlockManager,若不知道则在HeartbeatResponse消息 中注明需要重新注册BlockManager。executorLastSeen映射中如果不包含当前Executor ID或TaskScheduler为空,都会直接回复需要重新注册BlockManager的HeartbeatResponse消息。

  • expireDeadHosts():遍历executorLastSeen映射,取出最后一次心跳的时间戳与当前时间进行对比,如果差值大于spark.network.timeout表示Executor已超时,那么就需要:调用 TaskScheduler.executorLost()方法,从调度体系中移除超时的Executor;向killExecutorThread线程池提交执行SparkContext.killAndReplaceExecutor()方法的任务,异步地 杀掉超时的Executor;从executorLastSeen映射中删除超时的Executor ID的条目。