深入分析線程池的實現原理

www.jianshu.com/p/704a6c5d337c

一.概述

線程池,顧名思義就是存放線程的池子,池子裡存放了很多可以複用的線程。

如果不用類似線程池的容器,每當我們需要執行用戶任務的時候都去創建新的線程,任務執行完之後線程就被回收了,這樣頻繁地創建和銷燬線程會浪費大量的系統資源。

因此,線程池通過線程複用機制,並對線程進行統一管理,具有以下優點:

  • 降低系統資源消耗。通過複用已存在的線程,降低線程創建和銷燬造成的消耗;
  • 提高響應速度。當有任務到達時,無需等待新線程的創建便能立即執行;
  • 提高線程的可管理性。線程是稀缺資源,如果無限制的創建,不僅會消耗大量系統資源,還會降低系統的穩定性,使用線程池可以進行對線程進行統一的分配、調優和監控。

ThreadPoolExecutor是線程池框架的一個核心類,本文通過對ThreadPoolExecutor源碼的分析(基於JDK 1.8),來深入分析線程池的實現原理。

二.ThreadPoolExecutor類的屬性

先從ThreadPoolExecutor類中的字段開始:

深入分析線程池的實現原理

在ThreadPoolExecutor類的這些屬性中,線程池狀態是控制線程池生命週期至關重要的屬性,這裡就以線程池狀態為出發點進行研究。

通過上面的源碼可知,線程池的運行狀態總共有5種,其值和含義分別如下:

  • RUNNING: 高3位為111,接受新任務並處理阻塞隊列中的任務
  • SHUTDOWN: 高3位為000,不接受新任務但會處理阻塞隊列中的任務
  • STOP:高3位為001,不會接受新任務,也不會處理阻塞隊列中的任務,並且中斷正在運行的任務
  • TIDYING: 高3位為010,所有任務都已終止,工作線程數量為0,線程池將轉化到TIDYING狀態,即將要執行terminated()鉤子方法
  • TERMINATED: 高3位為011,terminated()方法已經執行結束

然而,線程池中並沒有使用單獨的變量來表示線程池的運行狀態,而是使用一個AtomicInteger類型的變量ctl來表示線程池的控制狀態,其將線程池運行狀態與工作線程的數量打包在一個整型中,用高3位來表示線程池的運行狀態,低29位來表示線程池中工作線程的數量,對ctl的操作主要參考以下幾個函數:

深入分析線程池的實現原理

接下來,我們看一下線程池狀態的所有轉換情況,如下:

  • RUNNING -> SHUTDOWN:調用shutdown(),可能在finalize()中隱式調用
  • (RUNNING or SHUTDOWN) -> STOP:調用shutdownNow()
  • SHUTDOWN -> TIDYING:當緩存隊列和線程池都為空時
  • STOP -> TIDYING:當線程池為空時
  • TIDYING -> TERMINATED:當terminated()方法執行結束時

通常情況下,線程池有如下兩種狀態轉換流程:

  • RUNNING -> SHUTDOWN -> TIDYING -> TERMINATED
  • RUNNING -> STOP -> TIDYING -> TERMINATED

三.ThreadPoolExecutor類的構造方法

通常情況下,我們使用線程池的方式就是new一個ThreadPoolExecutor對象來生成一個線程池。接下來,先看ThreadPoolExecutor類的構造函數:

深入分析線程池的實現原理

接下來,看下最後一個構造函數的具體實現:

深入分析線程池的實現原理

下面解釋下一下構造器中各個參數的含義:

1.corePoolSize

線程池中的核心線程數。當提交一個任務時,線程池創建一個新線程執行任務,直到當前線程數等於corePoolSize;如果當前線程數為corePoolSize,繼續提交的任務被保存到阻塞隊列中,等待被執行。

2.maximumPoolSize

線程池中允許的最大線程數。如果當前阻塞隊列滿了,且繼續提交任務,則創建新的線程執行任務,前提是當前線程數小於maximumPoolSize。

3.keepAliveTime

線程空閒時的存活時間。默認情況下,只有當線程池中的線程數大於corePoolSize時,keepAliveTime才會起作用,如果一個線程空閒的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。但是如果調用了allowCoreThreadTimeOut(boolean)方法,keepAliveTime參數也會起作用,直到線程池中的線程數為0。

4.unit

keepAliveTime參數的時間單位。

5.workQueue

任務緩存隊列,用來存放等待執行的任務。如果當前線程數為corePoolSize,繼續提交的任務就會被保存到任務緩存隊列中,等待被執行。

一般來說,這裡的BlockingQueue有以下三種選擇:

  • SynchronousQueue:一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態。因此,如果線程池中始終沒有空閒線程(任務提交的平均速度快於被處理的速度),可能出現無限制的線程增長。
  • LinkedBlockingQueue:基於鏈表結構的阻塞隊列,如果不設置初始化容量,其容量為Integer.MAX_VALUE,即為無界隊列。因此,如果線程池中線程數達到了corePoolSize,且始終沒有空閒線程(任務提交的平均速度快於被處理的速度),任務緩存隊列可能出現無限制的增長。
  • ArrayBlockingQueue:基於數組結構的有界阻塞隊列,按FIFO排序任務。

6.threadFactory

線程工廠,創建新線程時使用的線程工廠。

7.handler

任務拒絕策略,當阻塞隊列滿了,且線程池中的線程數達到maximumPoolSize,如果繼續提交任務,就會採取任務拒絕策略處理該任務,線程池提供了4種任務拒絕策略:

  • AbortPolicy:丟棄任務並拋出RejectedExecutionException異常,默認策略;
  • CallerRunsPolicy:由調用execute方法的線程執行該任務;
  • DiscardPolicy:丟棄任務,但是不拋出異常;
  • DiscardOldestPolicy:丟棄阻塞隊列最前面的任務,然後重新嘗試執行任務(重複此過程)。

當然也可以根據應用場景實現RejectedExecutionHandler接口,自定義飽和策略,如記錄日誌或持久化存儲不能處理的任務。

四.線程池的實現原理

1.提交任務

線程池框架提供了兩種方式提交任務,submit()和execute(),通過submit()方法提交的任務可以返回任務執行的結果,通過execute()方法提交的任務不能獲取任務執行的結果。

submit()方法的實現有以下三種:

<code>public Future> submit(Runnable task);
public Future submit(Runnable task, T result);
public Future submit(Callable task);
/<code>

下面以第一個方法為例簡單看一下submit()方法的實現:

<code>public Future> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}/<void>/<code>

submit()方法是在ThreadPoolExecutor的父類AbstractExecutorService類實現的,最終還是調用的ThreadPoolExecutor類的execute()方法,下面著重看一下execute()方法的實現。

深入分析線程池的實現原理

execute()方法的執行流程可以總結如下:

  • 若線程池工作線程數量小於corePoolSize,則創建新線程來執行任務
  • 若工作線程數量大於或等於corePoolSize,則將任務加入BlockingQueue
  • 若無法將任務加入BlockingQueue(BlockingQueue已滿),且工作線程數量小於maximumPoolSize,則創建新的線程來執行任務
  • 若工作線程數量達到maximumPoolSize,則創建線程失敗,採取任務拒絕策略

可以結合下面的兩張圖來理解線程池提交任務的執行流程。

深入分析線程池的實現原理

2.創建線程

從execute()方法的實現可以看出,addWorker()方法主要負責創建新的線程並執行任務,代碼實現如下:

深入分析線程池的實現原理


深入分析線程池的實現原理

因為代碼(1)處的邏輯不利於理解,我們通過(1)的等價實現來理解:

<code>if (rs>=SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
//等價實現
rs>=SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())/<code>

其含義為,滿足下列條件之一則直接返回false,線程創建失敗:

  • rs > SHUTDOWN,也就是STOP,TIDYING或TERMINATED,此時不再接受新的任務,且中斷正在執行的任務
  • rs = SHUTDOWN且firstTask != null,此時不再接受任務,但是仍會處理任務緩存隊列中的任務
  • rs = SHUTDOWN,隊列為空

多說一句,若線程池處於 SHUTDOWN, firstTask 為 null,且 workQueue 非空,那麼還得創建線程繼續處理任務緩存隊列中的任務。

總結一下,addWorker()方法完成了如下幾件任務:

  1. 原子性的增加workerCount
  2. 將用戶給定的任務封裝成為一個worker,並將此worker添加進workers集合中
  3. 啟動worker對應的線程
  4. 若線程啟動失敗,回滾worker的創建動作,即從workers中移除新添加的worker,並原子性的減少workerCount

3.工作線程的實現

從addWorker()方法的實現可以看出,工作線程的創建和啟動都跟ThreadPoolExecutor中的內部類Worker有關。下面我們分析Worker類來看一下工作線程的實現。

Worker類繼承自AQS類,具有鎖的功能;實現了Runable接口,可以將自身作為一個任務在線程中執行。

<code>private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable/<code>

Worker的主要字段就下面三個,代碼也比較簡單。

<code>//用來封裝worker的線程,線程池中真正運行的線程,通過線程工廠創建而來
final Thread thread;
//worker所對應的第一個任務,可能為空
Runnable firstTask;
//記錄當前線程完成的任務數
volatile long completedTasks;/<code>

Worker的構造函數如下。

<code>Worker(Runnable firstTask) {
//設置AQS的state為-1,在執行runWorker()方法之前阻止線程中斷
setState(-1);
//初始化第一個任務
this.firstTask = firstTask;
//利用指定的線程工廠創建一個線程,注意,參數是Worker實例本身this
//也就是當執行start方法啟動線程thread時,真正執行的是Worker類的run方法
this.thread = getThreadFactory().newThread(this);
}/<code>

Worker類繼承了AQS類,重寫了其相應的方法,實現了一個自定義的同步器,實現了不可重入鎖。

深入分析線程池的實現原理

Worker類還提供了一箇中斷線程thread的方法。

<code>void interruptIfStarted() {
Thread t;
//AQS狀態大於等於0,worker對應的線程不為null,且該線程沒有被中斷
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}/<code>

再來看一下Worker類的run()方法的實現,會發現run()方法最終調用了ThreadPoolExecutor類的runWorker()方法。

<code>public void run() {
runWorker(this);
}/<code>

4.線程複用機制

通過上文可以知道,worker中的線程start 後,執行的是worker的run()方法,而run()方法最終會調用ThreadPoolExecutor類的runWorker()方法,runWorker()方法實現了線程池中的線程複用機制。下面我們來看一下runWorker()方法的實現。

深入分析線程池的實現原理

runWorker()方法是線程池的核心,實現了線程池中的線程複用機制,來看一下

runWorker()方法都做了哪些工作:

  1. 運行第一個任務firstTask之後,循環調用getTask()方法獲取任務,不斷從任務緩存隊列獲取任務並執行;
  2. 獲取到任務之後就對worker對象加鎖,保證線程在執行任務的過程中不會被中斷,任務執行完會釋放鎖;
  3. 在執行任務的前後,可以根據業務場景重寫beforeExecute()和afterExecute()等Hook方法;
  4. 執行通過getTask()方法獲取到的任務
  5. 線程執行結束後,調用processWorkerExit()方法執行結束線程的一些清理工作

從runWorker()方法的實現可以看出,runWorker()方法中主要調用了getTask()方法和processWorkerExit()方法,下面分別看一下這兩個方法的實現。

getTask()的實現

getTask()方法用來不斷地從任務緩存隊列獲取任務並交給線程執行,下面分析一下其實現。

深入分析線程池的實現原理

接下來總結一下getTask()方法會在哪些情況下返回:

  1. 線程池處於RUNNING狀態,阻塞隊列不為空,返回成功獲取的task對象
  2. 線程池處於SHUTDOWN狀態,阻塞隊列不為空,返回成功獲取的task對象
  3. 線程池狀態大於等於STOP,返回null,回收線程
  4. 線程池處於SHUTDOWN狀態,並且阻塞隊列為空,返回null,回收線程
  5. worker數量大於maximumPoolSize,返回null,回收線程
  6. 線程空閒時間超時,返回null,回收線程

processWorkerExit()的實現

processWorkerExit()方法負責執行結束線程的一些清理工作,下面分析一下其實現。

深入分析線程池的實現原理

processWorkerExit()方法中主要調用了tryTerminate()方法,下面看一下tryTerminate()方法的實現。

深入分析線程池的實現原理

tryTerminate()方法的作用是嘗試終止線程池,它會在所有可能終止線程池的地方被調用,滿足終止線程池的條件有兩個:首先,線程池狀態為STOP,或者為SHUTDOWN且任務緩存隊列為空;其次,工作線程數量為0。

滿足了上述兩個條件之後,tryTerminate()方法獲取全局鎖,設置線程池運行狀態為TIDYING,之後執行terminated()鉤子方法,最後設置線程池狀態為TERMINATED。

至此,線程池運行狀態變為TERMINATED,工作線程數量為0,workers已清空,且workQueue也已清空,所有線程都執行結束,線程池的生命週期到此結束。

5.關閉線程池

關閉線程池有兩個方法,shutdown()和shutdownNow(),下面分別看一下這兩個方法的實現。

shutdown()的實現

shutdown()方法將線程池運行狀態設置為SHUTDOWN,此時線程池不會接受新的任務,但會處理阻塞隊列中的任務。

深入分析線程池的實現原理

shutdown()方法首先會檢查是否具有shutdown的權限,然後設置線程池的運行狀態為SHUTDOWN,之後中斷所有空閒的worker,再調用onShutdown()鉤子方法,最後嘗試終止線程池。

shutdown()方法調用了interruptIdleWorkers()方法中斷所有空閒的worker,其實現如下。

深入分析線程池的實現原理

shutdownNow()的實現

shutdownNow()方法將線程池運行狀態設置為STOP,此時線程池不會接受新任務,也不會處理阻塞隊列中的任務,並且中斷正在運行的任務。

深入分析線程池的實現原理

shutdownNow()方法與shutdown()方法相似,不同之處在於,前者設置線程池的運行狀態為STOP,之後中斷所有的worker(並非只是空閒的worker),嘗試終止線程池之後,返回任務緩存隊列中等待執行的任務列表。

shutdownNow()方法調用了interruptWorkers()方法中斷所有的worker(並非只是空閒的worker),其實現如下。

深入分析線程池的實現原理

五.總結

至此,我們已經閱讀了線程池框架的核心類ThreadPoolExecutor類的大部分源碼,由衷地讚歎這個類很多地方設計的巧妙之處:

  • 將線程池的運行狀態和工作線程數量打包在一起,並使用了大量的位運算
  • 使用CAS操作更新線程控制狀態ctl,確保對ctl的更新是原子操作
  • 內部類Worker類繼承了AQS,實現了一個自定義的同步器,實現了不可重入鎖
  • 使用while循環自旋地從任務緩存隊列中獲取任務並執行,實現了線程複用機制
  • 調用interrupt()方法中斷線程,但注意該方法並不能直接中斷線程的運行,只是發出了中斷信號,配合BlockingQueue的take(),poll()方法的使用,打斷線程的阻塞狀態

其實,線程池的本質就是生產者消費者模式,線程池的調用者不斷向線程池提交任務,線程池裡面的工作線程不斷獲取這些任務並執行(從任務緩存隊列獲取任務或者直接執行任務)。

讀完本文,相信大家對線程池的實現原理有了深刻的認識,比如向線程池提交一個任務之後線程池的執行流程,一個任務從被提交到被執行會經歷哪些過程,一個工作線程從被創建到正常執行到執行結束的執行過程,等等。


對了,在這裡說一下,我目前是在職Java開發,如果你現在正在學習Java,瞭解Java,渴望成為一名合格的Java開發工程師,在入門學習Java的過程當中缺乏基礎入門的視頻教程,可以關注並私信我:01。獲取。我這裡有最新的Java基礎全套視頻教程。


分享到:


相關文章: