# MySQL Replication Lag Interview Guide

## 1. How Master-Slave Replication Works

### Replication Topologies

**Three common replication topologies**:

1. **Master-slave** (one master, multiple slaves)
```
        Master
       /   |   \
  Slave1 Slave2 Slave3
```
2. **Master-master**
```
Master1 ↔ Master2
   ↓         ↓
 Slave1    Slave2
```
3. **Cascading** (master-slave-slave)
```
Master → Slave1 → Slave2
            ↓
         Slave3
```

### Replication Flow

MySQL replication is based on the binary log (binlog) and proceeds in three steps:

```
1. On the master:
   execute the transaction
   → write it to the binlog
   → the dump thread sends binlog events to the slave

2. On the slave (I/O thread):
   receive binlog events
   → append them to the relay log
   → update master.info

3. On the slave (SQL thread):
   replay events from the relay log
   → update relay-log.info
```

### Core Components

**Master side**:
- **binlog**: records all data changes
- **dump thread**: streams binlog events to slaves

**Slave side**:
- **I/O thread**: receives binlog events
- **SQL thread**: replays the relay log
- **relay log**: the slave's local copy of the master's binlog
- **master.info**: connection info for the master
- **relay-log.info**: current position in the relay log

### Configuration Example

**Master (my.cnf)**:
```ini
[mysqld]
# Enable the binary log
server-id = 1
log-bin = mysql-bin
binlog-format = ROW
binlog-row-image = FULL
expire_logs_days = 7     # deprecated in 8.0; prefer binlog_expire_logs_seconds
max_binlog_size = 1G

# GTID
gtid_mode = ON
enforce_gtid_consistency = ON
```

**Slave (my.cnf)**:
```ini
[mysqld]
server-id = 2
relay-log = mysql-relay-bin
read-only = 1

# GTID
gtid_mode = ON
enforce_gtid_consistency = ON

# Replication filters (note: these are replica-side options)
replicate-wild-ignore-table = mysql.%
replicate-wild-ignore-table = test.%

# Purge applied relay logs automatically
relay_log_purge = 1
```

**Setting up replication**:
```sql
-- On the master: create a replication user
CREATE USER 'repl'@'%' IDENTIFIED BY 'password';
GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%';

-- On the slave: point it at the master (MySQL 8.0.23+ syntax)
CHANGE REPLICATION SOURCE TO
    SOURCE_HOST = '192.168.1.100',
    SOURCE_PORT = 3306,
    SOURCE_USER = 'repl',
    SOURCE_PASSWORD = 'password',
    SOURCE_AUTO_POSITION = 1;  -- use GTID auto-positioning

-- Start replication
START REPLICA;

-- Check replication status
SHOW REPLICA STATUS\G
```

## 2. Causes of Replication Lag

### Hardware

**1. Disk I/O bottlenecks**
```bash
# Disk performance
iostat -x 1 10

# Disk usage
df -h
```

**2. Network latency**
```bash
# Latency tests
ping 192.168.1.100
traceroute 192.168.1.100
mtr 192.168.1.100

# Bandwidth monitoring
iftop
nload
```

**3. High CPU load**
```bash
# CPU utilization
top
htop
mpstat 1 10

# MySQL processes
ps aux | grep mysql
```
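Hardware aside, lag is fundamentally a queueing problem: the binlog → relay log → SQL-thread flow from section 1 means a serial applier falls further behind a faster master. A toy model (not real MySQL internals; all numbers are made up) makes this concrete:

```python
from collections import deque

# Toy model of the binlog -> relay log -> SQL-thread pipeline: the I/O
# thread ships events instantly, while the single SQL thread applies
# them one at a time, so per-event apply cost drives the lag.
def simulate_lag(commit_times, apply_cost):
    """commit_times: seconds at which the master commits each event.
    apply_cost: seconds the replica needs to apply one event.
    Returns the replica's lag (a la Seconds_Behind_Source) per event."""
    relay_log = deque(commit_times)   # I/O thread ships immediately
    clock, lags = 0.0, []
    while relay_log:
        committed = relay_log.popleft()
        clock = max(clock, committed) + apply_cost  # SQL thread is serial
        lags.append(clock - committed)              # lag when event lands
    return lags

# Master commits one event per second; replica needs 1.5 s per event:
lags = simulate_lag([1, 2, 3, 4, 5], apply_cost=1.5)
```

The lag grows without bound as long as the commit rate exceeds the apply rate, which is why parallel replication (section 4) attacks the apply side.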
### Configuration

**1. Misconfigured replication parameters**
```ini
# Master tuning
[mysqld]
# Binary log
sync_binlog = 1               # 1: fsync on every commit; 0: let the OS decide
binlog_cache_size = 4M        # per-session binlog cache
binlog_stmt_cache_size = 4M   # cache for non-transactional statements

# Slave tuning
[mysqld]
# SQL thread
slave_parallel_workers = 4            # parallel replication (MySQL 5.7+)
slave_parallel_type = LOGICAL_CLOCK   # parallelism policy
slave_pending_jobs_size_max = 2G      # queue size for pending events

# Relay log
relay_log_space_limit = 8G    # cap on total relay log size
```

**2. Storage engine settings**
```sql
-- InnoDB on the master
SET GLOBAL innodb_flush_log_at_trx_commit = 1;  -- flush the redo log on every commit
SET GLOBAL innodb_buffer_pool_size = 8 * 1024 * 1024 * 1024;
    -- 50-70% of RAM; SET GLOBAL needs bytes ('8G' suffixes only work in my.cnf)
SET GLOBAL innodb_io_capacity = 2000;           -- tune to the disk's IOPS
SET GLOBAL innodb_io_capacity_max = 4000;       -- I/O capacity ceiling

-- On the slave
SET GLOBAL read_only = 1;                       -- read-only mode
SET GLOBAL innodb_flush_log_at_trx_commit = 1;  -- keep durability consistent with the master
```

### Business Workload

**1. Large transactions**
```sql
-- Problem: one huge transaction stalls the whole SQL thread
BEGIN;
UPDATE order_table SET status = 'completed' WHERE create_time < '2023-01-01';
UPDATE order_table SET status = 'shipped'   WHERE create_time < '2023-02-01';
-- ... many more statements
COMMIT;

-- Fix: process in small batches (the extra status predicate
-- lets each batch make progress instead of rescanning the same rows)
BEGIN;
UPDATE order_table SET status = 'completed'
WHERE create_time < '2023-01-01' AND status <> 'completed'
LIMIT 1000;
COMMIT;

-- Or drive the batches with the event scheduler
CREATE EVENT batch_update_order_status
ON SCHEDULE EVERY 1 MINUTE
DO
  UPDATE order_table SET status = 'completed'
  WHERE create_time < '2023-01-01' AND status <> 'completed'
  LIMIT 1000;
```

**2. Complex queries on the replica**
```sql
-- Heavy reads on the replica compete with the SQL thread for I/O and locks
SELECT o.*
FROM `order` o
JOIN `user` u ON o.user_id = u.id
WHERE o.amount > 10000
  AND u.create_time > '2023-01-01'
  AND o.status IN ('pending', 'processing')
ORDER BY o.create_time DESC
LIMIT 1000;

-- Fix: add supporting indexes
CREATE INDEX idx_order_user_status ON `order`(user_id, status, create_time);
CREATE INDEX idx_user_create_time ON `user`(create_time);

-- MySQL has no materialized views; emulate one with a summary table
CREATE TABLE mv_order_user_status AS
SELECT o.*, u.name FROM `order` o JOIN `user` u ON o.user_id = u.id;

-- Refresh it periodically with the event scheduler
CREATE EVENT refresh_mv_order_user_status
ON SCHEDULE EVERY 5 MINUTE
DO
  REPLACE INTO mv_order_user_status
  SELECT o.*, u.name FROM `order` o JOIN `user` u ON o.user_id = u.id;
```
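The batch-update pattern above is usually driven from application code: loop until a `LIMIT`-ed `UPDATE` affects fewer rows than the batch size. A minimal driver sketch, with the `execute` callback standing in for a real database call:

```python
def batch_update(execute, batch_size=1000, max_batches=10_000):
    """Keep running a LIMIT-ed UPDATE until a batch touches fewer rows
    than the batch size. `execute(limit)` performs one batch and returns
    the affected-row count (stubbed here; a DB call in real code)."""
    total = 0
    for _ in range(max_batches):   # hard cap as a safety net
        affected = execute(batch_size)
        total += affected
        if affected < batch_size:
            break
    return total

# Stand-in for "UPDATE ... LIMIT %s" against 2500 pending rows:
state = {"pending": 2500, "calls": []}
def fake_execute(limit):
    n = min(limit, state["pending"])
    state["pending"] -= n
    state["calls"].append(n)
    return n

total = batch_update(fake_execute)
```

Each batch commits separately, so the replica's SQL thread replays small transactions instead of one giant one.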
### Impact of the Binlog Format

**1. Statement-based replication (STATEMENT)**
```sql
-- Row-by-row procedural updates are slow to replay, and any
-- non-deterministic statement can diverge on the replica
CREATE PROCEDURE update_order_amount(IN p_user_id INT, IN p_factor DECIMAL(10,2))
BEGIN
    DECLARE done INT DEFAULT FALSE;
    DECLARE v_order_id INT;
    DECLARE v_amount DECIMAL(10,2);
    DECLARE cur CURSOR FOR SELECT id, amount FROM `order` WHERE user_id = p_user_id;
    DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;

    OPEN cur;
    read_loop: LOOP
        FETCH cur INTO v_order_id, v_amount;
        IF done THEN LEAVE read_loop; END IF;
        UPDATE `order` SET amount = v_amount * p_factor WHERE id = v_order_id;
    END LOOP;
    CLOSE cur;
END;

-- Under STATEMENT format, re-executing logic like this on the replica
-- can produce different results than on the master
CALL update_order_amount(1, 1.1);
```

**2. Row-based replication (ROW)**
```ini
[mysqld]
binlog_format = ROW
binlog_row_image = FULL
# Pros: best data consistency
# Cons: larger binlog, more events for the replica to apply
```

**3. Mixed (MIXED)**
```ini
[mysqld]
binlog_format = MIXED
```

## 3. Monitoring Replication Lag

### Basic Commands

**1. Check replication lag**
```sql
-- MySQL 8.0.22+
SHOW REPLICA STATUS\G

-- Key fields (8.0.22+ naming)
--   Seconds_Behind_Source: lag in seconds
--   Replica_IO_Running:    I/O thread state
--   Replica_SQL_Running:   SQL thread state

-- MySQL 5.7 (fields are Seconds_Behind_Master, Slave_IO_Running, ...)
SHOW SLAVE STATUS\G
```

**2. GTID-based monitoring**
```sql
-- What the replica has received from the source
SELECT RECEIVED_TRANSACTION_SET
FROM performance_schema.replication_connection_status;

-- What the replica has executed
SELECT @@GLOBAL.gtid_executed;

-- The difference between the two sets is the replica's backlog
SELECT GTID_SUBTRACT(
  (SELECT RECEIVED_TRANSACTION_SET
   FROM performance_schema.replication_connection_status LIMIT 1),
  @@GLOBAL.gtid_executed) AS pending_gtids;
```
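The GTID sets returned by the queries above are just comma-separated interval lists, so a backlog can also be sized client-side. A small sketch (simplified: assumes well-formed sets and that the replica's executed set is a subset of the source's):

```python
def gtid_count(gtid_set):
    """Count transactions in a GTID set like 'uuid:1-5:8,uuid2:1-3'."""
    total = 0
    for uuid_part in gtid_set.replace("\n", "").split(","):
        uuid_part = uuid_part.strip()
        if not uuid_part:
            continue
        # everything after the first ':' is a list of intervals
        for interval in uuid_part.split(":")[1:]:
            lo, _, hi = interval.partition("-")
            total += (int(hi) if hi else int(lo)) - int(lo) + 1
    return total

def gtid_backlog(source_executed, replica_executed):
    """Transactions the replica still has to apply."""
    return gtid_count(source_executed) - gtid_count(replica_executed)

uuid = "3E11FA47-71CA-11E1-9E33-C80AA9429562"
backlog = gtid_backlog(f"{uuid}:1-100", f"{uuid}:1-90")
```

A transaction count complements `Seconds_Behind_Source` nicely: the latter can read 0 while a burst of untransmitted transactions is still in flight.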
### Monitoring Scripts

**1. Python monitoring script**
```python
#!/usr/bin/env python3
import time
from datetime import datetime

import pymysql


class MySQLReplicationMonitor:
    def __init__(self, host, user, password, port=3306):
        self.host = host
        self.user = user
        self.password = password
        self.port = port

    def get_replication_status(self):
        conn = None
        try:
            conn = pymysql.connect(host=self.host, user=self.user,
                                   password=self.password, port=self.port,
                                   cursorclass=pymysql.cursors.DictCursor)
            with conn.cursor() as cursor:
                # There is no information_schema table for this;
                # SHOW REPLICA STATUS is the supported interface.
                cursor.execute("SHOW REPLICA STATUS")
                row = cursor.fetchone()
            if row is None:
                return None
            return {
                # Seconds_Behind_Source on 8.0.22+, Seconds_Behind_Master before
                'delay': row.get('Seconds_Behind_Source',
                                 row.get('Seconds_Behind_Master')) or 0,
                'io_running': row.get('Replica_IO_Running') == 'Yes',
                'sql_running': row.get('Replica_SQL_Running') == 'Yes',
                'io_error': row.get('Last_IO_Error'),
                'sql_error': row.get('Last_SQL_Error'),
                'io_error_time': row.get('Last_IO_Error_Timestamp'),
                'sql_error_time': row.get('Last_SQL_Error_Timestamp'),
            }
        except Exception as e:
            print(f"Error: {e}")
            return None
        finally:
            if conn is not None:
                conn.close()

    def monitor(self, interval=60, threshold=300):
        while True:
            status = self.get_replication_status()
            if status:
                print(f"[{datetime.now()}] Delay: {status['delay']}s, "
                      f"IO: {status['io_running']}, SQL: {status['sql_running']}")
                if status['delay'] > threshold:
                    print(f"ALERT: Replication delay exceeds threshold: {status['delay']}s")
                if not status['io_running']:
                    print("ERROR: IO thread stopped")
                if not status['sql_running']:
                    print("ERROR: SQL thread stopped")
                if status['io_error']:
                    print(f"IO Error: {status['io_error']}")
                if status['sql_error']:
                    print(f"SQL Error: {status['sql_error']}")
            time.sleep(interval)


if __name__ == "__main__":
    monitor = MySQLReplicationMonitor(
        host="192.168.1.200",
        user="monitor",
        password="password",
    )
    monitor.monitor(interval=30, threshold=60)
```
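When a monitor can only reach the replica over SSH and sees the text output of `SHOW REPLICA STATUS\G`, the vertical format parses easily; a sketch:

```python
import re

def parse_vertical_status(text):
    """Parse the '\\G' vertical output of SHOW REPLICA STATUS into a
    dict of field name -> string value."""
    status = {}
    for line in text.splitlines():
        m = re.match(r"\s*(\w+):\s?(.*)$", line)
        if m:  # skips the '*** 1. row ***' separator lines
            status[m.group(1)] = m.group(2)
    return status

# Abridged sample of real output (field names per MySQL 8.0.22+):
sample = """*************************** 1. row ***************************
             Replica_IO_Running: Yes
            Replica_SQL_Running: Yes
          Seconds_Behind_Source: 12"""
info = parse_vertical_status(sample)
```

The same parser works for `SHOW SLAVE STATUS\G` on 5.7; only the field names differ.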
**2. Shell monitoring script**
```bash
#!/bin/bash
# replication_monitor.sh

MYSQL_HOST="192.168.1.200"
MYSQL_USER="monitor"
MYSQL_PASSWORD="password"
MYSQL_PORT="3306"
THRESHOLD=300

while true; do
    # Field is Seconds_Behind_Source on 8.0.22+, Seconds_Behind_Master before
    DELAY=$(mysql -h"$MYSQL_HOST" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -P"$MYSQL_PORT" \
        -e "SHOW REPLICA STATUS\G" | grep -E "Seconds_Behind_(Source|Master)" | awk '{print $2}')

    if [ -z "$DELAY" ] || [ "$DELAY" = "NULL" ]; then
        DELAY="0"
    fi

    TIMESTAMP=$(date "+%Y-%m-%d %H:%M:%S")
    echo "[$TIMESTAMP] Replication Delay: $DELAY seconds"

    if [ "$DELAY" -gt "$THRESHOLD" ]; then
        echo "ALERT: Replication delay exceeds threshold: $DELAY seconds"
        # Send an alert, e.g.:
        # curl -X POST -H "Content-Type: application/json" \
        #      -d '{"text":"Replication delay: '"$DELAY"' seconds"}' https://your-webhook-url
    fi

    sleep 30
done
```

### Monitoring System Integration

**1. Prometheus + Grafana**
```yaml
# prometheus.yml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'mysql_replication'
    static_configs:
      - targets: ['192.168.1.100:9104', '192.168.1.200:9104']  # mysqld_exporter
```

**2. Exporter configuration**

Note that `Seconds_Behind_Master` is not available in `performance_schema.global_status`; mysqld_exporter reads it from `SHOW REPLICA STATUS` via its slave-status collector and exposes it as `mysql_slave_status_seconds_behind_master`.

```bash
# mysqld_exporter derives replication metrics from SHOW REPLICA STATUS;
# enable the relevant collectors when starting it:
mysqld_exporter \
  --collect.slave_status \
  --collect.global_status \
  --collect.info_schema.processlist
```

**3. Grafana dashboard**
```json
{
  "dashboard": {
    "title": "MySQL Replication Monitor",
    "panels": [
      {
        "title": "Replication Delay",
        "type": "graph",
        "targets": [
          { "expr": "mysql_slave_status_seconds_behind_master", "legendFormat": "{{instance}}" }
        ]
      },
      {
        "title": "IO Thread Status",
        "type": "singlestat",
        "targets": [
          { "expr": "mysql_slave_status_slave_io_running", "legendFormat": "{{instance}}" }
        ]
      },
      {
        "title": "SQL Thread Status",
        "type": "singlestat",
        "targets": [
          { "expr": "mysql_slave_status_slave_sql_running", "legendFormat": "{{instance}}" }
        ]
      }
    ]
  }
}
```
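The `for:` clause in Prometheus alert rules exists to avoid paging on a single spike. The same debounce is easy to reproduce in a homegrown poller like the scripts above; a sketch:

```python
class LagAlert:
    """Fire only after `for_samples` consecutive over-threshold readings,
    mimicking the `for:` debounce in Prometheus alerting rules."""

    def __init__(self, threshold, for_samples):
        self.threshold = threshold
        self.for_samples = for_samples
        self.streak = 0

    def observe(self, lag_seconds):
        """Record one reading; return True when the alert should fire."""
        self.streak = self.streak + 1 if lag_seconds > self.threshold else 0
        return self.streak >= self.for_samples

alert = LagAlert(threshold=300, for_samples=3)
fired = [alert.observe(x) for x in [400, 400, 200, 400, 400, 400]]
```

A single dip below the threshold resets the streak, so only sustained lag pages anyone.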
### Lag Alerting

**1. Prometheus alerting rules** (these belong in a Prometheus rules file, not `alertmanager.yml` — Alertmanager only routes alerts that Prometheus evaluates):
```yaml
# rules/mysql_replication.yml (loaded by Prometheus)
groups:
  - name: mysql_replication
    rules:
      - alert: MySQLReplicationLag
        expr: mysql_slave_status_seconds_behind_master > 300
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "MySQL replication lag is {{ $value }} seconds"
          description: "Replication delay exceeds 5 minutes"

      - alert: MySQLReplicationStopped
        expr: mysql_slave_status_slave_io_running == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "MySQL replication IO thread stopped"
          description: "IO thread is not running"

      - alert: MySQLSQLThreadStopped
        expr: mysql_slave_status_slave_sql_running == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "MySQL replication SQL thread stopped"
          description: "SQL thread is not running"
```

**2. In-house alerting service**
```python
from datetime import datetime


class EnterpriseReplicationMonitor:
    def __init__(self, config):
        self.config = config
        self.alert_channels = []

    def add_alert_channel(self, channel):
        self.alert_channels.append(channel)

    def check_replication_health(self):
        status = self.get_replication_status()
        alerts = []

        if status['delay'] > self.config['threshold']:
            alerts.append({'level': 'warning',
                           'message': f"Replication delay: {status['delay']}s",
                           'timestamp': datetime.now()})
        if not status['io_running']:
            alerts.append({'level': 'critical',
                           'message': "IO thread stopped",
                           'timestamp': datetime.now()})
        if not status['sql_running']:
            alerts.append({'level': 'critical',
                           'message': "SQL thread stopped",
                           'timestamp': datetime.now()})

        for alert in alerts:
            self.send_alert(alert)

    def send_alert(self, alert):
        for channel in self.alert_channels:
            channel.send(alert)


class EmailAlertChannel:
    def send(self, alert):
        pass  # send an email


class DingTalkAlertChannel:
    def send(self, alert):
        pass  # post to a DingTalk webhook


class WeChatAlertChannel:
    def send(self, alert):
        pass  # post to a WeCom (enterprise WeChat) webhook
```
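A channel fan-out like the one above usually needs deduplication as well, or a sustained incident re-sends the same alert on every check cycle. A minimal time-window suppressor (the clock is injected so the logic can be tested without sleeping):

```python
import time


class AlertDeduper:
    """Suppress repeats of the same alert key within `window` seconds."""

    def __init__(self, window=600, clock=time.monotonic):
        self.window = window
        self.clock = clock
        self._last_sent = {}

    def should_send(self, key):
        now = self.clock()
        if now - self._last_sent.get(key, float("-inf")) >= self.window:
            self._last_sent[key] = now
            return True
        return False


# Demo with a fake clock:
t = {"now": 0}
dedup = AlertDeduper(window=600, clock=lambda: t["now"])
first = dedup.should_send("replication_lag")    # sends
second = dedup.should_send("replication_lag")   # suppressed
t["now"] = 700
third = dedup.should_send("replication_lag")    # window elapsed, sends again
```

Wiring `should_send(alert['message'])` in front of `send_alert` in the service above would be enough for a first cut.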
## 4. Fixing Replication Lag

### Read-Write Splitting

**1. Basic read-write splitting**
```java
// Simple datasource router: writes go to the master,
// reads round-robin across the slaves
public class DataSourceRouter {
    private final DataSource masterDataSource;
    private final List<DataSource> slaveDataSources;
    private final AtomicInteger counter = new AtomicInteger(0);

    public DataSourceRouter(DataSource master, List<DataSource> slaves) {
        this.masterDataSource = master;
        this.slaveDataSources = slaves;
    }

    public DataSource getDataSource(boolean isWrite) {
        if (isWrite) {
            return masterDataSource;
        }
        // floorMod keeps the index non-negative after counter overflow
        int index = Math.floorMod(counter.getAndIncrement(), slaveDataSources.size());
        return slaveDataSources.get(index);
    }
}

// Marker annotation for read-only service methods
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface ReadOnly {
}

@Service
public class UserService {

    @ReadOnly
    public List<User> getUsers() {
        // served from a replica
    }

    public void createUser(User user) {
        // written to the master
    }
}
```

**2. Dynamic datasource routing**
```java
@Configuration
public class DynamicDataSourceConfig {

    @Bean
    @ConfigurationProperties("spring.datasource.master")
    public DataSource masterDataSource() {
        return DataSourceBuilder.create().build();
    }

    @Bean
    @ConfigurationProperties("spring.datasource.slave1")
    public DataSource slaveDataSource1() {
        return DataSourceBuilder.create().build();
    }

    @Bean
    @ConfigurationProperties("spring.datasource.slave2")
    public DataSource slaveDataSource2() {
        return DataSourceBuilder.create().build();
    }

    @Bean
    public DataSource dynamicDataSource() {
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put("master", masterDataSource());
        targetDataSources.put("slave1", slaveDataSource1());
        targetDataSources.put("slave2", slaveDataSource2());

        DynamicDataSource dynamicDataSource = new DynamicDataSource();
        dynamicDataSource.setDefaultTargetDataSource(masterDataSource());
        dynamicDataSource.setTargetDataSources(targetDataSources);
        return dynamicDataSource;
    }
}

public class DynamicDataSource extends AbstractRoutingDataSource {
    private static final ThreadLocal<String> dataSourceKey = new ThreadLocal<>();

    public static void setDataSourceKey(String key) {
        dataSourceKey.set(key);
    }

    public static String getDataSourceKey() {
        return dataSourceKey.get();
    }

    @Override
    protected Object determineCurrentLookupKey() {
        return getDataSourceKey();
    }
}
```

**3. Middleware-based read-write splitting**

The original `schema.xml` content was lost except for the heartbeat statement; a minimal MyCat example looks roughly like this (host names and IPs are placeholders):

```xml
<!-- MyCat schema.xml sketch: balance="1" sends reads to the readHosts -->
<dataHost name="dh1" maxCon="1000" minCon="10" balance="1"
          writeType="0" dbType="mysql" dbDriver="native">
    <heartbeat>select user()</heartbeat>
    <writeHost host="master" url="192.168.1.100:3306" user="mycat" password="password">
        <readHost host="slave1" url="192.168.1.101:3306" user="mycat" password="password"/>
        <readHost host="slave2" url="192.168.1.102:3306" user="mycat" password="password"/>
    </writeHost>
</dataHost>
```

### Parallel Replication

**1. MySQL 5.7+ parallel replication**
```sql
-- On the replica (STOP REPLICA before changing, then START REPLICA)
SET GLOBAL slave_parallel_workers = 4;
SET GLOBAL slave_parallel_type = 'LOGICAL_CLOCK';
SET GLOBAL slave_pending_jobs_size_max = 1073741824;  -- bytes; '1024M' suffixes are not valid in SET

-- Inspect the parallel replication settings
SHOW VARIABLES LIKE '%parallel%';
```

**2. Per-database parallel replication**
```ini
# my.cnf
[mysqld]
# DATABASE scheduling parallelizes across schemas
slave_parallel_workers = 8
slave_parallel_type = DATABASE
replicate_wild_ignore_table = mysql.%
replicate_wild_ignore_table = test.%
```

**3. Group-commit-based parallel replication**
```ini
# Master
[mysqld]
# Delay group commit slightly so more transactions share a commit group
binlog_group_commit_sync_delay = 1000        # microseconds
binlog_group_commit_sync_no_delay_count = 10

# Binary log durability
sync_binlog = 1
innodb_flush_log_at_trx_commit = 1

# Replica
[mysqld]
slave_parallel_workers = 16
slave_parallel_type = LOGICAL_CLOCK
slave_preserve_commit_order = 1
```

### Semi-Synchronous Replication

**1. Enabling semi-sync**
```sql
-- Master
INSTALL PLUGIN rpl_semi_sync_master SONAME 'semisync_master.so';
SET GLOBAL rpl_semi_sync_master_enabled = 1;

-- Replica
INSTALL PLUGIN rpl_semi_sync_slave SONAME 'semisync_slave.so';
SET GLOBAL rpl_semi_sync_slave_enabled = 1;

-- Check semi-sync status
SHOW STATUS LIKE 'Rpl_semi_sync%';
```

**2. Semi-sync tuning**
```sql
-- How long the master waits for a replica ACK
-- before falling back to asynchronous replication
SET GLOBAL rpl_semi_sync_master_timeout = 1000;  -- milliseconds

-- How many replica ACKs the master waits for per commit
SET GLOBAL rpl_semi_sync_master_wait_for_slave_count = 1;
```

**3. Semi-sync monitoring**
```java
// Semi-sync monitoring component
@Component
public class SemiSyncMonitor {

    @Scheduled(fixedRate = 5000)
    public void monitorSemiSync() {
        // Check semi-sync status on both ends
        boolean isMasterSemiSync = checkMasterSemiSyncStatus();
        boolean isSlaveSemiSync = checkSlaveSemiSyncStatus();

        // Replication delay
        long delay = getReplicationDelay();

        // Average ACK wait time
        long waitTime = getSemiSyncWaitTime();

        if (!isMasterSemiSync) {
            alert("Master semi-sync disabled");
        }
        if (!isSlaveSemiSync) {
            alert("Replica semi-sync disabled");
        }
        if (delay > 60) {
            alert("Replication delay too high: " + delay + "s");
        }
        if (waitTime > 1000) {
            alert("Semi-sync wait time too long: " + waitTime + "ms");
        }
    }
}
```
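To see what the LOGICAL_CLOCK policy configured above buys, note that each binlog transaction carries a `(sequence_number, last_committed)` pair; transactions sharing a `last_committed` value were in the same commit group on the source and are safe to apply concurrently. A deliberately simplified scheduler sketch (real MySQL uses interval overlap, not just exact matches):

```python
from itertools import groupby

def parallel_batches(txns):
    """txns: ordered list of (sequence_number, last_committed) pairs
    from the binlog. Consecutive transactions with the same
    last_committed can be handed to worker threads in parallel;
    batches themselves stay serialized."""
    return [[seq for seq, _ in grp]
            for _, grp in groupby(txns, key=lambda t: t[1])]

# Three txns from one commit group, then two from a later group:
batches = parallel_batches([(2, 1), (3, 1), (4, 1), (5, 4), (6, 4)])
```

This is why `binlog_group_commit_sync_delay` on the master helps the replica: bigger commit groups mean bigger parallel batches for the applier workers.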
### Tuning the Master

**1. Master configuration**
```ini
# my.cnf (master)
[mysqld]
# Buffer pool
innodb_buffer_pool_size = 16G
innodb_buffer_pool_instances = 8

# Redo log
innodb_log_file_size = 4G
innodb_log_buffer_size = 64M
innodb_flush_log_at_trx_commit = 1

# I/O
innodb_io_capacity = 2000
innodb_io_capacity_max = 4000
innodb_read_io_threads = 16
innodb_write_io_threads = 16

# Binary log
binlog_format = ROW
sync_binlog = 1
binlog_cache_size = 32M
binlog_stmt_cache_size = 32M
binlog_expire_logs_seconds = 604800   # 7 days (expire_logs_days is deprecated in 8.0)
max_binlog_size = 1G

# Connections
max_connections = 1000
thread_cache_size = 100
```

**2. Master SQL optimization**
```sql
-- Avoid full table scans
CREATE INDEX idx_user_id_status ON `order`(user_id, status);
CREATE INDEX idx_order_time ON `order`(create_time);

-- Update large tables in batches
UPDATE `order` SET status = 'completed'
WHERE status = 'pending'
  AND create_time < NOW() - INTERVAL 1 DAY
LIMIT 1000;

-- Stage big operations through a temporary table
CREATE TEMPORARY TABLE temp_order_update AS
SELECT id, user_id FROM `order`
WHERE status = 'pending'
  AND create_time < NOW() - INTERVAL 1 DAY
LIMIT 1000;

UPDATE temp_order_update t
JOIN `order` o ON t.id = o.id
SET o.status = 'completed';

-- Refresh statistics / defragment periodically
ANALYZE TABLE `order`, `user`;
OPTIMIZE TABLE `order`;
```

### Tuning the Replica

**1. Replica configuration**
```ini
# my.cnf (replica)
[mysqld]
# Read-only
read_only = 1
super_read_only = 1

# Buffer pool (often sized larger than the master's, for read traffic)
innodb_buffer_pool_size = 24G
innodb_buffer_pool_instances = 8

# Read-heavy I/O
innodb_read_io_threads = 32
innodb_write_io_threads = 16

# Relay log
relay_log_space_limit = 8G
relay_log_purge = 1

# Replication
slave_parallel_workers = 16
slave_parallel_type = LOGICAL_CLOCK
slave_pending_jobs_size_max = 2G

# Note: the query cache was removed in MySQL 8.0;
# do not set query_cache_size there
```
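The 50–70%-of-RAM guideline for `innodb_buffer_pool_size` used in the configs above is easy to encode; a rule-of-thumb helper (the fraction and chunk rounding are conventions for dedicated DB hosts, not MySQL requirements):

```python
def suggest_buffer_pool_gb(total_ram_gb, fraction=0.6, chunk_gb=1):
    """Rule-of-thumb sizing for innodb_buffer_pool_size on a dedicated
    database host: a fixed fraction of RAM, rounded down to whole
    chunks, never below 1 GB."""
    size = int(total_ram_gb * fraction // chunk_gb) * chunk_gb
    return max(size, 1)

# 32 GB host at the default 60% -> 19 GB
suggestion = suggest_buffer_pool_gb(32)
```

On hosts that also run application processes, the fraction should be lowered accordingly.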
**2. Replica SQL optimization**
```sql
-- Replica-only indexes for reporting queries
CREATE INDEX idx_query_user ON `user`(create_time);
CREATE INDEX idx_report_order ON `order`(create_time, amount);

-- Prefer covering indexes for hot queries
SELECT id, name, email
FROM `user`
WHERE create_time > '2023-01-01' AND status = 'active';

-- MySQL has no materialized views; maintain a summary table instead
CREATE TABLE mv_user_active AS
SELECT id, name, email, create_time
FROM `user`
WHERE status = 'active' AND create_time > '2023-01-01';

-- Refresh it with the event scheduler
CREATE EVENT refresh_mv_user_active
ON SCHEDULE EVERY 5 MINUTE
DO
  REPLACE INTO mv_user_active
  SELECT id, name, email, create_time
  FROM `user`
  WHERE status = 'active' AND create_time > '2023-01-01';
```

### Failover

**1. Automated master-slave failover**
```java
// Failover service
@Service
public class MasterSlaveFailoverService {

    @Autowired
    private DataSource masterDataSource;

    @Autowired
    private List<DataSource> slaveDataSources;

    @Autowired
    private NotificationService notificationService;

    public void failover() {
        // 1. Detect master failure
        if (!checkMasterHealth()) {
            // 2. Pick a new master
            DataSource newMaster = selectNewMaster();
            // 3. Promote it
            executeFailover(newMaster);
            // 4. Repoint the application
            notifyApplication(newMaster);
            // 5. Alert
            notificationService.sendAlert("Master-Slave failover completed");
        }
    }

    private boolean checkMasterHealth() {
        // try-with-resources so the probe connection is always released
        try (Connection conn = masterDataSource.getConnection()) {
            return conn.isValid(5);
        } catch (Exception e) {
            return false;
        }
    }

    private DataSource selectNewMaster() {
        // Pick the healthiest replica
        return slaveDataSources.stream()
                .filter(this::checkSlaveHealth)
                .findFirst()
                .orElseThrow(() -> new RuntimeException("No healthy slave available"));
    }

    private void executeFailover(DataSource newMaster) {
        // 1. Stop replication on the new master
        stopReplication(newMaster);
        // 2. Reconfigure it as the master
        reconfigureAsMaster(newMaster);
        // 3. Update application datasource config
        updateDataSourceConfig(newMaster);
    }
}
```

**2. Failover script**
```bash
#!/bin/bash
# master_failover.sh

MASTER_HOST="192.168.1.100"
SLAVE_HOSTS=("192.168.1.101" "192.168.1.102" "192.168.1.103")
NEW_MASTER=""

# 1. Check the master
if ! mysql -h"$MASTER_HOST" -uadmin -padmin -e "SELECT 1" >/dev/null 2>&1; then
    echo "Master database is down"

    # 2. Pick a new master
    for slave in "${SLAVE_HOSTS[@]}"; do
        if mysql -h"$slave" -uadmin -padmin -e "SHOW REPLICA STATUS\G" \
            | grep -q "Replica_SQL_Running: Yes"; then
            NEW_MASTER=$slave
            break
        fi
    done
    echo "New master selected: $NEW_MASTER"

    # 3. Stop replication on the new master
    mysql -h"$NEW_MASTER" -uadmin -padmin -e "STOP REPLICA"

    # 4. Promote it: drop the replica configuration and allow writes
    mysql -h"$NEW_MASTER" -uadmin -padmin -e "
        RESET REPLICA ALL;
        SET GLOBAL read_only = 0;
        SET GLOBAL super_read_only = 0;
    "

    # 5. Repoint the application
    curl -X POST -H "Content-Type: application/json" \
        -d '{"master_host": "'"$NEW_MASTER"'"}' \
        http://localhost:8080/api/change-master

    echo "Failover completed"
else
    echo "Master is healthy, no failover needed"
fi
```

## 5. Real-World Solutions

### Large E-commerce System

**Scenario**:
- 1M+ orders per day
- Database: MySQL 8.0
- Topology: 1 master, 16 replicas

**Approach**:

1. **Sharding + replication**
```java
// Sharding configuration (ShardingSphere-style)
@Configuration
public class EcommerceShardingConfig {

    @Bean
    public DataSource shardingDataSource() throws SQLException {
        // 8 shards, each with 1 master and 3 replicas
        Map<String, DataSource> dataSourceMap = new HashMap<>();
        for (int i = 0; i < 8; i++) {
            DataSource master = createDataSource("192.168.1." + (100 + i), 3306, "master");
            dataSourceMap.put("master_" + i, master);
            for (int j = 1; j <= 3; j++) {
                DataSource slave = createDataSource("192.168.1." + (200 + i * 3 + j), 3306, "slave");
                dataSourceMap.put("slave_" + i + "_" + j, slave);
            }
        }

        ShardingRuleConfiguration ruleConfig = new ShardingRuleConfiguration();
        // Shard the order table by order_id
        TableRuleConfiguration orderRule = new TableRuleConfiguration(
                "order", "order_ds_$->{0..7}.order_$->{order_id % 8}");
        ruleConfig.getTableRuleConfigs().add(orderRule);

        return ShardingDataSourceFactory.createDataSource(
                dataSourceMap, ruleConfig, new Properties());
    }
}
```
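The inline expression `order_ds_$->{0..7}.order_$->{order_id % 8}` in the rule above is ShardingSphere's Groovy syntax. The routing it implies can be written out in plain Python (assuming datasource and physical table share the same modulo, and noting that production keys are often hashed first):

```python
def route_order(order_id, shards=8):
    """Pick the datasource and physical table for an order id, mirroring
    the 'order_ds_$->{0..7}.order_$->{order_id % 8}' rule (illustrative
    simplification: one modulo drives both levels)."""
    n = order_id % shards
    return f"order_ds_{n}.order_{n}"

target = route_order(1001)
```

Sharding helps lag indirectly: each shard's master writes 1/8th of the binlog volume, so each shard's replicas have far less to replay.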
2. **Multi-level caching**
```java
// Cache-aside on top of the replicas
@Service
public class OrderService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private OrderRepository orderRepository;

    @Cacheable(value = "order", key = "#orderId")
    public OrderDTO getOrder(Long orderId) {
        // Read from the database (a replica, via read-write splitting)
        Order order = orderRepository.findById(orderId);
        // Also keep a copy in Redis with a TTL
        redisTemplate.opsForValue().set("order:" + orderId, order, 1, TimeUnit.HOURS);
        return convertToDTO(order);
    }

    @Cacheable(value = "order_list", key = "#userId + '_' + #page")
    public List<OrderDTO> getUserOrders(Long userId, int page) {
        // Read from a replica
        List<Order> orders = orderRepository.findByUserId(userId, page);
        // Cache the list
        redisTemplate.opsForList().leftPushAll("order:list:" + userId, orders);
        return convertToDTOList(orders);
    }
}
```

### Social Media Platform

**Scenario**:
- 50M+ daily active users
- TB-scale data
- Read-heavy workload

**Approach**:

1. **Lag-aware read-write splitting**
```java
// Route reads to the least-lagged healthy replica
public class SmartDataSourceRouter {
    private final DataSource masterDataSource;
    private final List<DataSource> slaveDataSources;

    public DataSource getDataSource() {
        List<DataSource> healthySlaves = getHealthySlaves();
        if (healthySlaves.isEmpty()) {
            return masterDataSource;  // fall back to the master
        }
        return selectBestSlave(healthySlaves);
    }

    private DataSource selectBestSlave(List<DataSource> slaves) {
        return slaves.stream()
                .min(Comparator.comparingDouble(this::getSlaveDelay))
                .orElse(slaves.get(0));
    }

    private double getSlaveDelay(DataSource dataSource) {
        try (Connection conn = dataSource.getConnection();
             ResultSet rs = conn.createStatement().executeQuery("SHOW REPLICA STATUS")) {
            if (rs.next()) {
                return rs.getLong("Seconds_Behind_Source");
            }
            return Double.MAX_VALUE;  // not configured as a replica
        } catch (Exception e) {
            return Double.MAX_VALUE;  // unreachable: never selected
        }
    }
}
```
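The Java router above, reduced to its core decision, is easy to unit-test in isolation. A sketch where the lag probing has already happened and `None` marks an unreachable replica:

```python
def pick_replica(replica_lags, max_lag=30):
    """replica_lags: dict of replica name -> lag in seconds (None means
    the probe failed). Return the least-lagged replica within max_lag,
    or 'master' if none qualifies."""
    healthy = {name: lag for name, lag in replica_lags.items()
               if lag is not None and lag <= max_lag}
    if not healthy:
        return "master"   # fall back rather than serve stale reads
    return min(healthy, key=healthy.get)
```

The `max_lag` cutoff is the important design choice: without it, a "least-lagged" replica that is still minutes behind would silently serve stale data.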
2. **Cache warming**
```java
// Warm hot data into the cache before peak hours
@Component
public class DataWarmupService {

    @Autowired
    private UserService userService;

    @Autowired
    private PostService postService;

    @Scheduled(cron = "0 0 1 * * ?")  // 01:00 every day
    public void warmupData() {
        // Warm hot users
        List<User> hotUsers = userService.getHotUsers();
        hotUsers.forEach(user -> userService.getUserCache(user.getId()));

        // Warm hot posts
        List<Post> hotPosts = postService.getHotPosts();
        hotPosts.forEach(post -> postService.getPostCache(post.getId()));
    }
}
```

### Financial System

**Scenario**:
- Strong consistency requirements
- No data loss tolerated
- Low latency

**Approach**:

1. **Semi-synchronous replication + application-level audit log**
```java
// Transfers run on the master under semi-sync replication; an
// independent audit store keeps a second copy for reconciliation.
// (Never write directly to a replica -- that corrupts replication.)
@Service
public class FinancialTransactionService {

    @Autowired
    private DataSource masterDataSource;

    @Autowired
    private DataSource auditDataSource;  // separate audit store, not a replica

    @Transactional
    public void transferMoney(String fromAccount, String toAccount, BigDecimal amount) {
        try {
            // 1. Execute the transfer on the master
            transfer(fromAccount, toAccount, amount);
            // 2. Write an audit record
            writeAuditRecord(fromAccount, toAccount, amount);
            // 3. Log it
            logTransaction(fromAccount, toAccount, amount);
        } catch (Exception e) {
            // Roll back the transaction
            throw new FinancialException("Transfer failed", e);
        }
    }

    private void writeAuditRecord(String fromAccount, String toAccount, BigDecimal amount) {
        String sql = "INSERT INTO transaction_log (from_account, to_account, amount, status) "
                   + "VALUES (?, ?, ?, 'SUCCESS')";
        try (Connection conn = auditDataSource.getConnection();
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, fromAccount);
            ps.setString(2, toAccount);
            ps.setBigDecimal(3, amount);
            ps.executeUpdate();
        } catch (Exception e) {
            throw new RuntimeException("Audit write failed", e);
        }
    }
}
```

2. **Consistency checking**
```java
// Periodic master/replica consistency check
@Service
public class DataConsistencyService {

    @Scheduled(fixedRate = 300000)  // every 5 minutes
    public void checkConsistency() {
        // 1. Find inconsistencies
        List<Inconsistency> inconsistencies = findInconsistencies();

        // 2. Repair them
        for (Inconsistency issue : inconsistencies) {
            fixInconsistency(issue);
        }

        // 3. Alert
        if (!inconsistencies.isEmpty()) {
            notificationService.sendAlert(
                "Data consistency issues found: " + inconsistencies.size());
        }
    }

    private List<Inconsistency> findInconsistencies() {
        List<Inconsistency> result = new ArrayList<>();

        // Compare row counts between master and replica (a coarse check;
        // tools like pt-table-checksum do this properly, chunk by chunk)
        String masterQuery = "SELECT COUNT(*) FROM account";
        String slaveQuery = "SELECT COUNT(*) FROM account";
        if (!compareQueryResults(masterQuery, slaveQuery)) {
            result.add(new Inconsistency("account", "COUNT_MISMATCH"));
        }
        return result;
    }

    private boolean compareQueryResults(String masterQuery, String slaveQuery) {
        // Run both queries and compare the results (implementation omitted)
        return true;
    }
}
```

### Summary

Tackling MySQL replication lag takes work on several fronts:

1. **Hardware**: disk, network, and CPU capacity
2. **Configuration**: sensible MySQL parameters (binlog settings, parallel replication)
3. **Architecture**: a well-designed read-write splitting strategy
4. **Workload**: optimized queries and no oversized transactions
5. **Monitoring**: a complete monitoring and alerting pipeline
6. **Operations**: automated failover and recovery

In practice, pick the combination that fits the business, and keep iterating on it.