消息队列 (MQ) #

分布式系统核心组件,Kafka、RocketMQ 等消息中间件详解


📋 目录 #


消息队列概述 #

为什么需要消息队列 #

串行执行

异步解耦

同步调用

总耗时=各服务耗时之和

消息队列

总耗时=最长服务耗时

核心价值:

价值 说明 场景
异步解耦 上下游服务解耦 订单→库存→物流
削峰填谷 应对流量突增 秒杀、大促
数据分发 一对多消息传递 数据同步

主流MQ对比 #

特性 Kafka RocketMQ RabbitMQ
吞吐量 百万级 十万级 万级
延迟 毫秒级 毫秒级 微秒级
可靠性 高(副本机制) 高(同步刷盘) 高(ACK)
消息堆积 支持 支持 支持(有限)
时效性 最高
功能丰富度 基础 丰富(事务、顺序、延迟) 丰富
复杂度
适用场景 日志、流计算 订单、交易 业务消息

Kafka #

面试高频 ⭐⭐⭐⭐⭐

核心架构 #

Producer
生产者

Broker 集群

Consumer
消费者

ZooKeeper
元数据

Topic A
3 Partitions

Topic B
2 Partitions

P = Partition (分区)

核心概念 #

概念 说明
Broker Kafka服务节点
Topic 消息逻辑分类
Partition Topic物理分片,提高并行度
Consumer Group 消费者组,实现负载均衡
Offset 消消费位置
Replica 副本,保证高可用

Kafka 高性能原理 #

Kafka高性能

顺序读写

零拷贝

批量发送

页缓存

优化技术 原理 效果提升
顺序读写 磁盘顺序读写快于随机读写 10+ 倍
零拷贝 直接从内核缓冲区到网卡,不应用缓冲区 减少拷贝次数
批量发送 多条消息合并发送 减少网络IO
Page Cache 操作系统缓存,不显式flush 减少磁盘IO
压缩 Gzip/Snappy/LZ4/Zstd 减少网络传输

数据可靠性保证 #

ACK 配置

acks=0: 不等待确认 ⚠️ 可能丢失

acks=1: Leader确认 ⚠️ Leader挂可能丢

acks=all: 所有副本确认 ✅ 最高可靠

ISR (In-Sync Replicas)

保持同步的副本集合

只有ISR中副本才能选为Leader

三副本机制

Leader
负责读写

Follower
异步复制

Follower
异步复制

配置 可靠性 性能 适用场景
acks=0 最高 不重要的日志
acks=1 一般消息
acks=all 重要业务消息

消息顺序保证 #

示例: 按用户路由

全局有序方案

order1

order2

order1

order2

默认: 分区内有序, 分区间无序

Topic只设1个分区 ❌ 性能差

相同key发到同一分区 ✅ 推荐

userA

Partition 1

userB

Partition 2

消息不丢失方案 #

// 1. 生产者端
properties.put("acks", "all");                    // 等待所有副本确认
properties.put("retries", Integer.MAX_VALUE);     // 无限重试
properties.put("enable.idempotence", "true");     // 开启幂等性

// 2. 消费者端
enable.auto.commit = false;                       // 关闭自动提交
// 业务处理成功后手动提交offset

// 3. Broker端
min.insync.replicas = 2;                          // ISR中最少2个副本
unclean.leader.election.enable = false;           // 禁止非ISR副本选Leader

消息不重复方案 #

消费者幂等性

业务层面去重
MySQL唯一索引 / Redis SETNX

消息ID存储
Redis: SET message:id 1 EX 3600

生产者幂等性

enable.idempotence = true

每个分区每个epoch只接受一个Producer

自动重试不会导致重复


RocketMQ #

核心架构 #

Broker Cluster

Master
负责读写

Slave
只读,同步复制

NameSrv
注册中心

Broker Cluster

Producer
生产者

Consumer
消费者

核心特性 #

特性 Kafka RocketMQ
架构 基于Zookeeper 自实现的NameSrv
事务消息 ✅ 支持分布式事务
延迟消息 ✅ 18级延迟
消息回溯 ✅ 支持重消费
死信队列
消息过滤 消费端过滤 Broker端过滤

事务消息 #

本地DBBrokerProducer本地DBBrokerProducer1. 半消息发送2. 本地事务执行3. 提交/回滚alt[本地事务成功][本地事务失败]4. 反查机制(未收到提交/回滚时)适用场景: 订单创建→库存扣减→支付发送半消息返回ACK(消息标记为不可消费)执行本地数据库事务Commit(消息可消费)Rollback(消息删除)反查Producer事务状态返回最终状态

延迟消息 #

RocketMQ 18级延迟:

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

应用: 订单超时关闭、延迟通知

消息可靠性 #

消息丢失场景与解决 #

Producer

问题1
网络异常

Broker

问题2
Broker宕机

Consumer

问题3
消费失败

场景 解决方案
生产者丢失 开启重试、ACK=all、幂等性
Broker丢失 副本机制、同步刷盘、ISR
消费者丢失 关闭自动提交、处理成功才提交offset

消息堆积处理 #

原因:
  - 消费者处理慢
  - 消费者故障
  - 流量突增

排查:
  1. 查看Consumer Group lag
  2. 检查消费者日志
  3. 确认下游服务正常

优化:
  1. 增加消费者实例
  2. 增加分区数
  3. 优化消费者处理逻辑
  4. 暂停生产者(临时方案)

消息顺序性 #

方案 实现 缺点
单分区 Topic只有1个分区 吞吐量低
分区路由 相同key发同一分区 部分场景不适用
有序消费者 单线程消费 性能低

常见问题 #

Kafka vs RocketMQ 如何选择? #

RabbitMQ

业务消息

需要复杂路由

小团队(运维简单)

RocketMQ

订单、交易

需要事务消息

需要消息回溯

Kafka

日志收集

实时流计算

大数据场景

如何保证消息不丢失? #

消费者

Broker

生产者

重试机制

ACK = all

幂等性

副本机制 ≥ 2

ISR同步

禁止非ISR选Leader

关闭自动提交

处理成功才提交

幂等性设计

如何处理消息重复消费? #

版本号

UPDATE table SET ... WHERE id=? AND version=?

消息ID去重

SET message:id 1 EX 3600

业务幂等性

唯一索引(MySQL)

分布式锁(Redis)

幂等表

消息积压怎么办? #

极端

长期

短期

增加Consumer实例数

增加分区数

优化Consumer处理逻辑

下游服务降级

临时存储到Redis

创建临时新Topic

启动大量Consumer消费

处理完再切回原Topic


面试题汇总 #

Kafka篇 #

  1. Kafka为什么高性能?
  2. Kafka如何保证消息有序?
  3. Kafka如何保证消息不丢失?
  4. Kafka的rebalance机制?
  5. Zookeeper在Kafka中的作用?

RocketMQ篇 #

  1. RocketMQ的架构设计?
  2. 事务消息原理?
  3. 延迟消息实现?
  4. 如何选择Kafka vs RocketMQ?

通用篇 #

  1. 消息队列的应用场景?
  2. 如何保证消息不重复?
  3. 消息积压怎么处理?
  4. 消息消费失败重试策略?

面试题答案详解 #

Kafka篇 #

  1. Kafka为什么高性能?

答案:

核心优化点:

优化 原理 效果
顺序读写 磁盘顺序写入,避免寻道 10倍+性能提升
零拷贝 直接从Page Cache到网卡,跳过用户空间 减少CPU拷贝
批量发送 多条消息合并,减少网络IO 降低网络开销
Page Cache 利用OS缓存,不显式flush 减少磁盘IO
分区并行 Topic分多个Partition,并行消费 提高吞吐量
压缩 Gzip/Snappy/LZ4/Zstd压缩 减少网络传输

  1. Kafka如何保证消息有序?

答案:

分区内有序,分区间无序

方案:

  • 单分区: 全局有序,但吞吐量低
  • key路由: 相同key的消息发到同一分区(推荐)
// 示例:按用户ID路由,保证同一用户消息有序
ProducerRecord<String, String> record = 
    new ProducerRecord<>("order-topic", userId, message);

关键点:

  • 只能保证分区内有序
  • 不能保证全局有序(除非单分区)

  1. Kafka如何保证消息不丢失?

答案:

三方配合:

配置 说明
生产者 acks=all 等待所有ISR副本确认
Broker min.insync.replicas=2 ISR最少2个副本
消费者 enable.auto.commit=false 关闭自动提交

  1. Kafka的rebalance机制?

答案:

触发条件:

  • Consumer加入/离开
  • Partition数变化
  • 订阅Topic变化

流程:

  1. Group Coordinator协调
  2. 选出Leader Consumer
  3. Leader分配分区
  4. 同步分配结果
  5. Consumer开始消费

分配策略:

  • Range: 按范围分配
  • RoundRobin: 轮询
  • Sticky: 粘性分配(尽量保持原有分配)

  1. Zookeeper在Kafka中的作用?

答案:

作用 说明
Broker注册 /brokers/ids/[brokerId]
Topic配置 /brokers/topics/[topic]
Controller选举 /controller临时节点
Consumer注册 /consumers/[groupId]
Offset存储 (新版Kafka已移到内部Topic)

注意: Kafka 2.8+ 支持 KRaft模式,不再依赖Zookeeper


RocketMQ篇 #

  1. RocketMQ的架构设计?

答案:

四大核心组件:

组件 作用
NameSrv 路由中心,轻量级注册中心
Broker 消息存储,Master-Slave架构
Producer 消息生产者
Consumer 消息消费者

架构特点:

  • NameSrv无状态,可横向扩展
  • Broker主从复制,保证高可用
  • 不依赖Zookeeper,自实现注册中心

  1. 事务消息原理?

答案:

两阶段提交:

本地DBBrokerProducer本地DBBrokerProducer阶段1:发送半消息阶段2:执行本地事务阶段3:提交/回滚alt[事务成功][事务失败]补偿:反查机制适用场景: 订单→库存→支付 分布式事务半消息(暂不可消费)存储成功,回调Producer执行本地DB事务提交消息(可消费)回滚消息(删除)定时反查Producer事务状态返回最终状态

适用场景: 订单→库存→支付 分布式事务


  1. 延迟消息实现?

答案:

18级延迟等级:

1s 5s 10s 30s 
1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 
20m 30m 1h 2h

实现原理:

  • 消息发送到对应延迟级别的Topic
  • 定时任务扫描到期消息
  • 转发到实际Topic供消费

应用场景:

  • 订单超时自动关闭
  • 延迟通知
  • 定时任务

  1. 如何选择Kafka vs RocketMQ?

答案:

对比项 Kafka RocketMQ
吞吐量 百万级 十万级
延迟 毫秒级 毫秒级
事务消息
延迟消息
消息回溯
生态 大数据、流计算 金融、交易

选型建议:

  • Kafka: 日志收集、实时流计算、大数据
  • RocketMQ: 订单交易、事务消息、金融级业务
  • RabbitMQ: 业务消息、复杂路由、小团队

通用篇 #

  1. 消息队列的应用场景?

答案:

场景 说明 示例
异步解耦 上下游解耦,提升性能 订单→库存→物流
削峰填谷 缓冲流量,保护下游 秒杀、大促
数据分发 一对多消息广播 数据同步
最终一致性 保证分布式事务一致性 订单支付
限流削峰 控制请求速率 API限流

  1. 如何保证消息不重复?

答案:

幂等性设计:

方案 实现
唯一索引 MySQL唯一键约束
分布式锁 Redis SETNX
幂等表 记录已处理消息ID
版本号 乐观锁 UPDATE ... WHERE version=?

示例:

-- 利用唯一索引去重
INSERT INTO order_log (order_id, status) 
VALUES (123, 'success')
ON DUPLICATE KEY UPDATE status = status;

  1. 消息积压怎么处理?

答案:

紧急处理:

  1. 增加Consumer实例(不能超过分区数)
  2. 临时增加Topic分区数
  3. 暂停非核心生产者

长期优化:

  1. 优化Consumer处理逻辑
  2. 下游服务扩容/降级
  3. 批量消费代替单条消费

极端情况:

  • 建新Topic,临时转移积压消息
  • 启动多Consumer消费新Topic
  • 处理完再切回

  1. 消息消费失败重试策略?

答案:

重试策略:

策略 说明 适用场景
固定间隔 固定时间重试 简单场景
指数退避 1s→2s→4s→8s... 避免雪崩
死信队列 重试N次失败进入死信 需人工干预

RocketMQ重试:

  • 默认重试16次
  • 重试间隔递增
  • 最终进入死信队列

Kafka重试:

  • 需自行实现
  • 利用Seek重新消费
  • 或死信队列方案

MQ数据存储 #

面试高频 ⭐⭐⭐⭐⭐

逻辑存储架构 #

Topic: order-topic

Partition 0

Partition 1

Segment 0
offset 0-1000

Segment 1
offset 1001-2000

Segment 0
offset 0-800

Segment 1
offset 801-1600

存储层级: Topic → Partition → Segment → Message

Kafka物理存储 #

${log.dirs}/

topic1-0/

topic1-1/

...

00000000000000000000.log (数据文件)

00000000000000000000.index (偏移索引)

00000000000000000000.timeindex (时间索引)

组件 说明
Log文件 实际消息数据,顺序追加写入
Index文件 稀疏索引,每4KB消息建一条索引
TimeIndex文件 基于时间的索引,用于消息回溯
Segment Log分段,默认1GB或7天

RocketMQ物理存储 #

${store}/

commitlog/

consumequeue/

index/

00000000000000000000 (所有Topic消息统一存储)

topic1/

topic2/

0/

1/

00000000000000000000 (消费队列)

00000000000000000000 (索引文件)

组件 说明
CommitLog 所有Topic消息统一存储,顺序写
ConsumeQueue 消费队列,逻辑队列,存储CommitLog偏移
IndexFile 哈希索引,支持按key查询消息

存储核心设计 #

设计 原理 优势
顺序写 磁盘顺序写性能≈内存随机写 10+倍性能提升
零拷贝 sendfile直接从Page Cache到网卡 减少CPU拷贝
稀疏索引 减少索引文件大小,提高查询效率 降低IO开销
分段存储 大文件分段,便于删除/清理 易维护

Kafka vs RocketMQ存储对比 #

特性 Kafka RocketMQ
存储方式 Partition独立存储 CommitLog统一存储
索引 Index + TimeIndex ConsumeQueue + IndexFile
顺序写 ✅ Partition内顺序 ✅ CommitLog顺序
零拷贝 ✅ 支持 ✅ 支持
消息查询 Offset查询 Offset + Key查询

Exactly-Once语义 #

面试高频 ⭐⭐⭐⭐⭐

Exactly-Once = At-Least-Once + 幂等性 #

消费者端

Broker端

生产者端

Exactly-Once = At-Least-Once + 幂等性

开启幂等性 enable.idempotence=true (Kafka)

消息带唯一ID msgId = UUID/snowflake

去重存储(可选)Redis记录已处理msgId

事务消息(RocketMQ)

业务幂等性(核心)
唯一索引 / 分布式锁 / 幂等表

消息去重
SET message:id 1 EX 3600

Kafka Exactly-Once实现 #

// 1. 生产者幂等性
props.put("enable.idempotence", "true");
props.put("transactional.id", "my-tx-id");

// 2. 事务(同时写多个Topic原子性)
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(record1);
    producer.send(record2);
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
}

// 3. 消费者读已提交
props.put("isolation.level", "read_committed");

RocketMQ Exactly-Once实现 #

// 1. 消息带唯一Key
Message msg = new Message("topic", "tag", 
    uniqueKey, body.getBytes());

// 2. 消费者幂等处理
if (redis.setnx("msg:" + msgId, "1", 3600)) {
    // 第一次处理
    processMessage(msg);
} else {
    // 重复消息,跳过
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

幂等性实现方案对比 #

方案 优点 缺点 适用场景
唯一索引 简单、数据库原生 需数据库支持 关键业务数据
Redis SETNX 高性能 需考虑过期时间 短期去重
幂等表 灵活 需额外存储 通用场景
版本号 乐观锁 需设计版本字段 更新操作
状态机 业务语义清晰 需设计状态流转 状态驱动业务

幂等性代码示例 #

// 方案1:唯一索引
try {
    jdbcTemplate.update(
        "INSERT INTO order_log (order_id, status) VALUES (?, ?)",
        orderId, "SUCCESS"
    );
} catch (DuplicateKeyException e) {
    // 重复,忽略
}

// 方案2:Redis SETNX
String key = "msg:" + msgId;
if (redisTemplate.opsForValue().setIfAbsent(key, "1", 3600, TimeUnit.SECONDS)) {
    // 第一次处理
    processMessage(msg);
}

// 方案3:幂等表
String msgId = message.getMsgId();
if (!idempotentService.checkAndMark(msgId)) {
    return; // 已处理过
}
processMessage(msg);

分布式事务 #

面试高频 ⭐⭐⭐⭐⭐

分布式事务方案对比 #

方案 适用MQ 优点 缺点 推荐度
RocketMQ事务消息 RocketMQ 原生支持,业务侵入小 需RocketMQ ⭐⭐⭐⭐⭐
本地消息表 任意MQ 通用,可靠 需额外表 ⭐⭐⭐⭐
Seata AT + MQ 任意MQ TCC模式 复杂度高 ⭐⭐⭐
Kafka事务 Kafka 原子性写多Topic 无反查机制 ⭐⭐⭐

方案1:RocketMQ事务消息(推荐) #

本地DBBrokerProducer本地DBBrokerProducer阶段1: 发送半消息阶段2: 执行本地事务阶段3: 提交/回滚成功→消息可消费失败→删除补偿: 反查机制Half Message(暂不可消费)执行本地数据库事务Commit/Rollback定时反查(默认1分钟)

RocketMQ事务消息代码:

// 1. 实现事务监听器
TransactionListener transactionListener = new TransactionListener() {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 执行本地事务
        try {
            // 比如:创建订单
            orderService.createOrder(order);
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 反查本地事务状态
        String orderId = msg.getKeys();
        Order order = orderService.findById(orderId);
        if (order != null && order.getStatus() == SUCCESS) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else {
            return LocalTransactionState.UNKNOW; // 继续反查
        }
    }
};

// 2. 发送事务消息
TransactionMQProducer producer = new TransactionMQProducer("tx-group");
producer.setTransactionListener(transactionListener);
producer.start();

Message msg = new Message("order-topic", orderId, body.getBytes());
TransactionSendResult result = producer.sendMessageInTransaction(msg, null);

方案2:本地消息表(通用方案) #

消费者MQ数据库应用消费者MQ数据库应用1. 本地事务(同一DB)2. 定时扫描 + 发送3. 消费BEGIN TRANSACTIONINSERT INTO order ...INSERT INTO message (status=待发送)COMMITSELECT * FROM message WHERE status=待发送返回待发送消息producer.send(msg)UPDATE message SET status=已发送消费消息(幂等处理)

本地消息表代码:

// 1. 本地事务,同时写业务表和消息表
@Transactional
public void createOrder(Order order) {
    // 1. 写入订单表
    orderMapper.insert(order);
    
    // 2. 写入消息表
    MessageEntity msg = new MessageEntity();
    msg.setId(UUID.randomUUID().toString());
    msg.setStatus("待发送");
    msg.setContent(JSON.toJSONString(order));
    messageMapper.insert(msg);
}

// 2. 定时任务扫描待发送消息
@Scheduled(fixedDelay = 5000)
public void sendPendingMessages() {
    List<MessageEntity> messages = messageMapper.findByStatus("待发送");
    for (MessageEntity msg : messages) {
        try {
            // 发送MQ
            rocketMQTemplate.syncSend("order-topic", msg.getContent());
            // 更新状态
            messageMapper.updateStatus(msg.getId(), "已发送");
        } catch (Exception e) {
            // 重试
        }
    }
}

方案3:Kafka事务 #

// Kafka事务只能保证多Topic原子写入,无反查机制
producer.initTransactions();

try {
    producer.beginTransaction();
    
    // 原子写入多个Topic
    producer.send(record1); // 订单Topic
    producer.send(record2); // 库存Topic
    producer.send(record3); // 积分Topic
    
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
}

// 消费者需设置: isolation.level = read_committed

最佳实践建议 #

场景 推荐方案
金融/交易场景 RocketMQ事务消息
通用场景 本地消息表 + 定时任务
已有Seata Seata + MQ组合
大数据/日志 最终一致性即可,不需要强事务

分布式事务注意事项 #

  1. 反查接口幂等:Broker可能多次反查,需保证幂等
  2. 超时时间设置:本地事务执行时间需控制在合理范围
  3. 死信队列:最终失败的消息进入死信队列,人工干预
  4. 监控告警:事务失败、反查异常需及时告警

🔗 相关笔记 #


最后更新: 2026-05-13