實踐:RabbitMQ 延遲隊列,消息延遲推送的實現

cnblogs.com/haixiang/p/10966985.html

目錄

  • 應用場景
  • 消息延遲推送的實現
  • 測試結果

應用場景

目前常見的應用軟件都有消息的延遲推送的影子,應用也極為廣泛,例如:

  • 淘寶七天自動確認收貨。在我們簽收商品後,物流系統會在七天後延時發送一個消息給支付系統,通知支付系統將款打給商家,這個過程持續七天,就是使用了消息中間件的延遲推送功能。
  • 12306 購票支付確認頁面。我們在選好票點擊確定跳轉的頁面中往往都會有倒計時,代表著 30 分鐘內訂單不確認的話將會自動取消訂單。其實在下訂單那一刻開始購票業務系統就會發送一個延時消息給訂單系統,延時30分鐘,告訴訂單系統訂單未完成,如果我們在30分鐘內完成了訂單,則可以通過邏輯代碼判斷來忽略掉收到的消息。

在上面兩種場景中,如果我們使用下面兩種傳統解決方案無疑大大降低了系統的整體性能和吞吐量:

  • 使用 redis 給訂單設置過期時間,最後通過判斷 redis 中是否還有該訂單來決定訂單是否已經完成。這種解決方案相較於消息的延遲推送性能較低,因為我們知道 redis 都是存儲於內存中,我們遇到惡意下單或者刷單的將會給內存帶來巨大壓力。
  • 使用傳統的數據庫輪詢來判斷數據庫表中訂單的狀態,這無疑增加了IO次數,性能極低。
  • 使用 jvm 原生的 DelayQueue ,也是大量佔用內存,而且沒有持久化策略,系統宕機或者重啟都會丟失訂單信息。

消息延遲推送的實現

在 RabbitMQ 3.6.x 之前我們一般採用死信隊列+TTL過期時間來實現延遲隊列,我們這裡不做過多介紹,可以參考:

https://www.cnblogs.com/haixiang/p/10905189.html

在 RabbitMQ 3.6.x 開始,RabbitMQ 官方提供了延遲隊列的插件,可以下載放置到 RabbitMQ 根目錄下的 plugins 下。

延遲隊列插件下載:

https://www.rabbitmq.com/community-plugins.htm

實踐:RabbitMQ 延遲隊列,消息延遲推送的實現

application.properties 中配置代碼

<code>spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

#開啟 confirm 確認機制
spring.rabbitmq.publisher-confirms=true
#開啟 return 確認機制
spring.rabbitmq.publisher-returns=true
#設置為 true 後 消費者在消息沒有被路由到合適隊列情況下會被return監聽,而不會自動刪除
spring.rabbitmq.template.mandatory=true/<code>

首先我們創建交換機和消息隊列


實踐:RabbitMQ 延遲隊列,消息延遲推送的實現

我們在 Exchange 的聲明中可以設置exchange.setDelayed(true)來開啟延遲隊列,也可以設置為以下內容傳入交換機聲明的方法中,因為第一種方式的底層就是通過這種方式來實現的。

<code>//Map<string> pros = new HashMap<>();
//設置交換機支持延遲消息推送
//pros.put("x-delayed-message", "topic");
TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);/<string>/<code>

發送消息時我們需要指定延遲推送的時間,我們這裡在發送消息的方法中傳入參數 new MessagePostProcessor() 是為了獲得 Message對象,因為需要藉助 Message對象的api 來設置延遲時間。


實踐:RabbitMQ 延遲隊列,消息延遲推送的實現

我們可以觀察 setDelay(Integer i)底層代碼,也是在 header 中設置 x-delay。等同於我們手動設置 header

message.getMessageProperties().setHeader("x-delay", "6000");

<code>/**
 * Set the x-delay header.
 * @param delay the delay.
 * @since 1.6
 */
public void setDelay(Integer delay) {
    if (delay == null || delay         this.headers.remove(X_DELAY);
    }
    else {
        this.headers.put(X_DELAY, delay);
    }
}/<code>

消費端進行消費

<code>import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

@Component
public class MQReceiver {

    @RabbitListener(queues = "MQ.LazyQueue")
    @RabbitHandler
    public void onLazyMessage(Message msg, Channel channel) throws IOException{
        long deliveryTag = msg.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag, true);
        System.out.println("lazy receive " + new String(msg.getBody()));

    }/<code>

測試結果

<code>import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
public class MQSenderTest {

    @Autowired
    private MQSender mqSender;

    @Test
    public void sendLazy() throws  Exception {
        String msg = "hello spring boot";

        mqSender.sendLazy(msg + ":");
    }
}/<code>

果然在 6 秒後收到了消息 lazy receive hello spring boot。


分享到:


相關文章: