25 KiB
25 KiB
限购库存系统设计
题目描述
设计一个限购库存系统,用于处理商品秒杀、限购等场景。
业务场景:
- 某商品库存有限(如1000件)
- 用户抢购热情高涨,短时间内大量请求
- 每个用户有购买数量限制(如每人最多买3件)
- 需要防止超卖、刷单、恶意攻击
核心需求:
- 保证库存准确性,不能超卖
- 支持高并发,QPS 达到 10万+
- 防止用户刷单、恶意抢购
- 保证系统可用性和数据一致性
- 提供良好的用户体验
问题:
- 如何设计库存扣减方案?
- 如何防止超卖?
- 如何处理高并发?
- 如何防止刷单?
- 如何保证数据一致性?
思路推导
问题分析
核心挑战:
- 超卖问题:库存1000件,卖了1200件
- 性能问题:10万QPS下系统不崩溃
- 公平性问题:防止机器刷单
- 一致性问题:分布式环境下的数据一致性
为什么难?
- 高并发 + 库存扣减 = 严重的锁竞争
- 分布式环境 + 数据一致性 = CAP权衡
- 用户体验 + 安全防护 = 难以平衡
为什么这样思考?
核心原则:性能 > 一致性 > 用户体验
就像演唱会抢票一样:
- 先到先得(性能优先)
- 不允许卖出比座位多的票(准确性)
- 防止黄牛刷票(安全性)
解题思路
核心思想
多级缓存 + 异步扣减 + 分布式锁 + 限流防护
┌─────────────────────────────────────────────────┐
│ 用户请求 │
└──────────────────┬──────────────────────────────┘
│
▼
┌─────────────────┐
│ 1. 限流防护层 │ ← 防止刷单、恶意攻击
│ - 用户限流 │
│ - IP限流 │
│ - 验证码 │
└────────┬─────────┘
│
▼
┌─────────────────┐
│ 2. 预热层 │ ← 减轻数据库压力
│ - Redis库存 │
│ - 本地缓存 │
└────────┬─────────┘
│
▼
┌─────────────────┐
│ 3. 防刷层 │ ← 防止机器刷单
│ - 用户行为分析 │
│ - 风控规则 │
└────────┬─────────┘
│
▼
┌─────────────────┐
│ 4. 库存扣减层 │ ← 核心逻辑
│ - Redis原子操作│
│ - 分布式锁 │
│ - 数据库乐观锁 │
└────────┬─────────┘
│
▼
┌─────────────────┐
│ 5. 订单处理层 │ ← 异步处理
│ - 消息队列 │
│ - 订单落地 │
└─────────────────┘
详细方案
方案一:Redis + Lua 脚本(推荐)
核心思路:利用 Redis 的原子性操作,保证库存扣减的准确性
1.1 数据结构设计
// 库存信息
type StockInfo struct {
ProductID string // 商品ID
TotalStock int64 // 总库存
AvailableStock int64 // 可用库存
SoldStock int64 // 已售库存
Price float64 // 价格
LimitPerUser int // 每人限购
}
// 用户购买记录
type UserPurchase struct {
UserID string
ProductID string
Quantity int
OrderTime time.Time
}
1.2 Redis 数据结构
# 商品库存
product:{productID}:stock = 1000
# 用户购买数量
user:{userID}:product:{productID}:qty = 2
# 购买记录(用于防刷)
product:{productID}:purchase_set = {userID1, userID2, ...}
# 分布式锁
lock:product:{productID}
1.3 Lua 脚本实现
-- 扣减库存的 Lua 脚本
local function deduct_stock(product_id, user_id, quantity)
-- 1. 获取商品库存
local stock_key = "product:" .. product_id .. ":stock"
local stock = tonumber(redis.call('GET', stock_key))
-- 2. 检查库存是否充足
if stock < quantity then
return {err = "insufficient_stock", remaining = stock}
end
-- 3. 获取用户已购买数量
local user_qty_key = "user:" .. user_id .. ":product:" .. product_id .. ":qty"
local user_qty = tonumber(redis.call('GET', user_qty_key)) or 0
-- 4. 检查是否超过限购
local limit_key = "product:" .. product_id .. ":limit"
local limit_per_user = tonumber(redis.call('GET', limit_key)) or 1
if user_qty + quantity > limit_per_user then
return {err = "exceed_limit", user_qty = user_qty, limit = limit_per_user}
end
-- 5. 扣减库存(原子操作)
redis.call('DECRBY', stock_key, quantity)
-- 6. 更新用户购买数量
redis.call('INCRBY', user_qty_key, quantity)
redis.call('EXPIRE', user_qty_key, 3600) -- 1小时过期
-- 7. 记录购买用户
local purchase_set = "product:" .. product_id .. ":purchase_set"
redis.call('SADD', purchase_set, user_id)
redis.call('EXPIRE', purchase_set, 86400) -- 24小时过期
return {ok = true, remaining = stock - quantity}
end
-- 执行脚本
return deduct_stock(KEYS[1], ARGV[1], ARGV[2])
1.4 Go 实现
type StockService struct {
redis *redis.Client
script *redis.Script
}
func NewStockService(redis *redis.Client) *StockService {
script := redis.NewScript(`
local product_id = KEYS[1]
local user_id = ARGV[1]
local quantity = tonumber(ARGV[2])
local stock_key = "product:" .. product_id .. ":stock"
local stock = tonumber(redis.call("GET", stock_key))
if stock == nil then
return {err = "product_not_found"}
end
if stock < quantity then
return {err = "insufficient_stock", remaining = stock}
end
local user_qty_key = "user:" .. user_id .. ":product:" .. product_id .. ":qty"
local user_qty = tonumber(redis.call("GET", user_qty_key)) or 0
local limit_key = "product:" .. product_id .. ":limit"
local limit_per_user = tonumber(redis.call("GET", limit_key)) or 1
if user_qty + quantity > limit_per_user then
return {err = "exceed_limit", user_qty = user_qty, limit = limit_per_user}
end
redis.call("DECRBY", stock_key, quantity)
redis.call("INCRBY", user_qty_key, quantity)
redis.call("EXPIRE", user_qty_key, 3600)
local purchase_set = "product:" .. product_id .. ":purchase_set"
redis.call("SADD", purchase_set, user_id)
redis.call("EXPIRE", purchase_set, 86400)
return {ok = true, remaining = stock - quantity}
`)
return &StockService{
redis: redis,
script: script,
}
}
func (s *StockService) DeductStock(ctx context.Context, productID, userID string, quantity int) (*StockResult, error) {
keys := []string{productID}
values := []interface{}{userID, quantity}
result, err := s.script.Run(ctx, s.redis, keys, values).Result()
if err != nil {
return nil, err
}
// 解析结果
resultMap, ok := result.(map[string]interface{})
if !ok {
return nil, errors.New("invalid result type")
}
stockResult := &StockResult{}
if errMsg, exists := resultMap["err"]; exists {
stockResult.Error = errMsg.(string)
if errMsg == "insufficient_stock" {
stockResult.Remaining = int(resultMap["remaining"].(int64))
}
} else if _, exists := resultMap["ok"]; exists {
stockResult.Success = true
stockResult.Remaining = int(resultMap["remaining"].(int64))
}
return stockResult, nil
}
type StockResult struct {
Success bool
Error string
Remaining int
}
方案二:数据库乐观锁
核心思路:利用数据库的原子性和行锁,保证数据一致性
2.1 表设计
-- 商品库存表
CREATE TABLE products (
product_id BIGINT PRIMARY KEY,
total_stock INT NOT NULL,
available_stock INT NOT NULL,
version INT NOT NULL DEFAULT 0, -- 乐观锁版本号
limit_per_user INT NOT NULL DEFAULT 1,
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_available_stock (available_stock)
) ENGINE=InnoDB;
-- 用户购买记录表
CREATE TABLE user_purchases (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
user_id BIGINT NOT NULL,
product_id BIGINT NOT NULL,
quantity INT NOT NULL,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY uk_user_product (user_id, product_id), -- 防止重复购买
INDEX idx_product (product_id)
) ENGINE=InnoDB;
-- 订单表
CREATE TABLE orders (
order_id BIGINT PRIMARY KEY,
user_id BIGINT NOT NULL,
product_id BIGINT NOT NULL,
quantity INT NOT NULL,
status TINYINT NOT NULL DEFAULT 0, -- 0:待支付, 1:已支付, 2:已取消
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
INDEX idx_user_product (user_id, product_id),
INDEX idx_status (status)
) ENGINE=InnoDB;
2.2 乐观锁实现
func (s *StockService) DeductStockWithOptimisticLock(ctx context.Context, productID, userID int64, quantity int) error {
return s.db.Transaction(ctx, func(tx *gorm.DB) error {
// 1. 查询商品信息(带锁)
var product Product
err := tx.Set("gorm:query_option", "FOR UPDATE"). // 行锁
Where("product_id = ?", productID).
First(&product).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return errors.New("product not found")
}
return err
}
// 2. 检查库存
if product.AvailableStock < quantity {
return errors.New("insufficient stock")
}
// 3. 检查用户购买数量
var totalPurchase int64
err = tx.Model(&UserPurchase{}).
Where("user_id = ? AND product_id = ?", userID, productID).
Select("COALESCE(SUM(quantity), 0)").
Scan(&totalPurchase).Error
if err != nil {
return err
}
if int(totalPurchase)+quantity > product.LimitPerUser {
return errors.New("exceed purchase limit")
}
// 4. 扣减库存(乐观锁)
result := tx.Model(&Product{}).
Where("product_id = ? AND available_stock >= ? AND version = ?",
productID, quantity, product.Version).
Updates(map[string]interface{}{
"available_stock": gorm.Expr("available_stock - ?", quantity),
"version": gorm.Expr("version + 1"),
})
if result.RowsAffected == 0 {
return errors.New("stock update failed, please retry")
}
// 5. 记录购买信息
purchase := &UserPurchase{
UserID: userID,
ProductID: productID,
Quantity: quantity,
}
if err := tx.Create(purchase).Error; err != nil {
return err
}
return nil
})
}
问题:数据库压力大,不适合超高并发场景
方案三:分布式锁 + Redis
核心思路:使用分布式锁保证并发安全
3.1 Redis 分布式锁
type DistributedLock struct {
redis *redis.Client
key string
value string
expire time.Duration
}
func (d *DistributedLock) Lock(ctx context.Context) error {
// 使用 SETNX 实现
ok, err := d.redis.SetNX(ctx, d.key, d.value, d.expire).Result()
if err != nil {
return err
}
if !ok {
return errors.New("lock already held")
}
return nil
}
func (d *DistributedLock) Unlock(ctx context.Context) error {
// 使用 Lua 脚本保证原子性
script := `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
end
return 0
`
_, err := d.redis.Eval(ctx, script, []string{d.key}, []string{d.value}).Result()
return err
}
// 使用分布式锁扣减库存
func (s *StockService) DeductStockWithLock(ctx context.Context, productID, userID string, quantity int) error {
lockKey := fmt.Sprintf("lock:product:%s", productID)
lock := &DistributedLock{
redis: s.redis,
key: lockKey,
value: userID,
expire: 5 * time.Second,
}
// 获取锁
if err := lock.Lock(ctx); err != nil {
return err
}
defer lock.Unlock(ctx)
// 扣减库存逻辑
stockKey := fmt.Sprintf("product:%s:stock", productID)
result, err := s.redis.DecrBy(ctx, stockKey, quantity).Result()
if err != nil {
return err
}
if result < 0 {
return errors.New("insufficient stock")
}
return nil
}
方案四:限流防护
核心思路:在扣减库存前进行多层限流
4.1 用户级限流
type RateLimiter struct {
redis *redis.Client
}
// 滑动窗口限流
func (r *RateLimiter) AllowUser(ctx context.Context, userID string, maxRequests int, window time.Duration) bool {
key := fmt.Sprintf("ratelimit:user:%s", userID)
now := time.Now().UnixNano()
windowStart := now - window.Nanoseconds()
// 使用 Redis Sorted Set 实现滑动窗口
pipe := r.redis.Pipeline()
// 移除窗口外的记录
pipe.ZRemRangeByScore(ctx, key, "0", strconv.FormatInt(windowStart, 10))
// 添加当前请求
pipe.ZAdd(ctx, key, redis.Z{Score: float64(now), Member: userID})
// 设置过期时间
pipe.Expire(ctx, key, window)
// 统计窗口内请求数
pipe.ZCard(ctx, key)
results, err := pipe.Exec(ctx)
if err != nil {
return false
}
count := results[3].(*redis.IntCmd).Val()
return count <= int64(maxRequests)
}
4.2 验证码
func (s *StockService) VerifyCaptcha(ctx context.Context, userID, ticket, captcha string) bool {
key := fmt.Sprintf("captcha:%s:%s", userID, ticket)
stored, err := s.redis.Get(ctx, key).Result()
if err != nil {
return false
}
// 验证码一次性使用
s.redis.Del(ctx, key)
return stored == captcha
}
方案五:防刷策略
核心思路:多维度识别和防止机器刷单
5.1 用户行为分析
type AntiSpamService struct {
redis *redis.Client
}
// 检查用户行为风险
func (a *AntiSpamService) CheckUserRisk(ctx context.Context, userID string) (risk float64, reason string) {
risk := 0.0
// 1. 检查购买频率
purchaseCount, _ := a.redis.Get(ctx, fmt.Sprintf("user:%s:purchase_count:1h", userID)).Int()
if purchaseCount > 100 {
risk += 0.5
reason = "高频购买"
}
// 2. 检查注册时间
registerTime, _ := a.redis.Get(ctx, fmt.Sprintf("user:%s:register_time", userID)).Int64()
if time.Now().Unix()-registerTime < 86400 { // 注册不到1天
risk += 0.3
if reason == "" {
reason = "新用户"
}
}
// 3. 检查设备指纹
deviceCount, _ := a.redis.Get(ctx, fmt.Sprintf("user:%s:device_count", userID)).Int()
if deviceCount > 5 {
risk += 0.4
if reason == "" {
reason = "多设备"
}
}
// 4. 检查IP关联账号数
ipKey := fmt.Sprintf("ip:%s:user_count", getUserIP(ctx))
ipUserCount, _ := a.redis.Get(ctx, ipKey).Int()
if ipUserCount > 10 {
risk += 0.3
if reason == "" {
reason = "同IP多账号"
}
}
return risk, reason
}
// 是否允许购买
func (a *AntiSpamService) AllowPurchase(ctx context.Context, userID string) (bool, string) {
risk, reason := a.CheckUserRisk(ctx, userID)
if risk > 0.7 {
// 高风险:直接拒绝
return false, "高风险用户:" + reason
}
if risk > 0.4 {
// 中风险:要求验证码
return false, "需要验证码"
}
return true, ""
}
5.2 黑名单机制
type BlacklistService struct {
redis *redis.Client
}
// 检查是否在黑名单
func (b *BlacklistService) IsBlacklisted(ctx context.Context, userID string) (bool, string) {
// 1. 用户黑名单
if exists, _ := b.redis.SIsMember(ctx, "blacklist:user", userID).Result(); exists {
return true, "用户黑名单"
}
// 2. IP黑名单
ip := getUserIP(ctx)
if exists, _ := b.redis.SIsMember(ctx, "blacklist:ip", ip).Result(); exists {
return true, "IP黑名单"
}
// 3. 设备黑名单
deviceID := getDeviceID(ctx)
if exists, _ := b.redis.SIsMember(ctx, "blacklist:device", deviceID).Result(); exists {
return true, "设备黑名单"
}
return false, ""
}
方案六:库存预加载与预热
核心思路:提前将库存加载到 Redis,减轻数据库压力
6.1 预热库存
func (s *StockService) PreloadStock(ctx context.Context, productID string) error {
// 1. 从数据库加载库存
var product Product
err := s.db.Where("product_id = ?", productID).First(&product).Error
if err != nil {
return err
}
// 2. 加载到 Redis
stockKey := fmt.Sprintf("product:%s:stock", productID)
limitKey := fmt.Sprintf("product:%s:limit", productID)
pipe := s.redis.Pipeline()
pipe.Set(ctx, stockKey, product.AvailableStock, 24*time.Hour)
pipe.Set(ctx, limitKey, product.LimitPerUser, 24*time.Hour)
// 3. 加载用户购买记录
var purchases []UserPurchase
s.db.Where("product_id = ? AND created_at > ?", productID, time.Now().Add(-24*time.Hour)).
Find(&purchases)
for _, purchase := range purchases {
userQtyKey := fmt.Sprintf("user:%d:product:%s:qty", purchase.UserID, productID)
pipe.Set(ctx, userQtyKey, purchase.Quantity, time.Hour)
}
_, err = pipe.Exec(ctx)
return err
}
// 批量预热多个商品
func (s *StockService) PreloadStocks(ctx context.Context, productIDs []string) error {
for _, productID := range productIDs {
if err := s.PreloadStock(ctx, productID); err != nil {
log.Errorf("Failed to preload stock for product %s: %v", productID, err)
}
}
return nil
}
6.2 定时同步
// 定时将 Redis 中的数据同步回数据库
func (s *StockService) SyncToDatabase(ctx context.Context) error {
// 1. 获取所有商品库存
keys, _, err := s.redis.Scan(ctx, "product:*:stock").Result()
if err != nil {
return err
}
for _, key := range keys {
// 2. 获取 Redis 中的库存
stock, _ := s.redis.Get(ctx, key).Int()
// 3. 提取 productID
productID := strings.TrimPrefix(key, "product:")
productID = strings.TrimSuffix(productID, ":stock")
// 4. 更新数据库
s.db.Model(&Product{}).
Where("product_id = ?", productID).
Update("available_stock", stock)
}
return nil
}
方案七:订单异步处理
核心思路:库存扣减后,异步创建订单
7.1 消息队列
type OrderMessage struct {
OrderID string `json:"order_id"`
UserID string `json:"user_id"`
ProductID string `json:"product_id"`
Quantity int `json:"quantity"`
Timestamp time.Time `json:"timestamp"`
}
func (s *StockService) DeductStockAsync(ctx context.Context, productID, userID string, quantity int) (*OrderResult, error) {
// 1. 扣减库存(同步)
result, err := s.DeductStock(ctx, productID, userID, quantity)
if err != nil {
return nil, err
}
// 2. 发送订单创建消息(异步)
orderID := generateOrderID()
orderMsg := &OrderMessage{
OrderID: orderID,
UserID: userID,
ProductID: productID,
Quantity: quantity,
Timestamp: time.Now(),
}
msgBytes, _ := json.Marshal(orderMsg)
if err := s.kafka.SendMessage("orders", msgBytes); err != nil {
// 消息发送失败,回滚库存
s.rollbackStock(ctx, productID, userID, quantity)
return nil, err
}
return &OrderResult{
OrderID: orderID,
Success: true,
}, nil
}
func (s *StockService) rollbackStock(ctx context.Context, productID, userID string, quantity int) {
stockKey := fmt.Sprintf("product:%s:stock", productID)
userQtyKey := fmt.Sprintf("user:%s:product:%s:qty", userID, productID)
pipe := s.redis.Pipeline()
pipe.IncrBy(ctx, stockKey, quantity)
pipe.DecrBy(ctx, userQtyKey, quantity)
pipe.Exec(ctx)
}
实战案例
案例1:秒杀活动
场景:
- 商品:iPhone 15 Pro,库存1000台
- 限购:每人1台
- 预估QPS:10万
处理流程:
1. 活动前预热(T-10分钟)
├─ 预热库存到 Redis
├─ 预热用户购买记录
└─ 启动监控告警
2. 活动开始(T=0)
├─ 10万QPS 请求涌入
├─ 第一层:用户限流(每秒1次)
│ └─ 拦截 90% 请求 → 剩余 1万QPS
├─ 第二层:验证码验证
│ └─ 拦截 50% 请求 → 剩余 5千QPS
├─ 第三层:Redis Lua 扣减库存
│ └─ 原子操作,保证准确
└─ 第四层:异步创建订单
└─ Kafka 消息队列
3. 活动进行中(T+1分钟)
├─ 库存:1000 → 0
├─ Redis 压力:正常
├─ 数据库压力:低(异步写入)
└─ 订单队列:5000 个待处理
4. 活动结束后
├─ 停止接收新订单
├─ 处理剩余订单
├─ 同步数据到数据库
└─ 生成销售报表
案例2:黄牛刷单检测
场景:
- 某用户使用脚本在1秒内发起100次请求
- IP地址相同
- 设备指纹相同
处理流程:
1. 监控检测异常
├─ 用户购买频率:100次/秒
├─ IP地址:单一来源
└─ 触发告警
2. 风控规则匹配
├─ 规则1:1分钟内购买>10次 → 高风险
├─ 规则2:同IP多账号>5个 → 高风险
└─ 规则3:新用户大额购买 → 中风险
3. 自动处理
├─ 加入黑名单(24小时)
├─ 冻结订单
├─ 释放库存
└─ 通知风控团队
4. 人工审核
├─ 检查用户历史行为
├─ 确认是否为恶意刷单
└─ 决定是否永久封禁
P7 加分项
深度理解
-
为什么选择 Redis 而不是数据库?
- 性能:Redis QPS 可达 10万+,MySQL 只有几千
- 原子性:Redis Lua 脚本保证原子操作
- 轻量级:内存操作,比磁盘IO快得多
-
如何保证 Redis 和数据库一致性?
- 最终一致性:允许短暂的不一致
- 定时同步:定时将 Redis 数据同步到数据库
- 对账系统:定时比对 Redis 和数据库数据
- 补偿机制:发现不一致时自动修复
-
如何处理网络分区?
- CAP权衡:选择 AP(可用性 + 分区容错)
- Redis Cluster:多主复制,保证高可用
- 降级策略:Redis 不可用时降级为数据库
实战扩展
相关技术:
- Redis:缓存、分布式锁
- Kafka:消息队列、异步处理
- Hystrix:熔断器
- Sentinel:限流熔断
- Prometheus:监控告警
最佳实践:
- 预热:提前加载库存到 Redis
- 限流:多层限流保护
- 异步:订单异步处理
- 监控:实时监控告警
- 降级:异常情况降级处理
变体问题
-
如何支持多种商品同时秒杀?
- 为每个商品单独设置库存 key
- 使用 Redis Cluster 分片
- 不同商品使用不同的 key 前缀
-
如何处理库存回滚?
- 订单超时未支付自动回滚
- 使用延迟消息队列
- 设置订单 TTL
-
如何支持预约抢购?
- 预售阶段:只允许预约,不扣减库存
- 抢购阶段:按预约顺序扣减库存
- 使用 Sorted Set 实现排队
-
如何防止黄牛倒卖?
- 实名制购买
- 限制转卖时间(如7天内不可转卖)
- 价格波动策略(价格逐步上涨)
总结
核心要点:
- Redis + Lua:原子操作,保证库存准确性
- 多层限流:防止系统过载
- 异步处理:订单异步创建,提高性能
- 防刷策略:多维度识别刷单行为
- 预热机制:提前加载库存,减轻压力
技术栈:
- 缓存:Redis(库存、用户记录)
- 消息队列:Kafka(异步订单)
- 限流:Redis + 滑动窗口
- 分布式锁:Redis SETNX
- 监控:Prometheus + Grafana
关键指标:
- QPS:10万+
- 库存准确性:100%
- 响应时间:P99 < 100ms
- 系统可用性:99.99%
易错点:
- 忘记使用 Lua 脚本导致并发问题
- 过度依赖数据库导致性能瓶颈
- 忽视防刷策略导致黄牛刷单
- 没有降级预案导致系统雪崩
- 忘记异步处理导致响应慢