分享阿里 P8 高級架構師吐血總結的 《Java 核心知識體系&面試資料.pdf》
據說是阿里 P8 級高級架構師吐血總結的一份 Java 核心知識.pdf, 內容覆蓋很廣,Java 核心基礎、Java 多線程、高併發、Spring、微服務、Netty 與 RPC、Zookeeper、Kafka、RabbitMQ、Habase、設計模式、負載均衡、分佈式緩存、Hadoop、Spark、Storm、雲計算等。
另外,附送 100G 學習、面試視頻文檔喲~
獲取方式:【關注 + 轉發】後,私信我,回覆關鍵字【資源】,即可免費無套路獲取哦~
以下是資源的部分目錄以及內容截圖:
重要的事再說一遍,獲取方式:【關注 + 轉發】後,私信我,回覆關鍵字【資源】,即可免費無套路獲取哦~
正文開始
目錄
- 一、什麼是 Kafka ?
- 二、為什麼要用 Kafka ?
- 三、Kafka 環境安裝
- 四、Spring Boot 2.x 整合 Kafka
- 五、總結
- 六、GitHub 源碼地址
什麼是 Kafka?
Kafka 是 Apache 基金會開源的一個分佈式發佈 - 訂閱消息中間件,流處理平臺。它起源於 LinkedIn,由 Scala 和 Java兩種語言編寫而成。於 2011 年成為 Apache 項目,2012 成為 Apache 基金會下頂級項目。
Kafka 專為分佈式高吞吐系統而設計。相比較其他消息中間件,如 RabbitMq 等,Kafka 具有更好的吞吐量,內置分區,複製和固有的容錯能力,使得它非常適合應用在大數據領域。另外,Kafka 還支持離線、在線消費消息。
為什麼要用 Kafka
- 低延遲 - Kafka 支持低延遲消息傳遞,速度極快,能達到 200w 寫/秒;
- 高性能 - Kafka對於消息的發佈、訂閱都具有高吞吐量。即使存儲了 TB 級的消息,依然能夠保證穩定的性能;
- 可靠性 - Kafka 是分佈式,分區,複製和容錯的,保證零停機和零數據丟失。
- 可拓展性 - Kafka 支持集群水平拓展。
- 耐用性 - Kafka 使用"分佈式提交日誌",消息能夠快速的持久化的磁盤上。
Kafka 環境安裝
接下來,小哈為大家演示一下,在 Linux 系統中,採用最簡單的單機安裝方式, 因為本文著重點還是介紹 Spring Boot 2.x 快速集成整合 Kafka.
下載 Kafka
訪問 Kafka 官網 http://kafka.apache.org/downloads,下載 tgz 包, 這裡演示版本為最新的 2.3.0 版本。
解壓,進入目錄
下載下來過後,放置到指定位置,執行命令解壓:
tar -zxvf kafka_2.11-2.3.0.tgz
解壓完成後,進入 Kafka 目錄下:
cd kafka_2.11-2.3.0
啟動 zookeeper
通過 bin 目錄下的 zookeeper-server-start.sh 啟動腳本,來啟動 zk 單節點實例:
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
啟動 Kafka
通過 bin 目錄下的 kafka-server-start.sh 來啟動 :
bin/kafka-server-start.sh config/server.properties
注意:Kafka 默認使用 9092 端口,注意關閉防火牆,阿里雲服務器的話,記得添加安全組。
Spring Boot 2.x 開始整合
新建一個 Spring Boot 2.x Web 工程。
項目結構
添加 maven 依賴
小哈這裡完整的 maven 依賴如下:
<project> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelversion>4.0.0/<modelversion>
<parent>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-parent/<artifactid>
<version>2.1.2.RELEASE/<version>
<relativepath>
/<parent>
<groupid>site.exception/<groupid>
<artifactid>spring-boot-kafka/<artifactid>
<version>0.0.1-SNAPSHOT/<version>
<name>spring-boot-kafka/<name>
<description>Demo project for Spring Boot/<description>
<properties>
<java.version>1.8/<java.version>
/<properties>
<dependencies>
<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-web/<artifactid>
/<dependency>
<dependency>
<groupid>org.springframework.kafka/<groupid>
<artifactid>spring-kafka/<artifactid>
/<dependency>
<dependency>
<groupid>org.springframework.kafka/<groupid>
<artifactid>spring-kafka-test/<artifactid>
<scope>test/<scope>
/<dependency>
<dependency>
<groupid>org.projectlombok/<groupid>
<artifactid>lombok/<artifactid>
<optional>true/<optional>
/<dependency>
<dependency>
<groupid>com.alibaba/<groupid>
<artifactid>fastjson/<artifactid>
<version>1.2.58/<version>
/<dependency>
<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-devtools/<artifactid>
<scope>runtime/<scope>
<optional>true/<optional>
/<dependency>
<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-test/<artifactid>
<scope>test/<scope>
/<dependency>
/<dependencies>
<build>
<plugins>
<plugin>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-maven-plugin/<artifactid>
/<plugin>
/<plugins>
/<build>
/<project>
添加 kafka 配置
修改 application.yml 文件,添加 kafka 相關配置:
spring:
kafka:
# 指定 kafka 地址,我這裡在本地,直接就 localhost, 若外網地址,注意修改【PS: 可以指定多個】
bootstrap-servers: localhost:9092
consumer:
# 指定 group_id
group-id: group_id
auto-offset-reset: earliest
# 指定消息key和消息體的編解碼方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
# 指定消息key和消息體的編解碼方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
關於 auto-offset-reset
auto.offset.reset 配置有3個值可以設置,分別如下:
- earliest:當各分區下有已提交的 offset 時,從提交的 offset 開始消費;無提交的 offset時,從頭開始消費;
- latest:當各分區下有已提交的 offset 時,從提交的 offset 開始消費;無提交的 offset 時,消費新產生的該分區下的數據;
- none: topic各分區都存在已提交的 offset 時,從 offset 後開始消費;只要有一個分區不存在已提交的 offset,則拋出異常;
默認建議用 earliest, 設置該參數後 kafka出錯後重啟,找到未消費的offset可以繼續消費。
而 latest 這個設置容易丟失消息,假如 kafka 出現問題,還有數據往topic中寫,這個時候重啟kafka,這個設置會從最新的offset開始消費, 中間出問題的那些就不管了。
none 這個設置沒有用過,兼容性太差,經常出問題。
新增一個訂單類
模擬業務系統中,用戶每下一筆訂單,就發送一個消息,供其他服務消費:
/**
* @author 犬小哈
* @date 2019/4/12
* @time 下午3:05
* @discription 訂單實體類
**/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Order {
/**
* 訂單id
*/
private long orderId;
/**
* 訂單號
*/
private String orderNum;
/**
* 訂單創建時間
*/
private LocalDateTime createTime;
}
添加一個消息發佈者
新建一個 KafkaProvider 消息提供者類,源碼如下:
/**
* @author 犬小哈
* @date 2019/4/12
* @time 下午3:05
* @discription 消息提供者
**/
@Component
@Slf4j
public class KafkaProvider {
/**
* 消息 TOPIC
*/
private static final String TOPIC = "xiaoha";
@Autowired
private KafkaTemplate<string> kafkaTemplate;
public void sendMessage(long orderId, String orderNum, LocalDateTime createTime) {
// 構建一個訂單類
Order order = Order.builder()
.orderId(orderId)
.orderNum(orderNum)
.createTime(createTime)
.build();
// 發送消息,訂單類的 json 作為消息體
ListenableFuture<sendresult>> future =
kafkaTemplate.send(TOPIC, JSONObject.toJSONString(order));
// 監聽回調
future.addCallback(new ListenableFutureCallback<sendresult>>() {
@Override
public void onFailure(Throwable throwable) {
log.info("## Send message fail ...");
}
@Override
public void onSuccess(SendResult<string> result) {
log.info("## Send message success ...");
}
});
}
}
/<string>/<sendresult>/<sendresult>/<string>
添加一個消息消費者
消息發送出去了,當然就需要一個消費者,消費者拿到消息後,再做相關的業務處理,這裡,小哈僅僅是打印消息體。
添加 KafkaConsumer 消費者類:
/**
* @author 犬小哈
* @date 2019/4/12
* @time 下午3:05
* @discription 消息消費者
**/
@Component
@Slf4j
public class KafkaConsumer {
@KafkaListener(topics = "xiaoha", groupId = "group_id")
public void consume(String message) {
log.info("## consume message: {}", message);
}
}
通過 @KafkaListener註解,我們可以指定需要監聽的 topic 以及 groupId, 注意,這裡的 topics 是個數組,意味著我們可以指定多個 topic,如:@KafkaListener(topics={"xiaoha","xiaoha2"},groupId="group_id")。
注意:消息發佈者的 TOPIC 需要保持與消費者監聽的 TOPIC 一致,否者消費不到消息。
單元測試
新建單元測試,功能測試消息發佈,以及消費。
/**
* @author 犬小哈
* @date 2019/4/12
* @time 下午3:05
* @discription
**/
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootKafkaApplicationTests {
@Autowired
private KafkaProvider kafkaProvider;
@Test
public void sendMessage() throws InterruptedException {
// 發送 1000 個消息
for (int i = 0; i < 1000; i++) {
long orderId = i+1;
String orderNum = UUID.randomUUID().toString();
kafkaProvider.sendMessage(orderId, orderNum, LocalDateTime.now());
}
TimeUnit.MINUTES.sleep(1);
}
}
發送 1000 個消息,看消息是否能夠被正常發佈與消費,控制檯日誌如下:
可以發現,1000 個消息被成功發送,且被正常消費。
我們再驗證下 Kafka 的 topic 列表,看 xiaoha 這個 topic 是否正常被創建, 執行 bin 目錄下查看 topic 列表的 kafka-topics.sh 腳本:
bin/kafka-topics.sh --list --zookeeper localhost:2181
好了,大功告成!
總結
小哈今天主要和大家分享了,如何安裝單機版的 kafka 環境、如何在 Spring Boot 2.x 中快速集成消息中間件 Kafka,以及演示了相關示例代碼來發布消息、消費消息,希望大家看完過後有所收穫,下期見!
閱讀更多 小哈學Java 的文章