Eclipse搭建Hadoop环境及实战资源分享

news/2024/5/20 5:09:30 标签: eclipse, hadoop, mapreduce, hdfs

首先搭建eclipse的haoop2.7.1开发环境,使用的资源链接如下:

windows安装hadoop2.7.1环境

eclipse下搭建hadoop开发环境

这样我们就可以在eclipse进行hadoop开发了


目录

一、MapReduce 模型简介

1.Map 和 Reduce 函数

2.MapReduce 体系结构

3.MapReduce 工作流程

4.MapReduce 应用程序执行过程

 二、MapReduce 实战

1.数据去重

2.数据排序

3.平均成绩

4.单表关联

 三、总结

 


一、MapReduce 模型简介

MapReduce 将复杂的、运行于大规模集群上的并行计算过程高度地抽象到了两个函数:Map Reduce 。它采用 分而治之 策略,一个存储在分布式文件系统中的大规模数据集,会被切分成许多独立的分片(split ),这些分片可以被多个 Map 任务并行处理。

1.Map 和 Reduce 函数

2.MapReduce 体系结构

MapReduce 体系结构主要由四个部分组成,分别是: Client JobTracker、 TaskTracker 以及 Task

1)Client

  用户编写的MapReduce程序通过Client提交到JobTracker端 用户可通过Client提供的一些接口查看作业运行状态

2)JobTracker

JobTracker负责资源监控和作业调度 JobTracker 监控所有TaskTracker与Job的健康状况,一旦发现失败,就将相应的任务转移到其他节点 JobTracker 会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器(TaskScheduler),而调度器会在资源出现空闲时,选择合适的任务去使用这些资源

3)TaskTracker

TaskTracker 会周期性地通过“心跳”将本节点上资源的使用情况和任务的运行进度汇报给JobTracker,同时接收JobTracker 发送过来的命令并执行相应的操作(如启动新任务、杀死任务等) TaskTracker 使用“slot”等量划分本节点上的资源量(CPU、内存等)。一个Task 获取到一个slot 后才有机会运行,而Hadoop调度器的作用就是将各个TaskTracker上的空闲slot分配给Task使用。slot 分为Map slot 和Reduce slot 两种,分别供MapTask 和Reduce Task 使用

4)Task

Task 分为Map Task 和Reduce Task 两种,均由TaskTracker 启动

3.MapReduce 工作流程

1) 工作流程概述

 

  • 不同的Map任务之间不会进行通信
  • 不同的Reduce任务之间也不会发生任何信息交换
  • 用户不能显式地从一台机器向另一台机器发送消息
  • 所有的数据交换都是通过MapReduce框架自身去实现的

2) MapReduce各个执行阶段

 4.MapReduce 应用程序执行过程

 


 二、MapReduce 实战

1.数据去重

"数据去重"主要是为了掌握和利用并行化思想来对数据进行有意义的筛选。统计大数据集上的数据种类个数、从网站日志中计算访问地等这些看似庞杂的任务都会涉及数据去重。

1.1实例描述

对数据文件中的数据进行去重。数据文件中的每行都是一个数据。样例输入如下所示:

1)file1:

2012-3-1 a

2012-3-2 b

2012-3-3 c

2012-3-4 d

2012-3-5 a

2012-3-6 b

2012-3-7 c

2012-3-3 c

2)file2:

2012-3-1 b

2012-3-2 a

2012-3-3 b

2012-3-4 d

2012-3-5 a

2012-3-6 c

2012-3-7 d

2012-3-3 c

样例输出如下所示:

2012-3-1 a

2012-3-1 b

2012-3-2 a

2012-3-2 b

2012-3-3 b

2012-3-3 c

2012-3-4 d

2012-3-5 a

2012-3-6 b

2012-3-6 c

2012-3-7 c

1.2 解题思路

map阶段:将每一行的文本作为键值对的key

 reduce阶段:将每一个公用的键组输出

1.3 代码展示

package datadeduplicate.pers.xls.datadeduplicate;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.log4j.BasicConfigurator;

public class Deduplication {
    public static void main(String[] args) throws Exception {
    			BasicConfigurator.configure(); //自动快速地使用缺省Log4j环境
    			//必须要传递的是自定的mapper和reducer的类,输入输出的路径必须指定,输出的类型<k3,v3>必须指定
    			//1首先寫job,知道需要conf和jobname在去創建即可
                Configuration conf=new Configuration();
                String jobName=Deduplication.class.getSimpleName();
                Job job = Job.getInstance(conf, jobName);
                //2将自定义的MyMapper和MyReducer组装在一起
                //3读取HDFS內容:FileInputFormat在mapreduce.lib包下
                FileInputFormat.setInputPaths(job, new Path(args[0]));
                //4指定解析<k1,v1>的类(谁来解析键值对)
                //*指定解析的类可以省略不写,因为设置解析类默认的就是TextInputFormat.class
                job.setInputFormatClass(TextInputFormat.class);
                //5指定自定义mapper类
                job.setMapperClass(MyMapper.class);
                //6指定map输出的key2的类型和value2的类型  <k2,v2>
                //*下面两步可以省略,当<k3,v3>和<k2,v2>类型一致的时候,<k2,v2>类型可以不指定
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(Text.class);
                //7分区(默认1个),排序,分组,规约 采用 默认
                job.setCombinerClass(MyReducer.class);
                //接下来采用reduce步骤
                //8指定自定义的reduce类
                job.setReducerClass(MyReducer.class);
                //9指定输出的<k3,v3>类型
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
                //10指定输出<K3,V3>的类
                 //*下面这一步可以省
                job.setOutputFormatClass(TextOutputFormat.class);
                //11指定输出路径
                FileOutputFormat.setOutputPath(job, new Path(args[1]));
                //12写的mapreduce程序要交给resource manager运行
                job.waitForCompletion(true);
                //*13最后,如果要打包运行改程序,则需要调用如下行
                job.setJarByClass(Deduplication.class);
    }
    private static class MyMapper extends Mapper<Object, Text, Text, Text>{
        private static Text line=new Text();
        @Override
        protected void map(Object k1, Text v1,Mapper<Object, Text, Text, Text>.Context context) throws IOException, InterruptedException {
            line=v1;//v1为每行数据,赋值给line
            context.write(line, new Text(""));
         }
    }
    private static class MyReducer extends Reducer<Text, Text, Text, Text>
    {
        @Override
        protected void reduce(Text k2, Iterable<Text> v2s,Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
             context.write(k2, new Text(""));
         }
    }
}

1.4 运行结果展示

打包项目成可运行的jar包,上传的hdfs文件系统:

 

 在linux系统下终端输入hadoop命令,在建立的hadoop节点上运行jar包:

 查看eclipsehdfs文件系统下out文件夹,发现生成了先前指定的deduplication文件夹,其中part-r-00000为运行的输出。

 2.数据排序

package dararank.pers.xls.datarank;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.log4j.BasicConfigurator;
import java.io.IOException;
 
public class DataRank {
    /**
     * 使用Mapper将数据文件中的数据本身作为Mapper输出的key直接输出
     */
    public static class forSortedMapper extends Mapper<Object, Text, IntWritable, IntWritable> {
        private IntWritable mapperValue = new IntWritable(); //存放key的值
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString(); //获取读取的值,转化为String
            mapperValue.set(Integer.parseInt(line)); //将String转化为Int类型
            context.write(mapperValue,new IntWritable(1)); //将每一条记录标记为(key,value) key--数字 value--出现的次数
          //每出现一次就标记为(number,1)
        }
    }
 
    /**
     * 使用Reducer将输入的key本身作为key直接输出
     */
 public static class forSortedReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable>{
        private IntWritable postion = new IntWritable(1); //存放名次
        @Override
        protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            for (IntWritable item :values){ //同一个数字可能出多次,就要多次并列排序
                context.write(postion,key); //写入名次和具体数字
                System.out.println(postion + "\t"+ key);
                postion = new IntWritable(postion.get()+1); //名次加1
            }
        }
    }
 
 
    public static void main(String[] args) throws Exception {
 
    	BasicConfigurator.configure(); //自动快速地使用缺省Log4j环境
        
    	Configuration conf = new Configuration(); //设置MapReduce的配置
        String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
        if(otherArgs.length < 2){
            System.out.println("Usage: datarank <in> [<in>...] <out>");
            System.exit(2);
        }
        //设置作业
        //Job job = new Job(conf);
        Job job = Job.getInstance(conf);
        job.setJarByClass(DataRank.class);
        job.setJobName("DataRank");
        //设置处理map,reduce的类
        job.setMapperClass(forSortedMapper.class);
        job.setReducerClass(forSortedReducer.class);
        //设置输入输出格式的处理
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        //设定输入输出路径
        for (int i = 0; i < otherArgs.length-1;++i){
            FileInputFormat.addInputPath(job,new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length-1]));
        System.exit(job.waitForCompletion(true)?0:1);
    }
 
}

