多線程編程—線程池的實現

執行與任務分離的組件— 線程池

https://github.com/wangbojing/threadpool

多線程技術主要解決了處理器單元內多個線程執行的問題,它可以顯著的減少處理器單元的閒置時間,增加處理器單元的吞吐能力。線程池是多線程編程的一個必要組件,並且對於很多編程人員都是透明的,更是神秘的。有幸能為大家解析其中緣由,尚有不妥之處,歡迎大家拋磚。

線程池是一個用來管理一組執行任務的線程的工具。既然是管理工具,那麼該工具就是用來管理任務與執行的。如圖一線程池組件拓撲圖所示,線程池由執行隊列(Workers)、任務隊列(Jobs)和池管理(Pool Manager)三部分組成。

執行隊列(Workers)是用來存放運行線程的隊列。

任務隊列(Jobs)是用來存放需要被執行的任務隊列。

池管理(Pool Manager)主要是管理執行隊列的執行順序、執行任務的時間長短,對長時間沒有使用的執行單元進行釋放,在執行單元滿負荷運行時及時添加執行單元;記錄未執行的任務數量,將新任務入隊,將即將執行的任務出隊等等。

多線程編程—線程池的實現

圖一 線程池組件拓撲圖

執行隊列(Workers)中的每一個執行單元(Worker)由哪些元素組成?線程ID,退出標誌。

任務隊列(Jobs)中的每一個任務(Job)由哪些元素組成?執行每一個任務的具體執行函數,每一個任務的執行參數。

池管理(Pool Manager)由哪些元素組成?每一個新任務添加與執行時移除所用的互斥鎖,每一個線程掛起時所等待的條件變量。

根據分析如圖二線程池的類圖。

多線程編程—線程池的實現

圖二線程池的類圖

到這裡一個簡單的線程池就已經可以呼之欲出了。以下為實現代碼

/*

* Author: WangBoJing

* email: [email protected]

* github: https://github.com/wangbojing

*/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <pthread.h>

/*
 * LL_ADD - push `item` onto the front of the doubly linked list `list`.
 *
 * `item` must point to a node that has `prev` and `next` members; `list` is
 * the head pointer and is overwritten with `item`. The previous head (if any)
 * gets its `prev` back-link set to `item`, so LL_REMOVE works on any node and
 * not just on the head. Both arguments are evaluated more than once, so pass
 * expressions without side effects.
 */
#define LL_ADD(item, list) do {                     \
    (item)->prev = NULL;                            \
    (item)->next = (list);                          \
    if ((list) != NULL) (list)->prev = (item);      \
    (list) = (item);                                \
} while(0)

/*
 * LL_REMOVE - unlink `item` from the doubly linked list headed by `list`.
 *
 * Neighbours are re-linked around `item`; if `item` was the head, `list` is
 * advanced to the next node. `item`'s own links are cleared afterwards.
 * Arguments are parenthesized so that non-trivial expressions expand safely,
 * but they are still evaluated more than once: `item` must not be the same
 * lvalue as `list` (copy the head into a local first).
 */
#define LL_REMOVE(item, list) do {                                      \
    if ((item)->prev != NULL) (item)->prev->next = (item)->next;        \
    if ((item)->next != NULL) (item)->next->prev = (item)->prev;        \
    if ((list) == (item)) (list) = (item)->next;                        \
    (item)->prev = (item)->next = NULL;                                 \
} while(0)

/* Signature of a job function: takes one opaque pointer, returns nothing. */
typedef void (*JOB_CALLBACK)(void *);

/* Forward declaration so that NWORKER can hold a pointer to its pool. */
struct NTHREADPOOL;

/*
 * One worker thread of the pool. Workers are chained into the pool's
 * `workers` list through `next`/`prev`.
 */
typedef struct NWORKER {
    pthread_t thread;           /* handle written by pthread_create */
    int terminate;              /* non-zero once the worker has been told to exit */
    struct NTHREADPOOL *pool;   /* pool this worker takes jobs from */

    struct NWORKER *next;       /* list links used by LL_ADD / LL_REMOVE */
    struct NWORKER *prev;
} nWorker;

/*
 * One unit of work waiting in the pool's `jobs` list.
 */
typedef struct NJOB {
    JOB_CALLBACK job_func;  /* function a worker calls to run the job */
    void *arg;              /* user data belonging to the job */

    struct NJOB *next;      /* list links used by LL_ADD / LL_REMOVE */
    struct NJOB *prev;
} nJob;

/*
 * The pool itself: the worker list, the pending-job list, and the
 * mutex/condition-variable pair that protects and signals the job list.
 */
typedef struct NTHREADPOOL {
    struct NWORKER *workers;    /* head of the worker list */
    struct NJOB *jobs;          /* head of the pending-job list */

    pthread_mutex_t jobs_mtx;   /* held while `jobs` is inspected or modified */
    pthread_cond_t jobs_cond;   /* workers sleep here until a job is pushed */
} nThreadPool;

void *ntyWorkerThread(void *arg) {

nWorker *worker = (nWorker*)arg;

while (1) {

pthread_mutex_lock(&worker->pool->jobs_mtx);

while (worker->pool->jobs == NULL) {

if (worker->terminate) break;

pthread_cond_wait(&worker->pool->jobs_cond, &worker->pool->jobs_mtx);

}

if (worker->terminate) {

pthread_mutex_unlock(&worker->pool->jobs_mtx);

break;

}

nJob *job = worker->pool->jobs;

if (job != NULL) {

LL_REMOVE(job, worker->pool->jobs);

}

pthread_mutex_unlock(&worker->pool->jobs_mtx);

if (job == NULL) continue;

job->job_func(job);

usleep(1);

}

free(worker);

pthread_exit(NULL);

}

/*
 * Initialise `pool` and start `numWorkers` worker threads on it.
 *
 * pool        caller-provided storage; it is zeroed and re-initialised here.
 * numWorkers  number of threads to start; values below 1 are treated as 1.
 *
 * Returns 0 on success and 1 on failure (NULL pool, failed initialisation,
 * allocation failure, or thread creation failure). On a failure part-way
 * through the loop, the workers started so far keep running and stay linked
 * in pool->workers.
 */
int ntyThreadPoolCreate(nThreadPool *pool, int numWorkers) {
    if (pool == NULL) return 1;
    if (numWorkers < 1) numWorkers = 1;

    memset(pool, 0, sizeof(*pool));

    if (pthread_mutex_init(&pool->jobs_mtx, NULL) != 0) return 1;
    if (pthread_cond_init(&pool->jobs_cond, NULL) != 0) {
        pthread_mutex_destroy(&pool->jobs_mtx);
        return 1;
    }

    for (int i = 0; i < numWorkers; i++) {
        nWorker *worker = (nWorker *)calloc(1, sizeof(*worker));
        if (worker == NULL) {
            perror("malloc");
            return 1;
        }
        worker->pool = pool;

        /* pthread_create reports failure through its return value, not errno. */
        int ret = pthread_create(&worker->thread, NULL, ntyWorkerThread, (void *)worker);
        if (ret != 0) {
            fprintf(stderr, "pthread_create: %s\n", strerror(ret));
            free(worker);
            return 1;
        }

        /* Nobody joins the workers, so detach them to release their
         * resources when they exit. */
        pthread_detach(worker->thread);

        LL_ADD(worker, pool->workers);
    }

    return 0;
}

/*
 * Ask every worker of `pool` to exit and empty the pool's lists.
 *
 * The terminate flags are set while jobs_mtx is held. Workers only read
 * their flag under that mutex and free their own nWorker once they see it,
 * so holding the lock for the whole traversal guarantees that no node is
 * freed while this loop is still following `next` pointers.
 *
 * NOTE: jobs still pending are dropped from the list without being run or
 * freed; their memory remains the caller's responsibility.
 */
void ntyThreadPoolShutdown(nThreadPool *pool) {
    if (pool == NULL) return;

    pthread_mutex_lock(&pool->jobs_mtx);

    for (nWorker *worker = pool->workers; worker != NULL; worker = worker->next) {
        worker->terminate = 1;
    }

    pool->workers = NULL;
    pool->jobs = NULL;

    /* Wake every sleeping worker so it can observe its terminate flag. */
    pthread_cond_broadcast(&pool->jobs_cond);

    pthread_mutex_unlock(&pool->jobs_mtx);
}

/*
 * Queue `job` on `pool` and wake one sleeping worker.
 *
 * The job is linked at the head of pool->jobs under jobs_mtx. The pool keeps
 * only the pointer; the nJob must stay valid until a worker has run it.
 * A NULL pool or NULL job is ignored.
 */
