侧边栏壁纸
博主头像
落叶人生博主等级

走进秋风,寻找秋天的落叶

  • 累计撰写 130562 篇文章
  • 累计创建 28 个标签
  • 累计收到 9 条评论
标签搜索

目 录CONTENT

文章目录

MapReduce统计词语出现次数

2022-07-07 星期四 / 0 评论 / 0 点赞 / 57 阅读 / 12104 字

晨曦同学(Dota界号称利神)前段时间分享了这样一个问题:如何在一个很大的文件中(该文件包含了中英文)找出出现频率比较高的几个词呢?我们来分析一下。找出现频率比较高的词语,首先要有一个支持中文的分词器

晨曦同学(Dota界号称利神)前段时间分享了这样一个问题:如何在一个很大的文件中(该文件包含了中英文)找出出现频率比较高的几个词呢?我们来分析一下。找出现频率比较高的词语,首先要有一个支持中文的分词器(IK,庖丁解牛等等),这个问题不大;分词之后呢就要统计词语出现次数,类似于MapReduce程序中WordCount,这可是学习MapReduce的hello world程序呀,当然很容易搞定;最后还要来个排序,统计完了我们期望出现次数高的词语出现在前面,MapReduce默认就支持排序,也没问题。

解决这个问题需要两个Job,一个是统计Job,一个是排序Job。

统计Job的Mapper需要做的事情就是分词,这里我们选用IKanalyzer分词器,可能IK在官网上不好下载,我给大家准备好了,点此下载。分词之后,将每个单词个数置为1(跟WordCount程序一样)。

public static class AnalyzerMapper extends Mapper<Object, Text, Text, IntWritable> {	private final static IntWritable one = new IntWritable(1);    private Text word = new Text();    	@Override	protected void map(Object key, Text value,			Mapper<Object, Text, Text, IntWritable>.Context context)			throws IOException, InterruptedException {		breakupSentence(value.toString(), context);	}    	/**	 * 用分词器将一段话拆分成多个词。	 * 分出一个词就将数量置为1。	 * 	 * @param sentence	 * @param context	 * @throws IOException 	 * @throws InterruptedException 	 */	private void breakupSentence(String sentence, Mapper<Object, Text, Text,			IntWritable>.Context context) throws IOException, InterruptedException {		Analyzer analyzer = new IKAnalyzer(true);		TokenStream tokenStream = analyzer.tokenStream("content",				new StringReader(sentence));		tokenStream.addAttribute(CharTermAttribute.class);		while (tokenStream.incrementToken()) {			CharTermAttribute charTermAttribute = tokenStream					.getAttribute(CharTermAttribute.class);			word.set(charTermAttribute.toString());			context.write(word, one);		}	}    }

别忘了给IK设置停止词字典,过滤掉那些"了",”呢“,”啊“,”的“,"is", "and", "a" 之类的语气词、助词、连词、量词等。

IKAnalyzer.cfg.xml

<properties>  	<comment>IK Analyzer 扩展配置</comment>	<!--用户可以在这里配置自己的扩展字典 	<entry key="ext_dict">ext.dic;</entry> 	-->	<!--用户可以在这里配置自己的扩展停止词字典-->	<entry key="ext_stopwords">stopword.dic;chinese_stopword.dic</entry> 	</properties>

chinese_stopword.dic

的呢吧和......

统计Job的Reducer就是统计各个词语的出现次数,跟WordCount程序中的完全一致,不再烦述。我们可以将该Reducer设置为Job的CombinerClass,这样每次Mapper Task向Reducer Task传递数据时候,先执行Combiner,将结果先做个统计,减少了Mapper向Reducer的数据传输。

public static class CountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {	private IntWritable result = new IntWritable();		 @Override	 protected void reduce(Text key, Iterable<IntWritable> values,			Reducer<Text, IntWritable, Text, IntWritable>.Context context)			throws IOException, InterruptedException {				  int sum = 0;	      for (IntWritable val : values) {	        sum += val.get();	      }	      result.set(sum);	      context.write(key, result);    }	}


接下来再看排序Job,MapReduce任务是通过key来排序的,我们需要将词语出现的次数排序,所以需要先将统计Job的结果Key-Value互换,排序完成后,再换回来即可。

排序Job的Mapper将统计Job的结果Key-Value互换,代码如下:

public static class SortMapper extends Mapper<Object, Text, IntWritable, Text> {	private final static IntWritable wordCount = new IntWritable(1);	private Text word = new Text();	@Override	protected void map(Object key, Text value,			Mapper<Object, Text, IntWritable, Text>.Context context)			throws IOException, InterruptedException {		StringTokenizer tokenizer = new StringTokenizer(value.toString());	    while (tokenizer.hasMoreTokens()) {	    	String a = tokenizer.nextToken().trim();	        word.set(a);	        String b = tokenizer.nextToken().trim();	        wordCount.set(Integer.valueOf(b));	        context.write(wordCount, word);	    }	}	}

排序Job的Reducer任务就是再将Key-Value倒置过来。

public static class SortReducer extends Reducer<IntWritable, Text, Text, IntWritable> {	private Text result = new Text();	@Override	protected void reduce(IntWritable key, Iterable<Text> values,			Reducer<IntWritable, Text, Text, IntWritable>.Context context)			throws IOException, InterruptedException {		for (Text val : values) {	        result.set(val.toString());	        context.write(result, key);	    }	}	}

Reducer默认排序是从小到大(数字),而我们期望出现次数多的词语排在前面,所以需要重写排序类WritableComparator。

public class DescWritableComparator extends WritableComparator {	protected DescWritableComparator() {		super(IntWritable.class, true);	}	@Override	public int compare(WritableComparable a, WritableComparable b) {		return -super.compare(a, b);	}	}

如果有多个Reducer任务,Reducer的默认排序只是对发送到该Reducer下的数据局部排序。如果想达到全局排序,需要我们手动去写partitioner。Partitioner的作用是根据不同的key,制定相应的规则分发到不同的Reducer中。

public static class SortPartitioner<K, V> extends Partitioner<K, V> {	@Override	public int getPartition(K key, V value, int numReduceTasks) {		int maxValue = 50;	    int keySection = 0;	    // 只有传过来的key值大于maxValue 并且numReduceTasks比如大于1个才需要分区,否则直接返回0	    if (numReduceTasks > 1 && key.hashCode() < maxValue) {	        int sectionValue = maxValue / (numReduceTasks - 1);	        int count = 0;	        while ((key.hashCode() - sectionValue * count) > sectionValue) {	            count++;	        }	        keySection = numReduceTasks - 1 - count;	    }	    return keySection;	}	}


最后就是链接MapReduce Job流,这里有两个Job,需要先执行统计Job,再执行排序Job。我们需要将统计Job的输出作为排序Job的输入。(友情提示:别忘了给统计Job设置Combiner哦,也别忘了给排序Job设置Comparator和Partitioner哦。

Job job1 = new Job(configuration, "key word analyzer");job1.setJarByClass(JobDefiner.class);job1.setMapperClass(AnalyzerMapper.class);job1.setCombinerClass(CountReducer.class);job1.setReducerClass(CountReducer.class);job1.setOutputKeyClass(Text.class);job1.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job1, new Path(otherArgs[0]));Path outPath1 = new Path(otherArgs[1]);FileOutputFormat.setOutputPath(job1, outPath1);job1.waitForCompletion(true);Job job2 = new Job(configuration, "result sort");job2.setJarByClass(JobDefiner.class);job2.setOutputKeyClass(IntWritable.class);job2.setOutputValueClass(Text.class);job2.setMapperClass(SortKeyWordHandler.SortMapper.class);job2.setReducerClass(SortKeyWordHandler.SortReducer.class);// key按照降序排列job2.setSortComparatorClass(DescWritableComparator.class);job2.setPartitionerClass(SortKeyWordHandler.SortPartitioner.class);FileInputFormat.addInputPath(job2, outPath1);FileOutputFormat.setOutputPath(job2, new Path(otherArgs[2]));job2.waitForCompletion(true);


大功告成?且慢!!在我的博客Eclipse远程调试Hadoop集群中,我们只讲了如何配置本地Eclipse如何远程调试Hadoop集群,在这里我们就演示一下如何去跑。

我们先上传两篇关于习大大的报道到hdfs上

bin/hadoop dfs -mkdir inputbin/hadoop dfs -put mupeng/files/test_chinese* input

刷一下Eclipse里面DFS Location就能看到

找到定义Job的main方法类,右键Run As=>Run Configurations ...

确认Project、Main class准确后,设置main方法的参数:统计Job的输入路径、统计Job的输出路径(同时也是排序Job的输入路径)、排序Job的输出路径。

hdfs://192.168.248.149:9000/user/mupeng/inputhdfs://192.168.248.149:9000/user/mupeng/output1hdfs://192.168.248.149:9000/user/mupeng/output2

设置好后,点击Run,在第二个输出路径中,我们看到结果(我这只有一个Reducer)

引用	20强调	16习近平	14斐济	13我们	13斐	12中国	11对	10中	10等	9中方	9为	9方	9......


最后提示大家:

本文相关源码下载地址(GitHub):点击查看

相关博客地址:Eclipse远程调试Hadoop集群

广告 广告

评论区