# 交易撮合引擎突发流量处理 ## 题目描述 在设计交易撮合引擎时,如果某一个标的(如某只股票或加密货币)突发流量太大,应该如何处理? **场景示例**: - 某只股票突然发布重大利好消息 - 加密货币市场出现极端行情 - 知名人士在社交媒体发表评论引发交易狂潮 - 系统故障导致的订单堆积 **问题**: 1. 如何保护系统不被击垮? 2. 如何保证系统公平性? 3. 如何保证数据一致性? 4. 如何快速恢复服务? --- ## 思路推导 ### 问题分析 **突发流量的特征**: - 流量峰值可能是正常流量的 10-100 倍 - 请求集中在短时间内(几秒到几分钟) - 订单类型可能单一(如大量卖单或买单) - 可能伴随恶意攻击或刷单行为 **系统面临的风险**: 1. **服务不可用**:系统资源耗尽,无法处理正常请求 2. **数据不一致**:高并发下订单状态混乱 3. **用户体验差**:请求超时、延迟过高 4. **不公平性**:部分用户订单被优先处理 ### 为什么这样思考? **核心原则**:**保护系统 > 处理请求 > 用户体验** 就像股票市场遇到极端行情时会采取"熔断机制"一样,交易系统也需要有多层保护机制。 --- ## 解题思路 ### 核心思想 **多层防护 + 降级策略 + 优先级队列** ``` ┌─────────────────────────────────────────────────────┐ │ 用户请求 │ └──────────────────┬──────────────────────────────────┘ │ ▼ ┌─────────────────┐ │ 1. 限流层 │ ← 防止过载 │ - 用户级限流 │ │ - 标的级限流 │ │ - 系统级限流 │ └────────┬─────────┘ │ ▼ ┌─────────────────┐ │ 2. 缓冲层 │ ← 削峰填谷 │ - 消息队列 │ │ - 本地缓存 │ └────────┬─────────┘ │ ▼ ┌─────────────────┐ │ 3. 优先级层 │ ← 保证公平 │ - VIP 用户 │ │ - 订单类型 │ │ - 时间优先 │ └────────┬─────────┘ │ ▼ ┌─────────────────┐ │ 4. 处理层 │ ← 实际撮合 │ - 撮合引擎 │ │ - 风控系统 │ └─────────────────┘ ``` --- ## 详细方案 ### 方案一:限流保护(第一道防线) **目的**:防止系统被瞬时大流量击垮 #### 1.1 用户级限流 ```go // 滑动窗口限流 type RateLimiter struct { windows map[string]*UserWindow mutex sync.RWMutex } type UserWindow struct { requests []time.Time limit int // 时间窗口内最大请求数 window time.Duration // 时间窗口大小 } func (r *RateLimiter) Allow(userID string) bool { r.mutex.Lock() defer r.mutex.Unlock() now := time.Now() window, exists := r.windows[userID] if !exists { window = &UserWindow{ requests: []time.Time{now}, limit: 100, // 每分钟100个订单 window: time.Minute, // 1分钟窗口 } r.windows[userID] = window return true } // 清理过期请求 validRequests := []time.Time{} for _, req := range window.requests { if now.Sub(req) < window.window { validRequests = append(validRequests, req) } } // 检查是否超过限制 if len(validRequests) >= window.limit { return false // 超过限制,拒绝请求 } // 添加新请求 validRequests = append(validRequests, now) window.requests = validRequests return true } ``` **关键点**: - 普通用户:100 次/分钟 - VIP 用户:500 次/分钟 - 机构用户:2000 次/分钟 #### 1.2 标的级限流 ```go // 单个标的的全局限流 type SymbolRateLimiter struct { symbols map[string]*SymbolLimit mutex sync.RWMutex } type SymbolLimit struct { currentOrders int maxOrders int // 并发订单数上限 queueSize int // 队列积压上限 } func (s *SymbolRateLimiter) Allow(symbol string) bool { s.mutex.Lock() defer s.mutex.Unlock() limit, exists := s.symbols[symbol] if !exists { limit = &SymbolLimit{ maxOrders: 10000, // 单标的最大并发订单数 queueSize: 50000, // 单标的最大队列积压 } s.symbols[symbol] = limit } // 检查并发订单数 if limit.currentOrders >= limit.maxOrders { return false // 并发订单已满,拒绝 } // 检查队列积压 if limit.queueSize >= 50000 { return false // 队列积压过多,拒绝 } limit.currentOrders++ return true } func (s *SymbolRateLimiter) Release(symbol string) { s.mutex.Lock() defer s.mutex.Unlock() if limit, exists := s.symbols[symbol]; exists { limit.currentOrders-- } } ``` #### 1.3 系统级限流 ```go // 全局系统限流 type SystemRateLimiter struct { currentLoad int64 maxLoad int64 // 系统最大负载 } func (s *SystemRateLimiter) Allow() bool { current := atomic.LoadInt64(&s.currentLoad) if current >= s.maxLoad { return false } atomic.AddInt64(&s.currentLoad, 1) return true } ``` **关键指标**: - CPU 使用率 < 70% - 内存使用率 < 80% - 网络带宽 < 70% - 请求响应时间 P99 < 100ms --- ### 方案二:消息队列缓冲(第二道防线) **目的**:削峰填谷,异步处理 #### 2.1 架构设计 ```go type OrderQueue struct { priorityQueues map[Priority]*queue.PriorityQueue mutex sync.RWMutex } type Priority int const ( PRIORITY_EMERGENCY Priority = iota // 紧急订单(如撤单) PRIORITY_HIGH // 高优先级(VIP用户、市价单) PRIORITY_NORMAL // 普通订单 PRIORITY_LOW // 低优先级(小额限价单) ) type Order struct { OrderID string UserID string Symbol string Price decimal.Decimal Quantity decimal.Decimal Priority Priority CreateTime time.Time } // 提交订单到队列 func (o *OrderQueue) Enqueue(order *Order) error { o.mutex.Lock() defer o.mutex.Unlock() queue := o.priorityQueues[order.Priority] return queue.Enqueue(order) } // 从队列取出订单 func (o *OrderQueue) Dequeue() (*Order, error) { o.mutex.RLock() defer o.mutex.RUnlock() // 按优先级顺序取出 priorities := []Priority{PRIORITY_EMERGENCY, PRIORITY_HIGH, PRIORITY_NORMAL, PRIORITY_LOW} for _, priority := range priorities { queue := o.priorityQueues[priority] if !queue.IsEmpty() { return queue.Dequeue() } } return nil, errors.New("queue is empty") } ``` #### 2.2 队列配置 ```yaml kafka: brokers: - "kafka1:9092" - "kafka2:9092" - "kafka3:9092" topics: orders: partitions: 100 # 分区数,提高并发 replication-factor: 3 # 副本数,保证高可用 retention-ms: 60000 # 消息保留1分钟 producer: acks: 1 # 等待leader确认 retries: 3 # 重试3次 batch-size: 16384 # 批量发送 linger-ms: 10 # 等待10ms收集批量 consumer: group-id: "matching-engine" max-poll-records: 500 # 每次最多拉取500条 fetch-min-bytes: 1024 # 最小拉取1KB session-timeout-ms: 30000 ``` #### 2.3 动态扩容 ```go // 监控队列积压情况 func (o *OrderQueue) Monitor() { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() for range ticker.C { o.mutex.RLock() for priority, queue := range o.priorityQueues { size := queue.Size() // 队列积压超过阈值,告警 if size > 10000 { log.Warnf("Priority %d queue backlog: %d", priority, size) // 触发扩容 if priority == PRIORITY_HIGH || priority == PRIORITY_EMERGENCY { o.triggerScaleUp() } } } o.mutex.RUnlock() } } // 动态扩容处理节点 func (o *OrderQueue) triggerScaleUp() { // 通知K8s增加Pod数量 // 或者通知自动扩缩容系统 } ``` --- ### 方案三:优先级队列(第三道防线) **目的**:保证重要订单优先处理 #### 3.1 订单优先级策略 ```go type OrderPriority struct { order *Order score float64 } // 计算订单优先级分数 func CalculatePriorityScore(order *Order) float64 { score := 0.0 // 1. 用户等级权重 (40%) userLevel := getUserLevel(order.UserID) score += float64(userLevel) * 0.4 // 2. 订单类型权重 (30%) if order.Type == MARKET_ORDER { score += 30 * 0.3 // 市价单优先 } else if order.Type == LIMIT_ORDER { score += 20 * 0.3 // 限价单次之 } else if order.Type == CANCEL_ORDER { score += 50 * 0.3 // 撤单最优先 } // 3. 订单金额权重 (20%) amount := order.Price.Mul(order.Quantity) if amount.GreaterThan(decimal.NewFromInt(100000)) { score += 30 * 0.2 // 大额订单 } else if amount.GreaterThan(decimal.NewFromInt(10000)) { score += 20 * 0.2 } else { score += 10 * 0.2 } // 4. 等待时间权重 (10%) waitTime := time.Since(order.CreateTime).Seconds() score += math.Min(waitTime/60.0, 10) * 0.1 // 最多等待10分钟 return score } ``` #### 3.2 优先级队列实现 ```go type PriorityQueue struct { items []*OrderPriority mutex sync.RWMutex } func (p *PriorityQueue) Push(order *Order) { p.mutex.Lock() defer p.mutex.Unlock() score := CalculatePriorityScore(order) item := &OrderPriority{order: order, score: score} // 使用堆结构 p.items = append(p.items, item) p.heapifyUp(len(p.items) - 1) } func (p *PriorityQueue) Pop() *Order { p.mutex.Lock() defer p.mutex.Unlock() if len(p.items) == 0 { return nil } root := p.items[0] last := p.items[len(p.items)-1] p.items = p.items[:len(p.items)-1] if len(p.items) > 0 { p.items[0] = last p.heapifyDown(0) } return root.order } func (p *PriorityQueue) heapifyUp(index int) { for index > 0 { parent := (index - 1) / 2 if p.items[parent].score >= p.items[index].score { break } p.items[parent], p.items[index] = p.items[index], p.items[parent] index = parent } } func (p *PriorityQueue) heapifyDown(index int) { for { left := 2*index + 1 right := 2*index + 2 largest := index if left < len(p.items) && p.items[left].score > p.items[largest].score { largest = left } if right < len(p.items) && p.items[right].score > p.items[largest].score { largest = right } if largest == index { break } p.items[index], p.items[largest] = p.items[largest], p.items[index] index = largest } } ``` --- ### 方案四:降级策略(第四道防线) **目的**:在极端情况下保证核心功能 #### 4.1 服务降级 ```go type ServiceLevel int const ( LEVEL_FULL ServiceLevel = iota // 全功能 LEVEL_BASIC // 基础功能 LEVEL_MINIMAL // 最小功能 LEVEL_EMERGENCY // 紧急模式 ) type DegradationManager struct { currentLevel ServiceLevel mutex sync.RWMutex } // 根据系统负载调整服务级别 func (d *DegradationManager) AdjustLevel(cpuUsage, memoryUsage float64) { d.mutex.Lock() defer d.mutex.Unlock() if cpuUsage > 90 || memoryUsage > 90 { d.currentLevel = LEVEL_EMERGENCY d.applyEmergencyMode() } else if cpuUsage > 80 || memoryUsage > 80 { d.currentLevel = LEVEL_MINIMAL d.applyMinimalMode() } else if cpuUsage > 70 || memoryUsage > 70 { d.currentLevel = LEVEL_BASIC d.applyBasicMode() } else { d.currentLevel = LEVEL_FULL d.applyFullMode() } } func (d *DegradationManager) applyEmergencyMode() { log.Warn("Entering EMERGENCY mode") // 1. 只处理撤单请求 // 2. 暂停新的订单接收 // 3. 清理队列积压 // 4. 通知用户系统繁忙 // 发送告警 alerting.SendAlert("EMERGENCY", "System in emergency mode") } func (d *DegradationManager) applyMinimalMode() { log.Warn("Entering MINIMAL mode") // 1. 只处理VIP用户订单 // 2. 限制订单类型(只允许市价单) // 3. 降低非核心功能 } func (d *DegradationManager) applyBasicMode() { log.Info("Entering BASIC mode") // 1. 限制订单速率 // 2. 关闭查询功能 // 3. 延迟通知 } func (d *DegradationManager) applyFullMode() { log.Info("Entering FULL mode") // 恢复所有功能 } ``` #### 4.2 熔断机制 ```go type CircuitBreaker struct { maxFailures int resetTimeout time.Duration currentFailures int lastFailureTime time.Time state State mutex sync.RWMutex } type State int const ( STATE_CLOSED State = iota STATE_OPEN STATE_HALF_OPEN ) func (c *CircuitBreaker) Allow() bool { c.mutex.Lock() defer c.mutex.Unlock() // 如果熔断器打开,检查是否可以尝试恢复 if c.state == STATE_OPEN { if time.Since(c.lastFailureTime) > c.resetTimeout { c.state = STATE_HALF_OPEN return true } return false } // 如果是半开状态,允许少量请求通过 if c.state == STATE_HALF_OPEN { return true } // 关闭状态,正常允许 return true } func (c *CircuitBreaker) RecordSuccess() { c.mutex.Lock() defer c.mutex.Unlock() c.currentFailures = 0 if c.state == STATE_HALF_OPEN { c.state = STATE_CLOSED log.Info("Circuit breaker recovered") } } func (c *CircuitBreaker) RecordFailure() { c.mutex.Lock() defer c.mutex.Unlock() c.currentFailures++ c.lastFailureTime = time.Now() if c.currentFailures >= c.maxFailures { c.state = STATE_OPEN log.Error("Circuit breaker opened due to failures") } } ``` --- ### 方案五:数据一致性保证 #### 5.1 分布式事务 ```go // 使用 Saga 模式处理订单 type OrderSaga struct { steps []SagaStep compensatable bool } type SagaStep struct { Name string Execute func() error Compensate func() error } func (s *OrderSaga) Execute() error { executedSteps := []int{} for i, step := range s.steps { if err := step.Execute(); err != nil { // 执行失败,执行补偿 s.compensate(executedSteps) return err } executedSteps = append(executedSteps, i) } return nil } func (s *OrderSaga) compensate(executedSteps []int) { // 反向执行补偿操作 for i := len(executedSteps) - 1; i >= 0; i-- { stepIndex := executedSteps[i] step := s.steps[stepIndex] if err := step.Compensate(); err != nil { log.Errorf("Compensation failed for step %s: %v", step.Name, err) } } } // 使用示例 func ProcessOrder(order *Order) error { saga := &OrderSaga{ steps: []SagaStep{ { Name: "ValidateOrder", Execute: func() error { return validateOrder(order) }, Compensate: func() error { return nil // 无需补偿 }, }, { Name: "ReserveBalance", Execute: func() error { return reserveBalance(order) }, Compensate: func() error { return releaseBalance(order) }, }, { Name: "MatchOrder", Execute: func() error { return matchOrder(order) }, Compensate: func() error { return cancelMatch(order) }, }, { Name: "UpdatePosition", Execute: func() error { return updatePosition(order) }, Compensate: func() error { return revertPosition(order) }, }, }, } return saga.Execute() } ``` #### 5.2 数据库优化 ```sql -- 订单表分区(按时间) CREATE TABLE orders ( order_id BIGINT PRIMARY KEY, user_id BIGINT NOT NULL, symbol VARCHAR(32) NOT NULL, price DECIMAL(20, 8) NOT NULL, quantity DECIMAL(20, 8) NOT NULL, status TINYINT NOT NULL, create_time DATETIME NOT NULL, update_time DATETIME NOT NULL, INDEX idx_user_symbol (user_id, symbol), INDEX idx_create_time (create_time) ) PARTITION BY RANGE (YEAR(create_time)) ( PARTITION p2023 VALUES LESS THAN (2024), PARTITION p2024 VALUES LESS THAN (2025), PARTITION p2025 VALUES LESS THAN (2026), PARTITION p_future VALUES LESS THAN MAXVALUE ); -- 使用 Redis 缓存热点数据 type OrderCache struct { redis *redis.Client } func (o *OrderCache) GetOrder(orderID string) (*Order, error) { // 先从 Redis 获取 key := fmt.Sprintf("order:%s", orderID) data, err := o.redis.Get(key).Bytes() if err == nil { var order Order if err := json.Unmarshal(data, &order); err == nil { return &order, nil } } // Redis 没有,从数据库获取 order, err := o.getOrderFromDB(orderID) if err != nil { return nil, err } // 写入 Redis data, _ = json.Marshal(order) o.redis.Set(key, data, 5*time.Minute) return order, nil } ``` --- ### 方案六:监控和告警 #### 6.1 实时监控指标 ```go type Metrics struct { // 系统指标 CPUUsage float64 MemoryUsage float64 DiskIO float64 NetworkIO float64 // 业务指标 OrdersPerSecond int QueueSize int AvgLatency time.Duration P99Latency time.Duration ErrorRate float64 // 标的指标 HotSymbols []string SymbolOrders map[string]int } func CollectMetrics() *Metrics { m := &Metrics{ SymbolOrders: make(map[string]int), } // 1. 收集系统指标 m.CPUUsage = getCPUUsage() m.MemoryUsage = getMemoryUsage() m.DiskIO = getDiskIO() m.NetworkIO = getNetworkIO() // 2. 收集业务指标 m.OrdersPerSecond = getOrdersPerSecond() m.QueueSize = getQueueSize() m.AvgLatency = getAvgLatency() m.P99Latency = getP99Latency() m.ErrorRate = getErrorRate() // 3. 收集标的指标 m.SymbolOrders = getSymbolOrders() // 4. 识别热点标的 m.HotSymbols = identifyHotSymbols(m.SymbolOrders) return m } func identifyHotSymbols(symbolOrders map[string]int) []string { type SymbolCount struct { Symbol string Count int } var counts []SymbolCount totalOrders := 0 for symbol, count := range symbolOrders { counts = append(counts, SymbolCount{Symbol: symbol, Count: count}) totalOrders += count } // 排序 sort.Slice(counts, func(i, j int) bool { return counts[i].Count > counts[j].Count }) // 识别热点标的(订单数 > 平均值的3倍) avgOrders := totalOrders / len(symbolOrders) var hotSymbols []string for _, sc := range counts { if sc.Count > avgOrders*3 { hotSymbols = append(hotSymbols, sc.Symbol) } } return hotSymbols } ``` #### 6.2 告警规则 ```go type AlertRule struct { Name string Condition func(*Metrics) bool Severity string Message string } var alertRules = []AlertRule{ { Name: "系统负载过高", Condition: func(m *Metrics) bool { return m.CPUUsage > 80 || m.MemoryUsage > 80 }, Severity: "WARNING", Message: "系统负载过高,CPU: %.2f%%, Memory: %.2f%%", }, { Name: "队列积压过多", Condition: func(m *Metrics) bool { return m.QueueSize > 10000 }, Severity: "CRITICAL", Message: "队列积压过多,当前积压: %d", }, { Name: "延迟过高", Condition: func(m *Metrics) bool { return m.P99Latency > 500*time.Millisecond }, Severity: "WARNING", Message: "P99延迟过高,当前: %v", }, { Name: "错误率过高", Condition: func(m *Metrics) bool { return m.ErrorRate > 0.05 }, Severity: "CRITICAL", Message: "错误率过高,当前: %.2f%%", }, { Name: "热点标的", Condition: func(m *Metrics) bool { return len(m.HotSymbols) > 0 }, Severity: "INFO", Message: "发现热点标的: %v", }, } func CheckAlerts(metrics *Metrics) { for _, rule := range alertRules { if rule.Condition(metrics) { message := fmt.Sprintf(rule.Message, metrics) SendAlert(rule.Severity, rule.Name, message) } } } func SendAlert(severity, name, message string) { // 发送告警到多个渠道 // 1. 邮件 emailAlert := fmt.Sprintf("[%s] %s: %s", severity, name, message) sendEmail(emailAlert) // 2. 短信 if severity == "CRITICAL" { sendSMS(message) } // 3. 即时通讯工具(如钉钉、企业微信) sendIMAlert(severity, name, message) // 4. 写入日志 log.Printf("[%s] %s: %s", severity, name, message) } ``` --- ## 实战案例 ### 案例1:某只股票突发利好消息 **场景**: - 10:00 AM 发布重大利好 - 10:00:01 订单量从 100/秒 飙升到 10000/秒 - 系统CPU从 30% 飙升到 95% **处理流程**: ``` 1. 10:00:01 - 监控检测到异常 └─ CPU: 95%, Memory: 85% └─ 订单量: 10000/秒 └─ 触发 CRITICAL 告警 2. 10:00:02 - 自动限流 ├─ 用户级限流: 100 → 10 次/分钟 ├─ 标的级限流: 该股票订单限制为 1000/秒 └─ 拒绝非核心请求(查询、行情等) 3. 10:00:05 - 服务降级 ├─ 进入 BASIC 模式 ├─ 只处理市价单 ├─ 限制限价单 └─ 延迟非核心功能 4. 10:00:10 - 队列积压 ├─ 队列积压: 50000 ├─ 启用优先级队列 ├─ VIP 用户订单优先 └─ 撤单请求最优先 5. 10:00:30 - 扩容 ├─ 自动扩容处理节点: 10 → 20 ├─ 扩容消息队列分区: 50 → 100 └─ 增加数据库连接池 6. 10:01:00 - 恢复正常 ├─ CPU: 60%, Memory: 70% ├─ 订单量: 2000/秒 ├─ 队列积压: 5000 └─ 退出降级模式 7. 10:02:00 - 完全恢复 ├─ CPU: 40%, Memory: 60% ├─ 订单量: 500/秒 ├─ 队列积压: 0 └─ 恢复所有功能 ``` ### 案例2:恶意刷单攻击 **场景**: - 某用户使用脚本大量下单 - 短时间内发送 10000 个小额订单 - 目的是测试系统性能或进行恶意攻击 **处理流程**: ``` 1. 检测异常行为 ├─ 单用户下单频率异常: 10000/秒 ├─ 订单金额异常: 都是0.01元 └─ IP地址异常: 同一IP大量请求 2. 触发风控规则 ├─ 限制该用户: 封禁24小时 ├─ 限制该IP: 加入黑名单 └─ 撤销该用户所有未成交订单 3. 系统保护 ├─ 清理恶意订单 ├─ 释放系统资源 └─ 记录攻击日志 ``` --- ## P7 加分项 ### 深度理解 1. **为什么需要多层防护?** - 单一防线容易被突破 - 每层防线解决不同层面的问题 - 渐进式降级,保证核心功能 2. **限流 vs 降级 vs 熔断的区别?** - **限流**:主动控制请求量 - **降级**:主动降低服务质量 - **熔断**:被动保护系统 3. **如何保证公平性?** - 优先级队列:VIP用户优先 - 时间优先:先到先得 - 金额优先:大额订单优先 - 但不能完全牺牲普通用户 ### 实战扩展 **相关技术**: - Redis: 限流、缓存 - Kafka: 消息队列 - Hystrix: 熔断器 - Prometheus: 监控 - Sentinel: 流量控制 **最佳实践**: 1. 事前:压测、预案 2. 事中:监控、自动处理 3. 事后:分析、优化 ### 变体问题 1. **如果数据库也扛不住了怎么办?** - 读写分离 - 分库分表 - 使用缓存 - 异步写入 2. **如何保证订单不丢失?** - 消息队列持久化 - 定时任务扫描 - 对账系统 - 事务日志 3. **如何测试系统抗压能力?** - 压力测试 - 混沌工程 - 故障注入 - 全链路压测 --- ## 总结 **核心要点**: 1. **多层防护**:限流 → 缓冲 → 优先级 → 降级 2. **快速响应**:实时监控 → 自动处理 → 及时告警 3. **保护核心**:保证撮合引擎正常运行 4. **用户体验**:透明沟通 → 快速恢复 **技术栈**: - 限流:Redis + 滑动窗口 - 队列:Kafka + 优先级队列 - 监控:Prometheus + Grafana - 熔断:Hystrix / Sentinel - 缓存:Redis + 本地缓存 **关键指标**: - CPU 使用率 < 70% - 内存使用率 < 80% - P99 延迟 < 100ms - 错误率 < 0.01% - 队列积压 < 10000 **易错点**: - 限流太严格影响用户体验 - 优先级设计不合理导致不公平 - 降级策略不明确导致混乱 - 忘记恢复导致长期降级