为什么会选择写Spark RPC,由于工作需要一直关注Spark的dev mail. Spark RPC的实现从之前依赖Akka Actor到抽象出统一的RPC interface, 最后现在基于netty自己开发,一直都觉得很好奇.
统一的Spark RPC interface
在Spark Core中依赖Akka,使得外围框架必需使用同一版本的Akka, 灵活性大大降低,抽象出Spark RPC interface,可以通过反射使用Akka, 避免对它的依赖,一旦有了统一的RPC interface, 可以使用其他不同的框架实现,如netty.
类Actor的接口
- RpcAddress,一对host和post, 代表一个RpcEnv
- RpcEndpointAddress 包含一个RpcAddress和name, 表示在某个RpcEnv上的RpcEndpoint, name只保证在RpcEnv上的唯一性.
- RpcCallContext线程安全类,用在RpcEndpoint中的receiveAndReply中可以使RpcEndpoint异步处理消息发送reply
- RpcEndpoint 类似Actor, 根据消息触发不同功能,life-cycle: constructor -> onStart -> receive* -> onStop, receive可以并发调用,如果想让receive线程安全,使用ThreadSafeRpcEndpoint
- RpcEnv实现整个Rpc逻辑,注册RpcEndpoint, 管理RpcEndpoint和RpcEndpointRef, 负责分发数据到RpcEndpoint
- RpcEndpointRef是RpcEndpoint的远程引用,提供send/ask方法,向RpcEndpoint发送数据,简单点实际是一个客户端,只是标注要发往那个RpcEndpoint
和Actor的异同
- 线程安全性:Actor是一个有状态,receive一次只能处理一条消息,不能并行执行.对于使用Actor实现Rpc interface, 不需要保证线程安全性,synchronize完全不需要,Akka已经保证.但这个设计也有不妥之处,假设一个事件RpcEndpoint需要处理很长时间,后续的消息延迟就会很高,响应变慢.通常处理办法都是开一个线程池来处理,对于RpcEndpoint的状态的更新又会牵涉到synchronize来保证线程安全性.
- 在RpcEndpoint中添加方法
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit]
使异步处理消息成为可能, 从而使RpcEndpoint可以并行处理消息.
trait RpcCallContext {
def reply(response: Any): Unit
def sendFailure(e: Throwable): Unit
def senderAddress: RpcAddress
}
- 异常处理,Actor中有SupervisorStrategy来负责异常处理,Non-fatal throwables会回传给sender, 由sender来判断处理逻辑.
- 隔离事件处理逻辑,
RpcEndpointRef
中ask
必须发送到RpcEndpoint
中的receiveAndReply
,send
才会发送到receive
Netty具体实现
//TODO