RabbitMQ原理與相關操作(三)消息持久化

現在聊一下RabbitMQ消息持久化:

問題及方案描述

1.當有多個消費者同時收取消息,且每個消費者在接收消息的同時,還要處理其它的事情,且會消耗很長的時間。在此過程中可能會出現一些意外,比如消息接收到一半的時候,一個消費者死掉了。

這種情況要使用消息接收確認機制,可以執行上次宕機的消費者沒有完成的事情。

2.在默認情況下,我們程序創建的消息隊列以及存放在隊列裡面的消息,都是非持久化的。當RabbitMQ死掉了或者重啟了,上次創建的隊列、消息都不會保存。

這種情況可以使用RabbitMQ提供的消息隊列的持久化機制。

相關理論描述

RabbitMQ支持消息的持久化,也就是數據寫在磁盤上,為了數據安全考慮,我個人覺得大多數開發人員都會選擇持久化。

隊列和交換機有一個創建時候指定的標誌durabledurable的唯一含義就是具有這個標誌的隊列和交換機會在重啟之後重新建立,它不表示說在隊列當中的消息會在重啟後恢復。

消息隊列持久化包括3個部分:

1、exchange持久化,在聲明時指定durable => true2、queue持久化,在聲明時指定durable => true3、消息持久化,在投遞時指定delivery_mode=> 2(1是非持久化)

如果exchange和queue都是持久化的,那麼它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個持久化,一個非持久化,就不允許建立綁定。

注意:一旦創建了隊列和交換機,就不能修改其標誌了。例如,如果創建了一個non-durable的隊列,然後想把它改變成durable的,唯一的辦法就是刪除這個隊列然後重現創建

程序示例

生產者

<code>class Producter
{
const string ExchangeName = "eric.exchange";
const string QueueName = "eric.queue";
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//聲明消息隊列,且為可持久化的
channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//聲明消息隊列,且為可持久化的

channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);

string message = "Eric is very handsome";
var body = Encoding.UTF8.GetBytes(message);

//將隊列設置為持久化之後,還需要將消息也設為可持久化的
var props = channel.CreateBasicProperties();
props.SetPersistent(true);

channel.BasicPublish(ExchangeName, routingKey: QueueName, basicProperties: props, body: body);

Console.WriteLine("Producter Sent: {0}", message);
Console.ReadKey();
}
}
}/<code>

注:ack是 acknowledgments 的縮寫,noAck 是("no manual acks")

因為我前段時間換了筆記本,所以用戶的“eric”的操作出踩了個坑,下面進行介紹下:

如果調試運行時報錯:None of the specified endpoints were reachable

innerException是:

<code>{"The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=541, text=\"Unexpected Exception\", classId=0, methodId=0, cause=System.IO.IOException: 無法從傳輸連接中讀取數據: 遠程主機強迫關閉了一個現有的連接。。 ---> System.Net.Sockets.SocketException: 遠程主機強迫關閉了一個現有的連接。\\r\\n   在 System.Net.Sockets.Socket.Receive(Byte[] buffer, Int32 offset, Int32 size, SocketFlags socketFlags)\\r\\n   在 System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 size)\\r\\n   --- 內部異常堆棧跟蹤的結尾 ---\\r\\n   在 RabbitMQ.Client.Impl.Frame.ReadFrom(NetworkBinaryReader reader)\\r\\n   在 RabbitMQ.Client.Impl.SocketFrameHandler.ReadFrame()\\r\\n   在 RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration()\\r\\n   在 RabbitMQ.Client.Framing.Impl.Connection.MainLoop()"}/<code>

這說明我們使用的用戶 不是 系統默認的 guest 而是我們自己創建的用戶

,但是沒有足夠的權限進行操作。

解決辦法:

<code>rabbitmqctl set_user_tags username administrator
rabbitmqctl set_permissions -p / username ".*" ".*" ".*"/<code>

執行結果:

RabbitMQ原理與相關操作(三)消息持久化

相關其他操作見:windows下 安裝 rabbitMQ 及操作常用命令

程序運行結果:

RabbitMQ原理與相關操作(三)消息持久化

消費者

<code>class Recevice
{
const string ExchangeName = "eric.exchange";
const string QueueName = "eric.queue";
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", VirtualHost = "/" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//聲明消息隊列,且為可持久化的
channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//聲明消息隊列,且為可持久化的
channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);

BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck: true);
//NoAck:true 告訴RabbitMQ立即從隊列中刪除消息,另一個非常受歡迎的方式是從隊列中刪除已經確認接收的消息,可以通過單獨調用BasicAck 進行確認:
//BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck:false);
var msgContent = Encoding.UTF8.GetString(msgResponse.Body);

Console.WriteLine("The received content:"+msgContent);

channel.BasicAck(msgResponse.DeliveryTag, multiple: false);
//使用BasicAck方式來告之是否從隊列中移除該條消息
//需要額外注意,比如從隊列中獲取消息並用它來操作數據庫或日誌文件時,如果出現操作失敗時,則該條消息應該保留在隊列中,只到操作成功時才從隊列中移除。
Console.ReadKey();
}
}
}/<code>

接受消息還有一種方法,就是通過基於推送的事件訂閱。可以使用內置的 QueueingBasicConsumer 提供簡化的編程模型,允許在共享隊列上阻塞,直到收到一條消息。

<code>var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(QueueName, noAck: true, consumer: consumer);
var msgResponse = consumer.Queue.Dequeue();
var msgContent = Encoding.UTF8.GetString(msgResponse.Body);/<code>

程序運行結果:

RabbitMQ原理與相關操作(三)消息持久化


原文鏈接:https://www.cnblogs.com/ericli-ericli/p/5938106.html


分享到:


相關文章: