工作中遇到的线程池问题一则
自从上次投产以后,生产监控告警的消息接连不断——我们提供给集中作业平台的接口(API001),我们系统的响应太慢了,耗时接近40多秒,经常超时。
经过查看报文信息,项目组锁定造成这些告警的单据,基本都是结算单。 因为这种单据的票据信息很多,而且都是数电票,而数电票的查验比其他发票的查验慢很多,查验是在这个接口响应报文之前做的动作。
目前合理的解决办法是——先返回报文信息,再异步的查验发票。
Java使用异步需要用到线程池,这里我遇到了之前没有考虑过的问题
public class ThreadPoolExample {
public static ThreadPoolExecutor getThreadPool() {
return new ThreadPoolExecutor(
10,
50,
60,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(10),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);
}
}
这里到底创建了一个怎样的线程池。
核心线程是10个,最大线程数是50个,非核心线程最大空闲时间是60秒。
我一直认为,在任务超过核心线程数后,会先增加线程数量,然后放置到队列中,最终执行拒绝策略。
func exec(task) {
if (t = getRestThread(threadPool.thread) && t != null) {
t(task)
}
if count(threadPool.threads) < coreSize {
createThreadAndRun(task)
return
}
if (count(threadPool.threads) > coreSize && count(threadPool.threads) < maxSize) {
createThreadAndRun(task)
return
}
if (!full?(threadPool.taskQueue)) {
add(threadPool.taskQueue, task);
return
}
runPolicy(threadPool.policy)
}
其实并不是这样的,核心线程的数量不足以处理任务时,先把任务放置在队列中,而不是先增加线程数量。
func exec(task) {
if (t = getRestThread(threadPool.thread) && t != null) {
t(task)
}
if count(threadPool.threads) < coreSize {
createThreadAndRun(task)
return
}
if (!full?(threadPool.taskQueue)) {
add(threadPool.taskQueue, task);
return
}
if (count(threadPool.threads) > coreSize && count(threadPool.threads) < maxSize) {
createThreadAndRun(task)
return
}
runPolicy(threadPool.policy)
}
这里Java的设计哲学是避免系统因过度并发而崩溃。
原因 | 说明 |
---|---|
🧠 控制线程数量 | 线程是重量级资源,创建/调度成本高。如果每个任务都新建线程,很容易导致线程过多、上下文切换频繁、CPU 被打爆。 |
🛠️ 线程复用优先 | corePoolSize 的线程是“常驻线程”,希望先复用它们来完成尽可能多的任务,避免频繁建销线程。 |
🧭 流量缓冲器作用 | 队列起到“缓冲作用”,避免任务高峰期瞬间扩展大量线程,把系统搞崩。 |
🧱 可配置性强 | 使用队列可以通过调整队列大小、线程池大小、拒绝策略来灵活应对不同业务负载。 |
🔧 减少上下文切换 | 太多线程会导致 CPU 在不同线程之间频繁切换,反而性能下降。 |
可以做一个测试,看看上述内容是真是假
public class Main {
public static void main(String[] args) {
Runnable testPoolBehavior = () -> {
try {
Thread.sleep(500000);
} catch (InterruptedException ignored) {
}
};
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
10,
100,
500,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(5000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);
threadPoolExecutor.execute(testPoolBehavior);
// 这里直接创建100个线程,按照策略,应该会放置在队列中,而不是直接扩容线程池
for (int i = 0; i < 100; i++) {
threadPoolExecutor.execute(testPoolBehavior);
}
System.out.println("线程池中线程数: "+threadPoolExecutor.getPoolSize());;
threadPoolExecutor.shutdownNow();
}
}
这里创建了一个非常大的队列,我们的预期是:
即使核心线程没有空闲,如果待执行的任务没有超过队列的最大容量,那么不会创建新的线程。
经过测试我们可以看到,100个线程明显超过了核心线程数,但是没有超过队列的容量,所以线程池中的线程个数依然是10个。
写了这么多年的Java,这种关键的知识竟然不知道,我很惭愧。