12.30 大數據 Kafka 知識 要點

大數據 Kafka 知識 要點

信息時代

kafka消息服務器使用場景 90% 緩衝 消息通訊組件

1、jd訂單系統

2、解決數據的併發寫

Topic :消息分類,以日誌分區存儲,每一個分區都會有leader和Follower

Record:組成Topic的基本單元,一則消息 key value ts

broker:運行Kafka服務-broker 一臺機器只運行一個Broker

Producer:生產消息

Consumer:消費消息

ConsumerGroup:把消費者歸類,同一Group的消費者默認是對Topic分區消息實現負載均衡(Fair Shard)

不同Group之間相互不影響,消息是以廣播的形式發佈。

大數據 Kafka 知識 要點

Kafka消息服務搭建:

1.解壓到/usr目錄

[root@CentOS ~]# tar zxf kafka_2.11-0.11.0.0.tgz -C /usr/

2.創建軟連接(可選)

[root@CentOS ~]# ln -s /usr/kafka_2.11-0.11.0.0/ kafka

3.啟動zookeeper (對broker監控,記錄集群的元數據信息)

4.搭建kafka的偽分佈式

1.創建三個kafka的配置文件

[root@CentOS config]# cp server.properties server-1.properties

[root@CentOS config]# cp server.properties server-2.properties

[root@CentOS config]# cp server.properties server-3.properties

修改配置文件:

server-1.peroperties

broker.id=0 -- 一個kafka實例唯一標示必須唯一

delete.topic.enable=true --允許用戶刪除topic

listeners=PLAINTEXT://CentOS:9092 --因為kafka服務是通過TCP/IP實現

log.dirs=/tmp/kafka-logs-1 --配置kafka消息存儲路徑

zookeeper.connect=CentOS:2181

server-2.peroperties

broker.id=1 -- 一個kafka實例唯一標示必須唯一

delete.topic.enable=true --允許用戶刪除topic

listeners=PLAINTEXT://CentOS:9093 --因為kafka服務是通過TCP/IP實現

log.dirs=/tmp/kafka-logs-2 --配置kafka消息存儲路徑

zookeeper.connect=CentOS:2181

server-3.peroperties

broker.id=3 -- 一個kafka實例唯一標示必須唯一

delete.topic.enable=true --允許用戶刪除topic

listeners=PLAINTEXT://CentOS:9094 --因為kafka服務是通過TCP/IP實現

log.dirs=/tmp/kafka-logs-3 --配置kafka消息存儲路徑

zookeeper.connect=CentOS:2181

啟動Kafka

start.sh

echo 'start kafka cluster..'

for i in {1..3}

do

/usr/kafka_2.11-0.11.0.0/bin/kafka-server-start.sh /usr/kafka_2.11-0.11.0.0/config/server-$i.properties 1>/dev/null 2>&1 &

sleep 5

done

shutdown.sh

for i in `jps|grep Kafka | awk '{print $1}'`

do

echo 'kill kafka '$i

kill -9 $i

done

Topic管理

創建Topic

[root@CentOS kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --zookeeper CentOS:2181 --create --topic topic01 --partitions 3 --replication-factor 2

查看所有Topic

[root@CentOS kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --zookeeper CentOS:2181 --list

查看Tpoic詳情

[root@CentOS kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --zookeeper CentOS:2181 --describe --topic topic01

Topic:topic01PartitionCount:3ReplicationFactor:2Configs:

Topic: topic01Partition: 0Leader: 2Replicas: 2,0Isr: 2,0

Topic: topic01Partition: 1Leader: 0Replicas: 0,1Isr: 0,1

Topic: topic01Partition: 2Leader: 1Replicas: 1,2Isr: 1,2

修改分區

[root@CentOS kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --zookeeper CentOS:2181 --alter --topic topic01 --partitions 4

刪除

[root@CentOS kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --zookeeper CentOS:2181 --delete --topic topic01

Topic topic01 is marked for deletion.

Note: This will have no impact if delete.topic.enable is not set to true.

訂閱消息:

[root@CentOS kafka_2.11-0.11.0.0]# ./bin/kafka-console-consumer.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --topic topic01 --from-beginning

生產消息

[root@CentOS kafka_2.11-0.11.0.0]# ./bin/kafka-console-producer.sh --broker-list CentOSA:9092,CentOSB:9092,CentOSC:9092 --topic topic01

介紹:http://kafka.apache.org/documentation/

Java集成Kafka

<dependency>

<groupid>org.apache.kafka/<groupid>

<artifactid>kafka-clients/<artifactid>

<version>0.11.0.0/<version>

大數據 Kafka 知識 要點

1.消費方offset管理

①:手動一定offset

kafkaConsumer.seek(new TopicPartition("t_user_topic",part),offset);

②kafkaConsumer.commitAsync();

提交指定分區的offset

①props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

②將所有的消息的消費放置在try{}catch(Exception e)

try{

//消費消息

//提交offset+1

}catch(Exception e){

//seek到上一次消費的消息

}

提交到指定分區

Map<topicpartition> offsetMetaMap=new HashMap<topicpartition>() ;/<topicpartition>/<topicpartition>

OffsetAndMetadata offsetMeta=new OffsetAndMetadata(offset+1);//必須必當前offset大1

offsetMetaMap.put(new TopicPartition("topic01",分區號),offsetMeta);

kafkaConsumer.commitAsync(offsetMetaMap,null);

2、如何使用Kafka發送複雜消息

Deserializer、Serializer接口

3.如何實現Partition數據的負載均衡

①生產方:實現 Partitioner

②消費方:

1.subscribe fair shared 均分策略

2.assign 指定指定分區


分享到:


相關文章: