04.22 RocketMQ Java應用—NormalProducer(普通)模式

之前的文章《阿里消息隊列使用》我們介紹瞭如何搭建RocketMQ的單Master環境,其它集群環境的搭建只要複製master,然後修改配置即可。這篇我們介紹如何在Java應用中使用RocketMQ來實現生產、消費者,快速體驗RocketMQ。

1、RocketMQ的模式概述

RocketMQ提供了3種模式的Producer,2種模式的Consumer模式。

Producer模式:NormalProducer(普通)、OrderProducer(順序)、TransactionProducer(事務)。

Consumer模式:集群消費 AND 廣播消費,默認採用集群消費。集群消費也就是消息的負載均衡消費;廣播消息,類似於ActiveMQ中的發佈訂閱模式,消息會發給Consume Group中的每一個消費者進行消費。

2、RocketMQ生產普通模式

我們用Maven創建RocketMQ的生產和消費應用環境,用Java代碼實現簡單的生產消費業務。

Maven搭建,pom中引入rocketmq-all和rocketmq-client

RocketMQ Java應用—NormalProducer(普通)模式

生產者

RocketMQ Java應用—NormalProducer(普通)模式

消費者

RocketMQ Java應用—NormalProducer(普通)模式

  • 無論生產者、消費者都必須給出GroupName,而且具有唯一性!

  • 生產到哪個Topic的哪個Tag下,消費者也是從Topic的哪個Tag進行消費,可見這個Tag有點類似於JMS Selector機制,即實現消息的過濾。

  • 生產者、消費者需要設置NameServer地址。

  • 這裡,採用的是Consumer Push的方式,即設置Listener機制回調,相當於開啟了一個線程。以後為大家介紹Consumer Pull的方式。

我們看一下運行結果。

生產者

RocketMQ Java應用—NormalProducer(普通)模式

我們這裡採用的是單Master節點,在生產者中沒有看到brokerName的不同變化,在多Master節點中,會實現自動負載,將消息發送到不同的broker。


消費者

RocketMQ Java應用—NormalProducer(普通)模式

這裡消費消息是沒有什麼順序的

在多Master模式中,如果某個Master進程掛了,顯然這臺broker將不可用,上面的消息也將無法消費,要知道開源版本的RocketMQ是沒有提供切換程序,來自動恢復故障的,因此在實際開發中,我們一般提供一個監聽程序,用於監控Master的狀態。

在ActiveMQ中,生產消息的時候會提供是否持久化的選擇,但是對於RocketMQ而言,消息是一定會被持久化的!

上面的消費者採用的是Push Consumer的方式,那麼監聽的Listener中的消息List到底是多少條呢?雖然提供了API,如consumer.setConsumeMessageBatchMaxSize(10),實際上即使設置了批量的條數,但是注意了,是最大是10,並不意味著每次batch的都是10,只有在消息有擠壓的情況下才有可能。而且Push Consumer的最佳實踐方式就是一條條的消費,如果需要batch,可以使用Pull Consumer。

務必保證先啟動消費者進行Topic訂閱,然後在啟動生產者進行生產(否則極有可能導致消息的重複消費,重複消費,重複消費!重要的事情說三遍!關於消息的重複問題後續給大家介紹~)。而且在實際開發中,有時候不會批量的處理消息,而是原子性的,單線程的去一條一條的處理消息,這樣就是實時的在處理消息。(批量的處理海量的消息,可以考慮Kafka)

3、消息失敗重試機制

消息失敗,無非涉及到2端:從生產者端發往MQ的失敗;消費者端從MQ消費消息的失敗。

在業務應用場景中,有時網絡抖動導致生產者發送消息到MQ失敗。下面的框紅的代碼邏輯是如果該條消息在1S內沒有發送成功,那麼重試3次。

RocketMQ Java應用—NormalProducer(普通)模式

消費者端的失敗,分為2種情況,一個是timeout,一個是exception。timeout,比如由於網絡原因導致消息壓根就沒有從MQ到消費者上,在RocketMQ內部會不斷的嘗試發送這條消息,直至發送成功為止!(比如集群中一個broker失敗,就嘗試另一個broker)。exception,消息正常的到了消費者,結果消費者發生異常,處理失敗了。這裡涉及到一些問題,需要我們思考下,比如,消費者消費消息的狀態有哪些定義?如果失敗,MQ將採取什麼策略進行重試?假設一次性批量PUSH了10條,其中某條數據消費異常,那麼消息重試是10條呢,還是1條呢?而且在重試的過程中,需要保證不重複消費嗎?

RocketMQ Java應用—NormalProducer(普通)模式

消息消費的狀態,有2種,一個是成功(CONSUME_SUCCESS),一個是失敗&稍後重試(RECONSUME_LATER)

RocketMQ Java應用—NormalProducer(普通)模式

在啟動broker的過程中,可以觀察下日誌,你會發現RECONSUME_LATER的策略。

如果消費失敗,那麼1S後再次消費,如果失敗,那麼5S後,再次消費,......直至2H後如果消費還失敗,那麼該條消息就會終止發送給消費者了!

RocketMQ為我們提供了這麼多次數的失敗重試,但是在實際中也許我們並不需要這麼多重試,比如重試3次,還沒有成功,我們希望把這條消息存儲起來並採用另一種方式處理,而且希望RocketMQ不要在重試呢,因為重試解決不了問題了!這該如何做呢?

我們先來看一下一條消息MessageExt對象的輸出:

RocketMQ Java應用—NormalProducer(普通)模式

注意到reconsumeTimes屬性,這個屬性就代表消息重試的次數!來看一段代碼:

RocketMQ Java應用—NormalProducer(普通)模式

注意了,對於消費消息而言,存在2種指定的狀態(成功 OR 失敗重試),如果一條消息在消費端處理沒有返回這2個狀態,那麼相當於這條消息沒有達到消費者,勢必會再次發送給消費者!也即是消息的處理必須有返回值,否則就進行重發。

4、消息負載均衡及高效的水平擴展機制

RocketMQ Java應用—NormalProducer(普通)模式

對於RocketMQ而言,通過ConsumeGroup的機制,實現了天然的消息負載均衡!通俗點來說,RocketMQ中的消息通過ConsumeGroup實現了將消息分發到C1/C2/C3/......的機制,這意味著我們將非常方便的通過加機器來實現水平擴展!

我們考慮一下這種情況:比如C2發生了重啟,一條消息發往C3進行消費,但是這條消息的處理需要0.1S,而此時C2剛好完成重啟,那麼C2是否可能會收到這條消息呢?答案是肯定的,也就是consume broker的重啟,或者水平擴容,或者不遵守先訂閱後生產消息,都可能導致消息的重複消費!

至於消息分發到C1/C2/C3,其實也是可以設置策略的。

RocketMQ Java應用—NormalProducer(普通)模式

5、集群消費 AND 廣播消費

RocketMQ的消費方式有2種,在默認情況下,就是集群消費,也就是上面提及的消息的負載均衡消費。另一種消費模式,是廣播消費。廣播消費,類似於ActiveMQ中的發佈訂閱模式,消息會發給Consume Group中的每一個消費者進行消費。

代碼實現的時候在消費者代碼中設置廣播模式即可。

RocketMQ Java應用—NormalProducer(普通)模式

友情提示:消費者的集群消費、廣播消費與生產者的普通、順序、事務模式是笛卡爾積的關係。


分享到:


相關文章: