The HDFS Java API


HDFS file read flow

  • The client calls FileSystem.open()
    1. FileSystem talks to the NameNode (NN) over RPC; the NN returns part or all of the file's block list, together with the addresses of the DataNodes (DNs) that hold each block's replicas
    2. open() returns an FSDataInputStream; for each block, the stream connects to the DN closest to the client and reads from it
  • The client calls read() on the input stream
    1. When the end of a block is reached, the FSDataInputStream closes the connection to the current DN and looks up the closest DN holding the next block
    2. Each block read is verified against its checksum; if an error occurs while reading from a DN, the client reports it to the NN and continues from the next DN that holds a replica of that block
    3. If the block list is exhausted but the file has not ended, FileSystem fetches the next batch of blocks from the NN
  • The client closes the FSDataInputStream (a minimal sketch of this path follows the list)
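
A minimal client-side sketch of this read path, assuming the hdfs://localhost.localdomain:9000 NameNode and the /in/dir/0.log file used later in this article (the method name readSketch is illustrative only; the imports are the same Configuration, FileSystem, Path, FSDataInputStream and IOUtils classes used in the full example below):

public static void readSketch() throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://localhost.localdomain:9000");
    FileSystem fs = FileSystem.get(conf);
    // open() asks the NN for the block list over RPC and returns an FSDataInputStream
    FSDataInputStream in = fs.open(new Path("/in/dir/0.log"));
    try {
        byte[] buf = new byte[4096];
        int n;
        // each read() pulls bytes from the closest DN holding the current block;
        // the stream switches DNs transparently at block boundaries
        while ((n = in.read(buf)) > 0) {
            System.out.write(buf, 0, n);
        }
    } finally {
        IOUtils.closeStream(in);
    }
}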

HDFS file write flow

  • The client calls FileSystem.create()
    1. FileSystem asks the NN to create a new file in its namespace; no blocks are associated with it yet
    2. The NN checks that the file does not already exist and that the client has permission; if the checks pass, it records the new file, and blocks with their target DNs are allocated as data is written
    3. create() returns an FSDataOutputStream through which the client performs the writes
  • The client calls write() on the output stream
    By default, the FSDataOutputStream sends each data packet to the first DN in the pipeline; the first DN forwards it to the second, and so on
  • The client calls close() on the stream
    The packets remaining in the buffer are flushed; once every block has reached its replication factor, the NN acknowledges success (a minimal sketch of this path follows the list)
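
A corresponding minimal write-path sketch under the same assumptions (the target path /in/dir/hello.log and the method name writeSketch are illustrative only):

public static void writeSketch() throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://localhost.localdomain:9000");
    FileSystem fs = FileSystem.get(conf);
    // create() asks the NN to add the file to its namespace and returns an FSDataOutputStream
    FSDataOutputStream out = fs.create(new Path("/in/dir/hello.log"));
    try {
        // write() hands data packets to the DN pipeline: first DN -> second DN -> ...
        out.writeBytes("hello hdfs\n");
    } finally {
        // close() flushes the remaining packets and waits for the blocks to be fully replicated
        out.close();
    }
}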

Reading file contents in HDFS via a URL

static {
    // Register an HDFS-aware URL stream handler so java.net.URL can open hdfs:// URLs
    // (this factory can be set at most once per JVM)
    URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
}

public void urlway(String fileurl) {
    InputStream in = null;
    try {
        in = new URL(fileurl).openStream();
        // 4096 is the copy buffer size
        IOUtils.copyBytes(in, System.out, 4096);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        IOUtils.closeStream(in);
    }
}

// Example call
urlway("hdfs://localhost.localdomain:9000/in/dir/0.log");

Operating on HDFS with the FileSystem class

package hadoop.examples;

import java.io.InputStream;
import java.net.URL;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.io.IOUtils;

public class WordCount {

    static {
        // Register an HDFS-aware URL stream handler so java.net.URL can open hdfs:// URLs
        // (this factory can be set at most once per JVM)
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    }

    public static void urlway(String fileurl) {
        InputStream in = null;
        try {
            in = new URL(fileurl).openStream();
            // 4096 is the copy buffer size
            IOUtils.copyBytes(in, System.out, 4096);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            IOUtils.closeStream(in);
        }
    }

    // Obtain a FileSystem handle
    public static FileSystem GetFileSystem() {
        FileSystem hdfs = null;
        try {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost.localdomain:9000");
            hdfs = FileSystem.get(conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return hdfs;
    }

    // Upload a local file
    public static void upload() throws Exception {
        FileSystem hdfs = GetFileSystem();
        Path srcpath = new Path("c:/123.log");
        Path path = new Path("/in/dir");
        hdfs.copyFromLocalFile(srcpath, path);
        // Equivalent overload with explicit flags (delSrc=false, overwrite=true):
        // hdfs.copyFromLocalFile(false, true, srcpath, path);
    }

    // List a directory
    public static void scandir() throws Exception {
        FileSystem hdfs = GetFileSystem();
        Path path = new Path("/in/dir");
        FileStatus[] fileStatus = hdfs.listStatus(path);
        for (FileStatus fs : fileStatus) {
            Path p = fs.getPath();
            String info = fs.isFile() ? "file" : "dir";
            System.out.println(p.toString()+":"+info);
        }
    }

    // Create a directory
    public static void makedir() throws Exception {
        FileSystem hdfs = GetFileSystem();
        Path path = new Path("/in/dirx");
        boolean isSuc = hdfs.mkdirs(path);
        if (isSuc) System.out.println("mkdir yes");
        else System.out.println("mkdir no");
    }

    // Create a file and write content to it
    public static void createandwrite() throws Exception {
        FileSystem hdfs = GetFileSystem();
        Path path = new Path("/in/dir/789.log");
        FSDataOutputStream s = hdfs.create(path);
        s.writeChars("hello");
        s.close();
    }

    // Rename one file and delete another
    public static void delorrename() throws Exception {
        FileSystem hdfs = GetFileSystem();
        Path src = new Path("/in/dir/789.log");
        Path dst = new Path("/in/dir/0.log");
        hdfs.rename(src, dst);
        Path del = new Path("/in/dir/456.log");
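        // Note: deleteOnExit() only marks the path for deletion when this FileSystem is closed;
        // for an immediate delete, hdfs.delete(del, false) could be used instead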
        hdfs.deleteOnExit(del);
    }

    // Read a file's contents
    public static void read() throws Exception {
        FileSystem hdfs = GetFileSystem();
        Path path = new Path("/record.txt");
        FSDataInputStream inStream = hdfs.open(path);
        // Copy the file to the console
        IOUtils.copyBytes(inStream, System.out, 4096);
        IOUtils.closeStream(inStream);
    }

    // Where a file's blocks are located in the cluster
    public static void getstatus() throws Exception {
        FileSystem hdfs = GetFileSystem();
        Path path = new Path("/in/dir/0.log");
        FileStatus fileStatus = hdfs.getFileStatus(path);
        BlockLocation[] blockLocations = hdfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
        for(BlockLocation blockLocation:blockLocations) {
            String[] hosts = blockLocation.getHosts();
            for(String host:hosts) {
                System.out.print(host+" ");
            }
        }
    }

    // Information about every DataNode in the cluster
    public static void getcluster() throws Exception {
        FileSystem hdfs = GetFileSystem();
        DistributedFileSystem dfs = (DistributedFileSystem)hdfs;

        DatanodeInfo[] infos = dfs.getDataNodeStats();
        for (DatanodeInfo info : infos) {
            String hostname = info.getHostName();
            System.out.print(hostname+" ");
        }
    }

    /**
     * Prints the contents of /in/dir/0.log and the hostnames of the cluster's DataNodes.
     * @param args unused
     * @throws Exception on any HDFS error
     */
    public static void main(String[] args) throws Exception {
        urlway("hdfs://localhost.localdomain:9000/in/dir/0.log");
        getcluster();
    }
}

A few relevant APIs

Excerpts from the Hadoop source. copyFromLocalFile() and getFileBlockLocations() are defined on org.apache.hadoop.fs.FileSystem; copyBytes() is a static helper on org.apache.hadoop.io.IOUtils, not on FileSystem.

public abstract class FileSystem extends Configured implements Closeable {

  /**
   * The src file is on the local disk.  Add it to FS at
   * the given dst name.
   * delSrc indicates if the source should be removed
   * @param delSrc whether to delete the src
   * @param overwrite whether to overwrite an existing file
   * @param src path
   * @param dst path
   */
  public void copyFromLocalFile(boolean delSrc, boolean overwrite,
                                Path src, Path dst)
    throws IOException {
    Configuration conf = getConf();
    FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf);
  }

  /**
   * Return an array containing hostnames, offset and size of 
   * portions of the given file.  For a nonexistent 
   * file or regions, null will be returned.
   *
   * This call is most helpful with DFS, where it returns 
   * hostnames of machines that contain the given file.
   *
   * The FileSystem will simply return an elt containing 'localhost'.
   *
   * @param p path is used to identify an FS since an FS could have
   *          another FS that it could be delegating the call to
   * @param start offset into the given file
   * @param len length for which to get locations for
   */
  public BlockLocation[] getFileBlockLocations(Path p, 
      long start, long len) throws IOException {
    if (p == null) {
      throw new NullPointerException();
    }
    FileStatus file = getFileStatus(p);
    return getFileBlockLocations(file, start, len);
  }
}

// From org.apache.hadoop.io.IOUtils
public class IOUtils {

  /**
   * Copies from one stream to another.
   *
   * @param in InputStream to read from
   * @param out OutputStream to write to
   * @param buffSize the size of the buffer
   */
  public static void copyBytes(InputStream in, OutputStream out, int buffSize)
    throws IOException {
    PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
    byte buf[] = new byte[buffSize];
    int bytesRead = in.read(buf);
    while (bytesRead >= 0) {
      out.write(buf, 0, bytesRead);
      if ((ps != null) && ps.checkError()) {
        throw new IOException("Unable to write to output stream.");
      }
      bytesRead = in.read(buf);
    }
  }
}
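
A hedged sketch of how these calls fit together; the paths mirror the ones used earlier in this article and are assumptions, and the snippet is meant to live in a method that declares throws Exception, like those above:

Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost.localdomain:9000");
FileSystem hdfs = FileSystem.get(conf);

// delSrc=false keeps the local file, overwrite=true replaces any existing target
hdfs.copyFromLocalFile(false, true, new Path("c:/123.log"), new Path("/in/dir"));

// Path-based overload: the FileStatus lookup happens inside getFileBlockLocations
Path p = new Path("/in/dir/123.log");
long len = hdfs.getFileStatus(p).getLen();
for (BlockLocation loc : hdfs.getFileBlockLocations(p, 0, len)) {
    System.out.println("offset=" + loc.getOffset()
            + " length=" + loc.getLength()
            + " hosts=" + java.util.Arrays.toString(loc.getHosts()));
}

// IOUtils.copyBytes streams the file to the console with a 4096-byte buffer
IOUtils.copyBytes(hdfs.open(p), System.out, 4096);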
