您好,登錄后才能下訂單哦!
多線程編程—線程池的實現
執行與任務分離的組件— 線程池
https://github.com/wangbojing/threadpool
多線程技術主要解決了處理器單元內多個線程執行的問題,它可以顯著地減少處理器單元的閑置時間,增加處理器單元的吞吐能力。線程池是多線程編程的一個必要組件,并且對于很多編程人員都是透明的,更是神秘的。有幸能為大家解析其中緣由,如有不妥之處,歡迎大家拍磚。
線程池是一個用來管理一組執行任務的線程的工具。既然是管理工具,那么它管理的就是任務與執行。如圖一線程池組件拓撲圖所示,線程池由執行隊列(Workers),任務隊列(Jobs)和池管理(Pool Manager)三部分組成。
執行隊列(Workers)是用來存放運行線程的隊列。
任務隊列(Jobs)是用來存放需要被執行的任務隊列。
池管理(Pool Manager)主要是管理執行隊列的執行順序,執行任務的時間長短,對長時間沒有使用的執行單元進行釋放,執行單元滿負荷運行時及時添加執行單元;記錄未執行的任務數量,對新任務入隊,即將執行的任務出隊等等。
圖一 線程池組件拓撲圖
執行隊列(Workers)中的每一個執行單元(Worker)由哪些元素組成?線程ID,退出標志。
任務隊列(Jobs)中的每一個任務(Job)由哪些元素組成?執行每一個任務的具體執行函數,每一個任務的執行參數。
池管理(Pool Manager)由哪些元素組成?每一個新任務添加與執行時移除所用的互斥鎖,每一個線程掛起時所等待的條件變量。
根據分析如圖二線程池的類圖。
圖二線程池的類圖
到這里一個簡單的線程池就已經可以呼之欲出了。以下為實現代碼
/*
 * Author: WangBoJing
 * email: 1989wangbojing@gmail.com
 * github: https://github.com/wangbojing
 */

#include <pthread.h>
#include <sched.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>

/*
 * Push `item` onto the front of the doubly linked list whose head is `list`.
 * The old head's `prev` is linked back to the new item so that LL_REMOVE
 * works for nodes anywhere in the list, not only for the head.
 */
#define LL_ADD(item, list) do {                         \
    (item)->prev = NULL;                                \
    (item)->next = (list);                              \
    if ((list) != NULL) (list)->prev = (item);          \
    (list) = (item);                                    \
} while (0)

/* Unlink `item` from the doubly linked list whose head is `list`. */
#define LL_REMOVE(item, list) do {                              \
    if ((item)->prev != NULL) (item)->prev->next = (item)->next; \
    if ((item)->next != NULL) (item)->next->prev = (item)->prev; \
    if ((list) == (item)) (list) = (item)->next;                \
    (item)->prev = (item)->next = NULL;                         \
} while (0)

/* Job entry point. The worker passes the nJob itself as the argument. */
typedef void (*JOB_CALLBACK)(void *);

struct NTHREADPOOL;

/* One execution unit: a thread plus its exit flag. */
typedef struct NWORKER {
    pthread_t thread;
    int terminate;              /* read and written under pool->jobs_mtx */
    struct NTHREADPOOL *pool;
    struct NWORKER *next;
    struct NWORKER *prev;
} nWorker;

/* One queued task: the function to run and its user argument. */
typedef struct NJOB {
    JOB_CALLBACK job_func;
    void *arg;
    struct NJOB *next;
    struct NJOB *prev;
} nJob;

/* Pool manager: worker list, job list and the lock/condvar guarding jobs. */
typedef struct NTHREADPOOL {
    struct NWORKER *workers;
    struct NJOB *jobs;
    pthread_mutex_t jobs_mtx;
    pthread_cond_t jobs_cond;
} nThreadPool;

void ntyThreadPoolShutdown(nThreadPool *pool);

/*
 * Worker loop: sleep until a job is queued or termination is requested,
 * pop the job at the head of the list and run it outside the lock.
 * The worker record is released by ntyThreadPoolShutdown after joining,
 * so the thread does not free it itself.
 */
void *ntyWorkerThread(void *arg)
{
    nWorker *worker = (nWorker *)arg;
    nThreadPool *pool = worker->pool;

    for (;;) {
        pthread_mutex_lock(&pool->jobs_mtx);
        while (pool->jobs == NULL && !worker->terminate) {
            pthread_cond_wait(&pool->jobs_cond, &pool->jobs_mtx);
        }
        if (worker->terminate) {
            pthread_mutex_unlock(&pool->jobs_mtx);
            break;
        }

        /* The wait loop only exits with a job available or terminate set. */
        nJob *job = pool->jobs;
        LL_REMOVE(job, pool->jobs);
        pthread_mutex_unlock(&pool->jobs_mtx);

        /* The callback receives the job and owns it from here on. */
        job->job_func(job);

        /* Give other threads a chance to run between jobs. */
        sched_yield();
    }

    return NULL;
}

/*
 * Initialise `pool` and start `numWorkers` worker threads (at least one).
 *
 * Returns 0 on success and 1 on failure. On failure every worker that was
 * already started is stopped and released again.
 */
int ntyThreadPoolCreate(nThreadPool *pool, int numWorkers)
{
    if (pool == NULL) return 1;
    if (numWorkers < 1) numWorkers = 1;

    memset(pool, 0, sizeof(*pool));
    pthread_mutex_init(&pool->jobs_mtx, NULL);
    pthread_cond_init(&pool->jobs_cond, NULL);

    for (int i = 0; i < numWorkers; i++) {
        nWorker *worker = malloc(sizeof(*worker));
        if (worker == NULL) {
            perror("malloc");
            ntyThreadPoolShutdown(pool);
            return 1;
        }
        memset(worker, 0, sizeof(*worker));
        worker->pool = pool;

        /* pthread_create reports failure via its return value, not errno. */
        int ret = pthread_create(&worker->thread, NULL, ntyWorkerThread, worker);
        if (ret != 0) {
            fprintf(stderr, "pthread_create: %s\n", strerror(ret));
            free(worker);
            ntyThreadPoolShutdown(pool);
            return 1;
        }

        LL_ADD(worker, pool->workers);
    }

    return 0;
}

/*
 * Stop all workers and wait for them to exit.
 *
 * Jobs still queued are dropped without being run or freed, because the
 * pool does not know how the caller allocated them. Jobs already running
 * are allowed to finish. Must not be called from inside a job callback,
 * since the calling worker would then join itself.
 */
void ntyThreadPoolShutdown(nThreadPool *pool)
{
    if (pool == NULL) return;

    pthread_mutex_lock(&pool->jobs_mtx);
    nWorker *stopping = pool->workers;
    for (nWorker *w = stopping; w != NULL; w = w->next) {
        w->terminate = 1;       /* set under the lock the workers read it with */
    }
    pool->workers = NULL;
    pool->jobs = NULL;
    pthread_cond_broadcast(&pool->jobs_cond);
    pthread_mutex_unlock(&pool->jobs_mtx);

    /* Join outside the lock so workers can reacquire it and leave. */
    while (stopping != NULL) {
        nWorker *next = stopping->next;
        pthread_join(stopping->thread, NULL);
        free(stopping);
        stopping = next;
    }
}

/*
 * Queue `job` and wake one worker. The job is pushed at the head of the
 * list and workers also take from the head, so the most recently pushed
 * job runs first.
 */
void ntyThreadPoolPush(nThreadPool *pool, nJob *job)
{
    if (pool == NULL || job == NULL) return;

    pthread_mutex_lock(&pool->jobs_mtx);
    LL_ADD(job, pool->jobs);
    pthread_cond_signal(&pool->jobs_cond);
    pthread_mutex_unlock(&pool->jobs_mtx);
}

/********************************* debug thread pool *********************************/

#define KING_MAX_THREADS    80
#define KING_COUNTER_SIZE   1000

/* Demo job: print the index carried by the job, then free the job. */
void king_counter(void *arg)
{
    nJob *job = (nJob *)arg;
    int index = *(int *)job->arg;

    /* pthread_t is opaque; cast it to match the %lu conversion. */
    printf("index: %d, selfid:%lu\n", index, (unsigned long)pthread_self());

    free(job->arg);
    free(job);
}

int main(int argc, char *argv[])
{
    (void)argc;
    (void)argv;

    nThreadPool pool;
    if (ntyThreadPoolCreate(&pool, KING_MAX_THREADS) != 0) {
        fprintf(stderr, "failed to create thread pool\n");
        return 1;
    }

    for (int i = 0; i < KING_COUNTER_SIZE; i++) {
        nJob *job = malloc(sizeof(*job));
        if (job == NULL) {
            perror("malloc");
            exit(1);
        }

        job->job_func = king_counter;
        job->arg = malloc(sizeof(int));
        if (job->arg == NULL) {
            perror("malloc");
            free(job);
            exit(1);
        }
        *(int *)job->arg = i;

        ntyThreadPoolPush(&pool, job);
    }

    getchar();
    printf("You are very good !!!!\n");

    ntyThreadPoolShutdown(&pool);
    return 0;
}
這樣的線程池還只是一個Demo,有如下幾點值得我們改進。
線程池的線程數量是確定的,不能隨著系統任務請求數量放縮線程池的大小。
任務數量的統計,并沒有對任務隊列進行統計
沒有對執行任務中的線程數量,等待執行的任務數量進行統計
每一個執行任務的時間沒有做限制
IO密集型與計算密集型區分,線程池非常常用,但是根據不同的業務場景需要設置不同配置
缺少用戶在任務執行函數里主動調用pthread_exit退出線程時的保護機制
針對于以上幾點問題,改進了一版線程池
/* * Author: WangBoJing * email: 1989wangbojing@gmail.com * github: https://github.com/wangbojing */ #include <stdio.h> #include <string.h> #include <stdlib.h> #include <signal.h> #include <errno.h> #include <time.h> #include <unistd.h> #include <pthread.h> typedef void (*JOB_CALLBACK)(void *); typedef struct NJOB { struct NJOB *next; JOB_CALLBACK func; void *arg; } nJob; typedef struct NWORKER { struct NWORKER *active_next; pthread_t active_tid; } nWorker; typedef struct NTHREADPOOL { struct NTHREADPOOL *forw; struct NTHREADPOOL *back; pthread_mutex_t mtx; pthread_cond_t busycv; pthread_cond_t workcv; pthread_cond_t waitcv; nWorker *active; nJob *head; nJob *tail; pthread_attr_t attr; int flags; unsigned int linger; int minimum; int maximum; int nthreads; int idle; } nThreadPool; static void* ntyWorkerThread(void *arg); #define NTY_POOL_WAIT 0x01 #define NTY_POOL_DESTROY 0x02 static pthread_mutex_t nty_pool_lock = PTHREAD_MUTEX_INITIALIZER; static sigset_t fillset; nThreadPool *thread_pool = NULL; static int ntyWorkerCreate(nThreadPool *pool) { sigset_t oset; pthread_t thread_id; pthread_sigmask(SIG_SETMASK, &fillset, &oset); int error = pthread_create(&thread_id, &pool->attr, ntyWorkerThread, pool); pthread_sigmask(SIG_SETMASK, &oset, NULL); return error; } static void ntyWorkerCleanup(nThreadPool * pool) { --pool->nthreads; if (pool->flags & NTY_POOL_DESTROY) { if (pool->nthreads == 0) { pthread_cond_broadcast(&pool->busycv); } } else if (pool->head != NULL && pool->nthreads < pool->maximum && ntyWorkerCreate(pool) == 0) { pool->nthreads ++; } pthread_mutex_unlock(&pool->mtx); } static void ntyNotifyWaiters(nThreadPool *pool) { if (pool->head == NULL && pool->active == NULL) { pool->flags &= ~NTY_POOL_WAIT; pthread_cond_broadcast(&pool->waitcv); } } static void ntyJobCleanup(nThreadPool *pool) { pthread_t tid = pthread_self(); nWorker *activep; nWorker **activepp; pthread_mutex_lock(&pool->mtx); for (activepp = &pool->active;(activep = *activepp) != 
NULL;activepp = &activep->active_next) { *activepp = activep->active_next; break; } if (pool->flags & NTY_POOL_WAIT) ntyNotifyWaiters(pool); } static void* ntyWorkerThread(void *arg) { nThreadPool *pool = (nThreadPool*)arg; nWorker active; int timeout; struct timespec ts; JOB_CALLBACK func; pthread_mutex_lock(&pool->mtx); pthread_cleanup_push(ntyWorkerCleanup, pool); active.active_tid = pthread_self(); while (1) { pthread_sigmask(SIG_SETMASK, &fillset, NULL); pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); timeout = 0; pool->idle ++; if (pool->flags & NTY_POOL_WAIT) { ntyNotifyWaiters(pool); } while (pool->head == NULL && !(pool->flags & NTY_POOL_DESTROY)) { if (pool->nthreads <= pool->minimum) { pthread_cond_wait(&pool->workcv, &pool->mtx); } else { clock_gettime(CLOCK_REALTIME, &ts); ts.tv_sec += pool->linger; if (pool->linger == 0 || pthread_cond_timedwait(&pool->workcv, &pool->mtx, &ts) == ETIMEDOUT) { timeout = 1; break; } } } pool->idle --; if (pool->flags & NTY_POOL_DESTROY) break; nJob *job = pool->head; if (job != NULL) { timeout = 0; func = job->func; void *job_arg = job->arg; pool->head = job->next; if (job == pool->tail) { pool->tail == NULL; } active.active_next = pool->active; pool->active = &active; pthread_mutex_unlock(&pool->mtx); pthread_cleanup_push(ntyJobCleanup, pool); free(job); func(job_arg); pthread_cleanup_pop(1); } if (timeout && (pool->nthreads > pool->minimum)) { break; } } pthread_cleanup_pop(1); return NULL; } static void ntyCloneAttributes(pthread_attr_t *new_attr, pthread_attr_t *old_attr) { struct sched_param param; void *addr; size_t size; int value; pthread_attr_init(new_attr); if (old_attr != NULL) { pthread_attr_getstack(old_attr, &addr, &size); pthread_attr_setstack(new_attr, NULL, size); pthread_attr_getscope(old_attr, &value); pthread_attr_setscope(new_attr, value); pthread_attr_getinheritsched(old_attr, &value); pthread_attr_setinheritsched(new_attr, value); 
pthread_attr_getschedpolicy(old_attr, &value); pthread_attr_setschedpolicy(new_attr, value); pthread_attr_getschedparam(old_attr, ¶m); pthread_attr_setschedparam(new_attr, ¶m); pthread_attr_getguardsize(old_attr, &size); pthread_attr_setguardsize(new_attr, size); } pthread_attr_setdetachstate(new_attr, PTHREAD_CREATE_DETACHED); } nThreadPool *ntyThreadPoolCreate(int min_threads, int max_threads, int linger, pthread_attr_t *attr) { sigfillset(&fillset); if (min_threads > max_threads || max_threads < 1) { errno = EINVAL; return NULL; } nThreadPool *pool = (nThreadPool*)malloc(sizeof(nThreadPool)); if (pool == NULL) { errno = ENOMEM; return NULL; } pthread_mutex_init(&pool->mtx, NULL); pthread_cond_init(&pool->busycv, NULL); pthread_cond_init(&pool->workcv, NULL); pthread_cond_init(&pool->waitcv, NULL); pool->active = NULL; pool->head = NULL; pool->tail = NULL; pool->flags = 0; pool->linger = linger; pool->minimum = min_threads; pool->maximum = max_threads; pool->nthreads = 0; pool->idle = 0; ntyCloneAttributes(&pool->attr, attr); pthread_mutex_lock(&nty_pool_lock); if (thread_pool == NULL) { pool->forw = pool; pool->back = pool; thread_pool = pool; } else { thread_pool->back->forw = pool; pool->forw = thread_pool; pool->back = pool->back; thread_pool->back = pool; } pthread_mutex_unlock(&nty_pool_lock); return pool; } int ntyThreadPoolQueue(nThreadPool *pool, JOB_CALLBACK func, void *arg) { nJob *job = (nJob*)malloc(sizeof(nJob)); if (job == NULL) { errno = ENOMEM; return -1; } job->next = NULL; job->func = func; job->arg = arg; pthread_mutex_lock(&pool->mtx); if (pool->head == NULL) { pool->head = job; } else { pool->tail->next = job; } pool->tail = job; if (pool->idle > 0) { pthread_cond_signal(&pool->workcv); } else if (pool->nthreads < pool->maximum && ntyWorkerCreate(pool) == 0) { pool->nthreads ++; } pthread_mutex_unlock(&pool->mtx); } void nThreadPoolWait(nThreadPool *pool) { pthread_mutex_lock(&pool->mtx); pthread_cleanup_push(pthread_mutex_unlock, 
&pool->mtx); while (pool->head != NULL || pool->active != NULL) { pool->flags |= NTY_POOL_WAIT; pthread_cond_wait(&pool->waitcv, &pool->mtx); } pthread_cleanup_pop(1); } void nThreadPoolDestroy(nThreadPool *pool) { nWorker *activep; nJob *job; pthread_mutex_lock(&pool->mtx); pthread_cleanup_push(pthread_mutex_unlock, &pool->mtx); pool->flags |= NTY_POOL_DESTROY; pthread_cond_broadcast(&pool->workcv); for (activep = pool->active;activep != NULL;activep = activep->active_next) { pthread_cancel(activep->active_tid); } while (pool->nthreads != 0) { pthread_cond_wait(&pool->busycv, &pool->mtx); } pthread_cleanup_pop(1); pthread_mutex_lock(&nty_pool_lock); if (thread_pool == pool) { thread_pool = pool->forw; } if (thread_pool == pool) { thread_pool = NULL; } else { pool->back->forw = pool->forw; pool->forw->back = pool->back; } pthread_mutex_unlock(&nty_pool_lock); for (job = pool->head;job != NULL;job = pool->head) { pool->head = job->next; free(job); } pthread_attr_destroy(&pool->attr); free(pool); } /********************************* debug thread pool *********************************/ void king_counter(void *arg) { int index = *(int*)arg; printf("index : %d, selfid : %lu\n", index, pthread_self()); free(arg); usleep(1); } #define KING_COUNTER_SIZE 1000 int main(int argc, char *argv[]) { nThreadPool *pool = ntyThreadPoolCreate(10, 20, 15, NULL); int i = 0; for (i = 0;i < KING_COUNTER_SIZE;i ++) { int *index = (int*)malloc(sizeof(int)); memset(index, 0, sizeof(int)); memcpy(index, &i, sizeof(int)); ntyThreadPoolQueue(pool, king_counter, index); } getchar(); printf("You are very good !!!!\n"); }
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。