使用 C++11 編寫 Linux 多線程程序

前言

在這個多核時代,如何充分利用每個 CPU 內核是一個繞不開的話題,從需要為成千上萬的用戶同時提供服務的服務端應用程序,到需要同時打開十幾個頁面,每個頁面都有幾十上百個鏈接的 web 瀏覽器應用程序,從保持著幾 t 甚或幾 p 的數據的數據庫系統,到手機上的一個有良好用戶響應能力的 app,為了充分利用每個 CPU 內核,都會想到是否可以使用多線程技術。這裡所說的“充分利用”包含了兩個層面的意思,一個是使用到所有的內核,再一個是內核不空閒,不讓某個內核長時間處於空閒狀態。在 C++98 的時代,C++標準並沒有包含多線程的支持,人們只能直接調用操作系統提供的 SDK API 來編寫多線程程序,不同的操作系統提供的 SDK API 以及線程控制能力不盡相同,到了 C++11,終於在標準之中加入了正式的多線程的支持,從而我們可以使用標準形式的類來創建與執行線程,也使得我們可以使用標準形式的鎖、原子操作、線程本地存儲 (TLS) 等來進行復雜的各種模式的多線程編程,而且,C++11 還提供了一些高級概念,比如 promise/future,packaged_task,async 等以簡化某些模式的多線程編程。

多線程可以讓我們的應用程序擁有更加出色的性能,同時,如果沒有用好,多線程又是比較容易出錯的且難以查找錯誤所在,甚至可以讓人們覺得自己陷進了泥潭,希望本文能夠幫助您更好地使用 C++11 來進行 Linux 下的多線程編程。

零聲學院專門整理了Linux後臺服務開發大綱,有興趣的同學可以關注私信我(關鍵詞“Linux後臺開發”)!更多免費學習資料等你來取。

認識多線程

首先我們應該正確地認識線程。維基百科對線程的定義是:線程是一個編排好的指令序列,這個指令序列(線程)可以和其它的指令序列(線程)並行執行,操作系統調度器將線程作為最小的 CPU 調度單元。在進行架構設計時,我們應該多從操作系統線程調度的角度去考慮應用程序的線程安排,而不僅僅是代碼。

當只有一個 CPU 內核可供調度時,多個線程的運行示意如下:

圖 1、單個 CPU 內核上的多個線程運行示意圖

使用 C++11 編寫 Linux 多線程程序

我們可以看到,這時的多線程本質上是單個 CPU 的時間分片,一個時間片運行一個線程的代碼,它可以支持併發處理,但是不能說是真正的並行計算。

當有多個 CPU 或者多個內核可供調度時,可以做到真正的並行計算,多個線程的運行示意如下:

圖 2、雙核 CPU 上的多個線程運行示意圖

使用 C++11 編寫 Linux 多線程程序

從上述兩圖,我們可以直接得到使用多線程的一些常見場景:

  • 進程中的某個線程執行了一個阻塞操作時,其它線程可以依然運行,比如,等待用戶輸入或者等待網絡數據包的時候處理啟動後臺線程處理業務,或者在一個遊戲引擎中,一個線程等待用戶的交互動作輸入,另外一個線程在後臺合成下一幀要畫的圖像或者播放背景音樂等。
  • 將某個任務分解為小的可以並行進行的子任務,讓這些子任務在不同的 CPU 或者內核上同時進行計算,然後彙總結果,比如歸併排序,或者分段查找,這樣子來提高任務的執行速度。

需要注意一點,因為單個 CPU 內核下多個線程並不是真正的並行,有些問題,比如 CPU 緩存不一致問題,不一定能表現出來,一旦這些代碼被放到了多核或者多 CPU 的環境運行,就很可能會出現“在開發測試環境一切沒有問題,到了實施現場就莫名其妙”的情況,所以,在進行多線程開發時,開發與測試環境應該是多核或者多 CPU 的,以避免出現這類情況。

C++11 的線程類 std::thread

C++11 的標準類 std::thread 對線程進行了封裝,它的聲明放在頭文件 thread 中,其中聲明瞭線程類 thread, 線程標識符 id,以及名字空間 this_thread,按照 C++11 規範,這個頭文件至少應該兼容如下內容:

清單 1.例子 thread 頭文件主要內容

namespace std{
struct thread{
// native_handle_type 是連接 thread 類和操作系統 SDK API 之間的橋樑。
typedef implementation-dependent native_handle_type;
native_handle_type native_handle();
//
struct id{
id() noexcept;
// 可以由==, < 兩個運算衍生出其它大小關係運算。
bool operator==(thread::id x, thread::id y) noexcept;
bool operator template<class>
basic_ostream<chart>&
operator<&out, thread::id id);
// 哈希函數
template <class> struct hash;
template <> struct hash<:id>;
};
id get_id() const noexcept;
// 構造與析構
thread() noexcept;
template<class> explicit thread(F&f, Args&&… args);
~thread();
thread(const thread&) = delete;
thread(thread&&) noexcept;
thread& operator=( const thread&) = delete;
thread& operator=(thread&&) noexcept;
//
void swap(thread&) noexcept;
bool joinable() const noexcept;
void join();
void detach();
// 獲取物理線程數目

static unsigned hardware_concurrency() noexcept;
}
namespace this_thead{
thread::id get_id();
void yield();
template<class>
void sleep_until(const chrono::time_point<clock>& abs_time);
template<class>
void sleep_for(const chromo::duration& rel_time);
}
}
/<class>/<clock>/<class>/<class>/<class>/<chart>/<class>

和有些語言中定義的線程不同,C++11 所定義的線程是和操作系的線程是一一對應的,也就是說我們生成的線程都是直接接受操作系統的調度的,通過操作系統的相關命令(比如 ps -M 命令)是可以看到的,一個進程所能創建的線程數目以及一個操作系統所能創建的總的線程數目等都由運行時操作系統限定。

native_handle_type 是連接 thread 類和操作系統 SDK API 之間的橋樑,在 g++(libstdc++) for Linux 裡面,native_handle_type 其實就是 pthread 裡面的 pthread_t 類型,當 thread 類的功能不能滿足我們的要求的時候(比如改變某個線程的優先級),可以通過 thread 類實例的 native_handle() 返回值作為參數來調用相關的 pthread 函數達到目的。thread::id 定義了在運行時操作系統內唯一能夠標識該線程的標識符,同時其值還能指示所標識的線程的狀態,其默認值 (thread::id()) 表示不存在可控的正在執行的線程(即空線程,比如,調用 thead() 生成的沒有指定入口函數的線程類實例),當一個線程類實例的 get_id() 等於默認值的時候,即 get_id() == thread::id(),表示這個線程類實例處於下述狀態之一:

  • 尚未指定運行的任務
  • 線程運行完畢
  • 線程已經被轉移 (move) 到另外一個線程類實例
  • 線程已經被分離 (detached)

空線程 id 字符串表示形式依具體實現而定,有些編譯器為 0x0,有些為一句語義解釋。

有時候我們需要在線程執行代碼裡面對當前調用者線程進行操作,針對這種情況,C++11 裡面專門定義了一個名字空間 this_thread,其中包括 get_id() 函數可用來獲取當前調用者線程的 id,yield() 函數可以用來將調用者線程跳出運行狀態,重新交給操作系統進行調度,sleep_until 和 sleep_for 函數則可以讓調用者線程休眠若干時間。get_id() 函數實際上是通過調用 pthread_self() 函數獲得調用者線程的標識符,而 yield() 函數則是通過調用操作系統 API sched_yield() 進行調度切換。

如何創建和結束一個線程

和 pthread_create 不同,使用 thread 類創建線程可以使用一個函數作為入口,也可以是其它的 Callable 對象,而且,可以給入口傳入任意個數任意類型的參數:

清單 2.例子 thread_run_func_var_args.cc

int funcReturnInt(const char* fmt, ...){
va_list ap;
va_start(ap, fmt);
vprintf( fmt, ap );
va_end(ap);
return 0xabcd;
}
void threadRunFunction(void){
thread* t = new thread(funcReturnInt, "%d%s\\n", 100, "\\%");
t->join();
delete t;
}