void ntyThreadPoolPush(nThreadPool *pool, nJob *job) {
    if (pool == NULL || job == NULL) return;

    pthread_mutex_lock(&pool->jobs_mtx);

    LL_ADD(job, pool->jobs);
    pthread_cond_signal(&pool->jobs_cond);

    pthread_mutex_unlock(&pool->jobs_mtx);
}

/********************************* debug thread pool *********************************/

/* Number of worker threads the demo main() asks the pool to start. */
#define KING_MAX_THREADS 80

/* Number of jobs the demo main() pushes onto the pool. */
#define KING_COUNTER_SIZE 1000

/*
 * Demo job function: prints the job's index and the id of the running thread.
 *
 * arg is the nJob itself (the worker passes the job, not job->arg); job->arg
 * points to a heap-allocated int. Both the int and the job are freed here.
 */
void king_counter(void *arg) {
    nJob *job = (nJob *)arg;
    int index = *(int *)job->arg;

    /* %lu needs an unsigned long; pthread_t is not guaranteed to be one. */
    printf("index: %d, selfid:%lu\n", index, (unsigned long)pthread_self());

    free(job->arg);
    free(job);
}

/*
 * Demo driver: starts a pool of KING_MAX_THREADS workers, pushes
 * KING_COUNTER_SIZE jobs that each carry their own index, then waits for a
 * key press so the workers have time to run them.
 */
int main(int argc, char *argv[]) {
    (void)argc;
    (void)argv;

    nThreadPool pool;
    ntyThreadPoolCreate(&pool, KING_MAX_THREADS);

    for (int i = 0; i < KING_COUNTER_SIZE; i++) {
        nJob *job = (nJob *)malloc(sizeof(nJob));
        if (job == NULL) {
            perror("malloc");
            exit(1);
        }

        job->job_func = king_counter;
        job->arg = malloc(sizeof(int));
        if (job->arg == NULL) {
            /* Without this check the store below would dereference NULL. */
            perror("malloc");
            free(job);
            exit(1);
        }
        *(int *)job->arg = i;

        ntyThreadPoolPush(&pool, job);
    }

    getchar();
    printf("You are very good !!!!\n");

    return 0;
}

這樣的線程池還只是一個Demo,有如下幾點值得我們改進。

  1. 線程池的線程數量是確定的,不能隨著系統任務請求數量放縮線程池的大小。

  2. 沒有對任務隊列中的任務數量進行統計

  3. 沒有對執行任務中的線程數量、等待執行的任務數量進行統計

  4. 每一個執行任務的時間沒有做限制,

  5. IO密集型與計算密集型區分,線程池非常常用,但是根據不同的業務場景需要設置不同配置

  6. 缺少在用戶任務執行函數裡,用戶主動調用pthread_exit退出線程時的保護機制

針對於以上幾點問題,改進了一版線程池

/*

* Author: WangBoJing

* email: [email protected]

* github: https://github.com/wangbojing

*/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <pthread.h>
#include <signal.h>
#include <errno.h>
#include <time.h>

/* Signature of a job function: takes one opaque pointer, returns nothing. */
typedef void (*JOB_CALLBACK)(void *);

/*
 * One queued job: a node of the pool's singly linked FIFO (head/tail).
 */
typedef struct NJOB {
    struct NJOB *next;  /* following job in the queue, NULL for the last one */
    JOB_CALLBACK func;  /* function a worker calls to run the job */
    void *arg;          /* argument handed to func */
} nJob;

/*
 * Record of a worker that is currently running a job. Each worker thread
 * keeps one of these as a local variable and links it into pool->active
 * for the duration of the job.
 */
typedef struct NWORKER {
    struct NWORKER *active_next;    /* next record in the active list */
    pthread_t active_tid;           /* thread id of the worker owning this record */
} nWorker;

/*
 * A thread pool whose worker count varies between `minimum` and `maximum`.
 * All fields other than forw/back are protected by `mtx`; forw/back are
 * protected by the file-level nty_pool_lock.
 */
