Finding Common Friends with MapReduce

Introduction
```
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J
```

Problem >> We have a set of log data for users A-O: each line is a user, a colon, and the comma-separated list of users that user follows. For every pair of users, we need to find which users they both follow -- their common friends.

Below, we solve this with MapReduce.

First, let's look at each stage's intermediate results in picture form:

[Figure: MapReduce common friends, showing the result of each processing stage; the third column continues beyond what is shown here]

Clearly, two rounds of computation get us the result.

Let's first lay out the approach (a plain-Java sketch of the idea follows the list):

  1. For each user, find who follows them. For example, A is followed by I, K, C, B, G, F, H, O, and D.
  2. Any two of those followers then share that user as a common friend. For example, I and K have A as a common friend, and so do C and I.
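
Before turning to Hadoop, here is a minimal plain-Java sketch of the same two-step idea (the class name and the tiny data subset are illustrative, not from the original):

```java
import java.util.*;

public class CommonFriendsSketch {
    public static void main(String[] args) {
        // Step 0: who each user follows (a tiny subset of the sample data)
        Map<String, List<String>> follows = new HashMap<>();
        follows.put("I", List.of("A", "O"));
        follows.put("K", List.of("A", "C", "D"));
        follows.put("C", List.of("F", "A", "D", "I"));

        // Step 1: invert the relation -> who is followed by whom
        Map<String, List<String>> followers = new TreeMap<>();
        follows.forEach((user, list) -> list.forEach(
                followed -> followers.computeIfAbsent(followed, x -> new ArrayList<>()).add(user)));

        // Step 2: every pair of followers of the same user shares that user
        Map<String, List<String>> common = new TreeMap<>();
        followers.forEach((followed, fs) -> {
            Collections.sort(fs); // normalize pair order, e.g. "C-I" rather than "I-C"
            for (int i = 0; i < fs.size(); i++)
                for (int j = i + 1; j < fs.size(); j++)
                    common.computeIfAbsent(fs.get(i) + "-" + fs.get(j), x -> new ArrayList<>()).add(followed);
        });
        common.forEach((pair, friends) -> System.out.println(pair + " " + friends)); // e.g. C-I [A], I-K [A]
    }
}
```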
Environment setup
The input file contains the same friend list shown in the introduction.

Solving this with MapReduce likewise takes two steps.

Step 1

We need to process the stage-0 data (from the figure) into the stage-1 result.

The first Map
```java
package com.tangf.friend;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FriendMapper1 extends Mapper<LongWritable, Text, Text, Text> {

    // k and v are reused across map() calls; context.write() serializes
    // immediately, so reusing the objects is safe and avoids allocations
    private Text k = new Text();
    private Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line looks like "A:B,C,D,F,E,O" -- a user, then who they follow
        final String[] split = value.toString().split(":");
        v.set(split[0]); // the follower becomes the output value
        final String[] men = split[1].split(",");
        for (String man : men) {
            k.set(man); // each followed user becomes the output key
            context.write(k, v); // emit (followed, follower)
        }
    }
}
```

DEBUG

```
A:B,C,D,F,E,O

Per-line processing:
1. Split on ":" into two values
2. The first value is the follower
3. The second value, split on ",", lists the users they follow
4. Output:
B A
C A
D A
F A
E A
O A

The full output of the first Map:
B A
C A
D A
F A
E A
O A
A B
C B
E B
K B
F C
A C
D C
I C
A D
E D
F D
L D
B E
C E
D E
M E
L E
A F
B F
C F
D F
E F
O F
M F
A G
C G
D G
E G
F G
A H
C H
D H
E H
O H
A I
O I
B J
O J
A K
C K
D K
D L
E L
F L
E M
F M
G M
A O
H O
I O
J O
```
The first Reduce

Between Map and Reduce, the framework shuffles and groups all values by key, so each reduce() call receives one user together with everyone who follows them; the Reduce just concatenates that list.
```java
package com.tangf.friend;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FriendReducer extends Reducer<Text, Text, Text, Text> {

    private Text v = new Text();
    private StringBuffer sb = new StringBuffer();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        sb.delete(0, sb.length()); // reset the buffer for each key
        for (Text value : values) {
            // the trailing comma is harmless: String.split(",") drops trailing empty strings
            sb.append(value).append(",");
        }
        v.set(sb.toString());
        context.write(key, v); // emit (user, "follower1,follower2,...")
    }
}
```

DEBUG

```
A I,K,C,B,G,F,H,O,D,
B A,F,J,E,
C A,E,B,H,F,G,K,
D G,C,K,A,L,F,E,H,
E G,M,L,H,A,F,B,D,
F L,M,D,C,G,A,
G M,
H O,
I O,C,
J O,
K B,
L D,E,
M E,F,
O A,H,I,J,F,
```
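
The original post doesn't include a driver class; a minimal sketch of one for the first job might look like this (the class name FriendDriver1 and the file paths are assumptions, not from the original):

```java
package com.tangf.friend;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FriendDriver1 {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "friend-step1");
        job.setJarByClass(FriendDriver1.class);
        job.setMapperClass(FriendMapper1.class);
        job.setReducerClass(FriendReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path("input/friends.txt")); // hypothetical path
        FileOutputFormat.setOutputPath(job, new Path("output/step1"));     // hypothetical path
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

Note that the default TextOutputFormat separates key and value with a tab, which is why the second Mapper below splits each line on \t.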
Step 2

The second Map

It reads job 1's tab-separated output and, for each followed user, emits every pair of their followers as the key with that user as the value. Each pair key is normalized alphabetically (always A-B, never B-A); otherwise the two orderings would land in different reduce groups.
```java
package com.tangf.friend;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FriendMapper2 extends Mapper<LongWritable, Text, Text, Text> {

    private Text k = new Text();
    private Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each line looks like "A\tI,K,C,B,G,F,H,O,D," -- a user, then everyone who follows them
        final String[] split = value.toString().split("\\t");
        v.set(split[0]); // the followed user is the common friend, so it becomes the value

        final String[] ss = split[1].split(",");
        for (int i = 0; i < ss.length; i++) {
            for (int j = i + 1; j < ss.length; j++) {
                // order each pair alphabetically so A-B and B-A land in the same group
                if (ss[i].compareTo(ss[j]) > 0) {
                    k.set(ss[j] + "-" + ss[i]);
                } else {
                    k.set(ss[i] + "-" + ss[j]);
                }
                context.write(k, v); // emit (pair, common friend)
            }
        }
    }
}
```

DEBUG

```
I-K A
C-I A
B-I A
G-I A
F-I A
H-I A
I-O A
D-I A
C-K A
B-K A
G-K A
F-K A
H-K A
K-O A
D-K A
B-C A
C-G A
C-F A
C-H A
C-O A
C-D A
B-G A
B-F A
B-H A
B-O A
B-D A
F-G A
G-H A
G-O A
D-G A
F-H A
F-O A
D-F A
H-O A
D-H A
D-O A
A-F B
A-J B
A-E B
F-J B
E-F B
E-J B
A-E C
A-B C
A-H C
A-F C
A-G C
A-K C
B-E C
E-H C
E-F C
E-G C
E-K C
B-H C
B-F C
B-G C
B-K C
F-H C
G-H C
H-K C
F-G C
F-K C
G-K C
C-G D
G-K D
A-G D
G-L D
F-G D
E-G D
G-H D
C-K D
A-C D
C-L D
C-F D
C-E D
C-H D
A-K D
K-L D
F-K D
E-K D
H-K D
A-L D
A-F D
A-E D
A-H D
F-L D
E-L D
H-L D
E-F D
F-H D
E-H D
G-M E
G-L E
G-H E
A-G E
F-G E
B-G E
D-G E
L-M E
H-M E
A-M E
F-M E
B-M E
D-M E
H-L E
A-L E
F-L E
B-L E
D-L E
A-H E
F-H E
B-H E
D-H E
A-F E
A-B E
A-D E
B-F E
D-F E
B-D E
L-M F
D-L F
C-L F
G-L F
A-L F
D-M F
C-M F
G-M F
A-M F
C-D F
D-G F
A-D F
C-G F
A-C F
A-G F
C-O I
D-E L
E-F M
A-H O
A-I O
A-J O
A-F O
H-I O
H-J O
F-H O
I-J O
F-I O
F-J O
```
The second Reduce

The code is identical to the first Reduce: the shuffle groups values by pair key, and the Reduce concatenates each pair's common friends.

```java
package com.tangf.friend;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FriendReducer extends Reducer<Text, Text, Text, Text> {

    private Text v = new Text();
    private StringBuffer sb = new StringBuffer();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        sb.delete(0, sb.length()); // reset the buffer for each pair key
        for (Text value : values) {
            sb.append(value).append(",");
        }
        v.set(sb.toString());
        context.write(key, v); // emit (pair, "friend1,friend2,...")
    }
}
```

DEBUG

```
A-B C,E,
A-C D,F,
A-D F,E,
A-E B,D,C,
A-F C,B,O,D,E,
A-G F,D,E,C,
A-H D,O,E,C,
A-I O,
A-J O,B,
A-K D,C,
A-L E,D,F,
A-M F,E,
B-C A,
B-D E,A,
B-E C,
B-F E,A,C,
B-G C,A,E,
B-H E,C,A,
B-I A,
B-K C,A,
B-L E,
B-M E,
B-O A,
C-D F,A,
C-E D,
C-F D,A,
C-G D,F,A,
C-H A,D,
C-I A,
C-K D,A,
C-L D,F,
C-M F,
C-O I,A,
D-E L,
D-F A,E,
D-G E,A,F,
D-H E,A,
D-I A,
D-K A,
D-L E,F,
D-M E,F,
D-O A,
E-F B,M,D,C,
E-G D,C,
E-H D,C,
E-J B,
E-K D,C,
E-L D,
F-G C,D,A,E,
F-H A,E,O,C,D,
F-I O,A,
F-J O,B,
F-K A,C,D,
F-L E,D,
F-M E,
F-O A,
G-H A,D,E,C,
G-I A,
G-K C,A,D,
G-L F,D,E,
G-M F,E,
G-O A,
H-I A,O,
H-J O,
H-K A,D,C,
H-L D,E,
H-M E,
H-O A,
I-J O,
I-K A,
I-O A,
K-L D,
K-O A,
L-M F,E,
```
As a spot check against the original data: A follows {B,C,D,F,E,O} and B follows {A,C,E,K}; the intersection is {C,E}, which matches the first output line, A-B C,E.

Summary

Map reshapes the data into a new structure; Reduce aggregates the results.
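
To run the whole pipeline end to end, the second job is configured the same way and launched after the first finishes. A minimal sketch, reusing the hypothetical paths from FriendDriver1 above (the class name FriendDriver2 is likewise an assumption):

```java
package com.tangf.friend;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FriendDriver2 {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "friend-step2");
        job.setJarByClass(FriendDriver2.class);
        job.setMapperClass(FriendMapper2.class);
        job.setReducerClass(FriendReducer.class); // the same reducer works for both jobs
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path("output/step1")); // job 1's output is job 2's input
        FileOutputFormat.setOutputPath(job, new Path("output/step2"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

The two jobs could also be wired together with JobControl, but launching them sequentially keeps the example simple.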

