多線程!你只要看這一篇就夠了

碼個蛋(codeegg)第 749 次推文

原文: https://juejin.im/post/5d7da37d6fb9a06b0202f156

多線程併發問題,基本是面試必問的。

大部分同學應該都知道SynchronizedLock,部分同學能說到volatile併發包,優秀的同學則能在前面的基礎上,說出Synchronized、volatile的原理,以及併發包中常用的數據結構,例如ConcurrentHashMap的原理。

這篇文章將總結多線程併發的各種處理方式,希望對大家有所幫助。

一、多線程為什麼會有併發問題

為什麼多線程同時訪問(讀寫)同個變量,會有併發問題?

  1. Java 內存模型規定了所有的變量都存儲在主內存中,每條線程有自己的工作內存。

  2. 線程的工作內存中保存了該線程中用到的變量的主內存副本拷貝,線程對變量的所有操作都必須在工作內存中進行,而不能直接讀寫主內存。

  3. 線程訪問一個變量,首先將變量從主內存拷貝到工作內存,對變量的寫操作,不會馬上同步到主內存。

  4. 不同的線程之間也無法直接訪問對方工作內存中的變量,線程間變量的傳遞均需要自己的工作內存和主存之間進行數據同步進行。

二、Java 內存模型(JMM)

Java 內存模型(JMM) 作用於工作內存(本地內存)和主存之間數據同步過程,它規定了如何做數據同步以及什麼時候做數據同步,如下圖。

多线程!你只要看这一篇就够了

三、併發三要素

原子性:在一個操作中,CPU 不可以在中途暫停然後再調度,即不被中斷操作,要麼執行完成,要麼就不執行。

可見性:多個線程訪問同一個變量時,一個線程修改了這個變量的值,其他線程能夠立即看得到修改的值。

有序性:程序執行的順序按照代碼的先後順序執行。

四、怎麼做,才能解決併發問題?(重點)

下面結合不同場景分析解決併發問題的處理方式。

一、volatile

1.1 volatile 特性

保證可見性,不保證原子性。

1. 當寫一個volatile變量時,JVM會把本地內存的變量強制刷新到主內存中。

2. 這個寫操作導致其他線程中的緩存無效,其他線程讀,會從主內存讀。volatile的寫操作對其它線程實時可見。

禁止指令重排序 指令重排序是指編譯器和處理器為了優化程序性能對指令進行排序的一種手段,需要遵守一定規則:

1. 不會對存在依賴關係的指令重排序,例如 a = 1;b = a; a 和b存在依賴關係,不會被重排序。

2. 不能影響單線程下的執行結果。比如:a=1;b=2;c=a+b這三個操作,前兩個操作可以重排序,但是c=a+b不會被重排序,因為要保證結果是3。

1.2 使用場景

對於一個變量,只有一個線程執行寫操作,其它線程都是讀操作,這時候可以用 volatile 修飾這個變量。

1.3 單例雙重鎖為什麼要用到volatile?

<code>public class TestInstance {

private static volatile TestInstance mInstance;

public static TestInstance getInstance{ //1
if (mInstance == ){ //2
synchronized (TestInstance.class){ //3
if (mInstance == ){ //4
mInstance = new TestInstance; //5
}
}
}
return mInstance;
}/<code>

假如沒有用volatile,併發情況下會出現問題,線程A執行到註釋5 new TestInstance 的時候,分為如下幾個幾步操作:

  1. 分配內存

  2. 初始化對象

  3. mInstance 指向內存

這時候如果發生指令重排,執行順序是132,執行到第3的時候,線程B剛好進來了,並且執行到註釋2,這時候判斷mInstance 不為空,直接使用一個未初始化的對象。所以使用volatile關鍵字來禁止指令重排序。

1.4 volatile 原理

在JVM底層volatile是採用內存屏障來實現的,內存屏障會提供3個功能:

  1. 它確保指令重排序時不會把其後面的指令排到內存屏障之前的位置,也不會把前面的指令排到內存屏障的後面;即在執行到內存屏障這句指令時,在它前面的操作已經全部完成;

  2. 它會強制將緩存的修改操作立即寫到主內存

  3. 寫操作會導致其它CPU中的緩存行失效,寫之後,其它線程的讀操作會從主內存讀。

1.5 volatile 的侷限性

volatile 只能保證可見性不能保證原子性寫操作對其它線程可見,但是不能解決多個線程同時寫的問題。

二、Synchronized

2.1 Synchronized 使用場景

多個線程同時寫一個變量。

例如售票,餘票是100張,窗口A和窗口B同時各賣出一張票, 假如餘票變量用 volatile 修飾,是有問題的。

A窗口獲取餘票是100,B窗口獲取餘票也是100,A賣出一張變成99,刷新回主內存,同時B賣出一張變成99,也刷新回主內存,會導致最終主內存餘票是99而不是98。

前面說到 volatile 的侷限性,就是多個線程同時寫的情況,這種情況一般可以使用Synchronized。

Synchronized 可以保證同一時刻,只有一個線程可執行某個方法或某個代碼塊。

2.2 Synchronized 原理

<code>public class SynchronizedTest {

public static void main(String[] args) {
synchronized (SynchronizedTest.class) {
System.out.println("123");
}
method;
}

private static void method {
}
}/<code>

將這段代碼先用javac命令編譯,再java p -v SynchronizedTest.class命令查看字節碼,部分字節碼如下

<code>public static void main(java.lang.String[]);
descriptor: ([Ljava/lang/String;)V
flags: ACC_PUBLIC, ACC_STATIC
Code:
stack=2, locals=3, args_size=1
0: ldc #2 // class com/lanshifu/opengldemo/test/SynchronizedTest
2: dup
3: astore_1
4: monitorenter
5: getstatic #3 // Field java/lang/System.out:Ljava/io/PrintStream;
8: ldc #4 // String 123
10: invokevirtual #5 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
13: aload_1
14: monitorexit
15: goto 23
18: astore_2
19: aload_1
20: monitorexit
21: aload_2
22: athrow
23: invokestatic #6 // Method method:V
26: return/<code>

可以看到 4: monitorenter 和 14: monitorexit,中間是打印的語句。

執行同步代碼塊,首先會執行monitorenter指令,然後執行同步代碼塊中的代碼,退出同步代碼塊的時候會執行monitorexit指令 。

使用Synchronized進行同步,其關鍵就是必須要對對象的監視器monitor進行獲取,當線程獲取monitor後才能繼續往下執行,否則就進入同步隊列,線程狀態變成BLOCK,同一時刻只有一個線程能夠獲取到monitor,當監聽到monitore

xit被調用,隊列裡就有一個線程出隊,獲取monitor。詳情參考:

https://www.jianshu.com/p/d53bf830fa09

每個對象擁有一個計數器,當線程獲取該對象鎖後,計數器就會加一,釋放鎖後就會將計數器減一,所以只要這個鎖的計數器大於0,其它線程訪問就只能等待。

2.3 Synchronized 鎖的升級

大家對Synchronized的理解可能就是重量級鎖,但是Java1.6對 Synchronized 進行了各種優化之後,有些情況下它就並不那麼重,Java1.6 中為了減少獲得鎖和釋放鎖帶來的性能消耗而引入的偏向鎖和輕量級鎖。

偏向鎖: 大多數情況下,鎖不僅不存在多線程競爭,而且總是由同一線程多次獲得,為了讓線程獲得鎖的代價更低而引入了偏向鎖。

當一個線程A訪問加了同步鎖的代碼塊時,會在對象頭中存 儲當前線程的id,後續這個線程進入和退出這段加了同步鎖的代碼塊時,不需要再次加鎖和釋放鎖。

輕量級鎖: 在偏向鎖情況下,如果線程B也訪問了同步代碼塊,比較對象頭的線程id不一樣,會升級為輕量級鎖,並且通過自旋的方式來獲取輕量級鎖。

重量級鎖: 如果線程A和線程B同時訪問同步代碼塊,則輕量級鎖會升級為重量級鎖,線程A獲取到重量級鎖的情況下,線程B只能入隊等待,進入BLOCK狀態。

2.4 Synchronized 缺點

  1. 不能設置鎖超時時間

  2. 不能通過代碼釋放鎖

  3. 容易造成死鎖

三、ReentrantLock

上面說到Synchronized的缺點,不能設置鎖超時時間和不能通過代碼釋放鎖,ReentranLock就可以解決這個問題。

在多個條件變量和高度競爭鎖的地方,用ReentrantLock更合適,ReentrantLock還提供了Condition,對線程的等待和喚醒等操作更加靈活,一個ReentrantLock可以有多個Condition實例,所以更有擴展性。

3.1 ReentrantLock 的使用

lock 和 unlock

<code>ReentrantLock reentrantLock = new ReentrantLock;
System.out.println("reentrantLock->lock");
reentrantLock.lock;
try {
System.out.println("睡眠2秒...");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace;
}finally {
reentrantLock.unlock;
System.out.println("reentrantLock->unlock");
}/<code>

實現可定時的鎖請求:tryLock

<code>public static void main(String[] args) {
ReentrantLock reentrantLock = new ReentrantLock;
Thread thread1 = new Thread_tryLock(reentrantLock);
thread1.setName("thread1");
thread1.start;
Thread thread2 = new Thread_tryLock(reentrantLock);
thread2.setName("thread2");
thread2.start;
}


static class Thread_tryLock extends Thread {
ReentrantLock reentrantLock;
public Thread_tryLock(ReentrantLock reentrantLock) {
this.reentrantLock = reentrantLock;
}

@Override
public void run {

try {
System.out.println("try lock:" + Thread.currentThread.getName);
boolean tryLock = reentrantLock.tryLock(3, TimeUnit.SECONDS);
if (tryLock) {
System.out.println("try lock success :" + Thread.currentThread.getName);
System.out.println("睡眠一下:" + Thread.currentThread.getName);
Thread.sleep(5000);
System.out.println("醒了:" + Thread.currentThread.getName);
} else {
System.out.println("try lock 超時 :" + Thread.currentThread.getName);
}
} catch (InterruptedException e) {
e.printStackTrace;
} finally {
System.out.println("unlock:" + Thread.currentThread.getName);
reentrantLock.unlock;
}
}
}/<code>

打印的日誌:

<code>try lock:thread1
try lock:thread2
try lock success :thread2
睡眠一下:thread2
try lock 超時 :thread1
unlock:thread1
Exception in thread "thread1" java.lang.IllegalMonitorStateException
at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:151)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1261)
at java.util.concurrent.locks.ReentrantLock.unlock(ReentrantLock.java:457)
at com.lanshifu.demo_module.test.lock.ReentranLockTest$Thread_tryLock.run(ReentranLockTest.java:60)
醒了:thread2
unlock:thread2/<code>

上面演示了trtLock的使用,trtLock設置獲取鎖的等待時間,超過3秒直接返回失敗,可以從日誌中看到結果。有異常是因為thread1獲取鎖失敗,不應該調用unlock。

3.2 Condition 條件

<code>public static void main(String[] args) {
Thread_Condition thread_condition = new Thread_Condition;
thread_condition.setName("測試Condition的線程");
thread_condition.start;
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace;
}
thread_condition.singal;
}
static class Thread_Condition extends Thread {

@Override
public void run {
await;
}

private ReentrantLock lock = new ReentrantLock;
public Condition condition = lock.newCondition;

public void await {
try {
System.out.println("lock");
lock.lock;
System.out.println(Thread.currentThread.getName + ":我在等待通知的到來...");
condition.await;//await 和 signal 對應
//condition.await(2, TimeUnit.SECONDS); //設置等待超時時間
System.out.println(Thread.currentThread.getName + ":等到通知了,我繼續執行>>>");
} catch (Exception e) {
e.printStackTrace;
} finally {
System.out.println("unlock");
lock.unlock;
}
}

public void singal {
try {
System.out.println("lock");
lock.lock;
System.out.println("我要通知在等待的線程,condition.signal");
condition.signal;//await 和 signal 對應

Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace;
} finally {
System.out.println("unlock");
lock.unlock;
}
}
}/<code>

運行打印日誌

<code>lock
測試Condition的線程:我在等待通知的到來...
lock
我要通知在等待的線程,condition.signal
unlock
測試Condition的線程:等到通知了,我繼續執行>>>
unlock/<code>

上面演示了Condition的 await 和 signal 使用,前提要先lock。

3.3 公平鎖與非公平鎖

ReentrantLock 構造函數傳true表示公平鎖。

公平鎖表示線程獲取鎖的順序是按照線程加鎖的順序來分配的,即先來先得的順序。而非公平鎖就是一種鎖的搶佔機制,是隨機獲得鎖的,可能會導致某些線程一致拿不到鎖,所以是不公平的。

3.4 ReentrantLock 注意點

  1. ReentrantLock使用lock和unlock來獲得鎖和釋放鎖

  2. unlock要放在finally中,這樣正常運行或者異常都會釋放鎖

  3. 使用condition的await和signal方法之前,必須調用lock方法獲得對象監視器

四、併發包

通過上面分析,併發嚴重的情況下,使用鎖顯然效率低下,因為同一時刻只能有一個線程可以獲得鎖,其它線程只能乖乖等待。

Java提供了併發包解決這個問題,接下來介紹併發包裡一些常用的數據結構。

4.1 ConcurrentHashMap

我們都知道HashMap是線程不安全的數據結構,HashTable則在HashMap基礎上,get方法和put方法加上Synchronized修飾變成線程安全,不過在高併發情況下效率底下,最終被ConcurrentHashMap替代。

ConcurrentHashMap 採用分段鎖,內部默認有16個桶,get和put操作,首先將key計算hashcode,然後跟16取餘,落到16個桶中的一個,然後每個桶中都加了鎖(ReentrantLock),桶中是HashMap結構(數組加鏈表,鏈表過長轉紅黑樹)。

所以理論上最多支持16個線程同時訪問。

4.2 LinkBlockingQueue

鏈表結構的阻塞隊列,內部使用多個ReentrantLock

<code>/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock;
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition;
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock;
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition;

private void signalNotEmpty {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock;
try {
notEmpty.signal;
} finally {
takeLock.unlock;
}
}

/**
* Signals a waiting put. Called only from take/poll.
*/
private void signalNotFull {
final ReentrantLock putLock = this.putLock;
putLock.lock;
try {
notFull.signal;
} finally {
putLock.unlock;
}
}/<code>

源碼不貼太多,簡單說一下LinkBlockingQueue 的邏輯:

1. 從隊列獲取數據,如果隊列中沒有數據,會調用notEmpty.await;進入等待。

2. 在放數據進去隊列的時候會調用notEmpty.signal;,通知消費者,1中的等待結束,喚醒繼續執行。

3. 從隊列裡取到數據的時候會調用notFull.signal;,通知生產者繼續生產。

4. 在put數據進入隊列的時候,如果判斷隊列中的數據達到最大值,那麼會調用notFull.await;,等待消費者消費掉,也就是等待3去取數據並且發出notFull.signal;,這時候生產者才能繼續生產。

LinkBlockingQueue 是典型的生產者消費者模式,源碼細節就不多說。

4.3 原子操作類:AtomicInteger

內部採用CAS(compare and swap)保證原子性

舉一個int自增的例子

<code>AtomicInteger atomicInteger = new AtomicInteger(0);
atomicInteger.incrementAndGet;//自增/<code>

源碼看一下

<code>/**
* Atomically increments by one the current value.
*
* @return the updated value
*/
public final int incrementAndGet {

return U.getAndAddInt(this, VALUE, 1) + 1;
}/<code>

U 是 Unsafe,看下 Unsafe#getAndAddInt

<code>public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}/<code>

通過compareAndSwapInt保證原子性。

五、總結

面試中問到多線程併發問題,可以這麼答:

  1. 當只有一個線程寫,其它線程都是讀的時候,可以用volatile修飾變量

  2. 當多個線程寫,那麼一般情況下併發不嚴重的話可以用Synchronized,Synchronized並不是一開始就是重量級鎖,在併發不嚴重的時候,比如只有一個線程訪問的時候,是偏向鎖;當多個線程訪問,但不是同時訪問,這時候鎖升級為輕量級鎖;當多個線程同時訪問,這時候升級為重量級鎖。所以在併發不是很嚴重的情況下,使用Synchronized是可以的。不過Synchronized有侷限性,比如不能設置鎖超時,不能通過代碼釋放鎖。

  3. ReentranLock 可以通過代碼釋放鎖,可以設置鎖超時。

  4. 高併發下,Synchronized、ReentranLock 效率低,因為同一時刻只有一個線程能進入同步代碼塊,如果同時有很多線程訪問,那麼其它線程就都在等待鎖。這個時候可以使用併發包下的數據結構,例如ConcurrentHashMap,LinkBlockingQueue,以及原子性的數據結構如:AtomicInteger。

面試的時候按照上面總結的這個思路回答基本就ok了。既然說到併發包,那麼除了ConcurrentHashMap,其它一些常用的數據結構的原理也需要去了解下,例如HashMap、HashTable、TreeMap原理,ArrayList、LinkedList對比,這些都是老生常談的,自己去看源碼或者一些博客。

關於多線程併發就先總結到這裡,如果是應付面試的話按照這篇文章的思路來準備應該是沒太大問題的。


分享到:


相關文章: