线程池的简易达成,浅谈两种服务器端模型

作者: 编程应用  发布:2019-09-26

几个基本的线程函数:(本人有强迫症,为了分清返回值、函数名、参数列表,间距有点大,用的时候不要这样)

引言:上篇文章说到了多进程并发式的服务端模型,如上一篇文章所述,进程的频繁创建会导致服务器不堪负载,那这一篇博客主要讲述的是线程模型和线程池的方式来提高服务端的负载能力。同时比较一下不同的模型的好处与坏处。

线程操纵函数

(如果不加以说明,我们都是考虑开发是基于GNU/Linux的)在Linux下创建一个线程的方式很简单,pthread_create() 函数来创建线程,其中的一个参数的回调函数,也就是线程本身的执行体函数。

1 int  pthread_create  (pthread_t *tidp, const pthread_attr_t *attr, (void*)(*start_rtn)(void *), void *arg); //创建2 void pthread_exit    (void *retval);            //终止自身3 int  pthread_cancel  (pthread_t tid);             //终止其他.发送终止信号后目标线程不一定终止,要调用join函数等待4 int  pthread_join    (pthread_t tid, void **retval);   //阻塞并等待其他线程
void *thread_entry( void * args );

属性

这里不过多的强调怎样利用线程等来创建执行体以及其他的系统调用怎样使用的。

1 int  pthread_attr_init            (pthread_attr_t *attr);           //初始化属性2 int  pthread_attr_setdetachstate  (pthread_attr_t *attr, int detachstate); //设置分离状态3 int  pthread_attr_destroy         (pthread_attr_t *attr);           //销毁属性

那么,在服务端的线程使用方式一般为三种种:

同步函数
互斥锁

(1)按需生成(来一个连接生成一个线程)

1 int pthread_mutex_init    (pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);  //初始化锁2 int pthread_mutex_destroy (pthread_mutex_t *mutex);  //销毁锁3 int pthread_mutex_lock    (pthread_mutex_t *mutex);  //加锁4 int pthread_mutex_trylock (pthread_mutex_t *mutex);  //尝试加锁,上面lock的非阻塞版本5 int pthread_mutex_unlock  (pthread_mutex_t *mutex);  //解锁

(2)线程池(预先生成很多线程)

条件变量

(3)Leader follower(LF)

1 int pthread_cond_init    (pthread_cond_t *cv, const pthread_condattr_t *cattr);  //初始化2 int pthread_cond_destroy (pthread_cond_t *cond);                                 //销毁  3 int pthread_cond_wait    (pthread_cond_t *cond, pthread_mutex_t *mutex);         //等待条件4 int pthread_cond_signal  (pthread_cond_t *cond);                                 //通知,唤醒第一个调用pthread_cond_wait()而进入睡眠的线程

主要讲解第一种和第二种,第三种暂时手上没有实例代码,最近也没写、

工具函数

第一种方式的范式大概是这样:

1 int       pthread_equal  (pthread_t t1, pthread_t t2); //比较线程ID2 int       pthread_detach (pthread_t tid);              //分离线程3 pthread_t pthread_self   (void);                       //自身ID

回调函数:

上述代码中,线程的cancel和join,以及最后的工具函数,这些函数的参数都为结构体变量,其他的函数参数都是结构体变量指针;品味一下,参数为指针的,因为都需要改变结构体的内容,而参数为普通变量的,则只需要读内容即可。

void *thread_entry( void *args )
{
        int fd = *(int *)args ;
        do_handler_fd( fd );
}

线程池的优点可以自行百度一下, 一个线程池包括4个部分:

程序主体:

1、线程池管理器:用于创建、管理、销毁线程池2、工作线程:线程池中线程3、任务接口:线程要调用的函数,任务的具体实现函数4、任务队列:用于存放没有处理的任务。提供一种缓冲机制。

for(;;){
    fd = accept();
    pthread_create(...,thread_entry,&fd);
}

线程池代码:

这里所展示的只是一个最简单的方式,但是可以代表多线程的服务器端模型。

  1 #include <stdio.h>  2 #include <stdlib.h>  3 #include <pthread.h>    //linux环境中多线程的头文件,非C语言标准库,编译时最后要加 -lpthread 调用动态链接库  4   5 //任务链表的结构  6 typedef struct worker   7 {  8     void  * (void *arg);   //工作函数  9     void  *arg;                      //函数的参数 10     struct worker *next; 11 }CThread_worker; 12  13 //线程池的管理结构 14 typedef struct  15 { 16     pthread_mutex_t queue_lock;     //互斥锁 17     pthread_cond_t  queue_ready;    //条件变量/信号量 18  19     CThread_worker *queue_head;     //指向工作链表的头结点,临界区 20     int cur_queue_size;             //记录链表中工作的数量,临界区 21  22     int max_thread_num;             //最大线程数 23     pthread_t *threadid;            //线程ID 24  25     int shutdown;                   //开关 26 }CThread_pool; 27  28 static CThread_pool *pool = NULL;   //一个线程池变量 29 int pool_add_worker(void *(void *arg), void *arg);    //负责向工作链表中添加实际的工作 30 void *thread_routine(void *arg);    //线程例程 31  32 //线程池初始化 33 void 34 pool_init(int max_thread_num) 35 { 36     int i = 0; 37  38     pool = (CThread_pool *) malloc (sizeof(CThread_pool));    //创建线程池 39  40     pthread_mutex_init(&(pool->queue_lock),  NULL);     //互斥锁初始化,参数为锁的地址 41     pthread_cond_init( &(pool->queue_ready), NULL);     //条件变量初始化,参数为变量地址 42  43     pool->queue_head = NULL; 44     pool->cur_queue_size = 0; 45  46     pool->max_thread_num = max_thread_num; 47     pool->threadid = (pthread_t *)malloc(max_thread_num * sizeof(pthread_t)); 48     for (i = 0; i < max_thread_num; i++)  49     { 50         pthread_create(&(pool->threadid[i]), NULL, thread_routine, NULL);  //创建线程, 参数为线程ID变量地址、属性、例程、参数 51     } 52  53     pool->shutdown = 0; 54 } 55  56 //例程,调用具体的工作函数 57 void * 58 thread_routine(void *arg) 59 { 60     //线程开始 61     while(1) 62     { 63         pthread_mutex_lock(&(pool->queue_lock));    //从工作链表中取工作,要先加互斥锁,参数为锁地址 64  65         while(pool->cur_queue_size == 0 && !pool->shutdown)  66         {   //链表为空,线程等待 67             pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));   //等待资源,信号量用于通知。会释放第二个参数的锁,以供添加;函数返回时重新加锁。 68         } 69  70         if(pool->shutdown)  71         { 72             pthread_mutex_unlock(&(pool->queue_lock));          //结束开关开启,释放锁并退出线程 73             pthread_exit;     //参数为void * 74         } 75      76         //线程准备工作 77         --pool->cur_queue_size; 78         CThread_worker *worker = pool->queue_head; 79         pool->queue_head = worker->next; 80  81         pthread_mutex_unlock (&(pool->queue_lock));     //获取一个工作后释放锁 82  83  84         (*(worker->process))(worker->arg);      //做工作 85         free; 86         worker = NULL; 87     } 88  89     pthread_exit; 90 } 91  92 //销毁线程池 93 int 94 pool_destroy() 95 { 96     if(pool->shutdown)      //检测结束开关是否开启,若开启,则所有线程会自动退出 97         return -1; 98     pool->shutdown = 1; 99 100     pthread_cond_broadcast( &(pool->queue_ready) );     //广播,唤醒所有线程,准备退出101 102     int i;103     for(i = 0; i < pool->max_thread_num; ++i)104         pthread_join(pool->threadid[i], NULL);  //主线程等待所有线程退出,只有join第一个参数不是指针,第二个参数类型是void **,接收exit的返回值,需要强制转换105     free(pool->threadid);106 107     CThread_worker *head = NULL;108     while(pool->queue_head != NULL)            //释放未执行的工作链表剩余结点109     {110         head = pool->queue_head;111         pool->queue_head = pool->queue_head->next;112         free;113     }114 115     pthread_mutex_destroy(&(pool->queue_lock));     //销毁锁和条件变量116     pthread_cond_destroy(&(pool->queue_ready));117 118     free;119     pool=NULL;120     return 0;121 }122 123 void *124 myprocess(void *arg)125 {126     printf("threadid is 0x%x, working on task %dn", (int)pthread_self(), *(int*)arg);    //工作函数,添加实际的代码,用sleep来代替127     sleep (1);128     return NULL;129 }130 131 //添加工作132 int133 pool_add_worker(void *(void *arg), void *arg)134 {135     CThread_worker *newworker = (CThread_worker *) malloc(sizeof(CThread_worker));136     newworker->process = process;   //具体的工作函数137     newworker->arg = arg;138     newworker->next = NULL;139 140     pthread_mutex_lock( &(pool->queue_lock) );      //加锁141 142     CThread_worker *member = pool->queue_head;      //插入链表尾部143     if( member != NULL ) 144     {145         while( member->next != NULL )146             member = member->next;147         member->next = newworker;148     }149     else 150     {151         pool->queue_head = newworker;152     }153     ++pool->cur_queue_size;154 155     pthread_mutex_unlock( &(pool->queue_lock) );   //解锁156 157     pthread_cond_signal( &(pool->queue_ready) );   //通知一个等待的线程158     return 0;159 }160 161 int162 main(int argc, char **argv)163 {164     pool_init(3);   //主线程创建线程池,3个线程165 166     int *workingnum = (int *) malloc(sizeof(int) * 10);167     int i;168     for(i = 0; i < 10; ++i) 169     {170         workingnum[i] = i;171         pool_add_worker(myprocess, &workingnum[i]);     //主线程负责添加工作,10个工作172     }173 174     sleep (5);175     pool_destroy();     //销毁线程池176     free (workingnum);177 178     return 0;179 }

大体服务端分为主线程和工作线程,主线程负责accept()连接,而工作线程负责处理业务逻辑和流的读取等。这样,即使在工作线程阻塞的情况下,也只是阻塞在线程范围内,关于这部分内容,可以参考《C++网络编程》第一卷的第五章。在应用层和内核之间的线程比例为1:1的操作系统线程机制中,一个线程在内核中会有一个内核线程实例,那么就是说,如果这个线程阻塞,不会引起在同一个进程里面的线程也阻塞。现在大多是的操作系统采用的都是 1:1的模型,但是这个比传统的N:1模型更消耗资源。 N:1模型就是,在应用层级别的多个线程在操作系统中只有一个实例,可以看做一个组,一旦一个线程阻塞,这个工作组的其他线程都会阻塞。

故上述代码的 do_handler_fd( fd ) 里面的系统调用如果阻塞,不会引起整个进程阻塞,线程的阻塞只是在线程范围内。所以,主线程可以一直等待客户连接,而把工作处理过程放到线程中去。

这个是传统的线程方式,这种方式也会带来一些问题:

(1)工作开销过大,线程的频繁创建的销毁也是一个很消耗资源的过程,虽然较进程小很多。 

(2)对于临界资源的访问需要控制加锁等操作,加大了程序设计的复杂性。

(3)一个线程的崩溃会导致整个进程的崩溃,比如调用了exit() 函数等,虽然阻塞操作只阻塞一个线程,但是其他一些系统调用的失败或崩溃将导致服务器整个down机。后果不堪设想。

但是在很多地方也提到了,多线程的方式适合IO密集型的程序,比如大文件传输等,这样可以在用户看来所有的操作都是并行的。

 

下面来说说线程池的方式,它改进了上述的问题的第一个,频繁的创建线程。

线程池的基本思想就是预先创建一部分线程,然后等到任务来的时候,通过条件变量或者其他的机制来唤醒一个工作线程。

下面详细的讲述一下前段时间写的一个简单的线程池方案。

 

线程池有一个任务队列,即由任务对象组成的一组队列。

我们为这个任务队列提供两个接口:

void mc_thread_pool_add_task(void *task , size_t tasksize )

解释一下这个接口的含义和参数, task 是一个指向任务实例的指针,tasksize 一般取 sizeof( instance_task ) 为的是在加入任务队列的时候队列的一些其他操作。为了简单化,这里没有提供任务优先级的考虑。

void *mc_thread_pool_get_task()

这个函数用来取得一个指向任务实例的指针,然后可以操作这个任务。

一般情况下,由主线程调用第一个函数,而工作线程调用第二个函数。

我们来看看线程池的结构:

typedef struct _thread_pool_t
{
    pthread_mutex_t  queue_lock ;
    pthread_cond_t   task_cond  ;
    list_t         * tasks       // treat it as queue thread_task_t type
    pthread_t      * pthreads   ;
    int              isdestoried;
    int              workersnum ;
    char             ready      ;
    thread_task_handler  thread_pool_task_handler;
}thread_pool_t;
    /*
     *  this structure is a global control block of threads poll
     *  as you can see , queue_lock and task_cond is define to protecte access of this whole poll
     *  and task_cond is used to signal to threads that the task queue is ready
     *  tasks is a queue of tasks , each task should posted to this queue and threads
     *  in this pool can get it , we defined this task as void * to use wildly
     *  isdestoried is a boolean flag as his/her name
     *  workersnum is the total number of threads
     *  ready is a flag also and used to judge if the tasks queue is ready
     *  thread_pool_task_handler is a function point which points to the task handler you defined
     */

在线程池的结构中,我们定义了两个变量, queue_lock 和 task_cond

一个是锁,用来控制线程对于 task 任务队列的访问,另一个 task_cond 用来唤醒工作线程。

 

说说基本原理:工作线程默认情况下是阻塞在 pthread_cond_wait() 系统调用下的,如果有任务到来,我们可用使用 pthread_cond_singal() 来唤醒一个处于阻塞状态的线程,这样这个线程就可以执行 mc_thread_pool_get_task() 来取得一个任务,并调用相应的回调函数。

 

tasks就是上面所说的任务队列,pthreads是一个pthread_t 的数组,也就是用来标示线程id 的数组。每一次创建线程的时候都会返回线程id,所以我们需要记录。

ready 是一个flag , 标示是否任务队列可用。thread_task_handler   是一个函数指针,定义是这样的:

typedef void ( *thread_task_handler )( void * args ) ;

结构体里的 thread_pool_task_handler 就是在初始化的时候设置的线程的执行体。

下面看看初始化函数:

void mc_thread_pool_ini( mc_thread_pool_t * par_tp , int workersnum ,thread_task_handler par_handler )
{
    int err ;
    //par_tp = ( thread_pool_t *)malloc( sizeof(thread_pool_t) );

    if( par_tp == NULL )
    {
        fprintf( stderr , "thread_pool_t mallocn");
        return  ;
    }
    par_tp->workersnum = workersnum ;

    pthread_mutex_init( &par_tp->queue_lock ,NULL );
    pthread_cond_init(&par_tp->task_cond , NULL );

    /*
    par_tp->queue_lock = PTHREAD_MUTEX_INITIALIZER ;
    par_tp->task_cond  = PTHREAD_COND_INITIALIZER  ;
    */
    par_tp->tasks = mc_listcreate() ;
    if( par_tp->tasks == NULL )
    {
        fprintf( stderr , "listcreate() errorn");
        //free( par_tp ) ;
        return  ;
    }

    par_tp->pthreads = ( pthread_t *)malloc( sizeof( pthread_t )*workersnum );

    if( par_tp->pthreads == NULL )
    {
        fprintf( stderr , "pthreads mallocn");
        //free( par_tp );
        mc_freelist( par_tp->tasks ) ;
        return NULL ;
    }

    int i = 0 ; 
    for( ; i < workersnum ; i++ )
    {
        fprintf(stderr,"start to create threadsn");
        err = pthread_create(&(par_tp->pthreads[i]),NULL,mc_thread_entry,NULL) ;
        if( err == -1 )
        {
            fprintf( stderr , "pthread_create errorn");
            //free( par_tp );
            mc_freelist( par_tp->tasks ) ;
            free(par_tp->pthreads) ;
        }
    }

    par_tp->thread_pool_task_handler = par_handler ;
    par_tp->ready = 0 ;
    fprintf(stderr,"successed to create threadsn");
}

在初始化函数中,我们传递了一个函数执行体的入口点,也就是函数指针给线程池,当我们有任务的时候,一个线程被唤醒,执行相应的回调函数。

其他需要注意的地方是使用 for循环来创建很多的线程,并利用数组方式记录了线程的id 。

创建线程时候的回调函数并不是我们的参数传递的回调函数地址。因为在创建线程好线程的时候,我们需要一个阻塞操作,使得线程处于睡眠状态,不然函数执行完毕后线程就退出了。所以,创建线程时候的回调函数是这样的:

static void *mc_thread_entry( void *args )
{
    void * task ;
    for(;;)
    {
        pthread_mutex_lock( &mc_global_threads_pool.queue_lock ) ;
        fprintf(stderr, " locked to wait taskn");
        while( mc_global_threads_pool.ready == 0 )
        {
            pthread_cond_wait( &mc_global_threads_pool.task_cond , &mc_global_threads_pool.queue_lock ) ;
        }
        task = mc_thread_pool_get_task() ;
        fprintf(stderr, "get a task and ready to unlock n");
        pthread_mutex_unlock( &mc_global_threads_pool.queue_lock ) ;
        mc_global_threads_pool.thread_pool_task_handler( task ) ;
    }
}

需要注意的一点是,我们要用两个变量来判断一个队列是否就绪,ready 和条件变量本身。

判断条件是 while() 而不是 if,这样可以使得线程在没有工作任务的时候,也就是工作队列为空的时候阻塞在 pthread_cond_wait 上,关于pthread_cond_wait 的工作机制可以参考IBM developerworks上的很多好文章。

pthread_cond_wait 在发现没有任务的时候,条件不成立的时候,是会有一个默认的操作的,就是释放锁,第二个参数的锁,使得其他线程可以得到condition 的竞争权利。所以我们在函数体内 pthread_cond_wait 的调用上下有一个加锁和释放锁的操作。

在函数内部有一个  mc_global_threads_pool.thread_pool_task_handler( task ) 这个操作就是线程内部得到了任务后调用回调函数过程。

将任务队列加入的函数实例如下:

void mc_thread_pool_add_task(void *task , size_t tasksize )
{
    pthread_mutex_lock( &mc_global_threads_pool.queue_lock );

    fprintf( stderr ,"thread locked and append to listn");

    mc_list_append( mc_global_threads_pool.tasks , task , tasksize ) ;

    pthread_mutex_unlock( &mc_global_threads_pool.queue_lock );

    fprintf( stderr ,"thread unlocked and successed append to listn");

    mc_global_threads_pool.ready = 1 ;

    if( mc_global_threads_pool.ready == 1 )
    {
        fprintf( stderr ,"signal to threadsn");
        pthread_cond_signal( &mc_global_threads_pool.task_cond ) ;
    }
}

  

这里使用了 ready 来判断是有任务,如果有,使用 pthread_cond_signal 来唤醒一个等待的线程。

取得一个队列的任务方式很简单,直接返回队列的第一个任务:

 

void *mc_thread_pool_get_task()
{
    void * ret_task ;
    ret_task = mc_getnode_del( mc_global_threads_pool.tasks , 0 );
    if( ret_task == NULL )
    {
        fprintf(stderr,"get node_del errorn");
    }
    fprintf( stderr ," got a taskn");
    mc_global_threads_pool.ready = 0 ;
    if( ret_task == NULL )
    {
        fprintf(stderr, "getnode_del errorn");
        return NULL ;
    }
    else
        return ret_task ;
}

 主体框架是这样的:

定义一个自己的task结构体比如:

typedef struct _thread_task_t
{
    int     task_num ;
}mc_thread_task_t ;

定义自己的回调函数:

void my_thread_task_handler( void * task )
{

    fprintf(stderr,"task->tasknum %dn",((mc_thread_task_t *)task)->task_num );

    /*
     *  if the task is a event we can like this demo:
     *  (event_t *)task->handler( (event_t *)task );
     *  so in event_t structure there should be a callback called handler 
     */
}

  

函数主体就是这样:

int main()
{
    mc_thread_task_t ltask;
    ltask.task_num = 1 ;
    fprintf(stderr,"begin to ini pooln");
    mc_thread_pool_ini( &mc_global_threads_pool , 20 , my_thread_task_handler );
    mc_thread_pool_add_task( &ltask , sizeof(mc_thread_task_t) );
    int i = 0 ;
    for(;i < 10000; i++)
    {
        ltask.task_num = i ;
        mc_thread_pool_add_task( &ltask , sizeof(mc_thread_task_t) );
        sleep(1);
    }
    return 0;
}

线程池初始化的时候所传入的结构体就是自己定义的 task 的回调函数。

上述所说的是线程池一个方案。回到我们的服务端模型上来看。

我们的服务端的改写方式可以换成这样:

 

定义只的一个任务结构,比如说,我们定义为:

struct task
{
    int fd ;
}

void *task_handler( void *task )
{
        int fd = *(int *)task ;
        do_handler_fd( fd );
}

好了,我们的服务器主体框架可以是这样:

mc_thread_pool_ini( &mc_global_threads_pool , N , task_handler );  // 第二个参数为线程池工作线程数

for(;;)
{
    fd = accept();
    struct task * newtask = ( struct task *)malloc( sizeof(struct task) );
    newtask->fd = fd ;
    mc_thread_pool_add_task( &newtask,sizeof(struct task*) ); //将newtask 指针加入队列,而不是实例,可以减少队列的存储空间
}   

总结:

  线程池的方案能够减少线程创建时候带来的开销,但是对于临界资源的访问控制等变得更加的复杂,考虑的因素更多。这里没有完整的贴出线程池的代码。上述模型在平常使用的过程中适合并发连接数目不大的情况,IO密集型。对于CPU 密集型的服务端,线程池返回会加大资源消耗。下一篇文章我们来看看反应堆模型,异步事件驱动,非阻塞IO,并贴出一个简单的 epoll 的反应堆。

本文由金沙澳门官网送注册58发布于编程应用,转载请注明出处:线程池的简易达成,浅谈两种服务器端模型

关键词:

上一篇:没有了
下一篇:没有了