Hadoop:MapReduce多路徑輸入與多文件輸出詳解

前言

我前段時間在完成一個公司業務時,遇到了一個這樣的需求:將HDFS上按每天每小時存儲的數據進行數據預處理,然後對應按天存儲在HDFS........由此可得,MapReduce的輸入路徑是:

<code>/user/data/yyyy/MM/dd/HH//<code>

每天有24小時,dd/目錄下有24個目錄,然後,對這24個目錄下的數據預處理,最後輸出到dd/目錄:

<code>/user/out/yyyy/MM/dd//<code>

在設計代碼的時候,發現FileInputFormat.addInputPath()難堪此大任,於是,我就通過APIs等資料,找到了FileInputFormat.setInputPaths()的解決方案。不過,我將在下面對MapReduce的輸入/輸出進行總結和介紹。

1.MapReduce多路徑輸入

1.1FileInputFormat.addInputPath(s)

FileInputFormat.addInputPath()是我們最常用的設置MapReduce輸入路徑的方法了。其實,FileInputFormat有兩個這樣的方法:

<code>static void addInputPath(Job job, Path path)

static void addInputPaths(Job job, String commaSeperatedPaths)/<code>

addInputPath()只能指定一個路徑,如果要想添加多個路徑需要多次調用該方法:

<code>FileInputFormat.addInputPath(job, new Path(args[0]));
FileInputFormat.addInputPath(job, new Path(args[1]));
FileInputFormat.addInputPath(job, new Path(args[2]));/<code>

addInputPaths()可以指定多條路徑,而這多條路徑是用“,”分隔的一個字符串:

<code>String paths = strings[0] + "," + strings[1];
FileInputFormat.addInputPaths(job, paths);/<code>

這兩種方法的缺陷:

  1. 路徑必須是指向目的文件的,如:/user/yyyy/mm/dd,而文件實際存在/user/yyyy/mm/dd/00,/user/yyyy/mm/dd/01。
  2. 路徑中的目錄不能存在通配符,如:/user/yyyy/mm/dd/*/。(我在2個節點的虛擬機上是用小數據可行的,但是在公司的大集群用大數據是不可行的,所以建議不要用通配符)。
  3. 目的文件的文件格式和類型必須一樣,如:文件類型有CSV,RCFile。
  4. 所有文件都通過一個Mapper進行處理。
  5. 文件路徑過多,代碼冗餘增加。

1.2MultipleInputs.addInputPath

MultipleInputs的addInputPath有兩種定義方式:

<code>static void addInputPath(Job job, Path path, Class extends InputFormat> inputFormatClass)

static void addInputPath(Job job, Path path, Class extends InputFormat> inputFormatClass, Class extends Mapper> mapperClass)/<code>

前者不需要指定Mapper,所以所有文件都通過一個Mapper進行處理;

<code>MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class);
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class);/<code>

後者可以對不同的路徑指定不同的Mapper,故可以指定不同Mapper處理不同類型的文件。

<code>MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class,
MultiPathMR.MultiMap1.class);
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class,
MultiPathMR.MultiMap2.class);/<code>

這兩種方法的缺陷:

路徑必須是指向目的文件的,如:/user/yyyy/mm/dd,而文件實際存在/user/yyyy/mm/dd/00,/user/yyyy/mm/dd/01。

文件路徑過多,代碼冗餘增加。

優勢:

可以處理不同類型或不同格式的文件,如:CSV,RCFile。

路徑中的目錄能存在通配符,如:/user/yyyy/mm/dd/*/。

可以指定不同的Mapper處理不同路徑下的文件。

1.3 FileInputFormat.setInputPaths

FileInputFormat有三個設置路徑的方法:

<code>static void setInputPathFilter(Job job, Class extends PathFilter> filter)

static void setInputPaths(Job job, Path... inputPaths)

static void setInputPaths(Job job, String commaSeparatedPaths)/<code>

這三個方法功能特別強大,可以匹配路徑上的通配符:

Hadoop:MapReduce多路徑輸入與多文件輸出詳解

所以,當讀路徑/user/yyyy/mm/dd下所有目錄的文件是可以簡單的寫成:

<code>/user/yyyy/mm/dd/*//<code>

因為第二個參數是Path... args,代表可以穿0個,1個或多個參數(數組);

<code>FileInputFormat.setInputPaths(job, new Path(strings[0]));/<code>

也可以像這樣:

<code>Path[] paths = {new Path(strings[0]), new Path(strings[1])};
FileInputFormat.setInputPaths(job, paths);/<code>

對於第三個方法的使用:

<code>String paths = strings[0] + "," + strings[1];
FileInputFormat.setInputPaths(job, paths);/<code>

這三種方法的缺陷:

  1. 目的文件的文件格式和類型必須一樣,如:文件類型有CSV,RCFile。
  2. 所有文件都通過一個Mapper進行處理。

優勢:

  1. 路徑必須是指向目的文件的,如:/user/yyyy/mm/dd,而文件實際存在/user/yyyy/mm/dd/00,/user/yyyy/mm/dd/01。
  2. 路徑中的目錄能存在通配符,如:/user/yyyy/mm/dd/*/。
  3. 由於能使用通配符,所以即使路徑過多,也不至於是代碼冗餘太多。

2多文件輸出

MapReduce可以定義多文件輸出,但是不能定義多目錄輸出。提供這種功能的是MultipleOutputs類。MultipleOutputs有三個write()方法:

<code>void write(KEYOUT key, VALUEOUT value, String baseOutputPath)

void write(String namedOutput, K key, V value)

void write(String namedOutput, K key, V value, String baseOutputPath)
/<code>

在用後兩個方法時,需要在調用FileOutputFormat. setOutputPath(job, new Path(args[1]))之前,先用addNamedOutput()方法定義namedOutput:MultipleOutputs.addNamedOutput(job, namedOutput, TextOutputFormat.class,Text.class, LongWritable.class);

下面是一個WordCount的簡單示例,輸入文件是:

<code>hello,world
hello,hadoop
hello,spark/<code>

MR代碼:

<code>public class MultiOutMR {

public static class MultiOutMapper extends Mapper<object> {

private Text outKey = new Text();
private IntWritable outValue = new IntWritable(1);
@Override

protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] line = value.toString().trim().split(",");
for(String word : line){
outKey.set(word);
context.write(outKey, outValue);
}
}
}

public static class MultiOutReducer extends Reducer<text> {

private LongWritable count = new LongWritable();
private MultipleOutputs outputs;

@Override
protected void setup(Context context) throws IOException, InterruptedException {
outputs = new MultipleOutputs(context);
}

@Override
protected void reduce(Text key, Iterable<intwritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for(IntWritable value : values){
sum += value.get();
}
count.set(sum);

Configuration conf = context.getConfiguration();
String type = conf.get("type");
if(type.equalsIgnoreCase("namedOutput")) {
if(key.toString().equals("hello")) {
outputs.write("hello", key, count);
}
else {
outputs.write("IT", key, count);
}
}
else if(type.equalsIgnoreCase("baseOutputPath")){
outputs.write(key, count, key.toString());
}
else {
if(key.toString().equals("hello")) {
outputs.write("hello", key, count, key.toString());
}
else {
outputs.write("IT", key, count, key.toString());
}
}
}

@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
outputs.close();
}
}
}/<intwritable>/<text>/<object>/<code>

Driver代碼:

<code>public class Driver extends Configured implements Tool {
@Override
public int run(String[] strings) throws Exception {
Configuration conf = getConf();
conf.set("type", strings[2]);

Job job = new Job(conf, "Multiple Output");
job.setJarByClass(Driver.class);
job.setMapperClass(MultiOutMR.MultiOutMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setReducerClass(MultiOutMR.MultiOutReducer.class);

if(!strings[2].equalsIgnoreCase("baseOutputPath")){
MultipleOutputs.addNamedOutput(job, "hello", TextOutputFormat.class,
Text.class, LongWritable.class);
MultipleOutputs.addNamedOutput(job, "IT", TextOutputFormat.class,
Text.class, LongWritable.class);
}

FileInputFormat.addInputPath(job, new Path(strings[0]));
FileOutputFormat.setOutputPath(job, new Path(strings[1]));

return job.waitForCompletion(true) ? 0 : 1;
}

public static void main(String[] args)throws Exception{
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if(otherArgs.length != 3){
System.err.println("Usage: <output>");
System.out.println("Type:\\n" +
"namedOutput - the named output name.\\n" +
"baseOutputPath - base-output path to write the record to. Note: Framework will generate unique filename for the baseOutputPath.\\n" +
"all - contains namedOutput and baseOutputPath.");
System.exit(1);
}

System.exit(ToolRunner.run(conf, new Driver(), otherArgs));
}
}/<output>/<code>


分享到:


相關文章: