「SpringBoot MQ 系列」RabbitMq 消息發送基本使用姿勢

「SpringBoot MQ 系列」RabbitMq 消息發送基本使用姿勢

【MQ 系列】SprigBoot + RabbitMq 消息發送基本使用姿勢

前面兩篇博文,分別介紹了RabbitMq的核心知識點,以及整合SpringBoot的demo應用;接下來也該進入正題,看一下SpringBoot的環境下,如何玩轉rabbitmq

本篇內容主要為消息發送,包括以下幾點

  • RabbitTemplate 發送消息的基本使用姿勢
  • 自定義消息基本屬性
  • 自定義消息轉換器AbstractMessageConverter
  • 發送Object類型消息失敗的case

I. 基本使用姿勢

1. 配置

我們藉助SpringBoot 2.2.1.RELEASE + rabbitmq 3.7.5來完整項目搭建與測試

項目pom.xml如下

<code><dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-amqp/<artifactid>
/<dependency>
/<code>

配置文件application.yml內容如下

<code>spring: 

rabbitmq:
virtual-host: /
username: admin
password: admin
port: 5672
host: 127.0.0.1
/<code>

2. 配置類

通過前面rabbitmq的知識點學習,我們可以知道發送端的主要邏輯 “將消息發送給exchange,然後根據不同的策略分發給對應的queue”

本篇博文主要討論的是消息發送,為了後續的實例演示,我們定義一個topic模式的exchange,並綁定一個的queue;(因為對發送端而言,不同的exchange類型,對發送端的使用姿勢影響並不大,有影響的是消費者)

<code>public class MqConstants {

public static final String exchange = "topic.e";

public static final String routing = "r";

public final static String queue = "topic.a";

}

@Configuration
public class MqConfig {
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(MqConstants.exchange);
}

@Bean
public Queue queue() {
// 創建一個持久化的隊列
return new Queue(MqConstants.queue, true);
}


@Bean
public Binding binding(TopicExchange topicExchange, Queue queue) {
return BindingBuilder.bind(queue).to(topicExchange).with(MqConstants.routing);
}

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
}
/<code>

3. 消息發送

消息發送,主要藉助的是RabbitTemplate#convertAndSend方法來實現,通常情況下,我們直接使用即可

<code>@Service
public class BasicPublisher {
@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 一般的用法,推送消息
*
* @param ans
* @return
*/
private String publish2mq1(String ans) {
String msg = "Durable msg = " + ans;
System.out.println("publish: " + msg);
rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
return msg;
}
}
/<code>

上面的核心點就一行rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);

  • 表示將msg發送給指定的exchange,並設置消息的路由鍵

請注意

通過上面的方式,發送的消息默認是持久化的,當持久化的消息,分發到持久化的隊列時,會有消息的落盤操作;

在某些場景下,我們對消息的完整性要求並沒有那麼嚴格,反而更在意mq的性能,丟失一些數據也可以接受;這個時候我們可能需要定製一下發送的消息屬性(比如將消息設置為非持久化的)

下面提供兩種姿勢,推薦第二種

<code>/**
* 推送一個非持久化的消息,這個消息推送到持久化的隊列時,mq重啟,這個消息會丟失;上面的持久化消息不會丟失
*
* @param ans
* @return
*/
private String publish2mq2(String ans) {
MessageProperties properties = new MessageProperties();
properties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
Message message = rabbitTemplate.getMessageConverter().toMessage("NonDurable = " + ans, properties);

rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, message);

System.out.println("publish: " + message);
return message.toString();
}


private String publish2mq3(String ans) {
String msg = "Define msg = " + ans;

rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setHeader("ta", "測試");
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
return message;
}
});

return msg;
}
/<code>
「SpringBoot MQ 系列」RabbitMq 消息發送基本使用姿勢

注意

  • 在實際的項目開發中,推薦使用MessagePostProcessor來定製消息屬性
  • 其次不推薦在每次發送消息時都創建一個MessagePostProcessor對象,請定義一個通用的對象,能複用就複用

4. 非序列化對象發送異常case

通過查看rabbitTemplate#convertAndSend的接口定義,我們知道發送的消息可以是Object類型,那麼是不是意味著任何對象,都可以推送給mq呢?

下面是一個測試case

<code>private String publish2mq4(String ans) {
NonSerDO nonSerDO = new NonSerDO(18, ans);
System.out.println("publish: " + nonSerDO);
rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, nonSerDO);
return nonSerDO.toString();
}


@Data
public static class NonSerDO {
private Integer age;
private String name;

public NonSerDO(int age, String name) {
this.age = age;
this.name = name;
}
}
/<code>

當我們調用上面的publish2mq4方法時,並不會是想象中的直接成功,相反拋出一個參數類型異常

「SpringBoot MQ 系列」RabbitMq 消息發送基本使用姿勢

為什麼會出現這個問題呢?從堆棧分析,我們知道RabbitTemplate默認是利用SimpleMessageConverter來實現封裝Message邏輯的,核心代碼為

<code>// 下面代碼來自 org.springframework.amqp.support.converter.SimpleMessageConverter#createMessage
@Override
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
\tbyte[] bytes = null;
\tif (object instanceof byte[]) {
\t\tbytes = (byte[]) object;
\t\tmessageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES);
\t}

\telse if (object instanceof String) {
\t\ttry {
\t\t\tbytes = ((String) object).getBytes(this.defaultCharset);
\t\t}
\t\tcatch (UnsupportedEncodingException e) {
\t\t\tthrow new MessageConversionException(
\t\t\t\t\t"failed to convert to Message content", e);
\t\t}
\t\tmessageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
\t\tmessageProperties.setContentEncoding(this.defaultCharset);
\t}
\telse if (object instanceof Serializable) {
\t\ttry {
\t\t\tbytes = SerializationUtils.serialize(object);
\t\t}
\t\tcatch (IllegalArgumentException e) {
\t\t\tthrow new MessageConversionException(
\t\t\t\t\t"failed to convert to serialized Message content", e);
\t\t}
\t\tmessageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT);
\t}
\tif (bytes != null) {
\t\tmessageProperties.setContentLength(bytes.length);
\t\treturn new Message(bytes, messageProperties);
\t}
\tthrow new IllegalArgumentException(getClass().getSimpleName()
\t\t\t+ " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
}
/<code>

上面邏輯很明確的指出了,只接受byte數組,string字符串,可序列化對象(這裡使用的是jdk的序列化方式來實現對象和byte數組之間的互轉)

  • 所以我們傳遞一個非序列化的對象會參數非法的異常

自然而然的,我們會想有沒有其他的MessageConverter來友好的支持任何類型的對象

5. 自定義MessageConverter

接下來我們希望通過自定義一個json序列化方式的MessageConverter來解決上面的問題

一個比較簡單的實現(利用FastJson來實現序列化/反序列化)

<code>public static class SelfConverter extends AbstractMessageConverter {
@Override
protected Message createMessage(Object object, MessageProperties messageProperties) {
messageProperties.setContentType("application/json");
return new Message(JSON.toJSONBytes(object), messageProperties);
}

@Override
public Object fromMessage(Message message) throws MessageConversionException {
return JSON.parse(message.getBody());
}
}
/<code>

重新定義一個rabbitTemplate,並設置它的消息轉換器為自定義的SelfConverter

<code>@Bean
public RabbitTemplate jsonRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new SelfConverter());
return rabbitTemplate;
}
/<code>

然後再次測試一下

<code>@Service
public class JsonPublisher {
@Autowired
private RabbitTemplate jsonRabbitTemplate;

private String publish1(String ans) {
Map<string> msg = new HashMap<>(8);
msg.put("msg", ans);
msg.put("type", "json");
msg.put("version", 123);

System.out.println("publish: " + msg);
jsonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
return msg.toString();
}

private String publish2(String ans) {
BasicPublisher.NonSerDO nonSerDO = new BasicPublisher.NonSerDO(18, "SELF_JSON" + ans);
System.out.println("publish: " + nonSerDO);
jsonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, nonSerDO);
return nonSerDO.toString();
}
}
/<string>/<code>

mq內接收到的推送消息如下

「SpringBoot MQ 系列」RabbitMq 消息發送基本使用姿勢

6. Jackson2JsonMessageConverter

上面雖然實現了Json格式的消息轉換,但是比較簡陋;而且這麼基礎通用的功能,按照Spring全家桶的一貫作風,肯定是有現成可用的,沒錯,這就是Jackson2JsonMessageConverter

所以我們的使用姿勢也可以如下

<code>//定義RabbitTemplate
@Bean
public RabbitTemplate jacksonRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}


// 測試代碼
@Autowired
private RabbitTemplate jacksonRabbitTemplate;
private String publish3(String ans) {
Map<string> msg = new HashMap<>(8);
msg.put("msg", ans);
msg.put("type", "jackson");
msg.put("version", 456);
System.out.println("publish: " + msg);
jacksonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
return msg.toString();
}
/<string>/<code>

下面是通過Jackson序列化消息後的內容,與我們自定義的有一些不同,多了headers和content_encoding

「SpringBoot MQ 系列」RabbitMq 消息發送基本使用姿勢

7. 小結

本篇博文主要的知識點如下

  • 通過RabbitTemplate#convertAndSend來實現消息分發
  • 通過MessagePostProcessor來自定義消息的屬性(請注意默認投遞的消息時持久化的)
  • 默認的消息封裝類為SimpleMessageConverter,只支持分發byte數組,字符串和可序列化的對象;不滿足上面三個條件的方法調用會拋異常
  • 我們可以通過實現MessageConverter接口,來定義自己的消息封裝類,解決上面的問題

在RabbitMq的知識點博文中,明確提到了,為了確保消息被brocker正確接收,提供了消息確認機制和事務機制兩種case,那麼如果需要使用這兩種方式,消息生產者需要怎麼做呢?

限於篇幅,下一篇博文將帶來在消息確認機制/事務機制下的發送消息使用姿勢

II. 其他

項目源碼

  • 工程:https://github.com/liuyueyi/spring-boot-demo
  • 源碼:https://github.com/liuyueyi/spring-boot-demo/tree/master/spring-boot/301-rabbitmq-publish


分享到:


相關文章: