侧边栏壁纸
  • 累计撰写 75 篇文章
  • 累计创建 62 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

RocketMQ 快速学习教程与面试题

温馨提示:
部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

本教程面向有一定 Java 基础的开发者,目标是从"会用"到"吃透原理"再到"能扛生产"。全文约 15000 字,建议分 3-5 天阅读。

一、RocketMQ 全景图

1.1 定位与演进

RocketMQ 是阿里巴巴于 2012 年开源的分布式消息中间件,2016 年捐赠给 Apache 基金会,2017 年成为 Apache 顶级项目。它的诞生背景是:阿里内部原有的 ActiveMQ 在高并发场景下频繁出现消息堆积和延迟飙升,而 Kafka 虽然吞吐高但缺乏事务消息、延迟消息等业务刚需特性,因此阿里自研了 RocketMQ,目标是兼顾 Kafka 的高吞吐与 ActiveMQ 的丰富功能。

当前主要版本线:4.x(稳定、广泛使用)和 5.x(存算分离、gRPC 多语言、云原生)。本教程以 5.x 为主线,同时标注 4.x 差异点。

1.2 四大核心角色详解

                         ┌─────────────────┐
                         │   NameServer    │  ← 轻量注册中心 (AP 设计,节点间不通信)
                         │  9876 (默认端口) │
                         └────────┬────────┘
                                  │ 心跳上报 (每 30s)
            ┌─────────────────────┼─────────────────────┐
            │                     │                     │
     ┌──────▼──────┐       ┌──────▼──────┐       ┌──────▼──────┐
     │  Broker-M0  │◄─────►│  Broker-M1  │       │  Broker-S0  │
     │  (Master)   │ 复制   │  (Master)   │       │  (Slave)    │
     │  brokerId=0 │       │  brokerId=0 │       │  brokerId=1 │
     └──┬───────┬──┘       └──────┬──────┘       └─────────────┘
        │       │                 │
    ┌───▼──┐ ┌──▼───┐        ┌───▼──┐
    │Producer│ │Consumer│       │Consumer│
    └──────┘ └──────┘        └──────┘

NameServer

NameServer 是 RocketMQ 的注册中心,但它与 ZooKeeper、Eureka 等注册中心有本质不同:各 NameServer 节点之间完全不通信,不存在选举和共识协议。这种设计的哲学是——消息中间件对路由信息的要求是"最终一致"就够了,不需要强一致。每个 Broker 会向集群中所有 NameServer 发送心跳(默认每 30 秒一次),NameServer 在 120 秒内未收到心跳则认为该 Broker 下线。客户端(Producer / Consumer)启动时从 NameServer 拉取路由表并缓存在本地,之后每 30 秒更新一次。

核心源码入口:NamesrvController.startup()RouteInfoManager(管理所有 Broker 的路由信息)。

Broker

Broker 是消息存储和转发的核心。一个 Broker 进程对应一个 Master(brokerId=0)或一个 Slave(brokerId>0)。Master 负责读写,Slave 默认只读(Master 宕机时 Slave 可接管读请求)。Broker 内部的关键子系统:

  • RemotingServer:处理网络请求(Netty 实现,4.x 用自定义 Remoting 协议,5.x 新增 gRPC)
  • MessageStore:消息存储引擎(核心类 DefaultMessageStore
  • ReputMessageService:异步构建 ConsumeQueue 和 IndexFile 的后台线程
  • HAService:主从复制服务
  • FlushDiskService:刷盘服务

Producer

Producer 发送消息,关键机制:本地缓存路由表,发送时根据 Topic 找到 MessageQueue 列表,通过轮询或自定义策略选一个 Queue,再根据 Queue 所在的 Broker 地址发送。如果发送失败,Producer 有故障规避机制——默认会重试其他 Broker(sendLatencyFaultEnable=true 时开启)。

Consumer

Consumer 的 Push 模式(DefaultMQPushConsumer)并非真正的推送,而是长轮询拉取:Consumer 向 Broker 发起拉取请求,Broker 如果有消息就立即返回,没有消息就 Hold 住请求(默认最多 15 秒),期间有新消息到达就立即返回。这种设计兼顾了低延迟(接近推送)和低网络开销。Pull 模式(DefaultLitePullConsumer)则是完全的主动拉取,由应用程序控制拉取频率和位点。

二、消息存储——RocketMQ 最核心的设计

这是 RocketMQ 区别于其他 MQ 的根本所在,也是面试中被追问最深的部分。

2.1 CommitLog:万物归一

所有 Topic 的消息在 Broker 中共享一个 CommitLog 文件(物理上按 1GB 切分为多个文件,文件名是起始偏移量,如 00000000000000000000)。消息以追加(append-only)方式顺序写入,写入速度接近内存带宽(机械盘约 600MB/s,SSD 更高)。

一条消息在 CommitLog 中的物理结构:

┌──────────────────────────────────────────────────┐
│ totalSize(4B) │ magicCode(4B) │ bodyCRC(4B)     │
│ queueId(4B)   │ flag(4B)      │ queueOffset(8B)  │
│ physicOffset(8B) │ sysFlag(4B) │ bornTimeStamp(8B)│
│ bornHost(8B)  │ storeTimestamp(8B) │ storeHost(8B)│
│ reconsumeTimes(4B) │ preparedTransOffset(8B)     │
│ bodyLength(4B) │ body(N B)    │ topicLength(1B)  │
│ topic(N B)    │ propertiesLength(2B) │ properties(N B)│
└──────────────────────────────────────────────────┘

关键设计决策:为什么不按 Topic 分文件存储?因为如果每个 Topic 独立写文件,当有 100 个 Topic 同时写入时,就变成了 100 路随机写,磁盘 I/O 会急剧下降。顺序写一个文件的性能是随机写的数十倍甚至上百倍(尤其机械盘)。

2.2 ConsumeQueue:轻量级索引

ConsumeQueue 是面向消费者的逻辑队列索引,每个 Topic 的每个 QueueId 对应一个 ConsumeQueue 文件。文件中每条记录固定 20 字节:

┌────────────────────────────────────────┐
│ CommitLog Offset (8 bytes)             │  ← 消息在 CommitLog 中的物理位置
│ Message Size   (4 bytes)               │  ← 消息体大小
│ Tag HashCode   (8 bytes)              │  ← Tag 的哈希值,用于快速过滤
└────────────────────────────────────────┘

20 字节的设计极其精妙:因为固定长度,所以可以通过 offset * 20 直接计算出文件内位置(O(1) 随机访问),无需任何索引结构。一个 ConsumeQueue 文件默认存储 30 万条记录(约 5.72MB),写满后新建下一个文件。

Tag 过滤原理:Broker 在消费拉取时先用 ConsumeQueue 中的 tagHashCode 做快速比对(粗筛),通过后再从 CommitLog 读取完整消息的 Tag 字符串做精确匹配(因为 hash 可能冲突)。这一步把绝大多数不匹配的消息在索引层就过滤掉了,避免无谓的 CommitLog 读取。

2.3 消息写入全链路(源码级)

Producer.send(msg)
    │
    ▼
Broker 接收请求 (SendMessageProcessor)
    │
    ▼
CommitLog.putMessage()
    ├── 加锁(ReentrantLock / CAS,取决于 isReputMessageServiceLock)
    ├── 追加写入 MappedFile(MappedByteBuffer 或 FileChannel)
    ├── 返回写入结果的偏移量
    └── 释放锁
    │
    ▼ (异步,独立线程 ReputMessageService)
ReputMessageService.doReput()  ← 每 1ms 轮询一次
    ├── 从 CommitLog 中读取新写入的消息
    ├── 构建 ConsumeQueue 条目 → 写入对应 ConsumeQueue 文件
    ├── 构建 IndexFile 条目 → 写入 IndexFile
    └── 更新 reputFromOffset(记录已构建进度)
    │
    ▼ (异步,取决于刷盘策略)
FlushRealTimeService / FlushDiskService
    ├── ASYNC_FLUSH:GroupCommitService 定时刷盘(默认 500ms 或写满 10 页)
    └── SYNC_FLUSH:每条消息写完后立即调用 fsync,等待落盘后才返回 ACK

ReputMessageService 是 RocketMQ 存储引擎的灵魂。它用 reputFromOffset 指针追踪 CommitLog 的最新位置,以 1ms 的间隔不断轮询,将新消息异步构建到 ConsumeQueue 和 IndexFile。这意味着消息写入 CommitLog 后,需要约 1ms 才能被消费者看到(这是 RocketMQ 消费延迟的下限)。

2.4 零拷贝与内存映射

RocketMQ 大量使用了两种 I/O 优化技术:

mmap(MappedByteBuffer):用于小文件的读写,如 ConsumeQueue(每个文件仅 5.72MB)。mmap 将文件直接映射到用户空间内存,避免了内核态到用户态的数据拷贝(传统 read/write 需要 4 次拷贝 + 4 次上下文切换,mmap 减少为 1 次拷贝 + 2 次切换)。

sendfile / transferTo:用于大文件的网络传输,如消费拉取消息时从 CommitLog 读取。数据直接从文件描述符传输到 Socket 缓冲区,不经过用户空间(零拷贝)。

传统 I/O:   Disk → Kernel Buffer → User Buffer → Socket Buffer → NIC
mmap:       Disk → Kernel Buffer (mapped to User Space) → Socket Buffer → NIC
sendfile:   Disk → Kernel Buffer → NIC (via DMA)

2.5 消息删除与过期

CommitLog 文件默认保留 72 小时(fileReservedTime),过期后由 CleanCommitLogService 后台线程在每天凌晨 4 点(deleteWhen)清理。删除的条件:文件过期且磁盘使用率超过阈值(默认 75%)。如果磁盘使用率超过 90%,会强制删除过期文件,同时拒绝新消息写入(diskMaxUsedSpaceRatio)。

ConsumeQueue 文件的删除依赖 CommitLog 的最小 offset——如果 CommitLog 中某 offset 之前的消息都已被删除,对应的 ConsumeQueue 条目也就失去了指向,可以被清理。

三、生产者深度机制

3.1 消息发送的三种模式

同步发送(Sync):发送后阻塞等待 Broker ACK。适合对消息可靠性要求极高的场景(订单、支付)。底层实现是 Netty 的 SyncInvoker,发送线程调用 future.sync() 阻塞等待。

异步发送(Async):发送后立即返回,通过回调处理结果。适合对延迟敏感但允许少量失败的场景(日志、通知)。底层是 Netty 的异步回调,Broker 返回后在 EventLoop 线程中执行用户回调(注意:回调中不要做耗时操作,否则会阻塞 EventLoop)。

单向发送(OneWay):只管发,不等 ACK,连回调都没有。适合对可靠性无要求但追求极致吞吐的场景(如日志采集)。底层是直接写 Channel 不等响应。

3.2 路由选择与负载均衡

Producer 发送消息时如何选择 MessageQueue?默认策略是轮询(RoundRobin),所有可用 Queue 依次发一遍再循环。源码入口 MessageQueueSelector 接口:

// 默认轮询
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
    int index = Math.abs(counter.getAndIncrement() % mqs.size());
    return mqs.get(index);
}

顺序消息场景下需要自定义 Selector,将同一业务 ID 的消息固定到同一个 Queue(hash 取模)。

3.3 故障规避(LatencyFaultTolerance)

这是 RocketMQ 4.x+ 的一个重要优化:当某个 Broker 响应延迟较高时,Producer 会在一段时间内避开该 Broker,优先选择其他 Broker 发送。

故障规避的延迟阈值(默认值):
延迟 > 50ms  → 规避 30 秒
延迟 > 100ms → 规避 60 秒
延迟 > 550ms → 规避 120 秒
延迟 > 1000ms → 规避 1800 秒

通过 producer.setSendLatencyFaultEnable(true) 开启。这个机制在 Broker 出现网络抖动或 GC 停顿时非常有效,能显著降低发送失败率。

3.4 消息重试机制

同步发送默认重试 2 次(共 3 次),异步发送默认重试 2 次。重试时会重新选择 Broker(结合故障规避),而不是盲目重试同一个 Broker。

重试的触发条件:发送超时(默认 3 秒)、Broker 返回 FLUSH_DISK_TIMEOUTSLAVE_NOT_AVAILABLE、网络异常。注意:FLUSH_SLAVE_TIMEOUT(异步复制超时)不触发重试,因为消息已经写入了 Master。

四、消费者深度机制

4.1 Push 模式的本质——长轮询

DefaultMQPushConsumer 内部其实是 PullMessageService 在循环拉取。核心流程:

PullMessageService (单线程循环)
    │
    ├── 从 processQueueTable 取出下一个要拉取的 MessageQueue
    │
    ├── 向 Broker 发送 PullRequest(包含 queueId, offset, maxMsgNums=32)
    │
    ├── Broker 收到 PullRequest:
    │   ├── 有消息 → 立即返回
    │   └── 无消息 → Hold 住请求,挂起 PullRequest(最多 15 秒)
    │       └── Hold 期间有新消息到达 → 立即唤醒并返回
    │
    ├── 收到消息 → 提交给 ConsumeMessageService 异步消费
    │
    └── 消费完成 → 更新 offset → 将新的 PullRequest 放入队列 → 继续循环

关键参数:pullInterval(拉取间隔,默认 0 即连续拉取)、consumeMessageBatchMaxSize(单次消费消息数,默认 1)、pullBatchSize(单次拉取消息数,默认 32)、consumeThreadMin / consumeThreadMax(消费线程池大小,默认 20/20)。

4.2 Rebalance——消费者组的负载均衡

当一个 Consumer Group 中的实例数发生变化(新增/宕机/重启),或者 Topic 的 Queue 数量变化时,需要重新分配 Queue 给各消费者。这个过程叫 Rebalance,每 20 秒触发一次。

核心算法(AllocateMessageQueueStrategy):

平均分配(AllocateMessageQueueAveragely)——默认策略:

Queue 数量: 8,  Consumer 数量: 3
分配结果:
  Consumer-0: Queue[0], Queue[1], Queue[2]  ← 前 2 个多分一个(8%3=2)
  Consumer-1: Queue[3], Queue[4], Queue[5]
  Consumer-2: Queue[6], Queue[7]

环形分配(AllocateMessageQueueByCircle)

Queue 数量: 8,  Consumer 数量: 3
分配结果:
  Consumer-0: Queue[0], Queue[3], Queue[6]
  Consumer-1: Queue[1], Queue[4], Queue[7]
  Consumer-2: Queue[2], Queue[5]

还有其他策略:一致性哈希(AllocateMessageQueueConsistentHash)、按机房分配(AllocateMessageQueueByMachineRoom)、按配置分配(AllocateMessageQueueByConfig)。

Rebalance 的前提:所有消费者必须看到一致的消费者列表。这依赖于每个消费者定期向 Broker 上报心跳(包含自身 clientId),Broker 维护消费者组的完整列表,消费者在 Rebalance 时从 Broker 拉取这个列表并排序。

4.3 消费位点(Offset)管理

集群模式:offset 存储在 Broker 端的 config/consumerOffset.json 中,消费者每消费一批就更新一次(默认 5 秒批量提交一次,不是每条消息都提交)。消费者重启后从 Broker 拉取最新 offset 继续消费。

广播模式:offset 存储在消费者本地的 ~/.rocketmq_offsets/{clientId}/offsets.json 中,每个消费者独立维护。

首次消费的起点策略:

  • CONSUME_FROM_LAST_OFFSET:从最新位置开始(默认,适合普通场景)
  • CONSUME_FROM_FIRST_OFFSET:从最早位置开始(适合需要回溯历史数据的场景)
  • CONSUME_FROM_TIMESTAMP:从指定时间点开始(适合故障恢复后跳过堆积直接消费新消息)

4.4 消费幂等——生产必做

消息可能因为重试而被消费多次(网络超时后 Broker 不确定消费结果,会重发),所以消费者必须实现幂等。常见方案:

数据库唯一键:以消息 ID 或业务主键作为唯一键,插入成功即为消费成功,重复插入走 INSERT IGNOREON DUPLICATE KEY UPDATE

Redis + Lua 去重

String script = """
    if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then
        redis.call('expire', KEYS[1], ARGV[2])
        return 1
    else
        return 0
    end
""";
Long result = jedis.eval(script, List.of("msg:" + msgId), List.of("1", "86400"));
if (result == 0) {
    log.info("重复消息,跳过: {}", msgId);
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

业务状态机:在消费前检查业务状态(如订单是否已支付),如果已经处理过则直接返回成功。

五、事务消息——分布式事务的优雅解法

5.1 为什么需要事务消息

经典问题:用户下单后,需要同时做"扣减库存"和"发送订单消息到物流系统"。如果先扣库存再发消息,发消息失败则库存白扣;如果先发消息再扣库存,扣库存失败则物流收到无效消息。传统方案(本地消息表 + 定时任务补偿)实现复杂且延迟高。

事务消息的本质是:将"本地事务执行"和"消息投递"绑定为一个原子操作——要么都成功,要么都失败。

5.2 完整流程(源码级)

                    Producer                         Broker
                      │                                │
  ① 发送半消息 ──────►│──── Half Message ─────────────►│
                      │                                │ 存入 RMQ_SYS_TRANS_HALF_TOPIC
                      │                                │ (消费者不可见)
                      │◄─── 半消息 ACK ────────────────│
                      │                                │
  ② 执行本地事务 ────►│ doLocalTransaction()           │
                      │                                │
  ③ 提交/回滚 ────────│──── Commit / Rollback ────────►│
                      │                                │ Commit: 将消息从 HALF_TOPIC
                      │                                │       移到真实 Topic
                      │                                │ Rollback: 删除半消息
                      │                                │
                      │     (如果 Broker 长时间未收到 ③) │
  ④ 事务回查 ─────────│◄─── CheckTransaction ──────────│ (定时任务,默认每 60s 检查)
                      │                                │
                      │──── 返回状态 ──────────────────►│
                      │  COMMIT / ROLLBACK / UNKNOW    │

半消息(Half Message)的存储:Broker 收到半消息后,将原始 Topic 和 QueueId 保存到消息 Properties 中,然后将消息写入 RMQ_SYS_TRANS_HALF_TOPIC 的 Queue 0。消费者订阅的是真实 Topic,所以看不到半消息。

事务回查的实现:Broker 中的 TransactionalMessageCheckService 定时扫描 HALF_TOPIC 中的消息,对于超过 transactionTimeout(默认 60 秒)仍未收到确认的消息,通过 RPC 调用 Producer 的 checkLocalTransaction 方法。Producer 需要自己查询本地事务状态表(通常是数据库中的一张 transaction_log 表)来返回最终状态。回查最多执行 transactionCheckMax 次(默认 15 次),超过后默认丢弃(可通过 transactionCheckListener 自定义处理)。

5.3 本地事务状态表设计

CREATE TABLE transaction_log (
    id              BIGINT PRIMARY KEY AUTO_INCREMENT,
    transaction_id  VARCHAR(64) NOT NULL UNIQUE,  -- RocketMQ 事务 ID
    business_id     VARCHAR(64) NOT NULL,          -- 业务主键(如订单号)
    status          TINYINT NOT NULL,              -- 0=进行中, 1=已提交, 2=已回滚
    created_at      DATETIME NOT NULL,
    updated_at      DATETIME NOT NULL,
    INDEX idx_business_id (business_id)
);

executeLocalTransaction 中:执行业务逻辑后,插入或更新 transaction_log 状态为"已提交",然后返回 COMMIT_MESSAGE。如果异常,更新为"已回滚"并返回 ROLLBACK_MESSAGE

checkLocalTransaction 中:根据 transaction_id 查询 transaction_log,返回对应状态。如果记录不存在(本地事务根本没执行),返回 UNKNOW 让 Broker 继续回查。

六、延迟消息与定时消息

6.1 4.x 的实现(固定级别)

RocketMQ 4.x 支持 18 个固定延迟级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

实现原理:Broker 收到延迟消息后,不直接写入目标 Topic,而是写入 SCHEDULE_TOPIC_XXXX,QueueId 对应延迟级别(level-1)。ScheduleMessageService 为每个级别启动一个定时线程,扫描对应 Queue 中的消息,如果消息的投递时间已到,就将其从 SCHEDULE_TOPIC_XXXX 取出,重新写入真实的 Topic。

延迟消息写入流程:
Producer → Broker → 修改 Topic 为 SCHEDULE_TOPIC_XXXX → 写入 CommitLog
                                                          ↓
                                               ConsumeQueue[SCHEDULE_TOPIC/QueueId]
                                                          ↓
                                            ScheduleMessageService 定时扫描
                                                          ↓ (到期)
                                            重新写入 CommitLog (真实 Topic)
                                                          ↓
                                            Consumer 正常消费

6.2 5.x 的改进(任意延迟)

5.x 引入了基于时间戳的定时消息(setDeliveryTimestamp),底层使用 TimerWheel(时间轮)替代了 4.x 的固定级别扫描,支持任意精度的延迟(最小粒度 1 秒)。

七、顺序消息深度解析

7.1 发送端有序

仅保证发送到同一个 Queue 内有序。通过 MessageQueueSelector 实现:

// 按订单号 hash 到同一个 Queue
producer.send(msg, (mqs, msg1, arg) -> {
    String orderId = (String) arg;
    int index = Math.abs(orderId.hashCode()) % mqs.size();
    return mqs.get(index);
}, orderId);

注意:hashCode 可能为负数,必须取绝对值。更安全的做法是用 orderId.hashCode() & Integer.MAX_VALUE(避免 Integer.MIN_VALUE 取绝对值溢出)。

7.2 消费端有序

MessageListenerOrderly 的实现机制:

ConsumeMessageOrderlyService
    │
    ├── 为每个 MessageQueue 维护一个 ConsumeRequest(相当于一个消费线程)
    │
    ├── 消费前对 Queue 加锁(ProcessQueue.locked = true)
    │   ├── 本地锁:ConcurrentHashMap<MessageQueue, Object>
    │   └── 分布式锁:向 Broker 发送 LockBatch 请求(保证同一时刻只有一个消费者消费该 Queue)
    │
    ├── 消费消息(串行执行,不会并发)
    │
    └── 消费成功后提交 offset(顺序消费模式下每次消费都立即提交 offset,不像并发模式批量提交)

顺序消费的核心代价是:同一 Queue 内消费是串行的,如果某个 Queue 的消息消费慢,会影响整体吞吐。可以通过增加 Queue 数量来提高并行度。

八、高可用与生产部署

8.1 部署拓扑

多 Master 多 Slave(推荐)

┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐
│Broker-M0│◄──►│Broker-S0│    │Broker-M1│◄──►│Broker-S1│
│ (Master)│    │ (Slave) │    │ (Master)│    │ (Slave) │
└────┬────┘    └────┬────┘    └────┬────┘    └────┬────┘
     │              │              │              │
     └──────────────┴──────────────┴──────────────┘
                         │
                  ┌──────▼──────┐
                  │ NameServer×3│
                  └─────────────┘

每个 Topic 的 Queue 分布在多个 Master 上,单个 Master 宕机只影响部分 Queue,其他 Master 正常服务。Slave 在 Master 宕机时接管读请求(通过设置 slaveReadEnable=true,当 Master 的 ConsumeQueue 落后 CommitLog 较多时,Broker 会建议消费者从 Slave 读)。

8.2 主从复制详解

异步复制(ASYNC_MASTER):Master 收到消息写入 CommitLog 后立即返回 ACK,同时异步将消息发给 Slave。性能最好,但 Master 宕机时 Slave 可能丢失最近几条消息。

同步双写(SYNC_MASTER):Master 写入 CommitLog 后,等待 Slave 也写入成功才返回 ACK。数据最安全,但延迟增加(取决于 Master 到 Slave 的网络延迟)。

源码实现:HAServiceGroupTransferService 负责同步双写的等待与通知。Master 每写一条消息,HAClient(与 Slave 的 TCP 长连接)就将数据推给 Slave,Slave 写完后回传最大 offset,GroupTransferService 比对 offset 决定是否唤醒等待的发送线程。

8.3 生产环境 JVM 参数建议

# Broker JVM(8核16G 机器)
-Xms8g -Xmx8g                    # 堆内存固定,避免 GC 抖动
-XX:+UseG1GC                      # 使用 G1 垃圾收集器
-XX:MaxGCPauseMillis=50           # 目标 GC 停顿时间
-XX:G1HeapRegionSize=16m          # G1 Region 大小
-XX:InitiatingHeapOccupancyPercent=45  # 触发并发标记的堆占用率
-XX:+ParallelRefProcEnabled       # 并行引用处理
-XX:MaxDirectMemorySize=8g        # 堆外内存(用于 MappedByteBuffer)
-XX:+HeapDumpOnOutOfMemoryError   # OOM 时自动 dump
-XX:HeapDumpPath=/data/logs/rocketmq/heapdump.hprof

# NameServer JVM(轻量,1-2G 足够)
-Xms1g -Xmx1g
-XX:+UseG1GC

8.4 操作系统参数调优

# /etc/sysctl.conf
vm.swappiness=1                    # 尽量不使用 swap
vm.dirty_ratio=40                  # 脏页占内存比例超过 40% 时强制刷盘
vm.dirty_background_ratio=10       # 脏页占 10% 时后台线程开始刷盘
vm.overcommit_memory=1             # 允许内存超分配(对 mmap 友好)
net.core.somaxconn=65535           # TCP 全连接队列大小
net.ipv4.tcp_max_syn_backlog=65535 # TCP 半连接队列大小

# 文件描述符
ulimit -n 655350

# 磁盘调度算法(SSD 用 noop,HDD 用 deadline)
echo noop > /sys/block/sda/queue/scheduler

8.5 监控告警关键指标

指标 告警阈值 含义
rocketmq_consumer_offset_diff > 10000 消费堆积量,超过阈值说明消费跟不上
rocketmq_producer_tps 波动 > 30% 生产 TPS 异常波动
rocketmq_broker_commitlog_max_offset_diff > Master-Slave 同步延迟 主从复制延迟
rocketmq_consumer_group_count 变化 消费者实例数变化(可能宕机)
Broker JVM GC 耗时 Full GC > 1s JVM 压力过大
磁盘使用率 > 75% 接近自动清理阈值

推荐监控栈:rocketmq-exporter(官方 Prometheus Exporter)+ Grafana Dashboard。

九、面试深度题 & 参考答案

基础概念层(初级)

Q1:RocketMQ 和 Kafka 的区别?各自适用什么场景?

从架构、存储、功能三个维度对比。架构上,Kafka 依赖 ZooKeeper(新版 KRaft 模式去掉了 ZK)做元数据管理,RocketMQ 用轻量级 NameServer。存储上,Kafka 按 Topic 分区独立写文件(分区多时随机写性能下降),RocketMQ 所有 Topic 共享 CommitLog 顺序写(写入性能更稳定)。功能上,RocketMQ 原生支持事务消息、延迟消息、消息回溯、死信队列,Kafka 的事务语义更偏向 Exactly-Once 流处理(read_committed 隔离级别),延迟消息需要自己实现。

选型原则:业务消息(电商、金融、订单)选 RocketMQ,功能丰富且可靠性高;日志流(ELK、ClickHouse 入库)和实时计算(Flink / Spark Streaming)选 Kafka,吞吐量大且生态成熟。

Q2:RocketMQ 的消息由哪些部分组成?一条消息最大多大?

消息由 Header(totalSize、magicCode、CRC、queueId、offset、timestamp、host 等固定字段)+ Topic(变长)+ Body(变长)+ Properties(变长,含 Tag、Keys、自定义属性)组成。默认最大 4MB(maxMessageSize),可通过 Broker 配置调整,但生产环境不建议超过 1MB——大消息会增大 CommitLog 写入延迟、增加网络传输开销、导致消费端内存暴涨。大文件应上传 OSS/S3 后在消息中传 URL。

Q3:RocketMQ 支持哪些消费模式?区别是什么?

集群消费(CLUSTERING):同一 Group 下所有消费者共同分担消息,每条消息只被一个消费者处理,offset 由 Broker 维护。广播消费(BROADCASTING):同一 Group 下每个消费者都收到全量消息,offset 本地维护,消费失败不重试。生产环境绝大多数用集群模式,广播模式仅用于本地缓存刷新、配置推送等场景。

原理深度层(中级)

Q4:RocketMQ 如何保证消息不丢失?从三端分别说。

生产者端:使用同步发送,retryTimesWhenSendFailed 设为 3 次以上,开启故障规避(sendLatencyFaultEnable=true),对关键消息做本地消息表兜底(先写 DB 再发送,定时任务扫描未确认的消息重发)。Broker 端:使用同步刷盘(SYNC_FLUSH)确保每条消息落盘后再返回 ACK,使用同步双写(SYNC_MASTER)确保 Slave 也有副本。消费者端:手动控制 offset 提交时机(业务逻辑全部处理完成后再返回 CONSUME_SUCCESS),失败时返回 RECONSUME_LATER 触发重试,同时做消费幂等(唯一键 / Redis 去重)。

三端中最容易丢消息的环节:Broker 异步刷盘时宕机(PageCache 中未落盘的消息丢失)、消费者返回 CONSUME_SUCCESS 后业务实际未处理完(先提交 offset 再执行业务的 bug)。

Q5:RocketMQ 为什么选择顺序写 CommitLog 而不是按 Topic 分文件写?

磁盘 I/O 的特性:顺序写速度远高于随机写。HDD 顺序写约 600MB/s(接近内存写入),随机写仅约 100 IOPS(约 0.4MB/s,差 1500 倍)。SSD 顺序写和随机写的差距虽然缩小(约 3-5 倍),但顺序写仍然更优。如果按 Topic 分文件,100 个 Topic 同时写入就是 100 路随机写,性能急剧下降。CommitLog 将所有 Topic 的消息混写到同一个文件,保证始终只有一路顺序写,写入性能稳定不受 Topic 数量影响。

代价是:消费时需要通过 ConsumeQueue 做二次索引,从 CommitLog 中随机读消息体(随机读)。但读取可以通过操作系统 PageCache 缓解(热点数据在内存中),而写入无法用 PageCache 加速(必须落盘才安全)。所以 RocketMQ 的设计是"优化写入瓶颈,用索引弥补读取"。

Q6:ConsumeQueue 为什么设计成固定 20 字节?

固定长度的核心好处是 O(1) 随机定位。消费者拉取第 N 条消息时,直接计算 offset = N * 20 跳转到文件对应位置读取,无需任何遍历或索引结构。20 字节的组成:CommitLog Offset(8 字节,定位物理消息)+ Size(4 字节,消息体大小)+ Tag HashCode(8 字节,用于过滤)。这三个字段是消费拉取所需的最小信息集:Offset 和 Size 用于从 CommitLog 读取消息,Tag HashCode 用于快速过滤。

如果改成变长记录,就必须引入额外的索引结构(如 B+Tree)来支持随机定位,增加了复杂度和维护成本。

Q7:RocketMQ 事务消息的完整流程,以及半消息为什么对消费者不可见?

完整流程分四步:① 生产者发送半消息(Half Message)到 Broker;② Broker 将半消息存入 RMQ_SYS_TRANS_HALF_TOPIC 的 Queue 0,替换原始 Topic 和 QueueId(保存到 Properties 中),对消费者不可见;③ 生产者执行本地事务,根据结果向 Broker 发送 Commit 或 Rollback;④ 如果 Broker 在 transactionTimeout(默认 60s)内未收到确认,会启动事务回查(TransactionalMessageCheckService),定时调用生产者的 checkLocalTransaction

半消息不可见的原因:消费者订阅的是真实 Topic,而半消息存储在 RMQ_SYS_TRANS_HALF_TOPIC(系统内部 Topic),消费者无法订阅。Commit 时 Broker 将消息从 HALF_TOPIC 取出,恢复原始 Topic 和 QueueId,重新写入 CommitLog,此时消费者才能看到。Rollback 时 Broker 将半消息写入 RMQ_SYS_TRANS_OP_HALF_TOPIC(标记为已处理),定时清理任务会删除这些消息。

Q8:RocketMQ 的 Push 消费者是怎么实现"伪推送"的?延迟有多高?

Push 消费者的本质是 PullMessageService 线程循环发起拉取请求。当 Broker 有消息时立即返回(延迟取决于网络 RTT,通常 1-5ms);当 Broker 无消息时,请求会被 Hold 住(默认最多 15 秒,pullRequestHoldTime),一旦有新消息到达就立即唤醒返回。所以新消息的消费延迟 ≈ 网络 RTT(毫秒级),而不是 15 秒的轮询间隔。

这个设计的精妙之处在于:既避免了传统轮询的"空请求浪费",又避免了真正推送的"服务端维护长连接压力"。Broker 只需要 Hold 住请求即可,无需主动推送,连接管理复杂度大大降低。

生产实战层(高级)

Q9:线上消息堆积了 500 万条,你怎么排查和处理?

排查四步法。第一步,看 Dashboard 的 consumerOffsetbrokerOffset 差值,确认堆积发生在哪个 Topic 的哪个 Queue。第二步,看消费 TPS(consumeTps)——如果 TPS 正常但堆积在增长,说明生产速率远超消费速率,需要扩容消费者;如果 TPS 接近 0,说明消费端可能卡住了(死锁、外部接口超时、GC 停顿)。第三步,看消费端的日志和监控(JVM GC 日志、线程栈、慢 SQL),定位具体原因。第四步,看 Broker 端的磁盘 I/O 和网络状况,排除 Broker 侧瓶颈。

应急处理:如果消费端有 bug 需要修复但时间较长,可以临时扩容消费者实例数(不超过 Queue 数)来减缓堆积增长速度。如果堆积量极大且消费端修复后需要快速消化,可以新建一个 Queue 数量更多的 Topic,写一个转发 Consumer 将堆积消息快速转移到新 Topic,再用更多消费者并行消费。

长期优化:增加 Queue 数量(提高并行度上限)、优化消费逻辑(异步化、批量处理)、增加消费者实例、配置合理的消费线程池(consumeThreadMin / consumeThreadMax)。

Q10:消费者组扩容时,为什么增加超过 Queue 数量的实例没有效果?

因为 RocketMQ 的最小并行单元是 MessageQueue。Rebalance 时每个 Queue 只能分配给一个消费者,如果消费者数量 > Queue 数量,多出来的消费者分不到 Queue,处于空闲状态。例如 Topic 有 8 个 Queue,即使部署 16 个消费者实例,也只有 8 个实例在工作。所以扩容消费者的前提是确认 Queue 数量足够。如果 Queue 不够,需要先增加 Topic 的 Queue 数(mqadmin updateTopic -w 16)。

Q11:RocketMQ 如何实现消息的 Exactly-Once 语义?

RocketMQ 本身保证的是 At-Least-Once(至少投递一次),要实现 Exactly-Once 需要消费端配合幂等。方案一:数据库唯一键约束,以业务主键作为唯一索引,重复消费时 INSERT 失败走 UPDATE 或直接忽略。方案二:Redis SETNX + 过期时间,消费前检查消息 ID 是否已处理。方案三:业务状态机,消费前检查业务状态(如订单状态为"待支付"才处理支付回调)。方案四:RocketMQ 5.x 的事务消息 + 幂等表组合,通过 transaction_log 表同时实现事务确认和幂等判断。

真正的 Exactly-Once(端到端精确一次)在分布式系统中极难实现,业界普遍做法是 At-Least-Once + 消费端幂等 = 业务层面的 Exactly-Once。

Q12:Master 宕机后,RocketMQ 的消息收发会受什么影响?

Master 宕机的影响分三个层面。发送端:Producer 的路由表会在 30 秒内更新(下一次从 NameServer 拉取),故障规避机制会立即将流量切到其他 Master。但宕机 Master 上的 Queue 在 Slave 接管前不可写——这意味着这些 Queue 的消息发送会失败(Producer 会自动重试到其他 Queue)。消费端:如果开启了 slaveReadEnable=true,消费者会自动从 Slave 读取消息(Slave 是只读的),消费不会中断。但如果 Slave 的数据落后于 Master(异步复制模式下),会有短暂的消息延迟。高可用恢复:如果配置了 Dledger(Raft 模式),集群会自动选举新的 Leader,实现秒级切换。

架构设计层(资深)

Q13:设计一个基于 RocketMQ 的订单超时取消系统,你怎么做?

方案:使用延迟消息实现。用户下单后发送一条延迟消息(延迟 30 分钟),消费者收到消息后检查订单状态:如果未支付则执行取消逻辑(释放库存、更新订单状态);如果已支付则直接忽略。

关键设计点:① 延迟消息的 Topic 单独创建,Queue 数量根据订单量设置(如 16 个),保证并行度。② 消费者做幂等:以 orderId 为唯一键检查订单状态表,只有"待支付"状态才执行取消。③ 如果延迟消息精度不够(4.x 的 30 分钟是固定级别 level 16),可以选 20 分钟级别 + 消费端检查时间差再等待。5.x 支持任意时间戳,可以精确到秒。④ 监控延迟消息的投递延迟(实际投递时间 - 预期投递时间),如果延迟过大说明 Broker 压力大。

对比其他方案:定时任务扫表(简单但延迟高、DB 压力大)、Redis 过期通知(不可靠、消息可能丢失)、时间轮(需要自建基础设施)。RocketMQ 延迟消息是最优解——可靠、延迟可控、无需额外基础设施。

Q14:如何设计一个支持百万级 TPS 的消息系统?RocketMQ 的瓶颈在哪里?

百万 TPS 需要横向扩展。单个 RocketMQ Broker(8 核 16G SSD)的写入 TPS 约 10 万(同步发送,256 字节消息体),要达到百万 TPS 需要 10+ 个 Master。

架构设计:① Broker 集群:10 个 Master + 10 个 Slave,部署在 2 个机房(每个机房 5M+5S,跨机房同步双写)。② NameServer:3-5 个节点,每个机房至少 2 个。③ Topic 设计:按业务拆分 Topic,每个 Topic 的 Queue 数量 = 目标 TPS / 单 Queue TPS(如目标 10 万 TPS,单 Queue 约 5000 TPS,需要 20 个 Queue 分布在多个 Broker 上)。④ 消费者:Consumer Group 数量与 Queue 数量匹配,每个消费者实例处理 1-2 个 Queue。⑤ 存储:使用 SSD,配置 vm.swappiness=1,JVM 堆内存 ≥ 8G,MaxDirectMemorySize ≥ 8G。

RocketMQ 的瓶颈:① 单 Broker 的写入吞吐受限于磁盘顺序写速度(SSD 约 500MB/s)。② CommitLog 写入是串行的(单文件追加),可以通过 Topic 级别的分区(不同 Broker 写不同文件)来并行化。③ ConsumeQueue 构建的异步线程是单线程(ReputMessageService),如果 Topic 很多可能导致索引构建延迟。④ 消费端的处理能力往往是最大瓶颈——生产快消费慢导致堆积。

Q15:RocketMQ 5.x 的存算分离架构相比 4.x 有什么优势?

4.x 的 Broker 是有状态的——既负责消息路由(计算),又负责消息存储(存储),扩容 Broker 需要同时考虑计算和存储的平衡,数据迁移成本高。

5.x 引入存算分离后:计算层(Broker Proxy)无状态,只负责协议解析、消息路由、流量控制,可以随时水平扩缩容;存储层(CommitLog Store)专注于数据持久化、复制、索引构建,可以独立扩容存储容量。两者通过内部 RPC 通信。

优势:① 弹性更好——计算不足加 Broker Proxy,存储不足加 Store 节点,互不影响。② 运维简化——Broker Proxy 无状态,重启无数据迁移风险。③ 多租户隔离——不同租户可以映射到不同的 Store 节点,实现资源隔离。④ 云原生友好——无状态组件适合 K8s Deployment,有状态组件适合 StatefulSet + PVC。

十、源码阅读指南

10.1 推荐阅读顺序

第 1 周:理解消息发送全链路
  入口:DefaultMQProducerImpl.send()
  重点:路由选择 → 序列化 → Netty 发送 → 故障规避 → 重试

第 2 周:理解消息存储全链路
  入口:CommitLog.putMessage() → ReputMessageService.doReput()
  重点:MappedFile 写入 → ConsumeQueue 构建 → IndexFile 构建 → 刷盘策略

第 3 周:理解消息消费全链路
  入口:DefaultMQPushConsumerImpl.pullMessage() → ConsumeMessageService
  重点:长轮询 → Rebalance → 消息分发 → 消费线程池 → offset 提交

第 4 周:理解事务消息
  入口:TransactionalMessageBridge → TransactionalMessageCheckService
  重点:半消息存储 → 事务提交/回滚 → 定时回查

10.2 调试技巧

  1. 本地启动 NameServer + Broker(单机即可),broker.conf 配置 brokerIP1=127.0.0.1
  2. SendMessageProcessor.processRequest() 打断点,跟踪一条消息从接收到存储的全过程。
  3. ReputMessageService.doReput() 打断点,观察 ConsumeQueue 是如何异步构建的。
  4. PullMessageProcessor.processRequest() 打断点,观察消费拉取的完整流程。
  5. 使用 Arthas 在线诊断工具观察生产环境的线程栈、方法耗时、消息流转。

十一、学习资源

资源 说明
RocketMQ 官方文档 最权威的参考,尤其 Best Practice 章节
RocketMQ 源码 GitHub 仓库,建议 clone 后按上述顺序阅读
RocketMQ 技术内幕(丁威 著) 目前最好的中文源码分析书籍
极客时间:消息队列高手课 李玥老师出品,覆盖 RocketMQ + Kafka 的原理与实战
rocketmq-learning 带详细注释的源码学习仓库

本教程基于 RocketMQ 5.x,部分内容同样适用于 4.x。API 差异请参考官方文档:https://rocketmq.apache.org

建议学习方式:每读完一节就动手实验,结合源码断点验证理解,面试前重点准备 Q4/Q5/Q7/Q9/Q11/Q13 这六道高频深度题。

0

评论区