RabbitMQ 的安裝以及介紹

RabbitMQ 的安裝以及介紹

簡介

RabbitMQ(消息隊列/消息中間件)是流行的開源消息隊列系統,是AMQP(Advanced Message Queuing Protocol高級消息隊列協議)的標準實現。用Erlang語言開發。RabbitMQ具有良好的性能和實效性,同時還支持集群和負載部署,非常適用於在較大規模的分佈式系統中使用。

Java常見的消息隊列技術:RabbitMQ、ActiveMQ、Redis(List數據格式)、JMS(Java Message Service)等。

RabbitMQ官網:http://www.rabbitmq.com/

消息隊列比較:http://blog.csdn.net/sunxinhere/article/details/7968886

RabbitMQ下載地址:http://www.rabbitmq.com/download.html

安裝

安裝erlang依賴。

  1. 安裝erlang的rpm庫。

下載erlang的倉庫:

wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm (如果提示wget不是一個命令執行yum install wget)

安裝erlang倉庫:rpm -Uvh erlang-solutions-1.0-1.noarch.rpm

  1. 安裝erlang:yum install erlang

安裝RabbitMQ

  1. 解壓rabbitmq-server-generic-unix-3.6.12.tar.xz。

(*.tar.xz 需要使用xz -d 解壓後在 tar -xf 解壓)

(安裝xz: yum install xz.x86_64)①

② xz -d rabbitmq-server-generic-unix-3.6.12.tar.xz

  1. 進入sbin目錄啟動rabbitMQ:./rabbitmq-server start

RabbitMQ配置

啟動RabbitMQ後執行:

  1. ./rabbitmqctl add_user admin 123456(創建用戶)
  2. ./rabbitmqctl set_user_tags admin administrator(將創建好的用戶加入管理員)
  3. ./rabbitmqctl set_permissions -p "/" admin "." "." ".*"(授權)
  4. ./rabbitmq-plugins enable rabbitmq_management 啟動RabbitMQ管理頁面
  5. 重啟MQ服務、開放5672/15672/25672端口。在瀏覽器輸入ip:15672後出現RabbitMQ管理臺。

消息隊列運行原理

從上圖可以看出、生產者、消息隊列、消費者是最重要的三個概念:

生產者(Producer):創建消息到消息隊列中。

消息隊列(Queue):存儲消息。

消費者(Consumer):監聽指定消息隊列,當消息隊列接收到消息後,獲取消息並處理。

RabbitMQ:創建消息隊列,並提供API給生產者和消費者進行存取消息。

生產者產生消息並調用RabbitMQ的API將消息加入到對應的消息隊列中,消費者通過RabbitMQ的API從消息隊列中獲取消息進行消費。

RabbitMQ核心

RabbitMQ的核心由虛擬機、交換機、隊列、綁定這四部分組成。

虛擬機(Virtual Host)

在RabbitMQ中虛擬機主要是用於控制權限顆粒度,RabbitMQ默認的虛擬機路徑為/。一個虛擬機包含一組交換機、隊列和綁定。

例如admin的virtual host為/、zs的virtual host為/test。假設RabbitMQ Server中有/test /hello /demo,那麼admin可以操作全部虛擬機下的隊列而zs只可以操作/test下的隊列。

交換機/路由器(Exchange)

RabbitMQ中的交換機本身並不進行消息的存儲,主要用做進行消息的轉發到指定規則的隊列內。生產者在傳遞消息時並不是直接傳到隊列中,而是先發送給交換機,交換機會通過消息中的routing_key按照特定的路由算法轉發給路由綁定的對應隊列。Exchange可以和多個Queue綁定,Exchange本身也可以進行持久化、臨時、自動刪除。

路由規則

直接交換器(Dirct)

Dirct的工作方式類似單播,Exchange會將x消息轉發至ROUTING_KEY完全匹配的隊列中。

廣播交換器(Fanout)

Fanout的工作方式就是廣播,Exchange會將消息轉發至所有與其綁定的隊列中,而不管ROUTING_KEY的是什麼。

主題交換器(Topic)

Topic的工作方式是模糊匹配,Exchange會將消息轉發和ROUTING_KEY匹配模式相同的所有隊列。比如ROUTING_KEY為user.dj的Message會轉發給綁定匹配模式為包含*user.dj*的隊列( * 表是匹配一個任意詞組,#表示匹配0個或多個詞組)。

頭交換機(Headers)[瞭解]

Headers類型的exchange使用的比較少,它也是忽略routingKey的一種路由方式。是使用Headers來匹配的。Headers是一個鍵值對,可以理解成Hashtable。發送者在發送的時候定義一些鍵值對,接收者也可以再綁定時候傳入一些鍵值對,兩者匹配的話,則對應的隊列就可以收到消息。匹配有兩種方式all和any。這兩種方式是在接收端必須要用鍵值"x-mactch"來定義。all代表定義的多個鍵值對都要滿足,而any則代碼只要滿足一個就可以了。fanout,direct,topic exchange的routingKey都需要要字符串形式的,而headers exchange則沒有這個要求,因為鍵值對的值可以是任何類型。

綁定binding

所謂的binding就是將交換機與一個特定的隊列綁定起來,且交換機與隊列的關係可以是一對一、一對多、多對多。

消息隊列(queue)

Queue是用來存儲消息的容器,RabbitMQ提供了FIFO(先進先出)的機制,可以緩存消息也可以將消息持久化、臨時或者自動刪除。

設置為持久化的隊列,queue中的消息會在server本地硬盤存儲一份,防止系統奔潰後數據丟失。

設置為臨時隊列,queue中的數據在系統重啟之後就會丟失

設置為自動刪除的隊列,當不存在用戶(生產者和消費者)連接到server,隊列中的數據會被自動刪除。

(消費者消費完消息後消息就不存在了和設置了什麼狀態無關)

Message(消息體)

Message:由Header和Body組成,Header是由生產者添加的各種屬性的集合,包括Message是否被持久化、由哪個Message Queue接受、優先級是多少等。而Body是真正需要傳輸的數據。

通信過程

假設P1和C1註冊了相同的Broker,Exchange和Queue。P1發送的消息最終會被C1消費。基本的通信流程大概如下所示:

  1. P1生產消息,發送給服務器端的Exchange。
  2. Exchange收到消息,根據ROUTINKEY,將消息轉發給匹配的Queue1。
  3. Queue1收到消息,將消息發送給訂閱者C1。
  4. C1收到消息,發送ACK給隊列確認收到消息。
  5. Queue1收到ACK,刪除隊列中緩存的此條消息

Consumer收到消息時需要顯式的向rabbit broker發送basic.ack消息或者consumer訂閱消息時設置auto_ack參數為true。在通信過程中,隊列對ACK的處理有以下幾種情況:

  1. 如果consumer接收了消息,發送ack,rabbitmq會刪除隊列中這個消息,發送另一條消息給consumer。
  2. 如果cosumer接受了消息, 但在發送ack之前斷開連接,rabbitmq會認為這條消息沒有被deliver,在consumer在次連接的時候,這條消息會被redeliver。
  3. 如果consumer接受了消息,但是程序中有bug,忘記了ack,rabbitmq不會重複發送消息。
  4. rabbitmq2.0.0和之後的版本支持consumer reject某條(類)消息,可以通過設置requeue參數中的reject為true達到目地,那麼rabbitmq將會把消息發送給下一個註冊的consumer。


分享到:


相關文章: