你要的線程池全都有

近期一位測試朋友參加了天貓的面試後,感慨大廠視角與二線互聯網公司的差異,對候選人的要求不僅僅侷限在測試方面,同時在架構及開發方面也進行了全面的摸底,其中重點提到了線程池中的核心類ThreadPoolExecutor,今天讓我們從源碼出發,來一起學習一下吧。

ThreadPoolExecutor構造方法

線程池核心類ThreadPoolExecutor的構造方法如下所示

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
 BlockingQueue workQueue);
 
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
 BlockingQueue workQueue,ThreadFactory threadFactory);
 
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
 BlockingQueue workQueue,RejectedExecutionHandler handler);
 
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
 BlockingQueue workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);

其中參數及意義如下:

  • corePoolSize:核心線程池的大小,如果核心線程池有空閒位置,新的任務就會被核心線程池新建一個線程執行。執行完畢後不會銷燬線程,線程會進入緩存隊列等待再次被運行
  • maximunPoolSize:線程池能創建最大的線程數量。如果核心線程池和緩存隊列都已經滿了,新的任務進來就會創建新的線程來執行。但是數量不能超過maximunPoolSize,否則會採取拒絕接受任務策略,下面會具體分析。
  • keepAliveTime:非核心線程能夠空閒的最長時間,超過時間,線程終止。這個參數默認只有在線程數量超過核心線程池大小時才會起作用。只要線程數量不超過核心線程大小,就不會起作用。
  • unit:時間單位,和keepAliveTime配合使用。
  • workQueue:緩存隊列,用來存放等待被執行的任務,一般有三種方式ArrayBlockingQueue、LinkedBlockingQueue和SynchronousQueue。
  • threadFactory:新線程的創建方式。
  • handler:拒絕處理策略。拒絕策略一共有四種,分別是ThreadPoolExecutor.AbortPolicy(拋出RejectedExecutionException)、 ThreadPoolExecutor.DiscardPolicy(什麼也不做,直接忽略)、 ThreadPoolExecutor.DiscardOldestPolicy(丟棄執行隊列中最老的任務,嘗試為當前提交的任務騰出位置)和ThreadPoolExecutor.CallerRunsPolicy(直接由提交任務者執行這個任務 )。下述任一情況都會發生:1)Executor已經被關閉; 2)線程數量大於最大線程數和任務隊列已經達到最大值

線程池的狀態

線程池和線程一樣擁有自己的狀態,在ThreadPoolExecutor類中定義了一個volatile變量runState來表示線程池的狀態。

線程池有四種狀態,分別為RUNNING、SHURDOWN、STOP、TERMINATED。

線程池創建後處於RUNNING狀態。

調用shutdown後處於SHUTDOWN狀態,線程池不能接受新的任務,會等待任務隊列的任務完成。

調用shutdownNow後處於STOP狀態,線程池不能接受新的任務,並嘗試終止正在執行的任務。

當線程池處於SHUTDOWN或STOP狀態,並且所有工作線程已經銷燬,任務緩存隊列已經清空或執行結束後,線程池被設置為TERMINATED狀態。

線程池的工作順序

If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.

running, the Executor always prefers queuing a request rather than adding a new thread.

If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.

以上是來自ThreadPoolExecutor API Doc的描述,簡單來說就是corePoolSize -> workQueue -> maximumPoolSize -> handler,接下來讓我們從execute源碼入手來探尋一下吧。

public void execute(Runnable command) {
 if (command == null)
 throw new NullPointerException();
 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
 if (runState == RUNNING && workQueue.offer(command)) {
 if (runState != RUNNING || poolSize == 0)
 ensureQueuedTaskHandled(command);
 }
 else if (!addIfUnderMaximumPoolSize(command))
 reject(command); 
 }
}

從上面代碼可以看出,首先判斷傳入任務是否為空,如果為空則拋空指針異常,否則會執行下一個判斷,如果當前線程的數量小於核心線程池大小,就執行addIfUnderCorePollSize(command)方法,在核心線程池創建新的線程,並且執行這個任務。我們來看一下addIfUnderCorePollSize的具體實現。

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
 Thread t = null;
 final ReentrantLock mainLock = this.mainLock;
 mainLock.lock();
 try {
 if (poolSize < corePoolSize && runState == RUNNING)
 t = addThread(firstTask); //創建線程去執行firstTask任務 
 } finally {
 mainLock.unlock();
 }
 if (t == null)
 return false;
 t.start();
 return true;
}

這邊又進行了一次判斷,對線程池線程數量和核心線程池進行比較,前面execute()代碼中已經判斷過,這裡為什麼還要進行判斷呢?

因為我們執行完Execute()中的判斷後,可能有新的任務進來了,並且為這個任務在核心線程池創建了新的線程去執行,如果剛好這個時刻核心線程池滿了,那麼就不能再加入新的線程到核心線程池了。這種可能性是存在的,因為不知道cpu時間片會分配給誰,所以從安全角度出發要再判斷一遍,至於線程池狀態為什麼也要判斷,也是因為可能存在其他線程執行了shutdown或者shutdownNow方法,導致線程池狀態不是RUNNING,那麼線程池自然需要停止接收新的任務,也就不會創建新的線程去執行這個任務了。

接下來看一下t=addThread(firstTask)中addTread的實現代碼吧。

private Thread addThread(Runnable firstTask) {
 Worker w = new Worker(firstTask);
 Thread t = threadFactory.newThread(w); //創建一個線程,執行任務 
 if (t != null) {
 w.thread = t; //將創建的線程的引用賦值為w的成員變量 
 workers.add(w); //將當前任務添加到任務集
 int nt = ++poolSize; //當前線程數加1 
 if (nt > largestPoolSize)
 largestPoolSize = nt;
 }
 return t;
}

這個方法返回類型是Thread,所以可以新建一個線程並執行任務,之後將線程對象返回給外面的線程對象,然後執行t.start(),這裡有一個Worker對象接收了任務,接下來看一下Worker類的實現:

private final class Worker implements Runnable {
 private final ReentrantLock runLock = new ReentrantLock();
 private Runnable firstTask;
 volatile long completedTasks;
 Thread thread;
 Worker(Runnable firstTask) {
 this.firstTask = firstTask;
 }
 boolean isActive() {
 return runLock.isLocked();
 }
 void interruptIfIdle() {
 final ReentrantLock runLock = this.runLock;
 if (runLock.tryLock()) {
 try {
 if (thread != Thread.currentThread())
 thread.interrupt();
 } finally {
 runLock.unlock();
 }
 }
 }
 void interruptNow() {
 thread.interrupt();
 }
 
 private void runTask(Runnable task) {
 final ReentrantLock runLock = this.runLock;
 runLock.lock();
 try {
 if (runState < STOP &&
 Thread.interrupted() &&
 runState >= STOP)
 boolean ran = false;
 beforeExecute(thread, task); //beforeExecute方法是ThreadPoolExecutor類的一個方法,沒有具體實現,用戶可以根據
 //自己需要重載這個方法和後面的afterExecute方法來進行一些統計信息,比如某個任務的執行時間等 
 try {
 task.run();
 ran = true;
 afterExecute(task, null);
 ++completedTasks;
 } catch (RuntimeException ex) {
 if (!ran)
 afterExecute(task, ex);
 throw ex; }
 } finally {
 runLock.unlock();
 }
 }
 
 public void run() {
 try {
 Runnable task = firstTask;
 firstTask = null;
 while (task != null || (task = getTask()) != null) {
 runTask(task);
 task = null;
 }
 } finally {
 workerDone(this); //當任務隊列中沒有任務時,進行清理工作 
 }
 }
}

這個類實現了Runnable接口,所以會有run()方法,在run中執行的還是傳入的任務,其實相當於調用傳入任務對象的run方法,之所以費力氣將任務對象加到Worker類中去執行,是因為這個線程執行之後會進入阻塞隊列等待被執行,這個線程的生命並沒有結束,這也正是使用線程池的最大原因。這裡用一個Set集合存儲Worker,這樣不會有重複的任務被存儲,firstTask被執行完後進入緩存隊列,而這個新創建的線程就一直從緩存隊列中拿到任務去執行。這個方法為getTask(),所以接下來看看線程如何從緩存隊列拿到任務。