3.平均成绩

package averagescoreapp.pers.xls.averagescoreapp;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.log4j.BasicConfigurator;
 
/**
 * 求平均成绩
 *
 */
public class AverageScoreApp {
 
	public static class Map extends Mapper<Object, Text, Text, IntWritable>{
		@Override
		protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
			//成绩的结构是:
			// 张三	80
			// 李四	82
			// 王五	86
			StringTokenizer tokenizer = new StringTokenizer(value.toString(), "\n");
			while(tokenizer.hasMoreElements()) {
				StringTokenizer lineTokenizer = new StringTokenizer(tokenizer.nextToken());
				String name = lineTokenizer.nextToken(); //姓名
				String score = lineTokenizer.nextToken();//成绩
				context.write(new Text(name), new IntWritable(Integer.parseInt(score)));
			}
		}
	}
	
	public static class Reduce extends Reducer<Text, IntWritable, Text, DoubleWritable>{
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,Reducer<Text, IntWritable, Text, DoubleWritable>.Context context)
				throws IOException, InterruptedException {
			//reduce这里输入的数据结构是:
			// 张三 <80,85,90>
			// 李四 <82,88,94>
			// 王五 <86,80,92>
			int sum = 0;//所有课程成绩总分
			double average = 0;//平均成绩
			int courseNum = 0; //课程数目
			for(IntWritable score:values) {
				sum += score.get();
				courseNum++;
			}
			average = sum/courseNum;
			context.write(new Text(key), new DoubleWritable(average));
		}
	}
	
	public static void main(String[] args) throws Exception{
		BasicConfigurator.configure(); //自动快速地使用缺省Log4j环境
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
        if(otherArgs.length < 2){
            System.out.println("Usage: AverageScoreRank <in> [<in>...] <out>");
            System.exit(2);
        }
		Job job = Job.getInstance(conf);
		job.setJarByClass(AverageScoreApp.class);
		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(DoubleWritable.class);
		
		 //设定输入输出路径
        for (int i = 0; i < otherArgs.length-1;++i){
            FileInputFormat.addInputPath(job,new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length-1]));
		
		System.exit(job.waitForCompletion(true)?0:1);
	}
 
}

 4.单表关联

package singletabblerelation.pers.xls.singletablerelation;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.log4j.BasicConfigurator;

public class SingleTableRelation {
    public static int time = 0;
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
    protected void map(LongWritable key, Text value, Context context)throws java.io.IOException, InterruptedException {
        	// 左右表的标识
            int relation;
            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            String child = tokenizer.nextToken();
            String parent = tokenizer.nextToken();
            if (child.compareTo("child") != 0) {
                // 左表
                relation = 1;
                context.write(new Text(parent), new Text(relation + "+" + child));
                // 右表
                relation = 2;
                context.write(new Text(child), new Text(relation + "+" + parent));
            }
        };

    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context output)
                throws java.io.IOException, InterruptedException {
            int grandchildnum = 0;
            int grandparentnum = 0;
            List<String> grandchilds = new ArrayList<>();
            List<String> grandparents = new ArrayList<>();
            /** 输出表头 */
            if (time == 0) {
                output.write(new Text("grandchild"), new Text("grandparent"));
                time++;
            }
            for (Text val : values) {
                String record = val.toString();
                char relation = record.charAt(0);
                // 取出此时key所对应的child
                if (relation == '1') {
                    String child = record.substring(2);
                    grandchilds.add(child);
                    grandchildnum++;
                }
                // 取出此时key所对应的parent
                else {
                    String parent = record.substring(2);
                    grandparents.add(parent);
                    grandparentnum++;
                }
            }
            if (grandchildnum != 0 && grandparentnum != 0) {
                for (int i = 0; i < grandchildnum; i++)
                    for (int j = 0; j < grandparentnum; j++)
                        output.write(new Text(grandchilds.get(i)), new Text(
                                grandparents.get(j)));
            }

        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
			BasicConfigurator.configure(); //自动快速地使用缺省Log4j环境
			//必须要传递的是自定的mapper和reducer的类,输入输出的路径必须指定,输出的类型<k3,v3>必须指定
            //2将自定义的MyMapper和MyReducer组装在一起
            Configuration conf=new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
            if(otherArgs.length < 2){
                System.out.println("Usage: SingleTableRelation <in> [<in>...] <out>");
                System.exit(2);
            }
            String jobName=SingleTableRelation.class.getSimpleName();
            //1首先寫job,知道需要conf和jobname在去創建即可
             Job job = Job.getInstance(conf, jobName);
	        job.setJarByClass(SingleTableRelation.class);
	        job.setMapperClass(Map.class);
	        job.setReducerClass(Reduce.class);
	        job.setOutputKeyClass(Text.class);
	        job.setOutputValueClass(Text.class);
	        //设定输入输出路径
	        for (int i = 0; i < otherArgs.length-1;++i){
	            FileInputFormat.addInputPath(job,new Path(otherArgs[i]));
	        }
	        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length-1]));       
	        System.exit((job.waitForCompletion(true) ? 0 : 1));
    }
}


 三、总结

hadoop 是一个分布式的基础架构,利用分布式实现高效的计算与储存,最核心的设计在于 HDFS MapReduce
HDFS 在集群上实现了分布式文件系统, MapReduce 则在集群上实现了分布式计算和任务处理。HDFS MapReduce 任务处理过程中提供了对文件操作和存储等的支持。而MapReduce在 HDFS 的基础上实现任务的分发、跟踪和执行等工作,并收集结果,两种相互作用,完成了 Hadoop 分布式集群的主要任务。
通过这四个实战的题目我进一步掌握了 Hadoop 架构在现实生活中的应用。


http://www.niftyadmin.cn/n/1714403.html

相关文章

快递管理系统版本二(添加Exception)

代码资源&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1CKeXKaOdIcYIh5_sQ9XgcQ 提取码&#xff1a;3qau

快递管理系统版本三(添加list集合)

链接&#xff1a;https://pan.baidu.com/s/13g92YC5Hgbpp3rPyNhZkLg 提取码&#xff1a;a911

快递管理系统版本四(添加IO)

链接&#xff1a;https://pan.baidu.com/s/1WbXWWzjvmflzP9cu4YFe_w 提取码&#xff1a;zu26

XML解析(SAX, DOM, JDOM, DOM4J)

目录 1、面试题&#xff1a; 2、DOM4J解析XML文件 3、XPATH解析XML文件 1、面试题&#xff1a; 问: Java中有几种XML解析方式 ? 分别是什么 ? 有什么样的优缺点 ? 答: 四种. 1. SAX解析解析方式是事件驱动机制 ! SAX解析器, 逐行读取XML文件解析 , 每当解析到一个标签的开…

XML生成和XStream的使用

XML生成 步骤 案例 XStream的使用 快速的将Java中的对象, 转换为 XML字符串

Json和Json解析

目录 简介: 1、格式 2、 案例 Java与JSON 1、Gson 2、FastJson 简介: JSON: JavaScript Object Notation JS对象简谱 , 是一种轻量级的数据交换格式。1、格式 对象格式&#xff1a; 数组格式&#xff1a; 2、 案例 Java与JSON 做什么?将Java中的对象 快速的转换为 JSON格式…

Java枚举注解反射和内省

目录 1、枚举 1.1 简介 1.2 定义格式 1.3 枚举类的主要方法 1.4 实现接口的枚举类 1.5 注意事项 2、注解 2.1 简介 2.2 重点 2.3 内置注解 2.4 元注解 3、反射 3.1 简介 3.2 类加载器 3.3 所有类型的class对象 3.4 得到class的几种方式 3.5 获取Constructor 3…

前端技术(html)

目录 一、HTML 1. 概念 2. 快速入门 3. 开发工具 HBuilder 3.1 HBuilder介绍 3.2 HBuilder下载 3.3 HBuilder的安装与使用 3.4 使用HBuilder创建项目 4. HTML文档的基本结构 4.1 基本结构 4.2 HTML注释 5. HTML中常用标签 5.1.文本标签 5.2 图片标签 5.3列表标签 5.4…