在Spark源码解析中曾经分析到Spark新版采用的是Netty来进行节点间的通信,当然Netty也是Storm节点间通信的主要方式(Flink中用的还是Akka),在Netty4中能 提供全双工、多路复用I/O模型的Socket I/O能力,既然实时计算引擎都使用Netty来进行节点间的通信,那么可以肯定Netty的通信效率一定是特别高的,不然也不能 满足实时计算这种对传输效率要求非常高的场合。恰好,最近做个性化实时推送,每天大概要推送7亿条左右的数据,需要解决机器节点间的高效率通信问题。于是,打算 借鉴一下Spark或是Flink中底层通信方式来实现。

既然打算借鉴,那首先就必须对Spark节点间的RPC有一个深入的了解才行,于是在前面已经阅读过相关源码的基础上,咱们再有目的的深入分析研究一下,以方便咱的这个 借鉴过程^-^!

先回顾一下Spark源码阅读5:RpcEnv的知识。首先,RPC环境承担着Spark体系内几乎所有的内部及外部通信,RpcEnv抽象类是Spark RPC环境的通用表示,其需要由SparkConf组件加载Spark中RPC相关的配置,但RpcEnv知识一个抽象类,NettyRpcEnv是Spark官方提供的RPC环境的唯一实现,通过 NettyRpcEnvFactory的create()方法创建,这个方法先创建JavaSerializer序列化器,用于RPC传输的序列化,然后通过NettyRpcEnv的构造方法创建NettyRpcEnv, 这其中也包含一些RPC基础组件的初始化,最后定义函数变量startNettyRpcEnv并调用工具类Utils中的startServiceOnPort()方法来启动NettyRpcEnv。如此就能得到 Spark RPC环境了。之后再在这层环境上使用RpcEndpoint进行通信就是顺利成章的事了。

好了,到了这里,相信大家应该已经明确了我们的借鉴过程的起点应该是SparkConf,不过SparkConf负责管理的是整个Spark的配置项,我们需要借鉴的只是RPC相关的东西, 并不需要那么多,其实可以给它进行一下瘦身工作。关于SparkConf的知识,可以参考Spark源码阅读1:SparkConf

在完成了SparkConf的瘦身后,就来到了RpcEnv这个抽象类,在这个类中RpcEnvFileServer类没什么作用,完全可以去掉,另外为了让它能正常运行还得添加一些额外的 工具类。在这个类中,我们知道最重要的方法是在其中定义的setupEndpoint()方法,它用来注册一个RPC端点(RpcEndpoint),并返回其引用(RpcEndpointRef)。如果 客户端想向一个RpcEndpoint发送消息,那么首先必须就获取其对应RpcEndpoint的引用。

再来看一下RpcEnv抽象类的唯一实现NettyRpcEnv类,首先要创建它必须要通过NettyRpcEnvFactory类的create()方法,它接收一个参数RpcEnvConfig。这个配置类 与RpcEnv在同一个文件中,是一个简单的样例类,对SparkConf进行了简单的封装,增加了一些RPC通信额外必需的参数,包括IP、端口号等等。在create()方法中会先创建 Java序列化器,然后通过NettyRpcEnv的构造方法创建NettyRpcEnv,这其中也包含一些RPC基础组件的初始化,最后判断如果是非客户端模式就定义函数变量startNettyRpcEnv 并调用工具类Utils中的startServiceOnPort()方法来启动NettyRpcEnv。

RpcEndpoint和RpcEndpointRef是RPC环境中的基础组件,RpcEnv抽象类中定义的setupEndpoint()方法用来注册一个RPC端点RpcEndpoint,返回其引用RpcEndpointRef。 如果客户端想向一个RpcEndpoint发送消息,那么首先必须获取其对应RpcEndpoint的引用。实现一个继承自RpcEndpoint的Endpoint表明可以并发的调用该服务,如果实现 一个继承自ThreadSafeRpcEndpoint则表明该Endpoint不允许并发。在setupEndpoint()方法中将会遇到另一个重要的组件Dispatcher,这个组件在 Spark源码阅读6:Dispatcher中进行过分析,它是NettyRpcEnv中所包含的组件,用于将消息路由到正确的RPC端点。其线程池执行的都是MessageLoop, 这是一个内部类,本质上是一个线程,用于不断循环的处理消息,根据消息类型的不同会调用RpcEndpoint中的对应方法进行处理,如果没有匹配,则通过safelyCall()方法 并最终调用RpcEndpoint.onError()进行处理。

好了,还剩最后一步,通过setupEndpoint()方法得到了RpcEndpointRef,也就是远端RPC端点的引用,于是就可以调用ask()方法和send()方法直接向远端发送消息。其中, ask()方法的作用是”异步发送一条消息,并在指定的超时时间内等待RPC端点的回复”,其执行分为两种情况:如果远端地址与当前NettyRpcEnv的地址相同,那么说明处理该 消息的RPC端点就在本地,此时新建Promise对象,将其Future设置为回调方法(即onSuccess()方法和onFailure()方法),并调用调度器的postLocalMessage()方法将消 息发送给本地的RPC端点;如果远端地址与当前NettyRpcEnv的地址不同,则说明处理该消息的RPC端点位于其它节点,此时先将消息序列化,并将其与onSuccess()、onFailure() 方法逻辑一起封装到RpcOutboxMessage中并投递出去。send()方法的作用是”发送一条单向的异步消息,使用’发送即忘’语义,无需回复”,实现逻辑与ask()方法大致相同, 可以类比ask()为TCP,而send()为UDP。投递过程就是放到Outbox中,然后通过其send()方法发送出去,通过Dispatcher组件路由到正确的节点,放到远端节点的Inbox中, 通过MessageLoop从reveivers阻塞队列中获取消息,调用Inbox.process()方法处理。关于这部分的内容,在Spark源码阅读7:NettyRpcEnv有 比较细致的讲解。

至此,已经介绍完成了Spark RPC中涉及到的所有主要组件及它们的作用与调用关系,这些也是我们想要借鉴Spark RPC所需要的所有主要组件。总的来说,Spark RPC的组件 还是比较独立,只要对其有深入的了解,借鉴起来并不是特别复杂。

好了,今天就到这里。从明天开始再研究一下Flink中底层RPC的实现吧!