kafka安裝(集群)及配置說明

kafka安裝(集群)及配置說明

檢測本地JDK版本,保證為JDK1.8版本

#檢測JDK版本
rpm -qa | grep openjdk

# 卸載舊版JDK
yum -y remove java-1.7.0-openjdk-headless-1.7.0.111-2.6.7.8.el7.x86_64

# 安裝JDK1.8
yum install java-1.8.0-openjdk.x86_64

查看是否啟用zookeeper

安裝zookeeper方法參考

netstat -antpl | grep 2181

下載kafka

官網地址:http://kafka.apache.org/

下載地址:https://www.apache.org/dyn/closer.cgi?path=/kafka/2.2.0/kafka_2.12-2.2.0.tgz

cd /opt/kafka
wget https://www-eu.apache.org/dist/kafka/2.2.0/kafka_2.12-2.2.0.tgz

單機部署

修改配置信息

cd /opt/kafka/kafka_2.12-2.2.0

# 創建日誌目錄
mkdir -p /opt/kafka/kafka_2.12-2.2.0/logs

# 修改配置,參考下方說明
vim config/server.properties

# 啟動服務
./bin/kafka-server-start.sh -daemon /opt/kafka/kafka_2.12-2.2.0/config/server.properties

注:下面使用的zookeeper與kafka為同一臺服務器部署

# 測試是否生效
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
顯示:Created topic test.

# 查看topic分區
./bin/kafka-topics.sh --list --zookeeper localhost:2181

# 生產消息並且消費
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>hello !
>hello wecom kafka!
>hello my name githen
>
#每一次回車換行表示一條消息,使用ctrl+c結束生產消息


消費者消費消息命令
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning // 支持舊版本

注:以下為server.properties部分配置信息的說明

#一個broker在集群中的唯一標示,要求是正數。在改變IP地址,不改變broker.id的話不會影響consumers
broker.id=1

# 若使用1024以下端口,需要全用root權限啟動
#listeners=PLAINTEXT://:9092

#advertised.listeners=PLAINTEXT://your.host.name:9092

#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SS


# broker 處理消息的最大線程數,一般情況下不需要去修改
num.network.threads=3

# broker處理磁盤IO 的線程數 ,數值應該大於你的硬盤數
num.io.threads=8

# socket的發送緩衝區(SO_SNDBUF)
socket.send.buffer.bytes=102400

# socket的接收緩衝區 (SO_RCVBUF)
socket.receive.buffer.bytes=102400

# 壓縮後限制單個消息的大小,默認值1000000,也就是1MB
message.max.bytes

# socket請求的最大字節數。為了防止內存溢出,message.max.bytes必然要小於
socket.request.max.bytes=104857600

#kafka數據的存放地址,多個地址的話用逗號分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2,按最少使用原則
# 把同一個分區 的日誌片段保存到同一個路徑下。
# 要注意:broker會往擁有最少數目分區的路徑新增分區,而不是往擁有最小磁盤空間的路徑新增分區
log.dirs=/opt/kafka/kafka_2.12-2.2.0/logs

# 每個topic的分區個數,更多的partition會產生更多的segment file
# 自動創建主題,主題的個數即為num.partitions的數
# 主題分區的個數不能減少,只能增加

# 哪果要讓一個主題的分區少於num.partitions的值,需要手動創建主題
num.partitions=1

# 如下三種情況,kafka會使用可配置的線程池來處理日誌片片段
# * 1. 服務器正常啟動,用於打開 每個分區的日誌片段
# * 2. 服務器崩潰後重啟,用戶檢查和截短每個分區 的日誌片段
# * 3. 服務器正常關閉,用於關閉日誌片段
# 試例:num.recovery.threads.per.data.dir設為8,並且log.dir指定3個路徑,那共需要24個線程
num.recovery.threads.per.data.dir=1

offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

# 當達到下面的消息數量時,會將數據flush到日誌文件中。默認10000
#log.flush.interval.messages=10000

# 當達到下面的時間(ms)時,執行一次強制的flush操作。interval.ms和interval.messages無論哪個達到,都會flush。默認3000ms
#log.flush.interval.ms=1000

# 日誌保存時間 (hours|minutes),默認為7天(168小時)。超過這個時間會根據policy處理數據。bytes和minutes無論哪個先達到都會觸發。
# 還有 log.retention.minutes log.retention.ms 推薦使用log.retention.ms
log.retention.hours=168

# 日誌保存最大字節數,若一個包含8個分區的主題,且log.retention.bytes設置為1GB,那主題最多可以保留8GB的數據

# 同時設置了log.retention.ms 和 log.retention.bytes,滿足任意一條,消息都會被 刪除
#log.retention.bytes=1073741824

# 控制日誌片段文件的大小,超出該大小則追加到一個新的日誌segment文件中(-1表示沒有限制)
# 這個參數的值越小,就會越頻繁地關閉和分配新文件,從而降低磁盤寫入的整體效率
# 如一個主題每天接收100MB的消息,而log.segment.bytes使用默認設置(1GB),那需要10天才能填滿一個日誌片段,
# 因為日誌片段被關閉之前消息是不會過期的,所以如果log.retention.ms設置為604800000(1周),那日誌片段最多需要17天才會過期
# 關閉日誌片段需要10天時間,再根據過期時間,還需要再保留7天(要等到日誌片段裡的最後一個消息過期才能被刪除)
log.segment.bytes=1073741824


# 日誌片段關閉時間,無默認值
# log.segment.ms=
# 日誌片段文件的檢查週期,查看它們是否達到了刪除策略的設置(log.retention.hours或log.retention.bytes)
log.retention.check.interval.ms=300000

# Zookeeper quorum設置。如果有多個使用逗號分割 例如 ip:prot,ip:prot,ip:prot,一旦有一個zk服務器宕機,broker 可以連接到zk的另一個節點上
# 結構如下:hostname:port/path /path 是可選的zookeeper路徑,作為 kafka集群的chroot環境,不指定使用根路徑,避免多kafka集群的衝突

zookeeper.connect=localhost:2181

# 連接zk的超時時間
zookeeper.connection.timeout.ms=6000

# ZooKeeper集群中leader和follower之間的同步實際
group.initial.rebalance.delay.ms=0

# kafka會在以下幾種情況 創建主題:
# * 1. 當一個生產者開始往主題寫入消息時
# * 2. 當一個消費者開始從主題讀取消息時
# * 3. 當任意一個客戶端向主題發送元數據請求時
# 根據kafka協議,如果 一個主題不先被創建,根本無法知道 它是否已經存在
auto.create.topics.enable=false

集群部署

配置節點

利用單節點部署多個broker模擬集群機制,不同的broker不同的id,監聽端口以及日誌目錄

cd /opt/kafka/kafka_2.12-2.2.0/config

# borker 1
cp server.properties server-1.properties
vim server-1.properties

> broker.id=1
> listeners=PLAINTEXT://:9091
> log.dirs=/opt/kafka/kafka_2.12-2.2.0/logs/1

cp server.properties server-2.properties

vim server-2.properties

> broker.id=2
> listeners=PLAINTEXT://:9092
> log.dirs=/opt/kafka/kafka_2.12-2.2.0/logs/2

cp server.properties server-3.properties
vim server-3.properties

> broker.id=3
> listeners=PLAINTEXT://:9093
> log.dirs=/opt/kafka/kafka_2.12-2.2.0/logs/3


# 啟動服務
./bin/kafka-server-start.sh -daemon /opt/kafka/kafka_2.12-2.2.0/config/server-1.properties
./bin/kafka-server-start.sh -daemon /opt/kafka/kafka_2.12-2.2.0/config/server-2.properties
./bin/kafka-server-start.sh -daemon /opt/kafka/kafka_2.12-2.2.0/config/server-3.properties

測試集群

# 添加消息
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic mytest1
#說明:factor:指的使我們副本的數量,這裡我們需要三個副本去對應我們的三個broker,讓他們自動選舉leader。

# 查看topic信息
./bin/kafka-topics.sh --list --zookeeper localhost:2181

# 查看詳細信息
./bin/kafka-topics.sh --describe --zookeeper localhost:2181

# 顯示內容:
Topic:mytest1 PartitionCount:1 ReplicationFactor:3 Configs:
Topic: mytest1 Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
# 說明:leader:主的broker ,replicas:代表副本對應的broker id 號, Isr:代表活著的broker


# 創建消息生產者

./bin/kafka-console-producer.sh --broker-list localhost:9091,localhost:9092,localhost:9093 --topic mytest1

# 消費者消費消息
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic mytest1 --from-beginning


分享到:


相關文章: