线程池(Pools of threads)
在编程中你可能注意到你想要能够运行多个线程,并且你也想在某个限度上控制这些线程的行为。例如,在一个服务器中,你可能决定只让一个线程阻塞,等待来自客户端的一个消息。当这个线程获得了消息并开始处理这个请求的时候,你可能需要再创建一个新的线程来等待下一个请求的到来,以便在新的请求到了的时候由这个线程完成相应的处理。如此下来,过了一段时间所有的请求都被处理之后,你就会有多个线程在那里等待后续的客户端请求了。为了保护资源,你可能需要杀掉一些多余的线程。
这其实是一个常见的操作,Neutrino实时系统也提供了一个库来帮助处理这些操作。
现在需要注意的是这些线程池中的线程做了两个不同的操作:
- 阻塞(等待操作)
- 处理操作
阻塞操作并不消耗CPU。在典型的服务器中,线程就是这样等待消息的到达的。这与处理操作是不同的,处理操作根据处理结构的不同有可能消耗或不消耗CPU。
系统提供了如下的函数来处理线程池:
#include <sys/dispatch.h>
thread_pool_t *
thread_pool_create (thread_pool_attr_t *attr,
unsigned flags);
int
thread_pool_destroy (thread_pool_t *pool);
int
thread_pool_start (void *pool);
int
thread_pool_limits (thread_pool_t *pool,
int lowater,
int hiwater,
int maximum,
int increment,
unsigned flags);
int
thread_pool_control (thread_pool_t *pool,
thread_pool_attr_t *attr,
uint16_t lower,
uint16_t upper,
unsigned flags);
从提供的函数,你先使用thread_pool_create()来创建线程池的定义,之后通过thread_pool_start()来启动线程池。当使用完线程池之后,你可以使用thread_pool_destory()来清理线程池。如果你的程序是个永远运行的服务器的话,你可能永远没有机会调用这个thread_pool_destory()函数。thread_pool_limits()函数用来设定线程池的行为并调整线程池的属性的,thread_pool_control()函数是thread_pool_limits()函数的封装。
首先看看thread_pool_create()函数,它有2个参数,attr和flags。attr是定义线程池的操作特性的属性结构。
typedef struct _thread_pool_attr {
// thread pool functions and handle
THREAD_POOL_HANDLE_T *handle;
THREAD_POOL_PARAM_T
*(*block_func)(THREAD_POOL_PARAM_T *ctp);
void
(*unblock_func)(THREAD_POOL_PARAM_T *ctp);
int
(*handler_func)(THREAD_POOL_PARAM_T *ctp);
THREAD_POOL_PARAM_T
*(*context_alloc)(THREAD_POOL_HANDLE_T *handle);
void
(*context_free)(THREAD_POOL_PARAM_T *ctp);
// thread pool parameters
pthread_attr_t *attr;
unsigned short lo_water;
unsigned short increment;
unsigned short hi_water;
unsigned short maximum;
} thread_pool_attr_t;
这里我们将thread_pool_attr_t类型分为两个部分,一部分包含了函数以及线程池中线程的句柄,另一部分是线程池的操作参数。
控制线程的数量
先看看线程池参数,看是如何控制在线程池中操作的线程的数目与属性的。要注意我们要谈论的是“阻塞操作”与“处理操作”。
下面的框图演示了lo_water、hi_water与maximum参数之间的关系:
CA是context_alloc()函数,CF是context_free()函数,“阻塞操作” 是block_func()函数,“处理操作”是handler_func()函数。
attr:是线程创建是使用的属性结构,它控制了新创建线程的优先级、堆栈大小等等;
lo_water:应该总有不少于lo_water数目的线程执行阻塞操作。在典型的服务器中,这就是等待接收消息的线程的数目。如果执行阻塞操作的线程的数目少于lo_water,那么就有更多的线程按照increment参数被创建。这在框图中的标记为create thread的第一步中表示着。
increment:表示了当阻塞操作线程的数目低于lo_water时,一次创建的新线程的数目。如果你还不能确定,你可以先将其设为1.也就是说执行阻塞操作的线程数目低于lo_water之后,只有一个新线程会被线程池创建。为了优化你的程序中的这个参数,你需要观察进程的行为来确定将该值设为不是1的数。例如,你的进程有时会收到“爆炸式”的请求,那么你就得调整这个值使其能够应付这些请求,这时的值就应该大于1了。
hi_water:是处于阻塞操作的线程数目的上限。当线程结束操作之后一般就会返回到阻塞操作状态。但是,线程池会统计当前有多少个线程处于阻塞操作状态,如果数目大于hi_water,线程池就会杀掉导致溢出的线程。这在框图中,就是从处理操作块中split出来的操作,在那里有两个路径,一个是返回到“阻塞操作”,另一个是进入CF来销毁线程。lo_water与high_water的结合可以让你能够控制处于阻塞操作的线程的数目范围。
maximum:是线程池中可以同时运行的线程的最大数目。
另外控制线程的关键参数是传送给thread_pool_create()的flags参数。它可以是下面的一个值:
POOL_FLAG_EXIT_SELF:thread_pool_start()函数将不返回,调用该函数的线程也不会成为线程池的一部分;
POOL_FLAG_USE_SELF:thread_pool_start()函数将不返回,调用该函数的线程将成为线程池的一部分;
0:thread_pool_start()函数将返回,新线程会按要求创建。
下面看一个例子:
/*
* part of tp1.c
*/
#include <sys/dispatch.h>
int
main ()
{
thread_pool_attr_t tp_attr;
void *tpp;
…
tp_attr.lo_water = 3;
tp_attr.increment = 2;
tp_attr.hi_water = 7;
tp_attr.maximum = 10;
…
tpp = thread_pool_create (&tp_attr, POOL_FLAG_USE_SELF);
if (tpp == NULL) {
fprintf (stderr,
"%s: can't thread_pool_create, errno %s\n",
progname, strerror (errno));
exit (EXIT_FAILURE);
}
thread_pool_start (tpp);
…
设置完元素之后,通过调用thread_pool_create()函数来创建线程池。这个函数会返回指向一个线程池控制结构的指针,我们会将这个指针与NULL比较以检查该函数是否出错。最后我们使用这个tpp指针来调用thread_pool_start()函数。
在这里我们使用了POOL_FLAG_USE_SELF参数,也就是说调用thread_pool_start()的线程将成为线程池中的一个线程。这时线程池中只有一个线程。由于我们设置的lo_water的值为3,库函数会立即创建increment个数的新线程,在我们的例子里面该值为2。之后,线程池中有3个线程,并且都处于阻塞状态。lo_water条件被满足了,high_water条件也被满足,并且maximum条件也满足了。
之后,阻塞操作的线程中有一个线程解除阻塞。这就意味着3个线程中的一个已不再是阻塞状态了。由于阻塞线程的数目小于lo_water,就出发了lo_water的处理机制,并有increment数目的新线程被创建。现在就一共有5个线程(4个处于阻塞状态,1个处于处理状态)。
更多的线程解除阻塞。假设处于处理状态的线程都没有完成它们的处理操作。下面的表格就演示了会发生的情况:
| Event | Proc Op | Blk Op | Total |
|---|---|---|---|
| Initial | 0 | 1 | 1 |
| lo_water trip | 0 | 3 | 3 |
| Unblock | 1 | 2 | 3 |
| lo_water trip | 1 | 4 | 5 |
| Unblock | 2 | 3 | 5 |
| Unblock | 3 | 2 | 5 |
| lo_water trip | 3 | 4 | 7 |
| Unblock | 4 | 3 | 7 |
| Unblock | 5 | 2 | 7 |
| lo_water trip | 5 | 4 | 9 |
| Unblock | 6 | 3 | 9 |
| Unblock | 7 | 2 | 9 |
| lo_water trip | 7 | 3 | 10 |
| Unblock | 8 | 2 | 10 |
| Unblock | 9 | 1 | 10 |
| Unblock | 10 | 0 | 10 |
可以看到库函数会一直检查lo_water变量并创建increment数目的新线程直到线程数目超过了maximum的限制。
这就是说这时,没有线程处于阻塞状态。假设线程结束了它们的处理请求,看看对hi_water触发器会有什么情况发生:
| Event | Proc Op | Blk Op | Total |
|---|---|---|---|
| Completion | 9 | 1 | 10 |
| Completion | 8 | 2 | 10 |
| Completion | 7 | 3 | 10 |
| Completion | 6 | 4 | 10 |
| Completion | 5 | 5 | 10 |
| Completion | 4 | 6 | 10 |
| Completion | 3 | 7 | 10 |
| Completion | 2 | 8 | 10 |
| hi_water trip | 2 | 7 | 9 |
| Completion | 1 | 8 | 9 |
| hi_water trip | 1 | 7 | 8 |
| Completion | 0 | 8 | 8 |
| hi_water trip | 0 | 7 | 7 |
可以注意到,在触发hi_water触发器之前线程结束处理状态时什么也没有发生。当线程结束的时候,它会先检查阻塞线程的数目,如果阻塞线程数目太多,它就会将自己杀死。这个结构中hi_water与lo_water限制的好处就是你有一个有效的“范围”,只要线程数目在该范围内你就没有必要创建或销毁线程。在我们的例子里面,在结束了上表中的操作后,我们的系统可以同时处理4个请求而不用创建新的线程。
线程池函数
现在我们已经知道如何控制线程的数目。下面再看看线程池结构的其他元素:
// thread pool functions and handle
THREAD_POOL_HANDLE_T *handle;
THREAD_POOL_PARAM_T
*(*block_func)(THREAD_POOL_PARAM_T *ctp);
void
(*unblock_func)(THREAD_POOL_PARAM_T *ctp);
int
(*handler_func)(THREAD_POOL_PARAM_T *ctp);
THREAD_POOL_PARAM_T
*(*context_alloc)(THREAD_POOL_HANDLE_T *handle);
void
(*context_free)(THREAD_POOL_PARAM_T *ctp);
在前面的线程运行框图中,每个新线程被创建的时候都调用了context_alloc()函数,在每个线程被销毁的时候都调用了context_free()函数。
上面的数据结构中的handle元素是作为唯一的参数传送给context_alloc()函数。context_alloc()函数负责每个线程的必要设置以及返回一个环境指针(ctp)。这个指针的内容完全由你控制,库函数对你的指针指向哪里是不感兴趣的。
环境变量被context_alloc()创建,之后就调用block_func()函数来执行阻塞操作。可以看到block_func()函数接收了context_alloc()函数的结果。一旦block_func()解除阻塞,它将返回一个环境指针,这个指针就会被库函数传递给handler_func()函数。handler_func()函数负责执行工作,例如,在一个典型的服务器中,就是客户端的消息被处理的部分。handler_func()函数必须返回一个0,非0值被保留为未来扩展用途。unblock_func()函数同样被保留为后续用途,现在就将其作为NULL。下面的伪代码可能将这些事情解释清楚:
FOREVER DO
IF (#threads < lo_water) THEN
IF (#threads_total < maximum) THEN
create new thread
context = (*context_alloc) (handle);
ENDIF
ENDIF
retval = (*block_func) (context);
(*handler_func) (retval);
IF (#threads > hi_water) THEN
(*context_free) (context)
kill thread
ENDIF
DONE
上面的例子是极端简化的,只是用来解释ctp以及handle参数的流向,以及对控制线程数的算法有些初步的印象。
Related posts:
近期评论