# 消息队列(RocketMQ/Kafka) ## 问题 **背景**:消息队列是分布式系统的核心组件,用于解耦、异步、削峰填谷。 **问题**: 1. 消息队列的作用是什么?在什么场景下使用? 2. 如何保证消息不丢失?(生产者、Broker、消费者三个层面) 3. 如何保证消息不重复消费(幂等性)? 4. 如何保证消息的顺序性? 5. 什么是消息积压?如何处理? 6. RocketMQ 和 Kafka 的区别是什么?如何选型? --- ## 标准答案 ### 1. 消息队列的作用 #### **核心作用** ##### **1. 解耦(Decoupling)** **场景**:用户注册成功后,需要发送欢迎邮件、发送短信、发放优惠券。 **不使用 MQ**: ``` 用户服务 ↓ 调用邮件服务 ✉️ 调用短信服务 📱 调用优惠券服务 🎫 ``` **问题**: - 用户服务依赖所有下游服务 - 任何一个下游服务故障都会影响注册流程 - 新增需求需要修改用户服务代码 **使用 MQ**: ``` 用户服务 → 发送消息到 MQ ← 邮件服务(订阅) ← 短信服务(订阅) ← 优惠券服务(订阅) ← 新服务(随时订阅) ``` **优点**: - 用户服务不需要知道下游服务的存在 - 下游服务故障不影响用户注册 - 新增服务只需订阅消息,无需修改用户服务 --- ##### **2. 异步(Asynchronous)** **场景**:用户下单后,需要: 1. 扣减库存 2. 生成订单 3. 通知物流 4. 发送短信 5. 积分处理 **同步调用(串行)**: ``` 总耗时 = 50ms + 100ms + 80ms + 50ms + 60ms = 340ms ``` **异步调用(MQ)**: ``` 下单服务 → MQ(5ms) └─ 返回给用户(总耗时:55ms) 后台异步处理: ├─ 库存服务(50ms) ├─ 订单服务(100ms) ├─ 物流服务(80ms) ├─ 短信服务(50ms) └─ 积分服务(60ms) ``` **优点**: - 响应时间从 340ms 降低到 55ms - 用户体验提升 --- ##### **3. 削峰填谷(Peak Shaving)** **场景**:秒杀活动、双11大促 **不使用 MQ**: ``` 瞬时 QPS:50000 数据库最大 QPS:5000 结果:数据库被打挂 ❌ ``` **使用 MQ**: ``` 瞬时 QPS:50000 → MQ(缓冲) └─ 按数据库能力消费(5000 QPS) 结果:数据库平稳运行 ✅ ``` **图解**: ``` 不使用 MQ: 请求量 ███████████████████████████████████████ ↑ 数据库被打挂 使用 MQ: 请求量 ███████████████████████████████████████ ↓ MQ 缓冲 ↓ 消费量 ████████████████████████████████████████ ↑ 平滑消费 ``` --- #### **使用场景** | 场景 | 说明 | 示例 | |------|------|------| | **异步处理** | 非核心流程异步化 | 注册后发送邮件、短信 | | **系统解耦** | 降低系统耦合度 | 订单系统 ↔ 物流系统 | | **流量削峰** | 应对突发流量 | 秒杀、大促 | | **日志收集** | 分布式日志收集 | 应用日志 → ELK | | **事件驱动** | 基于事件的架构 | 用户行为触发推荐算法 | | **数据同步** | 跨数据库同步 | MySQL → Elasticsearch | | **分布式事务** | 最终一致性 | 订单 → 库存(Saga 模式) | --- #### **消息队列的缺点** 1. **系统复杂度增加** - 需要维护 MQ 集群 - 需要处理消息丢失、重复、顺序等问题 2. **数据一致性** - 无法保证强一致性,只能保证最终一致性 3. **可用性降低** - MQ 成为单点故障(需要高可用集群) 4. **消息延迟** - 异步处理导致消息延迟 --- ### 2. 保证消息不丢失 #### **三个层面的保障** ``` 生产者 → Broker → 消费者 ↓ ↓ ↓ 三个层面都要保证 ``` --- #### **层面 1:生产者(Producer)** ##### **方案 1:同步发送 + 确认机制** **RocketMQ 示例**: ```java // 同步发送(默认) Message message = new Message("TopicTest", "TagA", "OrderID001", "Hello MQ".getBytes()); // 同步发送:等待 Broker 确认 SendResult sendResult = producer.send(message); if (sendResult.getSendStatus() == SendStatus.SEND_OK) { // 发送成功 } else { // 发送失败,重试或记录日志 log.error("消息发送失败: {}", sendResult); } ``` **发送状态**: - `SEND_OK`:发送成功 - `FLUSH_DISK_TIMEOUT`:刷盘超时 - `FLUSH_SLAVE_TIMEOUT`:同步到 Slave 超时 - `SLAVE_NOT_AVAILABLE`:Slave 不可用 --- ##### **方案 2:异步发送 + 回调** ```java producer.send(message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // 发送成功 log.info("消息发送成功: {}", sendResult); } @Override public void onException(Throwable e) { // 发送失败,重试或记录日志 log.error("消息发送失败", e); } }); ``` --- ##### **方案 3:事务消息(RocketMQ 独有)** **原理**: 1. 发送半消息(Half Message,对消费者不可见) 2. 执行本地事务 3. 提交/回滚消息 ```java // 发送事务消息 TransactionMQProducer producer = new TransactionMQProducer("group1"); producer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { // 执行本地事务(如:创建订单) try { orderService.createOrder(msg); return LocalTransactionState.COMMIT_MESSAGE; // 提交消息 } catch (Exception e) { return LocalTransactionState.ROLLBACK_MESSAGE; // 回滚消息 } } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { // 反查本地事务状态(Broker 未收到确认时调用) if (orderService.exists(msg.getKeys())) { return LocalTransactionState.COMMIT_MESSAGE; } else { return LocalTransactionState.ROLLBACK_MESSAGE; } } }); // 发送消息 producer.sendMessageInTransaction(message, null); ``` --- ##### **方案 4:本地消息表** **原理**: 1. 在同一本地事务中:完成业务操作 + 插入消息到本地消息表 2. 定时任务扫描消息表,发送消息到 MQ 3. 发送成功后更新消息状态 ```java @Transactional public void createOrder(Order order) { // 1. 保存订单 orderMapper.insert(order); // 2. 保存消息到本地消息表 Message message = new Message("OrderTopic", JSON.toJSONString(order).getBytes()); localMessageMapper.insert(new LocalMessage(order.getId(), message)); } // 定时任务:发送消息到 MQ @Scheduled(fixedRate = 5000) public void sendPendingMessages() { List messages = localMessageMapper.findPendingMessages(); for (LocalMessage msg : messages) { try { SendResult result = producer.send(msg.getMessage()); if (result.getSendStatus() == SendStatus.SEND_OK) { // 更新消息状态为已发送 localMessageMapper.markAsSent(msg.getId()); } } catch (Exception e) { log.error("消息发送失败: {}", msg.getId(), e); } } } ``` **优点**: - 可靠性极高(消息持久化到数据库) - 支持重试 **缺点**: - 需要维护本地消息表 - 实现复杂 --- #### **层面 2:Broker(消息服务器)** ##### **方案 1:消息持久化** **RocketMQ**: ```xml SYNC_FLUSH ASYNC_FLUSH ``` **同步刷盘 vs 异步刷盘**: | 方式 | 性能 | 可靠性 | 适用场景 | |------|------|--------|----------| | SYNC_FLUSH | 低(写入磁盘后才返回成功) | 高 | 金融、支付 | | ASYNC_FLUSH | 高(写入内存后返回成功) | 中 | 日志、监控 | --- ##### **方案 2:主从复制(HA)** **RocketMQ 配置**: ```xml SYNC_MASTER ASYNC_MASTER ``` **复制方式对比**: | 方式 | 性能 | 可靠性 | 宕机丢数据 | |------|------|--------|------------| | SYNC_MASTER | 低 | 高 | 否 | | ASYNC_MASTER | 高 | 中 | 可能 | **推荐配置**(金融级可靠性): ```xml SYNC_FLUSH SYNC_MASTER ``` --- ##### **方案 3:多副本(Kafka)** **Kafka 配置**: ```properties # 副本数量(建议 ≥ 3) default.replication.factor=3 # 最小同步副本数(生产者确认) min.insync.replicas=2 # 不完全的选举首领(禁止) unclean.leader.election.enable=false ``` **生产者配置**: ```java Properties props = new Properties(); // 等待所有副本确认 props.put("acks", "all"); // 或至少等待主副本 + 1 个从副本确认 props.put("acks", "1"); ``` **acks 参数**: - `acks=0`:不等待确认(最快,可能丢失) - `acks=1`:等待主副本确认(折中) - `acks=all`(或 `-1`):等待所有副本确认(最可靠) --- #### **层面 3:消费者(Consumer)** ##### **方案 1:手动提交 Offset** **RocketMQ 示例**: ```java DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); // 设置为手动提交 consumer.setConsumeMessageBatchMaxSize(1); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { try { // 处理消息 processMessage(msg); // 处理成功,返回 CONSUME_SUCCESS(自动提交 offset) return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { // 处理失败,稍后重试 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } } }); ``` --- **Kafka 示例**: ```java Properties props = new Properties(); // 关闭自动提交 props.put("enable.auto.commit", "false"); KafkaConsumer consumer = new KafkaConsumer<>(props); while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) { try { // 处理消息 processMessage(record); // 手动提交 offset consumer.commitSync(); } catch (Exception e) { // 处理失败,不提交 offset,下次重试 log.error("消息处理失败: {}", record.offset(), e); } } } ``` --- ##### **方案 2:先处理消息,再提交 Offset** ```java // 错误示例:先提交 offset,再处理消息 consumer.commitSync(); // ❌ 提交成功 processMessage(record); // ❌ 处理失败,消息丢失 // 正确示例:先处理消息,再提交 offset processMessage(record); // ✅ 处理成功 consumer.commitSync(); // ✅ 提交 offset ``` --- ##### **方案 3:死信队列(DLQ)** **场景**:消息处理失败多次后,不再重试,放入死信队列。 **RocketMQ 示例**: ```java consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { int reconsumeTimes = msg.getReconsumeTimes(); if (reconsumeTimes >= 3) { // 重试 3 次后仍失败,放入死信队列 sendToDeadLetterQueue(msg); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } try { processMessage(msg); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { // 稍后重试 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } } }); ``` **Kafka 示例**: ```java // 配置死信队列 Properties props = new Properties(); props.put("enable.deadletter", "true"); props.put("deadletter.queue.topic.prefix", "dlq."); // 消费者处理失败时,发送到死信队列 if (retries >= 3) { kafkaTemplate.send("dlq.MyTopic", record.value()); } ``` --- #### **总结:端到端的可靠投递** ``` 生产者 Broker 消费者 ↓ ↓ ↓ 1. 同步发送 1. 持久化到磁盘 1. 手动提交 offset 2. 事务消息 2. 主从复制 2. 处理成功后才提交 3. 本地消息表 3. 多副本 3. 死信队列 ``` --- ### 3. 保证消息幂等性 #### **为什么会有重复消息?** 1. **生产者重复发送**:网络抖动导致生产者未收到 ACK,重试发送 2. **Broker 重复投递**:主从切换时,从副本尚未同步的消息被重复投递 3. **消费者重复消费**:消费成功后 offset 提交失败,下次重复消费 --- #### **幂等性解决方案** ##### **方案 1:数据库唯一约束** **场景**:订单创建 ```java @Transactional public void createOrder(Order order) { // order_id 有唯一索引 try { orderMapper.insert(order); } catch (DuplicateKeyException e) { // 重复订单,忽略 log.warn("重复订单: {}", order.getId()); return; } // 处理订单逻辑 processOrder(order); } ``` **优点**: - 实现简单 - 数据库保证唯一性 **缺点**: - 依赖数据库 - 高并发下性能较差(频繁抛异常) --- ##### **方案 2:Redis 分布式锁 + 唯一标识** **原理**:为每条消息设置唯一 ID(如 UUID),消费前先检查 Redis 是否已处理。 ```java public void consumeMessage(Message message) { String messageId = message.getHeaders().get("messageId"); String lockKey = "message:processed:" + messageId; // 尝试获取锁 Boolean locked = redisTemplate.opsForValue() .setIfAbsent(lockKey, "1", 24, TimeUnit.HOURS); if (!locked) { // 消息已处理,跳过 log.warn("重复消息: {}", messageId); return; } try { // 处理消息 processMessage(message); } catch (Exception e) { // 处理失败,删除锁(允许重试) redisTemplate.delete(lockKey); throw e; } } ``` **优点**: - 性能高(Redis 内存操作) - 可设置过期时间(自动清理) **缺点**: - 依赖 Redis - Redis 宕机可能导致问题 --- ##### **方案 3:状态机** **场景**:订单状态流转 ```sql -- 订单表 CREATE TABLE orders ( id BIGINT PRIMARY KEY, status VARCHAR(20), -- CREATED, PAID, SHIPPED, COMPLETED version INT, -- 乐观锁版本号 UNIQUE KEY id_status (id, status) -- 联合唯一索引 ); ``` ```java @Transactional public void updateOrderStatus(Long orderId, String fromStatus, String toStatus) { // 使用 CAS(Compare And Set)更新状态 int updated = orderMapper.updateStatus(orderId, fromStatus, toStatus); if (updated == 0) { // 状态不匹配,可能是重复消息 log.warn("订单状态已变更: orderId={}, fromStatus={}, toStatus={}", orderId, fromStatus, toStatus); return; } // 处理后续逻辑 processStatusChange(orderId, toStatus); } // Mapper @Update("UPDATE orders SET status = #{toStatus}, version = version + 1 " + "WHERE id = #{orderId} AND status = #{fromStatus}") int updateStatus(@Param("orderId") Long orderId, @Param("fromStatus") String fromStatus, @Param("toStatus") String toStatus); ``` **优点**: - 状态机保证幂等性 - 支持复杂业务逻辑 **缺点**: - 设计复杂 - 需要明确状态流转 --- ##### **方案 4:去重表** **原理**:创建专门的去重表,记录已处理的消息 ID。 ```sql -- 去重表 CREATE TABLE message_dedup ( id BIGINT AUTO_INCREMENT PRIMARY KEY, message_id VARCHAR(64) NOT NULL, topic VARCHAR(64), created_at DATETIME DEFAULT CURRENT_TIMESTAMP, UNIQUE KEY uk_message_id (message_id) ); ``` ```java @Transactional public void consumeMessage(Message message) { String messageId = message.getId(); // 插入去重表 try { dedupMapper.insert(new MessageDedup(messageId, message.getTopic())); } catch (DuplicateKeyException e) { // 消息已处理,跳过 log.warn("重复消息: {}", messageId); return; } // 处理消息 processMessage(message); } ``` **优点**: - 简单直观 - 可查询处理历史 **缺点**: - 额外的存储开销 - 需要定期清理历史数据 --- ##### **方案 5:多版本并发控制(MVCC)** **原理**:为数据增加版本号,更新时检查版本号。 ```sql -- 订单表 CREATE TABLE orders ( id BIGINT PRIMARY KEY, amount DECIMAL(10,2), version INT DEFAULT 0, -- 版本号 updated_at DATETIME ); ``` ```java @Transactional public void updateOrderAmount(Long orderId, BigDecimal newAmount) { // 使用乐观锁更新 int updated = orderMapper.updateAmountWithVersion(orderId, newAmount); if (updated == 0) { // 版本号不匹配,可能是重复消息 log.warn("订单已被更新: orderId={}", orderId); return; } // 处理后续逻辑 processAmountChange(orderId, newAmount); } // Mapper @Update("UPDATE orders SET amount = #{amount}, version = version + 1 " + "WHERE id = #{orderId} AND version = #{version}") int updateAmountWithVersion(@Param("orderId") Long orderId, @Param("amount") BigDecimal amount, @Param("version") Integer version); ``` **优点**: - 无锁,性能高 - 支持高并发 **缺点**: - 更新失败时需要重试 - 不适合写冲突频繁的场景 --- #### **幂等性方案对比** | 方案 | 性能 | 复杂度 | 可靠性 | 适用场景 | |------|------|--------|--------|----------| | 数据库唯一约束 | 低 | 低 | 高 | 低并发、简单场景 | | Redis 分布式锁 | 高 | 中 | 中 | 高并发、通用场景 | | 状态机 | 中 | 高 | 高 | 订单、工作流等 | | 去重表 | 中 | 低 | 高 | 需要记录历史的场景 | | MVCC | 高 | 中 | 高 | 高并发、读多写少 | --- ### 4. 保证消息顺序性 #### **为什么消息会乱序?** **场景**:一个 Topic 有多个 Queue(分区),多个消费者并发消费。 ``` Topic: OrderTopic ├─ Queue1: 消息1, 消息4, 消息7 ├─ Queue2: 消息2, 消息5, 消息8 └─ Queue3: 消息3, 消息6, 消息9 消费者1消费Queue1,消费者2消费Queue2,消费者3消费Queue3 └─ 消费速度不同,导致消息乱序 ``` --- #### **解决方案** ##### **方案 1:单分区(单 Queue)** **原理**:一个 Topic 只有一个 Queue,保证消息按顺序消费。 ```java // RocketMQ 生产者:发送到同一个 Queue SendResult result = producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List mqs, Message msg, Object arg) { // 始终选择第一个 Queue return mqs.get(0); } }, null); // RocketMQ 消费者:单线程消费 consumer.setConsumeThreadMin(1); consumer.setConsumeThreadMax(1); ``` **Kafka 配置**: ```properties # Topic 只有 1 个分区 num.partitions=1 ``` **优点**: - 实现简单 - 严格保证顺序 **缺点**: - 性能差(无法并行消费) - 成为瓶颈 **适用场景**: - 低并发、对顺序要求极高的场景 --- ##### **方案 2:按业务 Key 分区** **原理**:相同业务 Key 的消息发送到同一个 Queue。 **场景**:订单号相同的消息需要顺序消费。 **RocketMQ 示例**: ```java // 生产者:按订单 ID 分区 SendResult result = producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List mqs, Message msg, Object arg) { Long orderId = (Long) arg; // 相同订单 ID 的消息发送到同一个 Queue int index = (int) (orderId % mqs.size()); return mqs.get(index); } }, orderId); // 消费者:单线程消费每个 Queue(或至少保证相同订单的消息顺序处理) consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { // MessageListenerOrderly 保证顺序消费 for (MessageExt msg : msgs) { processMessage(msg); } return ConsumeOrderlyStatus.SUCCESS; } }); ``` **Kafka 示例**: ```java // 生产者:按订单 ID 分区 Properties props = new Properties(); props.put("partitioner.class", "com.example.OrderPartitioner"); KafkaProducer producer = new KafkaProducer<>(props); // 发送消息(相同订单 ID 的消息会发送到同一个分区) producer.send(new ProducerRecord<>("OrderTopic", orderId, message)); // 自定义分区器 public class OrderPartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { Long orderId = Long.parseLong(key.toString()); List partitions = cluster.partitionsForTopic(topic); return (int) (orderId % partitions.size()); } } ``` **优点**: - 相同业务 Key 的消息保证顺序 - 不同业务 Key 的消息可并行消费 - 性能较好 **缺点**: - 需要选择合适的分区 Key - 分区数量固定,扩容困难 --- ##### **方案 3:单线程消费 + 内存队列** **原理**:消费者内部按业务 Key 分组,使用多个内存队列 + 多个线程处理。 **架构图**: ``` MQ 消费者(单线程拉取) ↓ 按 Key 分组 ↓ ┌───────────┬───────────┬───────────┐ │ 内存队列1 │ 内存队列2 │ 内存队列3 │ │ (订单100) │ (订单200) │ (订单300) │ └───────────┴───────────┴───────────┘ ↓ ↓ ↓ 工作线程1 工作线程2 工作线程3 ``` **代码示例**: ```java @Component public class OrderMessageConsumer { private final Map> keyQueues = new ConcurrentHashMap<>(); private final ExecutorService executorService = Executors.newFixedThreadPool(10); @RocketMQMessageListener(topic = "OrderTopic", consumerGroup = "order-group") public void onMessage(MessageExt message) { Long orderId = parseOrderId(message); // 获取或创建该订单的内存队列 BlockingQueue queue = keyQueues.computeIfAbsent(orderId, k -> new LinkedBlockingQueue<>() ); // 消息放入队列 queue.offer(message); // 提交异步任务处理 executorService.submit(() -> { Message msg = queue.poll(); if (msg != null) { processMessage(msg); } }); } } ``` **优点**: - 保证相同 Key 的消息顺序 - 不同 Key 的消息并行处理 - 性能好 **缺点**: - 实现复杂 - 内存占用较高 --- #### **消息顺序性总结** | 方案 | 性能 | 复杂度 | 顺序粒度 | 适用场景 | |------|------|--------|----------|----------| | 单分区 | 低 | 低 | 全局顺序 | 低并发、全局顺序 | | 按 Key 分区 | 高 | 中 | Key 级顺序 | 订单、用户等业务 | | 内存队列 | 高 | 高 | Key 级顺序 | 高并发、复杂业务 | --- ### 5. 消息积压处理 #### **消息积压的原因** 1. **消费者消费速度慢**: - 消费逻辑复杂 - 下游服务响应慢 - 数据库操作慢 2. **生产者发送速度过快**: - 突发流量 - 秒杀活动 3. **消费者故障**: - 消费者宕机 - 网络问题 4. **消费者消费失败**: - 消息格式错误 - 业务异常 - 无限重试 --- #### **监控消息积压** **RocketMQ**: ```bash # 查看消费者堆积情况 sh mqadmin queryConsumeQueue -t OrderTopic -n localhost:9876 # 查看消费者组 sh mqadmin consumerList -g order-group -n localhost:9876 # 查看消费进度 sh mqadmin consumerProgress -g order-group -n localhost:9876 ``` **输出示例**: ``` # 消费进度 Topic: OrderTopic Broker: broker-a Queue: 0 Diff: 1000000 ← 积压 100 万条 LastOffset: 5000000 ``` **Kafka**: ```bash # 查看消费者 lag kafka-consumer-groups.sh --bootstrap-server localhost:9092 \ --group order-group --describe # 输出 TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OrderTopic 0 5000000 6000000 1000000 ↑ 积压 100 万 ``` --- #### **解决方案** ##### **方案 1:增加消费者数量(横向扩容)** **原理**:增加消费者实例,提高消费能力。 **RocketMQ 示例**: ```java // 原来:1 个消费者实例 @RocketMQMessageListener( topic = "OrderTopic", consumerGroup = "order-group", consumeThreadNumber = 1 // 1 个线程 ) // 扩容后:10 个消费者实例(部署 10 个 Pod) // 每个 Pod 有 1 个线程,总消费能力 ×10 ``` **注意事项**: - **消费者数量 ≤ Queue 数量** - 如果消费者数量 > Queue 数量,部分消费者会空闲 **计算公式**: ``` 所需消费者数 = 消息积压量 / (单个消费者消费速度 × 预期清理时间) 示例: 积压 100 万条,单个消费者每秒消费 100 条,预期 1 小时清理 所需消费者数 = 1000000 / (100 × 3600) ≈ 3 个 ``` --- ##### **方案 2:增加消费线程(纵向扩容)** **RocketMQ 示例**: ```java @RocketMQMessageListener( topic = "OrderTopic", consumerGroup = "order-group", consumeThreadNumber = 20 // 从 1 增加到 20 个线程 ) ``` **Kafka 示例**: ```java Properties props = new Properties(); // 增加消费线程数 props.put("max.poll.records", 500); // 每次拉取 500 条 props.put("max.poll.interval.ms", 300000); // 最大处理时间 5 分钟 ``` **注意事项**: - 线程数不是越多越好(CPU、数据库连接数限制) - 需要测试找到最优值 --- ##### **方案 3:优化消费逻辑** **问题代码**: ```java // ❌ 在循环中逐条处理数据库 public void consumeMessage(List messages) { for (Message msg : messages) { // 逐条插入数据库(慢) orderMapper.insert(parseOrder(msg)); } } ``` **优化代码**: ```java // ✅ 批量处理数据库 public void consumeMessage(List messages) { List orders = messages.stream() .map(this::parseOrder) .collect(Collectors.toList()); // 批量插入(快 10-100 倍) orderMapper.batchInsert(orders); } ``` **Mapper**: ```java @Insert({ "" }) int batchInsert(@Param("orders") List orders); ``` --- ##### **方案 4:临时扩容方案** **场景**:积压严重(如 1000 万条),正常消费需要几天。 **步骤**: 1. **创建临时 Topic**(大量 Queue): ```bash # 创建 OrderTopic_Temp,有 100 个 Queue ``` 2. **编写转发程序**: ```java // 从原 Topic 消费,发送到临时 Topic @Component public class MessageForwarder { @Autowired private RocketMQTemplate rocketMQTemplate; @RocketMQMessageListener(topic = "OrderTopic", consumerGroup = "forward-group") public void onMessage(MessageExt message) { // 转发到临时 Topic rocketMQTemplate.send("OrderTopic_Temp", message); } } ``` 3. **部署大量消费者**消费临时 Topic: ```java // 部署 100 个消费者实例,每个消费 1 个 Queue @RocketMQMessageListener( topic = "OrderTopic_Temp", consumerGroup = "temp-consumer-group" ) ``` 4. **积压清理完毕后,恢复正常架构**。 --- ##### **方案 5:丢弃部分消息** **场景**: - 消息积压过于严重(如 1 亿条) - 消息时效性要求高(如实时日志) - 允许部分数据丢失 **实现**: ```java // 1. 创建临时消费者,只消费不处理(快速跳过) @RocketMQMessageListener( topic = "OrderTopic", consumerGroup = "skip-group" ) public void onMessage(MessageExt message) { // 直接返回成功,不处理消息 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } // 2. 跳到指定 offset consumer.seekToBegin(); // 跳到最前 consumer.seek(queue, 10000000); // 跳到第 1000 万条 ``` --- #### **消息积压处理流程** ``` 1. 监控发现积压 ↓ 2. 分析积压原因 ├─ 消费者慢 → 优化消费逻辑 ├─ 消费者少 → 横向扩容 ├─ 生产者快 → 限流 └─ 消费失败 → 修复 bug,跳过错误消息 ↓ 3. 执行扩容或优化 ↓ 4. 监控清理进度 ↓ 5. 恢复正常架构 ``` --- ### 6. RocketMQ vs Kafka #### **核心特性对比** | 特性 | RocketMQ | Kafka | |------|----------|-------| | **开发语言** | Java | Scala/Java | | **协议** | 自有协议 | TCP | | **消息模型** | 队列模型 | 发布-订阅模型 | | **消息顺序** | 支持严格顺序 | 分区内有序 | | **事务消息** | ✅ 支持 | ❌ 不支持(KIP-98 有提案) | | **定时消息** | ✅ 支持 | ❌ 不支持 | | **延迟消息** | ✅ 支持(18 级) | ❌ 不支持 | | **消息回溯** | ✅ 支持 | ✅ 支持(需设置) | | **消息过滤** | SQL92 表达式 | 客户端过滤 | | **吞吐量** | 万级/单机 | 十万级/单机 | | **延迟** | ms 级 | ms 级 | | **可靠性** | 高(同步刷盘 + 主从) | 高(多副本) | | **运维复杂度** | 中 | 中 | | **社区活跃度** | 中(阿里主导) | 高(Confluent 支持) | | **文档质量** | 中(中文文档多) | 高 | | **适用场景** | 业务消息、订单、金融 | 日志收集、流式处理 | --- #### **详细对比** ##### **1. 消息模型** **RocketMQ**: ``` Topic: OrderTopic ├─ Queue1 ├─ Queue2 ├─ Queue3 └─ Queue4 消费者组:order-group ├─ 消费者1 → Queue1, Queue2 └─ 消费者2 → Queue3, Queue4 一条消息只能被同一个消费者组的一个消费者消费 ``` **Kafka**: ``` Topic: OrderTopic ├─ Partition1 │ ├─ Replica1 (Leader) │ ├─ Replica2 (Follower) │ └─ Replica3 (Follower) ├─ Partition2 │ ├─ Replica1 (Leader) │ ├─ Replica2 (Follower) │ └─ Replica3 (Follower) └─ ... 消费者组:order-group ├─ 消费者1 → Partition1 └─ 消费者2 → Partition2 一条消息只能被同一个消费者组的一个消费者消费 ``` **区别**: - RocketMQ 的 Queue 是逻辑概念 - Kafka 的 Partition 是物理概念(有副本) --- ##### **2. 消息存储** **RocketMQ**: ``` CommitLog: ├─ 所有消息顺序写入(混合存储) ├─ 顺序写,性能高 └─ 通过 ConsumeQueue 索引快速查找 ConsumeQueue(索引文件): ├─ 消息 1: CommitLog 偏移量 100, 大小 200, TagsCode ├─ 消息 2: CommitLog 偏移量 300, 大小 150, TagsCode └─ 消息 3: CommitLog 偏移量 450, 大小 180, TagsCode ``` **Kafka**: ``` Partition: ├─ Segment 0 │ ├─ 00000000000000000000.log (消息数据) │ ├─ 00000000000000000000.index (索引) │ └─ 00000000000000000000.timeindex (时间索引) ├─ Segment 1 │ ├─ 00000000000000050000.log │ ├─ 00000000000000050000.index │ └─ 00000000000000050000.timeindex └─ ... 每个 Partition 独立存储 ``` **对比**: - RocketMQ:所有消息混合存储,通过索引文件查找 - Kafka:每个 Partition 独立存储,支持 Segment 滚动 --- ##### **3. 消息过滤** **RocketMQ**(服务端过滤): ```java // 消费者订阅时指定过滤条件 consumer.subscribe("OrderTopic", MessageSelector.bySql("amount > 100 AND status = 'PAID'")); // 生产者发送消息时设置属性 Message msg = new Message("OrderTopic", "Order", body); msg.putUserProperty("amount", "200"); msg.putUserProperty("status", "PAID"); producer.send(msg); ``` **Kafka**(客户端过滤): ```java // 消费者消费后手动过滤 KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("OrderTopic")); while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) { // 客户端过滤(浪费网络带宽) if (record.value().contains("amount:200")) { processMessage(record); } } } ``` **区别**: - RocketMQ:服务端过滤(节省带宽) - Kafka:客户端过滤(浪费带宽) --- ##### **4. 事务消息** **RocketMQ**: ```java // 1. 发送半消息 Message msg = new Message("OrderTopic", orderJson.getBytes()); TransactionSendResult result = producer.sendMessageInTransaction(msg, null); // 2. 执行本地事务(如创建订单) public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { orderService.createOrder(msg); return LocalTransactionState.COMMIT_MESSAGE; // 提交消息 } catch (Exception e) { return LocalTransactionState.ROLLBACK_MESSAGE; // 回滚消息 } } // 3. 提交/回滚消息 // 消费者可以正常消费 ``` **Kafka**: ``` ❌ 不支持事务消息(KIP-98 有提案,但未正式发布) ✅ 支持"精确一次"语义(Exactly-Once),但仅限于流式处理(Kafka Streams) ``` --- ##### **5. 延迟消息** **RocketMQ**: ```java // 18 个默认延迟级别(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h) Message msg = new Message("OrderTopic", body); msg.setDelayTimeLevel(3); // 延迟 10 秒 producer.send(msg); ``` **Kafka**: ```java // ❌ 不支持延迟消息 // ✅ 通过时间轮自定义实现 ``` --- ##### **6. 吞吐量** **测试环境**: - 机器:4 核 8G - 消息大小:1 KB **结果**: | MQ | 单机吞吐量 | 延迟 | |----|-----------|------| | RocketMQ | 10-15 万/秒 | 1-5 ms | | Kafka | 30-50 万/秒 | 1-5 ms | **原因**: - Kafka:零拷贝(sendfile)、批量处理、更好的磁盘顺序写 - RocketMQ:也有零拷贝,但 Java GC 影响性能 --- #### **选型建议** ##### **选择 RocketMQ 的场景** 1. **需要事务消息**: - 订单系统(下单 → 扣库存) - 支付系统(支付 → 通知) 2. **需要延迟消息**: - 定时任务 - 超时取消订单 3. **需要严格的消息顺序**: - 金融交易 - 订单状态流转 4. **需要消息回溯**: - 数据恢复 - 数据重放 5. **业务复杂、需要高可靠性**: - 金融、支付、电商 --- ##### **选择 Kafka 的场景** 1. **日志收集**: - 应用日志 → ELK - 用户行为日志 2. **流式处理**: - 实时数据分析 - 实时推荐 3. **高吞吐量场景**: - 日志收集(百万级/秒) - 监控数据收集 4. **已有 Kafka 生态**: - 使用 Flink、Spark Streaming 等流式计算框架 --- #### **企业级选型案例** | 公司 | 场景 | 选择 | 原因 | |------|------|------|------| | 阿里巴巴 | 订单、支付 | RocketMQ | 事务消息、高可靠性 | | 美团 | 订单、物流 | RocketMQ | 业务复杂、延迟消息 | | 字节跳动 | 日志、推荐 | Kafka | 高吞吐、流式处理 | | 携程 | 订单、日志 | 两者都用 | 订单用 RocketMQ,日志用 Kafka | | 京东 | 订单、支付 | RocketMQ | 事务消息、高可靠性 | --- ### 7. 总结 #### **消息队列核心问题** | 问题 | 解决方案 | |------|----------| | **消息不丢失** | 生产者(同步发送、事务消息、本地消息表)
Broker(持久化、主从、多副本)
消费者(手动提交 offset、死信队列) | | **消息重复消费** | 数据库唯一约束、Redis 分布式锁、状态机、去重表、MVCC | | **消息顺序性** | 单分区、按 Key 分区、内存队列 | | **消息积压** | 横向扩容、纵向扩容、优化消费逻辑、临时 Topic、丢弃消息 | | **高可用** | 集群部署、主从复制、多副本、故障自动转移 | --- ### 8. 阿里 P7 加分项 **深度理解**: - 理解 RocketMQ 的 CommitLog 和 ConsumeQueue 设计 - 理解 Kafka 的 Partition、Segment、副本同步机制 - 理解零拷贝(sendfile)、mmap 等底层技术 **实战经验**: - 有将消息积压从千万级清理到正常的经验 - 有处理消息丢失、重复消费的线上故障经验 - 有消息队列性能调优经验(JVM 参数、操作系统参数) **架构能力**: - 能设计跨数据中心的消息队列架构 - 能设计消息队列的监控和告警体系 - 有消息队列迁移经验(如从 ActiveMQ 迁移到 RocketMQ) **技术选型**: - 能根据业务特点选择合适的 MQ - 有多种 MQ 混用的经验 - 了解 MQTT、AMQP 等其他消息协议