「并发编程技术」线程池中各参数含义及具体执行流程
创始人
2025-05-28 12:37:07
0

「并发编程技术」线程池中各参数含义及具体执行流程

参考&鸣谢

通过源码理解 Java 线程池的核心参数

通俗易懂,各常用线程池执行的-流程图

ThreadPoolExecutor 的参数含义及源码执行流程


文章目录

  • 「并发编程技术」线程池中各参数含义及具体执行流程
    • 一、线程池概述
    • 二、参数含义
    • 二、执行流程
    • 四、源码验证
    • 五、小结

一、线程池概述

线程池是为了避免线程频繁的创建和销毁带来的性能消耗,而建立的一种池化技术,它是把已创建的线程放入“池”中,当有任务来临时就可以重用已有的线程,无需等待创建的过程,这样就可以有效提高程序的响应速度。但如果要说线程池的话一定离不开ThreadPoolExecutor


二、参数含义

其实,如果研究过线程池的话,其实并不难,他的参数并不多,java.util.concurrent.ThreadPoolExecutor中的参数列举出来就是这些.

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
  • corePoolSize:线程池中的核心线程数,即使没有任务执行的时候,他们也是存在的.(不考虑配置了参数:allowCoreThreadTimeOut,allowCoreThreadTimeOut通过字面意思也能知道,就是是否允许核心线程超时,一般情况下不需要设置,本文不考虑)
  • maximumPoolSize:线程池中的允许存在的最大线程数.
  • keepAliveTime: 当线程池中的线程超过核心线程数的时候,这部分多余的空闲线程等待执行新任务的超时时间.例如:核心线程数为1 ,最大线程数为5,当前运行线程为4,keepAliveTime为60s,那么4-1=3个线程在空闲状态下等待60s 后还没有新任务到来,就会被销毁了.
  • unit :keepAliveTime 的时间单位.
  • workQueue: 线程队列,如果当前时间核心线程都在运行,又来了一个新任务,那么这个新任务就会被放进这个线程队列中,等待执行.
  • threadFactory: 线程池创建线程的工厂类.
  • handler: 如果线程队列满了同事执行线程数也达到了maximumPoolSize,如果此时再来新的线程,将执行什么 handler 来处理这个线程. handler的默认提供的类型有:
    • AbortPolicy: 抛出RejectedExecutionException异常
    • DiscardPolicy: 什么都不做.
    • DiscardOldestPolicy: 将线程队列中的最老的任务抛弃掉,换区一个空间执行当前的任务.
    • CallerRunsPolicy: 使用当前的线程(比如 main)来执行这个线程.

二、执行流程

image-20230315090428677

我们知道了参数的含义,那么这些参数在执行过程中到底是怎么运行的呢,我们先用文字分几种情况来描述一下.

在说之前,先来看一个例子.

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), new BasicThreadFactory.Builder().namingPattern("name-%d").build());
threadPoolExecutor.execute(new Runnable() {@Overridepublic void run() {System.out.println("test");}});

不得不说,在很长一段时间内,我都有一个疑问或者说误区,明明是线程池,为什么每次都需要我 new 一个 线程(错误) 呢? 因为我们开始学线程的时候先学 new Thread(),后来又学了new Runnable(),慢慢的就把这两个混为一坛了,其实 new Runnable()并没有新起一个线程,只是新建了一个可运行的任务,就是一个普通的对象而已,哈哈,这应该是一个很傻的错误认知. 回到上面说的具体含义.

  1. 如果新加入一个运行的任务,当前运行的线程小于corePoolSize,这时候会在线程池中新建一个线程用于执行这个新的任务.
  2. 如果新加入一个运行的任务,当前运行的线程大于等于corePoolSize,这个时候就需要将这个新的任务加入到线程队列workQueue中,一旦线程中的线程执行完成了一个任务,就会马上从队列中去一个任务来执行.
  3. 接2,如果队列也满了,怎么办呢? 如果maximumPoolSize大于corePoolSize,就会新建线程来处理这个新的任务,直到总运行线程数达到maximumPoolSize.
  4. 如果总运行线程数达到了maximumPoolSize,还来了新的任务怎么办呢?就需要执行上面所说的拒绝策略了handler了,按照配置的策略进行处理,默认不配置的情况下,使用的是AbortPolicy.
 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

四、源码验证

怎么判断上面说的流程是正确的呢?我们可以跟进源码来仔细查看一下上面的流程,其实线程池执行的代码比较简单,一看变动,看了源码,掌握得应该会更加深刻一些.

首先来看看execute()方法

public void execute(Runnable command) {if (command == null)throw new NullPointerException();// ctl是一个原子的控制位,可以表示线程池的状态和运行的线程数;int c = ctl.get();// 1. 如果运行线程数小于核心线程数if (workerCountOf(c) < corePoolSize) {//直接新建 worker(线程)执行.if (addWorker(command, true))return;c = ctl.get();}// 2. 如果上面的addWorker 失败了,就需要加入线程队列中if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 加入后,检查状态;if (! isRunning(recheck) && remove(command))//检查运行状态不通过,移除任务,执行拒绝策略reject(command);// 如果当前的运行线程为0else if (workerCountOf(recheck) == 0)//就是用核心线程执行刚刚添加到队列的线程addWorker(null, false);}// 3. 如果队列也满了,就新建线程继续处理else if (!addWorker(command, false))// 4. 如果不允许新建了,就执行拒绝策略reject(command);}

按照一个正常流程来说,我们只考虑一个理想的环境.我们可以分为上面的4步,正好和上面的文字描述对应.

可能爱思考的同学发现,第2步,加入队列后,什么时候执行这个新加入的呢,难道有一个定时任务吗?并不是.我们可以看看这个addWorker()方法.

private boolean addWorker(Runnable firstTask, boolean core) {retry://第一层循环for (;;) {int c = ctl.get();//获取当前线程池的状态;int rs = runStateOf(c);...//第二层循环for (;;) {//获取线程池的运行线程个数int wc = workerCountOf(c);// 大于了最大允许的线程个数,当然要返回 falseif (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//通过了检查,就把 正在运行的线程数加1if (compareAndIncrementWorkerCount(c))//跳出第一层循环break retry;c = ctl.get();  // Re-read ctl//加1 失败,可能有多线程冲突,检查一下最新的状态,继续重试;if (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//新建一个线程包装我们的 Runnablew = new Worker(firstTask);final Thread t = w.thread;if (t != null) {...//加入 hashSet 中管理存在于线程池中线程workers.add(w);workerAdded = true;if (workerAdded) {// 启动 worker,worker就是线程中真正执行的线程,包装了我们提供的 Runnablet.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}

上面的addWorker()方法中,就是靠t.start()来启动线程的. Worker这个类存在于java.util.concurrent.ThreadPoolExecutor.Worker,定义如下 只保留了相对重要的代码.

 private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker  */public void run() {runWorker(this);}final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {....try {task.run();} catch (RuntimeException x) {thrown = x; throw x;...} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}}

所以当t.start()的时候,实际上,新建了一个线程,执行了runWorker(this);方法: 这个方法里面有一个while循环,getTask()是从队列中获取一个任务.所以说这里可以解答上面放到队列里面的任务什么时候执行了,等到任意一个核心线程空闲出来时候,他就会循环去取队列中的任务执行.每个核心线程和新起来的线程都是同步来执行你传进来的Runnablerun方法.

整个流程应该就比较清楚了.

上面说了这么多,核心参数都说的差不多了,那么keepAliveTime 这个参数在源码怎么来用的呢?

上面说到一个getTask()方法从队列中取一个任务,看一下这个方法的代码(省略非主要的).

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);...int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
复制代码

主要就是用于取任务这里,poll()不会阻塞,take()是阻塞的,所以当使用poll取数据的时候,到达设定的超时,就会继续往下执行,如果超过设定时间还是没有任务进来,就会将timedOut设置为 true,返回 null. 这个timedOut会控制上面的 if 判断,最终控制compareAndDecrementWorkerCount()方法,就是讲运行的线程数减1个,那么下次如果又满了,就会新建一个,所以这个 Alive 就失效了.


五、小结

线程池的使用必须要通过 ThreadPoolExecutor 的方式来创建,这样才可以更加明确线程池的运行规则,规避资源耗尽的风险。同时,也介绍了 ThreadPoolExecutor 的七大核心参数,包括核心线程数和最大线程数之间的区别,当线程池的任务队列没有可用空间且线程池的线程数量已经达到了最大线程数时,则会执行拒绝策略,Java 自动的拒绝策略有 4 种,用户也可以通过重写 rejectedExecution() 来自定义拒绝策略,还可以通过重写 beforeExecute() 和 afterExecute() 来实现 ThreadPoolExecutor 的扩展功能。

相关内容

热门资讯

从“纸上政策”到“落地实效” ... 在建湖县高新技术园区,文辰精密科技有限公司的生产车间内机器轰鸣,工人们正忙着赶制新一批订单。这家专注...
跨省办公更便捷!京津冀律师驿站... 11月22日至23日,京津冀律师驿站在北京启动“百千万行动计划”,计划通过建立百家律所联系点,推动千...
梅花生物:因侵害专利权被味之素... 北京商报讯(记者 郭秀娟 王悦彤) 11月23日,北京商报记者获悉,近日梅花生物发布公告称,公司及全...
吉林益豆食品有限公司:依托互市... 珲春地处中、俄、朝三国交界,得天独厚的地理位置,使其成为连接东北亚的“黄金通道”。在这里,吉林益豆食...
北京外国语大学教授宁强在敦煌因... 中新网11月23日电 据“北京外国语大学国新学院”微信公众号消息,11月23日,北京外国语大学国际新...
上海市民建议“推行地铁月票制度... 近日 有网友在人民网“领导留言板”留言 希望上海地铁推出月票制度 上海申通地铁集团有限公司答复表示 ...
原创 中... 近期,日本首相高市早苗首次明确表态,必要时将武力介入台海,这一惊人的言论犹如对历史的挑衅,并直接威胁...
女儿为留学想卖掉其名下的房子,... 父母已离婚多年 很少与父亲见面 一直是姑姑 照顾病重的父亲…… 女儿因求学花销 急需卖房变现 200...
《最高人民法院关于审理建设工程... 为深入贯彻习近平法治思想,认真落实党的二十届四中全会精神,正确审理建设工程施工合同纠纷案件,统一法律...
【金昌】老百姓的“声音”,是这... 编 者 按 人民,是推动经济社会发展的主角;基层,是新闻永不枯竭的源头。为深入学习和践行“四力”要求...