java多線程:線程池原理、阻塞隊列

一、線程池定義和使用

jdk 1.5 之後就引入了線程池。

1.1 定義

從上面的空間切換看得出來,線程是稀缺資源,它的創建與銷燬是一個相對偏重且耗資源的操作,而Java線程依賴於內核線程,創建線程需要進行操作系統狀態切換。為避免資源過度消耗需要設法重用線程執行多個任務。線程池就是一個線程緩存,負責對線程進行統一分配、調優與監控。(數據庫連接池也是一樣的道理)

什麼時候使用線程池?

單個任務處理時間比較短;需要處理的任務數量很大。

線程池優勢?

重用存在的線程,減少線程創建、消亡的開銷,提高性能、提高響應速度。

當任務到達時,任務可以不需要等到線程創建就能立即執行。

提高線程的可管理性,可統一分配,調優和監控。

1.2 線程池在 jdk 已有的實現

在 juc 包下,有一個接口:Executor :

Executor 又有兩個子接口:ExecutorService 和 ScheduledExecutorService,常用的接口是 ExecutorService。

同時常用的線程池的工具類叫 Executors。

例如:

<code>ExecutorService service = Executors.newCachedThreadPool();/<code>

Executor 框架雖然提供瞭如 newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()、newScheduledThreadPool() 等創建線程池的方法,但都有其侷限性,不夠靈活。

上面的幾種方式點進去會發現,都是用 ThreadPoolExecutor 進行創建的:

newSingleThreadExecutor 字面意思

簡單線程執行器

newFixedThreadPool 字面意思

固定的線程池

,傳參就是線程固定數目,適用於執行長期任務的場景。

newCachedThreadPool 字面意思

緩存線程池

,核心線程0,最大線程非常大,動態創建的特點。

newScheduledThreadPool 字面意思

時間安排線程池

,指定核心線程數。

newSingleThreadScheduledExecutor 字面意思

單線程安排執行器

,也就是基於只有一個核心線程的執行器之外,又可以擴展。其中又用 DelegatedExecutorService 委託執行器服務進行了包裝。

可以看到,上面直接用 Executors 工具類默認的一些實現 new 出來的線程池都是用的 ThreadPoolExecutor 線程執行器這個類進行構造的,不過參數不同,導致了效果的側重點不同。

因此,自己創建線程池推薦的方法就是,直接使用 ThreadPoolExecutor 進行個性化的創建:

構造方法種的參數有 7 個:

corePoolSize:線程池維護線程的最少數量 (core :

核心

maximumPoolSize:線程池維護線程的

最大

數量,顯然必須>=1

keepAliveTime:線程池維護的

多餘的線程

所允許的

空閒時間

,最長可以空閒多久,時間到了,如果超過 corePoolSize 的線程一直空閒,他們就會被銷燬。

unit:線程池維護線程所允許的空閒時間的單位

workQueue:線程池所使用的

緩衝隊列

,已經提交但是沒有執行的任務會放進這裡

threadFactory:生成線程池種工作線程的線程

工廠

,一般使用默認

handler:線程池對

拒絕

任務的處理

策略

,當隊列滿且工作線程已經達到maximumPoolSize。

阿里的 java 開發手冊, 強制要求,通過 ThreadPoolExecutor 來自定義 ,不能使用內置的,避免資源耗盡。這個很好理解,1 的類型就只有一個核心線程和最大現場,2 沒有擴展性,3、4、5的最大線程數太大,內存會爆炸。

1.3 線程池使用方法

這裡我們用固定線程池來測試,傳入核心線程數為 5,最大數量自然就也是 5,

<code>public static void main(String[] args) { ExecutorService threadPool = Executors.newFixedThreadPool(5); try { //模擬10個顧客辦理業務 for (int i = 0; i < 10; i++){ //execute 執行方法,傳入參數為實現了 Runnable 接口的類 threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()+"號線程辦理業務"); }); } } catch (Exception e){ e.printStackTrace(); } finally { threadPool.shutdown(); } }/<code>

其中,execute 方法就是將 任務提交 的方法,我們用 lambda 表達式給 execute 方法傳入了參數,實際上相當於一個完整的實現了 Runnable 接口的類。

執行結果:

可以看到,我們循環了 10 次,執行任務,但是線程只用到了 1-5 ,其中有多次 複用

再比如,我們按照各種類型的線程池,自己定義一個線程池,核心線程數 2, 最大線程數 5,阻塞隊列長度為 3:

<code>public static void main(String[] args) { ExecutorService threadPool = new ThreadPoolExecutor( 2, 5, 2L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); try { //模擬10個顧客辦理業務 for (int i = 0; i < 10; i++){ //execute 執行方法,傳入參數為實現了 Runnable 接口的類 threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()+"號線程辦理業務"); }); } } catch (Exception e){ e.printStackTrace(); } finally { threadPool.shutdown(); } }/<code>

同樣 10 個線程,執行起來:

可以看到,執行了 8 個任務後,就拋出了異常,說明執行了 拒絕策略

上面兩個示例,我們的任務本身都是沒有返回值的,如果創建的任務本身需要有返回值就需要實現 Callable 接口,然後搭配FutureTask 來傳入任務,那麼線程池就應該調用 submit 方法而不是 execute。

二、線程池底層原理

2.1 線程池執行邏輯

處理的流程核心就 execute() 方法,他接收一個實現了 Runnable 接口的任務,決定對這個任務的處理策略。

下圖是一個比較形象的策略流程:

可能的情況有四種,也就是圖中的1234:

如果線程池中的線程數量少於corePoolSize,就創建新的

核心線程

來執行新添加的任務

如果線程池中的線程數量大於等於corePoolSize,但隊列workQueue未滿,則將新添加的任務放到

隊列

workQueue中

如果線程池中的線程數量大於等於corePoolSize,且隊列workQueue已滿,但線程池中的線程數量小於maximumPoolSize,則會創建新的

非核心線程

來處理被添加的任務

如果線程池中的線程數量等於了maximumPoolSize,就用RejectedExecutionHandler來

執行拒絕策略

。會拋出異常,一般的拒絕策略是RejectedExecutionException

注意,執行的順序,在 java 裡有一個不合理的地方:

在池裡安排任務的時候,我們的核心線程,隊列,非核心線程裡面排的任務順序應該是 1 2 3;

但是真正實現上,如果三個都滿了,開始執行的時候,依次執行的順序卻是 核心線程,非核心線程,隊列。也就是執行順序會變成 1 3 2

2.2 拒絕策略

有些時候,我們並不希望拒絕策略是直接拋出異常,那麼 jdk 裡面提供的默認拒絕策略有 4 種,他們體現在代碼中就是 ThreadPoolExecutor 的四個靜態內部類:

2.2.1 CallerRunsPolicy:調用者運行策略。

這種策略不會拋棄任務,也不拋出異常,而是將某些任務回退給調用者,從而降低新任務的流量。

實現非常簡單,那就是如果說 e 這個線程池已經 shutdown 了,那麼就什麼也不幹,也就是這個任務直接丟了;否則,r.run() ,相當於調用這個方法的線程裡直接執行了這個 Runnable 任務。

此時我們可以把 1.3 裡的代碼修改一下,只修改策略為 CallerRunsPolicy:

可以看到,有些任務會在 main 線程裡處理。

2.2.2 AbortPolicy:終止策略。

拋異常。前面已經試過了,這個是默認的拒絕策略。

2.2.3 DiscardPolicy:丟棄任務。

可以看到,源碼裡就是是什麼也不做。如果場景中允許任務丟失,這個是最好的策略。

2.2.4 DiscardOldestPolicy:拋棄隊列中等待最久的任務。

拋棄隊列中等待最久的任務,然後把當前的任務加入隊列中,嘗試再次提交當前任務。

源碼裡也就是利用隊列操作,進行一次出隊操作,然後重新調用 execute 方法。

2.3 線程池的五種狀態

一個正常的線程的生命週期 區別開,這個是 線程池裡線程 的狀態。

Running,能接受新任務以及處理已添加的任務;

Shutdown,不接受新任務,可以處理已經添加的任務,也就是不能再調用execute或者submit了;

Stop,不接受新任務,不處理已經添加的任務,並且中斷正在處理的任務;

Tidying,所有的任務已經終止,CTL記錄的任務數量為0,CTL負責記錄線程池的運行狀態與活動線程數量;

Terminated,線程池徹底終止,則線程池轉變為terminated的狀態。

如圖所示,從running狀態轉換為 shutdown,調用 shutdown()方法;如果調用shutdownNow()方法,就直接會變成stop。

terminated()是鉤子函數,默認是什麼也不做的,我們可以重寫,然後決定結束之前要做一些別的處理邏輯。這個鉤子函數,就是模板模式的方法。

三、阻塞隊列

線程池裡的 BlockingQueue, 阻塞隊列 ,事實上在消費者生產者問題裡的管程法實現,我們的策略也是類似阻塞隊列的,用它來做一個緩存池的作用。

阻塞隊列:任意時刻,不管併發有多高,永遠保證 只有一個線程能夠進行隊列的入隊或出隊操作 。也就意味著他是能夠保證線程安全的。

另外,阻塞隊列分為有界和無界隊列,理論上來說一個是隊列的size有固定,另一個是無界的。對於有界隊列來說,如果隊列存滿,只能出隊了,入隊操作就只能阻塞。

在 juc 包裡,阻塞隊列的實現有很多:

ArrayBlockingQueue

:有界阻塞隊列;

LinkedBlockingQueue

:鏈表結構(大小默認值為Integer.MAX_VALUE)的阻塞隊列;

PriorityBlockingQueue:支持優先級排序的無界阻塞隊列;

DelayQueue:使用優先級隊列實現的延遲無界阻塞隊列;

SynchronousQueue:不存儲元素的阻塞隊列,相當於只有一個元素;

LinkedTransferQueue:鏈表組成的無界阻塞隊列;

LinkedBlockingDeque:鏈表組成的雙向阻塞隊列。

對於 BlockingQueue 來說,核心操作主要有幾類:插入、刪除、查找。

其中的四種異常策略:

拋異常

:如果阻塞隊列滿,再往隊列裡 add 插入元素會拋 IllegalStateException:Queue full,如果阻塞隊列空,再 remove 就會

NoSuchElementException。

特殊值

:offer 方法:成功 true,失敗 false,poll 方法,成功就返回元素,沒有就返回 null。

阻塞

:阻塞隊列滿的時候,生產者線程繼續 put 元素,隊列就會阻塞直到可以 put 數據或者響應中斷然後退出,阻塞隊列空的時候,消費者線程繼續 take 元素,隊列就會一直阻塞直到有元素可以 take。

超時退出

:阻塞隊列滿的時候,會阻塞生產者線程且超時退出,空的時候會阻塞消費者線程且超時退出。

那麼使用的時候,增刪的方法按對應的同一組使用比較合理。(其實這個策略的設計對應的在單線程集合裡也有,那就是Deque接口的實現類 LinkedList 使用的時候,不同的增刪方法策略不同)