# 数据库锁机制面试指南 ## 1. 行锁、表锁、页锁 ### 行锁(Row Lock) **定义**:锁定数据库表的某一行或多行数据 **特点**: - 锁粒度最细,并发性能最好 - 开销大,需要更多的锁资源 - 适合高并发场景 **InnoDB 行锁类型**: 1. **记录锁**:锁定单行记录 2. **间隙锁**:锁定间隙,防止幻读 3. **临键锁**:记录锁+间隙锁的组合 **代码示例**: ```sql -- 记录锁 SELECT * FROM user WHERE id = 1 FOR UPDATE; -- 间隙锁 SELECT * FROM user WHERE id BETWEEN 1 AND 10 FOR UPDATE; -- 临键锁 SELECT * FROM user WHERE id > 5 FOR UPDATE; ``` **Java 实现**: ```java // 行锁实现示例 public class RowLockExample { @Autowired private JdbcTemplate jdbcTemplate; // 记录锁 public User getUserWithRecordLock(Long userId) { String sql = "SELECT * FROM user WHERE id = ? FOR UPDATE"; return jdbcTemplate.queryForObject(sql, new Object[]{userId}, (rs, rowNum) -> { User user = new User(); user.setId(rs.getLong("id")); user.setName(rs.getString("name")); user.setAge(rs.getInt("age")); return user; }); } // 间隙锁 public List getUsersWithGapLock(String minAge, String maxAge) { String sql = "SELECT * FROM user WHERE age BETWEEN ? AND ? FOR UPDATE"; return jdbcTemplate.query(sql, new Object[]{minAge, maxAge}, (rs, rowNum) -> { User user = new User(); user.setId(rs.getLong("id")); user.setName(rs.getString("name")); user.setAge(rs.getInt("age")); return user; }); } } ``` ### 表锁(Table Lock) **定义**:锁定整个表,所有操作都需要获取表级锁 **特点**: - 锁粒度最粗,并发性能最差 - 开销小,实现简单 - 适用于读多写少或操作简单的场景 **代码示例**: ```sql -- 表锁 LOCK TABLES user WRITE; -- 写锁,阻塞其他所有操作 -- 执行操作 UNLOCK TABLES; -- 释放锁 -- 读锁 LOCK TABLES user READ; -- 读锁,允许并发读,阻塞写 -- 执行读操作 UNLOCK TABLES; -- 释放锁 ``` **Java 实现**: ```java // 表锁实现示例 public class TableLockExample { @Autowired private DataSource dataSource; // 写锁 public void executeWithWriteLock(Runnable operation) { try (Connection conn = dataSource.getConnection()) { // 获取写锁 conn.createStatement().execute("LOCK TABLES user WRITE"); try { // 执行操作 operation.run(); } finally { // 释放锁 conn.createStatement().execute("UNLOCK TABLES"); } } catch (SQLException e) { throw new RuntimeException("执行写锁操作失败", e); } } // 读锁 public List executeWithReadLock(Runnable operation) { try (Connection conn = dataSource.getConnection()) { // 获取读锁 conn.createStatement().execute("LOCK TABLES user READ"); try { // 执行读操作 operation.run(); } finally { // 释放锁 conn.createStatement().execute("UNLOCK TABLES"); } } catch (SQLException e) { throw new RuntimeException("执行读锁操作失败", e); } return Collections.emptyList(); } } ``` ### 页锁(Page Lock) **定义**:锁定数据库表的数据页,介于行锁和表锁之间 **特点**: - 锁粒度居中 - 开销居中 - 适用于中等到高并发场景 **InnoDB 页锁**: - InnoDB 默认使用行锁,页锁主要用于某些特定操作 - 在全表扫描时可能会升级为表锁 **代码示例**: ```sql -- InnoDB 自动管理页锁 -- 通常不需要手动控制 -- 全表扫描时可能触发页锁 SELECT * FROM user WHERE age > 100 FOR UPDATE; ``` ### 锁粒度对比 | 锁类型 | 锁粒度 | 并发性能 | 开销 | 适用场景 | |--------|--------|----------|------|----------| | 表锁 | 最粗 | 最差 | 最小 | 简单操作、维护 | | 页锁 | 中等 | 中等 | 中等 | 中等并发 | | 行锁 | 最细 | 最好 | 最大 | 高并发、高吞吐 | ## 2. 共享锁、排他锁 ### 共享锁(Shared Lock / S Lock) **定义**:多个事务可以同时持有共享锁,但不能持有排他锁 **特点**: - 读锁,允许多个事务同时读取 - 不允许修改数据 - 用于读多写少的场景 **代码示例**: ```sql -- 共享锁语法 SELECT * FROM user WHERE id = 1 LOCK IN SHARE MODE; -- Java 实现 public class SharedLockExample { @Autowired private JdbcTemplate jdbcTemplate; public User getUserWithSharedLock(Long userId) { String sql = "SELECT * FROM user WHERE id = ? LOCK IN SHARE MODE"; return jdbcTemplate.queryForObject(sql, new Object[]{userId}, (rs, rowNum) -> { User user = new User(); user.setId(rs.getLong("id")); user.setName(rs.getString("name")); user.setAge(rs.getInt("age")); return user; }); } public List getUsersWithSharedLock() { String sql = "SELECT * FROM user WHERE age > 18 LOCK IN SHARE MODE"; return jdbcTemplate.query(sql, (rs, rowNum) -> { User user = new User(); user.setId(rs.getLong("id")); user.setName(rs.getString("name")); user.setAge(rs.getInt("age")); return user; }); } } ``` ### 排他锁(Exclusive Lock / X Lock) **定义**:只有一个事务可以持有排他锁,阻止其他事务获取任何锁 **特点**: - 写锁,独占访问 - 阻止其他事务读取和修改 - 用于写操作 **代码示例**: ```sql -- 排他锁语法 SELECT * FROM user WHERE id = 1 FOR UPDATE; -- Java 实现 public class ExclusiveLockExample { @Autowired private JdbcTemplate jdbcTemplate; public User getUserWithExclusiveLock(Long userId) { String sql = "SELECT * FROM user WHERE id = ? FOR UPDATE"; return jdbcTemplate.queryForObject(sql, new Object[]{userId}, (rs, rowNum) -> { User user = new User(); user.setId(rs.getLong("id")); user.setName(rs.getString("name")); user.setAge(rs.getInt("age")); return user; }); } public void updateUserWithExclusiveLock(Long userId, String newName) { String sql = "UPDATE user SET name = ? WHERE id = ?"; jdbcTemplate.update(sql, newName, userId); } } ``` ### 锁兼容性矩阵 | 已有锁 | 请求共享锁 | 请求排他锁 | |--------|------------|------------| | 共享锁 | 兼容 | 不兼容 | | 排他锁 | 不兼容 | 不兼容 | **兼容性图解**: ``` 锁兼容性矩阵 S锁 X锁 S锁 ✓ ✗ X锁 ✗ ✗ ``` ### 锁升级与降级 ```java // 锁升级示例 public class LockUpgradeExample { @Autowired private JdbcTemplate jdbcTemplate; // 锁升级:从共享锁升级到排他锁 public void upgradeLock(Long userId) { try (Connection conn = jdbcTemplate.getDataSource().getConnection()) { // 1. 获取共享锁 conn.setAutoCommit(false); User user = getUserWithSharedLock(userId); // 2. 检查是否需要升级 if (needUpgrade(user)) { // 3. 升级为排他锁 try { conn.createStatement().execute( "SELECT * FROM user WHERE id = ? FOR UPDATE"); // 4. 执行更新操作 updateUserWithExclusiveLock(userId, "New Name"); conn.commit(); } catch (SQLException e) { conn.rollback(); throw new RuntimeException("锁升级失败", e); } } else { conn.commit(); } } catch (SQLException e) { throw new RuntimeException("锁操作失败", e); } } // 锁降级示例 public void downgradeLock(Long userId) { try (Connection conn = jdbcTemplate.getDataSource().getConnection()) { conn.setAutoCommit(false); // 1. 获取排他锁 User user = getUserWithExclusiveLock(userId); // 2. 降级为共享锁 try { conn.createStatement().execute( "SELECT * FROM user WHERE id = ? LOCK IN SHARE MODE"); // 3. 执行读操作 List users = getUsersWithSharedLock(); conn.commit(); } catch (SQLException e) { conn.rollback(); throw new RuntimeException("锁降级失败", e); } } catch (SQLException e) { throw new RuntimeException("锁操作失败", e); } } } ``` ## 3. 意向锁、间隙锁、临键锁 ### 意向锁(Intention Lock) **定义**:表级锁,表示事务意图获取行级锁 **类型**: 1. **意向共享锁(IS)**:事务意图获取某些行的共享锁 2. **意向排他锁(IX)**:事务意图获取某些行的排他锁 **特点**: - 提高锁效率,避免逐行检查 - 表级锁与行级锁协同工作 **代码示例**: ```sql -- 意向锁(InnoDB 自动管理) -- 不需要手动设置 -- 在获取行锁时自动添加 -- 查看意向锁 SELECT * FROM information_schema.innodb_locks; ``` **Java 实现**: ```java // 意向锁示例 public class IntentionLockExample { @Autowired private JdbcTemplate jdbcTemplate; // 多个事务同时获取行锁,意向锁提高效率 public void multipleRowLocks() { ExecutorService executor = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { final int userId = i + 1; executor.submit(() -> { try (Connection conn = jdbcTemplate.getDataSource().getConnection()) { conn.setAutoCommit(false); // 获取行锁(自动添加意向锁) User user = getUserWithExclusiveLock((long) userId); // 处理数据 Thread.sleep(1000); conn.commit(); } catch (Exception e) { throw new RuntimeException("行锁操作失败", e); } }); } executor.shutdown(); } } ``` ### 间隙锁(Gap Lock) **定义**:锁定索引记录之间的间隙,防止其他事务插入数据 **特点**: - 防止幻读 - 只在 REPEATABLE READ 隔离级别下有效 - 使用间隙锁锁定范围查询 **代码示例**: ```sql -- 间隙锁示例 -- 用户表数据:id: 1, 5, 10, 15, 20 -- 锁定id=5和id=10之间的间隙 SELECT * FROM user WHERE id > 5 AND id < 10 FOR UPDATE; -- 锁定间隙:(5, 10) -- 锁定id>10的间隙 SELECT * FROM user WHERE id > 10 FOR UPDATE; -- 锁定间隙:(10, ∞) -- 防止插入 -- 其他事务无法在(5, 10)之间插入新记录 ``` **Java 实现**: ```java // 间隙锁示例 public class GapLockExample { @Autowired private JdbcTemplate jdbcTemplate; // 间隙锁防止幻读 public void preventPhantomRead() { try (Connection conn = jdbcTemplate.getDataSource().getConnection()) { conn.setAutoCommit(false); // 查询并锁定间隙 List users = jdbcTemplate.query( "SELECT * FROM user WHERE age BETWEEN 20 AND 30 FOR UPDATE", (rs, rowNum) -> { User user = new User(); user.setId(rs.getLong("id")); user.setName(rs.getString("name")); user.setAge(rs.getInt("age")); return user; }); // 处理数据 System.out.println("找到 " + users.size() + " 个用户"); // 模拟长时间处理 Thread.sleep(5000); conn.commit(); // 其他事务无法在age 20-30之间插入新记录 } catch (Exception e) { throw new RuntimeException("间隙锁操作失败", e); } } // 优化后的范围查询(避免间隙锁) public void optimizedRangeQuery() { try (Connection conn = jdbcTemplate.getDataSource().getConnection()) { conn.setAutoCommit(false); // 使用主键查询避免间隙锁 List users = jdbcTemplate.query( "SELECT * FROM user WHERE age >= 20 AND age <= 30 FOR UPDATE", (rs, rowNum) -> { User user = new User(); user.setId(rs.getLong("id")); user.setName(rs.getString("name")); user.setAge(rs.getInt("age")); return user; }); conn.commit(); } catch (Exception e) { throw new RuntimeException("范围查询失败", e); } } } ``` ### 临键锁(Next-Key Lock) **定义**:记录锁与间隙锁的组合,锁定记录本身及其相邻间隙 **特点**: - 解决幻读问题 - 在 REPEATABLE READ 隔离级别下默认使用 - 锁定记录+前一个间隙 **代码示例**: ```sql -- 临键锁示例 -- 用户表数据:id: 1, 5, 10, 15, 20 -- 锁定id=10及其前后间隙 SELECT * FROM user WHERE id = 10 FOR UPDATE; -- 锁定:(5, 10] + (10, 15) -- 锁定范围查询 SELECT * FROM user WHERE id <= 10 FOR UPDATE; -- 锁定:(-∞, 1] + (1, 5] + (5, 10] + (10, 15) ``` **Java 实现**: ```java // 临键锁示例 public class NextKeyLockExample { @Autowired private JdbcTemplate jdbcTemplate; // 临键锁防止幻读 public void preventPhantomReadWithNextKeyLock() { try (Connection conn = jdbcTemplate.getDataSource().getConnection()) { conn.setAutoCommit(false); // 查询并获取临键锁 List users = jdbcTemplate.query( "SELECT * FROM user WHERE age <= 30 FOR UPDATE", (rs, rowNum) -> { User user = new User(); user.setId(rs.getLong("id")); user.setName(rs.getString("name")); user.setAge(rs.getInt("age")); return user; }); // 处理数据 System.out.println("找到 " + users.size() + " 个用户"); // 防止其他事务插入age<=30的记录 conn.commit(); } catch (Exception e) { throw new RuntimeException("临键锁操作失败", e); } } // 优化查询避免不必要的锁 public void optimizedQueryWithNextKeyLock() { try (Connection conn = jdbcTemplate.getDataSource().getConnection()) { conn.setAutoCommit(false); // 使用主键查询减少锁范围 User user = jdbcTemplate.queryForObject( "SELECT * FROM user WHERE id = ? FOR UPDATE", new Object[]{10L}, (rs, rowNum) -> { User u = new User(); u.setId(rs.getLong("id")); u.setName(rs.getString("name")); u.setAge(rs.getInt("age")); return u; }); conn.commit(); } catch (Exception e) { throw new RuntimeException("查询失败", e); } } } ``` ## 4. 乐观锁、悲观锁 ### 乐观锁(Optimistic Locking) **定义**:假设冲突不常发生,只在更新时检查冲突 **实现方式**: 1. **版本号**:使用版本号字段 2. **时间戳**:使用更新时间 3. **CAS**:Compare And Swap **代码示例**: ```java // 乐观锁实现 @Component public class OptimisticLockService { @Autowired private UserRepository userRepository; // 使用版本号实现乐观锁 @Transactional public User updateUserWithVersion(Long userId, String newName) { User user = userRepository.findById(userId) .orElseThrow(() -> new RuntimeException("用户不存在")); // 尝试更新 int affected = userRepository.updateWithVersion( userId, newName, user.getVersion()); if (affected == 0) { throw new OptimisticLockException("数据已被其他事务修改"); } return userRepository.findById(userId).orElse(null); } // 使用时间戳实现乐观锁 @Transactional public User updateUserWithTimestamp(Long userId, String newName) { User user = userRepository.findById(userId) .orElseThrow(() -> new RuntimeException("用户不存在")); // 尝试更新 int affected = userRepository.updateWithTimestamp( userId, newName, user.getUpdateTime()); if (affected == 0) { throw new OptimisticLockException("数据已被其他事务修改"); } return userRepository.findById(userId).orElse(null); } } // Repository 实现 @Repository public class UserRepository { @Autowired private JdbcTemplate jdbcTemplate; // 带版本号的更新 public int updateWithVersion(Long userId, String newName, Integer version) { String sql = "UPDATE user SET name = ?, version = version + 1 " + "WHERE id = ? AND version = ?"; return jdbcTemplate.update(sql, newName, userId, version); } // 带时间戳的更新 public int updateWithTimestamp(Long userId, String newName, Timestamp timestamp) { String sql = "UPDATE user SET name = ?, update_time = NOW() " + "WHERE id = ? AND update_time = ?"; return jdbcTemplate.update(sql, newName, userId, timestamp); } } ``` ### 悲观锁(Pessimistic Locking) **定义**:假设冲突经常发生,直接加锁 **实现方式**: 1. **行锁**:SELECT ... FOR UPDATE 2. **表锁**:LOCK TABLES 3. **间隙锁**:防止幻读 **代码示例**: ```java // 悲观锁实现 @Component public class PessimisticLockService { @Autowired private UserRepository userRepository; // 使用行锁实现悲观锁 @Transactional public User updateUserWithRowLock(Long userId, String newName) { // 1. 加锁 User user = userRepository.findByIdWithLock(userId) .orElseThrow(() -> new RuntimeException("用户不存在")); // 2. 更新数据 user.setName(newName); user.setUpdateTime(new Date()); // 3. 保存(锁在事务提交时释放) return userRepository.save(user); } // 使用表锁实现悲观锁 @Transactional public void updateUserWithTableLock(Long userId, String newName) { // 1. 获取表锁 jdbcTemplate.update("LOCK TABLES user WRITE"); try { // 2. 执行更新 int affected = userRepository.update(userId, newName); if (affected == 0) { throw new RuntimeException("更新失败"); } // 3. 提交事务时自动释放锁 } finally { // 确保释放锁 jdbcTemplate.update("UNLOCK TABLES"); } } } // Repository 实现 @Repository public class UserRepository { @Autowired private JdbcTemplate jdbcTemplate; // 带锁的查询 public Optional findByIdWithLock(Long userId) { String sql = "SELECT * FROM user WHERE id = ? FOR UPDATE"; return jdbcTemplate.queryForObject(sql, new Object[]{userId}, (rs, rowNum) -> { User user = new User(); user.setId(rs.getLong("id")); user.setName(rs.getString("name")); user.setAge(rs.getInt("age")); user.setVersion(rs.getInt("version")); user.setUpdateTime(rs.getTimestamp("update_time")); return user; }); } // 普通更新 public int update(Long userId, String newName) { String sql = "UPDATE user SET name = ? WHERE id = ?"; return jdbcTemplate.update(sql, newName, userId); } } ``` ### 乐观锁 vs 悲观锁 | 特性 | 乐观锁 | 悲观锁 | |------|--------|--------| | 冲突假设 | 冲突少 | 冲突多 | | 性能 | 读性能好,写有冲突时开销大 | 写性能好,读性能差 | | 适用场景 | 读多写少,冲突少 | 写多,冲突多 | | 实现复杂度 | 简单 | 复杂 | | 锁类型 | 无锁,版本号控制 | 行锁、表锁等 | ### 优化策略 ```java // 锁策略优化 @Service public class LockOptimizationService { @Autowired private UserRepository userRepository; // 根据业务场景选择锁策略 public User updateUser(Long userId, String newName) { // 1. 首先使用乐观锁 try { return updateUserWithVersion(userId, newName); } catch (OptimisticLockException e) { // 2. 乐观锁冲突,使用悲观锁重试 return updateUserWithPessimisticLock(userId, newName); } } // 乐观锁重试机制 @Retryable(value = OptimisticLockException.class, maxAttempts = 3, backoff = @Backoff(delay = 100)) public User updateUserWithVersion(Long userId, String newName) { User user = userRepository.findById(userId) .orElseThrow(() -> new RuntimeException("用户不存在")); int affected = userRepository.updateWithVersion( userId, newName, user.getVersion()); if (affected == 0) { throw new OptimisticLockException("版本冲突"); } return userRepository.findById(userId).orElse(null); } // 悲观锁实现 @Transactional public User updateUserWithPessimisticLock(Long userId, String newName) { User user = userRepository.findByIdWithLock(userId) .orElseThrow(() -> new RuntimeException("用户不存在")); user.setName(newName); return userRepository.save(user); } } ``` ## 5. 死锁的产生和解决 ### 死锁的产生条件 **四个必要条件**: 1. **互斥条件**:资源不能共享 2. **请求和保持**:进程保持资源的同时请求其他资源 3. **不可剥夺**:资源不能被强制释放 4. **循环等待**:形成等待环路 **死锁示例**: ```java // 死锁示例 public class DeadlockExample { @Autowired private JdbcTemplate jdbcTemplate; public void createDeadlock() { // 线程1:获取资源A,等待资源B new Thread(() -> { try (Connection conn1 = jdbcTemplate.getDataSource().getConnection()) { conn1.setAutoCommit(false); // 获取用户表的锁 conn1.createStatement().execute("SELECT * FROM user WHERE id = 1 FOR UPDATE"); System.out.println("线程1获取了用户表锁"); // 等待订单表锁 Thread.sleep(1000); // 尝试获取订单表锁 conn1.createStatement().execute("SELECT * FROM order WHERE id = 1 FOR UPDATE"); System.out.println("线程1获取了订单表锁"); conn1.commit(); } catch (Exception e) { System.out.println("线程1异常: " + e.getMessage()); } }).start(); // 线程2:获取资源B,等待资源A new Thread(() -> { try (Connection conn2 = jdbcTemplate.getDataSource().getConnection()) { conn2.setAutoCommit(false); // 获取订单表的锁 conn2.createStatement().execute("SELECT * FROM order WHERE id = 1 FOR UPDATE"); System.out.println("线程2获取了订单表锁"); // 等待用户表锁 Thread.sleep(1000); // 尝试获取用户表锁 conn2.createStatement().execute("SELECT * FROM user WHERE id = 1 FOR UPDATE"); System.out.println("线程2获取了用户表锁"); conn2.commit(); } catch (Exception e) { System.out.println("线程2异常: " + e.getMessage()); } }).start(); } } ``` ### 死锁检测和监控 **1. MySQL 死锁检测**: ```sql -- 查看死锁信息 SHOW ENGINE INNODB STATUS; -- 查看锁等待 SELECT * FROM information_schema.innodb_lock_waits; ``` **2. 死锁监控实现**: ```java // 死锁监控 @Component public class DeadlockMonitor { @Autowired private JdbcTemplate jdbcTemplate; @Scheduled(fixedRate = 60000) // 每分钟监控一次 public void monitorDeadlocks() { String sql = "SELECT * FROM information_schema.innodb_lock_waits"; try { List> deadlocks = jdbcTemplate.queryForList(sql); if (!deadlocks.isEmpty()) { // 发送告警 notificationService.sendAlert("检测到死锁", deadlocks); // 记录日志 log.error("检测到死锁: {}", deadlocks); } } catch (Exception e) { log.error("死锁监控失败", e); } } // 死锁分析 public void analyzeDeadlock(String deadlockReport) { // 解析死锁报告 DeadlockAnalysis analysis = parseDeadlockReport(deadlockReport); // 生成解决方案建议 List suggestions = generateSuggestions(analysis); // 记录分析结果 log.info("死锁分析结果: {}", suggestions); } } ``` ### 死锁预防策略 **1. 锁排序**: ```java // 锁排序预防死锁 public class LockOrderService { private final Map lockMap = new ConcurrentHashMap<>(); // 使用固定的锁顺序 @Transactional public void transferMoney(String fromAccount, String toAccount, BigDecimal amount) { // 1. 按账户ID排序,确保锁顺序一致 String account1 = fromAccount.compareTo(toAccount) < 0 ? fromAccount : toAccount; String account2 = fromAccount.compareTo(toAccount) < 0 ? toAccount : fromAccount; // 2. 按固定顺序加锁 synchronized (getLock(account1)) { synchronized (getLock(account2)) { // 执行转账 transfer(fromAccount, toAccount, amount); } } } private Object getLock(String account) { return lockMap.computeIfAbsent(account, k -> new Object()); } } ``` **2. 超时机制**: ```java // 超时机制预防死锁 public class TimeoutLockService { @Autowired private DataSource dataSource; @Transactional(timeout = 3) // 事务超时3秒 public void executeWithTimeout(Runnable operation) { try { operation.run(); } catch (TransactionTimedOutException e) { // 超时回滚,释放锁 throw new RuntimeException("操作超时,已回滚", e); } } // 设置锁等待超时 public void executeWithLockTimeout(Runnable operation, long timeout) { try (Connection conn = dataSource.getConnection()) { conn.setAutoCommit(false); // 设置锁等待超时 conn.createStatement().execute( "SET innodb_lock_wait_timeout = " + timeout); try { operation.run(); conn.commit(); } catch (SQLException e) { conn.rollback(); throw new RuntimeException("锁等待超时", e); } } catch (SQLException e) { throw new RuntimeException("数据库操作失败", e); } } } ``` **3. 避免嵌套锁**: ```java // 避免嵌套锁示例 public class NestedLockService { // 不好的示例:嵌套锁 @Transactional public void badNestedLock(Long userId, Long orderId) { // 锁1 User user = userRepository.findByIdWithLock(userId); // 业务逻辑 processUser(user); // 锁2(嵌套锁可能导致死锁) Order order = orderRepository.findByIdWithLock(orderId); processOrder(order); } // 好的示例:统一锁 @Transactional public void goodSingleLock(Long userId, Long orderId) { // 统一获取需要的锁 List users = userRepository.findUsersByIdsWithLock(Arrays.asList(userId)); List orders = orderRepository.findOrdersByIdsWithLock(Arrays.asList(orderId)); // 处理业务逻辑 processUsersAndOrders(users, orders); } } ``` ### 死锁恢复策略 **1. 自动重试**: ```java // 自动重试机制 @Service public class RetryService { @Retryable(value = DeadlockLoserDataAccessException.class, maxAttempts = 3, backoff = @Backoff(delay = 100)) public T executeWithRetry(Callable operation) { return operation.call(); } // 使用示例 @Transactional public User updateUserWithRetry(Long userId, String newName) { return executeWithRetry(() -> { User user = userRepository.findByIdWithLock(userId); user.setName(newName); return userRepository.save(user); }); } } ``` **2. 降级处理**: ```java // 降级处理策略 @Service public class FallbackService { @Autowired private CacheManager cacheManager; @Transactional public User updateUserWithFallback(Long userId, String newName) { try { // 正常更新 User user = userRepository.findByIdWithLock(userId); user.setName(newName); return userRepository.save(user); } catch (Exception e) { // 降级处理:先更新缓存 updateCache(userId, newName); // 异步更新数据库 asyncUpdateDatabase(userId, newName); // 返回缓存中的数据 return getCachedUser(userId); } } private void updateCache(Long userId, String newName) { Cache cache = cacheManager.getCache("userCache"); User user = new User(); user.setId(userId); user.setName(newName); cache.put(userId, user); } private User getCachedUser(Long userId) { Cache cache = cacheManager.getCache("userCache"); return cache.get(userId, User.class); } } ``` ## 6. 实际项目中的锁使用 ### 电商系统锁策略 **1. 库存管理**: ```java // 库存管理锁策略 @Service public class InventoryService { @Autowired private InventoryRepository inventoryRepository; // 乐观锁实现库存扣减 @Transactional public boolean reduceStockWithOptimisticLock(Long productId, int quantity) { Inventory inventory = inventoryRepository.findByProductId(productId) .orElseThrow(() -> new RuntimeException("商品不存在")); // 检查库存 if (inventory.getStock() < quantity) { return false; } // 乐观锁更新 int affected = inventoryRepository.reduceStockWithVersion( productId, quantity, inventory.getVersion()); if (affected == 0) { // 版本冲突,重试或抛出异常 throw new OptimisticLockException("库存已被其他订单占用"); } return true; } // 悲观锁实现库存扣减 @Transactional public boolean reduceStockWithPessimisticLock(Long productId, int quantity) { // 悲观锁防止并发扣减 Inventory inventory = inventoryRepository.findByProductIdWithLock(productId) .orElseThrow(() -> new RuntimeException("商品不存在")); if (inventory.getStock() < quantity) { return false; } inventory.setStock(inventory.getStock() - quantity); inventoryRepository.save(inventory); return true; } // 批量库存扣减 @Transactional public boolean batchReduceStock(List items) { // 使用事务隔离级别避免死锁 try { for (OrderItem item : items) { boolean success = reduceStockWithOptimisticLock( item.getProductId(), item.getQuantity()); if (!success) { // 回滚整个事务 throw new RuntimeException("库存不足"); } } return true; } catch (Exception e) { throw new RuntimeException("批量扣减库存失败", e); } } } ``` **2. 订单处理**: ```java // 订单处理锁策略 @Service public class OrderService { @Autowired private OrderRepository orderRepository; @Autowired private PaymentService paymentService; // 订单创建锁 @Transactional(isolation = Isolation.REPEATABLE_READ) public Order createOrder(OrderDTO orderDTO) { // 1. 检查用户和商品状态 User user = userRepository.findById(orderDTO.getUserId()) .orElseThrow(() -> new RuntimeException("用户不存在")); List products = productRepository.findByIds( orderDTO.getItems().stream() .map(OrderItemDTO::getProductId) .collect(Collectors.toList())); // 2. 创建订单 Order order = new Order(); BeanUtils.copyProperties(orderDTO, order); order.setStatus("PENDING"); order.setCreateTime(new Date()); order = orderRepository.save(order); // 3. 扣减库存(悲观锁) for (OrderItem item : order.getItems()) { boolean success = inventoryService.reduceStockWithPessimisticLock( item.getProductId(), item.getQuantity()); if (!success) { throw new RuntimeException("库存不足"); } } // 4. 预扣金额(乐观锁) try { paymentService.reserveAmount(order.getUserId(), order.getTotalAmount()); } catch (Exception e) { // 回滚库存 rollbackInventory(order.getItems()); throw e; } return order; } // 订单取消锁 @Transactional public void cancelOrder(Long orderId) { Order order = orderRepository.findByIdWithLock(orderId) .orElseThrow(() -> new RuntimeException("订单不存在")); // 检查订单状态 if (!"PENDING".equals(order.getStatus())) { throw new RuntimeException("订单状态不允许取消"); } // 更新订单状态 order.setStatus("CANCELLED"); order.setUpdateTime(new Date()); orderRepository.save(order); // 释放库存 for (OrderItem item : order.getItems()) { inventoryService.releaseStock(item.getProductId(), item.getQuantity()); } // 释放金额 paymentService.releaseAmount(order.getUserId(), order.getTotalAmount()); } } ``` ### 社交系统锁策略 **1. 点赞功能**: ```java // 点赞功能锁策略 @Service public class LikeService { @Autowired private LikeRepository likeRepository; @Autowired private PostRepository postRepository; // 乐观锁实现点赞 @Transactional public LikeDTO addLike(Long postId, Long userId) { // 1. 检查是否已经点赞 boolean alreadyLiked = likeRepository.existsByPostIdAndUserId(postId, userId); if (alreadyLiked) { throw new RuntimeException("已经点赞"); } // 2. 创建点赞记录(乐观锁) Like like = new Like(); like.setPostId(postId); like.setUserId(userId); like.setCreateTime(new Date()); like = likeRepository.save(like); // 3. 更新点赞数(乐观锁) updateLikeCount(postId, 1); return convertToDTO(like); } // 批量点赞 @Transactional public void batchAddLikes(List postIds, Long userId) { for (Long postId : postIds) { addLike(postId, userId); } } // 分布式锁实现点赞 @Transactional public LikeDTO addLikeWithDistributedLock(Long postId, Long userId) { String lockKey = "like_" + postId + "_" + userId; // 获取分布式锁 try { boolean locked = redisLock.tryLock(lockKey, 5, TimeUnit.SECONDS); if (!locked) { throw new RuntimeException("系统繁忙,请稍后重试"); } // 检查是否已经点赞 boolean alreadyLiked = likeRepository.existsByPostIdAndUserId(postId, userId); if (alreadyLiked) { throw new RuntimeException("已经点赞"); } // 创建点赞记录 Like like = new Like(); like.setPostId(postId); like.setUserId(userId); like.setCreateTime(new Date()); like = likeRepository.save(like); // 更新点赞数 updateLikeCount(postId, 1); return convertToDTO(like); } finally { // 释放锁 redisLock.unlock(lockKey); } } } ``` **2. 评论系统**: ```java // 评论系统锁策略 @Service public class CommentService { @Autowired private CommentRepository commentRepository; // 间隙锁防止评论重复 @Transactional(isolation = Isolation.REPEATABLE_READ) public CommentDTO addComment(CommentDTO commentDTO) { // 1. 获取帖子间隙锁 Post post = postRepository.findByIdWithGapLock(commentDTO.getPostId()) .orElseThrow(() -> new RuntimeException("帖子不存在")); // 2. 创建评论 Comment comment = new Comment(); BeanUtils.copyProperties(commentDTO, comment); comment.setCreateTime(new Date()); comment = commentRepository.save(comment); // 3. 更新帖子统计 post.setCommentCount(post.getCommentCount() + 1); postRepository.save(post); return convertToDTO(comment); } // 评论回复锁 @Transactional public CommentDTO replyComment(CommentDTO replyDTO) { // 1. 获取父评论锁 Comment parentComment = commentRepository.findByIdWithLock(replyDTO.getParentId()) .orElseThrow(() -> new RuntimeException("父评论不存在")); // 2. 创建回复 replyDTO.setParentId(parentComment.getId()); replyDTO.setPostId(parentComment.getPostId()); Comment reply = new Comment(); BeanUtils.copyProperties(replyDTO, reply); reply.setCreateTime(new Date()); reply = commentRepository.save(reply); // 3. 更新统计 parentComment.setReplyCount(parentComment.getReplyCount() + 1); commentRepository.save(parentComment); return convertToDTO(reply); } } ``` ### 金融系统锁策略 **1. 转账服务**: ```java // 转账服务锁策略 @Service public class TransferService { @Autowired private AccountRepository accountRepository; // 分布式锁实现转账 @Transactional(isolation = Isolation.SERIALIZABLE) public void transferWithDistributedLock(Long fromAccountId, Long toAccountId, BigDecimal amount) { String lockKey = "transfer_" + Math.min(fromAccountId, toAccountId) + "_" + Math.max(fromAccountId, toAccountId); try { // 获取分布式锁 boolean locked = redisLock.tryLock(lockKey, 10, TimeUnit.SECONDS); if (!locked) { throw new RuntimeException("转账繁忙,请稍后重试"); } transfer(fromAccountId, toAccountId, amount); } finally { // 释放锁 redisLock.unlock(lockKey); } } // 乐观锁实现转账 @Transactional public void transferWithOptimisticLock(Long fromAccountId, Long toAccountId, BigDecimal amount) { // 1. 获取账户信息(带乐观锁) Account fromAccount = accountRepository.findByIdWithVersion(fromAccountId) .orElseThrow(() -> new RuntimeException("转出账户不存在")); Account toAccount = accountRepository.findByIdWithVersion(toAccountId) .orElseThrow(() -> new RuntimeException("转入账户不存在")); // 2. 检查余额 if (fromAccount.getBalance().compareTo(amount) < 0) { throw new RuntimeException("余额不足"); } // 3. 执行转账(乐观锁) boolean success = transferWithVersion(fromAccount, toAccount, amount); if (!success) { throw new RuntimeException("转账失败,可能余额已变动"); } } private boolean transferWithVersion(Account fromAccount, Account toAccount, BigDecimal amount) { try { // 扣减转出账户 fromAccount.setBalance(fromAccount.getBalance().subtract(amount)); // 增加转入账户 toAccount.setBalance(toAccount.getBalance().add(amount)); // 批量更新(乐观锁) accountRepository.saveAll(Arrays.asList(fromAccount, toAccount)); return true; } catch (OptimisticLockException e) { // 版本冲突,重试 return false; } } } ``` **2. 余额查询**: ```java // 余额查询锁策略 @Service public class BalanceQueryService { @Autowired private AccountRepository accountRepository; // 乐观锁实现余额查询 @Transactional(readOnly = true) public BigDecimal getBalance(Long accountId) { Account account = accountRepository.findById(accountId) .orElseThrow(() -> new RuntimeException("账户不存在")); // 使用缓存优化 BigDecimal balance = balanceCache.get(accountId); if (balance == null) { balance = account.getBalance(); balanceCache.put(accountId, balance); } return balance; } // 悲观锁实现余额查询 @Transactional(readOnly = true) public BigDecimal getBalanceWithLock(Long accountId) { Account account = accountRepository.findByIdWithLock(accountId) .orElseThrow(() -> new RuntimeException("账户不存在")); return account.getBalance(); } } ``` ## 7. 阿里 P7 加分项 ### 分布式锁实现 **1. Redis 分布式锁**: ```java // Redis 分布式锁实现 @Component public class RedisLock { @Autowired private RedisTemplate redisTemplate; private static final String LOCK_PREFIX = "lock:"; private static final long DEFAULT_EXPIRE_TIME = 30; // 秒 /** * 获取分布式锁 */ public boolean tryLock(String lockKey, long waitTime, TimeUnit timeUnit) { String key = LOCK_PREFIX + lockKey; long expireTime = DEFAULT_EXPIRE_TIME; try { long startTime = System.currentTimeMillis(); long waitMillis = timeUnit.toMillis(waitTime); while (true) { // 尝试获取锁 Boolean success = redisTemplate.opsForValue().setIfAbsent( key, Thread.currentThread().getName(), expireTime, TimeUnit.SECONDS); if (Boolean.TRUE.equals(success)) { return true; } // 检查是否超时 if (System.currentTimeMillis() - startTime > waitMillis) { return false; } // 避免CPU空转 Thread.sleep(100); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; } } /** * 释放分布式锁 */ public void unlock(String lockKey) { String key = LOCK_PREFIX + lockKey; redisTemplate.delete(key); } /** * 可重入分布式锁 */ private final ConcurrentHashMap lockCountMap = new ConcurrentHashMap<>(); public boolean tryReentrantLock(String lockKey, long waitTime, TimeUnit timeUnit) { Thread currentThread = Thread.currentThread(); String key = LOCK_PREFIX + lockKey; // 检查是否是当前线程持有锁 Integer count = lockCountMap.get(key); if (count != null && count > 0) { lockCountMap.put(key, count + 1); return true; } // 获取新锁 if (tryLock(lockKey, waitTime, timeUnit)) { lockCountMap.put(key, 1); return true; } return false; } public void releaseReentrantLock(String lockKey) { String key = LOCK_PREFIX + lockKey; Integer count = lockCountMap.get(key); if (count == null) { return; } if (count > 1) { lockCountMap.put(key, count - 1); } else { lockCountMap.remove(key); unlock(lockKey); } } } // Redis 分布式锁使用示例 @Service public class RedisLockService { @Autowired private RedisLock redisLock; @Autowired private InventoryService inventoryService; public boolean reduceStockWithRedisLock(Long productId, int quantity) { String lockKey = "inventory_" + productId; try { // 获取分布式锁 boolean locked = redisLock.tryLock(lockKey, 5, TimeUnit.SECONDS); if (!locked) { return false; } // 扣减库存 return inventoryService.reduceStock(productId, quantity); } finally { // 释放锁 redisLock.unlock(lockKey); } } } ``` **2. Zookeeper 分布式锁**: ```java // Zookeeper 分布式锁实现 @Component public class ZookeeperLock { @Autowired private CuratorFramework curatorFramework; /** * 创建分布式锁 */ public InterProcessMutex createDistributedLock(String lockPath) { return new InterProcessMutex(curatorFramework, lockPath); } /** * 获取锁 */ public boolean tryLock(String lockPath, long timeout, TimeUnit unit) { InterProcessMutex lock = createDistributedLock(lockPath); try { return lock.acquire(timeout, unit); } catch (Exception e) { throw new RuntimeException("获取分布式锁失败", e); } } /** * 释放锁 */ public void releaseLock(String lockPath) { InterProcessMutex lock = createDistributedLock(lockPath); try { lock.release(); } catch (Exception e) { throw new RuntimeException("释放分布式锁失败", e); } } } // Zookeeper 分布式锁使用示例 @Service public class ZookeeperLockService { @Autowired private ZookeeperLock zookeeperLock; @Autowired private PaymentService paymentService; public boolean processPaymentWithZkLock(Long orderId, BigDecimal amount) { String lockPath = "/payment/lock/" + orderId; try { // 获取分布式锁 boolean locked = zookeeperLock.tryLock(lockPath, 10, TimeUnit.SECONDS); if (!locked) { return false; } // 处理支付 return paymentService.processPayment(orderId, amount); } finally { // 释放锁 zookeeperLock.releaseLock(lockPath); } } } ``` ### 高级锁优化策略 **1. 自适应锁策略**: ```java // 自适应锁策略 @Service public class AdaptiveLockService { @Autowired private JdbcTemplate jdbcTemplate; @Autowired private RedisLock redisLock; // 根据业务复杂度选择锁策略 public T executeWithAdaptiveLock(Callable operation, String lockKey, BusinessType type) { switch (type) { case SIMPLE: return executeWithoutLock(operation); case NORMAL: return executeWithOptimisticLock(operation); case COMPLEX: return executeWithPessimisticLock(operation, lockKey); case CRITICAL: return executeWithDistributedLock(operation, lockKey); default: return executeWithOptimisticLock(operation); } } private T executeWithoutLock(Callable operation) { try { return operation.call(); } catch (Exception e) { throw new RuntimeException("操作失败", e); } } private T executeWithOptimisticLock(Callable operation) { int retryCount = 0; final int maxRetries = 3; while (retryCount < maxRetries) { try { return operation.call(); } catch (OptimisticLockException e) { retryCount++; if (retryCount >= maxRetries) { throw new RuntimeException("乐观锁重试次数已用尽", e); } // 指数退避 try { Thread.sleep(100 * (1 << retryCount)); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new RuntimeException("线程被中断", ie); } } } throw new RuntimeException("操作失败"); } private T executeWithPessimisticLock(Callable operation, String lockKey) { try (Connection conn = jdbcTemplate.getDataSource().getConnection()) { conn.setAutoCommit(false); // 获取悲观锁 conn.createStatement().execute( "SELECT * FROM business_data WHERE key = '" + lockKey + "' FOR UPDATE"); try { T result = operation.call(); conn.commit(); return result; } catch (Exception e) { conn.rollback(); throw e; } } catch (SQLException e) { throw new RuntimeException("悲观锁操作失败", e); } } private T executeWithDistributedLock(Callable operation, String lockKey) { try { boolean locked = redisLock.tryLock(lockKey, 5, TimeUnit.SECONDS); if (!locked) { throw new RuntimeException("获取分布式锁失败"); } try { return operation.call(); } finally { redisLock.unlock(lockKey); } } catch (Exception e) { throw new RuntimeException("分布式锁操作失败", e); } } } // 业务类型枚举 public enum BusinessType { SIMPLE, // 简单操作,不需要锁 NORMAL, // 普通操作,使用乐观锁 COMPLEX, // 复杂操作,使用悲观锁 CRITICAL // 关键操作,使用分布式锁 } ``` **2. 智能锁监控**: ```java // 智能锁监控 @Component public class SmartLockMonitor { @Autowired private MeterRegistry meterRegistry; @Autowired private JdbcTemplate jdbcTemplate; // 监控锁使用情况 @Scheduled(fixedRate = 60000) public void monitorLockUsage() { // 1. 监控数据库锁等待 monitorDatabaseLocks(); // 2. 监控分布式锁等待 monitorDistributedLocks(); // 3. 监控锁超时 monitorLockTimeout(); } private void monitorDatabaseLocks() { String sql = "SELECT COUNT(*) FROM information_schema.innodb_lock_waits"; Integer waitCount = jdbcTemplate.queryForObject(sql, Integer.class); if (waitCount != null && waitCount > 0) { meterRegistry.gauge("database.lock.waits", waitCount); // 发送告警 if (waitCount > 10) { notificationService.sendAlert("数据库锁等待过多", waitCount); } } } private void monitorDistributedLocks() { // 监控 Redis 锁等待 Set keys = redisTemplate.keys("lock:*"); if (keys != null) { meterRegistry.gauge("distributed.lock.count", keys.size()); // 监控锁等待时间 for (String key : keys) { String value = redisTemplate.opsForValue().get(key); if (value != null) { long ttl = redisTemplate.getExpire(key); if (ttl > 0) { meterRegistry.gauge("distributed.lock.ttl", ttl, k -> ttl); } } } } } private void monitorLockTimeout() { // 监控锁超时 String sql = "SELECT COUNT(*) FROM transaction WHERE status = 'timeout'"; Integer timeoutCount = jdbcTemplate.queryForObject(sql, Integer.class); if (timeoutCount != null && timeoutCount > 0) { meterRegistry.gauge("transaction.lock.timeout", timeoutCount); } } // 智能锁推荐 public LockRecommendation recommendLockStrategy(String tableName, String operation) { // 1. 分析表特征 TableFeature feature = analyzeTableFeature(tableName); // 2. 分析操作频率 OperationFrequency frequency = analyzeOperationFrequency(tableName, operation); // 3. 基于规则推荐锁策略 if (feature.isHighWrite() && frequency.isHighFrequency()) { return new LockRecommendation( LockType.OPTIMISTIC, "乐观锁适合高写入频率场景", 3 // 重试次数 ); } else if (feature.isCritical() && frequency.isLowFrequency()) { return new LockRecommendation( LockType.PESSIMISTIC, "悲观锁适合关键数据", 1 ); } else { return new LockRecommendation( LockType.ADAPTIVE, "自适应锁策略", 3 ); } } } ``` ### 总结 数据库锁机制是并发控制的核心,需要根据业务场景选择合适的锁策略: 1. **掌握各种锁类型**:行锁、表锁、页锁、共享锁、排他锁、意向锁、间隙锁、临键锁 2. **理解锁的兼容性**:锁与锁之间的兼容关系 3. **掌握乐观锁和悲观锁**:两种锁策略的优缺点和适用场景 4. **避免死锁**:通过锁排序、超时、避免嵌套锁等方式预防死锁 5. **实际项目应用**:电商、社交、金融等不同场景的锁策略 6. **高级特性**:分布式锁、智能锁监控、自适应锁策略 在实际项目中,需要平衡性能和一致性,选择合适的锁策略,并持续优化锁的使用。