大家好,我是小圖靈視界,最近在分析Java8源碼,使用的JDK是OpenJDK8,打算將分析源碼的筆記都會分享出來,在頭條上代碼排版比較難看,想要筆記的可以關注並私信我。
由於分析Objec類源碼比較長,所以將Object類源碼分析的筆記分為兩部分,沒有看第一部分分析的可以點擊下面進行鏈接進行閱讀。
notify、notifyAll、wait
這三個方法放在一起講,都是native修飾的本地方法,另外這三個方法被final修飾,說明這三個方法是不能重寫。
notify:喚醒任意一個等待的線程
notifyAll:喚醒所有等待的線程
wait:釋放鎖,線程進入等待。
這幾個方法主要用於線程之間的通信,特別適合與生產者和消費者的場景,下面用消費者和生產者的例子看看這幾個方法的使用:
<code>//生產者 public class Producer implements Runnable{ //產品容器 private final List container; //產品容器的大小 private final int size=5; public Producer( List container){ this.container=container; } private void produce() throws InterruptedException { synchronized (container){ //判斷容器是否已滿 while (container.size()==size){ System.out.println("容器已滿,暫定生產。"); container.wait(); } Random random = new Random(); int num = random.nextInt(10); //模擬1秒生產一個產品 Thread.sleep(1000); System.out.println(Thread.currentThread().getName()+"時間"+new Date()+" 生產產品:" + num); container.add(num); //生產一個產品就可以通知消費者消費了 container.notifyAll(); } } public void run() { while (true){ try { produce(); } catch (InterruptedException e) { System.out.println("生產機器異常"); e.printStackTrace(); } } } }/<code>
生產者每秒生產一個產品,然後通知消費者消費,當容器滿了,就進行等待消費者喚醒。
<code>//消費者 public class Consumer implements Runnable{ //消費容器 private final List container; public Consumer(List container){ this.container=container; } private void consume() throws InterruptedException { synchronized(container){ while (container.isEmpty()){ System.out.println("沒有可消費的產品"); //等待 container.wait(); } //消費產品 Thread.sleep(1000); Integer num=container.remove(0); System.out.println(Thread.currentThread().getName()+"時間"+new Date()+" 消費產品:"+num); //消費一個就可以通知生產者消費了 container.notifyAll(); } } public void run() { while (true){ try { consume(); } catch (InterruptedException e) { System.out.println("消費錯誤"); e.printStackTrace(); } } } }/<code>
消費者每秒消費一個產品,當容器減少一個產品就可以通知生產者生產產品了,當容器為空時,進行等待生產者喚醒。
<code>//生產消費過程 public class Main { public static void main(String[] args){ List container = new ArrayList(); Thread producer = new Thread(new Producer(container)); Thread consumer = new Thread(new Consumer(container)); producer.start(); consumer.start(); } }/<code>
啟動一個消費者和一個生產者,也可以啟動多個消費者和多個生產者,消費者和生產者之間共用容器,上述消費者的消費速度和生產者的生產速度都是每秒一個產品,可以改變消費者消費的速度和生產者生產的速度觀察程序運行的結果。
notify、notifyAll、wait一般配合著關鍵synchronized 一起使用,這三個方法在synchronized 代碼塊中使用,否則會拋出
IllegalMonitorStateException。當它們在synchronized 代碼塊中執行,說明當前線程一定獲得了鎖。優先級高的線程競爭到對象鎖的概率大。notify只會喚醒一個等待的線程,而notifyAll喚醒所有的線程,喚醒所有的線程,並不意味著所有的線程會立刻執行,這些被喚醒的鎖還需要競爭鎖。
既然notify、notifyAll、wait這幾個方法都是本地方法,那JVM層面是怎麼實現的?先來看看wait方法對應的本地方法是JVM_MonitorWait,JVM_MonitorWait的實現如下:
<code>JVM_ENTRY(void, JVM_MonitorWait(JNIEnv* env, jobject handle, jlong ms)) JVMWrapper("JVM_MonitorWait"); //resolve_non_null將傳入的對象強轉為oop Handle obj(THREAD, JNIHandles::resolve_non_null(handle)); JavaThreadInObjectWaitState jtiows(thread, ms != 0); //是否已經交於monitor監控等待了 if (JvmtiExport::should_post_monitor_wait()) { //觸發等待事件 JVMTI [%s] montior wait event triggered // 將等待事件發送給線程 JVMTI [%s] monitor wait event sent JvmtiExport::post_monitor_wait((JavaThread *)THREAD, (oop)obj(), ms); } //重點分析這句 ObjectSynchronizer::wait(obj, ms, CHECK); JVM_END/<code>
monitor指的是監控器,monitor的作用監視線程,保證在同一時間內只有一個線程可以訪問共享的數據和代碼(加鎖對象的數據),其他線程需要訪問共享數據就先進行等待,等這個線程釋放鎖以後,其他線程才有機會進行訪問。monitor有wait set(等待池)和entry set(實例池),wait set存放處於wait狀態的線程隊列,entry set存放處於等待鎖block狀態的線程隊列。
JVM_MonitorWait方法中首先判斷下是否已經交於monitor進行監控等待了,如果是的話,調用post_monitor_wait方法,這個方法主要作用觸發等待事件,將等待事件發送給線程。
分割線上部分代碼主要是消除偏向鎖、判斷等待的時間是否小於0,小於0的話,拋出拋出IllegalArgumentException異常。這個方法重要代碼就在分割線的下部分,ObjectMonitor就是監視器對象,ObjectMonitor的結構如下:
<code>ObjectMonitor() { _header = NULL; _count = 0; _waiters = 0, _recursions = 0; //重試的次數 _object = NULL; _owner = NULL; //指向持有ObjectMonitor對象的線程 _WaitSet = NULL; //存放所有wait狀態的線程的對象 _WaitSetLock = 0 ; _Responsible = NULL ; _succ = NULL ; _cxq = NULL ;//阻塞在Entry最近可達的線程列表,該列表其實是waitNode所構成的線程代理 FreeNext = NULL ; _EntryList = NULL ;//存放處於等待鎖block狀態的線程隊列 _SpinFreq = 0 ; _SpinClock = 0 ; OwnerIsThread = 0 ; _previous_owner_tid = 0; }/<code>
_WaitSet就是monitor的等待池,存放處於wait狀態的線程隊列,_EntryList就是實例池,存放處於等待鎖block狀態的線程隊列。_cxq阻塞在Entry最近可達的線程列表,該列表其實是waitNode所構成的線程代理, _owner 是指向持有ObjectMonitor對象,ObjectSynchronizer::wait方法首先調用
ObjectSynchronizer::inflate獲取到monitor以後才執行monitor的wait方法。由於monitor的wait方法的源碼如下:
<code>// Wait/Notify/NotifyAll // // Note: a subset of changes to ObjectMonitor::wait() // will need to be replicated in complete_exit above void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) { //獲取線程 Thread * const Self = THREAD ; //斷言是否是java線程 assert(Self->is_Java_thread(), "Must be Java thread!"); //強轉為java線程 JavaThread *jt = (JavaThread *)THREAD; //一些全局初始化工作、 DeferredInitialize () ; // Throw IMSX or IEX. //檢查是否擁有monitor(監視器) CHECK_OWNER(); EventJavaMonitorWait event; // check for a pending interrupt 檢查線程是否中斷和掛起 if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) { // post monitor waited event. Note that this is past-tense, we are done waiting. //遞交給monitor等待事件 if (JvmtiExport::should_post_monitor_waited()) { // Note: 'false' parameter is passed here because the // wait was not timed out due to thread interrupt. JvmtiExport::post_monitor_waited(jt, this, false); } if (event.should_commit()) { post_monitor_wait_event(&event, 0, millis, false); } //監聽到異常事件 TEVENT (Wait - Throw IEX) ; THROW(vmSymbols::java_lang_InterruptedException()); return ; } TEVENT (Wait) ; assert (Self->_Stalled == 0, "invariant") ; Self->_Stalled = intptr_t(this) ; //設置當前的等待monitor jt->set_current_waiting_monitor(this); // create a node to be put into the queue // Critically, after we reset() the event but prior to park(), we must check // for a pending interrupt. //將當前線程包裝為ObjectWaiter ObjectWaiter node(Self); //狀態設置為TS_WAIT node.TState = ObjectWaiter::TS_WAIT ; Self->_ParkEvent->reset() ; OrderAccess::fence(); // ST into Event; membar ; LD interrupted-flag // Enter the waiting queue, which is a circular doubly linked list in this case // but it could be a priority queue or any data structure. // _WaitSetLock protects the wait queue. Normally the wait queue is accessed only // by the the owner of the monitor *except* in the case where park() // returns because of a timeout of interrupt. Contention is exceptionally rare // so we use a simple spin-lock instead of a heavier-weight blocking lock. Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ; //將ObjectWaiter對象放進等待池(wait set)中 AddWaiter (&node) ; Thread::SpinRelease (&_WaitSetLock) ; if ((SyncFlags & 4) == 0) { _Responsible = NULL ; } //保存舊的重試次數 intptr_t save = _recursions; // record the old recursion count //增加ObjectWaiter的數量 _waiters++; // increment the number of waiters _recursions = 0; // set the recursion level to be 1 exit (true, Self) ; // exit the monitor guarantee (_owner != Self, "invariant") ; // As soon as the ObjectMonitor's ownership is dropped in the exit() // call above, another thread can enter() the ObjectMonitor, do the // notify(), and exit() the ObjectMonitor. If the other thread's // exit() call chooses this thread as the successor and the unpark() // call happens to occur while this thread is posting a // MONITOR_CONTENDED_EXIT event, then we run the risk of the event // handler using RawMonitors and consuming the unpark(). // // To avoid the problem, we re-post the event. This does no harm // even if the original unpark() was not consumed because we are the // chosen successor for this monitor. if (node._notified != 0 && _succ == Self) { node._event->unpark(); } // The thread is on the WaitSet list - now park() it. // On MP systems it's conceivable that a brief spin before we park // could be profitable. // // TODO-FIXME: change the following logic to a loop of the form // while (!timeout && !interrupted && _notified == 0) park() int ret = OS_OK ; int WasNotified = 0 ; { // State transition wrappers OSThread* osthread = Self->osthread(); OSThreadWaitState osts(osthread, true); { ThreadBlockInVM tbivm(jt); // Thread is in thread_blocked state and oop access is unsafe. jt->set_suspend_equivalent(); if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) { // Intentionally empty } else if (node._notified == 0) { //當前線程通過park()方法開始掛起(suspend) if (millis <= 0) { Self->_ParkEvent->park () ; } else { ret = Self->_ParkEvent->park (millis) ; } } // were we externally suspended while we were waiting? if (ExitSuspendEquivalent (jt)) { // TODO-FIXME: add -- if succ == Self then succ = null. jt->java_suspend_self(); } } // Exit thread safepoint: transition _thread_blocked -> _thread_in_vm // Node may be on the WaitSet, the EntryList (or cxq), or in transition // from the WaitSet to the EntryList. // See if we need to remove Node from the WaitSet. // We use double-checked locking to avoid grabbing _WaitSetLock // if the thread is not on the wait queue. // // Note that we don't need a fence before the fetch of TState. // In the worst case we'll fetch a old-stale value of TS_WAIT previously // written by the is thread. (perhaps the fetch might even be satisfied // by a look-aside into the processor's own store buffer, although given // the length of the code path between the prior ST and this load that's // highly unlikely). If the following LD fetches a stale TS_WAIT value // then we'll acquire the lock and then re-fetch a fresh TState value. // That is, we fail toward safety. //當node的狀態為TS_WAIT時,從WaitSet中刪除 if (node.TState == ObjectWaiter::TS_WAIT) { Thread::SpinAcquire (&_WaitSetLock, "WaitSet - unlink") ; if (node.TState == ObjectWaiter::TS_WAIT) { DequeueSpecificWaiter (&node) ; // unlink from WaitSet assert(node._notified == 0, "invariant"); //node狀態更改為TS_RUN node.TState = ObjectWaiter::TS_RUN ; } Thread::SpinRelease (&_WaitSetLock) ; } // The thread is now either on off-list (TS_RUN), // on the EntryList (TS_ENTER), or on the cxq (TS_CXQ). // The Node's TState variable is stable from the perspective of this thread. // No other threads will asynchronously modify TState. guarantee (node.TState != ObjectWaiter::TS_WAIT, "invariant") ; OrderAccess::loadload() ; if (_succ == Self) _succ = NULL ; WasNotified = node._notified ; // Reentry phase -- reacquire the monitor. // re-enter contended monitor after object.wait(). // retain OBJECT_WAIT state until re-enter successfully completes // Thread state is thread_in_vm and oop access is again safe, // although the raw address of the object may have changed. // (Don't cache naked oops over safepoints, of course). // post monitor waited event. Note that this is past-tense, we are done waiting. if (JvmtiExport::should_post_monitor_waited()) { JvmtiExport::post_monitor_waited(jt, this, ret == OS_TIMEOUT); } if (event.should_commit()) { post_monitor_wait_event(&event, node._notifier_tid, millis, ret == OS_TIMEOUT); } OrderAccess::fence() ; assert (Self->_Stalled != 0, "invariant") ; Self->_Stalled = 0 ; assert (_owner != Self, "invariant") ; ObjectWaiter::TStates v = node.TState ; if (v == ObjectWaiter::TS_RUN) { enter (Self) ; } else { guarantee (v == ObjectWaiter::TS_ENTER || v == ObjectWaiter::TS_CXQ, "invariant") ; ReenterI (Self, &node) ; node.wait_reenter_end(this); } // Self has reacquired the lock. // Lifecycle - the node representing Self must not appear on any queues. // Node is about to go out-of-scope, but even if it were immortal we wouldn't // want residual elements associated with this thread left on any lists. guarantee (node.TState == ObjectWaiter::TS_RUN, "invariant") ; assert (_owner == Self, "invariant") ; assert (_succ != Self , "invariant") ; } // OSThreadWaitState() jt->set_current_waiting_monitor(NULL); guarantee (_recursions == 0, "invariant") ; _recursions = save; // restore the old recursion count _waiters--; // decrement the number of waiters // Verify a few postconditions assert (_owner == Self , "invariant") ; assert (_succ != Self , "invariant") ; assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ; if (SyncFlags & 32) { OrderAccess::fence() ; } // check if the notification happened if (!WasNotified) { // no, it could be timeout or Thread.interrupt() or both // check for interrupt event, otherwise it is timeout if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) { TEVENT (Wait - throw IEX from epilog) ; THROW(vmSymbols::java_lang_InterruptedException()); } } // NOTE: Spurious wake up will be consider as timeout. // Monitor notify has precedence over thread interrupt. }/<code>
ObjectMonitor::wait方法的作用主要作用為:
- 將當前線程包裝為ObjectWaiter,狀態設置為TS_WAIT,ObjectWaiter的結構可以參考分析下面分析ObjectSynchronizer::notify的內容。
- 執行 AddWaiter (&node) ,將ObjectWaiter放進等待池(wait set)中,即_WaitSet,_WaitSet是ObjectWaiter的一個隊列。AddWaiter方法就是將ObjectWaiter放進_WaitSet隊尾中。
- 將當前線程掛起,在上述源碼中並沒與釋放鎖。也就是釋放鎖的工作不在方法內。
其他邏輯,源碼中有詳細的註釋,感興趣的可以直接深入下。ObjectMonitor::AddWaiter作用是將線程加入等待池(wait set)中,ObjectMonitor::AddWaiter的代碼為:
<code>inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) { assert(node != NULL, "should not dequeue NULL node"); assert(node->_prev == NULL, "node already in list"); assert(node->_next == NULL, "node already in list"); // put node at end of queue (circular doubly linked list) if (_WaitSet == NULL) { _WaitSet = node; node->_prev = node; node->_next = node; } else { ObjectWaiter* head = _WaitSet ; ObjectWaiter* tail = head->_prev; assert(tail->_next == head, "invariant check"); tail->_next = node; head->_prev = node; node->_next = head; node->_prev = tail; } }/<code>
當_WaitSet為空,放在_WaitSet的頭部,_WaitSet的_prev和_next都指向node,當_WaitSet不為空時,將node放在_WaitSet頭部。
notify方法對應的JVM層面的函數是JVM_MonitorNotify,JVM_MonitorNotify的源碼為:
<code>JVM_ENTRY(void, JVM_MonitorNotify(JNIEnv* env, jobject handle)) JVMWrapper("JVM_MonitorNotify"); //轉成oop,oop表示普通對象 Handle obj(THREAD, JNIHandles::resolve_non_null(handle)); //調用notify方法 ObjectSynchronizer::notify(obj, CHECK); JVM_END/<code>
首先將傳入的對象轉換成oop,上述源碼中,最重要的是調用了
ObjectSynchronizer::notify進行喚醒等待的線程,
ObjectSynchronizer::notify的源碼如下:
<code>void ObjectSynchronizer::notify(Handle obj, TRAPS) { //是否使用偏向鎖 if (UseBiasedLocking) { //消除偏向鎖 BiasedLocking::revoke_and_rebias(obj, false, THREAD); assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now"); } markOop mark = obj->mark(); //如果已經獲取了鎖和獲取了監視器 if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) { return; } //獲取monitor(監視器),監視THREAD,然後調用notify方法 ObjectSynchronizer::inflate(THREAD, obj())->notify(THREAD); }/<code>
ObjectSynchronizer::notify方法先判斷是否使用偏向鎖,如果使用了就消除偏向鎖。
ObjectSynchronizer::inflate是獲取monitor,在調用wait、notify、notifyAll方法的時候,首先需要獲取monitor(監視器),獲取了monitor可以看成是獲取了鎖,monitor相當於是邊界,沒有monitor監控的線程就不能進來monitor中。獲取了monitor,然後調用notify方法,notify在JVM層面的源碼為:
<code>void ObjectMonitor::notify(TRAPS) { //OWNES就是ObjectMonitor的_owner,指向持有ObjectMonitor對象的線程 CHECK_OWNER(); //判斷等待池(Wait Set)是是否為空,如果為空,直接返回 if (_WaitSet == NULL) { //觸發Empty的Notify TEVENT (Empty-Notify) ; return ; } //追蹤監視器探針,獲取java線程的id以及獲取java當前class的名字、字節大小、長度等 DTRACE_MONITOR_PROBE(notify, this, object(), THREAD); int Policy = Knob_MoveNotifyee ; ===============================分割線1============================= //線程自旋 Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ; //等待池隊列,將等待池(wait set)隊列的第一個值取出並返回 ObjectWaiter * iterator = DequeueWaiter() ; if (iterator != NULL) { TEVENT (Notify1 - Transfer) ; //ObjectWaiterd的狀態 guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ; guarantee (iterator->_notified == 0, "invariant") ; //如果Policy!=4 ,狀態設置為在獲取鎖隊列的狀態 if (Policy != 4) { iterator->TState = ObjectWaiter::TS_ENTER ; } iterator->_notified = 1 ; Thread * Self = THREAD; //線程id iterator->_notifier_tid = Self->osthread()->thread_id(); //獲取等待鎖block狀態的線程隊列 ObjectWaiter * List = _EntryList ; if (List != NULL) { //斷言,進行List前指針、狀態的斷言 assert (List->_prev == NULL, "invariant") ; assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ; assert (List != iterator, "invariant") ; } if (Policy == 0) { // prepend to EntryList //為空,等待鎖block狀態的線程隊列為空 if (List == NULL) { //如果為空,將隊列設置為空 iterator->_next = iterator->_prev = NULL ; _EntryList = iterator ; } else { //放入_EntryList隊列的排頭位置 List->_prev = iterator ; iterator->_next = List ; iterator->_prev = NULL ; _EntryList = iterator ; } } else if (Policy == 1) { // append to EntryList //Policy == 1:放入_EntryList隊列的末尾位置; if (List == NULL) { //如果為空,隊列設置為空 iterator->_next = iterator->_prev = NULL ; _EntryList = iterator ; } else { // CONSIDER: finding the tail currently requires a linear-time walk of // the EntryList. We can make tail access constant-time by converting to // a CDLL instead of using our current DLL. //放入_EntryList隊列的末尾位置; ObjectWaiter * Tail ; for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ; assert (Tail != NULL && Tail->_next == NULL, "invariant") ; Tail->_next = iterator ; iterator->_prev = Tail ; iterator->_next = NULL ; } //Policy == 2 時,將List放在_cxq隊列的排頭位置 } else if (Policy == 2) { // prepend to cxq // prepend to cxq if (List == NULL) { //如果為空,隊列設置為空 iterator->_next = iterator->_prev = NULL ; _EntryList = iterator ; } else { iterator->TState = ObjectWaiter::TS_CXQ ; //放進_cxq隊列時,CAS操作,有其他線程競爭 for (;;) { ObjectWaiter * Front = _cxq ; iterator->_next = Front ; if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) { break ; } } } //Policy == 3:放入_cxq隊列中,末尾位置; } else if (Policy == 3) { // append to cxq iterator->TState = ObjectWaiter::TS_CXQ ; //同樣CAS操作 for (;;) { ObjectWaiter * Tail ; Tail = _cxq ; //尾指針為空,設置為空 if (Tail == NULL) { iterator->_next = NULL ; if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) { break ; } } else { //尾指針不為空,添加在隊尾 while (Tail->_next != NULL) Tail = Tail->_next ; Tail->_next = iterator ; iterator->_prev = Tail ; iterator->_next = NULL ; break ; } } } else { //Policy等於其他值,立即喚醒ObjectWaiter對應的線程; ParkEvent * ev = iterator->_event ; iterator->TState = ObjectWaiter::TS_RUN ; OrderAccess::fence() ; ev->unpark() ; } //Policy<4,等待重試 if (Policy < 4) { iterator->wait_reenter_begin(this); } // _WaitSetLock protects the wait queue, not the EntryList. We could // move the add-to-EntryList operation, above, outside the critical section // protected by _WaitSetLock. In practice that's not useful. With the // exception of wait() timeouts and interrupts the monitor owner // is the only thread that grabs _WaitSetLock. There's almost no contention // on _WaitSetLock so it's not profitable to reduce the length of the // critical section. } //線程自旋釋放 Thread::SpinRelease (&_WaitSetLock) ; if (iterator != NULL && ObjectMonitor::_sync_Notifications != NULL) { ObjectMonitor::_sync_Notifications->inc() ; } }/<code>
ObjectMonitor::notify函數的邏輯主要分為兩部分,第一部分做一些檢查和準備喚醒操作過程需要的一些信息,第二部所及是根據Policy的大小將需要喚醒的線程放進等待鎖block狀態的線程隊列,即ObjectMonitor的_EntryList和_cxq隊列中,這兩個隊列的線程將等待獲取鎖。在分割線上部分, CHECK_OWNER()檢測是否擁有monitor,只有擁有monitor,才可以喚醒等待的線程,當等待池(wait set)為空,說明沒有等待池中沒有需要喚醒的線程,直接返回。如果等待池不為空,則準備獲取java線程以及獲取java當前class(JVM層面讀取的是class文件)的名字、字節大小、長度等。分割線下部分第二部分邏輯,當等待池中線程不為空的時候,首先調用 ObjectWaiter * iterator = DequeueWaiter() 從等待池中將第一個等待的線程取出來,DequeueWaiter() 的源代碼為:
<code>inline ObjectWaiter* ObjectMonitor::DequeueWaiter() { // dequeue the very first waiter //將wait set賦值給waiter ObjectWaiter* waiter = _WaitSet; if (waiter) { DequeueSpecificWaiter(waiter); } return waiter; }/<code>
ObjectMonitor::DequeueWaiter()中,當_WaitSet不為空時,調用
ObjectMonitor::DequeueSpecificWaiter方法返回_WaitSet的第一個元素,
ObjectMonitor::DequeueSpecificWaiter方法的源代碼為:
<code>//將_WaitSet中第一個線程返回回來 inline void ObjectMonitor::DequeueSpecificWaiter(ObjectWaiter* node) { assert(node != NULL, "should not dequeue NULL node"); assert(node->_prev != NULL, "node already removed from list"); assert(node->_next != NULL, "node already removed from list"); // when the waiter has woken up because of interrupt, // timeout or other spurious wake-up, dequeue the // waiter from waiting list ObjectWaiter* next = node->_next; // if (next == node) { assert(node->_prev == node, "invariant check"); _WaitSet = NULL; } else { ObjectWaiter* prev = node->_prev; assert(prev->_next == node, "invariant check"); assert(next->_prev == node, "invariant check"); next->_prev = prev; prev->_next = next; if (_WaitSet == node) { _WaitSet = next; } } node->_next = NULL; node->_prev = NULL; }/<code>
ObjectMonitor::DequeueSpecificWaiter方法中,首先判斷ObjectWaiter的_next是否等於_WaitSet,如果是否,則說明_WaitSet為空,將_WaitSet設置為NULL,如果不是,則將第一元素返回。
ObjectWaiter是JVM層面的C++類,ObjectWaiter類為:
<code>// ObjectWaiter serves as a "proxy" or surrogate thread. // TODO-FIXME: Eliminate ObjectWaiter and use the thread-specific // ParkEvent instead. Beware, however, that the JVMTI code // knows about ObjectWaiters, so we'll have to reconcile that code. // See next_waiter(), first_waiter(), etc. class ObjectWaiter : public StackObj { public: //狀態 enum TStates { TS_UNDEF, TS_READY, TS_RUN, TS_WAIT, TS_ENTER, TS_CXQ } ; enum Sorted { PREPEND, APPEND, SORTED } ; //下一個ObjectWaiter指針 ObjectWaiter * volatile _next; //前一個ObjectWaiter指針 ObjectWaiter * volatile _prev; //線程 Thread* _thread; //被喚醒的線程id jlong _notifier_tid; ParkEvent * _event; volatile int _notified ; volatile TStates TState ; Sorted _Sorted ; // List placement disposition bool _active ; // Contention monitoring is enabled public: ObjectWaiter(Thread* thread); void wait_reenter_begin(ObjectMonitor *mon); void wait_reenter_end(ObjectMonitor *mon); };/<code>
ObjectWaiter類充當“代理”或代理線程,也就是ObjectWaiter充當_thread的代理角色,負責與其他對外的工作對接。_next指向下一個ObjectWaiter指針,_prev指向前一個ObjectWaiter指針。回到 ObjectMonitor::notify函數的第二部分邏輯,當從線程池中取出第一個ObjectWaiter(線程代理),根據Policy的值不同,將取出的線程放入等待鎖的_EnterList或者_cxq隊列中的起始或末尾位置。當Policy == 0時,將等待池取出的iterator(線程或者線程代理)放進 _EntryList中的排頭位置;當Policy == 1時,將等待池取出的iterator放進_EntryList中的末尾位置;當Policy == 2時,將等待池中取出的iterator放進放在_cxq隊列的排頭位置。因為有其他線程的競爭,當放入_cxq隊列時,進行CAS操作保證線程的安全;在講解ObjectMonitor結構出現過,_cxq是阻塞在_EnterList最近可達的線程列表,該列表其實是waitNode所構成的線程代理;當Policy == 3時,將等待池中取出的iterator放入_cxq隊列中的末尾位置;當Policy等於其他值,立即喚醒ObjectWaiter對應的線程,喚醒線程以後並沒有釋放鎖。經過上面的分析,我們知道java中調用notify方法時,不一定是立即喚醒線程,可能先將等待池中取出的線程放在獲取鎖阻塞池中(_EntryList或_cxq)。
java的notifyAll方法在JVM中的實現跟java的notify方法基本一樣,這裡就不貼源碼了,主要區別是遍歷ObjectWaiter * iterator = DequeueWaiter() ,重複java的notify方法的JVM實現過程,把所有的_WaitSet中的ObjectWaiter對象放入到_EntryList中。
JVM中wait、notify、notifyAll 方法中都沒有釋放鎖,鎖的釋放是在Synchronizer同步塊結束的時候釋放的。
釋放鎖調用ObjectMonitor::exit方法,主要將ObjectWaiter從_cxq或者_EntryList中取出後喚醒,喚醒的線程會繼續執行掛起前的代碼,通過CAS去競爭鎖,exit方式釋放鎖後,被喚醒的線程佔用了該鎖。
protected void finalize()
Object類中最後一個方法是finalize(),由protected 修飾,由子類進行重寫。當對象的引用不再被使用時,垃圾回收器進行調用finalize。這個方法是由垃圾回收器進行調用的,所以該方法可能不會被觸發,finalize方法不會被任何對象調用多次。當子類重寫了finalize方法,並且這個方法體不為空時,JVM層面則會調用register_finalizer函數進行註冊這個方法,finalize方法是在Java對象初始化過程中註冊的,當進行垃圾回收時,對象被回收並且在finalize中引用自身時,會逃過一次回收,這樣對象不一定會被回,finalize方法也不可能調用第二次。