Collecting nginx logs with ELK + Kafka + Filebeat

The workflow is as follows:

nginx writes the logs, Filebeat collects them and pushes them to Kafka, Logstash pulls the data from Kafka and forwards it to Elasticsearch for indexing, and finally Kibana presents it in the browser.

Service layout

<code>#three ES nodes
10.88.0.250    10.88.0.251     10.88.0.252</code>
<code>#one Kafka node (with its bundled ZooKeeper)
10.88.0.252</code>
<code>#one Logstash node
10.88.0.250</code>
<code>#one Kibana node
10.88.0.250</code>
<code>#services are managed with supervisor
#pip install supervisor
#echo_supervisord_conf >/etc/supervisord.conf
#cat /etc/supervisord.conf
[include]
files = /etc/supervisord/*.conf
#mkdir -p /etc/supervisord
#supervisord -c /etc/supervisord.conf</code>
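
Once supervisord is running, the per-service configs added throughout this post can be loaded and checked with supervisorctl; a typical workflow looks like this:

<code>#supervisorctl -c /etc/supervisord.conf reread    #pick up new/changed files under /etc/supervisord
#supervisorctl -c /etc/supervisord.conf update    #start/restart the affected programs
#supervisorctl -c /etc/supervisord.conf status    #each program should show RUNNING</code>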
<code>#JDK environment
#ll /soft/jdk1.8.0_191/
#cat  /etc/profile
export JAVA_HOME=/soft/jdk1.8.0_191
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin</code>
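
A quick sanity check that the JDK is on the PATH (the expected version string is assumed from the directory name):

<code>#source /etc/profile
#java -version    #should report java version "1.8.0_191"</code>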

The software packages used, with the versions shipped in the archives:

<code>elasticsearch-7.4.2-linux-x86_64.tar.gz
filebeat-7.4.2-linux-x86_64.tar.gz
kibana-7.4.2-linux-x86_64.tar.gz
logstash-7.4.2.tar.gz
kafka_2.13-2.4.1.tar.gz</code>

First, deploy the Elasticsearch environment

Run useradd work on all three machines to create a management user for ES; ES cannot be started as root.
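
Besides the dedicated user, the ES 7.x bootstrap checks usually also require raising vm.max_map_count and the open-file limit, and the install directory must be owned by the work user. A sketch of the typical preparation on each node:

<code>#chown -R work:work /soft/elasticsearch-7.4.2
#echo 'vm.max_map_count=262144' >> /etc/sysctl.conf && sysctl -p
#cat >> /etc/security/limits.conf <<EOF
work soft nofile 65535
work hard nofile 65535
EOF</code>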

Node 1: 10.88.0.250

<code>#ll /soft/elasticsearch-7.4.2/
#grep -Ev '^#|^$' /soft/elasticsearch-7.4.2/config/elasticsearch.yml
cluster.name: cluster-es
node.name: node-1
network.host: 0.0.0.0
network.publish_host: 10.88.0.250
node.master: true
node.data: true
http.port: 9200
transport.tcp.port: 9300
discovery.seed_hosts: ["10.88.0.250:9300", "10.88.0.251:9300","10.88.0.252:9300"]
cluster.initial_master_nodes: ["10.88.0.250", "10.88.0.251","10.88.0.252"]
http.cors.enabled: true
http.cors.allow-origin: "*"
#su work
#cd /soft/elasticsearch-7.4.2/bin/
#./elasticsearch -d #start the service</code>

Node 2: 10.88.0.251

<code>#ll /soft/elasticsearch-7.4.2/
#grep -Ev '^#|^$' /soft/elasticsearch-7.4.2/config/elasticsearch.yml
cluster.name: cluster-es
node.name: node-2
network.host: 0.0.0.0
network.publish_host: 10.88.0.251
node.master: true
node.data: true
http.port: 9200
transport.tcp.port: 9300
discovery.seed_hosts: ["10.88.0.250:9300", "10.88.0.251:9300","10.88.0.252:9300"]
cluster.initial_master_nodes: ["10.88.0.250", "10.88.0.251","10.88.0.252"]
http.cors.enabled: true
http.cors.allow-origin: "*"
#su work
#cd /soft/elasticsearch-7.4.2/bin/
#./elasticsearch -d #start the service</code>

Node 3: 10.88.0.252

<code>#ll /soft/elasticsearch-7.4.2/
#grep -Ev '^#|^$' /soft/elasticsearch-7.4.2/config/elasticsearch.yml
cluster.name: cluster-es
node.name: node-3
network.host: 0.0.0.0
network.publish_host: 10.88.0.252
node.master: true
node.data: true
http.port: 9200
transport.tcp.port: 9300
discovery.seed_hosts: ["10.88.0.250:9300", "10.88.0.251:9300","10.88.0.252:9300"]
cluster.initial_master_nodes: ["10.88.0.250", "10.88.0.251","10.88.0.252"]
http.cors.enabled: true
http.cors.allow-origin: "*"
#su work
#cd /soft/elasticsearch-7.4.2/bin/
#./elasticsearch -d #start the service</code>
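
After all three nodes are up, you can confirm from any node that they have formed a cluster:

<code>#curl http://10.88.0.250:9200/_cat/nodes?v            #should list all three nodes
#curl http://10.88.0.250:9200/_cluster/health?pretty  #status should be green</code>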

------------------------------------------------------------------------------------------------------------------------------------------------

Deploying Kafka

On server 10.88.0.252

<code>#ll /soft/kafka_2.13-2.4.1/
#grep -Ev '^#|^$' /soft/kafka_2.13-2.4.1/config/server.properties
broker.id=0
listeners=PLAINTEXT://10.88.0.252:9092
advertised.listeners=PLAINTEXT://10.88.0.252:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.88.0.252:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

#ZooKeeper configuration
#grep -Ev '^#|^$' /soft/kafka_2.13-2.4.1/config/zookeeper.properties
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
admin.enableServer=false</code>
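
Once Kafka and ZooKeeper are running (see the supervisor configs below), it is worth creating the topic used by Filebeat and Logstash up front and verifying it, rather than relying on auto-creation:

<code>#cd /soft/kafka_2.13-2.4.1/
#bin/kafka-topics.sh --create --bootstrap-server 10.88.0.252:9092 --replication-factor 1 --partitions 1 --topic xmw-php-logs
#bin/kafka-topics.sh --list --bootstrap-server 10.88.0.252:9092
#consume a few messages later to confirm Filebeat is publishing
#bin/kafka-console-consumer.sh --bootstrap-server 10.88.0.252:9092 --topic xmw-php-logs --from-beginning</code>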

Supervisor configuration for the services

<code>#cat /etc/supervisord/kafka.conf 
[program:kafka_service]
command = /soft/kafka_2.13-2.4.1/bin/kafka-server-start.sh config/server.properties
directory = /soft/kafka_2.13-2.4.1/
process_name = %(program_name)s_%(process_num)s
numprocs = 1
autorestart = true
startsecs = 1
stdout_logfile_maxbytes = 50MB
stdout_logfile_backups = 20
stdout_logfile = /var/log/kafka_service.log
stderr_logfile = /var/log/kafka_service_error.log
#cat /etc/supervisord/zk.conf
[program:kafka_zk]
command = /soft/kafka_2.13-2.4.1/bin/zookeeper-server-start.sh config/zookeeper.properties
directory = /soft/kafka_2.13-2.4.1/
process_name = %(program_name)s_%(process_num)s
numprocs = 1
autorestart = true
startsecs = 1
stdout_logfile_maxbytes = 50MB
stdout_logfile_backups = 20
stdout_logfile = /var/log/kafka_zk.log
stderr_logfile = /var/log/kafka_zk_error.log</code>
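
ZooKeeper has to be up before the Kafka broker. If you want supervisord to enforce that startup ordering, priority values can be added to the two program sections (lower starts first), for example:

<code>#add to /etc/supervisord/zk.conf under [program:kafka_zk]
priority = 10
#add to /etc/supervisord/kafka.conf under [program:kafka_service]
priority = 20</code>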

------------------------------------------------------------------------------------------------------------------------------------------------

Logstash service configuration

On server 10.88.0.250

<code>#cd /soft/logstash-7.4.2/
#grep -Ev '^#|^$' /soft/logstash-7.4.2/config/logstash.yml
path.data: /var/lib/logstash/data
path.logs: /var/lib/logstash/logs
http.host: "0.0.0.0"
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.hosts: ["http://10.88.0.250:9200", "http://10.88.0.251:9200","http://10.88.0.252:9200"]
#grep -Ev '^#|^$' /soft/logstash-7.4.2/config/logstash.conf
input {
kafka {
bootstrap_servers => "10.88.0.252:9092"
topics => ["xmw-php-logs"]
codec => "json"
consumer_threads => 1
client_id => "logstash20190702"
group_id => "logstash20190702"
}
}

filter {
grok {
patterns_dir => ["./patterns"]
match => {
"message" => "\\[%{WORD:[data][level]}\\] %{TIMESTAMP_ISO8601:[data][timestamp]} traceId:%{LOGBACK_WORD:[data][traceId]} spanId:%{LOGBACK_WORD:[data][spanId]} applicationName:%{LOGBACK_WORD:[data][application]} - %{GREEDYDATA:[data][textMessage]}"
}
}
grok {
patterns_dir => ["./patterns"]
match => {
\t"[data][textMessage]" => "\\[%{NUMBER:[data][elapsed]}\\(ms\\)\\] \\[%{NUMBER:[data][memory]}\\(MB\\)\\] \\[%{LOGBACK_WORD:[data][url]}\\] %{GREEDYDATA:[data][text]}"
}
}
grok {
patterns_dir => ["./patterns"]
match => {
"message" => "%{LOGBACK_WORD:[data][application]} \\[%{TIMESTAMP_ISO8601:[data][timestamp]}\\] %{IP:[data][client_ip]} %{USER:[data][user]} %{WORD:[data][scheme]} %{LOGBACK_WORD:[data][uri]} %{WORD:[data][method]} \"%{GREEDYDATA:[data][query_string]}\" body %{NUMBER:[data][body_size]} %{NUMBER:[data][status]} %{LOGBACK_WORD:[data][real_ip]} %{LOGBACK_WORD:[data][x_forwarded_for]} %{LOGBACK_WORD:[data][upstream_addr]} %{LOGBACK_WORD:[data][upstream_status]} %{LOGBACK_WORD:[data][traceId]} %{LOGBACK_WORD:[data][spanId]} %{NUMBER:[data][elapsed]} %{LOGBACK_WORD:[data][upstream_response_time]} %{LOGBACK_CHAR:[data][http_referer]} %{GREEDYDATA:[data][user_agent]}"
}
}
grok {
patterns_dir => ["./patterns"]
match => {
\t"message" => "%{LOGBACK_WORD:[data][application]} \\[%{TIMESTAMP_ISO8601:[data][timestamp]}\\] %{IP:[data][client_ip]} %{USER:[data][user]} %{WORD:[data][scheme]} %{LOGBACK_WORD:[data][uri]} %{WORD:[data][method]} \"%{GREEDYDATA:[data][query_string]}\" \"%{GREEDYDATA:[data][body]}\" %{NUMBER:[data][body_size]} %{NUMBER:[data][status]} %{LOGBACK_WORD:[data][real_ip]} %{LOGBACK_WORD:[data][x_forwarded_for]} %{LOGBACK_WORD:[data][upstream_addr]} %{LOGBACK_WORD:[data][upstream_status]} %{LOGBACK_WORD:[data][traceId]} %{LOGBACK_WORD:[data][spanId]} %{NUMBER:[data][elapsed]} %{LOGBACK_WORD:[data][upstream_response_time]} %{LOGBACK_CHAR:[data][http_referer]} %{GREEDYDATA:[data][user_agent]}"
}
}
date {
match => ["[data][timestamp]", "yyyy-MM-dd HH:mm:ss.SSS", "ISO8601"]
target => "@timestamp"
timezone => "+08:00"
}
json {
skip_on_invalid_json => true
source => "[data][textMessage]"
target => "[data][jsonMessage]"
}
ruby {
code => "
event.set('[data][message][client_ip]', event.get('[data][jsonMessage][client_ip]')) if event.get('[data][jsonMessage][client_ip]')
event.set('[data][message][params][operateType]', event.get('[data][jsonMessage][params][operateType]')) if event.get('[data][jsonMessage][params][operateType]')
event.set('[data][message][params][tableId]', event.get('[data][jsonMessage][params][tableId]')) if event.get('[data][jsonMessage][params][tableId]')
event.set('[data][message][user_id]', event.get('[data][jsonMessage][user_id]')) if event.get('[data][jsonMessage][user_id]')
event.set('[data][message][action_type]', event.get('[data][jsonMessage][action_type]')) if event.get('[data][jsonMessage][action_type]')
event.set('[data][message][group_id]', event.get('[data][jsonMessage][group_id]')) if event.get('[data][jsonMessage][group_id]')
event.set('[data][message][app_id]', event.get('[data][jsonMessage][app_id]')) if event.get('[data][jsonMessage][app_id]')
event.set('[data][host]', event.get('[host][hostname]'))
event.set('[data][message][uid]', event.get('[data][jsonMessage][0][uid]')) if event.get('[data][jsonMessage][0][uid]')
event.set('[data][message][url]', event.get('[data][jsonMessage][0][url]')) if event.get('[data][jsonMessage][0][url]')
event.set('[data][message][user_ip]', event.get('[data][jsonMessage][user_ip]')) if event.get('[data][jsonMessage][user_ip]')
event.set('[data][message][account]', event.get('[data][jsonMessage][account]')) if event.get('[data][jsonMessage][account]')
event.set('[data][message][performTime]', event.get('[data][jsonMessage][0][performTime]')) if event.get('[data][jsonMessage][0][performTime]')

event.set('[data][message][request][uri]', event.get('[data][jsonMessage][request][uri]')) if event.get('[data][jsonMessage][request][uri]')
event.set('[data][message][response][responseTime]', event.get('[data][jsonMessage][response][responseTime]')) if event.get('[data][jsonMessage][response][responseTime]')
event.set('[data][message][response][status]', event.get('[data][jsonMessage][response][status]')) if event.get('[data][jsonMessage][response][status]')
event.set('[data][message][invokeClientAddr]', event.get('[data][jsonMessage][invokeClientAddr]')) if event.get('[data][jsonMessage][invokeClientAddr]')
event.set('[data][message][invokeServerAddr]', event.get('[data][jsonMessage][invokeServerAddr]')) if event.get('[data][jsonMessage][invokeServerAddr]')
event.set('[data][message][invokeMethod]', event.get('[data][jsonMessage][invokeMethod]')) if event.get('[data][jsonMessage][invokeMethod]')
event.set('[data][message][consumeTime]', event.get('[data][jsonMessage][consumeTime]')) if event.get('[data][jsonMessage][consumeTime]')
event.set('[data][message][moduleName]', event.get('[data][jsonMessage][moduleName]')) if event.get('[data][jsonMessage][moduleName]')
event.set('[data][message][domain]', event.get('[data][jsonMessage][domain]')) if event.get('[data][jsonMessage][domain]')

"
}
if ![data][application] {
mutate { add_field => { "[data][application]" => "xmw-default" } }
}
mutate {
remove_field => ["cloud", "beat", "host", "@version", "prospector", "input", "fields", "[data][timestamp2]", "[data][textMessage]", "[data][jsonMessage]", "ecs", "agent", "tags", "[data][text]"]
}
mutate { convert => {
\t"[data][elapsed]" => "float"
\t"[data][memory]" => "integer"
\t"[data][status]" => "integer"
\t"[data][upstream_status]" => "integer"
\t"[data][body_size]" => "integer"
\t"[data][upstream_response_time]" => "float"
\t}
}
}
output {
elasticsearch {
hosts => ["10.88.0.250:9200","10.88.0.251:9200","10.88.0.252:9200"]
index => "xm-%{[data][application]}-%{+YYYY.MM}"
}
}
</code>
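
Before wiring Logstash into supervisor, the pipeline can be validated without actually starting it:

<code>#cd /soft/logstash-7.4.2/
#bin/logstash -f config/logstash.conf --config.test_and_exit   #should end with "Configuration OK"
#mkdir -p /var/lib/logstash/data /var/lib/logstash/logs        #the paths referenced in logstash.yml must exist</code>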

Creating the pattern file

<code>#cat  /soft/logstash-7.4.2/patterns/extra 
LOGBACK_WORD [:\\w+\\./-]*
LOGBACK_CHAR [^\\s]+
LOGBACK_BRACKET [^\\]]+</code>

Ruby filter script (optional, only needed if you want it)

<code>#cat /soft/logstash-7.4.2/ruby/filterPwd.rb 
# the value of `params` is the value of the hash passed to `script_params`
# in the logstash configuration

#def register(params)
#todo
#end
# the filter method receives an event and must return a list of events.
# Dropping an event means not including it in the return array,
# while creating new ones only requires you to add a new instance of
# LogStash::Event to the returned array
def filter(event)
queryRegex = /[&]?password=([^&]+[&]?)/
bodyFormRegex = /[-]+[-\\w:;\\s\\\\]+name=[\\w\\\\]+password[\\w\\\\]+[^-]/
bodyJsonRegex = /[^,{]+password[^,}]+[,]?/
_message = event.get('message').gsub(queryRegex, "").sub(bodyFormRegex, "").sub(bodyJsonRegex, "")
event.set('message', _message)
return [event]
#if queryStr = event.get('[data][query_string]')
# event.set('[data][query_string]', queryStr.sub(queryRegex, ""))
#end
#bodyStr = event.get('[data][body]')
#if bodyStr =~ queryRegex
# event.set('[data][body]', bodyStr.sub(queryRegex, ""))
#elsif bodyStr =~ bodyFormRegex
# event.set('[data][body]', bodyStr.sub(bodyFormRegex, ""))
#elsif bodyStr =~ bodyJsonRegex
# event.set('[data][body]', bodyStr.sub(bodyJsonRegex, ""))
#end
#return [event]
end</code>
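
Note that the logstash.conf above does not reference this script; if you do want it to strip passwords from messages, a ruby filter pointing at the file (the Logstash ruby filter's path option) would have to be added to the filter block, roughly like this:

<code>#add inside the filter { } section of logstash.conf
ruby {
path => "/soft/logstash-7.4.2/ruby/filterPwd.rb"
}</code>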

Supervisor configuration for Logstash

<code>#cat /etc/supervisord/logstash.conf 
[program:logstash]
command = /soft/logstash-7.4.2/bin/logstash -f /soft/logstash-7.4.2/config/logstash.conf
directory = /soft/logstash-7.4.2/
process_name = %(program_name)s_%(process_num)s
numprocs = 1
autorestart = true
startsecs = 1
stdout_logfile_maxbytes = 50MB
stdout_logfile_backups = 20
stdout_logfile = /var/log/logstash.log
stderr_logfile = /var/log/logstash.log</code>

------------------------------------------------------------------------------------------------------------------------------------------------

Kibana configuration

On server 10.88.0.250

<code>#grep -Ev '^#|^$' /soft/kibana-7.4.2/config/kibana.yml 
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://10.88.0.250:9200","http://10.88.0.251:9200","http://10.88.0.252:9200"]
logging.dest: /var/log/kibana/kibana.log
i18n.locale: "zh-CN"</code>
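
The logging.dest directory does not exist by default, so create it before the first start, then confirm Kibana is answering:

<code>#mkdir -p /var/log/kibana
#curl -s http://10.88.0.250:5601/api/status | head -c 200   #returns JSON once Kibana is ready</code>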

Supervisor configuration for Kibana

<code>#cat  /etc/supervisord/kibana.conf 
[program:kibana]
command = /soft/kibana-7.4.2/bin/kibana --allow-root
directory = /soft/kibana-7.4.2
process_name = %(program_name)s_%(process_num)s
numprocs = 1
autorestart = true
startsecs = 1
stdout_logfile_maxbytes = 50MB
stdout_logfile_backups = 20
stdout_logfile = /var/log/kibana.log
stderr_logfile = /var/log/kibana.log</code>

------------------------------------------------------------------------------------------------------------------------------------------------

Filebeat configuration

On server 10.88.0.251

<code>#ll /soft/filebeat-7.4.2/
#grep -Ev '^#|^$' /soft/filebeat-7.4.2/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  json.keys_under_root: true
  json.overwrite_keys: true
  paths:
    - /var/log/nginx/access.log
  fields:
    log_topics: xmw-php-logs
- type: log
  enabled: true
  json.keys_under_root: true
  json.overwrite_keys: true
  paths:
    - /var/log/nginx/access.log
  fields:
    log_topics: shequ-nginx
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 1
setup.kibana:
output.kafka:
  hosts: ["10.88.0.252:9092"]
  #topic: product-nginx-worldapi
  topic: '%{[fields][log_topics]}'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~</code>
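
Filebeat can check both the config file and the Kafka output before being put under supervisor:

<code>#cd /soft/filebeat-7.4.2/
#./filebeat test config -c filebeat.yml    #should print Config OK
#./filebeat test output -c filebeat.yml    #dials 10.88.0.252:9092 and reports whether the broker is reachable</code>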

Supervisor configuration for Filebeat

<code>#cat  /etc/supervisord/filebeat.conf 
[program:filebeat]
command = /soft/filebeat-7.4.2/filebeat -c /soft/filebeat-7.4.2/filebeat.yml
directory = /soft/filebeat-7.4.2/
process_name = %(program_name)s_%(process_num)s
numprocs = 1
autorestart = true
startsecs = 1
stdout_logfile_maxbytes = 50MB
stdout_logfile_backups = 20
stdout_logfile = /var/log/filebeat.log
stderr_logfile = /var/log/filebeat.log</code>

Start the nginx service on 10.88.0.251

<code>#yum -y install   nginx
#cat /etc/nginx/nginx.conf   #pay attention to the log_format setting
log_format elk_nobody 'nginx-$http_host [$time_iso8601] $remote_addr $remote_user $scheme $uri $request_method "$query_string" body $body_bytes_sent $status $http_x_real_ip $http_x_forwarded_for $upstream_addr $upstream_status $http_trace_id $http_span_id $request_time $upstream_response_time $http_referer $http_user_agent';
access_log /var/log/nginx/access.log elk_nobody;</code>
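
A quick end-to-end check (assuming nginx listens on the default port 80): generate some requests, make sure they land in the access log, then watch them arrive on the Kafka topic:

<code>#for i in $(seq 1 10); do curl -s http://10.88.0.251/ >/dev/null; done
#tail -n 3 /var/log/nginx/access.log
#then on 10.88.0.252:
#/soft/kafka_2.13-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server 10.88.0.252:9092 --topic xmw-php-logs</code>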

Finally, create the index pattern in Kibana

Settings --- Index Patterns --- Create index pattern (for the index pattern, search for xm*)
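
Before creating the index pattern, you can confirm from Elasticsearch that Logstash has actually written the xm-* indices:

<code>#curl 'http://10.88.0.250:9200/_cat/indices/xm-*?v'</code>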


Click "Create index pattern" and the pattern is created.

Go to Discover to see the nginx logs that were just added (logs are only produced when there is traffic, so hit nginx a few times first).


And with that, the setup is complete.

