HDFS WebHDFS 读写文件分析及HTTP Chunk Transfer Coding相关问题探究

news/2024/5/20 1:56:12 标签: hdfs, http, hadoop
http://www.w3.org/2000/svg" style="display: none;">

文章目录

  • 前言
  • 需要回答的问题
    • DataNode端基于Netty的WebHDFS Service的实现
  • 基于重定向的文件写入流程
    • 写入一个大文件时WebHDFS和Hadoop Native的块分布差异
  • 基于重定向的数据读取流程
    • 尝试读取一个小文件
    • 尝试读取一个大文件
  • 读写过程中的Chunk Transfer-Encoding支持
    • 写文件使用Chunk Transfer-Encoding
    • 读文件使用Chunk Transfer-Encoding
      • Response Header中为什么没有Transfer-Encoding: chunked
      • 测试WebHDFS是否支持chunk Transfer-Encoding时的一个错误导致的错误判断
  • WebHDFS的特性和不足总结

前言

最近在调研使用WebHDFS进行HDFS文件读写的相关调查,因此需要对WebHDFS的整个读写流程进行探究,其中涉及到的必要的http重定向的整个流程必须搞清楚。
同时,由于HDFS涉及到大量的流式写和大文件读,因为我们比较关心WebHDFS对Chunked Transfer Coding的支持,我们对WebHDFS的这个特性进行了测试和代码层面的研究。

需要回答的问题

我们基于常规的hadoop-native 方式连接去读写文件的时候,会首先与NameNode通信,然后再与DataNode通信读取具体数据,这其中的处理逻辑都放在了hadoop-native中的具体客户端代码中。
但是一旦使用WebHDFS,我们所有的通信方式仅仅是HTTP。hadoop-native中的这种多次通信逻辑放在一个http请求中该怎么处理呢?所以,我们在测试WebHDFS以前,就首先有了疑问: 基于HTTP的WebHDFS该怎么处理先到NameNode然后到DataNode的整个过程?总不会让数据从DataNode、经过NameNode再到客户端吧?

DataNode端基于Netty的WebHDFS Service的实现

在DataNode启动的时候,会启动一个DatanodeHttpServer,用来启动对应的WebHDFS Service:

  private void startInfoServer()
    throws IOException {
    // SecureDataNodeStarter will bind the privileged port to the channel if
    // the DN is started by JSVC, pass it along.
    ServerSocketChannel httpServerChannel = secureResources != null ?
        secureResources.getHttpServerChannel() : null;

    httpServer = new DatanodeHttpServer(getConf(), this, httpServerChannel);
    httpServer.start();

DataNodeHttpServer 构造的时候,实际上构造了基于Netty的DataNode Http Service。我们特别注意到,在initChannel()方法中,添加了对Chunked Transfer Encoding的支持handler ChunkedWriteHandler。后面我们会具体讲这个类在我们通过WebHDFS读取文件时的作用。

      this.httpServer = new ServerBootstrap().group(bossGroup, workerGroup)
        .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
          ChannelPipeline p = ch.pipeline();
          p.addLast(new HttpRequestDecoder(),
            new HttpResponseEncoder());
          if (restCsrfPreventionFilter != null) {
            p.addLast(new RestCsrfPreventionFilterHandler(
                restCsrfPreventionFilter));
          }
          p.addLast(
              new ChunkedWriteHandler(), //支持chunk的数据写入
              new URLDispatcher(jettyAddr, conf, confForCreate));
        }
      });

上面的代码其实是创建一个Netty服务器端的基本流程(以下基本流程来自ChatGPT的回答,个人认为非常准确,因此直接粘贴):

  • 创建 EventLoopGroupEventLoopGroup 是一个处理 I/O 操作的线程池。在服务器端,我们通常需要创建两个 EventLoopGroup,一个用于接受客户端的连接(Boss Group),另一个用于处理连接的数据流(Worker Group)。

    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    
  • 创建 ServerBootstrapServerBootstrap 是用于启动 Netty 服务器的引导类。它配置了服务器的各种参数,如线程模型、通道类型、处理器等。

    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(bossGroup, workerGroup)
                   .channel(NioServerSocketChannel.class)
                   .childHandler(new MyChannelInitializer()); // 设置处理连接的 ChannelInitializer
    
  • 设置 ChannelInitializerChannelInitializer 是一个特殊的处理器,用于配置新连接的 ChannelPipeline。在这里,你可以添加自定义的处理器来处理连接的数据流。在上面的HDFS代码中,创建了一个匿名的ChannelInitializer实现,重写了initChannel()方法:

    this.httpServer = new ServerBootstrap().group(bossGroup, workerGroup)
            .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline p = ch.pipeline();
            p.addLast(new HttpRequestDecoder(),
                new HttpResponseEncoder());
            .....
            }
    
  • 实现自定义的处理器: 在 ChannelInitializerinitChannel()方法中,添加Netty集成的或者自定义的处理器ChannelHandler,用于处理连接的数据流。这些处理器会成为一个处理器链,挂载在ChannelPipeline中。很显然,这个Pipeline下面的Handler时按照addLast()添加的顺序有序的,比如WebHDFS在下面的代码中,意味着读数据的时候,数据先经过ChunkedWriteHandler,然后经过URLDispatcher,而向客户端写数据的时候,数据流先经过URLDispatcher,然后经过ChunkedWriteHandler

      p.addLast(
          new ChunkedWriteHandler(), //支持chunk的数据写入
          new URLDispatcher(jettyAddr, conf, confForCreate));
    
  • 绑定端口并启动服务器: 最后,将服务器绑定到指定的端口,并启动服务器。

    ChannelFuture f = httpServer.bind(infoAddr);
          try {
            f.syncUninterruptibly();
            ......
    

在DataNode端, 用户可以通过dfs.datanode.http.address配置了对应的WebHDFS地址,默认是0.0.0.0:9864

  public static final String  DFS_DATANODE_HTTP_ADDRESS_KEY = "dfs.datanode.http.address";
  public static final int     DFS_DATANODE_HTTP_DEFAULT_PORT = 9864;
  public static final String  DFS_DATANODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_HTTP_DEFAULT_PORT;

基于重定向的文件写入流程

写入一个大文件时WebHDFS和Hadoop Native的块分布差异

我们通过基础的java.net提供的HttpURLConnection创建Http客户端向HDFS写入文件。

通过命令, 可以看到其实这个文件的第一个Block都是在同一个机器上。
hdfs -copyFromLocal这种基于hadoop native的api拷贝文件,这个文件的block是逐个申请的,NameNode会为每一个Block根据当前的BlockPlacementPolicy分配对应的replica位置,但是,基于WebHDFS的NameNode只能在请求写文件的时候和NameNode进行一次通信,返回一个Location信息供客户端进行Redirect的写操作,因此,这导致这个文件的所有Block的第一个Replica都在这台机器上。至于每一个Block剩下的Replica的复制,按照HDFS的块写入策略,都是由第一个Replica复制到剩余两个Replica上去的。

我们做测试的时候,通过WebHDFS上传一个300MB的文件到HDFS。之所以选择300MB,是因为HDFS的块大小是128MB,因此300MB的文件需要创建3个Block,从而观察3个Block的第一个 Replica 是否是同一台机器。
下面是使用Java的HTTP客户端基于WebHDFS向HDFS写入一个大概300MB的文件的代码:

public class WebHDFSWriteFileExample
{
	public static void write_file()
	{
		try {
	           // WebHDFS endpoint
	            String webHdfsEndpointProd = "http://dddddd-101.prod.corp.com:50070";
	
	            // HDFS path where the file will be written
	            String hdfsPath = "/user/hadoop/hello-world/330MB.txt";
	
	            // Local file path to be uploaded
	            String localFilePath = "/Users/cwu/330MB.txt";
	            // Hadoop username
	            String username = "hadoop";
	
	            // WebHDFS REST API URL for file creation (PUT method)
	            String webHdfsUrl = webHdfsEndpointProd + "/webhdfs/v1" + hdfsPath + "?op=CREATE&user.name=" + username + "&overwrite=true";
	
	            // Open the local file for reading
	            FileInputStream fileInputStream = new FileInputStream(localFilePath);
	            BufferedReader reader = new BufferedReader(new InputStreamReader(fileInputStream));
	
	            // Create HTTP connection
	            HttpURLConnection connection = createConnectionWithURL(webHdfsUrl);
	
	            // Write file content to HDFS
	            OutputStream outputStream = connection.getOutputStream();
	            String inputLine;
	            while ((inputLine = reader.readLine()) != null) {
	                outputStream.write(inputLine.getBytes());
	                outputStream.write("\r\n".getBytes());
	            }
	            outputStream.write("0\r\n\r\n".getBytes());
	            // Close the connection
	            connection.disconnect();
	       }
	}
    // 基于WebHDFS的url创建一个HttpURLConnection
    private static HttpURLConnection createConnectionWithURL(String webHdfsUrl) throws IOException {
        URL url = new URL(webHdfsUrl);
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setDoOutput(true);
        connection.setRequestMethod("PUT");
        return connection;
    }
  }

文件上传以后,我们通过fsck命令查看上传的文件的块分布情况,结果验证了我们的猜想:文件的所有Block的第一个replica都是机器81.2.20.332,显然,这个机器就是我们向NameNode发送请求的时候NameNode返回给我们的307重定向的DataNode地址。重定向地址只有一个,因此,所有的块的写入就只能写到一个机器上去。

root@dddddd-101:~# HADOOP_USER_NAME=hdfs hdfs fsck hdfs://drdencrypted/user/hadoop/hello-world/330MB.txt -files -locations -blocks
Connecting to namenode via http://dddddd-101.prod.corp.com:50070/fsck?ugi=hdfs&files=1&locations=1&blocks=1&path=%2Fuser%2Fhadoop%2Fhello-world%2F330MB.txt
FSCK started by hdfs (auth:SIMPLE) from /81.2.2.108 for path /user/hadoop/hello-world/330MB.txt at Wed Jan 10 14:49:14 UTC 2024
/user/hadoop/hello-world/330MB.txt 348093376 bytes, 3 block(s):  OK
0. BP-332385978-81.2.1.108-1654488140099:blk_5650223226_4580715954 len=134217728 Live_repl=3 [DatanodeInfoWithStorage[81.2.20.332:50010,DS-fa64ea42-f4e8-4ebb-999e-1727fcfb879a,DISK], DatanodeInfoWithStorage[81.2.12.132:50010,DS-88228db5-1c5b-4715-8b36-2bb653201367,DISK], DatanodeInfoWithStorage[81.2.11.125:50010,DS-e6936497-52f8-495e-9d80-0d7d5684369a,DISK]]
1. BP-332385978-81.2.1.108-1654488140099:blk_5650225022_4580717750 len=134217728 Live_repl=3 [DatanodeInfoWithStorage[81.2.20.332:50010,DS-c0cb79b2-7679-4961-8bb4-917224b248ed,DISK], DatanodeInfoWithStorage[81.2.8.131:50010,DS-368a6917-1a36-419f-a6bc-7129978d0926,DISK], DatanodeInfoWithStorage[81.2.8.132:50010,DS-56b945ff-004f-4af5-879e-2941db14693e,DISK]]
2. BP-332385978-81.2.1.108-1654488140099:blk_5650228459_4580721187 len=79657920 Live_repl=3 [DatanodeInfoWithStorage[81.2.20.332:50010,DS-0bf35d03-c89c-4e9a-86ce-af9c1a149b52,DISK], DatanodeInfoWithStorage[81.2.11.126:50010,DS-74da8386-1369-4562-bfec-ae6350b48fbd,DISK], DatanodeInfoWithStorage[81.2.12.132:50010,DS-9bcd388e-a129-4f5c-99b3-6611e59f5a12,DISK]]

因此,通过WebHDFS写文件其实是有一个隐含的重定向过程,但是读者看上面的代码,并没有看到任何重定向的处理逻辑,这是因为,在默认情况下,HttpURLConnection帮我们自动处理了重定向,即,如果服务器端返回307状态码,那么客户端自动重新建立到header中的Location中到连接,然后一样的请求发送给redirect地址,这一切对用户隐藏。
如果我们的确想观察重定向的详细过程,完全可以通过connection.setInstanceFollowRedirects(false);关闭客户端的自动重定向,手动处理重定向。同时,后面会讲,假如我们enable了Chunked Transfer Coding,自动重定向处理也将被关闭。
没有了自动重定向,就需要手动创建两次http请求,第一次请求发往NameNode,NameNode返回307,并附带了指向某台DataNode的Location,然后我们再依据Location第二次建立连接,将文件写入到对应的DataNode。因此基于上面的WebHDFSWriteFileExample,我们修改代码进行手动重定向以观察重定向细节:

public class WebHDFSWriteFileExample
{
	public static void write_file()
	{
		try {
			....... 发送请求到NameNode
			if (responseCode == HttpURLConnection.HTTP_CREATED) {
	             System.out.println("File created successfully on HDFS.");
	         } else { // 发现了重定向
	             System.out.println("Error creating file on HDFS. Response Code: " + responseCode);
	             String newUrl = connection.getHeaderField("Location"); //获取header中的Location,即重定向的目标地址
	             connection.disconnect(); // 关闭原先的对NameNode的连接,开始创建针对DataNode的连接
	             HttpURLConnection connection2 = createConnectionWithURL(newUrl);
	             // Write file content to HDFS
	             outputStream = connection2.getOutputStream();
	             fileInputStream = new FileInputStream(localFilePath);
	             reader = new BufferedReader(new InputStreamReader(fileInputStream));
	             while ((inputLine = reader.readLine()) != null) {
	                 outputStream.write(inputLine.getBytes());
	             }
         }
    }
    
    private static HttpURLConnection createConnectionWithURL(String webHdfsUrl) throws IOException {
	    .....
	    connection.setInstanceFollowRedirects(false); //disable掉自动重定向
	    ......
}

基于重定向的数据读取流程

我们将分别测试和研究读取小文件(不足一个Block)和大文件(两个或者更多Block)的读取流程,二者理论上会产生区别,因为假如文件来自两个或者更多的Block,基于WebHDFS的重定向似乎就没那么简单了。
下文来讲解具体的读取过程。

尝试读取一个小文件

对于只占用一个Block的小文件,我们发送读取请求给NameNode,NameNode会返回一个307的重定向,这个重定向一定是指向这个唯一的Block的其中一个Replica所在的DataNode的。
比如,读取一个1MB的小文件的代码如下:

public class WebHDFSReadFileExample {

    private static void read(String[] args){
        try {
       	    // WebHDFS endpoint
            String webHdfsEndpointProd = "http://dddddd-101.prod.corp.com:50070";

            // HDFS path where the file will be written
            String hdfsPath = "/user/hadoop/hello-world/1MB.txt";

            // Local file path to be uploaded
            String localFilePath = "/Users/cwu/1MB.txt";
            // HDFS path to the file

            // Hadoop username
            String username = "hadoop";

            // WebHDFS REST API URL for file reading (GET method)
            String webHdfsUrl = webHdfsEndpoint + "/webhdfs/v1" + hdfsPath + "?op=OPEN&user.name=" + username;

            // Open the connection
            HttpURLConnection connection = createConnectionWithURL(webHdfsUrl, false, "GET", false);
            if (connection.getResponseCode() == HttpURLConnection.HTTP_OK) {
                System.out.println("File created successfully on HDFS.");
            } else if (connection.getHeaderField("Location") != null){ // 307 redirect

                // 创建http连接,基于get请求读文件,关闭自动重定向,不enable chunk
                HttpURLConnection connection2 = createConnectionWithURL(newUrl, false, "GET", false);
                // Write file content to HDFS
                BufferedReader reader = new BufferedReader(new InputStreamReader(connection2.getInputStream()));
                // read the file from BufferReader....
               .....
            }
       ....
    }
    // 创建链接,用户可以选择是否打开自动重定向,是否使用chunk
    private static HttpURLConnection createConnectionWithURL(String webHdfsUrl, Boolean autoRedirect, String method, Boolean enable_chunk) throws IOException {
        URL url = new URL(webHdfsUrl);
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setInstanceFollowRedirects(autoRedirect); // 关闭自动重定向
        connection.setRequestMethod(method);
        connection.setDoOutput(true);
        if(enable_chunk)
            connection.setChunkedStreamingMode(-1); //选择是否打开chunked streaming out。后面会说,该选项决定的是客户端的写,基于GET读取文件的时候,该选项不可以打开
        return connection;
    }

读取一个小文件的过程在预期之内,HttpClient首先与NameNode 建立Http连接, NameNode响应了307状态码同时在Response Header中包含了目标location的地址信息。在我们关闭了自动重定向的情况下,建立了与Location中指示的URL建立连接,然后读取到对应的文件。

尝试读取一个大文件

我们已经知道读取文件的时候,NameNode会返回一个重定向地址给我们,告诉我们去联系对应的DataNode读取对应数据。但是,假如这个文件不止一个block,那么,是不是在读取下一个block的时候DataNode也会返回下一个DataNode的地址给我们接着进行下一个Block的读取呢?

我们创建了一个200MB的文件,显然,这个文件会有两个Block,并且,我们从fsck可以看到,这个文件的两个块的总共6个副本,分布在6台不同的机器上。

root@rccp103-2d:/var/log/hadoop-hdfs# HADOOP_USER_NAME=hdfs hdfs fsck hdfs://rccp103-2d.iad5.prod.corp.com:8020/user/hadoop/hello-world/200MB.log -files -locations -blocks
Connecting to namenode via http://rccp103-2d.iad5.prod.corp.com:50070/fsck?ugi=hdfs&files=1&locations=1&blocks=1&path=%2Fuser%2Fhadoop%2Fhello-world%2Fhadoop-cmf-hdfs-NAMENODE-rccp103-2d.iad5.prod.corp.com.log.out.1
FSCK started by hdfs (auth:SIMPLE) from /71.9.2.108 for path /user/hadoop/hello-world/hadoop-cmf-hdfs-NAMENODE-rccp103-2d.iad5.prod.corp.com.log.out.1 at Thu Jan 11 08:38:25 UTC 2024
/user/hadoop/hello-world/hadoop-cmf-hdfs-NAMENODE-rccp103-2d.iad5.prod.corp.com.log.out.1 209715453 bytes, 2 block(s):  OK
0. BP-332385978-71.9.1.108-1654488140099:blk_5655503893_4585996621 len=134217728 Live_repl=3 [DatanodeInfoWithStorage[71.9.9.131:50010,DS-a8904c39-cb1c-4fe0-b9e8-625f3bad85be,DISK], DatanodeInfoWithStorage[71.9.7.130:50010,DS-8e0d756e-bab4-4b44-b131-6bb9af01362b,DISK], DatanodeInfoWithStorage[71.9.8.128:50010,DS-622eb32d-e292-4824-818e-a72a12144fe3,DISK]]
1. BP-332385978-71.9.1.108-1654488140099:blk_5655503948_4585996676 len=75497725 Live_repl=3 [DatanodeInfoWithStorage[71.9.10.128:50010,DS-3e0bbada-d77c-46af-a1fa-573e54443706,DISK], DatanodeInfoWithStorage[71.9.12.129:50010,DS-7942484b-2b8e-438a-9d4b-8bf8d892c0cf,DISK], DatanodeInfoWithStorage[71.9.12.131:50010,DS-d2e9c177-4ea0-4232-9f28-f8a52644be8a,DISK]]

我们还是使用上面的WebHDFSReadFileExample代码,发现,读取文件的时候,NameNode返回了307重定向,并且指向了第一个块的某一个副本机器,文件的第一个和第二个块都是从这个机器上读取的,尽管,第二个块并不是分布在这台机器上

我们检查代码,发现了其实现的原理,简而言之,第一个Block的Replica的机器,充当了整个文件读取的中转,而不是仅仅负责自己存储的那个Replica的读取。
DataNode通过向Netty注册的WebHdfsHandler这个自定义的ChannelHandler来处理发往DataNode请求(上文讲过Netty的一些基本实体的定义)。WebHdfsHandler.onOpen()方法用来处理用户的读文件请求。我们从下面的onOpen()代码的实现细节可以看到,实际上,DataNode并不是将用户请求的文件在自己上面的Replica返回给Http客户端,而是将自己作为中转,创建一个Hadoop Client,读取文件,然后将读取到的文件的InputStream封装程一个ChunkedStream返回给客户端。所以,当客户端调用connection2.getInputStream()就能够拿到对应DataNode创建的读取文件的InputStream

 private void onOpen(ChannelHandlerContext ctx) throws IOException {
    // DataNode自己创建一个读取文件的Client,这是一个普通的Hadoop Native客户端,与普通的HDFS Java 客户端没有任何去呗
    final DFSClient dfsclient = newDfsClient(nnId, conf);
    HdfsDataInputStream in = dfsclient.createWrappedInputStream(
      dfsclient.open(path, bufferSize, true));
    in.seek(offset);
    data = in;

    ctx.write(resp);
    ctx.writeAndFlush(new ChunkedStream(data) { // 封装好InputStream,返回给客户端,可以看到,是以Chunk的方式。后面会讲,ChunkedStream会被当前的Handler中的下一个Handler即ChunkedWriteHandler进一步处理
      @Override
      public void close() throws Exception {
        super.close();
        dfsclient.close();
      }
    }).addListener(ChannelFutureListener.CLOSE);
  }

读写过程中的Chunk Transfer-Encoding支持

Chunk Transfer-Encoding的支持会极大影响流式数据读写的性能。我们看看什么是Chunk Transfer-Encoding以及它能给我们带来什么:

分块传输编码(Chunk Transfer-Encoding)是一种 HTTP 传输机制,它将消息主体分割成一系列块,并使用长度前缀来标识每个块的大小。这种编码方式带来了一些好处:

  • 实时传输: 分块传输编码允许服务器在生成响应的同时将数据发送给客户端,而不必等到整个响应主体准备就绪。这对于实时生成的内容或流式传输非常有用。

  • 无需预先知道内容大小: 使用分块传输编码时,不需要预先知道整个消息主体的大小。这对于动态生成的内容或流式传输,其中内容的大小可能在生成过程中动态变化,非常有用。

  • 提高响应速度: 分块传输编码可以在数据准备的同时发送,减少了等待整个响应准备就绪的时间。这有助于提高响应速度,特别是对于大型或动态生成的内容。

  • 降低延迟: 分块传输编码允许客户端在接收到一部分数据时就开始处理,而不必等待整个响应完成。这有助于降低延迟,特别是对于实时应用或交互性应用。

  • 更好的适应网络波动: 在网络条件不稳定的情况下,分块传输编码可以更好地适应不同的带宽和延迟条件。由于可以在生成数据的同时发送,它能够更灵活地处理网络波动。

所以,总的说来,Chunk Transfer-Encoding主要用来文件大小未知的流式文件的传输,或者虽然确定大小但是文件大小很大的传输场景。

正常使用场景下,流式数据或者大文件的发送方会在http header中设置Transfer-Encoding: chunked,同时以chunk的方式发送数据,数据的接受方在接收到header并发现是Chunk Transfer-Encoding以后,会进行对应的编解码操作。当然,这个解码操作一般是对应的Library底层的自动操作,上层用户直接看到的是一个解码以后的InputStream比如BufferedInputStream等,而不需要关心本身的解码细节。

注意,Chunked Transfer Coding的每一个chunk的数据大小是自描述的,因此,并不要求在一个stream连接中所有chunk的大小都相同。只要遵循Chunked Transfer Coding的数据格式,比如,如果发送的文件是一个日志文件,数据的发送方甚至可以按照一行一个chunk进行数据的发送。

我们在下文详细讲解通过WebHDFS读写文件时对Chunk的支持。

写文件使用Chunk Transfer-Encoding

我们通过connection.setChunkedStreamingMode(chunklen),就会enable Chunked Transfer Coding写文件,即为我们添加Transfer-Encoding: chunkedhttp header, 同时设置了一个chunkLength,我们传入的参数如果是不大于0的参数,那么chunkLength就设置为默认值4096,否则就按照我们设置的大小设置chunkLength

与之相对,我们可以调用connection.setFixedLengthStreamingMode (int contentLength) 来选择使用Fixed-Length Streaming
Chunked StreamingFixed-Length Streaming是相互排斥的,二者只能选其一,这是因为二者的使用场景完全不同:

  • setChunkedStreamingMode: 用于指定请求体使用分块传输编码 (Chunked Transfer Encoding)。分块传输编码允许将请求体分割成多个块,每个块都带有长度信息。这样可以在数据生成的同时发送,而无需等到整个请求体准备就绪。使用这种模式可以提高效率,特别是对于较大的请求体。

    connection.setChunkedStreamingMode(0); // 使用分块传输编码,0 表示使用默认块大小
    
  • setFixedLengthStreamingMode: 用于指定请求体的固定长度。这意味着在实际写入请求体数据之前,需要知道请求体的总长度。这样的模式适用于较小的请求体,且长度可预知的情况。

    int contentLength = calculateContentLength(); // 计算请求体的长度
    connection.setFixedLengthStreamingMode(contentLength);
    

    选择使用哪个方法取决于请求体的大小和生成的时间。对于动态生成的或大小未知的请求体,通常更适合使用 setChunkedStreamingMode。对于已知大小的请求体,使用 setFixedLengthStreamingMode 可能更为合适。

下面的代码就是HttpURLConnection中两个方法的实现:

------------------------------------------HttpURLConnection------------------------------------------
    public void setChunkedStreamingMode (int chunklen) {
        .......
        if (fixedContentLength != -1 || fixedContentLengthLong != -1) {
            throw new IllegalStateException ("Fixed length streaming mode set");
        }
        chunkLength = chunklen <=0? DEFAULT_CHUNK_SIZE : chunklen;
    }

    public void setFixedLengthStreamingMode (int contentLength) {
        ......
        fixedContentLength = contentLength;
    }

我们从HttpURLConnection.getOutputStream0()方法可以看到,

  • 由于我们之前设置了chunkLength,因此,这里会构建对应的ChunkedOutputStream,这意味着,我们通过HttpURLConnection.getOutputStream()返回的OutputStream进行数据的写入的时候,会经过ChunkedOutputStream然后才到最底层的比如SocketOutputStream,即ChunkedOutputStream会帮我们进行数据的chunk编码并发送给远程。
  • 如果是Http Method是Get,会强制变成Post,即我们不可能在get请求中使用Chunk Transfer Encoding,如果调用了setChunkedStreamingMode(chunklen),那么Http Method会被换成POST。这是为什么我们在读文件的时候假如调用setChunkedStreamingMode(chunklen),WebHDFS会返回一个400,让我们误以为WebHDFS不支持Chunk Transfer Coding。后文会详细讲解。
  • 我们从writeRequests可以看到,假如我们通过setChunkedStreamingMode(0)设置了chunkLength,那么写文件的请求中会被隐含加上Transfer-Encoding: chunked 的 Http header。而如果我们通过setFixedLengthStreamingMode()设置了使用fixed-length streaming,那么就会隐含加上Content-Length 的 header。
private synchronized OutputStream getOutputStream0() throws IOException {
         .........
         if (method.equals("GET")) { //在获取OutputStream,GET会被强制替换成POST
             method = "POST"; // Backward compatibility
         }
         ....
         if (streaming()) {
             if (strOutputStream == null) {
                 if (chunkLength != -1) { /* chunked */
                      strOutputStream = new StreamingOutputStream(
                            new ChunkedOutputStream(ps, chunkLength), -1L); //封装一层ChunkedOutputStream
                 } else { /* must be fixed content length */
                     long length = 0L;
                     if (fixedContentLengthLong != -1) {
                         length = fixedContentLengthLong;
                     } else if (fixedContentLength != -1) {
                         length = fixedContentLength;
                     }
                     strOutputStream = new StreamingOutputStream(ps, length);
                 }
             .....
        }

 private void writeRequests() throws IOException {
     .....
     if (this.streaming()) {
         if (this.chunkLength != -1) { // 设置了chunk, 会帮我们添加Transfer-Encoding:chunked的header
             this.requests.set("Transfer-Encoding", "chunked");
             var14 = true;
         } else if (this.fixedContentLengthLong != -1L) { //如果是fixed-length code,会帮我们添加Content-Length header
             this.requests.set("Content-Length", String.valueOf(this.fixedContentLengthLong));
         } else if (this.fixedContentLength != -1) {
             this.requests.set("Content-Length", String.valueOf(this.fixedContentLength));
         }
   }

下图显示了我们使用Chunked Transfer Coding写文件的时候的客户端观察到的HttpURLConnection的运行时的outputStream,从最上层 HttpURLConnection.StreamingOutStream,经过ChunkedOutputStream一直到最下层SocketOutputStream的层层封装过程。
https://img-blog.csdnimg.cn/direct/0784e928165a439b8395573e4b85d021.png" alt="在这里插入图片描述" />

读文件使用Chunk Transfer-Encoding

刚刚说了,是否使用Chunk Transfer-Encoding,其实是数据的发送方的职责,因此,客户端从WebHDFS读文件,数据的发送方,WebHDFS Service,是否使用Chunk Transfer-Encoding,是它自己的选择。我们从WebHDFS服务器端代码可以看到,基于Netty的WebHDFS服务器端的确是使用Chunked Transfer Encoding来发送文件到客户端的。
DataNode启动的时候,会构造WebHDFS背后的Netty服务器端。构造时加入了ChunkedWriteHandler:

      this.httpServer = new ServerBootstrap().group(bossGroup, workerGroup)
        .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
          ChannelPipeline p = ch.pipeline();
          p.addLast(new HttpRequestDecoder(),
          .......
          p.addLast(
              new ChunkedWriteHandler(),
              new URLDispatcher(jettyAddr, conf, confForCreate));
        }
      });

从上面的代码可以看到,最后网ChannelPipeline中添加了两个ChannelHandlerChunkedWriteHandlerURLDispatcher,那么,这两个Handler在pipeline中的调用顺序是怎样的呢?

我们来看一下ChatGPT的准确回答:


在 Netty 中,ChannelPipeline 是一个处理事件的流水线。ChannelPipeline 中的每个节点是一个 ChannelHandler,负责处理特定类型的事件。当数据通过 Channel 时,它会在 ChannelPipeline 中的各个处理节点中依次经过,每个节点负责特定的处理。

对于 ChannelPipeline.addLast 添加的 ChannelHandler,它们的执行顺序是按照它们被添加的顺序来决定的。新添加的 ChannelHandler 会被添加到管道的尾部,因此会成为最后执行的处理器。

在读写的时候,事件的传递顺序如下:

  • 写操作: 当有数据要被写入 Channel 时,数据会从 ChannelPipeline 的尾部开始,依次经过每个 ChannelHandler write() 方法,最终传递给底层的通道。

  • 读操作: 当有数据从 Channel 读取时,数据会从 ChannelPipeline 的头部开始,依次经过每个 ChannelHandlerchannelRead() 方法,直到达到 ChannelPipeline 的尾部。


所以,在Netty Service向客户端写入数据的时候,从我们自定义的URLDispatcher中写入(其实内部会调用WebHdfsHandler),然后会被ChunkedWriteHandler进行处理,即chunk by chunk的进行数据的编码,最后发送给客户端。
根据ChunkedWriteHandler的官方文档,如果我们使用ChunkedWriteHandler来进行Chunk Transfer Coding的数据传输,它的输入,即它的前一个Handler必须写入的是ChunkedInput格式,这就是WebHdfsHandler.onOpen()的处理方式,ChunkedStreamChunkedInput的实现类:

  private void onOpen(ChannelHandlerContext ctx) throws IOException {
    ........
    resp = new DefaultHttpResponse(HTTP_1_1, OK);
    HttpHeaders headers = resp.headers();
    .....

    final DFSClient dfsclient = newDfsClient(nnId, conf);
    HdfsDataInputStream in = dfsclient.createWrappedInputStream(
      dfsclient.open(path, bufferSize, true));
    in.seek(offset);

    long contentLength = in.getVisibleLength() - offset;
    .......
    final InputStream data;
    ........
      headers.set(CONTENT_LENGTH, contentLength);
      data = new LimitInputStream(in, contentLength);
    

    ctx.write(resp); //先写元数据
    ctx.writeAndFlush(new ChunkedStream(data) { // 再写content,即读取的文件流。这个文件流被封装为ChunkedStream,进而交给pipeline中的ChunkedWriteHanlder,最后发送给客户端
      @Override
      public void close() throws Exception {
        super.close();
        dfsclient.close();
      }
    }).addListener(ChannelFutureListener.CLOSE);
  }

Response Header中为什么没有Transfer-Encoding: chunked

但是很奇怪,我们从HttpUrlConnection收到的服务器端的Response Header中并没有看到Transfer-Encoding:chunked,相反,我们看到了Content-Length: 192832,难道文件并不是以Chunked Transfer-Encoding的编码方式发送的?
其实不是这样的。HttpUrlConnectionChunked Transfer-Encoding的数据处理并完全依赖Transfer-Encoding:chunked的header,它可以自动识别数据本身的格式,然后在底层自动识别并解码Chunked Transfer-Encoding,然后直接将解码后的结以InputStream的方式返回给客户端。Netty的ChunkedWriteHandler帮我们发送数据到远程客户端时,尽管数据是按照chunked格式发送给的,但是其并不会帮我们加上Transfer-Encoding:chunked到Response header中去,但是客户端依然是可以正确解析到的。ChunkedWriteHandler的好处是:

  • 提供更灵活的数据处理:ChunkedWriteHandler可以接收 FileRegionHttpContentByteBuf 对象,并负责将它们转换成分块的形式发送到下一个 ChannelOutboundHandler
  • 自动处理分块传输:ChunkedWriteHandler会自动将数据分块,确保数据按照 Chunked Transfer Encoding 规范传输。
  • 对于大文件或大数据流,使用 ChunkedWriteHandler 可以在传输过程中分块发送,降低内存占用。
    我们看到, WebHDFS服务器端使用了ChunkedWriteHandler,因此在ChannelPipeline中,需要将读取块数据的InputStream封装成ChunkedStream,因为ChunkedStreamChunkedInput 接口的实现类:
    final InputStream data;
    if (contentLength >= 0) {
      headers.set(CONTENT_LENGTH, contentLength);
      data = new LimitInputStream(in, contentLength);
    } else {
      data = in;
    }

    ctx.write(resp);
    ctx.writeAndFlush(new ChunkedStream(data) { //将读取块数据的InputStream封装为ChunkedStream,供ChunkedWriteHandler进行进一步的分块处理
      @Override
      public void close() throws Exception {
        super.close();
        dfsclient.close();
      }
    }).addListener(ChannelFutureListener.CLOSE);

所以,其实ChunkedWriteHandler只是分块写入数据的一种更好的封装方式,方便了我们进行chunk数据的写入,但其实我们完全可以不用ChunkedWriteHandler而手动进行chunk数据写入,比如,我们自己在header中添加Transfer-Encoding:chunked,然后按照 Chunked Transfer Encoding 去写数据,服务端示例代码如下:

public class NettyChunkedWriteHandlerExample {
    public static void main(String[] args) {
        .....
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new HttpServerCodec());
                            pipeline.addLast(new SimpleChannelInboundHandler<HttpMessage>() {
                                @Override
                                protected void channelRead0(ChannelHandlerContext ctx, HttpMessage msg) throws Exception {

                                         FileInputStream fis = new FileInputStream("/Users/cwu/chunk-test");
                                        ChunkedStream chunkedStream = new ChunkedStream(fis);
                                       // 设置响应头
                                       DefaultHttpResponse response = new DefaultHttpResponse(
                                               HttpVersion.HTTP_1_1,
                                               HttpResponseStatus.OK
                                               );

                                       HttpHeaders headers = response.headers();
                                       headers.set(ACCESS_CONTROL_ALLOW_ORIGIN, "*");
                                       headers.set(CONTENT_TYPE, APPLICATION_OCTET_STREAM);
                                       headers.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); // 设置`Transfer-Encoding:chunked`
                                       ctx.write(response);
                                       chunk_write_stream(fis, ctx);
                                       ....

            serverBootstrap.bind(8087).sync().channel().closeFuture().sync();
        .....
    }

    public static void chunk_write_stream(InputStream inputStream,  ChannelHandlerContext ctx) throws IOException {
        // 读取文件并以 chunked 形式写入响应
        byte[] buffer = new byte[1024];
        int bytesRead;
        while ((bytesRead = inputStream.read(buffer)) != -1) {
            if(bytesRead != 1024){
                System.out.println("The final bytesRead is not 1024, its size is " + bytesRead);
                byte[] buffer_copy = Arrays.copyOfRange(buffer, 0, bytesRead);
                ctx.write(new DefaultHttpContent(Unpooled.copiedBuffer(buffer_copy)));
            }else{
                ctx.write(new DefaultHttpContent(Unpooled.copiedBuffer(buffer)));
            }
        }
        //写入最后一个空字符
        ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(ChannelFutureListener.CLOSE);

    }

这时候我们看基于HttpURLConnection的客户端,可以看到,客户端识别了Transfer-Encoding:chunked,下层Stream的封装已经自动有了ChunkedInputStream,同样完成了基于chunk的数据传输。

测试WebHDFS是否支持chunk Transfer-Encoding时的一个错误导致的错误判断

我在最初测试WebHDFS是否支持 Chunked Transfer-Encoding,错误的代码差点儿导致我做出了错误的结论,认为从WebHDFS读取数据无法使用Chunked Transfer Encoding
由于对Chunked Transfer-Encoding理解不正确,我以为要想让服务器端Chunk by Chunk的方式将文件传输过来,客户端需要在GET请求(由于是读文件,因此是GET请求)中添加Transfer-Encoding:chunked header,对于HttpURLConnection客户端,不用手动添加这个header,只需要调用connection.setChunkedStreamingMode(0);就可以了,我这样去做,但是服务器端抛出了一个400

java.io.IOException: Server returned HTTP response code: 400 for URL: http://rccd101-7a.sjc2.dev.conviva.com:9864/webhdfs/v1/user/hadoop/hello-world/session_summaries.PT_HOURLY_2023-09-07_2023-09-07.sql-2?op=OPEN&user.name=hadoop&namenoderpcaddress=olap-hdfs-test&offset=0
	at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1914)
	at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1512)
	at WebHDFSReadFileExample.chunk_read(WebHDFSReadFileExample.java:45)
	at WebHDFSReadFileExample.main(WebHDFSReadFileExample.java:86)

代码很简单,就是在创建HttpURLConnection的时候,增加了connection.setChunkedStreamingMode(0);。后来通过debug,发现,当我们调用connection.setChunkedStreamingMode(0);的时候,会带来了两个主要变化:

  1. 自动重定向会被disable:这个变化倒不是导致我们上面的异常的原因,但是需要引起注意。这意味着,即使我们没有显式地通过connection.setInstanceFollowRedirects(false);去关闭自动重定向,如果调用了connection.setChunkedStreamingMode(0);,自动重定向也会被关闭。加入式读文件,那么我们需要手动识别并处理307
  2. GET请求会被自动变成POST请求:这是导致400问题的原因。WebHDFS客户端是将Http Method和用户的operation联合在一起来识别操作的,代码如下。可以看到,如果是OPEN操作,那么Http Method必须是GET 才能被识别,因此,一个POST类型的OPEN操作无法被识别:
      public void handle(ChannelHandlerContext ctx, HttpRequest req)
        throws IOException, URISyntaxException {
        String op = params.op();
        HttpMethod method = req.getMethod();
        if (PutOpParam.Op.CREATE.name().equalsIgnoreCase(op)
          && method == PUT) {
          onCreate(ctx);
        } else if (PostOpParam.Op.APPEND.name().equalsIgnoreCase(op)
          && method == POST) {
          onAppend(ctx);
        } else if (GetOpParam.Op.OPEN.name().equalsIgnoreCase(op)
          && method == GET) {
          onOpen(ctx); //读取文件的HTTP Method必须是Get
        } else if(GetOpParam.Op.GETFILECHECKSUM.name().equalsIgnoreCase(op)
          && method == GET) {
          onGetFileChecksum(ctx);
        } else if(PutOpParam.Op.CREATE.name().equalsIgnoreCase(op)
            && method == OPTIONS) {
          allowCORSOnCreate(ctx);
        } else {
          throw new IllegalArgumentException("Invalid operation " + op);
        }
    
  • 那么,为什么我们调用connection.setChunkedStreamingMode(0);,请求类型会被强制改成POST呢?因为connection.setChunkedStreamingMode(0);本来就应该是数据发送方的事情,数据的写入方通过设置Transfer-Encoding:chunked来告知数据接受方自己的数据是Chunked Transfer Coding,请按照Chunked Transfer Coding格式进行解析。所以,我们在读取WebHDFS文件的时候设置connection.setChunkedStreamingMode(0);是错误的操作。

WebHDFS的特性和不足总结

从上面的代码分析可以看到,先抛开性能差异不谈,在具体实现上,基于Hadoop Native的客户端的很多实现细节,肯定是无法在基于Http的WebHDFS中实现的。因此,WebHDFS表面上能成为Hadoop Native的替代,但是其实整个流程都发生了变化。总而言之,我们通过实验和HDFS的代码发现:

  1. WebHDFS不支持HDFS HA。但是这还好,我们可以在WebHDFS客户端上层手动做判断,实现HA。
  2. 写大文件的时候所有Block的第一个Replica会集中在某一台机器上。
  3. 读取文件的时候,文件的所有Block的读取操作都来自于第一个Block的某一个Replica机器
  4. 基于Http本身的限制,hedged read肯定完全不再可能。
  5. 数据的写入和读取都支持Chunked Transfer Coding
  6. 性能问题。我没有进行benchmark测试,有兴趣的读者可以自行寻找测试结果或者自行测试。

http://www.niftyadmin.cn/n/5320600.html

相关文章

[开发语言][c++][python]:C++与Python中的赋值、浅拷贝与深拷贝

C与Python中的赋值、浅拷贝与深拷贝 1. Python中的赋值、浅拷贝、深拷贝2. C中的赋值、浅拷贝、深拷贝2.1 概念2.2 示例&#xff1a;从例子中理解1) 不可变对象的赋值、深拷贝、浅拷贝2) 可变对象的赋值、浅拷贝与深拷贝3) **可变对象深浅拷贝(外层、内层改变元素)** 写在前面&…

使用Python向RabbitMQ发送JSON数据只需要一个send_json方法

发送JSON数据 通过调用rabbitmq.send_json(channel, user, queueresult)能够更简单的实现发送JSON数据。 生产者 import json import rabbitmq# 建立连接 connection rabbitmq.get_connection()# 创建管道 channel connection.channel()# 创建队列 queue_name "user…

02.部署LVS-DR群集

技能展示&#xff1a; 了解LVS-DR群集的工作原理 会构建LVS-DR负载均衡群集 2.1 LVS-DR 集群 LVS-DR&#xff08; Linux Virtual Server Director Server &#xff09;工作模式&#xff0c;是生产环境中最常用的一种工作模式。 2.1.1&#xff0e;LVS-DR 工作原理 LVS-DR 模式&…

学习Django前你需要知道的

1. 简介 官方文档&#xff1a;https://docs.djangoproject.com/zh-hans/4.2/ Django是一个开放源码的Web应用框架&#xff0c;由Python写成。采用了MVT的框架模式&#xff0c;即模型M&#xff0c;视图V和模板T。它最初是被开发用来管理劳伦斯出版集团旗下的一些以新闻内容为主…

leetcode 每日一题 2024年01月09日 字符串中的额外字符

题目 字符串中的额外字符 给你一个下标从 0 开始的字符串 s 和一个单词字典 dictionary 。你需要将 s 分割成若干个 互不重叠 的子字符串&#xff0c;每个子字符串都在 dictionary 中出现过。s 中可能会有一些 额外的字符 不在任何子字符串中。 请你采取最优策略分割 s &…

GPT function calling v2

原文&#xff1a;GPT function calling v2 - 知乎 OpenAI在2023年11月10号举行了第一次开发者大会&#xff08;OpenAI DevDays&#xff09;&#xff0c;其中介绍了很多新奇有趣的新功能和新应用&#xff0c;而且更新了一波GPT的API&#xff0c;在1.0版本后的API调用与之前的0.…

蓝桥杯练习题(六)

&#x1f4d1;前言 本文主要是【算法】——蓝桥杯练习题&#xff08;六&#xff09;的文章&#xff0c;如果有什么需要改进的地方还请大佬指出⛺️ &#x1f3ac;作者简介&#xff1a;大家好&#xff0c;我是听风与他&#x1f947; ☁️博客首页&#xff1a;CSDN主页听风与他 …

池化、线性、激活函数层

一、池化层 池化运算是深度学习中常用的一种操作&#xff0c;它可以对输入的特征图进行降采样&#xff0c;从而减少特征图的尺寸和参数数量。 池化运算的主要目的是通过“收集”和“总结”输入特征图的信息来提取出主要特征&#xff0c;并且减少对细节的敏感性。在池化运算中…