消息队列 (MQ)
消息队列 (MQ) #
分布式系统核心组件,Kafka、RocketMQ 等消息中间件详解
📋 目录 #
消息队列概述 #
为什么需要消息队列 #
核心价值:
| 价值 | 说明 | 场景 |
|---|---|---|
| 异步解耦 | 上下游服务解耦 | 订单→库存→物流 |
| 削峰填谷 | 应对流量突增 | 秒杀、大促 |
| 数据分发 | 一对多消息传递 | 数据同步 |
主流MQ对比 #
| 特性 | Kafka | RocketMQ | RabbitMQ |
|---|---|---|---|
| 吞吐量 | 百万级 | 十万级 | 万级 |
| 延迟 | 毫秒级 | 毫秒级 | 微秒级 |
| 可靠性 | 高(副本机制) | 高(同步刷盘) | 高(ACK) |
| 消息堆积 | 支持 | 支持 | 支持(有限) |
| 时效性 | 高 | 高 | 最高 |
| 功能丰富度 | 基础 | 丰富(事务、顺序、延迟) | 丰富 |
| 复杂度 | 中 | 高 | 低 |
| 适用场景 | 日志、流计算 | 订单、交易 | 业务消息 |
Kafka #
面试高频 ⭐⭐⭐⭐⭐
核心架构 #
P = Partition (分区)
核心概念 #
| 概念 | 说明 |
|---|---|
| Broker | Kafka服务节点 |
| Topic | 消息逻辑分类 |
| Partition | Topic物理分片,提高并行度 |
| Consumer Group | 消费者组,实现负载均衡 |
| Offset | 消消费位置 |
| Replica | 副本,保证高可用 |
Kafka 高性能原理 #
| 优化技术 | 原理 | 效果提升 |
|---|---|---|
| 顺序读写 | 磁盘顺序读写快于随机读写 | 10+ 倍 |
| 零拷贝 | 直接从内核缓冲区到网卡,不应用缓冲区 | 减少拷贝次数 |
| 批量发送 | 多条消息合并发送 | 减少网络IO |
| Page Cache | 操作系统缓存,不显式flush | 减少磁盘IO |
| 压缩 | Gzip/Snappy/LZ4/Zstd | 减少网络传输 |
数据可靠性保证 #
| 配置 | 可靠性 | 性能 | 适用场景 |
|---|---|---|---|
acks=0 |
低 | 最高 | 不重要的日志 |
acks=1 |
中 | 高 | 一般消息 |
acks=all |
高 | 中 | 重要业务消息 |
消息顺序保证 #
消息不丢失方案 #
// 1. 生产者端
properties.put("acks", "all"); // 等待所有副本确认
properties.put("retries", Integer.MAX_VALUE); // 无限重试
properties.put("enable.idempotence", "true"); // 开启幂等性
// 2. 消费者端
enable.auto.commit = false; // 关闭自动提交
// 业务处理成功后手动提交offset
// 3. Broker端
min.insync.replicas = 2; // ISR中最少2个副本
unclean.leader.election.enable = false; // 禁止非ISR副本选Leader
消息不重复方案 #
RocketMQ #
核心架构 #
核心特性 #
| 特性 | Kafka | RocketMQ |
|---|---|---|
| 架构 | 基于Zookeeper | 自实现的NameSrv |
| 事务消息 | ❌ | ✅ 支持分布式事务 |
| 延迟消息 | ❌ | ✅ 18级延迟 |
| 消息回溯 | ❌ | ✅ 支持重消费 |
| 死信队列 | ✅ | ✅ |
| 消息过滤 | 消费端过滤 | Broker端过滤 |
事务消息 #
延迟消息 #
RocketMQ 18级延迟:
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
应用: 订单超时关闭、延迟通知
消息可靠性 #
消息丢失场景与解决 #
| 场景 | 解决方案 |
|---|---|
| 生产者丢失 | 开启重试、ACK=all、幂等性 |
| Broker丢失 | 副本机制、同步刷盘、ISR |
| 消费者丢失 | 关闭自动提交、处理成功才提交offset |
消息堆积处理 #
原因:
- 消费者处理慢
- 消费者故障
- 流量突增
排查:
1. 查看Consumer Group lag
2. 检查消费者日志
3. 确认下游服务正常
优化:
1. 增加消费者实例
2. 增加分区数
3. 优化消费者处理逻辑
4. 暂停生产者(临时方案)
消息顺序性 #
| 方案 | 实现 | 缺点 |
|---|---|---|
| 单分区 | Topic只有1个分区 | 吞吐量低 |
| 分区路由 | 相同key发同一分区 | 部分场景不适用 |
| 有序消费者 | 单线程消费 | 性能低 |
常见问题 #
Kafka vs RocketMQ 如何选择? #
如何保证消息不丢失? #
如何处理消息重复消费? #
消息积压怎么办? #
面试题汇总 #
Kafka篇 #
- Kafka为什么高性能?
- Kafka如何保证消息有序?
- Kafka如何保证消息不丢失?
- Kafka的rebalance机制?
- Zookeeper在Kafka中的作用?
RocketMQ篇 #
- RocketMQ的架构设计?
- 事务消息原理?
- 延迟消息实现?
- 如何选择Kafka vs RocketMQ?
通用篇 #
- 消息队列的应用场景?
- 如何保证消息不重复?
- 消息积压怎么处理?
- 消息消费失败重试策略?
面试题答案详解 #
Kafka篇 #
- Kafka为什么高性能?
答案:
核心优化点:
| 优化 | 原理 | 效果 |
|---|---|---|
| 顺序读写 | 磁盘顺序写入,避免寻道 | 10倍+性能提升 |
| 零拷贝 | 直接从Page Cache到网卡,跳过用户空间 | 减少CPU拷贝 |
| 批量发送 | 多条消息合并,减少网络IO | 降低网络开销 |
| Page Cache | 利用OS缓存,不显式flush | 减少磁盘IO |
| 分区并行 | Topic分多个Partition,并行消费 | 提高吞吐量 |
| 压缩 | Gzip/Snappy/LZ4/Zstd压缩 | 减少网络传输 |
- Kafka如何保证消息有序?
答案:
分区内有序,分区间无序
方案:
- 单分区: 全局有序,但吞吐量低
- key路由: 相同key的消息发到同一分区(推荐)
// 示例:按用户ID路由,保证同一用户消息有序
ProducerRecord<String, String> record =
new ProducerRecord<>("order-topic", userId, message);
关键点:
- 只能保证分区内有序
- 不能保证全局有序(除非单分区)
- Kafka如何保证消息不丢失?
答案:
三方配合:
| 端 | 配置 | 说明 |
|---|---|---|
| 生产者 | acks=all |
等待所有ISR副本确认 |
| Broker | min.insync.replicas=2 |
ISR最少2个副本 |
| 消费者 | enable.auto.commit=false |
关闭自动提交 |
- Kafka的rebalance机制?
答案:
触发条件:
- Consumer加入/离开
- Partition数变化
- 订阅Topic变化
流程:
- Group Coordinator协调
- 选出Leader Consumer
- Leader分配分区
- 同步分配结果
- Consumer开始消费
分配策略:
- Range: 按范围分配
- RoundRobin: 轮询
- Sticky: 粘性分配(尽量保持原有分配)
- Zookeeper在Kafka中的作用?
答案:
| 作用 | 说明 |
|---|---|
| Broker注册 | /brokers/ids/[brokerId] |
| Topic配置 | /brokers/topics/[topic] |
| Controller选举 | /controller临时节点 |
| Consumer注册 | /consumers/[groupId] |
| Offset存储 | (新版Kafka已移到内部Topic) |
注意: Kafka 2.8+ 支持 KRaft模式,不再依赖Zookeeper
RocketMQ篇 #
- RocketMQ的架构设计?
答案:
四大核心组件:
| 组件 | 作用 |
|---|---|
| NameSrv | 路由中心,轻量级注册中心 |
| Broker | 消息存储,Master-Slave架构 |
| Producer | 消息生产者 |
| Consumer | 消息消费者 |
架构特点:
- NameSrv无状态,可横向扩展
- Broker主从复制,保证高可用
- 不依赖Zookeeper,自实现注册中心
- 事务消息原理?
答案:
两阶段提交:
适用场景: 订单→库存→支付 分布式事务
- 延迟消息实现?
答案:
18级延迟等级:
1s 5s 10s 30s
1m 2m 3m 4m 5m 6m 7m 8m 9m 10m
20m 30m 1h 2h
实现原理:
- 消息发送到对应延迟级别的Topic
- 定时任务扫描到期消息
- 转发到实际Topic供消费
应用场景:
- 订单超时自动关闭
- 延迟通知
- 定时任务
- 如何选择Kafka vs RocketMQ?
答案:
| 对比项 | Kafka | RocketMQ |
|---|---|---|
| 吞吐量 | 百万级 | 十万级 |
| 延迟 | 毫秒级 | 毫秒级 |
| 事务消息 | ❌ | ✅ |
| 延迟消息 | ❌ | ✅ |
| 消息回溯 | ❌ | ✅ |
| 生态 | 大数据、流计算 | 金融、交易 |
选型建议:
- Kafka: 日志收集、实时流计算、大数据
- RocketMQ: 订单交易、事务消息、金融级业务
- RabbitMQ: 业务消息、复杂路由、小团队
通用篇 #
- 消息队列的应用场景?
答案:
| 场景 | 说明 | 示例 |
|---|---|---|
| 异步解耦 | 上下游解耦,提升性能 | 订单→库存→物流 |
| 削峰填谷 | 缓冲流量,保护下游 | 秒杀、大促 |
| 数据分发 | 一对多消息广播 | 数据同步 |
| 最终一致性 | 保证分布式事务一致性 | 订单支付 |
| 限流削峰 | 控制请求速率 | API限流 |
- 如何保证消息不重复?
答案:
幂等性设计:
| 方案 | 实现 |
|---|---|
| 唯一索引 | MySQL唯一键约束 |
| 分布式锁 | Redis SETNX |
| 幂等表 | 记录已处理消息ID |
| 版本号 | 乐观锁 UPDATE ... WHERE version=? |
示例:
-- 利用唯一索引去重
INSERT INTO order_log (order_id, status)
VALUES (123, 'success')
ON DUPLICATE KEY UPDATE status = status;
- 消息积压怎么处理?
答案:
紧急处理:
- 增加Consumer实例(不能超过分区数)
- 临时增加Topic分区数
- 暂停非核心生产者
长期优化:
- 优化Consumer处理逻辑
- 下游服务扩容/降级
- 批量消费代替单条消费
极端情况:
- 建新Topic,临时转移积压消息
- 启动多Consumer消费新Topic
- 处理完再切回
- 消息消费失败重试策略?
答案:
重试策略:
| 策略 | 说明 | 适用场景 |
|---|---|---|
| 固定间隔 | 固定时间重试 | 简单场景 |
| 指数退避 | 1s→2s→4s→8s... | 避免雪崩 |
| 死信队列 | 重试N次失败进入死信 | 需人工干预 |
RocketMQ重试:
- 默认重试16次
- 重试间隔递增
- 最终进入死信队列
Kafka重试:
- 需自行实现
- 利用Seek重新消费
- 或死信队列方案
MQ数据存储 #
面试高频 ⭐⭐⭐⭐⭐
逻辑存储架构 #
存储层级: Topic → Partition → Segment → Message
Kafka物理存储 #
| 组件 | 说明 |
|---|---|
| Log文件 | 实际消息数据,顺序追加写入 |
| Index文件 | 稀疏索引,每4KB消息建一条索引 |
| TimeIndex文件 | 基于时间的索引,用于消息回溯 |
| Segment | Log分段,默认1GB或7天 |
RocketMQ物理存储 #
| 组件 | 说明 |
|---|---|
| CommitLog | 所有Topic消息统一存储,顺序写 |
| ConsumeQueue | 消费队列,逻辑队列,存储CommitLog偏移 |
| IndexFile | 哈希索引,支持按key查询消息 |
存储核心设计 #
| 设计 | 原理 | 优势 |
|---|---|---|
| 顺序写 | 磁盘顺序写性能≈内存随机写 | 10+倍性能提升 |
| 零拷贝 | sendfile直接从Page Cache到网卡 | 减少CPU拷贝 |
| 稀疏索引 | 减少索引文件大小,提高查询效率 | 降低IO开销 |
| 分段存储 | 大文件分段,便于删除/清理 | 易维护 |
Kafka vs RocketMQ存储对比 #
| 特性 | Kafka | RocketMQ |
|---|---|---|
| 存储方式 | Partition独立存储 | CommitLog统一存储 |
| 索引 | Index + TimeIndex | ConsumeQueue + IndexFile |
| 顺序写 | ✅ Partition内顺序 | ✅ CommitLog顺序 |
| 零拷贝 | ✅ 支持 | ✅ 支持 |
| 消息查询 | Offset查询 | Offset + Key查询 |
Exactly-Once语义 #
面试高频 ⭐⭐⭐⭐⭐
Exactly-Once = At-Least-Once + 幂等性 #
Kafka Exactly-Once实现 #
// 1. 生产者幂等性
props.put("enable.idempotence", "true");
props.put("transactional.id", "my-tx-id");
// 2. 事务(同时写多个Topic原子性)
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
// 3. 消费者读已提交
props.put("isolation.level", "read_committed");
RocketMQ Exactly-Once实现 #
// 1. 消息带唯一Key
Message msg = new Message("topic", "tag",
uniqueKey, body.getBytes());
// 2. 消费者幂等处理
if (redis.setnx("msg:" + msgId, "1", 3600)) {
// 第一次处理
processMessage(msg);
} else {
// 重复消息,跳过
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
幂等性实现方案对比 #
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 唯一索引 | 简单、数据库原生 | 需数据库支持 | 关键业务数据 |
| Redis SETNX | 高性能 | 需考虑过期时间 | 短期去重 |
| 幂等表 | 灵活 | 需额外存储 | 通用场景 |
| 版本号 | 乐观锁 | 需设计版本字段 | 更新操作 |
| 状态机 | 业务语义清晰 | 需设计状态流转 | 状态驱动业务 |
幂等性代码示例 #
// 方案1:唯一索引
try {
jdbcTemplate.update(
"INSERT INTO order_log (order_id, status) VALUES (?, ?)",
orderId, "SUCCESS"
);
} catch (DuplicateKeyException e) {
// 重复,忽略
}
// 方案2:Redis SETNX
String key = "msg:" + msgId;
if (redisTemplate.opsForValue().setIfAbsent(key, "1", 3600, TimeUnit.SECONDS)) {
// 第一次处理
processMessage(msg);
}
// 方案3:幂等表
String msgId = message.getMsgId();
if (!idempotentService.checkAndMark(msgId)) {
return; // 已处理过
}
processMessage(msg);
分布式事务 #
面试高频 ⭐⭐⭐⭐⭐
分布式事务方案对比 #
| 方案 | 适用MQ | 优点 | 缺点 | 推荐度 |
|---|---|---|---|---|
| RocketMQ事务消息 | RocketMQ | 原生支持,业务侵入小 | 需RocketMQ | ⭐⭐⭐⭐⭐ |
| 本地消息表 | 任意MQ | 通用,可靠 | 需额外表 | ⭐⭐⭐⭐ |
| Seata AT + MQ | 任意MQ | TCC模式 | 复杂度高 | ⭐⭐⭐ |
| Kafka事务 | Kafka | 原子性写多Topic | 无反查机制 | ⭐⭐⭐ |
方案1:RocketMQ事务消息(推荐) #
RocketMQ事务消息代码:
// 1. 实现事务监听器
TransactionListener transactionListener = new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务
try {
// 比如:创建订单
orderService.createOrder(order);
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 反查本地事务状态
String orderId = msg.getKeys();
Order order = orderService.findById(orderId);
if (order != null && order.getStatus() == SUCCESS) {
return LocalTransactionState.COMMIT_MESSAGE;
} else {
return LocalTransactionState.UNKNOW; // 继续反查
}
}
};
// 2. 发送事务消息
TransactionMQProducer producer = new TransactionMQProducer("tx-group");
producer.setTransactionListener(transactionListener);
producer.start();
Message msg = new Message("order-topic", orderId, body.getBytes());
TransactionSendResult result = producer.sendMessageInTransaction(msg, null);
方案2:本地消息表(通用方案) #
本地消息表代码:
// 1. 本地事务,同时写业务表和消息表
@Transactional
public void createOrder(Order order) {
// 1. 写入订单表
orderMapper.insert(order);
// 2. 写入消息表
MessageEntity msg = new MessageEntity();
msg.setId(UUID.randomUUID().toString());
msg.setStatus("待发送");
msg.setContent(JSON.toJSONString(order));
messageMapper.insert(msg);
}
// 2. 定时任务扫描待发送消息
@Scheduled(fixedDelay = 5000)
public void sendPendingMessages() {
List<MessageEntity> messages = messageMapper.findByStatus("待发送");
for (MessageEntity msg : messages) {
try {
// 发送MQ
rocketMQTemplate.syncSend("order-topic", msg.getContent());
// 更新状态
messageMapper.updateStatus(msg.getId(), "已发送");
} catch (Exception e) {
// 重试
}
}
}
方案3:Kafka事务 #
// Kafka事务只能保证多Topic原子写入,无反查机制
producer.initTransactions();
try {
producer.beginTransaction();
// 原子写入多个Topic
producer.send(record1); // 订单Topic
producer.send(record2); // 库存Topic
producer.send(record3); // 积分Topic
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
// 消费者需设置: isolation.level = read_committed
最佳实践建议 #
| 场景 | 推荐方案 |
|---|---|
| 金融/交易场景 | RocketMQ事务消息 |
| 通用场景 | 本地消息表 + 定时任务 |
| 已有Seata | Seata + MQ组合 |
| 大数据/日志 | 最终一致性即可,不需要强事务 |
分布式事务注意事项 #
- 反查接口幂等:Broker可能多次反查,需保证幂等
- 超时时间设置:本地事务执行时间需控制在合理范围
- 死信队列:最终失败的消息进入死信队列,人工干预
- 监控告警:事务失败、反查异常需及时告警
🔗 相关笔记 #
最后更新: 2026-05-13