欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 养生 > 以 Tomcat 为例分析 Java 中的线程池

以 Tomcat 为例分析 Java 中的线程池

2025/2/25 14:11:23 来源:https://blog.csdn.net/qq_56517253/article/details/145825357  浏览:    关键词:以 Tomcat 为例分析 Java 中的线程池

以 Tomcat 为例分析 Java 中的线程池

首先,为什么会有“池”的概念?

我们的项目在运行过程中,需要使用系统资源(CPU、内存、网络、磁盘等)来完成信息的处理,比如在 JVM 中新建对象就需要消耗 CPU 和内存资源,当需要频繁创建大量的对象,并且这些对象的存活时间短,就意味着需要进行频繁销毁,那么很有可能这部分代码会成为性能的瓶颈。

而“池”就是用来解决这个问题的,简单来说,对象池就是把用过的对象保存起来,等下一次需要这种对象的时候,直接从对象池中拿出来重复使用,避免频繁地创建和销毁。

Java 线程池

ThreadPoolExecutor

看下 java.util.concurrent.ThreadPoolExecutor 中的构造方法

    /*** Creates a new {@code ThreadPoolExecutor} with the given initial* parameters.** @param corePoolSize the number of threads to keep in the pool, even*        if they are idle, unless {@code allowCoreThreadTimeOut} is set* @param maximumPoolSize the maximum number of threads to allow in the*        pool* @param keepAliveTime when the number of threads is greater than*        the core, this is the maximum time that excess idle threads*        will wait for new tasks before terminating.* @param unit the time unit for the {@code keepAliveTime} argument* @param workQueue the queue to use for holding tasks before they are*        executed.  This queue will hold only the {@code Runnable}*        tasks submitted by the {@code execute} method.* @param threadFactory the factory to use when the executor*        creates a new thread* @param handler the handler to use when execution is blocked*        because the thread bounds and queue capacities are reached* @throws IllegalArgumentException if one of the following holds:<br>*         {@code corePoolSize < 0}<br>*         {@code keepAliveTime < 0}<br>*         {@code maximumPoolSize <= 0}<br>*         {@code maximumPoolSize < corePoolSize}* @throws NullPointerException if {@code workQueue}*         or {@code threadFactory} or {@code handler} is null*/public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)

每次提交任务时,如果线程数还没达到核心线程数 corePoolSize,线程池就创建新线程来执行。当线程数达到**corePoolSize **后,新增的任务就放到工作队列 workQueue 里,而线程池中的线程则努力地从 workQueue 里拉活来干,也就是调用 poll 方法来获取任务。

如果任务很多,并且 workQueue 是个有界队列,队列可能会满,此时线程池就会紧急创建新的临时线程来救场,如果总的线程数达到了最大线程数 maximumPoolSize,则不能再创建新的临时线程了,转而执行拒绝策略 handler,比如抛出异常或者由调用者线程来执行任务等。

如果高峰过去了,线程池比较闲了怎么办?临时线程使用 poll(keepAliveTime, unit)方法从工作队列中拉活干,请注意 poll 方法设置了超时时间,如果超时了仍然两手空空没拉到活,表明它太闲了,这个线程会被销毁回收。

那还有一个参数 threadFactory 是用来做什么的呢?通过它你可以扩展原生的线程工厂,比如给创建出来的线程取个有意义的名字。

注意这些默认策略是可以修改的:

  • 声明线程池后立即调用 prestartAllCoreThreads 方法,来启动所有核心线程;
  • 传入 true 给 allowCoreThreadTimeOut 方法,来让线程池在空闲的时候同样回收核心线程。

FixedThreadPool/CachedThreadPool

Java 提供了一些默认的线程池实现,比如 FixedThreadPool 和 CachedThreadPool,它们的本质就是给 ThreadPoolExecutor 设置了不同的参数,是定制版的 ThreadPoolExecutor。

public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}

对比一下我们可以发现,线程池的两个关键点就是:

  • 是否限制线程个数。
  • 是否限制队列长度。

FixedThreadPool 的核心线程数就是最大线程数,当忙不过来时 task 会被丢到 LinkedBlockingQueue 队列中,注意:这是一个无界队列,也就是在任务量足够大时会触发 OOM。

CachedThreadPool 的核心线程数为 0,最大线程数是 Integer 的最大值,因此它对线程个数不做限制,忙不过来时无限创建临时线程,闲下来时再回收。它的任务队列是 SynchronousQueue,表明队列长度为 0。在任务量足够大时会触发 OOM,因为资源是有限的,无法一直创建新线程。

因此,不建议使用 Executors 提供的两种快捷的线程池,原因如下:

  • 我们需要根据自己的场景、并发情况来评估线程池的几个核心参数,包括核心线程数、最大线程数、线程回收策略、工作队列的类型,以及拒绝策略,确保线程池的工作行为符合需求,一般都需要设置有界的工作队列和可控的线程数。
  • 任何时候,都应该为自定义线程池指定有意义的名称,以方便排查问题。当出现线程数量暴增、线程死锁、线程占用大量 CPU、线程执行出现异常等问题时,我们往往会抓取线程栈。此时,有意义的线程名称,就可以方便我们定位问题。

拒绝策略

拒绝策略特点适用场景
AbortPolicy默认策略,抛出 RejectedExecutionException 异常。不允许丢失任务,任务必须立即处理的场景。
CallerRunsPolicy任务由调用者线程执行,降低并发度。调用者线程可以处理任务,适合减少任务提交速度的场景。
DiscardPolicy丢弃无法执行的任务,不抛出异常。可以容忍任务丢失,适合不重要的任务丢弃。
DiscardOldestPolicy丢弃队列中最旧的任务,加入新任务。适合丢弃最旧任务,保持队列中的新任务。

其中,注意 CallerRunsPolicy,在实际业务开发中,可能会导致 tomcat 的工作线程来进行业务的处理,进一步降低系统并发度。

Tomcat 线程池

Java 线程池是先用工作队列来存放来不及处理的任务,满了之后再扩容线程池。当我们的工作队列设置得很大时,最大线程数这个参数显得没有意义,因为队列很难满,或者到满的时候再去扩容线程池已经于事无补了。那么,我们有没有办法让线程池更激进一点,优先开启更多的线程,而把队列当成一个后备方案呢?

有的兄弟,有的

按照正常的线程池流程,在任务到来时,如果队列已满,就会创建新的非核心线程,那么可以重写队列的 offer 方法,造成队列已满的假象,在线程数达到最大线程数时,执行拒绝策略的时候,把任务尝试加入队列,如果这时队列真的满了,再按照拒绝策略处理。

ThreadPoolExecutor

    public void execute(Runnable command, long timeout, TimeUnit unit) {// 计数器 + 1,维护提交到了线程池但是还没执行完成的任务数量submittedCount.incrementAndGet();try {// 尝试进行处理executeInternal(command);} catch (RejectedExecutionException rx) {if (getQueue() instanceof TaskQueue) {final TaskQueue queue = (TaskQueue) getQueue();try {// 继续尝试把任务放到任务队列中去if (!queue.force(command, timeout, unit)) {submittedCount.decrementAndGet();throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));}} catch (InterruptedException x) {submittedCount.decrementAndGet();throw new RejectedExecutionException(x);}} else {// 计数器 - 1,抛出异常submittedCount.decrementAndGet();throw rx;}}}private void executeInternal(Runnable command) {int c = ctl.get();// 线程数 < 核心线程数,创建一个核心线程来接收if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true)) {return;}c = ctl.get();}// 线程数 >= 核心线程数,尝试让队列接收if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (!isRunning(recheck) && remove(command)) {reject(command);} else if (workerCountOf(recheck) == 0) { // 有可能此时有线程死亡了,再次检查是否需要添加线程addWorker(null, false);}} else if (!addWorker(command, false)) { // 队列接收可能返回 false,尝试创建非核心线程来处理reject(command);}}

TaskQuene,注意继承了 LinkedBlockingQueue 无界队列,如果不重写线程池执行方法,新请求只会放入队列,直到 OOM。

    // TaskQueue extends LinkedBlockingQueue<Runnable>// 进入此方法的前提是 当前线程数已经达到了核心线程数public boolean offer(Runnable o) {//we can't do any checksif (parent == null) {return super.offer(o);}// we are maxed out on threads, simply queue the object// 线程数已经到了最大值,不能创建新线程了,只能把任务添加到任务队列。if (parent.getPoolSize() == parent.getMaximumPoolSize()) {return super.offer(o);}// we have idle threads, just add it to the queue// 已提交的任务数 <= 当前线程数,表示还有空闲线程,无需创建新线程// AtomicInteger submittedCount 维护已经提交到了线程池,但是还没有执行完的任务个数if (parent.getSubmittedCount() <= (parent.getPoolSize())) {return super.offer(o);}// if we have less threads than maximum force creation of a new thread// 已提交的任务数 > 当前线程数,并且当前线程数 < 最大线程数,返回 false 创建新线程if (parent.getPoolSize() < parent.getMaximumPoolSize()) {return false;}// if we reached here, we need to add it to the queue// 其他情况下添加到队列中return super.offer(o);}

可以看到,在 24 行如果 当前线程数 > 核心线程数,且 < 最大线程数,会优先创建新的非核心线程,而不是优先使用队列。原因是队列继承了无界队列,如果先放入队列会导致最大线程数失效,定制版的任务队列,重写了 offer 方法,使得在任务队列长度无限制的情况下,线程池仍然有机会创建新的线程。

Tomcat 的线程池与 Java 原生线程池的最大区别是:在线程数达到最大线程数后,继续尝试把任务添加到任务队列中去,如果这时候插入失败,再真正执行拒绝策略。

最佳实践

要根据任务的“轻重缓急”来指定线程池的核心参数,包括线程数、回收策略和任务队列,既然使用了线程池就需要确保线程池是在复用的,每次 new 一个线程池出来可能比不用线程池还糟糕。复用线程池不代表应用程序始终使用同一个线程池,我们应该根据任务的性质来选用不同的线程池。特别注意 IO 绑定的任务和 CPU 绑定的任务对于线程池属性的偏好,如果希望减少任务间的相互干扰,考虑按需使用隔离的线程池。

  • 对于执行比较慢、数量不大的 IO 任务,或许要考虑更多的线程数,而不需要太大的队列。
  • 而对于吞吐量较大的计算型任务,线程数量不宜过多,可以是 CPU 核数或核数 *2(理由是,线程一定调度到某个 CPU 进行执行,如果任务本身是 CPU 绑定的任务,那么过多的线程只会增加线程切换的开销,并不能提升吞吐量),但可能需要较长的队列来做缓冲。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词