Ignore:
Timestamp:
Nov 24, 2016, 1:14:11 PM (9 years ago)
Author:
Silvan Scherrer
Message:

Samba Server: update vendor to version 4.4.3

Location:
vendor/current/source3/lib/pthreadpool
Files:
2 added
3 edited

Legend:

Unmodified
Added
Removed
  • vendor/current/source3/lib/pthreadpool/pthreadpool.c

    r746 r988  
    1818 */
    1919
     20#include "config.h"
    2021#include <errno.h>
    2122#include <stdio.h>
    22 #include <unistd.h>
    2323#include <stdlib.h>
    2424#include <string.h>
     
    2727#include <assert.h>
    2828#include <fcntl.h>
    29 #include <sys/time.h>
     29#include "system/time.h"
     30#include "system/filesys.h"
     31#include "replace.h"
    3032
    3133#include "pthreadpool.h"
     
    3335
    3436struct pthreadpool_job {
    35         struct pthreadpool_job *next;
    3637        int id;
    3738        void (*fn)(void *private_data);
     
    5657
    5758        /*
    58          * List of work jobs
    59          */
    60         struct pthreadpool_job *jobs, *last_job;
     59         * Array of jobs
     60         */
     61        size_t jobs_array_len;
     62        struct pthreadpool_job *jobs;
     63
     64        size_t head;
     65        size_t num_jobs;
    6166
    6267        /*
     
    112117        }
    113118
     119        pool->jobs_array_len = 4;
     120        pool->jobs = calloc(
     121                pool->jobs_array_len, sizeof(struct pthreadpool_job));
     122
     123        if (pool->jobs == NULL) {
     124                free(pool);
     125                return ENOMEM;
     126        }
     127
     128        pool->head = pool->num_jobs = 0;
     129
    114130        ret = pipe(pool->sig_pipe);
    115131        if (ret == -1) {
    116132                int err = errno;
     133                free(pool->jobs);
    117134                free(pool);
    118135                return err;
     
    123140                close(pool->sig_pipe[0]);
    124141                close(pool->sig_pipe[1]);
     142                free(pool->jobs);
    125143                free(pool);
    126144                return ret;
     
    132150                close(pool->sig_pipe[0]);
    133151                close(pool->sig_pipe[1]);
     152                free(pool->jobs);
    134153                free(pool);
    135154                return ret;
     
    137156
    138157        pool->shutdown = 0;
    139         pool->jobs = pool->last_job = NULL;
    140158        pool->num_threads = 0;
    141159        pool->num_exited = 0;
     
    150168                close(pool->sig_pipe[0]);
    151169                close(pool->sig_pipe[1]);
     170                free(pool->jobs);
    152171                free(pool);
    153172                return ret;
     
    187206        struct pthreadpool *pool;
    188207
    189         pool = DLIST_TAIL(pthreadpools);
    190 
    191         while (1) {
     208        for (pool = DLIST_TAIL(pthreadpools);
     209             pool != NULL;
     210             pool = DLIST_PREV(pool)) {
    192211                ret = pthread_mutex_unlock(&pool->mutex);
    193212                assert(ret == 0);
    194 
    195                 if (pool == pthreadpools) {
    196                         break;
    197                 }
    198                 pool = pool->prev;
    199213        }
    200214
     
    208222        struct pthreadpool *pool;
    209223
    210         pool = DLIST_TAIL(pthreadpools);
    211 
    212         while (1) {
     224        for (pool = DLIST_TAIL(pthreadpools);
     225             pool != NULL;
     226             pool = DLIST_PREV(pool)) {
     227
    213228                close(pool->sig_pipe[0]);
    214229                close(pool->sig_pipe[1]);
     
    224239
    225240                pool->num_idle = 0;
    226 
    227                 while (pool->jobs != NULL) {
    228                         struct pthreadpool_job *job;
    229                         job = pool->jobs;
    230                         pool->jobs = job->next;
    231                         free(job);
    232                 }
    233                 pool->last_job = NULL;
     241                pool->head = 0;
     242                pool->num_jobs = 0;
    234243
    235244                ret = pthread_mutex_unlock(&pool->mutex);
    236245                assert(ret == 0);
    237 
    238                 if (pool == pthreadpools) {
    239                         break;
    240                 }
    241                 pool = pool->prev;
    242246        }
    243247
     
    271275
    272276        for (i=0; i<pool->num_exited; i++) {
    273                 pthread_join(pool->exited[i], NULL);
     277                int ret;
     278
     279                ret = pthread_join(pool->exited[i], NULL);
     280                if (ret != 0) {
     281                        /*
     282                         * Severe internal error, we can't do much but
     283                         * abort here.
     284                         */
     285                        abort();
     286                }
    274287        }
    275288        pool->num_exited = 0;
     
    285298 */
    286299
    287 int pthreadpool_finished_job(struct pthreadpool *pool, int *jobid)
    288 {
    289         int ret_jobid;
    290         ssize_t nread;
     300int pthreadpool_finished_jobs(struct pthreadpool *pool, int *jobids,
     301                              unsigned num_jobids)
     302{
     303        ssize_t to_read, nread;
    291304
    292305        nread = -1;
    293306        errno = EINTR;
    294307
     308        to_read = sizeof(int) * num_jobids;
     309
    295310        while ((nread == -1) && (errno == EINTR)) {
    296                 nread = read(pool->sig_pipe[0], &ret_jobid, sizeof(int));
     311                nread = read(pool->sig_pipe[0], jobids, to_read);
    297312        }
    298313        if (nread == -1) {
    299                 return errno;
    300         }
    301         if (nread != sizeof(int)) {
    302                 return EINVAL;
    303         }
    304         *jobid = ret_jobid;
    305         return 0;
     314                return -errno;
     315        }
     316        if ((nread % sizeof(int)) != 0) {
     317                return -EINVAL;
     318        }
     319        return nread / sizeof(int);
    306320}
    307321
     
    319333        }
    320334
    321         if ((pool->jobs != NULL) || pool->shutdown) {
     335        if ((pool->num_jobs != 0) || pool->shutdown) {
    322336                ret = pthread_mutex_unlock(&pool->mutex);
    323337                assert(ret == 0);
     
    334348                if (pool->num_idle > 0) {
    335349                        /*
    336                          * Wake the idle threads. They will find pool->quit to
    337                          * be set and exit themselves
     350                         * Wake the idle threads. They will find
     351                         * pool->shutdown to be set and exit themselves
    338352                         */
    339353                        ret = pthread_cond_broadcast(&pool->condvar);
     
    391405
    392406        free(pool->exited);
     407        free(pool->jobs);
    393408        free(pool);
    394409
     
    406421
    407422        exited = (pthread_t *)realloc(
    408                 pool->exited, sizeof(pthread_t *) * (pool->num_exited + 1));
     423                pool->exited, sizeof(pthread_t) * (pool->num_exited + 1));
    409424
    410425        if (exited == NULL) {
     
    418433}
    419434
     435static bool pthreadpool_get_job(struct pthreadpool *p,
     436                                struct pthreadpool_job *job)
     437{
     438        if (p->num_jobs == 0) {
     439                return false;
     440        }
     441        *job = p->jobs[p->head];
     442        p->head = (p->head+1) % p->jobs_array_len;
     443        p->num_jobs -= 1;
     444        return true;
     445}
     446
     447static bool pthreadpool_put_job(struct pthreadpool *p,
     448                                int id,
     449                                void (*fn)(void *private_data),
     450                                void *private_data)
     451{
     452        struct pthreadpool_job *job;
     453
     454        if (p->num_jobs == p->jobs_array_len) {
     455                struct pthreadpool_job *tmp;
     456                size_t new_len = p->jobs_array_len * 2;
     457
     458                tmp = realloc(
     459                        p->jobs, sizeof(struct pthreadpool_job) * new_len);
     460                if (tmp == NULL) {
     461                        return false;
     462                }
     463                p->jobs = tmp;
     464
     465                /*
     466                 * We just doubled the jobs array. The array implements a FIFO
     467                 * queue with a modulo-based wraparound, so we have to memcpy
     468                 * the jobs that are logically at the queue end but physically
     469                 * before the queue head into the reallocated area. The new
     470                 * space starts at the current jobs_array_len, and we have to
     471                 * copy everything before the current head job into the new
     472                 * area.
     473                 */
     474                memcpy(&p->jobs[p->jobs_array_len], p->jobs,
     475                       sizeof(struct pthreadpool_job) * p->head);
     476
     477                p->jobs_array_len = new_len;
     478        }
     479
     480        job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len];
     481        job->id = id;
     482        job->fn = fn;
     483        job->private_data = private_data;
     484
     485        p->num_jobs += 1;
     486
     487        return true;
     488}
     489
    420490static void *pthreadpool_server(void *arg)
    421491{
     
    429499
    430500        while (1) {
    431                 struct timeval tv;
    432501                struct timespec ts;
    433                 struct pthreadpool_job *job;
     502                struct pthreadpool_job job;
    434503
    435504                /*
     
    438507                 */
    439508
    440                 gettimeofday(&tv, NULL);
    441                 ts.tv_sec = tv.tv_sec + 1;
    442                 ts.tv_nsec = tv.tv_usec*1000;
    443 
    444                 while ((pool->jobs == NULL) && (pool->shutdown == 0)) {
     509                clock_gettime(CLOCK_REALTIME, &ts);
     510                ts.tv_sec += 1;
     511
     512                while ((pool->num_jobs == 0) && (pool->shutdown == 0)) {
    445513
    446514                        pool->num_idle += 1;
     
    451519                        if (res == ETIMEDOUT) {
    452520
    453                                 if (pool->jobs == NULL) {
     521                                if (pool->num_jobs == 0) {
    454522                                        /*
    455523                                         * we timed out and still no work for
     
    466534                }
    467535
    468                 job = pool->jobs;
    469 
    470                 if (job != NULL) {
     536                if (pthreadpool_get_job(pool, &job)) {
    471537                        ssize_t written;
    472 
    473                         /*
    474                          * Ok, there's work for us to do, remove the job from
    475                          * the pthreadpool list
    476                          */
    477                         pool->jobs = job->next;
    478                         if (pool->last_job == job) {
    479                                 pool->last_job = NULL;
    480                         }
     538                        int sig_pipe = pool->sig_pipe[1];
    481539
    482540                        /*
     
    487545                        assert(res == 0);
    488546
    489                         job->fn(job->private_data);
    490 
    491                         written = write(pool->sig_pipe[1], &job->id,
    492                                         sizeof(int));
    493 
    494                         free(job);
     547                        job.fn(job.private_data);
    495548
    496549                        res = pthread_mutex_lock(&pool->mutex);
    497550                        assert(res == 0);
    498551
     552                        written = write(sig_pipe, &job.id, sizeof(job.id));
    499553                        if (written != sizeof(int)) {
    500554                                pthreadpool_server_exit(pool);
     
    504558                }
    505559
    506                 if ((pool->jobs == NULL) && (pool->shutdown != 0)) {
     560                if ((pool->num_jobs == 0) && (pool->shutdown != 0)) {
    507561                        /*
    508562                         * No more work to do and we're asked to shut down, so
     
    528582                        void (*fn)(void *private_data), void *private_data)
    529583{
    530         struct pthreadpool_job *job;
    531584        pthread_t thread_id;
    532585        int res;
    533586        sigset_t mask, omask;
    534587
    535         job = (struct pthreadpool_job *)malloc(sizeof(struct pthreadpool_job));
    536         if (job == NULL) {
    537                 return ENOMEM;
    538         }
    539 
    540         job->fn = fn;
    541         job->private_data = private_data;
    542         job->id = job_id;
    543         job->next = NULL;
    544 
    545588        res = pthread_mutex_lock(&pool->mutex);
    546589        if (res != 0) {
    547                 free(job);
    548590                return res;
    549591        }
     
    556598                res = pthread_mutex_unlock(&pool->mutex);
    557599                assert(res == 0);
    558                 free(job);
    559600                return EINVAL;
    560601        }
     
    568609         * Add job to the end of the queue
    569610         */
    570         if (pool->jobs == NULL) {
    571                 pool->jobs = job;
    572         }
    573         else {
    574                 pool->last_job->next = job;
    575         }
    576         pool->last_job = job;
     611        if (!pthreadpool_put_job(pool, job_id, fn, private_data)) {
     612                pthread_mutex_unlock(&pool->mutex);
     613                return ENOMEM;
     614        }
    577615
    578616        if (pool->num_idle > 0) {
  • vendor/current/source3/lib/pthreadpool/pthreadpool.h

    r746 r988  
    6262 * This adds a job to a pthreadpool. The job can be identified by
    6363 * job_id. This integer will be returned from
    64  * pthreadpool_finished_job() then the job is completed.
     64 * pthreadpool_finished_jobs() then the job is completed.
    6565 *
    6666 * @param[in]   pool            The pool to run the job on
     
    7676 * @brief Get the signalling fd from a pthreadpool
    7777 *
    78  * Completion of a job is indicated by readability of the fd retuned
     78 * Completion of a job is indicated by readability of the fd returned
    7979 * by pthreadpool_signal_fd().
    8080 *
     
    8585
    8686/**
    87  * @brief Get the job_id of a finished job
     87 * @brief Get the job_ids of finished jobs
    8888 *
    8989 * This blocks until a job has finished unless the fd returned by
     
    9191 *
    9292 * @param[in]   pool            The pool to query for finished jobs
    93  * @param[out]  pjobid          The job_id of the finished job
    94  * @return                      success: 0, failure: errno
     93 * @param[out]  jobids          The job_ids of the finished job
     94 * @param[int]  num_jobids      The job_ids array size
     95 * @return                      success: >=0, number of finished jobs
     96 *                              failure: -errno
    9597 */
    96 int pthreadpool_finished_job(struct pthreadpool *pool, int *jobid);
     98int pthreadpool_finished_jobs(struct pthreadpool *pool, int *jobids,
     99                              unsigned num_jobids);
    97100
    98101#endif
  • vendor/current/source3/lib/pthreadpool/tests.c

    r746 r988  
    66#include <pthread.h>
    77#include <unistd.h>
     8#include <sys/types.h>
     9#include <sys/wait.h>
    810#include "pthreadpool.h"
    911
     
    7072        for (i=0; i<num_jobs; i++) {
    7173                int jobid = -1;
    72                 ret = pthreadpool_finished_job(p, &jobid);
    73                 if ((ret != 0) || (jobid >= num_jobs)) {
     74                ret = pthreadpool_finished_jobs(p, &jobid, 1);
     75                if ((ret != 1) || (jobid >= num_jobs)) {
    7476                        fprintf(stderr, "invalid job number %d\n", jobid);
    7577                        return -1;
     
    283285                        }
    284286
    285                         ret = pthreadpool_finished_job(pools[j], &jobid);
    286                         if ((ret != 0) || (jobid >= num_jobs * num_threads)) {
     287                        ret = pthreadpool_finished_jobs(pools[j], &jobid, 1);
     288                        if ((ret != 1) || (jobid >= num_jobs * num_threads)) {
    287289                                fprintf(stderr, "invalid job number %d\n",
    288290                                        jobid);
     
    319321}
    320322
     323static int test_fork(void)
     324{
     325        struct pthreadpool *p;
     326        pid_t child, waited;
     327        int status, ret;
     328
     329        ret = pthreadpool_init(1, &p);
     330        if (ret != 0) {
     331                fprintf(stderr, "pthreadpool_init failed: %s\n",
     332                        strerror(ret));
     333                return -1;
     334        }
     335        ret = pthreadpool_destroy(p);
     336        if (ret != 0) {
     337                fprintf(stderr, "pthreadpool_destroy failed: %s\n",
     338                        strerror(ret));
     339                return -1;
     340        }
     341
     342        child = fork();
     343        if (child < 0) {
     344                perror("fork failed");
     345                return -1;
     346        }
     347        if (child == 0) {
     348                exit(0);
     349        }
     350        waited = wait(&status);
     351        if (waited == -1) {
     352                perror("wait failed");
     353                return -1;
     354        }
     355        if (waited != child) {
     356                fprintf(stderr, "expected child %d, got %d\n",
     357                        (int)child, (int)waited);
     358                return -1;
     359        }
     360        return 0;
     361}
     362
    321363int main(void)
    322364{
     
    326368        if (ret != 0) {
    327369                fprintf(stderr, "test_init failed\n");
     370                return 1;
     371        }
     372
     373        ret = test_fork();
     374        if (ret != 0) {
     375                fprintf(stderr, "test_fork failed\n");
    328376                return 1;
    329377        }
Note: See TracChangeset for help on using the changeset viewer.