輕量級分佈式消息隊列XXL-MQ實戰

今天體驗一下xxl-mq,這個mq目前使用不多,功能挺全的,作為學習mq感覺比較好!

1.介紹

XXL-MQ是一款輕量級分佈式消息隊列,擁有 "水平擴展、高可用、海量數據堆積、單機TPS過10萬、毫秒級投遞" 等特性, 支持 "併發消息、串行消息、廣播消息、延遲消息、事務消費、失敗重試、超時控制" 等消息特性。現已開放源代碼,開箱即用。

1.2 特性

  • 1、簡單易用: 一行代碼即可發佈一條消息; 一行註解即可訂閱一個消息主題;
  • 2、輕量級: 部署簡單,不依賴第三方服務,一分鐘上手;
  • 3、水平擴展:消息中心支持無限水平擴展,這裡的水平擴展包括兩方面:消息生產能力、消息消費能力;通過集群擴展線性提升消息吞吐能力;
  • 4、高可用:消息中心能夠忍受部分示例失效,不影響整個集群的可用性。通過內置註冊中心可以實現秒級摘除失效節點,消息服務動態轉移;
  • 5、消息持久化:全部消息持久化存儲,消息中心支持通過配置選擇是否清理過期消息。
  • 6、強數據安全:消息數據存儲在DB中,可事務保障數據安全,防止消息數據丟失;
  • 7、海量數據堆積:消息數據存儲在DB中,原生兼容支持 "MySQL、TIDB" 兩種存儲方式,前者支持千萬級消息堆積,後者支持百億級別消息堆積(TIDB理論上無上限);
  • 8、單機TPS過10W:單機TPS受限於DB存儲方式,選型 "MySQL" 時單機TPS過萬,選型 "TIDB" 時單機TPS過10萬;
  • 9、毫秒級投遞延遲:消息中心與客戶端通過RPC的方式進行消息通訊,毫秒級延時;
  • 10、多種消息模式:
  • 並行消息:消息平均分配在該主題在線消費者,分片方式並行消費;適用於吞吐量較大的消息場景,如郵件發送、短信發送等業務邏輯
  • 串行消息:消息固定分配給該主題在線消費者中其中一個,FIFO方式串行消費;適用於嚴格限制併發的消息場景,如秒殺、搶單等排隊業務邏輯;
  • 廣播消息:消息將會廣播發送給該主題在線消費者分組,全部分組都會消費該消息,但是一個分組下只會消費一次;適用於廣播場景,如廣播更新緩存等
  • 11、延時消息: 支持設置消息的延遲生效時間, 到達設置的生效時間時該消息才會被消費;適用於延時消費場景,如訂單超時取消等;
  • 12、事務性: 消費者開啟事務開關後,消息事務性保證只會成功執行一次;
  • 13、失敗重試: 支持設置消息的重試次數, 在消息執行失敗後將會按照設置的值進行消息重試執行,直至重試次數耗盡或者執行成功;
  • 14、超時控制: 支持自定義消息超時時間,消息消費超時將會主動中斷;
  • 15、消息可見: 系統中每一條消息可通過Web界面在線查看,甚至支持編輯消息內容和消息狀態;
  • 16、消息可追蹤: 支持追蹤每一條消息的執行路徑, 便於排查業務問題;
  • 17、消息失敗告警:支持以Topic粒度監控消息,存在失敗消息時主動推送告警郵件;默認提供郵件方式失敗告警,同時預留擴展接口,可方面的擴展短信、釘釘等告警方式;
  • 18、容器化:提供官方docker鏡像,並實時更新推送dockerhub,進一步實現產品開箱即用;
  • 19、訪問令牌(accessToken):為提升系統安全性,消息中心和客戶端進行安全性校驗,雙方AccessToken匹配才允許通訊;

2.搭建環境

docker run -e PARAMS="--spring.datasource.url=jdbc:mysql://192.168.199.101:3306/xxl-mq?Unicode=true&characterEncoding=UTF-8 --spring.datasource.username=root --spring.datasource.password=root --xxl-mq.rpc.remoting.ip=192.168.199.101" -p 8080:8080 -p 7080:7080 -v /tmp:/data/applogs --name xxl-mq-admin -d xuxueli/xxl-mq-admin:1.2.2

訪問:http://192.168.199.101:8080/xxl-mq-admin/

用戶名:admin

密碼:123456

輕量級分佈式消息隊列XXL-MQ實戰

Producer配置:

2.引入jar

<dependency>
<groupid>com.xuxueli/<groupid>
<artifactid>xxl-mq-client/<artifactid>
<version>1.2.2/<version>
/<dependency>

3.添加配置文件

# xxl-mq, admin conf
xxl.mq.admin.address=http://192.168.199.101:8080/xxl-mq-admin
### xxl-mq, access token
xxl.mq.accessToken=

讓springboot讀取該配置文件

@PropertySource("xxlmq.properties")
輕量級分佈式消息隊列XXL-MQ實戰

4.添加XxlMqConf

package com.lxf.eye.agent.common;
import com.xxl.mq.client.factory.impl.XxlMqSpringClientFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class XxlMqConf {
// ---------------------- param ----------------------
@Value("${xxl.mq.admin.address}")
private String adminAddress;
@Value("${xxl.mq.accessToken}")
private String accessToken;
@Bean
public XxlMqSpringClientFactory getXxlMqConsumer(){
XxlMqSpringClientFactory xxlMqSpringClientFactory = new XxlMqSpringClientFactory();
xxlMqSpringClientFactory.setAdminAddress(adminAddress);
xxlMqSpringClientFactory.setAccessToken(accessToken);
return xxlMqSpringClientFactory;
}
}

5.生成消息

XxlMqProducer.produce(new XxlMqMessage(topic, canalMysqlEntry.toString()));
輕量級分佈式消息隊列XXL-MQ實戰

Comsumer配置

6.引入jar

<dependency>
<groupid>com.xuxueli/<groupid>
<artifactid>xxl-mq-client/<artifactid>
<version>1.2.2/<version>
/<dependency>

7.添加配置文件

# xxl-mq, admin conf
xxl.mq.admin.address=http://192.168.199.101:8080/xxl-mq-admin
### xxl-mq, access token
xxl.mq.accessToken=

讓springboot讀取該配置文件

@PropertySource("xxlmq.properties")
輕量級分佈式消息隊列XXL-MQ實戰

8.添加XxlMqConf

package com.lxf.eye.agent.common;
import com.xxl.mq.client.factory.impl.XxlMqSpringClientFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class XxlMqConf {
// ---------------------- param ----------------------
@Value("${xxl.mq.admin.address}")
private String adminAddress;
@Value("${xxl.mq.accessToken}")
private String accessToken;
@Bean
public XxlMqSpringClientFactory getXxlMqConsumer(){
XxlMqSpringClientFactory xxlMqSpringClientFactory = new XxlMqSpringClientFactory();
xxlMqSpringClientFactory.setAdminAddress(adminAddress);
xxlMqSpringClientFactory.setAccessToken(accessToken);
return xxlMqSpringClientFactory;
}
}

9.消費消息

@MqConsumer(topic = "canal")
@Service
public class XXLMqComsumer implements IMqConsumer {
private Logger logger = LoggerFactory.getLogger(XXLMqComsumer.class);
@Override
public MqResult consume(String data) throws Exception {
logger.info("[XXLMqComsumer] 消費一條消息:{}", data);
return MqResult.SUCCESS;
}
}

10.測試

輕量級分佈式消息隊列XXL-MQ實戰

後面再試用更多功能.


分享到:


相關文章: