01.07 flume部署安装以及案例运行

基本认识:

大数据阶段数据的收集来源, flume的收集数据一般是日志,比如:网站日志

flume是一个分布式的,可靠的,可用的

flume可以做离线也可以做实时分析

collecting --》source --》数据采集来源

aggregating --》channel --》数据临时缓存(只要数据被move了,那就不在存储了)

moving --》sink --》数据的转移

1、agent :source、channel、sink

(1)source:用于采集数据,将产生的数据流传输到Channel

(2)channel:连接 sources 和 sinks ,临时缓存数据

(3)sink:从Channel收集数据,将数据写到目标源

2、Events:

(1)是Flume数据传输的基本单元

(2)由header和载有数据的一个byte array构成,byte array字节数组:存储真实的数据

(3)每一个事件的大小:deserializer.maxLineLength2048字节,编码格式:UTF-8

一个source,可以绑定多个channel

一个sink,只能绑定一个channel


flume安装:

准备安装包

apache-flume-1.7.0-bin.tar.gz

flume部署安装以及案例运行

解压缩

tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /opt/bigdata/

flume部署安装以及案例运行

配置文件:flume-env.sh

flume部署安装以及案例运行

mv flume-env.sh.template flume-env.sh

flume部署安装以及案例运行

配置jdk

export JAVA_HOME=/opt/bigdata/jdk1.8

flume部署安装以及案例运行

测试是否成功

bin/flume-ng version

flume部署安装以及案例运行

flume的flume-ng命令

Usage: bin/flume-ng [options]...

flume部署安装以及案例运行

例如一些提交任务的命令(熟悉下格式):

bin/flume-ng agent --conf conf --name agent --conf-file conf/test.properties

bin/flume-ng agent -c conf -n agent -f conf/test.properties

bin/flume-ng avro-client --conf conf --host host --port 8080


配置情况选择:

1、flume安装在hadoop集群中:

(1)配置JAVA_HOME:

export JAVA_HOME= /opt/bigdata/jdk1.8

2、flume安装在hadoop集群中,而且还配置了HA:

(1)HDFS访问入口变化

(2)配置JAVA_HOME:

export JAVA_HOME= /opt/bigdata/jdk1.8

(3)还需要添加hadoop的core-site.xml和hdfs-site.xml拷贝到flume的conf目录

3、flume不在hadoop集群里:

(1)配置JAVA_HOME:

export JAVA_HOME= /opt/bigdata/jdk1.8

(2)还需要添加hadoop的core-site.xml和hdfs-site.xml拷贝到flume的conf目录

(3)将hadoop的一些jar包添加到flume的lib目录下(用的是什么版本拷贝什么版本)


运行官网案例:

编辑配置文件flume-test.properties(创建一个)

flume部署安装以及案例运行

准备配置信息

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = masterhbase

a1.sources.r1.port = 44444


# Describe the sink

a1.sinks.k1.type = logger


# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

flume部署安装以及案例运行

查看系统有没有安装telnet:rpm -qa | grep telnet

flume部署安装以及案例运行

没有的就安装: yum -y install telnet或yum -y install nc

flume部署安装以及案例运行

文件log4j.properties显示的是日志信息配置

flume部署安装以及案例运行

运行:

bin/flume-ng agent --conf conf --conf-file conf/flume-test.properties --name a1 -Dflume.root.logger=INFO,console

flume部署安装以及案例运行

开启一个窗口:telnet连接端口号

telnet masterhbase 44444 (卡在那是正常的,你可以随意输入信息)

flume部署安装以及案例运行

输入hello world

flume部署安装以及案例运行

在flume中就可以看到数据

flume部署安装以及案例运行

退出telnet:输入ctrl + ] 然后输入quit

flume部署安装以及案例运行

运行实例一

需求:监控apache服务器的日志,利用flume监控某一个文件

安装httpd服务

yum -y install httpd

flume部署安装以及案例运行

安装完成之后,会有个目录生成 /var/www/html

到/var/www/html这个目录下 vim index.html [随意输入内容]

flume部署安装以及案例运行

启动服务: service httpd start

flume部署安装以及案例运行

浏览网页:输入主机名[hostname]

flume部署安装以及案例运行

日志产生的路径:/var/log/httpd/access_log

flume部署安装以及案例运行

配置flume agent

source: exec

channel:memory

sink:hdfs

我们复制配置文件

flume部署安装以及案例运行

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -f /var/log/httpd/access_log


# Describe the sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = hdfs://masterhbase:8082/flume/webdata/roll/%y%m%d/%H

a1.sinks.k1.hdfs.rollInterval = 600

a1.sinks.k1.hdfs.rollSize = 1048576

a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k1.hdfs.minBlockReplicas = 1

a1.sinks.k1.hdfs.useLocalTimeStamp = true

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue= 1

a1.sinks.k1.hdfs.roundUnit = hour


# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

flume部署安装以及案例运行

一些配置说明


问题:hdfs上的文件一般数据文件大小要大,而且文件数量是要少


hdfs.rollInterval = 600 (这个地方最好还是设置一个时间)

hdfs.rollSize = 1048576 (1M,134217728-》128M)

hdfs.rollCount = 0

hdfs.minBlockReplicas = 1 (这个不设置的话,上面的参数有可能不会生效)


在hdfs文件上设置时间格式分层 年月日/时 每小时生成一个文件

hdfs.useLocalTimeStamp = true

hdfs.round = true

hdfs.roundValue= 1

hdfs.roundUnit = hour

将准备好的jar上传到flume/lib中

flume部署安装以及案例运行

flume部署安装以及案例运行

运行

flume部署安装以及案例运行

查看hdfs上,不断刷新会有新的文件

flume部署安装以及案例运行

查看下进程

flume部署安装以及案例运行

运行实例二

利用flume监控某一个文件目录,将目录下滚动好的文件实时抽取到HDFS上

类型选择

source:spooldir

channel:file

sink:hdfs

创建配置文件flume-spooldir.properties

flume部署安装以及案例运行

编写信息

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = spooldir

a1.sources.r1.spoolDir = /data/logs

a1.sources.r1.recursiveDirectorySearch = true


# Describe the sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = hdfs://masterhbase:8082/flume/webdata/spooldir/%y%m%d/%H

a1.sinks.k1.hdfs.rollInterval = 600

a1.sinks.k1.hdfs.rollSize = 1048576

a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k1.hdfs.minBlockReplicas = 1

a1.sinks.k1.hdfs.useLocalTimeStamp = true

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue= 1

a1.sinks.k1.hdfs.roundUnit = hour

a1.sinks.k1.hdfs.fileType = DataStream

a1.sinks.k1.hdfs.writeFormat = Text


# Describe the channel

a1.channels.c1.type = file

a1.channels.c1.checkpointDir = /opt/bigdata/apache-flume-1.7.0-bin/checkpointDir

a1.channels.c1.dataDirs = /opt/bigdata/apache-flume-1.7.0-bin/dataDirs


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

flume部署安装以及案例运行

配置信息概念补充

1.source:spooldir(已经生成好的最终的数据文件)

(1)recursiveDirectorySearch 是否监视子目录以查找要读取的新文件

(2)includePattern 正则表达式,指定要包含的文件 (只.csv数据文件,是正则匹配)

(3)ignorePattern 正则表达式,指定要忽略的文件 (不抽取.csv数据文件,是正则匹配)

(4)缺点:不能对目录文件进行修改,如果有追加内容的文本文件,是不允许的(有可能不会被抽取,有可能会有错误)


2.flume监控目录,支持文件修改,并记录文件状态

(1)source:taildir (类似exec + spooldir的组合)

(2)filegroups :设置source组 可设置多个 filegroups = f1

(3)filegroups.:设置组员的监控目录和监控文件类型,使用正则表示,只能监控文件

(4)positionFile:设置定位文件的位置,以JSON格式写入给定位置文件上每个文件的最后读取位置

3.Memory Channel是一个不稳定的channel,它在内存中存储所有事件,

如果进程异常停止,内存中的数据将不能让恢复,而且受内存大小的限制。

4.flie channel:是一个持久化的channel,数据安全并且只要磁盘空间足够,它就可以将数据存储到磁盘上

5.checkpointDir:检查数据完整性,存放检查点目录,可以检测出哪些数据已被抽取,哪些还没有

6.dataDirs:存放数据的目录,dataDirs可以是多个目录,以逗号隔开,用独立的多个磁盘上的多个目录可以提高file channel的性能。

7.hdfs上数据默认是二进制的文件类型:bin/hdfs dfs -text /

8.可以修改hdfs.fileType 改为DataStream(数据流)hdfs.writeFormat = Text 改为文本格式

9.当使用DataStream时候,文件不会被压缩,不需要设置hdfs.codeC;当使用CompressedStream时候,必须设置一个正确的hdfs.codeC值;hdfs.codeC压缩编码解码器 --》snappy压缩

10.batchSize默认值:100 每个批次刷新到HDFS上的events数量;


创建目录

mkdir –p /data/logs

flume部署安装以及案例运行

模拟数据

cp -r /opt/bigdata/hadoop-2.7.3/logs/* /data/logs/

flume部署安装以及案例运行

查看数据

flume部署安装以及案例运行

运行

bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-spooldir.properties

flume部署安装以及案例运行

查看下HDFS

flume部署安装以及案例运行

运行实例三

将hive的一些jar拷贝过来 flume的lib目录下

flume部署安装以及案例运行

flume部署安装以及案例运行

配置flume agent

source:netcat

channel:Memory

sink:hive

启动hive的元数据服务:

/opt/bigdata/apache-hive-1.2.1-bin/bin/hive --service metastore &

flume部署安装以及案例运行

创建库和表 (表必须是CLUSTERED BY ,INTO BUCKETS)

create database flume_test;

use flume_test;

create table flume_user(

user_id int,

user_name string,

user_age int

)CLUSTERED BY (user_id) INTO 2 BUCKETS

row format delimited fields terminated by '\\t'

stored as orc;

flume部署安装以及案例运行

准备配置文件flume-sink-hive.properties

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = masterhbase

a1.sources.r1.port = 44444


# Describe the sink

a1.sinks.k1.type = hive

a1.sinks.k1.hive.metastore = thrift://masterhbase:9083

a1.sinks.k1.hive.database = flume_test

a1.sinks.k1.hive.table = flume_user

a1.sinks.k1.serializer = DELIMITED

a1.sinks.k1.serializer.delimiter = "\\t"

a1.sinks.k1.serializer.fieldnames = user_id,user_name,user_age

a1.sinks.k1.serializer.serdeSeparator = '\\t'


# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

flume部署安装以及案例运行

配置概念补充

1.serializer: 负责解析事件中的字段并将它们映射到hive表中的列

(2)DELIMITED 普通文本

(2)json json文件 (不需要配置,JSON中的对象名称直接映射到Hive表中具有相同名称的列, 内部使用

org.apache.hive.hcatalog.data.JsonSerDe)


2.DELIMITED:

serializer.delimiter:传入数据中的字段分隔符,用双引号括起来,例如"\\t"

serializer.fieldnames:从输入字段到hive表中的列的映射,指定为hive表列名称的逗号分隔列表

serializer.serdeSeparator :输出字段分隔符,单引号括起来,例如'\\t'

hive参数设置vim hive-site.xml:


<code><property>/<code>
<code> <name>hive.metastore.uris/<name>/<code>
<code> <value>thrift://masterhbase:9083/<value>/<code>
<code>
<code><property>/<code>
<code> <name>hive.txn.manager/<name>/<code>
<code> <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager/<value>/<code>
<code>
<code><property>/<code>
<code> <name>hive.compactor.initiator.on/<name>/<code>
<code> <value>true/<value>/<code>
<code>
<code><property>/<code>
<code> <name>hive.compactor.worker.threads/<name>/<code>
<code> <value>1/<value>/<code>
<code>
<code><property>/<code>
<code> <name>hive.support.concurrency/<name>/<code>
<code> <value>true/<value>/<code>
<code>
<code><property>/<code>
<code> <name>hive.enforce.bucketing/<name>/<code>
<code> <value>true/<value>/<code>
<code>
<code><property>/<code>
<code> <name> hive.exec.dynamic.partition.mode/<name>/<code>
<code> <value>nonstrict/<value>/<code>
<code>
<code><property>/<code>
<code> <name>hive.in.test/<name>/<code>
<code> <value>true/<value>/<code>
<code>


flume部署安装以及案例运行

解决报错问题

(1)报错:

Caused by: org.apache.thrift.TApplicationException: Internal error processing open_txns

-》hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;

打开一部分事务支持

-》协同配置

hive.compactor.initiator.on=true; -》运行启动程序和清除线程,用于打开所需参数的完整列表事务

hive.compactor.worker.threads=1; -》增加工作线程的数量将减少花费的时间

hive.support.concurrency=true; -》是否支持并发,默认是false

hive.enforce.bucketing=true; -》是否启用bucketing,写入table数据时会启动分桶

hive.exec.dynamic.partition.mode=nonstrict; -》设置非严格模式

(2)启动metastore时报错:

Table 'metastore.COMPACTION_QUEUE' doesn't exist

配置以下属性:这个是用来创建COMPACTION_QUEUE这张表的


hive.in.test

true


(3)再启动metastore时报错:

Error rolling back: Can't call rollback when autocommit=true

去掉以下属性:


hive.in.test

true

之前没有安装,先安装

flume部署安装以及案例运行

启动flume agent

bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-sink-hive.properties

flume部署安装以及案例运行

使用nc去连接,然后输入数据,数据以制表符分割

flume部署安装以及案例运行

Hive中可以看到数据

flume部署安装以及案例运行

运行实例四(hive)

创建表

create table emp(

empno int,

ename string,

job string,

mgr int,

hiredate string,

sal double,

comm double,

deptno int

)CLUSTERED BY (empno) INTO 2 BUCKETS

row format delimited fields terminated by '\\t'

stored as orc;

flume部署安装以及案例运行

准备配置信息flume-sink-hive2.properties

flume部署安装以及案例运行

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = exec

a1.sources.r1.command = cat /data/emp.txt


# Describe the sink

a1.sinks.k1.type = hive

a1.sinks.k1.hive.metastore = thrift://masterhbase:9083

a1.sinks.k1.hive.database = flume_test

a1.sinks.k1.hive.table = emp

a1.sinks.k1.serializer = DELIMITED

a1.sinks.k1.serializer.delimiter = "\\t"

a1.sinks.k1.serializer.fieldnames = empno,ename,job,mgr,hiredate,sal,comm,deptno

a1.sinks.k1.serializer.serdeSeparator = '\\t'


# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

flume部署安装以及案例运行

运行flume

bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-sink-hive2.properties

flume部署安装以及案例运行

查看数据

flume部署安装以及案例运行



分享到:


相關文章: