source: trunk/server/source3/lib/pthreadpool/pthreadpool.c

Last change on this file was 751, checked in by Silvan Scherrer, 13 years ago

Samba Server: updated trunk to 3.6.9

File size: 12.2 KB
Line 
1/*
2 * Unix SMB/CIFS implementation.
3 * thread pool implementation
4 * Copyright (C) Volker Lendecke 2009
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#include <errno.h>
21#include <stdio.h>
22#include <unistd.h>
23#include <stdlib.h>
24#include <string.h>
25#include <pthread.h>
26#include <signal.h>
27#include <assert.h>
28#include <fcntl.h>
29#include <sys/time.h>
30
31#include "pthreadpool.h"
32#include "lib/util/dlinklist.h"
33#ifdef __OS2__
34#define pipe(A) os2_pipe(A)
35#endif
36
/*
 * One queued unit of work. Jobs are chained through "next" into a
 * singly linked FIFO owned by the pool.
 */
struct pthreadpool_job {
	/* Following job in the queue, NULL for the last one */
	struct pthreadpool_job *next;

	/* Caller supplied id, reported via the signal pipe once fn returned */
	int id;

	/* The work function, run by a worker thread */
	void (*fn)(void *private_data);

	/* Opaque argument passed to fn */
	void *private_data;
};
43
/*
 * State of one thread pool. Everything except the list linkage is
 * protected by "mutex"; the list linkage is protected by the
 * file-level pthreadpools_mutex.
 */
struct pthreadpool {
	/* Linkage in the global list of pools, walked by the fork handlers */
	struct pthreadpool *prev;
	struct pthreadpool *next;

	/* Serializes access to this struct */
	pthread_mutex_t mutex;

	/* Idle workers sleep here waiting for jobs */
	pthread_cond_t condvar;

	/* FIFO of pending jobs: head and tail */
	struct pthreadpool_job *jobs;
	struct pthreadpool_job *last_job;

	/* Pipe used to report finished job ids: [0] is read, [1] is written */
	int sig_pipe[2];

	/* Non-zero once the workers have been told to terminate */
	int shutdown;

	/* Upper bound on worker threads, 0 means no limit */
	int max_threads;

	/* Worker threads currently alive */
	int num_threads;

	/* Worker threads currently waiting on condvar */
	int num_idle;

	/* Threads that have terminated and still need a pthread_join() */
	int num_exited;
	pthread_t *exited; /* grown with realloc as needed */
};
96
/* Registers the fork handlers; invoked at most once through pthread_once() */
static void pthreadpool_prep_atfork(void);

/* Guards the global list of pools */
static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;

/* All pools created by pthreadpool_init() and not yet destroyed */
static struct pthreadpool *pthreadpools = NULL;

/* Once-control making sure the fork handlers are installed a single time */
static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;
102
103/*
104 * Initialize a thread pool
105 */
106
107int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
108{
109 struct pthreadpool *pool;
110 int ret;
111
112 pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
113 if (pool == NULL) {
114 return ENOMEM;
115 }
116
117 ret = pipe(pool->sig_pipe);
118 if (ret == -1) {
119 int err = errno;
120 free(pool);
121 return err;
122 }
123
124 ret = pthread_mutex_init(&pool->mutex, NULL);
125 if (ret != 0) {
126 close(pool->sig_pipe[0]);
127 close(pool->sig_pipe[1]);
128 free(pool);
129 return ret;
130 }
131
132 ret = pthread_cond_init(&pool->condvar, NULL);
133 if (ret != 0) {
134 pthread_mutex_destroy(&pool->mutex);
135 close(pool->sig_pipe[0]);
136 close(pool->sig_pipe[1]);
137 free(pool);
138 return ret;
139 }
140
141 pool->shutdown = 0;
142 pool->jobs = pool->last_job = NULL;
143 pool->num_threads = 0;
144 pool->num_exited = 0;
145 pool->exited = NULL;
146 pool->max_threads = max_threads;
147 pool->num_idle = 0;
148
149 ret = pthread_mutex_lock(&pthreadpools_mutex);
150 if (ret != 0) {
151 pthread_cond_destroy(&pool->condvar);
152 pthread_mutex_destroy(&pool->mutex);
153 close(pool->sig_pipe[0]);
154 close(pool->sig_pipe[1]);
155 free(pool);
156 return ret;
157 }
158 DLIST_ADD(pthreadpools, pool);
159
160 ret = pthread_mutex_unlock(&pthreadpools_mutex);
161 assert(ret == 0);
162
163 pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);
164
165 *presult = pool;
166
167 return 0;
168}
169
170static void pthreadpool_prepare(void)
171{
172 int ret;
173 struct pthreadpool *pool;
174
175 ret = pthread_mutex_lock(&pthreadpools_mutex);
176 assert(ret == 0);
177
178 pool = pthreadpools;
179
180 while (pool != NULL) {
181 ret = pthread_mutex_lock(&pool->mutex);
182 assert(ret == 0);
183 pool = pool->next;
184 }
185}
186
187static void pthreadpool_parent(void)
188{
189 int ret;
190 struct pthreadpool *pool;
191
192 pool = DLIST_TAIL(pthreadpools);
193
194 while (1) {
195 ret = pthread_mutex_unlock(&pool->mutex);
196 assert(ret == 0);
197
198 if (pool == pthreadpools) {
199 break;
200 }
201 pool = pool->prev;
202 }
203
204 ret = pthread_mutex_unlock(&pthreadpools_mutex);
205 assert(ret == 0);
206}
207
208static void pthreadpool_child(void)
209{
210 int ret;
211 struct pthreadpool *pool;
212
213 pool = DLIST_TAIL(pthreadpools);
214
215 while (1) {
216 close(pool->sig_pipe[0]);
217 close(pool->sig_pipe[1]);
218
219 ret = pipe(pool->sig_pipe);
220 assert(ret == 0);
221
222 pool->num_threads = 0;
223
224 pool->num_exited = 0;
225 free(pool->exited);
226 pool->exited = NULL;
227
228 pool->num_idle = 0;
229
230 while (pool->jobs != NULL) {
231 struct pthreadpool_job *job;
232 job = pool->jobs;
233 pool->jobs = job->next;
234 free(job);
235 }
236 pool->last_job = NULL;
237
238 ret = pthread_mutex_unlock(&pool->mutex);
239 assert(ret == 0);
240
241 if (pool == pthreadpools) {
242 break;
243 }
244 pool = pool->prev;
245 }
246
247 ret = pthread_mutex_unlock(&pthreadpools_mutex);
248 assert(ret == 0);
249}
250
251static void pthreadpool_prep_atfork(void)
252{
253 pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
254 pthreadpool_child);
255}
256
257/*
258 * Return the file descriptor which becomes readable when a job has
259 * finished
260 */
261
262int pthreadpool_signal_fd(struct pthreadpool *pool)
263{
264 return pool->sig_pipe[0];
265}
266
267/*
268 * Do a pthread_join() on all children that have exited, pool->mutex must be
269 * locked
270 */
271static void pthreadpool_join_children(struct pthreadpool *pool)
272{
273 int i;
274
275 for (i=0; i<pool->num_exited; i++) {
276 pthread_join(pool->exited[i], NULL);
277 }
278 pool->num_exited = 0;
279
280 /*
281 * Deliberately not free and NULL pool->exited. That will be
282 * re-used by realloc later.
283 */
284}
285
286/*
287 * Fetch a finished job number from the signal pipe
288 */
289
290int pthreadpool_finished_job(struct pthreadpool *pool, int *jobid)
291{
292 int ret_jobid;
293 ssize_t nread;
294
295 nread = -1;
296 errno = EINTR;
297
298 while ((nread == -1) && (errno == EINTR)) {
299 nread = read(pool->sig_pipe[0], &ret_jobid, sizeof(int));
300 }
301 if (nread == -1) {
302 return errno;
303 }
304 if (nread != sizeof(int)) {
305 return EINVAL;
306 }
307 *jobid = ret_jobid;
308 return 0;
309}
310
311/*
312 * Destroy a thread pool, finishing all threads working for it
313 */
314
315int pthreadpool_destroy(struct pthreadpool *pool)
316{
317 int ret, ret1;
318
319 ret = pthread_mutex_lock(&pool->mutex);
320 if (ret != 0) {
321 return ret;
322 }
323
324 if ((pool->jobs != NULL) || pool->shutdown) {
325 ret = pthread_mutex_unlock(&pool->mutex);
326 assert(ret == 0);
327 return EBUSY;
328 }
329
330 if (pool->num_threads > 0) {
331 /*
332 * We have active threads, tell them to finish, wait for that.
333 */
334
335 pool->shutdown = 1;
336
337 if (pool->num_idle > 0) {
338 /*
339 * Wake the idle threads. They will find pool->quit to
340 * be set and exit themselves
341 */
342 ret = pthread_cond_broadcast(&pool->condvar);
343 if (ret != 0) {
344 pthread_mutex_unlock(&pool->mutex);
345 return ret;
346 }
347 }
348
349 while ((pool->num_threads > 0) || (pool->num_exited > 0)) {
350
351 if (pool->num_exited > 0) {
352 pthreadpool_join_children(pool);
353 continue;
354 }
355 /*
356 * A thread that shuts down will also signal
357 * pool->condvar
358 */
359 ret = pthread_cond_wait(&pool->condvar, &pool->mutex);
360 if (ret != 0) {
361 pthread_mutex_unlock(&pool->mutex);
362 return ret;
363 }
364 }
365 }
366
367 ret = pthread_mutex_unlock(&pool->mutex);
368 if (ret != 0) {
369 return ret;
370 }
371 ret = pthread_mutex_destroy(&pool->mutex);
372 ret1 = pthread_cond_destroy(&pool->condvar);
373
374 if (ret != 0) {
375 return ret;
376 }
377 if (ret1 != 0) {
378 return ret1;
379 }
380
381 ret = pthread_mutex_lock(&pthreadpools_mutex);
382 if (ret != 0) {
383 return ret;
384 }
385 DLIST_REMOVE(pthreadpools, pool);
386 ret = pthread_mutex_unlock(&pthreadpools_mutex);
387 assert(ret == 0);
388
389 close(pool->sig_pipe[0]);
390 pool->sig_pipe[0] = -1;
391
392 close(pool->sig_pipe[1]);
393 pool->sig_pipe[1] = -1;
394
395 free(pool->exited);
396 free(pool);
397
398 return 0;
399}
400
401/*
402 * Prepare for pthread_exit(), pool->mutex must be locked
403 */
404static void pthreadpool_server_exit(struct pthreadpool *pool)
405{
406 pthread_t *exited;
407
408 pool->num_threads -= 1;
409
410 exited = (pthread_t *)realloc(
411 pool->exited, sizeof(pthread_t *) * (pool->num_exited + 1));
412
413 if (exited == NULL) {
414 /* lost a thread status */
415 return;
416 }
417 pool->exited = exited;
418
419 pool->exited[pool->num_exited] = pthread_self();
420 pool->num_exited += 1;
421}
422
423static void *pthreadpool_server(void *arg)
424{
425 struct pthreadpool *pool = (struct pthreadpool *)arg;
426 int res;
427
428 res = pthread_mutex_lock(&pool->mutex);
429 if (res != 0) {
430 return NULL;
431 }
432
433 while (1) {
434 struct timeval tv;
435 struct timespec ts;
436 struct pthreadpool_job *job;
437
438 /*
439 * idle-wait at most 1 second. If nothing happens in that
440 * time, exit this thread.
441 */
442
443 gettimeofday(&tv, NULL);
444 ts.tv_sec = tv.tv_sec + 1;
445 ts.tv_nsec = tv.tv_usec*1000;
446
447 while ((pool->jobs == NULL) && (pool->shutdown == 0)) {
448
449 pool->num_idle += 1;
450 res = pthread_cond_timedwait(
451 &pool->condvar, &pool->mutex, &ts);
452 pool->num_idle -= 1;
453
454 if (res == ETIMEDOUT) {
455
456 if (pool->jobs == NULL) {
457 /*
458 * we timed out and still no work for
459 * us. Exit.
460 */
461 pthreadpool_server_exit(pool);
462 pthread_mutex_unlock(&pool->mutex);
463 return NULL;
464 }
465
466 break;
467 }
468 assert(res == 0);
469 }
470
471 job = pool->jobs;
472
473 if (job != NULL) {
474 ssize_t written;
475
476 /*
477 * Ok, there's work for us to do, remove the job from
478 * the pthreadpool list
479 */
480 pool->jobs = job->next;
481 if (pool->last_job == job) {
482 pool->last_job = NULL;
483 }
484
485 /*
486 * Do the work with the mutex unlocked
487 */
488
489 res = pthread_mutex_unlock(&pool->mutex);
490 assert(res == 0);
491
492 job->fn(job->private_data);
493
494 written = write(pool->sig_pipe[1], &job->id,
495 sizeof(int));
496
497 free(job);
498
499 res = pthread_mutex_lock(&pool->mutex);
500 assert(res == 0);
501
502 if (written != sizeof(int)) {
503 pthreadpool_server_exit(pool);
504 pthread_mutex_unlock(&pool->mutex);
505 return NULL;
506 }
507 }
508
509 if ((pool->jobs == NULL) && (pool->shutdown != 0)) {
510 /*
511 * No more work to do and we're asked to shut down, so
512 * exit
513 */
514 pthreadpool_server_exit(pool);
515
516 if (pool->num_threads == 0) {
517 /*
518 * Ping the main thread waiting for all of us
519 * workers to have quit.
520 */
521 pthread_cond_broadcast(&pool->condvar);
522 }
523
524 pthread_mutex_unlock(&pool->mutex);
525 return NULL;
526 }
527 }
528}
529
530int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
531 void (*fn)(void *private_data), void *private_data)
532{
533 struct pthreadpool_job *job;
534 pthread_t thread_id;
535 int res;
536 sigset_t mask, omask;
537
538 job = (struct pthreadpool_job *)malloc(sizeof(struct pthreadpool_job));
539 if (job == NULL) {
540 return ENOMEM;
541 }
542
543 job->fn = fn;
544 job->private_data = private_data;
545 job->id = job_id;
546 job->next = NULL;
547
548 res = pthread_mutex_lock(&pool->mutex);
549 if (res != 0) {
550 free(job);
551 return res;
552 }
553
554 if (pool->shutdown) {
555 /*
556 * Protect against the pool being shut down while
557 * trying to add a job
558 */
559 res = pthread_mutex_unlock(&pool->mutex);
560 assert(res == 0);
561 free(job);
562 return EINVAL;
563 }
564
565 /*
566 * Just some cleanup under the mutex
567 */
568 pthreadpool_join_children(pool);
569
570 /*
571 * Add job to the end of the queue
572 */
573 if (pool->jobs == NULL) {
574 pool->jobs = job;
575 }
576 else {
577 pool->last_job->next = job;
578 }
579 pool->last_job = job;
580
581 if (pool->num_idle > 0) {
582 /*
583 * We have idle threads, wake one.
584 */
585 res = pthread_cond_signal(&pool->condvar);
586 pthread_mutex_unlock(&pool->mutex);
587 return res;
588 }
589
590 if ((pool->max_threads != 0) &&
591 (pool->num_threads >= pool->max_threads)) {
592 /*
593 * No more new threads, we just queue the request
594 */
595 pthread_mutex_unlock(&pool->mutex);
596 return 0;
597 }
598
599 /*
600 * Create a new worker thread. It should not receive any signals.
601 */
602
603 sigfillset(&mask);
604
605 res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
606 if (res != 0) {
607 pthread_mutex_unlock(&pool->mutex);
608 return res;
609 }
610
611 res = pthread_create(&thread_id, NULL, pthreadpool_server,
612 (void *)pool);
613 if (res == 0) {
614 pool->num_threads += 1;
615 }
616
617 assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
618
619 pthread_mutex_unlock(&pool->mutex);
620 return res;
621}
Note: See TracBrowser for help on using the repository browser.