HDFSRPC通信框架详解

news/2024/5/20 5:09:38 标签: hdfs

本文主要对HDFSRPC通信框架解析。包括listener,reader,handler,responser等实现类的源码分析。注意hadoop版本为3.1.1。

写在前面

rpc肯定依赖于socket通信,并且使用的是java NIO。读者最好对nio有一定的了解,文章中不会对相关知识作过多的介绍。

https://blog.csdn.net/yhl_jxy/article/details/79332092

还有本文中涉及到的代码大部分都是作者都整理过的,会和server源码有些许区别。

RPC框架架构图

1871_2.jpeg

从架构图中可以看出一个socket连接的数据处理被多个模块分割,每个模块处理特定的问题。这样做的好处一方面保证了call的并发,另一方面也保证了代码的可扩展性。

Listener

listener就是监听线程,那到底是监听什么?显而易见是socket连接又称connection。

Listener.run、doAccpect

public void run() {
    LOG.info(Thread.currentThread().getName() + ": starting");
    Server.connectionManager.startIdleScan();
    while (Server.running) {
      SelectionKey key = null;
      try {
        getSelector().select();
        Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
        while (iter.hasNext()) {
          key = iter.next();
          iter.remove();
          try {
            if (key.isValid()) {
              if (key.isAcceptable())
                doAccept(key);
            }
          } catch (IOException e) {
          }
          key = null;
        }
      } catch (OutOfMemoryError e) {
        // we can run out of memory if we have too many threads
        // log the event and sleep for a minute and give 
        // some thread(s) a chance to finish
        LOG.warn("Out of Memory in server select", e);
        closeCurrentConnection(key, e);
        Server.connectionManager.closeIdle(true);
        try { Thread.sleep(60000); } catch (Exception ie) {}
      } catch (Exception e) {
        closeCurrentConnection(key, e);
      }
    }
    LOG.info("Stopping " + Thread.currentThread().getName());

    synchronized (this) {
      try {
        acceptChannel.close();
        selector.close();
      } catch (IOException e) { }

      selector= null;
      acceptChannel= null;
      
      // close all connections
      Server.connectionManager.stopIdleScan();
      Server.connectionManager.closeAll();
    }
  }

void doAccept(SelectionKey key) throws InterruptedException, IOException,  OutOfMemoryError {
    ServerSocketChannel server = (ServerSocketChannel) key.channel();
    SocketChannel channel;
    while ((channel = server.accept()) != null) {

      channel.configureBlocking(false);
      channel.socket().setTcpNoDelay(tcpNoDelay);
      channel.socket().setKeepAlive(true);
      
      Reader reader = getReader();
      Connection c = Server.connectionManager.register(channel);
      // If the connectionManager can't take it, close the connection.
      if (c == null) {
        if (channel.isOpen()) {
          IOUtils.cleanup(null, channel);
        }
        Server.connectionManager.droppedConnections.getAndIncrement();
        continue;
      }
      key.attach(c);  // so closeCurrentConnection can get the object
      reader.addConnection(c);
    }
  }

简单来说就是accept channel,变成connection,然后交给reader处理。

Reader

Reader在整个RPC框架中起着举足轻重的作用。在HDFSRPC协议详解一文中processOneRpc之前的工作都是reader完成的。总结一下就是以下几点:

  1. rpc connection初始7字节的检查。
  2. sasl握手与验证。
  3. IpcConnectionContext读取。
  4. processOneRpc准备工作,包括RequestHeaderProto解析。

还有一点要注意的一次reader就包含完成这所有工作,而不是多次完成。单次reader生成call以后,就会马上下次call的read,本质上call是并发的,由handler处理。

reader的源码其实很简单,本质上是循环执行了connection.readAndProcess()。本文不会对readAndProcess过多介绍,有兴趣可以查看HDFSRPC协议详解。

@Override
  public void run() {
    LOG.info("Starting " + Thread.currentThread().getName());
    try {
      doRunLoop();
    } finally {
      try {
        readSelector.close();
      } catch (IOException ioe) {
        LOG.error("Error closing read selector in " + Thread.currentThread().getName(), ioe);
      }
    }
  }

  private synchronized void doRunLoop() {
    while (Server.running) {
      SelectionKey key = null;
      try {
        // consume as many connections as currently queued to avoid
        // unbridled acceptance of connections that starves the select
        int size = pendingConnections.size();
        for (int i=size; i>0; i--) {
          Connection conn = pendingConnections.take();
          conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
        }
        readSelector.select();

        Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
        while (iter.hasNext()) {
          key = iter.next();
          iter.remove();
          try {
            if (key.isReadable()) {
              doRead(key);
            }
          } catch (CancelledKeyException cke) {
            // something else closed the connection, ex. responder or
            // the listener doing an idle scan.  ignore it and let them
            // clean up.
            LOG.info(Thread.currentThread().getName() +
                ": connection aborted from " + key.attachment());
          }
          key = null;
        }
      } catch (InterruptedException e) {
        if (Server.running) {                      // unexpected -- log it
          LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
        }
      } catch (IOException ex) {
        LOG.error("Error in Reader", ex);
      } catch (Throwable re) {
        LOG.error("Bug in read selector!", re);
        //ExitUtil.terminate(1, "Bug in read selector!");
      }
    }
  }

//from Listener doRead
  void doRead(SelectionKey key) throws InterruptedException {
    int count;
    Connection c = (Connection)key.attachment();
    if (c == null) {
      return;  
    }
    c.setLastContact(Time.now());
    
    try {
      count = c.readAndProcess();
    } catch (InterruptedException ieo) {
      LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);
      throw ieo;
    } catch (Exception e) {
      // Any exceptions that reach here are fatal unexpected internal errors
      // that could not be sent to the client.
      LOG.info(Thread.currentThread().getName() +
          ": readAndProcess from client " + c +
          " threw exception [" + e + "]", e);
      count = -1; //so that the (count < 0) block is executed
    }
    // setupResponse will signal the connection should be closed when a
    // fatal response is sent.
    if (count < 0 || c.shouldClose()) {
      Server.closeConnection(c);
      c = null;
    }
    else {
      c.setLastContact(Time.now());
    }
  }   

CallQueue

callQueue主要是存放call队列,由于callqueue在hdfs是一个较为复杂的东西,后期会单做一期介绍。

Handler

handler线程也比较简单,实际上就是执行了call.run()。

@Override
  public void run() {
    LOG.debug(Thread.currentThread().getName() + ": starting");
    while (Server.running) {
      try {
        final Call call = Server.callQueue.take(); // pop the queue; maybe blocked here
        if (LOG.isDebugEnabled()) {
          LOG.debug(Thread.currentThread().getName() + ": " + call);
        }
        CurCall.set(call);
        /*TODO
        UserGroupInformation remoteUser = call.getRemoteUser();
        if (remoteUser != null) {
          remoteUser.doAs(call);
        } else {
          call.run();
        }*/
        call.run();
      } catch (InterruptedException e) {
        if (Server.running) {                          // unexpected -- log it
          LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
        }
      } catch (Exception e) {
        LOG.info(Thread.currentThread().getName() + " caught an exception", e);
      } finally {
        CurCall.set(null);
      }
    }
    LOG.debug(Thread.currentThread().getName() + ": exiting");
  }

主要的难点是这么执行call.run()。要知道call.run首先要知道protocols。

Protocols

每个server都自己的Protocols,protocols首先是以rpcKind分类的。

enum RpcKindProto {
  RPC_BUILTIN          = 0;  // Used for built in calls by tests
  RPC_WRITABLE         = 1;  // Use WritableRpcEngine 
  RPC_PROTOCOL_BUFFER  = 2;  // Use ProtobufRpcEngine
}

3.x的rpckind都使用的是RPC_PROTOCOL_BUFFER,所以以这个为例。

RPC_PROTOCOL_BUFFER的protocols会放到一个hashmap里面。

Map<ProtoNameVer, ProtoClassProtoImpl> protocolImplMapArray = new HashMap<ProtoNameVer, ProtoClassProtoImpl>(10);

key为ProtoNameVer,要注意的hashcode的实现方法。

static class ProtoNameVer {
    final String protocol;
    final long   version;
    ProtoNameVer(String protocol, long ver) {
      this.protocol = protocol;
      this.version = ver;
    }
    @Override
    public boolean equals(Object o) {
      if (o == null) 
        return false;
      if (this == o) 
        return true;
      if (! (o instanceof ProtoNameVer))
        return false;
      ProtoNameVer pv = (ProtoNameVer) o;
      return ((pv.protocol.equals(this.protocol)) && 
          (pv.version == this.version));     
    }
    @Override
    public int hashCode() {
      return protocol.hashCode() * 37 + (int) version;    
    }
  }

所以任何protocol必须有protocol和version,即注解类ProtocolInfo。

@Retention(RetentionPolicy.RUNTIME)
public @interface ProtocolInfo {
  String protocolName();  // the name of the protocol (i.e. rpc service)
  long protocolVersion() default -1; // default means not defined use old way
}

一个protocol的接口类类似这样。

@ProtocolInfo(protocolName = HdfsConstants.CLIENT_NAMENODE_PROTOCOL_NAME, protocolVersion = 1)
/**
 * Protocol that a clients use to communicate with the NameNode.
 *
 * Note: This extends the protocolbuffer service based interface to
 * add annotations required for security.
 */
public interface ClientNamenodeProtocolPB extends ClientNamenodeProtocol.BlockingInterface {
}

那反射的方法怎么来呢?我们可以发现ClientNamenodeProtocol.BlockingInterface其实是protobuf编译出来的,可以看一下ClientNamenodeProtocol.proto文件的最后service定义。

service ClientNamenodeProtocol {
  rpc getBlockLocations(GetBlockLocationsRequestProto)
      returns(GetBlockLocationsResponseProto);
  rpc getServerDefaults(GetServerDefaultsRequestProto)
      returns(GetServerDefaultsResponseProto);
  rpc create(CreateRequestProto)returns(CreateResponseProto);
  rpc append(AppendRequestProto) returns(AppendResponseProto);
  rpc setReplication(SetReplicationRequestProto)
      returns(SetReplicationResponseProto);
  rpc setStoragePolicy(SetStoragePolicyRequestProto)
  ...
}

编译出来就是ClientNamenodeProtocol.BlockingInterface,里面就是方法列表。

我们自己的实现类只需要实现ClientNamenodeProtocolPB即可。例如ClientNamenodeProtocolServerSideTranslatorPB。

//add protocols
ClientNamenodeProtocolServerSideTranslatorPB cnn = new ClientNamenodeProtocolServerSideTranslatorPB();
BlockingService cnnService = ClientNamenodeProtocol.newReflectiveBlockingService(cnn);
Server.addProtocol(ClientNamenodeProtocolPB.class, cnnService);    

最后call.run其实是根据RequestHeaderProto来找到对应的实现类。

message RequestHeaderProto {
  /** Name of the RPC method */
  required string methodName = 1;

  /** 
   * RPCs for a particular interface (ie protocol) are done using a
   * IPC connection that is setup using rpcProxy.
   * The rpcProxy's has a declared protocol name that is 
   * sent form client to server at connection time. 
   * 
   * Each Rpc call also sends a protocol name 
   * (called declaringClassprotocolName). This name is usually the same
   * as the connection protocol name except in some cases. 
   * For example metaProtocols such ProtocolInfoProto which get metainfo
   * about the protocol reuse the connection but need to indicate that
   * the actual protocol is different (i.e. the protocol is
   * ProtocolInfoProto) since they reuse the connection; in this case
   * the declaringClassProtocolName field is set to the ProtocolInfoProto
   */
  required string declaringClassProtocolName = 2;
  
  /** protocol version of class declaring the called method */
  required uint64 clientProtocolVersion = 3;
}

然后通过反射,去执行了实现类的方法。

 Writable call(String protocol, Writable writableRequest, long receiveTime) throws Exception {
     RpcProtobufRequest request = (RpcProtobufRequest) writableRequest;
     RequestHeaderProto rpcRequest = request.getRequestHeader();
     String methodName = rpcRequest.getMethodName();

     /** 
      * RPCs for a particular interface (ie protocol) are done using a
      * IPC connection that is setup using rpcProxy.
      * The rpcProxy's has a declared protocol name that is 
      * sent form client to server at connection time. 
      * 
      * Each Rpc call also sends a protocol name 
      * (called declaringClassprotocolName). This name is usually the same
      * as the connection protocol name except in some cases. 
      * For example metaProtocols such ProtocolInfoProto which get info
      * about the protocol reuse the connection but need to indicate that
      * the actual protocol is different (i.e. the protocol is
      * ProtocolInfoProto) since they reuse the connection; in this case
      * the declaringClassProtocolName field is set to the ProtocolInfoProto.
      */

     String declaringClassProtoName = 
         rpcRequest.getDeclaringClassProtocolName();
     long clientVersion = rpcRequest.getClientProtocolVersion();
     //LOG.info("Call: connectionProtocolName=" + connectionProtocolName + ", method=" + methodName + ", declaringClass=" + declaringClassProtoName);
     ProtoClassProtoImpl protocolImpl = getProtocolImpl(declaringClassProtoName, clientVersion);
     BlockingService service = (BlockingService) protocolImpl.protocolImpl;
     MethodDescriptor methodDescriptor = service.getDescriptorForType()
         .findMethodByName(methodName);
     if (methodDescriptor == null) {
       String msg = "Unknown method " + methodName + " called on " + protocol + " protocol.";
       LOG.warn(msg);
       throw new RpcNoSuchMethodException(msg);
     }
     Message prototype = service.getRequestPrototype(methodDescriptor);
     Message param = request.getValue(prototype);

     Message result = null;
     long startTime = Time.now();
     int qTime = (int) (startTime - receiveTime);
     Exception exception = null;
     boolean isDeferred = false;
     try {
       //server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
       result = service.callBlockingMethod(methodDescriptor, null, param);
       // Check if this needs to be a deferred response,
       // by checking the ThreadLocal callback being set
     } catch (ServiceException e) {
       exception = (Exception) e.getCause();
       throw (Exception) e.getCause();
     } catch (Exception e) {
       exception = e;
       throw e;
     } finally {
       int processingTime = (int) (Time.now() - startTime);
       //if (LOG.isDebugEnabled()) {
         String msg =
             "Served: " + methodName + (isDeferred ? ", deferred" : "") +
                 ", queueTime= " + qTime +
                 " procesingTime= " + processingTime;
         if (exception != null) {
           msg += " exception= " + exception.getClass().getSimpleName();
         }
         //LOG.debug(msg);
         LOG.info(msg);
         //LOG.info("params:" + param.toString());
         //LOG.info("result:" + result.toString());
       //}
       String detailedMetricsName = (exception == null) ?
           methodName :
           exception.getClass().getSimpleName();
       //server.updateMetrics(detailedMetricsName, qTime, processingTime, isDeferred);
       
     }
     return RpcWritable.wrap(result);
   }

完成以后如果有返回Message会放入rpccall.rpcResponse。然后再把call放入ResponseQueue。

ResponseQueue

在connection中,主要存放处理完的rpccall。

Responder

Responder线程主要负责call结果的返回。

 private boolean processResponse(LinkedList<RpcCall> responseQueue,
                                  boolean inHandler) throws IOException {
    boolean error = true;
    boolean done = false;       // there is more data for this channel.
    int numElements = 0;
    RpcCall call = null;
    try {
      synchronized (responseQueue) {
        //
        // If there are no items for this channel, then we are done
        //
        numElements = responseQueue.size();
        if (numElements == 0) {
          error = false;
          return true;              // no more data for this channel.
        }
        //
        // Extract the first call
        //
        call = responseQueue.removeFirst();
        SocketChannel channel = call.connection.channel;
        if (LOG.isDebugEnabled()) {
          LOG.debug(Thread.currentThread().getName() + ": responding to " + call);
        }
        //
        // Send as much data as we can in the non-blocking fashion
        //
        int numBytes = call.connection.channelWrite(channel, call.rpcResponse);
        if (numBytes < 0) {
          return true;
        }
        if (!call.rpcResponse.hasRemaining()) {
          //Clear out the response buffer so it can be collected
          call.rpcResponse = null;
          call.connection.decRpcCount();
          if (numElements == 1) {    // last call fully processes.
            done = true;             // no more data for this channel.
          } else {
            done = false;            // more calls pending to be sent.
          }
          if (LOG.isDebugEnabled()) {
            LOG.debug(Thread.currentThread().getName() + ": responding to " + call
                + " Wrote " + numBytes + " bytes.");
          }
        } else {
          //
          // If we were unable to write the entire response out, then 
          // insert in Selector queue. 
          //
          call.connection.responseQueue.addFirst(call);
          
          if (inHandler) {
            // set the serve time when the response has to be sent later
            call.timestamp = Time.now();
            
            incPending();
            try {
              // Wakeup the thread blocked on select, only then can the call 
              // to channel.register() complete.
              writeSelector.wakeup();
              channel.register(writeSelector, SelectionKey.OP_WRITE, call);
            } catch (ClosedChannelException e) {
              //Its ok. channel might be closed else where.
              done = true;
            } finally {
              decPending();
            }
          }
          if (LOG.isDebugEnabled()) {
            LOG.debug(Thread.currentThread().getName() + ": responding to " + call
                + " Wrote partial " + numBytes + " bytes.");
          }
        }
        error = false;              // everything went off well
      }
    } finally {
      if (error && call != null) {
        LOG.warn(Thread.currentThread().getName()+", call " + call + ": output error");
        done = true;               // error. no more data for this channel.
        Server.closeConnection(call.connection);
      }
    }
    return done;
  }

