Java并发编程—线程池
创始人
2024-03-16 05:14:09
0

文章目录

  • 线程池
    • 什么是线程池
      • 线程池优点:
      • 线程复用技术
    • 线程池的实现原理是什么
    • 线程池执行任务的流程?
      • 线程池如何知道一个线程的任务已经执行完成
    • 线程池的核心参数
      • 拒绝策略
    • 线程池类型(常用线程池)
      • 阻塞队列
        • 执行execute()方法和submit()方法的区别
    • 代码
      • 进行优化
      • 线程池代码描述

——————————————————————————————————

线程池

什么是线程池

  • 首先,线程池本质上是一种池化技术,而池化技术是一种资源复用的思想,比较常见的有连接池、内存池、对象池。
  • 而线程池里面复用的是线程资源。
  • 线程池就是创建若干个可执行的线程放入一个池中,有任务需要处理时,会提交到线程池中的任务队列,处理完之后线程并不会被销毁,而是仍然在线程池中等待下一个任务。
  • 自己实现比较麻烦,所以有线程池帮助完成这些功能,让线程处于一直存活状态。
  • 有任务之后会交给线程池,线程池交给线程。

线程池优点:

①:减少线程的频繁创建和销毁带来的性能开销,因为线程创建会涉及到 CPU 上下文切换、内存分配等工作。
②:提升快速响应的能力,不需要创建线程,直接在线程池调用就行
③:线程池本身会有参数来控制线程创建的数量,这样就可以避免无休止的创建线程带来的资源利用率过高的问题,起到了资源保护的作用。
④:减少代码之间的耦合,能够实现异步操作(异步:直接发送下一个请求,不需要等待回复)

线程复用技术

线程复用技术,因为线程的生命周期时由任务运行的状态决定的,无法人为控制。所以为了实现线程的复用,线程池里面用到了阻塞队列,也就是说线程池里面的工作线程处于一直运行状态,它会从阻塞队列中去获取待执行的任务,一旦队列空了,那这个工作线程就会被阻塞,直到下次有新的任务进来。工作线程是根据任务的情况实现阻塞和唤醒,从而达到线程复用的目的。
最后,线程池里面的资源限制,是通过几个关键参数来控制的,分别是核心线程数、最大线程数。核心线程数表示默认长期存在的工作线程,而最大线程数是根据任务的情况动态创建的线程,主要是提高阻塞队列中任务的处理效率。

线程池的实现原理是什么

线程里面有个死循环
线程池里面存数据的集合是队列

线程池执行任务的流程?

  1. 线程池执行executelsubmit方法向线程池添加任务,当任务小于核心线程数corePoolSize,线程池中可以创建新的线程。
  2. 当任务大于核心线程数corePoolSize,就向阻塞队列添加任务。
  3. 如果阻塞队列已满,需要通过比较参数maximumPoolSize,在线程池创建新的线程,当线程数量大于maximumPoolSize,说明当前设置线程池中线程已经处理不了了,就会执行饱和策略。
    在这里插入图片描述

线程池如何知道一个线程的任务已经执行完成

(1)在线程池内部,当我们把一个任务丢给线程池去执行,线程池会调度工作线程来执行这个任务的 run 方法,run 方法正常结束,也就意味着任务完成了。
(线程池中的工作线程是通过同步调用任务的 run()方法并且等待 run 方法返回后,再去统计任务的完成数量。)
(2)如果想在线程池外部去获得线程池内部任务的执行状态,有几种方法可以实现。线程池提供了一个 isTerminated()方法,可以判断线程池的运行状态,我们可以循环判断 isTerminated()方法的返回结果来了解线程池的运行状态,一旦线程池的运行状态是 Terminated,意味着线程池中的所有任务都已经执行完了。想要通过这个方法获取状态的前提是,程序中主动调用了线程池的 shutdown()方法。在实际业务中,一般不会主动去关闭线程池,因此这个方法在实用性和灵活性方面都不是很好。
(3)在线程池中,有一个 submit()方法,它提供了一个 Future 的返回值,我们通过Future.get()方法来获得任务的执行结果,当线程池中的任务没执行完之前future.get()方法会一直阻塞,直到任务执行结束。因此,只要 future.get()方法正常返回,也就意味着传入到线程池中的任务已经执行完成了!

  • 也可以引入一个 CountDownLatch 计数器,它可以通过初始化指定一个计数器进行倒计时,其中有两个方法分别是 await()阻塞线程,以及 countDown()进行倒计时,一旦倒计时归零,所以被阻塞在 await()方法的线程都会被释放。基于这样的原理,我们可以定义一个 CountDownLatch 对象并且计数器为 1,接着在线程池代码块后面调用 await()方法阻塞主线程,然后,当传入到线程池中的任务执行完成后,调用countDown()方法表示任务执行结束。最后,计数器归零 0,唤醒阻塞在 await()方法的线程。
    (4)总结:不管是线程池内部还是外部,要想知道线程是否执行结束,我们必须要获取线程执行结束后的状态,而线程本身没有返回值,所以只能通过阻塞-唤醒的方式来实现,future.get 和 CountDownLatch 都是这样一个原理。

线程池的核心参数

线程池的真正实现类是 ThreadPoolExecutor,其构造方法需要如下参数:

  • corePoolSize(必需):核心线程数。默认情况下,核心线程会一直存活,但是当将allowCoreThreadTimeout 设置为 true 时,核心线程也会超时回收。
  • maximumPoolSize(必需):线程池所能容纳的 最大线程数。当活跃线程数达到该数值后,后续的新任务将会阻塞。
  • keepAliveTime(必需):线程闲置超时时长。如果超过该时长,非核心线程就会被回收。如果将 allowCoreThreadTimeout 设置为 true 时,核心线程也会超时回收。
  • unit(必需):指定 keepAliveTime 参数的时间单位。常用的有:TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)。
  • workQueue(必需):任务队列。通过线程池的 execute() 方法提交的 Runnable 对象将存储在该参数中。其采用阻塞队列实现。
  • threadFactory(可选):线程工厂。用于指定为线程池创建新线程的方式。
  • handler(可选):拒绝策略。当达到最大线程数时需要执行的饱和策略。

拒绝策略

AbortPolicy(默认):丢弃任务,并抛出 RejectedExecutionException 异常。
CallerRunsPolicy:由调用线程处理该任务。
DiscardPolicy:丢弃任务,但是不抛出异常。可以配合这种模式进行自定义的处理方式。
DiscardOldestPolicy:丢弃队列最早的未处理任务,然后重新尝试执行任务。

线程池类型(常用线程池)

  • 定长线程池(FixedThreadPool)
    • 特点:只有核心线程,线程数量固定,执行完立即回收,任务队列为链表结构的有界队列。
    • 应用场景:控制线程最大并发数。
  • 定时线程池(ScheduledThreadPool )
    • 特点:核心线程数量固定,非核心线程数量无限,执行完闲置 10ms 后回收,任务队列为延时阻塞队列。
    • 应用场景:执行定时或周期性的任务。
  • 可缓存线程池(CachedThreadPool)
    • 特点:无核心线程,非核心线程数量无限,执行完闲置 60s 后回收,任务队列为不存储元素的阻塞队列。
    • 应用场景:执行大量、耗时少的任务。
  • 单线程化线程池(SingleThreadExecutor)
    • 特点:只有 1 个核心线程,无非核心线程,执行完立即回收,任务队列为链表结构的有界队列。
    • 应用场景:不适合并发但可能引起 IO 阻塞性及影响 UI 线程响应的操作,如数据库操作、文件操作等。

阻塞队列

在这里插入图片描述

