一、Kafka 全景图
1.1 定位与演进
Kafka 由 LinkedIn 于 2011 年开源,现为 Apache 顶级项目。最初定位为分布式日志系统(Log Aggregation),后来演变为"事件流平台"(Event Streaming Platform),广泛用于日志采集、实时数据管道、流计算、CDC(Change Data Capture)等场景。
版本演进关键节点:
- 0.8(2013):引入复制机制(Replication),具备高可用能力
- 0.10(2016):引入 Kafka Streams,具备流处理能力
- 0.11(2017):引入事务(Transactions)和 Exactly-Once 语义
- 2.8(2021):引入 KRaft 模式(Early Access),开始去 ZooKeeper 依赖
- 3.3+(2022):KRaft 模式正式生产可用(GA)
- 4.0(2025):完全移除 ZooKeeper 模式,KRaft 成为唯一运行模式
Kafka 与 RocketMQ 的根本区别:Kafka 的设计哲学是"高吞吐的日志管道",一切设计围绕吞吐量和顺序写入优化;RocketMQ 的设计哲学是"金融级消息中间件",一切设计围绕可靠性和业务功能(事务、延迟、死信队列)。选型时记住:日志和流计算选 Kafka,业务消息选 RocketMQ。
1.2 架构全景
┌──────────────────┐
│ ZooKeeper / │
│ KRaft Quorum │ ← 元数据管理 + Leader 选举
└────────┬─────────┘
│
┌──────────────────────┼──────────────────────┐
│ │ │
┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐
│ Broker-0 │ │ Broker-1 │ │ Broker-2 │
│ (Leader │ │ (Follower │ │ (Leader │
│ p0, p2) │ │ p0, p2) │ │ p1) │
└──┬───────┬──┘ └──────┬──────┘ └──────┬──────┘
│ │ │ │
┌───▼──┐ ┌──▼───┐ ┌───▼──┐ ┌───▼──┐
│Producer│ │Consumer│ │Consumer│ │Producer│
└──────┘ └──────┘ └──────┘ └──────┘
Kafka 集群由多个 Broker 组成,每个 Broker 是一个独立的 Kafka 进程。 Topic 被切分为多个 Partition(分区),每个 Partition 分布在不同的 Broker 上,实现水平扩展。每个 Partition 有 Leader 和 Follower 副本,Leader 负责读写,Follower 从 Leader 同步数据。
1.3 核心角色详解
ZooKeeper / KRaft
Kafka 早期版本依赖 ZooKeeper 管理元数据(Broker 列表、Topic 配置、Partition 分配、Controller 选举)。ZooKeeper 的问题:① 元数据更新通过 ZAB 协议达成共识,延迟高(几十毫秒级);② Partition 数量过多时 ZK 压力大(每个 Partition 在 ZK 上是一个 ZNode);③ Controller 故障转移依赖 ZK 的 Session 超时检测,恢复慢(秒级)。
KRaft(Kafka Raft Metadata Mode)是 Kafka 自研的元数据管理方案,用内置的 Raft 协议替代 ZooKeeper。KRaft 的优势:① 元数据存储在 Kafka 内部的 __cluster_metadata Topic 中,与消息存储共用同一套高效的日志存储引擎;② 支持百万级 Partition(ZooKeeper 模式下建议不超过 20 万);③ Controller 故障转移基于 Raft 选举,毫秒级恢复。Kafka 4.0 开始 KRaft 成为唯一模式,ZooKeeper 模式已被完全移除。
Broker
Broker 是 Kafka 的服务节点,核心职责:接收生产者消息、存储消息到磁盘、响应消费者拉取请求、管理副本同步。每个 Broker 内部的关键组件:
SocketServer:基于 NIO 的多线程网络层(Acceptor 线程接收连接 → Processor 线程处理请求 → RequestHandler 线程池处理业务逻辑)LogManager:管理所有 Partition 的日志存储(每个 Partition 对应一个Log对象,由多个LogSegment组成)ReplicaManager:管理副本同步,Follower 通过ReplicaFetcherThread从 Leader 拉取消息GroupCoordinator:管理 Consumer Group 的 offset 和 RebalanceKafkaController(仅 Controller Broker 上运行):管理 Partition Leader 选举、副本分配、Broker 上下线
Controller
Controller 是 Kafka 集群中一个特殊的 Broker(通过 ZooKeeper/KRaft 选举产生),负责集群级别的元数据管理:Partition Leader 选举、副本重新分配(Reassignment)、Broker 上下线通知。整个集群同一时刻只有一个 Controller,Controller 故障时其他 Broker 竞选成为新 Controller。
Producer
Kafka Producer 是异步、批量、高效的。核心流程:消息先写入 RecordAccumulator(本地缓冲区),按 Partition 分组攒批,由后台 Sender 线程批量发送到 Broker。这种设计用延迟换吞吐——单条消息不立即发送,而是等攒够一批再发,极大减少了网络请求次数。
Consumer
Kafka Consumer 是纯拉取模式(Pull),消费者主动从 Broker 拉取消息,Broker 不做推送。Consumer 通过 poll() 方法拉取,每次拉取返回一批消息(默认最多 500 条,max.poll.records)。Consumer Group 内的消费者共同分担 Topic 的 Partition,每个 Partition 只被组内一个消费者消费。
二、Partition 与 Replica——Kafka 最核心的抽象
2.1 Partition:并行度的最小单元
一个 Topic 可以有多个 Partition,每个 Partition 是一个有序、不可变的消息序列。Partition 是 Kafka 并行度的最小单元:生产者可以并行向不同 Partition 写入,消费者可以并行从不同 Partition 读取。
Topic: "order-events" (3 Partitions)
Partition-0: [msg0] [msg3] [msg6] [msg9] ... (Leader: Broker-0)
Partition-1: [msg1] [msg4] [msg7] [msg10] ... (Leader: Broker-1)
Partition-2: [msg2] [msg5] [msg8] [msg11] ... (Leader: Broker-2)
Partition 数量的选择:太少则并行度不够(消费者数 > Partition 数时,部分消费者空闲),太多则文件描述符开销大、Rebalance 慢、Leader 选举慢。经验公式:Partition 数 = max(目标吞吐 / 单 Partition 吞吐, 消费者数量)。单 Partition 的写入吞吐约 10-50 MB/s(取决于消息大小和磁盘性能),一个消费者单线程处理一个 Partition 的消费吞吐约 5-20 MB/s。
2.2 Replica:高可用的基石
每个 Partition 可以配置多个副本(replication.factor,推荐 3)。副本分为 Leader 和 Follower:Leader 处理所有读写请求,Follower 只从 Leader 同步数据(不处理客户端请求,Kafka 2.4+ 支持 Follower Read)。
ISR(In-Sync Replicas):与 Leader 保持同步的副本集合。"同步"的标准是:Follower 在 replica.lag.time.max.ms(默认 30 秒)内持续向 Leader 发送 Fetch 请求并消费消息。如果 Follower 落后太多(超过 30 秒没有 Fetch),会被从 ISR 中移除。
Partition-0:
Leader: Broker-0 (ISR)
Follower: Broker-1 (ISR) ← 同步正常
Follower: Broker-2 (NOT in ISR) ← 落后太多,被移出 ISR
AR(Assigned Replicas):Partition 的所有副本列表,无论是否同步。ISR ⊆ AR。
OSR(Out-of-Sync Replicas):AR - ISR,即落后的副本。
ISR 的核心意义:当 min.insync.replicas=2(最少同步副本数)且 acks=all 时,Leader 必须确保消息被 ISR 中至少 2 个副本写入才返回 ACK。如果 ISR 中的副本数 < min.insync.replicas,Leader 会拒绝写入请求(抛出 NotEnoughReplicasException),保证数据安全性。
2.3 Leader 选举
当 Leader 宕机时,Controller 从 ISR 中选举新的 Leader(优先选 ISR 中的第一个副本)。如果 ISR 为空(所有副本都落后了),行为取决于 unclean.leader.election.enable:
false(默认):Partition 不可用,直到原 Leader 恢复。数据最安全,但可用性低。true:允许从非 ISR 副本中选举 Leader(可能丢失数据)。可用性高,但数据不安全。
生产环境建议保持 false,通过监控 ISR 收缩事件及时告警,而不是靠牺牲数据安全性来保证可用性。
三、消息存储——Kafka 高吞吐的秘密
3.1 LogSegment:日志的物理组织
Kafka 的每个 Partition 在磁盘上由一个 Log 对象管理,Log 由多个 LogSegment 组成。每个 LogSegment 包含三个文件:
/data/kafka-logs/order-events-0/
├── 00000000000000000000.log # 消息数据(LogSegment)
├── 00000000000000000000.index # 偏移量索引(Offset Index)
├── 00000000000000000000.timeindex # 时间戳索引(Time Index)
├── 00000000000000368769.log # 下一个 Segment
├── 00000000000000368769.index
├── 00000000000000368769.timeindex
└── leader-epoch-checkpoint # Leader Epoch 检查点
.log 文件:消息的实际存储文件,默认 1GB(log.segment.bytes),写满后新建下一个 Segment(文件名是起始 offset,左补零到 20 位)。消息在文件内是顺序追加的,与 RocketMQ 的 CommitLog 类似。
.index 文件:稀疏偏移量索引(Sparse Offset Index),不是每条消息都有索引条目,而是每隔一定字节间隔(默认 4KB,log.index.interval.bytes)记录一条映射:相对偏移量(4B) + 文件内位置(4B) = 8 字节/条。因为稀疏,所以索引文件很小(一个 1GB 的 log 文件对应的 index 文件约 2MB),可以全部 mmap 到内存。
.timeindex 文件:时间戳索引,格式为 时间戳(8B) + 偏移量(4B) = 12 字节/条,用于按时间查找消息(如消费端要从某个时间点开始消费)。
3.2 消息查找流程
消费者要从 offset=368900 开始读取消息:
Step 1: 定位 Segment
二分查找文件名 → 找到 00000000000000368769.log(起始 offset 为 368769)
Step 2: 在 Index 文件中查找
mmap 加载 .index 文件 → 二分查找 ≤ 368900 的最大索引条目
得到:相对偏移量 131,文件位置 52480
含义:offset 368769+131=368900 的消息,在 .log 文件的 52480 字节附近
Step 3: 从 .log 文件扫描
seek 到 52480 位置 → 顺序扫描,找到 offset=368900 的消息 → 返回
关键点:索引是稀疏的,所以不能直接定位到精确位置,只能定位到"附近",然后顺序扫描。但由于 Segment 内的消息是有序的,扫描距离通常很短(最多几千字节),性能影响极小。
3.3 顺序写入与 Page Cache
Kafka 的写入性能之所以高,核心在于两个机制:
顺序追加写:生产者消息到达后,直接追加到当前活跃 LogSegment 的末尾。顺序写磁盘的速度接近内存写入(HDD 约 600MB/s,SSD 更高),远高于随机写。Kafka 不做复杂的内存数据结构操作,就是"往文件末尾 append"。
利用操作系统 Page Cache:Kafka 不自己管理内存缓存,而是直接依赖操作系统的 Page Cache。写入时数据先进 Page Cache(内存),由操作系统的 pdflush 线程异步刷到磁盘。读取时如果数据在 Page Cache 中就直接返回,不需要磁盘 I/O。这种设计的优势:① JVM 不需要做 GC(数据在 JVM 堆外);② 操作系统对 Page Cache 的预读和刷盘策略经过了数十年的优化;③ Broker 重启后 Page Cache 仍然有效(由操作系统管理)。
写入流程(极简):
Producer → Broker → 追加写入 Page Cache (内存) → 立即返回 ACK
↓ (异步)
OS pdflush → 刷到磁盘
读取流程(热点数据):
Consumer → Broker → 从 Page Cache 直接返回 (零拷贝) → NIC
3.4 零拷贝:sendfile / transferTo
消费者拉取消息时,Kafka 使用 Java NIO 的 FileChannel.transferTo() 方法(底层是 Linux 的 sendfile 系统调用),数据直接从文件描述符传输到 Socket 缓冲区,不经过用户空间。
传统 read/write (4 次拷贝 + 4 次切换):
Disk → Kernel Buffer → User Buffer → Socket Buffer → NIC
(DMA) (copy) (copy) (DMA)
内核→用户 用户→内核
sendfile (2 次拷贝 + 2 次切换):
Disk → Kernel Buffer → NIC
(DMA) (DMA gather copy)
如果网卡支持 SG-DMA(Scatter-Gather DMA),甚至可以直接从 Kernel Buffer 传到网卡,
只有 1 次 DMA 拷贝。
零拷贝的前提:消息在传输过程中不需要被修改(Kafka 的消息是不可变的,天然满足)。如果需要对消息做转换(如压缩/解压),就不能用零拷贝,必须经过用户空间。
3.5 消息格式(Record Batch)
Kafka 0.11+ 使用 v2 消息格式(Magic=2),以 Record Batch 为单位存储:
┌──────────────── RecordBatch Header (61 bytes) ────────────────┐
│ baseOffset(8B) │ batchLength(4B) │ partitionLeaderEpoch(4B) │
│ magic(1B) │ crc(4B) │ attributes(2B) │
│ lastOffsetDelta(4B) │ baseTimestamp(8B) │ maxTimestamp(8B) │
│ producerId(8B) │ producerEpoch(2B) │ baseSequence(4B) │
│ recordsCount(4B) │
└───────────────────────────────────────────────────────────────┘
┌──────── Record 0 ────────┐
│ length(varint) │ attributes(1B) │ timestampDelta(varlong) │
│ offsetDelta(varint) │ keyLength(varint) │ key(bytes) │
│ valueLength(varint) │ value(bytes) │ headersCount(varint) │
│ headers... │
└──────────────────────────┘
┌──────── Record 1 ────────┐
│ ... │
└──────────────────────────┘
v2 格式的关键改进:① 使用 varint 变长编码(小数值占更少字节),比 v1 节省约 25% 空间。② 以 Batch 为单位压缩(而非单条消息压缩),压缩率更高。③ 引入了 producerId + producerEpoch + baseSequence,支持 Exactly-Once 语义的去重。
四、生产者深度机制
4.1 发送流程全链路
Producer.send(ProducerRecord)
│
▼
Interceptor.onSend() ← 拦截器链(可修改消息)
│
▼
Serializer.serialize() ← Key/Value 序列化
│
▼
Partitioner.partition() ← 分区选择
│
▼
RecordAccumulator.append() ← 写入本地缓冲区(按 Partition 分组的 Deque<RecordBatch>)
│
▼
Sender Thread (后台) ← 从 Accumulator 中取出攒好的 Batch
│
├── 按 Broker 分组 Batch
├── 构建 ProduceRequest(每个 Broker 一个请求,包含多个 Partition 的 Batch)
├── 通过 NetworkClient 发送 (Netty / NIO)
└── 收到 Response → 执行 Callback
RecordAccumulator 是 Producer 的核心组件,它的作用是将零散的单条消息攒成 Batch,减少网络请求次数。关键参数:
batch.size(默认 16KB):每个 Batch 的最大字节数,满了就发送linger.ms(默认 0):即使 Batch 没满,等待指定毫秒后也发送(用延迟换吞吐)buffer.memory(默认 32MB):Accumulator 的总内存上限,超过后send()阻塞或抛异常
生产环境推荐配置:batch.size=65536(64KB),linger.ms=5-50(根据延迟容忍度),buffer.memory=67108864(64MB)。
4.2 分区策略(Partitioner)
Kafka 默认的分区策略(DefaultPartitioner,3.x+ 为 UniformStickyPartitioner):
- 有 Key 的消息:对 Key 做 Murmur2 Hash,取模 Partition 数量(
hash(key) % numPartitions)。同一 Key 的消息始终路由到同一 Partition,保证 Key 级别的有序性。 - 无 Key 的消息:Sticky 策略——随机选一个 Partition 并持续发送到该 Partition,直到 Batch 满或
linger.ms到期,然后切换到另一个 Partition。比纯粹的轮询更高效(减少 Batch 碎片)。
自定义分区器:实现 Partitioner 接口,适合按业务规则路由(如按地域、按用户 ID 范围)。
public class RegionPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
String region = extractRegion(key.toString());
if ("cn-east".equals(region)) return 0;
if ("cn-north".equals(region)) return 1;
return 2;
}
}
4.3 ACK 机制与数据可靠性
acks 参数控制生产者发送消息后需要等待多少个副本确认:
acks=0:不等待任何确认,发送后立即返回。吞吐最高,但消息可能丢失(Broker 宕机、网络中断)。适合日志采集等允许少量丢失的场景。acks=1:等待 Leader 写入成功即返回(不等 Follower 同步)。如果 Leader 在 Follower 同步前宕机,消息丢失。适合大多数业务场景。acks=all(或acks=-1):等待 ISR 中所有副本都写入成功才返回。最安全,配合min.insync.replicas=2可保证至少 2 个副本有数据。适合金融、支付等场景。
生产环境的推荐组合:acks=all + min.insync.replicas=2 + replication.factor=3。这保证了即使一个 Broker 宕机,数据仍然有 2 个副本,且写入时至少有 2 个副本确认。
4.4 Producer 幂等与 Exactly-Once
幂等性(Idempotence):enable.idempotence=true(3.x 默认开启)。Broker 为每个 Producer(producerId + producerEpoch)维护一个 sequence number,如果收到的消息 sequence number 不大于已处理的,说明是重复消息,直接丢弃。幂等性保证的是单 Partition 内的 Exactly-Once(单 Producer 单 Partition 不重复)。
事务(Transactions):transactional.id 设置后,Producer 可以将多个 Partition 的写入包装在一个事务中——要么所有 Partition 都写入成功,要么都回滚。消费者设置 isolation.level=read_committed 后只能看到已提交事务的消息。
Properties props = new Properties();
props.put("transactional.id", "order-tx-001");
props.put("enable.idempotence", true);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(new ProducerRecord<>("orders", key1, value1));
producer.send(new ProducerRecord<>("inventory", key2, value2));
// 发送消费 offset(用于 consume-transform-produce 场景)
producer.sendOffsetsToTransaction(offsets, consumerGroupMetadata);
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
事务消息的典型场景:consume-transform-produce 管道(读一个 Topic 的消息,处理后写到另一个 Topic,保证 Exactly-Once)。
五、消费者深度机制
5.1 Consumer Group 与 Partition 分配
Consumer Group 是 Kafka 消费者的组织单位。同一 Group 内的消费者共同分担 Topic 的所有 Partition,每个 Partition 只能被组内一个消费者消费。不同 Group 之间互不影响(同一个 Partition 可以被不同 Group 各消费一次)。
Topic: orders (4 Partitions)
Group-A: Consumer-0, Consumer-1, Consumer-2
Group-B: Consumer-3
分配结果:
Group-A:
Consumer-0: Partition-0, Partition-1
Consumer-1: Partition-2
Consumer-2: Partition-3
Group-B:
Consumer-3: Partition-0, Partition-1, Partition-2, Partition-3 (全量)
5.2 Rebalance 协议详解
Rebalance 是 Consumer Group 中 Partition 重新分配的过程,发生在以下情况:消费者加入/离开 Group、Topic 的 Partition 数量变化、消费者订阅的 Topic 列表变化。
Rebalance 的三代协议:
Eager Rebalance(经典)——所有消费者同时放弃所有 Partition,然后重新分配。问题是"Stop-The-World"效应:Rebalance 期间所有消费者停止消费,如果 Partition 很多或消费者很多,Rebalance 时间可能达到分钟级。
Eager Rebalance 流程:
1. 所有 Consumer 发送 LeaveGroup → 放弃所有 Partition
2. GroupCoordinator 收到所有 Leave → 清除所有分配
3. 所有 Consumer 重新发送 JoinGroup
4. Coordinator 选举 Leader Consumer → Leader 执行分配算法
5. Leader 将分配方案发给 Coordinator → Coordinator 分发给所有 Consumer
6. 所有 Consumer 确认新分配 → Rebalance 完成
Cooperative Rebalance(增量,2.4+)——不再一次性放弃所有 Partition,而是只调整需要变化的部分。消费者继续处理不需要移动的 Partition,极大减少了 Rebalance 对消费的影响。
Cooperative Rebalance 流程:
1. Consumer 发送 JoinGroup(不放弃现有 Partition)
2. Coordinator 计算新分配 vs 旧分配的差异
3. 只 revoke 需要移动的 Partition(发 revoke 通知)
4. 等待被 revoke 的 Partition 提交 offset
5. 将 revoke 的 Partition 分配给新消费者
6. 其他 Partition 的消费不中断
Static Membership(静态成员,2.3+)——给消费者设置 group.instance.id,消费者重启后(在 session.timeout.ms 内)不会触发 Rebalance,Coordinator 直接恢复之前的 Partition 分配。适合消费者需要频繁重启(如部署更新)的场景。
生产环境推荐:使用 Cooperative Rebalance(partition.assignment.strategy=CooperativeStickyAssignor),配合 Static Membership 减少不必要的 Rebalance。
5.3 Offset 管理
Offset 的存储位置:消费者提交的 offset 存储在内部 Topic __consumer_offsets 中(50 个 Partition,Key 为 groupId + topic + partition 的 hash)。GroupCoordinator 负责管理这个 Topic 的读写。
自动提交 vs 手动提交:
自动提交(enable.auto.commit=true,默认):消费者每隔 auto.commit.interval.ms(默认 5 秒)自动提交当前 poll 返回的最大 offset。问题:如果在提交前消费的消息处理失败了,offset 已经提交,消息就丢了。
手动提交(enable.auto.commit=false):消费者处理完消息后调用 commitSync() 或 commitAsync() 提交 offset。更安全,但需要自己管理提交时机。
// 手动提交示例(推荐)
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processRecord(record); // 业务处理
}
// 处理完所有消息后提交 offset
consumer.commitSync(); // 同步提交,阻塞直到成功
// 或 consumer.commitAsync(); // 异步提交,不阻塞
}
max.poll.interval.ms(默认 5 分钟):两次 poll() 之间的最大间隔。如果消费者处理消息太慢,超过这个时间没有调用 poll(),Coordinator 会认为消费者挂了,触发 Rebalance。这个参数需要根据最慢的消息处理时间来设置——如果单条消息处理需要 30 秒,max.poll.records=100,则 max.poll.interval.ms 至少设为 30 * 100 * 1.5 = 4500000(75 分钟)。
5.4 消费者心跳与故障检测
消费者通过后台心跳线程定期向 Coordinator 发送心跳(间隔 heartbeat.interval.ms,默认 3 秒)。如果 Coordinator 在 session.timeout.ms(默认 45 秒)内没有收到心跳,就认为消费者挂了,触发 Rebalance。
Consumer 主线程: poll() → process records → commit offset → poll() → ...
Consumer 心跳线程: heartbeat → sleep(3s) → heartbeat → sleep(3s) → ...
关键:心跳线程和消费线程是独立的。即使消费线程卡住了(处理慢),心跳线程仍在正常发送,不会触发 Rebalance。只有当消费者进程整体挂掉(心跳线程也停了)或 max.poll.interval.ms 超时才会触发 Rebalance。
六、数据可靠性与一致性
6.1 副本同步机制
Follower 通过 ReplicaFetcherThread 从 Leader 拉取消息(和消费者拉取是同一套协议),写入本地 LogSegment。Leader 维护每个 Follower 的 Log End Offset(LEO),即 Follower 已写入的最新 offset。
HW(High Watermark):ISR 中所有副本 LEO 的最小值。只有 offset < HW 的消息才对消费者可见。HW 保证了消费者永远不会读到可能被丢失的消息(即使 Leader 宕机,HW 之前的消息至少有 ISR 中的所有副本)。
Leader LEO: 100
Follower-1 LEO: 98
Follower-2 LEO: 95
HW = min(100, 98, 95) = 95
→ offset 0-94 的消息对消费者可见
→ offset 95-99 的消息已写入 Leader 但尚未同步到所有 ISR 副本,消费者看不到
Leader Epoch(2.0+):每次 Leader 选举,Epoch 加 1。Follower 恢复时通过 Epoch 判断数据一致性:如果 Follower 的 Epoch < Leader 的 Epoch,说明 Leader 换过了,Follower 需要从 Leader 拉取 Epoch 对应的起始 offset 开始重新同步。Leader Epoch 解决了之前 HW 在极端场景下的数据截断问题。
6.2 消息丢失的三个环节及防范
生产者 → Broker:消息在网络传输中丢失,或 Broker 收到后写入前宕机。防范:acks=all + retries=Integer.MAX_VALUE + delivery.timeout.ms=120000(2 分钟)。
Broker 存储:消息写入 Page Cache 但未刷盘时 Broker 宕机。防范:replication.factor=3 + min.insync.replicas=2(至少 2 个副本写入成功才算成功)。Kafka 默认不依赖同步刷盘(没有 SYNC_FLUSH 概念),而是靠多副本保证可靠性——单个 Broker 宕机,其他副本有数据。
Broker → 消费者:消费者处理完消息但在提交 offset 前崩溃,重启后从上次提交的 offset 开始消费,导致消息重复(不是丢失,但语义上是 At-Least-Once)。防范:手动提交 offset(enable.auto.commit=false),确认业务处理成功后再提交;消费端做幂等。
6.3 Exactly-Once 语义的实现
Kafka 的 Exactly-Once 需要 Producer 事务 + Consumer 隔离级别配合:
Producer:
transactional.id = "my-app-tx"
enable.idempotence = true
→ 保证单 Producer 的消息不重复写入
Consumer:
isolation.level = read_committed
→ 只读取已提交事务的消息(未提交的、已回滚的不可见)
Consume-Transform-Produce:
producer.beginTransaction();
records = consumer.poll();
for (record : records) {
transformed = transform(record);
producer.send(new ProducerRecord<>("output-topic", transformed));
}
producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
producer.commitTransaction();
这套机制保证了:输入消息的消费和输出消息的写入是原子的——要么都成功,要么都回滚。典型应用:Kafka Streams 的 Exactly-Once 处理模式。
七、Kafka Streams 与 KSQL 简述
7.1 Kafka Streams 架构
Kafka Streams 是 Kafka 官方的流处理库(不是独立服务,嵌入在 Java 应用中)。核心概念:
- Stream(KStream):无界的事件流,每条记录是独立的插入事件(INSERT)
- Table(KTable):变更日志流,每条记录是 Key 的更新事件(UPSERT)
- GlobalKTable:全量复制到每个应用实例的 Table(用于 Join 操作,避免 Shuffle)
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> orders = builder.stream("orders");
// 过滤 + 转换
orders.filter((key, value) -> value.contains("VIP"))
.mapValues(value -> enrichOrder(value))
.to("vip-orders");
// 聚合:按用户统计订单金额
orders.groupByKey()
.aggregate(() -> 0.0,
(key, value, total) -> total + extractAmount(value),
Materialized.as("order-totals"));
Kafka Streams 的状态存储使用 RocksDB(本地持久化),通过 Changelog Topic 实现容错。如果应用实例宕机,其他实例可以从 Changelog Topic 恢复 RocksDB 状态。
7.2 KSQL / ksqlDB
ksqlDB 是 Kafka 之上的流处理 SQL 引擎(Confluent 开源),可以用 SQL 语法做实时流处理:
-- 创建 Stream
CREATE STREAM orders (orderId VARCHAR, userId VARCHAR, amount DOUBLE)
WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON');
-- 实时聚合:每分钟的用户订单总额
CREATE TABLE user_order_totals AS
SELECT userId, SUM(amount) as total
FROM orders
WINDOW TUMBLING (SIZE 1 MINUTE)
GROUP BY userId;
-- 实时 Join:订单 + 用户信息
CREATE STREAM enriched_orders AS
SELECT o.orderId, o.amount, u.name, u.level
FROM orders o
LEFT JOIN users u ON o.userId = u.id;
八、高可用与生产部署
8.1 部署拓扑
┌─────────────────────────────────────────────────────────┐
│ Kafka Cluster │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │Broker-0 │ │Broker-1 │ │Broker-2 │ │
│ │Controller│ │ │ │ │ │
│ │Leader: │ │Leader: │ │Leader: │ │
│ │ p0,p3 │ │ p1,p4 │ │ p2,p5 │ │
│ │Follower: │ │Follower: │ │Follower: │ │
│ │ p1,p2 │ │ p2,p0 │ │ p0,p1 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
│ KRaft Quorum (Controller + Broker 共用进程): │
│ ┌────┐ ┌────┐ ┌────┐ │
│ │ B0 │ │ B1 │ │ B2 │ ← 三个节点组成 Raft 集群 │
│ └────┘ └────┘ └────┘ │
└─────────────────────────────────────────────────────────┘
KRaft 模式下,Controller 和 Broker 可以共用进程(Combined Mode),也可以独立部署(Separate Mode)。中小规模(< 50 个 Broker)推荐 Combined Mode 简化运维;大规模推荐 Separate Mode 隔离元数据管理与数据服务的资源竞争。
8.2 核心配置调优
Broker 端(server.properties):
# 日志存储
log.dirs=/data1/kafka-logs,/data2/kafka-logs # 多磁盘目录,Partition 均匀分布
log.segment.bytes=1073741824 # 1GB Segment
log.retention.hours=168 # 保留 7 天
log.retention.bytes=-1 # 不按大小清理(按时间)
log.cleanup.policy=delete # 清理策略(delete / compact)
log.index.interval.bytes=4096 # 索引间隔 4KB
# 副本与可靠性
default.replication.factor=3 # 默认 3 副本
min.insync.replicas=2 # 最少 2 个同步副本
unclean.leader.election.enable=false # 禁止非 ISR 选举
num.replica.fetchers=4 # 副本拉取线程数
# 网络与线程
num.network.threads=8 # 网络线程数(Processor 线程)
num.io.threads=16 # I/O 线程数(RequestHandler 线程)
socket.send.buffer.bytes=1048576 # Socket 发送缓冲 (1MB)
socket.receive.buffer.bytes=1048576 # Socket 接收缓冲 (1MB)
socket.request.max.bytes=104857600 # 最大请求大小 (100MB)
# 消费者相关
group.initial.rebalance.delay.ms=3000 # 等待 3 秒让更多消费者加入再 Rebalance
offsets.topic.replication.factor=3 # __consumer_offsets 的副本数
transaction.state.log.replication.factor=3 # 事务日志的副本数
transaction.state.log.min.isr=2 # 事务日志的最小 ISR
Producer 端:
acks=all
retries=2147483647 # Integer.MAX_VALUE
delivery.timeout.ms=120000 # 2 分钟
enable.idempotence=true
batch.size=65536 # 64KB
linger.ms=10 # 攒批 10ms
buffer.memory=67108864 # 64MB
compression.type=lz4 # 推荐 lz4(速度/压缩率平衡最好)
max.in.flight.requests.per.connection=5 # 幂等模式下最大 5
Consumer 端:
enable.auto.commit=false
max.poll.records=500 # 单次 poll 最大记录数
max.poll.interval.ms=300000 # 5 分钟
session.timeout.ms=45000 # 45 秒
heartbeat.interval.ms=3000 # 3 秒
fetch.min.bytes=1 # 最小拉取字节
fetch.max.wait.ms=500 # 最大等待时间
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
auto.offset.reset=latest # 首次消费从最新位置开始
8.3 JVM 参数建议
# Kafka Broker(推荐 JDK 17+,ZGC 或 G1GC)
-Xms12g -Xmx12g # 堆内存 12G(不超过 32G,避免压缩指针失效)
-XX:+UseZGC # ZGC(JDK 17+ 推荐,亚毫秒停顿)
# 或 -XX:+UseG1GC -XX:MaxGCPauseMillis=20(G1 方案)
-XX:+ParallelRefProcEnabled
-XX:MaxInlineLevel=15 # 方法内联深度
-XX:+UnlockDiagnosticVMOptions
-XX:G1SummarizeRSetStatsPeriod=1 # G1 RSet 统计
-Xlog:gc*:file=/var/log/kafka/gc.log:time,uptime,level,tags:filecount=5,filesize=50m
-XX:+ExitOnOutOfMemoryError # OOM 时退出(让 K8s/systemd 重启)
# 注意:Kafka 主要依赖 Page Cache(堆外),JVM 堆不需要特别大
# 32G 以内的机器:12G 堆 + 20G Page Cache 是较优的分配
8.4 操作系统调优
# 文件描述符
ulimit -n 1000000
# 内存
vm.swappiness=1 # 几乎不用 swap(Kafka 严重依赖 Page Cache)
vm.dirty_ratio=60 # 脏页占 60% 时强制刷盘
vm.dirty_background_ratio=5 # 脏页占 5% 时后台线程开始刷盘
# 网络
net.core.wmem_default=1048576 # 默认发送缓冲 1MB
net.core.rmem_default=1048576 # 默认接收缓冲 1MB
net.core.wmem_max=2097152 # 最大发送缓冲 2MB
net.core.rmem_max=2097152 # 最大接收缓冲 2MB
net.ipv4.tcp_wmem="1048576 2097152 4194304"
net.ipv4.tcp_rmem="1048576 2097152 4194304"
net.core.somaxconn=65535
net.ipv4.tcp_max_syn_backlog=65535
# 磁盘调度(SSD 用 none/noop,HDD 用 mq-deadline)
echo none > /sys/block/sda/queue/scheduler
# 文件系统(推荐 XFS,ext4 也可以,不要用 ZFS/Btrfs 用于数据目录)
8.5 监控告警关键指标
| 指标 | 告警阈值 | 含义 |
|---|---|---|
UnderReplicatedPartitions |
> 0 | 有副本不在 ISR 中,可能有 Broker 故障 |
OfflinePartitionsCount |
> 0 | 有 Partition 无 Leader,该 Partition 不可用 |
ActiveControllerCount |
≠ 1 | Controller 数量异常(脑裂) |
IsrShrinksPerSec |
> 0 | ISR 收缩,Follower 落后 |
RequestQueueSize |
> 100 | 请求队列堆积,Broker 处理能力不足 |
NetworkProcessorAvgIdlePercent |
< 0.3 | 网络线程繁忙,需增加 num.network.threads |
LogFlushRateAndTimeMs |
Flush 耗时 > 100ms | 磁盘 I/O 瓶颈 |
ConsumerLag(per group) |
> 10000 | 消费堆积 |
RequestHandlerAvgIdlePercent |
< 0.3 | I/O 线程繁忙,需增加 num.io.threads |
推荐监控栈:jmx_exporter(Kafka 自带 JMX 指标)+ Prometheus + Grafana。Confluent 提供的 kafka-lag-exporter 可以方便地监控所有 Consumer Group 的 Lag。
九、Log Compaction——Kafka 独有的存储特性
Log Compaction(日志压缩)是 Kafka 区别于大多数消息队列的独特功能。它保证每个 Key 至少保留最新的一条消息,旧值会被后台清理。
压缩前:
Key=A, offset=0, value="v1"
Key=B, offset=1, value="v1"
Key=A, offset=2, value="v2"
Key=C, offset=3, value="v1"
Key=A, offset=4, value="v3"
压缩后:
Key=B, offset=1, value="v1"
Key=C, offset=3, value="v1"
Key=A, offset=4, value="v3" ← 只保留 Key=A 的最新值
配置:log.cleanup.policy=compact(或 compact,delete 组合使用)。
典型应用场景:① CDC(Change Data Capture):数据库变更流经过 Compaction 后变成最新快照。② 物化视图:KTable 的 Changelog Topic 使用 Compaction,保证每个 Key 只保留最新状态。③ 配置推送:应用的配置变更写入 Kafka,Compaction 保证消费者总是读到最新配置。
十、面试深度题 & 参考答案
基础概念层(初级)
Q1:Kafka 为什么吞吐这么高?
五个核心设计:① 顺序写磁盘——每个 Partition 的 LogSegment 是顺序追加写入,HDD 顺序写约 600MB/s,接近内存速度。② 零拷贝(sendfile/transferTo)——消费拉取时数据直接从文件到 Socket,不经过用户空间,减少 2 次拷贝和 2 次上下文切换。③ 批量发送——Producer 的 RecordAccumulator 将多条消息攒成 Batch 后一次性发送,减少网络请求次数。④ 压缩——Batch 级别压缩(lz4/snappy/zstd),减少网络传输量和磁盘占用。⑤ 利用操作系统 Page Cache——Kafka 不自己管理内存缓存,直接依赖 OS 的 Page Cache,避免 JVM GC 开销,且 Broker 重启后 Cache 仍然有效。
Q2:Kafka 的 Partition 和 RocketMQ 的 MessageQueue 有什么区别?
两者都是 Topic 下的分区,是最小并行单元,但有几个关键区别。存储方式:Kafka 每个 Partition 独立一个文件(LogSegment),Partition 多时随机写性能下降;RocketMQ 所有 Topic 的所有 Queue 共享 CommitLog 顺序写,写入性能不受 Topic/Queue 数量影响。副本机制:Kafka 原生支持 Partition 级别的 Leader-Follower 复制,ISR 机制精细到每个 Partition;RocketMQ 的复制是 Broker 级别的(Master-Slave),整个 Broker 的数据一起复制。消费模型:Kafka 一个 Partition 只能被同一 Group 内的一个消费者消费(强绑定);RocketMQ 的 MessageQueue 在 Rebalance 后也类似,但支持更灵活的消费模式(如广播模式每个实例都收全量)。
Q3:Kafka 如何保证消息不丢失?
三端保障:Producer 端使用 acks=all + retries=MAX_VALUE + enable.idempotence=true,确保消息至少被 ISR 中所有副本写入。Broker 端使用 replication.factor=3 + min.insync.replicas=2 + unclean.leader.election.enable=false,保证至少 2 个副本有数据且不允许非 ISR 选举(可能丢数据)。Consumer 端使用 enable.auto.commit=false,手动在业务处理成功后再提交 offset,消费端做幂等处理。
最容易丢消息的环节:acks=1 时 Leader 写入后还没同步给 Follower 就宕机了;消费者使用自动提交 offset,消费处理到一半崩溃,重启后从上次提交的 offset 开始消费,跳过了部分消息。
原理深度层(中级)
Q4:什么是 ISR?ISR 为空时会发生什么?
ISR(In-Sync Replicas)是与 Leader 保持同步的副本集合。"同步"的判断标准:Follower 在 replica.lag.time.max.ms(默认 30 秒)内有 Fetch 请求成功拉取了数据。如果 Follower 因网络延迟、GC 停顿、磁盘故障等原因落后 Leader 超过 30 秒,就会被移出 ISR。
ISR 为空意味着所有 Follower 都落后了。此时如果 Leader 宕机:unclean.leader.election.enable=false(默认)时,Partition 不可用,直到原 Leader 恢复或某个 Follower 追上来。unclean.leader.election.enable=true 时,从非 ISR 副本中选 Leader,Partition 可用但可能丢失落后部分的消息。
生产环境通常保持 false,同时监控 IsrShrinksPerSec 指标,ISR 收缩时立即告警排查。
Q5:Kafka 的 Rebalance 机制是什么?为什么 Rebalance 是"万恶之源"?
Rebalance 是 Consumer Group 中 Partition 重新分配的过程。之所以被称为"万恶之源",是因为 Eager Rebalance 模式下所有消费者必须同时放弃所有 Partition(Stop-The-World),期间所有消费停止。如果 Group 有 100 个消费者、1000 个 Partition,Rebalance 可能持续数分钟甚至更长,期间消息堆积急剧增长。
触发 Rebalance 的常见原因:① 消费者加入或退出(包括因心跳超时被踢出)。② max.poll.interval.ms 超时(消费太慢,被 Coordinator 认为挂了)。③ Topic 的 Partition 数量变化。④ 消费者订阅的 Topic 列表变化。
减少不必要 Rebalance 的方案:① 使用 Cooperative Rebalance(增量式,不中断不需要移动的 Partition)。② 使用 Static Membership(group.instance.id),消费者重启不触发 Rebalance。③ 合理设置 session.timeout.ms(心跳超时)和 max.poll.interval.ms(处理超时),避免误判。④ 增加 group.initial.rebalance.delay.ms,等待更多消费者加入后再 Rebalance(适合批量启动场景)。
Q6:HW(High Watermark)和 LEO(Log End Offset)的区别是什么?HW 解决了什么问题?
LEO 是每个副本已写入的最新 offset + 1(下一条消息的 offset)。HW 是 ISR 中所有副本 LEO 的最小值。只有 offset < HW 的消息才对消费者可见。
HW 解决的问题:保证消费者不会读到可能丢失的消息。假设 Leader LEO=100,Follower LEO=95(HW=95),如果此时 Leader 宕机,Follower 被选为新 Leader,那么 offset 95-99 的消息在原 Leader 上但不在新 Leader 上,这些消息会丢失。HW=95 保证了消费者最多只读到 offset 94,不会读到可能丢失的 95-99。
HW 的更新时机:Leader 在收到 Follower 的 Fetch 请求时,根据 Follower 报告的 LEO 更新 HW,并将新的 HW 返回给 Follower(Follower 也更新自己的 HW)。
Q7:Kafka 为什么用 ZooKeeper?KRaft 解决了什么问题?
Kafka 早期用 ZooKeeper 做三件事:① 存储集群元数据(Broker 列表、Topic 配置、Partition 分配)。② Controller 选举(通过 ZK 临时节点竞争)。③ 故障检测(依赖 ZK Session 超时)。
ZooKeeper 的问题:① 元数据更新延迟高(ZAB 协议,几十毫秒),Partition 数量多时 ZK 压力大(每个 Partition 是一个 ZNode,20 万个以上性能急剧下降)。② Controller 故障转移慢(依赖 ZK Session 超时,通常 6-18 秒)。③ 运维复杂度高(需要独立维护 ZK 集群)。
KRaft 用内置的 Raft 协议替代 ZK:① 元数据存储在 __cluster_metadata Topic 中,利用 Kafka 自身高效的日志存储引擎,支持百万级 Partition。② Controller 选举基于 Raft,毫秒级故障转移。③ 不再需要维护外部 ZK 集群,部署和运维大幅简化。
Q8:Kafka 的 Log Compaction 是什么?和 delete 策略有什么区别?
Log Compaction(log.cleanup.policy=compact)保证每个 Key 至少保留最新的一条消息,后台的 LogCleaner 线程定期扫描并删除旧版本。Delete 策略(log.cleanup.policy=delete)按时间或大小清理过期的 LogSegment,不管 Key 是否重复。
两者可以组合使用(compact,delete):先做 Compaction 保留每个 Key 的最新值,再对超过保留时间的数据做 Delete。
Compaction 的典型应用:CDC 场景(数据库 binlog 变更流经过 Compaction 变成最新快照),KTable 的 Changelog Topic(保证每个 Key 的最新状态可用)。
生产实战层(高级)
Q9:线上 Consumer Lag 持续增长,你怎么排查和处理?
排查步骤:① 确认 Lag 发生在哪个 Topic 的哪个 Partition(用 kafka-consumer-groups.sh --describe),如果所有 Partition 的 Lag 均匀增长,说明消费速率整体跟不上;如果只有个别 Partition 增长,可能是数据倾斜(某些 Key 的消息量特别大)或该 Partition 的消费者有问题。② 看消费者的处理耗时(应用日志 + APM 监控),常见原因:慢 SQL、外部 API 超时、单条消息处理逻辑太重。③ 看消费者的 poll 间隔和 max.poll.records,如果 max.poll.records 太大导致处理时间超过 max.poll.interval.ms,会触发 Rebalance,Rebalance 期间消费停止,Lag 继续增长,形成恶性循环。④ 看 Broker 端的 RequestQueueSize 和磁盘 I/O,排除 Broker 瓶颈。
应急处理:① 临时增加消费者实例数(不超过 Partition 数)。② 减小 max.poll.records(如从 500 降到 100),降低单次处理量,避免超时。③ 如果是代码 bug 导致处理卡住,先重启消费者恢复消费,再修复代码。
长期优化:① 异步化消费处理(消费线程只做分发,实际处理交给异步线程池)。② 增加 Partition 数量,提高并行度上限。③ 对热点 Key 做分散处理(加随机后缀,避免数据倾斜)。④ 配置合理的消费线程池和 max.poll.records。
Q10:Kafka 如何实现消息的 Exactly-Once 语义?
Kafka 的 Exactly-Once 分两层:
单 Partition 幂等:enable.idempotence=true,Broker 为每个 producerId + producerEpoch 维护 sequence number,重复的消息(sequence number 不大于已处理的)被自动丢弃。保证单 Producer 对单 Partition 的 Exactly-Once。
跨 Partition 事务:设置 transactional.id,Producer 可以将多个 Partition 的写入包装在一个事务中。消费者设置 isolation.level=read_committed 后只读取已提交事务的消息(未提交的、已回滚的不可见)。事务的实现:Broker 端有一个 __transaction_state Topic 记录事务状态(Prepare / Commit / Abort),事务提交时 Broker 将事务标记为 Commit 并让所有涉及的 Partition 对消费者可见。
完整的 Consume-Transform-Produce Exactly-Once:Producer 在事务中同时写入输出消息和消费者 offset(sendOffsetsToTransaction),保证"消费输入 + 生产输出"是原子的。
Q11:Kafka 集群扩容(加 Broker)后,已有的 Partition 会自动迁移到新 Broker 吗?
不会自动迁移。新增的 Broker 只会被新创建的 Topic 或新增的 Partition 使用。已有的 Partition 仍然在原来的 Broker 上,需要手动执行 Partition Reassignment(kafka-reassign-partitions.sh)将部分 Partition 迁移到新 Broker。
迁移过程:① 生成迁移计划(--generate),指定哪些 Partition 迁移到哪些 Broker。② 执行迁移(--execute),Kafka 开始在新 Broker 上创建副本并从 Leader 同步数据。③ 验证迁移(--verify),确认所有副本同步完成。④ 迁移期间旧副本继续服务,同步完成后 Controller 更新元数据,新副本加入 ISR,旧副本被删除。
注意:迁移过程会产生大量的网络流量(从 Leader 复制数据到新 Follower),建议在低峰期执行,并通过 --throttle 参数限制复制速率(如 --throttle 50000000 限制为 50MB/s)。
Q12:Kafka 的消息积压和 RocketMQ 有什么不同?处理策略有何差异?
Kafka 的消息积压(Consumer Lag)处理与 RocketMQ 有几个关键差异:
Kafka 的消费者和 Partition 是强绑定关系(一个 Partition 只能被同一 Group 内一个消费者消费),所以扩容的上限是 Partition 数量,超过后多余消费者空闲。RocketMQ 的 Rebalance 机制类似,但 Queue 数量通常更灵活(可以通过 mqadmin updateTopic -w 快速增加)。
Kafka 没有内置的重试队列和死信队列,消费失败需要自己实现(通常是将失败消息发到 error Topic,由另一个消费者处理)。RocketMQ 内置了重试 Topic 和死信队列(%RETRY% + %DLQ%),消费失败自动重试。
Kafka 的 Partition 迁移(扩容后均衡数据)需要手动操作 kafka-reassign-partitions。RocketMQ 的 Queue 分布在多个 Broker 上,新增 Broker 后新 Topic 自动均衡,但已有 Topic 也需要手动调整。
处理策略的共同点:先排查消费端瓶颈 → 优化处理逻辑 → 扩容消费者(不超过 Partition/Queue 数)→ 必要时增加 Partition/Queue 数量。
架构设计层(资深)
Q13:设计一个基于 Kafka 的 CDC(Change Data Capture)系统,你怎么做?
CDC 的目标是将数据库的变更(INSERT/UPDATE/DELETE)实时同步到其他系统(如 Elasticsearch、数据仓库、缓存)。
架构设计:① 数据源:MySQL binlog → Debezium(CDC 连接器)→ Kafka Topic(按表分 Topic,Key 为主键,Value 为变更事件 JSON)。② 日志压缩:CDC Topic 配置 log.cleanup.policy=compact,保证每个主键只保留最新状态,消费者重启后可以从 Compaction 后的快照恢复。③ 数据分发:Kafka Connect(JDBC Sink / ES Sink)将 Kafka 中的变更写入目标系统。④ Schema 管理:使用 Schema Registry(Confluent 开源)管理 Avro/Protobuf Schema,保证数据格式兼容性。
关键设计点:① Debezium 的 offset 存储在 Kafka Connect 的 offset Topic 中,支持断点续传。② CDC 消息需要保证表级别的顺序(同一行的变更必须有序),Debezium 默认以主键为 Kafka Key,保证同一行的消息路由到同一 Partition。③ 对于大表的全量初始同步,Debezium 支持 Snapshot 模式(先全量读取再做增量 binlog)。
Q14:如何设计一个支持每秒百万消息的 Kafka 集群?
百万消息/秒的设计需要从多维度考量:
容量规划:假设平均消息大小 1KB,百万消息/秒 ≈ 1GB/s 写入吞吐。单个 Broker(SSD、8 核 32G)的写入吞吐约 200-500 MB/s,需要 3-5 个 Broker。考虑峰值和冗余(N+1),推荐 6 个 Broker。
Topic 与 Partition 设计:按业务拆分 Topic,每个 Topic 的 Partition 数 = 目标吞吐 / 单 Partition 吞吐。假设单 Partition 写入约 20 MB/s,百万消息(1GB/s)需要 50 个 Partition 分布在 6 个 Broker 上。消费者端,每个 Consumer Group 的实例数 = Partition 数(或按比例分配)。
硬件选型:Broker 使用 NVMe SSD(顺序写 3GB/s+),32G 内存(12G JVM 堆 + 20G Page Cache),万兆网卡(10Gbps ≈ 1.25GB/s,跨机房复制需要足够带宽)。
网络:同机房部署(跨机房复制延迟高),Broker 之间使用万兆内网。生产者使用 acks=all 时,跨 Broker 同步的延迟约 0.5-1ms(同机房)。
监控与告警:重点监控 UnderReplicatedPartitions、ConsumerLag、RequestQueueSize、磁盘使用率,配置 Grafana Dashboard 和 PagerDuty 告警。
Q15:Kafka 和 Pulsar 的架构对比,各自适合什么场景?
两者都是分布式消息流平台,但架构设计有根本差异:
存储架构:Kafka 是存算一体(Broker 既做计算又做存储),Partition 和 Broker 绑定;Pulsar 是存算分离(Broker 无状态 + BookKeeper 做存储),Topic 可以跨多个 Bookie 节点。Kafka 扩缩容需要迁移 Partition 数据,Pulsar 扩缩容只需加节点(数据自动均衡)。
多租户:Pulsar 原生支持多租户(Tenant → Namespace → Topic),资源隔离粒度更细;Kafka 需要通过 Quota + 独立集群实现多租户。
跨地域复制:Pulsar 内置 Geo-Replication(配置即可开启);Kafka 需要 MirrorMaker 2 或 Confluent Replicator。
适用场景:Kafka 生态更成熟(Confluent 商业支持、ksqlDB、Schema Registry、丰富的 Connector),适合已有大数据生态(Spark/Flink/Hadoop)的团队,以及日志、流计算、CDC 场景。Pulsar 适合对多租户、跨地域、弹性扩缩容有强需求的场景(如 SaaS 平台、全球化部署)。
性能对比:同等硬件下 Kafka 的吞吐更高(存算一体减少了网络跳转),Pulsar 的延迟更稳定(存算分离避免了 Broker 间的 I/O 竞争)。
十一、源码阅读指南
11.1 推荐阅读顺序
第 1 周:理解消息写入全链路
入口:KafkaProducer.send() → Sender.run()
重点:RecordAccumulator 攒批 → Sender 分组发送 → ProduceRequest 构建
Broker 端:ProduceRequest 处理 → Log.append() → ReplicaManager 同步
第 2 周:理解消息存储
入口:Log.append() → LogSegment.append()
重点:MappedByteBuffer 写入 → Index 更新 → 刷盘策略
消费端:FileRecords.read() → sendfile 零拷贝
第 3 周:理解消费与 Rebalance
入口:KafkaConsumer.poll() → ConsumerCoordinator
重点:分区分配 → Fetcher 拉取 → offset 管理 → Rebalance 协议
第 4 周:理解 Controller 与副本管理
入口:KafkaController(Controller 模块)→ ReplicaManager
重点:Leader 选举 → ISR 管理 → Partition Reassignment
KRaft:QuorumController → MetadataLog(__cluster_metadata Topic)
11.2 调试技巧
- 本地启动单节点 Kafka(KRaft 模式,无需 ZooKeeper),配置文件设置
num.partitions=1简化调试。 - 在
Log.append()打断点,跟踪一条消息从接收到写入 LogSegment 的全过程。 - 在
ConsumerCoordinator.onJoinComplete()打断点,观察 Rebalance 后的分区分配结果。 - 使用
kafka-dump-log.sh工具查看 LogSegment 的二进制内容(解码消息格式)。 - 使用 Arthas 或 JFR(Java Flight Recorder)观察生产环境的线程行为和性能瓶颈。
十二、学习资源
| 资源 | 说明 |
|---|---|
| Kafka 官方文档 | 最权威的参考,Design 章节写得极好 |
| Kafka: The Definitive Guide (2nd) | Confluent 团队编写,覆盖原理与实践 |
| Kafka 核心技术与实践(极客时间) | 胡夕老师出品,中文最佳 Kafka 课程 |
| Kafka 源码 | Scala + Java 混编,3.x 开始逐步迁移到纯 Java |
| Confluent Developer | Confluent 官方学习平台,有大量教程和示例 |
| Kafka KRaft 设计文档 | KIP-500,理解 KRaft 的设计动机 |
本教程基于 Kafka 3.x / 4.x(KRaft 模式),部分内容标注了与旧版本(ZooKeeper 模式)的差异。
建议学习方式:每读完一节就动手实验(本地 Docker 启动 Kafka + kafkacat 工具收发消息),结合源码断点验证理解。面试前重点准备 Q3/Q5/Q6/Q9/Q10/Q13 这六道高频深度题。
评论区