我們也可以傳入一個 Lambda 表達式作為入口,比如:

清單 3.例子 thread_run_lambda.cc

void threadRunLambda(void){
int a = 100,
b = 200;
thread* t = new thread( [](int ia, int ib){
cout << (ia + ib) << endl;
},
a,
b );
t->join();
delete t;
}

一個類的成員函數也可以作為線程入口:

清單 4.例子 thread_run_member_func.cc

struct God{
void create(const char* anything){
cout << "create " << anything << endl;
}
};
void threadRunMemberFunction(void){
God god;
thread* t = new thread( &God::create, god, "the world" );
t->join();
delete t;

}

雖然 thread 類的初始化可以提供這麼豐富和方便的形式,其實現的底層依然是創建一個 pthread 線程並運行之,有些實現甚至是直接調用 pthread_create 來創建。

創建一個線程之後,我們還需要考慮一個問題:該如何處理這個線程的結束?一種方式是等待這個線程結束,在一個合適的地方調用 thread 實例的 join() 方法,調用者線程將會一直等待著目標線程的結束,當目標線程結束之後調用者線程繼續運行;另一個方式是將這個線程分離,由其自己結束,通過調用 thread 實例的 detach() 方法將目標線程置於分離模式。一個線程的 join() 方法與 detach() 方法只能調用一次,不能在調用了 join() 之後又調用 detach(),也不能在調用 detach() 之後又調用 join(),在調用了 join() 或者 detach() 之後,該線程的 id 即被置為默認值(空線程),表示不能繼續再對該線程作修改變化。如果沒有調用 join() 或者 detach(),那麼,在析構的時候,該線程實例將會調用 std::terminate(),這會導致整個進程退出,所以,如果沒有特別需要,一般都建議在生成子線程後調用其 join() 方法等待其退出,這樣子最起碼知道這些子線程在什麼時候已經確保結束。

在 C++11 裡面沒有提供 kill 掉某個線程的能力,只能被動地等待某個線程的自然結束,如果我們要主動停止某個線程的話,可以通過調用 Linux 操作系統提供的 pthread_kill 函數給目標線程發送信號來實現,示例如下:

清單 5.例子 thread_kill.cc

static void on_signal_term(int sig){
cout << "on SIGTERM:" << this_thread::get_id() << endl;
pthread_exit(NULL);
}
void threadPosixKill(void){
signal(SIGTERM, on_signal_term);
thread* t = new thread( [](){
while(true){
++counter;
}
});
pthread_t tid = t->native_handle();
cout << "tid=" << tid << endl;
// 確保子線程已經在運行。
this_thread::sleep_for( chrono::seconds(1) );
pthread_kill(tid, SIGTERM);
t->join();
delete t;
cout << "thread destroyed." << endl;
}

上述例子還可以用來給某個線程發送其它信號,具體的 pthread_exit 函數調用的約定依賴於具體的操作系統的實現,所以,這個方法是依賴於具體的操作系統的,而且,因為在 C++11 裡面沒有這方面的具體約定,用這種方式也是依賴於 C++編譯器的具體實現的。

線程類 std::thread 的其它方法和特點

thread 類是一個特殊的類,它不能被拷貝,只能被轉移或者互換,這是符合線程的語義的,不要忘記這裡所說的線程是直接被操作系統調度的。線程的轉移使用 move 函數,示例如下:

清單 6.例子 thread_move.cc

void threadMove(void){
int a = 1;
thread t( [](int* pa){
for(;;){
*pa = (*pa * 33) % 0x7fffffff;
if ( ( (*pa) >> 30) & 1) break;
}
}, &a);
thread t2 = move(t); // 改為 t2 = t 將不能編譯。
t2.join();
cout << "a=" << a << endl;
}

在這個例子中,如果將 t2.join() 改為 t.join() 將會導致整個進程被結束,因為忘記了調用 t2 也就是被轉移的線程的 join() 方法,從而導致整個進程被結束,而 t 則因為已經被轉移,其 id 已被置空。

線程實例互換使用 swap 函數,示例如下:

清單 7.例子 thread_swap.cc

void threadSwap(void){
int a = 1;
thread t( [](int* pa){
for(;;){
*pa = (*pa * 33) % 0x7fffffff;
if ( ( (*pa) >> 30) & 1) break;
}
}, &a);
thread t2;
cout << "before swap: t=" << t.get_id()
<< ", t2=" << t2.get_id() << endl;
swap(t, t2);
cout << "after swap : t=" << t.get_id()
<< ", t2=" << t2.get_id() << endl;
t2.join();
cout << "a=" << a << endl;
}

互換和轉移很類似,但是互換僅僅進行實例(以 id 作標識)的互換,而轉移則在進行實例標識的互換之前,還進行了轉移目的實例(如下例的t2)的清理,如果 t2 是可聚合的(joinable() 方法返回 true),則調用 std::terminate(),這會導致整個進程退出,比如下面這個例子:

清單 8.例子 thread_move_term.cc

void threadMoveTerm(void){
int a = 1;
thread t( [](int* pa){
for(;;){
*pa = (*pa * 33) % 0x7fffffff;
if ( ( (*pa) >> 30) & 1) break;
}
}, &a);
thread t2( [](){
int i = 0;
for(;;)i++;
} );
t2 = move(t); // 將會導致 std::terminate()
cout << "should not reach here" << endl;
t2.join();
}

所以,在進行線程實例轉移的時候,要注意判斷目的實例的 id 是否為空值(即 id())。

如果我們繼承了 thread 類,則還需要禁止拷貝構造函數、拷貝賦值函數以及賦值操作符重載函數等,另外,thread 類的析構函數並不是虛析構函數。示例如下:

清單 9.例子 thread_inherit.cc

class MyThread : public thread{
public:
MyThread() noexcept : thread(){};
template<typename>
explicit
MyThread(Callable&& func, Args&&... args) :
thread( std::forward<callable>(func),
std::forward<args>(args)...){
}
~MyThread() { thread::~thread(); }
// disable copy constructors
MyThread( MyThread& ) = delete;
MyThread( const MyThread& ) = delete;
MyThread& operator=(const MyThread&) = delete;
};
/<args>/<callable>/<typename>

因為 thread 類的析構函數不是虛析構函數,在上例中,需要避免出現下面這種情況:

MyThread* tc = new MyThread(...);

...

thread* tp = tc;

...

delete tp;

這種情況會導致 MyThread 的析構函數沒有被調用。

線程的調度

我們可以調用 this_thread::yield() 將當前調用者線程切換到重新等待調度,但是不能對非調用者線程進行調度切換,也不能讓非調用者線程休眠(這是操作系統調度器乾的活)。

清單 10.例子 thread_yield.cc

void threadYield(void){
unsigned int procs = thread::hardware_concurrency(), // 獲取物理線程數目
i = 0;
thread* ta = new thread( [](){
struct timeval t1, t2;
gettimeofday(&t1, NULL);
for(int i = 0, m = 13; i < COUNT; i++, m *= 17){
this_thread::yield();
}
gettimeofday(&t2, NULL);
print_time(t1, t2, " with yield");
} );
thread** tb = new thread*[ procs ];
for( i = 0; i < procs; i++){
tb[i] = new thread( [](){
struct timeval t1, t2;
gettimeofday(&t1, NULL);
for(int i = 0, m = 13; i < COUNT; i++, m *= 17){
do_nothing();
}
gettimeofday(&t2, NULL);
print_time(t1, t2, "without yield");
});
}
ta->join();
delete ta;
for( i = 0; i < procs; i++){
tb[i]->join();
delete tb[i];
};
delete tb;
}

ta 線程因為需要經常切換去重新等待調度,它運行的時間要比 tb 要多,比如在作者的機器上運行得到如下結果:

$time ./a.out
without yield elapse 0.050199s
without yield elapse 0.051042s
without yield elapse 0.05139s
without yield elapse 0.048782s
with yield elapse 1.63366s

real 0m1.643s
user 0m1.175s
sys 0m0.611s

ta 線程即使扣除系統調用運行時間 0.611s 之後,它的運行時間也遠大於沒有進行切換的線程。

C++11 沒有提供調整線程的調度策略或者優先級的能力,如果需要,只能通過調用相關的 pthread 函數來進行,需要的時候,可以通過調用 thread 類實例的 native_handle() 方法或者操作系統 API pthread_self() 來獲得 pthread 線程 id,作為 pthread 函數的參數。

線程間的數據交互和數據爭用 (Data Racing)

同一個進程內的多個線程之間多是免不了要有數據互相來往的,隊列和共享數據是實現多個線程之間的數據交互的常用方式,封裝好的隊列使用起來相對來說不容易出錯一些,而共享數據則是最基本的也是較容易出錯的,因為它會產生數據爭用的情況,即有超過一個線程試圖同時搶佔某個資源,比如對某塊內存進行讀寫等,如下例所示:

清單 11.例子 thread_data_race.cc

static void
inc(int *p ){
for(int i = 0; i < COUNT; i++){
(*p)++;
}
}
void threadDataRacing(void){

int a = 0;
thread ta( inc, &a);
thread tb( inc, &a);
ta.join();
tb.join();
cout << "a=" << a << endl;
}

這是簡化了的極端情況,我們可以一眼看出來這是兩個線程在同時對&a 這個內存地址進行寫操作,但是在實際工作中,在代碼的海洋中發現它並不一定容易。從表面看,兩個線程執行完之後,最後的 a 值應該是 COUNT * 2,但是實際上並非如此,因為簡單如 (*p)++這樣的操作並不是一個原子動作,要解決這個問題,對於簡單的基本類型數據如字符、整型、指針等,C++提供了原子模版類 atomic,而對於複雜的對象,則提供了最常用的鎖機制,比如互斥類 mutex,門鎖 lock_guard,唯一鎖 unique_lock,條件變量 condition_variable 等。

現在我們使用原子模版類 atomic 改造上述例子得到預期結果:

清單 12.例子 thread_atomic.cc

static void
inc(atomic *p ){
for(int i = 0; i < COUNT; i++){
(*p)++;
}
}
void threadDataRacing(void){
atomic a(0) ;
thread ta( inc, &a);
thread tb( inc, &a);
ta.join();

tb.join();
cout << "a=" << a << endl;
}

我們也可以使用 lock_guard,lock_guard 是一個範圍鎖,本質是 RAII(Resource Acquire Is Initialization),在構建的時候自動加鎖,在析構的時候自動解鎖,這保證了每一次加鎖都會得到解鎖。即使是調用函數發生了異常,在清理棧幀的時候也會調用它的析構函數得到解鎖,從而保證每次加鎖都會解鎖,但是我們不能手工調用加鎖方法或者解鎖方法來進行更加精細的資源佔用管理,使用 lock_guard 示例如下:

清單 13.例子 thread_lock_guard.cc

static mutex g_mutex;
static void
inc(int *p ){
for(int i = 0; i < COUNT; i++){
lock_guard<mutex> _(g_mutex);
(*p)++;
}
}
void threadLockGuard(void){
int a = 0;
thread ta( inc, &a);
thread tb( inc, &a);
ta.join();
tb.join();
cout << "a=" << a << endl;
}
/<mutex>

如果要支持手工加鎖,可以考慮使用 unique_lock 或者直接使用 mutex。unique_lock 也支持 RAII,它也可以一次性將多個鎖加鎖;如果使用 mutex 則直接調用 mutex 類的 lock, unlock, trylock 等方法進行更加精細的鎖管理:

清單 14.例子 thread_mutex.cc

static mutex g_mutex;
static void
inc(int *p ){
thread_local int i; // TLS 變量
for(; i < COUNT; i++){
g_mutex.lock();
(*p)++;
g_mutex.unlock();
}
}
void threadMutex(void){
int a = 0;
thread ta( inc, &a);
thread tb( inc, &a);
ta.join();
tb.join();
cout << "a=" << a << endl;
}

在上例中,我們還使用了線程本地存儲 (TLS) 變量,我們只需要在變量前面聲明它是 thread_local 即可。TLS 變量在線程棧內分配,線程棧只有在線程創建之後才生效,在線程退出的時候銷燬,需要注意不同系統的線程棧的大小是不同的,如果 TLS 變量佔用空間比較大,需要注意這個問題。TLS 變量一般不能跨線程,其初始化在調用線程第一次使用這個變量時進行,默認初始化為 0。

對於線程間的事件通知,C++11 提供了條件變量類 condition_variable,可視為 pthread_cond_t 的封裝,使用條件變量可以讓一個線程等待其它線程的通知 (wait,wait_for,wait_until),也可以給其它線程發送通知 (notify_one,notify_all),條件變量必須和鎖配合使用,在等待時因為有解鎖和重新加鎖,所以,在等待時必須使用可以手工解鎖和加鎖的鎖,比如 unique_lock,而不能使用 lock_guard,示例如下:

清單 15.例子 thread_cond_var.cc

#include <thread>
#include <iostream>
#include <condition>
using namespace std;
mutex m;
condition_variable cv;
void threadCondVar(void){
# define THREAD_COUNT 10
thread** t = new thread*[THREAD_COUNT];
int i;
for(i = 0; i < THREAD_COUNT; i++){
t[i] = new thread( [](int index){
unique_lock<mutex> lck(m);
cv.wait_for(lck, chrono::hours(1000));
cout << index << endl;
}, i );
this_thread::sleep_for( chrono::milliseconds(50));
}
for(i = 0; i < THREAD_COUNT; i++){
lock_guard<mutex> _(m);
cv.notify_one();
}
for(i = 0; i < THREAD_COUNT; i++){
t[i]->join();
delete t[i];
}
delete t;
}
/<mutex>/<mutex>/<condition>/<iostream>/<thread>

從上例的運行結果也可以看到,條件變量是不保證次序的,即首先調用 wait 的不一定首先被喚醒。

幾個高級概念

C++11 提供了若干多線程編程的高級概念:promise/future, packaged_task, async,來簡化多線程編程,尤其是線程之間的數據交互比較簡單的情況下,讓我們可以將注意力更多地放在業務處理上。

promise/future 可以用來在線程之間進行簡單的數據交互,而不需要考慮鎖的問題,線程 A 將數據保存在一個 promise 變量中,另外一個線程 B 可以通過這個 promise 變量的 get_future() 獲取其值,當線程 A 尚未在 promise 變量中賦值時,線程 B 也可以等待這個 promise 變量的賦值:

清單 16.例子 thread_promise_future.cc

promise<string> val;
static void
threadPromiseFuture(){
thread ta([](){
future<string> fu = val.get_future();
cout << "waiting promise->future" << endl;
cout << fu.get() << endl;
});
thread tb([](){
this_thread::sleep_for( chrono::milliseconds(100) );
val.set_value("promise is set");
});
ta.join();
tb.join();
}
/<string>/<string>

一個 future 變量只能調用一次 get(),如果需要多次調用 get(),可以使用 shared_future,通過 promise/future 還可以在線程之間傳遞異常。

如果將一個 callable 對象和一個 promise 組合,那就是 packaged_task,它可以進一步簡化操作:

清單 17.例子 thread_packaged_task.cc

static mutex g_mutex;
static void
threadPackagedTask(){
auto run = [=](int index){
{
lock_guard<mutex> _(g_mutex);
cout << "tasklet " << index << endl;

}
this_thread::sleep_for( chrono::seconds(10) );
return index * 1000;
};
packaged_task pt1(run);
packaged_task pt2(run);
thread t1([&](){pt1(2);} );
thread t2([&](){pt2(3);} );
int f1 = pt1.get_future().get();
int f2 = pt2.get_future().get();
cout << "task result=" << f1 << endl;
cout << "task result=" << f2 << endl;
t1.join();
t2.join();
}
/<mutex>

我們還可以試圖將一個 packaged_task 和一個線程組合,那就是 async() 函數。使用 async() 函數啟動執行代碼,返回一個 future 對象來保存代碼返回值,不需要我們顯式地創建和銷燬線程等,而是由 C++11 庫的實現決定何時創建和銷燬線程,以及創建幾個線程等,示例如下:

清單 18.例子 thread_async.cc

static long
do_sum(vector<long> *arr, size_t start, size_t count){
static mutex _m;
long sum = 0;
for(size_t i = 0; i < count; i++){
sum += (*arr)[start + i];
}
{
lock_guard<mutex> _(_m);
cout << "thread " << this_thread::get_id()
<< ", count=" << count
<< ", sum=" << sum << endl;
}
return sum;
}

static void
threadAsync(){
# define COUNT 1000000
vector<long> data(COUNT);
for(size_t i = 0; i < COUNT; i++){
data[i] = random() & 0xff;
}
//
vector< future<long> > result;
size_t ptc = thread::hardware_concurrency() * 2;
for(size_t batch = 0; batch < ptc; batch++){
size_t batch_each = COUNT / ptc;
if (batch == ptc - 1){
batch_each = COUNT - (COUNT / ptc * batch);
}
result.push_back(async(do_sum, &data, batch * batch_each, batch_each));
}
long total = 0;
for(size_t batch = 0; batch < ptc; batch++){
total += result[batch].get();
}
cout << "total=" << total << endl;
}
/<long>/<long>/<mutex>/<long>

如果是在多核或者多 CPU 的環境上面運行上述例子,仔細觀察輸出結果,可能會發現有些線程 ID 是重複的,這說明重複使用了線程,也就是說,通過使用 async() 還可達到一些線程池的功能。

幾個需要注意的地方

thread 同時也是棉線、毛線、絲線等意思,我想大家都能體會面對一團亂麻不知從何處查找頭緒的感受,不要忘了,線程不是靜態的,它是不斷變化的,請想像一下面對一團會動態變化的亂麻的情景。所以,使用多線程技術的首要準則是我們自己要十分清楚我們的線程在哪裡?線頭(線程入口和出口)在哪裡?先安排好線程的運行,注意不同線程的交叉點(訪問或者修改同一個資源,包括內存、I/O 設備等),儘量減少線程的交叉點,要知道幾條線堆在一起最怕的是互相打結。

當我們的確需要不同線程訪問一個共同的資源時,一般都需要進行加鎖保護,否則很可能會出現數據不一致的情況,從而出現各種時現時不現的莫名其妙的問題,加鎖保護時有幾個問題需要特別注意:一是一個線程內連續多次調用非遞歸鎖 (non-recursive lock) 的加鎖動作,這很可能會導致異常;二是加鎖的粒度;三是出現死鎖 (deadlock),多個線程互相等待對方釋放鎖導致這些線程全部處於罷工狀態。

第一個問題只要根據場景調用合適的鎖即可,當我們可能會在某個線程內重複調用某個鎖的加鎖動作時,我們應該使用遞歸鎖 (recursive lock),在 C++11 中,可以根據需要來使用 recursive_mutex,或者 recursive_timed_mutex。

第二個問題,即鎖的粒度,原則上應該是粒度越小越好,那意味著阻塞的時間越少,效率更高,比如一個數據庫,給一個數據行 (data row) 加鎖當然比給一個表 (table) 加鎖要高效,但是同時複雜度也會越大,越容易出錯,比如死鎖等。

對於第三個問題我們需要先看下出現死鎖的條件:

  1. 資源互斥,某個資源在某一時刻只能被一個線程持有 (hold);
  2. 吃著碗裡的還看著鍋裡的,持有一個以上的互斥資源的線程在等待被其它進程持有的互斥資源;
  3. 不可搶佔,只有在某互斥資源的持有線程釋放了該資源之後,其它線程才能去持有該資源;
  4. 環形等待,有兩個或者兩個以上的線程各自持有某些互斥資源,並且各自在等待其它線程所持有的互斥資源。

我們只要不讓上述四個條件中的任意一個不成立即可。在設計的時候,非常有必要先分析一下會否出現滿足四個條件的情況,特別是檢查有無試圖去同時保持兩個或者兩個以上的鎖,當我們發現試圖去同時保持兩個或者兩個以上的鎖的時候,就需要特別警惕了。下面我們來看一個簡化了的死鎖的例子:

清單 19.例子 thread_deadlock.cc

static mutex g_mutex1, g_mutex2;
static void
inc1(int *p ){
for(int i = 0; i < COUNT; i++){
g_mutex1.lock();
(*p)++;
g_mutex2.lock();
// do something.
g_mutex2.unlock();
g_mutex1.unlock();
}
}
static void
inc2(int *p ){
for(int i = 0; i < COUNT; i++){
g_mutex2.lock();
g_mutex1.lock();
(*p)++;

g_mutex1.unlock();
// do other thing.
g_mutex2.unlock();
}
}
void threadMutex(void){
int a = 0;
thread ta( inc1, &a);
thread tb( inc2, &a);
ta.join();
tb.join();
cout << "a=" << a << endl;
}

在這個例子中,g_mutex1 和 g_mutex2 都是互斥的資源,任意時刻都只有一個線程可以持有(加鎖成功),而且只有持有線程調用 unlock 釋放鎖資源的時候其它線程才能去持有,滿足條件 1 和 3,線程 ta 持有了 g_mutex1 之後,在釋放 g_mutex1 之前試圖去持有 g_mutex2,而線程 tb 持有了 g_mutex2 之後,在釋放 g_mutex2 之前試圖去持有 g_mutex1,滿足條件 2 和 4,這種情況之下,當線程 ta 試圖去持有 g_mutex2 的時候,如果 tb 正持有 g_mutex2 而試圖去持有 g_mutex1 時就發生了死鎖。在有些環境下,可能要多次運行這個例子才出現死鎖,實際工作中這種偶現特性讓查找問題變難。要破除這個死鎖,我們只要按如下代碼所示破除條件 3 和 4 即可:

清單 20.例子 thread_break_deadlock.cc

static mutex g_mutex1, g_mutex2;
static voi
inc1(int *p ){
for(int i = 0; i < COUNT; i++){
g_mutex1.lock();
(*p)++;
g_mutex1.unlock();
g_mutex2.lock();
// do something.

g_mutex2.unlock();
}
}
static void
inc2(int *p ){
for(int i = 0; i < COUNT; i++){
g_mutex2.lock();
// do other thing.
g_mutex2.unlock();
g_mutex1.lock();
(*p)++;
g_mutex1.unlock();
}
}
void threadMutex(void){
int a = 0;
thread ta( inc1, &a);
thread tb( inc2, &a);
ta.join();
tb.join();
cout << "a=" << a << endl;
}

在一些複雜的並行編程場景,如何避免死鎖是一個很重要的話題,在實踐中,當我們看到有兩個鎖嵌套加鎖的時候就要特別提高警惕,它極有可能滿足了條件 2 或者 4。

結束語

上述例子在 CentOS 6.5,g++ 4.8.1/g++4.9 以及 clang 3.5 下面編譯通過,在編譯的時候,請注意下述幾點:

  • 設置 -std=c++11;
  • 鏈接的時候設置 -pthread;
  • 使用 g++編譯鏈接時設置 -Wl,--no-as-needed 傳給鏈接器,有些版本的 g++需要這個設置;
  • 設置宏定義 -D_REENTRANT,有些庫函數是依賴於這個宏定義來確定是否使用多線程版本的。

具體可以參考本文所附的代碼中的 Makefile 文件。

在用 gdb 調試多線程程序的時候,可以輸入命令 info threads 查看當前的線程列表,通過命令 thread n 切換到第 n 個線程的上下文,這裡的 n 是 info threads 命令輸出的線程索引數字,例如,如果要切換到第 2 個線程的上下文,則輸入命令 thread 2。

聰明地使用多線程,擁抱多線程吧。


分享到:


相關文章: