# 线程池核心参数详解 ## 问题 1. 线程池的核心参数有哪些?各自的作用是什么? 2. 如何合理设置线程池大小? 3. 线程池的拒绝策略有哪些?如何自定义? 4. 线程池如何优雅关闭? 5. 线程池的监控指标有哪些? 6. 在实际项目中如何使用线程池? --- ## 标准答案 ### 1. 线程池核心参数 #### **ThreadPoolExecutor 构造函数** ```java public ThreadPoolExecutor( int corePoolSize, // 核心线程数 int maximumPoolSize, // 最大线程数 long keepAliveTime, // 非核心线程空闲存活时间 TimeUnit unit, // 时间单位 BlockingQueue workQueue, // 任务队列 ThreadFactory threadFactory, // 线程工厂 RejectedExecutionHandler handler // 拒绝策略 ) ``` --- #### **参数详解** **1. corePoolSize(核心线程数)** - **说明**:即使空闲也保留的线程数 - **默认值**:创建时无核心线程(任务到达时才创建) - **预热**:`prestartAllCoreThreads()` 提前创建核心线程 ```java ThreadPoolExecutor executor = new ThreadPoolExecutor( 10, // 核心线程数 20, // 最大线程数 ... ); // 预热核心线程 executor.prestartAllCoreThreads(); ``` --- **2. maximumPoolSize(最大线程数)** - **说明**:线程池允许的最大线程数 - **限制**:`maximumPoolSize >= corePoolSize` - **动态调整**:运行时可通过 `setMaximumPoolSize()` 调整 ```java // 动态调整最大线程数 executor.setMaximumPoolSize(50); ``` --- **3. keepAliveTime(非核心线程存活时间)** - **说明**:非核心线程的空闲存活时间 - **超时回收**:超过时间后,线程会被回收 - **允许回收核心线程**:`allowCoreThreadTimeOut(true)` ```java ThreadPoolExecutor executor = new ThreadPoolExecutor( 10, 20, 60, // 存活时间 TimeUnit.SECONDS, new LinkedBlockingQueue<>(100) ); // 允许核心线程超时回收 executor.allowCoreThreadTimeOut(true); ``` --- **4. workQueue(任务队列)** **常见队列**: | 队列 | 特性 | 适用场景 | |------|------|----------| | **SynchronousQueue** | 不存储,直接传递 | 高并发、低延迟 | | **LinkedBlockingQueue** | 无界队列(默认 Integer.MAX_VALUE) | 任务提交频繁 | | **ArrayBlockingQueue** | 有界队列 | 防止资源耗尽 | | **PriorityBlockingQueue** | 优先级队列 | 优先级任务 | **示例**: ```java // 1. SynchronousQueue(高并发) ExecutorService executor1 = new ThreadPoolExecutor( 10, 20, 60L, TimeUnit.SECONDS, new SynchronousQueue() // 无队列,直接传递 ); // 2. LinkedBlockingQueue(无界) ExecutorService executor2 = new ThreadPoolExecutor( 10, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000) // 队列长度 1000 ); // 3. PriorityBlockingQueue(优先级) ExecutorService executor3 = new ThreadPoolExecutor( 10, 20, 60L, TimeUnit.SECONDS, new PriorityBlockingQueue<>(100) ); ``` --- **5. threadFactory(线程工厂)** **作用**: - 设置线程名称(便于排查) - 设置线程优先级 - 设置是否为守护线程 **示例**: ```java ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("order-pool-%d") // 线程名称前缀 .setDaemon(false) // 非守护线程 .setPriority(Thread.NORM_PRIORITY) .build(); ThreadPoolExecutor executor = new ThreadPoolExecutor( 10, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), namedThreadFactory ); ``` --- **6. handler(拒绝策略)** **内置策略**: | 策略 | 说明 | |------|------| | **AbortPolicy(默认)** | 抛出异常 | | **CallerRunsPolicy** | 调用者线程执行 | | **DiscardPolicy** | 静默丢弃 | | **DiscardOldestPolicy** | 丢弃最旧的任务 | ```java // 自定义拒绝策略 RejectedExecutionHandler handler = new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // 记录日志 log.warn("任务被拒绝: {}", r); // 重试(加入队列等待) if (!executor.isShutdown()) { try { executor.getQueue().put(r); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } }; ThreadPoolExecutor executor = new ThreadPoolExecutor( 10, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), handler ); ``` --- ### 2. 线程池工作流程 ``` 任务提交 ↓ 核心线程数 < corePoolSize? ├─ 是 → 创建核心线程并执行 └─ 否 → 继续 ↓ 队列未满? ├─ 是 → 加入队列 └─ 否 → 继续 ↓ 线程数 < maximumPoolSize? ├─ 是 → 创建非核心线程并执行 └─ 否 → 继续 ↓ 拒绝策略 ``` --- ### 3. 合理设置线程池大小 #### **CPU 密集型任务** **特点**:主要消耗 CPU 资源(计算、加密) **公式**: ``` 线程数 = CPU 核心数 + 1 ``` **原因**: - CPU 密集型任务不需要太多线程 - +1 是为了当某线程因页故障等原因暂停时,CPU 不会闲置 **示例**: ```java int cpuCore = Runtime.getRuntime().availableProcessors(); // 8 int poolSize = cpuCore + 1; // 9 ThreadPoolExecutor executor = new ThreadPoolExecutor( poolSize, // 核心线程数 poolSize, // 最大线程数 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100) ); ``` --- #### **IO 密集型任务** **特点**:主要等待 IO(网络、磁盘) **公式**: ``` 线程数 = CPU 核心数 × (1 + IO 耗时 / CPU 耗时) ``` **示例**: ```java // IO 耗时 / CPU 耗时 = 2(IO 占 2/3,CPU 占 1/3) int cpuCore = Runtime.getRuntime().availableProcessors(); // 8 int poolSize = cpuCore * (1 + 2); // 24 ThreadPoolExecutor executor = new ThreadPoolExecutor( cpuCore, // 核心线程数 = CPU 核心数 poolSize, // 最大线程数 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(500) ); ``` --- #### **通用公式** ``` 线程数 = CPU 核心数 × 目标 CPU 使用率 × (1 + IO 耗时 / CPU 耗时) ``` **参数调整**: - 目标 CPU 使用率:80% - 90% - IO / CPU 比例:通过压测获得 --- ### 4. 线程池监控 #### **监控指标** | 指标 | 说明 | 获取方法 | |------|------|----------| | **活跃线程数** | 正在执行任务的线程数 | `getActiveCount()` | | **已完成任务数** | 历史完成的任务总数 | `getCompletedTaskCount()` | | **总任务数** | 已完成 + 正在执行 | `getTaskCount()` | | **队列大小** | 队列中待执行任务数 | `getQueue().size()` | | **最大线程数** | 历史最大线程数 | `getLargestPoolSize()` | | **线程池是否关闭** | `isShutdown()` | `isShutdown()` | --- #### **监控代码** ```java @Component public class ThreadPoolMonitor { @Autowired private Map executorMap; @Scheduled(fixedRate = 60000) // 每分钟 public void monitor() { executorMap.forEach((name, executor) -> { ThreadPoolExecutorStats stats = new ThreadPoolExecutorStats(); stats.setName(name); stats.setCorePoolSize(executor.getCorePoolSize()); stats.setMaximumPoolSize(executor.getMaximumPoolSize()); stats.setActiveCount(executor.getActiveCount()); stats.setCompletedTaskCount(executor.getCompletedTaskCount()); stats.setTaskCount(executor.getTaskCount()); stats.setQueueSize(executor.getQueue().size()); stats.setLargestPoolSize(executor.getLargestPoolSize()); // 上报到监控系统(Prometheus、Grafana) Metrics.report(stats); // 告警判断 if (executor.getActiveCount() >= executor.getMaximumPoolSize() * 0.8) { alert("线程池 " + name + " 负载过高"); } }); } } ``` --- #### **Actuator 监控(Spring Boot)** **依赖**: ```xml org.springframework.boot spring-boot-starter-actuator ``` **配置**: ```yaml management: endpoints: web: exposure: include: health,info,metrics metrics: export: prometheus: enabled: true ``` **访问**: ```bash curl http://localhost:8080/actuator/metrics curl http://localhost:8080/actuator/metrics/executor.pool.size ``` --- ### 5. 线程池优雅关闭 #### **问题** 不优雅关闭的后果: - 已提交的任务可能丢失 - 正在执行的任务可能被中断 --- #### **shutdown()** ```java executor.shutdown(); try { // 等待任务完成 if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { // 超时,强制关闭 executor.shutdownNow(); } } catch (InterruptedException e) { executor.shutdownNow(); Thread.currentThread().interrupt(); } ``` **特点**: - 不再接受新任务 - 等待已提交的任务完成 - 超时后可调用 `shutdownNow()` 强制关闭 --- #### **shutdownNow()** ```java List unfinishedTasks = executor.shutdownNow(); ``` **特点**: - 不再接受新任务 - 尝试停止正在执行的任务(通过 `Thread.interrupt()`) - 返回未执行的任务列表 --- ### 6. Spring 线程池配置 #### **配置类** ```java @Configuration public class ThreadPoolConfig { @Bean("orderThreadPool") public ThreadPoolExecutor orderThreadPool() { return new ThreadPoolExecutor( 10, // 核心线程数 20, // 最大线程数 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), new ThreadFactoryBuilder() .setNameFormat("order-pool-%d") .build(), new ThreadPoolExecutor.CallerRunsPolicy() ); } @Bean("emailThreadPool") public ThreadPoolExecutor emailThreadPool() { return new ThreadPoolExecutor( 5, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(50), new ThreadFactoryBuilder() .setNameFormat("email-pool-%d") .build(), new ThreadPoolExecutor.CallerRunsPolicy() ); } } ``` --- #### **使用** ```java @Service public class OrderService { @Autowired @Qualifier("orderThreadPool") private ThreadPoolExecutor orderThreadPool; public void createOrder(Order order) { // 异步处理 orderThreadPool.execute(() -> { // 处理订单 processOrder(order); }); } } ``` --- #### **@Async(Spring 异步)** **配置**: ```java @Configuration @EnableAsync public class AsyncConfig { @Bean("asyncExecutor") public Executor asyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(20); executor.setQueueCapacity(100); executor.setThreadNamePrefix("async-pool-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } } ``` **使用**: ```java @Service public class EmailService { @Async("asyncExecutor") public void sendEmail(String to, String subject, String body) { // 异步发送邮件 mailSender.send(to, subject, body); } } ``` --- ### 7. 实际项目经验 #### **案例 1:线程池参数调优** **问题**: - 订单接口响应慢 - CPU 使用率低(30%),线程池队列满 **分析**: ```java // 原配置 corePoolSize = 5 maximumPoolSize = 10 queue = LinkedBlockingQueue(100) ``` **问题**: - 线程数太少,任务堆积在队列 - 数据库连接池用满,等待连接 **优化**: ```java // 优化后配置 corePoolSize = 20 // 增加 maximumPoolSize = 50 // 增加 queue = LinkedBlockingQueue(500) // 增加 ``` **结果**:响应时间从 2s 降至 200ms --- #### **案例 2:动态线程池** **需求**:根据流量动态调整线程池大小 **实现**: ```java @Component public class DynamicThreadPoolManager { private final Map executorMap = new ConcurrentHashMap<>(); @PostConstruct public void init() { // 定时调整线程池大小 ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); scheduler.scheduleAtFixedRate(this::adjustThreadPoolSize, 1, 1, TimeUnit.MINUTES); } private void adjustThreadPoolSize() { executorMap.forEach((name, executor) -> { // 获取当前负载 int activeCount = executor.getActiveCount(); int maximumPoolSize = executor.getMaximumPoolSize(); // 负载 > 80%,扩容 if (activeCount > maximumPoolSize * 0.8) { int newSize = Math.min(maximumPoolSize * 2, 100); executor.setMaximumPoolSize(newSize); log.info("扩容线程池: {} -> {}", name, newSize); } // 负载 < 20%,缩容 else if (activeCount < maximumPoolSize * 0.2) { int newSize = Math.max(maximumPoolSize / 2, executor.getCorePoolSize()); executor.setMaximumPoolSize(newSize); log.info("缩容线程池: {} -> {}", name, newSize); } }); } } ``` --- ### 8. 阿里 P7 加分项 **深度理解**: - 理解线程池的状态转换(RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED) - 理解 `Worker` 的实现原理(继承 AQS、实现 Runnable) - 理解线程池的异常处理机制 **实战经验**: - 有线程池参数调优的经验 - 有处理线程池饱和问题的经验 - 有线程池监控和告警的经验 **架构能力**: - 能设计动态线程池(根据流量调整) - 能设计线程池隔离(不同业务独立线程池) - 能设计线程池监控体系 **技术选型**: - 了解 `ForkJoinPool`(工作窃取线程池) - 了解 `ScheduledThreadPoolExecutor`(定时任务线程池) - 了解 `Vert.x`、WebFlux 等响应式框架的线程模型