消息中間件RabbitMQ入門詳解

消息中間件RabbitMQ入門詳解

RabbitMQ是一個消息中間件,用來接收消息,轉發消息。可以認為消息中間件就是一家郵政局,你把信投到郵箱,郵局的人就會幫你遞送信件到接收者手上。從這個意義上講,RabbitMQ兼具郵箱、郵局和郵差的功能。

幾種角色

  • 生產者:消息的生產者,負責產生消息。
  • 隊列:RabbitMQ中的消息的緩衝區,用來存放消息,隊列僅受限於服務器的內存和磁盤空間。
  • 消費者:從RabbitMQ中接收消息,消費消息。

搭建環境

Erlang語言是RabbitMQ的底層語言,安裝RabbitMQ需要安裝Erlang。在Windows環境下安裝示例:

先安裝erlang,全部配置默認就行

消息中間件RabbitMQ入門詳解

先下載安裝Erlang

再安裝RabbitMQ,全部配置默認就行

消息中間件RabbitMQ入門詳解

下載安裝RabbitMQ

安裝後RabbitMQ作為windows的服務自動運行。

消息中間件RabbitMQ入門詳解

寫一個Hello World

RabbitMQ支持非常多的語言:

  • Python
  • Java
  • Ruby
  • PHP
  • C#
  • JavaScript
  • Go
  • Elixir
  • Objective-C
  • Swift
  • Spring AMQP

這裡以Java為例,IDE採用IntelliJ:

1.創建Java工程

pom.xml文件如下:


<project> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelversion>4.0.0/<modelversion>
<groupid>rabbitmq-demo/<groupid>
<artifactid>rabbitmq-java/<artifactid>
<version>1.0-SNAPSHOT/<version>
<dependencies>
<dependency>

<groupid>com.rabbitmq/<groupid>
<artifactid>amqp-client/<artifactid>
<version>5.5.2/<version>
/<dependency>
<dependency>
<groupid>com.rabbitmq/<groupid>
<artifactid>rabbitmq-client/<artifactid>
/<dependency>
/<dependencies>
/<project>

2.編寫消息發送代碼

import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {

private final static String QUEUE_NAME = "hello";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}

發送者、RabbitMQ服務器和接收者可以運行在不同的地方,只需要將localhost改成RabbitMQ的IP就行。

注意將IntelliJ中的language level設為8,否則有可能編譯不通過

消息中間件RabbitMQ入門詳解

3.編寫消息接收代碼

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}

4.測試運行

運行Recv.java

消息中間件RabbitMQ入門詳解

運行Send.java

消息中間件RabbitMQ入門詳解

查看Recv的輸出,已經接收到消息

消息中間件RabbitMQ入門詳解

用RabbitMQ做任務分發

此場景為創建工作隊列去做耗時的任務,RabbitMQ分發任務到多個消費者。

消息中間件RabbitMQ入門詳解

目的是為了讓資源密集型任務不阻塞主進程,將任務封裝為一個消息併發送到隊列中。

1.編寫發送者代碼

import com.rabbitmq.client.Channel; 

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class NewTask {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = String.join(" ", argv);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}

通過參數傳遞要發送的消息

2.編寫接收者代碼

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Worker {
private final static String TASK_QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} catch (Exception e){
e.printStackTrace();
}
finally {
System.out.println(" [x] Done");
}
};

boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
}
private static void doWork(String task) throws InterruptedException {
for (char ch: task.toCharArray()) {
if (ch == '.') Thread.sleep(1000);
}
}
}

消息中間件RabbitMQ入門詳解

我們解析接收到的消息,消息中有多少個點就讓進程等待幾秒,模擬耗時任務。

3.測試運行

首先配置IntelliJ,讓Worker.java可以並行運行

消息中間件RabbitMQ入門詳解

運行兩次Worker

消息中間件RabbitMQ入門詳解

運行NewTask,我們也會運行多次NewTask,每次傳不一樣的參數。但是和Worker不同的是NewTask發送完消息就會退出,所以不需要並行運行

首先配置參數

消息中間件RabbitMQ入門詳解

消息中間件RabbitMQ入門詳解

運行多次,一次傳遞參數:

First message.
Second message..
Third message...
Fourth message....
Fifth message.....

分別查看兩個Worker的輸出

消息中間件RabbitMQ入門詳解

消息中間件RabbitMQ入門詳解

可以看到RabbitMQ採用循環的方式輪流給兩個worker發送任務。

更復雜的場景請參考官方文檔,包括消息是否正確處理,發佈訂閱默認,RPC等。


分享到:


相關文章: