From 25e6afdcb72ac7e76797e192690f7be1129bad0c Mon Sep 17 00:00:00 2001 From: yasinshaw Date: Sat, 28 Feb 2026 21:10:27 +0800 Subject: [PATCH] feat: add message queue Q&A (RocketMQ/Kafka) --- questions/message-queue.md | 1491 ++++++++++++++++++++++++++++++++++++ 1 file changed, 1491 insertions(+) create mode 100644 questions/message-queue.md diff --git a/questions/message-queue.md b/questions/message-queue.md new file mode 100644 index 0000000..43d77d7 --- /dev/null +++ b/questions/message-queue.md @@ -0,0 +1,1491 @@ +# 消息队列(RocketMQ/Kafka) + +## 问题 + +**背景**:消息队列是分布式系统的核心组件,用于解耦、异步、削峰填谷。 + +**问题**: +1. 消息队列的作用是什么?在什么场景下使用? +2. 如何保证消息不丢失?(生产者、Broker、消费者三个层面) +3. 如何保证消息不重复消费(幂等性)? +4. 如何保证消息的顺序性? +5. 什么是消息积压?如何处理? +6. RocketMQ 和 Kafka 的区别是什么?如何选型? + +--- + +## 标准答案 + +### 1. 消息队列的作用 + +#### **核心作用** + +##### **1. 解耦(Decoupling)** + +**场景**:用户注册成功后,需要发送欢迎邮件、发送短信、发放优惠券。 + +**不使用 MQ**: +``` +用户服务 + ↓ +调用邮件服务 ✉️ +调用短信服务 📱 +调用优惠券服务 🎫 +``` + +**问题**: +- 用户服务依赖所有下游服务 +- 任何一个下游服务故障都会影响注册流程 +- 新增需求需要修改用户服务代码 + +**使用 MQ**: +``` +用户服务 → 发送消息到 MQ ← 邮件服务(订阅) + ← 短信服务(订阅) + ← 优惠券服务(订阅) + ← 新服务(随时订阅) +``` + +**优点**: +- 用户服务不需要知道下游服务的存在 +- 下游服务故障不影响用户注册 +- 新增服务只需订阅消息,无需修改用户服务 + +--- + +##### **2. 异步(Asynchronous)** + +**场景**:用户下单后,需要: +1. 扣减库存 +2. 生成订单 +3. 通知物流 +4. 发送短信 +5. 积分处理 + +**同步调用(串行)**: +``` +总耗时 = 50ms + 100ms + 80ms + 50ms + 60ms = 340ms +``` + +**异步调用(MQ)**: +``` +下单服务 → MQ(5ms) +└─ 返回给用户(总耗时:55ms) + +后台异步处理: +├─ 库存服务(50ms) +├─ 订单服务(100ms) +├─ 物流服务(80ms) +├─ 短信服务(50ms) +└─ 积分服务(60ms) +``` + +**优点**: +- 响应时间从 340ms 降低到 55ms +- 用户体验提升 + +--- + +##### **3. 削峰填谷(Peak Shaving)** + +**场景**:秒杀活动、双11大促 + +**不使用 MQ**: +``` +瞬时 QPS:50000 +数据库最大 QPS:5000 + +结果:数据库被打挂 ❌ +``` + +**使用 MQ**: +``` +瞬时 QPS:50000 → MQ(缓冲) +└─ 按数据库能力消费(5000 QPS) + +结果:数据库平稳运行 ✅ +``` + +**图解**: +``` +不使用 MQ: +请求量 ███████████████████████████████████████ + ↑ + 数据库被打挂 + +使用 MQ: +请求量 ███████████████████████████████████████ + ↓ + MQ 缓冲 + ↓ +消费量 ████████████████████████████████████████ + ↑ 平滑消费 +``` + +--- + +#### **使用场景** + +| 场景 | 说明 | 示例 | +|------|------|------| +| **异步处理** | 非核心流程异步化 | 注册后发送邮件、短信 | +| **系统解耦** | 降低系统耦合度 | 订单系统 ↔ 物流系统 | +| **流量削峰** | 应对突发流量 | 秒杀、大促 | +| **日志收集** | 分布式日志收集 | 应用日志 → ELK | +| **事件驱动** | 基于事件的架构 | 用户行为触发推荐算法 | +| **数据同步** | 跨数据库同步 | MySQL → Elasticsearch | +| **分布式事务** | 最终一致性 | 订单 → 库存(Saga 模式) | + +--- + +#### **消息队列的缺点** + +1. **系统复杂度增加** + - 需要维护 MQ 集群 + - 需要处理消息丢失、重复、顺序等问题 + +2. **数据一致性** + - 无法保证强一致性,只能保证最终一致性 + +3. **可用性降低** + - MQ 成为单点故障(需要高可用集群) + +4. **消息延迟** + - 异步处理导致消息延迟 + +--- + +### 2. 保证消息不丢失 + +#### **三个层面的保障** + +``` +生产者 → Broker → 消费者 + ↓ ↓ ↓ +三个层面都要保证 +``` + +--- + +#### **层面 1:生产者(Producer)** + +##### **方案 1:同步发送 + 确认机制** + +**RocketMQ 示例**: +```java +// 同步发送(默认) +Message message = new Message("TopicTest", "TagA", "OrderID001", "Hello MQ".getBytes()); + +// 同步发送:等待 Broker 确认 +SendResult sendResult = producer.send(message); +if (sendResult.getSendStatus() == SendStatus.SEND_OK) { + // 发送成功 +} else { + // 发送失败,重试或记录日志 + log.error("消息发送失败: {}", sendResult); +} +``` + +**发送状态**: +- `SEND_OK`:发送成功 +- `FLUSH_DISK_TIMEOUT`:刷盘超时 +- `FLUSH_SLAVE_TIMEOUT`:同步到 Slave 超时 +- `SLAVE_NOT_AVAILABLE`:Slave 不可用 + +--- + +##### **方案 2:异步发送 + 回调** + +```java +producer.send(message, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + // 发送成功 + log.info("消息发送成功: {}", sendResult); + } + + @Override + public void onException(Throwable e) { + // 发送失败,重试或记录日志 + log.error("消息发送失败", e); + } +}); +``` + +--- + +##### **方案 3:事务消息(RocketMQ 独有)** + +**原理**: +1. 发送半消息(Half Message,对消费者不可见) +2. 执行本地事务 +3. 提交/回滚消息 + +```java +// 发送事务消息 +TransactionMQProducer producer = new TransactionMQProducer("group1"); +producer.setTransactionListener(new TransactionListener() { + @Override + public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { + // 执行本地事务(如:创建订单) + try { + orderService.createOrder(msg); + return LocalTransactionState.COMMIT_MESSAGE; // 提交消息 + } catch (Exception e) { + return LocalTransactionState.ROLLBACK_MESSAGE; // 回滚消息 + } + } + + @Override + public LocalTransactionState checkLocalTransaction(MessageExt msg) { + // 反查本地事务状态(Broker 未收到确认时调用) + if (orderService.exists(msg.getKeys())) { + return LocalTransactionState.COMMIT_MESSAGE; + } else { + return LocalTransactionState.ROLLBACK_MESSAGE; + } + } +}); + +// 发送消息 +producer.sendMessageInTransaction(message, null); +``` + +--- + +##### **方案 4:本地消息表** + +**原理**: +1. 在同一本地事务中:完成业务操作 + 插入消息到本地消息表 +2. 定时任务扫描消息表,发送消息到 MQ +3. 发送成功后更新消息状态 + +```java +@Transactional +public void createOrder(Order order) { + // 1. 保存订单 + orderMapper.insert(order); + + // 2. 保存消息到本地消息表 + Message message = new Message("OrderTopic", JSON.toJSONString(order).getBytes()); + localMessageMapper.insert(new LocalMessage(order.getId(), message)); +} + +// 定时任务:发送消息到 MQ +@Scheduled(fixedRate = 5000) +public void sendPendingMessages() { + List messages = localMessageMapper.findPendingMessages(); + for (LocalMessage msg : messages) { + try { + SendResult result = producer.send(msg.getMessage()); + if (result.getSendStatus() == SendStatus.SEND_OK) { + // 更新消息状态为已发送 + localMessageMapper.markAsSent(msg.getId()); + } + } catch (Exception e) { + log.error("消息发送失败: {}", msg.getId(), e); + } + } +} +``` + +**优点**: +- 可靠性极高(消息持久化到数据库) +- 支持重试 + +**缺点**: +- 需要维护本地消息表 +- 实现复杂 + +--- + +#### **层面 2:Broker(消息服务器)** + +##### **方案 1:消息持久化** + +**RocketMQ**: +```xml + +SYNC_FLUSH + + +ASYNC_FLUSH +``` + +**同步刷盘 vs 异步刷盘**: +| 方式 | 性能 | 可靠性 | 适用场景 | +|------|------|--------|----------| +| SYNC_FLUSH | 低(写入磁盘后才返回成功) | 高 | 金融、支付 | +| ASYNC_FLUSH | 高(写入内存后返回成功) | 中 | 日志、监控 | + +--- + +##### **方案 2:主从复制(HA)** + +**RocketMQ 配置**: +```xml + +SYNC_MASTER + + +ASYNC_MASTER +``` + +**复制方式对比**: +| 方式 | 性能 | 可靠性 | 宕机丢数据 | +|------|------|--------|------------| +| SYNC_MASTER | 低 | 高 | 否 | +| ASYNC_MASTER | 高 | 中 | 可能 | + +**推荐配置**(金融级可靠性): +```xml +SYNC_FLUSH +SYNC_MASTER +``` + +--- + +##### **方案 3:多副本(Kafka)** + +**Kafka 配置**: +```properties +# 副本数量(建议 ≥ 3) +default.replication.factor=3 + +# 最小同步副本数(生产者确认) +min.insync.replicas=2 + +# 不完全的选举首领(禁止) +unclean.leader.election.enable=false +``` + +**生产者配置**: +```java +Properties props = new Properties(); +// 等待所有副本确认 +props.put("acks", "all"); + +// 或至少等待主副本 + 1 个从副本确认 +props.put("acks", "1"); +``` + +**acks 参数**: +- `acks=0`:不等待确认(最快,可能丢失) +- `acks=1`:等待主副本确认(折中) +- `acks=all`(或 `-1`):等待所有副本确认(最可靠) + +--- + +#### **层面 3:消费者(Consumer)** + +##### **方案 1:手动提交 Offset** + +**RocketMQ 示例**: +```java +DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); + +// 设置为手动提交 +consumer.setConsumeMessageBatchMaxSize(1); + +consumer.registerMessageListener(new MessageListenerConcurrently() { + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + for (MessageExt msg : msgs) { + try { + // 处理消息 + processMessage(msg); + + // 处理成功,返回 CONSUME_SUCCESS(自动提交 offset) + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } catch (Exception e) { + // 处理失败,稍后重试 + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + } + } +}); +``` + +--- + +**Kafka 示例**: +```java +Properties props = new Properties(); +// 关闭自动提交 +props.put("enable.auto.commit", "false"); + +KafkaConsumer consumer = new KafkaConsumer<>(props); + +while (true) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + + for (ConsumerRecord record : records) { + try { + // 处理消息 + processMessage(record); + + // 手动提交 offset + consumer.commitSync(); + } catch (Exception e) { + // 处理失败,不提交 offset,下次重试 + log.error("消息处理失败: {}", record.offset(), e); + } + } +} +``` + +--- + +##### **方案 2:先处理消息,再提交 Offset** + +```java +// 错误示例:先提交 offset,再处理消息 +consumer.commitSync(); // ❌ 提交成功 +processMessage(record); // ❌ 处理失败,消息丢失 + +// 正确示例:先处理消息,再提交 offset +processMessage(record); // ✅ 处理成功 +consumer.commitSync(); // ✅ 提交 offset +``` + +--- + +##### **方案 3:死信队列(DLQ)** + +**场景**:消息处理失败多次后,不再重试,放入死信队列。 + +**RocketMQ 示例**: +```java +consumer.registerMessageListener(new MessageListenerConcurrently() { + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + for (MessageExt msg : msgs) { + int reconsumeTimes = msg.getReconsumeTimes(); + + if (reconsumeTimes >= 3) { + // 重试 3 次后仍失败,放入死信队列 + sendToDeadLetterQueue(msg); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + + try { + processMessage(msg); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } catch (Exception e) { + // 稍后重试 + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + } + } +}); +``` + +**Kafka 示例**: +```java +// 配置死信队列 +Properties props = new Properties(); +props.put("enable.deadletter", "true"); +props.put("deadletter.queue.topic.prefix", "dlq."); + +// 消费者处理失败时,发送到死信队列 +if (retries >= 3) { + kafkaTemplate.send("dlq.MyTopic", record.value()); +} +``` + +--- + +#### **总结:端到端的可靠投递** + +``` +生产者 Broker 消费者 + ↓ ↓ ↓ +1. 同步发送 1. 持久化到磁盘 1. 手动提交 offset +2. 事务消息 2. 主从复制 2. 处理成功后才提交 +3. 本地消息表 3. 多副本 3. 死信队列 +``` + +--- + +### 3. 保证消息幂等性 + +#### **为什么会有重复消息?** + +1. **生产者重复发送**:网络抖动导致生产者未收到 ACK,重试发送 +2. **Broker 重复投递**:主从切换时,从副本尚未同步的消息被重复投递 +3. **消费者重复消费**:消费成功后 offset 提交失败,下次重复消费 + +--- + +#### **幂等性解决方案** + +##### **方案 1:数据库唯一约束** + +**场景**:订单创建 + +```java +@Transactional +public void createOrder(Order order) { + // order_id 有唯一索引 + try { + orderMapper.insert(order); + } catch (DuplicateKeyException e) { + // 重复订单,忽略 + log.warn("重复订单: {}", order.getId()); + return; + } + + // 处理订单逻辑 + processOrder(order); +} +``` + +**优点**: +- 实现简单 +- 数据库保证唯一性 + +**缺点**: +- 依赖数据库 +- 高并发下性能较差(频繁抛异常) + +--- + +##### **方案 2:Redis 分布式锁 + 唯一标识** + +**原理**:为每条消息设置唯一 ID(如 UUID),消费前先检查 Redis 是否已处理。 + +```java +public void consumeMessage(Message message) { + String messageId = message.getHeaders().get("messageId"); + String lockKey = "message:processed:" + messageId; + + // 尝试获取锁 + Boolean locked = redisTemplate.opsForValue() + .setIfAbsent(lockKey, "1", 24, TimeUnit.HOURS); + + if (!locked) { + // 消息已处理,跳过 + log.warn("重复消息: {}", messageId); + return; + } + + try { + // 处理消息 + processMessage(message); + } catch (Exception e) { + // 处理失败,删除锁(允许重试) + redisTemplate.delete(lockKey); + throw e; + } +} +``` + +**优点**: +- 性能高(Redis 内存操作) +- 可设置过期时间(自动清理) + +**缺点**: +- 依赖 Redis +- Redis 宕机可能导致问题 + +--- + +##### **方案 3:状态机** + +**场景**:订单状态流转 + +```sql +-- 订单表 +CREATE TABLE orders ( + id BIGINT PRIMARY KEY, + status VARCHAR(20), -- CREATED, PAID, SHIPPED, COMPLETED + version INT, -- 乐观锁版本号 + UNIQUE KEY id_status (id, status) -- 联合唯一索引 +); +``` + +```java +@Transactional +public void updateOrderStatus(Long orderId, String fromStatus, String toStatus) { + // 使用 CAS(Compare And Set)更新状态 + int updated = orderMapper.updateStatus(orderId, fromStatus, toStatus); + + if (updated == 0) { + // 状态不匹配,可能是重复消息 + log.warn("订单状态已变更: orderId={}, fromStatus={}, toStatus={}", + orderId, fromStatus, toStatus); + return; + } + + // 处理后续逻辑 + processStatusChange(orderId, toStatus); +} + +// Mapper +@Update("UPDATE orders SET status = #{toStatus}, version = version + 1 " + + "WHERE id = #{orderId} AND status = #{fromStatus}") +int updateStatus(@Param("orderId") Long orderId, + @Param("fromStatus") String fromStatus, + @Param("toStatus") String toStatus); +``` + +**优点**: +- 状态机保证幂等性 +- 支持复杂业务逻辑 + +**缺点**: +- 设计复杂 +- 需要明确状态流转 + +--- + +##### **方案 4:去重表** + +**原理**:创建专门的去重表,记录已处理的消息 ID。 + +```sql +-- 去重表 +CREATE TABLE message_dedup ( + id BIGINT AUTO_INCREMENT PRIMARY KEY, + message_id VARCHAR(64) NOT NULL, + topic VARCHAR(64), + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + UNIQUE KEY uk_message_id (message_id) +); +``` + +```java +@Transactional +public void consumeMessage(Message message) { + String messageId = message.getId(); + + // 插入去重表 + try { + dedupMapper.insert(new MessageDedup(messageId, message.getTopic())); + } catch (DuplicateKeyException e) { + // 消息已处理,跳过 + log.warn("重复消息: {}", messageId); + return; + } + + // 处理消息 + processMessage(message); +} +``` + +**优点**: +- 简单直观 +- 可查询处理历史 + +**缺点**: +- 额外的存储开销 +- 需要定期清理历史数据 + +--- + +##### **方案 5:多版本并发控制(MVCC)** + +**原理**:为数据增加版本号,更新时检查版本号。 + +```sql +-- 订单表 +CREATE TABLE orders ( + id BIGINT PRIMARY KEY, + amount DECIMAL(10,2), + version INT DEFAULT 0, -- 版本号 + updated_at DATETIME +); +``` + +```java +@Transactional +public void updateOrderAmount(Long orderId, BigDecimal newAmount) { + // 使用乐观锁更新 + int updated = orderMapper.updateAmountWithVersion(orderId, newAmount); + + if (updated == 0) { + // 版本号不匹配,可能是重复消息 + log.warn("订单已被更新: orderId={}", orderId); + return; + } + + // 处理后续逻辑 + processAmountChange(orderId, newAmount); +} + +// Mapper +@Update("UPDATE orders SET amount = #{amount}, version = version + 1 " + + "WHERE id = #{orderId} AND version = #{version}") +int updateAmountWithVersion(@Param("orderId") Long orderId, + @Param("amount") BigDecimal amount, + @Param("version") Integer version); +``` + +**优点**: +- 无锁,性能高 +- 支持高并发 + +**缺点**: +- 更新失败时需要重试 +- 不适合写冲突频繁的场景 + +--- + +#### **幂等性方案对比** + +| 方案 | 性能 | 复杂度 | 可靠性 | 适用场景 | +|------|------|--------|--------|----------| +| 数据库唯一约束 | 低 | 低 | 高 | 低并发、简单场景 | +| Redis 分布式锁 | 高 | 中 | 中 | 高并发、通用场景 | +| 状态机 | 中 | 高 | 高 | 订单、工作流等 | +| 去重表 | 中 | 低 | 高 | 需要记录历史的场景 | +| MVCC | 高 | 中 | 高 | 高并发、读多写少 | + +--- + +### 4. 保证消息顺序性 + +#### **为什么消息会乱序?** + +**场景**:一个 Topic 有多个 Queue(分区),多个消费者并发消费。 + +``` +Topic: OrderTopic +├─ Queue1: 消息1, 消息4, 消息7 +├─ Queue2: 消息2, 消息5, 消息8 +└─ Queue3: 消息3, 消息6, 消息9 + +消费者1消费Queue1,消费者2消费Queue2,消费者3消费Queue3 +└─ 消费速度不同,导致消息乱序 +``` + +--- + +#### **解决方案** + +##### **方案 1:单分区(单 Queue)** + +**原理**:一个 Topic 只有一个 Queue,保证消息按顺序消费。 + +```java +// RocketMQ 生产者:发送到同一个 Queue +SendResult result = producer.send(message, new MessageQueueSelector() { + @Override + public MessageQueue select(List mqs, Message msg, Object arg) { + // 始终选择第一个 Queue + return mqs.get(0); + } +}, null); + +// RocketMQ 消费者:单线程消费 +consumer.setConsumeThreadMin(1); +consumer.setConsumeThreadMax(1); +``` + +**Kafka 配置**: +```properties +# Topic 只有 1 个分区 +num.partitions=1 +``` + +**优点**: +- 实现简单 +- 严格保证顺序 + +**缺点**: +- 性能差(无法并行消费) +- 成为瓶颈 + +**适用场景**: +- 低并发、对顺序要求极高的场景 + +--- + +##### **方案 2:按业务 Key 分区** + +**原理**:相同业务 Key 的消息发送到同一个 Queue。 + +**场景**:订单号相同的消息需要顺序消费。 + +**RocketMQ 示例**: +```java +// 生产者:按订单 ID 分区 +SendResult result = producer.send(message, new MessageQueueSelector() { + @Override + public MessageQueue select(List mqs, Message msg, Object arg) { + Long orderId = (Long) arg; + // 相同订单 ID 的消息发送到同一个 Queue + int index = (int) (orderId % mqs.size()); + return mqs.get(index); + } +}, orderId); + +// 消费者:单线程消费每个 Queue(或至少保证相同订单的消息顺序处理) +consumer.registerMessageListener(new MessageListenerOrderly() { + @Override + public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { + // MessageListenerOrderly 保证顺序消费 + for (MessageExt msg : msgs) { + processMessage(msg); + } + return ConsumeOrderlyStatus.SUCCESS; + } +}); +``` + +**Kafka 示例**: +```java +// 生产者:按订单 ID 分区 +Properties props = new Properties(); +props.put("partitioner.class", "com.example.OrderPartitioner"); + +KafkaProducer producer = new KafkaProducer<>(props); + +// 发送消息(相同订单 ID 的消息会发送到同一个分区) +producer.send(new ProducerRecord<>("OrderTopic", orderId, message)); + +// 自定义分区器 +public class OrderPartitioner implements Partitioner { + @Override + public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { + Long orderId = Long.parseLong(key.toString()); + List partitions = cluster.partitionsForTopic(topic); + return (int) (orderId % partitions.size()); + } +} +``` + +**优点**: +- 相同业务 Key 的消息保证顺序 +- 不同业务 Key 的消息可并行消费 +- 性能较好 + +**缺点**: +- 需要选择合适的分区 Key +- 分区数量固定,扩容困难 + +--- + +##### **方案 3:单线程消费 + 内存队列** + +**原理**:消费者内部按业务 Key 分组,使用多个内存队列 + 多个线程处理。 + +**架构图**: +``` +MQ 消费者(单线程拉取) + ↓ +按 Key 分组 + ↓ +┌───────────┬───────────┬───────────┐ +│ 内存队列1 │ 内存队列2 │ 内存队列3 │ +│ (订单100) │ (订单200) │ (订单300) │ +└───────────┴───────────┴───────────┘ + ↓ ↓ ↓ + 工作线程1 工作线程2 工作线程3 +``` + +**代码示例**: +```java +@Component +public class OrderMessageConsumer { + + private final Map> keyQueues = new ConcurrentHashMap<>(); + private final ExecutorService executorService = Executors.newFixedThreadPool(10); + + @RocketMQMessageListener(topic = "OrderTopic", consumerGroup = "order-group") + public void onMessage(MessageExt message) { + Long orderId = parseOrderId(message); + + // 获取或创建该订单的内存队列 + BlockingQueue queue = keyQueues.computeIfAbsent(orderId, + k -> new LinkedBlockingQueue<>() + ); + + // 消息放入队列 + queue.offer(message); + + // 提交异步任务处理 + executorService.submit(() -> { + Message msg = queue.poll(); + if (msg != null) { + processMessage(msg); + } + }); + } +} +``` + +**优点**: +- 保证相同 Key 的消息顺序 +- 不同 Key 的消息并行处理 +- 性能好 + +**缺点**: +- 实现复杂 +- 内存占用较高 + +--- + +#### **消息顺序性总结** + +| 方案 | 性能 | 复杂度 | 顺序粒度 | 适用场景 | +|------|------|--------|----------|----------| +| 单分区 | 低 | 低 | 全局顺序 | 低并发、全局顺序 | +| 按 Key 分区 | 高 | 中 | Key 级顺序 | 订单、用户等业务 | +| 内存队列 | 高 | 高 | Key 级顺序 | 高并发、复杂业务 | + +--- + +### 5. 消息积压处理 + +#### **消息积压的原因** + +1. **消费者消费速度慢**: + - 消费逻辑复杂 + - 下游服务响应慢 + - 数据库操作慢 + +2. **生产者发送速度过快**: + - 突发流量 + - 秒杀活动 + +3. **消费者故障**: + - 消费者宕机 + - 网络问题 + +4. **消费者消费失败**: + - 消息格式错误 + - 业务异常 + - 无限重试 + +--- + +#### **监控消息积压** + +**RocketMQ**: +```bash +# 查看消费者堆积情况 +sh mqadmin queryConsumeQueue -t OrderTopic -n localhost:9876 + +# 查看消费者组 +sh mqadmin consumerList -g order-group -n localhost:9876 + +# 查看消费进度 +sh mqadmin consumerProgress -g order-group -n localhost:9876 +``` + +**输出示例**: +``` +# 消费进度 +Topic: OrderTopic +Broker: broker-a +Queue: 0 + Diff: 1000000 ← 积压 100 万条 + LastOffset: 5000000 +``` + +**Kafka**: +```bash +# 查看消费者 lag +kafka-consumer-groups.sh --bootstrap-server localhost:9092 \ + --group order-group --describe + +# 输出 +TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG +OrderTopic 0 5000000 6000000 1000000 + ↑ + 积压 100 万 +``` + +--- + +#### **解决方案** + +##### **方案 1:增加消费者数量(横向扩容)** + +**原理**:增加消费者实例,提高消费能力。 + +**RocketMQ 示例**: +```java +// 原来:1 个消费者实例 +@RocketMQMessageListener( + topic = "OrderTopic", + consumerGroup = "order-group", + consumeThreadNumber = 1 // 1 个线程 +) + +// 扩容后:10 个消费者实例(部署 10 个 Pod) +// 每个 Pod 有 1 个线程,总消费能力 ×10 +``` + +**注意事项**: +- **消费者数量 ≤ Queue 数量** +- 如果消费者数量 > Queue 数量,部分消费者会空闲 + +**计算公式**: +``` +所需消费者数 = 消息积压量 / (单个消费者消费速度 × 预期清理时间) + +示例: +积压 100 万条,单个消费者每秒消费 100 条,预期 1 小时清理 +所需消费者数 = 1000000 / (100 × 3600) ≈ 3 个 +``` + +--- + +##### **方案 2:增加消费线程(纵向扩容)** + +**RocketMQ 示例**: +```java +@RocketMQMessageListener( + topic = "OrderTopic", + consumerGroup = "order-group", + consumeThreadNumber = 20 // 从 1 增加到 20 个线程 +) +``` + +**Kafka 示例**: +```java +Properties props = new Properties(); +// 增加消费线程数 +props.put("max.poll.records", 500); // 每次拉取 500 条 +props.put("max.poll.interval.ms", 300000); // 最大处理时间 5 分钟 +``` + +**注意事项**: +- 线程数不是越多越好(CPU、数据库连接数限制) +- 需要测试找到最优值 + +--- + +##### **方案 3:优化消费逻辑** + +**问题代码**: +```java +// ❌ 在循环中逐条处理数据库 +public void consumeMessage(List messages) { + for (Message msg : messages) { + // 逐条插入数据库(慢) + orderMapper.insert(parseOrder(msg)); + } +} +``` + +**优化代码**: +```java +// ✅ 批量处理数据库 +public void consumeMessage(List messages) { + List orders = messages.stream() + .map(this::parseOrder) + .collect(Collectors.toList()); + + // 批量插入(快 10-100 倍) + orderMapper.batchInsert(orders); +} +``` + +**Mapper**: +```java +@Insert({ + "" +}) +int batchInsert(@Param("orders") List orders); +``` + +--- + +##### **方案 4:临时扩容方案** + +**场景**:积压严重(如 1000 万条),正常消费需要几天。 + +**步骤**: + +1. **创建临时 Topic**(大量 Queue): +```bash +# 创建 OrderTopic_Temp,有 100 个 Queue +``` + +2. **编写转发程序**: +```java +// 从原 Topic 消费,发送到临时 Topic +@Component +public class MessageForwarder { + + @Autowired + private RocketMQTemplate rocketMQTemplate; + + @RocketMQMessageListener(topic = "OrderTopic", consumerGroup = "forward-group") + public void onMessage(MessageExt message) { + // 转发到临时 Topic + rocketMQTemplate.send("OrderTopic_Temp", message); + } +} +``` + +3. **部署大量消费者**消费临时 Topic: +```java +// 部署 100 个消费者实例,每个消费 1 个 Queue +@RocketMQMessageListener( + topic = "OrderTopic_Temp", + consumerGroup = "temp-consumer-group" +) +``` + +4. **积压清理完毕后,恢复正常架构**。 + +--- + +##### **方案 5:丢弃部分消息** + +**场景**: +- 消息积压过于严重(如 1 亿条) +- 消息时效性要求高(如实时日志) +- 允许部分数据丢失 + +**实现**: +```java +// 1. 创建临时消费者,只消费不处理(快速跳过) +@RocketMQMessageListener( + topic = "OrderTopic", + consumerGroup = "skip-group" +) +public void onMessage(MessageExt message) { + // 直接返回成功,不处理消息 + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; +} + +// 2. 跳到指定 offset +consumer.seekToBegin(); // 跳到最前 +consumer.seek(queue, 10000000); // 跳到第 1000 万条 +``` + +--- + +#### **消息积压处理流程** + +``` +1. 监控发现积压 + ↓ +2. 分析积压原因 + ├─ 消费者慢 → 优化消费逻辑 + ├─ 消费者少 → 横向扩容 + ├─ 生产者快 → 限流 + └─ 消费失败 → 修复 bug,跳过错误消息 + ↓ +3. 执行扩容或优化 + ↓ +4. 监控清理进度 + ↓ +5. 恢复正常架构 +``` + +--- + +### 6. RocketMQ vs Kafka + +#### **核心特性对比** + +| 特性 | RocketMQ | Kafka | +|------|----------|-------| +| **开发语言** | Java | Scala/Java | +| **协议** | 自有协议 | TCP | +| **消息模型** | 队列模型 | 发布-订阅模型 | +| **消息顺序** | 支持严格顺序 | 分区内有序 | +| **事务消息** | ✅ 支持 | ❌ 不支持(KIP-98 有提案) | +| **定时消息** | ✅ 支持 | ❌ 不支持 | +| **延迟消息** | ✅ 支持(18 级) | ❌ 不支持 | +| **消息回溯** | ✅ 支持 | ✅ 支持(需设置) | +| **消息过滤** | SQL92 表达式 | 客户端过滤 | +| **吞吐量** | 万级/单机 | 十万级/单机 | +| **延迟** | ms 级 | ms 级 | +| **可靠性** | 高(同步刷盘 + 主从) | 高(多副本) | +| **运维复杂度** | 中 | 中 | +| **社区活跃度** | 中(阿里主导) | 高(Confluent 支持) | +| **文档质量** | 中(中文文档多) | 高 | +| **适用场景** | 业务消息、订单、金融 | 日志收集、流式处理 | + +--- + +#### **详细对比** + +##### **1. 消息模型** + +**RocketMQ**: +``` +Topic: OrderTopic +├─ Queue1 +├─ Queue2 +├─ Queue3 +└─ Queue4 + +消费者组:order-group +├─ 消费者1 → Queue1, Queue2 +└─ 消费者2 → Queue3, Queue4 + +一条消息只能被同一个消费者组的一个消费者消费 +``` + +**Kafka**: +``` +Topic: OrderTopic +├─ Partition1 +│ ├─ Replica1 (Leader) +│ ├─ Replica2 (Follower) +│ └─ Replica3 (Follower) +├─ Partition2 +│ ├─ Replica1 (Leader) +│ ├─ Replica2 (Follower) +│ └─ Replica3 (Follower) +└─ ... + +消费者组:order-group +├─ 消费者1 → Partition1 +└─ 消费者2 → Partition2 + +一条消息只能被同一个消费者组的一个消费者消费 +``` + +**区别**: +- RocketMQ 的 Queue 是逻辑概念 +- Kafka 的 Partition 是物理概念(有副本) + +--- + +##### **2. 消息存储** + +**RocketMQ**: +``` +CommitLog: +├─ 所有消息顺序写入(混合存储) +├─ 顺序写,性能高 +└─ 通过 ConsumeQueue 索引快速查找 + +ConsumeQueue(索引文件): +├─ 消息 1: CommitLog 偏移量 100, 大小 200, TagsCode +├─ 消息 2: CommitLog 偏移量 300, 大小 150, TagsCode +└─ 消息 3: CommitLog 偏移量 450, 大小 180, TagsCode +``` + +**Kafka**: +``` +Partition: +├─ Segment 0 +│ ├─ 00000000000000000000.log (消息数据) +│ ├─ 00000000000000000000.index (索引) +│ └─ 00000000000000000000.timeindex (时间索引) +├─ Segment 1 +│ ├─ 00000000000000050000.log +│ ├─ 00000000000000050000.index +│ └─ 00000000000000050000.timeindex +└─ ... + +每个 Partition 独立存储 +``` + +**对比**: +- RocketMQ:所有消息混合存储,通过索引文件查找 +- Kafka:每个 Partition 独立存储,支持 Segment 滚动 + +--- + +##### **3. 消息过滤** + +**RocketMQ**(服务端过滤): +```java +// 消费者订阅时指定过滤条件 +consumer.subscribe("OrderTopic", MessageSelector.bySql("amount > 100 AND status = 'PAID'")); + +// 生产者发送消息时设置属性 +Message msg = new Message("OrderTopic", "Order", body); +msg.putUserProperty("amount", "200"); +msg.putUserProperty("status", "PAID"); +producer.send(msg); +``` + +**Kafka**(客户端过滤): +```java +// 消费者消费后手动过滤 +KafkaConsumer consumer = new KafkaConsumer<>(props); +consumer.subscribe(Arrays.asList("OrderTopic")); + +while (true) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + // 客户端过滤(浪费网络带宽) + if (record.value().contains("amount:200")) { + processMessage(record); + } + } +} +``` + +**区别**: +- RocketMQ:服务端过滤(节省带宽) +- Kafka:客户端过滤(浪费带宽) + +--- + +##### **4. 事务消息** + +**RocketMQ**: +```java +// 1. 发送半消息 +Message msg = new Message("OrderTopic", orderJson.getBytes()); +TransactionSendResult result = producer.sendMessageInTransaction(msg, null); + +// 2. 执行本地事务(如创建订单) +public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { + try { + orderService.createOrder(msg); + return LocalTransactionState.COMMIT_MESSAGE; // 提交消息 + } catch (Exception e) { + return LocalTransactionState.ROLLBACK_MESSAGE; // 回滚消息 + } +} + +// 3. 提交/回滚消息 +// 消费者可以正常消费 +``` + +**Kafka**: +``` +❌ 不支持事务消息(KIP-98 有提案,但未正式发布) + +✅ 支持"精确一次"语义(Exactly-Once),但仅限于流式处理(Kafka Streams) +``` + +--- + +##### **5. 延迟消息** + +**RocketMQ**: +```java +// 18 个默认延迟级别(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h) +Message msg = new Message("OrderTopic", body); +msg.setDelayTimeLevel(3); // 延迟 10 秒 +producer.send(msg); +``` + +**Kafka**: +```java +// ❌ 不支持延迟消息 + +// ✅ 通过时间轮自定义实现 +``` + +--- + +##### **6. 吞吐量** + +**测试环境**: +- 机器:4 核 8G +- 消息大小:1 KB + +**结果**: +| MQ | 单机吞吐量 | 延迟 | +|----|-----------|------| +| RocketMQ | 10-15 万/秒 | 1-5 ms | +| Kafka | 30-50 万/秒 | 1-5 ms | + +**原因**: +- Kafka:零拷贝(sendfile)、批量处理、更好的磁盘顺序写 +- RocketMQ:也有零拷贝,但 Java GC 影响性能 + +--- + +#### **选型建议** + +##### **选择 RocketMQ 的场景** + +1. **需要事务消息**: + - 订单系统(下单 → 扣库存) + - 支付系统(支付 → 通知) + +2. **需要延迟消息**: + - 定时任务 + - 超时取消订单 + +3. **需要严格的消息顺序**: + - 金融交易 + - 订单状态流转 + +4. **需要消息回溯**: + - 数据恢复 + - 数据重放 + +5. **业务复杂、需要高可靠性**: + - 金融、支付、电商 + +--- + +##### **选择 Kafka 的场景** + +1. **日志收集**: + - 应用日志 → ELK + - 用户行为日志 + +2. **流式处理**: + - 实时数据分析 + - 实时推荐 + +3. **高吞吐量场景**: + - 日志收集(百万级/秒) + - 监控数据收集 + +4. **已有 Kafka 生态**: + - 使用 Flink、Spark Streaming 等流式计算框架 + +--- + +#### **企业级选型案例** + +| 公司 | 场景 | 选择 | 原因 | +|------|------|------|------| +| 阿里巴巴 | 订单、支付 | RocketMQ | 事务消息、高可靠性 | +| 美团 | 订单、物流 | RocketMQ | 业务复杂、延迟消息 | +| 字节跳动 | 日志、推荐 | Kafka | 高吞吐、流式处理 | +| 携程 | 订单、日志 | 两者都用 | 订单用 RocketMQ,日志用 Kafka | +| 京东 | 订单、支付 | RocketMQ | 事务消息、高可靠性 | + +--- + +### 7. 总结 + +#### **消息队列核心问题** + +| 问题 | 解决方案 | +|------|----------| +| **消息不丢失** | 生产者(同步发送、事务消息、本地消息表)
Broker(持久化、主从、多副本)
消费者(手动提交 offset、死信队列) | +| **消息重复消费** | 数据库唯一约束、Redis 分布式锁、状态机、去重表、MVCC | +| **消息顺序性** | 单分区、按 Key 分区、内存队列 | +| **消息积压** | 横向扩容、纵向扩容、优化消费逻辑、临时 Topic、丢弃消息 | +| **高可用** | 集群部署、主从复制、多副本、故障自动转移 | + +--- + +### 8. 阿里 P7 加分项 + +**深度理解**: +- 理解 RocketMQ 的 CommitLog 和 ConsumeQueue 设计 +- 理解 Kafka 的 Partition、Segment、副本同步机制 +- 理解零拷贝(sendfile)、mmap 等底层技术 + +**实战经验**: +- 有将消息积压从千万级清理到正常的经验 +- 有处理消息丢失、重复消费的线上故障经验 +- 有消息队列性能调优经验(JVM 参数、操作系统参数) + +**架构能力**: +- 能设计跨数据中心的消息队列架构 +- 能设计消息队列的监控和告警体系 +- 有消息队列迁移经验(如从 ActiveMQ 迁移到 RocketMQ) + +**技术选型**: +- 能根据业务特点选择合适的 MQ +- 有多种 MQ 混用的经验 +- 了解 MQTT、AMQP 等其他消息协议