Linux C實現純用戶態搶佔式多線程

所謂 搶佔式多線程調度 ,就是不依靠線程自己來放棄CPU從而將執行權交給別的線程,而是靠一種外部主動干擾模式的調度機制,在需要調度的時刻,強行剝奪當前線程的執行權,依靠策略選擇另一個線程來運行。

當時之所以沒有找到優雅的方案,是因為我沒有找到什麼地方可以同時做到兩件事:

中斷當前的線程執行,進入一個handler來根據調度策略實施調度和切換。

在這個handler中修改該進程的寄存器上下文,剝奪當前線程的執行權交給另一個線程。

首先,上述第1點是可以用信號完成的,比如用alarm函數,可以實現分時中斷。然而在中斷處理函數中,我沒有找到修改寄存器的方法,曾經想過用setjmp/longjmp,然而失敗,最終使用PTRACE機制實現了一個無比醜陋和粗糙的。

在九年前的文章的中,開篇我就說 純用戶空間的搶佔式多線程庫其實是很麻煩的一件事! 確實麻煩,之所以這麼認為就是因為上面的難題沒有解決。

當時確實是術業不精啊。後面的幾年,自己也沒怎麼看過Linux內核信號處理相關的東西。

週六恰逢正則喝完奶睡著了之後,我一個人又不能出去浪,突然就又想到了這個問題。我發誓要找一個優雅的方案出來,畢竟九年過去了,我想自己的內功應該比那時強太多了。

確實,這個方案也真的是信手拈來。

我知道,Linux進程在執行流返回用戶態前處理信號的時候,要調用信號處理函數,而這個信號處理函數是定義在用戶態的,所以Linux進程為了可以執行這個handler函數,便需要自己setup一下用戶態堆棧。

而這個機制,恰恰給了我們修改寄存器的機會。

九年前,我一直以為用戶態的寄存器上下文在完全返回用戶態之前,始終是保存在內核棧上,無法修改。但事實上,當執行信號處理函數的時候,內核會把該進程內核棧上的寄存器上下文sigcontext拷貝到用戶態的堆棧中,再壓入一個sigreturn系統調用作為返回地址,然後等信號處理函數完成後,sigreturn將會自動陷入內核,再將用戶態的sigcontext拷貝回內核棧,以徹底完成信號處理,恢復進程的寄存器上下文。

也就是說,當信號處理函數被執行時,是可以在當前堆棧上找到寄存器上下文的,我們只需要在堆棧上找sigcontext結構體即可。這時,我們對其進行修改,然後這些被修改過的寄存器上下文將會在信號處理完成返回內核時,更新內核棧上的寄存器上下文,從而達到我們的目的。

那麼,我先寫一個信號處理函數,看看信號處理函數執行時,堆棧上都有什麼:

<code>#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
int i, j, k = 0;
unsigned char *stack_buffer;
unsigned long *p;
void sig_start(int signo)
{
\tunsigned long a = 0x1234567811223344;
\tp = (unsigned char *)&a;
\tstack_buffer = (unsigned char *)&a;
\t// 以下按照8字節為一組,打印堆棧的內容
\tprintf("----begin stack----\\n");
\tfor (i = 0; i < 32; i++) {
\t\tfor (j = 0; j < 8; j++) {
\t\t\tprintf(" %.2x", stack_buffer[k]);
\t\t\tk++;
\t\t}
\t\tprintf("\\n");
\t}
\tprintf("----end stack----\\n");
\tif (signo = SIGINT)
\t\tsignal(SIGINT, NULL);
\tif (signo = SIGHUP)
\t\tsignal(SIGHUP, NULL);
\treturn;
}
int main()
{
\tprintf("process id is %d %p %p\\n",getpid(), main, wait_start);
\tsignal(SIGINT, sig_start);
\tsignal(SIGHUP, sig_start);
\tfor (;;);
}/<signal.h>/<string.h>/<stdlib.h>/<stdio.h>/<unistd.h>/<code>

讓我們執行之,按下Ctrl-C給它一個SIGINT信號,看看打印的堆棧的內容:

Linux C實現純用戶態搶佔式多線程

我是在x86_64平臺上做的實驗,所以我們要看x86_64的rt_sigframe結構體,它位於:

arch/x86/include/asm/sigframe.h:

<code>#ifdef CONFIG_X86_64
struct rt_sigframe {
char __user *pretcode;
struct ucontext uc;
struct siginfo info;
/* fp state follows here */
};
...
/* 一路追溯,看看rt_sigframe展開後的樣子 */
// include/uapi/asm-generic/ucontext.h
struct ucontext {
unsigned long uc_flags;
struct ucontext *uc_link;
stack_t uc_stack;
struct sigcontext uc_mcontext; // 這個就是我們要找的東西!
sigset_t uc_sigmask; /* mask last for extensibility */
};/<code>

計算一下偏移位置,正好是處在 pretcode字段 的 58 字節處。也就是說,只要我們找到信號處理函數的 pretcode 偏移,將其再加 58=40 字節就是sigcontext結構體了,這個結構體裡全部都是寄存器:

<code>struct sigcontext {
__u64 r8;
__u64 r9;
__u64 r10;
__u64 r11;
__u64 r12;
__u64 r13;
__u64 r14;
__u64 r15;
__u64 rdi;
__u64 rsi;
__u64 rbp;
__u64 rbx;

__u64 rdx;
__u64 rax;
__u64 rcx;
__u64 rsp;
__u64 rip;
__u64 eflags; /* RFLAGS */
__u16 cs;
__u16 gs;
__u16 fs;
__u16 __pad0;
__u64 err;
__u64 trapno;
__u64 oldmask;
__u64 cr2;
struct _fpstate *fpstate; /* zero when no FPU context */
#ifdef __ILP32__
__u32 __fpstate_pad;
#endif
__u64 reserved1[8];
};/<code>

我們所謂的純用戶態線程調度,就是在信號處理函數里save/restore上述的結構體就好了,而上述的結構體的位置,我們已經知道它在哪裡了。

<code>#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
// 僅僅是測試demo,分配4096字節的stack足夠了。
#define STACK_SIZE\t\t4096
/*
* 為什麼是72?
* 因為我們在信號處理中增加了一個局部變量,這樣pretcode的偏移就是32字節了。
* 於是32+40=72!
*/
#define CONTEXT_OFFSET\t72
// rip寄存器相對於局部變量a的偏移。注意rip在sigcontext中的偏移是16

#define PC_OFFSET\t\t200
int wait_start()
{
\tfor (;;) {
\t\tsleep(1000);
\t}
}
// 線程1的處理函數
void thread1()
{
\tint a = 1, ret = 0;
\tchar buf[64];
\tint fd = open("./file", O_RDWR);
\tfor (;;) {
\t\t// 線程1持續往一個文件裡寫內容。
\t\tsnprintf(buf, 32, "user thread 1 stack: %p value:%d\\n", &a, a++);
\t\tret = write(fd, buf, 32);
\t\tprintf("write buffer to file:%s size=%d\\n", buf, ret);
\t\tsleep(1);
\t}
}
// 線程2的處理函數
void thread2()
{
\tint a = 2;
\tfor (;;) {
\t\t// 線程2隨便打印些自己棧上的什麼東西。
\t\tprintf("tcp user cong 2 stack: %p value:%d\\n", &a, a++);
\t\tsleep(1);
\t}
}
unsigned char *buf;
int start = 0;
struct sigcontext context[2];
struct sigcontext *curr_con;
unsigned long pc[2];
int idx = 0;
unsigned char *stack1, *stack2;
// SIGINT用來啟動所有線程,每次信號啟動一個。
void sig_start(int dunno)
{
\tunsigned long a = 0, *p;
\tif (start == 0) { // 啟動第一個線程

\t\t// 首先定位到sigcontext的rip,啟動線程僅僅修改rip即可,目標是跳入到thread1線程處理函數
\t\tp = (unsigned long*)((unsigned char *)&a + PC_OFFSET);
\t\t*p = pc[0];
\t\t// 定位到sigcontext
\t\tp = (unsigned long *)((unsigned char *)&a + CONTEXT_OFFSET);
\t\tcurr_con = (struct sigcontext *)p;
\t\t// 初始化其堆棧寄存器為為該線程分配的獨立堆棧空間。
\t\tcurr_con->rsp = curr_con->rbp = (unsigned long)((unsigned char *)stack1 + STACK_SIZE);
\t\tstart++;
\t} else if (start == 1) { // 啟動第二個線程
\t\t// 定位線程1的sigcontext,保存其上下文,因為馬上就要schedule線程2了。
\t\tp = (unsigned long *)((unsigned char *)&a + CONTEXT_OFFSET);
\t\tcurr_con = (struct sigcontext *)p;
\t\tmemcpy((void *)&context[0], (const void *)curr_con, sizeof(struct sigcontext));
\t\t// 保存第一個線程的上下文後再定位到sigcontext的rip並修改之,同線程1
\t\tp = (unsigned long *)((char*)&a + PC_OFFSET);
\t\tidx = 1;
\t\t*p = pc[1];
\t\tp = (unsigned long *)((unsigned char *)&a + CONTEXT_OFFSET);
\t\tcurr_con = (struct sigcontext *)p;
\t\t// 初始化其堆棧寄存器為為該線程分配的獨立堆棧空間。
\t\tcurr_con->rsp = curr_con->rbp = (unsigned long)((unsigned char *)stack2 + STACK_SIZE);
\t\tstart++;
\t\t// 兩個線程均啟動完畢,開啟時間片輪轉調度吧。
\t\talarm(2);
\t\tsignal(SIGINT, NULL);
\t}
\treturn;
}
void sig_schedule(int unused)
{
\tunsigned long a = 0;
\tunsigned char *p;
\t
\t// 保存當前線程的上下文

\tp = (unsigned char *)((unsigned char *)&a + CONTEXT_OFFSET);
\tcurr_con = (struct sigcontext *)p;
\tmemcpy((void *)&context[idx%2], curr_con, sizeof(struct sigcontext));
\t
\t// 輪轉調度下一個線程,恢復其上下文。
\tidx++;
\tmemcpy(curr_con, (void *)&context[idx%2], sizeof(struct sigcontext));
\t// 2秒後再調度
\talarm(2);
\treturn;
}
int main()
{
\tprintf("process id is %d %p %p\\n",getpid(), thread1, thread2);
\t
\t// 為兩個線程分配stack空間。
\t// 注意,線程的stack空間一定要獨立,不然函數調用會衝突的。
\tstack1 = (unsigned char *)calloc(1, 4096);
\tstack2 = (unsigned char *)calloc(1, 4096);
\tsignal(SIGINT, sig_start);
\tsignal(SIGALRM, sig_schedule);
\tpc[0] = (unsigned long)thread1;
\tpc[1] = (unsigned long)thread2;
\twait_start();
}/<signal.h>/<string.h>/<stdlib.h>/<stdio.h>/<fcntl.h>/<unistd.h>/<code>

效果如下:

Linux C實現純用戶態搶佔式多線程

可以看出,兩個線程完美交替執行!

第一個例子有點複雜了,我們換個簡單的:

<code>void thread1()
{
int i = 1;
while (1) {
printf("I am thread:%d\\n", i);
sleep(1);
}
}
void thread2()
{
int i = 2;
while (1) {
printf("I am thread:%d\\n", i);
sleep(1);
}
}/<code>

效果如下:

Linux C實現純用戶態搶佔式多線程

以上的 純用戶態 多線程設計中,沒有使用任何操作系統進程級別以外的數據結構存儲線程上下文,這就是 純用戶態 的含義。我們看到所有的線程上下文以及線程調度相關的數據結構都存儲在單獨的一個進程地址空間。

換句話說, 在單獨的該進程之外,沒人意識得到這個多線程的存在! 這個容納用戶態多線程的進程容器,就是一個 虛擬機 實例,它完成了線程硬件上下文的save/restore,調度,切換,就像Linux內核之於Linux進程所做的那般。

我說,我的這個純用戶態搶佔式多線程,使用了信號處理機制。

有人會問,不是說 “純” 用戶態嗎?幹嘛用信號?信號不是內核機制嗎?

是的,信號是內核機制,但 這裡的關注點不在信號是不是內核機制,而是“需要一個第三方來實施搶佔式調度”, 為什麼需要 “第三方”?

因為搶佔式多線程不是協作式多線程,既然線程自己不參與調度決策,那就必然需要第三方來決策。使用信號只是一種方式,由於我在Linux系統做這個用戶態多線程,信號恰恰是可以滿足需求的。當然,也可以不用信號,如果你能找到等價的機制也是可以的。

【 注意⚠️: 採用信號機制來搶佔的開銷確實有點大,但是這只是一種可實現的方式,並不是唯一方式,此外,這種搶佔式調度完全是可以和協作式調度比如協程協同工作的,只有在發生 某種不得不搶佔 的事件後,才實施信號搶佔。】

需要C/C++ Linux服務器架構師學習資料私信“資料”(資料包括C/C++,Linux,golang技術,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協程,DPDK,ffmpeg等),免費分享


分享到:


相關文章: