Java開發CyclicBarrier-同步屏障實現分析

CyclicBarrier

CyclicBarrier 是可循環使用的屏障,主要功能是讓一組線程到達一個屏障時被阻塞,直到最後一個線程到達屏障時,屏障才會打開;所有被屏障攔截的線程才會繼續執行。

使用示例

public class CyclicBarrierTest {
// 線程個數
private int parties = 3;
private AtomicInteger atomicInteger = new AtomicInteger(parties);
private CyclicBarrier cyclicBarrier;
class Protector implements Runnable {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " - 到達屏障前");
TimeUnit.SECONDS.sleep(2);
cyclicBarrier.await();
atomicInteger.decrementAndGet();
System.out.println(Thread.currentThread().getName() + " - 到達屏障後");
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " - 等待中斷");
} catch (BrokenBarrierException e) {
System.out.println(Thread.currentThread().getName() + " - 屏障被破壞");
}
}
}
@Before
public void init() {
cyclicBarrier = new CyclicBarrier(parties);
}
@Test
public void allAwait() {
for (int i = 0; i < parties; i++) {
new Thread(new Protector(), "Thread-" + i).start();
}
while (true) {
if (atomicInteger.get() == 0) {
// 所有線程到達屏障後退出結束
System.out.println("test over");
break;
}
}
}
@Test
public void oneAwaitInterrupted() throws InterruptedException {
Thread threadA = new Thread(new Protector(), "Thread-A");
Thread threadB = new Thread(new Protector(), "Thread-B");
threadA.start();

threadB.start();
// 等待 3 秒,避免是 time sleep 觸發中斷異常
TimeUnit.SECONDS.sleep(3);
threadA.interrupt();
while (true) {
if (atomicInteger.get() == 0) {
System.out.println("test over");
break;
}
if (cyclicBarrier.isBroken()) {
System.out.println("屏障中斷退出");
break;
}
}
}
}
Thread-A - 到達屏障前
Thread-B - 到達屏障前
屏障中斷退出
Thread-A - 等待中斷
Thread-B - 屏障被破壞
Thread-0 - 到達屏障前
Thread-1 - 到達屏障前
Thread-2 - 到達屏障前
Thread-2 - 到達屏障後
Thread-0 - 到達屏障後
Thread-1 - 到達屏障後
test over

從 oneAwaitInterrupted 方法執行結果可以看出,當一個線程 A 執行中斷時,另外一個線程 B 會拋出 BrokenBarrierException

構造

// 可以指定攔截線程個數
public CyclicBarrier(int parties) {
this(parties, null);
}
// 指定攔截線程個數和所有線程到達屏障處後執行的動作

public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

實現

概念

  • barrier : 屏障
  • parties : 為屏障攔截的線程數
  • tripped : 跳閘,可以理解為打開屏障
  • generation.broken : 屏障是否破損,當屏障被打開或被重置的時候會改變值

簡單的理解就是,當線程都到達屏障的時候,會打開屏障。

await()

await 說明線程到達屏障

public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 獲取排他鎖
lock.lock();

try {
final Generation g = generation;
// 屏障被破壞則拋異常
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
// 線程中斷 則退出屏障
breakBarrier();
throw new InterruptedException();
}
// 到達屏障的計數減一
int index = --count;
if (index == 0) { // tripped
// index == 0, 說明指定 count 的線程均到達屏障
// 此時可以打開屏障
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
// 若指定了 barrierCommand 則執行
command.run();
ranAction = true;
// 喚醒阻塞在屏障的線程並重置 generation
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
// 若未指定阻塞在屏障處的等待時間,則一直等待;直至最後一個線程到達屏障處的時候被喚醒
trip.await();
else if (nanos > 0L)
// 若指定了阻塞在屏障處的等待時間,則在指定時間到達時會返回

nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
// 若等待過程中,線程發生了中斷,則退出屏障
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
// 屏障被破壞 則拋出異常
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
// g != generation 說明所有線程均到達屏障處 可直接返回
// 因為所有線程到達屏障處的時候,會重置 generation
// 參考 nextGeneration
return index;
if (timed && nanos <= 0L) {
// 說明指定時間內,還有線程未到達屏障處,也就是等待超時
// 退出屏障
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
private void nextGeneration() {
// signal completion of last generation
// 喚醒阻塞在等待隊列的線程
trip.signalAll();
// set up next generation
// 重置 count
count = parties;

// 重置 generation
generation = new Generation();
}
private void breakBarrier() {
// broken 設置為 true
generation.broken = true;
// 重置 count
count = parties;
// 喚醒等待隊列的線程
trip.signalAll();
}

如下圖為 CyclicBarrier 實現效果圖:

Java開發CyclicBarrier-同步屏障實現分析

isBroken()

返回屏障是否被破壞,也是是否被中斷

public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}

reset()

public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 喚醒阻塞的線程
breakBarrier(); // break the current generation
// 重新設置 generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}

getNumberWaiting

獲取阻塞在屏障處的線程數

public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 攔截線程數 - 未到達屏障數
return parties - count;
} finally {
lock.unlock();

}
}

小結

CyclicBarrier 和 CountDownLatch 功能類似,不同之處在於 CyclicBarrier 支持重複利用,而 CountDownLatch 計數只能使用一次。


分享到:


相關文章: