1、 什麼是kafka?
Apache Kafka是分佈式發佈-訂閱消息系統。它最初由LinkedIn公司開發,之後成為Apache項目的一部分。Kafka是一種快速、可擴展的、設計內在就是分佈式的,分區的和可複製的提交日誌服務。
kafka架構幾個重要組件:
· Topic(主題):是特定類型的消息流。消息是字節的有效負載(Payload),話題是消息的分類名或種子(Feed)名。
· Producer(生產者):是能夠發佈消息到話題的任何對象。
· Broker(服務代理):已發佈的消息保存在一組服務器中,它們被稱為代理(Broker)或Kafka集群。
· Consumer(消費者):可以訂閱一個或多個話題,並從Broker拉數據,從而消費這些已發佈的消息。
2、 準備工作
操作系統是centos6.9 64位,關閉防火牆,修改主機名及系統安全級別忽略,不明白的可以關注哥哥頭條號《HelloTeacher陳》自行復習。
根據服務器scala版本選擇合適的kafka版本
安裝包:kafka_2.12-1.0.0.tgz(2.12是scala版本,1.0.0是kafka版本)下載地址:
1.配置免密登錄(可選擇配置) 2.搭建zookeeper集群 (zookeeper集群安裝請參考哥哥前期文章)3.上傳kafka安裝包 4.解壓安裝包
tar -zxvf kafka_2.12-1.0.0.tgz -C /opt/kafka/
5.修改配置文件 config/server.properties
master主機配置
broker.id=0 標示符(多臺服務器標示符0,1,2,3,...依次增長)
host.name=slave1綁定的主機
log.dirs=/opt/kafka/kafka1.0/tmp/kafka-logs 數據保存的位置
log.retention.hours=168 數據的保留時間
zookeeper.connect=master:2181,slave1:2181,slave2:2181
slaver1主機配置
broker.id=1 標示符(多臺服務器標示符0,1,2,3,...依次增長)
host.name=slave2 綁定的主機
log.dirs= /opt/kafka/kafka1.0/tmp/kafka-logs 數據保存的位置
log.retention.hours=168 數據的保留時間
zookeeper.connect=master:2181,slave1:2181,slave2:2181
6.配置環境變量
vi /etc/profile
export KAFKA_HOME=/opt/kafka/kafka1.0
export PATH=$PATH:$KAFKA_HOME/bin
7.使環境變量生效 source /etc/profile
8.測試集群是否搭建成功
a.啟動zookeeper集群
b.啟動服務的命令
nohup /opt/kafka/kafka1.0/bin/kafka-server-start.sh /opt/kafka/kafka1.0/config/server.properties &
解釋:nohup是不掛斷,&是後臺運行,其實就是保證哥哥這個進程一直運行。
c.創建主題
/opt/kafka/kafka1.0/bin/kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 1 --partitions 1 --topic kafkad.
查看當前有哪些主題: kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181
9.模擬發送數據
/opt/kafka/kafka1.0/bin/kafka-console-producer.sh --broker-list slave1:9092,slave1:9092 --topic kafka
10.消費數據:
/opt/kafka/kafka1.0/bin/kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --from-beginning --topic kafka
在模擬發送數據窗口輸入,就會看到在消費端有相應的數據
3、 創建springboot工程
pom文件
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
創建生產者類
package com.aaa.kafka.util;
@Component
@EnableScheduling
public class KafkaProducer {
@Autowired
private KafkaTemplate kafkaTemplate;//kafka模版對象
/**
* 定時任務,每一秒執行一次
*/
@Scheduled(cron = "00/1 * * * * ?")
public void send(){
String message = UUID.randomUUID().toString();
ListenableFuture future = kafkaTemplate.send("kafka", message);
//添加回調處理,兩個參數(SuccessCallback super T> var1, FailureCallback var2),成功(successCallback),失敗(FailureCallback)
future.addCallback(o->System.out.println("send-消息發送成功:" + message),throwable->System.out.println("消息發送失敗:" + message));
}
@KafkaListener(topics = "kafka")
public void getMessage(ConsumerRecord, ?> record){
System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
}
}
創建消費者類
package com.aaa.kafka.util;
import java.util.UUID;
@Component
public class KafkaConsumer {
//指定主題
@KafkaListener(topics = "kafka")
public void getMessage(ConsumerRecord, ?> record){
//offset是顯示當前消費了第多少條消息
System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
}
}
測試效果:
至此測試成功,需要源碼的朋友請私信我,發送kafka,喜歡的朋友請關注我頭條號!
閱讀更多 HelloTeacher陳 的文章