source: vendor/current/source3/lib/pthreadpool/pthreadpool.c

Last change on this file was 988, checked in by Silvan Scherrer, 9 years ago

Samba Server: update vendor to version 4.4.3

File size: 12.8 KB
Line 
1/*
2 * Unix SMB/CIFS implementation.
3 * thread pool implementation
4 * Copyright (C) Volker Lendecke 2009
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#include "config.h"
21#include <errno.h>
22#include <stdio.h>
23#include <stdlib.h>
24#include <string.h>
25#include <pthread.h>
26#include <signal.h>
27#include <assert.h>
28#include <fcntl.h>
29#include "system/time.h"
30#include "system/filesys.h"
31#include "replace.h"
32
33#include "pthreadpool.h"
34#include "lib/util/dlinklist.h"
35
36struct pthreadpool_job {
37 int id;
38 void (*fn)(void *private_data);
39 void *private_data;
40};
41
42struct pthreadpool {
43 /*
44 * List pthreadpools for fork safety
45 */
46 struct pthreadpool *prev, *next;
47
48 /*
49 * Control access to this struct
50 */
51 pthread_mutex_t mutex;
52
53 /*
54 * Threads waiting for work do so here
55 */
56 pthread_cond_t condvar;
57
58 /*
59 * Array of jobs
60 */
61 size_t jobs_array_len;
62 struct pthreadpool_job *jobs;
63
64 size_t head;
65 size_t num_jobs;
66
67 /*
68 * pipe for signalling
69 */
70 int sig_pipe[2];
71
72 /*
73 * indicator to worker threads that they should shut down
74 */
75 int shutdown;
76
77 /*
78 * maximum number of threads
79 */
80 int max_threads;
81
82 /*
83 * Number of threads
84 */
85 int num_threads;
86
87 /*
88 * Number of idle threads
89 */
90 int num_idle;
91
92 /*
93 * An array of threads that require joining.
94 */
95 int num_exited;
96 pthread_t *exited; /* We alloc more */
97};
98
99static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
100static struct pthreadpool *pthreadpools = NULL;
101static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;
102
103static void pthreadpool_prep_atfork(void);
104
105/*
106 * Initialize a thread pool
107 */
108
109int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
110{
111 struct pthreadpool *pool;
112 int ret;
113
114 pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
115 if (pool == NULL) {
116 return ENOMEM;
117 }
118
119 pool->jobs_array_len = 4;
120 pool->jobs = calloc(
121 pool->jobs_array_len, sizeof(struct pthreadpool_job));
122
123 if (pool->jobs == NULL) {
124 free(pool);
125 return ENOMEM;
126 }
127
128 pool->head = pool->num_jobs = 0;
129
130 ret = pipe(pool->sig_pipe);
131 if (ret == -1) {
132 int err = errno;
133 free(pool->jobs);
134 free(pool);
135 return err;
136 }
137
138 ret = pthread_mutex_init(&pool->mutex, NULL);
139 if (ret != 0) {
140 close(pool->sig_pipe[0]);
141 close(pool->sig_pipe[1]);
142 free(pool->jobs);
143 free(pool);
144 return ret;
145 }
146
147 ret = pthread_cond_init(&pool->condvar, NULL);
148 if (ret != 0) {
149 pthread_mutex_destroy(&pool->mutex);
150 close(pool->sig_pipe[0]);
151 close(pool->sig_pipe[1]);
152 free(pool->jobs);
153 free(pool);
154 return ret;
155 }
156
157 pool->shutdown = 0;
158 pool->num_threads = 0;
159 pool->num_exited = 0;
160 pool->exited = NULL;
161 pool->max_threads = max_threads;
162 pool->num_idle = 0;
163
164 ret = pthread_mutex_lock(&pthreadpools_mutex);
165 if (ret != 0) {
166 pthread_cond_destroy(&pool->condvar);
167 pthread_mutex_destroy(&pool->mutex);
168 close(pool->sig_pipe[0]);
169 close(pool->sig_pipe[1]);
170 free(pool->jobs);
171 free(pool);
172 return ret;
173 }
174 DLIST_ADD(pthreadpools, pool);
175
176 ret = pthread_mutex_unlock(&pthreadpools_mutex);
177 assert(ret == 0);
178
179 pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);
180
181 *presult = pool;
182
183 return 0;
184}
185
186static void pthreadpool_prepare(void)
187{
188 int ret;
189 struct pthreadpool *pool;
190
191 ret = pthread_mutex_lock(&pthreadpools_mutex);
192 assert(ret == 0);
193
194 pool = pthreadpools;
195
196 while (pool != NULL) {
197 ret = pthread_mutex_lock(&pool->mutex);
198 assert(ret == 0);
199 pool = pool->next;
200 }
201}
202
203static void pthreadpool_parent(void)
204{
205 int ret;
206 struct pthreadpool *pool;
207
208 for (pool = DLIST_TAIL(pthreadpools);
209 pool != NULL;
210 pool = DLIST_PREV(pool)) {
211 ret = pthread_mutex_unlock(&pool->mutex);
212 assert(ret == 0);
213 }
214
215 ret = pthread_mutex_unlock(&pthreadpools_mutex);
216 assert(ret == 0);
217}
218
219static void pthreadpool_child(void)
220{
221 int ret;
222 struct pthreadpool *pool;
223
224 for (pool = DLIST_TAIL(pthreadpools);
225 pool != NULL;
226 pool = DLIST_PREV(pool)) {
227
228 close(pool->sig_pipe[0]);
229 close(pool->sig_pipe[1]);
230
231 ret = pipe(pool->sig_pipe);
232 assert(ret == 0);
233
234 pool->num_threads = 0;
235
236 pool->num_exited = 0;
237 free(pool->exited);
238 pool->exited = NULL;
239
240 pool->num_idle = 0;
241 pool->head = 0;
242 pool->num_jobs = 0;
243
244 ret = pthread_mutex_unlock(&pool->mutex);
245 assert(ret == 0);
246 }
247
248 ret = pthread_mutex_unlock(&pthreadpools_mutex);
249 assert(ret == 0);
250}
251
252static void pthreadpool_prep_atfork(void)
253{
254 pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
255 pthreadpool_child);
256}
257
258/*
259 * Return the file descriptor which becomes readable when a job has
260 * finished
261 */
262
263int pthreadpool_signal_fd(struct pthreadpool *pool)
264{
265 return pool->sig_pipe[0];
266}
267
268/*
269 * Do a pthread_join() on all children that have exited, pool->mutex must be
270 * locked
271 */
272static void pthreadpool_join_children(struct pthreadpool *pool)
273{
274 int i;
275
276 for (i=0; i<pool->num_exited; i++) {
277 int ret;
278
279 ret = pthread_join(pool->exited[i], NULL);
280 if (ret != 0) {
281 /*
282 * Severe internal error, we can't do much but
283 * abort here.
284 */
285 abort();
286 }
287 }
288 pool->num_exited = 0;
289
290 /*
291 * Deliberately not free and NULL pool->exited. That will be
292 * re-used by realloc later.
293 */
294}
295
296/*
297 * Fetch a finished job number from the signal pipe
298 */
299
300int pthreadpool_finished_jobs(struct pthreadpool *pool, int *jobids,
301 unsigned num_jobids)
302{
303 ssize_t to_read, nread;
304
305 nread = -1;
306 errno = EINTR;
307
308 to_read = sizeof(int) * num_jobids;
309
310 while ((nread == -1) && (errno == EINTR)) {
311 nread = read(pool->sig_pipe[0], jobids, to_read);
312 }
313 if (nread == -1) {
314 return -errno;
315 }
316 if ((nread % sizeof(int)) != 0) {
317 return -EINVAL;
318 }
319 return nread / sizeof(int);
320}
321
322/*
323 * Destroy a thread pool, finishing all threads working for it
324 */
325
326int pthreadpool_destroy(struct pthreadpool *pool)
327{
328 int ret, ret1;
329
330 ret = pthread_mutex_lock(&pool->mutex);
331 if (ret != 0) {
332 return ret;
333 }
334
335 if ((pool->num_jobs != 0) || pool->shutdown) {
336 ret = pthread_mutex_unlock(&pool->mutex);
337 assert(ret == 0);
338 return EBUSY;
339 }
340
341 if (pool->num_threads > 0) {
342 /*
343 * We have active threads, tell them to finish, wait for that.
344 */
345
346 pool->shutdown = 1;
347
348 if (pool->num_idle > 0) {
349 /*
350 * Wake the idle threads. They will find
351 * pool->shutdown to be set and exit themselves
352 */
353 ret = pthread_cond_broadcast(&pool->condvar);
354 if (ret != 0) {
355 pthread_mutex_unlock(&pool->mutex);
356 return ret;
357 }
358 }
359
360 while ((pool->num_threads > 0) || (pool->num_exited > 0)) {
361
362 if (pool->num_exited > 0) {
363 pthreadpool_join_children(pool);
364 continue;
365 }
366 /*
367 * A thread that shuts down will also signal
368 * pool->condvar
369 */
370 ret = pthread_cond_wait(&pool->condvar, &pool->mutex);
371 if (ret != 0) {
372 pthread_mutex_unlock(&pool->mutex);
373 return ret;
374 }
375 }
376 }
377
378 ret = pthread_mutex_unlock(&pool->mutex);
379 if (ret != 0) {
380 return ret;
381 }
382 ret = pthread_mutex_destroy(&pool->mutex);
383 ret1 = pthread_cond_destroy(&pool->condvar);
384
385 if (ret != 0) {
386 return ret;
387 }
388 if (ret1 != 0) {
389 return ret1;
390 }
391
392 ret = pthread_mutex_lock(&pthreadpools_mutex);
393 if (ret != 0) {
394 return ret;
395 }
396 DLIST_REMOVE(pthreadpools, pool);
397 ret = pthread_mutex_unlock(&pthreadpools_mutex);
398 assert(ret == 0);
399
400 close(pool->sig_pipe[0]);
401 pool->sig_pipe[0] = -1;
402
403 close(pool->sig_pipe[1]);
404 pool->sig_pipe[1] = -1;
405
406 free(pool->exited);
407 free(pool->jobs);
408 free(pool);
409
410 return 0;
411}
412
413/*
414 * Prepare for pthread_exit(), pool->mutex must be locked
415 */
416static void pthreadpool_server_exit(struct pthreadpool *pool)
417{
418 pthread_t *exited;
419
420 pool->num_threads -= 1;
421
422 exited = (pthread_t *)realloc(
423 pool->exited, sizeof(pthread_t) * (pool->num_exited + 1));
424
425 if (exited == NULL) {
426 /* lost a thread status */
427 return;
428 }
429 pool->exited = exited;
430
431 pool->exited[pool->num_exited] = pthread_self();
432 pool->num_exited += 1;
433}
434
435static bool pthreadpool_get_job(struct pthreadpool *p,
436 struct pthreadpool_job *job)
437{
438 if (p->num_jobs == 0) {
439 return false;
440 }
441 *job = p->jobs[p->head];
442 p->head = (p->head+1) % p->jobs_array_len;
443 p->num_jobs -= 1;
444 return true;
445}
446
447static bool pthreadpool_put_job(struct pthreadpool *p,
448 int id,
449 void (*fn)(void *private_data),
450 void *private_data)
451{
452 struct pthreadpool_job *job;
453
454 if (p->num_jobs == p->jobs_array_len) {
455 struct pthreadpool_job *tmp;
456 size_t new_len = p->jobs_array_len * 2;
457
458 tmp = realloc(
459 p->jobs, sizeof(struct pthreadpool_job) * new_len);
460 if (tmp == NULL) {
461 return false;
462 }
463 p->jobs = tmp;
464
465 /*
466 * We just doubled the jobs array. The array implements a FIFO
467 * queue with a modulo-based wraparound, so we have to memcpy
468 * the jobs that are logically at the queue end but physically
469 * before the queue head into the reallocated area. The new
470 * space starts at the current jobs_array_len, and we have to
471 * copy everything before the current head job into the new
472 * area.
473 */
474 memcpy(&p->jobs[p->jobs_array_len], p->jobs,
475 sizeof(struct pthreadpool_job) * p->head);
476
477 p->jobs_array_len = new_len;
478 }
479
480 job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len];
481 job->id = id;
482 job->fn = fn;
483 job->private_data = private_data;
484
485 p->num_jobs += 1;
486
487 return true;
488}
489
490static void *pthreadpool_server(void *arg)
491{
492 struct pthreadpool *pool = (struct pthreadpool *)arg;
493 int res;
494
495 res = pthread_mutex_lock(&pool->mutex);
496 if (res != 0) {
497 return NULL;
498 }
499
500 while (1) {
501 struct timespec ts;
502 struct pthreadpool_job job;
503
504 /*
505 * idle-wait at most 1 second. If nothing happens in that
506 * time, exit this thread.
507 */
508
509 clock_gettime(CLOCK_REALTIME, &ts);
510 ts.tv_sec += 1;
511
512 while ((pool->num_jobs == 0) && (pool->shutdown == 0)) {
513
514 pool->num_idle += 1;
515 res = pthread_cond_timedwait(
516 &pool->condvar, &pool->mutex, &ts);
517 pool->num_idle -= 1;
518
519 if (res == ETIMEDOUT) {
520
521 if (pool->num_jobs == 0) {
522 /*
523 * we timed out and still no work for
524 * us. Exit.
525 */
526 pthreadpool_server_exit(pool);
527 pthread_mutex_unlock(&pool->mutex);
528 return NULL;
529 }
530
531 break;
532 }
533 assert(res == 0);
534 }
535
536 if (pthreadpool_get_job(pool, &job)) {
537 ssize_t written;
538 int sig_pipe = pool->sig_pipe[1];
539
540 /*
541 * Do the work with the mutex unlocked
542 */
543
544 res = pthread_mutex_unlock(&pool->mutex);
545 assert(res == 0);
546
547 job.fn(job.private_data);
548
549 res = pthread_mutex_lock(&pool->mutex);
550 assert(res == 0);
551
552 written = write(sig_pipe, &job.id, sizeof(job.id));
553 if (written != sizeof(int)) {
554 pthreadpool_server_exit(pool);
555 pthread_mutex_unlock(&pool->mutex);
556 return NULL;
557 }
558 }
559
560 if ((pool->num_jobs == 0) && (pool->shutdown != 0)) {
561 /*
562 * No more work to do and we're asked to shut down, so
563 * exit
564 */
565 pthreadpool_server_exit(pool);
566
567 if (pool->num_threads == 0) {
568 /*
569 * Ping the main thread waiting for all of us
570 * workers to have quit.
571 */
572 pthread_cond_broadcast(&pool->condvar);
573 }
574
575 pthread_mutex_unlock(&pool->mutex);
576 return NULL;
577 }
578 }
579}
580
581int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
582 void (*fn)(void *private_data), void *private_data)
583{
584 pthread_t thread_id;
585 int res;
586 sigset_t mask, omask;
587
588 res = pthread_mutex_lock(&pool->mutex);
589 if (res != 0) {
590 return res;
591 }
592
593 if (pool->shutdown) {
594 /*
595 * Protect against the pool being shut down while
596 * trying to add a job
597 */
598 res = pthread_mutex_unlock(&pool->mutex);
599 assert(res == 0);
600 return EINVAL;
601 }
602
603 /*
604 * Just some cleanup under the mutex
605 */
606 pthreadpool_join_children(pool);
607
608 /*
609 * Add job to the end of the queue
610 */
611 if (!pthreadpool_put_job(pool, job_id, fn, private_data)) {
612 pthread_mutex_unlock(&pool->mutex);
613 return ENOMEM;
614 }
615
616 if (pool->num_idle > 0) {
617 /*
618 * We have idle threads, wake one.
619 */
620 res = pthread_cond_signal(&pool->condvar);
621 pthread_mutex_unlock(&pool->mutex);
622 return res;
623 }
624
625 if ((pool->max_threads != 0) &&
626 (pool->num_threads >= pool->max_threads)) {
627 /*
628 * No more new threads, we just queue the request
629 */
630 pthread_mutex_unlock(&pool->mutex);
631 return 0;
632 }
633
634 /*
635 * Create a new worker thread. It should not receive any signals.
636 */
637
638 sigfillset(&mask);
639
640 res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
641 if (res != 0) {
642 pthread_mutex_unlock(&pool->mutex);
643 return res;
644 }
645
646 res = pthread_create(&thread_id, NULL, pthreadpool_server,
647 (void *)pool);
648 if (res == 0) {
649 pool->num_threads += 1;
650 }
651
652 assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
653
654 pthread_mutex_unlock(&pool->mutex);
655 return res;
656}
Note: See TracBrowser for help on using the repository browser.