MQ的使用場景大概包括解耦,提高峰值處理能力,送達和排序保證,緩衝等。
MQ概述
消息隊列技術是分佈式應用間交換信息的一種技術。
消息隊列可駐留在內存或磁盤上,隊列存儲消息直到它們被應用程序讀走。
通過消息隊列,應用程序可獨立地執行--它們不需要知道彼此的位置、或在繼續執行前不需要等待接收程序接收此消息。
MQ主要作用是接收和轉發消息。 你可以想想在生活中的一種場景:當你把信件的投進郵筒,郵遞員肯定最終會將信件送給收件人。我們可以把MQ比作 郵局和郵遞員 。
MQ和郵局的主要區別是,它不處理消息,但是,它會接受數據、存儲消息數據、轉發消息。
RabbitMQ術語
生產者:
消息發送者,在MQ中被稱為生產者( producer ),一個發送消息的應用也被叫做生產者,用 P 表示
消費者:
生產者“生產”出消息後,最終由誰消費呢?等待接受消息的應用程序,我們稱之為消費者( Consuming ),用 C 表示
隊列:
消息只能存儲在隊列( queue )中。儘管消息在rabbitMQ和應用程序間流通,但是隊列卻是存在於RabbitMQ內部。
一個隊列不受任何限制,它可以存儲你想要存儲的消息量,它本質上是一個無限的緩衝區。
多個生產者可以向同一個隊列發送消息,多個消費者可以嘗試從同一個消息隊列中接收數據。
一個隊列像下面這樣(上面是它的隊列名稱)
注意:
生產者、消費者、中間件不必在一臺機器上,實際應用中也是絕大多數不在一起的。我們可以用一張圖表示RabbitMQ的構造:
注:此圖片摘自於 百度百科RabbitMQ 。
RabbitMQ 實現RPC
(RPC) Remote Procedure Call Protocol 遠程過程調用協議,它是一種通過 網絡 從遠程計算機程序上請求服務,而不需要了解底層網絡技術的協議。 RPC協議 假定某些 傳輸協議 的存在,如TCP或UDP,為通信程序之間攜帶信息數據。
在一個大型的公司,系統由大大小小的服務構成,不同的團隊維護不同的代碼,部署在不同的機器。但是在做開發時候往往要用到其它團隊的方法,因為已經有了實現。但是這些服務部署不同的機器上,想要調用就需要網絡通信,這些代碼繁瑣且複雜,一不小心就會寫的很低效。RPC協議定義了規劃,其它的公司都給出了不同的實現。比如微軟的wcf,以及現在火熱的WebApi。
在RabbitMQ中RPC的實現也是很簡單高效的,現在我們的客戶端、服務端都是消息發佈者與消息接收者。
首先客戶端通過RPC向服務端發出請求
我這裡有一堆東西需要你給我處理一下,correlation_id:這是我的請求標識,erply_to:你處理完過後把結果返回到這個隊列中。
服務端拿到了請求,開始處理並返回
correlation_id:這是你的請求標識 ,原封不動的給你。 這時候客戶端用自己的correlation_id與服務端返回的id進行對比。是我的,就接收。
在我們發佈消息的時候,會調用channel對象的 BasicPublish 方法,這個方法中有一個 IBasicProperties 的參數 basicProperties 。
在這對象中,有一個 ReplyTo 屬性,我們可以將生產者監聽的消息隊列名稱存放在裡面。當消費者程序接收到這條消息的時候,就可以在Receive事件的ea對象中獲取ReplyTo屬性的值
<code>var
props = channel.CreateBasicProperties();
=replyQueueName;
var
messageBytes = Encoding.UTF8.GetBytes(message);
:"",
routingKey
:"rpc_queue",
basicProperties
:props,
body
:messageBytes);
/<code>
那麼當消息生產者接收到消息消費者任務完成的消息之後,該如何確定完的是哪一個任務呢?
在現實情況,消息生產者通常會發出多個任務,多個消息消費者分別進行不同的任務,這時候我們就需要知道是哪個消息消費者完成了任務。
當消息生產者調用channel對象的 BasicPublish 方法發送消息時, IBasicProperties 對象除了可以幫助我們傳遞消息生產者監聽的消息隊列名,還可以幫我們傳遞一個 CorrelationId (相關Id),當發送任務消息的時候,我們給每個任務消息定義一個唯一的相關Id, 並存儲在 IBasicProperties 對象的 CorrelationId 屬性中。
<code>var
properties = channel.CreateBasicProperties(); properties.ReplyTo = replyQueueName; properties.CorrelationId = Guid.NewGuid().ToString(); /<code>
這樣消息消費者在接收到任務消息時,可以從Receive的ea參數中獲取 CorrelationId 。當任務完成時,再將保存有這個 CorrelationId 的任務完成消息發送到消息生產者關注的消息隊列中, 消息生產者就可以知道是哪個任務完成了
一些繁瑣的細節rabbitmq已經為我們封裝了,簡單的 SimpleRpcServer 與 SimpleRpcClient 讓Rpc實現的更為方便。
開發指南: RabbitMQ .NET/C# Client API Guide
API文檔:
https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v3.2.2/rabbitmq-dotnet-client-3.2.2-client-htmldoc/html/index.html
Client
<code>static
void
Main
(string
[] args) {var
factory =new
ConnectionFactory() { HostName ="localhost"
};using
(var
connection = factory.CreateConnection()) {using
(var
channel = connection.CreateModel()) { SimpleRpcClient client =new
SimpleRpcClient(channel,new
PublicationAddress(exchangeType: ExchangeType.Direct, exchangeName:string
.Empty, routingKey:"RpcQueue"
));var
prop = channel.CreateBasicProperties(); prop.CorrelationId = Guid.NewGuid().ToString(); IBasicProperties outProp;var
msg = client.Call(prop, Encoding.UTF8.GetBytes(args[0
]),out
outProp);if
(prop.CorrelationId == outProp.CorrelationId) { Console.WriteLine($"Task
{prop.CorrelationId}
completed."); Console.WriteLine(Encoding.UTF8.GetString(msg)); } } } } /<code>
Server
- 創建MySimpleRpcServer類,繼承自SimpleRpcServer類
- HandleSimpleCall方法裡添加回調返回值
- ProcessRequest方法為任務處理方法
- 使用server.MainLoop() 啟動服務
<code>public
class
MySimpleRpcServer
:SimpleRpcServer
{public
MySimpleRpcServer
(Subscription subscription
) :base
(subscription
) { }public
override
byte
[]HandleSimpleCall
(bool
isRedelivered, IBasicProperties requestProperties,byte
[] body,out
IBasicProperties replyProperties) { replyProperties =null
;return
Encoding.UTF8.GetBytes($"
{DateTime.Now.ToString(
"yyyy-MM-dd HH:mm:ss"
)} Task{requestProperties.CorrelationId}
Completed."); }public
override
void
ProcessRequest
(BasicDeliverEventArgs evt
) { Console.WriteLine("[x] Received {0}"
, Encoding.UTF8.GetString(evt.Body)); Thread.Sleep(4000
);base
.ProcessRequest(evt); } } /<code>
Program.cs
<code>static
void
Main
(string
[] args) {var
factory =new
ConnectionFactory() { HostName ="localhost"
};using
(var
connection = factory.CreateConnection()) {using
(var
channel = connection.CreateModel()) { channel.QueueDeclare("RpcQueue"
,true
,false
,false
,null
); SimpleRpcServer rpc =new
MySimpleRpcServer(new
Subscription(channel,"RpcQueue"
)); rpc.MainLoop(); Console.ReadKey(); } } } /<code>
參考文檔: RabbitMQ學習筆記(六) RPC 含手工實現RabbitMQ的RPC、使用SimpleRpcClient類和SimpleRpcServer類實現RPC的簡單示例。
簡易RPC框架-心跳與重連機制
參考文檔: 簡易RPC框架-心跳與重連機制
心跳
就是告訴其它人自己還活著。在簡易RPC框架中,採用的是TCP長連接,為了確保長連接有效,就需要客戶端與服務端之間有一種通知機制告知對方的存活狀態。
如何實現
客戶端發送心跳消息
在狀態空閒的時候定時給服務端發送消息類型為PING消息。
服務端接收心跳消息
捕獲通道空閒狀態事件,如果接收客戶端PING消息,則發送PONG消息給服務端。如果在一定時間內沒有收到客戶端的PING消息,則說明客戶端已經不在線,此時關閉通道。
客戶端管理可用連接
由於服務端會因為長時間接收不到服務端的PING消息而關閉通道,這就導致緩存在客戶端的連接的可用性發生變化。需要將不可用的從可用列表中轉移出去,並對不可用連接進行處理,比如直接丟棄或者是重新連接。
預備知識
ChannelPipeline與handle的關係。netty中的這些handle和spring mvc中的filter作用是類似的,ChannelPipeline可以理解成handle的容器,裡面可以被註冊眾多處理不同業務功能的事件處理器,比如:
- 編碼
- 解碼
- 心跳
- 權限
- 加密
- 解密
- 業務代碼執行
- ......
具體實現
空閒狀態處理器
可以利用netty提供的IdleStateHandler來發送PING-PONG消息。這個處理器主要是捕獲通道超時事件,主要有三類
- 讀超時,一定時間內沒有從通道內讀取到任何數據
- 寫超時,一定時間內沒有從通道內寫入任何數據
- 讀寫超時,一定時間內沒有從通道內讀取或者是寫入任何數據
客戶端加入空閒狀態處理器
客戶端捕獲讀寫超時,如果事件觸發就給服務端發送PING消息。
服務端加入空閒狀態處理器
服務端只需要捕獲讀超時即可,當讀超時觸發後就關閉通道。
為什麼在空閒狀態才發送心跳消息
在正常客戶端與服務端有交互的情況下,說明雙方都在正常工作不需要額外的心跳來告知對方的存活。只有雙方在一定時間內沒有接收到對方的消息時才開始採用心跳消息來探測對方的存活,這也是一種提升效率的做法。
抽象心跳處理器
創建AbstractHeartbeatHandler,並繼承
ChannelInboundHandlerAdapter,服務於客戶端與服務端的心跳處理器。在讀取方法中判斷消息類型:
- 如果是PING消息就發送PONG消息給客戶端
- 如果收到的是PONG消息,則直接打印消息說明客戶端已經成功接收到服務端返回的PONG消息
- 如果是其它類型的消息,則通知下一個處理器處理消息
<code>public
void
channelRead
(ChannelHandlerContext channelHandlerContext, Object msg)
throws
Exception {if
(!(msginstanceof
RpcMessage)){ channelHandlerContext.fireChannelRead(msg);return
; } RpcMessage message=(RpcMessage)msg;if
(null
==message||null
==message.getMessageHeader()){ channelHandlerContext.fireChannelRead(msg);return
; }if
(message.getMessageHeader().getType()== Constants.MESSAGE_TYPE_HEARTBEAT_PONG){ logger.info("ClientHeartbeatHandler.channelRead0 ,pong data is:{}"
,message.getMessageBody()); }else
if
(message.getMessageHeader().getType()== Constants.MESSAGE_TYPE_HEARTBEAT_PING){this
.sendPong(channelHandlerContext); }else
{ channelHandlerContext.fireChannelRead(msg); } } /<code>
空閒狀態事件,可以根據不同的狀態做不同的行為處理,定義三個可重寫事件供客戶端與服務端處理器具體確認處理事件。
<code>public
void
userEventTriggered
(ChannelHandlerContext ctx, Object evt)
throws
Exception {if
(evtinstanceof
IdleStateEvent) { IdleStateEvent e = (IdleStateEvent) evt;switch
(e.state()) {case
READER_IDLE:this
.handleReaderIdle(ctx);break
;case
WRITER_IDLE:this
.handleWriterIdle(ctx);break
;case
ALL_IDLE:this
.handleAllIdle(ctx);break
;default
:break
; } } } /<code>
客戶端心跳處理器
繼承抽象心跳處理器,並重寫事件發送PING消息。
<code>public
class
ClientHeartbeatHandler
extends
AbstractHeartbeatHandler
{protected
void
handleAllIdle
(ChannelHandlerContext ctx)
{this
.sendPing(ctx); } } /<code>
服務端心跳處理器
繼承抽象心跳處理器,並重寫事件關閉通道。
<code>public
class
ServerHeartbeatHandler
extends
AbstractHeartbeatHandler
{protected
void
handleReaderIdle
(ChannelHandlerContext ctx)
{ logger.info("ServerHeartbeatHandler.handleReaderIdle reader timeout ,close channel"
); ctx.close(); } } /<code>
客戶端ChannelPipeline中加入心跳處理器
比如5秒內未寫入或者讀取通道數據就觸發超時事件。
<code>.addLast
(new
IdleStateHandler
(0, 0,Constants
.ALLIDLE_TIME_SECONDS
)); /<code>
服務端ChannelPipeline中加入心跳處理器
比如10秒未接收到通道消息就觸發讀超時事件。
<code>.addLast
(new
IdleStateHandler
(Constants
.READER_TIME_SECONDS
, 0, 0)) /<code>
客戶端消息示例
正常情況下心跳消息顯示如下圖所示,消息的內容可以根據自己的情況自行定義。
客戶端下線消息示例
停止客戶端程序,然後服務端讀超時事件觸發,並關閉通道。
客戶端可用連接管理
由於上述的服務端心跳處理器,在觸發讀超時後會關閉通信管道,這導致客戶端緩存的連接狀態會出現不可用的情況,為了讓客戶端一直只能取到可用連接就需要對從緩存中獲取到的連接做狀態判斷,如果可用直接返回,如果不可用則將連接從可用列表中刪除然後取下一個可用連接。
修改獲取連接方法
通過channel的isActive屬性可以判斷連接是否可用,如果不可以做刪除並重新獲取的操作。
<code>public
RpcClientInvoker getInvoker() { int index = loadbalanceService.index(size); RpcClientInvoker invoker= RpcClientInvokerCache.get
(index);if
(invoker.getChannel().isActive()) {return
invoker; }else
{ RpcClientInvokerCache.removeHandler(invoker); logger.info("invoker is not active,so remove it and get next one"
);return
this
.getInvoker(); } } /<code>
後臺啟動任務處理不可用連接
啟動一個每隔5秒執行一次任務的線程,定時取出不可用連接,然後重連,並將不可用連接刪除。
這裡我處理的重連是直接丟棄原有不可用連接,然後重新創建新連接。
<code>private
static
final
Logger logger = LoggerFactory.getLogger(RpcClientInvokerManager.
class
);static
{ executorService.schedule(new
Runnable() {public
void
run
()
{while
(true
) { List notConnectedHandlers = RpcClientInvokerCache.getNotConnectedHandlers();if
(!CollectionUtils.isEmpty(notConnectedHandlers)) {for
(RpcClientInvoker invoker : notConnectedHandlers) { RpcClientInvokerManager.getInstance(referenceConfig).connect(); } RpcClientInvokerCache.clearNotConnectedHandler(); } } } }, Constants.RECONNECT_TIME_SECONDS,TimeUnit.SECONDS); } /<code>
本文源碼
https://github.com/jiangmin168168/jim-framework