Java中的5個併發工具類,你真的瞭解清楚了嗎?

Java中的5個併發工具類,你真的瞭解清楚了嗎?

引言

在JDK的併發包裡提供了很多有意思的併發工具類。CountDownLatch、CyclicBarrier和Semaphore 工具類提供了一種併發流程控制的手段,Exchanger 工具類則提供了在線程間交換數據的一種手段。

CountDownLatch

CountDownLatch允許一個或多個線程等待其他線程完成操作。

其實最簡單的做噶是使用join()方法,join用於讓當前執行線程等待join線程執行結束。其實現原理是不停檢查join線程是否存活,如果join線程存活則讓當前線程永遠等待。其中,wait(0) 表示永遠等待下去,代碼片段如下:

while (isAlive()) {
wait(0);
}

知道線程中止後,線程的 this.notifyAll() 方法被調用,調用 notifyAll() 方法是在 JVM裡實現的,所以在JDK裡看不到,大家可以查看JVM源碼。

在JDK1.5之後的併發包CountDownLatch也可以實現join的功能,並且功能更多,更強大。

示例代碼:

public static void main(String[] args) throws InterruptedException {
CountDownLatch c = new CountDownLatch(2);
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(1);
c.countDown();
System.out.println(2);
c.countDown();//註釋這行
}
}).start();
c.await();
System.out.println("3");
}

運行結果:

1
2
3

CountDownLatch的構造方法接收一個int類型的參數作為計數器,如果你想等待N個點完成,這裡就傳入N。

當我們調用CountDownLatch的countDown()方法時,N就會減1,CountDownLatch的 await() 方法會阻塞當前線程,直到N變成零。由於countDown()方法可以用在任何地方,所以這裡說的N個點,也可以是N個線程。用在多個線程時,你只需要把這個CountDownLatch的引用傳遞到線程裡。

如果有某個線程處理的比較慢,我們不可能讓主線程一直等待,所以我們可以使用另外一個帶指定時間的await方法,await(long time, TimeUnit unit), 這個方法等待特定時間後,就會不再阻塞當前線程。join也有類似的方法。

注意:計數器必須大於等於0,只是等於0時候,計數器就是零,調用await方法時不會阻塞當前線程。CountDownLatch不可能重新初始化或者修改CountDownLatch對象的內部計數器的值。一個線程調用countDown方法 happen-before 另外一個線程調用await方法。

CyclicBarrier

CyclicBarrier 的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續運行。

1 構造方法

CyclicBarrier默認的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每個線程調用await方法告訴CyclicBarrier我已經到達了屏障,然後當前線程被阻塞。

示例代碼:

 public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
CyclicBarrier c = new CyclicBarrier(2);
new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (InterruptedException e) {

e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("1");
}
}).start();
c.await();
System.out.println(2);
}

運行結果:

1
2

或者

2
1

如果把new CyclicBarrier(2)修改成new CyclicBarrier(3)則主線程和子線程會永遠等待,因為沒有第三個線程執行await方法,即沒有第三個線程到達屏障,所以之前到達屏障的兩個線程都不會繼續執行


CyclicBarrier還提供一個更高級的構造函數CyclicBarrier(int parties, Runnable barrierAction),用於在線程到達屏障時,優先執行barrierAction,方便處理更復雜的業務場景。

示例代碼:

 public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
CyclicBarrier c = new CyclicBarrier(2, new Runnable() {
@Override
public void run() {
System.out.println(3);
}
});

new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("1");
}
}).start();
c.await();
System.out.println(2);
}

運行結果:

3
1
2

2 應用場景

CyclicBarrier可以用於多線程計算數據,最後合併計算結果的應用場景。比如我們用一個Excel保存了用戶所有銀行流水,每個Sheet保存一個帳戶近一年的每筆銀行流水,現在需要統計用戶的日均銀行流水,先用多線程處理每個sheet裡的銀行流水,都執行完之後,得到每個sheet的日均銀行流水,最後,再用barrierAction用這些線程的計算結果,計算出整個Excel的日均銀行流水。

2.3 CyclicBarrier和CountDownLatch的區別

CountDownLatch的計數器只能使用一次。而CyclicBarrier的計數器可以使用reset() 方法重置。所以CyclicBarrier能處理更為複雜的業務場景,例如,如果計算發生錯誤,可以重置計數器,並讓線程們重新執行一次。

CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得CyclicBarrier阻塞的線程數量。isBroken方法用來知道阻塞的線程是否被中斷。

示例代碼:

 public static void main(String[] args){
CyclicBarrier c = new CyclicBarrier(2);
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});
t.start();
t.interrupt();
try {
c.await();
} catch (InterruptedException e) {
// e.printStackTrace();
} catch (BrokenBarrierException e) {
// e.printStackTrace();
}finally {
System.out.println(c.isBroken());
}
}

運行結果:

true

Semaphore

1 作用

Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,它通過協調各個線程,以保證合理的使用公共資源。

2 簡介

Semaphore也是一個線程同步的輔助類,可以維護當前訪問自身的線程個數,並提供了同步機制。使用Semaphore可以控制同時訪問資源的線程個數,例如,實現一個文件允許的併發訪問數。

3 應用場景

Semaphore可以用於做流量控制,特別公用資源有限的應用場景,比如數據庫連接。假如有一個需求,要讀取幾萬個文件的數據,因為都是IO密集型任務,我們可以啟動幾十個線程併發的讀取,但是如果讀到內存後,還需要存儲到數據庫中,而數據庫的連接數只有10個,這時我們必須控制只有十個線程同時獲取數據庫連接保存數據,否則會報錯無法獲取數據庫連接。這個時候,我們就可以使用Semaphore來做流控,代碼如下:

public static void main(String[] args) {
final int THREAD_NUM = 30;
ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_NUM);
Semaphore s = new Semaphore(10);//只允許10個線程併發執行
for (int i = 0; i < THREAD_NUM; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
s.acquire();
System.out.println("play");
s.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
threadPool.shutdown();
}

在代碼中,雖然有30個線程在執行,但是隻允許10個併發的執行。Semaphore的構造方法Semaphore(int permits) 接受一個整型的數字,表示可用的許可證數量。Semaphore(10)表示允許10個線程獲取許可證,也就是最大併發數是10。Semaphore的用法也很簡單,首先線程使用Semaphore的acquire()獲取一個許可證,使用完之後調用release()歸還許可證。還可以用tryAcquire()方法嘗試獲取許可證。

4 其他方法

Semaphore還提供一些其他方法:

  • int availablePermits() :返回此信號量中當前可用的許可證數。
  • int getQueueLength():返回正在等待獲取許可證的線程數。
  • boolean hasQueuedThreads() :是否有線程正在等待獲取許可證。
  • void reducePermits(int reduction) :減少reduction個許可證。是個protected方法。
  • Collection getQueuedThreads() :返回所有等待獲取許可證的線程集合。是個protected方法。

Exchanger

Exchanger(交換者)是一個用於線程間協作的工具類。Exchanger用於進行線程間的數據交換。它提供一個同步點,在這個同步點兩個線程可以交換彼此的數據。這兩個線程通過exchange方法交換數據, 如果第一個線程先執行exchange方法,它會一直等待第二個線程也執行exchange,當兩個線程都到達同步點時,這兩個線程就可以交換數據,將本線程生產出來的數據傳遞給對方。

應用場景

1、Exchanger可以用於遺傳算法,遺傳算法裡需要選出兩個人作為交配對象,這時候會交換兩人的數據,並使用交叉規則得出2個交配結果。

2、Exchanger也可以用於校對工作。比如我們需要將紙製銀流通過人工的方式錄入成電子銀行流水,為了避免錯誤,採用AB崗兩人進行錄入,錄入到Excel之後,系統需要加載這兩個Excel,並對這兩個Excel數據進行校對,看看是否錄入的一致。代碼如下:

 public static void main(String[] args) {
Exchanger<string> exchanger = new Exchanger<>();
new Thread(new Runnable() {
@Override
public void run() {
try {
String a = "銀行流水A";
exchanger.exchange(a);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
String b = "銀行流水B";
String a = exchanger.exchange(b);
System.out.println("在B中獲取到錄入的A是:"+a);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
/<string>

運行結果:

在B中獲取到錄入的A是:銀行流水A

如果兩個線程有一個沒有到達exchange方法,則會一直等待,如果擔心有特殊情況發生,避免一直等待,可以使用exchange(V data, long time, TimeUnit unit)設置最大等待時長。


精彩內容:

想學習Java的小夥伴注意啦!我整理了一套從基礎的Java入門級學習到Java框架內容,送給每一位想要學習Java的小夥伴,想要獲取資料,【點擊頭像,右上角私信:學習】這裡是小白聚集地,歡迎初學和進階中的小夥伴~


分享到:


相關文章: