採集→清洗→處理:基於MapReduce的離線數據分析

采集→清洗→处理:基于MapReduce的离线数据分析

一、大數據處理的常用方法

大數據處理目前比較流行的是兩種方法,一種是離線處理,一種是在線處理,基本處理架構如下:

采集→清洗→处理:基于MapReduce的离线数据分析

在互聯網應用中,不管是哪一種處理方式,其基本的數據來源都是日誌數據,例如對於Web應用來說,則可能是用戶的訪問日誌、用戶的點擊日誌等。

如果對於數據的分析結果在時間上有比較嚴格的要求,則可以採用在線處理的方式來對數據進行分析,如使用Spark、Storm等進行處理。比較貼切的一個例子是天貓雙十一的成交額,在其展板上,我們看到交易額是實時動態進行更新的,對於這種情況,則需要採用在線處理。

當然,如果只是希望得到數據的分析結果,對處理的時間要求不嚴格,就可以採用離線處理的方式,比如我們可以先將日誌數據採集到HDFS中,之後再進一步使用MapReduce、Hive等來對數據進行分析,這也是可行的。

本文主要分享對某個電商網站產生的用戶訪問日誌(access.log)進行離線處理與分析的過程,基於MapReduce的處理方式,最後會統計出某一天不同省份訪問該網站的UV與PV。

二、生產場景與需求

在我們的場景中,Web應用的部署是如下的架構:

采集→清洗→处理:基于MapReduce的离线数据分析

即比較典型的Nginx負載均衡+KeepAlive高可用集群架構,在每臺Web服務器上,都會產生用戶的訪問日誌,業務需求方給出的日誌格式如下:

采集→清洗→处理:基于MapReduce的离线数据分析

其每個字段的說明如下:

appid ip mid userid login_type request status http_referer user_agent time

其中:

appid包括:web:1000,android:1001,ios:1002,ipad:1003

mid:唯一的id此id第一次會種在瀏覽器的cookie裡。如果存在則不再種。作為瀏覽器唯一標示。移動端或者pad直接取機器碼。

login_type:登錄狀態,0未登錄、1:登錄用戶

request:類似於此種 "GET /userList HTTP/1.1"

status:請求的狀態主要有:200 ok、404 not found、408 Request Timeout、500 Internal Server Error、504 Gateway Timeout等

http_referer:請求該url的上一個url地址。

user_agent:瀏覽器的信息,例如:"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/47.0.2526.106 Safari/537.36"

time:時間的long格式:1451451433818。

根據給定的時間範圍內的日誌數據,現在業務方有統計出每個省每日訪問的PV、UV的需求。

三、數據採集:獲取原生數據

數據採集工作由運維人員來完成,對於用戶訪問日誌的採集,使用的是Flume,並且會將採集的數據保存到HDFS中,其架構如下:

采集→清洗→处理:基于MapReduce的离线数据分析

可以看到,不同的Web Server上都會部署一個Agent用於該Server上日誌數據的採集,之後,不同Web Server的Flume Agent採集的日誌數據會下沉到另外一個被稱為Flume Consolidation Agent(聚合Agent)的Flume Agent上,該Flume Agent的數據落地方式為輸出到HDFS。

在我們的HDFS中,可以查看到其採集的日誌:

采集→清洗→处理:基于MapReduce的离线数据分析

後面我們的工作正是要基於Flume採集到HDFS中的數據做離線處理與分析。

四、數據清洗:將不規整數據轉化為規整數據

1數據清洗目的

剛剛採集到HDFS中的原生數據,我們也稱為不規整數據,即目前來說,該數據的格式還無法滿足我們對數據處理的基本要求,需要對其進行預處理,轉化為我們後面工作所需要的較為規整的數據,所以這裡的數據清洗,其實指的就是對數據進行基本的預處理,以方便我們後面的統計分析,所以這一步並不是必須的,需要根據不同的業務需求來進行取捨,只是在我們的場景中需要對數據進行一定的處理。

2數據清洗方案

原來的日誌數據格式是如下的:

但是如果需要按照省份來統計UV、PV,其所包含的信息還不夠,我們需要對這些數據做一定的預處理,比如需要,對於其中包含的IP信息,我們需要將其對應的IP信息解析出來;為了方便我們的其它統計,我們也可以將其Request信息解析為Method、request_url、http_version等,所以按照上面的分析,我們希望預處理之後的日誌數據包含如下的數據字段:

appid;

ip;

//通過ip來衍生出來的字段 province和city

province;

city;

mid;

userId;

loginType;

request;

//通過request 衍生出來的字段 method request_url http_version

method;

requestUrl;

httpVersion;

status;

httpReferer;

userAgent;

//通過userAgent衍生出來的字段,即用戶的瀏覽器信息

browser;

time;

即在原來的基礎上,我們增加了其它新的字段,如Province、City等。

我們採用MapReduce來對數據進行預處理,預處理之後的結果,我們也是保存到HDFS中,即採用如下的架構:

采集→清洗→处理:基于MapReduce的离线数据分析

3數據清洗過程:MapReduce程序編寫

數據清洗的過程主要是編寫MapReduce程序,而MapReduce程序的編寫又分為寫Mapper、Reducer、Job三個基本的過程。但是在我們這個案例中,要達到數據清洗的目的,實際上只需要Mapper就可以了,並不需要Reducer,原因很簡單,我們只是預處理數據,在Mapper中就已經可以對數據進行處理了,其輸出的數據並不需要進一步經過Redcuer來進行彙總處理。

所以下面就直接編寫Mapper和Job的程序代碼:

  • AccessLogCleanMapper

采集→清洗→处理:基于MapReduce的离线数据分析
  • AccessLogCleanJob

采集→清洗→处理:基于MapReduce的离线数据分析
  • 執行MapReduce程序

將上面的mr程序打包後上傳到我們的Hadoop環境中,這裡,對2018-04-08這一天產生的日誌數據進行清洗,執行如下命令:

yarn jar data-extract-clean-analysis-1.0-SNAPSHOT-jar-with-dependencies.jar\

cn.xpleaf.dataClean.mr.job.AccessLogCleanJob \hdfs://ns1/input/data-clean/access/2018/04/08 \hdfs://ns1/output/data-clean/access

觀察其執行結果:

采集→清洗→处理:基于MapReduce的离线数据分析

可以看到MapReduce Job執行成功。

4數據清洗結果

上面的MapReduce程序執行成功後,可以看到在HDFS中生成的數據輸出目錄:

采集→清洗→处理:基于MapReduce的离线数据分析

我們可以下載其中一個結果數據文件,並用Notepadd++打開查看其數據信息:

采集→清洗→处理:基于MapReduce的离线数据分析

五、數據處理:統計分析規整數據

經過數據清洗之後,就得到了我們做數據的分析統計所需要的比較規整的數據,下面就可以進行數據的統計分析了,即按照業務需求,統計出某一天中每個省份的PV和UV。

我們依然是需要編寫MapReduce程序,並且將數據保存到HDFS中,其架構跟前面的數據清洗是一樣的:

采集→清洗→处理:基于MapReduce的离线数据分析

1數據處理思路:編寫MapReduce程序

現在我們已經得到了規整的數據,關於在於如何編寫我們的MapReduce程序。

因為要統計的是每個省對應的PV和UV,PV就是點擊量,UV是獨立訪客量,需要將省相同的數據拉取到一起,拉取到一塊的這些數據每一條記錄就代表了一次點擊(PV + 1),這裡面有同一個用戶產生的數據(通過mid來唯一地標識是同一個瀏覽器,用mid進行去重,得到的就是UV)。

而拉取數據,可以使用Mapper來完成,對數據的統計(PV、UV的計算)則可以通過Reducer來完成,即Mapper的各個參數可以為如下:

Mapper

而Reducer的各個參數可以為如下:

Reducer

2數據處理過程:MapReduce程序編寫

根據前面的分析,來編寫我們的MapReduce程序。

  • ProvincePVAndUVMapper

package cn.xpleaf.dataClean.mr.mapper;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**

* Mapper

* Reducer

*/

public class ProvincePVAndUVMapper extends Mapper {

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

String line = value.toString;

String fields = line.split("\t");

if(fields == || fields.length != 16) {

return;

}

String province = fields[2];

String mid = fields[4];

context.write(new Text(province), new Text(mid));

  • ProvincePVAndUVReducer

package cn.xpleaf.dataClean.mr.reducer;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

import java.util.HashSet;

import java.util.Set;

/**

* 統計該標準化數據,產生結果

* 省 pv uv

* 這裡面有同一個用戶產生的數|據(通過mid來唯一地標識是同一個瀏覽器,用mid進行去重,得到的就是uv)

public class ProvincePVAndUVReducer extends Reducer {

private Set uvSet = new HashSet<>;

@Override

protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {

long pv = 0;

uvSet.clear;

for(Text mid : values) {

pv++;

uvSet.add(mid.toString);

}

long uv = uvSet.size;

String pvAndUv = pv + "\t" + uv;

context.write(key, new Text(pvAndUv));

  • ProvincePVAndUVJob

采集→清洗→处理:基于MapReduce的离线数据分析
  • 執行MapReduce程序

將上面的mr程序打包後上傳到我們的Hadoop環境中,這裡,對前面預處理之後的數據進行統計分析,執行如下命令:

yarn jar data-extract-clean-analysis-1.0-SNAPSHOT-jar-with-dependencies.jar \

cn.xpleaf.dataClean.mr.job.ProvincePVAndUVJob \

hdfs://ns1/output/data-clean/access \

hdfs://ns1/output/pv-uv

觀察其執行結果:

采集→清洗→处理:基于MapReduce的离线数据分析

可以看到MapReduce Job執行成功。

3數據處理結果

采集→清洗→处理:基于MapReduce的离线数据分析

我們可以下載其結果數據文件,並用Notepadd++打開查看其數據信息:

采集→清洗→处理:基于MapReduce的离线数据分析

至此,就完成了一個完整的數據採集、清洗、處理的離線數據分析案例。

相關的代碼我已經上傳到GitHub,有興趣可以參考一下:https://github.com/xpleaf/data-extract-clean-analysis


分享到:


相關文章: