RocketMQ 的简单使用笔记
RocketMQ 简单使用笔记
RocketMQ (Rocket Message Queue) 是一种提供消息队列的中间件, 提供了消息的生产, 储存, 消费功能API
本次学习, 使用 RocketMQ 5.0.0 版本
消息队列
消息队列是分布式系统中重要的组件, 当不需要立即获得结果而且并发量又需要进行控制的时候, 就是需要使用消息队列的时候
消息队列主要解决了应用耦合, 异步处理, 流量削锋等问题
常见消息队列
当前使用较多的消息队列有
- RabbitMQ
- RocketMQ
- ActiveMQ
- Kafka
- ZeroMQ
- MetaMq
部分数据库如 Redis, Mysql 等也可实现消息队列的功能
消息队列常见协议
- JMS (Java Messaging Service)
- STOMP (Streaming Text Orientated Message Protocol)
- AMQP (Advance Message Queuing Protocol)
- MQTT (Message Queuing Telemetry Transport)
使用场景
- 应用耦合: 多应用间通过消息队列对同一消息进行处理, 避免调用接口失败导致整个过程失败
- 异步处理: 多应用对消息队列中同一消息进行处理, 应用间并发处理消息, 相比串行处理减少处理时间
- 限流削峰: 广泛应用于秒杀或抢购活动中, 避免流量过大导致应用系统宕机
- 消息驱动的系统: 系统分为消息队列, 消息生产者, 消息消费者, 生产者负责产生消息, 消费者负责对消息进行处理
基本概念
主题 (Topic)
用于标识同一类业务逻辑的消息. 主题通过TopicName来做唯一标识和区分
生产者 (Producer)
生产者是用来构建并传输消息到服务端的运行实体
消费者 (Consumer)
消费者是用来接收并处理消息的运行实体
消费者分组 (ConsumerGroup)
消费者分组不是运行实体而是一个逻辑资源, 作用是承载多个消费行为一致的消费者的负载均衡分组
消息队列 (MessageQueue)
队列是消息存储和传输的实际容器, 也是消息的最小存储单元. 队列通过 QueueId 来做唯一标识和区分
主题都是由多个队列组成, 以此实现队列数量的水平拆分和队列内部的流式存储
一个消息队列中的消息只能被同一消费者组中的一个消费者消费
一个队列从逻辑上可有读队列和写队列的属性
订阅关系 (Subscription)
订阅关系是消费者获取消息, 处理消息的规则和状态配置
订阅关系由消费者分组动态注册到服务端系统, 并在后续的消息传输中按照订阅关系定义的过滤规则进行消息匹配和消费进度维护
消息 (Message)
消息是最小数据传输单元. 生产者将业务数据的负载和拓展属性包装成消息发送到服务端, 服务端按照相关语义将消息投递到消费端进行消费
消息类型 (MessageType)
RocketMQ 支持的消息类型有
- 普通消息
- 顺序消息
- 事务消息
- 定时消息
- 延时消息
每个主题只支持一种消息类型
定时/延时消息
消息被发送至服务端后, 在指定时间后才能被消费者消费. 通过设置一定的定时时间可以实现分布式场景的延时调度触发效果
顺序消息
顺序消息可以使费者按照发送消息的先后顺序获取消息, 从而实现业务场景中的顺序处理
消息视图 (MessageView)
通过消息视图可以读取消息内部的多个属性和负载信息
消息标签 (MessageTag)
消息标签是细粒度消息分类属性, 消费者通过订阅特定的标签来实现细粒度过滤
消息位点 (MessageQueueOffset)
标识消息在队列中的位置
消费位点 (ConsumerOffset)
每个消费者分组记录消费过的最新一条消息的位点是消费位点
消息索引 (MessageKey)
通过设置的消息索引可以快速查找到对应的消息内容
消息过滤
消费者可以通过订阅指定消息标签对消息进行过滤, 确保最终只接收被过滤后的消息合集
消费结果 (ConsumeResult)
消费结果是消费监听器处理消息完成后返回的处理结果,用来标识本次消息是否正确处理
重置消费位点
在消息持久化存储的时间范围内, 重新设置消费者分组对已订阅主题的消费进度
消息轨迹
消息轨迹是在一条消息从生产者发出到消费者接收并处理过程中, 经由各个相关节点的时间地点等数据汇聚而成的完整链路信息
消息堆积
由于消费者的消费能力有限, 未能在短时间内将所有消息正确消费掉, 此时在服务端保存着未被消费的消息的状态即消息堆积
事务消息
事务消息用于在分布式场景下保障消息生产和本地事务的最终一致性
事务检查器 (TransactionChecker)
事务检查器生产者用来执行本地事务检查和异常事务恢复的监听器
事务状态 (TransactionResolution)
事务状态是事务消息发送过程中, 事务提交的状态标识
服务端通过事务状态控制事务消息是否应该提交和投递
事务状态包括:
- 事务提交
- 事务回滚
- 事务未决
RocketMQ 逻辑架构
RocketMQ 架构图flowchart TD Name_Server_1 --- Broker_Master_1 Name_Server_2 --- Broker_Master_1 Name_Server_3 --- Broker_Master_1 Name_Server_1 --- Broker_Master_2 Name_Server_2 --- Broker_Master_2 Name_Server_3 --- Broker_Master_2 Name_Server_1 --- Broker_Master_3 Name_Server_2 --- Broker_Master_3 Name_Server_3 --- Broker_Master_3 Broker_Master_1 --- Broker_Slave_1-1 Broker_Master_1 --- Broker_Slave_1-2 Broker_Master_2 --- Broker_Slave_2-1 Broker_Master_2 --- Broker_Slave_2-2 Broker_Master_3 --- Broker_Slave_3-1 Broker_Master_3 --- Broker_Slave_3-2
flowchart TD Name_Server_集群 --- Broker_集群_1 Name_Server_集群 --- Broker_集群_2 Name_Server_集群 --- Broker_集群_3
flowchart TD Name_Server_集群 --- Broker_Master_1 Broker_Master_1 --- Broker_Slave_1-1 Broker_Master_1 --- Broker_Slave_1-2
Broker 是主备集群
flowchart TD Name_Server_1 --- Broker_集群_1 Name_Server_2 --- Broker_集群_1 Name_Server_3 --- Broker_集群_1 Name_Server_1 --- Broker_集群_2 Name_Server_2 --- Broker_集群_2 Name_Server_3 --- Broker_集群_2 Name_Server_1 --- Broker_集群_3 Name_Server_2 --- Broker_集群_3 Name_Server_3 --- Broker_集群_3
角色
Producer
Producer (生产者), 负责生产消息. Producer 通过 MRocketMQ 的负载均衡模块, 选择 Brocker 集群队列进行消息投递
生产者往往组成生产者组, 投递相同 Topic 的消息
Consumer
Consumer (消费者), 负责消费消息. Consumer 负责从 Broker 服务器中获取消息, 并对消息进行相应处理
消费者往往组成消费者组, 消费相同 Topic 的消息
负载均衡和容错
Producer 和 Consumer 分别成组, 对 Message Queue 实现负载均衡和对 Message 的容错
当 消费者组 中的 Consumer 比 Message Queue 还多时, 负载均衡将不能很好运行
Name Server
Name Server 是 Broker 的注册中心以及 Topic 的路由中心, 支持 Broker 的动态注册与发现
早期版本是由 Zookeeper 替代
Name Server 集群的实例间不进行通信, 这就导致了 Name Server 扩容上的麻烦
Broker 管理
管理 Broker 的注册信息, 检测 Broker 是否存活
路由信息管理
管理 Broker 集群的路由信息, 以及用于客户端查询的队列信息
Producer 和 Consumer 通过 Name Server 获取 Broker 集群信息
路由信息是依靠客户端主动拉取, 默认 30s 拉取一次
Broker
Broker 对消息进行存储和转发, 是 RocketMQ 的核心部分
Broker Master 负责读写请求, Broker Slave 只负责读请求
模块
Remoting Module
Boker 实体, 处理客户端请求
Client Manager
客户端管理器, 负责接受和解析客户端请求
Store Service
存储服务, 负责管理消息存储到硬盘以及消息查询
HA Service
高可用服务, 负责 Broker 集群实例间的数据同步
Index Service
索引服务, 根据 MessageKey 对投递到 Broker 的消息提供索引和快速查询服务
数据保存策略
复制策略
Broker Master 和 Slave 间同步数据的方式, 通常有两种方式:
- 同步策略: 消息到达 Master 后, Master 将数据同步给 Slave 后将 ACK 返回给客户端
- 异步策略: 消息到达 Master 后立即返回 ACK, 稍后将数据同步给 Slave
刷盘策略
Broker 将内存中数据与磁盘中的数据同步的方式, 通常有两种方式:
- 同步刷盘: 当消息持久化后算作消息写入成功
- 异步刷盘: 当消息到达内存后 算作消息写入成功
分片
在 Broker 集群中, 不同 Broker 可能存有相同 Topic, 例如 Broker1 存有 TopicA, TopicA. 那么, 存有 TopicA 的部分是一个分片
分区
分片中的 Queue
RocketMQ 部署模式
Master
单 Master 无 Slave
单节点部署, 只部署一个 Broker, 部署方式较为简单
可靠性非常差, 一般只在开发环境使用
多 Master 无 Slave
多节点部署, 部署多个 Broker Master, 同一个 Topic 会在不同节点存有切片, 各个分区平均分布
注意配置较为简单, 单个 Master 宕机对集群几乎没有影响. 但是宕机期间, 该 Master 的订阅者不可进行操作
多 Master 多 Slave 异步复制机制
多节点部署, 部署多个 Broker Master, 每个 Broker Master 配置有 Broker Slave (一般一个即可)
当 Broker Master 宕机后, Broker Slave 会自动取代 Broker Master. 由于异步赋值机制, 消息可能会有小概率丢失
多 Master 多 Slave 同步复制机制
可靠性最高, 但是性能开销较大且目前版本在主节点宕机后, 备机不能自动切换为主机 (可通过插件进行支持)
Name Server
由于 Name Server 之间是无状态模式, 只需将各个 Name Server 的地址告诉 Broker 即可, 所以说 Name Server 集群部署非常简单
RocketMQ 部署
RocketMQ 部署往往会使用 RAID10 等磁盘阵列
RocketMQ 单节点部署
下载 RocketMQ 5.0.0
配置环境
将 ROCKETMQ_HOME
添加至环境变量中
启动 RocketMQ
在 bin
目录下, 执行 ./mqnamesrv
或 ./mqnamesrv.cmd
由于JDK15+ 中删除了 CMS 垃圾收集器 (实测 JDK11 也无法正常启动), 在使用 JDK15+ 时启动脚本 runserver.cmd
或 runserver.sh
中需要修改 JVM 启动参数, 删除
1 | set "JAVA_OPT=%JAVA_OPT% -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC" |
启动 Broker + Proxy
在 bin
目录下, 执行./mqbroker -n localhost:9876 --enable-proxy
或 ./mqbroker.cmd -n localhost:9876
--enable-proxy
在 Windows 平台提示不是合法指令, 这里选择只启动 broker
可以使用命令 ./mqproxy -n 192.168.1.1:9876
单独启动 proxy
在使用 JDK15+ 时启动脚本 runserver.cmd
或 runserver.sh
中需要修改 JVM 启动参数, 删除
1 | set "JAVA_OPT=%JAVA_OPT% -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8" |
由于 JDK9+ 对 Java 内部包访问权限做了限制, 需要修改启动参数, 添加
1 | set "JAVA_OPT=%JAVA_OPT% --add-exports java.base/sun.nio.ch=ALL-UNNAMED |
在 bin
目录新建启动脚本 start.cmd 可快速完成两步操作
1 | start .\mqnamesrv.cmd |
测试
- 在
bin
目录下启动生产者或1
2export NAMESRV_ADDR=localhost:9876
sh tools.sh org.apache.rocketmq.example.quickstart.Producer1
2$env:NAMESRV_ADDR="localhost:9876"
./tools.cmd org.apache.rocketmq.example.quickstart.Producer - 在
bin
目录下启动消费或1
2export NAMESRV_ADDR=localhost:9876
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer1
2$env:NAMESRV_ADDR="localhost:9876"
./tools.cmd org.apache.rocketmq.example.quickstart.Producer
RocketMQ 多 Broker Master 及 Broker Slave 部署
部署 RocketMQ Dashboard 监控 RocketMQ
使用 Docker
1 | docker pull apacherocketmq/rocketmq-dashboard:latest |
源码启动
Windows 平台可使用源码方式进行启动
- 下载源码
- 使用 Maven 编译
mvn clean package -Dmaven.test.skip=true
使用高版本的 Java 进行编译可能需要修改 pom 文件, 升级 lombook 版本
- 启动服务
java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar
运维
Brocker 停机重启
- 首先将 Broker 中 Topic (除系统和重试 Queue) w 权限取消 (将 perm 设置为 4)
1
./mqadmin updateTopic -c rocketmq-cluster -b broker-a主节点的ip:10911 -t topic名称 -n 任意一台namesrv的ip:9876 -p 4 -r 读队列数 -w 写队列数
- 观测到 Broker 无流量后将 Broker 停机
- 启动 Broker
- 恢复 Broker 中 Topic 的权限 (将 perm 设置为 6)
1
./mqadmin updateTopic -c rocketmq-cluster -b broker-a主节点的ip:10911 -t topic名称 -n 任意一台namesrv的ip:9876 -p 6 -r 读队列数 -w 写队列数
队列缩容
- 读队列数量保持不变, 减少写队列数量
- 只读队列里无消息后, 减少读队列数量
mqadmin 命令
RocketMQ 工作原理
消息生产
- Producer 与 Name server 通信获取 Topic 路由信息和 Broker 列表
- Producer 根据算法选择合适的 Queue
- Producer 对消息进行预处理
- Producer 像被选择的 Queue 所在的 Broker 发送 RPC 请求, 将消息发送至 Queue
Queue 的选择算法
对于无序消息 Queue 的选择算法 (消息投递算法), 常用的有两种–轮询和最小投递延迟算法
轮询算法
默认算法
最小投递延迟算法
轮询投递延迟时间最小的 Queue, 这种算法容易使延迟小的 Queue 压力增大
消息存储
消息相关信息存储在家目录的 store
文件夹下
- abort 文件: RocketMQ 启动时创建, 正常关闭时删除
- checkpoint 文件夹: 存储 commitlog, consumequeue, index 文件的最后刷盘时间
- commitlog 文件夹: 存储消息
- config 文件夹: 存储 Broker 的配置信息
- consumequeue 文件夹: 分文件夹 (Tpoic 及其 QueueID) 存储消息的 commitlog 文件索引
- index 文件夹: 存储消息 key 索引文件
- lock 文件: 运行时的全局锁资源
Commitlog 文件
也叫 MappedFile 文件, 通常最大为 1G 大小
Broker 中的所有消息最终都落盘到这些 commitlog 文件中, 文件名称代表当前文件中第一条消息相对所有消息的偏移量 (十进制)
Commitlog 文件由消息单元组成, 消息单元包含着消息的元数据, 包括但不限于消息长度, 消息物理位置, 消息体, 消息主题, 消息所在队列, 消息队列偏移量等
Consumequeue 文件
Consumequeue 文件名称代表当前文件的第一个索引条目起始位置的偏移量 (十进制)
每个 Consumequeue 文件 通常可存储 300,000 条索引, 每个索引包含 8 字节的 Commitlog 偏移量, 4字节的消息长度, 8字节的 Tag hashCode
Index 文件
Index 文件名称为创建文件的时间戳, 里面记录了许多 Slot 槽位 (默认为 5,000,000), 每个 Slot 槽位包含若干 Index 索引单元
Slot 槽位由 Index header, Slots, Indexes (链表) 组成
Index header
Index header 大小为 40 字节, 包含
- beginTimeStamp: 该 Index 文件中第一条消息的储存时间
- endTimeStamp: 该 Index 文件中最后一条消息的储存时间
- beginPhyoffset: 该 Index 文件中第一条消息在 Commitlog 文件中的偏移量
- endPhyoffset: 该 Index 文件中最后一条消息在 Commitlog 文件中的偏移量
- hashSlotCount: 已经填充有 Index 的 Slot 数量
- indexCount: 该 Index 文件中包含的索引数量
消息读写
消息写入
- Broker 根据 QueueID 获取到 Queue offset
- 生成消息单元
- 同步 commitlog 文件
- 生成索引条目并写入 consumequeue 文件
消息拉取
- Consumer 获取其消费消息所在 Queue 的 消费偏移量, 确定需要消费消息的偏移量
- Consumer 向 Broker 发送拉取请求, 包含 Queue, 消息偏移量和 Tag
- Broker 计算 Consumerqueue 的 Queue offset
- 从 QueueOffset 处查找符合 Tag 的索引条目, 获取 Commitlog offset
- 获取消息单元, 响应 Consumer
消费模式
广播模式
广播模式下一个消费者组的所有消费者都可拿到消息
集群模式
集群模式下一条消息只有一个消费者组中的某个消费者消费
Rebalance
Rebalance 实际上就是 Queue 与消费者的再分配
Rebalance 可能会造成消费暂停, 消费重复以及消费突刺
消息幂等性
相同内容的消息被多次消费时对系统的影响与消费一次相同时, 这条消息的消费就具有幂等性
当网络不稳定等情况发生时, 消息可能会被重复消费, 业务系统应该消除这种重复消费的影响
实现幂等性不能依靠 RocketMQ 生成的任何标识, 应该使用业务标识
使用 Java 测试 RocketMQ
依赖
1 | implementation 'org.apache.rocketmq:rocketmq-client:5.0.0' |
创建 Topic
在 bin
目录下运行
1 | ./mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster |
若启动 Broker 时指定属性 autoCreateTopicEnable=true
时, 可以不手动创建 Topic
编写测试代码
1 | import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer |
上述代码包含常量, 下面代码不再解释
1 | const val nameServer: String = "localhost:9876" |
RocketMQ 持久化目录为 C:\Users\${userName}\store
应用
简单消息
生产消息
同步消息
发送同步消息时, 生产者发送完一条消息后会等待 RocketMQ 返回 ACK 之后, 才会发送下一条消息
同步发送消息的可靠性最高, 但是效率太低
代码示例
1 | import org.apache.rocketmq.client.producer.DefaultMQProducer |
1 | SendResult [sendStatus=SEND_OK, msgId=7F0000010C9463947C6B8D6BD4B30000, offsetMsgId=A9FEBEA700002A9F00000000000B3FA6, messageQueue=MessageQueue [topic=TestTopic, brokerName=patrick-lenovo, queueId=3], queueOffset=0] |
异步消息
发送异步消息时, 生产者会批量发送消息而不等待 ACK 响应
异步发送消息的可靠性较高
代码示例
1 | import org.apache.rocketmq.client.producer.DefaultMQProducer |
单向发送消息
发送单向消息时, RocketMQ 不会返回 ACK
发送单向消息的效率最高, 但可靠性较差
代码示例
1 | import org.apache.rocketmq.client.producer.DefaultMQProducer |
消费消息
代码示例
1 | import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer |
1 | MessageExt [brokerName=patrick-lenovo, queueId=2, storeSize=263, queueOffset=50, sysFlag=0, bornTimestamp=1674959398634, bornHost=/169.254.117.152:53799, storeTimestamp=1674959398679, storeHost=/169.254.117.152:10911, msgId=A9FE759800002A9F00000000000ED5F8, commitLogOffset=972280, bodyCRC=1501404796, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TestTopic', flag=0, properties={MIN_OFFSET=0, TRACE_ON=true, MAX_OFFSET=51, KEYS=One-way-key-1, MSG_REGION=DefaultRegion, CONSUME_START_TIME=1674959398702, UNIQ_KEY=7F0000012A1063947C6B9272D2E80000, CLUSTER=DefaultCluster, WAIT=true, TAGS=TestTag}, body=[72, 105, 44, 32, 109, 121, 32, 110, 117, 109, 98, 101, 114, 32, 105, 115, 32, 49], transactionId='null'}] |
顺序消息
顺序消息可以保证消费时的有序性 (按照发送顺序进行消息)
顺序消息是储存在同一个 Queue 里的
RocketMQ 可以保证消息的分区有序和全局有序
- 分区有序是保证一个 Queue 内的消息有序
- 全局有序是保证整个 Topic 中的消息有序
消息选择器
在定义 Producer 对象时, 可以传入一个实现了 MessageQueueSelector 接口的对象来指定消息所在的 Queue
一般采用 Hash 取模算法
代码示例
OrderProducer.kt1 | import org.apache.rocketmq.client.producer.DefaultMQProducer |
1 | import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer |
延时消息
当延时消息写入 RocketMQ 后, 需要等待指定时间后才能被消费
延时消息可以实现定时任务功能
延时消息时长是若干固定值, 默认值为
如需更改延时等级, 需要在服务器的 broker.conf
文件中指定 messageDelayLevel
的值, 如 messageDelayLevel = 1s 1m 1h 1d
, 默认值为 org.apache.rocketmq.store.config.MessageStoreConfig#messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
延时消息相关的 Topic 是 SCHEDULE_TOPIC_XXXX 配合 ScheduleMessageService 在指定时间后转发至目标 Topic
代码示例
1 | import org.apache.rocketmq.client.producer.DefaultMQProducer |
延迟等级从序号 1 开始, 与服务器配置的延迟等级一一对应
事务消息
当在分布式场景下需要保障消息生产和本地事务的最终一致性时, 就需要用到事务消息了
分布式事务可以参考 Seata 的简单使用笔记
RocketMQ 采用的是 2PC + 补偿机制 (事务回查) 的分布式事务功能
具体可以参考 https://help.aliyun.com/document_detail/440244.html
常见配置
配置名称及默认值 | 说明 |
---|---|
transationTimeout=60 | 指定 TM 将最终状态发送给 TC 的最长等待时间 (单位: 秒), 超时会触发消息回查 |
transationCheckMax=15 | 指定最大回查次数 |
transationCheckInterval=60 | 指定消息回查间隔时间 (单位: 秒) |
代码示例
1 | import org.apache.rocketmq.client.producer.LocalTransactionState |
批量消息
批量消息可以提高生产者的发送消息的效率
消费批量消息时, 当有一条消息出现异常时, 所有消息将被重复消费
批量消息必须
- 是相同 Tpoic
- 是相同刷盘策略
- 不能是延时消息或事务消息
- 总大小不超过单条消息限制
代码示例
BatchProducer.kt1 | import org.apache.rocketmq.client.producer.DefaultMQProducer |
1 | import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer |
消息过滤
Tag 过滤
consumer.subscribe(topic, "*")
中可以指定订阅消息的 Tag, 多个 Tag 用 ||
分隔, *
为通配符
SQL 过滤
SQL 过滤是通过特定表达式 (SQL92 标准), 对事先埋入到消息中的用户属性进行过滤
SQL 过滤只适用于 Push 模式的消费者
使用 SQL 过滤需要在 Broker 配置文件中开启 enablePropertyFilter = ture
以及 filterSupportRetry = true
或使用 mqadmin 命令运行
1 | mqadmin updateBrokerConfig -blocalhost:10911 -kenablePropertyFilter -vtrue |
语法
数据类型
类型 | 举例 |
---|---|
数值 | 123 , 1.23 |
字符 | 'abc' |
布尔 | TRUE , FALSE |
NULL | NULL |
运算符
运算符 | 说明 |
---|---|
> |
大于, 用于数值比较 |
< |
小于, 用于数值比较 |
>= |
大于等于, 用于数值比较 |
<= |
小于等于, 用于数值比较 |
= |
等于, 用于数值或字符比较 |
<> |
不等于, 用于数值或字符比较 |
BETWEEN ... AND ... |
用于数值区间判断判断 (包含两端) |
IN |
字符包含关系判断 |
IS NULLL |
判断空 |
IS NOT NULL |
判断非空 |
AND |
逻辑与 |
OR |
逻辑或 |
NOT |
逻辑非 |
代码演示
SQLProducer.kt1 | import org.apache.rocketmq.client.producer.DefaultMQProducer |
1 | import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer |
RocketMQ 结合 Spring 框架
RocketMQ + Spring boot 只需要少量配置, 甚至可以使用原生方式
RocketMQ + Spring cloud 需要的配置也极少, 这里暂时不再演示
TODO => 2023-01-31 09:19