01.07 四個flume運行實例

基本認識:

大數據階段數據的收集來源, flume的收集數據一般是日誌,比如:網站日誌

flume是一個分佈式的,可靠的,可用的

flume可以做離線也可以做實時分析

collecting --》source --》數據採集來源

aggregating --》channel --》數據臨時緩存(只要數據被move了,那就不在存儲了)

moving --》sink --》數據的轉移

1、agent :source、channel、sink

(1)source:用於採集數據,將產生的數據流傳輸到Channel

(2)channel:連接 sources 和 sinks ,臨時緩存數據

(3)sink:從Channel收集數據,將數據寫到目標源

2、Events:

(1)是Flume數據傳輸的基本單元

(2)由header和載有數據的一個byte array構成,byte array字節數組:存儲真實的數據

(3)每一個事件的大小:deserializer.maxLineLength2048字節,編碼格式:UTF-8

一個source,可以綁定多個channel

一個sink,只能綁定一個channel


flume安裝:

準備安裝包

apache-flume-1.7.0-bin.tar.gz

解壓縮

tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /opt/bigdata/

配置文件:flume-env.sh

mv flume-env.sh.template flume-env.sh

配置jdk

export JAVA_HOME=/opt/bigdata/jdk1.8

測試是否成功

bin/flume-ng version

flume的flume-ng命令

Usage: bin/flume-ng [options]...

例如一些提交任務的命令(熟悉下格式):

bin/flume-ng agent --conf conf --name agent --conf-file conf/test.properties

bin/flume-ng agent -c conf -n agent -f conf/test.properties

bin/flume-ng avro-client --conf conf --host host --port 8080


配置情況選擇:

1、flume安裝在hadoop集群中:

(1)配置JAVA_HOME:

export JAVA_HOME= /opt/bigdata/jdk1.8

2、flume安裝在hadoop集群中,而且還配置了HA:

(1)HDFS訪問入口變化

(2)配置JAVA_HOME:

export JAVA_HOME= /opt/bigdata/jdk1.8

(3)還需要添加hadoop的core-site.xml和hdfs-site.xml拷貝到flume的conf目錄

3、flume不在hadoop集群裡:

(1)配置JAVA_HOME:

export JAVA_HOME= /opt/bigdata/jdk1.8

(2)還需要添加hadoop的core-site.xml和hdfs-site.xml拷貝到flume的conf目錄

(3)將hadoop的一些jar包添加到flume的lib目錄下(用的是什麼版本拷貝什麼版本)


運行官網案例:

編輯配置文件flume-test.properties(創建一個)

準備配置信息

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = masterhbase

a1.sources.r1.port = 44444


# Describe the sink

a1.sinks.k1.type = logger


# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

查看系統有沒有安裝telnet:rpm -qa | grep telnet

沒有的就安裝: yum -y install telnet或yum -y install nc

文件log4j.properties顯示的是日誌信息配置

運行:

bin/flume-ng agent --conf conf --conf-file conf/flume-test.properties --name a1 -Dflume.root.logger=INFO,console

開啟一個窗口:telnet連接端口號

telnet masterhbase 44444 (卡在那是正常的,你可以隨意輸入信息)

輸入hello world

在flume中就可以看到數據

退出telnet:輸入ctrl + ] 然後輸入quit

運行實例一

需求:監控apache服務器的日誌,利用flume監控某一個文件

安裝httpd服務

yum -y install httpd

安裝完成之後,會有個目錄生成 /var/www/html

到/var/www/html這個目錄下 vim index.html [隨意輸入內容]

啟動服務: service httpd start

瀏覽網頁:輸入主機名[hostname]

日誌產生的路徑:/var/log/httpd/access_log

配置flume agent

source: exec

channel:memory

sink:hdfs

我們複製配置文件

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -f /var/log/httpd/access_log


# Describe the sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = hdfs://masterhbase:8082/flume/webdata/roll/%y%m%d/%H

a1.sinks.k1.hdfs.rollInterval = 600

a1.sinks.k1.hdfs.rollSize = 1048576

a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k1.hdfs.minBlockReplicas = 1

a1.sinks.k1.hdfs.useLocalTimeStamp = true

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue= 1

a1.sinks.k1.hdfs.roundUnit = hour


# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

一些配置說明


問題:hdfs上的文件一般數據文件大小要大,而且文件數量是要少


hdfs.rollInterval = 600 (這個地方最好還是設置一個時間)

hdfs.rollSize = 1048576 (1M,134217728-》128M)

hdfs.rollCount = 0

hdfs.minBlockReplicas = 1 (這個不設置的話,上面的參數有可能不會生效)


在hdfs文件上設置時間格式分層 年月日/時 每小時生成一個文件

hdfs.useLocalTimeStamp = true

hdfs.round = true

hdfs.roundValue= 1

hdfs.roundUnit = hour

將準備好的jar上傳到flume/lib中

運行

查看hdfs上,不斷刷新會有新的文件

查看下進程

運行實例二

利用flume監控某一個文件目錄,將目錄下滾動好的文件實時抽取到HDFS上

類型選擇

source:spooldir

channel:file

sink:hdfs

創建配置文件flume-spooldir.properties

編寫信息

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = spooldir

a1.sources.r1.spoolDir = /data/logs

a1.sources.r1.recursiveDirectorySearch = true


# Describe the sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = hdfs://masterhbase:8082/flume/webdata/spooldir/%y%m%d/%H

a1.sinks.k1.hdfs.rollInterval = 600

a1.sinks.k1.hdfs.rollSize = 1048576

a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k1.hdfs.minBlockReplicas = 1

a1.sinks.k1.hdfs.useLocalTimeStamp = true

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue= 1

a1.sinks.k1.hdfs.roundUnit = hour

a1.sinks.k1.hdfs.fileType = DataStream

a1.sinks.k1.hdfs.writeFormat = Text


# Describe the channel

a1.channels.c1.type = file

a1.channels.c1.checkpointDir = /opt/bigdata/apache-flume-1.7.0-bin/checkpointDir

a1.channels.c1.dataDirs = /opt/bigdata/apache-flume-1.7.0-bin/dataDirs


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

配置信息概念補充

1.source:spooldir(已經生成好的最終的數據文件)

(1)recursiveDirectorySearch 是否監視子目錄以查找要讀取的新文件

(2)includePattern 正則表達式,指定要包含的文件 (只.csv數據文件,是正則匹配)

(3)ignorePattern 正則表達式,指定要忽略的文件 (不抽取.csv數據文件,是正則匹配)

(4)缺點:不能對目錄文件進行修改,如果有追加內容的文本文件,是不允許的(有可能不會被抽取,有可能會有錯誤)


2.flume監控目錄,支持文件修改,並記錄文件狀態

(1)source:taildir (類似exec + spooldir的組合)

(2)filegroups :設置source組 可設置多個 filegroups = f1

(3)filegroups.:設置組員的監控目錄和監控文件類型,使用正則表示,只能監控文件

(4)positionFile:設置定位文件的位置,以JSON格式寫入給定位置文件上每個文件的最後讀取位置

3.Memory Channel是一個不穩定的channel,它在內存中存儲所有事件,

如果進程異常停止,內存中的數據將不能讓恢復,而且受內存大小的限制。

4.flie channel:是一個持久化的channel,數據安全並且只要磁盤空間足夠,它就可以將數據存儲到磁盤上

5.checkpointDir:檢查數據完整性,存放檢查點目錄,可以檢測出哪些數據已被抽取,哪些還沒有

6.dataDirs:存放數據的目錄,dataDirs可以是多個目錄,以逗號隔開,用獨立的多個磁盤上的多個目錄可以提高file channel的性能。

7.hdfs上數據默認是二進制的文件類型:bin/hdfs dfs -text /

8.可以修改hdfs.fileType 改為DataStream(數據流)hdfs.writeFormat = Text 改為文本格式

9.當使用DataStream時候,文件不會被壓縮,不需要設置hdfs.codeC;當使用CompressedStream時候,必須設置一個正確的hdfs.codeC值;hdfs.codeC壓縮編碼解碼器 --》snappy壓縮

10.batchSize默認值:100 每個批次刷新到HDFS上的events數量;


創建目錄

mkdir –p /data/logs

模擬數據

cp -r /opt/bigdata/hadoop-2.7.3/logs/* /data/logs/

查看數據

運行

bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-spooldir.properties

查看下HDFS

運行實例三

將hive的一些jar拷貝過來 flume的lib目錄下

配置flume agent

source:netcat

channel:Memory

sink:hive

啟動hive的元數據服務:

/opt/bigdata/apache-hive-1.2.1-bin/bin/hive --service metastore &

創建庫和表 (表必須是CLUSTERED BY ,INTO BUCKETS)

create database flume_test;

use flume_test;

create table flume_user(

user_id int,

user_name string,

user_age int

)CLUSTERED BY (user_id) INTO 2 BUCKETS

row format delimited fields terminated by '\\t'

stored as orc;

準備配置文件flume-sink-hive.properties

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = masterhbase

a1.sources.r1.port = 44444


# Describe the sink

a1.sinks.k1.type = hive

a1.sinks.k1.hive.metastore = thrift://masterhbase:9083

a1.sinks.k1.hive.database = flume_test

a1.sinks.k1.hive.table = flume_user

a1.sinks.k1.serializer = DELIMITED

a1.sinks.k1.serializer.delimiter = "\\t"

a1.sinks.k1.serializer.fieldnames = user_id,user_name,user_age

a1.sinks.k1.serializer.serdeSeparator = '\\t'


# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

配置概念補充

1.serializer: 負責解析事件中的字段並將它們映射到hive表中的列

(2)DELIMITED 普通文本

(2)json json文件 (不需要配置,JSON中的對象名稱直接映射到Hive表中具有相同名稱的列, 內部使用

org.apache.hive.hcatalog.data.JsonSerDe)


2.DELIMITED:

serializer.delimiter:傳入數據中的字段分隔符,用雙引號括起來,例如"\\t"

serializer.fieldnames:從輸入字段到hive表中的列的映射,指定為hive表列名稱的逗號分隔列表

serializer.serdeSeparator :輸出字段分隔符,單引號括起來,例如'\\t'

hive參數設置vim hive-site.xml:


<code><property>/<code>

<code> <name>hive.metastore.uris/<name>/<code>

<code> <value>thrift://masterhbase:9083/<value>/<code>

<code>

<code><property>/<code>

<code> <name>hive.txn.manager/<name>/<code>

<code> <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager/<value>/<code>

<code>

<code><property>/<code>

<code> <name>hive.compactor.initiator.on/<name>/<code>

<code> <value>true/<value>/<code>

<code>

<code><property>/<code>

<code> <name>hive.compactor.worker.threads/<name>/<code>

<code> <value>1/<value>/<code>

<code>

<code><property>/<code>

<code> <name>hive.support.concurrency/<name>/<code>

<code> <value>true/<value>/<code>

<code>

<code><property>/<code>

<code> <name>hive.enforce.bucketing/<name>/<code>

<code> <value>true/<value>/<code>

<code>

<code><property>/<code>

<code> <name> hive.exec.dynamic.partition.mode/<name>/<code>

<code> <value>nonstrict/<value>/<code>

<code>

<code><property>/<code>

<code> <name>hive.in.test/<name>/<code>

<code> <value>true/<value>/<code>

<code>


解決報錯問題

(1)報錯:

Caused by: org.apache.thrift.TApplicationException: Internal error processing open_txns

-》hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;

打開一部分事務支持

-》協同配置

hive.compactor.initiator.on=true; -》運行啟動程序和清除線程,用於打開所需參數的完整列表事務

hive.compactor.worker.threads=1; -》增加工作線程的數量將減少花費的時間

hive.support.concurrency=true; -》是否支持併發,默認是false

hive.enforce.bucketing=true; -》是否啟用bucketing,寫入table數據時會啟動分桶

hive.exec.dynamic.partition.mode=nonstrict; -》設置非嚴格模式

(2)啟動metastore時報錯:

Table 'metastore.COMPACTION_QUEUE' doesn't exist

配置以下屬性:這個是用來創建COMPACTION_QUEUE這張表的


hive.in.test

true


(3)再啟動metastore時報錯:

Error rolling back: Can't call rollback when autocommit=true

去掉以下屬性:


hive.in.test

true

之前沒有安裝,先安裝

啟動flume agent

bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-sink-hive.properties

使用nc去連接,然後輸入數據,數據以製表符分割

Hive中可以看到數據

運行實例四(hive)

創建表

create table emp(

empno int,

ename string,

job string,

mgr int,

hiredate string,

sal double,

comm double,

deptno int

)CLUSTERED BY (empno) INTO 2 BUCKETS

row format delimited fields terminated by '\\t'

stored as orc;

準備配置信息flume-sink-hive2.properties

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = exec

a1.sources.r1.command = cat /data/emp.txt


# Describe the sink

a1.sinks.k1.type = hive

a1.sinks.k1.hive.metastore = thrift://masterhbase:9083

a1.sinks.k1.hive.database = flume_test

a1.sinks.k1.hive.table = emp

a1.sinks.k1.serializer = DELIMITED

a1.sinks.k1.serializer.delimiter = "\\t"

a1.sinks.k1.serializer.fieldnames = empno,ename,job,mgr,hiredate,sal,comm,deptno

a1.sinks.k1.serializer.serdeSeparator = '\\t'


# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

運行flume

bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-sink-hive2.properties

查看數據