Kafka(一)

Kafka(一)

Kafka是一款分佈式消息發佈和訂閱系統,具有高性能、高吞吐量的特點而被廣泛應用與大數據傳輸場景。它是由LinkedIn公司開發,使用Scala語言編寫,之後成為Apache基金會的一個頂級項目。

Kafka應用場景

日誌分析系統


Kafka(一)


為什麼要用Kafka

互聯網公司在營銷方面需要逐步做到精細化運營的需求,這樣就能夠針對不同用戶的喜好推送不同的產品。而要實現這個過程就需要收集和分析用戶的行為數據。而通過傳統的ActiveMQ這類的消息中間件在處理大數據傳輸的時候存在時效性、性能、吞吐能力、消息堆積等問題。

Kafka早期設計的目的是作為LinkIn活動流和運營數據處理管道,它天然的具備了高吞吐量、內置分區、複製、容錯的能力而非常適合處理大規模的消息。因此很多的大數據傳輸場景都選用kafka。比如應用日誌收集分析、消息系統、用戶行為分析、運營指標(服務器性能數據)、流式處理(spark、storm)

Kafka架構


Kafka(一)


Kafka(一)


KafKa關鍵詞講解

  • Broker 每個kafka實例
  • Producer 生產者
  • Consumer 消費者
  • Topic 一類消息
  • Partition 分區
  • Consumer Group 消費者組

Kafka集群安裝

  1. 準備一個3節點的zookeeper的集群
  2. 官網下載一個Kafka安裝包kafka_2.11-0.11.0.0.tgz
  3. 3個Kafka節點,本人是:192.168.202.133,192.168.202.134,192.168.202.135
  4. 解壓kafka_2.11-0.11.0.0.tgz安裝包
  5. 修改每個節點的config/server.properties
<code>#broken id,0/1/2
broker.id=0
#集群通信端口
listeners=PLAINTEXT://192.168.202.133:9092
#zookeeper連接串
zookeeper.connect=192.168.202.133:2181,192.168.202.134:2181,192.168.202.135:2181/<code>
  1. 啟動服務
<code>sh ../bin/kafka-server-start.sh -daemon ../config/server.properties/<code>
  1. 查看Kafka Leader
<code>./zkCli.shls /brokers/ids[0,1,2]/<code>


Kafka(一)


Kafka(一)

三個節點已經註冊上去,brokerid為1的是leader


  1. 查看Kafka的topicsbin/kafka-topics.sh --list --zookeeper 192.168.202.133:2181,192.168.202.134:2181,192.168.202.135:2181
  2. 創建topic./kafka-topics.sh --create --zookeeper 192.168.202.133:2181,192.168.202.134:2181,192.168.202.135:2181 --replication-factor 3 --partitions 3 --topic test1
  3. 創建producer,發送消息./kafka-console-producer.sh --broker-list 192.168.202.133:9092,192.168.202.134:9092,192.168.202.135:9092 --topic test1
  4. 創建consumer,接收消息sh kafka-console-consumer.sh --zookeeper 192.168.202.133:2181,192.168.202.134:2181,192.168.202.133:2181 --topic test1producer發送消息consumer能接受到消息。
  5. 查看分區和副本
<code>ls /tmp/kafka-logs//<code>


Kafka(一)


查看上圖test1 test1_0,test1_1,test1_2說明是3個副本,然後查看集群3個節點發現都有test1(192.168.202.133,192.168.202.134,192.168.202.135)即3個分區。所以 大家多手動嘗試


Kafka(一)


Kafka(一)


Kafka監控平臺

kafka監控平臺有

  1. Kafka Web Conslole
  2. Kafka Manager
  3. KafkaOffsetMonitor(推薦)

KafkaOffsetMonitor監控平臺安裝

  1. 下載或者百度雲https://pan.baidu.com/s/1azAGWSWfYcqm5kNRfP_lbw密碼:xpo9
  2. mkdir -p /usr/local/kafkaMonitor
  3. 創建start.shjava
<code>-Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m \\     -cp KafkaOffsetMonitor-assembly-0.4.1-SNAPSHOT.jar \\     com.quantifind.kafka.offsetapp.OffsetGetterWeb \\     --offsetStorage kafka \\     --kafkaBrokers 112.74.73.47:9092 \\     --kafkaSecurityProtocol SASL_PLAINTEXT \\     --zk 112.74.73.47:2181 \\     --port 8048 \\     --refresh 10.seconds \\     --retain 7.days \\     --dbName offsetapp_kafka 1>/usr/local/kafkaMonitor/stdout.log 2>/usr/local/kafkaMonitor/stderr.log &chmod 755 start.sh/<code>
  1. 創建stop.sh#!/bin/bash

    killnum=`jps | grep OffsetGetterWeb | awk '{print $1}'`
    kill -9 ${killnum}
    echo "stop...."chmod 777 stop.sh
  1. 訪問地址 http://112.74.73.47:8048


Kafka(一)


topic:創建topic的名稱 partition:分區編號 offset:該partition已經消費了多少條message logsize:該partition已經寫入了多少條message lag:該partition未消費多少條message owner:gaipartition所屬消費者

Kafka實現細節

zookeeper在kafka中的作用

  1. 在Kakfa中扮演的角色

Kafka將元數據信息保存在Zookeeper中,但是發送給Topic本身的數據是不會發到Zk上的。kafka使用zookeeper來實現動態的集群擴展,不需要更改客戶端(producer和consumer)的配置。broker會在zookeeper註冊並保持相關的元數據(topic,partition信息等)更新。而客戶端會在zookeeper上註冊相關的watcher。一旦zookeeper發生變化,客戶端能及時感知並作出相應調整。這樣就保證了添加或去除broker時,各broker間仍能自動實現負載均衡。這裡的客戶端指的是Kafka的消息生產端(Producer)和消息消費端(Consumer)Producer端使用zookeeper用來"發現"broker列表,以及和Topic下每個partition的leader建立socket連接併發送消息。也就是說每個Topic的partition是由Lead角色的Broker端使用zookeeper來註冊broker信息,以及監測partition leader存活性.Consumer端使用zookeeper用來註冊consumer信息,其中包括consumer消費的partition列表等,同時也用來發現broker列表,並和partition leader建立socket連接,並獲取消息.

  1. zookeeper中信息


Kafka(一)


Kafka的消息

Kafka中最基本的數據單元是消息,我們可以簡單把消息理解成數據庫裡面的一條記錄。消息是由字符數組組成。

消息可以有一個可選的key,這個key也是字符數組。Key用來確定消息寫入分區時,進入哪一個分區。

Topic&Partition

Topic是用於存儲消息的邏輯概念,可以看做一個消息的集合。每個Topic可以有多個生產者像其推送消息,也可以有多個消費者消費其中的消息,每個topic可以劃分多個分區(至少有一個分區),同一topic下的不同分區包含的消息是不同的。每個消息在被添加到分區時,都會分配一個offset(稱之為偏移量),它是消息在此分區中的唯一編號,Kafka通過offset保證消息在分區內的順序,offset的順序不跨分區,即Kafka只保證在同一個分區內消息是有序的;

Partition是以文件的形式存儲在文件系統中,存儲在kafka-log目錄下,命名規則是:<topic>-<patition>/<topic>


Kafka(一)


Kafka的高吞吐量的因素

1. 順序寫的方式存儲數據;

2. 批量發送;在異步發送模式中。Kafka允許進行批量發送,也就是先將消息緩存在內存中,然後依次請求批量發送出去。這樣減少了頻繁的磁盤IO以及網絡IO造成的性能瓶頸

batch.size 每批次發送的數據大小

linger.ms 間隔時間

3. 零拷貝

消息從發送到落地保存,broker維護的消息日誌本身就是文件目錄,每個文件都是二進制保存,生產者和消費者使用相同的格式來處理。在消費者獲取消息時,服務器先從硬盤讀取數據到內存


Kafka(一)


左圖:

  1. 操作系統將數據從磁盤讀入到內核空間的頁緩存
  2. 應用程序將數據從內核空間讀入到用戶空間
  3. 應用程序將數據從用戶空間寫回到內核空間的socket緩存中
  4. 操作系統將數據從socket緩衝區複製到網卡緩衝區,以便將數據經網絡發出

右圖:

  • 通過“零拷貝”技術可以省去沒必要的複製操作,同時減少上下文切換次數

日誌保留策略&日誌壓縮

  • 日誌保留策略無論消費者是否已經消費了消息,Kafka都會一直保存這些消息,但不是像數據庫哪樣長期保存,為了避免磁盤空間被佔滿,Kafka會配置相應的保留策略(retention policy),以實現週期性地刪除陳舊的消息Kafka有兩種"保留策略":根據消息保留的時間,當消息在Kafka中保存的時間超過指定時間,就可以被刪除根據topic存儲的數據大小,當topic所佔的日誌文件大小大於一個閥值,則可以刪除最舊的消息
  • 日誌壓縮策略在很多場景下,消息的key和value的值之間的對應關係是不斷變化的,就數據庫中的數據被修改一樣,消費者只關心key對應的最新的value。我們可以開啟壓縮功能,Kafka定期將相同key的消息進行合併,只保留最新的value值


Kafka(一)


Kafka消息的可靠性

消息發送可靠性

生產者發送消息到broker,有三種確認方式(request.required.acks)

acks = 0: producer不會等待broker(leader)發送ack 。因為發送消息網絡超時或broker crash(1.Partition的Leader還沒有commit消息 2.Leader與Follower數據不同步),既有可能丟失也可能會重發。

acks = 1: 當leader接收到消息之後發送ack,丟失的消息會重發,丟的概率很小

acks = -1: 當所有的follower都同步消息成功後發送ack. 丟失消息可能性比較低。

消息存儲的可靠性

每一條消息被髮送到broker中,會根據partition規則選擇被存儲到哪一個partition。如果partition規則設置的合理,所有消息可以均勻分佈到不同的partition中,這樣實現了水平擴展

在創建topic時可以指定這個topic對應的partition的數量。在發送一條消息時,可以指定這條消息的key,producer根據這個key和partition機制來判斷這個消息發送到哪個partition。Kafka的高可靠性的保障來自另外一個叫做副本(replication)的策略,通過設置副本的相關參數,可以使Kafka在性能和可靠性之間做不同的切換

sh kafka-topics.sh --create--zookeeper 192.168.11.140:2181 --replication-factor 2 --partitions 3 --topic topic_01

--replication-factor 表示的副本數

--partitions 表示分區數量


副本機制

ISR:ISR表示目前“可用且消息量與leader相差不多的副本集合,這是整個副本集合的一個子集”副本同步隊列維護的是有資格的follower節點,

副本的所有節點都必須和zookeeper保持連接狀態

副本的最後一條消息offset和leader副本的最後一條消息的offset之間的差值不能超過指定的閥值,這個閥值是可以設置的(replica.lag.max.message)


Kafka(一)


HW&LEO

HW(HighWatermark)和LEO(Log End Offset)

關於follower副本同步的過程中,還有兩個關鍵的概念HW和LEO,這兩個參數和ISR集合密切相關。HW標記了一個特殊的offset,當消費者處理消息的時候,只能拉取到HW之前的消息,HW之後的消息對消費者來說是不可見的。也就是說,取partition對應ISR中最小的LEO作為HW,consumer最多隻能消費到HW所在的位置。每個replica都有HW,leader和follower各自維護更新自己的HW的狀態。對於leader新寫入的消息,consumer不能立刻消費,leader會等待該消息被所有ISR中的replicas同步更新HW,此時消息才能被consumer消費。這樣保證瞭如果leader副本損壞,該消息仍然可以從新選舉的leader中獲取

LEO是所有副本都會有的一個offset標記,它指向追加到當前副本的最後一個消息的offset。當生產者像leader副本追加消息的時候,leader副本的LEO標記就會遞增;當follower副本成功從leader副本拉取消息並更新到本地的時候,follower副本的LEO就會增加


Kafka(一)


文件存儲機制

存儲機制

在kafka文件存儲中,同一個topic下有多個不同的partition,每個partition為一個目錄,partition的名稱規則為:topic名稱+有序序號,第一個序號從0開始,最大的序號為partition數量減1,partition是實際物理上的概念,而topic是邏輯上的概念

partition還可以細分為segment,這個segment是什麼呢? 假設kafka以partition為最小存儲單位,那麼我們可以想象當kafka producer不斷髮送消息,必然會引起partition文件的無線擴張,這樣對於消息文件的維護以及被消費的消息的清理帶來非常大的挑戰,所以kafka 以segment為單位又把partition進行細分。每個partition相當於一個巨型文件被平均分配到多個大小相等的segment數據文件中(每個setment文件中的消息不一定相等),這種特性方便已經被消費的消息的清理,提高磁盤的利用率

segment file**組成**:由2大部分組成,分別為index file和data file,此2個文件一一對應,成對出現,後綴".index"和“.log”分別表示為segment索引文件、數據文件.

segment**文件命名規則**:partion全局的第一個segment從0開始,後續每個segment文件名為上一個segment文件最後一條消息的offset值。數值最大為64位long大小,19位數字字符長度,沒有數字用0填充


Kafka(一)


查看消息記錄

./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/$topicName/000**.log --print-data-log

查找方式

以上圖為例,讀取offset=170418的消息,首先查找segment文件,其中00000000000000000000.index為最開始的文件,第二個文件為00000000000000170410.index(起始偏移為170410+1=170411),而第三個文件為00000000000000239430.index(起始偏移為239430+1=239431),所以這個offset=170418就落到了第二個segment文件之中。其他後續文件可以依次類推,以其實偏移量命名並排列這些文件,然後根據二分查找法就可以快速定位到具體文件位置。其次根據00000000000000170410.index文件中的[8,1325]定位到00000000000000170410.log文件中的1325的位置進行讀取。


分享到:


相關文章: