Ignore:
Timestamp:
Nov 24, 2016, 1:14:11 PM (9 years ago)
Author:
Silvan Scherrer
Message:

Samba Server: update vendor to version 4.4.3

File:
1 edited

Legend:

Unmodified
Added
Removed
  • vendor/current/source3/lib/messages.c

    r740 r988  
    66   Copyright (C) 2002 by Jeremy Allison
    77   Copyright (C) 2007 by Volker Lendecke
    8    
     8
    99   This program is free software; you can redistribute it and/or modify
    1010   it under the terms of the GNU General Public License as published by
    1111   the Free Software Foundation; either version 3 of the License, or
    1212   (at your option) any later version.
    13    
     13
    1414   This program is distributed in the hope that it will be useful,
    1515   but WITHOUT ANY WARRANTY; without even the implied warranty of
    1616   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    1717   GNU General Public License for more details.
    18    
     18
    1919   You should have received a copy of the GNU General Public License
    2020   along with this program.  If not, see <http://www.gnu.org/licenses/>.
     
    2525  @{
    2626  @file messages.c
    27  
     27
    2828  @brief  Module for internal messaging between Samba daemons.
    2929
     
    4747
    4848#include "includes.h"
    49 #include "dbwrap.h"
     49#include "dbwrap/dbwrap.h"
    5050#include "serverid.h"
    5151#include "messages.h"
     52#include "lib/util/tevent_unix.h"
     53#include "lib/background.h"
     54#include "lib/messages_dgm.h"
     55#include "lib/util/iov_buf.h"
     56#include "lib/util/server_id_db.h"
     57#include "lib/messages_dgm_ref.h"
     58#include "lib/messages_util.h"
    5259
    5360struct messaging_callback {
    5461        struct messaging_callback *prev, *next;
    55         uint32 msg_type;
     62        uint32_t msg_type;
    5663        void (*fn)(struct messaging_context *msg, void *private_data,
    5764                   uint32_t msg_type,
     
    6067};
    6168
     69struct messaging_context {
     70        struct server_id id;
     71        struct tevent_context *event_ctx;
     72        struct messaging_callback *callbacks;
     73
     74        struct tevent_req **new_waiters;
     75        unsigned num_new_waiters;
     76
     77        struct tevent_req **waiters;
     78        unsigned num_waiters;
     79
     80        void *msg_dgm_ref;
     81        struct messaging_backend *remote;
     82
     83        struct server_id_db *names_db;
     84};
     85
     86static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
     87                                   struct messaging_rec *rec);
     88
    6289/****************************************************************************
    6390 A useful function for testing the message system.
     
    7097                         DATA_BLOB *data)
    7198{
    72         const char *msg = data->data ? (const char *)data->data : "none";
    73 
    74         DEBUG(1,("INFO: Received PING message from PID %s [%s]\n",
    75                  procid_str_static(&src), msg));
     99        struct server_id_buf idbuf;
     100
     101        DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
     102                  server_id_str_buf(src, &idbuf), (int)data->length,
     103                  data->data ? (char *)data->data : ""));
     104
    76105        messaging_send(msg_ctx, src, MSG_PONG, data);
    77106}
     
    87116        struct messaging_context *msg_ctx;
    88117        int msg_type;
    89         uint32 msg_flag;
     118        uint32_t msg_flag;
    90119        const void *buf;
    91120        size_t len;
     
    113142
    114143        status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
    115                                     (uint8 *)msg_all->buf, msg_all->len);
     144                                    (const uint8_t *)msg_all->buf, msg_all->len);
    116145
    117146        if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
    118                
    119                 /* If the pid was not found delete the entry from connections.tdb */
    120 
    121                 DEBUG(2, ("pid %s doesn't exist\n", procid_str_static(id)));
    122 
    123                 rec->delete_rec(rec);
     147                struct server_id_buf idbuf;
     148
     149                /*
     150                 * If the pid was not found delete the entry from
     151                 * serverid.tdb
     152                 */
     153
     154                DEBUG(2, ("pid %s doesn't exist\n",
     155                          server_id_str_buf(*id, &idbuf)));
     156
     157                dbwrap_record_delete(rec);
    124158        }
    125159        msg_all->n_sent++;
     
    147181
    148182        msg_all.msg_type = msg_type;
    149         if (msg_type < 1000)
     183        if (msg_type < 0x100) {
    150184                msg_all.msg_flag = FLAG_MSG_GENERAL;
    151         else if (msg_type > 1000 && msg_type < 2000)
     185        } else if (msg_type > 0x100 && msg_type < 0x200) {
    152186                msg_all.msg_flag = FLAG_MSG_NMBD;
    153         else if (msg_type > 2000 && msg_type < 2100)
    154                 msg_all.msg_flag = FLAG_MSG_PRINT_NOTIFY;
    155         else if (msg_type > 2100 && msg_type < 3000)
     187        } else if (msg_type > 0x200 && msg_type < 0x300) {
    156188                msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
    157         else if (msg_type > 3000 && msg_type < 4000)
     189        } else if (msg_type > 0x300 && msg_type < 0x400) {
    158190                msg_all.msg_flag = FLAG_MSG_SMBD;
    159         else if (msg_type > 4000 && msg_type < 5000)
     191        } else if (msg_type > 0x400 && msg_type < 0x600) {
     192                msg_all.msg_flag = FLAG_MSG_WINBIND;
     193        } else if (msg_type > 4000 && msg_type < 5000) {
    160194                msg_all.msg_flag = FLAG_MSG_DBWRAP;
    161         else
    162                 return False;
     195        } else {
     196                return false;
     197        }
    163198
    164199        msg_all.buf = buf;
     
    170205        if (n_sent)
    171206                *n_sent = msg_all.n_sent;
    172         return True;
    173 }
    174 
    175 struct event_context *messaging_event_context(struct messaging_context *msg_ctx)
    176 {
    177         return msg_ctx->event_ctx;
     207        return true;
     208}
     209
     210static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
     211                              int *fds, size_t num_fds,
     212                              void *private_data)
     213{
     214        struct messaging_context *msg_ctx = talloc_get_type_abort(
     215                private_data, struct messaging_context);
     216        struct server_id_buf idbuf;
     217        struct messaging_rec rec;
     218        int64_t fds64[MIN(num_fds, INT8_MAX)];
     219        size_t i;
     220
     221        if (msg_len < MESSAGE_HDR_LENGTH) {
     222                DEBUG(1, ("message too short: %u\n", (unsigned)msg_len));
     223                goto close_fail;
     224        }
     225
     226        if (num_fds > INT8_MAX) {
     227                DEBUG(1, ("too many fds: %u\n", (unsigned)num_fds));
     228                goto close_fail;
     229        }
     230
     231        /*
     232         * "consume" the fds by copying them and setting
     233         * the original variable to -1
     234         */
     235        for (i=0; i < num_fds; i++) {
     236                fds64[i] = fds[i];
     237                fds[i] = -1;
     238        }
     239
     240        rec = (struct messaging_rec) {
     241                .msg_version = MESSAGE_VERSION,
     242                .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
     243                .buf.length = msg_len - MESSAGE_HDR_LENGTH,
     244                .num_fds = num_fds,
     245                .fds = fds64,
     246        };
     247
     248        message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
     249
     250        DEBUG(10, ("%s: Received message 0x%x len %u (num_fds:%u) from %s\n",
     251                   __func__, (unsigned)rec.msg_type,
     252                   (unsigned)rec.buf.length,
     253                   (unsigned)num_fds,
     254                   server_id_str_buf(rec.src, &idbuf)));
     255
     256        messaging_dispatch_rec(msg_ctx, &rec);
     257        return;
     258
     259close_fail:
     260        for (i=0; i < num_fds; i++) {
     261                close(fds[i]);
     262        }
     263}
     264
     265static int messaging_context_destructor(struct messaging_context *ctx)
     266{
     267        unsigned i;
     268
     269        for (i=0; i<ctx->num_new_waiters; i++) {
     270                if (ctx->new_waiters[i] != NULL) {
     271                        tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
     272                        ctx->new_waiters[i] = NULL;
     273                }
     274        }
     275        for (i=0; i<ctx->num_waiters; i++) {
     276                if (ctx->waiters[i] != NULL) {
     277                        tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
     278                        ctx->waiters[i] = NULL;
     279                }
     280        }
     281
     282        return 0;
     283}
     284
     285static const char *private_path(const char *name)
     286{
     287        return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
    178288}
    179289
    180290struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
    181                                          struct server_id server_id,
    182                                          struct event_context *ev)
     291                                         struct tevent_context *ev)
    183292{
    184293        struct messaging_context *ctx;
    185         NTSTATUS status;
    186 
    187         if (!(ctx = TALLOC_ZERO_P(mem_ctx, struct messaging_context))) {
     294        int ret;
     295        const char *lck_path;
     296        const char *priv_path;
     297        bool ok;
     298
     299        if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
    188300                return NULL;
    189301        }
    190302
    191         ctx->id = server_id;
     303        ctx->id = (struct server_id) {
     304                .pid = getpid(), .vnn = NONCLUSTER_VNN
     305        };
     306
    192307        ctx->event_ctx = ev;
    193308
    194         status = messaging_tdb_init(ctx, ctx, &ctx->local);
    195 
    196         if (!NT_STATUS_IS_OK(status)) {
    197                 DEBUG(2, ("messaging_tdb_init failed: %s\n",
    198                           nt_errstr(status)));
     309        sec_init();
     310
     311        lck_path = lock_path("msg.lock");
     312        if (lck_path == NULL) {
    199313                TALLOC_FREE(ctx);
    200314                return NULL;
    201315        }
    202316
    203 #ifdef CLUSTER_SUPPORT
     317        ok = directory_create_or_exist_strict(lck_path, sec_initial_uid(),
     318                                              0755);
     319        if (!ok) {
     320                DEBUG(10, ("%s: Could not create lock directory: %s\n",
     321                           __func__, strerror(errno)));
     322                TALLOC_FREE(ctx);
     323                return NULL;
     324        }
     325
     326        priv_path = private_path("msg.sock");
     327        if (priv_path == NULL) {
     328                TALLOC_FREE(ctx);
     329                return NULL;
     330        }
     331
     332        ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
     333                                              0700);
     334        if (!ok) {
     335                DEBUG(10, ("%s: Could not create msg directory: %s\n",
     336                           __func__, strerror(errno)));
     337                TALLOC_FREE(ctx);
     338                return NULL;
     339        }
     340
     341        ctx->msg_dgm_ref = messaging_dgm_ref(
     342                ctx, ctx->event_ctx, &ctx->id.unique_id,
     343                priv_path, lck_path, messaging_recv_cb, ctx, &ret);
     344
     345        if (ctx->msg_dgm_ref == NULL) {
     346                DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
     347                TALLOC_FREE(ctx);
     348                return NULL;
     349        }
     350
     351        talloc_set_destructor(ctx, messaging_context_destructor);
     352
    204353        if (lp_clustering()) {
    205                 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
    206 
    207                 if (!NT_STATUS_IS_OK(status)) {
    208                         DEBUG(2, ("messaging_ctdb_init failed: %s\n",
    209                                   nt_errstr(status)));
     354                ret = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
     355
     356                if (ret != 0) {
     357                        DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
     358                                  strerror(ret)));
    210359                        TALLOC_FREE(ctx);
    211360                        return NULL;
     
    213362        }
    214363        ctx->id.vnn = get_my_vnn();
    215 #endif
     364
     365        ctx->names_db = server_id_db_init(
     366                ctx, ctx->id, lp_lock_directory(), 0,
     367                TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
     368        if (ctx->names_db == NULL) {
     369                DEBUG(10, ("%s: server_id_db_init failed\n", __func__));
     370                TALLOC_FREE(ctx);
     371                return NULL;
     372        }
    216373
    217374        messaging_register(ctx, NULL, MSG_PING, ping_message);
     
    234391 * re-init after a fork
    235392 */
    236 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx,
    237                           struct server_id id)
    238 {
    239         NTSTATUS status;
    240 
    241         TALLOC_FREE(msg_ctx->local);
    242 
    243         msg_ctx->id = id;
    244 
    245         status = messaging_tdb_init(msg_ctx, msg_ctx, &msg_ctx->local);
    246         if (!NT_STATUS_IS_OK(status)) {
    247                 DEBUG(0, ("messaging_tdb_init failed: %s\n",
    248                           nt_errstr(status)));
    249                 return status;
    250         }
    251 
    252 #ifdef CLUSTER_SUPPORT
     393NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
     394{
     395        int ret;
     396
     397        TALLOC_FREE(msg_ctx->msg_dgm_ref);
     398
     399        msg_ctx->id = (struct server_id) {
     400                .pid = getpid(), .vnn = msg_ctx->id.vnn
     401        };
     402
     403        msg_ctx->msg_dgm_ref = messaging_dgm_ref(
     404                msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id,
     405                private_path("msg.sock"), lock_path("msg.lock"),
     406                messaging_recv_cb, msg_ctx, &ret);
     407
     408        if (msg_ctx->msg_dgm_ref == NULL) {
     409                DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
     410                return map_nt_error_from_unix(ret);
     411        }
     412
    253413        TALLOC_FREE(msg_ctx->remote);
    254414
    255415        if (lp_clustering()) {
    256                 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
    257                                               &msg_ctx->remote);
    258 
    259                 if (!NT_STATUS_IS_OK(status)) {
    260                         DEBUG(1, ("messaging_ctdb_init failed: %s\n",
    261                                   nt_errstr(status)));
    262                         return status;
    263                 }
    264         }
    265 
    266 #endif
     416                ret = messaging_ctdbd_init(msg_ctx, msg_ctx,
     417                                           &msg_ctx->remote);
     418
     419                if (ret != 0) {
     420                        DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
     421                                  strerror(ret)));
     422                        return map_nt_error_from_unix(ret);
     423                }
     424        }
     425
     426        server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
    267427
    268428        return NT_STATUS_OK;
     
    284444{
    285445        struct messaging_callback *cb;
     446
     447        DEBUG(5, ("Registering messaging pointer for type %u - "
     448                  "private_data=%p\n",
     449                  (unsigned)msg_type, private_data));
    286450
    287451        /*
     
    344508                        const DATA_BLOB *data)
    345509{
    346 #ifdef CLUSTER_SUPPORT
    347         if (!procid_is_local(&server)) {
    348                 return msg_ctx->remote->send_fn(msg_ctx, server,
    349                                                 msg_type, data,
    350                                                 msg_ctx->remote);
    351         }
    352 #endif
    353         return msg_ctx->local->send_fn(msg_ctx, server, msg_type, data,
    354                                        msg_ctx->local);
     510        struct iovec iov;
     511
     512        iov.iov_base = data->data;
     513        iov.iov_len = data->length;
     514
     515        return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
    355516}
    356517
    357518NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
    358519                            struct server_id server, uint32_t msg_type,
    359                             const uint8 *buf, size_t len)
     520                            const uint8_t *buf, size_t len)
    360521{
    361522        DATA_BLOB blob = data_blob_const(buf, len);
    362523        return messaging_send(msg_ctx, server, msg_type, &blob);
     524}
     525
     526int messaging_send_iov_from(struct messaging_context *msg_ctx,
     527                            struct server_id src, struct server_id dst,
     528                            uint32_t msg_type,
     529                            const struct iovec *iov, int iovlen,
     530                            const int *fds, size_t num_fds)
     531{
     532        int ret;
     533        uint8_t hdr[MESSAGE_HDR_LENGTH];
     534        struct iovec iov2[iovlen+1];
     535
     536        if (server_id_is_disconnected(&dst)) {
     537                return EINVAL;
     538        }
     539
     540        if (num_fds > INT8_MAX) {
     541                return EINVAL;
     542        }
     543
     544        if (!procid_is_local(&dst)) {
     545                if (num_fds > 0) {
     546                        return ENOSYS;
     547                }
     548
     549                ret = msg_ctx->remote->send_fn(src, dst,
     550                                               msg_type, iov, iovlen,
     551                                               NULL, 0,
     552                                               msg_ctx->remote);
     553                return ret;
     554        }
     555
     556        message_hdr_put(hdr, msg_type, src, dst);
     557        iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
     558        memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
     559
     560        become_root();
     561        ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
     562        unbecome_root();
     563
     564        return ret;
     565}
     566
     567NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
     568                            struct server_id dst, uint32_t msg_type,
     569                            const struct iovec *iov, int iovlen,
     570                            const int *fds, size_t num_fds)
     571{
     572        int ret;
     573
     574        ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
     575                                      iov, iovlen, fds, num_fds);
     576        if (ret != 0) {
     577                return map_nt_error_from_unix(ret);
     578        }
     579        return NT_STATUS_OK;
     580}
     581
     582static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
     583                                               struct messaging_rec *rec)
     584{
     585        struct messaging_rec *result;
     586        size_t fds_size = sizeof(int64_t) * rec->num_fds;
     587
     588        result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
     589                                      rec->buf.length + fds_size);
     590        if (result == NULL) {
     591                return NULL;
     592        }
     593        *result = *rec;
     594
     595        /* Doesn't fail, see talloc_pooled_object */
     596
     597        result->buf.data = talloc_memdup(result, rec->buf.data,
     598                                         rec->buf.length);
     599
     600        result->fds = NULL;
     601        if (result->num_fds > 0) {
     602                result->fds = talloc_memdup(result, rec->fds, fds_size);
     603        }
     604
     605        return result;
     606}
     607
     608struct messaging_filtered_read_state {
     609        struct tevent_context *ev;
     610        struct messaging_context *msg_ctx;
     611        void *tevent_handle;
     612
     613        bool (*filter)(struct messaging_rec *rec, void *private_data);
     614        void *private_data;
     615
     616        struct messaging_rec *rec;
     617};
     618
     619static void messaging_filtered_read_cleanup(struct tevent_req *req,
     620                                            enum tevent_req_state req_state);
     621
     622struct tevent_req *messaging_filtered_read_send(
     623        TALLOC_CTX *mem_ctx, struct tevent_context *ev,
     624        struct messaging_context *msg_ctx,
     625        bool (*filter)(struct messaging_rec *rec, void *private_data),
     626        void *private_data)
     627{
     628        struct tevent_req *req;
     629        struct messaging_filtered_read_state *state;
     630        size_t new_waiters_len;
     631
     632        req = tevent_req_create(mem_ctx, &state,
     633                                struct messaging_filtered_read_state);
     634        if (req == NULL) {
     635                return NULL;
     636        }
     637        state->ev = ev;
     638        state->msg_ctx = msg_ctx;
     639        state->filter = filter;
     640        state->private_data = private_data;
     641
     642        /*
     643         * We have to defer the callback here, as we might be called from
     644         * within a different tevent_context than state->ev
     645         */
     646        tevent_req_defer_callback(req, state->ev);
     647
     648        state->tevent_handle = messaging_dgm_register_tevent_context(
     649                state, ev);
     650        if (tevent_req_nomem(state->tevent_handle, req)) {
     651                return tevent_req_post(req, ev);
     652        }
     653
     654        /*
     655         * We add ourselves to the "new_waiters" array, not the "waiters"
     656         * array. If we are called from within messaging_read_done,
     657         * messaging_dispatch_rec will be in an active for-loop on
     658         * "waiters". We must be careful not to mess with this array, because
     659         * it could mean that a single event is being delivered twice.
     660         */
     661
     662        new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
     663
     664        if (new_waiters_len == msg_ctx->num_new_waiters) {
     665                struct tevent_req **tmp;
     666
     667                tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
     668                                     struct tevent_req *, new_waiters_len+1);
     669                if (tevent_req_nomem(tmp, req)) {
     670                        return tevent_req_post(req, ev);
     671                }
     672                msg_ctx->new_waiters = tmp;
     673        }
     674
     675        msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
     676        msg_ctx->num_new_waiters += 1;
     677        tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
     678
     679        return req;
     680}
     681
     682static void messaging_filtered_read_cleanup(struct tevent_req *req,
     683                                            enum tevent_req_state req_state)
     684{
     685        struct messaging_filtered_read_state *state = tevent_req_data(
     686                req, struct messaging_filtered_read_state);
     687        struct messaging_context *msg_ctx = state->msg_ctx;
     688        unsigned i;
     689
     690        tevent_req_set_cleanup_fn(req, NULL);
     691
     692        TALLOC_FREE(state->tevent_handle);
     693
     694        /*
     695         * Just set the [new_]waiters entry to NULL, be careful not to mess
     696         * with the other "waiters" array contents. We are often called from
     697         * within "messaging_dispatch_rec", which loops over
     698         * "waiters". Messing with the "waiters" array will mess up that
     699         * for-loop.
     700         */
     701
     702        for (i=0; i<msg_ctx->num_waiters; i++) {
     703                if (msg_ctx->waiters[i] == req) {
     704                        msg_ctx->waiters[i] = NULL;
     705                        return;
     706                }
     707        }
     708
     709        for (i=0; i<msg_ctx->num_new_waiters; i++) {
     710                if (msg_ctx->new_waiters[i] == req) {
     711                        msg_ctx->new_waiters[i] = NULL;
     712                        return;
     713                }
     714        }
     715}
     716
     717static void messaging_filtered_read_done(struct tevent_req *req,
     718                                         struct messaging_rec *rec)
     719{
     720        struct messaging_filtered_read_state *state = tevent_req_data(
     721                req, struct messaging_filtered_read_state);
     722
     723        state->rec = messaging_rec_dup(state, rec);
     724        if (tevent_req_nomem(state->rec, req)) {
     725                return;
     726        }
     727        tevent_req_done(req);
     728}
     729
     730int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
     731                                 struct messaging_rec **presult)
     732{
     733        struct messaging_filtered_read_state *state = tevent_req_data(
     734                req, struct messaging_filtered_read_state);
     735        int err;
     736
     737        if (tevent_req_is_unix_error(req, &err)) {
     738                tevent_req_received(req);
     739                return err;
     740        }
     741        *presult = talloc_move(mem_ctx, &state->rec);
     742        return 0;
     743}
     744
     745struct messaging_read_state {
     746        uint32_t msg_type;
     747        struct messaging_rec *rec;
     748};
     749
     750static bool messaging_read_filter(struct messaging_rec *rec,
     751                                  void *private_data);
     752static void messaging_read_done(struct tevent_req *subreq);
     753
     754struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
     755                                       struct tevent_context *ev,
     756                                       struct messaging_context *msg,
     757                                       uint32_t msg_type)
     758{
     759        struct tevent_req *req, *subreq;
     760        struct messaging_read_state *state;
     761
     762        req = tevent_req_create(mem_ctx, &state,
     763                                struct messaging_read_state);
     764        if (req == NULL) {
     765                return NULL;
     766        }
     767        state->msg_type = msg_type;
     768
     769        subreq = messaging_filtered_read_send(state, ev, msg,
     770                                              messaging_read_filter, state);
     771        if (tevent_req_nomem(subreq, req)) {
     772                return tevent_req_post(req, ev);
     773        }
     774        tevent_req_set_callback(subreq, messaging_read_done, req);
     775        return req;
     776}
     777
     778static bool messaging_read_filter(struct messaging_rec *rec,
     779                                  void *private_data)
     780{
     781        struct messaging_read_state *state = talloc_get_type_abort(
     782                private_data, struct messaging_read_state);
     783
     784        if (rec->num_fds != 0) {
     785                return false;
     786        }
     787
     788        return rec->msg_type == state->msg_type;
     789}
     790
     791static void messaging_read_done(struct tevent_req *subreq)
     792{
     793        struct tevent_req *req = tevent_req_callback_data(
     794                subreq, struct tevent_req);
     795        struct messaging_read_state *state = tevent_req_data(
     796                req, struct messaging_read_state);
     797        int ret;
     798
     799        ret = messaging_filtered_read_recv(subreq, state, &state->rec);
     800        TALLOC_FREE(subreq);
     801        if (tevent_req_error(req, ret)) {
     802                return;
     803        }
     804        tevent_req_done(req);
     805}
     806
     807int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
     808                        struct messaging_rec **presult)
     809{
     810        struct messaging_read_state *state = tevent_req_data(
     811                req, struct messaging_read_state);
     812        int err;
     813
     814        if (tevent_req_is_unix_error(req, &err)) {
     815                return err;
     816        }
     817        if (presult != NULL) {
     818                *presult = talloc_move(mem_ctx, &state->rec);
     819        }
     820        return 0;
     821}
     822
     823struct messaging_handler_state {
     824        struct tevent_context *ev;
     825        struct messaging_context *msg_ctx;
     826        uint32_t msg_type;
     827        bool (*handler)(struct messaging_context *msg_ctx,
     828                        struct messaging_rec **rec, void *private_data);
     829        void *private_data;
     830};
     831
     832static void messaging_handler_got_msg(struct tevent_req *subreq);
     833
     834struct tevent_req *messaging_handler_send(
     835        TALLOC_CTX *mem_ctx, struct tevent_context *ev,
     836        struct messaging_context *msg_ctx, uint32_t msg_type,
     837        bool (*handler)(struct messaging_context *msg_ctx,
     838                        struct messaging_rec **rec, void *private_data),
     839        void *private_data)
     840{
     841        struct tevent_req *req, *subreq;
     842        struct messaging_handler_state *state;
     843
     844        req = tevent_req_create(mem_ctx, &state,
     845                                struct messaging_handler_state);
     846        if (req == NULL) {
     847                return NULL;
     848        }
     849        state->ev = ev;
     850        state->msg_ctx = msg_ctx;
     851        state->msg_type = msg_type;
     852        state->handler = handler;
     853        state->private_data = private_data;
     854
     855        subreq = messaging_read_send(state, state->ev, state->msg_ctx,
     856                                     state->msg_type);
     857        if (tevent_req_nomem(subreq, req)) {
     858                return tevent_req_post(req, ev);
     859        }
     860        tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
     861        return req;
     862}
     863
     864static void messaging_handler_got_msg(struct tevent_req *subreq)
     865{
     866        struct tevent_req *req = tevent_req_callback_data(
     867                subreq, struct tevent_req);
     868        struct messaging_handler_state *state = tevent_req_data(
     869                req, struct messaging_handler_state);
     870        struct messaging_rec *rec;
     871        int ret;
     872        bool ok;
     873
     874        ret = messaging_read_recv(subreq, state, &rec);
     875        TALLOC_FREE(subreq);
     876        if (tevent_req_error(req, ret)) {
     877                return;
     878        }
     879
     880        subreq = messaging_read_send(state, state->ev, state->msg_ctx,
     881                                     state->msg_type);
     882        if (tevent_req_nomem(subreq, req)) {
     883                return;
     884        }
     885        tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
     886
     887        ok = state->handler(state->msg_ctx, &rec, state->private_data);
     888        TALLOC_FREE(rec);
     889        if (ok) {
     890                /*
     891                 * Next round
     892                 */
     893                return;
     894        }
     895        TALLOC_FREE(subreq);
     896        tevent_req_done(req);
     897}
     898
     899int messaging_handler_recv(struct tevent_req *req)
     900{
     901        return tevent_req_simple_recv_unix(req);
     902}
     903
     904static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
     905{
     906        if (msg_ctx->num_new_waiters == 0) {
     907                return true;
     908        }
     909
     910        if (talloc_array_length(msg_ctx->waiters) <
     911            (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
     912                struct tevent_req **tmp;
     913                tmp = talloc_realloc(
     914                        msg_ctx, msg_ctx->waiters, struct tevent_req *,
     915                        msg_ctx->num_waiters + msg_ctx->num_new_waiters);
     916                if (tmp == NULL) {
     917                        DEBUG(1, ("%s: talloc failed\n", __func__));
     918                        return false;
     919                }
     920                msg_ctx->waiters = tmp;
     921        }
     922
     923        memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
     924               sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
     925
     926        msg_ctx->num_waiters += msg_ctx->num_new_waiters;
     927        msg_ctx->num_new_waiters = 0;
     928
     929        return true;
    363930}
    364931
     
    366933  Dispatch one messaging_rec
    367934*/
    368 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
    369                             struct messaging_rec *rec)
     935static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
     936                                   struct messaging_rec *rec)
    370937{
    371938        struct messaging_callback *cb, *next;
     939        unsigned i;
     940        size_t j;
    372941
    373942        for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
    374943                next = cb->next;
    375                 if (cb->msg_type == rec->msg_type) {
    376                         cb->fn(msg_ctx, cb->private_data, rec->msg_type,
    377                                rec->src, &rec->buf);
    378                         /* we continue looking for matching messages
    379                            after finding one. This matters for
    380                            subsystems like the internal notify code
    381                            which register more than one handler for
    382                            the same message type */
    383                 }
    384         }
    385         return;
     944                if (cb->msg_type != rec->msg_type) {
     945                        continue;
     946                }
     947
     948                /*
     949                 * the old style callbacks don't support fd passing
     950                 */
     951                for (j=0; j < rec->num_fds; j++) {
     952                        int fd = rec->fds[j];
     953                        close(fd);
     954                }
     955                rec->num_fds = 0;
     956                rec->fds = NULL;
     957
     958                cb->fn(msg_ctx, cb->private_data, rec->msg_type,
     959                       rec->src, &rec->buf);
     960
     961                /*
     962                 * we continue looking for matching messages after finding
     963                 * one. This matters for subsystems like the internal notify
     964                 * code which register more than one handler for the same
     965                 * message type
     966                 */
     967        }
     968
     969        if (!messaging_append_new_waiters(msg_ctx)) {
     970                for (j=0; j < rec->num_fds; j++) {
     971                        int fd = rec->fds[j];
     972                        close(fd);
     973                }
     974                rec->num_fds = 0;
     975                rec->fds = NULL;
     976                return;
     977        }
     978
     979        i = 0;
     980        while (i < msg_ctx->num_waiters) {
     981                struct tevent_req *req;
     982                struct messaging_filtered_read_state *state;
     983
     984                req = msg_ctx->waiters[i];
     985                if (req == NULL) {
     986                        /*
     987                         * This got cleaned up. In the meantime,
     988                         * move everything down one. We need
     989                         * to keep the order of waiters, as
     990                         * other code may depend on this.
     991                         */
     992                        if (i < msg_ctx->num_waiters - 1) {
     993                                memmove(&msg_ctx->waiters[i],
     994                                        &msg_ctx->waiters[i+1],
     995                                        sizeof(struct tevent_req *) *
     996                                            (msg_ctx->num_waiters - i - 1));
     997                        }
     998                        msg_ctx->num_waiters -= 1;
     999                        continue;
     1000                }
     1001
     1002                state = tevent_req_data(
     1003                        req, struct messaging_filtered_read_state);
     1004                if (state->filter(rec, state->private_data)) {
     1005                        messaging_filtered_read_done(req, rec);
     1006
     1007                        /*
     1008                         * Only the first one gets the fd-array
     1009                         */
     1010                        rec->num_fds = 0;
     1011                        rec->fds = NULL;
     1012                }
     1013
     1014                i += 1;
     1015        }
     1016
     1017        /*
     1018         * If the fd-array isn't used, just close it.
     1019         */
     1020        for (j=0; j < rec->num_fds; j++) {
     1021                int fd = rec->fds[j];
     1022                close(fd);
     1023        }
     1024        rec->num_fds = 0;
     1025        rec->fds = NULL;
     1026}
     1027
     1028static int mess_parent_dgm_cleanup(void *private_data);
     1029static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
     1030
     1031bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
     1032{
     1033        struct tevent_req *req;
     1034
     1035        req = background_job_send(
     1036                msg, msg->event_ctx, msg, NULL, 0,
     1037                lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
     1038                            60*15),
     1039                mess_parent_dgm_cleanup, msg);
     1040        if (req == NULL) {
     1041                return false;
     1042        }
     1043        tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
     1044        return true;
     1045}
     1046
     1047static int mess_parent_dgm_cleanup(void *private_data)
     1048{
     1049        int ret;
     1050
     1051        ret = messaging_dgm_wipe();
     1052        DEBUG(10, ("messaging_dgm_wipe returned %s\n",
     1053                   ret ? strerror(ret) : "ok"));
     1054        return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
     1055                           60*15);
     1056}
     1057
     1058static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
     1059{
     1060        struct messaging_context *msg = tevent_req_callback_data(
     1061                req, struct messaging_context);
     1062        NTSTATUS status;
     1063
     1064        status = background_job_recv(req);
     1065        TALLOC_FREE(req);
     1066        DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
     1067                  nt_errstr(status)));
     1068
     1069        req = background_job_send(
     1070                msg, msg->event_ctx, msg, NULL, 0,
     1071                lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
     1072                            60*15),
     1073                mess_parent_dgm_cleanup, msg);
     1074        if (req == NULL) {
     1075                DEBUG(1, ("background_job_send failed\n"));
     1076                return;
     1077        }
     1078        tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
     1079}
     1080
     1081int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
     1082{
     1083        int ret;
     1084
     1085        if (pid == 0) {
     1086                ret = messaging_dgm_wipe();
     1087        } else {
     1088                ret = messaging_dgm_cleanup(pid);
     1089        }
     1090
     1091        return ret;
     1092}
     1093
     1094struct tevent_context *messaging_tevent_context(
     1095        struct messaging_context *msg_ctx)
     1096{
     1097        return msg_ctx->event_ctx;
     1098}
     1099
     1100struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
     1101{
     1102        return msg_ctx->names_db;
    3861103}
    3871104
Note: See TracChangeset for help on using the changeset viewer.