使用mq實現分佈式事務-補償事務一致性

本文作者:lazasha主頁:https://blog.csdn.net/lazasha如果有Java相關的好文章,歡迎投稿!


目錄

  • CAP原則
  • Rocket mq實現思路
  • Rabbit mq實現思路
  • 需要考慮的問題
  • 後記

嚴格的來說,消息中間件並不能實現分佈式事務,而是通過事後補償機制,達到和分佈式事務一樣的數據一致性。這裡主要探討Rocket mq 和 Rabbit mq的實現思路。Rocket mq只描述一下實現思路,Rabbit mq會有代碼演示。還是那句話,由於水平有限,難免有不當或者錯誤之處,請大家指正,謝謝。

CAP原則

首先我們得了解CAP原則,又稱CAP原理,指的是在一個分佈式系統中,Consistency(一致性)、 Availability(可用性)、Partition tolerance(分區容錯性),三者不可兼得 。

  • 一致性(C):在分佈式系統中的所有數據備份,在同一時刻是否同樣的值。(等同於所有節點訪問同一份最新的數據副本)
  • 可用性(A):在集群中一部分節點故障後,集群整體是否還能響應客戶端的讀寫請求。(對數據更新具備高可用性)
  • 分區容錯性(P):以實際效果而言,分區相當於對通信的時限要求。系統如果不能在時限內達成數據一致性,就意味著發生了分區的情況,必須就當前操作在C和A之間做出選擇。

在分佈式存儲系統中,最多隻能實現上面的兩點。而由於當前的網絡硬件肯定會出現延遲丟包等問題,所以分區容錯性是我們必須需要實現的。所以我們只能在一致性和可用性之間進行權衡,沒有系統能同時保證這三點。

我的理解,使用mq實現最終補償事務一致性,是犧牲了同意時刻訪問同一份數據的一致性,通過一段時間的延遲,最終達到一致性。

Rocket mq實現思路

首先推薦事務和消息解耦的方式。

  • 開始事務,Prepared消息,RocketMQ會返回消息地址
  • 執行本地事務
  • 事務提交成功,通過拿到的消息地址去修改RocketMQ裡面修改消息的狀態,消息從Prepared變為發送成功;如果事務失敗,則通過消息地址去修改消息狀態,變為消息取消。
  • 消息消費方居於Push或者Pull方式消費消息成功後,向服務器發送消費成功的消息通知。

所以,在事務內需要向mq提交兩次消息請求,一次是發送,另外一次是確認(確認成功或者取消)。

如果確認消息發送失敗了,RocketMQ會定期掃描消息集群中的事務消息,這時候發現了Prepared消息,它會向消息發送者確認,所以消息生產方需要實現一個check接口,RocketMQ會根據發送端設置的策略來決定是回滾還是繼續發送確認消息。這樣就保證了消息發送與本地事務同時成功或同時失敗。

  • 優點: 實現了最終一致性,不需要依賴本地數據庫事務。
  • 缺點: 實現難度大,主流MQ不支持,沒有.NET客戶端,RocketMQ事務消息部分代碼也未開源
  • 極端情況下仍然需要手工介入

Rabbit mq實現思路

和Rocket mq有所不一樣。

消息生產方:

  • 首先執行本地事務,寫消息表
  • 獲取消息表的消息記錄,向Rabbit mq發送消息
  • 接收Rabbit mq返回的消息確認接收成功通知(ACK),更新消息表;如果失敗,則不更新,保證發送消息和記錄表狀態一致。

這個會存在這樣一個問題:消息已經發送成功,但是Rabbit mq沒有返回,則無法更新消息表;或者接收到消息成功發送通知,但是更新數據庫失敗,也無法更新消息表,導致下一次重複發送。解決這個問題的思路,我只想到需要消息接收方要做冪等性檢查,從而避免重複消費消息。

消息接收方:

  • Rabbit mq通知有消息
  • 消息接收方接收消息,處理成功或者失敗,都要返回確認ACK, Rabbit mq接收到ACK,根據ACK更新消息狀態為已經發送,刪除隊列中的消息;或者更改消息列,等待從新發送。

這裡有個問題,如果消息接收方成功處理消息,但是由於特殊情況沒有返回ACK, Rabbit mq沒有接收到ACK,這條消息狀態已經改變不會再發送,需要手工處理。

消息發送方代碼示例:

配置:

rabbitmq
:
host
:
localhost
#集群配置:addresses:ip1:port1,ip2:port2,ip3:port3
port
:

5672
username
:
guest
password
:
guest
publisher
-
confirms
:

true

#確認消息是否到達broker服務器,也就是隻確認是否正確到達exchange中即可,只要正確的到達exchange中,broker即可確認該消息返回給客戶端ack
publisher
-
returns
:

true

#確認消息是否正確到達queue,如果沒有則觸發,如果有則不觸發


package
com
.
sleb
.
springcloud
.
rabbitproducerack
.
service
;
import
com
.
sleb
.
springcloud
.
modalservice
.
Users
;
import
com
.
sleb
.
springcloud
.
rabbitproducerack
.
config
.
CorrelationDataEx
;
import
org
.
springframework
.

amqp
.
rabbit
.
core
.
RabbitTemplate
;
import
org
.
springframework
.
amqp
.
support
.
converter
.
Jackson2JsonMessageConverter
;
import
org
.
springframework
.
beans
.
factory
.
annotation
.
Autowired
;
import
org
.
springframework
.
stereotype
.
Service
;
import
java
.
util
.
Date
;

import

static
com
.
sleb
.
springcloud
.
modalservice
.
RabbitConfigInfo
.
EXCHANGE
;
import

static
com
.
sleb
.
springcloud
.
modalservice
.
RabbitConfigInfo
.
QUEUE_TWO_ROUTING
;
/**
* 如果消息沒有到exchange,則confirm回調,ack=false *
* 如果消息到達exchange,則confirm回調,ack=true *
* exchange到queue成功,則不回調return
*
* exchange到queue失敗,則回調return(需設置mandatory=true,否則不回回調,消息就丟了)
* 確認方式:
* 方式一:channel.waitForConfirms( ) 普通發送方確認模式; *
* 方式二:channel.waitForConfirmsOrDie( ) 批量確認模式; *
* 方式三:channel.addConfirmListener()異步監聽發送方確認模式; *
* 採用第三種比較好,異步監聽

*/
@Service
public

class

SenderService

{

@Autowired

private

RabbitTemplate
rabbitTemplate
;

public

void
sender
(
Users
users
)

throws

Exception

{

System
.
out
.
println
(
"你好現在是 "

+

new

Date
()

+
""

);

System
.
out
.
println
(
"HelloSender發送內容 : "

+
users
.
toString
());

/**
* ConfirmCallback接口用於實現消息發送到RabbitMQ交換器後接收ack回調。
* ReturnCallback接口用於實現消息發送到RabbitMQ交換器,但無相應隊列與交換器綁定時的回調。
*/
rabbitTemplate
.
setReturnCallback
((
message
,
replyCode
,
replyText
,
exchange
,
routingKey
)

->

{

//Users users1 = (Users)message.getBody().toString();

//String correlationId = message.getMessageProperties().getCorrelationId();

System
.

out
.
println
(
"Message : "

+

new

String
(
message
.
getBody
()));

//System.out.println("Message : " + new String(message.getBody()));

System
.
out
.
println
(
"replyCode : "

+
replyCode
);

System
.
out
.
println
(
"replyText : "

+
replyText
);

//錯誤原因

System
.
out
.
println

(
"exchange : "

+
exchange
);

System
.
out
.
println
(
"routingKey : "

+
routingKey
);
//queue名稱

});
rabbitTemplate
.
setConfirmCallback
((
correlationData
,
ack
,
cause
)

->

{

if

(
ack
)

{

CorrelationDataEx
c
=

(
CorrelationDataEx

)
correlationData
;

System
.
out
.
println
(
"發送消息: "

+
c
.
getMsg
());

System
.
out
.
println
(
"HelloSender 消息發送成功 :"

+
correlationData
.
toString
()

);

/**
* 通過設置correlationData.id為業務主鍵,消息發送成功後去繼續做候選業務。
*/

}

else

{

System
.
out

.
println
(
"HelloSender消息發送失敗"

+
cause
);

}

});

/**
* CorrelationDataEx繼承CorrelationData, 把需要發送消息的關鍵字段加入
* 這樣confirmcallback可以返回帶有關鍵字段的correlationData,我們可以通過這個來確定發送的是那條業務記錄
*/

CorrelationDataEx
c
=

new

CorrelationDataEx
();
c
.
setId
(
users
.
getId
().
toString
());
c
.
setMsg
(
users
.
toString
());

/**
* 加上這個,可以從returncallback參數中讀取發送的json消息,否則是二進制bytes
* 比如:如果returncallback觸發,則表明消息沒有投遞到隊列,則繼續業務操作,比如將消息記錄標誌位未投遞成功,記錄投遞次數
*/
rabbitTemplate
.
setMessageConverter
(
new

Jackson2JsonMessageConverter
());
rabbitTemplate
.
convertAndSend
(
EXCHANGE
,
QUEUE_TWO_ROUTING
,
users
,
c
);

}
}

消息接收方代碼示例:

配置:

rabbitmq
:
host
:
localhost
port
:

5672

username
:
guest
password
:
guest

template
:
mandatory
:

true

#messageConverter: jackson2JsonMessageConverter 這個必須在程序裡面創建bean
listener
:
simple
:
prefetch
:

1
acknowledge
-
mode
:
manual
concurrency
:

3


package
com
.
sleb
.
springcloud
.
rabbitreceiverack
.
service
;
import

com
.
rabbitmq
.
client
.
Channel
;
import
com
.
sleb
.
springcloud
.
modalservice
.
Users
;
import
org
.
springframework
.
amqp
.
core
.
Message
;
import
org
.
springframework
.
amqp
.
rabbit
.
annotation
.
RabbitHandler
;
import
org
.
springframework
.
amqp
.

rabbit
.
annotation
.
RabbitListener
;
import
org
.
springframework
.
amqp
.
support
.
converter
.
Jackson2JsonMessageConverter
;
import
org
.
springframework
.
amqp
.
support
.
converter
.
MessageConverter
;
import
org
.
springframework
.
context
.
annotation
.
Bean
;
import
org
.
springframework
.
stereotype
.

Service
;
import
java
.
io
.
IOException
;
import
java
.
util
.
Date
;
import

static
com
.
sleb
.
springcloud
.
modalservice
.
RabbitConfigInfo
.
QUEUE_ONE_ROUTING
;
@Service
public

class

Receiver

{

@RabbitHandler

@RabbitListener
(
queues
=
QUEUE_ONE_ROUTING
)

//containerFactory = "rabbitListenerContainerFactory", concurrency = "2")


public

void
process
(
Users
users
,

Channel
channel
,

Message
message
)

throws

IOException

{

System
.
out
.
println
(
"HelloReceiver收到 : "

+
users
.
toString
()

+

"收到時間"

+

new

Date
());

try


{

//告訴服務器收到這條消息 已經被我消費了 可以在隊列刪掉 這樣以後就不會再發了

// 否則消息服務器以為這條消息沒處理掉 後續還會在發
channel
.
basicAck
(
message
.
getMessageProperties
().
getDeliveryTag
(),

false
);

System
.
out
.
println
(
"receiver success"
);

}

catch

(
IOException
e
)

{
e
.
printStackTrace
();

//丟棄這條消息,則不會重新發送了


//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);

System
.
out
.
println
(
"receiver fail"
);

}

}

@Bean

public

MessageConverter
jackson2JsonMessageConverter
()

{

return

new

Jackson2JsonMessageConverter
();

}
}

最需要考慮的兩個問題:

  • 消息消費的順序問題:發送消息指定隊列,消息消費者指定隊列可以解決,消費者只能一個。
  • 消息消費的重複問題:每次消費消息時候創建一消息表,在消費消息前先查詢該表,如果消息存在就說明已經消費。

轉發+關注。私信“資料”獲取更多JAVA乾貨內容。


分享到:


相關文章: