zqhxuyuan
Hadoop Source Code Analysis: RPC.Client



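Inner class     Purpose
Call            Wraps the Invocation object as the value object written to the server; also stores the data returned by the server.
Connection      Handles one remote connection. Extends Thread.
ConnectionId    Uniquely identifies a connection.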

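A Client may talk to several Servers. A typical HDFS read, for example, has to contact the NameNode and also one or more DataNodes, so a single Client must maintain multiple connections. To avoid opening unnecessary connections, the Client uses a ConnectionId as the key of each Connection. A ConnectionId includes an InetSocketAddress (IP address plus port, or host name plus port) and a user information object (the getConnectionId call later in this post also takes the protocol class). In other words, traffic from the same user to the same InetSocketAddress shares one connection.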
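
The sharing rule can be illustrated with a small sketch. It is not Hadoop code: ConnKey and ConnKeyDemo are hypothetical names, a plain String stands in for the user information object, a bare Object stands in for the connection, and the host names and ports are made up.

```java
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for ConnectionId: equality is defined by address + user.
final class ConnKey {
    final InetSocketAddress address;
    final String user; // assumption: a String replaces the user information object

    ConnKey(InetSocketAddress address, String user) {
        this.address = address;
        this.user = user;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ConnKey)) return false;
        ConnKey other = (ConnKey) o;
        return address.equals(other.address) && user.equals(other.user);
    }

    @Override
    public int hashCode() {
        return Objects.hash(address, user);
    }
}

class ConnKeyDemo {
    // One entry per distinct (address, user) pair; the value stands in for a connection.
    static final Map<ConnKey, Object> pool = new HashMap<>();

    static Object connectionFor(String host, int port, String user) {
        ConnKey key = new ConnKey(InetSocketAddress.createUnresolved(host, port), user);
        return pool.computeIfAbsent(key, k -> new Object());
    }

    public static void main(String[] args) {
        Object a = connectionFor("nn.example", 9000, "alice");
        Object b = connectionFor("nn.example", 9000, "alice");
        Object c = connectionFor("dn1.example", 9001, "alice");
        System.out.println(a == b); // same key, so the connection is shared
        System.out.println(a == c); // different address, so a separate connection
    }
}
```

Because the key implements equals and hashCode over both fields, two lookups by the same user for the same address land on the same map entry, which is what lets a client reuse a connection instead of opening a new socket per call.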

 

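Connections are encapsulated in the class Client.Connection, and every RPC call goes through a Connection. An RPC call has input parameters, a return value and possibly an exception. To tell apart the calls that share one Connection, each call also has a unique id, and a flag records whether the call has finished. All of this is captured in Client.Call. Each Connection keeps the Calls in flight on it in a hash table.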

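An RPC call is added to its Connection through addCall. So that Java primitive types, String, implementations of the Writable interface, and arrays of those types can all be transported over this framework, the parameters a Call needs are usually packed into an ObjectWritable.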

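Client.Connection connects to the server over a socket. Once connected, it sends a header so that the client and server versions can be checked (Client.Connection.writeRpcHeader(), followed by writeHeader() for the connection header). After that check, requests and responses are exchanged as Writable objects. Note that each Client.Connection starts its own thread, which keeps reading from the socket, unpacks each result, finds the matching Call, stores the result in it and signals that the result is available.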

Call uses Object's wait and notify to turn the asynchronous message exchange of the RPC layer into a synchronous call.
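
A minimal sketch of that wait/notify handoff, assuming a String result in place of a Writable and a short-lived thread in place of the Connection receiver thread. PendingCall and SyncOverAsyncDemo are hypothetical names, not Hadoop classes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified Call: a slot the caller waits on until a reply arrives.
class PendingCall {
    final int id;
    String value;   // assumption: String replaces the Writable result
    boolean done;

    PendingCall(int id) { this.id = id; }

    // Called by the receiver thread once the reply for this id has been read.
    synchronized void setValue(String v) {
        value = v;
        done = true;
        notify();
    }

    // Called by the requesting thread; blocks until setValue has run.
    synchronized String await() throws InterruptedException {
        while (!done) {   // the loop guards against spurious wakeups
            wait();
        }
        return value;
    }
}

class SyncOverAsyncDemo {
    static final Map<Integer, PendingCall> calls = new ConcurrentHashMap<>();

    // Registers the call, simulates an asynchronous reply, then blocks for the result.
    static String call(int id, String request) {
        PendingCall call = new PendingCall(id);
        calls.put(id, call);
        // Stand-in for the receiver thread: it finds the pending call by id.
        Thread receiver = new Thread(() -> calls.remove(id).setValue("echo:" + request));
        receiver.start();
        try {
            return call.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(call(1, "ping")); // prints "echo:ping"
    }
}
```

The done flag matters as much as the notify: if the reply arrives before the caller starts waiting, the flag is already set and the caller never blocks.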

One more point: a single Client holds multiple Client.Connection objects, which follows naturally from the design above.


 

 

ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId);

The first argument, Invocation, implements Writable and becomes part of the Call.

 

Client.call(param, remoteId)

  /** Make a call, passing param, to the IPC server defined by remoteId, returning the value. */
  public Writable call(Writable param, ConnectionId remoteId) throws InterruptedException, IOException {
    Call call = new Call(param);                            // wrap the parameter in a Call object
    Connection connection = getConnection(remoteId, call);  // get a connection
    connection.sendParam(call);                             // send the parameter (the Call) to the server
    boolean interrupted = false;
    synchronized (call) {
      while (!call.done) {
        try {
          call.wait();           // wait for the result; Call.callComplete() calls notify() to wake this thread
        } catch (InterruptedException ie) {
          // save the fact that we were interrupted
          interrupted = true;
        }
      }
      if (interrupted) { // set the interrupt flag now that we are done waiting, restoring the interrupt swallowed above
        Thread.currentThread().interrupt();
      }
      if (call.error != null) {
        if (call.error instanceof RemoteException) {
          call.error.fillInStackTrace();
          throw call.error;
        } else { // local exception: use the connection because it will reflect an ip change, unlike the remoteId
          throw wrapException(connection.getRemoteAddress(), call.error);
        }
      } else {
        return call.value; // return the result
      }
    }
  }
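
The interrupt handling in call() follows a common pattern: keep waiting even if interrupted, then restore the thread's interrupt flag once the wait is over. The sketch below isolates that pattern. DeferredInterruptDemo and its methods are hypothetical names, and a helper thread with a short sleep stands in for the server reply.

```java
class DeferredInterruptDemo {
    private final Object lock = new Object();
    private boolean done;

    // Marks the work as finished and wakes any waiter.
    void complete() {
        synchronized (lock) {
            done = true;
            lock.notifyAll();
        }
    }

    // Waits until complete() has run, even if the thread is interrupted meanwhile.
    void awaitDone() {
        boolean interrupted = false;
        synchronized (lock) {
            while (!done) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    interrupted = true; // remember it and keep waiting for the result
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt(); // restore the flag for the caller
        }
    }

    // Interrupts the current thread, waits for a helper to complete the work,
    // and reports whether the interrupt flag survived the wait.
    static boolean interruptSurvivesWait() {
        DeferredInterruptDemo demo = new DeferredInterruptDemo();
        Thread.currentThread().interrupt();
        Thread helper = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException ignored) {
                // the helper itself is never interrupted in this demo
            }
            demo.complete();
        });
        helper.start();
        demo.awaitDone();
        return Thread.interrupted(); // reads and clears the flag
    }

    public static void main(String[] args) {
        System.out.println(interruptSurvivesWait()); // prints "true"
    }
}
```

Swallowing the InterruptedException without restoring the flag would hide the interrupt from code further up the stack, which is why the flag is set again after the loop.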

 

There is also an overload that issues RPC calls in parallel:

  /** Makes a set of calls in parallel.  Each parameter is sent to the corresponding address.
   * When all values are available, or have timed out or errored, the collected results are returned in an array.
   * The array contains nulls for calls that timed out or errored.  */
  public Writable[] call(Writable[] params, InetSocketAddress[] addresses, Class<?> protocol, UserGroupInformation ticket, Configuration conf)
      throws IOException, InterruptedException {
    if (addresses.length == 0) return new Writable[0];
    ParallelResults results = new ParallelResults(params.length);
    synchronized (results) {
      for (int i = 0; i < params.length; i++) {
        ParallelCall call = new ParallelCall(params[i], results, i);
        try {
          ConnectionId remoteId = ConnectionId.getConnectionId(addresses[i], protocol, ticket, 0, conf);
          Connection connection = getConnection(remoteId, call);
          connection.sendParam(call);  	// send each parameter
        } catch (IOException e) {
          results.size--;             		//  wait for one fewer result
        }
      }
      while (results.count != results.size) {
        try {
          results.wait();               	// wait for all results
        } catch (InterruptedException e) {}
      }
      return results.values;
    }
  }

Call

  /** A call waiting for a value. */
  private class Call {
    int id;             // call id, assigned from the client-wide counter
    Writable param;     // parameter (the Invocation, which holds the method name and its arguments)
    Writable value;     // value, null if error (the return value of the invoked method)
    IOException error;  // exception, null if value
    boolean done;       // true when call is done

    protected Call(Writable param) {
      this.param = param;
      synchronized (Client.this) {
        this.id = counter++;
      }
    }

    /** Indicate when the call is complete and the value or error are available.  Notifies by default.  */
    protected synchronized void callComplete() {
      this.done = true;
      notify();         // notify caller: wakes the thread blocked in Client.call(), which was reached through the interface proxy and its InvocationHandler (Invoker)
    }
    /** Set the return value when there is no error.  Notify the caller the call is done. The value is the result of the remote method. */
    public synchronized void setValue(Writable value) {
      this.value = value;
      callComplete();
    }
  }

ParallelCall, the Call subclass used for parallel RPC calls

  /** Call implementation used for parallel calls. */
  private class ParallelCall extends Call {
    private ParallelResults results;
    private int index;
    
    public ParallelCall(Writable param, ParallelResults results, int index) {
      super(param);
      this.results = results;
      this.index = index;
    }

    /** Deliver result to result collector. */
    protected void callComplete() {
      results.callComplete(this);
    }
  }

  /** Result collector for parallel calls. */
  private static class ParallelResults {
    private Writable[] values;
    private int size;
    private int count;
    public ParallelResults(int size) {
      this.values = new Writable[size];
      this.size = size;
    }

    /** Collect a result. */
    public synchronized void callComplete(ParallelCall call) {
      values[call.index] = call.value;   	// store the value
      count++;                     	// count it
      if (count == size)        		// if all values are in
        notify();                    	// then notify waiting caller
    }
  }
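
The counting scheme behind ParallelResults can be tried out in isolation. The sketch below is a simplified stand-in, not the Hadoop class: ResultCollector and ParallelDemo are hypothetical names, String replaces Writable, a null parameter simulates a send that fails, and each reply is produced by a throwaway thread.

```java
// Hypothetical collector modeled on ParallelResults; String stands in for Writable.
class ResultCollector {
    private final String[] values;
    private int size;   // results still expected (shrinks when a send fails)
    private int count;  // results received so far

    ResultCollector(int size) {
        this.values = new String[size];
        this.size = size;
    }

    // A send failed, so wait for one fewer result.
    synchronized void sendFailed() {
        size--;
    }

    // Stores one result and wakes the waiter once every expected result is in.
    synchronized void complete(int index, String value) {
        values[index] = value;
        count++;
        if (count == size) {
            notify();
        }
    }

    // Blocks until all expected results have arrived; failed slots stay null.
    synchronized String[] awaitAll() {
        boolean interrupted = false;
        while (count != size) {
            try {
                wait();
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return values;
    }
}

class ParallelDemo {
    static String[] callAll(String[] params) {
        ResultCollector results = new ResultCollector(params.length);
        for (int i = 0; i < params.length; i++) {
            final int index = i;
            final String param = params[i];
            if (param == null) {          // assumption: null simulates a failed send
                results.sendFailed();
                continue;
            }
            new Thread(() -> results.complete(index, "reply:" + param)).start();
        }
        return results.awaitAll();
    }

    public static void main(String[] args) {
        String[] out = callAll(new String[] {"a", null, "c"});
        System.out.println(java.util.Arrays.toString(out)); // prints "[reply:a, null, reply:c]"
    }
}
```

Each result is written to the slot given by its index, so replies can arrive in any order and the returned array still lines up with the parameter array.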

Client.getConnection(remoteId, call)

  /** Get a connection from the pool, or create a new one and add it to the pool. Connections to a given ConnectionId are reused. */
  private Connection getConnection(ConnectionId remoteId, Call call) throws IOException, InterruptedException {
    if (!running.get()) { // the client is stopped
      throw new IOException("The client is stopped");
    }
    Connection connection;
    // we could avoid this allocation for each RPC by having a connectionsId object and with set() method.
    // We need to manage the refs for keys in HashMap properly. For now its ok.
    // The ConnectionId is the key of the connections pool (a map). If the pool already holds a
    // connection for this id it is reused; otherwise a new Connection is created.
    // Note that a new Connection only stores the remoteId; it has not connected to the server yet.
    do {
      synchronized (connections) {
        connection = connections.get(remoteId);
        if (connection == null) {
          connection = new Connection(remoteId);
          connections.put(remoteId, connection);
        }
      }
    } while (!connection.addCall(call)); // add the call to this connection's table of pending calls
    // we don't invoke the method below inside "synchronized (connections)" block above.
    // The reason for that is if the server happens to be slow,
    // it will take longer to establish a connection and that will slow the entire system down.
    connection.setupIOstreams(); // actually connect to the server
    return connection;
  }
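
The get-or-create loop in getConnection() can be reduced to the following sketch. PooledConn, ConnPool and ConnPoolDemo are hypothetical names, a String replaces ConnectionId, and no sockets are involved. The point is the shape of the loop: the pool lock is held only for the lookup, and a connection that is already closing rejects the call so the loop looks again and ends up with a fresh connection.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical connection that refuses new calls once it is closing.
class PooledConn {
    private boolean closing;
    private int calls;

    synchronized boolean addCall() {
        if (closing) {
            return false;
        }
        calls++;
        return true;
    }

    synchronized void markClosing() { closing = true; }

    synchronized int callCount() { return calls; }
}

class ConnPool {
    private final Map<String, PooledConn> connections = new HashMap<>();

    // Returns the pooled connection for the key, creating one if needed.
    PooledConn get(String key) {
        PooledConn conn;
        do {
            synchronized (connections) {
                conn = connections.get(key);
                if (conn == null) {
                    conn = new PooledConn();
                    connections.put(key, conn);
                }
            }
        } while (!conn.addCall()); // a closing connection rejects the call; look again
        return conn;
    }

    // Marks the connection as closing and takes it out of the pool.
    void close(String key) {
        synchronized (connections) {
            PooledConn conn = connections.remove(key);
            if (conn != null) {
                conn.markClosing();
            }
        }
    }
}

class ConnPoolDemo {
    public static void main(String[] args) {
        ConnPool pool = new ConnPool();
        PooledConn first = pool.get("nn.example:9000");
        PooledConn again = pool.get("nn.example:9000");
        System.out.println(first == again);                       // reused
        pool.close("nn.example:9000");
        System.out.println(pool.get("nn.example:9000") == first); // replaced after close
    }
}
```

Any slow work, such as opening the socket, belongs outside the synchronized block, for the reason given in the comments of getConnection() itself.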

connection.setupIOstreams()

  /** Thread that reads responses and notifies callers.  Each connection owns a
   * socket connected to a remote address.  Calls are multiplexed through this
   * socket: responses may be delivered out of order. */
  private class Connection extends Thread {
    private InetSocketAddress server;       // server ip:port (the server this client connects to)
    private String serverPrincipal;         // server's krb5 principal name
    private ConnectionHeader header;        // connection header
    private final ConnectionId remoteId;    // connection id; uniquely identifies this connection
    private AuthMethod authMethod;          // authentication method
    private boolean useSasl;
    private Token<? extends TokenIdentifier> token;
    private SaslRpcClient saslRpcClient;
    
    private Socket socket = null;           // connected socket
    private DataInputStream in;             // input stream (responses from the server)
    private DataOutputStream out;           // output stream (requests to the server)
    private int rpcTimeout;
    private int maxIdleTime; 			//connections will be culled if it was idle for maxIdleTime msecs
    private final RetryPolicy connectionRetryPolicy;
    private boolean tcpNoDelay; 		// if T then disable Nagle's Algorithm
    private int pingInterval; 			// how often sends ping to the server in msecs
    
    private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>(); 	// currently active calls
    private AtomicLong lastActivity = new AtomicLong();					// last I/O activity time
    private AtomicBoolean shouldCloseConnection = new AtomicBoolean();  	// indicate if the connection is closed
    private IOException closeException;     // close reason

    /** Update lastActivity with the current time. */
    private void touch() {
      lastActivity.set(System.currentTimeMillis());
    }

    /**
     * Add a call to this connection's call queue and notify a listener; synchronized. Returns false if called during shutdown.
     * @param call to add
     * @return true if the call was added.
     */
    private synchronized boolean addCall(Call call) {
      if (shouldCloseConnection.get())
        return false;
      calls.put(call.id, call);
      notify();
      return true;
    }
  
    private synchronized void setupConnection() throws IOException {
      short ioFailures = 0;
      short timeoutFailures = 0;
      while (true) {
        try {
          this.socket = socketFactory.createSocket(); // create the socket for this connection (created via NIO)
          this.socket.setTcpNoDelay(tcpNoDelay);
          NetUtils.connect(this.socket, server, 20000); // connection time out is 20s; start connecting
          if (rpcTimeout > 0) {
            pingInterval = rpcTimeout;  // rpcTimeout overwrites pingInterval
          }
          this.socket.setSoTimeout(pingInterval);
          return;
        } catch (SocketTimeoutException toe) {
          /* Check for an address change and update the local reference.
           * Reset the failure counter if the address was changed*/
          if (updateAddress()) {
            timeoutFailures = ioFailures = 0;
          }
          /* The max number of retries is 45, which amounts to 20s*45 = 15 minutes retries.*/
          handleConnectionFailure(timeoutFailures++, 45, toe);
        } catch (IOException ie) {
          if (updateAddress()) {
            timeoutFailures = ioFailures = 0;
          }
          handleConnectionFailure(ioFailures++, ie);
        }
      }
    }
  
    /** Connect to the server and set up the I/O streams. It then sends a header to the server and starts the connection thread that waits for responses.
     * Once connected, the client keeps listening for data sent back by the server, much like the listener in example 05. */
    private synchronized void setupIOstreams() throws InterruptedException {
      if (socket != null || shouldCloseConnection.get()) {
        return;
      }
      try {
        short numRetries = 0;
        final short maxRetries = 15;
        Random rand = null;
        while (true) {
          setupConnection();                                          // establish the connection
          InputStream inStream = NetUtils.getInputStream(socket);     // input stream (receives data)
          OutputStream outStream = NetUtils.getOutputStream(socket);  // output stream (sends data)
          writeRpcHeader(outStream);
          // ... connection setup for the SASL security mechanism omitted
          this.in = new DataInputStream(new BufferedInputStream(new PingInputStream(inStream))); // wrap the input stream in a DataInputStream
          this.out = new DataOutputStream(new BufferedOutputStream(outStream)); // wrap the output stream in a DataOutputStream
          writeHeader();
          touch(); // update last activity time
          start(); // start the receiver thread after the socket connection has been set up; Thread.start() ends up running run()
          return;
        }
      } catch (Throwable t) {
        // mark the connection closed first: close() returns immediately unless shouldCloseConnection is set
        if (t instanceof IOException) {
          markClosed((IOException) t);
        } else {
          markClosed(new IOException("Couldn't set up IO streams", t));
        }
        close();
      }
    }
    
    private void closeConnection() {
      try {
        socket.close();  // close the current connection
      } catch (IOException e) {
        LOG.warn("Not able to close a socket", e);
      }
      // set socket to null so that the next call to setupIOstreams can start the process of connect all over again.
      socket = null;
    }
    /* Write the RPC header to the output stream */
    private void writeRpcHeader(OutputStream outStream) throws IOException {
      DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
      out.write(Server.HEADER.array()); // Write out the header, version and authentication method
      out.write(Server.CURRENT_VERSION);
      authMethod.write(out);
      out.flush();
    }
    /* Write the protocol header for each connection.Out is not synchronized because only the first thread does this.*/
    private void writeHeader() throws IOException {
      DataOutputBuffer buf = new DataOutputBuffer();
      header.write(buf); // Write out the ConnectionHeader
      int bufLen = buf.getLength();
      out.writeInt(bufLen); // Write out the payload length
      out.write(buf.getData(), 0, bufLen);
    }
    
    /* wait till someone signals us to start reading RPC response or it is idle too long, it is marked as to be closed, or the client is marked as not running.
     * Return true if it is time to read a response; false otherwise. */
    private synchronized boolean waitForWork() {
      if (calls.isEmpty() && !shouldCloseConnection.get()  && running.get())  {
        long timeout = maxIdleTime-(System.currentTimeMillis()-lastActivity.get());
        if (timeout>0) {
          try {
            wait(timeout);
          } catch (InterruptedException e) {}
        }
      }
      if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
        return true;
      } else if (shouldCloseConnection.get()) {
        return false;
      } else if (calls.isEmpty()) { // idle connection closed or stopped
        markClosed(null);
        return false;
      } else { // get stopped but there are still pending requests 
        markClosed((IOException)new IOException().initCause(new InterruptedException()));
        return false;
      }
    }
  
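    // Once the connection is set up, start() invokes run(), which plays the role of the listener in example 05: it waits for data returned by the server and reads it.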
    public void run() {
      while (waitForWork()) {//wait here for work - read or close connection
        receiveResponse();
      }
      close();
    }

    /** Initiates a call by sending the parameter to the remote server.
     * Note: this is not called from the Connection thread, but by other threads. */
    public void sendParam(Call call) {
      if (shouldCloseConnection.get()) {
        return;
      }
      DataOutputBuffer d=null;
      try {
        synchronized (this.out) {          
          // for serializing the data to be written: a Call consists of its id and the Invocation param, and both are serialized
          d = new DataOutputBuffer();       // in-memory buffer for the data the client will write to the server
          d.writeInt(call.id);              // serialize the call id
          call.param.write(d);              // serialize the param, i.e. the Invocation object
          byte[] data = d.getData();        // backing array of the buffer
          int dataLength = d.getLength();   // number of valid bytes in the buffer
          out.writeInt(dataLength);         // first put the data length
          out.write(data, 0, dataLength);   // write the data to the server
          out.flush();
        }
      } catch(IOException e) {
        markClosed(e);
      } finally { //the buffer is just an in-memory buffer, but it is still polite to close early
        IOUtils.closeStream(d);
      }
    }  

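    /* Receive a response. Because only one receiver, so no synchronization on in. */
    private void receiveResponse() {
      if (shouldCloseConnection.get()) { return; }
      touch();
      try {
        int id = in.readInt();        // try to read an id (blocking read of the call id)
        Call call = calls.get(id);    // look up the pending call that was registered when the request was sent
        int state = in.readInt();     // read call status (blocking)
        if (state == Status.SUCCESS.state) {
          Writable value = ReflectionUtils.newInstance(valueClass, conf);
          value.readFields(in);       // read value; a successful response therefore carries three items: id, status, value
          call.setValue(value);       // store the result in the Call, which also wakes the thread waiting in Client.call()
          calls.remove(id);           // the call is finished; remove it from the table of active calls
        }
        // ... handling of non-success statuses omitted
      } catch (IOException e) {
        markClosed(e);                // readInt()/readFields() can throw; mark the connection closed, as sendParam() does
      }
    }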
    
    /** Close the connection. */
    private synchronized void close() {
      if (!shouldCloseConnection.get()) {
        return;
      }
      // release the resources  first thing to do;take the connection out of the connection list
      synchronized (connections) {
        if (connections.get(remoteId) == this) {
          connections.remove(remoteId);
        }
      }
      // close the streams and therefore the socket
      IOUtils.closeStream(out);
      IOUtils.closeStream(in);
      disposeSasl();
      // clean up all calls
      cleanupCalls();
    }
    /* Cleanup all calls and mark them as done */
    private void cleanupCalls() {
      Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator() ;
      while (itor.hasNext()) {
        Call c = itor.next().getValue(); 
        c.setException(closeException); // local exception
        itor.remove();         
      }
    }
  }
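
The request framing used by sendParam() (a length prefix, then the call id, then the serialized parameter) can be reproduced with plain java.io streams. The sketch below is only an illustration of that layout: FramingDemo is a hypothetical name, UTF-8 bytes of a String stand in for the serialized Invocation, and byte-array streams stand in for the socket.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class FramingDemo {
    // Builds one request frame: [int length][int callId][payload bytes].
    static byte[] writeRequest(int callId, String param) {
        try {
            // Serialize id and payload into an in-memory buffer first.
            ByteArrayOutputStream body = new ByteArrayOutputStream();
            DataOutputStream d = new DataOutputStream(body);
            d.writeInt(callId);
            d.write(param.getBytes(StandardCharsets.UTF_8));
            d.flush();
            byte[] data = body.toByteArray();

            // Then write the length followed by the buffered bytes.
            ByteArrayOutputStream wire = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(wire);
            out.writeInt(data.length);
            out.write(data, 0, data.length);
            out.flush();
            return wire.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads a frame back, returning "callId:payload".
    static String readRequest(byte[] frame) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame));
            int length = in.readInt();
            byte[] data = new byte[length];
            in.readFully(data);

            DataInputStream body = new DataInputStream(new ByteArrayInputStream(data));
            int callId = body.readInt();
            byte[] payload = new byte[length - 4]; // 4 bytes were used by the id
            body.readFully(payload);
            return callId + ":" + new String(payload, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] frame = writeRequest(7, "ping");
        System.out.println(frame.length);       // prints "12": 4 (length) + 4 (id) + 4 (payload)
        System.out.println(readRequest(frame)); // prints "7:ping"
    }
}
```

Buffering the body before writing is what makes the length prefix possible: the size of the serialized parameter is only known once serialization has finished.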

 
