Hadoop MapReduce: Implementing WordCount

A brief overview of the WordCount counting logic:

  • Read the text files (the source data) from the input directory.
  • The counting is implemented in two phases, map and reduce. The map phase splits each line into words and tags every word as a key-value pair of the form <key: word, value: 1>.
  • The reduce phase merges the pairs that share the same key, e.g. key: hello, values: {1,1,1,1,1,1...}, and sums the 1s in values. Looping over all keys in this way yields the count of every word (a minimal local sketch of this two-phase flow follows this list).
  • Finally, the results are written to an output file.
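
To make the data flow concrete, here is a minimal local sketch of the same counting idea in plain Java, without Hadoop. The class name LocalWordCountSketch and the two sample lines are made up for illustration; the real job below does the same thing, only distributed:

import java.util.HashMap;
import java.util.Map;

public class LocalWordCountSketch {
    public static void main(String[] args) {
        // Stand-in for the lines read from the source files
        String[] lines = { "hello world", "hello hadoop" };
        Map<String, Long> counts = new HashMap<>();
        for (String line : lines) {
            // "map": split the line into words, each word contributes a 1
            for (String word : line.split(" ")) {
                // "reduce": sum the 1s that share the same key
                counts.merge(word, 1L, Long::sum);
            }
        }
        System.out.println(counts); // e.g. {hadoop=1, hello=2, world=1} (order not guaranteed)
    }
}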

Code snippets

WordCountRunner.java
package com.elon33.hadoop.mapreduce.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountRunner {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Name the jar used by the job
        conf.set("mapreduce.job.jar", "wcount.jar");
        // Pass the configuration to the job so the setting above takes effect
        Job wcjob = Job.getInstance(conf);
        // Locate the jar that contains the job's classes by the driver class
        wcjob.setJarByClass(WordCountRunner.class);
        // Which mapper class the job uses
        wcjob.setMapperClass(WordCountMapper.class);
        // Which reducer class the job uses
        wcjob.setReducerClass(WordCountReducer.class);
        // Key/value types emitted by the mapper
        wcjob.setMapOutputKeyClass(Text.class);
        wcjob.setMapOutputValueClass(LongWritable.class);
        // Key/value types emitted by the reducer
        wcjob.setOutputKeyClass(Text.class);
        wcjob.setOutputValueClass(LongWritable.class);
        // Path where the raw input data is stored
        FileInputFormat.setInputPaths(wcjob, "./wc/srcdata/input1");
        // Path where the results are written
        FileOutputFormat.setOutputPath(wcjob, new Path("./wc/outdata/output1"));
        // Submit the job to the cluster and wait for it to finish.
        boolean res = wcjob.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
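
One practical detail: MapReduce refuses to run if the output directory already exists. The snippet below is an optional guard, not part of the original driver; it assumes the conf and wcjob variables from WordCountRunner above, an extra import of org.apache.hadoop.fs.FileSystem, and would replace the existing FileOutputFormat.setOutputPath call before the job is submitted:

// Optional guard: remove a leftover output directory, then set the path as before.
Path outPath = new Path("./wc/outdata/output1");
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outPath)) {
    fs.delete(outPath, true); // true = recursive delete
}
FileOutputFormat.setOutputPath(wcjob, outPath);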
WordCountMapper.java
package com.elon33.hadoop.mapreduce.wordcount;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Get the content of one line of the file
        String line = value.toString();
        // Split the line into an array of words
        String[] words = StringUtils.split(line, " ");
        // Emit <word, 1> for each word
        for (String word : words) {
            context.write(new Text(word), new LongWritable(1));
        }
    }
}
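
A common refinement, not used in the original mapper, is to reuse a single Text and LongWritable instance instead of allocating new objects for every word; this is safe because context.write() serializes the pair immediately. A sketch of a map() body with that change, meant as a drop-in replacement inside WordCountMapper above:

// Hypothetical variant of map() that reuses the output objects.
private final Text outKey = new Text();
private final LongWritable one = new LongWritable(1);

@Override
protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    String[] words = StringUtils.split(value.toString(), " ");
    for (String word : words) {
        outKey.set(word);            // overwrite the reused Text
        context.write(outKey, one);  // the framework serializes the pair on write
    }
}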
WordCountReducer.java
package com.elon33.hadoop.mapreduce.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    // key: hello, values: {1,1,1,1,1,1...}
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // Accumulator for this key
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        // Emit the <word, count> pair
        context.write(key, new LongWritable(count));
    }
}

Additional notes from stepping through the WordCount program in a debugger:
1) The map function reads one line of the file at a time; the key is of type LongWritable and holds the byte offset at which the line starts, e.g. [0, 30, 60, 90, ...].
Between the map and reduce phases the framework performs a shuffle and sort step, which groups the key-value pairs that share the same key together, forming <key, {1, 1, 1, ...}> groups.
2) The reduce function processes one key at a time and accumulates its [value, ...] list to obtain the count for that key. It runs once for every distinct key,
but since the job runs on a cluster, the computation and aggregation are parallelized across tasks, so it is still fast.
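
Because the reduce logic here is just a sum (associative and commutative), a combiner can also be set in the driver so that each map task pre-aggregates its <word, 1> pairs locally before the shuffle, reducing the data transferred. This is optional and not part of the code above; a one-line sketch, placed in WordCountRunner after setReducerClass:

// Optional: reuse the reducer as a combiner to pre-sum counts on the map side
wcjob.setCombinerClass(WordCountReducer.class);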

Bug notes

When the target HDFS address does not match the one set in the configuration file,

<property>
    <name>fs.defaultFS</name>
    <value>hdfs://hadoop:9000/</value>
</property>

the following exception is thrown:

Exception in thread "main" java.net.ConnectException: Call From elon/192.168.16.1 to hadoop:9001 failed on connection exception: java.net.ConnectException: Connection refused: no further information;
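
The exception shows the client calling hadoop:9001 while the configured NameNode address is hdfs://hadoop:9000/. One way to make sure the job client uses the intended address is to set fs.defaultFS explicitly on the job's Configuration in the driver (a sketch; the host and port must match your own cluster):

// Make the client-side filesystem address match the NameNode in core-site.xml
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop:9000/");
Job wcjob = Job.getInstance(conf);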
