Organized 50 interview questions into 12 categories: - 01-分布式系统 (9 files): 分布式事务, 分布式锁, 一致性哈希, CAP理论, etc. - 02-数据库 (2 files): MySQL索引优化, MyBatis核心原理 - 03-缓存 (5 files): Redis数据结构, 缓存问题, LRU算法, etc. - 04-消息队列 (1 file): RocketMQ/Kafka - 05-并发编程 (4 files): 线程池, 设计模式, 限流策略, etc. - 06-JVM (1 file): JVM和垃圾回收 - 07-系统设计 (8 files): 秒杀系统, 短链接, IM, Feed流, etc. - 08-算法与数据结构 (4 files): B+树, 红黑树, 跳表, 时间轮 - 09-网络与安全 (3 files): TCP/IP, 加密安全, 性能优化 - 10-中间件 (4 files): Spring Boot, Nacos, Dubbo, Nginx - 11-运维 (4 files): Kubernetes, CI/CD, Docker, 可观测性 - 12-面试技巧 (1 file): 面试技巧和职业规划 All files renamed to Chinese for better accessibility and organized into categorized folders for easier navigation. Generated with [Claude Code](https://claude.com/claude-code) via [Happy](https://happy.engineering) Co-Authored-By: Claude <noreply@anthropic.com> Co-Authored-By: Happy <yesreply@happy.engineering>
37 KiB
消息队列(RocketMQ/Kafka)
问题
背景:消息队列是分布式系统的核心组件,用于解耦、异步、削峰填谷。
问题:
- 消息队列的作用是什么?在什么场景下使用?
- 如何保证消息不丢失?(生产者、Broker、消费者三个层面)
- 如何保证消息不重复消费(幂等性)?
- 如何保证消息的顺序性?
- 什么是消息积压?如何处理?
- RocketMQ 和 Kafka 的区别是什么?如何选型?
标准答案
1. 消息队列的作用
核心作用
1. 解耦(Decoupling)
场景:用户注册成功后,需要发送欢迎邮件、发送短信、发放优惠券。
不使用 MQ:
用户服务
↓
调用邮件服务 ✉️
调用短信服务 📱
调用优惠券服务 🎫
问题:
- 用户服务依赖所有下游服务
- 任何一个下游服务故障都会影响注册流程
- 新增需求需要修改用户服务代码
使用 MQ:
用户服务 → 发送消息到 MQ ← 邮件服务(订阅)
← 短信服务(订阅)
← 优惠券服务(订阅)
← 新服务(随时订阅)
优点:
- 用户服务不需要知道下游服务的存在
- 下游服务故障不影响用户注册
- 新增服务只需订阅消息,无需修改用户服务
2. 异步(Asynchronous)
场景:用户下单后,需要:
- 扣减库存
- 生成订单
- 通知物流
- 发送短信
- 积分处理
同步调用(串行):
总耗时 = 50ms + 100ms + 80ms + 50ms + 60ms = 340ms
异步调用(MQ):
下单服务 → MQ(5ms)
└─ 返回给用户(总耗时:55ms)
后台异步处理:
├─ 库存服务(50ms)
├─ 订单服务(100ms)
├─ 物流服务(80ms)
├─ 短信服务(50ms)
└─ 积分服务(60ms)
优点:
- 响应时间从 340ms 降低到 55ms
- 用户体验提升
3. 削峰填谷(Peak Shaving)
场景:秒杀活动、双11大促
不使用 MQ:
瞬时 QPS:50000
数据库最大 QPS:5000
结果:数据库被打挂 ❌
使用 MQ:
瞬时 QPS:50000 → MQ(缓冲)
└─ 按数据库能力消费(5000 QPS)
结果:数据库平稳运行 ✅
图解:
不使用 MQ:
请求量 ███████████████████████████████████████
↑
数据库被打挂
使用 MQ:
请求量 ███████████████████████████████████████
↓
MQ 缓冲
↓
消费量 ████████████████████████████████████████
↑ 平滑消费
使用场景
| 场景 | 说明 | 示例 |
|---|---|---|
| 异步处理 | 非核心流程异步化 | 注册后发送邮件、短信 |
| 系统解耦 | 降低系统耦合度 | 订单系统 ↔ 物流系统 |
| 流量削峰 | 应对突发流量 | 秒杀、大促 |
| 日志收集 | 分布式日志收集 | 应用日志 → ELK |
| 事件驱动 | 基于事件的架构 | 用户行为触发推荐算法 |
| 数据同步 | 跨数据库同步 | MySQL → Elasticsearch |
| 分布式事务 | 最终一致性 | 订单 → 库存(Saga 模式) |
消息队列的缺点
-
系统复杂度增加
- 需要维护 MQ 集群
- 需要处理消息丢失、重复、顺序等问题
-
数据一致性
- 无法保证强一致性,只能保证最终一致性
-
可用性降低
- MQ 成为单点故障(需要高可用集群)
-
消息延迟
- 异步处理导致消息延迟
2. 保证消息不丢失
三个层面的保障
生产者 → Broker → 消费者
↓ ↓ ↓
三个层面都要保证
层面 1:生产者(Producer)
方案 1:同步发送 + 确认机制
RocketMQ 示例:
// 同步发送(默认)
Message message = new Message("TopicTest", "TagA", "OrderID001", "Hello MQ".getBytes());
// 同步发送:等待 Broker 确认
SendResult sendResult = producer.send(message);
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
// 发送成功
} else {
// 发送失败,重试或记录日志
log.error("消息发送失败: {}", sendResult);
}
发送状态:
SEND_OK:发送成功FLUSH_DISK_TIMEOUT:刷盘超时FLUSH_SLAVE_TIMEOUT:同步到 Slave 超时SLAVE_NOT_AVAILABLE:Slave 不可用
方案 2:异步发送 + 回调
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 发送成功
log.info("消息发送成功: {}", sendResult);
}
@Override
public void onException(Throwable e) {
// 发送失败,重试或记录日志
log.error("消息发送失败", e);
}
});
方案 3:事务消息(RocketMQ 独有)
原理:
- 发送半消息(Half Message,对消费者不可见)
- 执行本地事务
- 提交/回滚消息
// 发送事务消息
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务(如:创建订单)
try {
orderService.createOrder(msg);
return LocalTransactionState.COMMIT_MESSAGE; // 提交消息
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE; // 回滚消息
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 反查本地事务状态(Broker 未收到确认时调用)
if (orderService.exists(msg.getKeys())) {
return LocalTransactionState.COMMIT_MESSAGE;
} else {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
});
// 发送消息
producer.sendMessageInTransaction(message, null);
方案 4:本地消息表
原理:
- 在同一本地事务中:完成业务操作 + 插入消息到本地消息表
- 定时任务扫描消息表,发送消息到 MQ
- 发送成功后更新消息状态
@Transactional
public void createOrder(Order order) {
// 1. 保存订单
orderMapper.insert(order);
// 2. 保存消息到本地消息表
Message message = new Message("OrderTopic", JSON.toJSONString(order).getBytes());
localMessageMapper.insert(new LocalMessage(order.getId(), message));
}
// 定时任务:发送消息到 MQ
@Scheduled(fixedRate = 5000)
public void sendPendingMessages() {
List<LocalMessage> messages = localMessageMapper.findPendingMessages();
for (LocalMessage msg : messages) {
try {
SendResult result = producer.send(msg.getMessage());
if (result.getSendStatus() == SendStatus.SEND_OK) {
// 更新消息状态为已发送
localMessageMapper.markAsSent(msg.getId());
}
} catch (Exception e) {
log.error("消息发送失败: {}", msg.getId(), e);
}
}
}
优点:
- 可靠性极高(消息持久化到数据库)
- 支持重试
缺点:
- 需要维护本地消息表
- 实现复杂
层面 2:Broker(消息服务器)
方案 1:消息持久化
RocketMQ:
<!-- 刷盘方式:同步刷盘 -->
<flushDiskType>SYNC_FLUSH</flushDiskType>
<!-- 或异步刷盘(性能更高,但可能丢失少量消息) -->
<flushDiskType>ASYNC_FLUSH</flushDiskType>
同步刷盘 vs 异步刷盘:
| 方式 | 性能 | 可靠性 | 适用场景 |
|---|---|---|---|
| SYNC_FLUSH | 低(写入磁盘后才返回成功) | 高 | 金融、支付 |
| ASYNC_FLUSH | 高(写入内存后返回成功) | 中 | 日志、监控 |
方案 2:主从复制(HA)
RocketMQ 配置:
<!-- 同步复制:Master 收到消息后,同步到 Slave 才返回成功 -->
<brokerRole>SYNC_MASTER</brokerRole>
<!-- 异步复制:Master 收到消息后立即返回,异步复制到 Slave -->
<brokerRole>ASYNC_MASTER</brokerRole>
复制方式对比:
| 方式 | 性能 | 可靠性 | 宕机丢数据 |
|---|---|---|---|
| SYNC_MASTER | 低 | 高 | 否 |
| ASYNC_MASTER | 高 | 中 | 可能 |
推荐配置(金融级可靠性):
<flushDiskType>SYNC_FLUSH</flushDiskType>
<brokerRole>SYNC_MASTER</brokerRole>
方案 3:多副本(Kafka)
Kafka 配置:
# 副本数量(建议 ≥ 3)
default.replication.factor=3
# 最小同步副本数(生产者确认)
min.insync.replicas=2
# 不完全的选举首领(禁止)
unclean.leader.election.enable=false
生产者配置:
Properties props = new Properties();
// 等待所有副本确认
props.put("acks", "all");
// 或至少等待主副本 + 1 个从副本确认
props.put("acks", "1");
acks 参数:
acks=0:不等待确认(最快,可能丢失)acks=1:等待主副本确认(折中)acks=all(或-1):等待所有副本确认(最可靠)
层面 3:消费者(Consumer)
方案 1:手动提交 Offset
RocketMQ 示例:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 设置为手动提交
consumer.setConsumeMessageBatchMaxSize(1);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
// 处理消息
processMessage(msg);
// 处理成功,返回 CONSUME_SUCCESS(自动提交 offset)
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 处理失败,稍后重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
});
Kafka 示例:
Properties props = new Properties();
// 关闭自动提交
props.put("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
// 处理消息
processMessage(record);
// 手动提交 offset
consumer.commitSync();
} catch (Exception e) {
// 处理失败,不提交 offset,下次重试
log.error("消息处理失败: {}", record.offset(), e);
}
}
}
方案 2:先处理消息,再提交 Offset
// 错误示例:先提交 offset,再处理消息
consumer.commitSync(); // ❌ 提交成功
processMessage(record); // ❌ 处理失败,消息丢失
// 正确示例:先处理消息,再提交 offset
processMessage(record); // ✅ 处理成功
consumer.commitSync(); // ✅ 提交 offset
方案 3:死信队列(DLQ)
场景:消息处理失败多次后,不再重试,放入死信队列。
RocketMQ 示例:
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
int reconsumeTimes = msg.getReconsumeTimes();
if (reconsumeTimes >= 3) {
// 重试 3 次后仍失败,放入死信队列
sendToDeadLetterQueue(msg);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
try {
processMessage(msg);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 稍后重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
});
Kafka 示例:
// 配置死信队列
Properties props = new Properties();
props.put("enable.deadletter", "true");
props.put("deadletter.queue.topic.prefix", "dlq.");
// 消费者处理失败时,发送到死信队列
if (retries >= 3) {
kafkaTemplate.send("dlq.MyTopic", record.value());
}
总结:端到端的可靠投递
生产者 Broker 消费者
↓ ↓ ↓
1. 同步发送 1. 持久化到磁盘 1. 手动提交 offset
2. 事务消息 2. 主从复制 2. 处理成功后才提交
3. 本地消息表 3. 多副本 3. 死信队列
3. 保证消息幂等性
为什么会有重复消息?
- 生产者重复发送:网络抖动导致生产者未收到 ACK,重试发送
- Broker 重复投递:主从切换时,从副本尚未同步的消息被重复投递
- 消费者重复消费:消费成功后 offset 提交失败,下次重复消费
幂等性解决方案
方案 1:数据库唯一约束
场景:订单创建
@Transactional
public void createOrder(Order order) {
// order_id 有唯一索引
try {
orderMapper.insert(order);
} catch (DuplicateKeyException e) {
// 重复订单,忽略
log.warn("重复订单: {}", order.getId());
return;
}
// 处理订单逻辑
processOrder(order);
}
优点:
- 实现简单
- 数据库保证唯一性
缺点:
- 依赖数据库
- 高并发下性能较差(频繁抛异常)
方案 2:Redis 分布式锁 + 唯一标识
原理:为每条消息设置唯一 ID(如 UUID),消费前先检查 Redis 是否已处理。
public void consumeMessage(Message message) {
String messageId = message.getHeaders().get("messageId");
String lockKey = "message:processed:" + messageId;
// 尝试获取锁
Boolean locked = redisTemplate.opsForValue()
.setIfAbsent(lockKey, "1", 24, TimeUnit.HOURS);
if (!locked) {
// 消息已处理,跳过
log.warn("重复消息: {}", messageId);
return;
}
try {
// 处理消息
processMessage(message);
} catch (Exception e) {
// 处理失败,删除锁(允许重试)
redisTemplate.delete(lockKey);
throw e;
}
}
优点:
- 性能高(Redis 内存操作)
- 可设置过期时间(自动清理)
缺点:
- 依赖 Redis
- Redis 宕机可能导致问题
方案 3:状态机
场景:订单状态流转
-- 订单表
CREATE TABLE orders (
id BIGINT PRIMARY KEY,
status VARCHAR(20), -- CREATED, PAID, SHIPPED, COMPLETED
version INT, -- 乐观锁版本号
UNIQUE KEY id_status (id, status) -- 联合唯一索引
);
@Transactional
public void updateOrderStatus(Long orderId, String fromStatus, String toStatus) {
// 使用 CAS(Compare And Set)更新状态
int updated = orderMapper.updateStatus(orderId, fromStatus, toStatus);
if (updated == 0) {
// 状态不匹配,可能是重复消息
log.warn("订单状态已变更: orderId={}, fromStatus={}, toStatus={}",
orderId, fromStatus, toStatus);
return;
}
// 处理后续逻辑
processStatusChange(orderId, toStatus);
}
// Mapper
@Update("UPDATE orders SET status = #{toStatus}, version = version + 1 " +
"WHERE id = #{orderId} AND status = #{fromStatus}")
int updateStatus(@Param("orderId") Long orderId,
@Param("fromStatus") String fromStatus,
@Param("toStatus") String toStatus);
优点:
- 状态机保证幂等性
- 支持复杂业务逻辑
缺点:
- 设计复杂
- 需要明确状态流转
方案 4:去重表
原理:创建专门的去重表,记录已处理的消息 ID。
-- 去重表
CREATE TABLE message_dedup (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
message_id VARCHAR(64) NOT NULL,
topic VARCHAR(64),
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY uk_message_id (message_id)
);
@Transactional
public void consumeMessage(Message message) {
String messageId = message.getId();
// 插入去重表
try {
dedupMapper.insert(new MessageDedup(messageId, message.getTopic()));
} catch (DuplicateKeyException e) {
// 消息已处理,跳过
log.warn("重复消息: {}", messageId);
return;
}
// 处理消息
processMessage(message);
}
优点:
- 简单直观
- 可查询处理历史
缺点:
- 额外的存储开销
- 需要定期清理历史数据
方案 5:多版本并发控制(MVCC)
原理:为数据增加版本号,更新时检查版本号。
-- 订单表
CREATE TABLE orders (
id BIGINT PRIMARY KEY,
amount DECIMAL(10,2),
version INT DEFAULT 0, -- 版本号
updated_at DATETIME
);
@Transactional
public void updateOrderAmount(Long orderId, BigDecimal newAmount) {
// 使用乐观锁更新
int updated = orderMapper.updateAmountWithVersion(orderId, newAmount);
if (updated == 0) {
// 版本号不匹配,可能是重复消息
log.warn("订单已被更新: orderId={}", orderId);
return;
}
// 处理后续逻辑
processAmountChange(orderId, newAmount);
}
// Mapper
@Update("UPDATE orders SET amount = #{amount}, version = version + 1 " +
"WHERE id = #{orderId} AND version = #{version}")
int updateAmountWithVersion(@Param("orderId") Long orderId,
@Param("amount") BigDecimal amount,
@Param("version") Integer version);
优点:
- 无锁,性能高
- 支持高并发
缺点:
- 更新失败时需要重试
- 不适合写冲突频繁的场景
幂等性方案对比
| 方案 | 性能 | 复杂度 | 可靠性 | 适用场景 |
|---|---|---|---|---|
| 数据库唯一约束 | 低 | 低 | 高 | 低并发、简单场景 |
| Redis 分布式锁 | 高 | 中 | 中 | 高并发、通用场景 |
| 状态机 | 中 | 高 | 高 | 订单、工作流等 |
| 去重表 | 中 | 低 | 高 | 需要记录历史的场景 |
| MVCC | 高 | 中 | 高 | 高并发、读多写少 |
4. 保证消息顺序性
为什么消息会乱序?
场景:一个 Topic 有多个 Queue(分区),多个消费者并发消费。
Topic: OrderTopic
├─ Queue1: 消息1, 消息4, 消息7
├─ Queue2: 消息2, 消息5, 消息8
└─ Queue3: 消息3, 消息6, 消息9
消费者1消费Queue1,消费者2消费Queue2,消费者3消费Queue3
└─ 消费速度不同,导致消息乱序
解决方案
方案 1:单分区(单 Queue)
原理:一个 Topic 只有一个 Queue,保证消息按顺序消费。
// RocketMQ 生产者:发送到同一个 Queue
SendResult result = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 始终选择第一个 Queue
return mqs.get(0);
}
}, null);
// RocketMQ 消费者:单线程消费
consumer.setConsumeThreadMin(1);
consumer.setConsumeThreadMax(1);
Kafka 配置:
# Topic 只有 1 个分区
num.partitions=1
优点:
- 实现简单
- 严格保证顺序
缺点:
- 性能差(无法并行消费)
- 成为瓶颈
适用场景:
- 低并发、对顺序要求极高的场景
方案 2:按业务 Key 分区
原理:相同业务 Key 的消息发送到同一个 Queue。
场景:订单号相同的消息需要顺序消费。
RocketMQ 示例:
// 生产者:按订单 ID 分区
SendResult result = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long orderId = (Long) arg;
// 相同订单 ID 的消息发送到同一个 Queue
int index = (int) (orderId % mqs.size());
return mqs.get(index);
}
}, orderId);
// 消费者:单线程消费每个 Queue(或至少保证相同订单的消息顺序处理)
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
// MessageListenerOrderly 保证顺序消费
for (MessageExt msg : msgs) {
processMessage(msg);
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
Kafka 示例:
// 生产者:按订单 ID 分区
Properties props = new Properties();
props.put("partitioner.class", "com.example.OrderPartitioner");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 发送消息(相同订单 ID 的消息会发送到同一个分区)
producer.send(new ProducerRecord<>("OrderTopic", orderId, message));
// 自定义分区器
public class OrderPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
Long orderId = Long.parseLong(key.toString());
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return (int) (orderId % partitions.size());
}
}
优点:
- 相同业务 Key 的消息保证顺序
- 不同业务 Key 的消息可并行消费
- 性能较好
缺点:
- 需要选择合适的分区 Key
- 分区数量固定,扩容困难
方案 3:单线程消费 + 内存队列
原理:消费者内部按业务 Key 分组,使用多个内存队列 + 多个线程处理。
架构图:
MQ 消费者(单线程拉取)
↓
按 Key 分组
↓
┌───────────┬───────────┬───────────┐
│ 内存队列1 │ 内存队列2 │ 内存队列3 │
│ (订单100) │ (订单200) │ (订单300) │
└───────────┴───────────┴───────────┘
↓ ↓ ↓
工作线程1 工作线程2 工作线程3
代码示例:
@Component
public class OrderMessageConsumer {
private final Map<Long, BlockingQueue<Message>> keyQueues = new ConcurrentHashMap<>();
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
@RocketMQMessageListener(topic = "OrderTopic", consumerGroup = "order-group")
public void onMessage(MessageExt message) {
Long orderId = parseOrderId(message);
// 获取或创建该订单的内存队列
BlockingQueue<Message> queue = keyQueues.computeIfAbsent(orderId,
k -> new LinkedBlockingQueue<>()
);
// 消息放入队列
queue.offer(message);
// 提交异步任务处理
executorService.submit(() -> {
Message msg = queue.poll();
if (msg != null) {
processMessage(msg);
}
});
}
}
优点:
- 保证相同 Key 的消息顺序
- 不同 Key 的消息并行处理
- 性能好
缺点:
- 实现复杂
- 内存占用较高
消息顺序性总结
| 方案 | 性能 | 复杂度 | 顺序粒度 | 适用场景 |
|---|---|---|---|---|
| 单分区 | 低 | 低 | 全局顺序 | 低并发、全局顺序 |
| 按 Key 分区 | 高 | 中 | Key 级顺序 | 订单、用户等业务 |
| 内存队列 | 高 | 高 | Key 级顺序 | 高并发、复杂业务 |
5. 消息积压处理
消息积压的原因
-
消费者消费速度慢:
- 消费逻辑复杂
- 下游服务响应慢
- 数据库操作慢
-
生产者发送速度过快:
- 突发流量
- 秒杀活动
-
消费者故障:
- 消费者宕机
- 网络问题
-
消费者消费失败:
- 消息格式错误
- 业务异常
- 无限重试
监控消息积压
RocketMQ:
# 查看消费者堆积情况
sh mqadmin queryConsumeQueue -t OrderTopic -n localhost:9876
# 查看消费者组
sh mqadmin consumerList -g order-group -n localhost:9876
# 查看消费进度
sh mqadmin consumerProgress -g order-group -n localhost:9876
输出示例:
# 消费进度
Topic: OrderTopic
Broker: broker-a
Queue: 0
Diff: 1000000 ← 积压 100 万条
LastOffset: 5000000
Kafka:
# 查看消费者 lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group order-group --describe
# 输出
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
OrderTopic 0 5000000 6000000 1000000
↑
积压 100 万
解决方案
方案 1:增加消费者数量(横向扩容)
原理:增加消费者实例,提高消费能力。
RocketMQ 示例:
// 原来:1 个消费者实例
@RocketMQMessageListener(
topic = "OrderTopic",
consumerGroup = "order-group",
consumeThreadNumber = 1 // 1 个线程
)
// 扩容后:10 个消费者实例(部署 10 个 Pod)
// 每个 Pod 有 1 个线程,总消费能力 ×10
注意事项:
- 消费者数量 ≤ Queue 数量
- 如果消费者数量 > Queue 数量,部分消费者会空闲
计算公式:
所需消费者数 = 消息积压量 / (单个消费者消费速度 × 预期清理时间)
示例:
积压 100 万条,单个消费者每秒消费 100 条,预期 1 小时清理
所需消费者数 = 1000000 / (100 × 3600) ≈ 3 个
方案 2:增加消费线程(纵向扩容)
RocketMQ 示例:
@RocketMQMessageListener(
topic = "OrderTopic",
consumerGroup = "order-group",
consumeThreadNumber = 20 // 从 1 增加到 20 个线程
)
Kafka 示例:
Properties props = new Properties();
// 增加消费线程数
props.put("max.poll.records", 500); // 每次拉取 500 条
props.put("max.poll.interval.ms", 300000); // 最大处理时间 5 分钟
注意事项:
- 线程数不是越多越好(CPU、数据库连接数限制)
- 需要测试找到最优值
方案 3:优化消费逻辑
问题代码:
// ❌ 在循环中逐条处理数据库
public void consumeMessage(List<Message> messages) {
for (Message msg : messages) {
// 逐条插入数据库(慢)
orderMapper.insert(parseOrder(msg));
}
}
优化代码:
// ✅ 批量处理数据库
public void consumeMessage(List<Message> messages) {
List<Order> orders = messages.stream()
.map(this::parseOrder)
.collect(Collectors.toList());
// 批量插入(快 10-100 倍)
orderMapper.batchInsert(orders);
}
Mapper:
@Insert({
"<script>",
"INSERT INTO orders (id, user_id, amount) VALUES ",
"<foreach collection='orders' item='order' separator=','>",
"(#{order.id}, #{order.userId}, #{order.amount})",
"</foreach>",
"</script>"
})
int batchInsert(@Param("orders") List<Order> orders);
方案 4:临时扩容方案
场景:积压严重(如 1000 万条),正常消费需要几天。
步骤:
- 创建临时 Topic(大量 Queue):
# 创建 OrderTopic_Temp,有 100 个 Queue
- 编写转发程序:
// 从原 Topic 消费,发送到临时 Topic
@Component
public class MessageForwarder {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@RocketMQMessageListener(topic = "OrderTopic", consumerGroup = "forward-group")
public void onMessage(MessageExt message) {
// 转发到临时 Topic
rocketMQTemplate.send("OrderTopic_Temp", message);
}
}
- 部署大量消费者消费临时 Topic:
// 部署 100 个消费者实例,每个消费 1 个 Queue
@RocketMQMessageListener(
topic = "OrderTopic_Temp",
consumerGroup = "temp-consumer-group"
)
- 积压清理完毕后,恢复正常架构。
方案 5:丢弃部分消息
场景:
- 消息积压过于严重(如 1 亿条)
- 消息时效性要求高(如实时日志)
- 允许部分数据丢失
实现:
// 1. 创建临时消费者,只消费不处理(快速跳过)
@RocketMQMessageListener(
topic = "OrderTopic",
consumerGroup = "skip-group"
)
public void onMessage(MessageExt message) {
// 直接返回成功,不处理消息
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
// 2. 跳到指定 offset
consumer.seekToBegin(); // 跳到最前
consumer.seek(queue, 10000000); // 跳到第 1000 万条
消息积压处理流程
1. 监控发现积压
↓
2. 分析积压原因
├─ 消费者慢 → 优化消费逻辑
├─ 消费者少 → 横向扩容
├─ 生产者快 → 限流
└─ 消费失败 → 修复 bug,跳过错误消息
↓
3. 执行扩容或优化
↓
4. 监控清理进度
↓
5. 恢复正常架构
6. RocketMQ vs Kafka
核心特性对比
| 特性 | RocketMQ | Kafka |
|---|---|---|
| 开发语言 | Java | Scala/Java |
| 协议 | 自有协议 | TCP |
| 消息模型 | 队列模型 | 发布-订阅模型 |
| 消息顺序 | 支持严格顺序 | 分区内有序 |
| 事务消息 | ✅ 支持 | ❌ 不支持(KIP-98 有提案) |
| 定时消息 | ✅ 支持 | ❌ 不支持 |
| 延迟消息 | ✅ 支持(18 级) | ❌ 不支持 |
| 消息回溯 | ✅ 支持 | ✅ 支持(需设置) |
| 消息过滤 | SQL92 表达式 | 客户端过滤 |
| 吞吐量 | 万级/单机 | 十万级/单机 |
| 延迟 | ms 级 | ms 级 |
| 可靠性 | 高(同步刷盘 + 主从) | 高(多副本) |
| 运维复杂度 | 中 | 中 |
| 社区活跃度 | 中(阿里主导) | 高(Confluent 支持) |
| 文档质量 | 中(中文文档多) | 高 |
| 适用场景 | 业务消息、订单、金融 | 日志收集、流式处理 |
详细对比
1. 消息模型
RocketMQ:
Topic: OrderTopic
├─ Queue1
├─ Queue2
├─ Queue3
└─ Queue4
消费者组:order-group
├─ 消费者1 → Queue1, Queue2
└─ 消费者2 → Queue3, Queue4
一条消息只能被同一个消费者组的一个消费者消费
Kafka:
Topic: OrderTopic
├─ Partition1
│ ├─ Replica1 (Leader)
│ ├─ Replica2 (Follower)
│ └─ Replica3 (Follower)
├─ Partition2
│ ├─ Replica1 (Leader)
│ ├─ Replica2 (Follower)
│ └─ Replica3 (Follower)
└─ ...
消费者组:order-group
├─ 消费者1 → Partition1
└─ 消费者2 → Partition2
一条消息只能被同一个消费者组的一个消费者消费
区别:
- RocketMQ 的 Queue 是逻辑概念
- Kafka 的 Partition 是物理概念(有副本)
2. 消息存储
RocketMQ:
CommitLog:
├─ 所有消息顺序写入(混合存储)
├─ 顺序写,性能高
└─ 通过 ConsumeQueue 索引快速查找
ConsumeQueue(索引文件):
├─ 消息 1: CommitLog 偏移量 100, 大小 200, TagsCode
├─ 消息 2: CommitLog 偏移量 300, 大小 150, TagsCode
└─ 消息 3: CommitLog 偏移量 450, 大小 180, TagsCode
Kafka:
Partition:
├─ Segment 0
│ ├─ 00000000000000000000.log (消息数据)
│ ├─ 00000000000000000000.index (索引)
│ └─ 00000000000000000000.timeindex (时间索引)
├─ Segment 1
│ ├─ 00000000000000050000.log
│ ├─ 00000000000000050000.index
│ └─ 00000000000000050000.timeindex
└─ ...
每个 Partition 独立存储
对比:
- RocketMQ:所有消息混合存储,通过索引文件查找
- Kafka:每个 Partition 独立存储,支持 Segment 滚动
3. 消息过滤
RocketMQ(服务端过滤):
// 消费者订阅时指定过滤条件
consumer.subscribe("OrderTopic", MessageSelector.bySql("amount > 100 AND status = 'PAID'"));
// 生产者发送消息时设置属性
Message msg = new Message("OrderTopic", "Order", body);
msg.putUserProperty("amount", "200");
msg.putUserProperty("status", "PAID");
producer.send(msg);
Kafka(客户端过滤):
// 消费者消费后手动过滤
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("OrderTopic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 客户端过滤(浪费网络带宽)
if (record.value().contains("amount:200")) {
processMessage(record);
}
}
}
区别:
- RocketMQ:服务端过滤(节省带宽)
- Kafka:客户端过滤(浪费带宽)
4. 事务消息
RocketMQ:
// 1. 发送半消息
Message msg = new Message("OrderTopic", orderJson.getBytes());
TransactionSendResult result = producer.sendMessageInTransaction(msg, null);
// 2. 执行本地事务(如创建订单)
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
orderService.createOrder(msg);
return LocalTransactionState.COMMIT_MESSAGE; // 提交消息
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE; // 回滚消息
}
}
// 3. 提交/回滚消息
// 消费者可以正常消费
Kafka:
❌ 不支持事务消息(KIP-98 有提案,但未正式发布)
✅ 支持"精确一次"语义(Exactly-Once),但仅限于流式处理(Kafka Streams)
5. 延迟消息
RocketMQ:
// 18 个默认延迟级别(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)
Message msg = new Message("OrderTopic", body);
msg.setDelayTimeLevel(3); // 延迟 10 秒
producer.send(msg);
Kafka:
// ❌ 不支持延迟消息
// ✅ 通过时间轮自定义实现
6. 吞吐量
测试环境:
- 机器:4 核 8G
- 消息大小:1 KB
结果:
| MQ | 单机吞吐量 | 延迟 |
|---|---|---|
| RocketMQ | 10-15 万/秒 | 1-5 ms |
| Kafka | 30-50 万/秒 | 1-5 ms |
原因:
- Kafka:零拷贝(sendfile)、批量处理、更好的磁盘顺序写
- RocketMQ:也有零拷贝,但 Java GC 影响性能
选型建议
选择 RocketMQ 的场景
-
需要事务消息:
- 订单系统(下单 → 扣库存)
- 支付系统(支付 → 通知)
-
需要延迟消息:
- 定时任务
- 超时取消订单
-
需要严格的消息顺序:
- 金融交易
- 订单状态流转
-
需要消息回溯:
- 数据恢复
- 数据重放
-
业务复杂、需要高可靠性:
- 金融、支付、电商
选择 Kafka 的场景
-
日志收集:
- 应用日志 → ELK
- 用户行为日志
-
流式处理:
- 实时数据分析
- 实时推荐
-
高吞吐量场景:
- 日志收集(百万级/秒)
- 监控数据收集
-
已有 Kafka 生态:
- 使用 Flink、Spark Streaming 等流式计算框架
企业级选型案例
| 公司 | 场景 | 选择 | 原因 |
|---|---|---|---|
| 阿里巴巴 | 订单、支付 | RocketMQ | 事务消息、高可靠性 |
| 美团 | 订单、物流 | RocketMQ | 业务复杂、延迟消息 |
| 字节跳动 | 日志、推荐 | Kafka | 高吞吐、流式处理 |
| 携程 | 订单、日志 | 两者都用 | 订单用 RocketMQ,日志用 Kafka |
| 京东 | 订单、支付 | RocketMQ | 事务消息、高可靠性 |
7. 总结
消息队列核心问题
| 问题 | 解决方案 |
|---|---|
| 消息不丢失 | 生产者(同步发送、事务消息、本地消息表) Broker(持久化、主从、多副本) 消费者(手动提交 offset、死信队列) |
| 消息重复消费 | 数据库唯一约束、Redis 分布式锁、状态机、去重表、MVCC |
| 消息顺序性 | 单分区、按 Key 分区、内存队列 |
| 消息积压 | 横向扩容、纵向扩容、优化消费逻辑、临时 Topic、丢弃消息 |
| 高可用 | 集群部署、主从复制、多副本、故障自动转移 |
8. 阿里 P7 加分项
深度理解:
- 理解 RocketMQ 的 CommitLog 和 ConsumeQueue 设计
- 理解 Kafka 的 Partition、Segment、副本同步机制
- 理解零拷贝(sendfile)、mmap 等底层技术
实战经验:
- 有将消息积压从千万级清理到正常的经验
- 有处理消息丢失、重复消费的线上故障经验
- 有消息队列性能调优经验(JVM 参数、操作系统参数)
架构能力:
- 能设计跨数据中心的消息队列架构
- 能设计消息队列的监控和告警体系
- 有消息队列迁移经验(如从 ActiveMQ 迁移到 RocketMQ)
技术选型:
- 能根据业务特点选择合适的 MQ
- 有多种 MQ 混用的经验
- 了解 MQTT、AMQP 等其他消息协议