Files
interview/questions/01-分布式系统/事务隔离级别.md
yasinshaw 0e46a367c4 refactor: rename files to Chinese and organize by category
Organized 50 interview questions into 12 categories:
- 01-分布式系统 (9 files): 分布式事务, 分布式锁, 一致性哈希, CAP理论, etc.
- 02-数据库 (2 files): MySQL索引优化, MyBatis核心原理
- 03-缓存 (5 files): Redis数据结构, 缓存问题, LRU算法, etc.
- 04-消息队列 (1 file): RocketMQ/Kafka
- 05-并发编程 (4 files): 线程池, 设计模式, 限流策略, etc.
- 06-JVM (1 file): JVM和垃圾回收
- 07-系统设计 (8 files): 秒杀系统, 短链接, IM, Feed流, etc.
- 08-算法与数据结构 (4 files): B+树, 红黑树, 跳表, 时间轮
- 09-网络与安全 (3 files): TCP/IP, 加密安全, 性能优化
- 10-中间件 (4 files): Spring Boot, Nacos, Dubbo, Nginx
- 11-运维 (4 files): Kubernetes, CI/CD, Docker, 可观测性
- 12-面试技巧 (1 file): 面试技巧和职业规划

All files renamed to Chinese for better accessibility and
organized into categorized folders for easier navigation.

Generated with [Claude Code](https://claude.com/claude-code)
via [Happy](https://happy.engineering)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Happy <yesreply@happy.engineering>
2026-03-01 00:10:53 +08:00

43 KiB
Raw Blame History

数据库事务隔离级别面试指南

1. ACID 特性

ACID 四大特性

1. 原子性Atomicity

  • 事务是一个不可分割的工作单位
  • 事务中的操作要么全部成功,要么全部失败
  • 需要事务日志Undo Log实现

2. 一致性Consistency

  • 事务必须使数据库从一个一致性状态变到另一个一致性状态
  • 数据库的完整性约束不被破坏
  • 需要业务逻辑和数据库约束保证

3. 隔离性Isolation

  • 多个并发事务之间相互隔离
  • 一个事务的执行不能被其他事务干扰
  • 通过锁机制和MVCC实现

4. 持久性Durability

  • 事务一旦提交,其对数据库的修改就是永久性的
  • 即使系统发生故障,修改也不会丢失
  • 通过Redo Log实现

ACID 实现原理

// ACID 在数据库中的实现示例
public class TransactionACIDExample {

    // 1. 原子性实现
    public void atomicTransfer(String fromAccount, String toAccount, BigDecimal amount) {
        try {
            // 开启事务
            beginTransaction();

            // 执行转账操作
            deductAmount(fromAccount, amount);
            addAmount(toAccount, amount);

            // 提交事务
            commitTransaction();

        } catch (Exception e) {
            // 回滚事务
            rollbackTransaction();
            throw new TransactionException("Transfer failed", e);
        }
    }

    // 2. 一致性实现
    public void consistentUpdate() {
        // 事务前检查
        if (!checkBusinessRule()) {
            throw new BusinessRuleException("Business rule violated");
        }

        try {
            beginTransaction();

            // 执行更新
            updateData();

            // 事务后检查
            if (!checkBusinessRule()) {
                rollbackTransaction();
                throw new BusinessRuleException("Business rule violated");
            }

            commitTransaction();
        } catch (Exception e) {
            rollbackTransaction();
            throw e;
        }
    }

    // 3. 隔离性实现
    public void isolatedOperation() {
        // 设置隔离级别
        setTransactionIsolation(Isolation.READ_COMMITTED);

        try {
            beginTransaction();

            // 执行操作
            performOperation();

            commitTransaction();
        } catch (Exception e) {
            rollbackTransaction();
            throw e;
        }
    }

    // 4. 持久性实现
    public void durableOperation() {
        try {
            beginTransaction();

            // 执行操作
            performOperation();

            // 提交到内存
            commitTransaction();

            // 刷入磁盘
            forceWriteToDisk();

        } catch (Exception e) {
            rollbackTransaction();
            throw e;
        }
    }
}

事务日志实现

Undo Log回滚日志

-- Undo Log 结构
CREATE TABLE undo_log (
    id BIGINT AUTO_INCREMENT,
    branch_id VARCHAR(64) NOT NULL,
    xid VARCHAR(100) NOT NULL,
    rollback_info LONGBLOB NOT NULL,
    log_status INT NOT NULL,
    create_time DATETIME NOT NULL,
    update_time DATETIME NOT NULL,
    PRIMARY KEY (id),
    UNIQUE KEY ux_undo_log_branch_id (xid, branch_id)
);

-- Undo Log 示例
INSERT INTO undo_log VALUES (
    1,
    'branch_id_001',
    'xid_123456',
    '{"before": {"name": "Alice", "age": 25}, "after": {"name": "Alice", "age": 26}}',
    1,
    NOW(),
    NOW()
);

Redo Log重做日志

-- Redo Log 结构
CREATE TABLE redo_log (
    id BIGINT AUTO_INCREMENT,
    xid VARCHAR(100) NOT NULL,
    branch_id VARCHAR(64) NOT NULL,
    log_data LONGBLOB NOT NULL,
    log_status INT NOT NULL,
    create_time DATETIME NOT NULL,
    PRIMARY KEY (id),
    UNIQUE KEY ux_redo_log_branch_id (xid, branch_id)
);

-- Redo Log 示例
INSERT INTO redo_log VALUES (
    1,
    'xid_123456',
    'branch_id_001',
    '{"sql": "UPDATE user SET age = 26 WHERE id = 1", "before": "25", "after": "26"}',
    1,
    NOW()
);

2. 四种隔离级别

读未提交Read Uncommitted

特点

  • 最低的隔离级别
  • 可能读到未提交的数据(脏读)
  • 性能最好,一致性最差

问题演示

-- 会话1事务A
BEGIN;
UPDATE user SET balance = balance - 100 WHERE id = 1;  -- 余额900

-- 会话2事务B
BEGIN;
SELECT balance FROM user WHERE id = 1;  -- 读取到900脏读
-- 事务A未提交事务B读取到未提交的数据

-- 会话1事务A
ROLLBACK;  -- 回滚余额恢复为1000

-- 会话2事务B
SELECT balance FROM user WHERE id = 1;  -- 余额1000数据不一致

MySQL 配置

-- 设置隔离级别
SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;

-- 查看当前隔离级别
SELECT @@transaction_isolation;

-- 全局设置
SET GLOBAL transaction_isolation = 'READ-UNCOMMITTED';

读已提交Read Committed

特点

  • 只能读取到已提交的数据
  • 不会出现脏读
  • 可能出现不可重复读

问题演示

-- 会话1事务A
BEGIN;
SELECT balance FROM user WHERE id = 1;  -- 余额1000

-- 会话2事务B
BEGIN;
UPDATE user SET balance = balance - 100 WHERE id = 1;  -- 余额900
COMMIT;  -- 提交

-- 会话1事务A
SELECT balance FROM user WHERE id = 1;  -- 余额900不可重复读
-- 同一事务中两次读取结果不同

MySQL 配置

-- 设置隔离级别
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;

-- Oracle、SQL Server 默认隔离级别

可重复读Repeatable Read

特点

  • 同一事务中多次读取数据结果一致
  • 不会出现脏读和不可重复读
  • 可能出现幻读

问题演示

-- 会话1事务A
BEGIN;
SELECT * FROM order WHERE user_id = 1 AND status = 'pending';  -- 0条记录

-- 会话2事务B
BEGIN;
INSERT INTO order (user_id, amount, status) VALUES (1, 100, 'pending');
COMMIT;  -- 提交

-- 会话1事务A
SELECT * FROM order WHERE user_id = 1 AND status = 'pending';  -- 1条记录幻读
-- 同一事务中两次查询结果不同

MySQL 配置

-- 设置隔离级别
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;

-- MySQL 默认隔离级别

串行化Serializable

特点

  • 最高隔离级别
  • 事务串行执行
  • 完全隔离,性能最差
  • 不会出现任何问题

演示

-- 设置隔离级别
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;

-- 事务串行执行
-- 性能最差,但一致性最好

隔离级别对比表

隔离级别 脏读 不可重复读 幻读 并发性能 适用场景
读未提交 可能 可能 可能 最高 不重要数据
读已提交 不可能 可能 可能 普通业务
可重复读 不可能 不可能 可能 重要业务
串行化 不可能 不可能 不可能 极端重要

3. MVCC 原理

MVCC 基本概念

MVCCMulti-Version Concurrency Control

  • 多版本并发控制
  • 通过数据版本号实现并发控制
  • 读写不阻塞

核心组件

  • 隐藏字段DB_TRX_ID事务IDDB_ROLL_PTR(回滚指针)
  • 版本链:通过回滚指针连接历史版本
  • Read View:读视图,用于确定可见性

MVCC 实现原理

1. 数据行结构

-- InnoDB 行结构示意图
┌─────────┬─────────────────┬─────────────────┬─────────────────┐
| Row Header | Transaction ID | Rollback Pointer | Column Data     
└─────────┴─────────────────┴─────────────────┴─────────────────┘

2. 版本链构建

-- 版本链构建过程
-- 初始状态
UPDATE user SET name = 'Alice' WHERE id = 1;  -- TRX_ID = 100
┌─────────┬─────────────────┬─────────────────┬─────────────────┐
| Row Header | TRX_ID: 100     | NULL            | name: 'Alice'    
└─────────┴─────────────────┴─────────────────┴─────────────────┘

-- 更新
UPDATE user SET name = 'Alice Smith' WHERE id = 1;  -- TRX_ID = 200
┌─────────┬─────────────────┬─────────────────┬─────────────────┐
| Row Header | TRX_ID: 200     | PTR  TRX_ID:100 | name: 'Alice Smith'
└─────────┴─────────────────┴─────────────────┴─────────────────┘

3. Read View 机制

// Read View 实现
public class ReadView {
    private long creatorTrxId;      // 创建ReadView的事务ID
    private long[] trxIds;          // 活跃事务ID列表
    private long upLimitId;         // 最小活跃事务ID
    private long lowLimitId;        // 最大活跃事务ID + 1
    private long creatorTrxId;      // 创建ReadView的事务ID

    public boolean isVisible(long trxId, long rollPointer) {
        // 1. 如果当前事务在创建ReadView之后不可见
        if (trxId >= lowLimitId) {
            return false;
        }

        // 2. 如果事务在活跃列表中,不可见
        if (isInActiveTrxIds(trxId)) {
            return false;
        }

        // 3. 如果事务在创建ReadView之前且已提交可见
        return trxId < upLimitId;
    }
}

MVCC 工作流程

1. 读操作流程

// MVCC 读操作
public class MVCCReader {
    public Object read(Object row, long trxId) {
        // 1. 创建ReadView
        ReadView readView = createReadView(trxId);

        // 2. 从最新版本开始遍历
        while (row != null) {
            long rowTrxId = getRowTrxId(row);

            // 3. 判断可见性
            if (readView.isVisible(rowTrxId, getRowRollPointer(row))) {
                return row;
            }

            // 4. 沿着回滚指针查找历史版本
            row = getHistoricalVersion(row);
        }

        // 5. 未找到合适版本返回null
        return null;
    }
}

2. 写操作流程

// MVCC 写操作
public class MVCCWriter {
    public void update(Object row, Object newValue, long trxId) {
        // 1. 创建新版本
        Object newRow = createNewVersion(row, newValue, trxId);

        // 2. 更新版本链
        Object current = getLatestVersion(row);
        setRowRollPointer(current, newRow);

        // 3. 更新数据行
        updateRowData(row, newValue);
    }
}

MySQL MVCC 实现细节

1. 隐藏字段

-- 查看隐藏字段
SHOW CREATE TABLE user;
-- 输出包含:
--   `DB_TRX_ID` 6 byte
--   `DB_ROLL_PTR` 7 byte
--   `DB_ROW_ID` 6 byte

-- 隐藏字段含义:
-- DB_TRX_ID: 最近修改该行的事务ID
-- DB_ROLL_PTR: 回滚指针,指向该行的历史版本
-- DB_ROW_ID: 行ID6字节

2. Undo Log 结构

-- Undo Log 示例
CREATE TABLE undo_log (
    id BIGINT AUTO_INCREMENT,
    branch_id VARCHAR(64) NOT NULL,
    xid VARCHAR(100) NOT NULL,
    rollback_info LONGBLOB NOT NULL,
    log_status INT NOT NULL,
    create_time DATETIME NOT NULL,
    update_time DATETIME NOT NULL,
    PRIMARY KEY (id),
    UNIQUE KEY ux_undo_log_branch_id (xid, branch_id)
);

-- INSERT 操作的 Undo Log
-- 记录插入前的数据(实际上是空,因为插入前没有数据)
-- UPDATE 操作的 Undo Log
-- 记录更新前的数据
-- DELETE 操作的 Undo Log
-- 记录被删除的数据

3. MVCC 演示

-- 创建测试表
CREATE TABLE user (
    id INT PRIMARY KEY,
    name VARCHAR(50),
    age INT,
    INDEX idx_name (name)
);

-- 插入初始数据
INSERT INTO user VALUES (1, 'Alice', 25);

-- 事务A可重复读
START TRANSACTION;
SELECT * FROM user WHERE id = 1;  -- 结果:(1, 'Alice', 25)

-- 事务B更新数据
START TRANSACTION;
UPDATE user SET name = 'Alice Smith' WHERE id = 1;
COMMIT;

-- 事务A再次查询
SELECT * FROM user WHERE id = 1;  -- 结果:(1, 'Alice', 25)  -- 可重复读
-- 事务A仍然读取到旧版本

-- 事务A提交后再次查询
COMMIT;
SELECT * FROM user WHERE id = 1;  -- 结果:(1, 'Alice Smith', 25)  -- 读取到最新版本

4. 脏读、幻读、不可重复读

脏读Dirty Read

定义:读取到未提交的事务数据

问题场景

// 脏读问题演示
public class DirtyReadExample {
    public void dirtyReadDemo() {
        // 线程1事务A
        new Thread(() -> {
            Connection conn1 = getDataSource().getConnection();
            try {
                conn1.setAutoCommit(false);

                // 执行更新
                conn1.createStatement().executeUpdate(
                    "UPDATE account SET balance = balance - 100 WHERE id = 1");

                // 模拟长时间处理
                Thread.sleep(5000);

                // 回滚
                conn1.rollback();

            } catch (Exception e) {
                try {
                    conn1.rollback();
                } catch (SQLException ex) {
                    ex.printStackTrace();
                }
            }
        }).start();

        // 线程2事务B
        new Thread(() -> {
            try {
                Thread.sleep(1000); // 等待事务A开始

                Connection conn2 = getDataSource().getConnection();
                conn2.setAutoCommit(false);

                // 读取未提交的数据
                ResultSet rs = conn2.createStatement().executeQuery(
                    "SELECT balance FROM account WHERE id = 1");
                if (rs.next()) {
                    double balance = rs.getDouble("balance");
                    System.out.println("读取到余额: " + balance); // 900脏读
                }

                conn2.commit();

            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }
}

解决方案

  • 使用 READ COMMITTED 或更高隔离级别
  • 在数据库层面设置隔离级别

不可重复读Non-repeatable Read

定义:同一事务中多次读取同一数据,结果不同

问题场景

// 不可重复读问题演示
public class NonRepeatableReadExample {
    public void nonRepeatableReadDemo() {
        // 线程1事务A
        new Thread(() -> {
            Connection conn1 = getDataSource().getConnection();
            try {
                conn1.setAutoCommit(false);

                // 第一次读取
                ResultSet rs1 = conn1.createStatement().executeQuery(
                    "SELECT balance FROM account WHERE id = 1");
                if (rs1.next()) {
                    double balance1 = rs1.getDouble("balance");
                    System.out.println("第一次读取余额: " + balance1); // 1000
                }

                // 模拟长时间处理
                Thread.sleep(5000);

                // 第二次读取
                ResultSet rs2 = conn1.createStatement().executeQuery(
                    "SELECT balance FROM account WHERE id = 1");
                if (rs2.next()) {
                    double balance2 = rs2.getDouble("balance");
                    System.out.println("第二次读取余额: " + balance2); // 900不可重复读
                }

                conn1.commit();

            } catch (Exception e) {
                try {
                    conn1.rollback();
                } catch (SQLException ex) {
                    ex.printStackTrace();
                }
            }
        }).start();

        // 线程2事务B
        new Thread(() -> {
            try {
                Thread.sleep(2000); // 等待事务A第一次读取

                Connection conn2 = getDataSource().getConnection();
                conn2.setAutoCommit(false);

                // 更新数据
                conn2.createStatement().executeUpdate(
                    "UPDATE account SET balance = balance - 100 WHERE id = 1");

                conn2.commit();

            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }
}

解决方案

  • 使用 REPEATABLE READ 或 SERIALIZABLE 隔离级别
  • 使用 MVCC 机制

幻读Phantom Read

定义:同一事务中多次查询,返回的行数不同

问题场景

// 幻读问题演示
public class PhantomReadExample {
    public void phantomReadDemo() {
        // 线程1事务A
        new Thread(() -> {
            Connection conn1 = getDataSource().getConnection();
            try {
                conn1.setAutoCommit(false);

                // 第一次查询
                ResultSet rs1 = conn1.createStatement().executeQuery(
                    "SELECT COUNT(*) FROM account WHERE balance > 500");
                if (rs1.next()) {
                    int count1 = rs1.getInt(1);
                    System.out.println("第一次查询结果: " + count1); // 1
                }

                // 模拟长时间处理
                Thread.sleep(5000);

                // 第二次查询
                ResultSet rs2 = conn1.createStatement().executeQuery(
                    "SELECT COUNT(*) FROM account WHERE balance > 500");
                if (rs2.next()) {
                    int count2 = rs2.getInt(1);
                    System.out.println("第二次查询结果: " + count2); // 2幻读
                }

                conn1.commit();

            } catch (Exception e) {
                try {
                    conn1.rollback();
                } catch (SQLException ex) {
                    ex.printStackTrace();
                }
            }
        }).start();

        // 线程2事务B
        new Thread(() -> {
            try {
                Thread.sleep(2000); // 等待事务A第一次查询

                Connection conn2 = getDataSource().getConnection();
                conn2.setAutoCommit(false);

                // 插入新数据
                conn2.createStatement().executeUpdate(
                    "INSERT INTO account (id, balance) VALUES (2, 600)");

                conn2.commit();

            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }
}

解决方案

  • 使用 SERIALIZABLE 隔离级别
  • 使用间隙锁Gap Lock

问题解决策略

1. 代码层面解决

// 使用乐观锁解决并发问题
public class OptimisticLockExample {
    public void updateWithOptimisticLock() {
        try {
            Connection conn = getDataSource().getConnection();
            conn.setAutoCommit(false);

            // 1. 读取数据
            ResultSet rs = conn.createStatement().executeQuery(
                "SELECT * FROM user WHERE id = 1");
            if (rs.next()) {
                User user = new User();
                user.setId(rs.getLong("id"));
                user.setName(rs.getString("name"));
                user.setVersion(rs.getInt("version"));
            }

            // 2. 模拟业务处理
            Thread.sleep(1000);

            // 3. 更新数据(包含版本号检查)
            int affected = conn.createStatement().executeUpdate(
                "UPDATE user SET name = ?, version = version + 1 " +
                "WHERE id = ? AND version = ?",
                new Object[]{"New Name", 1L, user.getVersion()});

            if (affected == 0) {
                throw new OptimisticLockException("数据已被其他事务修改");
            }

            conn.commit();

        } catch (Exception e) {
            throw new RuntimeException("更新失败", e);
        }
    }
}

// 使用悲观锁解决并发问题
public class PessimisticLockExample {
    public void updateWithPessimisticLock() {
        try {
            Connection conn = getDataSource().getConnection();
            conn.setAutoCommit(false);

            // 1. 加锁
            ResultSet rs = conn.createStatement().executeQuery(
                "SELECT * FROM user WHERE id = 1 FOR UPDATE");

            // 2. 更新数据
            conn.createStatement().executeUpdate(
                "UPDATE user SET name = 'New Name' WHERE id = 1");

            conn.commit();

        } catch (Exception e) {
            try {
                conn.rollback();
            } catch (SQLException ex) {
                ex.printStackTrace();
            }
            throw new RuntimeException("更新失败", e);
        }
    }
}

2. 数据库层面解决

-- 设置隔离级别解决并发问题
-- 1. 避免脏读
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;

-- 2. 避免不可重复读
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;

-- 3. 避免幻读
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;

-- 使用锁机制
-- 1. 共享锁(读锁)
SELECT * FROM user WHERE id = 1 LOCK IN SHARE MODE;

-- 2. 排他锁(写锁)
SELECT * FROM user WHERE id = 1 FOR UPDATE;

5. 实际项目中的隔离级别选择

不同业务场景的隔离级别选择

1. 电商系统

// 电商系统事务管理
@Configuration
@EnableTransactionManagement
public class EcommerceTransactionConfig {

    @Bean
    public PlatformTransactionManager transactionManager(DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    // 订单服务 - 使用可重复读
    @Service
    public class OrderService {

        @Transactional(isolation = Isolation.REPEATABLE_READ)
        public Order createOrder(OrderDTO orderDTO) {
            // 1. 减库存
            inventoryService.reduceStock(orderDTO.getItems());

            // 2. 创建订单
            Order order = new Order();
            BeanUtils.copyProperties(orderDTO, order);
            order.setStatus("PENDING");
            order.setCreateTime(LocalDateTime.now());

            orderRepository.save(order);

            // 3. 预扣账户余额
            paymentService.reserveAmount(order.getUserId(), order.getTotalAmount());

            return order;
        }

        @Transactional(isolation = Isolation.READ_COMMITTED)
        public void cancelOrder(Long orderId) {
            // 读取已提交的数据
            Order order = orderRepository.findById(orderId);

            if ("PENDING".equals(order.getStatus())) {
                // 取消订单
                order.setStatus("CANCELLED");
                order.setUpdateTime(LocalDateTime.now());
                orderRepository.save(order);

                // 释放库存和余额
                inventoryService.releaseStock(order.getItems());
                paymentService.releaseAmount(order.getUserId(), order.getTotalAmount());
            }
        }
    }

    // 支付服务 - 使用串行化
    @Service
    public class PaymentService {

        @Transactional(isolation = Isolation.SERIALIZABLE)
        public void processPayment(Long orderId) {
            // 加锁确保数据一致性
            Order order = orderRepository.findByIdWithLock(orderId);

            // 防止重复支付
            if ("PROCESSING".equals(order.getStatus())) {
                throw new PaymentException("订单已在处理中");
            }

            // 处理支付
            order.setStatus("PROCESSING");
            order.setUpdateTime(LocalDateTime.now());
            orderRepository.save(order);

            // 调用支付网关
            paymentGateway.process(order);

            // 更新订单状态
            order.setStatus("PAID");
            order.setPayTime(LocalDateTime.now());
            orderRepository.save(order);
        }
    }
}

2. 金融系统

// 金融系统事务管理
@Service
public class FinancialTransactionService {

    @Transactional(isolation = Isolation.SERIALIZABLE)
    public void transferMoney(String fromAccount, String toAccount, BigDecimal amount) {
        try {
            // 1. 加锁账户
            Account from = accountRepository.findByIdForLock(fromAccount);
            Account to = accountRepository.findByIdForLock(toAccount);

            // 2. 检查余额
            if (from.getBalance().compareTo(amount) < 0) {
                throw new InsufficientBalanceException("余额不足");
            }

            // 3. 执行转账
            from.setBalance(from.getBalance().subtract(amount));
            to.setBalance(to.getBalance().add(amount));

            // 4. 记录交易
            Transaction transaction = new Transaction();
            transaction.setFromAccount(fromAccount);
            transaction.setToAccount(toAccount);
            transaction.setAmount(amount);
            transaction.setStatus("COMPLETED");
            transaction.setTime(LocalDateTime.now());

            accountRepository.saveAll(Arrays.asList(from, to));
            transactionRepository.save(transaction);

        } catch (Exception e) {
            throw new FinancialException("转账失败", e);
        }
    }

    @Transactional(isolation = Isolation.READ_COMMITTED)
    public BigDecimal getAccountBalance(String accountNumber) {
        // 读取已提交的数据
        Account account = accountRepository.findByAccountNumber(accountNumber);
        return account.getBalance();
    }
}

3. 社交系统

// 社交系统事务管理
@Service
public class SocialService {

    @Transactional(isolation = Isolation.REPEATABLE_READ)
    public void createPost(PostDTO postDTO) {
        // 1. 创建帖子
        Post post = new Post();
        BeanUtils.copyProperties(postDTO, post);
        post.setStatus("PUBLISHED");
        post.setCreateTime(LocalDateTime.now());

        postRepository.save(post);

        // 2. 更新用户统计
        User user = userRepository.findById(post.getUserId());
        user.setPostCount(user.getPostCount() + 1);
        user.setLastPostTime(LocalDateTime.now());

        userRepository.save(user);
    }

    @Transactional(isolation = Isolation.READ_COMMITTED)
    public List<Post> getFeedByUserId(Long userId, int page, int size) {
        // 读取已提交的数据
        return postRepository.findByUserIdOrderByCreateTimeDesc(userId, page, size);
    }
}

隔离级别调优策略

1. 性能优化

// 隔离级别性能测试
@SpringBootTest
public class IsolationLevelPerformanceTest {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Test
    public void testReadUncommittedPerformance() {
        // 测试读未提交性能
        long startTime = System.currentTimeMillis();

        for (int i = 0; i < 10000; i++) {
            jdbcTemplate.query("SELECT * FROM user WHERE id = 1", rs -> {
                return rs.getString("name");
            });
        }

        long endTime = System.currentTimeMillis();
        System.out.println("READ UNCOMMITTED: " + (endTime - startTime) + "ms");
    }

    @Test
    public void testSerializablePerformance() {
        // 测试串行化性能
        long startTime = System.currentTimeMillis();

        for (int i = 0; i < 10000; i++) {
            jdbcTemplate.query("SELECT * FROM user WHERE id = 1 FOR UPDATE", rs -> {
                return rs.getString("name");
            });
        }

        long endTime = System.currentTimeMillis();
        System.out.println("SERIALIZABLE: " + (endTime - startTime) + "ms");
    }
}

2. 动态隔离级别切换

// 动态隔离级别切换
@Service
public class DynamicIsolationService {

    @Autowired
    private TransactionManager transactionManager;

    public void executeWithIsolation(Runnable operation, Isolation isolation) {
        TransactionTemplate template = new TransactionTemplate(transactionManager);
        template.setIsolationLevel(isolation);

        template.execute(status -> {
            operation.run();
            return null;
        });
    }

    // 根据业务场景选择隔离级别
    public void processOrder(Order order) {
        if (order.getType() == OrderType.NORMAL) {
            // 普通订单使用可重复读
            executeWithIsolation(() -> {
                processNormalOrder(order);
            }, Isolation.REPEATABLE_READ);
        } else if (order.getType() == OrderType.PREMIUM) {
            // 高级订单使用串行化
            executeWithIsolation(() -> {
                processPremiumOrder(order);
            }, Isolation.SERIALIZABLE);
        }
    }
}

3. 事务监控和告警

// 事务监控
@Component
public class TransactionMonitor {

    @Autowired
    private TransactionManager transactionManager;

    @Scheduled(fixedRate = 60000) // 每分钟监控一次
    public void monitorTransactions() {
        // 监控长事务
        List<ActiveTransaction> activeTransactions = getActiveTransactions();

        for (ActiveTransaction tx : activeTransactions) {
            if (tx.getDuration() > 300000) { // 5分钟
                monitorService.alert("长事务: " + tx.getSql() + ", 持续时间: " + tx.getDuration());
            }
        }

        // 监控死锁
        int deadlockCount = getDeadlockCount();
        if (deadlockCount > 0) {
            monitorService.alert("检测到死锁,数量: " + deadlockCount);
        }
    }

    // 监控隔离级别使用情况
    public Map<Isolation, Integer> getIsolationLevelUsage() {
        Map<Isolation, Integer> result = new HashMap<>();

        // 查询数据库获取隔离级别使用情况
        String sql = "SELECT variable_name, variable_value " +
                    "FROM information_schema.global_status " +
                    "WHERE variable_name IN ('Com_select', 'Com_update', 'Com_delete')";

        List<Map<String, Object>> stats = jdbcTemplate.queryForList(sql);

        for (Map<String, Object> stat : stats) {
            String name = (String) stat.get("variable_name");
            String value = (String) stat.get("variable_value");

            if ("Com_select".equals(name)) {
                result.put(Isolation.READ_COMMITTED, Integer.parseInt(value));
            } else if ("Com_update".equals(name)) {
                result.put(Isolation.REPEATABLE_READ, Integer.parseInt(value));
            } else if ("Com_delete".equals(name)) {
                result.put(Isolation.SERIALIZABLE, Integer.parseInt(value));
            }
        }

        return result;
    }
}

最佳实践总结

1. 隔离级别选择原则

  • READ UNCOMMITTED:极少使用,主要用于临时性、不重要的数据
  • READ COMMITTED通用场景Oracle、PostgreSQL默认级别
  • REPEATABLE READMySQL默认大多数业务场景适用
  • SERIALIZABLE:极端重要的金融场景,性能要求低

2. 事务管理最佳实践

// 事务管理最佳实践
@Service
public class BestPracticeService {

    // 1. 合理设置事务边界
    @Transactional
    public void processOrder(OrderDTO orderDTO) {
        // 单个事务处理完整业务流程
        Order order = createOrder(orderDTO);
        processPayment(order);
        updateInventory(order);
        notifyUser(order);
    }

    // 2. 避免长事务
    @Transactional
    public void processOrderInParts(OrderDTO orderDTO) {
        // 分步骤处理,避免长事务
        Order order = createOrder(orderDTO);

        // 立即提交事务
        commitTransaction();

        // 异步处理后续流程
        asyncProcessPayment(order);
        asyncUpdateInventory(order);
    }

    // 3. 正确处理异常
    @Transactional(rollbackFor = {BusinessException.class, RuntimeException.class})
    public void processOrderWithErrorHandling(OrderDTO orderDTO) {
        try {
            Order order = createOrder(orderDTO);
            processPayment(order);
            updateInventory(order);
        } catch (BusinessException e) {
            // 业务异常,记录日志但不回滚
            log.error("业务处理失败", e);
            throw e;
        } catch (Exception e) {
            // 系统异常,回滚
            throw e;
        }
    }

    // 4. 只读事务优化
    @Transactional(readOnly = true)
    public List<OrderDTO> getUserOrders(Long userId) {
        // 只读操作,使用只读事务提高性能
        return orderRepository.findByUserId(userId);
    }
}

3. 性能优化建议

  • 批量操作:使用批量插入和更新减少事务数量
  • 异步处理:耗时操作异步化,避免阻塞主事务
  • 缓存策略:合理使用缓存减少数据库访问
  • 连接池配置:优化连接池大小避免连接耗尽

6. 阿里 P7 加分项

分布式事务管理

1. Seata 分布式事务

// Seata 分布式事务配置
@Configuration
public class SeataConfig {

    @Bean
    public GlobalTransactionScanner globalTransactionScanner() {
        return new GlobalTransactionScanner(
            "demo-service",
            "my_test_tx_group");
    }

    @Bean
    public XADataSource xaDataSource(DataSource dataSource) {
        // 配置XA数据源
        return new AtomikosDataSourceBean();
    }
}

// Seata 服务调用示例
@Service
public class OrderServiceWithSeata {

    @Autowired
    private OrderRepository orderRepository;

    @Autowired
    private InventoryService inventoryService;

    @Autowired
    private PaymentService paymentService;

    @GlobalTransactional(timeoutMills = 300000)
    public void createOrder(OrderDTO orderDTO) {
        // 1. 创建订单
        Order order = createOrder(orderDTO);

        // 2. 减库存
        inventoryService.reduceStock(orderDTO.getItems());

        // 3. 扣除余额
        paymentService.deductAmount(order.getUserId(), order.getTotalAmount());
    }
}

2. RocketMQ 分布式事务

// RocketMQ 分布式事务消息
@Service
public class RocketMQTransactionService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendMessageWithTransaction(String topic, String message) {
        // 1. 准备本地事务
        Transaction transaction = prepareLocalTransaction();

        // 2. 发送事务消息
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
            topic,
            MessageBuilder.withBody(message.getBytes()).build(),
            transaction);

        // 3. 处理结果
        if (result.getSendStatus() != SendStatus.SEND_OK) {
            throw new RuntimeException("发送事务消息失败");
        }
    }

    @TransactionalEventListener
    public void handleTransactionEvent(TransactionEvent event) {
        // 处理事务事件
        if (event.isSuccess()) {
            commitTransaction(event.getTxId());
        } else {
            rollbackTransaction(event.getTxId());
        }
    }
}

高级并发控制

1. 乐观锁实现

// 乐观锁实现
@Component
public class OptimisticLockExecutor {

    public <T> T executeWithOptimisticLock(Supplier<T> operation,
                                         Supplier<T> fallback,
                                         int maxRetries) {
        int retryCount = 0;
        RuntimeException lastException = null;

        while (retryCount < maxRetries) {
            try {
                // 1. 执行操作
                T result = operation.get();

                // 2. 检查结果
                if (result != null) {
                    return result;
                }

            } catch (OptimisticLockException e) {
                lastException = e;
                retryCount++;

                // 指数退避
                try {
                    Thread.sleep(100 * (1 << retryCount));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("线程被中断", ie);
                }
            }
        }

        // 重试次数用完,执行回退操作
        return fallback.get();
    }
}

// 使用示例
@Service
public class UserService {

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private OptimisticLockExecutor optimisticLockExecutor;

    public User updateUserWithOptimisticLock(Long userId, String newName) {
        return optimisticLockExecutor.executeWithOptimisticLock(
            () -> {
                User user = userRepository.findById(userId);
                if (user == null) {
                    return null;
                }

                user.setName(newName);
                user.setVersion(user.getVersion() + 1);

                userRepository.save(user);
                return user;
            },
            () -> {
                // 回退操作
                User user = userRepository.findById(userId);
                if (user != null) {
                    user.setName("Fallback_" + newName);
                    user.setVersion(user.getVersion() + 1);
                    userRepository.save(user);
                }
                return user;
            },
            3  // 最大重试次数
        );
    }
}

2. 悲观锁优化

// 悲观锁优化
@Service
public class PessimisticLockService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    // 使用间隙锁优化
    public List<User> getUsersWithGapLock(String minAge, String maxAge) {
        String sql = "SELECT * FROM user WHERE age BETWEEN ? AND ? FOR UPDATE";
        return jdbcTemplate.query(sql, new Object[]{minAge, maxAge},
            (rs, rowNum) -> {
                User user = new User();
                user.setId(rs.getLong("id"));
                user.setName(rs.getString("name"));
                user.setAge(rs.getInt("age"));
                return user;
            });
    }

    // 使用Next-Key Lock优化
    public User getUserWithNextKeyLock(Long userId) {
        String sql = "SELECT * FROM user WHERE id >= ? LIMIT 1 FOR UPDATE";
        return jdbcTemplate.queryForObject(sql, new Object[]{userId},
            (rs, rowNum) -> {
                User user = new User();
                user.setId(rs.getLong("id"));
                user.setName(rs.getString("name"));
                user.setAge(rs.getInt("age"));
                return user;
            });
    }
}

性能监控和调优

1. 事务性能监控

// 事务性能监控
@Component
public class TransactionPerformanceMonitor {

    @Autowired
    private MeterRegistry meterRegistry;

    @Autowired
    private TransactionManager transactionManager;

    // 监控事务执行时间
    @Around("execution(* com.example.service..*(..)) && @annotation(transactional)")
    public Object monitorTransactionTime(ProceedingJoinPoint joinPoint, Transactional transactional) throws Throwable {
        String methodName = joinPoint.getSignature().getName();
        long startTime = System.currentTimeMillis();

        try {
            Object result = joinPoint.proceed();

            // 记录成功事务
            recordTransactionMetric(methodName, true, System.currentTimeMillis() - startTime);

            return result;
        } catch (Exception e) {
            // 记录失败事务
            recordTransactionMetric(methodName, false, System.currentTimeMillis() - startTime);
            throw e;
        }
    }

    private void recordTransactionMetric(String methodName, boolean success, long duration) {
        // 记录事务指标
        meterRegistry.counter("transaction.count",
            "method", methodName,
            "success", String.valueOf(success))
            .increment();

        meterRegistry.timer("transaction.duration",
            "method", methodName,
            "success", String.valueOf(success))
            .record(duration, TimeUnit.MILLISECONDS);
    }

    // 监控隔离级别使用情况
    @Scheduled(fixedRate = 60000)
    public void monitorIsolationLevels() {
        Map<Isolation, AtomicInteger> usage = new ConcurrentHashMap<>();

        // 收集正在执行的事务隔离级别
        collectActiveTransactionIsolationLevels(usage);

        // 上报指标
        usage.forEach((isolation, count) -> {
            meterRegistry.gauge("transaction.isolation.count",
                "level", isolation.name(),
                count);
        });
    }
}

2. 动态调优策略

// 动态调优策略
@Component
public class DynamicOptimizationStrategy {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private TransactionTemplate transactionTemplate;

    // 根据系统负载动态调整隔离级别
    public void adjustIsolationLevelBasedOnLoad() {
        SystemLoad load = getSystemLoad();

        if (load.getCpuUsage() > 80) {
            // 高负载时使用较低隔离级别
            setDefaultIsolation(Isolation.READ_COMMITTED);
        } else if (load.getMemoryUsage() > 70) {
            // 中等负载使用可重复读
            setDefaultIsolation(Isolation.REPEATABLE_READ);
        } else {
            // 低负载使用串行化
            setDefaultIsolation(Isolation.SERIALIZABLE);
        }
    }

    // 自动优化事务超时时间
    public void optimizeTransactionTimeout() {
        String sql = "SELECT AVG(duration) FROM transaction_metrics WHERE success = 1";
        Double avgDuration = jdbcTemplate.queryForObject(sql, Double.class);

        if (avgDuration != null) {
            // 设置超时时间为平均执行时间的2倍
            int timeout = (int) (avgDuration * 2);
            setDefaultTransactionTimeout(timeout);
        }
    }

    // 自适应重试策略
    public <T> T executeWithAdaptiveRetry(Supplier<T> operation,
                                         Class<? extends Exception> retryException,
                                         int maxRetries) {
        int retryCount = 0;
        long baseDelay = 100;

        while (retryCount < maxRetries) {
            try {
                return operation.get();
            } catch (Exception e) {
                if (retryException.isInstance(e)) {
                    retryCount++;

                    // 指数退避
                    long delay = baseDelay * (1 << retryCount);
                    try {
                        Thread.sleep(delay);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException("线程被中断", ie);
                    }
                } else {
                    throw e;
                }
            }
        }

        throw new RetryExhaustedException("重试次数已用完");
    }
}

总结

数据库事务隔离级别是并发控制的核心,需要根据业务场景选择合适的隔离级别:

  1. 理解 ACID 特性:原子性、一致性、隔离性、持久性
  2. 掌握四种隔离级别:读未提交、读已提交、可重复读、串行化
  3. 理解 MVCC 机制:多版本并发控制的基本原理
  4. 识别并发问题:脏读、不可重复读、幻读
  5. 选择合适隔离级别:根据业务需求和性能要求
  6. 优化事务性能:避免长事务、合理使用锁
  7. 监控和调优:持续监控事务性能,动态调整策略

在实际项目中,需要平衡数据一致性和系统性能,选择合适的隔离级别和并发控制策略。