摘要
C++11標準從發佈到現在已經快10年了。筆者在工作中陸陸續續學習並應用了移動語義(move semantics),智能指針(unique_ptr<>, shared_ptr<>),lamda等C++11的新特性。總體感覺還是真香。最近因為項目開發,要搭建多線程的自動化測試,於是嘗試使用了條件變量(conditional variable)來協調不同線程的進度。在這個過程中踩了一些坑,想記錄下來跟大家一起學習。
問題與背景
這個自動化的測試裡面有一組工作線程負責處理數據。開始時,它們處於等待狀態。當另一組線程把數據準備好了以後,它們就開始處理數據。處理完畢後,各自完成剩下的任務,進程結束。為了方便演示,筆者把這個過程簡化為如下代碼:
<code>```cpp
#include <iostream>
#include <mutex>
#include <thread>
using namespace std;
mutex aMutex;
void PrintString(const char* s)
{
unique_lock<mutex> lock(aMutex);
cout << s << endl;
}
void ProcessData()
{
PrintString("Waiting for data...");
PrintString("Got data. Processing data...");
}
void PrepareData()
{
PrintString("Preparing data...");
PrintString("Data is ready!");
}
int main()
{
cout << "Start!" << endl;
thread dataProcessor(ProcessData);
thread dataProducer(PrepareData);
dataProcessor.join();
dataProducer.join();
cout << "Finished!" << endl;
}
```/<mutex>/<thread>/<mutex>/<iostream>/<code>
其中,線程dataProcessor負責處理數據而dataProducer負責產生數據。因為沒有機制去協調兩個線程,程序的輸出是隨機的。在筆者編譯運行時,能看到輸出如下:
Start!
Waiting for data...
Got data. Processing data...
Preparing data...
Data is ready!
Finished!
從打印的結果可以看到,dataProcessor在dataProducer把數據準備好之前就開始處理數據了。這是不合理的,所以我們要引入機制——條件變量來協調兩者的執行順序,從而解決這個問題。
第一次嘗試
先來看下cplusplus.com對於條件變量的說明:
Condition variable
A condition variable is an object able to block the calling thread until notified to resume.
It uses a unique_lock (over a mutex) to lock the thread when one of its wait functions is called. The thread remains blocked until woken up by another thread that calls a notification function on the same condition_variable object.
總結起來兩點就是:
1. 條件變量可以掛起當前線程,直到收到其他線程的通知。
2. 它需要同鎖配合使用。
聽上去非常適合解決現在的問題。筆者查詢了一下條件變量的基本使用,很快就把它應用到程序中:
<code>```cpp
...
condition_variable condVar; // (1)
...
void ProcessData()
{
PrintString("Waiting for data...");
{
unique_lock<mutex> lock(aMutex); // (2)
condVar.wait(lock);
}
PrintString("Got data. Processing data...");
}
void PrepareData()
{
PrintString("Preparing data...");
PrintString("Data is ready!");
condVar.notify_one(); // (3)
}
```/<mutex>/<code>
程序主體沒有大的變化,主要的改動是:
1. 新定義了一個全局的條件變量conVar。
2. 在ProcessData中調用條件變量的wait方法,進入等待狀態。
3. 在PrepareData中,在數據生成完畢後,調用條件變量的notify_one方法,來讓ProcessData繼續。
編譯執行,輸出如下:
Waiting for data...
Preparing data...
Data is ready!
Got data. Processing data...
Finished!
第一個坑
看上去很完美。但是多測試幾次,馬上就發現有的時候程序沒法結束。Debug了一下,原來是dataProcessor卡在了條件變量的wait方法那裡。這是怎麼回事呢?
筆者稍微分析了下,感覺多半是因為dataProducer的調用notify_one發生在dataProcessor調用wait之前。這就導致dataProducer的notify_one沒有起到效果,然後dataProcessor陷入了深深的睡眠之中……
一個顯而易見的解決思路是讓dataProducer執行的慢一點:
<code>```cpp
void PrepareData()
{
PrintString("Preparing data...");
this_thread::sleep_for(5s);
PrintString("Data is ready!");
condVar.notify_one();
}
```/<code>
加入5秒的睡眠確實可以達到目的,但是效率上面讓人不是很舒服。
第二次嘗試
在網上查了一下,發現碰到這個問題人不少,而解決方案也不是什麼其他的黑科技,就是使用wait的另一個帶條件判斷的重載:
<code>```cpp
template <class>
void wait (unique_lock<mutex>& lck, Predicate pred);
```/<mutex>/<class>/<code>
現在可以總結一下wait-notify的工作流程了。
當程序進入wait調用時,
1. 傳入的第一個參數-互斥鎖會被鎖上。
2. 然後檢查傳入的第二參數-條件謂詞。如果條件為
* true:直接返回,當前線程繼續工作。
* false:解開互斥鎖,當前線程進入掛起狀態。
當在掛起狀態的條件變量收到喚醒通知時,
1. 進程被喚醒並嘗試獲取互斥鎖。
2. 檢查條件謂詞。如果條件為
* true:直接返回,當前線程繼續工作。
* false:解開互斥鎖,當前線程進入掛起狀態。
是不是頭暈了?知道它為什麼叫條件變量了嗎?這東西用起來真的是有點複雜。但是沒辦法,現實就是這麼的殘酷:)
用了帶條件判斷的wait,代碼看起來大概是這個樣子:
<code>```cpp
...
bool isDataReady = false;
...
void ProcessData()
{
PrintString("Waiting for data...");
{
unique_lock<mutex> lock(aMutex);
condVar.wait(lock, [] { return isDataReady; });
}
PrintString("Got data. Processing data...");
}
void PrepareData()
{
PrintString("Preparing data...");
isDataReady = true;
PrintString("Data is ready!");
condVar.notify_one();
}
```/<mutex>/<code>
第二個坑
以為這下應該完美了,但是經過測試發現,程序還是有極小的概率出現dataProcessor卡在wait那裡。到底什麼地方還藏著坑呢?
在仔細檢查兩個線程的整個流程後,終於發現了問題所在——在PrepareData裡面isDataReady的修改沒有被正確的同步。這句話是什麼意思呢?要解釋清楚的話,先來看下條件變量的帶謂詞條件判斷的wait裡面具體做些什麼事。根據前面的分析,它的實現大概是長這個樣子:
<code>```cpp
template <class>
void wait(unique_lock<mutex>& lck, Predicate pred)
{
while (!pred())
{
wait(lck);
}
}
```/<mutex>/<class>/<code>
所以
<code>```cpp
void ProcessData()
{
PrintString("Waiting for data...");
{
unique_lock<mutex> lock(aMutex);
condVar.wait(lock, [] { return isDataReady; });
}
PrintString("Got data. Processing data...");
}
```/<mutex>/<code>
等價於
<code>```cpp
void ProcessData()
{
PrintString("Waiting for data...");
{
unique_lock<mutex> lock(aMutex);
while (![] { return isDataReady; }())
{
//
condVar.wait(lock);
}
}
PrintString("Got data. Processing data...");
}
```/<mutex>/<code>
在這裡有個關鍵的時間窗口(已經標在上面了)。如果在這個窗口裡面,dataProducer把isDataReady的值修改成true並且完成notify_one的調用,那麼dataProcessor仍然會錯過通知,陷入深深的睡眠……
第三次嘗試
既然這個坑的情況已經清楚了,那就來把它填上吧。解決方案就是把對isDataReady的修改,放在互斥鎖的範圍裡面,即:
<code>```cpp
void PrepareData()
{
PrintString("Preparing data...");
{
unique_lock<mutex> lock(aMutex);
isDataReady = true;
}
PrintString("Data is ready!");
condVar.notify_one();
}
```/<mutex>/<code>
注意,你可以把condVar.notify_one()也放在鎖的範圍內,但這不是必須的。
有了這個保護後,dataProducer對isDataReady的修改不會和dataProcessor調用wait同時發生。如果dataProducer對isDataReady的修改先發生,則dataProducer不會進入等待,因為謂詞條件檢查結果是false;反之,dataProducer不會錯過來自dataProducer的通知。這樣一來,所有的坑終於都被填上了。
總結
通過這一番學習,可以看到條件變量使用起來雖然比較靈活,但是要完全把它用對也頗有難度。那麼有沒有更簡單的方法呢?在選擇條件變量作為同步機制的前提下,目前我沒有發現更簡單的方法。但是你也許可以試一下future和promise(C++的另一種線程同步機制),個人認為在這個使用場景下會少踩很多坑。
我們會每週推送商業智能、數據分析資訊、技術乾貨和程序員日常生活,歡迎關注我們的頭條&知乎公眾號“
微策略中國”或微信公眾號“微策略 商業智能"。閱讀更多 微策略中國 的文章
關鍵字: isDataReady wait 架構師