建哥手把手系列之springboot快速整合kafka

1、 什麼是kafka?

建哥手把手系列之springboot快速整合kafka

Apache Kafka是分佈式發佈-訂閱消息系統。它最初由LinkedIn公司開發,之後成為Apache項目的一部分。Kafka是一種快速、可擴展的、設計內在就是分佈式的,分區的和可複製的提交日誌服務。

kafka架構幾個重要組件:

· Topic(主題):是特定類型的消息流。消息是字節的有效負載(Payload),話題是消息的分類名或種子(Feed)名。

· Producer(生產者):是能夠發佈消息到話題的任何對象。

· Broker(服務代理):已發佈的消息保存在一組服務器中,它們被稱為代理(Broker)或Kafka集群。

· Consumer(消費者):可以訂閱一個或多個話題,並從Broker拉數據,從而消費這些已發佈的消息。

建哥手把手系列之springboot快速整合kafka

2、 準備工作

操作系統是centos6.9 64位,關閉防火牆,修改主機名及系統安全級別忽略,不明白的可以關注哥哥頭條號《HelloTeacher陳》自行復習。

根據服務器scala版本選擇合適的kafka版本

建哥手把手系列之springboot快速整合kafka

安裝包:kafka_2.12-1.0.0.tgz(2.12是scala版本,1.0.0是kafka版本)下載地址:

建哥手把手系列之springboot快速整合kafka

1.配置免密登錄(可選擇配置) 2.搭建zookeeper集群 (zookeeper集群安裝請參考哥哥前期文章)3.上傳kafka安裝包 4.解壓安裝包

tar -zxvf kafka_2.12-1.0.0.tgz -C /opt/kafka/

5.修改配置文件 config/server.properties

master主機配置

broker.id=0 標示符(多臺服務器標示符0,1,2,3,...依次增長)

host.name=slave1綁定的主機

log.dirs=/opt/kafka/kafka1.0/tmp/kafka-logs 數據保存的位置

log.retention.hours=168 數據的保留時間

zookeeper.connect=master:2181,slave1:2181,slave2:2181

slaver1主機配置

broker.id=1 標示符(多臺服務器標示符0,1,2,3,...依次增長)

host.name=slave2 綁定的主機

log.dirs= /opt/kafka/kafka1.0/tmp/kafka-logs 數據保存的位置

log.retention.hours=168 數據的保留時間

zookeeper.connect=master:2181,slave1:2181,slave2:2181

6.配置環境變量

vi /etc/profile

export KAFKA_HOME=/opt/kafka/kafka1.0

export PATH=$PATH:$KAFKA_HOME/bin

建哥手把手系列之springboot快速整合kafka

7.使環境變量生效 source /etc/profile

8.測試集群是否搭建成功

a.啟動zookeeper集群

b.啟動服務的命令

nohup /opt/kafka/kafka1.0/bin/kafka-server-start.sh /opt/kafka/kafka1.0/config/server.properties &

解釋:nohup是不掛斷,&是後臺運行,其實就是保證哥哥這個進程一直運行。

c.創建主題

/opt/kafka/kafka1.0/bin/kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 1 --partitions 1 --topic kafkad.

查看當前有哪些主題: kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181

9.模擬發送數據

/opt/kafka/kafka1.0/bin/kafka-console-producer.sh --broker-list slave1:9092,slave1:9092 --topic kafka

10.消費數據:

/opt/kafka/kafka1.0/bin/kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --from-beginning --topic kafka

在模擬發送數據窗口輸入,就會看到在消費端有相應的數據

建哥手把手系列之springboot快速整合kafka

3、 創建springboot工程

建哥手把手系列之springboot快速整合kafka

pom文件

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

com.aaa

kafka

0.0.1-SNAPSHOT

jar

kafka

Demo project for Spring Boot

org.springframework.boot

spring-boot-starter-parent

2.0.4.RELEASE

UTF-8

UTF-8

1.8

org.springframework.boot

spring-boot-starter-web

org.springframework.kafka

spring-kafka

2.1.5.RELEASE

org.springframework.boot

spring-boot-starter-test

test

org.springframework.boot

spring-boot-maven-plugin

創建生產者類

package com.aaa.kafka.util;

@Component

@EnableScheduling

public class KafkaProducer {

@Autowired

private KafkaTemplate kafkaTemplate;//kafka模版對象

/**

* 定時任務,每一秒執行一次

*/

@Scheduled(cron = "00/1 * * * * ?")

public void send(){

String message = UUID.randomUUID().toString();

ListenableFuture future = kafkaTemplate.send("kafka", message);

//添加回調處理,兩個參數(SuccessCallback super T> var1, FailureCallback var2),成功(successCallback),失敗(FailureCallback)

future.addCallback(o->System.out.println("send-消息發送成功:" + message),throwable->System.out.println("消息發送失敗:" + message));

}

@KafkaListener(topics = "kafka")

public void getMessage(ConsumerRecord, ?> record){

System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());

}

}

創建消費者類

package com.aaa.kafka.util;

import java.util.UUID;

@Component

public class KafkaConsumer {

//指定主題

@KafkaListener(topics = "kafka")

public void getMessage(ConsumerRecord, ?> record){

//offset是顯示當前消費了第多少條消息

System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());

}

}

測試效果:

建哥手把手系列之springboot快速整合kafka

至此測試成功,需要源碼的朋友請私信我,發送kafka,喜歡的朋友請關注我頭條號!


分享到:


相關文章: