首页 > 操作系统 > 线程池(Pools of threads)

线程池(Pools of threads)

2010年2月26日 发表评论 阅读评论

  在编程中你可能注意到你想要能够运行多个线程,并且你也想在某个限度上控制这些线程的行为。例如,在一个服务器中,你可能决定只让一个线程阻塞,等待来自客户端的一个消息。当这个线程获得了消息并开始处理这个请求的时候,你可能需要再创建一个新的线程来等待下一个请求的到来,以便在新的请求到了的时候由这个线程完成相应的处理。如此下来,过了一段时间所有的请求都被处理之后,你就会有多个线程在那里等待后续的客户端请求了。为了保护资源,你可能需要杀掉一些多余的线程。

  这其实是一个常见的操作,Neutrino实时系统也提供了一个库来帮助处理这些操作。

  现在需要注意的是这些线程池中的线程做了两个不同的操作:

 

  • 阻塞(等待操作)  
  • 处理操作

  阻塞操作并不消耗CPU。在典型的服务器中,线程就是这样等待消息的到达的。这与处理操作是不同的,处理操作根据处理结构的不同有可能消耗或不消耗CPU。

  系统提供了如下的函数来处理线程池:

#include <sys/dispatch.h>

thread_pool_t *
thread_pool_create (thread_pool_attr_t *attr,
                    unsigned flags);

int
thread_pool_destroy (thread_pool_t *pool);

int
thread_pool_start (void *pool);

int
thread_pool_limits (thread_pool_t *pool,
                    int lowater,
                    int hiwater,
                    int maximum,
                    int increment,
                    unsigned flags);

int
thread_pool_control (thread_pool_t *pool,
                     thread_pool_attr_t *attr,
                     uint16_t lower,
                     uint16_t upper,
                     unsigned flags);

  从提供的函数,你先使用thread_pool_create()来创建线程池的定义,之后通过thread_pool_start()来启动线程池。当使用完线程池之后,你可以使用thread_pool_destory()来清理线程池。如果你的程序是个永远运行的服务器的话,你可能永远没有机会调用这个thread_pool_destory()函数。thread_pool_limits()函数用来设定线程池的行为并调整线程池的属性的,thread_pool_control()函数是thread_pool_limits()函数的封装。

  首先看看thread_pool_create()函数,它有2个参数,attr和flags。attr是定义线程池的操作特性的属性结构。

typedef struct _thread_pool_attr {
    // thread pool functions and handle
    THREAD_POOL_HANDLE_T    *handle;

    THREAD_POOL_PARAM_T
        *(*block_func)(THREAD_POOL_PARAM_T *ctp);

    void
        (*unblock_func)(THREAD_POOL_PARAM_T *ctp);

    int
        (*handler_func)(THREAD_POOL_PARAM_T *ctp);

    THREAD_POOL_PARAM_T
        *(*context_alloc)(THREAD_POOL_HANDLE_T *handle);

    void
        (*context_free)(THREAD_POOL_PARAM_T *ctp);

    // thread pool parameters
    pthread_attr_t          *attr;
    unsigned short          lo_water;
    unsigned short          increment;
    unsigned short          hi_water;
    unsigned short          maximum;
} thread_pool_attr_t;

  这里我们将thread_pool_attr_t类型分为两个部分,一部分包含了函数以及线程池中线程的句柄,另一部分是线程池的操作参数。

控制线程的数量

  先看看线程池参数,看是如何控制在线程池中操作的线程的数目与属性的。要注意我们要谈论的是“阻塞操作”与“处理操作”。

  下面的框图演示了lo_water、hi_water与maximum参数之间的关系:

线程

  CA是context_alloc()函数,CF是context_free()函数,“阻塞操作”  是block_func()函数,“处理操作”是handler_func()函数。

attr:是线程创建是使用的属性结构,它控制了新创建线程的优先级、堆栈大小等等;

lo_water:应该总有不少于lo_water数目的线程执行阻塞操作。在典型的服务器中,这就是等待接收消息的线程的数目。如果执行阻塞操作的线程的数目少于lo_water,那么就有更多的线程按照increment参数被创建。这在框图中的标记为create thread的第一步中表示着。

increment:表示了当阻塞操作线程的数目低于lo_water时,一次创建的新线程的数目。如果你还不能确定,你可以先将其设为1.也就是说执行阻塞操作的线程数目低于lo_water之后,只有一个新线程会被线程池创建。为了优化你的程序中的这个参数,你需要观察进程的行为来确定将该值设为不是1的数。例如,你的进程有时会收到“爆炸式”的请求,那么你就得调整这个值使其能够应付这些请求,这时的值就应该大于1了。

hi_water:是处于阻塞操作的线程数目的上限。当线程结束操作之后一般就会返回到阻塞操作状态。但是,线程池会统计当前有多少个线程处于阻塞操作状态,如果数目大于hi_water,线程池就会杀掉导致溢出的线程。这在框图中,就是从处理操作块中split出来的操作,在那里有两个路径,一个是返回到“阻塞操作”,另一个是进入CF来销毁线程。lo_water与high_water的结合可以让你能够控制处于阻塞操作的线程的数目范围。

maximum:是线程池中可以同时运行的线程的最大数目。

  另外控制线程的关键参数是传送给thread_pool_create()的flags参数。它可以是下面的一个值:

POOL_FLAG_EXIT_SELF:thread_pool_start()函数将不返回,调用该函数的线程也不会成为线程池的一部分;

POOL_FLAG_USE_SELF:thread_pool_start()函数将不返回,调用该函数的线程将成为线程池的一部分;

0:thread_pool_start()函数将返回,新线程会按要求创建。

  下面看一个例子:

/*
 * part of 	tp1.c
*/

#include <sys/dispatch.h>

int
main ()
{
    thread_pool_attr_t  tp_attr;
    void                *tpp;

    …
    tp_attr.lo_water  = 3;
    tp_attr.increment = 2;
    tp_attr.hi_water  = 7;
    tp_attr.maximum   = 10;
    …

    tpp = thread_pool_create (&tp_attr, POOL_FLAG_USE_SELF);
    if (tpp == NULL) {
        fprintf (stderr,
                 "%s:  can't thread_pool_create, errno %s\n",
                 progname, strerror (errno));
        exit (EXIT_FAILURE);
    }

    thread_pool_start (tpp);
    …

  设置完元素之后,通过调用thread_pool_create()函数来创建线程池。这个函数会返回指向一个线程池控制结构的指针,我们会将这个指针与NULL比较以检查该函数是否出错。最后我们使用这个tpp指针来调用thread_pool_start()函数。

  在这里我们使用了POOL_FLAG_USE_SELF参数,也就是说调用thread_pool_start()的线程将成为线程池中的一个线程。这时线程池中只有一个线程。由于我们设置的lo_water的值为3,库函数会立即创建increment个数的新线程,在我们的例子里面该值为2。之后,线程池中有3个线程,并且都处于阻塞状态。lo_water条件被满足了,high_water条件也被满足,并且maximum条件也满足了。

  之后,阻塞操作的线程中有一个线程解除阻塞。这就意味着3个线程中的一个已不再是阻塞状态了。由于阻塞线程的数目小于lo_water,就出发了lo_water的处理机制,并有increment数目的新线程被创建。现在就一共有5个线程(4个处于阻塞状态,1个处于处理状态)。

  更多的线程解除阻塞。假设处于处理状态的线程都没有完成它们的处理操作。下面的表格就演示了会发生的情况:

Event Proc Op Blk Op Total
Initial 0 1 1
lo_water trip 0 3 3
Unblock 1 2 3
lo_water trip 1 4 5
Unblock 2 3 5
Unblock 3 2 5
lo_water trip 3 4 7
Unblock 4 3 7
Unblock 5 2 7
lo_water trip 5 4 9
Unblock 6 3 9
Unblock 7 2 9
lo_water trip 7 3 10
Unblock 8 2 10
Unblock 9 1 10
Unblock 10 0 10

  可以看到库函数会一直检查lo_water变量并创建increment数目的新线程直到线程数目超过了maximum的限制。

  这就是说这时,没有线程处于阻塞状态。假设线程结束了它们的处理请求,看看对hi_water触发器会有什么情况发生:

Event Proc Op Blk Op Total
Completion 9 1 10
Completion 8 2 10
Completion 7 3 10
Completion 6 4 10
Completion 5 5 10
Completion 4 6 10
Completion 3 7 10
Completion 2 8 10
hi_water trip 2 7 9
Completion 1 8 9
hi_water trip 1 7 8
Completion 0 8 8
hi_water trip 0 7 7

  可以注意到,在触发hi_water触发器之前线程结束处理状态时什么也没有发生。当线程结束的时候,它会先检查阻塞线程的数目,如果阻塞线程数目太多,它就会将自己杀死。这个结构中hi_water与lo_water限制的好处就是你有一个有效的“范围”,只要线程数目在该范围内你就没有必要创建或销毁线程。在我们的例子里面,在结束了上表中的操作后,我们的系统可以同时处理4个请求而不用创建新的线程。

线程池函数

  现在我们已经知道如何控制线程的数目。下面再看看线程池结构的其他元素:

    // thread pool functions and handle
    THREAD_POOL_HANDLE_T    *handle;

    THREAD_POOL_PARAM_T
        *(*block_func)(THREAD_POOL_PARAM_T *ctp);

    void
        (*unblock_func)(THREAD_POOL_PARAM_T *ctp);

    int
        (*handler_func)(THREAD_POOL_PARAM_T *ctp);

    THREAD_POOL_PARAM_T
        *(*context_alloc)(THREAD_POOL_HANDLE_T *handle);

    void
        (*context_free)(THREAD_POOL_PARAM_T *ctp);

  在前面的线程运行框图中,每个新线程被创建的时候都调用了context_alloc()函数,在每个线程被销毁的时候都调用了context_free()函数。

  上面的数据结构中的handle元素是作为唯一的参数传送给context_alloc()函数。context_alloc()函数负责每个线程的必要设置以及返回一个环境指针(ctp)。这个指针的内容完全由你控制,库函数对你的指针指向哪里是不感兴趣的。

  环境变量被context_alloc()创建,之后就调用block_func()函数来执行阻塞操作。可以看到block_func()函数接收了context_alloc()函数的结果。一旦block_func()解除阻塞,它将返回一个环境指针,这个指针就会被库函数传递给handler_func()函数。handler_func()函数负责执行工作,例如,在一个典型的服务器中,就是客户端的消息被处理的部分。handler_func()函数必须返回一个0,非0值被保留为未来扩展用途。unblock_func()函数同样被保留为后续用途,现在就将其作为NULL。下面的伪代码可能将这些事情解释清楚:

FOREVER DO
    IF (#threads < lo_water) THEN
        IF (#threads_total < maximum) THEN
            create new thread
            context = (*context_alloc) (handle);
        ENDIF
    ENDIF
    retval = (*block_func) (context);
    (*handler_func) (retval);
    IF (#threads > hi_water) THEN
        (*context_free) (context)
        kill thread
    ENDIF
DONE

  上面的例子是极端简化的,只是用来解释ctp以及handle参数的流向,以及对控制线程数的算法有些初步的印象。

Related posts:

  1. 线程的启动
  2. 多线程中壁垒(barrier)的使用
  3. 用于线程同步的条件变量(Condition Variables)
  4. 用于线程同步的Sleepon锁
  5. 互不关联的多线程

  1. 本文目前尚无任何评论.
  1. 本文目前尚无任何 trackbacks 和 pingbacks.