RabbitMQ是一個消息中間件,用來接收消息,轉發消息。可以認為消息中間件就是一家郵政局,你把信投到郵箱,郵局的人就會幫你遞送信件到接收者手上。從這個意義上講,RabbitMQ兼具郵箱、郵局和郵差的功能。
幾種角色
- 生產者:消息的生產者,負責產生消息。
- 隊列:RabbitMQ中的消息的緩衝區,用來存放消息,隊列僅受限於服務器的內存和磁盤空間。
- 消費者:從RabbitMQ中接收消息,消費消息。
搭建環境
Erlang語言是RabbitMQ的底層語言,安裝RabbitMQ需要安裝Erlang。在Windows環境下安裝示例:
先安裝erlang,全部配置默認就行
再安裝RabbitMQ,全部配置默認就行
安裝後RabbitMQ作為windows的服務自動運行。
寫一個Hello World
RabbitMQ支持非常多的語言:
- Python
- Java
- Ruby
- PHP
- C#
- JavaScript
- Go
- Elixir
- Objective-C
- Swift
- Spring AMQP
這裡以Java為例,IDE採用IntelliJ:
1.創建Java工程
pom.xml文件如下:
<project> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelversion>4.0.0/<modelversion>
<groupid>rabbitmq-demo/<groupid>
<artifactid>rabbitmq-java/<artifactid>
<version>1.0-SNAPSHOT/<version>
<dependencies>
<dependency>
<groupid>com.rabbitmq/<groupid>
<artifactid>amqp-client/<artifactid>
<version>5.5.2/<version>
/<dependency>
<dependency>
<groupid>com.rabbitmq/<groupid>
<artifactid>rabbitmq-client/<artifactid>
/<dependency>
/<dependencies>
/<project>
2.編寫消息發送代碼
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
發送者、RabbitMQ服務器和接收者可以運行在不同的地方,只需要將localhost改成RabbitMQ的IP就行。
注意將IntelliJ中的language level設為8,否則有可能編譯不通過
3.編寫消息接收代碼
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
4.測試運行
運行Recv.java
運行Send.java
查看Recv的輸出,已經接收到消息
用RabbitMQ做任務分發
此場景為創建工作隊列去做耗時的任務,RabbitMQ分發任務到多個消費者。
目的是為了讓資源密集型任務不阻塞主進程,將任務封裝為一個消息併發送到隊列中。
1.編寫發送者代碼
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class NewTask {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = String.join(" ", argv);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
通過參數傳遞要發送的消息
2.編寫接收者代碼
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Worker {
private final static String TASK_QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} catch (Exception e){
e.printStackTrace();
}
finally {
System.out.println(" [x] Done");
}
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
}
private static void doWork(String task) throws InterruptedException {
for (char ch: task.toCharArray()) {
if (ch == '.') Thread.sleep(1000);
}
}
}
我們解析接收到的消息,消息中有多少個點就讓進程等待幾秒,模擬耗時任務。
3.測試運行
首先配置IntelliJ,讓Worker.java可以並行運行
運行兩次Worker
運行NewTask,我們也會運行多次NewTask,每次傳不一樣的參數。但是和Worker不同的是NewTask發送完消息就會退出,所以不需要並行運行
首先配置參數
運行多次,一次傳遞參數:
First message.
Second message..
Third message...
Fourth message....
Fifth message.....
分別查看兩個Worker的輸出
可以看到RabbitMQ採用循環的方式輪流給兩個worker發送任務。
更復雜的場景請參考官方文檔,包括消息是否正確處理,發佈訂閱默認,RPC等。
閱讀更多 Beaver1024 的文章