FutureTask隨手筆記

主要的實現FutureTask

# FutureTask實際上運行還是一個runnable,它對callable做了一個封裝,讓開發人員可以從其中獲取返回值;FutrueTask是有狀態的 共7種狀態,四種狀態變換的可能NEW -> COMPLETING -> EXCEPTIONALNEW -> CANCELLEDNEW -> COMPLETING -> NORMALNEW -> INTERRUPTING -> INTERRUPTED 

Callable和runnable的區別

0. 通過call方法調用;1. 有返回值2. 可以拋異常

get結果的實現原理

1. 判斷狀態;2. 非NEW,COMPLETING狀態則直接 進入report返回結果;3. 處於NEW,COMPLETING狀態,則進入等待awaitDone();

3.x awaitDone 流程

3.1. 獲取等待的超時時間deadline;3.2. 進入自旋3.3. 判斷線程是否被中斷:如果被中斷則移出等待waiters隊列;並拋出異常;3.4. 判斷FutrueTask狀態:如果">COMPLETING",代表執行完成,進入report;3.5. 判斷FutrueTask狀態:如果"=COMPLETING",讓出CPU執行Thread.yield();3.6. 為當前線程創建一個node節點;3.7. 將當前線程WaitNode加入等待隊列waiters中;3.8. 判斷是否超時;3.9. 通過LockSupport.park掛起線程,等待運行許可;4. report返回執行結果:如果一切正常就返回執行結果,否則返回Exception; 

run具體執行原理如下:

1. 判斷狀態是否正常,避免重複執行;2. 調用callable的call()方法;3. 修改執行狀態;保存執行結果;並通知正在等待get的線程;## 3.x通知機制finishCompletion3.1. 獲取所有waiters的集合;3.2. 通過cas 拿到執行權;3.3. 循環遍歷所有等待的線程,通過LockSupport.unpark 喚醒其執行;

Callable和Future的實現原理(JDK8源碼分析)

1. cancel 取消執行

public boolean cancel(boolean mayInterruptIfRunning) { // 判斷狀態:只有剛創建的情況下才能取消 // mayInterruptIfRunning:是否中斷當前正在運行這個FutureTask的線程; if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception // 如果要中斷當前線程,則對runner發佈interrupt信號; if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state // 修改狀態為:已經通知線程進行中斷 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { // 通知其他在等待結果的線程 finishCompletion(); } return true;}

2. run

public void run() { // 判斷狀態及設置futuretask歸屬的線程 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable c = callable; if (c != null && state == NEW) { V result; boolean ran; try { // 執行Callable result = c.call(); // 標記為執行成功 ran = true; } catch (Throwable ex) { result = null; // 標記為執行不成功 ran = false; // 設置為異常狀態,並通知其他在等待結果的線程 setException(ex); } // 如果執行成功,修改狀態為正常,並通知其他在等待結果的線程 if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; // 如果狀態為準備發起中斷信號或者已經發出中斷信號,則讓出CPU(Thread.yield()) if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); }}

3. get

public V get() throws InterruptedException, ExecutionException { int s = state; // 如果還沒執行完,則等待 if (s <= COMPLETING) s = awaitDone(false, 0L); // 通過report取結果 return report(s);}

3.1 report 取執行結果

private V report(int s) throws ExecutionException { Object x = outcome; // 如果一切正常,則返回x(x是callable執行的結果outcome) if (s == NORMAL) return (V)x; // 如果被取消,則拋出已取消異常 if (s >= CANCELLED) throw new CancellationException(); // 否則拋出執行異常 throw new ExecutionException((Throwable)x);}

3.2 awaitDone 等待FutureTask執行結束

private int awaitDone(boolean timed, long nanos) throws InterruptedException { // 記錄等待超時的時間 final long deadline = timed ? System.nanoTime() + nanos : 0L; // 多個在等待結果的線程,通過一個鏈表進行保存,waitNode就是每個線程在鏈表中的節點; WaitNode q = null; boolean queued = false; // 死循環...也可以說是自旋鎖同步 for (;;) { // 判斷當前這個調用get的線程是否被中斷 if (Thread.interrupted()) { // 將當前線程移出隊列 removeWaiter(q); throw new InterruptedException(); } int s = state; // 如果狀態非初創或執行完畢了,則跳出循環,通過report()取執行結果 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } // 如果狀態等於已執行,讓出CPU執行,等待狀態變為正常結束 else if (s == COMPLETING) // cannot time out yet Thread.yield(); // 如果當前線程還沒有創建對象的waitNode節點,則創建一個 else if (q == null) q = new WaitNode(); // 如果當前線程對應的waitNode還沒有加入到等待鏈表中,則加入進去; else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 如果有設置等待超時時間,則通過parkNanos掛起當前線程,等待繼續執行的信號 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } // 通過park掛起當前線程,等待task執行結束後給它發一個繼續執行的信號(unpark) else LockSupport.park(this); }} 

4. finishCompletion 通知所有在等待結果的線程

private void finishCompletion() { // assert state > COMPLETING; // 遍歷所有正在等待執行結果的線程 for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; // unpark,發佈一個讓它繼續執行的“許可” LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint}
FutureTask隨手筆記


分享到:


相關文章: