MapReduce将HDFS文本数据导入HBase中

news/2024/5/20 4:03:02 标签: Hadoop, MapReduce, HDFS, HBase, 分布式存储

HBase本身提供了很多种数据导入的方式,通常有两种常用方式:

  1. 使用HBase提供的TableOutputFormat,原理是通过一个Mapreduce作业将数据导入HBase
  2. 另一种方式就是使用HBase原生Client API

本文就是示范如何通过MapReduce作业从一个文件读取数据并写入到HBase中。

首先启动HadoopHBase,然后创建一个空表,用于后面导入数据:

hbase(main):006:0> create 'mytable','cf'
0 row(s) in 10.8310 seconds

=> Hbase::Table - mytable
hbase(main):007:0> list
TABLE                                                                                                   
mytable                                                                                                 
1 row(s) in 0.1220 seconds

=> ["mytable"]
hbase(main):008:0> scan 'mytable'
ROW                         COLUMN+CELL                                                                 
0 row(s) in 0.2130 seconds

一、示例程序

下面的示例程序通过TableOutputFormatHDFS上具有一定格式的文本数据导入到HBase中。

首先创建MapReduce作业,目录结构如下:

Hdfs2HBase/
├── classes
└── src
    ├── Hdfs2HBase.java
    ├── Hdfs2HBaseMapper.java
    └── Hdfs2HBaseReducer.java

Hdfs2HBaseMapper.java

package com.lisong.hdfs2hbase;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Hdfs2HBaseMapper extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text line, Context context) throws IOException,InterruptedException {
                String lineStr = line.toString();
                int index = lineStr.indexOf(":");
                String rowkey = lineStr.substring(0, index);
                String left = lineStr.substring(index+1);
                context.write(new Text(rowkey), new Text(left));
        }
}

Hdfs2HBaseReducer.java

package com.lisong.hdfs2hbase;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class Hdfs2HBaseReducer extends Reducer<Text, Text, ImmutableBytesWritable, Put> {
        public void reduce(Text rowkey, Iterable<Text> value, Context context) throws IOException,InterruptedException {
                String k = rowkey.toString();
                for(Text val : value) {
                        Put put = new Put(k.getBytes());
                        String[] strs = val.toString().split(":");
                        String family = strs[0];
                        String qualifier = strs[1];
                        String v = strs[2];
                        put.add(family.getBytes(), qualifier.getBytes(), v.getBytes());
                        context.write(new ImmutableBytesWritable(k.getBytes()), put);
                }
        }
}

Hdfs2HBase.java

package com.lisong.hdfs2hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Hdfs2HBase {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if(otherArgs.length != 2) {
            System.err.println("Usage: wordcount <infile> <table>");
            System.exit(2);
        }

        Job job = new Job(conf, "hdfs2hbase");
        job.setJarByClass(Hdfs2HBase.class);
        job.setMapperClass(Hdfs2HBaseMapper.class);
        job.setReducerClass(Hdfs2HBaseReducer.class);

        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);

        job.setOutputFormatClass(TableOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, otherArgs[1]);

        System.exit(job.waitForCompletion(true)?0:1);
    }
}

配置javac编译依赖环境:

$HADOOP_HOME/share/hadoop/common/hadoop-common-2.4.1.jar
$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.4.1.jar
$HADOOP_HOME/share/hadoop/common/lib/commons-cli-1.2.jar

这里要操作HBase,故除了上面三个jar包,还需要$HBASE_HOME/lib目录下的jar包。为了方便,我们在/etc/profileCLASSPATH里包含所有的依赖包:

TEMP=`ls /home/hadoop/hbase/lib/*.jar`
HBASE_JARS=`echo $TEMP | sed 's/ /:/g'`
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:/home/hadoop/hadoop/share/hadoop/common/hadoop-common-2.6.0.jar:/home/hadoop/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.6.0.jar:/home/hadoop/hadoop/share/hadoop/common/lib/commons-cli-1.2.jar:$HBASE_JARS

编译

$ javac -d classes/ src/*.java

打包

$ jar -cvf hdfs2hbase.jar classes

运行

创建一个data.txt文件,内容如下(列族是建表时创建的列族cf):

r1:cf:c1:value1 
r2:cf:c2:value2 
r3:cf:c3:value3

将文件复制到hdfs上:

$ hadoop/bin/hadoop fs -put data.txt /hbase

运行MapReduce作业:

$ hadoop/bin/hadoop jar Hdfs2HBase/hdfs2hbase.jar com.lisong.hdfs2hbase.Hdfs2HBase /hbase/data.txt mytable

报错NoClassDefFoundError找不到类定义:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/io/ImmutableBytesWritable
    at com.lisong.hdfs2hbase.Hdfs2HBase.main(Hdfs2HBase.java:30)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    ...
    at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.io.ImmutableBytesWritable
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 7 more

原因是我没有把HBase的jar包加到hadoop-env.sh中。

TEMP=`ls /home/hadoop/hbase/lib/*.jar`
HBASE_JARS=`echo $TEMP | sed 's/ /:/g'`
HADOOP_CLASSPATH=$HBASE_JARS

再次运行发现又报了Unable to initialize MapOutputCollector的错误:

15/08/10 08:55:44 WARN mapred.MapTask: Unable to initialize MapOutputCollector org.apache.hadoop.mapred.MapTask$MapOutputBuffer
java.lang.NullPointerException
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:1008)
    at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:401)
    ...
    at java.lang.Thread.run(Thread.java:745)
15/08/10 08:55:44 INFO mapred.LocalJobRunner: map task executor complete.
15/08/10 08:55:44 WARN mapred.LocalJobRunner: job_local2138114942_0001
java.lang.Exception: java.io.IOException: Unable to initialize any output collector
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.io.IOException: Unable to initialize any output collector
    at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:412)
    ...
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
15/08/10 08:55:44 INFO mapreduce.Job: Job job_local2138114942_0001 failed with state FAILED due to: NA
15/08/10 08:55:45 INFO mapreduce.Job: Counters: 0

原因是我没有指明Map输出的Key/Value类型,在Hdfs2HBase.java中添加以下两句:

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);     

如果没有专门定义Mapper输出类型的话,job.setOutputKeyClassjob.setOutputValueClass设置的是Mapper和Reducer两个的输出类型。

job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);

而Hdfs2HBaseMapper输出类型是Text/Text,所以这里需要单独指定。


修改Hdfs2HBase.java

package com.lisong.hdfs2hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Hdfs2HBase {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if(otherArgs.length != 2) {
            System.err.println("Usage: wordcount <infile> <table>");
            System.exit(2);
        }

        Job job = new Job(conf, "hdfs2hbase");
        job.setJarByClass(Hdfs2HBase.class);
        job.setMapperClass(Hdfs2HBaseMapper.class);
        job.setReducerClass(Hdfs2HBaseReducer.class);

        job.setMapOutputKeyClass(Text.class);    // +
        job.setMapOutputValueClass(Text.class);  // +

        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);

        job.setOutputFormatClass(TableOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, otherArgs[1]);

        System.exit(job.waitForCompletion(true)?0:1);
    }
}

再次编译、打包,然后运行成功!

查询HBase表,验证数据是否已导入:

hbase(main):001:0> scan 'mytable'
ROW                         COLUMN+CELL                                                                 
 r1                         column=cf:c1, timestamp=1439223857492, value=value1                         
 r2                         column=cf:c2, timestamp=1439223857492, value=value2                         
 r3                         column=cf:c3, timestamp=1439223857492, value=value3                         
3 row(s) in 1.3820 seconds

可以看到,数据导入成功!

由于需要频繁的与存储数据的RegionServer通信,占用资源较大,一次性入库大量数据时,TableOutputFormat效率并不好。


二、拓展-TableReducer

我们可以将Hdfs2HBaseReducer.java代码改成下面这样,作用是一样的:

package com.lisong.hdfs2hbase;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class Hdfs2HBaseReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {
    public void reduce(Text rowkey, Iterable<Text> value, Context context) throws IOException,InterruptedException {
        String k = rowkey.toString();
        for(Text val : value) {
            Put put = new Put(k.getBytes());
            String[] strs = val.toString().split(":");
            String family = strs[0];
            String qualifier = strs[1];
            String v = strs[2];
            put.add(family.getBytes(), qualifier.getBytes(), v.getBytes());
            context.write(new ImmutableBytesWritable(k.getBytes()), put);
        }
    }
}

这里直接继承了TableReducerTableReducer是部分特例化的Reducer,它只有三个类型参数:输入Key/Value是对应Mapper的输出,输出Key可以是任意的类型,但是输出Value必须是一个PutDelete实例。

编译打包运行,结果与前面的一样!






个人站点:http://songlee24.github.com


http://www.niftyadmin.cn/n/1091757.html

相关文章

如何设画面大小_如何剪辑视频号最佳视频画面尺寸?

视频号画面宽高比在16&#xff1a;9 [1080*608] 至6&#xff1a;7 [1080*1260] 之间&#xff0c;超过这两个尺寸&#xff0c;可以发&#xff0c;但会被裁剪。横屏视频建议宽高比4&#xff1a;3&#xff0c;分辨率1080*810&#xff1b;竖屏视频建议宽高比6&#xff1a;7&#xf…

Jmeter 04 JMeter 负载与监听

1. 场景设计 2. 场景设置 3. JMeter性能参数配置 4. 测试监听转载于:https://www.cnblogs.com/SH-xuliang/p/7993061.html

3个sprint的团队贡献分

第一次冲刺贡献分 成员名字贡献分101丘娟23108周诗琦26107杨晓霞24124陈程27 第二次冲刺贡献分 成员名字贡献分101丘娟23108周诗琦27107杨晓霞24124陈程26   第三次冲刺贡献分 成员名字贡献分101丘娟22108周诗琦26107杨晓霞25124陈程27 转载于:https://www.cnblogs.com/que…

语言 提取列名_运用R提取城市或站点空气质量历史数据

作者&#xff1a;陈天舒&#xff0c;山东大学环境科学&#xff08;大气化学&#xff09;博士在读本文的R语言技能&#xff1a;遍历文件在研究大气科学的过程中我们经常需要用到某些城市、某些站点在某些时间段内的空气质量数据。全国空气质量历史数据 | 北京市空气质量历史数据…

嵌套的JsonObject与JSONArray的取值---JSON中嵌套JSONArray

在复杂的JSON数据的格式中&#xff0c;往往会对JSON数据进行嵌套&#xff0c;这样取值会比之前的取值稍微复杂一点&#xff0c;但是只要思路清晰&#xff0c;其实取法还是一样的。就跟if else语句一样&#xff0c;如果if中套if&#xff0c;if中再套if&#xff0c;写的规范了还行…

python数据分析师职业技能_数据分析师必须具备的10项基本技能

「Data Analyst数据分析师」数据分析在不断发展&#xff0c;因此掌握其基本的技术和软技能将帮助您在数据分析家的职业生涯中取得成功&#xff0c;并追求诸如深度学习和人工智能等先进概念。数据分析是一个广阔的领域&#xff0c;包括几个细分领域&#xff0c;例如数据准备和探…

-ROOT-表和.META.表结构详解

在《HBase技术简介》中我们知道&#xff0c;HBase中有两个特殊的表&#xff1a;-ROOT-和.META.。 由于HBase中的表可能非常大&#xff0c;故HBase会将表按行分成多个region&#xff0c;然后分配到多台RegionServer上。数据访问的整个流程如下图所示&#xff1a; 注意两点&…

Knockout应用开发指南 第三章:绑定语法(3)

12 value 绑定 目的 value绑定是关联DOM元素的值到view model的属性上。主要是用在表单控件<input>&#xff0c;<select>和<textarea>上。 当用户编辑表单控件的时候&#xff0c; view model对应的属性值会自动更新。同样&#xff0c;当你更新view model属…