本文作者:lazasha主页:https://blog.csdn.net/lazasha如果有Java相关的好文章,欢迎投稿!
目录
- CAP原则
- Rocket mq实现思路
- Rabbit mq实现思路
- 需要考虑的问题
- 后记
严格的来说,消息中间件并不能实现分布式事务,而是通过事后补偿机制,达到和分布式事务一样的数据一致性。这里主要探讨Rocket mq 和 Rabbit mq的实现思路。Rocket mq只描述一下实现思路,Rabbit mq会有代码演示。还是那句话,由于水平有限,难免有不当或者错误之处,请大家指正,谢谢。
CAP原则
首先我们得了解CAP原则,又称CAP原理,指的是在一个分布式系统中,Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性),三者不可兼得 。
- 一致性(C):在分布式系统中的所有数据备份,在同一时刻是否同样的值。(等同于所有节点访问同一份最新的数据副本)
- 可用性(A):在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求。(对数据更新具备高可用性)
- 分区容错性(P):以实际效果而言,分区相当于对通信的时限要求。系统如果不能在时限内达成数据一致性,就意味着发生了分区的情况,必须就当前操作在C和A之间做出选择。
在分布式存储系统中,最多只能实现上面的两点。而由于当前的网络硬件肯定会出现延迟丢包等问题,所以分区容错性是我们必须需要实现的。所以我们只能在一致性和可用性之间进行权衡,没有系统能同时保证这三点。
我的理解,使用mq实现最终补偿事务一致性,是牺牲了同意时刻访问同一份数据的一致性,通过一段时间的延迟,最终达到一致性。
Rocket mq实现思路
首先推荐事务和消息解耦的方式。
- 开始事务,Prepared消息,RocketMQ会返回消息地址
- 执行本地事务
- 事务提交成功,通过拿到的消息地址去修改RocketMQ里面修改消息的状态,消息从Prepared变为发送成功;如果事务失败,则通过消息地址去修改消息状态,变为消息取消。
- 消息消费方居于Push或者Pull方式消费消息成功后,向服务器发送消费成功的消息通知。
所以,在事务内需要向mq提交两次消息请求,一次是发送,另外一次是确认(确认成功或者取消)。
如果确认消息发送失败了,RocketMQ会定期扫描消息集群中的事务消息,这时候发现了Prepared消息,它会向消息发送者确认,所以消息生产方需要实现一个check接口,RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。
- 优点: 实现了最终一致性,不需要依赖本地数据库事务。
- 缺点: 实现难度大,主流MQ不支持,没有.NET客户端,RocketMQ事务消息部分代码也未开源
- 极端情况下仍然需要手工介入
Rabbit mq实现思路
和Rocket mq有所不一样。
消息生产方:
- 首先执行本地事务,写消息表
- 获取消息表的消息记录,向Rabbit mq发送消息
- 接收Rabbit mq返回的消息确认接收成功通知(ACK),更新消息表;如果失败,则不更新,保证发送消息和记录表状态一致。
这个会存在这样一个问题:消息已经发送成功,但是Rabbit mq没有返回,则无法更新消息表;或者接收到消息成功发送通知,但是更新数据库失败,也无法更新消息表,导致下一次重复发送。解决这个问题的思路,我只想到需要消息接收方要做幂等性检查,从而避免重复消费消息。
消息接收方:
- Rabbit mq通知有消息
- 消息接收方接收消息,处理成功或者失败,都要返回确认ACK, Rabbit mq接收到ACK,根据ACK更新消息状态为已经发送,删除队列中的消息;或者更改消息列,等待从新发送。
这里有个问题,如果消息接收方成功处理消息,但是由于特殊情况没有返回ACK, Rabbit mq没有接收到ACK,这条消息状态已经改变不会再发送,需要手工处理。
消息发送方代码示例:
配置:
rabbitmq
:
host
:
localhost
#集群配置:addresses:ip1:port1,ip2:port2,ip3:port3
port
:
5672
username
:
guest
password
:
guest
publisher
-
confirms
:
true
#确认消息是否到达broker服务器,也就是只确认是否正确到达exchange中即可,只要正确的到达exchange中,broker即可确认该消息返回给客户端ack
publisher
-
returns
:
true
#确认消息是否正确到达queue,如果没有则触发,如果有则不触发
package
com
.
sleb
.
springcloud
.
rabbitproducerack
.
service
;
import
com
.
sleb
.
springcloud
.
modalservice
.
Users
;
import
com
.
sleb
.
springcloud
.
rabbitproducerack
.
config
.
CorrelationDataEx
;
import
org
.
springframework
.
amqp
.
rabbit
.
core
.
RabbitTemplate
;
import
org
.
springframework
.
amqp
.
support
.
converter
.
Jackson2JsonMessageConverter
;
import
org
.
springframework
.
beans
.
factory
.
annotation
.
Autowired
;
import
org
.
springframework
.
stereotype
.
Service
;
import
java
.
util
.
Date
;
import
static
com
.
sleb
.
springcloud
.
modalservice
.
RabbitConfigInfo
.
EXCHANGE
;
import
static
com
.
sleb
.
springcloud
.
modalservice
.
RabbitConfigInfo
.
QUEUE_TWO_ROUTING
;
/**
* 如果消息没有到exchange,则confirm回调,ack=false *
* 如果消息到达exchange,则confirm回调,ack=true *
* exchange到queue成功,则不回调return
*
* exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)
* 确认方式:
* 方式一:channel.waitForConfirms( ) 普通发送方确认模式; *
* 方式二:channel.waitForConfirmsOrDie( ) 批量确认模式; *
* 方式三:channel.addConfirmListener()异步监听发送方确认模式; *
* 采用第三种比较好,异步监听
*/
@Service
public
class
SenderService
{
@Autowired
private
RabbitTemplate
rabbitTemplate
;
public
void
sender
(
Users
users
)
throws
Exception
{
System
.
out
.
println
(
"你好现在是 "
+
new
Date
()
+
""
);
System
.
out
.
println
(
"HelloSender发送内容 : "
+
users
.
toString
());
/**
* ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调。
* ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调。
*/
rabbitTemplate
.
setReturnCallback
((
message
,
replyCode
,
replyText
,
exchange
,
routingKey
)
->
{
//Users users1 = (Users)message.getBody().toString();
//String correlationId = message.getMessageProperties().getCorrelationId();
System
.
out
.
println
(
"Message : "
+
new
String
(
message
.
getBody
()));
//System.out.println("Message : " + new String(message.getBody()));
System
.
out
.
println
(
"replyCode : "
+
replyCode
);
System
.
out
.
println
(
"replyText : "
+
replyText
);
//错误原因
System
.
out
.
println
(
"exchange : "
+
exchange
);
System
.
out
.
println
(
"routingKey : "
+
routingKey
);
//queue名称
});
rabbitTemplate
.
setConfirmCallback
((
correlationData
,
ack
,
cause
)
->
{
if
(
ack
)
{
CorrelationDataEx
c
=
(
CorrelationDataEx
)
correlationData
;
System
.
out
.
println
(
"发送消息: "
+
c
.
getMsg
());
System
.
out
.
println
(
"HelloSender 消息发送成功 :"
+
correlationData
.
toString
()
);
/**
* 通过设置correlationData.id为业务主键,消息发送成功后去继续做候选业务。
*/
}
else
{
System
.
out
.
println
(
"HelloSender消息发送失败"
+
cause
);
}
});
/**
* CorrelationDataEx继承CorrelationData, 把需要发送消息的关键字段加入
* 这样confirmcallback可以返回带有关键字段的correlationData,我们可以通过这个来确定发送的是那条业务记录
*/
CorrelationDataEx
c
=
new
CorrelationDataEx
();
c
.
setId
(
users
.
getId
().
toString
());
c
.
setMsg
(
users
.
toString
());
/**
* 加上这个,可以从returncallback参数中读取发送的json消息,否则是二进制bytes
* 比如:如果returncallback触发,则表明消息没有投递到队列,则继续业务操作,比如将消息记录标志位未投递成功,记录投递次数
*/
rabbitTemplate
.
setMessageConverter
(
new
Jackson2JsonMessageConverter
());
rabbitTemplate
.
convertAndSend
(
EXCHANGE
,
QUEUE_TWO_ROUTING
,
users
,
c
);
}
}
消息接收方代码示例:
配置:
rabbitmq
:
host
:
localhost
port
:
5672
username
:
guest
password
:
guest
template
:
mandatory
:
true
#messageConverter: jackson2JsonMessageConverter 这个必须在程序里面创建bean
listener
:
simple
:
prefetch
:
1
acknowledge
-
mode
:
manual
concurrency
:
3
package
com
.
sleb
.
springcloud
.
rabbitreceiverack
.
service
;
import
com
.
rabbitmq
.
client
.
Channel
;
import
com
.
sleb
.
springcloud
.
modalservice
.
Users
;
import
org
.
springframework
.
amqp
.
core
.
Message
;
import
org
.
springframework
.
amqp
.
rabbit
.
annotation
.
RabbitHandler
;
import
org
.
springframework
.
amqp
.
rabbit
.
annotation
.
RabbitListener
;
import
org
.
springframework
.
amqp
.
support
.
converter
.
Jackson2JsonMessageConverter
;
import
org
.
springframework
.
amqp
.
support
.
converter
.
MessageConverter
;
import
org
.
springframework
.
context
.
annotation
.
Bean
;
import
org
.
springframework
.
stereotype
.
Service
;
import
java
.
io
.
IOException
;
import
java
.
util
.
Date
;
import
static
com
.
sleb
.
springcloud
.
modalservice
.
RabbitConfigInfo
.
QUEUE_ONE_ROUTING
;
@Service
public
class
Receiver
{
@RabbitHandler
@RabbitListener
(
queues
=
QUEUE_ONE_ROUTING
)
//containerFactory = "rabbitListenerContainerFactory", concurrency = "2")
public
void
process
(
Users
users
,
Channel
channel
,
Message
message
)
throws
IOException
{
System
.
out
.
println
(
"HelloReceiver收到 : "
+
users
.
toString
()
+
"收到时间"
+
new
Date
());
try
{
//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了
// 否则消息服务器以为这条消息没处理掉 后续还会在发
channel
.
basicAck
(
message
.
getMessageProperties
().
getDeliveryTag
(),
false
);
System
.
out
.
println
(
"receiver success"
);
}
catch
(
IOException
e
)
{
e
.
printStackTrace
();
//丢弃这条消息,则不会重新发送了
//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
System
.
out
.
println
(
"receiver fail"
);
}
}
@Bean
public
MessageConverter
jackson2JsonMessageConverter
()
{
return
new
Jackson2JsonMessageConverter
();
}
}
最需要考虑的两个问题:
- 消息消费的顺序问题:发送消息指定队列,消息消费者指定队列可以解决,消费者只能一个。
- 消息消费的重复问题:每次消费消息时候创建一消息表,在消费消息前先查询该表,如果消息存在就说明已经消费。
转发+关注。私信“资料”获取更多JAVA干货内容。
閱讀更多 java高級架構 的文章