RocketMQ 简单使用笔记

RocketMQ (Rocket Message Queue) 是一种提供消息队列的中间件, 提供了消息的生产, 储存, 消费功能API

本次学习, 使用 RocketMQ 5.0.0 版本

消息队列

消息队列是分布式系统中重要的组件, 当不需要立即获得结果而且并发量又需要进行控制的时候, 就是需要使用消息队列的时候
消息队列主要解决了应用耦合, 异步处理, 流量削锋等问题

常见消息队列

当前使用较多的消息队列有

  • RabbitMQ
  • RocketMQ
  • ActiveMQ
  • Kafka
  • ZeroMQ
  • MetaMq
    部分数据库如 Redis, Mysql 等也可实现消息队列的功能

消息队列常见协议

  • JMS (Java Messaging Service)
  • STOMP (Streaming Text Orientated Message Protocol)
  • AMQP (Advance Message Queuing Protocol)
  • MQTT (Message Queuing Telemetry Transport)

使用场景

  • 应用耦合: 多应用间通过消息队列对同一消息进行处理, 避免调用接口失败导致整个过程失败
  • 异步处理: 多应用对消息队列中同一消息进行处理, 应用间并发处理消息, 相比串行处理减少处理时间
  • 限流削峰: 广泛应用于秒杀或抢购活动中, 避免流量过大导致应用系统宕机
  • 消息驱动的系统: 系统分为消息队列, 消息生产者, 消息消费者, 生产者负责产生消息, 消费者负责对消息进行处理

基本概念

参考文档 1
参考文档 2

主题 (Topic)

用于标识同一类业务逻辑的消息. 主题通过TopicName来做唯一标识和区分

生产者 (Producer)

生产者是用来构建并传输消息到服务端的运行实体

消费者 (Consumer)

消费者是用来接收并处理消息的运行实体

消费者分组 (ConsumerGroup)

消费者分组不是运行实体而是一个逻辑资源, 作用是承载多个消费行为一致的消费者的负载均衡分组

消息队列 (MessageQueue)

队列是消息存储和传输的实际容器, 也是消息的最小存储单元. 队列通过 QueueId 来做唯一标识和区分
主题都是由多个队列组成, 以此实现队列数量的水平拆分和队列内部的流式存储
一个消息队列中的消息只能被同一消费者组中的一个消费者消费
一个队列从逻辑上可有读队列和写队列的属性

订阅关系 (Subscription)

订阅关系是消费者获取消息, 处理消息的规则和状态配置
订阅关系由消费者分组动态注册到服务端系统, 并在后续的消息传输中按照订阅关系定义的过滤规则进行消息匹配和消费进度维护

消息 (Message)

消息是最小数据传输单元. 生产者将业务数据的负载和拓展属性包装成消息发送到服务端, 服务端按照相关语义将消息投递到消费端进行消费

消息类型 (MessageType)

RocketMQ 支持的消息类型有

  • 普通消息
  • 顺序消息
  • 事务消息
  • 定时消息
  • 延时消息

    每个主题只支持一种消息类型

定时/延时消息

消息被发送至服务端后, 在指定时间后才能被消费者消费. 通过设置一定的定时时间可以实现分布式场景的延时调度触发效果

顺序消息

顺序消息可以使费者按照发送消息的先后顺序获取消息, 从而实现业务场景中的顺序处理

消息视图 (MessageView)

通过消息视图可以读取消息内部的多个属性和负载信息

消息标签 (MessageTag)

消息标签是细粒度消息分类属性, 消费者通过订阅特定的标签来实现细粒度过滤

消息位点 (MessageQueueOffset)

标识消息在队列中的位置

消费位点 (ConsumerOffset)

每个消费者分组记录消费过的最新一条消息的位点是消费位点

消息索引 (MessageKey)

通过设置的消息索引可以快速查找到对应的消息内容

消息过滤

消费者可以通过订阅指定消息标签对消息进行过滤, 确保最终只接收被过滤后的消息合集

消费结果 (ConsumeResult)

消费结果是消费监听器处理消息完成后返回的处理结果,用来标识本次消息是否正确处理

重置消费位点

在消息持久化存储的时间范围内, 重新设置消费者分组对已订阅主题的消费进度

消息轨迹

消息轨迹是在一条消息从生产者发出到消费者接收并处理过程中, 经由各个相关节点的时间地点等数据汇聚而成的完整链路信息

消息堆积

由于消费者的消费能力有限, 未能在短时间内将所有消息正确消费掉, 此时在服务端保存着未被消费的消息的状态即消息堆积

事务消息

事务消息用于在分布式场景下保障消息生产和本地事务的最终一致性

事务检查器 (TransactionChecker)

事务检查器生产者用来执行本地事务检查和异常事务恢复的监听器

事务状态 (TransactionResolution)

事务状态是事务消息发送过程中, 事务提交的状态标识
服务端通过事务状态控制事务消息是否应该提交和投递
事务状态包括:

  • 事务提交
  • 事务回滚
  • 事务未决

RocketMQ 逻辑架构

Apache RocketMQ 领域模型

RocketMQ 架构图
RocketMQ 集群架构图
Broker 集群 1 架构图

Broker 是主备集群

Name Server 集群架构图

角色

Producer

Producer (生产者), 负责生产消息. Producer 通过 MRocketMQ 的负载均衡模块, 选择 Brocker 集群队列进行消息投递
生产者往往组成生产者组, 投递相同 Topic 的消息

Consumer

Consumer (消费者), 负责消费消息. Consumer 负责从 Broker 服务器中获取消息, 并对消息进行相应处理
消费者往往组成消费者组, 消费相同 Topic 的消息

负载均衡和容错
Producer 和 Consumer 分别成组, 对 Message Queue 实现负载均衡和对 Message 的容错
当 消费者组 中的 Consumer 比 Message Queue 还多时, 负载均衡将不能很好运行

Name Server

Name Server 是 Broker 的注册中心以及 Topic 的路由中心, 支持 Broker 的动态注册与发现
早期版本是由 Zookeeper 替代
Name Server 集群的实例间不进行通信, 这就导致了 Name Server 扩容上的麻烦

Broker 管理

管理 Broker 的注册信息, 检测 Broker 是否存活

路由信息管理

管理 Broker 集群的路由信息, 以及用于客户端查询的队列信息
Producer 和 Consumer 通过 Name Server 获取 Broker 集群信息
路由信息是依靠客户端主动拉取, 默认 30s 拉取一次

Broker

Broker 对消息进行存储和转发, 是 RocketMQ 的核心部分
Broker Master 负责读写请求, Broker Slave 只负责读请求

模块
Remoting Module

Boker 实体, 处理客户端请求

Client Manager

客户端管理器, 负责接受和解析客户端请求

Store Service

存储服务, 负责管理消息存储到硬盘以及消息查询

HA Service

高可用服务, 负责 Broker 集群实例间的数据同步

Index Service

索引服务, 根据 MessageKey 对投递到 Broker 的消息提供索引和快速查询服务

数据保存策略
复制策略

Broker Master 和 Slave 间同步数据的方式, 通常有两种方式:

  • 同步策略: 消息到达 Master 后, Master 将数据同步给 Slave 后将 ACK 返回给客户端
  • 异步策略: 消息到达 Master 后立即返回 ACK, 稍后将数据同步给 Slave
刷盘策略

Broker 将内存中数据与磁盘中的数据同步的方式, 通常有两种方式:

  • 同步刷盘: 当消息持久化后算作消息写入成功
  • 异步刷盘: 当消息到达内存后 算作消息写入成功

分片

在 Broker 集群中, 不同 Broker 可能存有相同 Topic, 例如 Broker1 存有 TopicA, TopicA. 那么, 存有 TopicA 的部分是一个分片

分区

分片中的 Queue

RocketMQ 部署模式

参考文档

Master

单 Master 无 Slave

单节点部署, 只部署一个 Broker, 部署方式较为简单
可靠性非常差, 一般只在开发环境使用

多 Master 无 Slave

多节点部署, 部署多个 Broker Master, 同一个 Topic 会在不同节点存有切片, 各个分区平均分布
注意配置较为简单, 单个 Master 宕机对集群几乎没有影响. 但是宕机期间, 该 Master 的订阅者不可进行操作

多 Master 多 Slave 异步复制机制

多节点部署, 部署多个 Broker Master, 每个 Broker Master 配置有 Broker Slave (一般一个即可)
当 Broker Master 宕机后, Broker Slave 会自动取代 Broker Master. 由于异步赋值机制, 消息可能会有小概率丢失

多 Master 多 Slave 同步复制机制

可靠性最高, 但是性能开销较大且目前版本在主节点宕机后, 备机不能自动切换为主机 (可通过插件进行支持)

Name Server

由于 Name Server 之间是无状态模式, 只需将各个 Name Server 的地址告诉 Broker 即可, 所以说 Name Server 集群部署非常简单

RocketMQ 部署

RocketMQ 部署往往会使用 RAID10 等磁盘阵列

RocketMQ 单节点部署

下载 RocketMQ 5.0.0

RocketMQ二进制包
RocketMQ源码

配置环境

ROCKETMQ_HOME 添加至环境变量中

启动 RocketMQ

bin 目录下, 执行 ./mqnamesrv./mqnamesrv.cmd

由于JDK15+ 中删除了 CMS 垃圾收集器 (实测 JDK11 也无法正常启动), 在使用 JDK15+ 时启动脚本 runserver.cmdrunserver.sh 中需要修改 JVM 启动参数, 删除

1
2
set "JAVA_OPT=%JAVA_OPT% -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
set "JAVA_OPT=%JAVA_OPT% -verbose:gc -Xloggc:"%USERPROFILE%\rmq_srv_gc.log" -XX:+PrintGCDetails -XX:+PrintGCDateStamps"

启动 Broker + Proxy

bin 目录下, 执行
./mqbroker -n localhost:9876 --enable-proxy./mqbroker.cmd -n localhost:9876

--enable-proxy 在 Windows 平台提示不是合法指令, 这里选择只启动 broker
可以使用命令 ./mqproxy -n 192.168.1.1:9876 单独启动 proxy

在使用 JDK15+ 时启动脚本 runserver.cmdrunserver.sh 中需要修改 JVM 启动参数, 删除

1
2
3
set "JAVA_OPT=%JAVA_OPT% -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8"
set "JAVA_OPT=%JAVA_OPT% -verbose:gc -Xloggc:%USERPROFILE%\mq_gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy"
set "JAVA_OPT=%JAVA_OPT% -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"

由于 JDK9+ 对 Java 内部包访问权限做了限制, 需要修改启动参数, 添加

1
set "JAVA_OPT=%JAVA_OPT% --add-exports java.base/sun.nio.ch=ALL-UNNAMED

bin 目录新建启动脚本 start.cmd 可快速完成两步操作

1
2
3
start .\mqnamesrv.cmd
start .\mqbroker.cmd -n localhost:9876
@REM start .\mqproxy -n 192.168.1.1:9876

测试

  1. bin 目录下启动生产者
    1
    2
    export NAMESRV_ADDR=localhost:9876
    sh tools.sh org.apache.rocketmq.example.quickstart.Producer
    1
    2
    $env:NAMESRV_ADDR="localhost:9876"
    ./tools.cmd org.apache.rocketmq.example.quickstart.Producer
  2. bin 目录下启动消费
    1
    2
    export NAMESRV_ADDR=localhost:9876
    sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
    1
    2
    $env:NAMESRV_ADDR="localhost:9876"
    ./tools.cmd org.apache.rocketmq.example.quickstart.Producer

RocketMQ 多 Broker Master 及 Broker Slave 部署

部署 RocketMQ Dashboard 监控 RocketMQ

参考文档

使用 Docker

1
2
docker pull apacherocketmq/rocketmq-dashboard:latest
docker run -d --name rocketmq-dashboard -e "JAVA_OPTS=-Drocketmq.namesrv.addr=127.0.0.1:9876" -p 8080:8080 -t apacherocketmq/rocketmq-dashboard:latest

源码启动

Windows 平台可使用源码方式进行启动

  1. 下载源码
  2. 使用 Maven 编译 mvn clean package -Dmaven.test.skip=true

    使用高版本的 Java 进行编译可能需要修改 pom 文件, 升级 lombook 版本

  3. 启动服务 java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar

运维

Brocker 停机重启

  1. 首先将 Broker 中 Topic (除系统和重试 Queue) w 权限取消 (将 perm 设置为 4)
    1
    ./mqadmin updateTopic -c rocketmq-cluster -b broker-a主节点的ip:10911 -t topic名称 -n 任意一台namesrv的ip:9876 -p 4 -r 读队列数 -w 写队列数
  2. 观测到 Broker 无流量后将 Broker 停机
  3. 启动 Broker
  4. 恢复 Broker 中 Topic 的权限 (将 perm 设置为 6)
    1
    ./mqadmin updateTopic -c rocketmq-cluster -b broker-a主节点的ip:10911 -t topic名称 -n 任意一台namesrv的ip:9876 -p 6 -r 读队列数 -w 写队列数

队列缩容

  1. 读队列数量保持不变, 减少写队列数量
  2. 只读队列里无消息后, 减少读队列数量

mqadmin 命令

Admin Tool

RocketMQ 工作原理

消息生产

  1. Producer 与 Name server 通信获取 Topic 路由信息和 Broker 列表
  2. Producer 根据算法选择合适的 Queue
  3. Producer 对消息进行预处理
  4. Producer 像被选择的 Queue 所在的 Broker 发送 RPC 请求, 将消息发送至 Queue

Queue 的选择算法

对于无序消息 Queue 的选择算法 (消息投递算法), 常用的有两种–轮询和最小投递延迟算法

轮询算法

默认算法

最小投递延迟算法

轮询投递延迟时间最小的 Queue, 这种算法容易使延迟小的 Queue 压力增大

消息存储

消息相关信息存储在家目录的 store 文件夹下

  • abort 文件: RocketMQ 启动时创建, 正常关闭时删除
  • checkpoint 文件夹: 存储 commitlog, consumequeue, index 文件的最后刷盘时间
  • commitlog 文件夹: 存储消息
  • config 文件夹: 存储 Broker 的配置信息
  • consumequeue 文件夹: 分文件夹 (Tpoic 及其 QueueID) 存储消息的 commitlog 文件索引
  • index 文件夹: 存储消息 key 索引文件
  • lock 文件: 运行时的全局锁资源

Commitlog 文件

也叫 MappedFile 文件, 通常最大为 1G 大小
Broker 中的所有消息最终都落盘到这些 commitlog 文件中, 文件名称代表当前文件中第一条消息相对所有消息的偏移量 (十进制)
Commitlog 文件由消息单元组成, 消息单元包含着消息的元数据, 包括但不限于消息长度, 消息物理位置, 消息体, 消息主题, 消息所在队列, 消息队列偏移量等

Consumequeue 文件

Consumequeue 文件名称代表当前文件的第一个索引条目起始位置的偏移量 (十进制)
每个 Consumequeue 文件 通常可存储 300,000 条索引, 每个索引包含 8 字节的 Commitlog 偏移量, 4字节的消息长度, 8字节的 Tag hashCode

Index 文件

Index 文件名称为创建文件的时间戳, 里面记录了许多 Slot 槽位 (默认为 5,000,000), 每个 Slot 槽位包含若干 Index 索引单元
Slot 槽位由 Index header, Slots, Indexes (链表) 组成

Index header

Index header 大小为 40 字节, 包含

  • beginTimeStamp: 该 Index 文件中第一条消息的储存时间
  • endTimeStamp: 该 Index 文件中最后一条消息的储存时间
  • beginPhyoffset: 该 Index 文件中第一条消息在 Commitlog 文件中的偏移量
  • endPhyoffset: 该 Index 文件中最后一条消息在 Commitlog 文件中的偏移量
  • hashSlotCount: 已经填充有 Index 的 Slot 数量
  • indexCount: 该 Index 文件中包含的索引数量

消息读写

消息写入

  1. Broker 根据 QueueID 获取到 Queue offset
  2. 生成消息单元
  3. 同步 commitlog 文件
  4. 生成索引条目并写入 consumequeue 文件

消息拉取

  1. Consumer 获取其消费消息所在 Queue 的 消费偏移量, 确定需要消费消息的偏移量
  2. Consumer 向 Broker 发送拉取请求, 包含 Queue, 消息偏移量和 Tag
  3. Broker 计算 Consumerqueue 的 Queue offset
  4. 从 QueueOffset 处查找符合 Tag 的索引条目, 获取 Commitlog offset
  5. 获取消息单元, 响应 Consumer
