Hadoop 3.x(MapReduce)----【MapReduce 框架原理 三】

news/2024/5/20 3:40:06 标签: hadoop, hdfs, 大数据, mapreduce

Hadoop 3.x(MapReduce)----【MapReduce 框架原理 三】

    • 1. OutputFormat接口实现类
    • 2. 自定义OutputFormat案例实操
      • 1. 需求
      • 2. 需求分析
      • 3. 案例实操
      • 4. 测试输出结果

1. OutputFormat接口实现类

OutputFormat 是 MapReduce 输出的基类,所有实现 MapReduce 输出都实现了 OutputFormat 接口。下面我们介绍几种常见的 OutputFormat 实现类。

1. OutputFormat 实现类
在这里插入图片描述

2. 默认输出格式 TextOutputFormat

3. 自定义 OutputFormat

  • 应用场景:输出数据到 MySql / HBase / Elasticsearch 等存储框架中。
  • 自定义 OutputFormat 步骤
  • 自定义一个类继承 FileOutputFormat。
  • 改写 RecordWriter,具体改写输出数据的方法 write()。

2. 自定义OutputFormat案例实操

1. 需求

过了输入的 log 日志,包含 atguigu 的网站输出到 e:/atguigu.log,不包含 atguigu 的网站输出到 e:/other.log

  1. 输入数据

在这里插入图片描述

  1. 期望输出数据

在这里插入图片描述

2. 需求分析

在这里插入图片描述

3. 案例实操

  1. 编写 LogMapper 类
package com.fickler.mapreduce.outputformat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author dell
 * @version 1.0
 */
public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {

        context.write(value, NullWritable.get());

    }
}

  1. 编写 LogReducer 类
package com.fickler.mapreduce.outputformat;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author dell
 * @version 1.0
 */
public class LogReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException {

        for (NullWritable value : values){
            context.write(key, NullWritable.get());
        }

    }
}

  1. 自定义一个 LogOutputFormat 类
package com.fickler.mapreduce.outputformat;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author dell
 * @version 1.0
 */
public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {


    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {

        LogRecordWrite lrw = new LogRecordWrite(taskAttemptContext);

        return lrw;
    }
}

  1. 编写 LogRecordWriter 类
package com.fickler.mapreduce.outputformat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

/**
 * @author dell
 * @version 1.0
 */
public class LogRecordWrite extends RecordWriter<Text, NullWritable> {

    private FSDataOutputStream atguiguOut;
    private FSDataOutputStream otherOut;

    public LogRecordWrite(TaskAttemptContext job){

        try {
            FileSystem fileSystem = FileSystem.get(job.getConfiguration());
            atguiguOut = fileSystem.create(new Path("C:\\Users\\dell\\Desktop\\output\\atguigu.log"));
            otherOut = fileSystem.create(new Path("C:\\Users\\dell\\Desktop\\output\\other.log"));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

    }


    @Override
    public void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {

        String log = text.toString();

        if (log.contains("atguigu")){
            atguiguOut.writeBytes(log + "\n");
        } else {
            otherOut.writeBytes(log + "\n");
        }

    }

    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {

        IOUtils.closeStream(atguiguOut);
        IOUtils.closeStream(otherOut);

    }
}

  1. 编写 LogDriver 类
package com.fickler.mapreduce.outputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author dell
 * @version 1.0
 */
public class LogDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(LogDriver.class);
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        job.setOutputFormatClass(LogOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("C:\\Users\\dell\\Desktop\\input"));
        FileOutputFormat.setOutputPath(job, new Path("C:\\Users\\dell\\Desktop\\output"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);

    }

}

4. 测试输出结果

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述


http://www.niftyadmin.cn/n/1680142.html

相关文章

何为S端子

何为S端子   目前有不少显卡都增加了视频输出&#xff0c;就安装的视频输出来看&#xff0c;主要有S端子输出和普通的复合视频输出。什么是S端子输出呢?有人将其称为“高清晰度输出”&#xff0c;这种说法自然也不能算错。下面笔者详细谈谈有关S端子输出的一些技术问题&…

Cocos2d-x编程中的runOnUiThread方法和runOnGLThread方法剖析

本文对于runOnGLThread方法的分析主要目的是为了帮助我的课程“拇指接龙游戏”学员更好地理解EasyNDK这个开源框架的使用。借助于这个框架&#xff0c;实现Android JAVA端与Cococ2d-x C端交互&#xff0c;以及iOS Objective-C端与Cococ2d-x C端交互将变得异常容易----极大地减少…

pythonmd5解密代码_python写一个md5解密器示例,pythonmd5解密器

python写一个md5解密器示例&#xff0c;pythonmd5解密器前言&#xff1a;md5解密&#xff0c;百度了一下发现教程不是很多也不详细。这个图都没一张。。。0x01windows环境&#xff0c;kali也可以啊burpsuiterequests模块bs4模块0x02:设置好代理开启burpsuite(我这是新版的burp)…

SqlServer获取表结构语句

--sql server 2005-- 1. 表结构信息查询 -- -- 表结构信息查询-- 邹建 2005.08(引用请保留此信息)-- SELECT TableNameCASE WHEN C.column_id1 THEN O.name ELSE N END, TableDescISNULL(CASE WHEN C.column_id1 THEN PTB.[value] END,N), Column_idC.column_id, ColumnNameC.…

python计算实例_python实例一

1、金字塔2、计算阶乘(1) list 普通的链表&#xff0c;初始化后可以通过特定方法动态增加元素。定义方式&#xff1a;arr [元素](2) Tuple 固定的数组&#xff0c;一旦定义后&#xff0c;其元素个数是不能再改变的。定义方式&#xff1a;arr (元素)(2) Dictionary 词典类型&a…

车展

自我感觉&#xff0c;从拍摄技术的掌握和熟练程度来说&#xff0c;依次是风光、花卉、静物、人物&#xff0c;所以拍摄人物的时候&#xff0c;始终心中都有些犯嘀咕&#xff0c;曾有朋友说我的片子太缺少人像了&#xff0c;自己也觉得&#xff0c;不能偏科跛腿啊~争取逐渐补上吧…

Hadoop 3.x(MapReduce)----【MapReduce 框架原理 六】

Hadoop 3.x&#xff08;MapReduce&#xff09;----【MapReduce 框架原理 六】1. 数据清洗&#xff08;ETL&#xff09;1. 要求2. 需求分析3. 实现代码2. MapReduce 开发总结1. 输入数据接口&#xff1a;InputFormat2. 逻辑处理接口&#xff1a;Mapper3. Partition分区4. Compar…

智能实验室-通用网络请求(Webio) 1.1.0.81 - 正式发布第一版

怎样获取最新版本&#xff1f;1.1.0.81□全新下载&#xff1a; 通用网络请求(Webio)&#xff1a;http://unruledboy.cnblogs.com/Files/unruledboy/Webio.zip智能实验室&#xff0d;通用网络请求(Webio) 是什么&#xff1f;Webio 通用网络请求是通用的HTTP请求软件&#xff0c;…