最近一直在學習RabbitMQ,但是不知如何在實際業務中擼出它的功效,最近剛好看到一篇相關案例,有一些心得,想和小夥伴們分享一下!
一、先來一張 RabbitMQ 流程圖
本文內容主要圍繞這個流程圖展開,利用 RabbitMQ 消息隊列,實現生產者與消費者解耦,所以有必要先貼出來,涵蓋了 RabbitMQ 很多知識點,如:
- 消息發送確認機制
- 消費確認機制
- 消息的重新投遞
- 消費冪等性, 等等
二、實現思路
- 1.在虛擬機創建一個CentOS7上,並安裝 RabbitMQ
- 2.開放QQ郵箱或者其它郵箱授權碼,用於發送郵件
- 3.創建郵件發送項目並編寫代碼
- 4.發送郵件測試
- 5.消息發送失敗處理
三、RabbitMQ安裝
RabbitMQ 基於 erlang 進行通信,相比其它的軟件,安裝有些麻煩,不過本例採用rpm方式安裝,任何新手都可以完成安裝,過程如下!
3.1、安裝前命令準備
輸入如下命令,完成安裝前的環境準備。
<code>yum install lsof build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz wget vim/<code>
3.2、下載 RabbitMQ、erlang、socat 的安裝包
本次下載的是RabbitMQ-3.6.5版本,採用rpm一鍵安裝,適合新手直接上手。
先創建一個rabbitmq目錄,本例的目錄路徑為/usr/app/rabbitmq,然後在目錄下執行如下命令,下載安裝包!
- 下載erlang
<code>wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm/<code>
- 下載socat
<code>wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm/<code>
- 下載rabbitMQ
<code>wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm/<code>
最終目錄文件如下:
3.3、安裝軟件包
下載完之後,按順序依次安裝軟件包,這個很重要哦~
- 安裝erlang
<code>rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm/<code>
- 安裝socat
<code>rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm/<code>
- 安裝rabbitmq
<code>rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm/<code>
安裝完成之後,修改rabbitmq的配置,默認配置文件在/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin目錄下。
<code>vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app/<code>
修改loopback_users節點的值!
最後只需通過如下命令,啟動服務即可!
<code>rabbitmq-server start &/<code>
運行腳本之後,如果報錯,例如下圖!
解決辦法如下:
<code>vim /etc/rabbitmq/rabbitmq-env.conf/<code>
在文件裡添加一行,如下配置!
<code>NODENAME=rabbit@localhost/<code>
然後,再保存!再次以下命令啟動服務!
<code>rabbitmq-server start &/<code>
通過如下命令,查詢服務是否啟動成功!
<code>lsof -i:5672/<code>
如果出現5672已經被監聽,說明已經啟動成功!
3.4、啟動可視化的管控臺
輸入如下命令,啟動控制檯!
<code>rabbitmq-plugins enable rabbitmq_management/<code>
用瀏覽器打開http://ip:15672,這裡的ip就是 CentOS 系統的 ip,結果如下:
賬號、密碼,默認為guest,如果出現無法訪問,檢測防火牆是否開啟,如果開啟將其關閉即可!
登錄之後的監控平臺,界面如下:
獲取郵箱授權碼的目的,主要是為了通過代碼進行發送郵件,例如 QQ 郵箱授權碼獲取方式,如下圖:
點擊【開啟】按鈕,然後發送短信,即可獲取授權碼,該授權碼就是配置文件spring.mail.password需要的密碼!
五、項目介紹
- springboot版本:2.1.5.RELEASE
- RabbitMQ版本:3.6.5
- SendMailUtil:發送郵件工具類
- ProduceServiceImpl:生產者,發送消息
- ConsumerMailService:消費者,消費消息,發送郵件
六、代碼實現
6.1、創建項目
在 IDEA 下創建一個名稱為smail的 Springboot 項目,pom文件中加入amqp和mail。
<code><dependencies>
<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter/<artifactid>
/<dependency>
<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-test/<artifactid>
<scope>test/<scope>
/<dependency>
<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-web/<artifactid>
/<dependency>
<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-devtools/<artifactid>
<optional>true/<optional>
/<dependency>
<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-mail/<artifactid>
/<dependency>
<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-amqp/<artifactid>
/<dependency>
<dependency>
<groupid>org.apache.commons/<groupid>
<artifactid>commons-lang3/<artifactid>
<version>3.4/<version>
/<dependency>
<dependency>
<groupid>org.projectlombok/<groupid>
<artifactid>lombok/<artifactid>
<version>1.16.10/<version>
/<dependency>
/<dependencies>/<code>
6.2、配置rabbitMQ、mail
在application.properties文件中,配置amqp和mail!
<code>#rabbitmq
spring.rabbitmq.host=192.168.0.103
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 開啟confirms回調 P -> Exchange
spring.rabbitmq.publisher-confirms=true
# 開啟returnedMessage回調 Exchange -> Queue
spring.rabbitmq.publisher-returns=true
# 設置手動確認(ack) Queue -> C
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=100
spring.mail.default-encoding=UTF-8
spring.mail.host=smtp.qq.com
[email protected]
spring.mail.password=獲取的郵箱授權碼
[email protected]
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true/<code>
其中,spring.mail.password第四步中獲取的授權碼,同時username和from要一致!
6.3、RabbitConfig配置類
<code>@Configuration
@Slf4j
public class RabbitConfig {
// 發送郵件
public static final String MAIL_QUEUE_NAME = "mail.queue";
public static final String MAIL_EXCHANGE_NAME = "mail.exchange";
public static final String MAIL_ROUTING_KEY_NAME = "mail.routing.key";
@Autowired
private CachingConnectionFactory connectionFactory;
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(converter());
// 消息是否成功發送到Exchange
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("消息成功發送到Exchange");
} else {
log.info("消息發送到Exchange失敗, {}, cause: {}", correlationData, cause);
}
});
// 觸發setReturnCallback回調必須設置mandatory=true, 否則Exchange沒有找到Queue就會丟棄掉消息, 而不會觸發回調
rabbitTemplate.setMandatory(true);
// 消息是否從Exchange路由到Queue, 注意: 這是一個失敗回調, 只有消息從Exchange路由到Queue失敗才會回調這個方法
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("消息從Exchange路由到Queue失敗: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
});
return rabbitTemplate;
}
@Bean
public Jackson2JsonMessageConverter converter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public Queue mailQueue() {
return new Queue(MAIL_QUEUE_NAME, true);
}
@Bean
public DirectExchange mailExchange() {
return new DirectExchange(MAIL_EXCHANGE_NAME, true, false);
}
@Bean
public Binding mailBinding() {
return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MAIL_ROUTING_KEY_NAME);
}
}/<code>
6.4、Mail 郵件實體類
<code>@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class Mail {
@Pattern(regexp = "^([a-z0-9A-Z]+[-|\\\\.]?)+[a-z0-9A-Z]@([a-z0-9A-Z]+(-[a-z0-9A-Z]+)?\\\\.)+[a-zA-Z]{2,}$", message = "郵箱格式不正確")
private String to;
@NotBlank(message = "標題不能為空")
private String title;
@NotBlank(message = "正文不能為空")
private String content;
private String msgId;// 消息id
}/<code>
6.5、SendMailUtil郵件發送類
<code>@Component
@Slf4j
public class SendMailUtil {
@Value("${spring.mail.from}")
private String from;
@Autowired
private JavaMailSender mailSender;
/**
* 發送簡單郵件
*
* @param mail
*/
public boolean send(Mail mail) {
String to = mail.getTo();// 目標郵箱
String title = mail.getTitle();// 郵件標題
String content = mail.getContent();// 郵件正文
SimpleMailMessage message = new SimpleMailMessage();
message.setFrom(from);
message.setTo(to);
message.setSubject(title);
message.setText(content);
try {
mailSender.send(message);
log.info("郵件發送成功");
return true;
} catch (MailException e) {
log.error("郵件發送失敗, to: {}, title: {}", to, title, e);
return false;
}
}
}/<code>
6.6、ProduceServiceImpl 生產者類
<code>@Service
public class ProduceServiceImpl implements ProduceService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public boolean send(Mail mail) {
//創建uuid
String msgId = UUID.randomUUID().toString().replaceAll("-", "");
mail.setMsgId(msgId);
//發送消息到rabbitMQ
CorrelationData correlationData = new CorrelationData(msgId);
rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME, MessageHelper.objToMsg(mail), correlationData);
return true;
}
}/<code>
6.7、ConsumerMailService 消費者類
<code>@Component
@Slf4j
public class ConsumerMailService {
@Autowired
private SendMailUtil sendMailUtil;
@RabbitListener(queues = RabbitConfig.MAIL_QUEUE_NAME)
public void consume(Message message, Channel channel) throws IOException {
//將消息轉化為對象
String str = new String(message.getBody());
Mail mail = JsonUtil.strToObj(str, Mail.class);
log.info("收到消息: {}", mail.toString());
MessageProperties properties = message.getMessageProperties();
long tag = properties.getDeliveryTag();
boolean success = sendMailUtil.send(mail);
if (success) {
channel.basicAck(tag, false);// 消費確認
} else {
channel.basicNack(tag, false, true);
}
}
}/<code>
6.8、TestController 控制層類
<code>@RestController
@RequestMapping("/test")
@Slf4j
public class TestController {
@Autowired
private ProduceService testService;
@PostMapping("send")
public boolean sendMail(Mail mail) {
return testService.send(mail);
}
}/<code>
七、測試服務
啟動 SpringBoot 服務之後,用 postman 模擬請求接口。
查看控制檯信息。
查詢接受者郵件信息。
郵件發送成功!
八、消息發送失敗處理
雖然,上面案例可以成功的實現消息的發送,但是上面的流程很脆弱,例如:rabbitMQ 突然蹦了、郵件發送失敗了、重啟 rabbitMQ 服務器出現消息重複消費,應該怎處理呢?
很顯然,我們需要對原有的邏輯進行升級改造,因此我們需要引入數據庫來記錄消息的發送情況。
8.1、創建消息投遞日誌表
<code>CREATE TABLE `msg_log` (
`msg_id` varchar(255) NOT NULL DEFAULT '' COMMENT '消息唯一標識',
`msg` text COMMENT '消息體, json格式化',
`exchange` varchar(255) NOT NULL DEFAULT '' COMMENT '交換機',
`routing_key` varchar(255) NOT NULL DEFAULT '' COMMENT '路由鍵',
`status` int(11) NOT NULL DEFAULT '0' COMMENT '狀態: 0投遞中 1投遞成功 2投遞失敗 3已消費',
`try_count` int(11) NOT NULL DEFAULT '0' COMMENT '重試次數',
`next_try_time` datetime DEFAULT NULL COMMENT '下一次重試時間',
`create_time` datetime DEFAULT NULL COMMENT '創建時間',
`update_time` datetime DEFAULT NULL COMMENT '更新時間',
PRIMARY KEY (`msg_id`),
UNIQUE KEY `unq_msg_id` (`msg_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息投遞日誌';/<code>
8.2、編寫 MsgLog 相關服務類
<code>public interface MsgLogService {
/**
* 插入消息日誌
* @param msgLog
*/
void insert(MsgLog msgLog);
/**
* 更新消息狀態
* @param msgId
* @param status
*/
void updateStatus(String msgId, Integer status);
/**
* 查詢消息
* @param msgId
* @return
*/
MsgLog selectByMsgId(String msgId);
}/<code>
8.3、改寫服務邏輯
在生產服務類中,新增數據寫入。
同時,在RabbitConfig服務配置,當消息發送成功之後,新增更新消息狀態邏輯。
改造消費者ConsumerMailService,每次消費的時候,從數據庫中查詢,如果消息已經被消費,不用再重複發送數據!
這樣即可保證,如果 rabbitMQ 服務器,即使重啟之後重新推送消息,通過數據庫判斷,也不會重複消費進而發生業務異常!
8.4、利用定數任務對消息投遞失敗進行補償
當 rabbitMQ 服務器突然掛掉之後,生成者就無法正常進行投遞數據,此時因為消息已經被記錄到數據庫,因此我們可以利用定數任務查詢出沒有投遞成功的消息,進行補償投遞。
利用定數任務,對投遞失敗的消息進行補償投遞,基本可以保證消息 100% 消費成功!
九、總結
本文主要是通過發送郵件這個業務案例,來講解 Springboot 與 rabbitMQ 技術的整合和使用!
當然解決這個業務需求的技術方案還有很多,例如 Springboot 與 rocketMQ 也可以實現這個需求,這個會在後期的文章講解!
同時,Springboot + rabbitMQ 這種架構方案更適合於集群應用,如果是單體應用,直接通過服務類操作即可實現郵件推送!
本篇主要是Springboot 與 rabbitMQ整合和基本使用教程,希望小夥伴能有所收穫!
閱讀更多 Java高級架構師 的文章