执行execute()方法和submit()方法的区别

  • execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否:
  • submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get (long timeout,Timeunit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

代码

public class DequeThread {//存储任务private Deque threads=new LinkedList<>();private int defaultThreadCount=4;public DequeThread(){for (int i = 0; i < defaultThreadCount; i++) {Worker worker=new Worker();//当前类实例化的时候,不断在任务列表放任务threads.addLast(worker);//执行任务worker.start();}}private Deque tasks=new LinkedList<>();public int  addTask(T task) {synchronized (tasks){//添加的时候也应该锁住tasks.addLast(task);//放任务tasks.notifyAll();//添加新任务时应该做一次唤醒,唤醒全部的来抢任务}return 0;}class Worker extends Thread{@Overridepublic void run() {super.run();while (true){T first=tasks.pollFirst();//取任务synchronized (tasks){//synchronized执行完之后,锁就释放出来了//拿到锁就判断是否为空while(first==null){//防止空指针try {tasks.wait();//为空的时候等待在task上//.wait():让当前线程阻塞在对象上,并把锁释放掉} catch (InterruptedException e) {e.printStackTrace();}//再判断是不是空,如果是空就进下一次循环//应该是取出来再执行first=tasks.pollFirst();}}first.run();//执行任务}}}public static void main(String[] args) {DequeThread thread=new DequeThread<>();thread.addTask(new Thread(new Runnable() {@Overridepublic void run() {System.out.println("1");}}));thread.addTask(new Thread(new Runnable() {@Overridepublic void run() {System.out.println("2");}}));thread.addTask(new Thread(new Runnable() {@Overridepublic void run() {System.out.println("3");}}));}
}

进行优化

public class DequeThread {//存储任务private Deque threads=new LinkedList<>();private int defaultThreadCount=4;//限制个数,线程安全的计数private AtomicInteger taskCount = new AtomicInteger();private Object full = new Object();public DequeThread(){for (int i = 0; i < defaultThreadCount; i++) {Worker worker=new Worker();//当前类实例化的时候,不断在任务列表放任务threads.addLast(worker);//执行任务worker.start();}}private Deque tasks=new LinkedList<>();public int  addTask(T task) throws InterruptedException {synchronized (tasks){//添加的时候也应该锁住synchronized (full){int i = taskCount.incrementAndGet();//自增//如果i加到10,就进行阻塞while (i > 10){full.wait();}}tasks.addLast(task);//放任务//添加新任务时应该做一次唤醒,唤醒全部的来抢任务tasks.notifyAll();}return 0;}class Worker extends Thread{@Overridepublic void run() {super.run();while (true){synchronized (tasks){//synchronized执行完之后,锁就释放出来了//优化——》tasks.pollFirst();放到锁里T first=tasks.pollFirst();//取任务//拿到锁就判断是否为空while(first==null){//防止空指针try {tasks.wait();//为空的时候等待在task上//.wait():让当前线程阻塞在对象上,并把锁释放掉} catch (InterruptedException e) {e.printStackTrace();}//再判断是不是空,如果是空就进下一次循环//应该是取出来再执行first=tasks.pollFirst();}//优化——》first.run();放到锁里first.run();//执行任务//执行完成后,做减一的操作taskCount.decrementAndGet();//阻塞到full上的线程就能被唤醒//如果空间满了,让调用的线程阻塞住synchronized (full){full.notifyAll();}}}}}//main方法里面执行的代码结束了public static void main(String[] args) throws InterruptedException {DequeThread thread=new DequeThread<>();thread.addTask(new Thread(new Runnable() {@Overridepublic void run() {System.out.println("1");}}));thread.addTask(new Thread(new Runnable() {@Overridepublic void run() {System.out.println("2");}}));thread.addTask(new Thread(new Runnable() {@Overridepublic void run() {System.out.println("3");}}));}
}

线程池代码描述

继承了Thread类,也可以实现Runnable接口,
如何让任务和线程关联起来?
选择了自己封装线程的run方法,让它和提交的任务关联起来;
又因为线程池的线程是优先于任务启动起来的,所以看不到任务,只能借助一个另外的集合private Deque tasks=new LinkedList<>(); tasks这个变量联系起来;
类似于生产者消费者的思想,没有任务的时候等着,有任务的时候执行,进而完成线程池的处理
而且线程池的初始化是在程序运行起来的时候就进行了
所以是线程池运行起来之后,才等着任务进来

相关内容

热门资讯

新华鲜报丨利好跨国公司!这项跨... 新华社北京12月26日电(记者刘开雄、吴雨)中国人民银行、国家外汇管理局12月26日发布通知,在总结...
日元空头共识渐成:2026年或... 随着日本央行最新加息举措未能提振汇率,华尔街对日元的看空情绪再度升温,市场正逐渐形成日元将长期疲软的...
北平锋:民进党当局对所谓“两岸... 12月26日,台湾《中国时报》报道,陆委会近日推动所谓“两岸人民关系条例”四项修正,包含:公务员赴陆...
AI核心产业超万亿,工信部将完... 今年,工业经济顶压前行、向新向优发展,展现强大韧性和活力。 12月25日至26日,全国工业和信息化工...
神州泰岳(300002)披露全... 截至2025年12月26日收盘,神州泰岳(300002)报收于11.37元,较前一交易日上涨0.09...
车企起诉电池企业第一案!吉利旗... 出品 | 搜狐汽车·汽车咖啡馆 作者 | 胡耀丹 2024年底发出的回旋镖,在2025年底向欣旺达疾...
海南产经新观察:封关政策释红利... 中新网海南东方12月26日电 (陈英清)“海南自贸港封关运作顺利实施,政策红利持续释放,南繁水稻制种...
无证售药、两地维权!养生馆纠纷... 一副自制中药制剂,引发两地法院诉讼;一次耐心调解,让双方握手言和。近日,饶平县人民法院调解一宗因养生...
*ST节能(000820)披露... 截至2025年12月26日收盘,*ST节能(000820)报收于3.36元,较前一交易日上涨0.9%...
疑电芯质量存问题,500亿巨头... 二线电池厂商欣旺达(300207.SZ)被起诉了! 12月26日盘后,欣旺达披露公告称,公司子公司欣...