可重入鎖(ReentrantLock)源碼續讀-Condition

上一編一起解讀了 ,還有一塊沒有解讀,就是Condition,試想一個場景,如果一個線程獲取到鎖了,但是處理一半時發現數據不夠了,需要釋放鎖喚醒另外一個線程創造數據,這時候就需要Condition了。

Condition可以在鎖隊列中控制線程等待和喚醒,很類似線程的wait和notify,Condition也有類似的方法await和signal,Condition控制的更為細粒度。

ReentrantLock獲取鎖的方法如下:

public Condition newCondition() {
//同步器new一個新的Condition對象
return sync.newCondition();
}
//同步器裡創建一個ConditionObject
final ConditionObject newCondition() {
return new ConditionObject();
}

先看下返回的Condition裡都有什麼方法:

public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}

主要是await的不同時間方式控制的多態方法,和signal一個和signalAll所有線程的方法。

ConditionObject是同步器隊列AbstractQueuedSynchronizer內置的一個類,看下ReentrantLock的類結構吧:

可重入鎖(ReentrantLock)源碼續讀-Condition

我們接下來看下ConditionObject核心的兩個方法,

1、首先看await方法:

public final void await() throws InterruptedException {
//相應中斷,如果已經中斷被中斷過拋出異常
if (Thread.interrupted())
throw new InterruptedException();
//把當前線程放入到等待隊列尾部
Node node = addConditionWaiter();
//完全釋放當前線程鎖持有的鎖,並返回持有的鎖的狀態
int savedState = fullyRelease(node);
//中斷狀態標識
int interruptMode = 0;
//因為當前線程剛釋放完鎖,且加到等待隊列中,並不在同步隊列中
while (!isOnSyncQueue(node)) {
//阻塞
LockSupport.park(this);
//阻塞時中斷時跳出循環,並且記錄中斷方式
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//等待完成被喚醒後重新獲取鎖,並處理中斷模式
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//如果下一個等待這不為空,清洗一下等待隊列,把取消的線程刪除
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
//如果中斷過,中斷處理:拋出異常或補上中斷

if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

這個方法展示了整個wait的過程,再詳細看下內部的方法。

1.1、addConditionWaiter方法如下:

private Node addConditionWaiter() {
//取最後一個等待節點
Node t = lastWaiter;
//如果最後一個節點為空或非等待狀態,清洗等待隊列
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
//再次取最後一個等待節點
t = lastWaiter;
}
//創建當前線程等待節點
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//如果最後一個等待節點為空,當前節點即為最後一個
//如果不為空,加入到等待隊列尾部,
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

1.2、再看下一個方法fullyRelease

final int fullyRelease(Node node) {
boolean failed = true;
try {
//獲取當前線程鎖狀態

int savedState = getState();
//全部釋放鎖
if (release(savedState)) {
failed = false;
//返回釋放前的狀態
return savedState;
} else {
//釋放失敗拋出異常
throw new IllegalMonitorStateException();
}
} finally {
//如果釋放失敗,此線程的等待狀態置為取消
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

1.3、再看isOnSyncQueue

/** 這個方法是判斷當前線程是否在獲取鎖的隊列中,是否具有獲取鎖的條件, */
final boolean isOnSyncQueue(Node node) {
//等待線程或前一節點為空直接返回false
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//如果前一節點為非空,後一節點為非空返回true
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* 從尾到都再檢查一遍前節點是否為空
*/
return findNodeFromTail(node);
}

1.4、再看下當被喚醒重新獲取鎖的方法acquireQueued(node, savedState)

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//循環獲取鎖
for (;;) {
//獲取前任節點
final Node p = node.predecessor();
//如果前任節點為頭節點則獲取鎖
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//獲取失敗後是否阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//獲取失敗後取消獲取
if (failed)
cancelAcquire(node);
}
}

2、signal方法源碼如下:

public final void signal() {
//當前線程非獨佔線程時拋出異常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//取第一個等待節點
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

2.1、喚醒首節點方法:doSignal(first)

private void doSignal(Node first) {
do {
//下一個節點置為首節點,如果為空,尾節點也置為空
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
//把當前節點從隊列中刪除
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

2.2、再看transferForSignal方法

final boolean transferForSignal(Node node) {
//清空Condition等待狀態
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//把當前線程加入到同步隊列尾部,對應wait方法裡的“while (!isOnSyncQueue(node))”,並返回前一個節點
Node p = enq(node);
int ws = p.waitStatus;
//如果前一個節點取消了或者設置喚醒狀態失敗了,喚醒當前線程,感覺屬於一個補充動作,假設前節點從隊列中失效了即使喚醒,其實如果前節點不失效,first也會從等待的while循環中跳出。
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

Condition的兩個重要方法解讀完了,如有疑問或不同的見解,請留言。


分享到:


相關文章: