征服Hadoop:Hadoop实战之单词计数程序wordcount

我们搭建好集群后,也运行了hadoop本身自带提供的单词测试程序,现在我们用Eclipse和mavenlai8手动编写一下单词计数程序并提交到hadoop上运行。

一、环境准备

参考我之前的博文搭建好hadoop完全分布式环境并且启动。主备eclipse和maven.

二、新建一个maven项目

用eclipse新建一个maven羡慕,在pom.xml中添加如下依赖:

<code><dependency>
<groupid>org.apache.hadoop/<groupid>
<artifactid>hadoop-common/<artifactid>
<version>2.8.5/<version>
/<dependency>
<dependency>
<groupid>org.apache.hadoop/<groupid>
<artifactid>hadoop-hdfs/<artifactid>
<version>2.8.5/<version>
/<dependency>
<dependency>
<groupid>org.apache.hadoop/<groupid>
<artifactid>hadoop-mapreduce-client-core/<artifactid>
<version>2.8.5/<version>
/<dependency>/<code>

因为要打包成可执行jar并且有第三方依赖,需要添加如下build

<code><build>
<plugins>
<plugin>
<artifactid>maven-compiler-plugin/<artifactid>
<configuration>
<source>1.6/<source>
<target>1.6/<target>
/<configuration>
/<plugin>
<plugin>
<groupid>org.apache.maven.plugins/<groupid>
<artifactid>maven-shade-plugin/<artifactid>
<version>1.4/<version>
<configuration>
<createdependencyreducedpom>false/<createdependencyreducedpom>

/<configuration>
<executions>
<execution>

<phase>package/<phase>

<goals>
<goal>shade/<goal>
/<goals>
<configuration>

<filters>
<filter>
<artifact>*:*/<artifact>
<excludes>
<exclude>META-INF/*.SF/<exclude>
<exclude>META-INF/*.DSA/<exclude>
<exclude>META-INF/*.RSA/<exclude>
/<excludes>
/<filter>
/<filters>
<transformers>
<transformer> implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.handlers/<resource>
/<transformer>

<transformer> implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainclass>com.suibibk.App/<mainclass>
/<transformer>
<transformer> implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas/<resource>
/<transformer>
/<transformers>
/<configuration>
/<execution>
/<executions>
/<plugin>
/<plugins>
/<build>/<code>

注意修改主方法入口,也就是main方法所在类,这样子程序就可以直接maven install打包了。

三、编写Mapper、Reducer和启动类

Mapreduce程序围绕着分而治之的思想来的,分就是Mapper程序,治就是Reducer程序,然后用一个启动类将job提交给集群运行即可。

1、项目结构
征服Hadoop:Hadoop实战之单词计数程序wordcount

2、启动类
<code>package com.suibibk;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class App {
/**
* 1. 业务逻辑相关信息通过job对象定义与实现 2. 将绑定好的job提交给集群去运行
*/
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(App.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
// 设置业务逻辑Mapper类的输出key和value的数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 指定要处理的数据所在的位置
FileSystem fs = FileSystem.get(conf);
String inputPath = args[0];
Path input = new Path(inputPath);
if(fs.exists(input)) {
FileInputFormat.addInputPath(job, input);
}
// 指定处理完成之后的结果所保存的位置
String outputPath = args[1];
Path output = new Path(outputPath);
//需要先删除,不然第二次执行会报错
fs.delete(output, true);

FileOutputFormat.setOutputPath(job, output);
// 向yarn集群提交这个job
boolean res = job.waitForCompletion(true);
System.exit(res ? 0 : 1);
}
}/<code>

注意在hadoop2中FileInputFormat所属的包为: org.apache.hadoop.mapreduce.lib.input.FileInputFormat。out也一样,不要搞错了,我这里直接把导入的包也黏贴上来。

3、Mapper
<code>package com.suibibk;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MyMapper extends Mapper<longwritable>{
// map方法的生命周期: 框架每传一行数据就被调用一次
protected void map(LongWritable key, Text value,Context context) throws IOException ,InterruptedException {
String line = value.toString(); // 行数据转换为string
String[] words = line.split(" "); // 行数据分隔单词
for (String word : words) { // 遍历数组,输出
context.write(new Text(word), new IntWritable(1));
}
}
}/<longwritable>/<code>
4、Reducer
<code>package com.suibibk;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MyReducer extends Reducer<text>{
// 生命周期:框架每传递进来一个kv 组,reduce方法被调用一次
@Override
protected void reduce(Text key, Iterable<intwritable> values,

Reducer<text>.Context context) throws IOException, InterruptedException {
int count = 0; // 定义一个计数器
for (IntWritable value : values) { // 遍历所有v,并累加到count中
count += value.get();
}
context.write(key, new IntWritable(count));
}
}/<text>/<intwritable>/<text>/<code>

四、提交测试

1、项目右键执行maven install(package也可以)

然后再target中获得jar包。

2、上传到hadoop集群的一台机中

我这里是上传到worker1中。

4、执行测试

测试之前得先准备一下输入文件,这里用file2.txt来,然后执行如下命令:

<code> hadoop jar wordcount-0.0.1-SNAPSHOT.jar /input/file2.txt /output/<code>

执行成功后查看结果:

<code>hadoop hdfs -cat /output/*/<code>

会发现跟hadoop提供的例子结果一样。

完成。



分享到:


相關文章: