26 KiB
26 KiB
交易撮合引擎突发流量处理
题目描述
在设计交易撮合引擎时,如果某一个标的(如某只股票或加密货币)突发流量太大,应该如何处理?
场景示例:
- 某只股票突然发布重大利好消息
- 加密货币市场出现极端行情
- 知名人士在社交媒体发表评论引发交易狂潮
- 系统故障导致的订单堆积
问题:
- 如何保护系统不被击垮?
- 如何保证系统公平性?
- 如何保证数据一致性?
- 如何快速恢复服务?
思路推导
问题分析
突发流量的特征:
- 流量峰值可能是正常流量的 10-100 倍
- 请求集中在短时间内(几秒到几分钟)
- 订单类型可能单一(如大量卖单或买单)
- 可能伴随恶意攻击或刷单行为
系统面临的风险:
- 服务不可用:系统资源耗尽,无法处理正常请求
- 数据不一致:高并发下订单状态混乱
- 用户体验差:请求超时、延迟过高
- 不公平性:部分用户订单被优先处理
为什么这样思考?
核心原则:保护系统 > 处理请求 > 用户体验
就像股票市场遇到极端行情时会采取"熔断机制"一样,交易系统也需要有多层保护机制。
解题思路
核心思想
多层防护 + 降级策略 + 优先级队列
┌─────────────────────────────────────────────────────┐
│ 用户请求 │
└──────────────────┬──────────────────────────────────┘
│
▼
┌─────────────────┐
│ 1. 限流层 │ ← 防止过载
│ - 用户级限流 │
│ - 标的级限流 │
│ - 系统级限流 │
└────────┬─────────┘
│
▼
┌─────────────────┐
│ 2. 缓冲层 │ ← 削峰填谷
│ - 消息队列 │
│ - 本地缓存 │
└────────┬─────────┘
│
▼
┌─────────────────┐
│ 3. 优先级层 │ ← 保证公平
│ - VIP 用户 │
│ - 订单类型 │
│ - 时间优先 │
└────────┬─────────┘
│
▼
┌─────────────────┐
│ 4. 处理层 │ ← 实际撮合
│ - 撮合引擎 │
│ - 风控系统 │
└─────────────────┘
详细方案
方案一:限流保护(第一道防线)
目的:防止系统被瞬时大流量击垮
1.1 用户级限流
// 滑动窗口限流
type RateLimiter struct {
windows map[string]*UserWindow
mutex sync.RWMutex
}
type UserWindow struct {
requests []time.Time
limit int // 时间窗口内最大请求数
window time.Duration // 时间窗口大小
}
func (r *RateLimiter) Allow(userID string) bool {
r.mutex.Lock()
defer r.mutex.Unlock()
now := time.Now()
window, exists := r.windows[userID]
if !exists {
window = &UserWindow{
requests: []time.Time{now},
limit: 100, // 每分钟100个订单
window: time.Minute, // 1分钟窗口
}
r.windows[userID] = window
return true
}
// 清理过期请求
validRequests := []time.Time{}
for _, req := range window.requests {
if now.Sub(req) < window.window {
validRequests = append(validRequests, req)
}
}
// 检查是否超过限制
if len(validRequests) >= window.limit {
return false // 超过限制,拒绝请求
}
// 添加新请求
validRequests = append(validRequests, now)
window.requests = validRequests
return true
}
关键点:
- 普通用户:100 次/分钟
- VIP 用户:500 次/分钟
- 机构用户:2000 次/分钟
1.2 标的级限流
// 单个标的的全局限流
type SymbolRateLimiter struct {
symbols map[string]*SymbolLimit
mutex sync.RWMutex
}
type SymbolLimit struct {
currentOrders int
maxOrders int // 并发订单数上限
queueSize int // 队列积压上限
}
func (s *SymbolRateLimiter) Allow(symbol string) bool {
s.mutex.Lock()
defer s.mutex.Unlock()
limit, exists := s.symbols[symbol]
if !exists {
limit = &SymbolLimit{
maxOrders: 10000, // 单标的最大并发订单数
queueSize: 50000, // 单标的最大队列积压
}
s.symbols[symbol] = limit
}
// 检查并发订单数
if limit.currentOrders >= limit.maxOrders {
return false // 并发订单已满,拒绝
}
// 检查队列积压
if limit.queueSize >= 50000 {
return false // 队列积压过多,拒绝
}
limit.currentOrders++
return true
}
func (s *SymbolRateLimiter) Release(symbol string) {
s.mutex.Lock()
defer s.mutex.Unlock()
if limit, exists := s.symbols[symbol]; exists {
limit.currentOrders--
}
}
1.3 系统级限流
// 全局系统限流
type SystemRateLimiter struct {
currentLoad int64
maxLoad int64 // 系统最大负载
}
func (s *SystemRateLimiter) Allow() bool {
current := atomic.LoadInt64(&s.currentLoad)
if current >= s.maxLoad {
return false
}
atomic.AddInt64(&s.currentLoad, 1)
return true
}
关键指标:
- CPU 使用率 < 70%
- 内存使用率 < 80%
- 网络带宽 < 70%
- 请求响应时间 P99 < 100ms
方案二:消息队列缓冲(第二道防线)
目的:削峰填谷,异步处理
2.1 架构设计
type OrderQueue struct {
priorityQueues map[Priority]*queue.PriorityQueue
mutex sync.RWMutex
}
type Priority int
const (
PRIORITY_EMERGENCY Priority = iota // 紧急订单(如撤单)
PRIORITY_HIGH // 高优先级(VIP用户、市价单)
PRIORITY_NORMAL // 普通订单
PRIORITY_LOW // 低优先级(小额限价单)
)
type Order struct {
OrderID string
UserID string
Symbol string
Price decimal.Decimal
Quantity decimal.Decimal
Priority Priority
CreateTime time.Time
}
// 提交订单到队列
func (o *OrderQueue) Enqueue(order *Order) error {
o.mutex.Lock()
defer o.mutex.Unlock()
queue := o.priorityQueues[order.Priority]
return queue.Enqueue(order)
}
// 从队列取出订单
func (o *OrderQueue) Dequeue() (*Order, error) {
o.mutex.RLock()
defer o.mutex.RUnlock()
// 按优先级顺序取出
priorities := []Priority{PRIORITY_EMERGENCY, PRIORITY_HIGH, PRIORITY_NORMAL, PRIORITY_LOW}
for _, priority := range priorities {
queue := o.priorityQueues[priority]
if !queue.IsEmpty() {
return queue.Dequeue()
}
}
return nil, errors.New("queue is empty")
}
2.2 队列配置
kafka:
brokers:
- "kafka1:9092"
- "kafka2:9092"
- "kafka3:9092"
topics:
orders:
partitions: 100 # 分区数,提高并发
replication-factor: 3 # 副本数,保证高可用
retention-ms: 60000 # 消息保留1分钟
producer:
acks: 1 # 等待leader确认
retries: 3 # 重试3次
batch-size: 16384 # 批量发送
linger-ms: 10 # 等待10ms收集批量
consumer:
group-id: "matching-engine"
max-poll-records: 500 # 每次最多拉取500条
fetch-min-bytes: 1024 # 最小拉取1KB
session-timeout-ms: 30000
2.3 动态扩容
// 监控队列积压情况
func (o *OrderQueue) Monitor() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for range ticker.C {
o.mutex.RLock()
for priority, queue := range o.priorityQueues {
size := queue.Size()
// 队列积压超过阈值,告警
if size > 10000 {
log.Warnf("Priority %d queue backlog: %d", priority, size)
// 触发扩容
if priority == PRIORITY_HIGH || priority == PRIORITY_EMERGENCY {
o.triggerScaleUp()
}
}
}
o.mutex.RUnlock()
}
}
// 动态扩容处理节点
func (o *OrderQueue) triggerScaleUp() {
// 通知K8s增加Pod数量
// 或者通知自动扩缩容系统
}
方案三:优先级队列(第三道防线)
目的:保证重要订单优先处理
3.1 订单优先级策略
type OrderPriority struct {
order *Order
score float64
}
// 计算订单优先级分数
func CalculatePriorityScore(order *Order) float64 {
score := 0.0
// 1. 用户等级权重 (40%)
userLevel := getUserLevel(order.UserID)
score += float64(userLevel) * 0.4
// 2. 订单类型权重 (30%)
if order.Type == MARKET_ORDER {
score += 30 * 0.3 // 市价单优先
} else if order.Type == LIMIT_ORDER {
score += 20 * 0.3 // 限价单次之
} else if order.Type == CANCEL_ORDER {
score += 50 * 0.3 // 撤单最优先
}
// 3. 订单金额权重 (20%)
amount := order.Price.Mul(order.Quantity)
if amount.GreaterThan(decimal.NewFromInt(100000)) {
score += 30 * 0.2 // 大额订单
} else if amount.GreaterThan(decimal.NewFromInt(10000)) {
score += 20 * 0.2
} else {
score += 10 * 0.2
}
// 4. 等待时间权重 (10%)
waitTime := time.Since(order.CreateTime).Seconds()
score += math.Min(waitTime/60.0, 10) * 0.1 // 最多等待10分钟
return score
}
3.2 优先级队列实现
type PriorityQueue struct {
items []*OrderPriority
mutex sync.RWMutex
}
func (p *PriorityQueue) Push(order *Order) {
p.mutex.Lock()
defer p.mutex.Unlock()
score := CalculatePriorityScore(order)
item := &OrderPriority{order: order, score: score}
// 使用堆结构
p.items = append(p.items, item)
p.heapifyUp(len(p.items) - 1)
}
func (p *PriorityQueue) Pop() *Order {
p.mutex.Lock()
defer p.mutex.Unlock()
if len(p.items) == 0 {
return nil
}
root := p.items[0]
last := p.items[len(p.items)-1]
p.items = p.items[:len(p.items)-1]
if len(p.items) > 0 {
p.items[0] = last
p.heapifyDown(0)
}
return root.order
}
func (p *PriorityQueue) heapifyUp(index int) {
for index > 0 {
parent := (index - 1) / 2
if p.items[parent].score >= p.items[index].score {
break
}
p.items[parent], p.items[index] = p.items[index], p.items[parent]
index = parent
}
}
func (p *PriorityQueue) heapifyDown(index int) {
for {
left := 2*index + 1
right := 2*index + 2
largest := index
if left < len(p.items) && p.items[left].score > p.items[largest].score {
largest = left
}
if right < len(p.items) && p.items[right].score > p.items[largest].score {
largest = right
}
if largest == index {
break
}
p.items[index], p.items[largest] = p.items[largest], p.items[index]
index = largest
}
}
方案四:降级策略(第四道防线)
目的:在极端情况下保证核心功能
4.1 服务降级
type ServiceLevel int
const (
LEVEL_FULL ServiceLevel = iota // 全功能
LEVEL_BASIC // 基础功能
LEVEL_MINIMAL // 最小功能
LEVEL_EMERGENCY // 紧急模式
)
type DegradationManager struct {
currentLevel ServiceLevel
mutex sync.RWMutex
}
// 根据系统负载调整服务级别
func (d *DegradationManager) AdjustLevel(cpuUsage, memoryUsage float64) {
d.mutex.Lock()
defer d.mutex.Unlock()
if cpuUsage > 90 || memoryUsage > 90 {
d.currentLevel = LEVEL_EMERGENCY
d.applyEmergencyMode()
} else if cpuUsage > 80 || memoryUsage > 80 {
d.currentLevel = LEVEL_MINIMAL
d.applyMinimalMode()
} else if cpuUsage > 70 || memoryUsage > 70 {
d.currentLevel = LEVEL_BASIC
d.applyBasicMode()
} else {
d.currentLevel = LEVEL_FULL
d.applyFullMode()
}
}
func (d *DegradationManager) applyEmergencyMode() {
log.Warn("Entering EMERGENCY mode")
// 1. 只处理撤单请求
// 2. 暂停新的订单接收
// 3. 清理队列积压
// 4. 通知用户系统繁忙
// 发送告警
alerting.SendAlert("EMERGENCY", "System in emergency mode")
}
func (d *DegradationManager) applyMinimalMode() {
log.Warn("Entering MINIMAL mode")
// 1. 只处理VIP用户订单
// 2. 限制订单类型(只允许市价单)
// 3. 降低非核心功能
}
func (d *DegradationManager) applyBasicMode() {
log.Info("Entering BASIC mode")
// 1. 限制订单速率
// 2. 关闭查询功能
// 3. 延迟通知
}
func (d *DegradationManager) applyFullMode() {
log.Info("Entering FULL mode")
// 恢复所有功能
}
4.2 熔断机制
type CircuitBreaker struct {
maxFailures int
resetTimeout time.Duration
currentFailures int
lastFailureTime time.Time
state State
mutex sync.RWMutex
}
type State int
const (
STATE_CLOSED State = iota
STATE_OPEN
STATE_HALF_OPEN
)
func (c *CircuitBreaker) Allow() bool {
c.mutex.Lock()
defer c.mutex.Unlock()
// 如果熔断器打开,检查是否可以尝试恢复
if c.state == STATE_OPEN {
if time.Since(c.lastFailureTime) > c.resetTimeout {
c.state = STATE_HALF_OPEN
return true
}
return false
}
// 如果是半开状态,允许少量请求通过
if c.state == STATE_HALF_OPEN {
return true
}
// 关闭状态,正常允许
return true
}
func (c *CircuitBreaker) RecordSuccess() {
c.mutex.Lock()
defer c.mutex.Unlock()
c.currentFailures = 0
if c.state == STATE_HALF_OPEN {
c.state = STATE_CLOSED
log.Info("Circuit breaker recovered")
}
}
func (c *CircuitBreaker) RecordFailure() {
c.mutex.Lock()
defer c.mutex.Unlock()
c.currentFailures++
c.lastFailureTime = time.Now()
if c.currentFailures >= c.maxFailures {
c.state = STATE_OPEN
log.Error("Circuit breaker opened due to failures")
}
}
方案五:数据一致性保证
5.1 分布式事务
// 使用 Saga 模式处理订单
type OrderSaga struct {
steps []SagaStep
compensatable bool
}
type SagaStep struct {
Name string
Execute func() error
Compensate func() error
}
func (s *OrderSaga) Execute() error {
executedSteps := []int{}
for i, step := range s.steps {
if err := step.Execute(); err != nil {
// 执行失败,执行补偿
s.compensate(executedSteps)
return err
}
executedSteps = append(executedSteps, i)
}
return nil
}
func (s *OrderSaga) compensate(executedSteps []int) {
// 反向执行补偿操作
for i := len(executedSteps) - 1; i >= 0; i-- {
stepIndex := executedSteps[i]
step := s.steps[stepIndex]
if err := step.Compensate(); err != nil {
log.Errorf("Compensation failed for step %s: %v", step.Name, err)
}
}
}
// 使用示例
func ProcessOrder(order *Order) error {
saga := &OrderSaga{
steps: []SagaStep{
{
Name: "ValidateOrder",
Execute: func() error {
return validateOrder(order)
},
Compensate: func() error {
return nil // 无需补偿
},
},
{
Name: "ReserveBalance",
Execute: func() error {
return reserveBalance(order)
},
Compensate: func() error {
return releaseBalance(order)
},
},
{
Name: "MatchOrder",
Execute: func() error {
return matchOrder(order)
},
Compensate: func() error {
return cancelMatch(order)
},
},
{
Name: "UpdatePosition",
Execute: func() error {
return updatePosition(order)
},
Compensate: func() error {
return revertPosition(order)
},
},
},
}
return saga.Execute()
}
5.2 数据库优化
-- 订单表分区(按时间)
CREATE TABLE orders (
order_id BIGINT PRIMARY KEY,
user_id BIGINT NOT NULL,
symbol VARCHAR(32) NOT NULL,
price DECIMAL(20, 8) NOT NULL,
quantity DECIMAL(20, 8) NOT NULL,
status TINYINT NOT NULL,
create_time DATETIME NOT NULL,
update_time DATETIME NOT NULL,
INDEX idx_user_symbol (user_id, symbol),
INDEX idx_create_time (create_time)
) PARTITION BY RANGE (YEAR(create_time)) (
PARTITION p2023 VALUES LESS THAN (2024),
PARTITION p2024 VALUES LESS THAN (2025),
PARTITION p2025 VALUES LESS THAN (2026),
PARTITION p_future VALUES LESS THAN MAXVALUE
);
-- 使用 Redis 缓存热点数据
type OrderCache struct {
redis *redis.Client
}
func (o *OrderCache) GetOrder(orderID string) (*Order, error) {
// 先从 Redis 获取
key := fmt.Sprintf("order:%s", orderID)
data, err := o.redis.Get(key).Bytes()
if err == nil {
var order Order
if err := json.Unmarshal(data, &order); err == nil {
return &order, nil
}
}
// Redis 没有,从数据库获取
order, err := o.getOrderFromDB(orderID)
if err != nil {
return nil, err
}
// 写入 Redis
data, _ = json.Marshal(order)
o.redis.Set(key, data, 5*time.Minute)
return order, nil
}
方案六:监控和告警
6.1 实时监控指标
type Metrics struct {
// 系统指标
CPUUsage float64
MemoryUsage float64
DiskIO float64
NetworkIO float64
// 业务指标
OrdersPerSecond int
QueueSize int
AvgLatency time.Duration
P99Latency time.Duration
ErrorRate float64
// 标的指标
HotSymbols []string
SymbolOrders map[string]int
}
func CollectMetrics() *Metrics {
m := &Metrics{
SymbolOrders: make(map[string]int),
}
// 1. 收集系统指标
m.CPUUsage = getCPUUsage()
m.MemoryUsage = getMemoryUsage()
m.DiskIO = getDiskIO()
m.NetworkIO = getNetworkIO()
// 2. 收集业务指标
m.OrdersPerSecond = getOrdersPerSecond()
m.QueueSize = getQueueSize()
m.AvgLatency = getAvgLatency()
m.P99Latency = getP99Latency()
m.ErrorRate = getErrorRate()
// 3. 收集标的指标
m.SymbolOrders = getSymbolOrders()
// 4. 识别热点标的
m.HotSymbols = identifyHotSymbols(m.SymbolOrders)
return m
}
func identifyHotSymbols(symbolOrders map[string]int) []string {
type SymbolCount struct {
Symbol string
Count int
}
var counts []SymbolCount
totalOrders := 0
for symbol, count := range symbolOrders {
counts = append(counts, SymbolCount{Symbol: symbol, Count: count})
totalOrders += count
}
// 排序
sort.Slice(counts, func(i, j int) bool {
return counts[i].Count > counts[j].Count
})
// 识别热点标的(订单数 > 平均值的3倍)
avgOrders := totalOrders / len(symbolOrders)
var hotSymbols []string
for _, sc := range counts {
if sc.Count > avgOrders*3 {
hotSymbols = append(hotSymbols, sc.Symbol)
}
}
return hotSymbols
}
6.2 告警规则
type AlertRule struct {
Name string
Condition func(*Metrics) bool
Severity string
Message string
}
var alertRules = []AlertRule{
{
Name: "系统负载过高",
Condition: func(m *Metrics) bool {
return m.CPUUsage > 80 || m.MemoryUsage > 80
},
Severity: "WARNING",
Message: "系统负载过高,CPU: %.2f%%, Memory: %.2f%%",
},
{
Name: "队列积压过多",
Condition: func(m *Metrics) bool {
return m.QueueSize > 10000
},
Severity: "CRITICAL",
Message: "队列积压过多,当前积压: %d",
},
{
Name: "延迟过高",
Condition: func(m *Metrics) bool {
return m.P99Latency > 500*time.Millisecond
},
Severity: "WARNING",
Message: "P99延迟过高,当前: %v",
},
{
Name: "错误率过高",
Condition: func(m *Metrics) bool {
return m.ErrorRate > 0.05
},
Severity: "CRITICAL",
Message: "错误率过高,当前: %.2f%%",
},
{
Name: "热点标的",
Condition: func(m *Metrics) bool {
return len(m.HotSymbols) > 0
},
Severity: "INFO",
Message: "发现热点标的: %v",
},
}
func CheckAlerts(metrics *Metrics) {
for _, rule := range alertRules {
if rule.Condition(metrics) {
message := fmt.Sprintf(rule.Message, metrics)
SendAlert(rule.Severity, rule.Name, message)
}
}
}
func SendAlert(severity, name, message string) {
// 发送告警到多个渠道
// 1. 邮件
emailAlert := fmt.Sprintf("[%s] %s: %s", severity, name, message)
sendEmail(emailAlert)
// 2. 短信
if severity == "CRITICAL" {
sendSMS(message)
}
// 3. 即时通讯工具(如钉钉、企业微信)
sendIMAlert(severity, name, message)
// 4. 写入日志
log.Printf("[%s] %s: %s", severity, name, message)
}
实战案例
案例1:某只股票突发利好消息
场景:
- 10:00 AM 发布重大利好
- 10:00:01 订单量从 100/秒 飙升到 10000/秒
- 系统CPU从 30% 飙升到 95%
处理流程:
1. 10:00:01 - 监控检测到异常
└─ CPU: 95%, Memory: 85%
└─ 订单量: 10000/秒
└─ 触发 CRITICAL 告警
2. 10:00:02 - 自动限流
├─ 用户级限流: 100 → 10 次/分钟
├─ 标的级限流: 该股票订单限制为 1000/秒
└─ 拒绝非核心请求(查询、行情等)
3. 10:00:05 - 服务降级
├─ 进入 BASIC 模式
├─ 只处理市价单
├─ 限制限价单
└─ 延迟非核心功能
4. 10:00:10 - 队列积压
├─ 队列积压: 50000
├─ 启用优先级队列
├─ VIP 用户订单优先
└─ 撤单请求最优先
5. 10:00:30 - 扩容
├─ 自动扩容处理节点: 10 → 20
├─ 扩容消息队列分区: 50 → 100
└─ 增加数据库连接池
6. 10:01:00 - 恢复正常
├─ CPU: 60%, Memory: 70%
├─ 订单量: 2000/秒
├─ 队列积压: 5000
└─ 退出降级模式
7. 10:02:00 - 完全恢复
├─ CPU: 40%, Memory: 60%
├─ 订单量: 500/秒
├─ 队列积压: 0
└─ 恢复所有功能
案例2:恶意刷单攻击
场景:
- 某用户使用脚本大量下单
- 短时间内发送 10000 个小额订单
- 目的是测试系统性能或进行恶意攻击
处理流程:
1. 检测异常行为
├─ 单用户下单频率异常: 10000/秒
├─ 订单金额异常: 都是0.01元
└─ IP地址异常: 同一IP大量请求
2. 触发风控规则
├─ 限制该用户: 封禁24小时
├─ 限制该IP: 加入黑名单
└─ 撤销该用户所有未成交订单
3. 系统保护
├─ 清理恶意订单
├─ 释放系统资源
└─ 记录攻击日志
P7 加分项
深度理解
-
为什么需要多层防护?
- 单一防线容易被突破
- 每层防线解决不同层面的问题
- 渐进式降级,保证核心功能
-
限流 vs 降级 vs 熔断的区别?
- 限流:主动控制请求量
- 降级:主动降低服务质量
- 熔断:被动保护系统
-
如何保证公平性?
- 优先级队列:VIP用户优先
- 时间优先:先到先得
- 金额优先:大额订单优先
- 但不能完全牺牲普通用户
实战扩展
相关技术:
- Redis: 限流、缓存
- Kafka: 消息队列
- Hystrix: 熔断器
- Prometheus: 监控
- Sentinel: 流量控制
最佳实践:
- 事前:压测、预案
- 事中:监控、自动处理
- 事后:分析、优化
变体问题
-
如果数据库也扛不住了怎么办?
- 读写分离
- 分库分表
- 使用缓存
- 异步写入
-
如何保证订单不丢失?
- 消息队列持久化
- 定时任务扫描
- 对账系统
- 事务日志
-
如何测试系统抗压能力?
- 压力测试
- 混沌工程
- 故障注入
- 全链路压测
总结
核心要点:
- 多层防护:限流 → 缓冲 → 优先级 → 降级
- 快速响应:实时监控 → 自动处理 → 及时告警
- 保护核心:保证撮合引擎正常运行
- 用户体验:透明沟通 → 快速恢复
技术栈:
- 限流:Redis + 滑动窗口
- 队列:Kafka + 优先级队列
- 监控:Prometheus + Grafana
- 熔断:Hystrix / Sentinel
- 缓存:Redis + 本地缓存
关键指标:
- CPU 使用率 < 70%
- 内存使用率 < 80%
- P99 延迟 < 100ms
- 错误率 < 0.01%
- 队列积压 < 10000
易错点:
- 限流太严格影响用户体验
- 优先级设计不合理导致不公平
- 降级策略不明确导致混乱
- 忘记恢复导致长期降级