Hadoop:文件操作过程之HDFS写流程详解(部分源码)

news/2024/5/20 1:30:33 标签: hadoop, hdfs

写流程

在创建了分布式文件系统的实例后,客户端通过调用该实例的create()方法就可以创建文件,并会发送给Namenode一个RPC调用,在文件系统的命名空间中创建一个新文件,在创建文件前namenode会做一些检查(如文件是否存在,客户端是否有权限)。检查通过后namenode会为创建文件写一条记录到本地磁盘的Editlog文件,创建成功后DFS实例会返回一个FSDataOutputStream对象,客户端由此开始写入数据。

FSDataOutputStream类封装成DFSDataOutputStream对象,这个对象管理着datanode和namenode之间的交互:

1、客户端在向namenode请求之前先写入文件数据到本地文件系统的内存中(缓存队列dataqueue)

2、等到内存中达到块大小时客户端(DataStreamer线程)开始向namenode请求datanode信息

3、namenode在文件系统中创建文件并返回给客户端一个数据块以及对应datanode的地址列表(包含副本存放位置),这些datanode构成一个管线(pipeline),管线中除了最后一个节点外都负有接力转发的能力

4、客户端通过地址列表把内存中的数据冲刷(flush)到列表中的第一个datanode

        ①客户端以数据包(Packet)的形式向第一个datanode发送数据,每发送一个packet,这个packet从dataqueue中转移到由ResponseProcessor线程管理的缓存队列ackqueue;

        ②通过管线依次向下流节点发送packet,并且在管线中的每一个datanode节点在收到数据后都会向前一个datanode节点发送一个ack(ResponseProcessor会在ackqueue中将其移除),最终第一个datanode会向客户端发送一个ack确认

        ③当客户端收到数据块的确认后,数据块会被认为已经持久化到所有节点,然后客户端会向namenode发送一个确认;如果发送过程中出错,已经发送并移到ackqueue中的packet没有得到应答确认,首先会关闭管线,ackqueue中的packet会移回至dataqueue以供重发,为存储在另一正常datanode的当前数据块指定一个新的标识,并将该标识发送给namenode,以便故障datanode在恢复后可以删除存储的部分数据块。然后从管线中删除故障datanode,基于两个正常的datanode构建一条新的管线

5、文件关闭时,namenode会提交这次文件创建,此时文件在文件系统中可见

Hadoop 默认存放策略:在运行客户端的节点上存放第一个复本,如果客户端位于集群之外,会随机选择一个节点(但会避开那些存储太满或太忙的节点);第二个复本放在与第一个不同且随机另外选择的机架的节点上;第三个复本与第二个复本在同一个机架上,且随机选择另外一个节点;系统会尽量避免在同一个机架上放太多复本

Block、Packet、Chunk

block是最大的一个单位,它是最终存储于DataNode上的数据粒度,由dfs.block.size参数决定,默认是64M

chunk是最小的一个单位,它是DFSClient到DataNode数据传输中进行数据校验的粒度,由io.bytes.per.checksum参数决定,默认是512B;事实上一个chunk还包含4B(ChecksumSize)的校验值,因而chunk写入packet时是516B;数据与检验值的比值为128:1,所以对于一个128M的block会有一个1M的校验文件与之对应,这也就为什么默认最小块大小(dfs.namenode.fs-limits.min-block-size)为1M!

Packet在HDFS中被视为一个基本的网络传输单元,是数据由DFSClient流向DataNode的粒度,以dfs.write.packet.size参数为参考值默认是64K,但由于packet有特定的结构,实际上为65049B

Packet由三部分组成:

每个Packet最多由126(chunksPerPkt)个chunk组成,每个chunk中有4B校验和与512B的数据,但在实际发送packet过程中,校验和和数据是分开存放的

1、头部,由两部分组成,头部最长长度为33B

PKT_LENGTHS_LEN = 6
MAX_PROTO_SIZE = 27
PKT_MAX_HEADER_LEN = PKT_LENGTHS_LEN + MAX_PROTO_SIZE = 33

2、检验和Checksum,最大长度为 chunksPerPkt(126)*4 =  504B

3、数据,最大长度为 chunksPerPkt(126)*512 = 64512B

所以Packet最大长度为 33 + 504 + 64512 = 65049B,或者可以理解为 33 + (512+4)*126

 

写过程中会以chunk、packet及packet 三个粒度做三层缓存:

1、当数据流入DFSOutputStream时,DFSOutputStream底层内有一个9倍(BUFFER_NUM_CHUNKS)chunk大小的buf,当数据写满这个buf(或遇到强制flush),会计算checksum值,然后填塞进packet;

2、当一个chunk填塞进入packet后,仍然不会立即发送,而是累积到一个packet填满后,将这个packet放入dataqueue队列;

3、进入dataqueue队列的packet会被按序取出发送到datanode;生产者消费者模型,阻塞生产者的条件是dataqueue与ackqueue之和超过一个block的packet上限

 

部分源码分析

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HDFSWriteTest {
	public static void main(String[] args) throws IOException {
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);
		Path path = new Path("hdfs://master1:9000/d2/f1");
		FSDataOutputStream fsout = fs.create(path);
		fsout.write("helloworld".getBytes());
		fsout.close();
	}
}

 

fsout.write("helloworld".getBytes());

└1─[FilterOutputStream] void write(byte b[]) 

└2──[FilterOutputStream] void write(byte b[], int off, int len) 

└3───[DataOutputStream] synchronized void write(byte b[], int off, int len)

└4────[FSDataOutputStream.PositionCache]  write(byte b[], int off, int len) 

└5─────[FSOutputSummer] synchronized void write(byte b[], int off, int len)     

└6──────[FSOutputSummer] int write1(byte b[], int off, int len)

FSOutputSummer内有一个BytesPerChecksum*BUFFER_NUM_CHUNKS=512*9=4608B的buf[],以及对应的存放校验和数组checksum[],长度为4B*9=36B

           //如果写入数据长度大于缓冲区长度(4608),将其直接发送到底层流(packet)
	   if(count==0 && len>=buf.length) {
		final int length = buf.length;
		writeChecksumChunks(b, off, length);
		return length;
	    }
	   //将数据复制到buf中
	   int bytesToCopy = buf.length-count;
	   bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy;
	   System.arraycopy(b, off, buf, count, bytesToCopy);
	   count += bytesToCopy;
	   //buf满了则flush,当前只有10B
	   if (count == buf.length) 
	      flushBuffer();
	   } 
	   return bytesToCopy;

 

fsout.close();

└1─[FSDataOutputStream] void close()

└2──[FSDataOutputStream.PositionCache]

└3───[DFSOutputStream] void close()

└4────[DFSOutputStream] synchronized void closeImpl()

	    //DFSOutputStream中有两个LinkedList类型的缓冲队列dataQueue和ackQueue
	    try {
	      //从所有上层流flush至packert中
	      flushBuffer();      
	      //当前packet不为空时,等待dataqueue为空闲时挂入	
	      if (currentPacket != null) { 
		waitAndQueueCurrentPacket();
	      }
	      if (bytesCurBlock != 0) {
		//发送一个空的packet标志着块的结束,该packet中仅有头部即长度为33
		currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true);
		currentPacket.setSyncBlock(shouldSyncBlock);
	      }
	      //将所有数据(此时为空的packet)flush到datanode中
	      flushInternal(); 
		
              //获取最后一个block
	      ExtendedBlock lastBlock = streamer.getBlock();

	      try {
	      //namenode进行创建文件并写入数据
		completeFile(lastBlock);
	      } finally {
		scope.close();
	      }

	    //关闭streamer和DFSOutputStream,释放dataQueue,ackQueue空间
	    }finally {
              closeThreads(true);
	    }

└5─────[FSOutputSummer] synchronized void flushBuffer()

└6──────[FSOutputSummer] synchronized int flushBuffer(boolean keep,boolean flushPartial)

	//强制对buf中的数据进行校验并写入底层输出流(packet),如果buf中有未被填满
	//的(大概这个意思,源码注释英文为trailing partial)chunk,flushPartial为true
	//则flush该chunk;当flushPartial为true,keep决定是否将改chunk保留在buf中,
	//若flushPartial为false,则保存在buf内
	int bufLen = count;
        int partialLen = bufLen % sum.getBytesPerChecksum();
        int lenToFlush = flushPartial ? bufLen : bufLen - partialLen;
        //lenToFlush=10
	if (lenToFlush != 0) {
	writeChecksumChunks(buf, 0, lenToFlush);

└7───────[FSOutputSummer] void writeChecksumChunks(byte b[], int off, int len)

	    //为指定的chunk生成校验和,将chunk+校验和一起发送给底层输出流(packet)
	    //通过CRC32C进行计算校验和
	    sum.calculateChunkedSums(b, off, len, checksum, 0);
	    for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
	      //当前只有1个chunk,chunkLen为10,checksum数组中只有4位有值
	      int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
	      int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
	      //将chunk写入底层输出流
	      writeChunk(b, off + i, chunkLen, checksum, ckOffset, getChecksumSize());
	    }

└8────────[DFSOutputStream] void writeChunk(byte[] b, int offset, int len,byte[] checksum, int ckoff, int cklen)

└9────────[DFSOutputStream] synchronized void writeChunkImpl(byte[] b, int offset, int len,byte[] checksum, int ckoff, int cklen)

	    //当前packet为空,新建一个packet
	    if (currentPacket == null) {
	      //packetSize=chunkSize*chunksPerPacket=(512+4)*126=65016
	      currentPacket = createPacket(packetSize, chunksPerPacket, 
		  bytesCurBlock, currentSeqno++, false);  
            }
	
	    //将校验和部分写入到packet中的buf
	    currentPacket.writeChecksum(checksum, ckoff, cklen);
	    //将数据部分写入到packet中的buf中
	    currentPacket.writeData(b, offset, len);
	    //chunk数+1
	    currentPacket.incNumChunks();
	    //block中有10B了
	    bytesCurBlock += len;

└10─────────[DFSOutputStream] DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,long seqno, boolean lastPacketInBlock)    

	    //packet内部维护着一个缓冲区
	    final byte[] buf;
	    //buf长度为 33+65016=65049
	    final int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize;

	    try {
	      buf = byteArrayManager.newByteArray(bufferSize);
	    } catch (InterruptedException ie) {
	      final InterruptedIOException iioe = new InterruptedIOException(
		  "seqno=" + seqno);
	      iioe.initCause(ie);
	      throw iioe;
	    }
	    //返回构造方法
	    return new DFSPacket(buf, chunksPerPkt, offsetInBlock, seqno,
				 getChecksumSize(), lastPacketInBlock);

└11─────────[DFSPacket] DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,int checksumSize, boolean lastPacketInBlock)

		    this.lastPacketInBlock = lastPacketInBlock;
		    this.numChunks = 0;
		    this.offsetInBlock = offsetInBlock;
		    this.seqno = seqno;

		    this.buf = buf;
			
		    //PKT_MAX_HEADER_LEN = PKT_LENGTHS_LEN + MAX_PROTO_SIZE =6+27=33
		    checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
		    checksumPos = checksumStart;
		    //33+(126*4)=537
		    dataStart = checksumStart + (chunksPerPkt * checksumSize);
		    dataPos = dataStart;
		    //126
		    maxChunks = chunksPerPkt;

└5─────[DFSOutputStream] void waitAndQueueCurrentPacket()

	    //等待dataqueue空闲时挂入
	    synchronized (dataQueue) {
	      try {
		boolean firstWait = true;
		try {
		  //阻塞条件为dataQueue.size() + ackQueue.size() >80
		  while (!isClosed() && dataQueue.size() + ackQueue.size() >
		      dfsClient.getConf().writeMaxPackets) {
		    
		    try {
		      dataQueue.wait();
		    } catch (InterruptedException e) {
		      Thread.currentThread().interrupt();
		      break;
		    }
		  }
		} finally {
		  
		}
		//加入dataQueue并唤醒dataStreamer线程
		checkClosed();
		queueCurrentPacket();
	      } catch (ClosedChannelException e) {
	      }
	    }

└5─────[DFSOutputStream] void queueCurrentPacket()

	      if (currentPacket == null) return;
	      currentPacket.addTraceParent(Trace.currentSpan());
	      //将该packet添加到队列末尾
	      dataQueue.addLast(currentPacket);
	      lastQueuedSeqno = currentPacket.getSeqno();
	      
	      currentPacket = null;
	      //唤醒dataStreamer线程
	      dataQueue.notifyAll();
	      }

└5─────[DFSOutputStream] void flushInternal()

	    //待确认的packet数量
	    long toWaitFor;
	    synchronized (this) {
	      dfsClient.checkOpen();
	      checkClosed();
	      //
	      // If there is data in the current buffer, send it across
	      //
	      queueCurrentPacket();
	      toWaitFor = lastQueuedSeqno;
	    }
	    //向datanode发送packet
	    waitForAckedSeqno(toWaitFor);

└5─────[DFSOutputStream] void completeFile(ExtendedBlock last)

	    boolean fileComplete = false;
	    //有5次重试机会
	    int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
	    while (!fileComplete) {
	      //src=/d2/f1
	      //DFSClient[clientName=DFSClient_NONMAPREDUCE_-1501156867_1, ugi=jinge (auth:SIMPLE)]
	      //last=BP-163740078-192.168.80.120-1554104956001:blk_1073741875_1061
	      //fileId=16426
	      fileComplete =
		  dfsClient.namenode.complete(src, dfsClient.clientName, last, fileId);

 

 

 


http://www.niftyadmin.cn/n/1033134.html

相关文章

Hadoop:文件操作过程之HDFS打开文件、读流程(部分源码)

DistributedFileSystem和DFSClient Hadoop可以支持不止一种的文件系统&#xff0c;比如对宿主机的文件系统RawLocalSystem、运行在Amazon平台上的S3FileSystem等&#xff0c;所以Hadoop定义了一个FileSystem的抽象类。 DistributedFileSystem继承于FileSystem&#xff0c;是一…

Hadoop:文件操作之Java接口(FileSystem类)

目录 通过java.net.URL读取数据 通过FlieSystem API读写数据 读取数据 写入数据 新建目录 新建文件并写入 追加写入 一致模型 修改配置信息 通过FileStatus查询文件系统 查看文件系统信息 查看块信息 查看datanode信息 通过java.net.URL读取数据 让java程序能够识…

ExitCodeExcetion excode=-1073741701:eclipes运行mapreduce在本地local模式下报错

这几天在mapreduce上摸爬滚打&#xff0c;可谓是困难重重 背景如下&#xff1a;mapreduce.framework.name默认为local&#xff0c;在本地(file:///)eclipses运行mapreduce作业&#xff0c;报错 万万没想到是因为bin下的winutils.exe不能运行 下载个 DirectX修复工具 即可

Hadoop:MapReduce概述、WordCount

MapReduce概述 MapReduce是Hadoop的两大核心技术之一&#xff0c;HDFS解决了大数据存取问题&#xff0c;而MapReduce是对大数据的高效并行编程模型。 MapReduce任务分为两个阶段&#xff1a;map与reduce&#xff1b;每阶段都是以键值对(key-value)作为输入和输出的&#xff1…

Hadoop:MapReduce之Mapper类的输入

目录 Mapper类 Mapper的输入 InputFormat 文件输入FileInputFormat & 输入分片InputSplit 文本输入TextInputFormat & 行记录阅读器LineRecordReader Mapper的输出 收集器Collector 分区器Partitioner 案例&#xff1a;分别计算奇数行和偶数行之和 Hadoop的代…

MapReduce:Mapper阶段的输出之MapOutputBuffer、环形缓冲区工作原理

MapOutputBuffer 在上一篇博客中说过&#xff0c;Mapper的输出中有两个重要部分&#xff1a;一是collector&#xff0c;负责收集Mapper输出并将其交付给Reducer&#xff1b;二是partitioner&#xff0c;决定了应该将具体的输出交付给哪一个Reducer。 Mapper的输出是通过其Rec…

MapReduce:关于RecordReader调用getCurrentKey()和getCurrentValue()时返回相同键-值对象

在《Hadoop权威指南 第4版》的P219&#xff0c;关于Mapper类的run()方法部分有这样一段描述&#xff1a; 由于效率的原因&#xff0c;RecordReader程序每次调用getCurrentKey()和getCurrentValue()时将返回相同的键-值对象。只是这些对象的内容被reader的netKeyValue()方法改变…

MapReduce案例:求共同好友

目录 需求1&#xff1a;所有人两两之间的共同好友 需求2&#xff1a;互为好友的两人之间的共同好友 需求1&#xff1a;所有人两两之间的共同好友 原文件&#xff1a; A:B,C,D,F,E,O B:A,C,E,K C:F,A,D,I D:A,E,F,L E:B,C,D,M,L F:A,B,C,D,E,O,M G:A,C,D,E,F H:A,C,D,E,O I:…