隨著業務複雜度的提升以及微服務的興起,傳統單一項目會被按照業務規則進行垂直拆分,另外為了防止單點故障我們也會將重要的服務模塊進行集群部署,通過負載均衡進行服務的調用。那麼隨著節點的增多,各個服務的日誌也會散落在各個服務器上。這對於我們進行日誌分析帶來了巨大的挑戰,總不能一臺一臺的登錄去下載日誌吧。那麼我們需要一種收集日誌的工具將散落在各個服務器節點上的日誌收集起來,進行統一的查詢及管理統計。那麼ELK就可以做到這一點。
ELK是ElasticSearch+Logstash+Kibana的簡稱1.1、ElasticSearch(簡稱ES)
Elasticsearch是一個高度可擴展的開源全文搜索和分析引擎。它允許您快速、實時地存儲、搜索和分析大量數據。它通常用作底層引擎/技術,為具有複雜搜索特性和需求的應用程序提供動力。我們可以藉助如ElasticSearch完成諸如搜索,日誌收集,反向搜索,智能分析等功能
1.2、Logstash
Logstash是一個開源數據收集引擎,具有實時流水線功能。Logstash可以動態地將來自不同數據源的數據統一起來,並將數據規範化後(通過Filter過濾)傳輸到您選擇的目標。
在這裡inputs代表數據的輸入通道,大家可以簡單理解為來源。常見的可以從kafka,FileBeat, DB等獲取日誌數據,這些數據經過fliter過濾後(比如說:日誌過濾,json格式解析等)通過outputs傳輸到指定的位置進行存儲(Elasticsearch,Mogodb,Redis等)
簡單的實例:
cd logstash-6.4.1
bin/logstash -e 'input { stdin { } } output { stdout {} }'
1.3、Kibana
kibana是用於Elasticsearch檢索數據的開源分析和可視化平臺。我們可以使用Kibana搜索、查看或者與存儲在Elasticsearch索引中的數據交互。同時也可以輕鬆地執行高級數據分析並在各種圖表、表和映射中可視化數據。基於瀏覽器的Kibana界面使您能夠快速創建和共享動態儀表板,實時顯示對Elasticsearch查詢的更改。
1.4、處理方案
用戶通過java應用程序的Slf4j寫入日誌,SpringBoot默認使用的是logback。我們通過實現自定義的Appender將日誌寫入kafka,同時logstash通過input插件操作kafka訂閱其對應的主題。當有日誌輸出後被kafka的客戶端logstash所收集,經過相關過濾操作後將日誌寫入Elasticsearch,此時用戶可以通過kibana獲取elasticsearch中的日誌信息
二、SpringBoot中的配置
在SpringBoot當中,我們可以通過logback-srping.xml來擴展logback的配置。不過我們在此之前應當先添加logback對kafka的依賴,代碼如下:
添加好依賴之後我們需要在類路徑下創建logback-spring.xml的配置文件並做如下配置(添加kafka的Appender):
在這裡面我們主要注意以下幾點:
- 日誌輸出的格式是為模塊名 | 時間 | 日誌級別 | 類的全名 | 日誌內容
- SpringProfile節點用於指定當前激活的環境,如果spring.profile.active的值是哪個,就會激活對應節點下的配置
- springProperty可以讀取Environment中的值
三、ELK搭建過程
3.1、檢查環境
ElasticSearch需要jdk8,官方建議我們使用JDK的版本為1.8.0_131,原文如下:
Elasticsearch requires at least Java 8. Specifically as of this writing, it is recommended that you use the Oracle JDK version 1.8.0_131
檢查完畢後,我們可以分別在官網下載對應的組件
- ElasticSearch
- Kibana
- Logstash
- kafka
- zookeeper
3.2、啟動zookeeper
首先進入啟動zookeeper的根目錄下,將conf目錄下的zoo_sample.cfg文件拷貝一份重新命名為zoo.cfg
mv zoo_sample.cfg zoo.cfg
配置文件如下:
緊接著我們進入bin目錄啟動zookeeper:
./zkServer.sh start
3.3、啟動kafka
在kafka根目錄下運行如下命令啟動kafka:
./bin/kafka-server-start.sh config/server.properties
啟動完畢後我們需要創建一個logger-channel主題:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic logger-channel
3.4、配置並啟動logstash
進入logstash跟目錄下的config目錄,我們將logstash-sample.conf的配置文件拷貝到根目錄下重新命名為core.conf,然後我們打開配置文件進行編輯:
我們分別配置logstash的input,filter和output(懂ruby的童鞋們肯定對語法結構不陌生吧):
- 在input當中我們指定日誌來源為kafka,具體含義可以參考官網:kafka-input-plugin
- 在filter中我們配置grok插件,該插件可以利用正則分析日誌內容,其中patterns_dir屬性用於指定自定義的分析規則,我們可以在該文件下建立文件配置驗證的正則規則。舉例子說明:55.3.244.1 GET /index.html 15824 0.043的 日誌內容經過如下配置解析:
grok {
match => { "message" => "%{IP:client} %{WORD:method} % {URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
}
解析過後會變成:
client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043
這些屬性都會在elasticsearch中存為對應的屬性字段。更詳細的介紹請參考官網:grok ,當然該插件已經幫我們定義好了好多種核心規則,我們可以在這裡查看所有的規則。
- 在output當中我們將過濾過後的日誌內容打印到控制檯並傳輸到elasticsearch中,我們可以參考官網上關於該插件的屬性說明:地址
- 另外我們在patterns文件夾中創建好自定義的規則文件logback,內容如下:
# yyyy-MM-dd HH:mm:ss,SSS ZZZ eg: 2014-01-09 17:32:25,527
LOGBACKTIME 20%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:?%{MINUTE}(?::?%{SECOND})
編輯好配置後我們運行如下命令啟動logstash:
bin/logstash -f first-pipeline.conf --config.reload.automatic
該命令會實時更新配置文件而不需啟動
3.5、啟動ElasticSearch
啟動ElasticSearch很簡單,我們可以運行如下命令:
./bin/elasticsearch
我們可以發送get請求來判斷啟動成功:
GET http://localhost:9200
我們可以得到類似於如下的結果:
3.5.1 配置IK分詞器(可選)
我們可以在github上下載elasticsearch的IK分詞器,地址如下:ik分詞器,然後把它解壓至your-es-root/plugins/ik的目錄下,我們可以在{conf}/analysis-ik/config/IKAnalyzer.cfg.xmlor {plugins}/elasticsearch-analysis-ik-*/config/IKAnalyzer.cfg.xml 裡配置自定義分詞器:
首先我們添加索引:
curl -XPUT http://localhost:9200/my_index
我們可以把通過put請求來添加索引映射:
其中doc是映射名 my_index是索引名稱
3.5.2 logstash與ElasticSearch
logstash默認情況下會在ES中建立logstash-*的索引,*代表了yyyy-MM-dd的時間格式,根據上述logstash配置filter的示例,其會在ES中建立module ,logmessage,class,level等索引。(具體我們可以根據grok插件進行配置)
3.6 啟動Kibana
在kibana的bin目錄下運行./kibana即可啟動。啟動之後我們可以通過瀏覽器訪問http://localhost:5601 來訪問kibanaUI。我們可以看到如下界面:
閱讀更多 程序界小哥 的文章