消费模式
广播模式

广播模式下一个消费者组的所有消费者都可拿到消息

集群模式

集群模式下一条消息只有一个消费者组中的某个消费者消费

Rebalance

Rebalance 实际上就是 Queue 与消费者的再分配
Rebalance 可能会造成消费暂停, 消费重复以及消费突刺

消息幂等性

相同内容的消息被多次消费时对系统的影响与消费一次相同时, 这条消息的消费就具有幂等性
当网络不稳定等情况发生时, 消息可能会被重复消费, 业务系统应该消除这种重复消费的影响
实现幂等性不能依靠 RocketMQ 生成的任何标识, 应该使用业务标识

使用 Java 测试 RocketMQ

依赖

1
implementation 'org.apache.rocketmq:rocketmq-client:5.0.0'

创建 Topic

bin 目录下运行

1
./mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster

若启动 Broker 时指定属性 autoCreateTopicEnable=true 时, 可以不手动创建 Topic

编写测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
import org.apache.rocketmq.client.consumer.MessageSelector
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently
import org.apache.rocketmq.client.producer.DefaultMQProducer
import org.apache.rocketmq.common.message.Message
import org.apache.rocketmq.remoting.common.RemotingHelper

fun push() {

val producer = DefaultMQProducer(group) // 实例化消息生产者Producer
producer.namesrvAddr = nameServer // 设置NameServer的地址
producer.start() // 启动Producer实例

for (i in 0..9) {
// 创建消息
val msg = Message(
topic,
tag,
"Hello RocketMQ $i".toByteArray(charset(RemotingHelper.DEFAULT_CHARSET))
)

producer.sendOneway(msg) // 发送单向消息
}

producer.shutdown() // 关闭Producer实例
}

fun pull() {

val consumer = DefaultMQPushConsumer(group) // 实例化消费者
consumer.namesrvAddr = nameServer // 设置NameServer的地址
consumer.subscribe(topic, "*") // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息 (需要配置 broker.conf 中 enablePropertyFilter = true)

// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(MessageListenerConcurrently { msgs, _ ->
println("Receive New Messages: $msgs")

ConsumeConcurrentlyStatus.CONSUME_SUCCESS // 标记该消息已经被成功消费
})

consumer.start() // 启动消费者实例
}

上述代码包含常量, 下面代码不再解释

1
2
3
4
const val nameServer: String = "localhost:9876"
const val topic: String = "TestTopic"
const val tag: String = "TestTag"
const val group: String = "TestGroup"

RocketMQ 持久化目录为 C:\Users\${userName}\store

应用

简单消息

生产消息

同步消息

发送同步消息时, 生产者发送完一条消息后会等待 RocketMQ 返回 ACK 之后, 才会发送下一条消息
同步发送消息的可靠性最高, 但是效率太低

代码示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import org.apache.rocketmq.client.producer.DefaultMQProducer
import org.apache.rocketmq.common.message.Message

fun syncPush(){
val producer = DefaultMQProducer(group)
producer.namesrvAddr = nameServer
producer.retryTimesWhenSendFailed = 3 // 设置同步发送失败时, 重试次数
producer.sendMsgTimeout = 5000 // 设置发送超时时间

producer.start()

for (i in 1 .. 100){
val body = "Hi, my number is $i".encodeToByteArray()

val message = Message(topic, tag, body)
message.keys = "key-$i" // 指定消息 Key

val sendResult = producer.send(message)
println(sendResult)
}
producer.shutdown()
}
output
1
2
3
4
5
6
7
8
9
SendResult [sendStatus=SEND_OK, msgId=7F0000010C9463947C6B8D6BD4B30000, offsetMsgId=A9FEBEA700002A9F00000000000B3FA6, messageQueue=MessageQueue [topic=TestTopic, brokerName=patrick-lenovo, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F0000010C9463947C6B8D6BD4DD0001, offsetMsgId=A9FEBEA700002A9F00000000000B409A, messageQueue=MessageQueue [topic=TestTopic, brokerName=patrick-lenovo, queueId=0], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F0000010C9463947C6B8D6BD4E10002, offsetMsgId=A9FEBEA700002A9F00000000000B418E, messageQueue=MessageQueue [topic=TestTopic, brokerName=patrick-lenovo, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F0000010C9463947C6B8D6BD4E40003, offsetMsgId=A9FEBEA700002A9F00000000000B4282, messageQueue=MessageQueue [topic=TestTopic, brokerName=patrick-lenovo, queueId=2], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F0000010C9463947C6B8D6BD4E80004, offsetMsgId=A9FEBEA700002A9F00000000000B4376, messageQueue=MessageQueue [topic=TestTopic, brokerName=patrick-lenovo, queueId=3], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=7F0000010C9463947C6B8D6BD4EA0005, offsetMsgId=A9FEBEA700002A9F00000000000B446A, messageQueue=MessageQueue [topic=TestTopic, brokerName=patrick-lenovo, queueId=0], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=7F0000010C9463947C6B8D6BD4ED0006, offsetMsgId=A9FEBEA700002A9F00000000000B455E, messageQueue=MessageQueue [topic=TestTopic, brokerName=patrick-lenovo, queueId=1], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=7F0000010C9463947C6B8D6BD4EF0007, offsetMsgId=A9FEBEA700002A9F00000000000B4652, messageQueue=MessageQueue [topic=TestTopic, brokerName=patrick-lenovo, queueId=2], queueOffset=1]
...
异步消息

发送异步消息时, 生产者会批量发送消息而不等待 ACK 响应
异步发送消息的可靠性较高

代码示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import org.apache.rocketmq.client.producer.DefaultMQProducer
import org.apache.rocketmq.client.producer.SendCallback
import org.apache.rocketmq.client.producer.SendResult
import org.apache.rocketmq.common.message.Message
import java.util.concurrent.atomic.AtomicInteger

fun asyncPush() {
val producer = DefaultMQProducer(group)
producer.namesrvAddr = nameServer
producer.retryTimesWhenSendAsyncFailed = 1 // 设置异步发送失败时, 重试次数
producer.defaultTopicQueueNums = 2 // 指定新创建的 Topic 的 Queue 数量
producer.start()

val counter = AtomicInteger(0)

for (i in 1..100) {
val body = "Hi, my number is $i".encodeToByteArray()
try {
val message = Message(topic, tag, body)
message.keys = "Async-key-$i"

// 指定回调
producer.send(message, object : SendCallback {
// 消息发送成功回调
override fun onSuccess(sendResult: SendResult?) {
println(sendResult)
counter.addAndGet(1)
}

// 消息发送发生异常回调
override fun onException(e: Throwable?) {
e?.printStackTrace()
}
})
} catch (e: Exception) {
e.printStackTrace()
}
}

while (true)
if (100 == counter.get()) {
producer.shutdown() // 异步发送消息不能直接关闭生产者
break
}
}
单向发送消息

发送单向消息时, RocketMQ 不会返回 ACK
发送单向消息的效率最高, 但可靠性较差

代码示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import org.apache.rocketmq.client.producer.DefaultMQProducer
import org.apache.rocketmq.common.message.Message

fun onewayPush(){
val producer = DefaultMQProducer(group)
producer.namesrvAddr = nameServer
producer.start()

for (i in 1 .. 100){
val body = "Hi, my number is $i".encodeToByteArray()
val message = Message(topic, tag, body)
message.keys = "One-way-key-$i"
producer.sendOneway(message)
}
producer.shutdown()
}

消费消息

代码示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus
import org.apache.rocketmq.common.consumer.ConsumeFromWhere
import org.apache.rocketmq.common.message.MessageExt

fun pull() {
val consumer = DefaultMQPushConsumer(group) // 定义一个 Push 消费者
// val consumer = DefaultLitePullConsumer(group) // 定义一个 Pull 消费者
consumer.namesrvAddr = nameServer
consumer.consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET // 设置初始消费位点
consumer.subscribe(topic, "*") // 指定消费的 Topic 和 Tag
// consumer.messageModel = MessageModel.BROADCASTING // 指定消费模式为广播模式

// 绑定消息监听器
consumer.registerMessageListener { msgs: List<MessageExt>, _: ConsumeConcurrentlyContext ->
// 逐条消费
for (msg in msgs) {
println(msg)
}

ConsumeConcurrentlyStatus.CONSUME_SUCCESS // 返回消费成功
}

consumer.start() // 启动消费者
}
output
1
2
3
4
5
6
7
8
9
MessageExt [brokerName=patrick-lenovo, queueId=2, storeSize=263, queueOffset=50, sysFlag=0, bornTimestamp=1674959398634, bornHost=/169.254.117.152:53799, storeTimestamp=1674959398679, storeHost=/169.254.117.152:10911, msgId=A9FE759800002A9F00000000000ED5F8, commitLogOffset=972280, bodyCRC=1501404796, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TestTopic', flag=0, properties={MIN_OFFSET=0, TRACE_ON=true, MAX_OFFSET=51, KEYS=One-way-key-1, MSG_REGION=DefaultRegion, CONSUME_START_TIME=1674959398702, UNIQ_KEY=7F0000012A1063947C6B9272D2E80000, CLUSTER=DefaultCluster, WAIT=true, TAGS=TestTag}, body=[72, 105, 44, 32, 109, 121, 32, 110, 117, 109, 98, 101, 114, 32, 105, 115, 32, 49], transactionId='null'}]
MessageExt [brokerName=patrick-lenovo, queueId=1, storeSize=263, queueOffset=50, sysFlag=0, bornTimestamp=1674959398642, bornHost=/169.254.117.152:53799, storeTimestamp=1674959398678, storeHost=/169.254.117.152:10911, msgId=A9FE759800002A9F00000000000ED4F1, commitLogOffset=972017, bodyCRC=689400563, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TestTopic', flag=0, properties={MIN_OFFSET=0, TRACE_ON=true, MAX_OFFSET=51, KEYS=One-way-key-4, MSG_REGION=DefaultRegion, CONSUME_START_TIME=1674959398702, UNIQ_KEY=7F0000012A1063947C6B9272D2F20003, CLUSTER=DefaultCluster, WAIT=true, TAGS=TestTag}, body=[72, 105, 44, 32, 109, 121, 32, 110, 117, 109, 98, 101, 114, 32, 105, 115, 32, 52], transactionId='null'}]
MessageExt [brokerName=patrick-lenovo, queueId=3, storeSize=263, queueOffset=50, sysFlag=0, bornTimestamp=1674959398642, bornHost=/169.254.117.152:53799, storeTimestamp=1674959398679, storeHost=/169.254.117.152:10911, msgId=A9FE759800002A9F00000000000ED6FF, commitLogOffset=972543, bodyCRC=1081397190, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TestTopic', flag=0, properties={MIN_OFFSET=0, TRACE_ON=true, MAX_OFFSET=51, KEYS=One-way-key-2, MSG_REGION=DefaultRegion, CONSUME_START_TIME=1674959398702, UNIQ_KEY=7F0000012A1063947C6B9272D2F20001, CLUSTER=DefaultCluster, WAIT=true, TAGS=TestTag}, body=[72, 105, 44, 32, 109, 121, 32, 110, 117, 109, 98, 101, 114, 32, 105, 115, 32, 50], transactionId='null'}]
MessageExt [brokerName=patrick-lenovo, queueId=0, storeSize=263, queueOffset=50, sysFlag=0, bornTimestamp=1674959398642, bornHost=/169.254.117.152:53799, storeTimestamp=1674959398674, storeHost=/169.254.117.152:10911, msgId=A9FE759800002A9F00000000000ED3EA, commitLogOffset=971754, bodyCRC=930348880, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TestTopic', flag=0, properties={MIN_OFFSET=0, TRACE_ON=true, MAX_OFFSET=51, KEYS=One-way-key-3, MSG_REGION=DefaultRegion, CONSUME_START_TIME=1674959398702, UNIQ_KEY=7F0000012A1063947C6B9272D2F20002, CLUSTER=DefaultCluster, WAIT=true, TAGS=TestTag}, body=[72, 105, 44, 32, 109, 121, 32, 110, 117, 109, 98, 101, 114, 32, 105, 115, 32, 51], transactionId='null'}]
MessageExt [brokerName=patrick-lenovo, queueId=1, storeSize=263, queueOffset=51, sysFlag=0, bornTimestamp=1674959398642, bornHost=/169.254.117.152:53799, storeTimestamp=1674959398692, storeHost=/169.254.117.152:10911, msgId=A9FE759800002A9F00000000000EDE34, commitLogOffset=974388, bodyCRC=547432152, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TestTopic', flag=0, properties={MIN_OFFSET=0, TRACE_ON=true, MAX_OFFSET=75, KEYS=One-way-key-8, MSG_REGION=DefaultRegion, CONSUME_START_TIME=1674959398712, UNIQ_KEY=7F0000012A1063947C6B9272D2F20007, CLUSTER=DefaultCluster, WAIT=true, TAGS=TestTag}, body=[72, 105, 44, 32, 109, 121, 32, 110, 117, 109, 98, 101, 114, 32, 105, 115, 32, 56], transactionId='null'}]
MessageExt [brokerName=patrick-lenovo, queueId=2, storeSize=263, queueOffset=51, sysFlag=0, bornTimestamp=1674959398642, bornHost=/169.254.117.152:53799, storeTimestamp=1674959398691, storeHost=/169.254.117.152:10911, msgId=A9FE759800002A9F00000000000ED806, commitLogOffset=972806, bodyCRC=1578130021, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TestTopic', flag=0, properties={MIN_OFFSET=0, TRACE_ON=true, MAX_OFFSET=75, KEYS=One-way-key-5, MSG_REGION=DefaultRegion, CONSUME_START_TIME=1674959398712, UNIQ_KEY=7F0000012A1063947C6B9272D2F20004, CLUSTER=DefaultCluster, WAIT=true, TAGS=TestTag}, body=[72, 105, 44, 32, 109, 121, 32, 110, 117, 109, 98, 101, 114, 32, 105, 115, 32, 53], transactionId='null'}]
MessageExt [brokerName=patrick-lenovo, queueId=1, storeSize=265, queueOffset=52, sysFlag=0, bornTimestamp=1674959398643, bornHost=/169.254.117.152:53799, storeTimestamp=1674959398693, storeHost=/169.254.117.152:10911, msgId=A9FE759800002A9F00000000000EE14D, commitLogOffset=975181, bodyCRC=1128267396, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TestTopic', flag=0, properties={MIN_OFFSET=0, TRACE_ON=true, MAX_OFFSET=75, KEYS=One-way-key-12, MSG_REGION=DefaultRegion, CONSUME_START_TIME=1674959398712, UNIQ_KEY=7F0000012A1063947C6B9272D2F3000B, CLUSTER=DefaultCluster, WAIT=true, TAGS=TestTag}, body=[72, 105, 44, 32, 109, 121, 32, 110, 117, 109, 98, 101, 114, 32, 105, 115, 32, 49, 50], transactionId='null'}]
MessageExt [brokerName=patrick-lenovo, queueId=2, storeSize=263, queueOffset=52, sysFlag=0, bornTimestamp=1674959398642, bornHost=/169.254.117.152:53799, storeTimestamp=1674959398692, storeHost=/169.254.117.152:10911, msgId=A9FE759800002A9F00000000000EDB1B, commitLogOffset=973595, bodyCRC=1470502478, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TestTopic', flag=0, properties={MIN_OFFSET=0, TRACE_ON=true, MAX_OFFSET=75, KEYS=One-way-key-9, MSG_REGION=DefaultRegion, CONSUME_START_TIME=1674959398713, UNIQ_KEY=7F0000012A1063947C6B9272D2F20008, CLUSTER=DefaultCluster, WAIT=true, TAGS=TestTag}, body=[72, 105, 44, 32, 109, 121, 32, 110, 117, 109, 98, 101, 114, 32, 105, 115, 32, 57], transactionId='null'}]
...

顺序消息

顺序消息可以保证消费时的有序性 (按照发送顺序进行消息)
顺序消息是储存在同一个 Queue 里的
RocketMQ 可以保证消息的分区有序和全局有序

  • 分区有序是保证一个 Queue 内的消息有序
  • 全局有序是保证整个 Topic 中的消息有序

消息选择器

在定义 Producer 对象时, 可以传入一个实现了 MessageQueueSelector 接口的对象来指定消息所在的 Queue
一般采用 Hash 取模算法

代码示例
OrderProducer.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import org.apache.rocketmq.client.producer.DefaultMQProducer
import org.apache.rocketmq.common.message.Message

fun orderPush() {
val producer = DefaultMQProducer(group)
producer.namesrvAddr = nameServer
// producer.defaultTopicQueueNums = 1 // 全局有序
producer.start()

for (i in 1..100) {
val body = "Hi, $i".encodeToByteArray()
val message = Message(topic, tag, body)

// 使用同步消息演示
val result = producer.send(
message,
{ mqs, msg, arg ->
val id = arg as Int
val index = id % mqs.size
mqs[index]
},
i // 传递给 lambda 中的 arg
)
}
}
OrderConsumer.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus
import org.apache.rocketmq.common.consumer.ConsumeFromWhere
import org.apache.rocketmq.common.message.MessageExt

fun orderPull() {
val consumer = DefaultMQPushConsumer(group)
consumer.namesrvAddr = nameServer
consumer.consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
consumer.subscribe(topic, "*")

// 绑定消息监听器
consumer.registerMessageListener { msgs: List<MessageExt>, context: ConsumeOrderlyContext ->
// 逐条消费
for (msg in msgs) {
println(String(msg.body) + " " + context.messageQueue.queueId)
}

ConsumeOrderlyStatus.SUCCESS // 返回消费成功
}

consumer.start() // 启动消费者
}

延时消息

当延时消息写入 RocketMQ 后, 需要等待指定时间后才能被消费
延时消息可以实现定时任务功能
延时消息时长是若干固定值, 默认值为
如需更改延时等级, 需要在服务器的 broker.conf 文件中指定 messageDelayLevel 的值, 如 messageDelayLevel = 1s 1m 1h 1d, 默认值为 org.apache.rocketmq.store.config.MessageStoreConfig#messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
延时消息相关的 Topic 是 SCHEDULE_TOPIC_XXXX 配合 ScheduleMessageService 在指定时间后转发至目标 Topic

代码示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import org.apache.rocketmq.client.producer.DefaultMQProducer
import org.apache.rocketmq.common.message.Message

fun delayPush() {
val producer = DefaultMQProducer(group)
producer.namesrvAddr = nameServer
producer.start()

// 使用同步消息演示
for (i in 1 .. 10){
val body = "Hi, delay $i".encodeToByteArray()
val message = Message(topic, tag, body)
message.keys = "Delay-Key-$i"
message.delayTimeLevel = DELAY_10_SEC // const val DELAY_10_SEC = 3
val sendResult = producer.send(message)
println(sendResult)
}
producer.shutdown()
}

延迟等级从序号 1 开始, 与服务器配置的延迟等级一一对应

事务消息

当在分布式场景下需要保障消息生产和本地事务的最终一致性时, 就需要用到事务消息了
分布式事务可以参考 Seata 的简单使用笔记
RocketMQ 采用的是 2PC + 补偿机制 (事务回查) 的分布式事务功能
具体可以参考 https://help.aliyun.com/document_detail/440244.html
事务消息交互流程

常见配置

配置名称及默认值 说明
transationTimeout=60 指定 TM 将最终状态发送给 TC 的最长等待时间 (单位: 秒), 超时会触发消息回查
transationCheckMax=15 指定最大回查次数
transationCheckInterval=60 指定消息回查间隔时间 (单位: 秒)

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import org.apache.rocketmq.client.producer.LocalTransactionState
import org.apache.rocketmq.client.producer.TransactionListener
import org.apache.rocketmq.client.producer.TransactionMQProducer
import org.apache.rocketmq.common.message.Message
import org.apache.rocketmq.common.message.MessageExt
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.ExecutorService
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

fun transactionPush() {
val producer = TransactionMQProducer(group)
producer.namesrvAddr = nameServer

val executorService: ExecutorService = ThreadPoolExecutor(
2, // 保留线程数量 (核心线程数量)
5, // 最大线程数量
100, // 当线程数量大于保留线程数量时, 多余的空闲线程最长存活时间
TimeUnit.SECONDS,
ArrayBlockingQueue<Runnable>(2000) // 在执行任务之前保留任务的队列
) { r -> // 执行器创建新线程时使用的工厂方法
val thread = Thread(r)
thread.name = "transaction-msg-check"
thread
}

producer.executorService = executorService // 指定线程池
producer.transactionListener = object : TransactionListener {
override fun executeLocalTransaction(msg: Message?, arg: Any?): LocalTransactionState {
println("executeLocalTransaction: 这里需要根据业务编写具体代码")
LocalTransactionState.ROLLBACK_MESSAGE // 本地回滚
LocalTransactionState.UNKNOW //未知状态
return LocalTransactionState.COMMIT_MESSAGE // 本地提交

}

override fun checkLocalTransaction(msg: MessageExt?): LocalTransactionState {
println("checkLocalTransaction: 这里需要根据业务编写具体代码")
return LocalTransactionState.COMMIT_MESSAGE
}
}
producer.start()

for (i in 1..5) {
val body = "Hi, transaction $i".encodeToByteArray()
val message = Message(topic, tag, body)
message.keys = "Transaction-key-$i"
val sendResult = producer.sendMessageInTransaction(message, "这里是业务参数, 即 transactionListener 中的 arg")
println(sendResult)
}

producer.shutdown()
}

批量消息

批量消息可以提高生产者的发送消息的效率
消费批量消息时, 当有一条消息出现异常时, 所有消息将被重复消费
批量消息必须

  • 是相同 Tpoic
  • 是相同刷盘策略
  • 不能是延时消息或事务消息
  • 总大小不超过单条消息限制

代码示例

BatchProducer.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import org.apache.rocketmq.client.producer.DefaultMQProducer
import org.apache.rocketmq.common.message.Message
import java.lang.IllegalArgumentException

fun batchPush() {
val producer = DefaultMQProducer(group)
producer.namesrvAddr = nameServer
producer.maxMessageSize = 4 * 1024 * 1024 // 指定消息大小上限, 该值不应超过服务器设置的消息大小上限

producer.start()

// 创建 12 条消息, 每条大小超过 1M
val messageList = ArrayList<Message>()
for (i in 1..12) {
val body = ByteArray(1024 * 1024)
body.fill(i.toByte())
val message = Message(topic, tag, body)
message.keys = "Batch-Key-$i"
messageList.add(message)
}

// 进行消息列表分割, 使每组消息大小不超过 maxMessageSize
val messageListSplitter = MessageListSplitter(messageList, producer.maxMessageSize)
messageListSplitter.forEach {
try {
producer.send(it)
} catch (e: Exception) {
e.printStackTrace()
}
}
producer.shutdown()
}


class MessageListSplitter(private val messageList: Collection<Message>, private val maxSize: Int) :
Iterator<Collection<Message>> {
private val subMessageList = ArrayList<Collection<Message>>()
private var nextIndex = 1

init {
var currentSize = 0
var subList = ArrayList<Message>()
messageList.forEach { message ->
var size = message.topic.length + message.body.size + 20 // 20 是 log 的长度
message.properties.forEach { entry ->
size += (entry.key.length + entry.value.length)
}

if (maxSize < size) throw IllegalArgumentException("Message too long")

if (maxSize < (currentSize + size)) {
subMessageList.add(subList)
currentSize = 0
subList = ArrayList()
}

currentSize += size
subList.add(message)
}
subMessageList.add(subList)
}

override fun hasNext(): Boolean {
return nextIndex < subMessageList.size
}

override fun next(): Collection<Message> {
return subMessageList[nextIndex++]
}
}
BatchConsumer.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus
import org.apache.rocketmq.common.consumer.ConsumeFromWhere
import org.apache.rocketmq.common.message.MessageExt

fun batchPull() {
val consumer = DefaultMQPushConsumer(group)
consumer.namesrvAddr = nameServer
consumer.consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
consumer.subscribe(topic, "*")
consumer.consumeMessageBatchMaxSize = 16 // 设置批量消费的最多消息条数
consumer.pullBatchSize = 32 // 设置批量拉取消息的最多消息条数, 需要不小于 consumeMessageBatchMaxSize 的值

consumer.registerMessageListener { msgs: List<MessageExt>, context: ConsumeConcurrentlyContext ->
println(msgs.size)

for (msg in msgs) {
println(String(msg.body) + " (From Queue " + context.messageQueue.queueId + ")")
}

ConsumeConcurrentlyStatus.CONSUME_SUCCESS
}

consumer.start()
}

消息过滤

Tag 过滤

consumer.subscribe(topic, "*") 中可以指定订阅消息的 Tag, 多个 Tag 用 || 分隔, * 为通配符

SQL 过滤

SQL 过滤是通过特定表达式 (SQL92 标准), 对事先埋入到消息中的用户属性进行过滤

SQL 过滤只适用于 Push 模式的消费者

使用 SQL 过滤需要在 Broker 配置文件中开启 enablePropertyFilter = ture 以及 filterSupportRetry = true
或使用 mqadmin 命令运行

1
2
mqadmin updateBrokerConfig -blocalhost:10911 -kenablePropertyFilter -vtrue
mqadmin updateBrokerConfig -blocalhost:10911 -kfilterSupportRetry -vtrue
语法
数据类型
类型 举例
数值 123, 1.23
字符 'abc'
布尔 TRUE, FALSE
NULL NULL
运算符
运算符 说明
> 大于, 用于数值比较
< 小于, 用于数值比较
>= 大于等于, 用于数值比较
<= 小于等于, 用于数值比较
= 等于, 用于数值或字符比较
<> 不等于, 用于数值或字符比较
BETWEEN ... AND ... 用于数值区间判断判断 (包含两端)
IN 字符包含关系判断
IS NULLL 判断空
IS NOT NULL 判断非空
AND 逻辑与
OR 逻辑或
NOT 逻辑非
代码演示
SQLProducer.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import org.apache.rocketmq.client.producer.DefaultMQProducer
import org.apache.rocketmq.common.message.Message

fun SQLPush() {
val producer = DefaultMQProducer(group)
producer.namesrvAddr = nameServer
producer.start()

// 使用同步消息演示
for (i in 1..100) {
val body = "Hi, $i".encodeToByteArray()
val message = Message(topic, tag, body)
message.putUserProperty("age", i.toString())
producer.send(message)
}

producer.shutdown()
}
SQLConsumer.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
import org.apache.rocketmq.client.consumer.MessageSelector
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus
import org.apache.rocketmq.common.message.MessageExt

fun SQLPull() {
val consumer = DefaultMQPushConsumer(group)
consumer.namesrvAddr = nameServer

val sql = "age BETWEEN 1 AND 6"
consumer.subscribe(topic, MessageSelector.bySql(sql)) // 指定消费的 Topic 和 SQL

consumer.registerMessageListener { msgs: List<MessageExt>, context: ConsumeConcurrentlyContext ->
for (msg in msgs) {
println("Message: " + msg.getUserProperty("age") + " From Queue " + context.messageQueue.queueId + ")")
}
ConsumeConcurrentlyStatus.CONSUME_SUCCESS
}

consumer.start()
}

RocketMQ 结合 Spring 框架

RocketMQ + Spring boot 只需要少量配置, 甚至可以使用原生方式
RocketMQ + Spring cloud 需要的配置也极少, 这里暂时不再演示
TODO => 2023-01-31 09:19