一、線程池定義和使用
jdk 1.5 之後就引入了線程池。
1.1 定義
從上面的空間切換看得出來,線程是稀缺資源,它的創建與銷燬是一個相對偏重且耗資源的操作,而Java線程依賴於內核線程,創建線程需要進行操作系統狀態切換。為避免資源過度消耗需要設法重用線程執行多個任務。線程池就是一個線程緩存,負責對線程進行統一分配、調優與監控。(數據庫連接池也是一樣的道理)
什麼時候使用線程池?
單個任務處理時間比較短;需要處理的任務數量很大。
線程池優勢?
重用存在的線程,減少線程創建、消亡的開銷,提高性能、提高響應速度。
當任務到達時,任務可以不需要等到線程創建就能立即執行。
提高線程的可管理性,可統一分配,調優和監控。
1.2 線程池在 jdk 已有的實現
在 juc 包下,有一個接口:Executor :
Executor 又有兩個子接口:ExecutorService 和 ScheduledExecutorService,常用的接口是 ExecutorService。
同時常用的線程池的工具類叫 Executors。
例如:
<code>ExecutorService service = Executors.newCachedThreadPool();/<code>
Executor 框架雖然提供瞭如 newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()、newScheduledThreadPool() 等創建線程池的方法,但都有其侷限性,不夠靈活。
上面的幾種方式點進去會發現,都是用 ThreadPoolExecutor 進行創建的:
newSingleThreadExecutor 字面意思
簡單線程執行器
。
newFixedThreadPool 字面意思
固定的線程池
,傳參就是線程固定數目,適用於執行長期任務的場景。
newCachedThreadPool 字面意思
緩存線程池
,核心線程0,最大線程非常大,動態創建的特點。
newScheduledThreadPool 字面意思
時間安排線程池
,指定核心線程數。
newSingleThreadScheduledExecutor 字面意思
單線程安排執行器
,也就是基於只有一個核心線程的執行器之外,又可以擴展。其中又用 DelegatedExecutorService 委託執行器服務進行了包裝。
可以看到,上面直接用 Executors 工具類默認的一些實現 new 出來的線程池都是用的 ThreadPoolExecutor 線程執行器這個類進行構造的,不過參數不同,導致了效果的側重點不同。
因此,自己創建線程池推薦的方法就是,直接使用 ThreadPoolExecutor 進行個性化的創建:
構造方法種的參數有 7 個:
corePoolSize:線程池維護線程的最少數量 (core :
核心
)
maximumPoolSize:線程池維護線程的
最大
數量,顯然必須>=1
keepAliveTime:線程池維護的
多餘的線程
所允許的
空閒時間
,最長可以空閒多久,時間到了,如果超過 corePoolSize 的線程一直空閒,他們就會被銷燬。
unit:線程池維護線程所允許的空閒時間的單位
workQueue:線程池所使用的
緩衝隊列
,已經提交但是沒有執行的任務會放進這裡
threadFactory:生成線程池種工作線程的線程
工廠
,一般使用默認
handler:線程池對
拒絕
任務的處理
策略
,當隊列滿且工作線程已經達到maximumPoolSize。
阿里的 java 開發手冊, 強制要求,通過 ThreadPoolExecutor 來自定義 ,不能使用內置的,避免資源耗盡。這個很好理解,1 的類型就只有一個核心線程和最大現場,2 沒有擴展性,3、4、5的最大線程數太大,內存會爆炸。
1.3 線程池使用方法
這裡我們用固定線程池來測試,傳入核心線程數為 5,最大數量自然就也是 5,
<code>public static void main(String[] args) { ExecutorService threadPool = Executors.newFixedThreadPool(5); try { //模擬10個顧客辦理業務 for (int i = 0; i < 10; i++){ //execute 執行方法,傳入參數為實現了 Runnable 接口的類 threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()+"號線程辦理業務"); }); } } catch (Exception e){ e.printStackTrace(); } finally { threadPool.shutdown(); } }/<code>
其中,execute 方法就是將 任務提交 的方法,我們用 lambda 表達式給 execute 方法傳入了參數,實際上相當於一個完整的實現了 Runnable 接口的類。
執行結果:
可以看到,我們循環了 10 次,執行任務,但是線程只用到了 1-5 ,其中有多次 複用 。
再比如,我們按照各種類型的線程池,自己定義一個線程池,核心線程數 2, 最大線程數 5,阻塞隊列長度為 3:
<code>public static void main(String[] args) { ExecutorService threadPool = new ThreadPoolExecutor( 2, 5, 2L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); try { //模擬10個顧客辦理業務 for (int i = 0; i < 10; i++){ //execute 執行方法,傳入參數為實現了 Runnable 接口的類 threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()+"號線程辦理業務"); }); } } catch (Exception e){ e.printStackTrace(); } finally { threadPool.shutdown(); } }/<code>
同樣 10 個線程,執行起來:
可以看到,執行了 8 個任務後,就拋出了異常,說明執行了 拒絕策略 。
上面兩個示例,我們的任務本身都是沒有返回值的,如果創建的任務本身需要有返回值就需要實現 Callable 接口,然後搭配FutureTask 來傳入任務,那麼線程池就應該調用 submit 方法而不是 execute。
二、線程池底層原理
2.1 線程池執行邏輯
處理的流程核心就 execute() 方法,他接收一個實現了 Runnable 接口的任務,決定對這個任務的處理策略。
下圖是一個比較形象的策略流程:
可能的情況有四種,也就是圖中的1234:
如果線程池中的線程數量少於corePoolSize,就創建新的
核心線程
來執行新添加的任務
如果線程池中的線程數量大於等於corePoolSize,但隊列workQueue未滿,則將新添加的任務放到
隊列
workQueue中
如果線程池中的線程數量大於等於corePoolSize,且隊列workQueue已滿,但線程池中的線程數量小於maximumPoolSize,則會創建新的
非核心線程
來處理被添加的任務
如果線程池中的線程數量等於了maximumPoolSize,就用RejectedExecutionHandler來
執行拒絕策略
。會拋出異常,一般的拒絕策略是RejectedExecutionException
注意,執行的順序,在 java 裡有一個不合理的地方:
在池裡安排任務的時候,我們的核心線程,隊列,非核心線程裡面排的任務順序應該是 1 2 3;
但是真正實現上,如果三個都滿了,開始執行的時候,依次執行的順序卻是 核心線程,非核心線程,隊列。也就是執行順序會變成 1 3 2
2.2 拒絕策略
有些時候,我們並不希望拒絕策略是直接拋出異常,那麼 jdk 裡面提供的默認拒絕策略有 4 種,他們體現在代碼中就是 ThreadPoolExecutor 的四個靜態內部類:
2.2.1 CallerRunsPolicy:調用者運行策略。
這種策略不會拋棄任務,也不拋出異常,而是將某些任務回退給調用者,從而降低新任務的流量。
實現非常簡單,那就是如果說 e 這個線程池已經 shutdown 了,那麼就什麼也不幹,也就是這個任務直接丟了;否則,r.run() ,相當於調用這個方法的線程裡直接執行了這個 Runnable 任務。
此時我們可以把 1.3 裡的代碼修改一下,只修改策略為 CallerRunsPolicy:
可以看到,有些任務會在 main 線程裡處理。
2.2.2 AbortPolicy:終止策略。
拋異常。前面已經試過了,這個是默認的拒絕策略。
2.2.3 DiscardPolicy:丟棄任務。
可以看到,源碼裡就是是什麼也不做。如果場景中允許任務丟失,這個是最好的策略。
2.2.4 DiscardOldestPolicy:拋棄隊列中等待最久的任務。
拋棄隊列中等待最久的任務,然後把當前的任務加入隊列中,嘗試再次提交當前任務。
源碼裡也就是利用隊列操作,進行一次出隊操作,然後重新調用 execute 方法。
2.3 線程池的五種狀態
和 一個正常的線程的生命週期 區別開,這個是 線程池裡線程 的狀態。
Running,能接受新任務以及處理已添加的任務;
Shutdown,不接受新任務,可以處理已經添加的任務,也就是不能再調用execute或者submit了;
Stop,不接受新任務,不處理已經添加的任務,並且中斷正在處理的任務;
Tidying,所有的任務已經終止,CTL記錄的任務數量為0,CTL負責記錄線程池的運行狀態與活動線程數量;
Terminated,線程池徹底終止,則線程池轉變為terminated的狀態。
如圖所示,從running狀態轉換為 shutdown,調用 shutdown()方法;如果調用shutdownNow()方法,就直接會變成stop。
terminated()是鉤子函數,默認是什麼也不做的,我們可以重寫,然後決定結束之前要做一些別的處理邏輯。這個鉤子函數,就是模板模式的方法。
三、阻塞隊列
線程池裡的 BlockingQueue, 阻塞隊列 ,事實上在消費者生產者問題裡的管程法實現,我們的策略也是類似阻塞隊列的,用它來做一個緩存池的作用。
阻塞隊列:任意時刻,不管併發有多高,永遠保證 只有一個線程能夠進行隊列的入隊或出隊操作 。也就意味著他是能夠保證線程安全的。
另外,阻塞隊列分為有界和無界隊列,理論上來說一個是隊列的size有固定,另一個是無界的。對於有界隊列來說,如果隊列存滿,只能出隊了,入隊操作就只能阻塞。
在 juc 包裡,阻塞隊列的實現有很多:
ArrayBlockingQueue
:有界阻塞隊列;
LinkedBlockingQueue
:鏈表結構(大小默認值為Integer.MAX_VALUE)的阻塞隊列;
PriorityBlockingQueue:支持優先級排序的無界阻塞隊列;
DelayQueue:使用優先級隊列實現的延遲無界阻塞隊列;
SynchronousQueue:不存儲元素的阻塞隊列,相當於只有一個元素;
LinkedTransferQueue:鏈表組成的無界阻塞隊列;
LinkedBlockingDeque:鏈表組成的雙向阻塞隊列。
對於 BlockingQueue 來說,核心操作主要有幾類:插入、刪除、查找。
其中的四種異常策略:
拋異常
:如果阻塞隊列滿,再往隊列裡 add 插入元素會拋 IllegalStateException:Queue full,如果阻塞隊列空,再 remove 就會
拋
NoSuchElementException。
特殊值
:offer 方法:成功 true,失敗 false,poll 方法,成功就返回元素,沒有就返回 null。
阻塞
:阻塞隊列滿的時候,生產者線程繼續 put 元素,隊列就會阻塞直到可以 put 數據或者響應中斷然後退出,阻塞隊列空的時候,消費者線程繼續 take 元素,隊列就會一直阻塞直到有元素可以 take。
超時退出
:阻塞隊列滿的時候,會阻塞生產者線程且超時退出,空的時候會阻塞消費者線程且超時退出。
那麼使用的時候,增刪的方法按對應的同一組使用比較合理。(其實這個策略的設計對應的在單線程集合裡也有,那就是Deque接口的實現類 LinkedList 使用的時候,不同的增刪方法策略不同)
關鍵字: ThreadPoolExecutor execute 接口