阿里P7架構師的RabbitMQ學習筆記告訴你什麼叫做消息隊列王者


阿里P7架構師的RabbitMQ學習筆記告訴你什麼叫做消息隊列王者

一、簡單的發送與接收消息 HelloWorld

1. 發送消息

發送消息首先要獲取與rabbitmq-server的連接,然後從渠道(chann)中指定的queue發送消息 , 不能定義兩個queue名字相同,但屬性不同

示例:

<code>package com.zf.rabbitmq01;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 發送消息
* @author zhanghuan
*
*/
public class Sender01 {
public static void main(String[] args) throws IOException {
ConnectionFactory connFac = new ConnectionFactory() ;
//RabbitMQ-Server安裝在本機,所以直接用127.0.0.1
connFac.setHost("127.0.0.1");
//創建一個連接
Connection conn = connFac.newConnection() ;
//創建一個渠道
Channel channel = conn.createChannel() ;
//定義Queue名稱
String queueName = "queue01" ;
//為channel定義queue的屬性,queueName為Queue名稱
channel.queueDeclare( queueName , false, false, false, null) ;
String msg = "Hello World!";
//發送消息
channel.basicPublish("", queueName , null , msg.getBytes());
System.out.println("send message[" + msg + "] to "+ queueName +" success!");
channel.close();

conn.close();
}
}

/<code>
<code>package com.zf.rabbitmq01;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
/**
* 接收消息
* @author zhanghuan
*
*/
public class Recv01 {
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory connFac = new ConnectionFactory() ;
connFac.setHost("127.0.0.1");
Connection conn = connFac.newConnection() ;
Channel channel = conn.createChannel() ;
String queueName = "queue01";
channel.queueDeclare(queueName, false, false, false, null) ;
//上面的部分,與Sender01是一樣的
//配置好獲取消息的方式
QueueingConsumer consumer = new QueueingConsumer(channel) ;
channel.basicConsume(queueName, true, consumer) ;
//循環獲取消息
while(true){
//獲取消息,如果沒有消息,這一步將會一直阻塞
Delivery delivery = consumer.nextDelivery() ;
String msg = new String(delivery.getBody()) ;
System.out.println("received message[" + msg + "] from " + queueName);
}
}
}
/<code>


阿里P7架構師的RabbitMQ學習筆記告訴你什麼叫做消息隊列王者

二、消息確認與公平調度消費者

從本節開始稱Sender為生產者 , Recv為消費者

1. 消息確認

為了確保消息一定被消費者處理,rabbitMQ提供了消息確認功能,就是在消費者處理完任務之後,就給服務器一個回饋,服務器就會將該消息刪除,如果消費者超時不回饋,那麼服務器將就將該消息重新發送給其他消費者默認是開啟的,在消費者端通過下面的方式開啟消息確認, 首先將autoAck自動確認關閉,等我們的任務執行完成之後,手動的去確認,類似JDBC的autocommit一樣

<code>QueueingConsumer consumer = new QueueingConsumer(channel);
Boolean autoAck = false;
channel.basicConsume("hello", autoAck, consumer);
/<code>

在前面的例子中使用的是channel.basicConsume(channelName, true, consumer) ; 在接收到消息後,就會自動反饋一個消息給服務器。

下面這個例子來測試消息確認的功能。

<code>package com.zf.rabbitmq03;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**

* 發送消息
* @author zhanghuan
*
*/
public class Sender03 {
public static void main(String[] args) throws IOException {
ConnectionFactory connFac = new ConnectionFactory() ;
//RabbitMQ-Server安裝在本機,所以直接用127.0.0.1
connFac.setHost("127.0.0.1");
//創建一個連接
Connection conn = connFac.newConnection() ;
//創建一個渠道
Channel channel = conn.createChannel() ;
//定義Queue名稱
String queueName = "queue01" ;
//為channel定義queue的屬性,queueName為Queue名稱
channel.queueDeclare( queueName , false, false, false, null) ;
String msg = "Hello World!";
//發送消息
channel.basicPublish("", queueName , null , msg.getBytes());
System.out.println("send message[" + msg + "] to "+ queueName +" success!");
channel.close();
conn.close();
}
}
/<code>

與Sender01.java一樣,沒有什麼區別。

<code>package com.zf.rabbitmq03;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
/**
* 接收消息
* @author zhanghuan
*
*/
public class Recv03 {

public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory connFac = new ConnectionFactory() ;
connFac.setHost("127.0.0.1");
Connection conn = connFac.newConnection() ;
Channel channel = conn.createChannel() ;
String channelName = "channel01";
channel.queueDeclare(channelName, false, false, false, null) ;
//配置好獲取消息的方式
QueueingConsumer consumer = new QueueingConsumer(channel) ;
//取消 autoAck
Boolean autoAck = false ;
channel.basicConsume(channelName, autoAck, consumer) ;
//循環獲取消息
while(true){
//獲取消息,如果沒有消息,這一步將會一直阻塞
Delivery delivery = consumer.nextDelivery() ;
String msg = new String(delivery.getBody()) ;
//確認消息,已經收到
channel.basicAck(delivery.getEnvelope().getDeliveryTag()
, false);
System.out.println("received message[" + msg + "] from " + channelName);
}
}
}
/<code>

注意:一旦將autoAck關閉之後,一定要記得處理完消息之後,向服務器確認消息。否則服務器將會一直轉發該消息如果將上面的channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);註釋掉, Sender03.java只需要運行一次 , Recv03.java每次運行將都會收到HelloWorld消息

注意:但是這樣還是不夠的,如果rabbitMQ-Server突然掛掉了,那麼還沒有被讀取的消息還是會丟失 ,所以我們可以讓消息持久化。 只需要在定義Queue時,設置持久化消息就可以了,方法如下:

<code>boolean durable = true;channel.queueDeclare(channelName, durable, false, false, null);
/<code>

這樣設置之後,服務器收到消息後就會立刻將消息寫入到硬盤,就可以防止突然服務器掛掉,而引起的數據丟失了。 但是服務器如果剛收到消息,還沒來得及寫入到硬盤,就掛掉了,這樣還是無法避免消息的丟失。

阿里P7架構師的RabbitMQ學習筆記告訴你什麼叫做消息隊列王者

2. 公平調度

上一個例子能夠實現發送一個Message與接收一個Message

從上一個Recv01中可以看出,必須處理完一個消息,才會去接收下一個消息。如果生產者眾多,那麼一個消費者肯定是忙不過來的。此時就可以用多個消費者來對同一個Channel的消息進行處理,並且要公平的分配任務給多個消費者。不能部分很忙,部分總是空閒

實現公平調度的方式就是讓每個消費者在同一時刻會分配一個任務。 通過channel.basicQos(1);可以設置

列如:

當有多個消費者同時收取消息,且每個消費者在接收消息的同時,還要做其它的事情,且會消耗很長的時間,在此過程中可能會出現一些意外,比如消息接收到一半的時候,一個消費者宕掉了,這時候就要使用消息接收確認機制,可以讓其它的消費者再次執行剛才宕掉的消費者沒有完成的事情。另外,在默認情況下,我們創建的消息隊列以及存放在隊列裡面的消息,都是非持久化的,也就是說當RabbitMQ宕掉了或者是重啟了,創建的消息隊列以及消息都不會保存,為了解決這種情況,保證消息傳輸的可靠性,我們可以使用RabbitMQ提供的消息隊列的持久化機制。

阿里P7架構師的RabbitMQ學習筆記告訴你什麼叫做消息隊列王者

生產者:

