利用SpringBoot+RabbitMQ,實現一個郵件推送服務

最近一直在學習RabbitMQ,但是不知如何在實際業務中擼出它的功效,最近剛好看到一篇相關案例,有一些心得,想和小夥伴們分享一下!

一、先來一張 RabbitMQ 流程圖

利用SpringBoot+RabbitMQ,實現一個郵件推送服務

本文內容主要圍繞這個流程圖展開,利用 RabbitMQ 消息隊列,實現生產者與消費者解耦,所以有必要先貼出來,涵蓋了 RabbitMQ 很多知識點,如:

  • 消息發送確認機制
  • 消費確認機制
  • 消息的重新投遞
  • 消費冪等性, 等等

二、實現思路

  • 1.在虛擬機創建一個CentOS7上,並安裝 RabbitMQ
  • 2.開放QQ郵箱或者其它郵箱授權碼,用於發送郵件
  • 3.創建郵件發送項目並編寫代碼
  • 4.發送郵件測試
  • 5.消息發送失敗處理

三、RabbitMQ安裝

RabbitMQ 基於 erlang 進行通信,相比其它的軟件,安裝有些麻煩,不過本例採用rpm方式安裝,任何新手都可以完成安裝,過程如下!

3.1、安裝前命令準備

輸入如下命令,完成安裝前的環境準備。

<code>yum install lsof  build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz wget vim/<code>

3.2、下載 RabbitMQ、erlang、socat 的安裝包

本次下載的是RabbitMQ-3.6.5版本,採用rpm一鍵安裝,適合新手直接上手。

先創建一個rabbitmq目錄,本例的目錄路徑為/usr/app/rabbitmq,然後在目錄下執行如下命令,下載安裝包!

  • 下載erlang
<code>wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm/<code>
  • 下載socat
<code>wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm/<code>
  • 下載rabbitMQ
<code>wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm/<code>

最終目錄文件如下:

利用SpringBoot+RabbitMQ,實現一個郵件推送服務

3.3、安裝軟件包

下載完之後,按順序依次安裝軟件包,這個很重要哦~

  • 安裝erlang
<code>rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm/<code>
  • 安裝socat
<code>rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm/<code>
  • 安裝rabbitmq
<code>rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm/<code>

安裝完成之後,修改rabbitmq的配置,默認配置文件在/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin目錄下。

<code>vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app/<code>

修改loopback_users節點的值!

利用SpringBoot+RabbitMQ,實現一個郵件推送服務

最後只需通過如下命令,啟動服務即可!

<code>rabbitmq-server start &/<code>

運行腳本之後,如果報錯,例如下圖!

利用SpringBoot+RabbitMQ,實現一個郵件推送服務

解決辦法如下:

<code>vim /etc/rabbitmq/rabbitmq-env.conf/<code>

在文件裡添加一行,如下配置!

<code>NODENAME=rabbit@localhost/<code> 

然後,再保存!再次以下命令啟動服務!

<code>rabbitmq-server start &/<code>

通過如下命令,查詢服務是否啟動成功!

<code>lsof -i:5672/<code>

如果出現5672已經被監聽,說明已經啟動成功!

利用SpringBoot+RabbitMQ,實現一個郵件推送服務

3.4、啟動可視化的管控臺

輸入如下命令,啟動控制檯!

<code>rabbitmq-plugins enable rabbitmq_management/<code>

用瀏覽器打開http://ip:15672,這裡的ip就是 CentOS 系統的 ip,結果如下:

利用SpringBoot+RabbitMQ,實現一個郵件推送服務

賬號、密碼,默認為guest,如果出現無法訪問,檢測防火牆是否開啟,如果開啟將其關閉即可!

登錄之後的監控平臺,界面如下:

利用SpringBoot+RabbitMQ,實現一個郵件推送服務

獲取郵箱授權碼的目的,主要是為了通過代碼進行發送郵件,例如 QQ 郵箱授權碼獲取方式,如下圖:

利用SpringBoot+RabbitMQ,實現一個郵件推送服務

點擊【開啟】按鈕,然後發送短信,即可獲取授權碼,該授權碼就是配置文件spring.mail.password需要的密碼!

利用SpringBoot+RabbitMQ,實現一個郵件推送服務

五、項目介紹

  • springboot版本:2.1.5.RELEASE
  • RabbitMQ版本:3.6.5
  • SendMailUtil:發送郵件工具類
  • ProduceServiceImpl:生產者,發送消息
  • ConsumerMailService:消費者,消費消息,發送郵件

六、代碼實現

6.1、創建項目

在 IDEA 下創建一個名稱為smail的 Springboot 項目,pom文件中加入amqp和mail。

<code><dependencies>
    
    <dependency>
        <groupid>org.springframework.boot/<groupid>
        <artifactid>spring-boot-starter/<artifactid>
    /<dependency>
    
    <dependency>
        <groupid>org.springframework.boot/<groupid>
        <artifactid>spring-boot-starter-test/<artifactid>
        <scope>test/<scope>
    /<dependency>
    
    <dependency>
        <groupid>org.springframework.boot/<groupid>
        <artifactid>spring-boot-starter-web/<artifactid>
    /<dependency>
    

    <dependency>
        <groupid>org.springframework.boot/<groupid>
        <artifactid>spring-boot-devtools/<artifactid>
        <optional>true/<optional>
    /<dependency>
    
    <dependency>
        <groupid>org.springframework.boot/<groupid>
        <artifactid>spring-boot-starter-mail/<artifactid>
    /<dependency>
    
    <dependency>
        <groupid>org.springframework.boot/<groupid>
        <artifactid>spring-boot-starter-amqp/<artifactid>
    /<dependency>
    
    <dependency>
        <groupid>org.apache.commons/<groupid>
        <artifactid>commons-lang3/<artifactid>
        <version>3.4/<version>
    /<dependency>
    
    <dependency>
        <groupid>org.projectlombok/<groupid>
        <artifactid>lombok/<artifactid>
        <version>1.16.10/<version>
    /<dependency>
/<dependencies>/<code>

6.2、配置rabbitMQ、mail

在application.properties文件中,配置amqp和mail!

<code>#rabbitmq
spring.rabbitmq.host=192.168.0.103
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 開啟confirms回調 P -> Exchange
spring.rabbitmq.publisher-confirms=true
# 開啟returnedMessage回調 Exchange -> Queue
spring.rabbitmq.publisher-returns=true

# 設置手動確認(ack) Queue -> C
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=100

# mail
spring.mail.default-encoding=UTF-8
spring.mail.host=smtp.qq.com
[email protected]
spring.mail.password=獲取的郵箱授權碼
[email protected]
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true/<code>

其中,spring.mail.password第四步中獲取的授權碼,同時username和from要一致!

6.3、RabbitConfig配置類

<code>@Configuration
@Slf4j
public class RabbitConfig {

    // 發送郵件
    public static final String MAIL_QUEUE_NAME = "mail.queue";
    public static final String MAIL_EXCHANGE_NAME = "mail.exchange";
    public static final String MAIL_ROUTING_KEY_NAME = "mail.routing.key";

    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(converter());

        // 消息是否成功發送到Exchange
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("消息成功發送到Exchange");
            } else {
                log.info("消息發送到Exchange失敗, {}, cause: {}", correlationData, cause);
            }
        });

        // 觸發setReturnCallback回調必須設置mandatory=true, 否則Exchange沒有找到Queue就會丟棄掉消息, 而不會觸發回調
        rabbitTemplate.setMandatory(true);
        // 消息是否從Exchange路由到Queue, 注意: 這是一個失敗回調, 只有消息從Exchange路由到Queue失敗才會回調這個方法
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("消息從Exchange路由到Queue失敗: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
        });

        return rabbitTemplate;
    }

    @Bean
    public Jackson2JsonMessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public Queue mailQueue() {
        return new Queue(MAIL_QUEUE_NAME, true);
    }

    @Bean
    public DirectExchange mailExchange() {
        return new DirectExchange(MAIL_EXCHANGE_NAME, true, false);
    }

    @Bean
    public Binding mailBinding() {
        return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MAIL_ROUTING_KEY_NAME);
    }
}/<code>

6.4、Mail 郵件實體類

<code>@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class Mail {

    @Pattern(regexp = "^([a-z0-9A-Z]+[-|\\\\.]?)+[a-z0-9A-Z]@([a-z0-9A-Z]+(-[a-z0-9A-Z]+)?\\\\.)+[a-zA-Z]{2,}$", message = "郵箱格式不正確")
    private String to;

    @NotBlank(message = "標題不能為空")
    private String title;

    @NotBlank(message = "正文不能為空")
    private String content;

    private String msgId;// 消息id
}/<code>

6.5、SendMailUtil郵件發送類

<code>@Component
@Slf4j
public class SendMailUtil {

    @Value("${spring.mail.from}")
    private String from;

    @Autowired
    private JavaMailSender mailSender;

    /**
     * 發送簡單郵件
     *
     * @param mail
     */
    public boolean send(Mail mail) {
        String to = mail.getTo();// 目標郵箱
        String title = mail.getTitle();// 郵件標題
        String content = mail.getContent();// 郵件正文

        SimpleMailMessage message = new SimpleMailMessage();
        message.setFrom(from);
        message.setTo(to);
        message.setSubject(title);
        message.setText(content);

        try {
            mailSender.send(message);
            log.info("郵件發送成功");
            return true;
        } catch (MailException e) {
            log.error("郵件發送失敗, to: {}, title: {}", to, title, e);
            return false;
        }

    }
}/<code>

6.6、ProduceServiceImpl 生產者類

<code>@Service
public class ProduceServiceImpl implements ProduceService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public boolean send(Mail mail) {
  //創建uuid
        String msgId = UUID.randomUUID().toString().replaceAll("-", "");
        mail.setMsgId(msgId);
  
  //發送消息到rabbitMQ
        CorrelationData correlationData = new CorrelationData(msgId);
        rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME, MessageHelper.objToMsg(mail), correlationData);

        return true;
    }
}/<code>

6.7、ConsumerMailService 消費者類

<code>@Component
@Slf4j
public class ConsumerMailService {

    @Autowired
    private SendMailUtil sendMailUtil;

    @RabbitListener(queues = RabbitConfig.MAIL_QUEUE_NAME)
    public void consume(Message message, Channel channel) throws IOException {
        //將消息轉化為對象
        String str = new String(message.getBody());
        Mail mail = JsonUtil.strToObj(str, Mail.class);
        log.info("收到消息: {}", mail.toString());

        MessageProperties properties = message.getMessageProperties();
        long tag = properties.getDeliveryTag();

        boolean success = sendMailUtil.send(mail);
        if (success) {
            channel.basicAck(tag, false);// 消費確認

        } else {
            channel.basicNack(tag, false, true);
        }
    }
}/<code>

6.8、TestController 控制層類

<code>@RestController
@RequestMapping("/test")
@Slf4j
public class TestController {

    @Autowired
    private ProduceService testService;

    @PostMapping("send")
    public boolean sendMail(Mail mail) {
        return testService.send(mail);
    }
}/<code>

七、測試服務

啟動 SpringBoot 服務之後,用 postman 模擬請求接口。

利用SpringBoot+RabbitMQ,實現一個郵件推送服務

查看控制檯信息。

利用SpringBoot+RabbitMQ,實現一個郵件推送服務

查詢接受者郵件信息。

利用SpringBoot+RabbitMQ,實現一個郵件推送服務

郵件發送成功!

八、消息發送失敗處理

雖然,上面案例可以成功的實現消息的發送,但是上面的流程很脆弱,例如:rabbitMQ 突然蹦了、郵件發送失敗了、重啟 rabbitMQ 服務器出現消息重複消費,應該怎處理呢?

很顯然,我們需要對原有的邏輯進行升級改造,因此我們需要引入數據庫來記錄消息的發送情況。

8.1、創建消息投遞日誌表

<code>CREATE TABLE `msg_log` (
  `msg_id` varchar(255) NOT NULL DEFAULT '' COMMENT '消息唯一標識',
  `msg` text COMMENT '消息體, json格式化',
  `exchange` varchar(255) NOT NULL DEFAULT '' COMMENT '交換機',
  `routing_key` varchar(255) NOT NULL DEFAULT '' COMMENT '路由鍵',
  `status` int(11) NOT NULL DEFAULT '0' COMMENT '狀態: 0投遞中 1投遞成功 2投遞失敗 3已消費',
  `try_count` int(11) NOT NULL DEFAULT '0' COMMENT '重試次數',
  `next_try_time` datetime DEFAULT NULL COMMENT '下一次重試時間',
  `create_time` datetime DEFAULT NULL COMMENT '創建時間',
  `update_time` datetime DEFAULT NULL COMMENT '更新時間',
  PRIMARY KEY (`msg_id`),
  UNIQUE KEY `unq_msg_id` (`msg_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息投遞日誌';/<code>

8.2、編寫 MsgLog 相關服務類

<code>public interface MsgLogService {

    /**
     * 插入消息日誌
     * @param msgLog
     */
    void insert(MsgLog msgLog);

    /**
     * 更新消息狀態
     * @param msgId
     * @param status
     */
    void updateStatus(String msgId, Integer status);

    /**
     * 查詢消息
     * @param msgId
     * @return
     */
    MsgLog selectByMsgId(String msgId);
}/<code>

8.3、改寫服務邏輯

在生產服務類中,新增數據寫入。

利用SpringBoot+RabbitMQ,實現一個郵件推送服務

同時,在RabbitConfig服務配置,當消息發送成功之後,新增更新消息狀態邏輯。

利用SpringBoot+RabbitMQ,實現一個郵件推送服務

改造消費者ConsumerMailService,每次消費的時候,從數據庫中查詢,如果消息已經被消費,不用再重複發送數據!

利用SpringBoot+RabbitMQ,實現一個郵件推送服務

這樣即可保證,如果 rabbitMQ 服務器,即使重啟之後重新推送消息,通過數據庫判斷,也不會重複消費進而發生業務異常!

8.4、利用定數任務對消息投遞失敗進行補償

當 rabbitMQ 服務器突然掛掉之後,生成者就無法正常進行投遞數據,此時因為消息已經被記錄到數據庫,因此我們可以利用定數任務查詢出沒有投遞成功的消息,進行補償投遞。

利用SpringBoot+RabbitMQ,實現一個郵件推送服務

利用定數任務,對投遞失敗的消息進行補償投遞,基本可以保證消息 100% 消費成功!

九、總結

本文主要是通過發送郵件這個業務案例,來講解 Springboot 與 rabbitMQ 技術的整合和使用!

當然解決這個業務需求的技術方案還有很多,例如 Springboot 與 rocketMQ 也可以實現這個需求,這個會在後期的文章講解!

同時,Springboot + rabbitMQ 這種架構方案更適合於集群應用,如果是單體應用,直接通過服務類操作即可實現郵件推送!

本篇主要是Springboot 與 rabbitMQ整合和基本使用教程,希望小夥伴能有所收穫!


分享到:


相關文章: