跳至主要內容

⑥JAVA 线程池和Exector框架

holic-x...大约 40 分钟JAVA基础

⑥JAVA 线程池和Exector框架

学习核心

  • JAVA线程池
    • 线程池原理、为什么引入线程池
    • 线程池的核心参数
  • Executor框架
    • 两级调度模型
    • 框架结构:核心API
    • 框架成员
  • ThreadPoolExecutor详解
  • 性能优化
    • 为什么建议用ThreadPoolExecutor自定义线程池而不使用默认提供的方法
    • 如何合理分配线程池大小
    • 线程池如何实现动态修改
    • 使用无界队列的线程池会导致什么问题?

学习资料

JAVA线程池

​ ==线程池:==是一种多线程处理形式(池化思想管理线程的工具),处理过程中将任务添加到队列中,然后创建线程后自动启动这些任务

​ 理解池化概念:数据库连接池、线程池等

1.Java线程池原理

为什么要使用线程池?

目的:为了解决并发场景下频繁地创建和销毁线程所带来的性能消耗

​ 如果并发请求资源很多,但每个线程时间很短,就会出现频繁的创建和销毁线程。如此一来,会大大降低系统的效率,可能频繁创建和销毁线程的时间、资源开销要大于实际工作的所需。

​ Java中的线程池是运用场景最多的并发框架几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处

  • 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗
  • 提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行
  • 提高线程的可管理性:线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控

线程池核心参数说明

image-20240529150934875

​ 线程池有两个线程数的设置,一个为核心线程数,一个为最大线程数。在创建完线程池之后,默认情况下,线程池中并没有任何线程,等到有任务来才创建线程去执行任务。但有一种情况排除在外,就是调用 prestartAllCoreThreads() 或者 prestartCoreThread() 方法的话,可以提前创建等于核心线程数的线程数量,这种方式被称为预热,在抢购系统中就经常被用到

image-20240529164200004

线程池的状态

image-20240529160224995

RUNNING:这是最正常的状态,接受新的任务,处理等待队列中的任务

SHUTDOWN:不接受新的任务提交,但是会继续处理等待队列中的任务

STOP:不接受新的任务提交,不再处理等待队列中的任务,中断正在执行任务的线程

TIDYING:所有的任务都销毁了,workerCount 为0,线程池的状态在转换为 TIDYING 状态时,会执行钩子方法 terminated()

TERMINATED:terminated()方法结束后,线程池的状态就会变成这个

线程池的实现原理?

​ 当提交一个任务到线程池之后,线程池处理这个任务的流程如下

image-20240528194523790

流程分析:依次判断:核心线程池、队列、线程池 是否已满(任务提交时,对应判断顺序为:corePoolSize => workQueue => maximumPoolSize

  • 首先会判断核心线程池是否已满
    • 如果没有满,则直接创建新线程来执行任务
    • 如果已经满了,则进行下一步判断,尝试将任务放入队列
  • 判断队列是否已经满了
    • 如果没有满,则任务放入队列,结束
    • 如果队列已经满了,则进行下一步判断
  • 判断线程池是否已经满
    • 如果没满,则创建新线程来执行任务
    • 如果已经满了,则按照饱和策略来处理任务

概念扩展:结合线程池实现原理的一些概念理解

问题1:核心线程池 VS 线程池 有什么不同?

​ 将线程池拆解为:核心线程池+非核心线程池,则核心线程池可以看做是线程池的一个子集

​ 非核心线程池在没有任务且空闲一段时间之后会进行销毁,而核心线程池不会销毁

​ 从场景化理解为:核心线程池是正式员工(常驻处理任务),非核心线程池是外包(临时救急);当任务做不完(队列满了),则找外包接手任务,一旦任务做完了就过段时间辞退外包

问题2:如何理解线程池无法保证线程的执行顺序这一概念?

​ 此处先判断队列后判断线程池,思考一种情况,当进入一个任务,判断队列如果已经满了,则会进入下一步线程池判断,此时线程池如果没满则会直接创建线程执行任务(直接执行当下提交的任务),那么就可能存在一种情况:后进入的任务会被线程池直接进行处理,其会优先于存储在队列中的任务先执行完,也就解释了其不保证顺序的概念

Executor框架

​ Executor 框架是一个根据一组执行策略调用,调度,执行和控制的异步任务的框架,目的是提供一种将”任务提交”与”任务如何运行”分离开来的机制

1.Exexutor框架的两级调度模型

​ 在HotSpot VM的线程模型中,Java线程(java.lang.Thread)被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程;当该Java线程终止时,这个操作系统线程 也会被回收。操作系统会调度所有线程并将它们分配给可用的CPU。

​ 在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器 (Executor框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。这种两级调度模型的示意图所示。

image-20240529081301365

应用程序通过Executor框架控制上层的调度,而下层的调度由操作系统内核控制,下层的调度不受应用程序的控制

2.Executor框架结构

Executor框架的3大部分

  • 任务:包括被执行任务需要实现的接口:Runnable接口或Callable接口

  • 任务的执行:包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口

    • Executor框架有两个关键类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)
  • 异步计算的结果:包括接口Future和实现Future接口的FutureTask类。

类和接口的简介:核心API

  • Executor接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来
    • ExecutorService继承Executor接口(扩展能力):支持有返回值的线程、支持管理线程的生命周期
      • ScheduledExecutorService继承ExecutorService接口(扩展能力):支持定期执行任务
      • AbstractExecutorService:ExecutorService接口的默认实现
  • ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务
  • ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令
  • Future接口和实现Future接口的FutureTask类,代表异步计算的结果
  • Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或Scheduled- ThreadPodExecutor执行

类和接口的使用示意图

image-20240529081804049

流程分析:

  • 主线程首先要创建实现Runnable或者Callable接口的任务对象
    • 工具类Executors可以把一个Runnable对象封装为一个Callable对象(Executors.callable(Runnable task)或Executors.callable(Runnable task, Object resule))
  • 执行Runnable对象/Callable对象
    • 方式1:把Runnable对象直接交给ExecutorService执行(ExecutorService.execute(Runnable command);
    • 方式2:把Runnable对象或Callable对象提交给ExecutorService执行(Executor- Service.submit(Runnable task)ExecutorService.submit(Callable<T>task))
  • 如果执行ExecutorService.submit(...),ExecutorService将返回一个实现Future接口的对象FutureTask 。由于FutureTask实现了Runnable,程序员也可以创建FutureTask,然后直接交给ExecutorService执行
  • 最后,主线程可以执行FutureTask.get()方法来等待任务执行完成。主线程也可以执行FutureTask.cancel (boolean mayInterruptlfRunning)来取消此任务的执行

3.Executor框架成员

结合三大核心去记忆

Executor框架成员:ThreadPoolExecutor、ScheduledThreadPoolExecutor、Future接口、Runnable接口、Callable接口、Executors

任务

任务:Runnable接口/Callable接口:实现这两个任一接口的任务对象可以被线程池执行,主要区别在与是否有结果返回

  • Runnable接口中的run方法没有返回值,无法返回一个结果给调用者
  • Callable接口中的call方法有返回值(可以是一个任意类型的对象),因此Callable任务可以计算结果并返回给调用者

任务的执行

线程实现类

ThreadPoolExecutor线程池的实现类:是Executor框架最核心的类

Executors可以创建3种类型的ThreadPoolExecutor:SingleThreadExecutor、FixedThreadPool、CachedThreadPool

  • FixedThreadPool: 一种使用固定线程数的线程池,适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场景,它适用于负载比较重的服务器
  • SingleThreadExecutor:一种使用单个线程数的线程池,适用于需要保证顺序地执行各个任务;并且在任意时间点,不会有多个线程是活动的应用场景
  • CachedThreadPool:一种会根据需要创建新线程的线程池,是大小无界的线程池,适用于执行很多的短期异步任务的小程序,或者 是负载较轻的服务器

ScheduledThreadPooExecutor线程池实现类:一种用于定时任务或周期任务的线程池

Executors可以创建2种类型的ScheduledThreadPoolExecutor

  • ScheduledThreadPoolExecutor:包含若干个线程的ScheduledThreadPoolExecutor。适用于需要多个后台线程执行周期任务,同时为了满足资源 管理的需求而需要限制后台线程的数量的应用场景
  • SingleThreadScheduledExecutor:只包含一个线程的ScheduledThreadPoolExecutor。适用于需要单个后台线程执行周期任务,同时需要保证顺 序地执行各个任务的应用场景

概念补充扩展

问题1:如何理解负载比较重?

​ 负载比较重就是指服务器资源占用率比较高(例如内存占用高、CPU负载高等),这种情况需要限制下线程数量,否则无限开启更多线程的话容易拖垮服务器

异步计算的结果

Future接口:Future接口和实现Future接口的FutureTask类用来表示异步计算的结果。当把Runnable 接口或Callable接口的实现类提交(submit)给线程池之后,可以拿到一个实现了Future接口的对象返回值

  • 提交Runnable任务到ExecutorService时,这个方法的返回类型是Future<?>(实际上是一个占位符,因为Runnable本身不返回任何值)。这里的Future对象主要用来检查计算是否完成,但没有实际的返回值

  • 提交Callable任务时,submit(Callable<T>task)方法会返回一个Future<T>对象,通过这个Future对象可以获取到Callable任务计算的结果

Executor框架详解

1.ThreadPoolExecutor详解

线程池的核心参数(构造函数参数列表分析)

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 
核心参数(关注核心)说明
corePoolSize核心线程池的大小
maximumPoolSize线程池可创建线程的最大个数
keepAliveTime空闲线程的存活时间
unit时间单位(为keepAliveTime指定时间单位)
workQueue阻塞队列(用于保存任务)
threadFactory创建线程的工程类
handler饱和策略(拒绝策略)

核心概念

Executor框架最核心的类是ThreadPoolExecutor,它是线程池的实现类,主要由下列4个组件构成

  • corePoolsize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个核心线程来执行任务,即使其他空闲的核心线程能够执行新任务也会创建线程,等到需要执行的任务数大于“线程池基本大小”时就不再创建。如果调用了线程池的prestartAlICoreThreads()方法, 线程池会提前创建并启动所有基本线程。

  • blockingQueue(任务队列):用于保存等待执行的任务的阻塞队列

    • ArrayBlockingQueue:一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序
    • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
    • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高
    • LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool 使用了这个队列。
    • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
  • ==maximumPoolsize(线程池最大数量)==线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如果使用了无界的任务队列这个参数就没什么效果。

  • RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。Java线程池框架提供了以下4种策略。

    • AbortPolicy:直接抛出异常(RejectedExecutionException)
    • CallerRunsPolicy:使用调用者所在线程来运行任务
    • DiscardOldestPolicy:丟弃队列里最旧的一个任务,并重试提交当前任务
    • DiscardPolicy:不处理新任务,直接丢弃掉
    • 当然,也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化存储不能处理的任务
  • keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率

  • TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒)

线程池的执行示意图:

image-20240529085116004

ThreadPoolExecutor执行execute方法情况分析

  • 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务
  • 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue
  • 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务
  • 如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。

概念补充

问题1:任务队列大小设置?

​ 没有一个固定值参考,一般是要结合实际业务场景:任务量级、任务产生速度、执行速度等,设置太大、太小导致的问题做一个折中的经验值参考,最好是可以通过运行过程中动态调整大小。

问题2:不同任务队列的性能对比?

吞吐量概念:此处吞吐量可以理解为抗压能力,简单理解为单位时间能处理的任务数量

吞吐量对比:

  • LinkedBlockingQueue > ArrayBlockingQueue:因为链表结构实现队列比数据结构实现队列更快捷
  • SynchronousQueue > LinkedBlockingQueue:因为SynchronousQueue不存储任务,所以用它实现的线程池都是不断增加线程来承接更多的任务,不用放队列等待,这种线程池比较适合短平快的任务场景,因此吞吐量会高(但还需考虑系统负载是否能够承接无限制的线程新增所带来的影响)。

如何理解为什么生产完一个元素只能等待一个线程消费完才能继续生产的吞吐量会更高?

​ 基于理想的情况,以取快递场景分析,如果每个人排着队手把手从快递员中接收快递,那个快递发放的速度一定是最快的,单位时间内发放最多。但是结合实际场景分析,快递员可以提升发放速度,但是接收快递的人不一定取得很快(也就是生产者和消费者速度不匹配,基于这种场景所谓的高吞吐是有瓶颈的)。引入“快递驿站”实际上就是构建缓冲区,快递员将快递放在驿站,然后用户自行安排时间去接收,这种场景的收发速度显然会比前面场景要慢,即吞吐量自然会小。

​ SynchronousQueue这种高吞吐量的特性是以牺牲一定的灵活性和缓冲能力为代价的,它要求生产者和消费者的速度相匹配,以此减少数据流转开销,进而提高吞吐量。(如果生产者和消费者的速度不匹配,则不太适合使用这个队列)。此处说明的是在一般情况下SynchronousQueue具备高吞吐的特性更加适合高吞吐量的场景,而不是强调它的性能更佳。

问题3:如何理解keepAliveTime的设定?

​ keepAliveTime参数控制的是如果线程空闲了,多久后将其回收。

​ 通过调大时间,提高线程的利用率;其核心是通过调大时间防止频繁地创建和释放线程。

​ 这里的时间设定不应过小或过大,其整体是在寻找一个平衡,让任务多的时候不用频繁地创建和销毁线程、任务少的时候不至于空闲等太久而浪费资源

使用示例

public class ThreadPoolExecutorDemo {

    // 自定义线程
    static class MyThread implements Runnable {
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " 执行");
        }
    }

    public static void main(String[] args) {
        // 创建线程池(核心线程数5,线程池最大数量10,线程空闲时存活时长500)
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 500, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());

        for (int i = 0; i < 1000; i++) {
            threadPoolExecutor.execute(new MyThread());
            String info = String.format("线程池中线程数目:%s,队列中等待执行的任务数目:%s,已执行完别的任务数目:%s",
                    threadPoolExecutor.getPoolSize(),
                    threadPoolExecutor.getQueue().size(),
                    threadPoolExecutor.getCompletedTaskCount());
            System.out.println(info);
        }
        threadPoolExecutor.shutdown();
    }
}

线程池中的submit()和execute()方法有什么区别?

两者都是用于开启线程执行池中的任务,不同点结合3个方面阐述:接收参数、返回值、异常处理

  • 接收参数:execute方法只能执行Runnable类型任务;而submit方法可以执行Runnable和Callable接口
  • 返回值:submit方法可返回持有计算结果的Future对象,而execute没有
  • 异常处理:submit方便Execption处理(因为可以根据sumbit的返回值获取到异常信息)

2.常用的ThreadPoolExecutor解析

​ 查看java.util.concurrent.Executor类源码实现,结合案例场景理解记忆每种线程池的特性

ThreadPoolExecutor 运行流程

image-20240529153132395

​ 线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。

​ 线程池的运行主要分成两部分:任务管理、线程管理。

  • 任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:
    • (1)直接申请线程执行该任务
    • (2)缓冲到队列中等待线程执行
    • (3)拒绝该任务

  • 线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收

📌FixedThreadPool(固定大小的线程池)

public static ExecutorService newFixedThreadPool(int nThreads) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>());
}

​ FixedThreadPool的corePoolSize和maximumPoolSize都被设置为创建FixedThreadPool时指定的参数nThreads.

​ 当线程池中的线程数大于corePoolSize时,keepAliveTime为多余的空闲线程等待新任务的最长时间,超过这个时间后多余的线程将被终止。这里把keepAliveTime设置为0L,意味着多余的空闲线程会被立即终止。

​ FixedThreadPool执行execute执行示意图:

image-20240529091750836

  • 如果当前运行的线程数少于corePoolSize,则创建新线程来执行任务
  • 在线程池完成预热之后(当前运行的线程数等于corePoolSize),将任务加入LinkedBlockingQueue
  • 线程执行完1中的任务后,会在循环中反复从LinkedBlockingQueue获取任务来执行。FixedThreadPoo使用无界3.队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为Integer.MAX_VALUE)。使用无界队列作为工作队列会对线程池带来如下影响
    • a.当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中 的线程数不会超过corePoolSize
    • b.由于1,使用无界队列时maximumPoolSize将是一个无效参数
    • c.由于1和2,使用无界队列时keepAliveTime将是一个无效参数
    • d.由于使用无界队列,运行中的FixedThreadPo0(未执行方法shutdown()或 shutdownNow()不会拒绝任务(不会调用RejectedExecutionHandler.rejectedExecution方法)
// 创建一个固定大小的线程池
public class NewFixedThreadPoolDemo {

    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 10; i++) {
            int finalI = i;
            es.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " 执行任务" + finalI);
                }
            });
        }
        es.shutdown();
    }

}

// output
pool-1-thread-2 执行任务1
pool-1-thread-2 执行任务3
pool-1-thread-1 执行任务0
pool-1-thread-1 执行任务5
pool-1-thread-1 执行任务6
pool-1-thread-2 执行任务4
pool-1-thread-3 执行任务2
pool-1-thread-2 执行任务8
pool-1-thread-1 执行任务7
pool-1-thread-3 执行任务9

📌SingleThreadPool(单线程线程池)

​ SingleThreadPool:使用单个worker线程的Executor

public static ExecutorService newSingleThreadExecutor() {
  return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
                            0L, TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue<Runnable>()));
}

​ SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1。其他参数与FixedThreadPool相同。SingleThreadExecutor使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为Integer.MAX_VALUE)。SingleThreadExecutor使用无界队列作为工作队列 对线程池带来的影响与FixedThreadPool相同

SingleThreadExecutor的execute()方法的执行示意图:

image-20240529092156094

  • 如果当前运行的线程数少于corePoolSize(即线程池中无运行的线程),则创建一个新线程来执行任务
  • 在线程池完成预热之后(当前线程池中有一个运行的线程),将任务加入LinkedBlockingQueue
  • 线程执行完1中的任务后,会在一个无限循环中反复从LinkedBlockingQueue获取任务来执行

​ newSingleThreadExecutor:创建一个单线程的线程池

只会创建唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。 如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它

​ 单工作线程最大的特点是:可保证顺序地执行各个任务(指定顺序:FIFO、LIFO、优先级)。

// 创建一个单线程的线程池
public class NewSingleThreadExecutorDemo {

    public static void main(String[] args) {
        ExecutorService es = Executors.newSingleThreadExecutor();
        // 模拟多线程操作
        for(int i=0;i<10;i++){
            int finalI = i;
            es.execute(new Runnable() {
                @Override
                public void run() {
                    // 模拟任务执行操作
                    System.out.println(Thread.currentThread().getName() + " 执行" + finalI) ;
                }
            });
        }
        es.shutdown();
    }
}

// output
pool-1-thread-1 执行任务0
pool-1-thread-1 执行任务1
pool-1-thread-1 执行任务2
pool-1-thread-1 执行任务3
pool-1-thread-1 执行任务4
pool-1-thread-1 执行任务5
pool-1-thread-1 执行任务6
pool-1-thread-1 执行任务7
pool-1-thread-1 执行任务8
pool-1-thread-1 执行任务9

📌CachedThreadPool(可缓存的线程池)

​ CachedThreadPool:会根据需要创建线程的线程池

public static ExecutorService newCachedThreadPool() {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                60L, TimeUnit.SECONDS,
                                new SynchronousQueue<Runnable>());
}

​ CachedThreadPool的corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为Integer.MAX_VALUE,即maximumPool是无界的。这里把keepAliveTime设置为60L,意味着CachedThreadPodl中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。

​ CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但CachedThreadPool的maximumPool是无界的。这意味着,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。

CachedThreadPool的execute()方法的执行示意图:

image-20240529092513929

​ 步骤1:首先执行SynchronousQueue.offer(Runnable task)。如果当前maximumPodl中有空闲线程 正在执行SynchronousQueue.poll(keepAliveTime,TimeUni.NANOSECONDS),那么主线程执行 ofer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成,否则执行下面的步骤2)。

​ 步骤2:当初始maximumPool为空,或者maximumPool中当前没有空闲线程时,将没有线程执行SynchronousQueue.poll(keepAliveTime, TimeUnt.NANOSECONDS)。这种情况下,步骤1将失败。此时CachedThreadPool会创建一个新线程执行任务,execute()方法执行完成。

​ 步骤3:在步骤2中新创建的线程将任务执行完后,会执行SynchronousQueue.poll(keepAliveTime,Time-Unit.NANOSECONDS)。这个pol操作会让空闲线程最多在SynchronousQueue上等待60秒钟。如果60秒钟内主线程提交了一个新任务(主线程执 行步骤1),那么这个空闲线程将执行主线程提交的新任务;否则,这个空闲线程将终止。由于空闲60秒的空闲线程会被终止,因此长时间保持空闲的CachedThreadPool不会使用任何资源。

​ SynchronousQueue是一个没有容量的阻塞队列。每个插入操作必须等待另一 个线程的对应移除操作,反之亦然CachedThreadPool使用SynchronousQueue把主线程提交的任务传递给空闲线程执行。

image-20240529092652260

// 创建一个可缓存的线程池
public class NewCachedThreadPoolDemo {

    public static void main(String[] args) {
        ExecutorService es = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            int finalI = i;
            es.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " 执行任务" + finalI);
                }
            });
        }
        es.shutdown();
    }
}

// output
pool-1-thread-2 执行任务1
pool-1-thread-4 执行任务3
pool-1-thread-3 执行任务2
pool-1-thread-1 执行任务0
pool-1-thread-4 执行任务8
pool-1-thread-8 执行任务7
pool-1-thread-7 执行任务6
pool-1-thread-6 执行任务5
pool-1-thread-5 执行任务4
pool-1-thread-3 执行任务9
  
  
  
// 放大任务数:摘取部分结果分析
pool-1-thread-115 执行任务884
pool-1-thread-304 执行任务878
pool-1-thread-16 执行任务496
pool-1-thread-183 执行任务692
pool-1-thread-303 执行任务877
pool-1-thread-126 执行任务478
pool-1-thread-302 执行任务876
pool-1-thread-155 执行任务576
pool-1-thread-300 执行任务874

​ 可以试着放大任务数,进一步确认结果,可以看到这种线程池是通过不断创建线程来支持任务的执行,它不会限制线程池大小,线程池的大小完全依赖于操作系统(JVM)可创建的最大线程大小。

​ 可以查看其底层代码实现是通过SynchronousQueue(一个不存储元素的阻塞队列)进行构建的,该队列的设定是每个插入操作必须等到另一个线程调用移除操作。因此在使用CachedThreadPool要注意控制任务数量,否则由于大量线程同时运行,很有可能造成系统瘫痪

📌ScheduleThreadPool(周期性执行的线程池)

​ ScheduleThreadPool:一个大小无限的线程池,此线程池支持定时以及周期性执行任务的需求

// 创建一个大小无限的线程池
public class NewScheduleThreadPoolDemo {
    public static void schedule(){
        ScheduledExecutorService ses  = Executors.newScheduledThreadPool(5);
        for (int i = 0; i < 100; i++) {
            int finalI = i;
            ses.schedule(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " 执行任务" + finalI);
                }
            },1, TimeUnit.SECONDS);
        }
        ses.shutdown();
    }

    public static void scheduleAtFixedRate(){
        ScheduledExecutorService ses  = Executors.newScheduledThreadPool(5);
        for (int i = 0; i < 100; i++) {
            int finalI = i;
            ses.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " 执行任务" + finalI);
                }
            },1,1, TimeUnit.SECONDS);
        }
        ses.shutdown();
    }

    public static void main(String[] args) {
        schedule();
//        scheduleAtFixedRate();
    }
}

WorkStealingPool

​ Java8中引入的线程池,其内部会构建ForkJoinPool,利用Work-Stealing算法,并发处理任务,不保证处理顺序

3.ScheduleThreadPoolExecutor详解

ScheduleThreadPoolExecutor运行机制

​ ScheduledThreadPoolExecutor继承自ThreadPoolExecutor.任务。ScheduledThreadPoolExecutor的执行示意图如下 它主要用来在给定的延迟之后运行任务,或者定期执行

image-20240529092833603

分析:

  • 使用DelayQueue做为队列:DelayQueue是一个无界队列,所以ThreadPoolExecutor的maximumPoolSize在ScheduledThreadPoolExecutor中没有什么意义
  • ScheduledThreadPoolExecutor的执行主要分为两大部分:
    • 当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWith- FixedDelay()方法时,会向ScheduledThreadPoolExecutor的DelayQueue添加一个实现了RunnableScheduledFuture接口的ScheduledFutureTask
    • 线程池中的线程从DelayQueue中获取ScheduledFutureTask,然后执行任务

ScheduleThreadPoolExecutor具体实现

ScheduledThreadPoolExecutor会把待调度的任务(ScheduledFutureTask)放到一个DelayQueue中

  • ScheduledFutureTask:主要包含3个成员变量
    • long型成员变量time:表示这个任务将要被执行的具体时间
    • long型成员变量sequenceNumber:表示这个任务被添加到ScheduledThreadPoolExecutor中 的序号。
    • long型成员变量period:表示任务执行的间隔周期。
  • DelayQueue:封装了一个PriorityQueue,这个PriorityQueue会对队列中的ScheduledFutureTask进行排序。排序时按照(time+sequenceNumber)字段进行比较,小的排在前面。(也就是说,时间早的任务将被先执行,如果两个任务的执行时间相同,那么先提交的任务(sequenceNumber小)将被先执行)。

周期任务执行步骤

image-20240529093141371

步骤说明:

步骤1:线程1从DelayQueue中获取已到期的ScheduledFutureTask(DelayQueue.take())。到期任务是指ScheduledFutureTask的time小于等于当前时间

步骤2:线程1执行这个ScheduledFutureTask

步骤3:线程1修改ScheduledFutureTask的time变量为下次将要被执行的时间

步骤4:线程1把这个修改time之后的ScheduledFutureTask放回DelayQueue中(DelayQueue.add()

DelayQueue.take()获取任务解析

​ 查看java.util.concurrent.DelayQueue源码

private final Condition available = lock.newCondition();

public E take() throws InterruptedException {
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  try {
    for (;;) {
      E first = q.peek();
      if (first == null)
        available.await();
      else {
        long delay = first.getDelay(NANOSECONDS);
        if (delay <= 0)
          return q.poll();
        first = null; // don't retain ref while waiting
        if (leader != null)
          available.await();
        else {
          Thread thisThread = Thread.currentThread();
          leader = thisThread;
          try {
            available.awaitNanos(delay);
          } finally {
            if (leader == thisThread)
              leader = null;
          }
        }
      }
    }
  } finally {
    if (leader == null && q.peek() != null)
      available.signal();
    lock.unlock();
  }
}

​ take()方法执行示意图&步骤解析

image-20240529094052238

通过分析源码和示意图可以看出,获取任务主要分成3步:

  • 步骤1:获取Lock
  • 步骤2:循环中获取周期任务:
    • 2.1:如果PriorityQueue为空,当前线程到Condition中等待;否则执行下面的2.2.
    • 2.2:如果PriorityQueue的头元素的time时间比当前时间大,到Condition中等待到time时间;否则执行下面的2.3
    • 2.3:获取PriorityQueue的头元素(2.3.1);如果PriorityQueue不为空,则唤醒一个在Condition中等待的线程(2.3.2)
  • 步骤3:释放Lock

DelayQueue.add()添加任务解析

public boolean add(E e) {
  return offer(e);
}

public boolean offer(E e) {
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    q.offer(e);
    if (q.peek() == e) {
      leader = null;
      available.signal();
    }
    return true;
  } finally {
    lock.unlock();
  }
}

add()方法执行示意图

image-20240529094252982

通过分析源码和示意图可以看出,添加任务主要分成3步:

  • 步骤1:获取Lock
  • 步骤2:添加任务
    • 2.1:向PriorityQueue添加任务
    • 2.2:如果2.1中添加的任务是PriorityQueue的头元素,唤醒一个在Condition中等待的线程
  • 步骤3:释放Lock

4.FutureTask详解

​ FutureTask 为Future 提供了基础实现,如获取任务执行结果(get)和取消任务(cancel)等。如果任务尚未完成,获取任务执行结果时将会阻塞。一旦执行结束,任务就不能被重启或取消(除非使用runAndReset执行计算)。FutureTask常用来封装 Callable 和 Runnable,也可以作为一个任务提交到线程池中执行。除了作为一个独立的类之外,此类也提供了一些功能性函数供我们创建自定义 task 类使用。FutureTask 的线程安全由CAS来保证

👻FutureTask概念

(1)FutureTask的三种状态

三种状态

  • 未启动:FutureTask.run()方法还没有被执行之前,FutureTask处于未启动状态。当创建一 个FutureTask,且没有执行FutureTask.run()方法之前,这个FutureTask处于未启动状态
  • 已启动:FutureTask.run()方法被执行的过程中,FutureTask处于已启动状态
  • 已完成:FutureTask.run()方法执行完后正常结束,或被取消(FutureTask.cancel..)),或执行FutureTask.run()方法时抛出异常而异常结束,FutureTask处于已完成状态。

状态转化

image-20240529094816316

FutureTask.get()、FutureTask.cancel()方法执行示意图

image-20240529095013853

分析:

FutureTask.get()分析

  • 当FutureTask处于未启动或已启动状态时,执行FutureTask.get()方法将导致调用线程阻塞;
  • 当FutureTask处于已完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或抛出异常

FutureTask.cancel分析

  • 当FutureTask处于未启动状态时,执行FutureTask.cancel()方法将导致此任务永远不会被执行;
  • 当FutureTask处于已启动状态时,执行FutureTask.cancel(true)方法将以中断执行此任务线程的方式来试图停止任务;
  • 当FutureTask处于已启动状态时,执行FutureTask.cancel(false)方法将不会对正在执行此任务的线程产生影响(让正在执行的任务运行完成);
  • 当FutureTask处于已完成状态时,执行FutureTask.cancel(...)方法将返回false.
(2)FutureTask的使用

Future的使用方式

  • 方式1:把FutureTask交给Executor执行
  • 方式2:通过ExecutorService.submit(..)方法返回一个FutureTask,然后执行FutureTask.get()方法或FutureTask.cancel(..)方法
  • 方式3:单独使用FutureTask

👻FutureTask的实现

​ jdk1.7 开始的FutureTask有个说明:不再使用AQS作为FutureTask的同步

​ 修订原文翻译说明:这与这个类以前依赖AbstractQueuedsynchronizer的版本不同,主要是为了避免在取消竞争期间保留中断状态让用户感到意外。在当前的设计中,Sync控件依赖于通过CAS更新的“state”字段来跟踪完成,以及一个简单的Treiber堆栈来保存等待的线程

(1)FutureTask实现了Future接口的5个方法

boolean cancel(boolean mayInterruptlfRunning)

​ 尝试取消当前任务的执行。如果任务已经取消、已经完成或者其他原因不能取消,尝试将失败。如果任务还没有启动就调用了cancel(true),任务将永远不会被执行。如果任务已经启动,参数mayInterruptlifRunning将决定任务是否应 该中断执行该任务的线程,以尝试中断该任务。 ​ 如果任务不能被取消,通常是因为它已经正常完成,此时返回false,否则返回true

boolean isCancelled()

​ 如果任务在正常结束之前被被取消返回true

booleanisDone()

​ 正常结束、异常或者被取消导致任务完成,将返回true

V get()

​ 阴塞等待任务结束,然后获取结果。如果任务在等待过程中被中断将抛出InterryptedException,如果任务被取消将抛出CancellationException,如果任务中执行过程中发生异常将抛出ExecutionException

V get(long timeout, TimeUnit unit)

​ 任务最多在给定时间内完成并返回结果,如果没有在给定时间内完成任务将抛出TimeoutException

(2)FutureTask内部状态转换

state:表示当前任务的运行状态,FutureTask的所有方法都是围绕state开展的,state声明为volatile,保证了state的可见性,当对state进行修改时所有的线程都会看到。

​ 查看FutureTask源码:

public class FutureTask<V> implements RunnableFuture<V> {
 
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;
  
    private Callable<V> callable;
  
    private Object outcome;
  
    private volatile Thread runner;
  
    private volatile WaitNode waiters;
       
}

状态解释:初始状态、中间状态、最终状态

NEW(初始状态):表示是个新的任务或者还没被执行完的任务

COMPLETING(中间状态):任务已经执行完成或者执行任务的时候发生异常,但是任务执行结果或者异常原因还没有保存到outcome字段(outcome字段用来保存任务执行结果,如果发生异常,则用来保存异常原因)的时候,状态会从NEW变更到COMPLETING。但是这个状态会时间会比较短,属于中间状态

NORMAL(最终态:正常结束):任务已经执行完成并且任务执行结果已经保存到outcome字段,状态会从COMPLETING转换到NORMAL

EXCEPTIONAL(最终态:异常结束):任务执行发生异常并且异常原因已经保存到outcome字段中后,状态会从COMPLETING转换到EXCEPTIONAL

CANCELLED(最终态:任务被取消):任务还没开始执行或者已经开始执行但是还没有执行完成的时候,用户调用了cancel(false)方法取消任务且不中断任务执行线程,这个时候状态会从NEW转化为CANCELLED状态

INTERRUPTING(中间状态):任务还没开始执行或者已经执行但是还没有执行完成的时候,用户调用了cancel(true)方法取消任务并且要中断任务执行线程但是还没有中断任务执行线程之前,状态会从NEW转化为INTERRUPTING

INTERRUPTED(最终态:任务出现中断):调用interrupt()中断任务执行线程之后状态会从INTERRUPTING转换到INTERRUPTED。这是一个最终态。有一点需要注意的是,所有值大于COMPLETING的状态都表示任务已经执行完成(任务正常执行完成,任务执行异常或者任务被取消)

可能的状态过渡

  • NEW->COMPLETING->NORMAL(正常结束)
  • NEW->COMPLETING->EXCEPTIONAL(异常结束)
  • NEW->CANCELLED(任务被取消)
  • NEW->INTERRUPTING->INTERRUPTED(任务出现中断)

线程池应用

1.异常处理

​ 在线程池中,对异常执行的任务可以手动做异常处理

处理方式1:主动捉异常

public static void handleByCatch(){
  ExecutorService es = Executors.newFixedThreadPool(1);
  es.submit(()->{
    int res = 1/0;
  });
  es.shutdown();
}

// output:分别测试调用submit、execute两种情况下任务出现异常时JAVA默认的处理机制
# submit:无任何异常提示
  
# execute:抛出异常提示
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
	at com.noob.threadPool.ThreadPoolExceptionDemo.lambda$handleByCatch$0(ThreadPoolExceptionDemo.java:12)
  • submit:任务执行异常,不抛出任何异常,也不捕获异常,程序执行结束(实际上是任务失败的状态)
  • execute:抛出异常

实际业务场景中,如果使用submit方法,则需要关注对任务异常情况的处理,例如主动捕获异常信息并处理

public static void handleByCatch(){
  ExecutorService es = Executors.newFixedThreadPool(1);
  es.submit(()->{
    // 异常处理
    try{
      int res = 1/0;
    }catch (Exception e){
      System.out.println("任务执行失败,请检查异常信息" + e.getMessage());
    }
  });
  es.shutdown();
}

处理方式2:使用Future

public static void handleByFuture() throws Exception {
  ExecutorService es = Executors.newFixedThreadPool(1);
  Future<Boolean> future = es.submit(() -> {
    int res = 1 / 0;
    // lambda表达式中要有返回值,编译器才会将其识别为Callable(否则识别为Runnable则无法用Future)
    return true;
  });

  // 方法中如果出现异常,则调用Future的get()方法会返回这个异常,否则返回正常信息
  System.out.println("任务处理结果:" + future.get());
  es.shutdown();
}

2.动态线程池配置

  • 如何修改:线程池提供了部分setter方法可以设置线程池的参数

    • 修改核心线程数,最大线程数,空闲线程停留时间,拒绝策略等
    • 可以将线程池的配置参数放入配置中心,当需要调整的时候,去配置中心修改就行
  • 什么时候修改呢?

    • 需要监控报警策略,获取线程池状态指标,当指标判定为异常之后进行报警
    • 分析指标异常原因,评估处理策略,然后进行动态修改

线程池的最佳实践

线程池性能调优问题:为什么推荐使用ThreadPoolExecutor自定义线程池,而不是使用Executors提供的工厂方法创建线程池

​ Executors 利用工厂模式实现的四种线程池,在使用的时候需要结合生产环境下的实际场景。

​ 一般情况下不太推荐使用它们,因为选择使用 Executors 提供的工厂类,将会忽略很多线程池的参数设置,工厂类一旦选择设置默认参数,就很容易导致无法调优参数设置,从而产生性能问题或者资源浪费

1.计算线程数量

一般多线程执行的任务类型可以分为 CPU 密集型和 I/O 密集型,根据不同的任务类型,计算线程数的方法也不一样。

​ **CPU 密集型任务:**这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。

​ **I/O 密集型任务:**这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,可以多配置一些线程,具体的计算方法是 2N

业务场景:通过一个线程池实现向用户定时推送消息的业务,该如何设置线程池的数量呢

参考以下公式来计算线程数:

线程数=N(CPU核数)*(1+WT(线程等待时间)/ST(线程时间运行时间))

可以通过 JDK 自带的工具 VisualVM 来查看 WT/ST 比例,以下例子是基于运行纯 CPU 运算的例子:

WT(线程等待时间)= 36788ms [线程运行总时间] - 36788ms[ST(线程时间运行时间)]= 0

线程数=N(CPU核数)*(1+ 0 [WT(线程等待时间)]/36788ms[ST(线程时间运行时间)])= N(CPU核数)

这跟之前通过 CPU 密集型的计算公式 N+1 所得出的结果差不多。综合来看,我们可以根据自己的业务场景,从“N+1”和“2N”两个公式中选出一个适合的,计算出一个大概的线程数量,之后通过实际压测,逐渐往“增大线程数量”和“减小线程数量”这两个方向调整,然后观察整体的处理时间变化,最终确定一个具体的线程数量

线程池核心线程数的设置多少不仅仅依赖cpu核数和执行时间,还有线程执行的资源,比如调用的db,db连接数有限,线程数太多就可能打满db连接(做性能压测的时候会经常遇到这种情况,当某条SQL比较耗时的时候,如果线程数设置过大,就会出现不能打开DB连接的异常)

​ 在一些非核心业务,可以将核心线程数设置小一些,最大线程数量设置为计算线程数量。在一些核心业务中,两者可以设置一样。阻塞队列可以根据具体业务场景设置,如果线程处理业务非常迅速,可以考虑将阻塞队列设置大一些,处理的请求吞吐量会大些;如果线程处理业务非常耗时,阻塞队列设置小些,防止请求在阻塞队列中等待过长时间而导致请求已超时

2.建议使用有界阻塞队列

​ 不建议使用 Executors 的最重要的原因是:Executors 提供的很多方法默认使用的都是无界的 LinkedBlockingQueue高负载情境下,无界队列很容易导致 OOM,而 OOM 会导致所有请求都无法处理,这是致命问题。所以强烈建议使用有界队列

以newFixedThreadPool使用了LinkedBlockingQueue:如果线程获取一个任务,任务执行时间比较长,就会导致队列中的任务堆积越来越多,进而导致机器内存使用不断飙升,最终导致OOM问题

​ 《阿里巴巴 Java 开发手册》中提到,禁止使用这些方法来创建线程池,而应该手动 new ThreadPoolExecutor 来创建线程池。制订这条规则是因为容易导致生产事故,最典型的就是 newFixedThreadPoolnewCachedThreadPool,可能因为资源耗尽导致 OOM 问题。

【示例】newFixedThreadPool OOM

ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
printStats(threadPool);
for (int i = 0; i < 100000000; i++) {
	threadPool.execute(() -> {
		String payload = IntStream.rangeClosed(1, 1000000)
			.mapToObj(__ -> "a")
			.collect(Collectors.joining("")) + UUID.randomUUID().toString();
		try {
			TimeUnit.HOURS.sleep(1);
		} catch (InterruptedException e) {
		}
		log.info(payload);
	});
}

threadPool.shutdown();
threadPool.awaitTermination(1, TimeUnit.HOURS);

newFixedThreadPool 使用的工作队列是 LinkedBlockingQueue ,而默认构造方法的 LinkedBlockingQueue 是一个 Integer.MAX_VALUE 长度的队列,可以认为是无界的。如果任务较多并且执行较慢的话,队列可能会快速积压,撑爆内存导致 OOM。

【示例】newCachedThreadPool OOM

ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
printStats(threadPool);
for (int i = 0; i < 100000000; i++) {
	threadPool.execute(() -> {
		String payload = UUID.randomUUID().toString();
		try {
			TimeUnit.HOURS.sleep(1);
		} catch (InterruptedException e) {
		}
		log.info(payload);
	});
}
threadPool.shutdown();
threadPool.awaitTermination(1, TimeUnit.HOURS);

newCachedThreadPool 的最大线程数是 Integer.MAX_VALUE,可以认为是没有上限的,而其工作队列 SynchronousQueue 是一个没有存储空间的阻塞队列。这意味着,只要有请求到来,就必须找到一条工作线程来处理,如果当前没有空闲的线程就再创建一条新的。

​ 如果大量的任务进来后会创建大量的线程。线程是需要分配一定的内存空间作为线程栈的,比如 1MB,因此无限制创建线程必然会导致 OOM

3.重要任务应自定义拒绝策略

​ 使用有界队列,当任务过多时,线程池会触发执行拒绝策略,线程池默认的拒绝策略会 throw RejectedExecutionException 这是个运行时异常,对于运行时异常编译器并不强制 catch 它,所以开发人员很容易忽略。因此默认拒绝策略要慎重使用。如果线程池处理的任务非常重要,建议自定义自己的拒绝策略;并且在实际工作中,自定义的拒绝策略往往和降级策略配合使用

评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v3.1.3