大牛教你使用自定義Partition實現hadoop部分排序

排序在很多業務場景都要用到,今天本文介紹如何藉助於自定義Partition類實現hadoop部分排序。本文還是使用java和python實現排序代碼。

1、部分排序。

部分排序就是在每個文件中都是有序的,和其他文件沒有關係,其實很多業務場景就需要到部分排序,而不需要全局排序。例如,有個水果電商網站,要對每個月的水果的銷量進行排序,我們可以把reduce進程之後的文件分成12份,對應1到12月份。每個文件按照水果的銷量從高到底排序,1月份的排序和其他月份的排序沒有任何關係。

原始數據如下,有三個字段,第一個字段是水果名稱,第二個字段是銷售月份,第三個字段是銷售量,

Apple 201701 20

Pear 201701 30

Banana 201701 40

Orange 201701 90

Apple 201702 50

Pear 201702 60

Banana 201702 20

Orange 201702 10

Apple 201703 230

Pear 201703 302

Banana 201703 140

Orange 201703 290

Apple 201704 30

Pear 201704 102

Banana 201704 240

Orange 201704 190

經過部分排序後會生成12個文件,內容如下,銷量按照從高到低排序

Pear 302

Orange 290

Apple 230

Banana 140

實現思路:

1、自定義Partition類,因為一年有12個月 ,因此需要12個分區,同時在MapReduce入口類中要指定Partition類,以及partition的數量。

2、在map函數中將年月作為key值,value變為“Apple_20”的格式。

3、在reduce函數中比較每種水果的銷量,按照從高到低排序。

Java代碼如下,Map類:

1 public class PartSortMap extends Mapper {
2
3 public void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException{
4 String line = value.toString();//讀取一行數據,數據格式為“Apple 201701 30”
5 String str[] = line.split(" ");//
6 //年月當做key值,因為要根據key值設置分區,而Apple+“_”+銷量當做value

7 context.write(new Text(str[1]),new Text(str[0] + "_" + str[2]));
8 }
9 }

自定義Partition類:

 1 public class PartParttition extends Partitioner {
2 public int getPartition(Text arg0, Text arg1, int arg2) {
3 String key = arg0.toString();
4 int month = Integer.parseInt(key.substring(4, key.length()));
5 if (month == 1) {
6 return 1 % arg2;
7 } else if (month == 2) {
8 return 2 % arg2;
9 } else if (month == 3) {
10 return 3 % arg2;
11 }else if (month == 4) {
12 return 4 % arg2;
13 }else if (month == 5) {
14 return 5 % arg2;
15 }else if (month == 6) {
16 return 6 % arg2;
17 }else if (month == 7) {
18 return 7 % arg2;
19 }else if (month == 8) {
20 return 8 % arg2;
21 }else if (month == 9) {
22 return 9 % arg2;
23 }else if (month == 10) {
24 return 10 % arg2;
25 }else if (month == 11) {
26 return 11 % arg2;
27 }else if (month == 12) {
28 return 12 % arg2;
29 }
30 return 0;
31 }
32 }

Reduce類:

 1 public class PartSortReduce extends Reducer {
2 class FruitSales implements Comparable{

3 private String name;//水果名字
4 private double sales;//水果銷量
5 public void setName(String name){
6 this.name = name;
7 }
8
9 public String getName(){
10 return this.name;
11 }
12 public void setSales(double sales){
13 this.sales = sales;
14 }
15
16 public double getSales() {
17 return this.sales;
18 }
19
20 @Override
21 public int compareTo(FruitSales o) {
22 if(this.getSales() > o.getSales()){
23 return -1;
24 }else if(this.getSales() == o.getSales()){
25 return 0;
26 }else {
27 return 1;
28 }
29 }
30 }
31
32 public void reduce(Text key, Iterable values,Context context)throws IOException,InterruptedException{
33 List fruitList = new ArrayList();
34
35 for(Text value: values) {
36 String[] str = value.toString().split("_");
37 FruitSales f = new FruitSales();
38 f.setName(str[0]);
39 f.setSales(Double.parseDouble(str[1]));
40 fruitList.add(f);
41 }
42 Collections.sort(fruitList);
43
44 for(FruitSales f : fruitList){
45 context.write(new Text(f.getName()),new Text(String.valueOf(f.getSales())));
46 }
47 }
48 }

入口類:

 1 public class PartSortMain {
2 public static void main(String[] args)throws Exception{
3 Configuration conf = new Configuration();
4 //獲取運行時輸入的參數,一般是通過shell腳本文件傳進來。
5 String [] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
6 if(otherArgs.length < 2){
7 System.err.println("必須輸入讀取文件路徑和輸出路徑");
8 System.exit(2);
9 }
10 Job job = new Job();
11 job.setJarByClass(PartSortMain.class);
12 job.setJobName("PartSort app");
13
14 //設置讀取文件的路徑,都是從HDFS中讀取。讀取文件路徑從腳本文件中傳進來
15 FileInputFormat.addInputPath(job,new Path(args[0]));
16
17 //設置mapreduce程序的輸出路徑,MapReduce的結果都是輸入到文件中
18 FileOutputFormat.setOutputPath(job,new Path(args[1]));
19
20
21 job.setPartitionerClass(PartParttition.class);//設置自定義partition類
22 job.setNumReduceTasks(12);//設置為partiton數量
23 //設置實現了map函數的類
24 job.setMapperClass(PartSortMap.class);
25
26 //設置實現了reduce函數的類
27 job.setReducerClass(PartSortReduce.class);
28
29 //設置reduce函數的key值

30 job.setOutputKeyClass(Text.class);
31 //設置reduce函數的value值
32 job.setOutputValueClass(Text.class);
33
34 System.exit(job.waitForCompletion(true) ? 0 :1);
35 }
36 }

運行後會在hdfs中生成12個文件,如下圖所示:

大牛教你使用自定義Partition實現hadoop部分排序

大牛教你使用自定義Partition實現hadoop部分排序

查看其中的一個文件會看到如下的內容:

大牛教你使用自定義Partition實現hadoop部分排序

大牛教你使用自定義Partition實現hadoop部分排序

可以看到是按照銷量從高到低排序。

使用Python實現部分排序。

Python使用streaming的方式實現MapReduce,和Java方式不一樣,不能自定義Partition,但是可以在腳本文件中指定哪個字段用作partition,哪個字段用於排序。

下圖顯示數據經過部分排序之後,數據變化的過程。即原始數據,經過map函數,然後到reduce函數,最終在每個文件中按照銷量從高到底排序的過程:

大牛教你使用自定義Partition實現hadoop部分排序

大牛教你使用自定義Partition實現hadoop部分排序

上圖中的第一步是在map函數中將原始數據的第二列的“年月”轉換成“月”,當做partition,將銷量當做key,水果名當做value。第二步是經過MapReduce的排序之後到達Reduce函數之間的結果。第三步是在reduce函數中將map輸入的數據中將key當做reduce的value,將value當做reduce的key。

代碼如下:

map_sort.py

 1 #!/usr/bin/python
2 import sys
3 base_numer = 99999
4 for line in sys.stdin:
5 ss = line.strip().split(' ')
6 fruit = ss[0]
7 yearmm = ss[1]
8 sales = ss[2]
9 new_key = base_number - int(sales)
10 mm = yearmm[4:6]
11 print "%s\t%s\t%s" % (int(mm), int(new_key), fruit)

reduce_sort.py

1 #!/usr/bin/python
2 import sys
3 base_number = 99999
4 for line in sys.stdin:
5 idx_id, sales, fruit = line.strip().split('\t')
6 new_key = base_number - int(sales)
7 print '\t'.join([val, str(new_key)])

執行腳本如下:

run.sh

 1 set -e -x 

2 HADOOP_CMD="/usr/local/src/hadoop-2.6.1/bin/hadoop"
3 STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"
4 INPUT_FILE_PATH_A="/data/fruit.txt"
5 OUTPUT_SORT_PATH="/output_sort"
6 $HADOOP_CMD fs -rmr -skipTrash $OUTPUT_SORT_PATH
7 $HADOOP_CMD jar $STREAM_JAR_PATH \
8 -input $INPUT_FILE_PATH_A\
9 -output $OUTPUT_SORT_PATH \
10 -mapper "python map_sort.py" \
11 -reducer "python reduce_sort.py" \
12 -file ./map_sort.py \
13 -file ./red_sort.py \
14 -jobconf mapred.reduce.tasks=12 \
15 -jobconf stream.num.map.output.key.fields=2 \
16 -jobconf num.key.fields.for.partition=1 \
17 -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

-jobconf stream.num.map.output.key.fields=2 這行代碼用於指定排序的字段,數字2指定map函數輸出數據的第幾列用於排序,就是例子中的sales字段。

-jobconf num.key.fields.for.partition=1這行代碼指定partition字段,數字1指定map函數輸出數據的第一列用於分區。

-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner這行代碼是調用hadoop streaming包中的分區類,實現分區功能。

實現streaming partition功能時這三行代碼必不可少。

總結:

實現hadoop部分排序主要是通過partition方式實現。

java語言使用自定義分區Partition類實現分區的功能,而streaming是通過KeyFieldBasedPartitioner類,然後在腳本文件中指定partition類的方式實現。


分享到:


相關文章: