三表相连 mapjoin

news/2024/5/20 5:31:08 标签: hadoop, 大数据, hdfs

三表相连 mapjoin

  • 要求
    • 输出的样式
    • 三张表
      • score.csv
      • student.csv
      • subject.csv
  • 创建三个类
  • StudentSc
    • getset方法
    • 实现类
  • MapJoinDriver
    • 用mapjoin不需要reduce
  • MapJoinMapper
  • 运行结果

要求

输出的样式

在这里插入图片描述

三张表

在这里插入图片描述

score.csv

在这里插入图片描述

student.csv

在这里插入图片描述

subject.csv

在这里插入图片描述

创建三个类

在这里插入图片描述

StudentSc

在这里插入图片描述

getset方法

插入getset方法,可用javabean插件一键生成

实现类

 public StudentSc(String stuName, String subName, Integer scScore, String flag) {
        this.stuName = stuName;
        this.subName = subName;
        this.scScore = scScore;
    }


    @Override
    public int compareTo(nj.zb.kb21.demo5.StudentScore o) {
        return 0;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(stuName);
        dataOutput.writeUTF(subName);
        dataOutput.writeInt(scScore);
    }

MapJoinDriver

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MapJoinDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(MapJoinDriver.class);

        job.setMapperClass(MapJoinMapper.class);
        job.setMapOutputKeyClass(StudentSc.class);
        job.setMapOutputValueClass(NullWritable.class);

        Path inPath = new Path("D:\\kb21\\myworkspace\\njzb\\hadoopexec\\in\\demo6\\score.csv");
        FileInputFormat.setInputPaths(job,inPath);

        Path outPath = new Path("D:\\kb21\\myworkspace\\njzb\\hadoopexec\\out7");
        FileSystem fs = FileSystem.get(outPath.toUri(), configuration);

        if (fs.exists(outPath)){
            fs.delete(outPath,true);
        }

        FileOutputFormat.setOutputPath(job,outPath);

        //设置Reduce阶段的任务数量
        job.setNumReduceTasks(0);

        //配置Map阶段的缓存,尽量使用小文件做缓存,如果文件太大,会引起OOM(内存溢出)
        Path cachePath = new Path("D:\\kb21\\myworkspace\\njzb\\hadoopexec\\in\\demo6\\student.csv");
        job.addCacheFile(cachePath.toUri());

        Path cachePath2 = new Path("D:\\kb21\\myworkspace\\njzb\\hadoopexec\\in\\demo6\\subject.csv");
        job.addCacheFile(cachePath2.toUri());

        boolean result = job.waitForCompletion(true);
        System.out.println(result);
    }
}

用mapjoin不需要reduce

MapJoinMapper

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class MapJoinMapper extends Mapper<LongWritable, Text, StudentSc, NullWritable> {

    Map<Integer, StudentSc> studentScMap = new HashMap<Integer, StudentSc>();
    Map<Integer, StudentSc> studentScMap2 = new HashMap<Integer, StudentSc>();

    @Override
    protected void setup(Mapper<LongWritable, Text, StudentSc, NullWritable>.Context context) throws IOException, InterruptedException {
        URI[] cacheFiles = context.getCacheFiles();
        for (URI uri : cacheFiles) {
            String currentFileName = new Path(uri).getName();
            if (currentFileName.startsWith("student")) {
                String path = uri.getPath();
                BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
                String line;
                while ((line = br.readLine()) != null) {
                    String[] fields = line.split(",");
                    StudentSc studentSc = new StudentSc(fields[1],"",0,"");
                    studentScMap.put(Integer.parseInt(fields[0]), studentSc);
> 这里按照要求将student的名字添加到studentScMap表中

                }
                br.close();
            }
            if (currentFileName.startsWith("subject")) {
                String path = uri.getPath();
                BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
                String line;
                while ((line = br.readLine()) != null) {
                    String[] fields = line.split(",");
                    StudentSc studentSc = new StudentSc("",fields[1],0,"");
                    studentScMap2.put(Integer.parseInt(fields[0]), studentSc);
>这里按照要求将subject的科目名字添加到studentScMap2表中

                }
                br.close();
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, StudentSc, NullWritable>.Context context) throws IOException, InterruptedException {
        String[] scFields = value.toString().split(",");//这个集合读取的是driver中的inpath的表 score

        StudentSc currentStudent = studentScMap.get(Integer.parseInt(scFields[0]));
        StudentSc currentStudent2 = studentScMap2.get(Integer.parseInt(scFields[1]));

        StudentSc studentScs = new StudentSc();
        studentScs.setStuName(currentStudent.getStuName());
        studentScs.setFlag("0");//flag不重要,是我上一个项目多写的,懒得删
        studentScs.setSubName(currentStudent2.getSubName());
        studentScs.setScScore(Integer.parseInt(scFields[2]));

        context.write(studentScs, NullWritable.get());
    }
}

运行结果

在这里插入图片描述


http://www.niftyadmin.cn/n/49807.html

相关文章

20、CSS中单位:【px和%】【em和rem】【vw|vh|vmin|vmax】的区别

CSS中的px 和 % px (pixels) 是固定单位,也可以叫基本单位&#xff0c;代表像素&#xff0c;可以确保元素的大小不受屏幕分辨率的影响。 % (percentage) 是相对单位&#xff0c;代表元素大小相对于其父元素或视口&#xff08;viewport&#xff09;的大小的百分比。使用百分比可…

隐私计算概览

1、政策背景与应用驱动 1、国内政策法规 2022年1月&#xff0c;十四五规划&#xff1a;初步建立数据源要素市场体系&#xff0c;在保护数据安全和用户隐私前提下&#xff0c;参与数据价值开发。2022年3月&#xff0c;国务院《关于构建更加完善的要素市场化配置体制机制的意见…

18-考虑柔性负荷的综合能源系统低碳经济优化调度MATLAB程序

参考文献&#xff1a;考虑柔性负荷的综合能源系统低碳经济优化调度_薛开阳考虑用户侧柔性负荷的社区综合能源系统日前优化调度_刘蓉晖主要内容&#xff1a;基础模型参考刘蓉晖的论文&#xff0c;主要做了场景1、2、3&#xff1b;碳交易模型采用薛开阳论文中的。采用CPIEX求解某…

内网资源探测

✅作者简介&#xff1a;CSDN内容合伙人、信息安全专业在校大学生&#x1f3c6; &#x1f525;系列专栏 &#xff1a;内网安全 &#x1f4c3;新人博主 &#xff1a;欢迎点赞收藏关注&#xff0c;会回访&#xff01; &#x1f4ac;舞台再大&#xff0c;你不上台&#xff0c;永远是…

JVM——类加载器

目录一、类加载器类与类加载器启动类加载器拓展类加载器自定义类加载器一、类加载器 Java虚拟机设计团队有意把类加载阶段中的“通过一个类的全限定名来获取描述该类的二进制字节流”这个动作放到Java虚拟机外部去实现&#xff0c;以便让应用程序自己决定如何去获取所需的类。…

leetcode刷题---递归思想

leetcode刷题---递归思想&#xff09;1.1 递归介绍1.2 基本步骤1.3 代表题目1.3.1 入门题---青蛙跳1.3.2.1 初级题226.翻转二叉树112.路径总和1.3.3 中级题---汉诺塔问题1.3.4 进阶题---细胞分裂1.1 递归介绍 如果在函数中存在着调用函数本身的情况&#xff0c;这种现象就叫递…

使用Python脚本修改Maya ASCII文件路径方法

以下脚本修改当前项目路径和子文件夹中扩展名为“.ma”的所有文件&#xff0c;这样您就可以轻松地一次编辑所有文件。此脚本搜索特定字符串replace_This变量并将其替换为with_This&#xff0c;您可以使用它更改引用路径、纹理路径等… 话不多说直接上脚本&#xff1a; import…

lio-sam学习笔记(二)

前言&#xff1a; 对于lio-sam中IMU预积分源码的理解。预积分在imuPreintegration.cpp里面。 一、main函数 int main(int argc, char** argv) {ros::init(argc, argv, "roboat_loam");IMUPreintegration ImuP;TransformFusion TF;ROS_INFO("\033[1;32m----&g…