HDFS和MapReduce综合实训

news/2024/5/20 1:30:08 标签: hdfs, mapreduce, hadoop

文章目录

  • 第1关:WordCount词频统计
    • 第2关:HDFS文件读写
    • 第3关:倒排索引
    • 第4关: 网页排序——PageRank算法


第1关:WordCount词频统计

测试说明
以下是测试样例:

测试输入样例数据集:文本文档test1.txt和test2.txt

文档test1.txt中的内容为:
tale as old as time
true as it can be
beauty and the beast

文档test2.txt中的内容为:
ever just the same
ever as before
beauty and the beast

预期输出result.txt文档中的内容为:
and 2
as 4
beast 2
beauty 2
before 1
can 1
ever 2
it 1
just 1
old 1
same 1
tale 1
the 3
time 1
true 1

import java.io.IOException;
import java.util.StringTokenizer;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
 
public class WordCount {
 
  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{
 
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
 
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }
 
  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();
 
    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }
 
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

在这里插入图片描述

第2关:HDFS文件读写

编程要求
本关的编程任务是补全右侧代码片段中的代码,具体要求及说明如下:

在主函数main中已获取hadoop的系统设置,并在其中创建HDFS文件。在main函数中,指定创建文档路径(必须设置为/user/hadoop/myfile才能评测),输入内容必须是本关要求内容才能评测。
添加读取文件输出部分
本关只要求在指定区域进行代码编写,其他区域仅供参考请勿改动。
测试说明
本关无测试样例,直接比较文件内容确定输出是否为“china cstor cstor cstor china”

import java.io.IOException;
import java.sql.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class hdfs {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
            System.out.println(fs.getUri());
        Path file = new Path("/user/hadoop/myfile");
        if (fs.exists(file)) {
             System.out.println("File exists.");
        } else
            {
           FSDataOutputStream outStream = fs.create(file);
           outStream.writeUTF("china cstor cstor cstor china");
         outStream.close();
        }
        FSDataInputStream inStream = fs.open(file);
        String data = inStream.readUTF();
        FileSystem hdfs = file.getFileSystem(conf);
        FileStatus[] fileStatus = hdfs.listStatus(file);
        for(FileStatus status:fileStatus)
        {
           System.out.println("FileOwer:"+status.getOwner());
           System.out.println("FileReplication:"+status.getReplication());
           System.out.println("FileModificationTime:"+new Date(status.getModificationTime()));
           System.out.println("FileBlockSize:"+status.getBlockSize());
        }
        System.out.println(data);
        System.out.println("Filename:"+file.getName());
        inStream.close();
        fs.close();
    }
}

在这里插入图片描述

第3关:倒排索引

编程要求
本关的编程任务是补全右侧代码片段中map和reduce函数中的代码,具体要求及说明如下:

在主函数main中已初始化hadoop的系统设置,包括hadoop运行环境的连接。
在main函数中,已经设置好了待处理文档路径(即input),以及结果输出路径(即output)。
在main函数中,已经声明了job对象,程序运行的工作调度已经设定好。
本关只要求在map和reduce函数的指定区域进行代码编写,其他区域请勿改动。
测试说明
测试输入样例数据集:文本文档test1.txt, test2.txt
在这里插入图片描述

文档test1.txt中的内容为:

tale as old as time
true as it can be
beauty and the beast

文档test2.txt中的内容为:

ever just the same
ever as before
beauty and the beast

预期输出文件result.txt的内容为:

import java.io.IOException;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.util.Iterator;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
public class InvertedIndex {
    public static class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> 
    {
        public void map(LongWritable key, Text value, Context context)  
                throws IOException, InterruptedException 
         
        {    
            FileSplit fileSplit = (FileSplit)context.getInputSplit();
            String fileName = fileSplit.getPath().getName();
            
            String word;
            IntWritable frequence=new IntWritable();
            int one=1;
            Hashtable<String,Integer>    hashmap=new Hashtable();
            StringTokenizer itr = new StringTokenizer(value.toString());
            for(;itr.hasMoreTokens(); ) 
            {   
                
                word=itr.nextToken();
                if(hashmap.containsKey(word)){
                    hashmap.put(word,hashmap.get(word)+1);
                }else{
                    hashmap.put(word, one);
                
                }
            
            }
            
            for(Iterator<String> it=hashmap.keySet().iterator();it.hasNext();){
                word=it.next();
                frequence=new IntWritable(hashmap.get(word));
                Text fileName_frequence = new Text(fileName+"@"+frequence.toString());    
                context.write(new Text(word),fileName_frequence);
            }
            
        }
    }
    public static class InvertedIndexCombiner extends Reducer<Text,Text,Text,Text>{
        protected void reduce(Text key,Iterable<Text> values,Context context)
                        throws IOException ,InterruptedException{ 
         
            String fileName="";
            int sum=0;
            String num;
            String s;
            for (Text val : values) {
                    
                    s= val.toString();
                    fileName=s.substring(0, val.find("@"));
                    num=s.substring(val.find("@")+1, val.getLength());
                    sum+=Integer.parseInt(num);
            }
        IntWritable frequence=new IntWritable(sum);
        context.write(key,new Text(fileName+"@"+frequence.toString()));
        }
    }
    
    public static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> 
    {    @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException 
         {    Iterator<Text> it = values.iterator();
            StringBuilder all = new StringBuilder();
            if(it.hasNext())  all.append(it.next().toString());
            for(;it.hasNext();) {
                all.append(";");
                all.append(it.next().toString());                    
            }
            context.write(key, new Text(all.toString()));
        }
    }
    public static void main(String[] args) 
    {
        if(args.length!=2){
            System.err.println("Usage: InvertedIndex <in> <out>");
            System.exit(2);
        }
        
      try {
                Configuration conf = new Configuration();
                String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
                
                Job job = new Job(conf, "invertedindex");
                job.setJarByClass(InvertedIndex.class);
                job.setMapperClass(InvertedIndexMapper.class);
                job.setCombinerClass(InvertedIndexCombiner.class);
                job.setReducerClass(InvertedIndexReducer.class);
                
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
                
                FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
                FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
                
                System.exit(job.waitForCompletion(true) ? 0 : 1);
     
        } catch (Exception e) { 
            e.printStackTrace();
        }
    }
}

在这里插入图片描述

第4关: 网页排序——PageRank算法

测试说明
输入文件格式如下:
1 1.0 2 3 4 5 6 7 8
2 2.0 3 4 5 6 7 8
3 3.0 4 5 6 7 8
4 4.0 5 6 7 8
5 5.0 6 7 8
6 6.0 7 8
7 7.0 8
8 8.0 1 2 3 4 5 6 7

注:为了简化运算,已经对网页集关系进行了规整,并且给出了相应的初始PR值。
以第一行为例: 1表示网址(以tab键隔开),1.0为给予的初始pr值,2,3,4,5,6,7,8为从网址1指向的网址。
输出文件格式:
The origin result
1 1.0 2 3 4 5 6 7 8
2 2.0 3 4 5 6 7 8
3 3.0 4 5 6 7 8
4 4.0 5 6 7 8
5 5.0 6 7 8
6 6.0 7 8
7 7.0 8
8 8.0 1 2 3 4 5 6 7
The 1th result
1 0.150 1.121 _2 3 4 5 6 7 8
2 0.150 1.243 _3 4 5 6 7 8
3 0.150 1.526 _4 5 6 7 8
4 0.150 2.036 _5 6 7 8
5 0.150 2.886 _6 7 8
6 0.150 4.303 _7 8
7 0.150 6.853 _8
8 0.150 11.831 _1 2 3 4 5 6 7
The 2th result
1 0.150 1.587 _2 3 4 5 6 7 8
2 0.150 1.723 _3 4 5 6 7 8
3 0.150 1.899 _4 5 6 7 8
4 0.150 2.158 _5 6 7 8
5 0.150 2.591 _6 7 8
6 0.150 3.409 _7 8
7 0.150 5.237 _8
8 0.150 9.626 _1 2 3 4 5 6 7
The 3th result
1 0.150 1.319 _2 3 4 5 6 7 8
2 0.150 1.512 _3 4 5 6 7 8
3 0.150 1.756 _4 5 6 7 8
4 0.150 2.079 _5 6 7 8
5 0.150 2.537 _6 7 8
6 0.150 3.271 _7 8
7 0.150 4.720 _8
8 0.150 8.003 _1 2 3 4 5 6 7
The 4th result
1 0.150 1.122 _2 3 4 5 6 7 8
2 0.150 1.282 _3 4 5 6 7 8
3 0.150 1.496 _4 5 6 7 8
4 0.150 1.795 _5 6 7 8
5 0.150 2.236 _6 7 8
6 0.150 2.955 _7 8
7 0.150 4.345 _8
8 0.150 7.386 _1 2 3 4 5 6 7
The 5th result
1 0.150 1.047 _2 3 4 5 6 7 8
2 0.150 1.183 _3 4 5 6 7 8
3 0.150 1.365 _4 5 6 7 8
4 0.150 1.619 _5 6 7 8
5 0.150 2.000 _6 7 8
6 0.150 2.634 _7 8
7 0.150 3.890 _8
8 0.150 6.686 _1 2 3 4 5 6 7

import java.io.IOException;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.StringTokenizer;
import java.util.Iterator;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
 
public class PageRank {
 
  public static class MyMapper   extends Mapper<Object, Text, Text, Text>
  {
        private Text id = new Text();
        public void map(Object key, Text value, Context context ) throws IOException, InterruptedException
        {
            String line = value.toString();
//判断是否为输入文件
            if(line.substring(0,1).matches("[0-9]{1}"))
            {
                  boolean flag = false;
                  if(line.contains("_"))
                  {
                        line = line.replace("_","");
                        flag = true;
                  }
//对输入文件进行处理
                  String[] values = line.split("\t");
                  Text t = new Text(values[0]);
                  String[] vals = values[1].split(" ");
                  String url="_";//保存url,用作下次计算
                  double pr = 0;
                  int i = 0;
                  int num = 0;
 
                  if(flag)
                  {
                      i=2;
                      pr=Double.valueOf(vals[1]);
                      num=vals.length-2;
                  }
                  else
                  {
                      i=1;
                      pr=Double.valueOf(vals[0]);
                      num=vals.length-1;
                  }
 
                  for(;i<vals.length;i++)
                  {
                      url=url+vals[i]+" ";
                      id.set(vals[i]);
                      Text prt = new Text(String.valueOf(pr/num));
                      context.write(id,prt);
                  }
                  context.write(t,new Text(url));
              }
          }
  }
 
  public static class MyReducer  extends Reducer<Text,Text,Text,Text>
  {
              private Text result = new Text();
              private Double pr = new Double(0);
 
         public void reduce(Text key, Iterable<Text> values,  Context context  ) throws IOException, InterruptedException
         {
              double sum=0;
              String url="";
 
//****请通过url判断否则是外链pr,作计算前预处理****//
/*********begin*********/
  for(Text val:values)  
              {  
                      //发现_标记则表明是url,否则是外链pr,要参与计算  
                  if(!val.toString().contains("_"))  
                  {  
                      sum=sum+Double.valueOf(val.toString());  
                  }  
                  else  
                 {  
                      url=val.toString();  
                  }  
              }  
              pr=0.15+0.85*sum;  
              String str=String.format("%.3f",pr);  
              result.set(new Text(str+" "+url));  
              context.write(key,result);  
 
 
/*********end**********/            
 
 
//****请补全用完整PageRank计算公式计算输出过程,q取0.85****//
/*********begin*********/
 
 
/*********end**********/    
 
          }
 }
 
    public static void main(String[] args) throws Exception
    {
             String paths="file:///tmp/input/Wiki0";//输入文件路径,不要改动
            String path1=paths;
            String path2="";
 
            for(int i=1;i<=5;i++)//迭代5次
              {
                System.out.println("This is the "+i+"th job!");
                System.out.println("path1:"+path1);
                System.out.println("path2:"+path2);
                Configuration conf = new Configuration();
                Job job = new Job(conf, "PageRank");
                path2=paths+i;    
                job.setJarByClass(PageRank.class);
                job.setMapperClass(MyMapper.class);
        //****请为job设置Combiner类****//
/*********begin*********/
job.setCombinerClass(MyReducer.class); 
 
/*********end**********/                    
                job.setReducerClass(MyReducer.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
                FileInputFormat.addInputPath(job, new Path(path1));
                FileOutputFormat.setOutputPath(job, new Path(path2));
                path1=path2;      
             job.waitForCompletion(true);
            System.out.println(i+"th end!");
        }
      } 
 }

在这里插入图片描述



http://www.niftyadmin.cn/n/5328040.html

相关文章

使用ffmpeg进行视频截取

1 原始视频信息 通过ffmpeg -i命令查看视频基本信息 ffmpeg version 6.1-essentials_build-www.gyan.dev Copyright (c) 2000-2023 the FFmpeg developersbuilt with gcc 12.2.0 (Rev10, Built by MSYS2 project)configuration: --enable-gpl --enable-version3 --enable-sta…

无需编程,简单易上手的家具小程序搭建方法分享

想要开设一家家具店的小程序吗&#xff1f;现在&#xff0c;我将为大家介绍如何使用乔拓云平台搭建一个家具小程序&#xff0c;帮助您方便快捷地开展线上家具销售业务。 第一步&#xff0c;登录乔拓云平台进入商城后台管理页面。 第二步&#xff0c;在乔拓云平台的后台管理页面…

数据库mysql no.4

1.流程控制函数 ①if(条件表达式&#xff0c;表达式1&#xff0c;表达式2)&#xff1a; 如果条件表达式成立&#xff0c;返回表达式1&#xff0c;否则返回表达式2 case情况1 case 变量或表达式或字段 when 常量1 then 值1 when 常量2 then 值2 ... else 值n end case情况2…

使用vite框架封装vue3插件,发布到npm

目录 一、vue环境搭建 1、创建App.vue 2、修改main.ts 3、修改vite.config.ts 二、插件配置 1、创建插件 2、开发调试 3、打包配置 4、package.json文件配置 上一篇文章讲述使用vite《如何使用vite框架封装一个js库&#xff0c;并发布npm包》封装js库&#xff0c;本文将…

Ubuntu20.04安装配置OpenCV-Python库并首次执行读图

一、选择三方提供的预编译包安装&#xff1a; 可以从官网下载 OpenCV 的安装包&#xff0c;编译后使用&#xff1b;也可以直接使用第三方提供的预编译包 安装。显然后者不需要执行编译步骤&#xff0c;更便捷。选择由 PyPI 提供的 OpenCV 安装包&#xff0c;可以在 https://py…

【RT-DETR有效改进】ShapeIoU、InnerShapeIoU关注边界框本身的IoU(包含二次创新)

前言 大家好&#xff0c;我是Snu77&#xff0c;这里是RT-DETR有效涨点专栏。 本专栏的内容为根据ultralytics版本的RT-DETR进行改进&#xff0c;内容持续更新&#xff0c;每周更新文章数量3-10篇。 专栏以ResNet18、ResNet50为基础修改版本&#xff0c;同时修改内容也支持Re…

CentOS stream 9配置网卡

CentOS stream9的网卡和centos 7的配置路径&#xff1a;/etc/sysconfig/network-scripts/ifcfg-ens32不一样。 CentOS stream 9的网卡路径&#xff1a; /etc/NetworkManager/system-connections/ens32.nmconnection 方法一&#xff1a; [connection] idens32 uuid426b60a4-4…

12种常见的网络钓鱼

网络钓鱼是一种网络攻击&#xff0c;是指具有恶意动机的攻击者伪装欺骗人们并收集用户名或密码等敏感信息的一系列行为。由于网络钓鱼涉及心理操纵并依赖于人为失误(而不是硬件或软件漏洞)&#xff0c;因此被认定为是一种社会工程攻击。 1. 普通网络钓鱼&#xff08;群攻&…