RocketMQ 简介

概述

消息队列作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。主要具有以下优势:

  • 削峰填谷: 主要解决瞬时写压力大于应用服务能力导致消息丢失、系统奔溃等问题
  • 系统解耦: 解决不同重要程度、不同能力级别系统之间依赖导致一死全死
  • 提升性能: 当存在一对多调用时,可以发一条消息给消息系统,让消息系统通知相关系统
  • 蓄流压测: 线上有些链路不好压测,可以通过堆积一定量消息再放开来压测

RocketMQ

Apache Alibaba RocketMQ 是一个消息中间件。消息中间件中有两个角色:消息生产者和消息消费者。RocketMQ 里同样有这两个概念,消息生产者负责创建消息并发送到 RocketMQ 服务器,RocketMQ 服务器会将消息持久化到磁盘,消息消费者从 RocketMQ 服务器拉取消息并提交给应用消费。

RocketMQ 特点

RocketMQ 是一款分布式、队列模型的消息中间件,具有以下特点:

  • 支持严格的消息顺序
  • 支持 Topic 与 Queue 两种模式
  • 亿级消息堆积能力
  • 比较友好的分布式特性
  • 同时支持 Push 与 Pull 方式消费消息
  • 历经多次天猫双十一海量消息考验

RocketMQ 优势

目前主流的 MQ 主要是 RocketMQ、kafka、RabbitMQ,其主要优势有:

  • 支持事务型消息(消息发送和 DB 操作保持两方的最终一致性,RabbitMQ 和 Kafka 不支持)
  • 支持结合 RocketMQ 的多个系统之间数据最终一致性(多方事务,二方事务是前提)
  • 支持 18 个级别的延迟消息(RabbitMQ 和 Kafka 不支持)
  • 支持指定次数和时间间隔的失败消息重发(Kafka 不支持,RabbitMQ 需要手动确认)
  • 支持 Consumer 端 Tag 过滤,减少不必要的网络传输(RabbitMQ 和 Kafka 不支持)
  • 支持重复消费(RabbitMQ 不支持,Kafka 支持)

消息队列对比参照表

RocketMQ 简介

基于 Docker 安装 RocketMQ

docker-compose.yml

注意:启动 RocketMQ Server + Broker + Console 至少需要 2G 内存

<code>version: '3.5'
services:
rmqnamesrv:
image: foxiswho/rocketmq:server
container_name: rmqnamesrv
ports:
- 9876:9876
volumes:
- ./data/logs:/opt/logs
- ./data/store:/opt/store
networks:
rmq:
aliases:
- rmqnamesrv

rmqbroker:
image: foxiswho/rocketmq:broker
container_name: rmqbroker
ports:
- 10909:10909
- 10911:10911
volumes:
- ./data/logs:/opt/logs
- ./data/store:/opt/store
- ./data/brokerconf/broker.conf:/etc/rocketmq/broker.conf
environment:
NAMESRV_ADDR: "rmqnamesrv:9876"
JAVA_OPTS: " -Duser.home=/opt"
JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
command: mqbroker -c /etc/rocketmq/broker.conf
depends_on:
- rmqnamesrv
networks:
rmq:
aliases:
- rmqbroker

rmqconsole:
image: styletang/rocketmq-console-ng

container_name: rmqconsole
ports:
- 8080:8080
environment:
JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
depends_on:
- rmqnamesrv
networks:
rmq:
aliases:
- rmqconsole

networks:
rmq:
name: rmq
driver: bridge

/<code>

broker.conf

RocketMQ Broker 需要一个配置文件,按照上面的 Compose 配置,我们需要在 ./data/brokerconf/ 目录下创建一个名为 broker.conf 的配置文件,内容如下:

<code># Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# 所属集群名字
brokerClusterName=DefaultCluster

# broker 名字,注意此处不同的配置文件填写的不一样,如果在 broker-a.properties 使用: broker-a,
# 在 broker-b.properties 使用: broker-b

brokerName=broker-a

# 0 表示 Master,> 0 表示 Slave
brokerId=0

# nameServer地址,分号分割
# namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876

