Hadoop:文件操作过程之HDFS打开文件、读流程(部分源码)

news/2024/5/20 5:09:31 标签: hadoop, hdfs

DistributedFileSystem和DFSClient

Hadoop可以支持不止一种的文件系统,比如对宿主机的文件系统RawLocalSystem、运行在Amazon平台上的S3FileSystem等,所以Hadoop定义了一个FileSystem的抽象类。

DistributedFileSystem继承于FileSystem,是一种具体的文件系统即HDFS;hadoop中还定义了一个抽象类AbstractFileSystem,以及扩充子类Hdfs。DistributedFileSystem与Hdfs类内部都有个DFSClient类的对象dfs。

DFSClient是HDFS文件系统的用户(客户端)与namenode和datanode交互的桥梁;针对namenode和datanode的rpc调用都是通过DFSClient类对象实现的;也就是客户端对HDFS的操作最底层都是通过DFSClient完成的。

通常我们再写代码时使用的是对FileSystem的引用而不是DistributedFileSystem的引用,通过FileSystem.get(Configuration conf)可以获取到DistributedFileSystem类对象,因为conf中有相关hadoop的配置信息,这是一个向下造型的过程。

 

 

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HDFSWriteTest {
	public static void main(String[] args) throws IOException {
		Configuration conf = new Configuration(); //读取配置文件core-default.xml和core-site.xml
		FileSystem fs = FileSystem.get(conf);
                //该文件长度为10
		Path path = new Path("hdfs://master1:9000/d2/f1"); //创建Path对象,指定要读取的文件
		FSDataInputStream fsin = fs.open(path);
		IOUtils.copyBytes(fsin, System.out, 4096, false);
		IOUtils.closeStream(fsin); //关闭流
	}
}

获取DistributedFileSystem对象

FileSystem fs = FileSystem.get(conf);

- [FileSystem] static FileSystem get(Configuration conf)

return get(getDefaultUri(conf), conf);

-- [FileSystem] static URI getDefaultUri(Configuration conf)

	//return "hdfs://master1:9000"
	return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, DEFAULT_FS)));

--- [FileSystem] static FileSystem get(URI uri, Configuration conf)

	    //scheme = hdfs
	    String scheme = uri.getScheme();
	    //authority = master1:9000
	    String authority = uri.getAuthority();
	    //disableCacheName = fs.hdfs.impl.disable.cache 是否启动缓存的属性
	    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
        
            return CACHE.get(uri, conf);

---- [FileSystem.Cache] FileSystem get(URI uri, Configuration conf)

		//conf = Configuration: core-default.xml, core-site.xml,
                //mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml
		//此时读取了剩下的6个配置文件
		Key key = new Key(uri, conf);
		return getInternal(uri, conf, key);

----- [FileSystem] FileSystem getInternal(URI uri, Configuration conf, Key key)

	FileSystem fs;
	fs = createFileSystem(uri, conf);

------ [FileSystem] static FileSystem createFileSystem(URI uri, Configuration conf)

	    Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
	    //创建FileSystem实例
	    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
	    //初始化过程,给dfs赋值
	    fs.initialize(uri, conf);
	    return fs;

------- [DistributedFileSystem] void initialize(URI uri, Configuration conf)
 

		  this.dfs = new DFSClient(uri, conf, statistics);
		  this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
		  this.workingDir = getHomeDirectory();

-------- [DFSClient] DFSClient(URI nameNodeUri, Configuration conf,FileSystem.Statistics stats)

        this(nameNodeUri, null, conf, stats);

--------- [DFSClient] DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,Configuration conf, FileSystem.Statistics stats)

        //给dfs中各个属性赋值

读流程概述

客户端通过调用文件系统实例的open()方法就可以打开系统中需要读取的文件。HDFS通过rpc调用namenode获取文件块的位置信息,对于文件的每一块,namenode会返回含有该块的datanode的节点地址;客户端还会根据网络拓扑来确定每一个datanode与它的距离信息,从离它最"近"的datanode上获取数据,最理想的情况是数据块就存储在客户端所在节点上

HDFS会返回一个FSDataInputStream对象,底层为DFSDataInputStream对象,这个对象管理着datanode和namenode的交互,读过程可概括为:

1、客户端发起读请求

2、客户端从namenode得到文件的块及位置信息列表

3、客户端直接和datanode交互读取数据

4、读取完毕关闭连接

当FSDataInputStream与datanode通信时遇到错误,它会选择另一个较近的datanode,并为故障的datanode做标记一面重复向其读取数据;FSDataInputStream还会对读取的数据块进行校验和确认(CRC32C),发现块损坏也会通知namenode并重新读取

这样设计的优点在于:

1、客户端直连datanode读取数据,可以使HDFS扩展到大量的并发客户端,数据流是分散在集群中的节点上的

2、namenode的内存中仅需要相应块的位置信息请求 ,不然随着客户端增加namenode会很快成为瓶颈

Hadoop网络拓扑

hadoop中把网络拓扑看成一棵树,两个节点的距离等于它们到最近共同祖先距离的总和。树的层次:

  • 同一节点中的进程
  • 同一机架上的不同节点
  • 同一数据中不同机架
  • 不同数据中心的节点

若数据中心d1中一个机架r1中的一个节点n1表示为d1/r1/n1,则:

  • distance(d1/r1/n1,d1/r1/n1)=0
  • distance(d1/r1/n1,d1/r1/n2)=2
  • distance(d1/r1/n1,d1/r2/n3)=4
  • distance(d1/r1/n1,d2/r3/n4)=6

打开文件

FSDataInputStream fsin = fs.open(path);

- [FileSystem] FSDataInputStream open(Path f)

    return open(f, getConf().getInt("io.file.buffer.size", 4096));

-- [DistributedFileSystem] FSDataInputStream open(Path f, final int bufferSize)

	    //获取绝对路径,absF=hdfs://master1:9000/d2/f1
	    Path absF = fixRelativePart(f);
	    //创建一个FileSystemLinkResolver类的对象,并调用resolve()方法
	    //FileSystemLinkResolver是一个抽象类,需要补全doCall()和next()
	    return new FileSystemLinkResolver<FSDataInputStream>() {
	      @Override
	      //对文件系统中符号链接的解析
	      public FSDataInputStream doCall(final Path p)
		  throws IOException, UnresolvedLinkException {
		final DFSInputStream dfsis =
		  dfs.open(getPathName(p), bufferSize, verifyChecksum);
		return dfs.createWrappedInputStream(dfsis);
	      }
	      @Override
	      public FSDataInputStream next(final FileSystem fs, final Path p)
		  throws IOException {
		return fs.open(p, bufferSize);
	      }
	    }.resolve(this, absF);

--- [FileSystemLinkResolver<T>] T resolve(final FileSystem filesys, final Path path)

	    for (boolean isLink = true; isLink;) {
	      try {
		    in = doCall(p);

--- [DFSClient] DFSInputStream open(String src, int buffersize, boolean verifyChecksum)

    return new DFSInputStream(this, src, verifyChecksum);

---- [DFSInputStream] DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum)

	    this.dfsClient = dfsClient;
	    this.verifyChecksum = verifyChecksum;
	    this.src = src;
	    synchronized (infoLock) {
	      this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
	    }
	    //通过dfs从namenode中获取要打开文件的信息
	    openInfo();

--- [DFSClient] HdfsDataInputStream createWrappedInputStream(DFSInputStream dfsis)

	//底层为一个FilterInputStream
	return new HdfsDataInputStream(dfsis);

HDFS的open操作其实只是通过rpc向namenode索要了目标文件各数据块的存储地点和文件长度

 

IOUtils.copyBytes(fsin, System.out, 4096, false);

- [IOUtils] static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) 

close参数指定了读完后是否关闭相关流,因为当前只读取一个datanode节点,不需要关闭;若需要读取多个datanode,将关闭与存有前一个数据块的Datanode的连接后,DFSInputStream就会继续从Namenode发送回来的下一个块的Datanode列表信息中寻找最佳的Datanode节点

	    try {
	      copyBytes(in, out, buffSize);
	      if(close) {
		out.close();
		out = null;
		in.close();
		in = null;
	      }
	    } finally {
	      if(close) {
		closeStream(out);
		closeStream(in);
	      }
	    }

-- [IOUtils] static void copyBytes(InputStream in, OutputStream out, int buffSize) 

	    //buf长度为指定的4096
	    byte buf[] = new byte[buffSize];
	    //读到buf中,byteRead为10
	    int bytesRead = in.read(buf);
	    while (bytesRead >= 0) {
	      //写入到输出流
	      out.write(buf, 0, bytesRead);
	      if ((ps != null) && ps.checkError()) {
		throw new IOException("Unable to write to output stream.");
	      }
          //while循环中重复执行read方法
	      bytesRead = in.read(buf);
	    }

 

 


http://www.niftyadmin.cn/n/1033133.html

相关文章

Hadoop:文件操作之Java接口(FileSystem类)

目录 通过java.net.URL读取数据 通过FlieSystem API读写数据 读取数据 写入数据 新建目录 新建文件并写入 追加写入 一致模型 修改配置信息 通过FileStatus查询文件系统 查看文件系统信息 查看块信息 查看datanode信息 通过java.net.URL读取数据 让java程序能够识…

ExitCodeExcetion excode=-1073741701:eclipes运行mapreduce在本地local模式下报错

这几天在mapreduce上摸爬滚打&#xff0c;可谓是困难重重 背景如下&#xff1a;mapreduce.framework.name默认为local&#xff0c;在本地(file:///)eclipses运行mapreduce作业&#xff0c;报错 万万没想到是因为bin下的winutils.exe不能运行 下载个 DirectX修复工具 即可

Hadoop:MapReduce概述、WordCount

MapReduce概述 MapReduce是Hadoop的两大核心技术之一&#xff0c;HDFS解决了大数据存取问题&#xff0c;而MapReduce是对大数据的高效并行编程模型。 MapReduce任务分为两个阶段&#xff1a;map与reduce&#xff1b;每阶段都是以键值对(key-value)作为输入和输出的&#xff1…

Hadoop:MapReduce之Mapper类的输入

目录 Mapper类 Mapper的输入 InputFormat 文件输入FileInputFormat & 输入分片InputSplit 文本输入TextInputFormat & 行记录阅读器LineRecordReader Mapper的输出 收集器Collector 分区器Partitioner 案例&#xff1a;分别计算奇数行和偶数行之和 Hadoop的代…

MapReduce:Mapper阶段的输出之MapOutputBuffer、环形缓冲区工作原理

MapOutputBuffer 在上一篇博客中说过&#xff0c;Mapper的输出中有两个重要部分&#xff1a;一是collector&#xff0c;负责收集Mapper输出并将其交付给Reducer&#xff1b;二是partitioner&#xff0c;决定了应该将具体的输出交付给哪一个Reducer。 Mapper的输出是通过其Rec…

MapReduce:关于RecordReader调用getCurrentKey()和getCurrentValue()时返回相同键-值对象

在《Hadoop权威指南 第4版》的P219&#xff0c;关于Mapper类的run()方法部分有这样一段描述&#xff1a; 由于效率的原因&#xff0c;RecordReader程序每次调用getCurrentKey()和getCurrentValue()时将返回相同的键-值对象。只是这些对象的内容被reader的netKeyValue()方法改变…

MapReduce案例:求共同好友

目录 需求1&#xff1a;所有人两两之间的共同好友 需求2&#xff1a;互为好友的两人之间的共同好友 需求1&#xff1a;所有人两两之间的共同好友 原文件&#xff1a; A:B,C,D,F,E,O B:A,C,E,K C:F,A,D,I D:A,E,F,L E:B,C,D,M,L F:A,B,C,D,E,O,M G:A,C,D,E,F H:A,C,D,E,O I:…

MapReduce:shuffle阶段之Mapper输出

shuffle本意为混洗&#xff0c;MR将排完序的mapper输出作为reducer的输入的过程就称为shuffle&#xff0c;可以理解为mapper到reducer的中间过程&#xff0c;在这个过程中MR框架其实干了很多事。 Mapper输出阶段概述 map函数开始产生输出时(调用context.write()方法&#xff0…