04.22 RocketMQ Java应用—NormalProducer(普通)模式

之前的文章《阿里消息队列使用》我们介绍了如何搭建RocketMQ的单Master环境,其它集群环境的搭建只要复制master,然后修改配置即可。这篇我们介绍如何在Java应用中使用RocketMQ来实现生产、消费者,快速体验RocketMQ。

1、RocketMQ的模式概述

RocketMQ提供了3种模式的Producer,2种模式的Consumer模式。

Producer模式:NormalProducer(普通)、OrderProducer(顺序)、TransactionProducer(事务)。

Consumer模式:集群消费 AND 广播消费,默认采用集群消费。集群消费也就是消息的负载均衡消费;广播消息,类似于ActiveMQ中的发布订阅模式,消息会发给Consume Group中的每一个消费者进行消费。

2、RocketMQ生产普通模式

我们用Maven创建RocketMQ的生产和消费应用环境,用Java代码实现简单的生产消费业务。

Maven搭建,pom中引入rocketmq-all和rocketmq-client

RocketMQ Java应用—NormalProducer(普通)模式

生产者

RocketMQ Java应用—NormalProducer(普通)模式

消费者

RocketMQ Java应用—NormalProducer(普通)模式

  • 无论生产者、消费者都必须给出GroupName,而且具有唯一性!

  • 生产到哪个Topic的哪个Tag下,消费者也是从Topic的哪个Tag进行消费,可见这个Tag有点类似于JMS Selector机制,即实现消息的过滤。

  • 生产者、消费者需要设置NameServer地址。

  • 这里,采用的是Consumer Push的方式,即设置Listener机制回调,相当于开启了一个线程。以后为大家介绍Consumer Pull的方式。

我们看一下运行结果。

生产者

RocketMQ Java应用—NormalProducer(普通)模式

我们这里采用的是单Master节点,在生产者中没有看到brokerName的不同变化,在多Master节点中,会实现自动负载,将消息发送到不同的broker。


消费者

RocketMQ Java应用—NormalProducer(普通)模式

这里消费消息是没有什么顺序的

在多Master模式中,如果某个Master进程挂了,显然这台broker将不可用,上面的消息也将无法消费,要知道开源版本的RocketMQ是没有提供切换程序,来自动恢复故障的,因此在实际开发中,我们一般提供一个监听程序,用于监控Master的状态。

在ActiveMQ中,生产消息的时候会提供是否持久化的选择,但是对于RocketMQ而言,消息是一定会被持久化的!

上面的消费者采用的是Push Consumer的方式,那么监听的Listener中的消息List到底是多少条呢?虽然提供了API,如consumer.setConsumeMessageBatchMaxSize(10),实际上即使设置了批量的条数,但是注意了,是最大是10,并不意味着每次batch的都是10,只有在消息有挤压的情况下才有可能。而且Push Consumer的最佳实践方式就是一条条的消费,如果需要batch,可以使用Pull Consumer。

务必保证先启动消费者进行Topic订阅,然后在启动生产者进行生产(否则极有可能导致消息的重复消费,重复消费,重复消费!重要的事情说三遍!关于消息的重复问题后续给大家介绍~)。而且在实际开发中,有时候不会批量的处理消息,而是原子性的,单线程的去一条一条的处理消息,这样就是实时的在处理消息。(批量的处理海量的消息,可以考虑Kafka)

3、消息失败重试机制

消息失败,无非涉及到2端:从生产者端发往MQ的失败;消费者端从MQ消费消息的失败。

在业务应用场景中,有时网络抖动导致生产者发送消息到MQ失败。下面的框红的代码逻辑是如果该条消息在1S内没有发送成功,那么重试3次。

RocketMQ Java应用—NormalProducer(普通)模式

消费者端的失败,分为2种情况,一个是timeout,一个是exception。timeout,比如由于网络原因导致消息压根就没有从MQ到消费者上,在RocketMQ内部会不断的尝试发送这条消息,直至发送成功为止!(比如集群中一个broker失败,就尝试另一个broker)。exception,消息正常的到了消费者,结果消费者发生异常,处理失败了。这里涉及到一些问题,需要我们思考下,比如,消费者消费消息的状态有哪些定义?如果失败,MQ将采取什么策略进行重试?假设一次性批量PUSH了10条,其中某条数据消费异常,那么消息重试是10条呢,还是1条呢?而且在重试的过程中,需要保证不重复消费吗?

RocketMQ Java应用—NormalProducer(普通)模式

消息消费的状态,有2种,一个是成功(CONSUME_SUCCESS),一个是失败&稍后重试(RECONSUME_LATER)

RocketMQ Java应用—NormalProducer(普通)模式

在启动broker的过程中,可以观察下日志,你会发现RECONSUME_LATER的策略。

如果消费失败,那么1S后再次消费,如果失败,那么5S后,再次消费,......直至2H后如果消费还失败,那么该条消息就会终止发送给消费者了!

RocketMQ为我们提供了这么多次数的失败重试,但是在实际中也许我们并不需要这么多重试,比如重试3次,还没有成功,我们希望把这条消息存储起来并采用另一种方式处理,而且希望RocketMQ不要在重试呢,因为重试解决不了问题了!这该如何做呢?

我们先来看一下一条消息MessageExt对象的输出:

RocketMQ Java应用—NormalProducer(普通)模式

注意到reconsumeTimes属性,这个属性就代表消息重试的次数!来看一段代码:

RocketMQ Java应用—NormalProducer(普通)模式

注意了,对于消费消息而言,存在2种指定的状态(成功 OR 失败重试),如果一条消息在消费端处理没有返回这2个状态,那么相当于这条消息没有达到消费者,势必会再次发送给消费者!也即是消息的处理必须有返回值,否则就进行重发。

4、消息负载均衡及高效的水平扩展机制

RocketMQ Java应用—NormalProducer(普通)模式

对于RocketMQ而言,通过ConsumeGroup的机制,实现了天然的消息负载均衡!通俗点来说,RocketMQ中的消息通过ConsumeGroup实现了将消息分发到C1/C2/C3/......的机制,这意味着我们将非常方便的通过加机器来实现水平扩展!

我们考虑一下这种情况:比如C2发生了重启,一条消息发往C3进行消费,但是这条消息的处理需要0.1S,而此时C2刚好完成重启,那么C2是否可能会收到这条消息呢?答案是肯定的,也就是consume broker的重启,或者水平扩容,或者不遵守先订阅后生产消息,都可能导致消息的重复消费!

至于消息分发到C1/C2/C3,其实也是可以设置策略的。

RocketMQ Java应用—NormalProducer(普通)模式

5、集群消费 AND 广播消费

RocketMQ的消费方式有2种,在默认情况下,就是集群消费,也就是上面提及的消息的负载均衡消费。另一种消费模式,是广播消费。广播消费,类似于ActiveMQ中的发布订阅模式,消息会发给Consume Group中的每一个消费者进行消费。

代码实现的时候在消费者代码中设置广播模式即可。

RocketMQ Java应用—NormalProducer(普通)模式

友情提示:消费者的集群消费、广播消费与生产者的普通、顺序、事务模式是笛卡尔积的关系。


分享到:


相關文章: