线程池

Java的线程池是通过Executor框架实现的,在该框架中用到了Executor、Executors、ExecutorService、ThreadPoolExecutor、Callable、Future、FutureTask这几个核心类。

为什么用线程池?介绍下线程池的几个参数

线程池的主要作用是线程复用、线程资源管理、控制操作系统的最大并发数,以保证系统高效(通过线程资源复用)和安全(通过控制最大并发线程数)的运行。

使用线程池可以降低资源消耗、提高响应速度、提高线程的可管理性,线程池ThreadPoolExecutor参数如下:

  • corePoolSize:线程池中核心线程数,任务队列未达到队列容量时,最大可以同时运行的线程数量。
  • maximumPoolSize:线程池中最大线程数。
  • keepAliveTime:当前线程数量大于核心线程数时,空闲线程的等待时间。
  • unit:keepAliveTime的时间单位。
  • workQueue:任务队列,被提交但尚未被执行的任务存放的地方。
  • threadFactory:线程工厂,用于创建线程,可使用默认的线程工厂或自定义线程工厂。
  • handler:拒绝策略,由于任务过多或其他原因导致线程池无法处理时的任务拒绝策略。

创建线程池的方法

创建线程池的方法:通过**Executors工厂方法创建和通过new ThreadPoolExecutor方法**创建

  • Executors工厂方法创建,在工具类Executors提供了一些静态的工厂方法

    newFixedThreadPool:创建固定大小的线程池。

    newCachedThreadPool:创建一个带缓冲的线程池。创建一个不限制线程数量的线程池,任何提交的任务都将立即执行,但是空闲线程会得到及时回收。

    newSingleThreadExecutor:创建一个单线程的线程池。

    newScheduledThreadPool:调度线程池,可以按照一定的周期执行任务,即定时任务。

  • new ThreadPoolExecutor创建:

new ThreadPoolExecutor(int corePoolSize,				//核心线程数
                       int maximumPoolSize,		//最大线程数
                    long keepAliveTime,		//当前线程数量大于核心线程数时,空闲线程的等待时间
                       TimeUnit unit,					//等待时间单位
                       BlockingQueue<Runnable> workQueue,	//存放任务的阻塞队列
                       ThreadFactory threadFactory,		//为线程池提供创建新线程的线程工厂
                       RejectedExecutionHandler handler)//拒绝策略

线程池的拒绝策略有哪些?

如果线程池中的核心线程被用完且阻塞队列已满,则此时线程池的线程资源已耗尽,线程池将没有足够的线程资源执行新的任务。为了保证操作系统的安全,线程池将通过拒绝策略处理新添加的线程任务。

  • AbortPolicy:直接抛出异常,阻止线程正常运行(默认的拒绝策略)。
  • CallerRunsPolicy:调用者执行策略。在新任务被加到线程池时,如果添加失败,那么提交任务线程会自己去执行该任务,不会使用线程池中的线程去执行新任务。
  • DiscardOldestPolicy:抛弃最老任务策略。移除线程队列中最早的一个线程任务,并尝试提交当前任务。
  • DiscardPolicy:丢弃当前线程任务不做任何处理。
  • 自定义拒绝策略:实现RejectedExecutionHandler接口的rejectedExecution方法。

向线程池提交任务的两种方式

(1)调用execute()方法,例如:

//Executor接口中的方法
void execute(Runnable command);

(2)调用submit()方法

//ExecutorService接口中的方法
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

execute和submit方法的区别是什么?

  • 二者接收的参数不一样execute()方法只能接收Runnable类型,而submit()可以接收CallableRunnable两种类型。Callable类型的任务是可以返回执行结果的,而Runnable类型的任务不可以返回执行结果。
  • submit()提交任务后会有返回值,而execute()没有:execute()方法主要用于启动任务的执行,而任务的执行结果和可能的异常调用者并不关心。submit()方法也用于启动任务的执行,但是启动之后会返回Future对象,代表一个异步执行实例,可以通过该异步执行实例去获取结果。
  • submit()方法Exception处理execute()方法在启动任务执行后,任务执行过程中可能发生的异常调用者并不关心。而通过submit()方法返回的Future对象(异步执行实例),可以进行异步执行过程中的异常捕获。

线程池的任务调度流程

线程池的任务调度流程(包含接收新任务和执行下一个任务)大致如下:

  1. 如果当前工作线程数量小于核心线程数量,执行器总是优先创建一个任务线程,而不是从线程队列中获取一个空闲线程。
  2. 如果线程池中总的任务数量大于核心线程池数量,新接收的任务将被加入阻塞队列中,一直到阻塞队列已满。在核心线程数量已经用完、阻塞队列没有满的场景下,线程池不会为新任务创建一个新线程。
  3. 当完成一个任务的执行时,执行器总是优先从阻塞队列中获取下一个任务,并开始执行,一直到阻塞队列为空,其中所有的缓存任务被取光。
  4. 在核心线程数量已经用完、阻塞队列也已经满了的场景下,如果线程池接收到新的任务,将会为新任务创建一个线程(非核心线程),并且立即开始执行新任务。
  5. 在核心线程都用完、阻塞队列已满的情况下,一直会创建新线程去执行新任务,直到池内的线程总数超出maximumPoolSize。如果线程池的线程总数超过maximumPoolSize,线程池就会拒绝接收任务,当新任务过来时了,会为新任务执行拒绝策略。

总体的线程池的任务调度流程如下图所示:

image-20231029162156460

线程池参数详解

public ThreadPoolExecutor(int corePoolSize,				//核心线程数,即使线程空闲,也不会回收
                          int maximumPoolSize,		//线程数的上限
                          long keepAliveTime,			//线程最大空闲时长
                          TimeUnit unit,					//时间单位
                          BlockingQueue<Runnable> workQueue,//任务的排队队列
                          ThreadFactory threadFactory,			//新线程的产生方式
                          RejectedExecutionHandler handler)	//拒绝策略

1.核心和最大线程数量

参数corePoolSize用于设置核心(Core)线程数量,参数maximumPoolSize用于设置最大线程数量。线程池执行器根据corePoolSizemaximumPoolSize自动维护线程池中的工作线程,大致规则为:

  1. 当在线程池接收到新任务,并且当前工作线程少于corePoolSize时,即使其他工作线程处于空闲状态,也会创建一个新线程来处理该请求,直到线程数达到corePoolSize
  2. 如果当前工作线程数多于corePoolSize数量,但小于maximumPoolSize数量,那么仅当任务排队队列已满时才会创建新线程。通过设置corePoolSizemaximumPoolSize相同,可以创建一个固定大小的线程池。
  3. maximumPoolSize被设置为无界值(如Integer.MAX_VALUE)时,线程池可以接收任意数量的并发任务。
  4. corePoolSizemaximumPoolSize不仅能在线程池构造时设置,也可以调用setCorePoolSize()setMaximumPoolSize()方法进行动态更改。

2.BlockingQueue

BlockingQueue(阻塞队列)的实例用于暂存接收到的异步任务,如果线程池的核心线程都在忙,那么所接收到的目标任务缓存在阻塞队列中。

3.keepAliveTime

线程构造器的keepAliveTime(空闲线程存活时间)参数用于设置池内线程最大Idle(空闲)时长(或者说保活时长),如果超过这个时间,默认情况下Idle、非Core线程会被回收。

如果池在使用过程中提交任务的频率变高,也可以调用方法setKeepAliveTime(long time, TimeUnit unit) 进行线程存活时间的动态调整,可以将时长延长。如果需要防止Idle线程被终止,可以将Idle时间设置为无限大,具体如下:

setkeepAliveTime(Long.MAX_VALUE,TimeUnit.NANOSECONDS);

默认情况下,Idle超时策略仅适用于存在超过corePoolSize线程的情况。但若调用了allowCoreThreadTimeOut(boolean)方法,并且传入了参数true,则keepAliveTime参数所设置的Idle超时策略也将被应用于核心线程。

4.ThreadFactory(线程工厂)

ThreadFactory是Java线程工厂接口:

public interface ThreadFactory {
		//创建一个新线程
    Thread newThread(Runnable r);
}

在调用ThreadFactory的唯一方法newThread()创建新线程时,可以更改所创建的新线程的名称、线程组、优先级、守护进程状态等。如果newThread()的返回值为null,表示线程工厂未能成功创建线程,线程池可能无法执行任何任务。

使用Executors创建新的线程时,也可以基于ThreadFactory(线程工厂)创建,在创建新线程池时可以指定将要使用的ThreadFactorry实例。只不过,如果没有指定的话,就会使用Executors.defaultThreadFactory默认实例。使用默认的线程工厂实例所创建的线程全部位于同一个ThreadGroup(线程组)中,具有相同的NORM_PRIORITY(优先级为5),而且都是非守护线程状态。

Executors为线程池工厂类,用于快捷创建线程池。

ThreadFactory为线程工厂类,用于创建线程(Thread)。

基于自定义的ThreadFactory实例创建线程池,首先需要实现一个ThreadFactory类,实现其唯一的方法newThread(Runnable r)

//一个简单的线程工厂
public class SimpleThreadFactory implements ThreadFactory {

    static AtomicInteger threadNo=new AtomicInteger(1);
    @Override
    public Thread newThread(Runnable target) {
        String threadName="simpleThread-"+threadNo.get();
        System.out.println("创建一个线程,名称为:"+threadName);
        threadNo.incrementAndGet();
        //设置线程名称和异步执行目标
        Thread thread = new Thread(target, threadName);
        //设置为守护线程
        thread.setDaemon(true);
        return thread;
    }
    static class TargetTask implements Runnable{
        static AtomicInteger taskNo = new AtomicInteger(1);
        private String taskName;

        public TargetTask(){
            taskName="task-"+taskNo.get();
            taskNo.incrementAndGet();
        }
        @Override
        public void run() {
            System.out.println("任务:"+taskName+" doing");
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(taskName+"运行结束.");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        //使用自定义线程工厂快捷创建一个固定大小的线程池
        ExecutorService pool = Executors.newFixedThreadPool(2, new SimpleThreadFactory());
        for (int i = 0; i < 5; i++) {
            pool.submit(new TargetTask());
        }
        //等待10秒
        Thread.sleep(10000);
        System.out.println("关闭线程池");
        pool.shutdown();
    }
}

image-20231029181151378

从结果可以看到,新建池中的线程名称都不是默认的pool-1-thread-1的形式,是线程工厂更改后的形式。

确定线程池的线程数

1、按照任务类型对线程池进行分类

使用标准构造器ThreadPoolExecutor创建线程池时,会涉及线程数的配置,而线程数的配置与异步任务类型是分不开的。这里将线程池的异步任务大致分为以下三类:

(1)IO密集型任务

此类任务主要是执行IO操作。由于执行IO操作的时间较长,导致CPU的利用率不高,这类任务CPU常处于空闲状态。Netty的IO读写操作为此类任务的典型例子。

(2)CPU密集型任务

此类任务主要是执行计算任务。由于响应时间很快,CPU一直在运行,这种任务CPU得到利用率很高。

(3)混合型任务

此类任务既要执行逻辑计算,又要进行IO操作(如RPC调用、数据库访问)。相对来说,由于执行IO操作的耗时较长(一次网络往返往往在数百毫秒级别),这类任务的CPU利用率也不是太高。Web服务器的HTTP请求处理操作为此类任务的典型例子。

一般情况下,针对以上不同类型的异步任务需要创建不同类型的线程池,并进行针对性的参数配置。

2、为IO密集型任务确定线程数

由于IO密集型任务的CPU使用率较低,导致线程空余时间很多,因此通常需要开CPU核心数两倍的线程。当IO线程空闲时,可以启用其他线程继续使用CPU,以提高CPU的使用率。

3、为CPU密集型任务确定线程数

CPU密集型任务也叫计算密集型任务,其特定是要进行大量计算而需要消耗CPU资源。CPU密集型任务虽然也可以并行完成,但是并行的任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以要最高效地利用CPU,CPU密集型任务并行执行地数量应当等于CPU的核心数

比如4隔核心的CPU,通过4个线程并行地执行4个CPU密集型任务,此时的效率是最高的。但是如果线程数远远超出CPU核心数量,就需要频繁地切换线程,线程上下文切换时需要消耗时间,反而会使得任务效率下降。因此,对于CPU密集型的任务来说,线程数等于CPU核心数就行。

4、为混合型任务确定线程数

混合型任务既要执行逻辑计算,又要进行大量非CPU耗时操作(如RPC调用、数据库访问、网络通信等),所以混合型任务CPU的利用率不是太高,非CPU耗时往往是CPU耗时的两倍。比如在Web应用中处理HTTP请求时,一次请求处理会包括DB操作、RPC操作、缓存操作等多种耗时操作。一般来说,一次Web请求的CPU计算耗时往往较少,大致在100~500毫秒,而其他耗时操作会占用500~1000毫秒,甚至更多的时间。

在为混合型任务创建线程池时,如何确定线程数?业界有一个比较成熟的估算公式,如下:

最佳线程数=\frac{线程等待时间+线程CPU时间}{线程CPU时间}*CPU核数

经过简单的换算,以上公式可以进一步转换为:

最佳线程数=(线程等待时间与线程CPU时间之比+1)*CPU核数

通过公式可以看出:等待时间所占的比例越高,需要的线程就越多;CPU耗时所占的比例越高,需要的线程就越少。

比如在Web服务器处理HTTP请求时,假设平均线程CPU运行时间为100毫秒,而线程等待时间(比如DB操作、RPC操作、缓存操作等)为900毫秒,如果CPU核数为8,那么根据上面整个公式,估算如下:

最佳线程数=\frac{900毫秒+100毫秒}{100毫秒}*8=10*8=80

经过计算,以上案例中需要的线程数为80。这些都是理论值,生产环境中仅供参考。