RocketMQ學習筆記(七)

RocketMQ學習筆記(七)

過濾示例

在大多數情況下,TAG是一個簡單而有用的設計來選擇您想要的消息。例如:

<code>DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");/<code>

消費者將接收包含TAGA或TAGB或TAGC的消息。但是限制是一個消息只能有一個標籤,這對於複雜的場景可能不起作用。在這種情況下,可以使用SQL表達式篩選消息。

原理

SQL特性可以通過發送消息時的屬性來進行計算。在RocketMQ定義的語法下,可以實現一些有趣的邏輯。下面是一個例子:

<code>------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 10 | --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 1 | --------------------> Missed
| b = 'abc'|
| c = true |
------------/<code>

語法

RocketMQ只定義了一些基本語法來支持這個特性。你也可以輕鬆但擴展它。

  • 數字比較,像>,>,
  • 字符比較,類似=,<>,IN;
  • IS NULL或IS NOT NULL;
  • 邏輯的AND,OR,NOT;

常量類型:

  • 數字, 如123, 3.1415;
  • 字符, 如‘abc’, 必須用單引號;
  • NULL, 特殊常量;
  • Boolean, TRUE or FALSE;

使用限制

只有推送用戶才能使用SQL92來選擇消息。接口為:

<code>public void subscribe(final String topic, final MessageSelector messageSelector)/<code>

消費者示例

你可以在發送時通過方法putUserProperty把屬性放入消息中。

<code>DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); 

producer.start();

Message msg = new Message("TopicTest",
tag,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);// Set some properties.msg.putUserProperty("a", String.valueOf(i));

SendResult sendResult = producer.send(msg);

producer.shutdown();/<code>

消費者示例

消費時使用MessageSelector.bySql通過SQL92選擇消息。

<code>DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");// only subsribe messages have property a, also a >=0 and a <= 3consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");

consumer.registerMessageListener(new MessageListenerConcurrently() { @Override
public ConsumeConcurrentlyStatus consumeMessage(List<messageext> msgs, ConsumeConcurrentlyContext context) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();/<messageext>/<code>


分享到:


相關文章: