- <strong>
- <strong>
- <strong>
- <strong>
![Java併發編程—併發流程控制與AQS原理及相關源碼解析](http://p2.ttnews.xyz/loading.gif)
0. 主要內容
- 文章分為兩部分:
- 第一個部分主要講併發流程控制的各大類的使用及案例
- 第二部分主要是先將AQS的組成及原理,然後結合CountDownLatch、Semaphore等分析源碼邏輯
ps: 文章內容比較多
1. 併發流程控制
1.1 什麼是併發流程控制
- 併發流程控制,就是讓線程之間相互配合完成任務,來滿足業務邏輯
- 如:讓線程A等待線程B完成後再執行等策略
1.2 併發流程控制的工具
![Java併發編程—併發流程控制與AQS原理及相關源碼解析](http://p2.ttnews.xyz/loading.gif)
2. CountDownLatch計數門閂
2.1 作用
- 併發流程控制的工具,用於等待數量(我們設定的)足夠後再執行某些任務
2.2 主要方法
- CountDownLatch(int count):只有一個構造方法,參數count為需要倒數的值
- await():調用此方法的線程會被掛起,它會等到count值為零的時候才繼續執行
- countdown():講count減1,直到0,等待的線程會被喚醒
2.3 用法一:等待線程執行完畢
<code>/** * @author yiren */public class CountDownLatchExample01 { public static void main(String[] args) throws InterruptedException { AtomicInteger integer = new AtomicInteger(1); CountDownLatch latch = new CountDownLatch(5); ExecutorService executorService = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { executorService.execute(() -> { try { System.out.println(Thread.currentThread().getName()+ " produce ...."); TimeUnit.SECONDS.sleep(1); integer.incrementAndGet(); } catch (InterruptedException e) { e.printStackTrace(); }finally { latch.countDown(); } }); } System.out.println(Thread.currentThread().getName() + " waiting...."); latch.await(); System.out.println(Thread.currentThread().getName() + " finished!"); System.out.println(Thread.currentThread().getName() + " num: " + integer.get()); executorService.shutdown(); }}/<code>
<code>pool-1-thread-1 produce ....pool-1-thread-2 produce ....pool-1-thread-3 produce ....main waiting....pool-1-thread-4 produce ....pool-1-thread-5 produce ....main finished!main num: 6Process finished with exit code 0/<code>
2.4 用法二:多等一
<code>/** * @author yiren */public class CountDownLatchExample02 { public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); ExecutorService executorService = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { executorService.execute(() -> { System.out.println(Thread.currentThread().getName() + " ready!"); try { latch.await(); System.out.println(Thread.currentThread().getName()+ " produce ...."); TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }finally { latch.countDown(); } }); } Thread.sleep(10); System.out.println(Thread.currentThread().getName() + " ready!"); latch.countDown(); System.out.println(Thread.currentThread().getName() + " go!"); executorService.shutdown(); }}/<code>
<code>pool-1-thread-1 ready!pool-1-thread-4 ready!pool-1-thread-3 ready!pool-1-thread-2 ready!pool-1-thread-5 ready!main ready!main go!pool-1-thread-1 produce ....pool-1-thread-2 produce ....pool-1-thread-5 produce ....pool-1-thread-3 produce ....pool-1-thread-4 produce ....Process finished with exit code 0/<code>
2.4 注意
- CountDownLatch不僅可以無限等待,還可以給參數,在指定的事件內如果等到就喚醒線程繼續執行
- boolean await(long timeout, TimeUnit unit) 複製代碼
- CountDownLatch不能重用,如果涉及重新計數,可以使用CyclicBarrier或者新創建CountDownLatch
3. Semaphore信號量
3.1 信號量作用
- Semaphore可以用來限制或管理數量有限的資源使用情況
- 信號量的租用是維護一個許可計數,線程可以獲取許可,然後信號量減一;線程也可以釋放許可,信號量就加一;如果信號量的許可頒發完了,其他線程想要獲取,就需要等待,直到有另外的線程釋放了許可。
3.2 信號量使用
- 初始化Semaphore指定許可數量
- 在需要獲取許可的代碼前面加上acquire()或者acquireUniterruptibly()方法
- 任務執行完成有調用release()釋放許可
3.3 主要方法
- Semaphore(int permits, boolean fair)這裡設置許可數量,以及是否使用公平策略。 如果傳入true那麼久吧等待線程放入到FIFO的隊列裡面。
- aquire()請求許可,可以響應中斷
- aquireUnniterruptibly()請求許可不可中斷
- tryAcquire()看看現在有沒有空閒的許可,如果有那就返回true;這個方法還可以設置等待時間給一個timeout,讓線程等待一段時間。
- release()釋放許可
3.4 案例演示
<code>/** * @author yiren */public class SemaphoreExample01 { public static void main(String[] args) { Semaphore semaphore = new Semaphore(3, true); ExecutorService executorService = Executors.newFixedThreadPool(10); for (int i = 0; i < 8; i++) { executorService.execute(() -> { try { System.out.println(Thread.currentThread().getName()+" start to get permit"); semaphore.acquire(); Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + " " + LocalDateTime.now() +" finished!"); } catch (InterruptedException e) { e.printStackTrace(); }finally { semaphore.release(); } }); } executorService.shutdown(); }}/<code>
<code>pool-1-thread-1 start to get permitpool-1-thread-4 start to get permitpool-1-thread-3 start to get permitpool-1-thread-2 start to get permitpool-1-thread-5 start to get permitpool-1-thread-6 start to get permitpool-1-thread-7 start to get permitpool-1-thread-8 start to get permitpool-1-thread-3 2020-02-21T19:54:47.392 finished!pool-1-thread-1 2020-02-21T19:54:47.392 finished!pool-1-thread-4 2020-02-21T19:54:47.392 finished!pool-1-thread-6 2020-02-21T19:54:49.396 finished!pool-1-thread-2 2020-02-21T19:54:49.396 finished!pool-1-thread-5 2020-02-21T19:54:49.396 finished!pool-1-thread-8 2020-02-21T19:54:51.401 finished!pool-1-thread-7 2020-02-21T19:54:51.401 finished!Process finished with exit code 0/<code>
3.5 注意點
- 獲取和釋放的許可證必須一致,acquire和release都是可以傳入數值的來確定獲取和釋放的數量。如果我們獲取和釋放不一致,就會容易導致程序bug。當然也不是絕對,除非有特殊業務需求,否則都獲取釋放設置為一樣的
- 注意在初始化Semaphore的時候設置公平性,一般設置為true會比較合理。如果插隊情況比較嚴重的話,某些線程可能一直阻塞
- 獲取和釋放許可對線程並不要求,線程A獲取了可以線程B釋放。
4. Condition接口
4.1 作用
- 當線程A需要等待某個任務或者某個資源,就可以執行condition.await()方法,然後就會陷入阻塞狀態。
- 此時另一個線程B,去獲取資源或者執行任務完成後,調用condition.signal()或者signalAll()方法,通知線程A,繼續執行
- 這個類似於object.wait()、notify()、notifyAll()
- signal()方法如果遇到多個線程都在等待的時候,會去喚醒等待時間最長的那個
- 在我們ReentrantLock中就可以直接新建Condition。看下面案例
4.2 案例演示
- 普通用法
<code>/** * @author yiren */public class ConditionExample01 { private static ReentrantLock lock = new ReentrantLock(); private static Condition condition = lock.newCondition(); public static void main(String[] args) throws InterruptedException { Thread thread1 = new Thread(() -> { task1(); }); Thread thread2 = new Thread(() -> { task2(); }); thread1.start(); Thread.sleep(100); thread2.start(); } private static void task1() { lock.lock(); try { System.out.println(Thread.currentThread().getName() + " start await()"); condition.await(); System.out.println(Thread.currentThread().getName() + " await finished!"); Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } private static void task2() { lock.lock(); try { System.out.println(Thread.currentThread().getName() + " start signal()"); Thread.sleep(1000); condition.signal(); System.out.println(Thread.currentThread().getName() + " signal finished!"); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }}/<code>
<code>Thread-0 start await()Thread-1 start signal()Thread-1 signal finished!Thread-0 await finished!Process finished with exit code 0/<code>
- 生產者消費者模式
<code>/** * @author yiren */public class ConditionExample02 { private int queueSize = 10; private PriorityQueue<integer> queue = new PriorityQueue<>(queueSize); private Lock lock = new ReentrantLock(); private Condition notFull = lock.newCondition(); private Condition notEmpty = lock.newCondition(); public static void main(String[] args) { ConditionExample02 conditionDemo2 = new ConditionExample02(); Producer producer = conditionDemo2.new Producer(); Consumer consumer = conditionDemo2.new Consumer(); producer.start(); consumer.start(); } class Consumer extends Thread { @Override public void run() { consume(); } private void consume() { while (true) { lock.lock(); try { while (queue.size() == 0) { System.out.println("隊列空,等待數據"); try { notEmpty.await(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.poll(); notFull.signalAll(); System.out.println("從隊列裡取走了一個數據,隊列剩餘" + queue.size() + "個元素"); } finally { lock.unlock(); } } } } class Producer extends Thread { @Override public void run() { produce(); } private void produce() { while (true) { lock.lock(); try { while (queue.size() == queueSize) { System.out.println("隊列滿,等待有空餘"); try { notFull.await(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.offer(1); notEmpty.signalAll(); System.out.println("向隊列插入了一個元素,隊列剩餘空間" + (queueSize - queue.size())); } finally { lock.unlock(); } } } }}/<integer>/<code>
- 以上使用兩個Condition作為隊列滿和空的通知傳遞工具在生產者和消費者之間互通
4.3 注意點
- 我們知道Lock可以看做synchronized的替代方案,而Condition就是用來替代object.wait/notify的,在用法上幾乎一致。
- 調用await()方法時必須持有Lock鎖,否則會拋出異常,並且await()方法會釋放當前持有的Lock鎖,
- 一個Lock鎖可以有多個Condition更加靈活
5. CyclicBarrier循環柵欄
5.1 作用
- CyclicBarrier循環柵欄和CountDownLatch很類似,都能阻塞一組線程
- 當需要多個線程配合完成任務,並最後需要統一彙總時,我們就可以使用CyclicBarrier,當某個線程完成任務後,它先會等待,等到所有線程都執行好了任務,再一起繼續執行剩下的任務 比如:同時出去聚餐約在了公司,等大家到公司了一起走過去。
- 但是注意CyclicBarrier是可以重複使用的,這個和CountDownLatch不同
5.2 案例
<code>/** * @author yiren */public class CyclicBarrierExample { public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() { @Override public void run() { System.out.println("所有人都到場了, 大家統一出發!"); } }); for (int i = 0; i < 10; i++) { new Thread(new Task(i, cyclicBarrier)).start(); } } static class Task implements Runnable { private int id; private CyclicBarrier cyclicBarrier; public Task(int id, CyclicBarrier cyclicBarrier) { this.id = id; this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println("線程" + id + "現在前往集合地點"); try { Thread.sleep((long) (Math.random() * 10000)); System.out.println("線程" + id + "到了集合地點,開始等待其他人到達"); cyclicBarrier.await(); System.out.println("線程" + id + "出發了"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }}/<code>
<code>線程0現在前往集合地點線程2現在前往集合地點線程3現在前往集合地點線程1現在前往集合地點線程4現在前往集合地點線程5現在前往集合地點線程6現在前往集合地點線程7現在前往集合地點線程8現在前往集合地點線程9現在前往集合地點線程3到了集合地點,開始等待其他人到達線程9到了集合地點,開始等待其他人到達線程8到了集合地點,開始等待其他人到達線程4到了集合地點,開始等待其他人到達線程5到了集合地點,開始等待其他人到達所有人都到場了, 大家統一出發!線程5出發了線程3出發了線程8出發了線程4出發了線程9出發了線程1到了集合地點,開始等待其他人到達線程6到了集合地點,開始等待其他人到達線程0到了集合地點,開始等待其他人到達線程7到了集合地點,開始等待其他人到達線程2到了集合地點,開始等待其他人到達所有人都到場了, 大家統一出發!線程2出發了線程1出發了線程7出發了線程0出發了線程6出發了Process finished with exit code 0/<code>
- 每五個人到了過後,就出發一批
5.3 CountDownLatch和CyclicBarrier`區別
- 作用不同:CountDownLatch使用countDown()是用於事件的,而CyclicBarrier使用await()是用於線程的
- 可重用性不同:CountDownLatch在倒數到0後不能再次重用,除非創建新對象;而CyclicBarrier是可以直接重用的
6. 深入AQS理解J.U.C的根基
6.1 AQS作用及其重要性
- AQS在CountDownLatch等工具內都有使用,全稱是:AbstractQueuedSynchronizer是一個抽象類
- 鎖和上面的線程併發控制類(Semaphore等)都有類似的地方。 其實他們底層都是使用了AQS作為基類的拓展
- 正因為他們很多工作都類似,JDK就把這部分通用邏輯抽離了出來,提供給他們直接使用,使其不必關注很多深層次的細節,從而完成他們的功能。
- 我們可以大致看一下我們鎖用到的這些併發控制的工具類和鎖的內部實現 `Semaphore``
<code>
- ReentrantLock
<code>public class ReentrantLock implements Lock, java.io.Serializable { private static final long serialVersionUID = 7373984872572414699L; private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L;....../<code>
- CountDownLatch
<code>public class CountDownLatch { private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); }....../<code>
- 由上源碼我們可以看到,裡面都有一個內部類,Sync繼承自AbstractQueuedSynchronizer
- 那麼AQS是用來幹些什麼事情的呢? J.U.C基本都是是基於AQS實現的,AQS是一個用於構建鎖、同步器、線程協作工具類的框架供給子類使用,主要使用模板模式來設計。 它主要工作就是管理線程的阻塞與喚醒,實現同步的管理,以及阻塞線程的隊列管理工作
6.2 AQS的組成及內部原理
- AbstractQueuedSynchronizer自JDK1.5加入,是基於FIFO等待隊列實現的一個用於同步器的基礎框架。
- JDK1.8 繼承AQS實現的類:
- 我們可以看到,在可重入鎖,讀寫鎖,計數門閂等,信號量裡面都是用了AQS的子類,接下來我們就學習一下AQS的內部原理
- AQS的三大部分 state:狀態, FIFO隊列:線程競爭鎖的管理隊列 獲取和釋放方法:需要工具類去實現的方法
- state:狀態
<code> /** * The synchronization state. */ private volatile int state; /<code>
- 它的含義並不具體,根據實現的不同而不同,如:Semaphore內是剩餘許可數量、CountDownLatch內是還需要倒數的數量,可看做一個計數器,只是不同類的作用及意義不用
<code>
- 狀態值的更新,是使用Unsafe的CAS完成
- 在ReentrantLock中:state表示鎖的佔用情況,可重入的計數,每重入一次就加一,當要釋放鎖時,它的值就會變成0,表示不被任何線程佔有。
- FIFO隊列:
<code>
這個隊列是用來存放等待的線程的,AQS會對這個隊列進行管理。當多個線程競爭鎖時,沒有拿到鎖的,就會被翻到隊列中,當前拿到鎖的執行任務的線程結束,AQS就會從隊列中選一個線程來佔有這個鎖。 AQS維護一個雙向鏈表的等待隊列,把等待線程都放到這個隊列裡面管理;隊列頭節點是當前拿到鎖的線程;在AQS中保存了這個隊列的頭尾節點。
獲取和釋放的方法 獲取方法: 獲取操作會依賴state變量,經常會阻塞,如:獲取不到鎖的時候,獲取不到許可的時候等 在ReentrantLock中,就是獲取鎖。state+1 在Semaphore中就是acquire獲取許可,state-1,當state==0就會阻塞 在CountDownLatch中就是await方法,就是等待state==0 釋放方法: 釋放操作不會阻塞 在ReentrantLock中就是unlock方法調用release(1)對應state-1 在Semaphore中就是realease,也是state-1 CountDownLatch中就是countDown方法,也是state-1 一般情況下,實現類都會實現tryAcquire和tryRelease相關方法,以對應各個類的需求
6.3 AQS的用法
- 指定協作邏輯,實現獲取和釋放方法
- 在內部寫一個Sync類繼承AQS
- 根據是否獨佔來決定重寫的方法:獨佔使用tryAcquire/tryRelease、共享使用tryAcquireShared(int acquires)/tryReleaseShared(int releases),在主邏輯裡面的獲取釋放相關方法中調用Sync的方法
7. AQS在CountDownLatch中的源碼剖析
- 下面我們以CountDownLatch為例分析源碼:
- 構造函數 我們看到內部實現就是初始化一個Sync然後把計數值傳入
<code> public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }/<code>
- 我們可以看下面的CountDownlatch中Sync的實現,在構造方法創建的Sync傳入的count調用了setState方法傳入了AQS的state中
- 在CountDownLatch內部有一個繼承AQS的
<code> private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }/<code>
- CountDownLatch的getCount()方法
<code>
我們可以看到getCount實際也是調用Sync的getCount()來獲取state並返回
- CountDownLatch的countDown()方法
<code>
CountDownLatch的countDown()方法
<code> public void countDown() { sync.releaseShared(1); }/<code>
- 我們看一看到它直接調用了AQS的releaseShared(1)
<code> public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }/<code>
- 而releaseShared則是回去調用CountDownLatch中實現的tryReleaseShared
<code> protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }/<code>
- 而在tryReleaseShared中則是主要對state的值做-1操作,如果state大於零可以獲取到就減一併且用CAS併發更新值,如果最新值為0就返回true
- 返回true過後就doReleaseShared釋放鎖,喚醒隊列裡面的等待線程。也就是調用了await()方法的線程
- CountDownLatch的await()方法
<code>
而await則會調用AQS中的默認實現sync.acquireSharedInterruptibly(1);
<code> protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }/<code>
<code> protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }/<code>
- 而在tryReleaseShared中則是主要對state的值做-1操作,如果state大於零可以獲取到就減一併且用CAS併發更新值,如果最新值為0就返回true
- 返回true過後就doReleaseShared釋放鎖,喚醒隊列裡面的等待線程。也就是調用了await()方法的線程
- AQS在CountDownLatch中使用的一些點: 調用CountDownLatch的await()時,便會嘗試獲取共享鎖,開始時是獲取不到鎖的,於是就被阻塞 可以獲取到的條件就是計數器為0,也就是state==0的時候。 只有每次調用countDown方法才會使得計數器減一,減到0時就回去喚醒阻塞中的線程。
8. AQS在Semaphore中的源碼剖析
- 由於上面講得很細了,接下來就簡略一些
- 在Semaphore中state就是許可證的數量
- 主要的操作就是acquire和release,也是借用Sync對state的操作來控制線程的阻塞與喚醒
<code> public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }/<code>
<code> public void release() { sync.releaseShared(1); }/<code>
- 先看下acquire調用的acquireSharedInterruptibly此方法在上面已經說過。
<code> public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }/<code>
- 而在Semaphore中Sync有兩個實現:NonfairSync、FairSync
- 在FairSync中tryAcquireShared就會有hasQueuedPredecessors判斷,如果不是頭節點,那就返回-1,在acquireSharedInterruptibly方法中去調用doAcquireSharedInterruptibly入隊並且阻塞線程
<code> protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }/<code>
而在NonfairSync中而是直接調用Sync的nonfairTryAcquireShared
<code>protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires);}/<code>
<code> final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }/<code>
- 可以看到其中並沒有對是否阻塞隊列的頭節點判斷,直接去獲取值,判斷是會否許可足夠。
<code> public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }/<code>
<code> protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }/<code>
- 我們可以看到此處就是關於Semaphore的已獲取許可的釋放 把state加回去然後用CAS更新state
9. AQS在ReentrantLock中的應用
- 源碼就不分析了
- 在ReentrantLock中,state主要是重入的次數,加鎖的時候state+1 ,而在釋放鎖的時候,state-1然後判斷當前的state==0
- 在ReentrantLock中與AQS相關的有三個類:UnfairSync,FairSync,Sync
- 關於加鎖和解鎖的邏輯也是AQS中的acquire方法的邏輯(獲取鎖失敗就會放入隊列中)和release方法(調用子類的tryRelease來去掉頭部,並且喚醒線程)
- 而加鎖解鎖中的邏輯,主要是公平鎖和非公平鎖的區別,公平鎖會去判斷是否在隊列頭部,如果在才會去執行,而非公平鎖則會搶鎖。不會管你是不是在隊列頭部。
- 相信在上面的源碼分析過後,分析ReentrantLock是十分簡單的。大家可以自行分析。
作者:苡仁
原文鏈接:https://juejin.im/post/5e5354e6f265da5724466dbf
閱讀更多 追逐仰望星空 的文章