http://www.niftyadmin.cn/n/5452443.html

相关文章

【前端】-【性能优化常识】

目录 前端性能优化指标首屏速度、白屏时间性能优化收效很大的操作&#xff1a;减少首屏资源体积收效不大或者特殊情况的优化操作 操作速度、渲染速度造成操作卡顿和渲染慢的场景性能优化 数据缓存 补充知识异步加载加载方式一&#xff1a;prefetch加载加载方式二&#xff1a;sc…

使用 IntelliJ IDEA 和 Maven 构建 Java 项目

使用 IntelliJ IDEA 和 Maven 构建 Java 项目 在 Java 开发中&#xff0c;使用 Maven 是一种广泛采用的构建工具&#xff0c;而 IntelliJ IDEA 则是一款功能强大的集成开发环境&#xff08;IDE&#xff09;。结合 Maven 和 IntelliJ IDEA&#xff0c;可以更加高效地管理和构建…

【数字图像处理 】 灰度变换增强图像

文章目录 灰度变换增强图像对数变换指数(伽玛)变换直方图均衡化变换 灰度变换增强图像 对数变换 对数变换的作用是对图像的低灰度范围进行扩展&#xff0c;并对高灰度范围进行压缩&#xff0c;得到的结 果图像灰度分布更均匀&#xff0c;其通用表达式为&#xff1a; y c log…

.Net Web窗口页属性

<%Page %> Page 指令定义 Web 窗体使用的属性&#xff0c;这些属性将被 Web 窗体页分析器和编译器使用。只能包含在 .aspx 文件中。我们每新建一个 Web 页面时&#xff0c;系统会自动为该 Web 页面头部创建一个 Page 指令&#xff0c;来指明页面最基本的属性。 Langu…

MySQL---存储过程详解

目录 一、介绍 二、基础语法 三、变量 四、流程控制 五、参数 六、游标 七、条件处理程序 八、存储函数 一、介绍 存储过程是事先经过编译并存储在数据库中的一段 SQL 语句的集合&#xff0c;调用存储过程可以简化应用开发人员的很多工作&#xff0c;减少数据在数据库和…

sever00启动AList

sever00启动AList cd ~/domains/alist && ~/.npm-global/bin/pm2 start ./alist -- server 其他 Serv00是一个提供免费的Virtual Host的平台&#xff0c;其托管平台使用的是FreeBSD系统&#xff0c;并不是Linux。每个账号有效期10年&#xff0c;超过三个月不登入Pan…

【MySQL】5.MySQL高级语句与sql语句

常用查询 对MySQL数据库的查询&#xff0c;除了基本的查询外&#xff0c;有时候需要对查询的结果集进行处理&#xff1b; 例如&#xff1a;只取10条数据、对查询结果进行排序或分组等 一、按关键字排序 ps&#xff1a;类比与windows任务管理器 使用select 语句可以将需要的…

湖北汽车工业学院 实验一 关系数据库标准语言SQL

头歌 实验一 关系数据库标准语言SQL 制作不易&#xff01;点个关注呗&#xff01;为大家创造更多的价值&#xff01; 目录 头歌 实验一 关系数据库标准语言SQL**制作不易&#xff01;点个关注呗&#xff01;为大家创造更多的价值&#xff01;** 第一关&#xff1a;创建数据库第…