Organized 50 interview questions into 12 categories: - 01-分布式系统 (9 files): 分布式事务, 分布式锁, 一致性哈希, CAP理论, etc. - 02-数据库 (2 files): MySQL索引优化, MyBatis核心原理 - 03-缓存 (5 files): Redis数据结构, 缓存问题, LRU算法, etc. - 04-消息队列 (1 file): RocketMQ/Kafka - 05-并发编程 (4 files): 线程池, 设计模式, 限流策略, etc. - 06-JVM (1 file): JVM和垃圾回收 - 07-系统设计 (8 files): 秒杀系统, 短链接, IM, Feed流, etc. - 08-算法与数据结构 (4 files): B+树, 红黑树, 跳表, 时间轮 - 09-网络与安全 (3 files): TCP/IP, 加密安全, 性能优化 - 10-中间件 (4 files): Spring Boot, Nacos, Dubbo, Nginx - 11-运维 (4 files): Kubernetes, CI/CD, Docker, 可观测性 - 12-面试技巧 (1 file): 面试技巧和职业规划 All files renamed to Chinese for better accessibility and organized into categorized folders for easier navigation. Generated with [Claude Code](https://claude.com/claude-code) via [Happy](https://happy.engineering) Co-Authored-By: Claude <noreply@anthropic.com> Co-Authored-By: Happy <yesreply@happy.engineering>
1492 lines
37 KiB
Markdown
1492 lines
37 KiB
Markdown
# 消息队列(RocketMQ/Kafka)
|
||
|
||
## 问题
|
||
|
||
**背景**:消息队列是分布式系统的核心组件,用于解耦、异步、削峰填谷。
|
||
|
||
**问题**:
|
||
1. 消息队列的作用是什么?在什么场景下使用?
|
||
2. 如何保证消息不丢失?(生产者、Broker、消费者三个层面)
|
||
3. 如何保证消息不重复消费(幂等性)?
|
||
4. 如何保证消息的顺序性?
|
||
5. 什么是消息积压?如何处理?
|
||
6. RocketMQ 和 Kafka 的区别是什么?如何选型?
|
||
|
||
---
|
||
|
||
## 标准答案
|
||
|
||
### 1. 消息队列的作用
|
||
|
||
#### **核心作用**
|
||
|
||
##### **1. 解耦(Decoupling)**
|
||
|
||
**场景**:用户注册成功后,需要发送欢迎邮件、发送短信、发放优惠券。
|
||
|
||
**不使用 MQ**:
|
||
```
|
||
用户服务
|
||
↓
|
||
调用邮件服务 ✉️
|
||
调用短信服务 📱
|
||
调用优惠券服务 🎫
|
||
```
|
||
|
||
**问题**:
|
||
- 用户服务依赖所有下游服务
|
||
- 任何一个下游服务故障都会影响注册流程
|
||
- 新增需求需要修改用户服务代码
|
||
|
||
**使用 MQ**:
|
||
```
|
||
用户服务 → 发送消息到 MQ ← 邮件服务(订阅)
|
||
← 短信服务(订阅)
|
||
← 优惠券服务(订阅)
|
||
← 新服务(随时订阅)
|
||
```
|
||
|
||
**优点**:
|
||
- 用户服务不需要知道下游服务的存在
|
||
- 下游服务故障不影响用户注册
|
||
- 新增服务只需订阅消息,无需修改用户服务
|
||
|
||
---
|
||
|
||
##### **2. 异步(Asynchronous)**
|
||
|
||
**场景**:用户下单后,需要:
|
||
1. 扣减库存
|
||
2. 生成订单
|
||
3. 通知物流
|
||
4. 发送短信
|
||
5. 积分处理
|
||
|
||
**同步调用(串行)**:
|
||
```
|
||
总耗时 = 50ms + 100ms + 80ms + 50ms + 60ms = 340ms
|
||
```
|
||
|
||
**异步调用(MQ)**:
|
||
```
|
||
下单服务 → MQ(5ms)
|
||
└─ 返回给用户(总耗时:55ms)
|
||
|
||
后台异步处理:
|
||
├─ 库存服务(50ms)
|
||
├─ 订单服务(100ms)
|
||
├─ 物流服务(80ms)
|
||
├─ 短信服务(50ms)
|
||
└─ 积分服务(60ms)
|
||
```
|
||
|
||
**优点**:
|
||
- 响应时间从 340ms 降低到 55ms
|
||
- 用户体验提升
|
||
|
||
---
|
||
|
||
##### **3. 削峰填谷(Peak Shaving)**
|
||
|
||
**场景**:秒杀活动、双11大促
|
||
|
||
**不使用 MQ**:
|
||
```
|
||
瞬时 QPS:50000
|
||
数据库最大 QPS:5000
|
||
|
||
结果:数据库被打挂 ❌
|
||
```
|
||
|
||
**使用 MQ**:
|
||
```
|
||
瞬时 QPS:50000 → MQ(缓冲)
|
||
└─ 按数据库能力消费(5000 QPS)
|
||
|
||
结果:数据库平稳运行 ✅
|
||
```
|
||
|
||
**图解**:
|
||
```
|
||
不使用 MQ:
|
||
请求量 ███████████████████████████████████████
|
||
↑
|
||
数据库被打挂
|
||
|
||
使用 MQ:
|
||
请求量 ███████████████████████████████████████
|
||
↓
|
||
MQ 缓冲
|
||
↓
|
||
消费量 ████████████████████████████████████████
|
||
↑ 平滑消费
|
||
```
|
||
|
||
---
|
||
|
||
#### **使用场景**
|
||
|
||
| 场景 | 说明 | 示例 |
|
||
|------|------|------|
|
||
| **异步处理** | 非核心流程异步化 | 注册后发送邮件、短信 |
|
||
| **系统解耦** | 降低系统耦合度 | 订单系统 ↔ 物流系统 |
|
||
| **流量削峰** | 应对突发流量 | 秒杀、大促 |
|
||
| **日志收集** | 分布式日志收集 | 应用日志 → ELK |
|
||
| **事件驱动** | 基于事件的架构 | 用户行为触发推荐算法 |
|
||
| **数据同步** | 跨数据库同步 | MySQL → Elasticsearch |
|
||
| **分布式事务** | 最终一致性 | 订单 → 库存(Saga 模式) |
|
||
|
||
---
|
||
|
||
#### **消息队列的缺点**
|
||
|
||
1. **系统复杂度增加**
|
||
- 需要维护 MQ 集群
|
||
- 需要处理消息丢失、重复、顺序等问题
|
||
|
||
2. **数据一致性**
|
||
- 无法保证强一致性,只能保证最终一致性
|
||
|
||
3. **可用性降低**
|
||
- MQ 成为单点故障(需要高可用集群)
|
||
|
||
4. **消息延迟**
|
||
- 异步处理导致消息延迟
|
||
|
||
---
|
||
|
||
### 2. 保证消息不丢失
|
||
|
||
#### **三个层面的保障**
|
||
|
||
```
|
||
生产者 → Broker → 消费者
|
||
↓ ↓ ↓
|
||
三个层面都要保证
|
||
```
|
||
|
||
---
|
||
|
||
#### **层面 1:生产者(Producer)**
|
||
|
||
##### **方案 1:同步发送 + 确认机制**
|
||
|
||
**RocketMQ 示例**:
|
||
```java
|
||
// 同步发送(默认)
|
||
Message message = new Message("TopicTest", "TagA", "OrderID001", "Hello MQ".getBytes());
|
||
|
||
// 同步发送:等待 Broker 确认
|
||
SendResult sendResult = producer.send(message);
|
||
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
|
||
// 发送成功
|
||
} else {
|
||
// 发送失败,重试或记录日志
|
||
log.error("消息发送失败: {}", sendResult);
|
||
}
|
||
```
|
||
|
||
**发送状态**:
|
||
- `SEND_OK`:发送成功
|
||
- `FLUSH_DISK_TIMEOUT`:刷盘超时
|
||
- `FLUSH_SLAVE_TIMEOUT`:同步到 Slave 超时
|
||
- `SLAVE_NOT_AVAILABLE`:Slave 不可用
|
||
|
||
---
|
||
|
||
##### **方案 2:异步发送 + 回调**
|
||
|
||
```java
|
||
producer.send(message, new SendCallback() {
|
||
@Override
|
||
public void onSuccess(SendResult sendResult) {
|
||
// 发送成功
|
||
log.info("消息发送成功: {}", sendResult);
|
||
}
|
||
|
||
@Override
|
||
public void onException(Throwable e) {
|
||
// 发送失败,重试或记录日志
|
||
log.error("消息发送失败", e);
|
||
}
|
||
});
|
||
```
|
||
|
||
---
|
||
|
||
##### **方案 3:事务消息(RocketMQ 独有)**
|
||
|
||
**原理**:
|
||
1. 发送半消息(Half Message,对消费者不可见)
|
||
2. 执行本地事务
|
||
3. 提交/回滚消息
|
||
|
||
```java
|
||
// 发送事务消息
|
||
TransactionMQProducer producer = new TransactionMQProducer("group1");
|
||
producer.setTransactionListener(new TransactionListener() {
|
||
@Override
|
||
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
|
||
// 执行本地事务(如:创建订单)
|
||
try {
|
||
orderService.createOrder(msg);
|
||
return LocalTransactionState.COMMIT_MESSAGE; // 提交消息
|
||
} catch (Exception e) {
|
||
return LocalTransactionState.ROLLBACK_MESSAGE; // 回滚消息
|
||
}
|
||
}
|
||
|
||
@Override
|
||
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
|
||
// 反查本地事务状态(Broker 未收到确认时调用)
|
||
if (orderService.exists(msg.getKeys())) {
|
||
return LocalTransactionState.COMMIT_MESSAGE;
|
||
} else {
|
||
return LocalTransactionState.ROLLBACK_MESSAGE;
|
||
}
|
||
}
|
||
});
|
||
|
||
// 发送消息
|
||
producer.sendMessageInTransaction(message, null);
|
||
```
|
||
|
||
---
|
||
|
||
##### **方案 4:本地消息表**
|
||
|
||
**原理**:
|
||
1. 在同一本地事务中:完成业务操作 + 插入消息到本地消息表
|
||
2. 定时任务扫描消息表,发送消息到 MQ
|
||
3. 发送成功后更新消息状态
|
||
|
||
```java
|
||
@Transactional
|
||
public void createOrder(Order order) {
|
||
// 1. 保存订单
|
||
orderMapper.insert(order);
|
||
|
||
// 2. 保存消息到本地消息表
|
||
Message message = new Message("OrderTopic", JSON.toJSONString(order).getBytes());
|
||
localMessageMapper.insert(new LocalMessage(order.getId(), message));
|
||
}
|
||
|
||
// 定时任务:发送消息到 MQ
|
||
@Scheduled(fixedRate = 5000)
|
||
public void sendPendingMessages() {
|
||
List<LocalMessage> messages = localMessageMapper.findPendingMessages();
|
||
for (LocalMessage msg : messages) {
|
||
try {
|
||
SendResult result = producer.send(msg.getMessage());
|
||
if (result.getSendStatus() == SendStatus.SEND_OK) {
|
||
// 更新消息状态为已发送
|
||
localMessageMapper.markAsSent(msg.getId());
|
||
}
|
||
} catch (Exception e) {
|
||
log.error("消息发送失败: {}", msg.getId(), e);
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
**优点**:
|
||
- 可靠性极高(消息持久化到数据库)
|
||
- 支持重试
|
||
|
||
**缺点**:
|
||
- 需要维护本地消息表
|
||
- 实现复杂
|
||
|
||
---
|
||
|
||
#### **层面 2:Broker(消息服务器)**
|
||
|
||
##### **方案 1:消息持久化**
|
||
|
||
**RocketMQ**:
|
||
```xml
|
||
<!-- 刷盘方式:同步刷盘 -->
|
||
<flushDiskType>SYNC_FLUSH</flushDiskType>
|
||
|
||
<!-- 或异步刷盘(性能更高,但可能丢失少量消息) -->
|
||
<flushDiskType>ASYNC_FLUSH</flushDiskType>
|
||
```
|
||
|
||
**同步刷盘 vs 异步刷盘**:
|
||
| 方式 | 性能 | 可靠性 | 适用场景 |
|
||
|------|------|--------|----------|
|
||
| SYNC_FLUSH | 低(写入磁盘后才返回成功) | 高 | 金融、支付 |
|
||
| ASYNC_FLUSH | 高(写入内存后返回成功) | 中 | 日志、监控 |
|
||
|
||
---
|
||
|
||
##### **方案 2:主从复制(HA)**
|
||
|
||
**RocketMQ 配置**:
|
||
```xml
|
||
<!-- 同步复制:Master 收到消息后,同步到 Slave 才返回成功 -->
|
||
<brokerRole>SYNC_MASTER</brokerRole>
|
||
|
||
<!-- 异步复制:Master 收到消息后立即返回,异步复制到 Slave -->
|
||
<brokerRole>ASYNC_MASTER</brokerRole>
|
||
```
|
||
|
||
**复制方式对比**:
|
||
| 方式 | 性能 | 可靠性 | 宕机丢数据 |
|
||
|------|------|--------|------------|
|
||
| SYNC_MASTER | 低 | 高 | 否 |
|
||
| ASYNC_MASTER | 高 | 中 | 可能 |
|
||
|
||
**推荐配置**(金融级可靠性):
|
||
```xml
|
||
<flushDiskType>SYNC_FLUSH</flushDiskType>
|
||
<brokerRole>SYNC_MASTER</brokerRole>
|
||
```
|
||
|
||
---
|
||
|
||
##### **方案 3:多副本(Kafka)**
|
||
|
||
**Kafka 配置**:
|
||
```properties
|
||
# 副本数量(建议 ≥ 3)
|
||
default.replication.factor=3
|
||
|
||
# 最小同步副本数(生产者确认)
|
||
min.insync.replicas=2
|
||
|
||
# 不完全的选举首领(禁止)
|
||
unclean.leader.election.enable=false
|
||
```
|
||
|
||
**生产者配置**:
|
||
```java
|
||
Properties props = new Properties();
|
||
// 等待所有副本确认
|
||
props.put("acks", "all");
|
||
|
||
// 或至少等待主副本 + 1 个从副本确认
|
||
props.put("acks", "1");
|
||
```
|
||
|
||
**acks 参数**:
|
||
- `acks=0`:不等待确认(最快,可能丢失)
|
||
- `acks=1`:等待主副本确认(折中)
|
||
- `acks=all`(或 `-1`):等待所有副本确认(最可靠)
|
||
|
||
---
|
||
|
||
#### **层面 3:消费者(Consumer)**
|
||
|
||
##### **方案 1:手动提交 Offset**
|
||
|
||
**RocketMQ 示例**:
|
||
```java
|
||
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
|
||
|
||
// 设置为手动提交
|
||
consumer.setConsumeMessageBatchMaxSize(1);
|
||
|
||
consumer.registerMessageListener(new MessageListenerConcurrently() {
|
||
@Override
|
||
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
|
||
for (MessageExt msg : msgs) {
|
||
try {
|
||
// 处理消息
|
||
processMessage(msg);
|
||
|
||
// 处理成功,返回 CONSUME_SUCCESS(自动提交 offset)
|
||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||
} catch (Exception e) {
|
||
// 处理失败,稍后重试
|
||
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
|
||
}
|
||
}
|
||
}
|
||
});
|
||
```
|
||
|
||
---
|
||
|
||
**Kafka 示例**:
|
||
```java
|
||
Properties props = new Properties();
|
||
// 关闭自动提交
|
||
props.put("enable.auto.commit", "false");
|
||
|
||
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
|
||
|
||
while (true) {
|
||
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
|
||
|
||
for (ConsumerRecord<String, String> record : records) {
|
||
try {
|
||
// 处理消息
|
||
processMessage(record);
|
||
|
||
// 手动提交 offset
|
||
consumer.commitSync();
|
||
} catch (Exception e) {
|
||
// 处理失败,不提交 offset,下次重试
|
||
log.error("消息处理失败: {}", record.offset(), e);
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
##### **方案 2:先处理消息,再提交 Offset**
|
||
|
||
```java
|
||
// 错误示例:先提交 offset,再处理消息
|
||
consumer.commitSync(); // ❌ 提交成功
|
||
processMessage(record); // ❌ 处理失败,消息丢失
|
||
|
||
// 正确示例:先处理消息,再提交 offset
|
||
processMessage(record); // ✅ 处理成功
|
||
consumer.commitSync(); // ✅ 提交 offset
|
||
```
|
||
|
||
---
|
||
|
||
##### **方案 3:死信队列(DLQ)**
|
||
|
||
**场景**:消息处理失败多次后,不再重试,放入死信队列。
|
||
|
||
**RocketMQ 示例**:
|
||
```java
|
||
consumer.registerMessageListener(new MessageListenerConcurrently() {
|
||
@Override
|
||
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
|
||
for (MessageExt msg : msgs) {
|
||
int reconsumeTimes = msg.getReconsumeTimes();
|
||
|
||
if (reconsumeTimes >= 3) {
|
||
// 重试 3 次后仍失败,放入死信队列
|
||
sendToDeadLetterQueue(msg);
|
||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||
}
|
||
|
||
try {
|
||
processMessage(msg);
|
||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||
} catch (Exception e) {
|
||
// 稍后重试
|
||
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
|
||
}
|
||
}
|
||
}
|
||
});
|
||
```
|
||
|
||
**Kafka 示例**:
|
||
```java
|
||
// 配置死信队列
|
||
Properties props = new Properties();
|
||
props.put("enable.deadletter", "true");
|
||
props.put("deadletter.queue.topic.prefix", "dlq.");
|
||
|
||
// 消费者处理失败时,发送到死信队列
|
||
if (retries >= 3) {
|
||
kafkaTemplate.send("dlq.MyTopic", record.value());
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
#### **总结:端到端的可靠投递**
|
||
|
||
```
|
||
生产者 Broker 消费者
|
||
↓ ↓ ↓
|
||
1. 同步发送 1. 持久化到磁盘 1. 手动提交 offset
|
||
2. 事务消息 2. 主从复制 2. 处理成功后才提交
|
||
3. 本地消息表 3. 多副本 3. 死信队列
|
||
```
|
||
|
||
---
|
||
|
||
### 3. 保证消息幂等性
|
||
|
||
#### **为什么会有重复消息?**
|
||
|
||
1. **生产者重复发送**:网络抖动导致生产者未收到 ACK,重试发送
|
||
2. **Broker 重复投递**:主从切换时,从副本尚未同步的消息被重复投递
|
||
3. **消费者重复消费**:消费成功后 offset 提交失败,下次重复消费
|
||
|
||
---
|
||
|
||
#### **幂等性解决方案**
|
||
|
||
##### **方案 1:数据库唯一约束**
|
||
|
||
**场景**:订单创建
|
||
|
||
```java
|
||
@Transactional
|
||
public void createOrder(Order order) {
|
||
// order_id 有唯一索引
|
||
try {
|
||
orderMapper.insert(order);
|
||
} catch (DuplicateKeyException e) {
|
||
// 重复订单,忽略
|
||
log.warn("重复订单: {}", order.getId());
|
||
return;
|
||
}
|
||
|
||
// 处理订单逻辑
|
||
processOrder(order);
|
||
}
|
||
```
|
||
|
||
**优点**:
|
||
- 实现简单
|
||
- 数据库保证唯一性
|
||
|
||
**缺点**:
|
||
- 依赖数据库
|
||
- 高并发下性能较差(频繁抛异常)
|
||
|
||
---
|
||
|
||
##### **方案 2:Redis 分布式锁 + 唯一标识**
|
||
|
||
**原理**:为每条消息设置唯一 ID(如 UUID),消费前先检查 Redis 是否已处理。
|
||
|
||
```java
|
||
public void consumeMessage(Message message) {
|
||
String messageId = message.getHeaders().get("messageId");
|
||
String lockKey = "message:processed:" + messageId;
|
||
|
||
// 尝试获取锁
|
||
Boolean locked = redisTemplate.opsForValue()
|
||
.setIfAbsent(lockKey, "1", 24, TimeUnit.HOURS);
|
||
|
||
if (!locked) {
|
||
// 消息已处理,跳过
|
||
log.warn("重复消息: {}", messageId);
|
||
return;
|
||
}
|
||
|
||
try {
|
||
// 处理消息
|
||
processMessage(message);
|
||
} catch (Exception e) {
|
||
// 处理失败,删除锁(允许重试)
|
||
redisTemplate.delete(lockKey);
|
||
throw e;
|
||
}
|
||
}
|
||
```
|
||
|
||
**优点**:
|
||
- 性能高(Redis 内存操作)
|
||
- 可设置过期时间(自动清理)
|
||
|
||
**缺点**:
|
||
- 依赖 Redis
|
||
- Redis 宕机可能导致问题
|
||
|
||
---
|
||
|
||
##### **方案 3:状态机**
|
||
|
||
**场景**:订单状态流转
|
||
|
||
```sql
|
||
-- 订单表
|
||
CREATE TABLE orders (
|
||
id BIGINT PRIMARY KEY,
|
||
status VARCHAR(20), -- CREATED, PAID, SHIPPED, COMPLETED
|
||
version INT, -- 乐观锁版本号
|
||
UNIQUE KEY id_status (id, status) -- 联合唯一索引
|
||
);
|
||
```
|
||
|
||
```java
|
||
@Transactional
|
||
public void updateOrderStatus(Long orderId, String fromStatus, String toStatus) {
|
||
// 使用 CAS(Compare And Set)更新状态
|
||
int updated = orderMapper.updateStatus(orderId, fromStatus, toStatus);
|
||
|
||
if (updated == 0) {
|
||
// 状态不匹配,可能是重复消息
|
||
log.warn("订单状态已变更: orderId={}, fromStatus={}, toStatus={}",
|
||
orderId, fromStatus, toStatus);
|
||
return;
|
||
}
|
||
|
||
// 处理后续逻辑
|
||
processStatusChange(orderId, toStatus);
|
||
}
|
||
|
||
// Mapper
|
||
@Update("UPDATE orders SET status = #{toStatus}, version = version + 1 " +
|
||
"WHERE id = #{orderId} AND status = #{fromStatus}")
|
||
int updateStatus(@Param("orderId") Long orderId,
|
||
@Param("fromStatus") String fromStatus,
|
||
@Param("toStatus") String toStatus);
|
||
```
|
||
|
||
**优点**:
|
||
- 状态机保证幂等性
|
||
- 支持复杂业务逻辑
|
||
|
||
**缺点**:
|
||
- 设计复杂
|
||
- 需要明确状态流转
|
||
|
||
---
|
||
|
||
##### **方案 4:去重表**
|
||
|
||
**原理**:创建专门的去重表,记录已处理的消息 ID。
|
||
|
||
```sql
|
||
-- 去重表
|
||
CREATE TABLE message_dedup (
|
||
id BIGINT AUTO_INCREMENT PRIMARY KEY,
|
||
message_id VARCHAR(64) NOT NULL,
|
||
topic VARCHAR(64),
|
||
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||
UNIQUE KEY uk_message_id (message_id)
|
||
);
|
||
```
|
||
|
||
```java
|
||
@Transactional
|
||
public void consumeMessage(Message message) {
|
||
String messageId = message.getId();
|
||
|
||
// 插入去重表
|
||
try {
|
||
dedupMapper.insert(new MessageDedup(messageId, message.getTopic()));
|
||
} catch (DuplicateKeyException e) {
|
||
// 消息已处理,跳过
|
||
log.warn("重复消息: {}", messageId);
|
||
return;
|
||
}
|
||
|
||
// 处理消息
|
||
processMessage(message);
|
||
}
|
||
```
|
||
|
||
**优点**:
|
||
- 简单直观
|
||
- 可查询处理历史
|
||
|
||
**缺点**:
|
||
- 额外的存储开销
|
||
- 需要定期清理历史数据
|
||
|
||
---
|
||
|
||
##### **方案 5:多版本并发控制(MVCC)**
|
||
|
||
**原理**:为数据增加版本号,更新时检查版本号。
|
||
|
||
```sql
|
||
-- 订单表
|
||
CREATE TABLE orders (
|
||
id BIGINT PRIMARY KEY,
|
||
amount DECIMAL(10,2),
|
||
version INT DEFAULT 0, -- 版本号
|
||
updated_at DATETIME
|
||
);
|
||
```
|
||
|
||
```java
|
||
@Transactional
|
||
public void updateOrderAmount(Long orderId, BigDecimal newAmount) {
|
||
// 使用乐观锁更新
|
||
int updated = orderMapper.updateAmountWithVersion(orderId, newAmount);
|
||
|
||
if (updated == 0) {
|
||
// 版本号不匹配,可能是重复消息
|
||
log.warn("订单已被更新: orderId={}", orderId);
|
||
return;
|
||
}
|
||
|
||
// 处理后续逻辑
|
||
processAmountChange(orderId, newAmount);
|
||
}
|
||
|
||
// Mapper
|
||
@Update("UPDATE orders SET amount = #{amount}, version = version + 1 " +
|
||
"WHERE id = #{orderId} AND version = #{version}")
|
||
int updateAmountWithVersion(@Param("orderId") Long orderId,
|
||
@Param("amount") BigDecimal amount,
|
||
@Param("version") Integer version);
|
||
```
|
||
|
||
**优点**:
|
||
- 无锁,性能高
|
||
- 支持高并发
|
||
|
||
**缺点**:
|
||
- 更新失败时需要重试
|
||
- 不适合写冲突频繁的场景
|
||
|
||
---
|
||
|
||
#### **幂等性方案对比**
|
||
|
||
| 方案 | 性能 | 复杂度 | 可靠性 | 适用场景 |
|
||
|------|------|--------|--------|----------|
|
||
| 数据库唯一约束 | 低 | 低 | 高 | 低并发、简单场景 |
|
||
| Redis 分布式锁 | 高 | 中 | 中 | 高并发、通用场景 |
|
||
| 状态机 | 中 | 高 | 高 | 订单、工作流等 |
|
||
| 去重表 | 中 | 低 | 高 | 需要记录历史的场景 |
|
||
| MVCC | 高 | 中 | 高 | 高并发、读多写少 |
|
||
|
||
---
|
||
|
||
### 4. 保证消息顺序性
|
||
|
||
#### **为什么消息会乱序?**
|
||
|
||
**场景**:一个 Topic 有多个 Queue(分区),多个消费者并发消费。
|
||
|
||
```
|
||
Topic: OrderTopic
|
||
├─ Queue1: 消息1, 消息4, 消息7
|
||
├─ Queue2: 消息2, 消息5, 消息8
|
||
└─ Queue3: 消息3, 消息6, 消息9
|
||
|
||
消费者1消费Queue1,消费者2消费Queue2,消费者3消费Queue3
|
||
└─ 消费速度不同,导致消息乱序
|
||
```
|
||
|
||
---
|
||
|
||
#### **解决方案**
|
||
|
||
##### **方案 1:单分区(单 Queue)**
|
||
|
||
**原理**:一个 Topic 只有一个 Queue,保证消息按顺序消费。
|
||
|
||
```java
|
||
// RocketMQ 生产者:发送到同一个 Queue
|
||
SendResult result = producer.send(message, new MessageQueueSelector() {
|
||
@Override
|
||
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
|
||
// 始终选择第一个 Queue
|
||
return mqs.get(0);
|
||
}
|
||
}, null);
|
||
|
||
// RocketMQ 消费者:单线程消费
|
||
consumer.setConsumeThreadMin(1);
|
||
consumer.setConsumeThreadMax(1);
|
||
```
|
||
|
||
**Kafka 配置**:
|
||
```properties
|
||
# Topic 只有 1 个分区
|
||
num.partitions=1
|
||
```
|
||
|
||
**优点**:
|
||
- 实现简单
|
||
- 严格保证顺序
|
||
|
||
**缺点**:
|
||
- 性能差(无法并行消费)
|
||
- 成为瓶颈
|
||
|
||
**适用场景**:
|
||
- 低并发、对顺序要求极高的场景
|
||
|
||
---
|
||
|
||
##### **方案 2:按业务 Key 分区**
|
||
|
||
**原理**:相同业务 Key 的消息发送到同一个 Queue。
|
||
|
||
**场景**:订单号相同的消息需要顺序消费。
|
||
|
||
**RocketMQ 示例**:
|
||
```java
|
||
// 生产者:按订单 ID 分区
|
||
SendResult result = producer.send(message, new MessageQueueSelector() {
|
||
@Override
|
||
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
|
||
Long orderId = (Long) arg;
|
||
// 相同订单 ID 的消息发送到同一个 Queue
|
||
int index = (int) (orderId % mqs.size());
|
||
return mqs.get(index);
|
||
}
|
||
}, orderId);
|
||
|
||
// 消费者:单线程消费每个 Queue(或至少保证相同订单的消息顺序处理)
|
||
consumer.registerMessageListener(new MessageListenerOrderly() {
|
||
@Override
|
||
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
|
||
// MessageListenerOrderly 保证顺序消费
|
||
for (MessageExt msg : msgs) {
|
||
processMessage(msg);
|
||
}
|
||
return ConsumeOrderlyStatus.SUCCESS;
|
||
}
|
||
});
|
||
```
|
||
|
||
**Kafka 示例**:
|
||
```java
|
||
// 生产者:按订单 ID 分区
|
||
Properties props = new Properties();
|
||
props.put("partitioner.class", "com.example.OrderPartitioner");
|
||
|
||
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
|
||
|
||
// 发送消息(相同订单 ID 的消息会发送到同一个分区)
|
||
producer.send(new ProducerRecord<>("OrderTopic", orderId, message));
|
||
|
||
// 自定义分区器
|
||
public class OrderPartitioner implements Partitioner {
|
||
@Override
|
||
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
|
||
Long orderId = Long.parseLong(key.toString());
|
||
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
|
||
return (int) (orderId % partitions.size());
|
||
}
|
||
}
|
||
```
|
||
|
||
**优点**:
|
||
- 相同业务 Key 的消息保证顺序
|
||
- 不同业务 Key 的消息可并行消费
|
||
- 性能较好
|
||
|
||
**缺点**:
|
||
- 需要选择合适的分区 Key
|
||
- 分区数量固定,扩容困难
|
||
|
||
---
|
||
|
||
##### **方案 3:单线程消费 + 内存队列**
|
||
|
||
**原理**:消费者内部按业务 Key 分组,使用多个内存队列 + 多个线程处理。
|
||
|
||
**架构图**:
|
||
```
|
||
MQ 消费者(单线程拉取)
|
||
↓
|
||
按 Key 分组
|
||
↓
|
||
┌───────────┬───────────┬───────────┐
|
||
│ 内存队列1 │ 内存队列2 │ 内存队列3 │
|
||
│ (订单100) │ (订单200) │ (订单300) │
|
||
└───────────┴───────────┴───────────┘
|
||
↓ ↓ ↓
|
||
工作线程1 工作线程2 工作线程3
|
||
```
|
||
|
||
**代码示例**:
|
||
```java
|
||
@Component
|
||
public class OrderMessageConsumer {
|
||
|
||
private final Map<Long, BlockingQueue<Message>> keyQueues = new ConcurrentHashMap<>();
|
||
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
|
||
|
||
@RocketMQMessageListener(topic = "OrderTopic", consumerGroup = "order-group")
|
||
public void onMessage(MessageExt message) {
|
||
Long orderId = parseOrderId(message);
|
||
|
||
// 获取或创建该订单的内存队列
|
||
BlockingQueue<Message> queue = keyQueues.computeIfAbsent(orderId,
|
||
k -> new LinkedBlockingQueue<>()
|
||
);
|
||
|
||
// 消息放入队列
|
||
queue.offer(message);
|
||
|
||
// 提交异步任务处理
|
||
executorService.submit(() -> {
|
||
Message msg = queue.poll();
|
||
if (msg != null) {
|
||
processMessage(msg);
|
||
}
|
||
});
|
||
}
|
||
}
|
||
```
|
||
|
||
**优点**:
|
||
- 保证相同 Key 的消息顺序
|
||
- 不同 Key 的消息并行处理
|
||
- 性能好
|
||
|
||
**缺点**:
|
||
- 实现复杂
|
||
- 内存占用较高
|
||
|
||
---
|
||
|
||
#### **消息顺序性总结**
|
||
|
||
| 方案 | 性能 | 复杂度 | 顺序粒度 | 适用场景 |
|
||
|------|------|--------|----------|----------|
|
||
| 单分区 | 低 | 低 | 全局顺序 | 低并发、全局顺序 |
|
||
| 按 Key 分区 | 高 | 中 | Key 级顺序 | 订单、用户等业务 |
|
||
| 内存队列 | 高 | 高 | Key 级顺序 | 高并发、复杂业务 |
|
||
|
||
---
|
||
|
||
### 5. 消息积压处理
|
||
|
||
#### **消息积压的原因**
|
||
|
||
1. **消费者消费速度慢**:
|
||
- 消费逻辑复杂
|
||
- 下游服务响应慢
|
||
- 数据库操作慢
|
||
|
||
2. **生产者发送速度过快**:
|
||
- 突发流量
|
||
- 秒杀活动
|
||
|
||
3. **消费者故障**:
|
||
- 消费者宕机
|
||
- 网络问题
|
||
|
||
4. **消费者消费失败**:
|
||
- 消息格式错误
|
||
- 业务异常
|
||
- 无限重试
|
||
|
||
---
|
||
|
||
#### **监控消息积压**
|
||
|
||
**RocketMQ**:
|
||
```bash
|
||
# 查看消费者堆积情况
|
||
sh mqadmin queryConsumeQueue -t OrderTopic -n localhost:9876
|
||
|
||
# 查看消费者组
|
||
sh mqadmin consumerList -g order-group -n localhost:9876
|
||
|
||
# 查看消费进度
|
||
sh mqadmin consumerProgress -g order-group -n localhost:9876
|
||
```
|
||
|
||
**输出示例**:
|
||
```
|
||
# 消费进度
|
||
Topic: OrderTopic
|
||
Broker: broker-a
|
||
Queue: 0
|
||
Diff: 1000000 ← 积压 100 万条
|
||
LastOffset: 5000000
|
||
```
|
||
|
||
**Kafka**:
|
||
```bash
|
||
# 查看消费者 lag
|
||
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
|
||
--group order-group --describe
|
||
|
||
# 输出
|
||
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
|
||
OrderTopic 0 5000000 6000000 1000000
|
||
↑
|
||
积压 100 万
|
||
```
|
||
|
||
---
|
||
|
||
#### **解决方案**
|
||
|
||
##### **方案 1:增加消费者数量(横向扩容)**
|
||
|
||
**原理**:增加消费者实例,提高消费能力。
|
||
|
||
**RocketMQ 示例**:
|
||
```java
|
||
// 原来:1 个消费者实例
|
||
@RocketMQMessageListener(
|
||
topic = "OrderTopic",
|
||
consumerGroup = "order-group",
|
||
consumeThreadNumber = 1 // 1 个线程
|
||
)
|
||
|
||
// 扩容后:10 个消费者实例(部署 10 个 Pod)
|
||
// 每个 Pod 有 1 个线程,总消费能力 ×10
|
||
```
|
||
|
||
**注意事项**:
|
||
- **消费者数量 ≤ Queue 数量**
|
||
- 如果消费者数量 > Queue 数量,部分消费者会空闲
|
||
|
||
**计算公式**:
|
||
```
|
||
所需消费者数 = 消息积压量 / (单个消费者消费速度 × 预期清理时间)
|
||
|
||
示例:
|
||
积压 100 万条,单个消费者每秒消费 100 条,预期 1 小时清理
|
||
所需消费者数 = 1000000 / (100 × 3600) ≈ 3 个
|
||
```
|
||
|
||
---
|
||
|
||
##### **方案 2:增加消费线程(纵向扩容)**
|
||
|
||
**RocketMQ 示例**:
|
||
```java
|
||
@RocketMQMessageListener(
|
||
topic = "OrderTopic",
|
||
consumerGroup = "order-group",
|
||
consumeThreadNumber = 20 // 从 1 增加到 20 个线程
|
||
)
|
||
```
|
||
|
||
**Kafka 示例**:
|
||
```java
|
||
Properties props = new Properties();
|
||
// 增加消费线程数
|
||
props.put("max.poll.records", 500); // 每次拉取 500 条
|
||
props.put("max.poll.interval.ms", 300000); // 最大处理时间 5 分钟
|
||
```
|
||
|
||
**注意事项**:
|
||
- 线程数不是越多越好(CPU、数据库连接数限制)
|
||
- 需要测试找到最优值
|
||
|
||
---
|
||
|
||
##### **方案 3:优化消费逻辑**
|
||
|
||
**问题代码**:
|
||
```java
|
||
// ❌ 在循环中逐条处理数据库
|
||
public void consumeMessage(List<Message> messages) {
|
||
for (Message msg : messages) {
|
||
// 逐条插入数据库(慢)
|
||
orderMapper.insert(parseOrder(msg));
|
||
}
|
||
}
|
||
```
|
||
|
||
**优化代码**:
|
||
```java
|
||
// ✅ 批量处理数据库
|
||
public void consumeMessage(List<Message> messages) {
|
||
List<Order> orders = messages.stream()
|
||
.map(this::parseOrder)
|
||
.collect(Collectors.toList());
|
||
|
||
// 批量插入(快 10-100 倍)
|
||
orderMapper.batchInsert(orders);
|
||
}
|
||
```
|
||
|
||
**Mapper**:
|
||
```java
|
||
@Insert({
|
||
"<script>",
|
||
"INSERT INTO orders (id, user_id, amount) VALUES ",
|
||
"<foreach collection='orders' item='order' separator=','>",
|
||
"(#{order.id}, #{order.userId}, #{order.amount})",
|
||
"</foreach>",
|
||
"</script>"
|
||
})
|
||
int batchInsert(@Param("orders") List<Order> orders);
|
||
```
|
||
|
||
---
|
||
|
||
##### **方案 4:临时扩容方案**
|
||
|
||
**场景**:积压严重(如 1000 万条),正常消费需要几天。
|
||
|
||
**步骤**:
|
||
|
||
1. **创建临时 Topic**(大量 Queue):
|
||
```bash
|
||
# 创建 OrderTopic_Temp,有 100 个 Queue
|
||
```
|
||
|
||
2. **编写转发程序**:
|
||
```java
|
||
// 从原 Topic 消费,发送到临时 Topic
|
||
@Component
|
||
public class MessageForwarder {
|
||
|
||
@Autowired
|
||
private RocketMQTemplate rocketMQTemplate;
|
||
|
||
@RocketMQMessageListener(topic = "OrderTopic", consumerGroup = "forward-group")
|
||
public void onMessage(MessageExt message) {
|
||
// 转发到临时 Topic
|
||
rocketMQTemplate.send("OrderTopic_Temp", message);
|
||
}
|
||
}
|
||
```
|
||
|
||
3. **部署大量消费者**消费临时 Topic:
|
||
```java
|
||
// 部署 100 个消费者实例,每个消费 1 个 Queue
|
||
@RocketMQMessageListener(
|
||
topic = "OrderTopic_Temp",
|
||
consumerGroup = "temp-consumer-group"
|
||
)
|
||
```
|
||
|
||
4. **积压清理完毕后,恢复正常架构**。
|
||
|
||
---
|
||
|
||
##### **方案 5:丢弃部分消息**
|
||
|
||
**场景**:
|
||
- 消息积压过于严重(如 1 亿条)
|
||
- 消息时效性要求高(如实时日志)
|
||
- 允许部分数据丢失
|
||
|
||
**实现**:
|
||
```java
|
||
// 1. 创建临时消费者,只消费不处理(快速跳过)
|
||
@RocketMQMessageListener(
|
||
topic = "OrderTopic",
|
||
consumerGroup = "skip-group"
|
||
)
|
||
public void onMessage(MessageExt message) {
|
||
// 直接返回成功,不处理消息
|
||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||
}
|
||
|
||
// 2. 跳到指定 offset
|
||
consumer.seekToBegin(); // 跳到最前
|
||
consumer.seek(queue, 10000000); // 跳到第 1000 万条
|
||
```
|
||
|
||
---
|
||
|
||
#### **消息积压处理流程**
|
||
|
||
```
|
||
1. 监控发现积压
|
||
↓
|
||
2. 分析积压原因
|
||
├─ 消费者慢 → 优化消费逻辑
|
||
├─ 消费者少 → 横向扩容
|
||
├─ 生产者快 → 限流
|
||
└─ 消费失败 → 修复 bug,跳过错误消息
|
||
↓
|
||
3. 执行扩容或优化
|
||
↓
|
||
4. 监控清理进度
|
||
↓
|
||
5. 恢复正常架构
|
||
```
|
||
|
||
---
|
||
|
||
### 6. RocketMQ vs Kafka
|
||
|
||
#### **核心特性对比**
|
||
|
||
| 特性 | RocketMQ | Kafka |
|
||
|------|----------|-------|
|
||
| **开发语言** | Java | Scala/Java |
|
||
| **协议** | 自有协议 | TCP |
|
||
| **消息模型** | 队列模型 | 发布-订阅模型 |
|
||
| **消息顺序** | 支持严格顺序 | 分区内有序 |
|
||
| **事务消息** | ✅ 支持 | ❌ 不支持(KIP-98 有提案) |
|
||
| **定时消息** | ✅ 支持 | ❌ 不支持 |
|
||
| **延迟消息** | ✅ 支持(18 级) | ❌ 不支持 |
|
||
| **消息回溯** | ✅ 支持 | ✅ 支持(需设置) |
|
||
| **消息过滤** | SQL92 表达式 | 客户端过滤 |
|
||
| **吞吐量** | 万级/单机 | 十万级/单机 |
|
||
| **延迟** | ms 级 | ms 级 |
|
||
| **可靠性** | 高(同步刷盘 + 主从) | 高(多副本) |
|
||
| **运维复杂度** | 中 | 中 |
|
||
| **社区活跃度** | 中(阿里主导) | 高(Confluent 支持) |
|
||
| **文档质量** | 中(中文文档多) | 高 |
|
||
| **适用场景** | 业务消息、订单、金融 | 日志收集、流式处理 |
|
||
|
||
---
|
||
|
||
#### **详细对比**
|
||
|
||
##### **1. 消息模型**
|
||
|
||
**RocketMQ**:
|
||
```
|
||
Topic: OrderTopic
|
||
├─ Queue1
|
||
├─ Queue2
|
||
├─ Queue3
|
||
└─ Queue4
|
||
|
||
消费者组:order-group
|
||
├─ 消费者1 → Queue1, Queue2
|
||
└─ 消费者2 → Queue3, Queue4
|
||
|
||
一条消息只能被同一个消费者组的一个消费者消费
|
||
```
|
||
|
||
**Kafka**:
|
||
```
|
||
Topic: OrderTopic
|
||
├─ Partition1
|
||
│ ├─ Replica1 (Leader)
|
||
│ ├─ Replica2 (Follower)
|
||
│ └─ Replica3 (Follower)
|
||
├─ Partition2
|
||
│ ├─ Replica1 (Leader)
|
||
│ ├─ Replica2 (Follower)
|
||
│ └─ Replica3 (Follower)
|
||
└─ ...
|
||
|
||
消费者组:order-group
|
||
├─ 消费者1 → Partition1
|
||
└─ 消费者2 → Partition2
|
||
|
||
一条消息只能被同一个消费者组的一个消费者消费
|
||
```
|
||
|
||
**区别**:
|
||
- RocketMQ 的 Queue 是逻辑概念
|
||
- Kafka 的 Partition 是物理概念(有副本)
|
||
|
||
---
|
||
|
||
##### **2. 消息存储**
|
||
|
||
**RocketMQ**:
|
||
```
|
||
CommitLog:
|
||
├─ 所有消息顺序写入(混合存储)
|
||
├─ 顺序写,性能高
|
||
└─ 通过 ConsumeQueue 索引快速查找
|
||
|
||
ConsumeQueue(索引文件):
|
||
├─ 消息 1: CommitLog 偏移量 100, 大小 200, TagsCode
|
||
├─ 消息 2: CommitLog 偏移量 300, 大小 150, TagsCode
|
||
└─ 消息 3: CommitLog 偏移量 450, 大小 180, TagsCode
|
||
```
|
||
|
||
**Kafka**:
|
||
```
|
||
Partition:
|
||
├─ Segment 0
|
||
│ ├─ 00000000000000000000.log (消息数据)
|
||
│ ├─ 00000000000000000000.index (索引)
|
||
│ └─ 00000000000000000000.timeindex (时间索引)
|
||
├─ Segment 1
|
||
│ ├─ 00000000000000050000.log
|
||
│ ├─ 00000000000000050000.index
|
||
│ └─ 00000000000000050000.timeindex
|
||
└─ ...
|
||
|
||
每个 Partition 独立存储
|
||
```
|
||
|
||
**对比**:
|
||
- RocketMQ:所有消息混合存储,通过索引文件查找
|
||
- Kafka:每个 Partition 独立存储,支持 Segment 滚动
|
||
|
||
---
|
||
|
||
##### **3. 消息过滤**
|
||
|
||
**RocketMQ**(服务端过滤):
|
||
```java
|
||
// 消费者订阅时指定过滤条件
|
||
consumer.subscribe("OrderTopic", MessageSelector.bySql("amount > 100 AND status = 'PAID'"));
|
||
|
||
// 生产者发送消息时设置属性
|
||
Message msg = new Message("OrderTopic", "Order", body);
|
||
msg.putUserProperty("amount", "200");
|
||
msg.putUserProperty("status", "PAID");
|
||
producer.send(msg);
|
||
```
|
||
|
||
**Kafka**(客户端过滤):
|
||
```java
|
||
// 消费者消费后手动过滤
|
||
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
|
||
consumer.subscribe(Arrays.asList("OrderTopic"));
|
||
|
||
while (true) {
|
||
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
|
||
for (ConsumerRecord<String, String> record : records) {
|
||
// 客户端过滤(浪费网络带宽)
|
||
if (record.value().contains("amount:200")) {
|
||
processMessage(record);
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
**区别**:
|
||
- RocketMQ:服务端过滤(节省带宽)
|
||
- Kafka:客户端过滤(浪费带宽)
|
||
|
||
---
|
||
|
||
##### **4. 事务消息**
|
||
|
||
**RocketMQ**:
|
||
```java
|
||
// 1. 发送半消息
|
||
Message msg = new Message("OrderTopic", orderJson.getBytes());
|
||
TransactionSendResult result = producer.sendMessageInTransaction(msg, null);
|
||
|
||
// 2. 执行本地事务(如创建订单)
|
||
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
|
||
try {
|
||
orderService.createOrder(msg);
|
||
return LocalTransactionState.COMMIT_MESSAGE; // 提交消息
|
||
} catch (Exception e) {
|
||
return LocalTransactionState.ROLLBACK_MESSAGE; // 回滚消息
|
||
}
|
||
}
|
||
|
||
// 3. 提交/回滚消息
|
||
// 消费者可以正常消费
|
||
```
|
||
|
||
**Kafka**:
|
||
```
|
||
❌ 不支持事务消息(KIP-98 有提案,但未正式发布)
|
||
|
||
✅ 支持"精确一次"语义(Exactly-Once),但仅限于流式处理(Kafka Streams)
|
||
```
|
||
|
||
---
|
||
|
||
##### **5. 延迟消息**
|
||
|
||
**RocketMQ**:
|
||
```java
|
||
// 18 个默认延迟级别(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)
|
||
Message msg = new Message("OrderTopic", body);
|
||
msg.setDelayTimeLevel(3); // 延迟 10 秒
|
||
producer.send(msg);
|
||
```
|
||
|
||
**Kafka**:
|
||
```java
|
||
// ❌ 不支持延迟消息
|
||
|
||
// ✅ 通过时间轮自定义实现
|
||
```
|
||
|
||
---
|
||
|
||
##### **6. 吞吐量**
|
||
|
||
**测试环境**:
|
||
- 机器:4 核 8G
|
||
- 消息大小:1 KB
|
||
|
||
**结果**:
|
||
| MQ | 单机吞吐量 | 延迟 |
|
||
|----|-----------|------|
|
||
| RocketMQ | 10-15 万/秒 | 1-5 ms |
|
||
| Kafka | 30-50 万/秒 | 1-5 ms |
|
||
|
||
**原因**:
|
||
- Kafka:零拷贝(sendfile)、批量处理、更好的磁盘顺序写
|
||
- RocketMQ:也有零拷贝,但 Java GC 影响性能
|
||
|
||
---
|
||
|
||
#### **选型建议**
|
||
|
||
##### **选择 RocketMQ 的场景**
|
||
|
||
1. **需要事务消息**:
|
||
- 订单系统(下单 → 扣库存)
|
||
- 支付系统(支付 → 通知)
|
||
|
||
2. **需要延迟消息**:
|
||
- 定时任务
|
||
- 超时取消订单
|
||
|
||
3. **需要严格的消息顺序**:
|
||
- 金融交易
|
||
- 订单状态流转
|
||
|
||
4. **需要消息回溯**:
|
||
- 数据恢复
|
||
- 数据重放
|
||
|
||
5. **业务复杂、需要高可靠性**:
|
||
- 金融、支付、电商
|
||
|
||
---
|
||
|
||
##### **选择 Kafka 的场景**
|
||
|
||
1. **日志收集**:
|
||
- 应用日志 → ELK
|
||
- 用户行为日志
|
||
|
||
2. **流式处理**:
|
||
- 实时数据分析
|
||
- 实时推荐
|
||
|
||
3. **高吞吐量场景**:
|
||
- 日志收集(百万级/秒)
|
||
- 监控数据收集
|
||
|
||
4. **已有 Kafka 生态**:
|
||
- 使用 Flink、Spark Streaming 等流式计算框架
|
||
|
||
---
|
||
|
||
#### **企业级选型案例**
|
||
|
||
| 公司 | 场景 | 选择 | 原因 |
|
||
|------|------|------|------|
|
||
| 阿里巴巴 | 订单、支付 | RocketMQ | 事务消息、高可靠性 |
|
||
| 美团 | 订单、物流 | RocketMQ | 业务复杂、延迟消息 |
|
||
| 字节跳动 | 日志、推荐 | Kafka | 高吞吐、流式处理 |
|
||
| 携程 | 订单、日志 | 两者都用 | 订单用 RocketMQ,日志用 Kafka |
|
||
| 京东 | 订单、支付 | RocketMQ | 事务消息、高可靠性 |
|
||
|
||
---
|
||
|
||
### 7. 总结
|
||
|
||
#### **消息队列核心问题**
|
||
|
||
| 问题 | 解决方案 |
|
||
|------|----------|
|
||
| **消息不丢失** | 生产者(同步发送、事务消息、本地消息表)<br>Broker(持久化、主从、多副本)<br>消费者(手动提交 offset、死信队列) |
|
||
| **消息重复消费** | 数据库唯一约束、Redis 分布式锁、状态机、去重表、MVCC |
|
||
| **消息顺序性** | 单分区、按 Key 分区、内存队列 |
|
||
| **消息积压** | 横向扩容、纵向扩容、优化消费逻辑、临时 Topic、丢弃消息 |
|
||
| **高可用** | 集群部署、主从复制、多副本、故障自动转移 |
|
||
|
||
---
|
||
|
||
### 8. 阿里 P7 加分项
|
||
|
||
**深度理解**:
|
||
- 理解 RocketMQ 的 CommitLog 和 ConsumeQueue 设计
|
||
- 理解 Kafka 的 Partition、Segment、副本同步机制
|
||
- 理解零拷贝(sendfile)、mmap 等底层技术
|
||
|
||
**实战经验**:
|
||
- 有将消息积压从千万级清理到正常的经验
|
||
- 有处理消息丢失、重复消费的线上故障经验
|
||
- 有消息队列性能调优经验(JVM 参数、操作系统参数)
|
||
|
||
**架构能力**:
|
||
- 能设计跨数据中心的消息队列架构
|
||
- 能设计消息队列的监控和告警体系
|
||
- 有消息队列迁移经验(如从 ActiveMQ 迁移到 RocketMQ)
|
||
|
||
**技术选型**:
|
||
- 能根据业务特点选择合适的 MQ
|
||
- 有多种 MQ 混用的经验
|
||
- 了解 MQTT、AMQP 等其他消息协议
|