Files
interview/questions/message-queue.md
2026-02-28 21:10:27 +08:00

1492 lines
37 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 消息队列RocketMQ/Kafka
## 问题
**背景**:消息队列是分布式系统的核心组件,用于解耦、异步、削峰填谷。
**问题**
1. 消息队列的作用是什么?在什么场景下使用?
2. 如何保证消息不丢失生产者、Broker、消费者三个层面
3. 如何保证消息不重复消费(幂等性)?
4. 如何保证消息的顺序性?
5. 什么是消息积压?如何处理?
6. RocketMQ 和 Kafka 的区别是什么?如何选型?
---
## 标准答案
### 1. 消息队列的作用
#### **核心作用**
##### **1. 解耦Decoupling**
**场景**:用户注册成功后,需要发送欢迎邮件、发送短信、发放优惠券。
**不使用 MQ**
```
用户服务
调用邮件服务 ✉️
调用短信服务 📱
调用优惠券服务 🎫
```
**问题**
- 用户服务依赖所有下游服务
- 任何一个下游服务故障都会影响注册流程
- 新增需求需要修改用户服务代码
**使用 MQ**
```
用户服务 → 发送消息到 MQ ← 邮件服务(订阅)
← 短信服务(订阅)
← 优惠券服务(订阅)
← 新服务(随时订阅)
```
**优点**
- 用户服务不需要知道下游服务的存在
- 下游服务故障不影响用户注册
- 新增服务只需订阅消息,无需修改用户服务
---
##### **2. 异步Asynchronous**
**场景**:用户下单后,需要:
1. 扣减库存
2. 生成订单
3. 通知物流
4. 发送短信
5. 积分处理
**同步调用(串行)**
```
总耗时 = 50ms + 100ms + 80ms + 50ms + 60ms = 340ms
```
**异步调用MQ**
```
下单服务 → MQ5ms
└─ 返回给用户总耗时55ms
后台异步处理:
├─ 库存服务50ms
├─ 订单服务100ms
├─ 物流服务80ms
├─ 短信服务50ms
└─ 积分服务60ms
```
**优点**
- 响应时间从 340ms 降低到 55ms
- 用户体验提升
---
##### **3. 削峰填谷Peak Shaving**
**场景**秒杀活动、双11大促
**不使用 MQ**
```
瞬时 QPS50000
数据库最大 QPS5000
结果:数据库被打挂 ❌
```
**使用 MQ**
```
瞬时 QPS50000 → MQ缓冲
└─ 按数据库能力消费5000 QPS
结果:数据库平稳运行 ✅
```
**图解**
```
不使用 MQ
请求量 ███████████████████████████████████████
数据库被打挂
使用 MQ
请求量 ███████████████████████████████████████
MQ 缓冲
消费量 ████████████████████████████████████████
↑ 平滑消费
```
---
#### **使用场景**
| 场景 | 说明 | 示例 |
|------|------|------|
| **异步处理** | 非核心流程异步化 | 注册后发送邮件、短信 |
| **系统解耦** | 降低系统耦合度 | 订单系统 ↔ 物流系统 |
| **流量削峰** | 应对突发流量 | 秒杀、大促 |
| **日志收集** | 分布式日志收集 | 应用日志 → ELK |
| **事件驱动** | 基于事件的架构 | 用户行为触发推荐算法 |
| **数据同步** | 跨数据库同步 | MySQL → Elasticsearch |
| **分布式事务** | 最终一致性 | 订单 → 库存Saga 模式) |
---
#### **消息队列的缺点**
1. **系统复杂度增加**
- 需要维护 MQ 集群
- 需要处理消息丢失、重复、顺序等问题
2. **数据一致性**
- 无法保证强一致性,只能保证最终一致性
3. **可用性降低**
- MQ 成为单点故障(需要高可用集群)
4. **消息延迟**
- 异步处理导致消息延迟
---
### 2. 保证消息不丢失
#### **三个层面的保障**
```
生产者 → Broker → 消费者
↓ ↓ ↓
三个层面都要保证
```
---
#### **层面 1生产者Producer**
##### **方案 1同步发送 + 确认机制**
**RocketMQ 示例**
```java
// 同步发送(默认)
Message message = new Message("TopicTest", "TagA", "OrderID001", "Hello MQ".getBytes());
// 同步发送:等待 Broker 确认
SendResult sendResult = producer.send(message);
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
// 发送成功
} else {
// 发送失败,重试或记录日志
log.error("消息发送失败: {}", sendResult);
}
```
**发送状态**
- `SEND_OK`:发送成功
- `FLUSH_DISK_TIMEOUT`:刷盘超时
- `FLUSH_SLAVE_TIMEOUT`:同步到 Slave 超时
- `SLAVE_NOT_AVAILABLE`Slave 不可用
---
##### **方案 2异步发送 + 回调**
```java
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 发送成功
log.info("消息发送成功: {}", sendResult);
}
@Override
public void onException(Throwable e) {
// 发送失败,重试或记录日志
log.error("消息发送失败", e);
}
});
```
---
##### **方案 3事务消息RocketMQ 独有)**
**原理**
1. 发送半消息Half Message对消费者不可见
2. 执行本地事务
3. 提交/回滚消息
```java
// 发送事务消息
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务(如:创建订单)
try {
orderService.createOrder(msg);
return LocalTransactionState.COMMIT_MESSAGE; // 提交消息
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE; // 回滚消息
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 反查本地事务状态Broker 未收到确认时调用)
if (orderService.exists(msg.getKeys())) {
return LocalTransactionState.COMMIT_MESSAGE;
} else {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
});
// 发送消息
producer.sendMessageInTransaction(message, null);
```
---
##### **方案 4本地消息表**
**原理**
1. 在同一本地事务中:完成业务操作 + 插入消息到本地消息表
2. 定时任务扫描消息表,发送消息到 MQ
3. 发送成功后更新消息状态
```java
@Transactional
public void createOrder(Order order) {
// 1. 保存订单
orderMapper.insert(order);
// 2. 保存消息到本地消息表
Message message = new Message("OrderTopic", JSON.toJSONString(order).getBytes());
localMessageMapper.insert(new LocalMessage(order.getId(), message));
}
// 定时任务:发送消息到 MQ
@Scheduled(fixedRate = 5000)
public void sendPendingMessages() {
List<LocalMessage> messages = localMessageMapper.findPendingMessages();
for (LocalMessage msg : messages) {
try {
SendResult result = producer.send(msg.getMessage());
if (result.getSendStatus() == SendStatus.SEND_OK) {
// 更新消息状态为已发送
localMessageMapper.markAsSent(msg.getId());
}
} catch (Exception e) {
log.error("消息发送失败: {}", msg.getId(), e);
}
}
}
```
**优点**
- 可靠性极高(消息持久化到数据库)
- 支持重试
**缺点**
- 需要维护本地消息表
- 实现复杂
---
#### **层面 2Broker消息服务器**
##### **方案 1消息持久化**
**RocketMQ**
```xml
<!-- 刷盘方式:同步刷盘 -->
<flushDiskType>SYNC_FLUSH</flushDiskType>
<!-- 或异步刷盘(性能更高,但可能丢失少量消息) -->
<flushDiskType>ASYNC_FLUSH</flushDiskType>
```
**同步刷盘 vs 异步刷盘**
| 方式 | 性能 | 可靠性 | 适用场景 |
|------|------|--------|----------|
| SYNC_FLUSH | 低(写入磁盘后才返回成功) | 高 | 金融、支付 |
| ASYNC_FLUSH | 高(写入内存后返回成功) | 中 | 日志、监控 |
---
##### **方案 2主从复制HA**
**RocketMQ 配置**
```xml
<!-- 同步复制Master 收到消息后,同步到 Slave 才返回成功 -->
<brokerRole>SYNC_MASTER</brokerRole>
<!-- 异步复制Master 收到消息后立即返回,异步复制到 Slave -->
<brokerRole>ASYNC_MASTER</brokerRole>
```
**复制方式对比**
| 方式 | 性能 | 可靠性 | 宕机丢数据 |
|------|------|--------|------------|
| SYNC_MASTER | 低 | 高 | 否 |
| ASYNC_MASTER | 高 | 中 | 可能 |
**推荐配置**(金融级可靠性):
```xml
<flushDiskType>SYNC_FLUSH</flushDiskType>
<brokerRole>SYNC_MASTER</brokerRole>
```
---
##### **方案 3多副本Kafka**
**Kafka 配置**
```properties
# 副本数量(建议 ≥ 3
default.replication.factor=3
# 最小同步副本数(生产者确认)
min.insync.replicas=2
# 不完全的选举首领(禁止)
unclean.leader.election.enable=false
```
**生产者配置**
```java
Properties props = new Properties();
// 等待所有副本确认
props.put("acks", "all");
// 或至少等待主副本 + 1 个从副本确认
props.put("acks", "1");
```
**acks 参数**
- `acks=0`:不等待确认(最快,可能丢失)
- `acks=1`:等待主副本确认(折中)
- `acks=all`(或 `-1`):等待所有副本确认(最可靠)
---
#### **层面 3消费者Consumer**
##### **方案 1手动提交 Offset**
**RocketMQ 示例**
```java
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 设置为手动提交
consumer.setConsumeMessageBatchMaxSize(1);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
// 处理消息
processMessage(msg);
// 处理成功,返回 CONSUME_SUCCESS自动提交 offset
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 处理失败,稍后重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
});
```
---
**Kafka 示例**
```java
Properties props = new Properties();
// 关闭自动提交
props.put("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
// 处理消息
processMessage(record);
// 手动提交 offset
consumer.commitSync();
} catch (Exception e) {
// 处理失败,不提交 offset下次重试
log.error("消息处理失败: {}", record.offset(), e);
}
}
}
```
---
##### **方案 2先处理消息再提交 Offset**
```java
// 错误示例:先提交 offset再处理消息
consumer.commitSync(); // ❌ 提交成功
processMessage(record); // ❌ 处理失败,消息丢失
// 正确示例:先处理消息,再提交 offset
processMessage(record); // ✅ 处理成功
consumer.commitSync(); // ✅ 提交 offset
```
---
##### **方案 3死信队列DLQ**
**场景**:消息处理失败多次后,不再重试,放入死信队列。
**RocketMQ 示例**
```java
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
int reconsumeTimes = msg.getReconsumeTimes();
if (reconsumeTimes >= 3) {
// 重试 3 次后仍失败,放入死信队列
sendToDeadLetterQueue(msg);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
try {
processMessage(msg);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 稍后重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
});
```
**Kafka 示例**
```java
// 配置死信队列
Properties props = new Properties();
props.put("enable.deadletter", "true");
props.put("deadletter.queue.topic.prefix", "dlq.");
// 消费者处理失败时,发送到死信队列
if (retries >= 3) {
kafkaTemplate.send("dlq.MyTopic", record.value());
}
```
---
#### **总结:端到端的可靠投递**
```
生产者 Broker 消费者
↓ ↓ ↓
1. 同步发送 1. 持久化到磁盘 1. 手动提交 offset
2. 事务消息 2. 主从复制 2. 处理成功后才提交
3. 本地消息表 3. 多副本 3. 死信队列
```
---
### 3. 保证消息幂等性
#### **为什么会有重复消息?**
1. **生产者重复发送**:网络抖动导致生产者未收到 ACK重试发送
2. **Broker 重复投递**:主从切换时,从副本尚未同步的消息被重复投递
3. **消费者重复消费**:消费成功后 offset 提交失败,下次重复消费
---
#### **幂等性解决方案**
##### **方案 1数据库唯一约束**
**场景**:订单创建
```java
@Transactional
public void createOrder(Order order) {
// order_id 有唯一索引
try {
orderMapper.insert(order);
} catch (DuplicateKeyException e) {
// 重复订单,忽略
log.warn("重复订单: {}", order.getId());
return;
}
// 处理订单逻辑
processOrder(order);
}
```
**优点**
- 实现简单
- 数据库保证唯一性
**缺点**
- 依赖数据库
- 高并发下性能较差(频繁抛异常)
---
##### **方案 2Redis 分布式锁 + 唯一标识**
**原理**:为每条消息设置唯一 ID如 UUID消费前先检查 Redis 是否已处理。
```java
public void consumeMessage(Message message) {
String messageId = message.getHeaders().get("messageId");
String lockKey = "message:processed:" + messageId;
// 尝试获取锁
Boolean locked = redisTemplate.opsForValue()
.setIfAbsent(lockKey, "1", 24, TimeUnit.HOURS);
if (!locked) {
// 消息已处理,跳过
log.warn("重复消息: {}", messageId);
return;
}
try {
// 处理消息
processMessage(message);
} catch (Exception e) {
// 处理失败,删除锁(允许重试)
redisTemplate.delete(lockKey);
throw e;
}
}
```
**优点**
- 性能高Redis 内存操作)
- 可设置过期时间(自动清理)
**缺点**
- 依赖 Redis
- Redis 宕机可能导致问题
---
##### **方案 3状态机**
**场景**:订单状态流转
```sql
-- 订单表
CREATE TABLE orders (
id BIGINT PRIMARY KEY,
status VARCHAR(20), -- CREATED, PAID, SHIPPED, COMPLETED
version INT, -- 乐观锁版本号
UNIQUE KEY id_status (id, status) -- 联合唯一索引
);
```
```java
@Transactional
public void updateOrderStatus(Long orderId, String fromStatus, String toStatus) {
// 使用 CASCompare And Set更新状态
int updated = orderMapper.updateStatus(orderId, fromStatus, toStatus);
if (updated == 0) {
// 状态不匹配,可能是重复消息
log.warn("订单状态已变更: orderId={}, fromStatus={}, toStatus={}",
orderId, fromStatus, toStatus);
return;
}
// 处理后续逻辑
processStatusChange(orderId, toStatus);
}
// Mapper
@Update("UPDATE orders SET status = #{toStatus}, version = version + 1 " +
"WHERE id = #{orderId} AND status = #{fromStatus}")
int updateStatus(@Param("orderId") Long orderId,
@Param("fromStatus") String fromStatus,
@Param("toStatus") String toStatus);
```
**优点**
- 状态机保证幂等性
- 支持复杂业务逻辑
**缺点**
- 设计复杂
- 需要明确状态流转
---
##### **方案 4去重表**
**原理**:创建专门的去重表,记录已处理的消息 ID。
```sql
-- 去重表
CREATE TABLE message_dedup (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
message_id VARCHAR(64) NOT NULL,
topic VARCHAR(64),
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY uk_message_id (message_id)
);
```
```java
@Transactional
public void consumeMessage(Message message) {
String messageId = message.getId();
// 插入去重表
try {
dedupMapper.insert(new MessageDedup(messageId, message.getTopic()));
} catch (DuplicateKeyException e) {
// 消息已处理,跳过
log.warn("重复消息: {}", messageId);
return;
}
// 处理消息
processMessage(message);
}
```
**优点**
- 简单直观
- 可查询处理历史
**缺点**
- 额外的存储开销
- 需要定期清理历史数据
---
##### **方案 5多版本并发控制MVCC**
**原理**:为数据增加版本号,更新时检查版本号。
```sql
-- 订单表
CREATE TABLE orders (
id BIGINT PRIMARY KEY,
amount DECIMAL(10,2),
version INT DEFAULT 0, -- 版本号
updated_at DATETIME
);
```
```java
@Transactional
public void updateOrderAmount(Long orderId, BigDecimal newAmount) {
// 使用乐观锁更新
int updated = orderMapper.updateAmountWithVersion(orderId, newAmount);
if (updated == 0) {
// 版本号不匹配,可能是重复消息
log.warn("订单已被更新: orderId={}", orderId);
return;
}
// 处理后续逻辑
processAmountChange(orderId, newAmount);
}
// Mapper
@Update("UPDATE orders SET amount = #{amount}, version = version + 1 " +
"WHERE id = #{orderId} AND version = #{version}")
int updateAmountWithVersion(@Param("orderId") Long orderId,
@Param("amount") BigDecimal amount,
@Param("version") Integer version);
```
**优点**
- 无锁,性能高
- 支持高并发
**缺点**
- 更新失败时需要重试
- 不适合写冲突频繁的场景
---
#### **幂等性方案对比**
| 方案 | 性能 | 复杂度 | 可靠性 | 适用场景 |
|------|------|--------|--------|----------|
| 数据库唯一约束 | 低 | 低 | 高 | 低并发、简单场景 |
| Redis 分布式锁 | 高 | 中 | 中 | 高并发、通用场景 |
| 状态机 | 中 | 高 | 高 | 订单、工作流等 |
| 去重表 | 中 | 低 | 高 | 需要记录历史的场景 |
| MVCC | 高 | 中 | 高 | 高并发、读多写少 |
---
### 4. 保证消息顺序性
#### **为什么消息会乱序?**
**场景**:一个 Topic 有多个 Queue分区多个消费者并发消费。
```
Topic: OrderTopic
├─ Queue1: 消息1, 消息4, 消息7
├─ Queue2: 消息2, 消息5, 消息8
└─ Queue3: 消息3, 消息6, 消息9
消费者1消费Queue1消费者2消费Queue2消费者3消费Queue3
└─ 消费速度不同,导致消息乱序
```
---
#### **解决方案**
##### **方案 1单分区单 Queue**
**原理**:一个 Topic 只有一个 Queue保证消息按顺序消费。
```java
// RocketMQ 生产者:发送到同一个 Queue
SendResult result = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 始终选择第一个 Queue
return mqs.get(0);
}
}, null);
// RocketMQ 消费者:单线程消费
consumer.setConsumeThreadMin(1);
consumer.setConsumeThreadMax(1);
```
**Kafka 配置**
```properties
# Topic 只有 1 个分区
num.partitions=1
```
**优点**
- 实现简单
- 严格保证顺序
**缺点**
- 性能差(无法并行消费)
- 成为瓶颈
**适用场景**
- 低并发、对顺序要求极高的场景
---
##### **方案 2按业务 Key 分区**
**原理**:相同业务 Key 的消息发送到同一个 Queue。
**场景**:订单号相同的消息需要顺序消费。
**RocketMQ 示例**
```java
// 生产者:按订单 ID 分区
SendResult result = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long orderId = (Long) arg;
// 相同订单 ID 的消息发送到同一个 Queue
int index = (int) (orderId % mqs.size());
return mqs.get(index);
}
}, orderId);
// 消费者:单线程消费每个 Queue或至少保证相同订单的消息顺序处理
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
// MessageListenerOrderly 保证顺序消费
for (MessageExt msg : msgs) {
processMessage(msg);
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
```
**Kafka 示例**
```java
// 生产者:按订单 ID 分区
Properties props = new Properties();
props.put("partitioner.class", "com.example.OrderPartitioner");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 发送消息(相同订单 ID 的消息会发送到同一个分区)
producer.send(new ProducerRecord<>("OrderTopic", orderId, message));
// 自定义分区器
public class OrderPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
Long orderId = Long.parseLong(key.toString());
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return (int) (orderId % partitions.size());
}
}
```
**优点**
- 相同业务 Key 的消息保证顺序
- 不同业务 Key 的消息可并行消费
- 性能较好
**缺点**
- 需要选择合适的分区 Key
- 分区数量固定,扩容困难
---
##### **方案 3单线程消费 + 内存队列**
**原理**:消费者内部按业务 Key 分组,使用多个内存队列 + 多个线程处理。
**架构图**
```
MQ 消费者(单线程拉取)
按 Key 分组
┌───────────┬───────────┬───────────┐
│ 内存队列1 │ 内存队列2 │ 内存队列3 │
│ (订单100) │ (订单200) │ (订单300) │
└───────────┴───────────┴───────────┘
↓ ↓ ↓
工作线程1 工作线程2 工作线程3
```
**代码示例**
```java
@Component
public class OrderMessageConsumer {
private final Map<Long, BlockingQueue<Message>> keyQueues = new ConcurrentHashMap<>();
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
@RocketMQMessageListener(topic = "OrderTopic", consumerGroup = "order-group")
public void onMessage(MessageExt message) {
Long orderId = parseOrderId(message);
// 获取或创建该订单的内存队列
BlockingQueue<Message> queue = keyQueues.computeIfAbsent(orderId,
k -> new LinkedBlockingQueue<>()
);
// 消息放入队列
queue.offer(message);
// 提交异步任务处理
executorService.submit(() -> {
Message msg = queue.poll();
if (msg != null) {
processMessage(msg);
}
});
}
}
```
**优点**
- 保证相同 Key 的消息顺序
- 不同 Key 的消息并行处理
- 性能好
**缺点**
- 实现复杂
- 内存占用较高
---
#### **消息顺序性总结**
| 方案 | 性能 | 复杂度 | 顺序粒度 | 适用场景 |
|------|------|--------|----------|----------|
| 单分区 | 低 | 低 | 全局顺序 | 低并发、全局顺序 |
| 按 Key 分区 | 高 | 中 | Key 级顺序 | 订单、用户等业务 |
| 内存队列 | 高 | 高 | Key 级顺序 | 高并发、复杂业务 |
---
### 5. 消息积压处理
#### **消息积压的原因**
1. **消费者消费速度慢**
- 消费逻辑复杂
- 下游服务响应慢
- 数据库操作慢
2. **生产者发送速度过快**
- 突发流量
- 秒杀活动
3. **消费者故障**
- 消费者宕机
- 网络问题
4. **消费者消费失败**
- 消息格式错误
- 业务异常
- 无限重试
---
#### **监控消息积压**
**RocketMQ**
```bash
# 查看消费者堆积情况
sh mqadmin queryConsumeQueue -t OrderTopic -n localhost:9876
# 查看消费者组
sh mqadmin consumerList -g order-group -n localhost:9876
# 查看消费进度
sh mqadmin consumerProgress -g order-group -n localhost:9876
```
**输出示例**
```
# 消费进度
Topic: OrderTopic
Broker: broker-a
Queue: 0
Diff: 1000000 ← 积压 100 万条
LastOffset: 5000000
```
**Kafka**
```bash
# 查看消费者 lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group order-group --describe
# 输出
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
OrderTopic 0 5000000 6000000 1000000
积压 100
```
---
#### **解决方案**
##### **方案 1增加消费者数量横向扩容**
**原理**:增加消费者实例,提高消费能力。
**RocketMQ 示例**
```java
// 原来1 个消费者实例
@RocketMQMessageListener(
topic = "OrderTopic",
consumerGroup = "order-group",
consumeThreadNumber = 1 // 1 个线程
)
// 扩容后10 个消费者实例(部署 10 个 Pod
// 每个 Pod 有 1 个线程,总消费能力 ×10
```
**注意事项**
- **消费者数量 ≤ Queue 数量**
- 如果消费者数量 > Queue 数量,部分消费者会空闲
**计算公式**
```
所需消费者数 = 消息积压量 / (单个消费者消费速度 × 预期清理时间)
示例:
积压 100 万条,单个消费者每秒消费 100 条,预期 1 小时清理
所需消费者数 = 1000000 / (100 × 3600) ≈ 3 个
```
---
##### **方案 2增加消费线程纵向扩容**
**RocketMQ 示例**
```java
@RocketMQMessageListener(
topic = "OrderTopic",
consumerGroup = "order-group",
consumeThreadNumber = 20 // 从 1 增加到 20 个线程
)
```
**Kafka 示例**
```java
Properties props = new Properties();
// 增加消费线程数
props.put("max.poll.records", 500); // 每次拉取 500 条
props.put("max.poll.interval.ms", 300000); // 最大处理时间 5 分钟
```
**注意事项**
- 线程数不是越多越好CPU、数据库连接数限制
- 需要测试找到最优值
---
##### **方案 3优化消费逻辑**
**问题代码**
```java
// ❌ 在循环中逐条处理数据库
public void consumeMessage(List<Message> messages) {
for (Message msg : messages) {
// 逐条插入数据库(慢)
orderMapper.insert(parseOrder(msg));
}
}
```
**优化代码**
```java
// ✅ 批量处理数据库
public void consumeMessage(List<Message> messages) {
List<Order> orders = messages.stream()
.map(this::parseOrder)
.collect(Collectors.toList());
// 批量插入(快 10-100 倍)
orderMapper.batchInsert(orders);
}
```
**Mapper**
```java
@Insert({
"<script>",
"INSERT INTO orders (id, user_id, amount) VALUES ",
"<foreach collection='orders' item='order' separator=','>",
"(#{order.id}, #{order.userId}, #{order.amount})",
"</foreach>",
"</script>"
})
int batchInsert(@Param("orders") List<Order> orders);
```
---
##### **方案 4临时扩容方案**
**场景**:积压严重(如 1000 万条),正常消费需要几天。
**步骤**
1. **创建临时 Topic**(大量 Queue
```bash
# 创建 OrderTopic_Temp有 100 个 Queue
```
2. **编写转发程序**
```java
// 从原 Topic 消费,发送到临时 Topic
@Component
public class MessageForwarder {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@RocketMQMessageListener(topic = "OrderTopic", consumerGroup = "forward-group")
public void onMessage(MessageExt message) {
// 转发到临时 Topic
rocketMQTemplate.send("OrderTopic_Temp", message);
}
}
```
3. **部署大量消费者**消费临时 Topic
```java
// 部署 100 个消费者实例,每个消费 1 个 Queue
@RocketMQMessageListener(
topic = "OrderTopic_Temp",
consumerGroup = "temp-consumer-group"
)
```
4. **积压清理完毕后,恢复正常架构**
---
##### **方案 5丢弃部分消息**
**场景**
- 消息积压过于严重(如 1 亿条)
- 消息时效性要求高(如实时日志)
- 允许部分数据丢失
**实现**
```java
// 1. 创建临时消费者,只消费不处理(快速跳过)
@RocketMQMessageListener(
topic = "OrderTopic",
consumerGroup = "skip-group"
)
public void onMessage(MessageExt message) {
// 直接返回成功,不处理消息
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
// 2. 跳到指定 offset
consumer.seekToBegin(); // 跳到最前
consumer.seek(queue, 10000000); // 跳到第 1000 万条
```
---
#### **消息积压处理流程**
```
1. 监控发现积压
2. 分析积压原因
├─ 消费者慢 → 优化消费逻辑
├─ 消费者少 → 横向扩容
├─ 生产者快 → 限流
└─ 消费失败 → 修复 bug跳过错误消息
3. 执行扩容或优化
4. 监控清理进度
5. 恢复正常架构
```
---
### 6. RocketMQ vs Kafka
#### **核心特性对比**
| 特性 | RocketMQ | Kafka |
|------|----------|-------|
| **开发语言** | Java | Scala/Java |
| **协议** | 自有协议 | TCP |
| **消息模型** | 队列模型 | 发布-订阅模型 |
| **消息顺序** | 支持严格顺序 | 分区内有序 |
| **事务消息** | ✅ 支持 | ❌ 不支持KIP-98 有提案) |
| **定时消息** | ✅ 支持 | ❌ 不支持 |
| **延迟消息** | ✅ 支持18 级) | ❌ 不支持 |
| **消息回溯** | ✅ 支持 | ✅ 支持(需设置) |
| **消息过滤** | SQL92 表达式 | 客户端过滤 |
| **吞吐量** | 万级/单机 | 十万级/单机 |
| **延迟** | ms 级 | ms 级 |
| **可靠性** | 高(同步刷盘 + 主从) | 高(多副本) |
| **运维复杂度** | 中 | 中 |
| **社区活跃度** | 中(阿里主导) | 高Confluent 支持) |
| **文档质量** | 中(中文文档多) | 高 |
| **适用场景** | 业务消息、订单、金融 | 日志收集、流式处理 |
---
#### **详细对比**
##### **1. 消息模型**
**RocketMQ**
```
Topic: OrderTopic
├─ Queue1
├─ Queue2
├─ Queue3
└─ Queue4
消费者组order-group
├─ 消费者1 → Queue1, Queue2
└─ 消费者2 → Queue3, Queue4
一条消息只能被同一个消费者组的一个消费者消费
```
**Kafka**
```
Topic: OrderTopic
├─ Partition1
│ ├─ Replica1 (Leader)
│ ├─ Replica2 (Follower)
│ └─ Replica3 (Follower)
├─ Partition2
│ ├─ Replica1 (Leader)
│ ├─ Replica2 (Follower)
│ └─ Replica3 (Follower)
└─ ...
消费者组order-group
├─ 消费者1 → Partition1
└─ 消费者2 → Partition2
一条消息只能被同一个消费者组的一个消费者消费
```
**区别**
- RocketMQ 的 Queue 是逻辑概念
- Kafka 的 Partition 是物理概念(有副本)
---
##### **2. 消息存储**
**RocketMQ**
```
CommitLog:
├─ 所有消息顺序写入(混合存储)
├─ 顺序写,性能高
└─ 通过 ConsumeQueue 索引快速查找
ConsumeQueue索引文件:
├─ 消息 1: CommitLog 偏移量 100, 大小 200, TagsCode
├─ 消息 2: CommitLog 偏移量 300, 大小 150, TagsCode
└─ 消息 3: CommitLog 偏移量 450, 大小 180, TagsCode
```
**Kafka**
```
Partition:
├─ Segment 0
│ ├─ 00000000000000000000.log (消息数据)
│ ├─ 00000000000000000000.index (索引)
│ └─ 00000000000000000000.timeindex (时间索引)
├─ Segment 1
│ ├─ 00000000000000050000.log
│ ├─ 00000000000000050000.index
│ └─ 00000000000000050000.timeindex
└─ ...
每个 Partition 独立存储
```
**对比**
- RocketMQ所有消息混合存储通过索引文件查找
- Kafka每个 Partition 独立存储,支持 Segment 滚动
---
##### **3. 消息过滤**
**RocketMQ**(服务端过滤):
```java
// 消费者订阅时指定过滤条件
consumer.subscribe("OrderTopic", MessageSelector.bySql("amount > 100 AND status = 'PAID'"));
// 生产者发送消息时设置属性
Message msg = new Message("OrderTopic", "Order", body);
msg.putUserProperty("amount", "200");
msg.putUserProperty("status", "PAID");
producer.send(msg);
```
**Kafka**(客户端过滤):
```java
// 消费者消费后手动过滤
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("OrderTopic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 客户端过滤(浪费网络带宽)
if (record.value().contains("amount:200")) {
processMessage(record);
}
}
}
```
**区别**
- RocketMQ服务端过滤节省带宽
- Kafka客户端过滤浪费带宽
---
##### **4. 事务消息**
**RocketMQ**
```java
// 1. 发送半消息
Message msg = new Message("OrderTopic", orderJson.getBytes());
TransactionSendResult result = producer.sendMessageInTransaction(msg, null);
// 2. 执行本地事务(如创建订单)
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
orderService.createOrder(msg);
return LocalTransactionState.COMMIT_MESSAGE; // 提交消息
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE; // 回滚消息
}
}
// 3. 提交/回滚消息
// 消费者可以正常消费
```
**Kafka**
```
❌ 不支持事务消息KIP-98 有提案,但未正式发布)
✅ 支持"精确一次"语义Exactly-Once但仅限于流式处理Kafka Streams
```
---
##### **5. 延迟消息**
**RocketMQ**
```java
// 18 个默认延迟级别1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
Message msg = new Message("OrderTopic", body);
msg.setDelayTimeLevel(3); // 延迟 10 秒
producer.send(msg);
```
**Kafka**
```java
// ❌ 不支持延迟消息
// ✅ 通过时间轮自定义实现
```
---
##### **6. 吞吐量**
**测试环境**
- 机器4 核 8G
- 消息大小1 KB
**结果**
| MQ | 单机吞吐量 | 延迟 |
|----|-----------|------|
| RocketMQ | 10-15 万/秒 | 1-5 ms |
| Kafka | 30-50 万/秒 | 1-5 ms |
**原因**
- Kafka零拷贝sendfile、批量处理、更好的磁盘顺序写
- RocketMQ也有零拷贝但 Java GC 影响性能
---
#### **选型建议**
##### **选择 RocketMQ 的场景**
1. **需要事务消息**
- 订单系统(下单 → 扣库存)
- 支付系统(支付 → 通知)
2. **需要延迟消息**
- 定时任务
- 超时取消订单
3. **需要严格的消息顺序**
- 金融交易
- 订单状态流转
4. **需要消息回溯**
- 数据恢复
- 数据重放
5. **业务复杂、需要高可靠性**
- 金融、支付、电商
---
##### **选择 Kafka 的场景**
1. **日志收集**
- 应用日志 → ELK
- 用户行为日志
2. **流式处理**
- 实时数据分析
- 实时推荐
3. **高吞吐量场景**
- 日志收集(百万级/秒)
- 监控数据收集
4. **已有 Kafka 生态**
- 使用 Flink、Spark Streaming 等流式计算框架
---
#### **企业级选型案例**
| 公司 | 场景 | 选择 | 原因 |
|------|------|------|------|
| 阿里巴巴 | 订单、支付 | RocketMQ | 事务消息、高可靠性 |
| 美团 | 订单、物流 | RocketMQ | 业务复杂、延迟消息 |
| 字节跳动 | 日志、推荐 | Kafka | 高吞吐、流式处理 |
| 携程 | 订单、日志 | 两者都用 | 订单用 RocketMQ日志用 Kafka |
| 京东 | 订单、支付 | RocketMQ | 事务消息、高可靠性 |
---
### 7. 总结
#### **消息队列核心问题**
| 问题 | 解决方案 |
|------|----------|
| **消息不丢失** | 生产者(同步发送、事务消息、本地消息表)<br>Broker持久化、主从、多副本<br>消费者(手动提交 offset、死信队列 |
| **消息重复消费** | 数据库唯一约束、Redis 分布式锁、状态机、去重表、MVCC |
| **消息顺序性** | 单分区、按 Key 分区、内存队列 |
| **消息积压** | 横向扩容、纵向扩容、优化消费逻辑、临时 Topic、丢弃消息 |
| **高可用** | 集群部署、主从复制、多副本、故障自动转移 |
---
### 8. 阿里 P7 加分项
**深度理解**
- 理解 RocketMQ 的 CommitLog 和 ConsumeQueue 设计
- 理解 Kafka 的 Partition、Segment、副本同步机制
- 理解零拷贝sendfile、mmap 等底层技术
**实战经验**
- 有将消息积压从千万级清理到正常的经验
- 有处理消息丢失、重复消费的线上故障经验
- 有消息队列性能调优经验JVM 参数、操作系统参数)
**架构能力**
- 能设计跨数据中心的消息队列架构
- 能设计消息队列的监控和告警体系
- 有消息队列迁移经验(如从 ActiveMQ 迁移到 RocketMQ
**技术选型**
- 能根据业务特点选择合适的 MQ
- 有多种 MQ 混用的经验
- 了解 MQTT、AMQP 等其他消息协议