HDFS文件常用操作

news/2024/5/20 4:29:26 标签: HDFS文件常用操作, HDFS, 文件常用操作

弄了段时间hadoop的HDFS,用了些常用的HDFS文件操作,Java实现记录如下,以作Demo:

/**
* @Title: uploadLocalFileToHDFS
* @Description: 单个本地文件拷贝到HDFS
* @param @param localPath 本地文件路径
* @param @param hdfsPath HDFS文件路径
* @param @throws IOException 设定文件
* @return void 返回类型
* @throws
*/
public static void uploadLocalFileToHDFS(String localPath, String hdfsPath)
throws IOException {
InputStream in = new BufferedInputStream(new FileInputStream(localPath));
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
OutputStream out = fs.create(new Path(hdfsPath), new Progressable() {
public void progress() {
System.out.print(".");
}
});
IOUtils.copyBytes(in, out, 4096, true);
}
 
 
/**
* @Title: bathUploadLocalFileToHDFS
* @Description:本地文件批量拷贝到HDFS
* @param localPath
*            本地文件目录
* @param hdfsPath
*            HDFS文件目录
* @param localFileName
*            本地文件名称
* @param date
*            日期
* @return void 返回类型
* @throws
*/
public static boolean batchUploadLocalFileToHDFS(String localPath,
String localFileName, String hdfsPath, String date) {
boolean bln = false;
try {
// 读取本地文件
File[] files = getFiles(localPath, localFileName);
if (files != null && files.length > 0) {
for (File file : files) {
String fileName = file.getName();
uploadLocalFileToHDFS(localPath + "/" + fileName, hdfsPath
+ "/" + date + "/" + fileName);
}
bln = true;
}
} catch (Exception e) {
e.printStackTrace();
}
return bln;
}
 
 
/**
* @Title: hdfsFileReName
* @Description:重命名HDFS文件
* @param @param oldHdfsPath 旧的文件名
* @param @param newHdfsPath 新的文件名
* @return void 返回类型
* @throws
*/
public static boolean hdfsFileReName(String oldHdfsPath, String newHdfsPath) {
boolean isRename = false;
try {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(oldHdfsPath), conf);
Path frpaht = new Path(oldHdfsPath);
Path topath = new Path(newHdfsPath);
isRename = fs.rename(frpaht, topath);
} catch (Exception e) {
e.printStackTrace();
}
return isRename;
}
 
 
/**
* @Title: hdfsFileReName
* @Description:HDFS文件删除
* @param @param oldHdfsPath 旧的文件名
* @param @param newHdfsPath 新的文件名
* @return void 返回类型
* @throws
*/
public static boolean hdfsFileDelete(String hedfFilePath) {
boolean isDeleted = false;
try {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(hedfFilePath), conf);
Path frpaht = new Path(hedfFilePath);
// 递归删除
isDeleted = fs.delete(frpaht, true);
} catch (Exception e) {
e.printStackTrace();
}
return isDeleted;
}
 
//HDFS文件目录创建
public static void hdfsDirectoryCreate(String hedfDirPath) {
try {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(hedfDirPath), conf);
Path frpaht = new Path(hedfDirPath);
fs.mkdirs(frpaht);
} catch (Exception e) {
e.printStackTrace();
}
}
 
 
// HDFS文件拷贝到本地
public static void downLoadHDFSFileToLocal(String hdfsPath, String localPath)
throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
InputStream in = fs.open(new Path(hdfsPath));
OutputStream out = new BufferedOutputStream(new FileOutputStream(
localPath));
IOUtils.copyBytes(in, out, 4096, false);
IOUtils.closeStream(in);
}
 
 
// HDFS文件重命名
public static void hDFSFileReName(String hdfsPath, String localPath)
throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
InputStream in = fs.open(new Path(hdfsPath));
OutputStream out = new BufferedOutputStream(new FileOutputStream(
localPath));
IOUtils.copyBytes(in, out, 4096, false);
IOUtils.closeStream(in);
}
 
 
/**
* @Title: mergeLocalFileToHDFS
* @Description: 多个本地文件合并拷贝到HDFS
* @param @param localPath 本地文件路径
* @param @param hdfsPath HDFS文件路径
* @param @throws IOException 设定文件
* @return void 返回类型
* @throws
*/
public static void mergeLocalFileToHDFS(String localPath, String hdfsPath)
throws IOException {
// InputStream in = new BufferedInputStream(new
// FileInputStream(localPath));
Configuration conf = new Configuration();
FileSystem local = FileSystem.getLocal(conf);
FileSystem hdfs = FileSystem.get(conf);
Path inputDir = new Path(localPath);
Path hdfsFile = new Path(hdfsPath);
try {
FileStatus[] inputFiles = local.listStatus(inputDir);
FSDataOutputStream out = hdfs.create(hdfsFile);
for (int i = 0; i < inputFiles.length; i++) {
FSDataInputStream in = local.open(inputFiles[i].getPath());
byte buffer[] = new byte[256];
int bytesRead = 0;
while ((bytesRead = in.read(buffer)) > 0) {
out.write(buffer, 0, bytesRead);
}
in.close();
// 不同文件合并要换行
out.write("\n".getBytes(), 0, "\n".length());
}
out.close();
 
 
} catch (IOException e) {
e.printStackTrace();
}
 
 
} 
 
 
 
//检查HDFS中文件是否存在
public static List<String> checkFileExists(String[] hdfsFilePaths) throws IOException {
List<String> pathList = new ArrayList<String>();
Configuration conf = new Configuration();
for (String item : hdfsFilePaths) {
Path path = new Path(item);
FileSystem fs = FileSystem.get(URI.create(item), conf);
if (fs.exists(path) == true){
pathList.add(item);
}
}
return pathList;
}
 
//HDFS生成后,处理
public static void hdfsFileHandle(Object[] Paths){
String oldFileName = Paths[1].toString();
String newFileName = Paths[2].toString();
String delFileName = Paths[3].toString();
// 重命名、
FileUtil.hdfsFileReName(oldFileName, newFileName);
// 删掉重复的文件夹
FileUtil.hdfsFileDelete(delFileName);
}
 
 
//读取HDFS文件
@SuppressWarnings("deprecation")
public static void hdfsFileRead(String filePath) throws IOException {
try {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(filePath), conf);
Path pathq = new Path(filePath);
FSDataInputStream fsr = fs.open(pathq);
String line = "";
while (line!=null) {
line = fsr.readLine();
if (line!=null) {
System.out.println(line);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
 
//写入HDFS文件
public static void hdfsFileWrite(String filePath,String context){
try {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(filePath), conf);
Path pathq = new Path(filePath);
FSDataOutputStream fsr = fs.create(pathq);
fsr.writeBytes(context);
fsr.close();
} catch (Exception e) {
e.printStackTrace();
}
}

 


http://www.niftyadmin.cn/n/1555467.html

相关文章

多个搜索引擎搜索,快人一步

MoMo智能搜索 支持多个搜索引擎搜索&#xff0c;快速切换。 百度、谷歌、Bing、秘迹搜索、CSDN 一、搜索网址 momo智能搜索https://www.itmm.wang/search 二、效果预览 百度 谷歌 Bing 秘迹 CSDN 三、Chrome设置 设置默认搜索和地址栏搜索 点击Chrome右上角“...” → 选择“设…

Hadoop之Join时 DataJoin软件包问题

在做HDFS多文件Join时通过监控job成功启动并且mapper执行完毕&#xff0c;但reduce总是不能执行完成&#xff0c;进度卡在66.66%。mapper输出是想要的格式&#xff0c;而且小规模输入数据测试时整个job能成功完成。在查看未完成reduce的状态&#xff0c;发现key不变&#xff0c…

Nginx 优化静态资源加载 解决 Waiting(TTFB)时间过长问题

前因后果 我的MoMo导航网站每次加载都需要等待两三秒&#xff0c;一直以为是带宽问题&#xff08;因为带宽真的小&#xff0c;钱的问题&#xff09;&#xff0c;后来开了全站 CDN 加速依然没有解决问题&#xff0c;今天正好没事就研究研究。 如图&#xff1a;多个静态文件 Wait…

HDFS多文件Join操作

最近在用Java做HDFS文件处理之时&#xff0c;遇到了多文件Join操作&#xff0c;其中包括&#xff1a;All Join以及常用的Left Join操作&#xff0c; 下面是个简单的例子&#xff1b;采用两个表来做left join其中数据结构如下&#xff1a; A 文件&#xff1a; a|1b|2|c B文件…

Nginx 权限控制文件预览和下载

author: momo date: 2020-07-31 06:00 基于 Nginx Java(SpringBoot) 实现带权限验证的静态文件服务器&#xff0c;支持文件下载、PDF预览和图片预览。 需要注意的是&#xff0c;无需权限判断的图片不建议使用此方法&#xff0c;大量的图片访问会增加后台服务器的处理压力。 …

Hadoop多Job并行处理

有关Hadoop多Job任务并行处理&#xff0c;经过测试&#xff0c;配置如下&#xff1a; 首先做如下配置&#xff1a; 1、修改mapred-site.xml添加调度器配置&#xff1a; <property> <name>mapred.jobtracker.taskScheduler</name> <value>org.ap…

Docker 部署 SeafilePro + OnlyOffice(CentOS版)

Docker 部署 SeafilePro OnlyOffice&#xff08;CentOS版&#xff09; 前言 本教程完全基于Seafile官方文档进行安装部署的完全操作&#xff08;傻瓜式&#xff09;指南。 官方文档 1.Seafile 官方文档-用Docker部署Seafile 2.Seafile 官方文档-OnlyOffice 集成 滚蛋吧202…

用Sqoop把数据从HDFS导入到关系型数据库

由于工作的需求&#xff0c;需要把HDFS中处理之后的数据转移至关系型数据库中成为对应的Table&#xff0c;在网上寻找有关的资料良久&#xff0c;发现各个说法不一&#xff0c;下面是本人自身测试过程&#xff1a; 使用Sqoop来实现这一需求&#xff0c;首先要明白Sqoop是什么&…