Runnable getTask() {
 for (;;) {
 try {
 int state = runState;
 if (state > SHUTDOWN)
 return null;
 Runnable r;
 if (state == SHUTDOWN) // Help drain queue
 r = workQueue.poll();
 else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //如果線程數大於核心池大小或者允許為核心池線程設置空閒時間,
 //則通過poll取任務,若等待一定的時間取不到任務,則返回null
 r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
 else
 r = workQueue.take();
 if (r != null)
 return r;
 if (workerCanExit()) { //如果沒取到任務,即r為null,則判斷當前的worker是否可以退出
 if (runState >= SHUTDOWN) // Wake up others
 interruptIdleWorkers(); //中斷處於空閒狀態的worker
 return null;
 }
 // Else retry } catch (InterruptedException ie) {
 // On interruption, re-check runState
 }
 }
}

從上面代碼可知,如果核心線程池中創建的線程想要拿到緩存隊列中的任務,需要先要判斷線程池的狀態,如果STOP或者TERMINATED,返回NULL,如果是RUNNING或者SHUTDOWN,則從緩存隊列中拿到任務去執行。以上就是核心線程池執行任務的原理。

如果線程數量超過核心線程池大小呢?回到executor()方法

if (runState == RUNNING && workQueue.offer(command))

如果線程數量超過核心線程池大小,先進行線程池狀態的判斷,如果是RUNNING,則將新進來的線程加入緩存隊列。如果失敗,往往是因為緩存隊列滿了或者線程池狀態不是RUNNING,就通過addIfUnderMaximumPoolSize(command)直接創建新的線程去執行任務,但是這個線程不是核心線程池中的,是臨時擴展的,要保證線程數最大不超過線程池大小 maximumPoolSize,如果超過執行 reject(command)操作,拒絕接受新的任務。

如果任務已經加入緩存隊列成功還要繼續進行判斷

if (runState != RUNNING || poolSize == 0)

這是為了防止在將任務加入緩存隊列的同時其他線程調用shutdown或者shutdownNow方法,所以採取了保護措施。

addIfUnderMaximumPoolSize方法和addIfUnderCorePoolSize基本類似,只是方法中判斷條件改變了,是在緩衝隊列滿了並且線程池狀態是在RUNNING狀態下才會執行,裡面的判斷條件是線程池數量小於線程池最大容量,並且線程池狀態是RUNNING。

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
 Thread t = null;
 final ReentrantLock mainLock = this.mainLock;
 mainLock.lock();
 try {
 if (poolSize < maximumPoolSize && runState == RUNNING)
 t = addThread(firstTask);
 } finally {
 mainLock.unlock();
 }
 if (t == null)
 return false;
 t.start();
 return true;
}

綜上所述,總結如下:

  • 如果當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會創建一個線程去執行這個任務;
  • 如果當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閒線程將其取出去執行;若添加失敗(一般來說是任務緩存隊列已滿),則會嘗試創建新的線程去執行這個任務;
  • 如果當前線程池中的線程數目達到maximumPoolSize,則會採取任務拒絕策略進行處理;
  • 如果線程池中的線程數量大於 corePoolSize時,如果某線程空閒時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大於corePoolSize;如果允許為核心池中的線程設置存活時間,那麼核心池中的線程空閒時間超過keepAliveTime,線程也會被終止。

線程池的正確使用方法

1、避免使用無界隊列,不要使用Executors.newXXXThreadPool()快捷方法創建線程池,因為這種方式會使用無界的任務隊列,為避免OOM,我們應該使用ThreadPoolExecutor的構造方法手動指定隊列的最大長度。

2、明確拒絕任務時的行為,任務隊列總有佔滿的時候,這時需要通過RejectedExecutionHandler設置正確的拒絕,線程池默認的拒絕行為是AbortPolicy,也就是拋出RejectedExecutionHandler異常,該異常是非受檢異常,很容易忘記捕獲。如果不關心任務被拒絕的事件,可以將拒絕策略設置成DiscardPolicy,這樣多餘的任務會悄悄的被忽略。

3、獲取處理結果和異常,根據結果及異常,進行相關處理。

以上是關於線程池核心類ThreadPoolExecutor的相關知識,掌握了這些,基本上可以駕馭面試過程中的這部分問題了。

ps:其他文章可以關注微信公眾號測試架構師養成記,還有價值999的資料可以領哦~


分享到:


相關文章: