过滤停用词
过滤停用词中的重点是设置全局变量,设置全局变量有两种方法,1.hdfs读取,将文件设置在所有节点都能访问的地方,适合文件较大类型 2.利用分布式缓存的方式,运行的时候会把文件复制在每个节点中,适合文件较小的类型
//利用分布式缓存方式设置全局变量
package wordcount;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException; //报错类
import java.util.Arrays;
import java.util.StringTokenizer; //StringTokenizer类,用于将空白字符作为分割符的类
import org.apache.hadoop.conf.Configuration;//Hadoop中用于读取配置信息的类
import org.apache.hadoop.fs.Path; //有关文件系统输入输出数据的类
import org.apache.hadoop.io.IntWritable; //封装定义了IntWritable类
import org.apache.hadoop.io.Text; //封装定义了Text类
import org.apache.hadoop.mapreduce.Job; //封装定义了Job类
import org.apache.hadoop.mapreduce.Mapper; //封装定义了Mapper类
import org.apache.hadoop.mapreduce.Reducer; //封装定义了Reducer类
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; //文件输入要用到的类
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; //文件输出要用到的类
import org.apache.hadoop.util.GenericOptionsParser; //GenericOptionsParser类,用来解释常用hadoop命令,并根据需要为Configuration对象设置相应的值
public class stopwords{
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{ //自定义的TokenizerMapper类,继承自前面导入的Mapper类
private final static IntWritable one = new IntWritable(1); //实例化了一个IntWritable类的one对象并赋值为常量1
private Text word = new Text(); //实例化了一个Text类的对象word
public void map(Object key, Text value, Context context //定义Map方法
) throws IOException, InterruptedException {
//这里说一下context类,它是Mapper的一个内部类,它用来与MapReduce系统进行通信,如把map的结果传给reduce处理。简单的说顶级接口用它在map或是reduce任务中跟踪task的状态,MapContext就是记录了map执行的上下文,在mapper类中,这个context可以存储一些job conf的信息,同时context作为了map和reduce执行中各个函数的一个桥梁,我们可以在map函数中处理这个信息
StringTokenizer itr = new StringTokenizer(value.toString());//实例化了一个以空白字符为分隔符的StringTokenizer类的对象itr,value.toString()是指将value转化为string类型的字符串,这是要StringTokenizer的内容
while (itr.hasMoreTokens()) {//如果判断还有下一个分隔符(空格)
word.set(itr.nextToken()); //则输出并返回之间的字符串给word
context.write(word, one); //context.write方法将(word,1)这样的二元组存入context中
}
}
}
public static class IntSumReducer //自定义的IntSumReducer类,继承自前面导入的Reducer类
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable(); //实例化了一个IntWritable类的result对象
int minFrequency;
public void setup(Context context)
{
Configuration jobconf = context.getConfiguration();
minFrequency = jobconf.getInt(“minFrequency”, -1);//将minFrequency的值默认设为-1
}
public void reduce(Text key, Iterable values,Context context//定义Reduce方法,这里迭代器(Iterator)是一种设计模式,它是一个对象,它可以遍历并选择序列(IntWritable)中的对象,而开发人员不需要了解该序列的底层结构。
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();//将该词的出现次数相加
}
Path[] cachefiles=context.getLocalCacheFiles();//获取设置在disrtibuted cachefile中的文件路径
String[] stop;
String temp="";
BufferedReader joinReader=new BufferedReader(new FileReader(cachefiles[0].toUri().getPath()));
try//将停用词文件分割
{
String line=null;
while((line=joinReader.readLine())!=null) temp+=line+" “;
}finally {joinReader.close();}
stop=temp.split(” ");
if(sum>minFrequency&&!Arrays.asList(stop).contains(key.toString()))
{
this.result.set(sum);
context.write(key, this.result);
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
//运行MapReduce程序前都要初始化Configuration,该类主要是读取MapReduce系统配置信息,这些信息包括hdfs还有MapReduce,也就是安装hadoop时候的配置文件例如:core-site.xml、hdfs-site.xml和mapred-site.xml等等文件里的信息,有些童鞋不理解为啥要这么做,这个是没有深入思考MapReduce计算框架造成,我们程序员开发MapReduce时候只是在填空,在map函数和reduce函数里编写实际进行的业务逻辑,其它的工作都是交给MapReduce框架自己操作的,但是至少我们要告诉它怎么操作啊,比如hdfs在哪里,MapReduce的jobstracker在哪里,而这些信息就在conf包下的配置文件里。
conf.setInt(“minFrequency”,Integer.parseInt(args[2]));//
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();//将 otherArgs赋值为运行配置输入的文件参数
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount […] ");
System.exit(2);
}//If的语句好理解,就是运行WordCount程序时候一定是两个参数,如果不是就会报错退出。至于第一句里的GenericOptionsParser类,它是用来解释常用hadoop命令,并根据需要为Configuration对象设置相应的值
Job job = Job.getInstance(conf, “stopwords”);//用Job.getInstance方法设置作业名为word count
job.setJarByClass(stopwords.class); //为job的输出数据设置Key类
job.addCacheFile(new Path(args[3]).toUri());//将输入的停用词文件添加到distributed cachefile中
job.setMapperClass(TokenizerMapper.class); //设置Mapper类(Map阶段使用)
job.setCombinerClass(IntSumReducer.class); //设置Combiner类(中间合并结果)
job.setReducerClass(IntSumReducer.class); //设置Reducer类(Reduce阶段使用)
job.setOutputKeyClass(Text.class); //为job的输出数据设置Key类,规定Reduce输出的Key类型为Text
job.setOutputValueClass(IntWritable.class); //设置Reduce输出的Value类型为IntWritable
for (int i = 0; i < 1; ++i) { //设置输入路径
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[1]));//输出路径是第二个参数
System.exit(job.waitForCompletion(true) ? 0 : 1);//等待任务执行完毕退出
}
}
输出结果
2.hdfs
package wordcount;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class hdfs
{
public static class StopwordsMapper extends Mapper<Object,Text,Text,IntWritable>
{
private String localFiles;
private Set stopwords;
@Override
protected void setup(Context context)throws IOException, InterruptedException {//获取缓存文件路径的数组
// InputStream in = new URL(“hdfs://localhost:9000/user/hadoop/Stopwords/stopwords.txt”).openStream();另一种方法,这时不需要在主函数中设置停用词文件路径
stopwords=new TreeSet ();//必须加上这句话
Configuration conf = context.getConfiguration();
localFiles = conf.getStrings(“stopwords”)[0];
FileSystem fs = FileSystem.get(URI.create(localFiles), conf);
FSDataInputStream hdfsInStream = fs.open(new Path(localFiles));
// 从hdfs中读取
InputStreamReader isr = new InputStreamReader(hdfsInStream, “utf-8”);
String line;
BufferedReader br = new BufferedReader(isr);
//读取数据
while ((line = br.readLine()) != null) {
StringTokenizer itr = new StringTokenizer(line);
// 得到停词表
stopwords.add(itr.nextToken());
}
}
public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
StringTokenizer itr = new StringTokenizer(value.toString());//实例化了一个以空白字符为分隔符的StringTokenizer类的对象itr
String temp = new String();
final IntWritable one = new IntWritable(1);
while (itr.hasMoreTokens()) {//如果判断还有下一个分隔符(空格)
temp=itr.nextToken();
if (!stopwords.contains(temp)) {
Text word = new Text();
word.set(temp);
context.write(word, one);
}
}
}
}
public static class StopwordsReducer
extends Reducer<Text, IntWritable, Text, IntWritable>
{
private IntWritable result = new IntWritable(); //实例化了一个IntWritable类的result对象
int minFrequency;
public void setup(Context context)
{
Configuration jobconf = context.getConfiguration();
minFrequency = jobconf.getInt(“minFrequency”, -1);//将minFrequency的值默认设为-1
}
public void reduce(Text key, Iterable values,Context context//定义Reduce方法,这里迭代器(Iterator)是一种设计模式,它是一个对象,它可以遍历并选择序列(IntWritable)中的对象,而开发人员不需要了解该序列的底层结构。
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();//将该词的出现次数相加
}
if(sum>minFrequency)
{
this.result.set(sum);
context.write(key, this.result);
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// 停词表所在的路径
conf.setStrings(“stopwords”,“hdfs://localhost:9000/user/hadoop/shujuji/input/stopwords.txt”);
String[] otherArgs = new String[]{“hdfs://localhost:9000/user/hadoop/input/lala”,“hdfs://localhost:9000/user/hadoop/shujuji/output8”};
conf.setInt(“minFrequency”,Integer.parseInt(args[0]));
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount […] ");
System.exit(2);
}
Job job = Job.getInstance(conf, “hdfs”);
job.setJarByClass(hdfs.class);
//设置分布式缓存文件
//job.addCacheFile(new URI(“hdfs://localhost:9000/user/hadoop/Stopwords/stopwords.txt”));
job.setMapperClass(StopwordsMapper.class);
job.setReducerClass(StopwordsReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
job.waitForCompletion(true);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
结果截图: