Changeset 988 for vendor/current/source3/lib/pthreadpool
- Timestamp:
- Nov 24, 2016, 1:14:11 PM (9 years ago)
- Location:
- vendor/current/source3/lib/pthreadpool
- Files:
-
- 2 added
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
vendor/current/source3/lib/pthreadpool/pthreadpool.c
r746 r988 18 18 */ 19 19 20 #include "config.h" 20 21 #include <errno.h> 21 22 #include <stdio.h> 22 #include <unistd.h>23 23 #include <stdlib.h> 24 24 #include <string.h> … … 27 27 #include <assert.h> 28 28 #include <fcntl.h> 29 #include <sys/time.h> 29 #include "system/time.h" 30 #include "system/filesys.h" 31 #include "replace.h" 30 32 31 33 #include "pthreadpool.h" … … 33 35 34 36 struct pthreadpool_job { 35 struct pthreadpool_job *next;36 37 int id; 37 38 void (*fn)(void *private_data); … … 56 57 57 58 /* 58 * List of work jobs 59 */ 60 struct pthreadpool_job *jobs, *last_job; 59 * Array of jobs 60 */ 61 size_t jobs_array_len; 62 struct pthreadpool_job *jobs; 63 64 size_t head; 65 size_t num_jobs; 61 66 62 67 /* … … 112 117 } 113 118 119 pool->jobs_array_len = 4; 120 pool->jobs = calloc( 121 pool->jobs_array_len, sizeof(struct pthreadpool_job)); 122 123 if (pool->jobs == NULL) { 124 free(pool); 125 return ENOMEM; 126 } 127 128 pool->head = pool->num_jobs = 0; 129 114 130 ret = pipe(pool->sig_pipe); 115 131 if (ret == -1) { 116 132 int err = errno; 133 free(pool->jobs); 117 134 free(pool); 118 135 return err; … … 123 140 close(pool->sig_pipe[0]); 124 141 close(pool->sig_pipe[1]); 142 free(pool->jobs); 125 143 free(pool); 126 144 return ret; … … 132 150 close(pool->sig_pipe[0]); 133 151 close(pool->sig_pipe[1]); 152 free(pool->jobs); 134 153 free(pool); 135 154 return ret; … … 137 156 138 157 pool->shutdown = 0; 139 pool->jobs = pool->last_job = NULL;140 158 pool->num_threads = 0; 141 159 pool->num_exited = 0; … … 150 168 close(pool->sig_pipe[0]); 151 169 close(pool->sig_pipe[1]); 170 free(pool->jobs); 152 171 free(pool); 153 172 return ret; … … 187 206 struct pthreadpool *pool; 188 207 189 pool = DLIST_TAIL(pthreadpools);190 191 while (1) {208 for (pool = DLIST_TAIL(pthreadpools); 209 pool != NULL; 210 pool = DLIST_PREV(pool)) { 192 211 ret = pthread_mutex_unlock(&pool->mutex); 193 212 assert(ret == 0); 194 195 if (pool == pthreadpools) {196 break;197 }198 pool = pool->prev;199 213 } 200 214 … … 208 222 struct pthreadpool *pool; 209 223 210 pool = DLIST_TAIL(pthreadpools); 211 212 while (1) { 224 for (pool = DLIST_TAIL(pthreadpools); 225 pool != NULL; 226 pool = DLIST_PREV(pool)) { 227 213 228 close(pool->sig_pipe[0]); 214 229 close(pool->sig_pipe[1]); … … 224 239 225 240 pool->num_idle = 0; 226 227 while (pool->jobs != NULL) { 228 struct pthreadpool_job *job; 229 job = pool->jobs; 230 pool->jobs = job->next; 231 free(job); 232 } 233 pool->last_job = NULL; 241 pool->head = 0; 242 pool->num_jobs = 0; 234 243 235 244 ret = pthread_mutex_unlock(&pool->mutex); 236 245 assert(ret == 0); 237 238 if (pool == pthreadpools) {239 break;240 }241 pool = pool->prev;242 246 } 243 247 … … 271 275 272 276 for (i=0; i<pool->num_exited; i++) { 273 pthread_join(pool->exited[i], NULL); 277 int ret; 278 279 ret = pthread_join(pool->exited[i], NULL); 280 if (ret != 0) { 281 /* 282 * Severe internal error, we can't do much but 283 * abort here. 284 */ 285 abort(); 286 } 274 287 } 275 288 pool->num_exited = 0; … … 285 298 */ 286 299 287 int pthreadpool_finished_job (struct pthreadpool *pool, int *jobid)288 { 289 int ret_jobid; 290 ssize_t nread;300 int pthreadpool_finished_jobs(struct pthreadpool *pool, int *jobids, 301 unsigned num_jobids) 302 { 303 ssize_t to_read, nread; 291 304 292 305 nread = -1; 293 306 errno = EINTR; 294 307 308 to_read = sizeof(int) * num_jobids; 309 295 310 while ((nread == -1) && (errno == EINTR)) { 296 nread = read(pool->sig_pipe[0], &ret_jobid, sizeof(int));311 nread = read(pool->sig_pipe[0], jobids, to_read); 297 312 } 298 313 if (nread == -1) { 299 return errno; 300 } 301 if (nread != sizeof(int)) { 302 return EINVAL; 303 } 304 *jobid = ret_jobid; 305 return 0; 314 return -errno; 315 } 316 if ((nread % sizeof(int)) != 0) { 317 return -EINVAL; 318 } 319 return nread / sizeof(int); 306 320 } 307 321 … … 319 333 } 320 334 321 if ((pool-> jobs != NULL) || pool->shutdown) {335 if ((pool->num_jobs != 0) || pool->shutdown) { 322 336 ret = pthread_mutex_unlock(&pool->mutex); 323 337 assert(ret == 0); … … 334 348 if (pool->num_idle > 0) { 335 349 /* 336 * Wake the idle threads. They will find pool->quit to337 * be set and exit themselves350 * Wake the idle threads. They will find 351 * pool->shutdown to be set and exit themselves 338 352 */ 339 353 ret = pthread_cond_broadcast(&pool->condvar); … … 391 405 392 406 free(pool->exited); 407 free(pool->jobs); 393 408 free(pool); 394 409 … … 406 421 407 422 exited = (pthread_t *)realloc( 408 pool->exited, sizeof(pthread_t *) * (pool->num_exited + 1));423 pool->exited, sizeof(pthread_t) * (pool->num_exited + 1)); 409 424 410 425 if (exited == NULL) { … … 418 433 } 419 434 435 static bool pthreadpool_get_job(struct pthreadpool *p, 436 struct pthreadpool_job *job) 437 { 438 if (p->num_jobs == 0) { 439 return false; 440 } 441 *job = p->jobs[p->head]; 442 p->head = (p->head+1) % p->jobs_array_len; 443 p->num_jobs -= 1; 444 return true; 445 } 446 447 static bool pthreadpool_put_job(struct pthreadpool *p, 448 int id, 449 void (*fn)(void *private_data), 450 void *private_data) 451 { 452 struct pthreadpool_job *job; 453 454 if (p->num_jobs == p->jobs_array_len) { 455 struct pthreadpool_job *tmp; 456 size_t new_len = p->jobs_array_len * 2; 457 458 tmp = realloc( 459 p->jobs, sizeof(struct pthreadpool_job) * new_len); 460 if (tmp == NULL) { 461 return false; 462 } 463 p->jobs = tmp; 464 465 /* 466 * We just doubled the jobs array. The array implements a FIFO 467 * queue with a modulo-based wraparound, so we have to memcpy 468 * the jobs that are logically at the queue end but physically 469 * before the queue head into the reallocated area. The new 470 * space starts at the current jobs_array_len, and we have to 471 * copy everything before the current head job into the new 472 * area. 473 */ 474 memcpy(&p->jobs[p->jobs_array_len], p->jobs, 475 sizeof(struct pthreadpool_job) * p->head); 476 477 p->jobs_array_len = new_len; 478 } 479 480 job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len]; 481 job->id = id; 482 job->fn = fn; 483 job->private_data = private_data; 484 485 p->num_jobs += 1; 486 487 return true; 488 } 489 420 490 static void *pthreadpool_server(void *arg) 421 491 { … … 429 499 430 500 while (1) { 431 struct timeval tv;432 501 struct timespec ts; 433 struct pthreadpool_job *job;502 struct pthreadpool_job job; 434 503 435 504 /* … … 438 507 */ 439 508 440 gettimeofday(&tv, NULL); 441 ts.tv_sec = tv.tv_sec + 1; 442 ts.tv_nsec = tv.tv_usec*1000; 443 444 while ((pool->jobs == NULL) && (pool->shutdown == 0)) { 509 clock_gettime(CLOCK_REALTIME, &ts); 510 ts.tv_sec += 1; 511 512 while ((pool->num_jobs == 0) && (pool->shutdown == 0)) { 445 513 446 514 pool->num_idle += 1; … … 451 519 if (res == ETIMEDOUT) { 452 520 453 if (pool-> jobs == NULL) {521 if (pool->num_jobs == 0) { 454 522 /* 455 523 * we timed out and still no work for … … 466 534 } 467 535 468 job = pool->jobs; 469 470 if (job != NULL) { 536 if (pthreadpool_get_job(pool, &job)) { 471 537 ssize_t written; 472 473 /* 474 * Ok, there's work for us to do, remove the job from 475 * the pthreadpool list 476 */ 477 pool->jobs = job->next; 478 if (pool->last_job == job) { 479 pool->last_job = NULL; 480 } 538 int sig_pipe = pool->sig_pipe[1]; 481 539 482 540 /* … … 487 545 assert(res == 0); 488 546 489 job->fn(job->private_data); 490 491 written = write(pool->sig_pipe[1], &job->id, 492 sizeof(int)); 493 494 free(job); 547 job.fn(job.private_data); 495 548 496 549 res = pthread_mutex_lock(&pool->mutex); 497 550 assert(res == 0); 498 551 552 written = write(sig_pipe, &job.id, sizeof(job.id)); 499 553 if (written != sizeof(int)) { 500 554 pthreadpool_server_exit(pool); … … 504 558 } 505 559 506 if ((pool-> jobs == NULL) && (pool->shutdown != 0)) {560 if ((pool->num_jobs == 0) && (pool->shutdown != 0)) { 507 561 /* 508 562 * No more work to do and we're asked to shut down, so … … 528 582 void (*fn)(void *private_data), void *private_data) 529 583 { 530 struct pthreadpool_job *job;531 584 pthread_t thread_id; 532 585 int res; 533 586 sigset_t mask, omask; 534 587 535 job = (struct pthreadpool_job *)malloc(sizeof(struct pthreadpool_job));536 if (job == NULL) {537 return ENOMEM;538 }539 540 job->fn = fn;541 job->private_data = private_data;542 job->id = job_id;543 job->next = NULL;544 545 588 res = pthread_mutex_lock(&pool->mutex); 546 589 if (res != 0) { 547 free(job);548 590 return res; 549 591 } … … 556 598 res = pthread_mutex_unlock(&pool->mutex); 557 599 assert(res == 0); 558 free(job);559 600 return EINVAL; 560 601 } … … 568 609 * Add job to the end of the queue 569 610 */ 570 if (pool->jobs == NULL) { 571 pool->jobs = job; 572 } 573 else { 574 pool->last_job->next = job; 575 } 576 pool->last_job = job; 611 if (!pthreadpool_put_job(pool, job_id, fn, private_data)) { 612 pthread_mutex_unlock(&pool->mutex); 613 return ENOMEM; 614 } 577 615 578 616 if (pool->num_idle > 0) { -
vendor/current/source3/lib/pthreadpool/pthreadpool.h
r746 r988 62 62 * This adds a job to a pthreadpool. The job can be identified by 63 63 * job_id. This integer will be returned from 64 * pthreadpool_finished_job () then the job is completed.64 * pthreadpool_finished_jobs() then the job is completed. 65 65 * 66 66 * @param[in] pool The pool to run the job on … … 76 76 * @brief Get the signalling fd from a pthreadpool 77 77 * 78 * Completion of a job is indicated by readability of the fd retu ned78 * Completion of a job is indicated by readability of the fd returned 79 79 * by pthreadpool_signal_fd(). 80 80 * … … 85 85 86 86 /** 87 * @brief Get the job_id of a finished job87 * @brief Get the job_ids of finished jobs 88 88 * 89 89 * This blocks until a job has finished unless the fd returned by … … 91 91 * 92 92 * @param[in] pool The pool to query for finished jobs 93 * @param[out] pjobid The job_id of the finished job 94 * @return success: 0, failure: errno 93 * @param[out] jobids The job_ids of the finished job 94 * @param[int] num_jobids The job_ids array size 95 * @return success: >=0, number of finished jobs 96 * failure: -errno 95 97 */ 96 int pthreadpool_finished_job(struct pthreadpool *pool, int *jobid); 98 int pthreadpool_finished_jobs(struct pthreadpool *pool, int *jobids, 99 unsigned num_jobids); 97 100 98 101 #endif -
vendor/current/source3/lib/pthreadpool/tests.c
r746 r988 6 6 #include <pthread.h> 7 7 #include <unistd.h> 8 #include <sys/types.h> 9 #include <sys/wait.h> 8 10 #include "pthreadpool.h" 9 11 … … 70 72 for (i=0; i<num_jobs; i++) { 71 73 int jobid = -1; 72 ret = pthreadpool_finished_job (p, &jobid);73 if ((ret != 0) || (jobid >= num_jobs)) {74 ret = pthreadpool_finished_jobs(p, &jobid, 1); 75 if ((ret != 1) || (jobid >= num_jobs)) { 74 76 fprintf(stderr, "invalid job number %d\n", jobid); 75 77 return -1; … … 283 285 } 284 286 285 ret = pthreadpool_finished_job (pools[j], &jobid);286 if ((ret != 0) || (jobid >= num_jobs * num_threads)) {287 ret = pthreadpool_finished_jobs(pools[j], &jobid, 1); 288 if ((ret != 1) || (jobid >= num_jobs * num_threads)) { 287 289 fprintf(stderr, "invalid job number %d\n", 288 290 jobid); … … 319 321 } 320 322 323 static int test_fork(void) 324 { 325 struct pthreadpool *p; 326 pid_t child, waited; 327 int status, ret; 328 329 ret = pthreadpool_init(1, &p); 330 if (ret != 0) { 331 fprintf(stderr, "pthreadpool_init failed: %s\n", 332 strerror(ret)); 333 return -1; 334 } 335 ret = pthreadpool_destroy(p); 336 if (ret != 0) { 337 fprintf(stderr, "pthreadpool_destroy failed: %s\n", 338 strerror(ret)); 339 return -1; 340 } 341 342 child = fork(); 343 if (child < 0) { 344 perror("fork failed"); 345 return -1; 346 } 347 if (child == 0) { 348 exit(0); 349 } 350 waited = wait(&status); 351 if (waited == -1) { 352 perror("wait failed"); 353 return -1; 354 } 355 if (waited != child) { 356 fprintf(stderr, "expected child %d, got %d\n", 357 (int)child, (int)waited); 358 return -1; 359 } 360 return 0; 361 } 362 321 363 int main(void) 322 364 { … … 326 368 if (ret != 0) { 327 369 fprintf(stderr, "test_init failed\n"); 370 return 1; 371 } 372 373 ret = test_fork(); 374 if (ret != 0) { 375 fprintf(stderr, "test_fork failed\n"); 328 376 return 1; 329 377 }
Note:
See TracChangeset
for help on using the changeset viewer.