typedef struct NTHREADPOOL {
    /* Links of the circular, doubly linked list of all pools. */
    struct NTHREADPOOL *forw;
    struct NTHREADPOOL *back;

    pthread_mutex_t mtx;    /* protects the pool's state */
    pthread_cond_t busycv;  /* signalled when the last worker exits during destroy */
    pthread_cond_t workcv;  /* idle workers wait here for jobs */
    pthread_cond_t waitcv;  /* nThreadPoolWait() callers wait here */

    nWorker *active;        /* workers currently running a job */
    nJob *head;             /* first queued job */
    nJob *tail;             /* last queued job */

    pthread_attr_t attr;    /* attributes used when creating workers */

    int flags;              /* NTY_POOL_WAIT / NTY_POOL_DESTROY bits */
    unsigned int linger;    /* seconds a surplus idle worker waits before exiting */
    int minimum;            /* worker count at or below which idle workers never time out */
    int maximum;            /* upper bound on the worker count */
    int nthreads;           /* current number of workers */
    int idle;               /* number of workers waiting for a job */
} nThreadPool;

/* Worker entry point; declared early because ntyWorkerCreate refers to it. */
static void* ntyWorkerThread(void *arg);

/* pool->flags bit: at least one thread is blocked in nThreadPoolWait(). */
#define NTY_POOL_WAIT 0x01

/* pool->flags bit: nThreadPoolDestroy() is in progress; workers must exit. */
#define NTY_POOL_DESTROY 0x02

/* Protects the global list of pools (thread_pool and the forw/back links). */
static pthread_mutex_t nty_pool_lock = PTHREAD_MUTEX_INITIALIZER;

/* Signal set filled by sigfillset() in ntyThreadPoolCreate; installed as the
 * signal mask of worker threads. */
static sigset_t fillset;

/* Head of the circular, doubly linked list of all pools; NULL when empty. */
nThreadPool *thread_pool = NULL;

/*
 * Start one more worker thread for `pool`, using the pool's thread attributes.
 *
 * The calling thread's signal mask is replaced by `fillset` around the
 * pthread_create call and restored afterwards.
 *
 * Returns the pthread_create result: 0 on success, an error number otherwise.
 * The caller is responsible for updating pool->nthreads.
 */
static int ntyWorkerCreate(nThreadPool *pool) {
    sigset_t saved_mask;
    pthread_t tid;
    int rc;

    pthread_sigmask(SIG_SETMASK, &fillset, &saved_mask);
    rc = pthread_create(&tid, &pool->attr, ntyWorkerThread, pool);
    pthread_sigmask(SIG_SETMASK, &saved_mask, NULL);

    return rc;
}

static void ntyWorkerCleanup(nThreadPool * pool) {

--pool->nthreads;

if (pool->flags & NTY_POOL_DESTROY) {

if (pool->nthreads == 0) {

pthread_cond_broadcast(&pool->busycv);

}

} else if (pool->head != NULL && pool->nthreads < pool->maximum && ntyWorkerCreate(pool) == 0) {

pool->nthreads ++;

}

pthread_mutex_unlock(&pool->mtx);

}

/*
 * Wake the threads blocked in nThreadPoolWait() once the pool has drained,
 * i.e. no job is queued and no worker is running one. Does nothing
 * otherwise. The caller holds pool->mtx.
 */
static void ntyNotifyWaiters(nThreadPool *pool) {
    if (pool->head != NULL || pool->active != NULL) {
        return;     /* work is still queued or in progress */
    }

    pool->flags &= ~NTY_POOL_WAIT;
    pthread_cond_broadcast(&pool->waitcv);
}

static void ntyJobCleanup(nThreadPool *pool) {

pthread_t tid = pthread_self();

nWorker *activep;

nWorker **activepp;

pthread_mutex_lock(&pool->mtx);

for (activepp = &pool->active;(activep = *activepp) != NULL;activepp = &activep->active_next) {

*activepp = activep->active_next;

break;

}

if

(pool->flags & NTY_POOL_WAIT) ntyNotifyWaiters(pool);

}

/*
 * Thread entry point of a pool worker; arg is the owning nThreadPool.
 *
 * The worker holds pool->mtx except while a job function is running. It
 * waits on workcv for jobs: indefinitely while the pool is at or below its
 * minimum size, otherwise for at most `linger` seconds, after which a
 * surplus worker exits. Two cleanup handlers keep the pool consistent if the
 * job function is cancelled or calls pthread_exit(): ntyJobCleanup re-takes
 * the mutex and unlinks the active record, ntyWorkerCleanup adjusts the
 * worker count and releases the mutex.
 *
 * Always returns NULL.
 */
static void* ntyWorkerThread(void *arg) {
    nThreadPool *pool = (nThreadPool*)arg;
    nWorker active;
    int timeout;
    struct timespec ts;
    JOB_CALLBACK func;

    pthread_mutex_lock(&pool->mtx);
    pthread_cleanup_push(ntyWorkerCleanup, pool);

    active.active_tid = pthread_self();

    while (1) {
        /* Reset signal mask and cancellation state; the previous job may
         * have changed them. */
        pthread_sigmask(SIG_SETMASK, &fillset, NULL);
        pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);

        timeout = 0;
        pool->idle ++;

        if (pool->flags & NTY_POOL_WAIT) {
            ntyNotifyWaiters(pool);
        }

        while (pool->head == NULL && !(pool->flags & NTY_POOL_DESTROY)) {
            if (pool->nthreads <= pool->minimum) {
                pthread_cond_wait(&pool->workcv, &pool->mtx);
            } else {
                clock_gettime(CLOCK_REALTIME, &ts);
                ts.tv_sec += pool->linger;

                if (pool->linger == 0 || pthread_cond_timedwait(&pool->workcv, &pool->mtx, &ts) == ETIMEDOUT) {
                    timeout = 1;
                    break;
                }
            }
        }

        pool->idle --;

        if (pool->flags & NTY_POOL_DESTROY) break;

        nJob *job = pool->head;
        if (job != NULL) {
            timeout = 0;
            func = job->func;
            void *job_arg = job->arg;

            /* Dequeue; clear the tail as well when the queue becomes empty. */
            pool->head = job->next;
            if (job == pool->tail) {
                pool->tail = NULL;
            }

            /* Publish this worker as active before dropping the lock. */
            active.active_next = pool->active;
            pool->active = &active;

            pthread_mutex_unlock(&pool->mtx);

            pthread_cleanup_push(ntyJobCleanup, pool);

            free(job);
            func(job_arg);

            /* Runs ntyJobCleanup, which re-acquires pool->mtx. */
            pthread_cleanup_pop(1);
        }

        /* Timed out with nothing to do and more workers than the minimum. */
        if (timeout && (pool->nthreads > pool->minimum)) {
            break;
        }
    }

    /* Runs ntyWorkerCleanup, which releases pool->mtx. */
    pthread_cleanup_pop(1);

    return NULL;
}

/*
 * Initialise `new_attr` for the pool's workers.
 *
 * new_attr  attribute object to initialise; always ends up with the
 *           detached state set.
 * old_attr  optional template (may be NULL). Its stack size, contention
 *           scope, scheduling inheritance, policy and parameters, and guard
 *           size are copied. A caller-supplied stack address is deliberately
 *           not copied, since one stack cannot be shared by several workers.
 *
 * Only the stack size is transferred (pthread_attr_setstacksize) rather than
 * calling pthread_attr_setstack with a NULL address. Each value is copied
 * only if reading it from old_attr succeeded.
 */
static void ntyCloneAttributes(pthread_attr_t *new_attr, pthread_attr_t *old_attr) {
    pthread_attr_init(new_attr);

    if (old_attr != NULL) {
        struct sched_param param;
        size_t size;
        int value;

        if (pthread_attr_getstacksize(old_attr, &size) == 0 && size > 0) {
            pthread_attr_setstacksize(new_attr, size);
        }

        if (pthread_attr_getscope(old_attr, &value) == 0) {
            pthread_attr_setscope(new_attr, value);
        }

        if (pthread_attr_getinheritsched(old_attr, &value) == 0) {
            pthread_attr_setinheritsched(new_attr, value);
        }

        /* Policy before parameters, as in the original ordering. */
        if (pthread_attr_getschedpolicy(old_attr, &value) == 0) {
            pthread_attr_setschedpolicy(new_attr, value);
        }

        if (pthread_attr_getschedparam(old_attr, &param) == 0) {
            pthread_attr_setschedparam(new_attr, &param);
        }

        if (pthread_attr_getguardsize(old_attr, &size) == 0) {
            pthread_attr_setguardsize(new_attr, size);
        }
    }

    /* Workers are never joined. */
    pthread_attr_setdetachstate(new_attr, PTHREAD_CREATE_DETACHED);
}

/*
 * Allocate and initialise a thread pool and add it to the global pool list.
 *
 * min_threads  worker count at or below which idle workers wait for jobs
 *              without a timeout.
 * max_threads  upper bound on the number of workers; must be at least 1 and
 *              not smaller than min_threads.
 * linger       seconds a surplus idle worker waits for a job before exiting.
 * attr         optional thread attributes to clone for the workers (may be
 *              NULL).
 *
 * No worker is started here; workers are created on demand by
 * ntyThreadPoolQueue().
 *
 * Returns the new pool, or NULL with errno set to EINVAL (bad limits) or
 * ENOMEM (allocation failure).
 */
nThreadPool *ntyThreadPoolCreate(int min_threads, int max_threads, int linger, pthread_attr_t *attr) {
    sigfillset(&fillset);

    if (min_threads > max_threads || max_threads < 1) {
        errno = EINVAL;
        return NULL;
    }

    nThreadPool *pool = (nThreadPool*)malloc(sizeof(nThreadPool));
    if (pool == NULL) {
        errno = ENOMEM;
        return NULL;
    }

    pthread_mutex_init(&pool->mtx, NULL);
    pthread_cond_init(&pool->busycv, NULL);
    pthread_cond_init(&pool->workcv, NULL);
    pthread_cond_init(&pool->waitcv, NULL);

    pool->active = NULL;
    pool->head = NULL;
    pool->tail = NULL;
    pool->flags = 0;
    pool->linger = linger;
    pool->minimum = min_threads;
    pool->maximum = max_threads;
    pool->nthreads = 0;
    pool->idle = 0;

    ntyCloneAttributes(&pool->attr, attr);

    /* Insert the pool at the end of the circular list of all pools. */
    pthread_mutex_lock(&nty_pool_lock);

    if (thread_pool == NULL) {
        pool->forw = pool;
        pool->back = pool;
        thread_pool = pool;
    } else {
        thread_pool->back->forw = pool;
        pool->forw = thread_pool;
        /* The new node's predecessor is the old last node. */
        pool->back = thread_pool->back;
        thread_pool->back = pool;
    }

    pthread_mutex_unlock(&nty_pool_lock);

    return pool;
}

/*
 * Append a job to the pool's queue.
 *
 * pool  pool to run the job on.
 * func  function a worker will call.
 * arg   argument passed to func; its ownership is whatever func expects.
 *
 * If a worker is idle it is woken. Otherwise, if the pool is below its
 * maximum size, a new worker is started; if that is not possible the job
 * simply waits in the queue.
 *
 * Returns 0 on success, or -1 with errno set to ENOMEM if the job record
 * cannot be allocated.
 */
int ntyThreadPoolQueue(nThreadPool *pool, JOB_CALLBACK func, void *arg) {
    nJob *job = (nJob*)malloc(sizeof(nJob));
    if (job == NULL) {
        errno = ENOMEM;
        return -1;
    }

    job->next = NULL;
    job->func = func;
    job->arg = arg;

    pthread_mutex_lock(&pool->mtx);

    if (pool->head == NULL) {
        pool->head = job;
    } else {
        pool->tail->next = job;
    }
    pool->tail = job;

    if (pool->idle > 0) {
        pthread_cond_signal(&pool->workcv);
    } else if (pool->nthreads < pool->maximum && ntyWorkerCreate(pool) == 0) {
        pool->nthreads ++;
    }

    pthread_mutex_unlock(&pool->mtx);

    return 0;
}

/*
 * Cleanup-handler adapter: pthread_cleanup_push needs a void (*)(void *),
 * which pthread_mutex_unlock is not. arg is the mutex to release.
 */
static void ntyWaitUnlock(void *arg) {
    pthread_mutex_unlock((pthread_mutex_t *)arg);
}

/*
 * Block until the pool has no queued job and no worker running one.
 *
 * pthread_cond_wait is a cancellation point, so the mutex is released
 * through a cleanup handler in case the calling thread is cancelled while
 * waiting.
 */
void nThreadPoolWait(nThreadPool *pool) {
    pthread_mutex_lock(&pool->mtx);
    pthread_cleanup_push(ntyWaitUnlock, &pool->mtx);

    while (pool->head != NULL || pool->active != NULL) {
        /* Ask the workers to broadcast waitcv once the pool has drained. */
        pool->flags |= NTY_POOL_WAIT;
        pthread_cond_wait(&pool->waitcv, &pool->mtx);
    }

    pthread_cleanup_pop(1);
}

/*
 * Cleanup-handler adapter: pthread_cleanup_push needs a void (*)(void *),
 * which pthread_mutex_unlock is not. arg is the mutex to release.
 */
static void ntyDestroyUnlock(void *arg) {
    pthread_mutex_unlock((pthread_mutex_t *)arg);
}

/*
 * Tear down `pool` and free it.
 *
 * Idle workers are woken and see NTY_POOL_DESTROY; workers that are running
 * a job are cancelled. The call blocks until every worker has left, then
 * removes the pool from the global list, frees the jobs that were never
 * started (their `arg` is not freed), and releases the pool's attribute
 * object, condition variables and mutex. The pool pointer is invalid
 * afterwards.
 */
void nThreadPoolDestroy(nThreadPool *pool) {
    nWorker *activep;
    nJob *job;

    pthread_mutex_lock(&pool->mtx);
    pthread_cleanup_push(ntyDestroyUnlock, &pool->mtx);

    pool->flags |= NTY_POOL_DESTROY;
    pthread_cond_broadcast(&pool->workcv);

    for (activep = pool->active; activep != NULL; activep = activep->active_next) {
        pthread_cancel(activep->active_tid);
    }

    /* The last worker to leave broadcasts busycv. */
    while (pool->nthreads != 0) {
        pthread_cond_wait(&pool->busycv, &pool->mtx);
    }

    pthread_cleanup_pop(1);

    /* Unlink the pool from the circular list of all pools. */
    pthread_mutex_lock(&nty_pool_lock);

    if (thread_pool == pool) {
        thread_pool = pool->forw;
    }

    if (thread_pool == pool) {
        /* It was the only pool in the list. */
        thread_pool = NULL;
    } else {
        pool->back->forw = pool->forw;
        pool->forw->back = pool->back;
    }

    pthread_mutex_unlock(&nty_pool_lock);

    for (job = pool->head; job != NULL; job = pool->head) {
        pool->head = job->next;
        free(job);
    }

    pthread_attr_destroy(&pool->attr);

    /* Release the synchronisation objects created in ntyThreadPoolCreate. */
    pthread_cond_destroy(&pool->busycv);
    pthread_cond_destroy(&pool->workcv);
    pthread_cond_destroy(&pool->waitcv);
    pthread_mutex_destroy(&pool->mtx);

    free(pool);
}

/********************************* debug thread pool *********************************/

/*
 * Demo job function: prints the index it was given and the id of the
 * running thread, then frees the index.
 *
 * arg points to a heap-allocated int owned by this job.
 */
void king_counter(void *arg) {
    int index = *(int *)arg;

    /* %lu needs an unsigned long; pthread_t is not guaranteed to be one. */
    printf("index : %d, selfid : %lu\n", index, (unsigned long)pthread_self());

    free(arg);
    usleep(1);
}

/* Number of jobs the demo main() queues on the pool. */
#define KING_COUNTER_SIZE 1000

/*
 * Demo driver: creates a pool (minimum 10, maximum 20 workers, 15 second
 * linger), queues KING_COUNTER_SIZE jobs that each own a heap-allocated
 * index, then waits for a key press so the workers have time to run them.
 */
int main(int argc, char *argv[]) {
    (void)argc;
    (void)argv;

    nThreadPool *pool = ntyThreadPoolCreate(10, 20, 15, NULL);
    if (pool == NULL) {
        perror("ntyThreadPoolCreate");
        return 1;
    }

    for (int i = 0; i < KING_COUNTER_SIZE; i++) {
        int *index = (int *)malloc(sizeof(int));
        if (index == NULL) {
            perror("malloc");
            return 1;
        }
        *index = i;

        ntyThreadPoolQueue(pool, king_counter, index);
    }

    getchar();
    printf("You are very good !!!!\n");

    return 0;
}

每天會更新論文和視頻,還有如果想學習c++知識在晚上8.30免費觀看這個直播:https://ke.qq.com/course/131973#tuin=b52b9a80


分享到:


相關文章: