HDFS多文件Join操作

news/2024/5/20 3:40:03 标签: HDFS多文件Join操作, HDFS, Join操作

最近在用Java做HDFS文件处理之时,遇到了多文件Join操作,其中包括:All Join以及常用的Left Join操作

下面是个简单的例子;采用两个表来做left join其中数据结构如下:

A 文件:

a|1b|2|c

B文件:

a|b|1|2|c

即:A文件中的第一、二列与B文件中的第一、三列对应;类似数据库中Table的主键/外键

代码如下:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
 
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
 
import cn.eshore.traffic.hadoop.util.CommUtil;
import cn.eshore.traffic.hadoop.util.StringUtil;
 
 
 
 
/**
 * @ClassName: DataJoin
 * @Description: HDFS JOIN操作
 * @author hadoop
 * @date 2012-12-18 下午5:51:32
 */
public class InstallJoin extends Configured implements Tool {
 
private String static  enSplitCode = "\\|";
 
private String static splitCode = "|";
 
 
// 自定义Reducer
public static class ReduceClass extends DataJoinReducerBase {
 
 
@Override
protected TaggedMapOutput combine(Object[] tags, Object[] values) {
String joinedStr = "";
 
//该段判断用户生成Left join限制【其中tags表示文件的路径,install表示文件名称前缀】
 
//去掉则为All Join
if (tags.length == 1 && tags[0].toString().contains("install")) {
return null;
}
 
 
Map<String, String> map = new HashMap<String, String>();
for (int i = 0; i < values.length; i++) {
TaggedWritable tw = (TaggedWritable) values[i];
String line = ((Text) tw.getData()).toString();
 
 
String[] tokens = line.split(enSplitCode, 8);
String groupValue = tokens[6];
 
String type =  tokens[7];
 
map.put(type, groupValue);
}
 
joinedStr += StringUtil.getCount(map.get("7"))+"|"+StringUtil.getCount(map.get("30"));
TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
retv.setTag((Text) tags[0]);
return retv;
}
}
 
 
// 自定义Mapper
public static class MapClass extends DataJoinMapperBase {
 
 
//自定义Key【类似数据库中的主键/外键】
@Override
protected Text generateGroupKey(TaggedMapOutput aRecord) {
String line = ((Text) aRecord.getData()).toString();
String[] tokens = line.split(CommUtil.enSplitCode);
 
 
String key = "";
String type = tokens[7];
 
//由于不同文件中的Key所在列有可能不同,所以需要动态生成Key,其中type为不同文件中的数据标识;如:A文件最后一列为a用于表示此数据为A文件数据
if ("7".equals(type)) {
key =  tokens[0]+"|"+tokens[1];
}else if ("30".equals(type)) {
key =  tokens[0]+"|"+tokens[2];
}
return new Text(key);
}
 
 
@Override
protected Text generateInputTag(String inputFile) {
return new Text(inputFile);
}
 
 
@Override
protected TaggedMapOutput generateTaggedMapOutput(Object value) {
TaggedWritable retv = new TaggedWritable((Text) value);
retv.setTag(this.inputTag);
return retv;
}
 
 
}
 
 
public static class TaggedWritable extends TaggedMapOutput {
 
 
private Writable data;
 
 
// 自定义
public TaggedWritable() {
this.tag = new Text("");
}
 
 
public TaggedWritable(Writable data) {
this.tag = new Text("");
this.data = data;
}
 
 
@Override
public Writable getData() {
return data;
}
 
 
@Override
public void write(DataOutput out) throws IOException {
this.tag.write(out);
out.writeUTF(this.data.getClass().getName());
this.data.write(out);
}
 
 
@Override
public void readFields(DataInput in) throws IOException {
this.tag.readFields(in);
String dataClz = in.readUTF();
if (this.data == null
|| !this.data.getClass().getName().equals(dataClz)) {
try {
this.data = (Writable) ReflectionUtils.newInstance(
Class.forName(dataClz), null);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
this.data.readFields(in);
}
 
 
}
 
 
/**
* job运行
*/
@Override
public int run(String[] paths) throws Exception {
int no = 0;
try {
Configuration conf = getConf();
JobConf job = new JobConf(conf, InstallJoin.class);
FileInputFormat.setInputPaths(job, new Path(paths[0]));
FileOutputFormat.setOutputPath(job, new Path(paths[1]));
job.setJobName("join_data_test");
job.setMapperClass(MapClass.class);
job.setReducerClass(ReduceClass.class);
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(TaggedWritable.class);
job.set("mapred.textoutputformat.separator", CommUtil.splitCode);
JobClient.runJob(job);
no = 1;
} catch (Exception e) {
throw new Exception();
}
return no;
}
 
 
//测试
public static void main(String[] args) {
String[] paths = {
"hdfs://master...:9000/home/hadoop/traffic/join/newtype",
"hdfs://master...:9000/home/hadoop/traffic/join/newtype/output" }
 
int res = 0;
try {
res = ToolRunner.run(new Configuration(), new InstallJoin(), paths);
} catch (Exception e) {
e.printStackTrace();
}
System.exit(res);
}
}

 


http://www.niftyadmin.cn/n/1555463.html

相关文章

Nginx 权限控制文件预览和下载

author: momo date: 2020-07-31 06:00 基于 Nginx Java(SpringBoot) 实现带权限验证的静态文件服务器&#xff0c;支持文件下载、PDF预览和图片预览。 需要注意的是&#xff0c;无需权限判断的图片不建议使用此方法&#xff0c;大量的图片访问会增加后台服务器的处理压力。 …

Hadoop多Job并行处理

有关Hadoop多Job任务并行处理&#xff0c;经过测试&#xff0c;配置如下&#xff1a; 首先做如下配置&#xff1a; 1、修改mapred-site.xml添加调度器配置&#xff1a; <property> <name>mapred.jobtracker.taskScheduler</name> <value>org.ap…

Docker 部署 SeafilePro + OnlyOffice(CentOS版)

Docker 部署 SeafilePro OnlyOffice&#xff08;CentOS版&#xff09; 前言 本教程完全基于Seafile官方文档进行安装部署的完全操作&#xff08;傻瓜式&#xff09;指南。 官方文档 1.Seafile 官方文档-用Docker部署Seafile 2.Seafile 官方文档-OnlyOffice 集成 滚蛋吧202…

用Sqoop把数据从HDFS导入到关系型数据库

由于工作的需求&#xff0c;需要把HDFS中处理之后的数据转移至关系型数据库中成为对应的Table&#xff0c;在网上寻找有关的资料良久&#xff0c;发现各个说法不一&#xff0c;下面是本人自身测试过程&#xff1a; 使用Sqoop来实现这一需求&#xff0c;首先要明白Sqoop是什么&…

JConsole之Java性能分析器使用

一、JConsole是什么 从Java 5开始 引入了 JConsole。JConsole 是一个内置 Java 性能分析器&#xff0c;可以从命令行或在 GUI shell 中运行。您可以轻松地使用 JConsole&#xff08;或者&#xff0c;它更高端的 “近亲” VisualVM &#xff09;来监控 Java 应用程序性能和跟…

关于Ehcache缓存中timeToLiveSeconds和timeToIdleSeconds

闲来无事测试了下Ehcache与MemCache比较&#xff0c;在此发现了Ehcache中一个小细节问题&#xff0c;以前未用心去注意过&#xff0c;在此特记录一下&#xff0c;同时也望能给需要的道友留下些益处&#xff1a; 其中主要记录的是timeToLiveSeconds和timeToIdleSeconds&#xf…

Oracle客户端使用

在日常开发中 有好些新同事不太明白如何连接Oracle服务端&#xff0c;在这里 我做个Oracle客户端常用方式 的简述&#xff1a; 其实连接Oracle服务的方式很多如&#xff1a;Native、PL、、&#xff0c; 其中Native for Oracle非常简单的配置 在做数据量小 或者说 数据简单的情…

Eclipse/MyEclipse中使用VSS

欲在MyEclipse/Eclipse中使用VSS&#xff0c;必先安装VSS插件、其安装方法与安装SVN类似&#xff0c;不多说&#xff1b;进入正题&#xff1a; 一、首先需要新建一个项目&#xff0c;即&#xff1a;本地工作目录如&#xff0c;项目类型可以自己根据需要来定义下图所示&#xf…