# 启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
# 解决方式1 加上一句 producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP
# brokerIP1=192.168.0.253

# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4

# 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 false,false,false
autoCreateTopicEnable=true

# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true

# Broker 对外服务的监听端口
listenPort=10911

# 删除文件时间点,默认凌晨4点
deleteWhen=04

# 文件保留时间,默认48小时
fileReservedTime=120

# commitLog 每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824

# ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整

mapedFileSizeConsumeQueue=300000

# destroyMapedFileIntervalForcibly=120000
# redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
# storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
# commitLog 存储路径
# storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
# 消费队列存储
# storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
# 消息索引存储路径
# storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
# checkpoint 文件存储路径
# storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
# abort 文件存储路径
# abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
# 限制的消息大小
maxMessageSize=65536

# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000

# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=ASYNC_MASTER

# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH

# 发消息线程池数量
# sendMessageThreadPoolNums=128
# 拉消息线程池数量
# pullMessageThreadPoolNums=128


/<code>

RocketMQ 控制台

访问 http://192.168.198.131:8070 登入控制台

RocketMQ 简介

由于本教程整个案例基于 Spring Cloud,故我们采用 Spring Cloud Stream 完成一次发布和订阅

官方教程Spring Cloud Alibaba RocketMQ

Spring Cloud Stream

Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。它基于 SpringBoot 来创建具有生产级别的单机 Spring 应用,并且使用 Spring Integration 与 Broker 进行连接。

Spring Cloud Stream 提供了消息中间件配置的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。

Spring Cloud Stream 内部有两个概念:Binder 和 Binding。

  • Binder: 跟外部消息中间件集成的组件,用来创建 Binding,各消息中间件都有自己的 Binder 实现。

比如 Kafka 的实现 KafkaMessageChannelBinder,RabbitMQ 的实现 RabbitMessageChannelBinder 以及 RocketMQ 的实现 RocketMQMessageChannelBinder。

  • Binding: 包括 Input Binding 和 Output Binding。

Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。

RocketMQ 简介


SCSt overview

Figure 4. Spring Cloud Stream

使用 Spring Cloud Stream 完成一段简单的消息发送和消息接收代码:

<code>MessageChannel messageChannel = new DirectChannel();

// 消息订阅
((SubscribableChannel) messageChannel).subscribe(new MessageHandler() {
@Override
public void handleMessage(Message> message) throws MessagingException {
System.out.println("receive msg: " + message.getPayload());
}
});

// 消息发送
messageChannel.send(MessageBuilder.withPayload("simple msg").build());

/<code>

这段代码所有的消息类都是 spring-messaging 模块里提供的。屏蔽具体消息中间件的底层实现,如果想用更换消息中间件,在配置文件里配置相关消息中间件信息以及修改 binder 依赖即可。

Spring Cloud Stream 底层基于这段代码去做了各种抽象。

解决连接超时问题

我们采用 Docker 部署了 RocketMQ 服务,此时 RocketMQ Broker 暴露的地址和端口(10909,10911)是基于容器的,会导致我们开发机无法连接,从而引发 org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout 异常

注意下图中的 IP 地址,这个是容器的 IP,开发机与容器不在一个局域网所以无法连接。

RocketMQ 简介

解决方案是在 broker.conf 配置文件中增加 brokerIP1=宿主机IP 即可

RocketMQ 简介

POM

参考文档,此处使用com.alibaba.cloud:spring-cloud-starter-stream-rocketmq

<code>
<project> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelversion>4.0.0/<modelversion>

<parent>
<groupid>com.wj/<groupid>
<artifactid>hello-spring-cloud-alibaba-dependencies/<artifactid>
<version>1.0.0-SNAPSHOT/<version>
<relativepath>../hello-spring-cloud-alibaba-dependencies/pom.xml/<relativepath>
/<parent>


<artifactid>hello-spring-cloud-alibaba-rocketmq-provider/<artifactid>
<packaging>jar/<packaging>

<name>hello-spring-cloud-alibaba-rocketmq-provider/<name>


<dependencies>

<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-web/<artifactid>
/<dependency>
<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-actuator/<artifactid>
/<dependency>
<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-test/<artifactid>
<scope>test/<scope>
/<dependency>



<dependency>
<groupid>com.alibaba.cloud/<groupid>
<artifactid>spring-cloud-starter-stream-rocketmq/<artifactid>
/<dependency>

/<dependencies>

<build>
<plugins>
<plugin>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-maven-plugin/<artifactid>
<configuration>
<mainclass>com.wj.hello.spring.cloud.alibaba.rocketmq.provider.RocketMQProviderApplication/<mainclass>
/<configuration>
/<plugin>
/<plugins>
/<build>
/<project>
/<code>

Application

<code>package com.wj.hello.spring.cloud.alibaba.rocketmq.provider;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication
@EnableBinding({Source.class})
public class RocketMQProviderApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(RocketMQProviderApplication.class,args);
}

@Autowired
private MessageChannel output;

@Override
public void run(String... args) throws Exception {
output.send(MessageBuilder.withPayload("Hello RockerMQ").build());
}
}
/<code>

application.yml

<code>spring:
application:
name: rocketmq-provider
cloud:
stream:
rocketmq:
binder:
# RocketMQ 服务器地址
namesrv-server: 192.168.198.131:9876
bindings:
# 这里是个 Map 类型参数,{} 为 YAML 中 Map 的行内写法
output: {destination: test-topic, content-type: application/json}

server:
port: 9093

management:
endpoints:
web:
exposure:
include: '*'
/<code>

运行成功后即可在 RocketMQ 控制台的 消息 列表中选择 test-topic 主题即可看到发送的消息

RocketMQ 消费者

POM

<code>
<project> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelversion>4.0.0/<modelversion>

<parent>
<groupid>com.wj/<groupid>
<artifactid>hello-spring-cloud-alibaba-dependencies/<artifactid>
<version>1.0.0-SNAPSHOT/<version>
<relativepath>../hello-spring-cloud-alibaba-dependencies/pom.xml/<relativepath>
/<parent>

<artifactid>hello-spring-cloud-alibaba-rocketmq-consumer/<artifactid>
<packaging>jar/<packaging>

<name>hello-spring-cloud-alibaba-rocketmq-consumer/<name>

<dependencies>

<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-web/<artifactid>
/<dependency>
<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-actuator/<artifactid>
/<dependency>
<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-test/<artifactid>
<scope>test/<scope>
/<dependency>



<dependency>
<groupid>org.springframework.cloud/<groupid>
<artifactid>spring-cloud-starter-stream-rocketmq/<artifactid>
/<dependency>

/<dependencies>

<build>
<plugins>
<plugin>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-maven-plugin/<artifactid>
<configuration>
<mainclass>com.wj.hello.spring.cloud.alibaba.rocketmq.consumer.RocketMQConsumerApplication/<mainclass>
/<configuration>
/<plugin>
/<plugins>
/<build>
/<project>
/<code>

Application

<code>package com.wj.hello.spring.cloud.alibaba.rocketmq.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;

@SpringBootApplication
@EnableBinding({Sink.class})
public class RocketMQConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMQConsumerApplication.class, args);
}

@StreamListener("input")
private void inputMessage(String message){
System.out.printf("input receive: " + message);
}
}
/<code>

application.yml

<code>spring:
application:
name: rocketmq-consumer
cloud:
stream:
rocketmq:
binder:
namesrv-server: 192.168.198.131:9876
bindings:
input: {consumer.orderly: true}
bindings:
input: {destination: test-topic, content-type: text/plain, group: test-group, consumer.maxAttempts: 1}

server:
port: 9094

management:
endpoints:
web:
exposure:
include: '*'
/<code>

运行成功后即可在控制台接收到消息:input receive:: Hello RocketMQ

RocketMQ 简介

rocketmq导图

RocketMQ 简介

参考文章:

https://www.cnblogs.com/goodAndyxublog/p/11457164.html

https://www.funtl.com/zh/spring-cloud-alibaba/RocketMQ-%E7%AE%80%E4%BB%8B.html


分享到:


相關文章: