你要的线程池全都有

近期一位测试朋友参加了天猫的面试后,感慨大厂视角与二线互联网公司的差异,对候选人的要求不仅仅局限在测试方面,同时在架构及开发方面也进行了全面的摸底,其中重点提到了线程池中的核心类ThreadPoolExecutor,今天让我们从源码出发,来一起学习一下吧。

ThreadPoolExecutor构造方法

线程池核心类ThreadPoolExecutor的构造方法如下所示

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
 BlockingQueue workQueue);
 
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
 BlockingQueue workQueue,ThreadFactory threadFactory);
 
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
 BlockingQueue workQueue,RejectedExecutionHandler handler);
 
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
 BlockingQueue workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);

其中参数及意义如下:

  • corePoolSize:核心线程池的大小,如果核心线程池有空闲位置,新的任务就会被核心线程池新建一个线程执行。执行完毕后不会销毁线程,线程会进入缓存队列等待再次被运行
  • maximunPoolSize:线程池能创建最大的线程数量。如果核心线程池和缓存队列都已经满了,新的任务进来就会创建新的线程来执行。但是数量不能超过maximunPoolSize,否则会采取拒绝接受任务策略,下面会具体分析。
  • keepAliveTime:非核心线程能够空闲的最长时间,超过时间,线程终止。这个参数默认只有在线程数量超过核心线程池大小时才会起作用。只要线程数量不超过核心线程大小,就不会起作用。
  • unit:时间单位,和keepAliveTime配合使用。
  • workQueue:缓存队列,用来存放等待被执行的任务,一般有三种方式ArrayBlockingQueue、LinkedBlockingQueue和SynchronousQueue。
  • threadFactory:新线程的创建方式。
  • handler:拒绝处理策略。拒绝策略一共有四种,分别是ThreadPoolExecutor.AbortPolicy(抛出RejectedExecutionException)、 ThreadPoolExecutor.DiscardPolicy(什么也不做,直接忽略)、 ThreadPoolExecutor.DiscardOldestPolicy(丢弃执行队列中最老的任务,尝试为当前提交的任务腾出位置)和ThreadPoolExecutor.CallerRunsPolicy(直接由提交任务者执行这个任务 )。下述任一情况都会发生:1)Executor已经被关闭; 2)线程数量大于最大线程数和任务队列已经达到最大值

线程池的状态

线程池和线程一样拥有自己的状态,在ThreadPoolExecutor类中定义了一个volatile变量runState来表示线程池的状态。

线程池有四种状态,分别为RUNNING、SHURDOWN、STOP、TERMINATED。

线程池创建后处于RUNNING状态。

调用shutdown后处于SHUTDOWN状态,线程池不能接受新的任务,会等待任务队列的任务完成。

调用shutdownNow后处于STOP状态,线程池不能接受新的任务,并尝试终止正在执行的任务。

当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。

线程池的工作顺序

If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.

running, the Executor always prefers queuing a request rather than adding a new thread.

If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.

以上是来自ThreadPoolExecutor API Doc的描述,简单来说就是corePoolSize -> workQueue -> maximumPoolSize -> handler,接下来让我们从execute源码入手来探寻一下吧。

public void execute(Runnable command) {
 if (command == null)
 throw new NullPointerException();
 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
 if (runState == RUNNING && workQueue.offer(command)) {
 if (runState != RUNNING || poolSize == 0)
 ensureQueuedTaskHandled(command);
 }
 else if (!addIfUnderMaximumPoolSize(command))
 reject(command); 
 }
}

从上面代码可以看出,首先判断传入任务是否为空,如果为空则抛空指针异常,否则会执行下一个判断,如果当前线程的数量小于核心线程池大小,就执行addIfUnderCorePollSize(command)方法,在核心线程池创建新的线程,并且执行这个任务。我们来看一下addIfUnderCorePollSize的具体实现。

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
 Thread t = null;
 final ReentrantLock mainLock = this.mainLock;
 mainLock.lock();
 try {
 if (poolSize < corePoolSize && runState == RUNNING)
 t = addThread(firstTask); //创建线程去执行firstTask任务 
 } finally {
 mainLock.unlock();
 }
 if (t == null)
 return false;
 t.start();
 return true;
}

这边又进行了一次判断,对线程池线程数量和核心线程池进行比较,前面execute()代码中已经判断过,这里为什么还要进行判断呢?

因为我们执行完Execute()中的判断后,可能有新的任务进来了,并且为这个任务在核心线程池创建了新的线程去执行,如果刚好这个时刻核心线程池满了,那么就不能再加入新的线程到核心线程池了。这种可能性是存在的,因为不知道cpu时间片会分配给谁,所以从安全角度出发要再判断一遍,至于线程池状态为什么也要判断,也是因为可能存在其他线程执行了shutdown或者shutdownNow方法,导致线程池状态不是RUNNING,那么线程池自然需要停止接收新的任务,也就不会创建新的线程去执行这个任务了。

接下来看一下t=addThread(firstTask)中addTread的实现代码吧。

private Thread addThread(Runnable firstTask) {
 Worker w = new Worker(firstTask);
 Thread t = threadFactory.newThread(w); //创建一个线程,执行任务 
 if (t != null) {
 w.thread = t; //将创建的线程的引用赋值为w的成员变量 
 workers.add(w); //将当前任务添加到任务集
 int nt = ++poolSize; //当前线程数加1 
 if (nt > largestPoolSize)
 largestPoolSize = nt;
 }
 return t;
}

这个方法返回类型是Thread,所以可以新建一个线程并执行任务,之后将线程对象返回给外面的线程对象,然后执行t.start(),这里有一个Worker对象接收了任务,接下来看一下Worker类的实现:

private final class Worker implements Runnable {
 private final ReentrantLock runLock = new ReentrantLock();
 private Runnable firstTask;
 volatile long completedTasks;
 Thread thread;
 Worker(Runnable firstTask) {
 this.firstTask = firstTask;
 }
 boolean isActive() {
 return runLock.isLocked();
 }
 void interruptIfIdle() {
 final ReentrantLock runLock = this.runLock;
 if (runLock.tryLock()) {
 try {
 if (thread != Thread.currentThread())
 thread.interrupt();
 } finally {
 runLock.unlock();
 }
 }
 }
 void interruptNow() {
 thread.interrupt();
 }
 
 private void runTask(Runnable task) {
 final ReentrantLock runLock = this.runLock;
 runLock.lock();
 try {
 if (runState < STOP &&
 Thread.interrupted() &&
 runState >= STOP)
 boolean ran = false;
 beforeExecute(thread, task); //beforeExecute方法是ThreadPoolExecutor类的一个方法,没有具体实现,用户可以根据
 //自己需要重载这个方法和后面的afterExecute方法来进行一些统计信息,比如某个任务的执行时间等 
 try {
 task.run();
 ran = true;
 afterExecute(task, null);
 ++completedTasks;
 } catch (RuntimeException ex) {
 if (!ran)
 afterExecute(task, ex);
 throw ex; }
 } finally {
 runLock.unlock();
 }
 }
 
 public void run() {
 try {
 Runnable task = firstTask;
 firstTask = null;
 while (task != null || (task = getTask()) != null) {
 runTask(task);
 task = null;
 }
 } finally {
 workerDone(this); //当任务队列中没有任务时,进行清理工作 
 }
 }
}

这个类实现了Runnable接口,所以会有run()方法,在run中执行的还是传入的任务,其实相当于调用传入任务对象的run方法,之所以费力气将任务对象加到Worker类中去执行,是因为这个线程执行之后会进入阻塞队列等待被执行,这个线程的生命并没有结束,这也正是使用线程池的最大原因。这里用一个Set集合存储Worker,这样不会有重复的任务被存储,firstTask被执行完后进入缓存队列,而这个新创建的线程就一直从缓存队列中拿到任务去执行。这个方法为getTask(),所以接下来看看线程如何从缓存队列拿到任务。

Runnable getTask() {
 for (;;) {
 try {
 int state = runState;
 if (state > SHUTDOWN)
 return null;
 Runnable r;
 if (state == SHUTDOWN) // Help drain queue
 r = workQueue.poll();
 else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //如果线程数大于核心池大小或者允许为核心池线程设置空闲时间,
 //则通过poll取任务,若等待一定的时间取不到任务,则返回null
 r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
 else
 r = workQueue.take();
 if (r != null)
 return r;
 if (workerCanExit()) { //如果没取到任务,即r为null,则判断当前的worker是否可以退出
 if (runState >= SHUTDOWN) // Wake up others
 interruptIdleWorkers(); //中断处于空闲状态的worker
 return null;
 }
 // Else retry } catch (InterruptedException ie) {
 // On interruption, re-check runState
 }
 }
}

从上面代码可知,如果核心线程池中创建的线程想要拿到缓存队列中的任务,需要先要判断线程池的状态,如果STOP或者TERMINATED,返回NULL,如果是RUNNING或者SHUTDOWN,则从缓存队列中拿到任务去执行。以上就是核心线程池执行任务的原理。

如果线程数量超过核心线程池大小呢?回到executor()方法

if (runState == RUNNING && workQueue.offer(command))

如果线程数量超过核心线程池大小,先进行线程池状态的判断,如果是RUNNING,则将新进来的线程加入缓存队列。如果失败,往往是因为缓存队列满了或者线程池状态不是RUNNING,就通过addIfUnderMaximumPoolSize(command)直接创建新的线程去执行任务,但是这个线程不是核心线程池中的,是临时扩展的,要保证线程数最大不超过线程池大小 maximumPoolSize,如果超过执行 reject(command)操作,拒绝接受新的任务。

如果任务已经加入缓存队列成功还要继续进行判断

if (runState != RUNNING || poolSize == 0)

这是为了防止在将任务加入缓存队列的同时其他线程调用shutdown或者shutdownNow方法,所以采取了保护措施。

addIfUnderMaximumPoolSize方法和addIfUnderCorePoolSize基本类似,只是方法中判断条件改变了,是在缓冲队列满了并且线程池状态是在RUNNING状态下才会执行,里面的判断条件是线程池数量小于线程池最大容量,并且线程池状态是RUNNING。

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
 Thread t = null;
 final ReentrantLock mainLock = this.mainLock;
 mainLock.lock();
 try {
 if (poolSize < maximumPoolSize && runState == RUNNING)
 t = addThread(firstTask);
 } finally {
 mainLock.unlock();
 }
 if (t == null)
 return false;
 t.start();
 return true;
}

综上所述,总结如下:

  • 如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
  • 如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
  • 如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
  • 如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。

线程池的正确使用方法

1、避免使用无界队列,不要使用Executors.newXXXThreadPool()快捷方法创建线程池,因为这种方式会使用无界的任务队列,为避免OOM,我们应该使用ThreadPoolExecutor的构造方法手动指定队列的最大长度。

2、明确拒绝任务时的行为,任务队列总有占满的时候,这时需要通过RejectedExecutionHandler设置正确的拒绝,线程池默认的拒绝行为是AbortPolicy,也就是抛出RejectedExecutionHandler异常,该异常是非受检异常,很容易忘记捕获。如果不关心任务被拒绝的事件,可以将拒绝策略设置成DiscardPolicy,这样多余的任务会悄悄的被忽略。

3、获取处理结果和异常,根据结果及异常,进行相关处理。

以上是关于线程池核心类ThreadPoolExecutor的相关知识,掌握了这些,基本上可以驾驭面试过程中的这部分问题了。

ps:其他文章可以关注微信公众号测试架构师养成记,还有价值999的资料可以领哦~


分享到:


相關文章: