RPC是Remote Procedure Call(远程过程调用)的简称,这一机制都要面对两个问题
在此之前,我们有必要了解什么是架构层次的协议。通俗一点说,就是我把某些接口和接口中的方法称为协议,客户端和服务端只要实现这些接口中的方法就可以进行通信了,从这个角度来说,架构层次协议的说法就可以成立了。Hadoop的RPC机制正是采用了这种“架构层次的协议”,有一整套作为协议的接口,如下图:
Hadoop的RPC组件,依赖于Hadoop Writable接口类型的支持,要求每个实现类都要确保将本类的对象正确序列化与反序列化。因此RPC使用Java动态代理与反射实现对象调用方式,客户端到服务器数据的序列化与反序列化由Hadoop框架或用户自己来实现,也就是数据组装时定制的。RPC架构图如下:
主要用来做方法的增强,让你可以在不修改源码的情况下,增强一些方法,在方法执行前后做任何你想做的事情(甚至根本不去执行这个方法),因为在InvocationHandler的invoke方法中,你可以直接获取正在调用方法对应的Method对象,具体应用的话,比如可以添加调用日志,做事务控制等。
这个接口的实现部署在其它服务器上,在编写客户端代码的时候,没办法直接调用接口方法,因为接口是不能直接生成对象的,这个时候就可以考虑代理模式(动态代理)了,通过Proxy.newProxyInstance代理一个该接口对应的InvocationHandler对象,然后在InvocationHandler的invoke方法内封装通讯细节就可以了。具体的应用,最经典的当然是Java标准库的RMI,其它比如hessian,各种webservice框架中的远程调用,大致都是这么实现的。
VersionedProtocol是所有RPC协议接口的父接口,其中只有一个方法:getProtocolVersion()
简单来说,Hadoop RPC=动态代理+定制的二进制流。分布式对象一般都会要求根据接口生成存根和框架。如 CORBA,可以通过 IDL,生成存根和框架。在ipc.RPC类中有一些内部类,下边简单介绍下
hashMap <SocketFactory, Client>
。上类图
从以上的分析可以知道,Invocation类仅作为VO,ClientCache类只是作为缓存,而Server类用于服务端的处理,他们都和客户端的数据流和业务逻辑没有关系。为了分析 Invoker,我们需要介绍一些 Java 反射实现 Dynamic Proxy 的背景。
Dynamic Proxy 是由两个 class 实现的:java.lang.reflect.Proxy
和 java.lang.reflect.InvocationHandler
,后者是一个接口。
所谓 Dynamic Proxy 是这样一种 class:它是在运行时生成的 class,在生成它时你必须提供一组 interface 给它,然后该 class就宣称它实现了这些 interface。
这个 Dynamic Proxy 其实就是一个典型的 Proxy 模式,它丌会替你作实质性的工作,在生成它的实例时你必须提供一个handler,由它接管实际的工作。
这个 handler,在 Hadoop 的 RPC 中,就是 Invoker 对象。 我们可以简单地理解:就是你可以通过一个接口来生成一个类,这个类上的所有方法调用,都会传递到你生成类时传递的 InvocationHandler 实现中。
在 Hadoop 的 RPC 中,Invoker 实现了 InvocationHandler 的 invoke 方法(invoke 方法也是 InvocationHandler 的唯一方法)。 Invoker 会把所有跟这次调用相关的调用方法名,参数类型列表,参数列表打包,然后利用前面我们分析过的 Client,通过 socket 传递到服务器端。就是说,你在 proxy 类上的任何调用,都通过 Client 发送到远方的服务器上。
Invoker 使用 Invocation。 Invocation 封装了一个过程调用的所有相关信息,它的主要属性有: methodName,调用方法名,parameterClasses,调用方法参数的类型列表和 parameters,调用方法参数。注意,它实现了 Writable 接口,可以串行化。
RPC.Server 实现了 org.apache.hadoop.ipc.Server
,你可以把一个对象,通过 RPC,升级成为一个服务器。服务器接收到的请求(通过 Invocation),解串行化以后,就发成了方法名,方法参数列表和参数列表。调用 Java 反射,我们就可以调用对应的对象的方法。调用的结果再通过 socket,迒回给客户端,客户端把结果解包后,就可以返回给Dynamic Proxy 的使用者了。
我们接下来去研究的就是RPC.Invoker类中的invoke()方法了,代码如下
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable
{
……
ObjectWritable value = (ObjectWritable)
client.call(new Invocation(method, args), remoteId);
……
return value.get();
}
一般我们看到的动态代理的invoke()方法中总会有 method.invoke(ac, arg); 这句代码。而上面代码中却没有。其实使用 method.invoke(ac, arg); 是在本地JVM中调用;而在hadoop中,是将数据发送给服务端,服务端将处理的结果再返回给客户端,所以这里的invoke()方法必然需要进行网络通信。而网络通信就是下面的这段代码实现的:
ObjectWritable value = (ObjectWritable)
client.call(new Invocation(method, args), remoteId);
Invocation类在这里封装了方法名和参数,充当VO。其实这里网络通信只是调用了Client类的call()方法。
接下来分析一下ipc.Client源码,在此之前我们得明确下我们的目标,总结出了以下几个问题
基于这三个问题,我们开始分析ipc.Client源码,主要包含以下几个类
Question1:客户端和服务端的连接是怎样建立的?
Client类中的cal()方法如下
public Writable call(Writable param, ConnectionId remoteId)
throws InterruptedException, IOException
{
Call call = new Call(param); //将传入的数据封装成call对象
Connection connection = getConnection(remoteId, call); //获得一个连接
connection.sendParam(call); // 向服务端发送call对象
boolean interrupted = false;
synchronized (call)
{
while (!call.done)
{
try
{
call.wait(); // 等待结果的返回,在Call类的callComplete()方法里有notify()方法用于唤醒线程
}
catch (InterruptedException ie)
{
// 因中断异常而终止,设置标志interrupted为true
interrupted = true;
}
}
if (interrupted)
{
Thread.currentThread().interrupt();
}
if (call.error != null)
{
if (call.error instanceof RemoteException)
{
call.error.fillInStackTrace();
throw call.error;
}
else // 本地异常
{
throw wrapException(remoteId.getAddress(), call.error);
}
}
else
{
return call.value; //返回结果数据
}
}
}
具体代码的作用我已做了注释,所以这里不再赘述。分析代码后,我们会发现和网络通信有关的代码只会是下面的两句了:
Connection connection = getConnection(remoteId, call); //获得一个连接
connection.sendParam(call); // 向服务端发送call对象
先看看是怎么获得一个到服务端的连接吧,下面贴出ipc.Client类中的getConnection()方法。
private Connection getConnection(ConnectionId remoteId,
Call call)
throws IOException, InterruptedException
{
if (!running.get())
{
// 如果client关闭了
throw new IOException("The client is stopped");
}
Connection connection;
//如果connections连接池中有对应的连接对象,就不需重新创建了;如果没有就需重新创建一个连接对象。
//但请注意,该//连接对象只是存储了remoteId的信息,其实还并没有和服务端建立连接。
do
{
synchronized (connections)
{
connection = connections.get(remoteId);
if (connection == null)
{
connection = new Connection(remoteId);
connections.put(remoteId, connection);
}
}
}
while (!connection.addCall(call)); //将call对象放入对应连接中的calls池,就不贴出源码了
//这句代码才是真正的完成了和服务端建立连接哦~
connection.setupIOstreams();
return connection;
}
Client.Connection类中的setupIOstreams()方法如下:
private synchronized void setupIOstreams() throws InterruptedException
{
……
try
{
……
while (true)
{
setupConnection(); //建立连接
InputStream inStream = NetUtils.getInputStream(socket); //获得输入流
OutputStream outStream = NetUtils.getOutputStream(socket); //获得输出流
writeRpcHeader(outStream);
……
this.in = new DataInputStream(new BufferedInputStream
(new PingInputStream(inStream))); //将输入流装饰成DataInputStream
this.out = new DataOutputStream
(new BufferedOutputStream(outStream)); //将输出流装饰成DataOutputStream
writeHeader();
// 跟新活动时间
touch();
//当连接建立时,启动接受线程等待服务端传回数据,注意:Connection继承了Tread
start();
return;
}
}
catch (IOException e)
{
markClosed(e);
close();
}
}
再有一步我们就知道客户端的连接是怎么建立的啦,下面贴出Client.Connection类中的setupConnection()方法
private synchronized void setupConnection() throws IOException {
short ioFailures = 0;
short timeoutFailures = 0;
while (true) {
try {
this.socket = socketFactory.createSocket(); //终于看到创建socket的方法了
this.socket.setTcpNoDelay(tcpNoDelay);
……
// 设置连接超时为20s
NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
this.socket.setSoTimeout(pingInterval);
return;
} catch (SocketTimeoutException toe) {
/* 设置最多连接重试为45次。
* 总共有20s*45 = 15 分钟的重试时间。
*/
handleConnectionFailure(timeoutFailures++, 45, toe);
} catch (IOException ie) {
handleConnectionFailure(ioFailures++, maxRetries, ie);
}
}
}
不难看出客户端的连接是创建一个普通的socket进行通信的。
Question2:客户端是怎样给服务端发送数据的?
Client.Connection类的sendParam()方法如下
public void sendParam(Call call) {
if (shouldCloseConnection.get()) {
return;
}
DataOutputBuffer d=null;
try {
synchronized (this.out) {
if (LOG.isDebugEnabled())
LOG.debug(getName() + " sending #" + call.id);
//创建一个缓冲区
d = new DataOutputBuffer();
d.writeInt(call.id);
call.param.write(d);
byte[] data = d.getData();
int dataLength = d.getLength();
out.writeInt(dataLength); //首先写出数据的长度
out.write(data, 0, dataLength); //向服务端写数据
out.flush();
}
} catch(IOException e) {
markClosed(e);
} finally {
IOUtils.closeStream(d);
}
}
Question3:客户端是怎样获取服务端的返回数据的?
Client.Connection类和Client.Call类中的相关方法如下
Method1:
public void run() {
……
while (waitForWork()) {
receiveResponse(); //具体的处理方法
}
close();
……
}
Method2:
private void receiveResponse() {
if (shouldCloseConnection.get()) {
return;
}
touch();
try {
int id = in.readInt(); // 阻塞读取id
if (LOG.isDebugEnabled())
LOG.debug(getName() + " got value #" + id);
Call call = calls.get(id); //在calls池中找到发送时的那个对象
int state = in.readInt(); // 阻塞读取call对象的状态
if (state == Status.SUCCESS.state) {
Writable value = ReflectionUtils.newInstance(valueClass, conf);
value.readFields(in); // 读取数据
//将读取到的值赋给call对象,同时唤醒Client等待线程,贴出setValue()代码Method3
call.setValue(value);
calls.remove(id); //删除已处理的call
} else if (state == Status.ERROR.state) {
……
} else if (state == Status.FATAL.state) {
……
}
} catch (IOException e) {
markClosed(e);
}
}
Method3:
public synchronized void setValue(Writable value) {
this.value = value;
callComplete(); //具体实现
}
protected synchronized void callComplete() {
this.done = true;
notify(); // 唤醒client等待线程
}
启动一个处理线程,读取从服务端传来的call对象,将call对象读取完毕后,唤醒client处理线程。就这么简单,客户端就获取了服务端返回的数据。客户端的源码分析暂时到这,下面我们来分析Server端的源码
内部类如下
hadoop是怎样初始化RPC的Server端的呢?
Namenode初始化时一定初始化了RPC的Sever端,那我们去看看Namenode的初始化源码
private void initialize(Configuration conf) throws IOException {
……
// 创建 rpc server
InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
if (dnSocketAddr != null) {
int serviceHandlerCount =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
//获得serviceRpcServer
this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(),
dnSocketAddr.getPort(), serviceHandlerCount,
false, conf, namesystem.getDelegationTokenSecretManager());
this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
setRpcServiceServerAddress(conf);
}
//获得server
this.server = RPC.getServer(this, socAddr.getHostName(),
socAddr.getPort(), handlerCount, false, conf, namesystem
.getDelegationTokenSecretManager());
……
this.server.start(); //启动 RPC server Clients只允许连接该server
if (serviceRpcServer != null) {
serviceRpcServer.start(); //启动 RPC serviceRpcServer 为HDFS服务的server
}
startTrashEmptier(conf);
}
RPC的server对象是通过ipc.RPC类的getServer()方法获得的。ipc.RPC类中的getServer()源码如下
public static Server getServer(final Object instance, final String bindAddress, final int port,
final int numHandlers,
final boolean verbose, Configuration conf,
SecretManager <? extends TokenIdentifier > secretManager)
throws IOException
{
return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);
}
getServer()是一个创建Server对象的工厂方法,但创建的却是RPC.Server类的对象。
初始化Server后,Server端就运行起来了,看看ipc.Server的start()源码
/** 启动服务 */
public synchronized void start()
{
responder.start(); //启动responder
listener.start(); //启动listener
handlers = new Handler[handlerCount];
for (int i = 0; i < handlerCount; i++)
{
handlers[i] = new Handler(i);
handlers[i].start(); //逐个启动Handler
}
}
public Listener() throws IOException {
address = new InetSocketAddress(bindAddress, port);
// 创建ServerSocketChannel,并设置成非阻塞式
acceptChannel = ServerSocketChannel.open();
acceptChannel.configureBlocking(false);
// 将server socket绑定到本地端口
bind(acceptChannel.socket(), address, backlogLength);
port = acceptChannel.socket().getLocalPort();
// 获得一个selector
selector= Selector.open();
readers = new Reader[readThreads];
readPool = Executors.newFixedThreadPool(readThreads);
//启动多个reader线程,为了防止请求多时服务端响应延时的问题
for (int i = 0; i < readThreads; i++) {
Selector readSelector = Selector.open();
Reader reader = new Reader(readSelector);
readers[i] = reader;
readPool.execute(reader);
}
// 注册连接事件
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
this.setName("IPC Server listener on " + port);
this.setDaemon(true);
}
在启动Listener线程时,服务端会一直等待客户端的连接,下面贴出Server.Listener类的run()方法
public void run()
{
……
while (running)
{
SelectionKey key = null;
try
{
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext())
{
key = iter.next();
iter.remove();
try
{
if (key.isValid())
{
if (key.isAcceptable())
doAccept(key); //具体的连接方法
}
}
catch (IOException e)
{
}
key = null;
}
}
catch (OutOfMemoryError e)
{
……
}
Server.Listener类中doAccept ()方法中的关键源码如下:
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
Connection c = null;
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel;
while ((channel = server.accept()) != null) { //建立连接
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(tcpNoDelay);
Reader reader = getReader(); //从readers池中获得一个reader
try {
reader.startAdd(); // 激活readSelector,设置adding为true
SelectionKey readKey = reader.registerChannel(channel);//将读事件设置成兴趣事件
c = new Connection(readKey, channel, System.currentTimeMillis());//创建一个连接对象
readKey.attach(c); //将connection对象注入readKey
synchronized (connectionList) {
connectionList.add(numConnections, c);
numConnections++;
}
……
} finally {
//设置adding为false,采用notify()唤醒一个reader,其实代码十三中启动的每个reader都使
//用了wait()方法等待。因篇幅有限,就不贴出源码了。
reader.finishAdd();
}
}
}
当reader被唤醒,reader接着执行doRead()方法。
Method1:
void doRead(SelectionKey key) throws InterruptedException {
int count = 0;
Connection c = (Connection)key.attachment(); //获得connection对象
if (c == null) {
return;
}
c.setLastContact(System.currentTimeMillis());
try {
count = c.readAndProcess(); // 接受并处理请求
} catch (InterruptedException ieo) {
……
}
……
}
Method2:
public int readAndProcess() throws IOException, InterruptedException {
while (true) {
……
if (!rpcHeaderRead) {
if (rpcHeaderBuffer == null) {
rpcHeaderBuffer = ByteBuffer.allocate(2);
}
//读取请求头
count = channelRead(channel, rpcHeaderBuffer);
if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
return count;
}
// 读取请求版本号
int version = rpcHeaderBuffer.get(0);
byte[] method = new byte[] {rpcHeaderBuffer.get(1)};
……
data = ByteBuffer.allocate(dataLength);
}
// 读取请求
count = channelRead(channel, data);
if (data.remaining() == 0) {
……
if (useSasl) {
……
} else {
processOneRpc(data.array());//处理请求
}
……
}
}
return count;
}
}
Method1:
private void processOneRpc(byte[] buf) throws IOException,
InterruptedException {
if (headerRead) {
processData(buf);
} else {
processHeader(buf);
headerRead = true;
if (!authorizeConnection()) {
throw new AccessControlException("Connection from " + this
+ " for protocol " + header.getProtocol()
+ " is unauthorized for user " + user);
}
}
}
Method2:
private void processData(byte[] buf) throws IOException, InterruptedException {
DataInputStream dis =
new DataInputStream(new ByteArrayInputStream(buf));
int id = dis.readInt(); // 尝试读取id
Writable param = ReflectionUtils.newInstance(paramClass, conf);//读取参数
param.readFields(dis);
Call call = new Call(id, param, this); //封装成call
callQueue.put(call); // 将call存入callQueue
incRpcCount(); // 增加rpc请求的计数
}
while (running) {
try {
final Call call = callQueue.take(); //弹出call,可能会阻塞
……
//调用ipc.Server类中的call()方法,但该call()方法是抽象方法,具体实现在RPC.Server类中
value = call(call.connection.protocol, call.param, call.timestamp);
synchronized (call.connection.responseQueue) {
setupResponse(buf, call,
(error == null) ? Status.SUCCESS : Status.ERROR,
value, errorClass, error);
……
//给客户端响应请求
responder.doRespond(call);
}
}
void doRespond(Call call) throws IOException
{
synchronized (call.connection.responseQueue)
{
call.connection.responseQueue.addLast(call);
if (call.connection.responseQueue.size() == 1)
{
// 返回响应结果,并激活writeSelector
processResponse(call.connection.responseQueue, true);
}
}
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。