A Component That Separates Execution from Tasks: the Thread Pool
https://github.com/wangbojing/threadpool
Multithreading addresses the problem of running multiple threads on a single processing unit: it can markedly reduce the unit's idle time and raise its throughput. The thread pool is an essential component of multithreaded programming, yet to many programmers it remains opaque, even mysterious. I am glad to have the chance to walk through how it works; where the explanation falls short, corrections are welcome.

A thread pool is a tool for managing a group of threads that execute tasks. Being a management tool, what it manages are tasks and their execution. As the component topology in Figure 1 shows, it is made up of three parts: the worker queue (Workers), the job queue (Jobs), and the pool manager (Pool Manager).

- The worker queue (Workers) holds the running worker threads.
- The job queue (Jobs) holds the tasks waiting to be executed.
- The pool manager (Pool Manager) controls the order in which workers run and how long a task may execute; it releases workers that have sat idle for a long time and adds workers when the existing ones are running at full capacity; it also tracks the number of pending jobs, enqueues new jobs, and dequeues jobs that are about to run.

Figure 1: Thread pool component topology

What does each worker (Worker) in the worker queue consist of? A thread ID and a termination flag.
What does each job (Job) in the job queue consist of? The function that carries out the task, and the argument passed to that function.
What does the pool manager (Pool Manager) consist of? A mutex that serializes adding new jobs and removing jobs for execution, and a condition variable on which idle worker threads wait.

This analysis yields the class diagram in Figure 2.

Figure 2: Thread pool class diagram

At this point a simple thread pool practically writes itself. The implementation follows:
/*
 * Author: WangBoJing
 * email: [email protected]
 * github: https://github.com/wangbojing
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#define LL_ADD(item, list) do {                             \
    item->prev = NULL;                                      \
    item->next = list;                                      \
    list = item;                                            \
} while (0)

#define LL_REMOVE(item, list) do {                          \
    if (item->prev != NULL) item->prev->next = item->next;  \
    if (item->next != NULL) item->next->prev = item->prev;  \
    if (list == item) list = item->next;                    \
    item->prev = item->next = NULL;                         \
} while (0)

typedef void (*JOB_CALLBACK)(void *);
struct NTHREADPOOL;

typedef struct NWORKER {
    pthread_t thread;
    int terminate;                 /* exit flag */
    struct NTHREADPOOL *pool;
    struct NWORKER *next;
    struct NWORKER *prev;
} nWorker;

typedef struct NJOB {
    JOB_CALLBACK job_func;         /* task function */
    void *arg;                     /* task argument */
    struct NJOB *next;
    struct NJOB *prev;
} nJob;

typedef struct NTHREADPOOL {
    struct NWORKER *workers;       /* worker queue */
    struct NJOB *jobs;             /* job queue */
    pthread_mutex_t jobs_mtx;
    pthread_cond_t jobs_cond;
} nThreadPool;
void *ntyWorkerThread(void *arg) {
    nWorker *worker = (nWorker*)arg;

    while (1) {
        pthread_mutex_lock(&worker->pool->jobs_mtx);

        /* sleep until a job arrives or we are told to terminate */
        while (worker->pool->jobs == NULL) {
            if (worker->terminate) break;
            pthread_cond_wait(&worker->pool->jobs_cond, &worker->pool->jobs_mtx);
        }
        if (worker->terminate) {
            pthread_mutex_unlock(&worker->pool->jobs_mtx);
            break;
        }

        nJob *job = worker->pool->jobs;
        if (job != NULL) {
            LL_REMOVE(job, worker->pool->jobs);
        }
        pthread_mutex_unlock(&worker->pool->jobs_mtx);

        if (job == NULL) continue;

        /* the callback receives the whole job and is expected to free it */
        job->job_func(job);
        usleep(1);
    }

    free(worker);
    pthread_exit(NULL);
}
int ntyThreadPoolCreate(nThreadPool *pool, int numWorkers) {
    if (pool == NULL) return 1;
    if (numWorkers < 1) numWorkers = 1;
    memset(pool, 0, sizeof(nThreadPool));

    pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
    memcpy(&pool->jobs_cond, &blank_cond, sizeof(pool->jobs_cond));

    pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
    memcpy(&pool->jobs_mtx, &blank_mutex, sizeof(pool->jobs_mtx));

    int i = 0;
    for (i = 0; i < numWorkers; i++) {
        nWorker *worker = (nWorker*)malloc(sizeof(nWorker));
        if (worker == NULL) {
            perror("malloc");
            return 1;
        }
        memset(worker, 0, sizeof(nWorker));
        worker->pool = pool;

        int ret = pthread_create(&worker->thread, NULL, ntyWorkerThread, (void*)worker);
        if (ret) {
            perror("pthread_create");
            free(worker);
            return 1;
        }
        LL_ADD(worker, worker->pool->workers);
    }

    return 0;
}
void ntyThreadPoolShutdown(nThreadPool *pool) {
    nWorker *worker = NULL;
    for (worker = pool->workers; worker != NULL; worker = worker->next) {
        worker->terminate = 1;
    }

    pthread_mutex_lock(&pool->jobs_mtx);
    pool->workers = NULL;
    pool->jobs = NULL;
    pthread_cond_broadcast(&pool->jobs_cond);
    pthread_mutex_unlock(&pool->jobs_mtx);
}

void ntyThreadPoolPush(nThreadPool *pool, nJob *job) {
    pthread_mutex_lock(&pool->jobs_mtx);
    LL_ADD(job, pool->jobs);
    pthread_cond_signal(&pool->jobs_cond);
    pthread_mutex_unlock(&pool->jobs_mtx);
}
/********************************* debug thread pool *********************************/

#define KING_MAX_THREADS 80
#define KING_COUNTER_SIZE 1000

void king_counter(void *arg) {
    nJob *job = (nJob*)arg;
    int index = *(int*)job->arg;

    printf("index: %d, selfid:%lu\n", index, pthread_self());

    free(job->arg);
    free(job);
}
int main(int argc, char *argv[]) {
    nThreadPool pool;
    ntyThreadPoolCreate(&pool, KING_MAX_THREADS);

    int i = 0;
    for (i = 0; i < KING_COUNTER_SIZE; i++) {
        nJob *job = (nJob*)malloc(sizeof(nJob));
        if (job == NULL) {
            perror("malloc");
            exit(1);
        }

        job->job_func = king_counter;
        job->arg = malloc(sizeof(int));
        *(int*)job->arg = i;

        ntyThreadPoolPush(&pool, job);
    }

    getchar();
    printf("You are very good !!!!\n");

    return 0;
}
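The demo can be built by linking in the pthread library (the file name here is just an example):

gcc thread_pool.c -o thread_pool -lpthread

One convention worth noticing: the worker calls job->job_func(job), handing the callback the whole nJob, so the callback (king_counter above) is responsible for freeing both job->arg and the job itself.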
A thread pool like this is still only a demo; the following points leave room for improvement:

- The number of threads is fixed at creation; the pool cannot grow or shrink with the volume of task requests in the system.
- No statistics are kept on the job queue (the number of pending tasks is never counted).
- Neither the number of threads currently executing tasks nor the number of tasks waiting to run is tracked.
- No limit is placed on how long a single task may execute.
- I/O-bound and CPU-bound workloads are not distinguished; thread pools are used everywhere, but different business scenarios call for different configurations (see the sizing sketch after this list).
- There is no protection against a task function that itself calls pthread_exit to terminate its worker thread.
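On the I/O-bound versus CPU-bound point, a common rule of thumb is to size a CPU-bound pool at roughly the core count and an I/O-bound pool at some multiple of it, since I/O-bound threads spend most of their time blocked. A minimal sketch (the helper name suggest_pool_size and the 2x+1 multiplier are illustrative assumptions, not part of the pool code below):

#include <unistd.h>

/* Illustrative sizing heuristic; the name and the factor are assumptions,
 * not part of the thread pool implementation that follows. */
static int suggest_pool_size(int io_bound) {
    long cores = sysconf(_SC_NPROCESSORS_ONLN); /* online CPU cores */
    if (cores < 1) cores = 1;
    return io_bound ? (int)(2 * cores + 1) : (int)cores;
}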
To address these points, here is an improved version of the thread pool:
/*
 * Author: WangBoJing
 * email: [email protected]
 * github: https://github.com/wangbojing
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <signal.h>
#include <errno.h>
#include <time.h>
typedef void (*JOB_CALLBACK)(void *);

typedef struct NJOB {
    struct NJOB *next;
    JOB_CALLBACK func;             /* task function */
    void *arg;                     /* task argument */
} nJob;

typedef struct NWORKER {
    struct NWORKER *active_next;
    pthread_t active_tid;
} nWorker;

typedef struct NTHREADPOOL {
    struct NTHREADPOOL *forw;      /* circular list of all pools */
    struct NTHREADPOOL *back;
    pthread_mutex_t mtx;
    pthread_cond_t busycv;         /* signaled when the last thread exits */
    pthread_cond_t workcv;         /* signaled when work arrives */
    pthread_cond_t waitcv;         /* signaled when the pool drains */
    nWorker *active;               /* workers currently running a job */
    nJob *head;                    /* job queue: dequeue from head ... */
    nJob *tail;                    /* ... enqueue at tail */
    pthread_attr_t attr;           /* attributes for new worker threads */
    int flags;
    unsigned int linger;           /* seconds an idle surplus thread lives */
    int minimum;                   /* lower bound on thread count */
    int maximum;                   /* upper bound on thread count */
    int nthreads;                  /* current number of threads */
    int idle;                      /* number of idle threads */
} nThreadPool;
static void *ntyWorkerThread(void *arg);

#define NTY_POOL_WAIT    0x01
#define NTY_POOL_DESTROY 0x02

static pthread_mutex_t nty_pool_lock = PTHREAD_MUTEX_INITIALIZER;
static sigset_t fillset;

nThreadPool *thread_pool = NULL;
static int ntyWorkerCreate(nThreadPool *pool) {
    sigset_t oset;
    pthread_t thread_id;

    /* block all signals in the new thread, then restore our own mask */
    pthread_sigmask(SIG_SETMASK, &fillset, &oset);
    int error = pthread_create(&thread_id, &pool->attr, ntyWorkerThread, pool);
    pthread_sigmask(SIG_SETMASK, &oset, NULL);

    return error;
}
static void ntyWorkerCleanup(nThreadPool *pool) {
    --pool->nthreads;

    if (pool->flags & NTY_POOL_DESTROY) {
        if (pool->nthreads == 0) {
            pthread_cond_broadcast(&pool->busycv);
        }
    } else if (pool->head != NULL && pool->nthreads < pool->maximum && ntyWorkerCreate(pool) == 0) {
        pool->nthreads++;
    }

    pthread_mutex_unlock(&pool->mtx);
}
static void ntyNotifyWaiters(nThreadPool *pool) {
    if (pool->head == NULL && pool->active == NULL) {
        pool->flags &= ~NTY_POOL_WAIT;
        pthread_cond_broadcast(&pool->waitcv);
    }
}
static void ntyJobCleanup(nThreadPool *pool) {
    pthread_t tid = pthread_self();
    nWorker *activep;
    nWorker **activepp;

    /* note: this re-locks the mutex and leaves it held; the worker
     * loop resumes at its top with pool->mtx locked */
    pthread_mutex_lock(&pool->mtx);

    /* unlink this thread from the active-worker list */
    for (activepp = &pool->active; (activep = *activepp) != NULL; activepp = &activep->active_next) {
        if (pthread_equal(activep->active_tid, tid)) {
            *activepp = activep->active_next;
            break;
        }
    }

    if (pool->flags & NTY_POOL_WAIT) ntyNotifyWaiters(pool);
}
static void *ntyWorkerThread(void *arg) {
    nThreadPool *pool = (nThreadPool*)arg;
    nWorker active;
    int timeout;
    struct timespec ts;
    JOB_CALLBACK func;

    pthread_mutex_lock(&pool->mtx);
    pthread_cleanup_push(ntyWorkerCleanup, pool);

    active.active_tid = pthread_self();

    while (1) {
        pthread_sigmask(SIG_SETMASK, &fillset, NULL);
        pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);

        timeout = 0;
        pool->idle++;

        if (pool->flags & NTY_POOL_WAIT) {
            ntyNotifyWaiters(pool);
        }

        /* wait for work; surplus threads (above minimum) only linger a while */
        while (pool->head == NULL && !(pool->flags & NTY_POOL_DESTROY)) {
            if (pool->nthreads <= pool->minimum) {
                pthread_cond_wait(&pool->workcv, &pool->mtx);
            } else {
                clock_gettime(CLOCK_REALTIME, &ts);
                ts.tv_sec += pool->linger;

                if (pool->linger == 0 || pthread_cond_timedwait(&pool->workcv, &pool->mtx, &ts) == ETIMEDOUT) {
                    timeout = 1;
                    break;
                }
            }
        }

        pool->idle--;
        if (pool->flags & NTY_POOL_DESTROY) break;

        nJob *job = pool->head;
        if (job != NULL) {
            timeout = 0;
            func = job->func;
            void *job_arg = job->arg;

            pool->head = job->next;
            if (job == pool->tail) {
                pool->tail = NULL;
            }

            active.active_next = pool->active;
            pool->active = &active;

            pthread_mutex_unlock(&pool->mtx);
            pthread_cleanup_push(ntyJobCleanup, pool);

            free(job);
            func(job_arg);

            /* runs ntyJobCleanup, which re-locks pool->mtx for the next round */
            pthread_cleanup_pop(1);
        }

        if (timeout && (pool->nthreads > pool->minimum)) {
            break;
        }
    }

    pthread_cleanup_pop(1);

    return NULL;
}
static void ntyCloneAttributes(pthread_attr_t *new_attr, pthread_attr_t *old_attr) {
    struct sched_param param;
    size_t size;
    int value;

    pthread_attr_init(new_attr);

    if (old_attr != NULL) {
        pthread_attr_getstacksize(old_attr, &size);
        pthread_attr_setstacksize(new_attr, size);

        pthread_attr_getscope(old_attr, &value);
        pthread_attr_setscope(new_attr, value);

        pthread_attr_getinheritsched(old_attr, &value);
        pthread_attr_setinheritsched(new_attr, value);

        pthread_attr_getschedpolicy(old_attr, &value);
        pthread_attr_setschedpolicy(new_attr, value);

        pthread_attr_getschedparam(old_attr, &param);
        pthread_attr_setschedparam(new_attr, &param);

        pthread_attr_getguardsize(old_attr, &size);
        pthread_attr_setguardsize(new_attr, size);
    }

    /* workers are detached: nobody joins them */
    pthread_attr_setdetachstate(new_attr, PTHREAD_CREATE_DETACHED);
}
nThreadPool *ntyThreadPoolCreate(int min_threads, int max_threads, int linger, pthread_attr_t *attr) {
    sigfillset(&fillset);

    if (min_threads > max_threads || max_threads < 1) {
        errno = EINVAL;
        return NULL;
    }

    nThreadPool *pool = (nThreadPool*)malloc(sizeof(nThreadPool));
    if (pool == NULL) {
        errno = ENOMEM;
        return NULL;
    }

    pthread_mutex_init(&pool->mtx, NULL);
    pthread_cond_init(&pool->busycv, NULL);
    pthread_cond_init(&pool->workcv, NULL);
    pthread_cond_init(&pool->waitcv, NULL);

    pool->active = NULL;
    pool->head = NULL;
    pool->tail = NULL;
    pool->flags = 0;
    pool->linger = linger;
    pool->minimum = min_threads;
    pool->maximum = max_threads;
    pool->nthreads = 0;
    pool->idle = 0;

    ntyCloneAttributes(&pool->attr, attr);

    /* insert the new pool into the global circular list of pools */
    pthread_mutex_lock(&nty_pool_lock);
    if (thread_pool == NULL) {
        pool->forw = pool;
        pool->back = pool;
        thread_pool = pool;
    } else {
        thread_pool->back->forw = pool;
        pool->forw = thread_pool;
        pool->back = thread_pool->back;
        thread_pool->back = pool;
    }
    pthread_mutex_unlock(&nty_pool_lock);

    return pool;
}
int ntyThreadPoolQueue(nThreadPool *pool, JOB_CALLBACK func, void *arg) {
    nJob *job = (nJob*)malloc(sizeof(nJob));
    if (job == NULL) {
        errno = ENOMEM;
        return -1;
    }

    job->next = NULL;
    job->func = func;
    job->arg = arg;

    pthread_mutex_lock(&pool->mtx);

    /* append at the tail; workers dequeue from the head */
    if (pool->head == NULL) {
        pool->head = job;
    } else {
        pool->tail->next = job;
    }
    pool->tail = job;

    if (pool->idle > 0) {
        pthread_cond_signal(&pool->workcv);  /* an idle worker can take it */
    } else if (pool->nthreads < pool->maximum && ntyWorkerCreate(pool) == 0) {
        pool->nthreads++;                    /* grow the pool instead */
    }

    pthread_mutex_unlock(&pool->mtx);

    return 0;
}
void nThreadPoolWait(nThreadPool *pool) {
    pthread_mutex_lock(&pool->mtx);
    pthread_cleanup_push(pthread_mutex_unlock, &pool->mtx);

    while (pool->head != NULL || pool->active != NULL) {
        pool->flags |= NTY_POOL_WAIT;
        pthread_cond_wait(&pool->waitcv, &pool->mtx);
    }

    pthread_cleanup_pop(1);
}
void nThreadPoolDestroy(nThreadPool *pool) {
    nWorker *activep;
    nJob *job;

    pthread_mutex_lock(&pool->mtx);
    pthread_cleanup_push(pthread_mutex_unlock, &pool->mtx);

    pool->flags |= NTY_POOL_DESTROY;
    pthread_cond_broadcast(&pool->workcv);

    /* cancel workers that are in the middle of a job */
    for (activep = pool->active; activep != NULL; activep = activep->active_next) {
        pthread_cancel(activep->active_tid);
    }

    /* wait until the last thread has exited */
    while (pool->nthreads != 0) {
        pthread_cond_wait(&pool->busycv, &pool->mtx);
    }

    pthread_cleanup_pop(1);

    /* unlink the pool from the global circular list */
    pthread_mutex_lock(&nty_pool_lock);
    if (thread_pool == pool) {
        thread_pool = pool->forw;
    }
    if (thread_pool == pool) {
        thread_pool = NULL;
    } else {
        pool->back->forw = pool->forw;
        pool->forw->back = pool->back;
    }
    pthread_mutex_unlock(&nty_pool_lock);

    /* free any jobs that never ran */
    for (job = pool->head; job != NULL; job = pool->head) {
        pool->head = job->next;
        free(job);
    }

    pthread_attr_destroy(&pool->attr);
    free(pool);
}
/********************************* debug thread pool *********************************/

void king_counter(void *arg) {
    int index = *(int*)arg;

    printf("index : %d, selfid : %lu\n", index, pthread_self());

    free(arg);
    usleep(1);
}
#define KING_COUNTER_SIZE 1000

int main(int argc, char *argv[]) {
    nThreadPool *pool = ntyThreadPoolCreate(10, 20, 15, NULL);

    int i = 0;
    for (i = 0; i < KING_COUNTER_SIZE; i++) {
        int *index = (int*)malloc(sizeof(int));
        memset(index, 0, sizeof(int));
        memcpy(index, &i, sizeof(int));

        ntyThreadPoolQueue(pool, king_counter, index);
    }

    getchar();
    printf("You are very good !!!!\n");

    return 0;
}
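As with the first version, the demo links against pthreads (the file name is just an example; on older glibc, clock_gettime may also require -lrt):

gcc thread_pool_v2.c -o thread_pool_v2 -lpthread

The call ntyThreadPoolCreate(10, 20, 15, NULL) keeps at least 10 workers alive, grows the pool on demand up to 20, and lets each surplus idle worker linger for 15 seconds before it exits.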