# Handling Burst Traffic in a Trade Matching Engine

## Problem Statement

When designing a trade matching engine, how should you handle a sudden traffic spike on a single instrument (e.g., one stock or one cryptocurrency)?

**Example scenarios**:

- A stock suddenly announces major good news
- Extreme price action in the cryptocurrency market
- A celebrity's social media comment triggers a trading frenzy
- Order backlog caused by a system failure

**Questions**:

1. How do you keep the system from being overwhelmed?
2. How do you keep the system fair?
3. How do you keep data consistent?
4. How do you restore service quickly?

---
## Reasoning

### Problem Analysis

**Characteristics of burst traffic**:

- Peak traffic can be 10-100x normal volume
- Requests are concentrated in a short window (seconds to minutes)
- Order flow may be one-sided (e.g., a flood of sells or buys)
- May be accompanied by malicious attacks or spam orders

**Risks to the system**:

1. **Unavailability**: resources are exhausted and normal requests cannot be served
2. **Data inconsistency**: order state becomes corrupted under high concurrency
3. **Poor user experience**: request timeouts and high latency
4. **Unfairness**: some users' orders get preferential treatment

### Why think about it this way?

**Core principle**: **protect the system > process requests > user experience**

Just as stock markets trigger circuit breakers during extreme moves, a trading system needs multiple layers of protection.

---
## Approach

### Core Idea

**Layered protection + degradation strategy + priority queues**

```
┌─────────────────────────────────────────────────────┐
│                    User requests                    │
└──────────────────┬──────────────────────────────────┘
                   │
                   ▼
        ┌─────────────────────┐
        │ 1. Rate limiting    │ ← prevent overload
        │  - per user         │
        │  - per symbol       │
        │  - system-wide      │
        └──────────┬──────────┘
                   │
                   ▼
        ┌─────────────────────┐
        │ 2. Buffering        │ ← absorb the peak
        │  - message queue    │
        │  - local cache      │
        └──────────┬──────────┘
                   │
                   ▼
        ┌─────────────────────┐
        │ 3. Prioritization   │ ← keep it fair
        │  - VIP users        │
        │  - order type       │
        │  - time priority    │
        └──────────┬──────────┘
                   │
                   ▼
        ┌─────────────────────┐
        │ 4. Processing       │ ← actual matching
        │  - matching engine  │
        │  - risk control     │
        └─────────────────────┘
```

---
## Detailed Design

### Option 1: Rate Limiting (First Line of Defense)

**Goal**: keep instantaneous traffic spikes from overwhelming the system
#### 1.1 Per-User Rate Limiting

```go
// Sliding-window rate limiter
type RateLimiter struct {
	windows map[string]*UserWindow
	mutex   sync.RWMutex
}

type UserWindow struct {
	requests []time.Time
	limit    int           // max requests within the window
	window   time.Duration // window size
}

func NewRateLimiter() *RateLimiter {
	return &RateLimiter{windows: make(map[string]*UserWindow)}
}

func (r *RateLimiter) Allow(userID string) bool {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	now := time.Now()
	window, exists := r.windows[userID]

	if !exists {
		window = &UserWindow{
			requests: []time.Time{now},
			limit:    100,         // 100 orders per minute
			window:   time.Minute, // 1-minute window
		}
		r.windows[userID] = window
		return true
	}

	// Drop requests that have aged out of the window
	validRequests := []time.Time{}
	for _, req := range window.requests {
		if now.Sub(req) < window.window {
			validRequests = append(validRequests, req)
		}
	}

	// Over the limit? Persist the pruned slice even on rejection,
	// so expired entries don't accumulate.
	if len(validRequests) >= window.limit {
		window.requests = validRequests
		return false // reject the request
	}

	// Record the new request
	validRequests = append(validRequests, now)
	window.requests = validRequests
	return true
}
```
**Key points**:

- Regular users: 100 requests/minute
- VIP users: 500 requests/minute
- Institutional users: 2,000 requests/minute
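The tiered quotas above can be wired into the limiter with a simple lookup. A minimal sketch — the `UserTier` type and `limitFor` helper are illustrative names, only the three quotas come from the text:

```go
package main

import "fmt"

// UserTier is a hypothetical tier enum; the quotas (100 / 500 / 2000
// orders per minute) are the ones listed above.
type UserTier int

const (
	TierRegular UserTier = iota
	TierVIP
	TierInstitutional
)

// limitFor returns the per-minute order cap for a tier.
func limitFor(t UserTier) int {
	switch t {
	case TierVIP:
		return 500
	case TierInstitutional:
		return 2000
	default:
		return 100
	}
}

func main() {
	fmt.Println(limitFor(TierVIP)) // 500
}
```

In practice the tier would be looked up from the user profile when the `UserWindow` is first created, replacing the hard-coded `limit: 100`.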
#### 1.2 Per-Symbol Rate Limiting

```go
// Global rate limiting for a single symbol
type SymbolRateLimiter struct {
	symbols map[string]*SymbolLimit
	mutex   sync.RWMutex
}

type SymbolLimit struct {
	currentOrders int // orders currently being matched
	maxOrders     int // cap on concurrent orders
	queueDepth    int // current backlog, updated by the queue consumer
	maxQueue      int // cap on queue backlog
}

func (s *SymbolRateLimiter) Allow(symbol string) bool {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	limit, exists := s.symbols[symbol]
	if !exists {
		limit = &SymbolLimit{
			maxOrders: 10000, // max concurrent orders per symbol
			maxQueue:  50000, // max queue backlog per symbol
		}
		s.symbols[symbol] = limit
	}

	// Check concurrent orders
	if limit.currentOrders >= limit.maxOrders {
		return false // too many in-flight orders, reject
	}

	// Check queue backlog
	if limit.queueDepth >= limit.maxQueue {
		return false // backlog too deep, reject
	}

	limit.currentOrders++
	return true
}

func (s *SymbolRateLimiter) Release(symbol string) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	if limit, exists := s.symbols[symbol]; exists {
		limit.currentOrders--
	}
}
```
#### 1.3 System-Wide Rate Limiting

```go
// Global system-level limiter
type SystemRateLimiter struct {
	currentLoad int64
	maxLoad     int64 // maximum system load
}

func (s *SystemRateLimiter) Allow() bool {
	// Increment first, then check, so two goroutines cannot both
	// pass on the same stale read of currentLoad.
	if atomic.AddInt64(&s.currentLoad, 1) > s.maxLoad {
		atomic.AddInt64(&s.currentLoad, -1)
		return false
	}
	return true
}

func (s *SystemRateLimiter) Release() {
	atomic.AddInt64(&s.currentLoad, -1)
}
```
**Key thresholds**:

- CPU usage < 70%
- Memory usage < 80%
- Network bandwidth < 70%
- P99 response time < 100ms

---
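These thresholds can feed a single "are we under budget" check that the system-wide limiter or the degradation manager consults. A minimal sketch — the helper name and boolean shape are assumptions; only the three percentage thresholds come from the list above (latency is left out because it is a duration, not a percentage):

```go
package main

import "fmt"

// withinBudget reports whether the system is under the thresholds listed
// above: CPU < 70%, memory < 80%, network < 70%.
func withinBudget(cpu, mem, net float64) bool {
	return cpu < 70 && mem < 80 && net < 70
}

func main() {
	fmt.Println(withinBudget(65, 75, 60)) // true: everything under budget
	fmt.Println(withinBudget(95, 75, 60)) // false: CPU over budget
}
```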
### Option 2: Message Queue Buffering (Second Line of Defense)

**Goal**: absorb the peak and process asynchronously

#### 2.1 Architecture
```go
type OrderQueue struct {
	priorityQueues map[Priority]*queue.PriorityQueue
	mutex          sync.RWMutex
}

type Priority int

const (
	PRIORITY_EMERGENCY Priority = iota // urgent orders (e.g., cancellations)
	PRIORITY_HIGH                      // high priority (VIP users, market orders)
	PRIORITY_NORMAL                    // regular orders
	PRIORITY_LOW                       // low priority (small limit orders)
)

type OrderType int

const (
	MARKET_ORDER OrderType = iota
	LIMIT_ORDER
	CANCEL_ORDER
)

type Order struct {
	OrderID    string
	UserID     string
	Symbol     string
	Type       OrderType // used by the priority scoring later on
	Price      decimal.Decimal
	Quantity   decimal.Decimal
	Priority   Priority
	CreateTime time.Time
}

// Submit an order to the queue
func (o *OrderQueue) Enqueue(order *Order) error {
	o.mutex.Lock()
	defer o.mutex.Unlock()

	q, ok := o.priorityQueues[order.Priority]
	if !ok {
		return fmt.Errorf("no queue for priority %d", order.Priority)
	}
	return q.Enqueue(order)
}

// Take the next order off the queues. Dequeue mutates the underlying
// queues, so it needs the write lock, not a read lock.
func (o *OrderQueue) Dequeue() (*Order, error) {
	o.mutex.Lock()
	defer o.mutex.Unlock()

	// Drain in priority order
	priorities := []Priority{PRIORITY_EMERGENCY, PRIORITY_HIGH, PRIORITY_NORMAL, PRIORITY_LOW}

	for _, priority := range priorities {
		q := o.priorityQueues[priority]
		if !q.IsEmpty() {
			return q.Dequeue()
		}
	}

	return nil, errors.New("queue is empty")
}
```
#### 2.2 Queue Configuration

```yaml
kafka:
  brokers:
    - "kafka1:9092"
    - "kafka2:9092"
    - "kafka3:9092"

  topics:
    orders:
      partitions: 100          # partition count, for parallelism
      replication-factor: 3    # replicas, for availability
      retention-ms: 60000      # retain messages for 1 minute

  producer:
    acks: 1                    # wait for leader acknowledgment
    retries: 3                 # retry up to 3 times
    batch-size: 16384          # batch sends together
    linger-ms: 10              # wait 10ms to collect a batch

  consumer:
    group-id: "matching-engine"
    max-poll-records: 500      # pull at most 500 records per poll
    fetch-min-bytes: 1024      # fetch at least 1KB
    session-timeout-ms: 30000
```
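One detail the config above leaves implicit is how orders map onto the 100 partitions. Matching needs per-symbol ordering, so a common choice (an assumption here, not stated in the text) is to key messages by symbol, so every order for one instrument lands in the same partition:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor hashes the symbol so all orders for that symbol go to the
// same Kafka partition, preserving per-symbol ordering while still
// spreading distinct symbols across partitions.
func partitionFor(symbol string, partitions int) int {
	h := fnv.New32a()
	h.Write([]byte(symbol))
	return int(h.Sum32() % uint32(partitions))
}

func main() {
	// The same symbol always maps to the same partition.
	fmt.Println(partitionFor("AAPL", 100) == partitionFor("AAPL", 100)) // true
}
```

The flip side is that a single hot symbol is confined to one partition, which is exactly why the per-symbol limiter and dynamic scaling below matter.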
#### 2.3 Dynamic Scaling

```go
// Watch queue backlogs
func (o *OrderQueue) Monitor() {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		o.mutex.RLock()

		for priority, q := range o.priorityQueues {
			size := q.Size()

			// Backlog above threshold: alert
			if size > 10000 {
				log.Warnf("Priority %d queue backlog: %d", priority, size)

				// Trigger scale-up
				if priority == PRIORITY_HIGH || priority == PRIORITY_EMERGENCY {
					o.triggerScaleUp()
				}
			}
		}

		o.mutex.RUnlock()
	}
}

// Scale out processing nodes
func (o *OrderQueue) triggerScaleUp() {
	// Ask Kubernetes to add Pods,
	// or notify an autoscaling system
}
```

---
### Option 3: Priority Queues (Third Line of Defense)

**Goal**: make sure important orders are processed first

#### 3.1 Order Priority Strategy
```go
type OrderPriority struct {
	order *Order
	score float64
}

// Compute an order's priority score
func CalculatePriorityScore(order *Order) float64 {
	score := 0.0

	// 1. User tier weight (40%)
	userLevel := getUserLevel(order.UserID)
	score += float64(userLevel) * 0.4

	// 2. Order type weight (30%)
	if order.Type == MARKET_ORDER {
		score += 30 * 0.3 // market orders first
	} else if order.Type == LIMIT_ORDER {
		score += 20 * 0.3 // limit orders next
	} else if order.Type == CANCEL_ORDER {
		score += 50 * 0.3 // cancellations first of all
	}

	// 3. Order size weight (20%)
	amount := order.Price.Mul(order.Quantity)
	if amount.GreaterThan(decimal.NewFromInt(100000)) {
		score += 30 * 0.2 // large orders
	} else if amount.GreaterThan(decimal.NewFromInt(10000)) {
		score += 20 * 0.2
	} else {
		score += 10 * 0.2
	}

	// 4. Waiting-time weight (10%)
	waitTime := time.Since(order.CreateTime).Seconds()
	score += math.Min(waitTime/60.0, 10) * 0.1 // capped at 10 minutes of waiting

	return score
}
```
#### 3.2 Priority Queue Implementation

```go
type PriorityQueue struct {
	items []*OrderPriority
	mutex sync.RWMutex
}

func (p *PriorityQueue) Push(order *Order) {
	p.mutex.Lock()
	defer p.mutex.Unlock()

	score := CalculatePriorityScore(order)
	item := &OrderPriority{order: order, score: score}

	// Max-heap keyed on the score
	p.items = append(p.items, item)
	p.heapifyUp(len(p.items) - 1)
}

func (p *PriorityQueue) Pop() *Order {
	p.mutex.Lock()
	defer p.mutex.Unlock()

	if len(p.items) == 0 {
		return nil
	}

	root := p.items[0]
	last := p.items[len(p.items)-1]
	p.items = p.items[:len(p.items)-1]

	if len(p.items) > 0 {
		p.items[0] = last
		p.heapifyDown(0)
	}

	return root.order
}

func (p *PriorityQueue) heapifyUp(index int) {
	for index > 0 {
		parent := (index - 1) / 2
		if p.items[parent].score >= p.items[index].score {
			break
		}
		p.items[parent], p.items[index] = p.items[index], p.items[parent]
		index = parent
	}
}

func (p *PriorityQueue) heapifyDown(index int) {
	for {
		left := 2*index + 1
		right := 2*index + 2
		largest := index

		if left < len(p.items) && p.items[left].score > p.items[largest].score {
			largest = left
		}

		if right < len(p.items) && p.items[right].score > p.items[largest].score {
			largest = right
		}

		if largest == index {
			break
		}

		p.items[index], p.items[largest] = p.items[largest], p.items[index]
		index = largest
	}
}
```

---
### Option 4: Degradation Strategy (Fourth Line of Defense)

**Goal**: keep core functionality alive under extreme conditions

#### 4.1 Service Degradation
```go
type ServiceLevel int

const (
	LEVEL_FULL      ServiceLevel = iota // full functionality
	LEVEL_BASIC                         // basic functionality
	LEVEL_MINIMAL                       // minimal functionality
	LEVEL_EMERGENCY                     // emergency mode
)

type DegradationManager struct {
	currentLevel ServiceLevel
	mutex        sync.RWMutex
}

// Adjust the service level based on system load
func (d *DegradationManager) AdjustLevel(cpuUsage, memoryUsage float64) {
	d.mutex.Lock()
	defer d.mutex.Unlock()

	if cpuUsage > 90 || memoryUsage > 90 {
		d.currentLevel = LEVEL_EMERGENCY
		d.applyEmergencyMode()
	} else if cpuUsage > 80 || memoryUsage > 80 {
		d.currentLevel = LEVEL_MINIMAL
		d.applyMinimalMode()
	} else if cpuUsage > 70 || memoryUsage > 70 {
		d.currentLevel = LEVEL_BASIC
		d.applyBasicMode()
	} else {
		d.currentLevel = LEVEL_FULL
		d.applyFullMode()
	}
}

func (d *DegradationManager) applyEmergencyMode() {
	log.Warn("Entering EMERGENCY mode")

	// 1. Accept cancellation requests only
	// 2. Stop accepting new orders
	// 3. Drain the queue backlog
	// 4. Tell users the system is busy

	// Fire an alert
	alerting.SendAlert("EMERGENCY", "System in emergency mode")
}

func (d *DegradationManager) applyMinimalMode() {
	log.Warn("Entering MINIMAL mode")

	// 1. Process VIP users' orders only
	// 2. Restrict order types (market orders only)
	// 3. Scale back non-core features
}

func (d *DegradationManager) applyBasicMode() {
	log.Info("Entering BASIC mode")

	// 1. Throttle order rates
	// 2. Turn off query features
	// 3. Defer notifications
}

func (d *DegradationManager) applyFullMode() {
	log.Info("Entering FULL mode")

	// Restore all features
}
```
#### 4.2 Circuit Breaker

```go
type CircuitBreaker struct {
	maxFailures     int
	resetTimeout    time.Duration
	currentFailures int
	lastFailureTime time.Time
	state           State
	mutex           sync.RWMutex
}

type State int

const (
	STATE_CLOSED State = iota
	STATE_OPEN
	STATE_HALF_OPEN
)

func (c *CircuitBreaker) Allow() bool {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	// If the breaker is open, check whether we may probe for recovery
	if c.state == STATE_OPEN {
		if time.Since(c.lastFailureTime) > c.resetTimeout {
			c.state = STATE_HALF_OPEN
			return true
		}
		return false
	}

	// Half-open: let a small number of probe requests through
	if c.state == STATE_HALF_OPEN {
		return true
	}

	// Closed: allow normally
	return true
}

func (c *CircuitBreaker) RecordSuccess() {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	c.currentFailures = 0

	if c.state == STATE_HALF_OPEN {
		c.state = STATE_CLOSED
		log.Info("Circuit breaker recovered")
	}
}

func (c *CircuitBreaker) RecordFailure() {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	c.currentFailures++
	c.lastFailureTime = time.Now()

	if c.currentFailures >= c.maxFailures {
		c.state = STATE_OPEN
		log.Error("Circuit breaker opened due to failures")
	}
}
```

---
### Option 5: Data Consistency

#### 5.1 Distributed Transactions
```go
// Process an order with the Saga pattern
type OrderSaga struct {
	steps []SagaStep
}

type SagaStep struct {
	Name       string
	Execute    func() error
	Compensate func() error
}

func (s *OrderSaga) Execute() error {
	executedSteps := []int{}

	for i, step := range s.steps {
		if err := step.Execute(); err != nil {
			// A step failed: run the compensations
			s.compensate(executedSteps)
			return err
		}
		executedSteps = append(executedSteps, i)
	}

	return nil
}

func (s *OrderSaga) compensate(executedSteps []int) {
	// Run compensations in reverse order
	for i := len(executedSteps) - 1; i >= 0; i-- {
		stepIndex := executedSteps[i]
		step := s.steps[stepIndex]

		if err := step.Compensate(); err != nil {
			log.Errorf("Compensation failed for step %s: %v", step.Name, err)
		}
	}
}

// Usage
func ProcessOrder(order *Order) error {
	saga := &OrderSaga{
		steps: []SagaStep{
			{
				Name: "ValidateOrder",
				Execute: func() error {
					return validateOrder(order)
				},
				Compensate: func() error {
					return nil // nothing to undo
				},
			},
			{
				Name: "ReserveBalance",
				Execute: func() error {
					return reserveBalance(order)
				},
				Compensate: func() error {
					return releaseBalance(order)
				},
			},
			{
				Name: "MatchOrder",
				Execute: func() error {
					return matchOrder(order)
				},
				Compensate: func() error {
					return cancelMatch(order)
				},
			},
			{
				Name: "UpdatePosition",
				Execute: func() error {
					return updatePosition(order)
				},
				Compensate: func() error {
					return revertPosition(order)
				},
			},
		},
	}

	return saga.Execute()
}
```
#### 5.2 Database Optimization

```sql
-- Partition the orders table by time.
-- Note: MySQL requires the partitioning column to be part of every
-- unique key, so create_time is included in the primary key.
CREATE TABLE orders (
    order_id BIGINT NOT NULL,
    user_id BIGINT NOT NULL,
    symbol VARCHAR(32) NOT NULL,
    price DECIMAL(20, 8) NOT NULL,
    quantity DECIMAL(20, 8) NOT NULL,
    status TINYINT NOT NULL,
    create_time DATETIME NOT NULL,
    update_time DATETIME NOT NULL,
    PRIMARY KEY (order_id, create_time),
    INDEX idx_user_symbol (user_id, symbol),
    INDEX idx_create_time (create_time)
) PARTITION BY RANGE (YEAR(create_time)) (
    PARTITION p2023 VALUES LESS THAN (2024),
    PARTITION p2024 VALUES LESS THAN (2025),
    PARTITION p2025 VALUES LESS THAN (2026),
    PARTITION p_future VALUES LESS THAN MAXVALUE
);
```

Cache hot order data in Redis:

```go
type OrderCache struct {
	redis *redis.Client
}

func (o *OrderCache) GetOrder(orderID string) (*Order, error) {
	// Try Redis first
	key := fmt.Sprintf("order:%s", orderID)
	data, err := o.redis.Get(key).Bytes()

	if err == nil {
		var order Order
		if err := json.Unmarshal(data, &order); err == nil {
			return &order, nil
		}
	}

	// Cache miss: load from the database
	order, err := o.getOrderFromDB(orderID)
	if err != nil {
		return nil, err
	}

	// Populate Redis with a short TTL
	data, _ = json.Marshal(order)
	o.redis.Set(key, data, 5*time.Minute)

	return order, nil
}
```

---
### Option 6: Monitoring and Alerting

#### 6.1 Real-Time Metrics
```go
type Metrics struct {
	// System metrics
	CPUUsage    float64
	MemoryUsage float64
	DiskIO      float64
	NetworkIO   float64

	// Business metrics
	OrdersPerSecond int
	QueueSize       int
	AvgLatency      time.Duration
	P99Latency      time.Duration
	ErrorRate       float64

	// Per-symbol metrics
	HotSymbols   []string
	SymbolOrders map[string]int
}

func CollectMetrics() *Metrics {
	m := &Metrics{
		SymbolOrders: make(map[string]int),
	}

	// 1. System metrics
	m.CPUUsage = getCPUUsage()
	m.MemoryUsage = getMemoryUsage()
	m.DiskIO = getDiskIO()
	m.NetworkIO = getNetworkIO()

	// 2. Business metrics
	m.OrdersPerSecond = getOrdersPerSecond()
	m.QueueSize = getQueueSize()
	m.AvgLatency = getAvgLatency()
	m.P99Latency = getP99Latency()
	m.ErrorRate = getErrorRate()

	// 3. Per-symbol metrics
	m.SymbolOrders = getSymbolOrders()

	// 4. Identify hot symbols
	m.HotSymbols = identifyHotSymbols(m.SymbolOrders)

	return m
}

func identifyHotSymbols(symbolOrders map[string]int) []string {
	if len(symbolOrders) == 0 {
		return nil // avoid division by zero below
	}

	type SymbolCount struct {
		Symbol string
		Count  int
	}

	var counts []SymbolCount
	totalOrders := 0

	for symbol, count := range symbolOrders {
		counts = append(counts, SymbolCount{Symbol: symbol, Count: count})
		totalOrders += count
	}

	// Sort by order count, descending
	sort.Slice(counts, func(i, j int) bool {
		return counts[i].Count > counts[j].Count
	})

	// A symbol is "hot" when its order count exceeds 3x the average
	avgOrders := totalOrders / len(symbolOrders)
	var hotSymbols []string

	for _, sc := range counts {
		if sc.Count > avgOrders*3 {
			hotSymbols = append(hotSymbols, sc.Symbol)
		}
	}

	return hotSymbols
}
```
#### 6.2 Alert Rules

```go
type AlertRule struct {
	Name      string
	Condition func(*Metrics) bool
	Severity  string
	// Message renders the alert text from the metrics. A closure (rather
	// than a bare format string) keeps each rule's format verbs and their
	// arguments together.
	Message func(*Metrics) string
}

var alertRules = []AlertRule{
	{
		Name: "High system load",
		Condition: func(m *Metrics) bool {
			return m.CPUUsage > 80 || m.MemoryUsage > 80
		},
		Severity: "WARNING",
		Message: func(m *Metrics) string {
			return fmt.Sprintf("System load too high, CPU: %.2f%%, Memory: %.2f%%", m.CPUUsage, m.MemoryUsage)
		},
	},
	{
		Name: "Queue backlog",
		Condition: func(m *Metrics) bool {
			return m.QueueSize > 10000
		},
		Severity: "CRITICAL",
		Message: func(m *Metrics) string {
			return fmt.Sprintf("Queue backlog too large, current: %d", m.QueueSize)
		},
	},
	{
		Name: "High latency",
		Condition: func(m *Metrics) bool {
			return m.P99Latency > 500*time.Millisecond
		},
		Severity: "WARNING",
		Message: func(m *Metrics) string {
			return fmt.Sprintf("P99 latency too high, current: %v", m.P99Latency)
		},
	},
	{
		Name: "High error rate",
		Condition: func(m *Metrics) bool {
			return m.ErrorRate > 0.05
		},
		Severity: "CRITICAL",
		Message: func(m *Metrics) string {
			return fmt.Sprintf("Error rate too high, current: %.2f%%", m.ErrorRate*100)
		},
	},
	{
		Name: "Hot symbols",
		Condition: func(m *Metrics) bool {
			return len(m.HotSymbols) > 0
		},
		Severity: "INFO",
		Message: func(m *Metrics) string {
			return fmt.Sprintf("Hot symbols detected: %v", m.HotSymbols)
		},
	},
}

func CheckAlerts(metrics *Metrics) {
	for _, rule := range alertRules {
		if rule.Condition(metrics) {
			SendAlert(rule.Severity, rule.Name, rule.Message(metrics))
		}
	}
}

func SendAlert(severity, name, message string) {
	// Fan the alert out to multiple channels
	// 1. Email
	emailAlert := fmt.Sprintf("[%s] %s: %s", severity, name, message)
	sendEmail(emailAlert)

	// 2. SMS for critical alerts
	if severity == "CRITICAL" {
		sendSMS(message)
	}

	// 3. Instant messaging (e.g., DingTalk, WeCom)
	sendIMAlert(severity, name, message)

	// 4. Log it
	log.Printf("[%s] %s: %s", severity, name, message)
}
```

---
## Case Studies

### Case 1: Sudden Good News on a Stock

**Scenario**:

- 10:00 AM: major good news is announced
- 10:00:01: order volume jumps from 100/sec to 10,000/sec
- System CPU jumps from 30% to 95%

**Response timeline**:

```
1. 10:00:01 - Monitoring detects the anomaly
   └─ CPU: 95%, Memory: 85%
   └─ Order volume: 10,000/sec
   └─ CRITICAL alert fires

2. 10:00:02 - Automatic rate limiting
   ├─ Per-user limit: 100 → 10 per minute
   ├─ Per-symbol limit: this stock capped at 1,000 orders/sec
   └─ Non-core requests rejected (queries, market data, etc.)

3. 10:00:05 - Service degradation
   ├─ Enter BASIC mode
   ├─ Process market orders only
   ├─ Throttle limit orders
   └─ Defer non-core features

4. 10:00:10 - Queue backlog
   ├─ Backlog: 50,000
   ├─ Priority queues engaged
   ├─ VIP users' orders first
   └─ Cancellations first of all

5. 10:00:30 - Scale out
   ├─ Processing nodes autoscaled: 10 → 20
   ├─ Message queue partitions: 50 → 100
   └─ Database connection pool enlarged

6. 10:01:00 - Back toward normal
   ├─ CPU: 60%, Memory: 70%
   ├─ Order volume: 2,000/sec
   ├─ Backlog: 5,000
   └─ Exit degraded mode

7. 10:02:00 - Fully recovered
   ├─ CPU: 40%, Memory: 60%
   ├─ Order volume: 500/sec
   ├─ Backlog: 0
   └─ All features restored
```
### Case 2: Malicious Order Spam

**Scenario**:

- A user scripts a flood of orders
- 10,000 tiny orders arrive in a short burst
- The goal is to probe system performance or to attack it

**Response**:

```
1. Detect the anomaly
   ├─ Single-user order rate abnormal: 10,000/sec
   ├─ Order amounts abnormal: all 0.01
   └─ IP abnormal: massive request volume from one address

2. Trigger risk-control rules
   ├─ Restrict the user: 24-hour ban
   ├─ Restrict the IP: add to the blacklist
   └─ Cancel all of the user's open orders

3. Protect the system
   ├─ Purge the malicious orders
   ├─ Release system resources
   └─ Log the attack
```

---
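The detection step in this case boils down to a couple of per-user counters. A deliberately simplified sketch — the function name and both thresholds are illustrative, not from the text:

```go
package main

import "fmt"

// suspicious flags a user whose order rate or average order size looks
// like scripted spam: either an extreme request rate on its own, or a
// high rate of tiny orders. Thresholds are illustrative.
func suspicious(ordersPerSec int, avgAmount float64) bool {
	return ordersPerSec > 1000 || (ordersPerSec > 100 && avgAmount < 1.0)
}

func main() {
	fmt.Println(suspicious(10000, 0.01)) // true: the burst described above
	fmt.Println(suspicious(5, 2500))     // false: normal activity
}
```

A real risk-control system would track these counters per user and per IP in something like Redis, and pair them with the 24-hour ban and blacklist actions listed above.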
## P7 Bonus Points

### Deeper Understanding

1. **Why multiple layers of protection?**
   - A single line of defense is easily breached
   - Each layer addresses a different class of problem
   - Gradual degradation preserves core functionality

2. **Rate limiting vs. degradation vs. circuit breaking?**
   - **Rate limiting**: proactively controls request volume
   - **Degradation**: proactively lowers service quality
   - **Circuit breaking**: reactively protects the system when something fails

3. **How do you keep it fair?**
   - Priority queues: VIP users first
   - Time priority: first come, first served
   - Size priority: large orders first
   - But never at the total expense of regular users
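The three mechanisms compose naturally into a single admission check: breaker first (is the dependency healthy?), then the rate limit (is the volume acceptable?), then the degradation level (is this request class still being served?). A sketch under those assumptions — the function and parameter names are illustrative:

```go
package main

import "fmt"

// ServiceLevel mirrors the degradation levels defined earlier; lower is healthier.
type ServiceLevel int

const (
	LevelFull ServiceLevel = iota
	LevelBasic
	LevelMinimal
	LevelEmergency
)

// admit runs the three checks in order: circuit breaker, then rate limit,
// then whether the current level still serves this request class.
// requiredAtMost is the most-degraded level at which the request is still
// allowed (e.g., queries might need LevelFull, cancels LevelEmergency).
func admit(breakerClosed, withinLimit bool, current, requiredAtMost ServiceLevel) bool {
	if !breakerClosed {
		return false // dependency unhealthy: fail fast
	}
	if !withinLimit {
		return false // over quota: shed load
	}
	return current <= requiredAtMost
}

func main() {
	fmt.Println(admit(true, true, LevelBasic, LevelMinimal)) // true
	fmt.Println(admit(true, true, LevelMinimal, LevelBasic)) // false: too degraded
}
```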
### Practical Extensions

**Related technologies**:

- Redis: rate limiting, caching
- Kafka: message queues
- Hystrix: circuit breaking
- Prometheus: monitoring
- Sentinel: flow control

**Best practices**:

1. Before the incident: load testing, contingency plans
2. During: monitoring, automated response
3. After: postmortem analysis, optimization
### Follow-Up Variants

1. **What if the database can't keep up either?**
   - Read/write splitting
   - Sharding (by database and by table)
   - Caching
   - Asynchronous writes

2. **How do you guarantee no order is lost?**
   - Persistent message queues
   - Periodic sweep jobs
   - Reconciliation systems
   - Transaction logs

3. **How do you test the system's resilience?**
   - Load testing
   - Chaos engineering
   - Fault injection
   - Full-link load testing

---
## Summary

**Key points**:

1. **Layered protection**: rate limiting → buffering → prioritization → degradation
2. **Fast response**: real-time monitoring → automated handling → timely alerts
3. **Protect the core**: keep the matching engine itself running
4. **User experience**: communicate transparently → recover quickly

**Tech stack**:

- Rate limiting: Redis + sliding windows
- Queues: Kafka + priority queues
- Monitoring: Prometheus + Grafana
- Circuit breaking: Hystrix / Sentinel
- Caching: Redis + local caches

**Key metrics**:

- CPU usage < 70%
- Memory usage < 80%
- P99 latency < 100ms
- Error rate < 0.01%
- Queue backlog < 10,000

**Common pitfalls**:

- Over-aggressive rate limits hurt user experience
- Poorly designed priorities create unfairness
- Vague degradation rules cause chaos during an incident
- Forgetting to restore leaves the system degraded long-term