<code>import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
public class ClientSend1 {
public static final String queue_name="my_queue";
public static final Boolean durable=true;
//消息隊列持久化
public static void main(String[] args)
throws java.io.IOException{
ConnectionFactory factory=new ConnectionFactory();
//創建連接工廠
factory.setHost("localhost");
factory.setVirtualHost("my_mq");
factory.setUsername("zhxia");

factory.setPassword("123456");
Connection connection=factory.newConnection();
//創建連接
Channel channel=connection.createChannel();
//創建信道
channel.queueDeclare(queue_name, durable, false, false, null);
//聲明消息隊列,且為可持久化的
String message="Hello world"+Math.random();
//將隊列設置為持久化之後,還需要將消息也設為可持久化的,MessageProperties.PERSISTENT_TEXT_PLAIN
channel.basicPublish("", queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
System.out.println("Send message:"+message);
channel.close();
connection.close();
}
}
/<code>

說明:行17 和行20 需要同時設置,也就是將隊列設置為持久化之後,還需要將發送的消息也要設置為持久化才能保證隊列和消息一直存在

消費者:

<code>import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
public class ClientReceive1 {
public static final String queue_name="my_queue";
public static final Boolean autoAck=false;
public static final Boolean durable=true;
public static void main(String[] args)
throws java.io.IOException,java.lang.InterruptedException{
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("my_mq");
factory.setUsername("zhxia");
factory.setPassword("123456");
Connection connection=factory.newConnection();
Channel channel=connection.createChannel();
channel.queueDeclare(queue_name, durable, false, false, null);

System.out.println("Wait for message");
channel.basicQos(1);
//消息分發處理
QueueingConsumer consumer=new QueueingConsumer(channel);
channel.basicConsume(queue_name, autoAck, consumer);
while(true){
Thread.sleep(500);
QueueingConsumer.Delivery deliver=consumer.nextDelivery();
String message=new String(deliver.getBody());
System.out.println("Message received:"+message);
channel.basicAck(deliver.getEnvelope().getDeliveryTag(), false);
}
}
}
/<code>

說明:行22: 設置RabbitMQ調度分發消息的方式,也就是告訴RabbitMQ每次只給消費者處理一條消息,也就是等待消費者處理完並且已經對剛才處理的消息進行確認之後, 才發送下一條消息,防止消費者太過於忙碌。如下圖所示:

阿里P7架構師的RabbitMQ學習筆記告訴你什麼叫做消息隊列王者

三、發佈/訂閱消息

前面都是一條消息只會被一個消費者處理。

如果要每個消費者都處理同一個消息,rabbitMq也提供了相應的方法。

在以前的程序中,不管是生產者端還是消費者端都必須知道一個指定的QueueName才能發送、獲取消息。 而rabbitMQ消息模型的核心思想是生產者不會將消息直接發送給隊列。

因為,生產者通常不會知道消息將會被哪些消費者接收。

生產者的消息雖然不是直接發送給Queue,但是消息會交給Exchange,所以需要定義Exchange的消息分發模式 ,之前的程序中,有如下一行代碼:

<code>channel.basicPublish("", queueName , null , msg.getBytes());
/<code>

第一個參數為空字符串,其實第一個參數就是ExchangeName,這裡用空字符串,就表示消息會交給默認的Exchange。

下面我們將自己定義Exchange的屬性。

<code>package com.zf.rabbitmq04;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;
/**
* 發送消息
* @author zhanghuan
*
*/
public class Sender04 {
public static void main(String[] args) throws IOException {
ConnectionFactory connFac = new ConnectionFactory() ;
//RabbitMQ-Server安裝在本機,所以直接用127.0.0.1
connFac.setHost("127.0.0.1");
//創建一個連接
Connection conn = connFac.newConnection() ;
//創建一個渠道
Channel channel = conn.createChannel() ;
//定義ExchangeName,第二個參數是Exchange的類型,fanout表示消息將會分列發送給多賬戶
String exchangeName = "news" ;
channel.exchangeDeclare(exchangeName, "fanout") ;
String msg = "Hello World!";
//發送消息,這裡與前面的不同,這裡第一個參數不再是字符串,而是ExchangeName ,第二個參數也不再是queueName,而是空字符串
channel.basicPublish( exchangeName , "" , null , msg.getBytes());
System.out.println("send message[" + msg + "] to exchange "+ exchangeName +" success!");
channel.close();
conn.close();
}
}
/<code>

Send04.java 發送消息時沒有指定的queueName 用的空字符串代替的。 Exchange的類型有direct, topic, headers 、 fanout四種,上面用的是fanout類型

<code>package com.zf.rabbitmq04;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;

import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
/**
* 接收消息
* @author zhanghuan
*
*/
public class Recv04_01 {
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory connFac = new ConnectionFactory() ;
connFac.setHost("127.0.0.1");
Connection conn = connFac.newConnection() ;
Channel channel = conn.createChannel() ;
String exchangeName = "news" ;
channel.exchangeDeclare(exchangeName, "fanout") ;
//這裡使用沒有參數的queueDeclare方法創建Queue並獲取QueueName
String queueName = channel.queueDeclare().getQueue() ;
//將queue綁定到Exchange中
channel.queueBind( queueName, exchangeName, "") ;
//配置好獲取消息的方式
QueueingConsumer consumer = new QueueingConsumer(channel) ;
channel.basicConsume(queueName, true, consumer) ;
//循環獲取消息
while(true){
//獲取消息,如果沒有消息,這一步將會一直阻塞
Delivery delivery = consumer.nextDelivery() ;
String msg = new String(delivery.getBody()) ;
System.out.println("received message[" + msg + "] from " + queueName);
}
}
}
/<code>

Recv04_01.java 使用channel.queueDeclare()方法創建了一個Queue,該Queue有系統創建,並分配了一個隨機的名稱。 然後將該Queue與與Exchange綁定在一起。 該Queue就能從Exchange中後去消息了。

測試

將Recv04_01.java 文件複製幾份 Recv04_02.java Recv04_03.java,然後執行Recv04_01 與 Recv04_02,接下來執行Sender04發送消息,可以看到Recv04_01 與Recv04_02都接收到了消息。然後執行Recv04_03,沒有獲取到任何消息。接下來再執行Sender04發送消息,可以看到Recv04_01 、Recv04_02與Recv04_03都接收到了消息。

說明Exchange在收到生產者的消息後,會將消息發送給當前已經與它綁定了的所有Queue 。 然後被移除。

阿里P7架構師的RabbitMQ學習筆記告訴你什麼叫做消息隊列王者

四、消息路由

生產者會生產出很多消息 , 但是不同的消費者可能會有不同的需求,只需要接收指定的消息,其他的消息需要被過濾掉。 這時候就可以對消息進行過濾了。 在消費者端設置好需要接收的消息類型。

如果不使用默認的Exchange發送消息,而是使用我們自定定義的Exchange發送消息,那麼下面這個方法的第二個參數就不是QueueName了,而是消息的類型。

<code>channel.basicPublish( exchangeName , messageType , null , msg.getBytes());
/<code>

示例:

<code>package com.zf.rabbitmq05;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 發送消息
* @author zhanghuan
*
*/
public class Sender05 {
public static void main(String[] args) throws IOException {
ConnectionFactory connFac = new ConnectionFactory() ;
//RabbitMQ-Server安裝在本機,所以直接用127.0.0.1
connFac.setHost("127.0.0.1");
//創建一個連接
Connection conn = connFac.newConnection() ;
//創建一個渠道
Channel channel = conn.createChannel() ;
String exchangeName = "exchange02";
String messageType = "type01";
channel.exchangeDeclare(exchangeName, "direct") ;
//定義Queue名
String msg = "Hello World!";
//發送消息
channel.basicPublish( exchangeName , messageType , null , msg.getBytes());
System.out.println("send message[" + msg + "] to "+ exchangeName +" success!");
channel.close();
conn.close();
}
}
/<code>
<code>package com.zf.rabbitmq05;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
/**
* 接收消息
* @author zhanghuan
*
*/
public class Recv05_01 {
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory connFac = new ConnectionFactory() ;
connFac.setHost("127.0.0.1");
Connection conn = connFac.newConnection() ;
Channel channel = conn.createChannel() ;
String exchangeName = "exchange02";
channel.exchangeDeclare(exchangeName, "direct") ;
String queueName = channel.queueDeclare().getQueue() ;
//第三個參數就是type,這裡表示只接收type01類型的消息。
channel.queueBind(queueName, exchangeName, "type01") ;
//也可以選擇接收多種類型的消息。只需要再下面再綁定一次就可以了
channel.queueBind(queueName, exchangeName, "type02") ;
//配置好獲取消息的方式
QueueingConsumer consumer = new QueueingConsumer(channel) ;
channel.basicConsume(queueName, true, consumer) ;
//循環獲取消息
while(true){
//獲取消息,如果沒有消息,這一步將會一直阻塞
Delivery delivery = consumer.nextDelivery() ;
String msg = new String(delivery.getBody()) ;
System.out.println("received message[" + msg + "] from " + exchangeName);
}
}
}
/<code>

這時,啟動Recv05_01.java 然後啟動Sender05.java ,消費者端就會收到消息。然後將Sender05.java 中的messageType分別改為type02 type03 然後發送消息 , 可以看到消費者端能接收到type02的消息,但是不能接收到type03的消息。

阿里P7架構師的RabbitMQ學習筆記告訴你什麼叫做消息隊列王者

五、Topic類型消息

上一節中使用了消息路由,消費者可以選擇性的接收消息。 但是這樣還是不夠靈活。

比如某個消費者要訂閱娛樂新聞消息 。 包括新浪、網易、騰訊的娛樂新聞。那麼消費者就需要綁定三次,分別綁定這三個網站的消息類型。 如果新聞門戶更多了,那麼消費者將要綁定個更多的消息類型, 其實消費者只是需要訂閱娛樂新聞,不管是哪個網站的新聞,都需要。 那麼在rabbitMQ中可以使用topic類型。 模糊匹配消息類型。

模糊匹配中的 *代表一個 #代表零個或1個

示例:

<code>package com.zf.rabbitmq06;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
/**
* 接收消息
* @author zhanghuan
*
*/
public class Recv06_01 {
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory connFac = new ConnectionFactory() ;
connFac.setHost("127.0.0.1");
Connection conn = connFac.newConnection() ;
Channel channel = conn.createChannel() ;
String exchangeName = "exchange03";
channel.exchangeDeclare(exchangeName, "topic") ;
String queueName = channel.queueDeclare().getQueue() ;
//第三個參數就是type,這裡表示只接收type01類型的消息。
channel.queueBind(queueName, exchangeName, "#.type01") ;
//配置好獲取消息的方式
QueueingConsumer consumer = new QueueingConsumer(channel) ;
channel.basicConsume(queueName, true, consumer) ;
//循環獲取消息
while(true){
//獲取消息,如果沒有消息,這一步將會一直阻塞
Delivery delivery = consumer.nextDelivery() ;
String msg = new String(delivery.getBody()) ;
System.out.println("received message[" + msg + "] from " + exchangeName);
}
}
}

/<code>
<code>package com.zf.rabbitmq06;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 發送消息
* @author zhanghuan *
*/
public class Sender06 {
public static void main(String[] args) throws IOException {
ConnectionFactory connFac = new ConnectionFactory() ;
//RabbitMQ-Server安裝在本機,所以直接用127.0.0.1
connFac.setHost("127.0.0.1");
//創建一個連接
Connection conn = connFac.newConnection() ;
//創建一個渠道
Channel channel = conn.createChannel() ;
String exchangeName = "exchange03";
String messageType = "fs.type01";
channel.exchangeDeclare(exchangeName, "topic") ;
//定義Queue名
String msg = "Hello World!";
//發送消息
channel.basicPublish( exchangeName , messageType , null , msg.getBytes());
System.out.println("send message[" + msg + "] to "+ exchangeName +" success!");
channel.close();
conn.close();
}
}
/<code>

使用topic之後 。不管Sender端發送的消息類型是fs.type01 還是 xx.type01 還是 type01 ,消費者都會收到消息

六、RPC 遠程過程調用

當客戶端想要調用服務器的某個方法來完成某項功能時,就可以使用rabbitMQ支持的PRC服務。

其實RPC服務與普通的收發消息的區別不大, RPC的過程其實就是客戶端向服務端定義好的Queue發送消息,其中攜帶的消息就應該是服務端將要調用的方法的參數 ,並使用Propertis告訴服務端將結果返回到指定的Queue。

示例:

<code>package com.zf.rabbitmq07;
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
public class RPCServer {
public static final String RPC_QUEUE_NAME = "rpc_queue";
public static String sayHello(String name){
return "hello " + name ;
}
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory connFac = new ConnectionFactory() ;
connFac.setHost("localhost");
Connection conn = connFac.newConnection() ;
Channel channel = conn.createChannel() ;
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null) ;
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false , consumer) ;
while(true){
System.out.println("服務端等待接收消息..");
Delivery deliver = consumer.nextDelivery() ;
System.out.println("服務端成功收到消息..");
BasicProperties props = deliver.getProperties() ;
String message = new String(deliver.getBody() , "UTF-8") ;
String responseMessage = sayHello(message) ;
BasicProperties responseProps = new BasicProperties.Builder()
.correlationId(props.getCorrelationId())
.build() ;
//將結果返回到客戶端Queue
channel.basicPublish("", props.getReplyTo() , responseProps , responseMessage.getBytes("UTF-8") ) ;

//向客戶端確認消息
channel.basicAck(deliver.getEnvelope().getDeliveryTag(), false);
System.out.println("服務端返回消息完成..");
}
}
}
/<code>
<code>package com.zf.rabbitmq07;
import java.io.IOException;
import java.util.UUID;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
public class RPCClient {
public static final String RPC_QUEUE_NAME = "rpc_queue";
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory connFac = new ConnectionFactory() ;
connFac.setHost("localhost");
Connection conn = connFac.newConnection() ;
Channel channel = conn.createChannel() ;
//響應QueueName ,服務端將會把要返回的信息發送到該Queue
String responseQueue = channel.queueDeclare().getQueue() ;
String correlationId = UUID.randomUUID().toString() ;
BasicProperties props = new BasicProperties.Builder()
.replyTo(responseQueue)
.correlationId(correlationId)
.build();
String message = "is_zhoufeng";
channel.basicPublish( "" , RPC_QUEUE_NAME , props , message.getBytes("UTF-8"));
QueueingConsumer consumer = new QueueingConsumer(channel) ;
channel.basicConsume( responseQueue , consumer) ;
while(true){
Delivery delivery = consumer.nextDelivery() ;
if(delivery.getProperties().getCorrelationId().equals(correlationId)){
String result = new String(delivery.getBody()) ;
System.out.println(result);
}
}
}
}/<code>

寫在最後:

  • 針對於Java程序員,筆者最近整理了一些面試真題,思維導圖,程序人生等PDF學習資料;
  • 關注私信我"86",即可獲取!
  • 希望讀到這的您能點個小贊和關注下我,以後還會更新技術乾貨,謝謝您的支持!
阿里P7架構師的RabbitMQ學習筆記告訴你什麼叫做消息隊列王者


分享到:


相關文章: