近期一位測試朋友參加了天貓的面試後,感慨大廠視角與二線互聯網公司的差異,對候選人的要求不僅僅侷限在測試方面,同時在架構及開發方面也進行了全面的摸底,其中重點提到了線程池中的核心類ThreadPoolExecutor,今天讓我們從源碼出發,來一起學習一下吧。
ThreadPoolExecutor構造方法
線程池核心類ThreadPoolExecutor的構造方法如下所示
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue workQueue); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue workQueue,ThreadFactory threadFactory); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue workQueue,RejectedExecutionHandler handler); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
其中參數及意義如下:
- corePoolSize:核心線程池的大小,如果核心線程池有空閒位置,新的任務就會被核心線程池新建一個線程執行。執行完畢後不會銷燬線程,線程會進入緩存隊列等待再次被運行
- maximunPoolSize:線程池能創建最大的線程數量。如果核心線程池和緩存隊列都已經滿了,新的任務進來就會創建新的線程來執行。但是數量不能超過maximunPoolSize,否則會採取拒絕接受任務策略,下面會具體分析。
- keepAliveTime:非核心線程能夠空閒的最長時間,超過時間,線程終止。這個參數默認只有在線程數量超過核心線程池大小時才會起作用。只要線程數量不超過核心線程大小,就不會起作用。
- unit:時間單位,和keepAliveTime配合使用。
- workQueue:緩存隊列,用來存放等待被執行的任務,一般有三種方式ArrayBlockingQueue、LinkedBlockingQueue和SynchronousQueue。
- threadFactory:新線程的創建方式。
- handler:拒絕處理策略。拒絕策略一共有四種,分別是ThreadPoolExecutor.AbortPolicy(拋出RejectedExecutionException)、 ThreadPoolExecutor.DiscardPolicy(什麼也不做,直接忽略)、 ThreadPoolExecutor.DiscardOldestPolicy(丟棄執行隊列中最老的任務,嘗試為當前提交的任務騰出位置)和ThreadPoolExecutor.CallerRunsPolicy(直接由提交任務者執行這個任務 )。下述任一情況都會發生:1)Executor已經被關閉; 2)線程數量大於最大線程數和任務隊列已經達到最大值
線程池的狀態
線程池和線程一樣擁有自己的狀態,在ThreadPoolExecutor類中定義了一個volatile變量runState來表示線程池的狀態。
線程池有四種狀態,分別為RUNNING、SHURDOWN、STOP、TERMINATED。
線程池創建後處於RUNNING狀態。
調用shutdown後處於SHUTDOWN狀態,線程池不能接受新的任務,會等待任務隊列的任務完成。
調用shutdownNow後處於STOP狀態,線程池不能接受新的任務,並嘗試終止正在執行的任務。
當線程池處於SHUTDOWN或STOP狀態,並且所有工作線程已經銷燬,任務緩存隊列已經清空或執行結束後,線程池被設置為TERMINATED狀態。
線程池的工作順序
If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
running, the Executor always prefers queuing a request rather than adding a new thread.
If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
以上是來自ThreadPoolExecutor API Doc的描述,簡單來說就是corePoolSize -> workQueue -> maximumPoolSize -> handler,接下來讓我們從execute源碼入手來探尋一下吧。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); } }
從上面代碼可以看出,首先判斷傳入任務是否為空,如果為空則拋空指針異常,否則會執行下一個判斷,如果當前線程的數量小於核心線程池大小,就執行addIfUnderCorePollSize(command)方法,在核心線程池創建新的線程,並且執行這個任務。我們來看一下addIfUnderCorePollSize的具體實現。
private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); //創建線程去執行firstTask任務 } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; }
這邊又進行了一次判斷,對線程池線程數量和核心線程池進行比較,前面execute()代碼中已經判斷過,這裡為什麼還要進行判斷呢?
因為我們執行完Execute()中的判斷後,可能有新的任務進來了,並且為這個任務在核心線程池創建了新的線程去執行,如果剛好這個時刻核心線程池滿了,那麼就不能再加入新的線程到核心線程池了。這種可能性是存在的,因為不知道cpu時間片會分配給誰,所以從安全角度出發要再判斷一遍,至於線程池狀態為什麼也要判斷,也是因為可能存在其他線程執行了shutdown或者shutdownNow方法,導致線程池狀態不是RUNNING,那麼線程池自然需要停止接收新的任務,也就不會創建新的線程去執行這個任務了。
接下來看一下t=addThread(firstTask)中addTread的實現代碼吧。
private Thread addThread(Runnable firstTask) { Worker w = new Worker(firstTask); Thread t = threadFactory.newThread(w); //創建一個線程,執行任務 if (t != null) { w.thread = t; //將創建的線程的引用賦值為w的成員變量 workers.add(w); //將當前任務添加到任務集 int nt = ++poolSize; //當前線程數加1 if (nt > largestPoolSize) largestPoolSize = nt; } return t; }
這個方法返回類型是Thread,所以可以新建一個線程並執行任務,之後將線程對象返回給外面的線程對象,然後執行t.start(),這裡有一個Worker對象接收了任務,接下來看一下Worker類的實現:
private final class Worker implements Runnable { private final ReentrantLock runLock = new ReentrantLock(); private Runnable firstTask; volatile long completedTasks; Thread thread; Worker(Runnable firstTask) { this.firstTask = firstTask; } boolean isActive() { return runLock.isLocked(); } void interruptIfIdle() { final ReentrantLock runLock = this.runLock; if (runLock.tryLock()) { try { if (thread != Thread.currentThread()) thread.interrupt(); } finally { runLock.unlock(); } } } void interruptNow() { thread.interrupt(); } private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock(); try { if (runState < STOP && Thread.interrupted() && runState >= STOP) boolean ran = false; beforeExecute(thread, task); //beforeExecute方法是ThreadPoolExecutor類的一個方法,沒有具體實現,用戶可以根據 //自己需要重載這個方法和後面的afterExecute方法來進行一些統計信息,比如某個任務的執行時間等 try { task.run(); ran = true; afterExecute(task, null); ++completedTasks; } catch (RuntimeException ex) { if (!ran) afterExecute(task, ex); throw ex; } } finally { runLock.unlock(); } } public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); //當任務隊列中沒有任務時,進行清理工作 } } }
這個類實現了Runnable接口,所以會有run()方法,在run中執行的還是傳入的任務,其實相當於調用傳入任務對象的run方法,之所以費力氣將任務對象加到Worker類中去執行,是因為這個線程執行之後會進入阻塞隊列等待被執行,這個線程的生命並沒有結束,這也正是使用線程池的最大原因。這裡用一個Set集合存儲Worker,這樣不會有重複的任務被存儲,firstTask被執行完後進入緩存隊列,而這個新創建的線程就一直從緩存隊列中拿到任務去執行。這個方法為getTask(),所以接下來看看線程如何從緩存隊列拿到任務。
Runnable getTask() { for (;;) { try { int state = runState; if (state > SHUTDOWN) return null; Runnable r; if (state == SHUTDOWN) // Help drain queue r = workQueue.poll(); else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //如果線程數大於核心池大小或者允許為核心池線程設置空閒時間, //則通過poll取任務,若等待一定的時間取不到任務,則返回null r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else r = workQueue.take(); if (r != null) return r; if (workerCanExit()) { //如果沒取到任務,即r為null,則判斷當前的worker是否可以退出 if (runState >= SHUTDOWN) // Wake up others interruptIdleWorkers(); //中斷處於空閒狀態的worker return null; } // Else retry } catch (InterruptedException ie) { // On interruption, re-check runState } } }
從上面代碼可知,如果核心線程池中創建的線程想要拿到緩存隊列中的任務,需要先要判斷線程池的狀態,如果STOP或者TERMINATED,返回NULL,如果是RUNNING或者SHUTDOWN,則從緩存隊列中拿到任務去執行。以上就是核心線程池執行任務的原理。
如果線程數量超過核心線程池大小呢?回到executor()方法
if (runState == RUNNING && workQueue.offer(command))
如果線程數量超過核心線程池大小,先進行線程池狀態的判斷,如果是RUNNING,則將新進來的線程加入緩存隊列。如果失敗,往往是因為緩存隊列滿了或者線程池狀態不是RUNNING,就通過addIfUnderMaximumPoolSize(command)直接創建新的線程去執行任務,但是這個線程不是核心線程池中的,是臨時擴展的,要保證線程數最大不超過線程池大小 maximumPoolSize,如果超過執行 reject(command)操作,拒絕接受新的任務。
如果任務已經加入緩存隊列成功還要繼續進行判斷
if (runState != RUNNING || poolSize == 0)
這是為了防止在將任務加入緩存隊列的同時其他線程調用shutdown或者shutdownNow方法,所以採取了保護措施。
addIfUnderMaximumPoolSize方法和addIfUnderCorePoolSize基本類似,只是方法中判斷條件改變了,是在緩衝隊列滿了並且線程池狀態是在RUNNING狀態下才會執行,裡面的判斷條件是線程池數量小於線程池最大容量,並且線程池狀態是RUNNING。
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < maximumPoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; }
綜上所述,總結如下:
- 如果當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會創建一個線程去執行這個任務;
- 如果當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閒線程將其取出去執行;若添加失敗(一般來說是任務緩存隊列已滿),則會嘗試創建新的線程去執行這個任務;
- 如果當前線程池中的線程數目達到maximumPoolSize,則會採取任務拒絕策略進行處理;
- 如果線程池中的線程數量大於 corePoolSize時,如果某線程空閒時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大於corePoolSize;如果允許為核心池中的線程設置存活時間,那麼核心池中的線程空閒時間超過keepAliveTime,線程也會被終止。
線程池的正確使用方法
1、避免使用無界隊列,不要使用Executors.newXXXThreadPool()快捷方法創建線程池,因為這種方式會使用無界的任務隊列,為避免OOM,我們應該使用ThreadPoolExecutor的構造方法手動指定隊列的最大長度。
2、明確拒絕任務時的行為,任務隊列總有佔滿的時候,這時需要通過RejectedExecutionHandler設置正確的拒絕,線程池默認的拒絕行為是AbortPolicy,也就是拋出RejectedExecutionHandler異常,該異常是非受檢異常,很容易忘記捕獲。如果不關心任務被拒絕的事件,可以將拒絕策略設置成DiscardPolicy,這樣多餘的任務會悄悄的被忽略。
3、獲取處理結果和異常,根據結果及異常,進行相關處理。
以上是關於線程池核心類ThreadPoolExecutor的相關知識,掌握了這些,基本上可以駕馭面試過程中的這部分問題了。
ps:其他文章可以關注微信公眾號測試架構師養成記,還有價值999的資料可以領哦~
關鍵字: 源碼 ThreadPoolExecutor 構造方法