RocketMQ Study Notes (6)



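Batch Example

Why batch?

Sending messages in batches improves performance when delivering small messages.

Usage restrictions
Messages in the same batch must share the same topic and the same waitStoreMsgOK setting, and scheduled (delayed) delivery is not supported.

In addition, the total size of the messages in one batch must not exceed 1 MiB.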
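The restrictions above can be expressed as a small pre-flight check. The following is only a sketch: `SimpleMsg` is a hypothetical stand-in for RocketMQ's `Message` so that the code runs without the client library, its `delayLevel` field is an assumed way of modelling "scheduled", and only body bytes are counted toward the size, which is a rough approximation. `BatchRules` and `violations` are illustrative names, not part of RocketMQ.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for RocketMQ's Message (assumption, not the real class).
final class SimpleMsg {
    final String topic;
    final byte[] body;
    final boolean waitStoreMsgOK;
    final int delayLevel; // assumption: 0 means "not scheduled"

    SimpleMsg(String topic, String body, boolean waitStoreMsgOK, int delayLevel) {
        this.topic = topic;
        this.body = body.getBytes(StandardCharsets.UTF_8);
        this.waitStoreMsgOK = waitStoreMsgOK;
        this.delayLevel = delayLevel;
    }
}

final class BatchRules {
    static final int MAX_BATCH_BYTES = 1024 * 1024; // 1 MiB

    // Returns the list of violated restrictions; an empty list means the batch looks valid.
    static List<String> violations(List<SimpleMsg> batch) {
        List<String> problems = new ArrayList<>();
        if (batch.isEmpty()) {
            return problems;
        }
        SimpleMsg first = batch.get(0);
        boolean topicMismatch = false;
        boolean flagMismatch = false;
        boolean scheduled = false;
        long totalBytes = 0;
        for (SimpleMsg m : batch) {
            topicMismatch |= !m.topic.equals(first.topic);
            flagMismatch |= m.waitStoreMsgOK != first.waitStoreMsgOK;
            scheduled |= m.delayLevel > 0;
            totalBytes += m.body.length; // rough estimate: body bytes only
        }
        if (topicMismatch) problems.add("messages use different topics");
        if (flagMismatch) problems.add("messages use different waitStoreMsgOK values");
        if (scheduled) problems.add("scheduled messages cannot be batched");
        if (totalBytes > MAX_BATCH_BYTES) problems.add("batch exceeds 1 MiB");
        return problems;
    }

    public static void main(String[] args) {
        List<SimpleMsg> batch = Arrays.asList(
                new SimpleMsg("BatchTest", "Hello world 0", true, 0),
                new SimpleMsg("OtherTopic", "Hello world 1", true, 0));
        // Prints the restrictions this batch violates (here: the topic mismatch).
        System.out.println(violations(batch));
    }
}
```

A check like this is only useful for failing early with a readable message; the client library and broker remain the authority on what is accepted.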

How to use batch

If the messages you send at one time total no more than 1 MiB, batching is easy to use:

<code>// Assumes `producer` is an already started DefaultMQProducer.
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
    // Passing a list sends all messages as one batch.
    producer.send(messages);
} catch (Exception e) {
    e.printStackTrace();
    //handle the error
}</code>

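Splitting into lists

Things only get more complicated when you send a large batch and cannot be sure whether it exceeds the size limit (1 MiB).

In that case, it is best to split the list: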

<code>import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.common.message.Message;

public class ListSplitter implements Iterator<List<Message>> {
    // 1,000,000 bytes, slightly below 1 MiB (1,048,576 bytes)
    private final int SIZE_LIMIT = 1000 * 1000;
    private final List<Message> messages;
    private int currIndex;

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            // Estimate the message size: topic + body + all property keys and values
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; //for log overhead
            if (tmpSize > SIZE_LIMIT) {
                //it is unexpected that single message exceeds the SIZE_LIMIT
                //here just let it go, otherwise it will block the splitting process
                if (nextIndex - currIndex == 0) {
                    //if the next sublist has no element, add this one and then break, otherwise just break
                    nextIndex++;
                }
                break;
            }
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }
        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}

//then you could split the large list into small ones:
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
    try {
        List<Message> listItem = splitter.next();
        producer.send(listItem);
    } catch (Exception e) {
        e.printStackTrace();
        //handle the error
    }
}</code>
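
To see how the greedy splitting behaves, it can help to run the same loop over plain numbers. The sketch below is a simplified illustration rather than RocketMQ code: `GreedySplitDemo` and `split` are hypothetical names, each integer stands for an already estimated message size, and the limit is shrunk to 1000 so that the grouping is easy to follow.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class GreedySplitDemo {
    // Groups consecutive size estimates so that each group's sum stays within `limit`.
    // A single item larger than `limit` gets a group of its own, mirroring the
    // "just let it go" branch of the splitter shown above.
    static List<List<Integer>> split(List<Integer> sizes, int limit) {
        List<List<Integer>> groups = new ArrayList<>();
        int curr = 0;
        while (curr < sizes.size()) {
            int next = curr;
            int total = 0;
            for (; next < sizes.size(); next++) {
                int size = sizes.get(next);
                if (size > limit) {
                    if (next == curr) {
                        next++; // oversized item starts the group: emit it alone
                    }
                    break;
                }
                if (total + size > limit) {
                    break; // adding this item would overflow the current group
                }
                total += size;
            }
            groups.add(new ArrayList<>(sizes.subList(curr, next)));
            curr = next;
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Integer> sizes = Arrays.asList(400, 500, 300, 1200, 100, 900);
        // prints [[400, 500], [300], [1200], [100, 900]]
        System.out.println(split(sizes, 1000));
    }
}
```

Note that the oversized item (1200) still ends up in a group by itself, so with real messages that batch would presumably still be rejected at send time; the splitter only guarantees that one bad message does not stall the rest.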

