[SPARK] 简单分析 Spark 的 RPC 通信框架

2017-01-02

在Spark中,已经采用了Netty作为RPC的通信框架,其通信的RpcEndpointRef和RpcEndpoint都是通过RpcEnv进行创建,在创建RpcEnv时,最终会调用NettyRpcEnvFactory中的create方法,并通过传入的clientMode去决定是否启动TransportServer。

以下是NettyRpcEnv的简图

除了Dispatcher之外,还有Inbox,Outbox,RpcEndpoint,RpcEndpointRef等几个重要的类

RpcEndpointRef的作用

进行远程通信时,一般都需要一个client一个server,而RpcEndpointRef就相当于一个client的角色,并且通过RpcEnv的实现类(NettyRpcEnv)的asyncSetupEndpointRefByURI进行创建在NettyRpcEndpointRef中我们可以看到,其实他只是需要RpcEndpoint对应的ip,port和RpcEndpoint name,然后程序在调用ask或者send方法发送信息时,NettyRpcEnv会根据他所提供的地址信息封装成RequestMessage进行处理,这里这个this指的是一个NettyRpcEndpointRef

1
2
3
4
override def send(message: Any): Unit = {
require(message != null, "Message is null")
nettyEnv.send(RequestMessage(nettyEnv.address, this, message))
}

程序在发送RequestMessage之前,会先判断改发送的地址是否是本地的地址,如果不是,则将message封装为OutboxMessage,并放到Outbox当中,如果是本地,则通过Dispatcher把message放到对应EndpointData的inbox里面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private[netty] def send(message: RequestMessage): Unit = {
val remoteAddr = message.receiver.address
if (remoteAddr == address) {
// Message to a local RPC endpoint.
try {
dispatcher.postOneWayMessage(message)
} catch {
case e: RpcEnvStoppedException => logWarning(e.getMessage)
}
} else {
// Message to a remote RPC endpoint.
postToOutbox(message.receiver, OneWayOutboxMessage(serialize(message)))
}
}

Inbox,Outbox的作用

简单来说,Inbox主要的作用是存储发送给RpcEndpoint的消息,Outbox就是存放发送到remote host的message的地方

当程序在调用targetOutbox.send(message)时,该message会先放到OutBox内部的messages的list当中,然后通过传入的TransportClient发送到对应的RpcEndpoint

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def send(message: OutboxMessage): Unit = {
val dropped = synchronized {
if (stopped) {
true
} else {
messages.add(message)
false
}
}
if (dropped) {
message.onFailure(new SparkException("Message is dropped because Outbox is stopped"))
} else {
drainOutbox()
}
}

RpcEndpoint的作用

像刚才说的,客户端有了,服务端也就是RpcEndpoint,就像CoarseGrainedExecutorBackend一样,它也是一个RpcEndpoint,并实现了对应的接口(receive,onStart)等等。 在RpcEndpoint启动时,需要RpcEnv中setupEndpoint,也就是向Dispatcher注册RpcEndpoint,这样dispatcher才能把message分发对应的RpcEndpoint当中

1
2
    env.rpcEnv.setupEndpoint("Executor", 
new CoarseGrainedExecutorBackend( env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))

下面的就是Dispatcher中的注册逻辑,里面维护着endpoints,endpointRefs和receivers等几个重要的数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
      def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
val addr = RpcEndpointAddress(nettyEnv.address, name)
val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
synchronized {
if (stopped) {
throw new IllegalStateException("RpcEnv has been stopped")
}
if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
}
val data = endpoints.get(name)
endpointRefs.put(data.endpoint, data.ref)
receivers.offer(data) // for the OnStart message
}
endpointRef
}

就想刚才所说的如果我传进去的clientMode为false,就会启动相应的TransportServer监听该主机对应的端口,NettyRpcHandler通过反序列化得到对应的RequestMessage,并通过message的message.receiver.name找到对应EndpointData,并把message放到对应的inbox中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private def postMessage(
endpointName: String,
message: InboxMessage,
callbackIfStopped: (Exception) => Unit): Unit = {
val error = synchronized {
val data = endpoints.get(endpointName)
if (stopped) {
Some(new RpcEnvStoppedException())
} else if (data == null) {
Some(new SparkException(s"Could not find $endpointName."))
} else {
data.inbox.post(message)
receivers.offer(data)
None
}
}
// We don't need to call `onStop` in the `synchronized` block
error.foreach(callbackIfStopped)
}

然后通过把对应的EndpointData放到receivers中,通过设置好的线程池的线程去消费receivers里面的EndpointData,从而调用Endpoint里面的的receive等实现方法进行不同的逻辑处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/** Message loop used for dispatching messages. */
private class MessageLoop extends Runnable {
override def run(): Unit = {
try {
while (true) {
try {
val data = receivers.take()
if (data == PoisonPill) {
// Put PoisonPill back so that other MessageLoops can see it.
receivers.offer(PoisonPill)
return
}
data.inbox.process(Dispatcher.this)
} catch {
case NonFatal(e) => logError(e.getMessage, e)
}
}
} catch {
case ie: InterruptedException => // exit
}
}
}

一个流程就大约这样了,通过分析能够加深对框架设计和spark的认识。

参考资料

Spark RPC通信层设计原理分析