DistributedFileSystem和DFSClient
Hadoop可以支持不止一种的文件系统,比如对宿主机的文件系统RawLocalSystem、运行在Amazon平台上的S3FileSystem等,所以Hadoop定义了一个FileSystem的抽象类。
DistributedFileSystem继承于FileSystem,是一种具体的文件系统即HDFS;hadoop中还定义了一个抽象类AbstractFileSystem,以及扩充子类Hdfs。DistributedFileSystem与Hdfs类内部都有个DFSClient类的对象dfs。
DFSClient是HDFS文件系统的用户(客户端)与namenode和datanode交互的桥梁;针对namenode和datanode的rpc调用都是通过DFSClient类对象实现的;也就是客户端对HDFS的操作最底层都是通过DFSClient完成的。
通常我们再写代码时使用的是对FileSystem的引用而不是DistributedFileSystem的引用,通过FileSystem.get(Configuration conf)可以获取到DistributedFileSystem类对象,因为conf中有相关hadoop的配置信息,这是一个向下造型的过程。
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
public class HDFSWriteTest {
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration(); //读取配置文件core-default.xml和core-site.xml
FileSystem fs = FileSystem.get(conf);
//该文件长度为10
Path path = new Path("hdfs://master1:9000/d2/f1"); //创建Path对象,指定要读取的文件
FSDataInputStream fsin = fs.open(path);
IOUtils.copyBytes(fsin, System.out, 4096, false);
IOUtils.closeStream(fsin); //关闭流
}
}
获取DistributedFileSystem对象
FileSystem fs = FileSystem.get(conf);
- [FileSystem] static FileSystem get(Configuration conf)
return get(getDefaultUri(conf), conf);
-- [FileSystem] static URI getDefaultUri(Configuration conf)
//return "hdfs://master1:9000"
return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, DEFAULT_FS)));
--- [FileSystem] static FileSystem get(URI uri, Configuration conf)
//scheme = hdfs
String scheme = uri.getScheme();
//authority = master1:9000
String authority = uri.getAuthority();
//disableCacheName = fs.hdfs.impl.disable.cache 是否启动缓存的属性
String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
return CACHE.get(uri, conf);
---- [FileSystem.Cache] FileSystem get(URI uri, Configuration conf)
//conf = Configuration: core-default.xml, core-site.xml,
//mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml
//此时读取了剩下的6个配置文件
Key key = new Key(uri, conf);
return getInternal(uri, conf, key);
----- [FileSystem] FileSystem getInternal(URI uri, Configuration conf, Key key)
FileSystem fs;
fs = createFileSystem(uri, conf);
------ [FileSystem] static FileSystem createFileSystem(URI uri, Configuration conf)
Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
//创建FileSystem实例
FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
//初始化过程,给dfs赋值
fs.initialize(uri, conf);
return fs;
------- [DistributedFileSystem] void initialize(URI uri, Configuration conf)
this.dfs = new DFSClient(uri, conf, statistics);
this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
this.workingDir = getHomeDirectory();
-------- [DFSClient] DFSClient(URI nameNodeUri, Configuration conf,FileSystem.Statistics stats)
this(nameNodeUri, null, conf, stats);
--------- [DFSClient] DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,Configuration conf, FileSystem.Statistics stats)
//给dfs中各个属性赋值
读流程概述
客户端通过调用文件系统实例的open()方法就可以打开系统中需要读取的文件。HDFS通过rpc调用namenode获取文件块的位置信息,对于文件的每一块,namenode会返回含有该块的datanode的节点地址;客户端还会根据网络拓扑来确定每一个datanode与它的距离信息,从离它最"近"的datanode上获取数据,最理想的情况是数据块就存储在客户端所在节点上
HDFS会返回一个FSDataInputStream对象,底层为DFSDataInputStream对象,这个对象管理着datanode和namenode的交互,读过程可概括为:
1、客户端发起读请求
2、客户端从namenode得到文件的块及位置信息列表
3、客户端直接和datanode交互读取数据
4、读取完毕关闭连接
当FSDataInputStream与datanode通信时遇到错误,它会选择另一个较近的datanode,并为故障的datanode做标记一面重复向其读取数据;FSDataInputStream还会对读取的数据块进行校验和确认(CRC32C),发现块损坏也会通知namenode并重新读取
这样设计的优点在于:
1、客户端直连datanode读取数据,可以使HDFS扩展到大量的并发客户端,数据流是分散在集群中的节点上的
2、namenode的内存中仅需要相应块的位置信息请求 ,不然随着客户端增加namenode会很快成为瓶颈
Hadoop网络拓扑
hadoop中把网络拓扑看成一棵树,两个节点的距离等于它们到最近共同祖先距离的总和。树的层次:
- 同一节点中的进程
- 同一机架上的不同节点
- 同一数据中不同机架
- 不同数据中心的节点
若数据中心d1中一个机架r1中的一个节点n1表示为d1/r1/n1,则:
- distance(d1/r1/n1,d1/r1/n1)=0
- distance(d1/r1/n1,d1/r1/n2)=2
- distance(d1/r1/n1,d1/r2/n3)=4
- distance(d1/r1/n1,d2/r3/n4)=6
打开文件
FSDataInputStream fsin = fs.open(path);
- [FileSystem] FSDataInputStream open(Path f)
return open(f, getConf().getInt("io.file.buffer.size", 4096));
-- [DistributedFileSystem] FSDataInputStream open(Path f, final int bufferSize)
//获取绝对路径,absF=hdfs://master1:9000/d2/f1
Path absF = fixRelativePart(f);
//创建一个FileSystemLinkResolver类的对象,并调用resolve()方法
//FileSystemLinkResolver是一个抽象类,需要补全doCall()和next()
return new FileSystemLinkResolver<FSDataInputStream>() {
@Override
//对文件系统中符号链接的解析
public FSDataInputStream doCall(final Path p)
throws IOException, UnresolvedLinkException {
final DFSInputStream dfsis =
dfs.open(getPathName(p), bufferSize, verifyChecksum);
return dfs.createWrappedInputStream(dfsis);
}
@Override
public FSDataInputStream next(final FileSystem fs, final Path p)
throws IOException {
return fs.open(p, bufferSize);
}
}.resolve(this, absF);
--- [FileSystemLinkResolver<T>] T resolve(final FileSystem filesys, final Path path)
for (boolean isLink = true; isLink;) {
try {
in = doCall(p);
--- [DFSClient] DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
return new DFSInputStream(this, src, verifyChecksum);
---- [DFSInputStream] DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum)
this.dfsClient = dfsClient;
this.verifyChecksum = verifyChecksum;
this.src = src;
synchronized (infoLock) {
this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
}
//通过dfs从namenode中获取要打开文件的信息
openInfo();
--- [DFSClient] HdfsDataInputStream createWrappedInputStream(DFSInputStream dfsis)
//底层为一个FilterInputStream
return new HdfsDataInputStream(dfsis);
HDFS的open操作其实只是通过rpc向namenode索要了目标文件各数据块的存储地点和文件长度
读
IOUtils.copyBytes(fsin, System.out, 4096, false);
- [IOUtils] static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close)
close参数指定了读完后是否关闭相关流,因为当前只读取一个datanode节点,不需要关闭;若需要读取多个datanode,将关闭与存有前一个数据块的Datanode的连接后,DFSInputStream就会继续从Namenode发送回来的下一个块的Datanode列表信息中寻找最佳的Datanode节点
try {
copyBytes(in, out, buffSize);
if(close) {
out.close();
out = null;
in.close();
in = null;
}
} finally {
if(close) {
closeStream(out);
closeStream(in);
}
}
-- [IOUtils] static void copyBytes(InputStream in, OutputStream out, int buffSize)
//buf长度为指定的4096
byte buf[] = new byte[buffSize];
//读到buf中,byteRead为10
int bytesRead = in.read(buf);
while (bytesRead >= 0) {
//写入到输出流
out.write(buf, 0, bytesRead);
if ((ps != null) && ps.checkError()) {
throw new IOException("Unable to write to output stream.");
}
//while循环中重复执行read方法
bytesRead = in.read(buf);
}