source: vendor/current/source3/lib/messages.c

Last change on this file was 988, checked in by Silvan Scherrer, 9 years ago

Samba Server: update vendor to version 4.4.3

/*
   Unix SMB/CIFS implementation.
   Samba internal messaging functions
   Copyright (C) Andrew Tridgell 2000
   Copyright (C) 2001 by Martin Pool
   Copyright (C) 2002 by Jeremy Allison
   Copyright (C) 2007 by Volker Lendecke

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

/**
   @defgroup messages Internal messaging framework
   @{
   @file messages.c

   @brief Module for internal messaging between Samba daemons.

   The idea is that if a part of Samba wants to communicate with
   another Samba process then it does a messaging_register() of a
   dispatch function, and uses messaging_send() to send messages to
   that process.

   The dispatch function is given the server_id of the sender, and it
   can use that to reply via messaging_send(). See ping_message() for
   a simple example.

   @warning Dispatch functions must be able to cope with incoming
   messages on an *odd* byte boundary.

   This system doesn't have any inherent size limitations but is not
   very efficient for large messages or when messages are sent in very
   quick succession.

*/
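
/**
   Example usage (a hedged sketch, not compiled into this file): a
   handler is registered for MSG_PING and a ping is sent to a peer.
   Everything referenced here is declared in messages.h; "peer" stands
   in for a struct server_id obtained elsewhere.

   @code
   static void my_ping_handler(struct messaging_context *msg,
                               void *private_data,
                               uint32_t msg_type,
                               struct server_id src,
                               DATA_BLOB *data)
   {
           // Reply to the sender, echoing the payload.
           messaging_send(msg, src, MSG_PONG, data);
   }

   // During setup:
   //   messaging_register(msg_ctx, NULL, MSG_PING, my_ping_handler);
   // To ping a peer:
   //   messaging_send_buf(msg_ctx, peer, MSG_PING,
   //                      (const uint8_t *)"hello", 5);
   @endcode
*/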

#include "includes.h"
#include "dbwrap/dbwrap.h"
#include "serverid.h"
#include "messages.h"
#include "lib/util/tevent_unix.h"
#include "lib/background.h"
#include "lib/messages_dgm.h"
#include "lib/util/iov_buf.h"
#include "lib/util/server_id_db.h"
#include "lib/messages_dgm_ref.h"
#include "lib/messages_util.h"

struct messaging_callback {
        struct messaging_callback *prev, *next;
        uint32_t msg_type;
        void (*fn)(struct messaging_context *msg, void *private_data,
                   uint32_t msg_type,
                   struct server_id server_id, DATA_BLOB *data);
        void *private_data;
};

struct messaging_context {
        struct server_id id;
        struct tevent_context *event_ctx;
        struct messaging_callback *callbacks;

        struct tevent_req **new_waiters;
        unsigned num_new_waiters;

        struct tevent_req **waiters;
        unsigned num_waiters;

        void *msg_dgm_ref;
        struct messaging_backend *remote;

        struct server_id_db *names_db;
};

static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
                                   struct messaging_rec *rec);

/****************************************************************************
 A useful function for testing the message system.
****************************************************************************/

static void ping_message(struct messaging_context *msg_ctx,
                         void *private_data,
                         uint32_t msg_type,
                         struct server_id src,
                         DATA_BLOB *data)
{
        struct server_id_buf idbuf;

        DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
                  server_id_str_buf(src, &idbuf), (int)data->length,
                  data->data ? (char *)data->data : ""));

        messaging_send(msg_ctx, src, MSG_PONG, data);
}

/****************************************************************************
 State for a message_send_all() broadcast.
****************************************************************************/

struct msg_all {
        struct messaging_context *msg_ctx;
        int msg_type;
        uint32_t msg_flag;
        const void *buf;
        size_t len;
        int n_sent;
};

/****************************************************************************
 Send one of the messages for the broadcast.
****************************************************************************/

static int traverse_fn(struct db_record *rec, const struct server_id *id,
                       uint32_t msg_flags, void *state)
{
        struct msg_all *msg_all = (struct msg_all *)state;
        NTSTATUS status;

        /* Don't send if the receiver hasn't registered an interest. */

        if ((msg_flags & msg_all->msg_flag) == 0) {
                return 0;
        }

        /* If the msg send fails because the pid was not found (i.e. smbd
         * died), the msg has already been deleted from messages.tdb. */

        status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
                                    (const uint8_t *)msg_all->buf,
                                    msg_all->len);

        if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
                struct server_id_buf idbuf;

                /*
                 * If the pid was not found delete the entry from
                 * serverid.tdb
                 */

                DEBUG(2, ("pid %s doesn't exist\n",
                          server_id_str_buf(*id, &idbuf)));

                dbwrap_record_delete(rec);
        }
        msg_all->n_sent++;
        return 0;
}

/**
 * Send a message to all smbd processes.
 *
 * It isn't very efficient, but should be OK for the sorts of
 * applications that use it. When we need efficient broadcast we can
 * add it.
 *
 * @param n_sent Set to the number of messages sent. This should be
 * equal to the number of processes, but be careful for races.
 *
 * @retval True for success.
 **/
bool message_send_all(struct messaging_context *msg_ctx,
                      int msg_type,
                      const void *buf, size_t len,
                      int *n_sent)
{
        struct msg_all msg_all;

        msg_all.msg_type = msg_type;
        if (msg_type < 0x100) {
                msg_all.msg_flag = FLAG_MSG_GENERAL;
        } else if (msg_type > 0x100 && msg_type < 0x200) {
                msg_all.msg_flag = FLAG_MSG_NMBD;
        } else if (msg_type > 0x200 && msg_type < 0x300) {
                msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
        } else if (msg_type > 0x300 && msg_type < 0x400) {
                msg_all.msg_flag = FLAG_MSG_SMBD;
        } else if (msg_type > 0x400 && msg_type < 0x600) {
                msg_all.msg_flag = FLAG_MSG_WINBIND;
        } else if (msg_type > 4000 && msg_type < 5000) {
                msg_all.msg_flag = FLAG_MSG_DBWRAP;
        } else {
                return false;
        }

        msg_all.buf = buf;
        msg_all.len = len;
        msg_all.n_sent = 0;
        msg_all.msg_ctx = msg_ctx;

        serverid_traverse(traverse_fn, &msg_all);
        if (n_sent) {
                *n_sent = msg_all.n_sent;
        }
        return true;
}
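
/**
   Example (a hedged sketch): broadcast a config-reload notification to
   all smbd processes. MSG_SMB_CONF_UPDATED is assumed here to fall in
   the FLAG_MSG_SMBD range; "n" receives the number of messages sent.

   @code
   int n = 0;
   bool ok = message_send_all(msg_ctx, MSG_SMB_CONF_UPDATED,
                              NULL, 0, &n);
   if (ok) {
           DEBUG(5, ("notified %d processes\n", n));
   }
   @endcode
*/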

static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
                              int *fds, size_t num_fds,
                              void *private_data)
{
        struct messaging_context *msg_ctx = talloc_get_type_abort(
                private_data, struct messaging_context);
        struct server_id_buf idbuf;
        struct messaging_rec rec;
        int64_t fds64[MIN(num_fds, INT8_MAX)];
        size_t i;

        if (msg_len < MESSAGE_HDR_LENGTH) {
                DEBUG(1, ("message too short: %u\n", (unsigned)msg_len));
                goto close_fail;
        }

        if (num_fds > INT8_MAX) {
                DEBUG(1, ("too many fds: %u\n", (unsigned)num_fds));
                goto close_fail;
        }

        /*
         * "consume" the fds by copying them and setting
         * the original variable to -1
         */
        for (i=0; i < num_fds; i++) {
                fds64[i] = fds[i];
                fds[i] = -1;
        }

        rec = (struct messaging_rec) {
                .msg_version = MESSAGE_VERSION,
                .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
                .buf.length = msg_len - MESSAGE_HDR_LENGTH,
                .num_fds = num_fds,
                .fds = fds64,
        };

        message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);

        DEBUG(10, ("%s: Received message 0x%x len %u (num_fds:%u) from %s\n",
                   __func__, (unsigned)rec.msg_type,
                   (unsigned)rec.buf.length,
                   (unsigned)num_fds,
                   server_id_str_buf(rec.src, &idbuf)));

        messaging_dispatch_rec(msg_ctx, &rec);
        return;

close_fail:
        for (i=0; i < num_fds; i++) {
                close(fds[i]);
        }
}

static int messaging_context_destructor(struct messaging_context *ctx)
{
        unsigned i;

        for (i=0; i<ctx->num_new_waiters; i++) {
                if (ctx->new_waiters[i] != NULL) {
                        tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
                        ctx->new_waiters[i] = NULL;
                }
        }
        for (i=0; i<ctx->num_waiters; i++) {
                if (ctx->waiters[i] != NULL) {
                        tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
                        ctx->waiters[i] = NULL;
                }
        }

        return 0;
}

static const char *private_path(const char *name)
{
        return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
}

struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
                                         struct tevent_context *ev)
{
        struct messaging_context *ctx;
        int ret;
        const char *lck_path;
        const char *priv_path;
        bool ok;

        if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
                return NULL;
        }

        ctx->id = (struct server_id) {
                .pid = getpid(), .vnn = NONCLUSTER_VNN
        };

        ctx->event_ctx = ev;

        sec_init();

        lck_path = lock_path("msg.lock");
        if (lck_path == NULL) {
                TALLOC_FREE(ctx);
                return NULL;
        }

        ok = directory_create_or_exist_strict(lck_path, sec_initial_uid(),
                                              0755);
        if (!ok) {
                DEBUG(10, ("%s: Could not create lock directory: %s\n",
                           __func__, strerror(errno)));
                TALLOC_FREE(ctx);
                return NULL;
        }

        priv_path = private_path("msg.sock");
        if (priv_path == NULL) {
                TALLOC_FREE(ctx);
                return NULL;
        }

        ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
                                              0700);
        if (!ok) {
                DEBUG(10, ("%s: Could not create msg directory: %s\n",
                           __func__, strerror(errno)));
                TALLOC_FREE(ctx);
                return NULL;
        }

        ctx->msg_dgm_ref = messaging_dgm_ref(
                ctx, ctx->event_ctx, &ctx->id.unique_id,
                priv_path, lck_path, messaging_recv_cb, ctx, &ret);

        if (ctx->msg_dgm_ref == NULL) {
                DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
                TALLOC_FREE(ctx);
                return NULL;
        }

        talloc_set_destructor(ctx, messaging_context_destructor);

        if (lp_clustering()) {
                ret = messaging_ctdbd_init(ctx, ctx, &ctx->remote);

                if (ret != 0) {
                        DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
                                  strerror(ret)));
                        TALLOC_FREE(ctx);
                        return NULL;
                }
        }
        ctx->id.vnn = get_my_vnn();

        ctx->names_db = server_id_db_init(
                ctx, ctx->id, lp_lock_directory(), 0,
                TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
        if (ctx->names_db == NULL) {
                DEBUG(10, ("%s: server_id_db_init failed\n", __func__));
                TALLOC_FREE(ctx);
                return NULL;
        }

        messaging_register(ctx, NULL, MSG_PING, ping_message);

        /* Register some debugging related messages */

        register_msg_pool_usage(ctx);
        register_dmalloc_msgs(ctx);
        debug_register_msgs(ctx);

        return ctx;
}
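
/**
   Example (a hedged sketch): typical daemon setup wiring a messaging
   context to the main tevent loop. samba_tevent_context_init() is
   assumed to be available from Samba's event utilities.

   @code
   struct tevent_context *ev = samba_tevent_context_init(NULL);
   struct messaging_context *msg_ctx = messaging_init(NULL, ev);
   if (msg_ctx == NULL) {
           exit(1);
   }
   // From here on, registered handlers fire from the tevent loop, e.g.:
   //   messaging_register(msg_ctx, NULL, MSG_PING, ping_message);
   @endcode
*/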

struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
{
        return msg_ctx->id;
}

/*
 * re-init after a fork
 */
NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
{
        int ret;

        TALLOC_FREE(msg_ctx->msg_dgm_ref);

        msg_ctx->id = (struct server_id) {
                .pid = getpid(), .vnn = msg_ctx->id.vnn
        };

        msg_ctx->msg_dgm_ref = messaging_dgm_ref(
                msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id,
                private_path("msg.sock"), lock_path("msg.lock"),
                messaging_recv_cb, msg_ctx, &ret);

        if (msg_ctx->msg_dgm_ref == NULL) {
                DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
                return map_nt_error_from_unix(ret);
        }

        TALLOC_FREE(msg_ctx->remote);

        if (lp_clustering()) {
                ret = messaging_ctdbd_init(msg_ctx, msg_ctx,
                                           &msg_ctx->remote);

                if (ret != 0) {
                        DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
                                  strerror(ret)));
                        return map_nt_error_from_unix(ret);
                }
        }

        server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);

        return NT_STATUS_OK;
}
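
/**
   Example (a hedged sketch): a forked child must re-initialize its
   messaging endpoint before using msg_ctx, since the datagram socket
   is bound to the parent's pid.

   @code
   pid_t pid = fork();
   if (pid == 0) {
           NTSTATUS status = messaging_reinit(msg_ctx);
           if (!NT_STATUS_IS_OK(status)) {
                   DEBUG(0, ("messaging_reinit failed: %s\n",
                             nt_errstr(status)));
                   exit(1);
           }
           // The child now receives messages addressed to its own pid.
   }
   @endcode
*/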


/*
 * Register a dispatch function for a particular message type. Multiple
 * registrants per type are allowed, as long as their private_data
 * pointers differ.
 *
 * *NOTE*: Dispatch functions must be able to cope with incoming
 * messages on an *odd* byte boundary.
 */
NTSTATUS messaging_register(struct messaging_context *msg_ctx,
                            void *private_data,
                            uint32_t msg_type,
                            void (*fn)(struct messaging_context *msg,
                                       void *private_data,
                                       uint32_t msg_type,
                                       struct server_id server_id,
                                       DATA_BLOB *data))
{
        struct messaging_callback *cb;

        DEBUG(5, ("Registering messaging pointer for type %u - "
                  "private_data=%p\n",
                  (unsigned)msg_type, private_data));

        /*
         * Only one callback per (msg_type, private_data) pair
         */

        for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
                /* We allow a second registration of the same message
                   type if it has a different private pointer. This is
                   needed in, for example, the internal notify code,
                   which creates a new notify context for each tree
                   connect, and expects to receive messages to each of
                   them. */
                if (cb->msg_type == msg_type &&
                    private_data == cb->private_data) {
                        DEBUG(5, ("Overriding messaging pointer for type %u "
                                  "- private_data=%p\n",
                                  (unsigned)msg_type, private_data));
                        cb->fn = fn;
                        cb->private_data = private_data;
                        return NT_STATUS_OK;
                }
        }

        if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
                return NT_STATUS_NO_MEMORY;
        }

        cb->msg_type = msg_type;
        cb->fn = fn;
        cb->private_data = private_data;

        DLIST_ADD(msg_ctx->callbacks, cb);
        return NT_STATUS_OK;
}
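
/**
   Example (a hedged sketch): two registrations for the same message
   type coexist because their private_data pointers differ; registering
   again with the same pair replaces the callback in place.

   @code
   // "ctx_a" and "ctx_b" are hypothetical per-tree-connect contexts,
   // and MSG_EXAMPLE is a hypothetical message type constant.
   messaging_register(msg_ctx, ctx_a, MSG_EXAMPLE, notify_handler);
   messaging_register(msg_ctx, ctx_b, MSG_EXAMPLE, notify_handler);
   // Both registrations fire on each MSG_EXAMPLE message.
   messaging_register(msg_ctx, ctx_a, MSG_EXAMPLE, notify_handler);
   // Same (type, private_data) pair: replaces the first registration.
   @endcode
*/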

/*
 * De-register the function for a particular message type.
 */
void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
                          void *private_data)
{
        struct messaging_callback *cb, *next;

        for (cb = ctx->callbacks; cb; cb = next) {
                next = cb->next;
                if ((cb->msg_type == msg_type)
                    && (cb->private_data == private_data)) {
                        DEBUG(5, ("Deregistering messaging pointer for type "
                                  "%u - private_data=%p\n",
                                  (unsigned)msg_type, private_data));
                        DLIST_REMOVE(ctx->callbacks, cb);
                        TALLOC_FREE(cb);
                }
        }
}

/*
 * Send a message to a particular server.
 */
NTSTATUS messaging_send(struct messaging_context *msg_ctx,
                        struct server_id server, uint32_t msg_type,
                        const DATA_BLOB *data)
{
        struct iovec iov;

        iov.iov_base = data->data;
        iov.iov_len = data->length;

        return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
}

NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
                            struct server_id server, uint32_t msg_type,
                            const uint8_t *buf, size_t len)
{
        DATA_BLOB blob = data_blob_const(buf, len);
        return messaging_send(msg_ctx, server, msg_type, &blob);
}

int messaging_send_iov_from(struct messaging_context *msg_ctx,
                            struct server_id src, struct server_id dst,
                            uint32_t msg_type,
                            const struct iovec *iov, int iovlen,
                            const int *fds, size_t num_fds)
{
        int ret;
        uint8_t hdr[MESSAGE_HDR_LENGTH];
        struct iovec iov2[iovlen+1];

        if (server_id_is_disconnected(&dst)) {
                return EINVAL;
        }

        if (num_fds > INT8_MAX) {
                return EINVAL;
        }

        if (!procid_is_local(&dst)) {
                if (num_fds > 0) {
                        return ENOSYS;
                }

                ret = msg_ctx->remote->send_fn(src, dst,
                                               msg_type, iov, iovlen,
                                               NULL, 0,
                                               msg_ctx->remote);
                return ret;
        }

        message_hdr_put(hdr, msg_type, src, dst);
        iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
        memcpy(&iov2[1], iov, iovlen * sizeof(*iov));

        become_root();
        ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
        unbecome_root();

        return ret;
}

NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
                            struct server_id dst, uint32_t msg_type,
                            const struct iovec *iov, int iovlen,
                            const int *fds, size_t num_fds)
{
        int ret;

        ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
                                      iov, iovlen, fds, num_fds);
        if (ret != 0) {
                return map_nt_error_from_unix(ret);
        }
        return NT_STATUS_OK;
}
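
/**
   Example (a hedged sketch): scatter-gather send with fd passing. A
   two-part payload and one file descriptor are handed to a local peer;
   fd passing only works for local destinations, remote ones return
   ENOSYS above. Assumes <fcntl.h>; "peer" and MSG_EXAMPLE are
   stand-ins.

   @code
   uint8_t tag = 1;
   const char *path = "/tmp/example";
   struct iovec iov[2] = {
           { .iov_base = &tag, .iov_len = sizeof(tag) },
           { .iov_base = discard_const_p(char, path),
             .iov_len = strlen(path) + 1 },
   };
   int fd = open(path, O_RDONLY);
   NTSTATUS status = messaging_send_iov(msg_ctx, peer, MSG_EXAMPLE,
                                        iov, 2, &fd, 1);
   // The kernel duplicates the descriptor for the receiver, so the
   // sender still owns (and should close) its own copy of fd.
   close(fd);
   @endcode
*/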

static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
                                               struct messaging_rec *rec)
{
        struct messaging_rec *result;
        size_t fds_size = sizeof(int64_t) * rec->num_fds;

        result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
                                      rec->buf.length + fds_size);
        if (result == NULL) {
                return NULL;
        }
        *result = *rec;

        /* Doesn't fail, see talloc_pooled_object */

        result->buf.data = talloc_memdup(result, rec->buf.data,
                                         rec->buf.length);

        result->fds = NULL;
        if (result->num_fds > 0) {
                result->fds = talloc_memdup(result, rec->fds, fds_size);
        }

        return result;
}

struct messaging_filtered_read_state {
        struct tevent_context *ev;
        struct messaging_context *msg_ctx;
        void *tevent_handle;

        bool (*filter)(struct messaging_rec *rec, void *private_data);
        void *private_data;

        struct messaging_rec *rec;
};

static void messaging_filtered_read_cleanup(struct tevent_req *req,
                                            enum tevent_req_state req_state);

struct tevent_req *messaging_filtered_read_send(
        TALLOC_CTX *mem_ctx, struct tevent_context *ev,
        struct messaging_context *msg_ctx,
        bool (*filter)(struct messaging_rec *rec, void *private_data),
        void *private_data)
{
        struct tevent_req *req;
        struct messaging_filtered_read_state *state;
        size_t new_waiters_len;

        req = tevent_req_create(mem_ctx, &state,
                                struct messaging_filtered_read_state);
        if (req == NULL) {
                return NULL;
        }
        state->ev = ev;
        state->msg_ctx = msg_ctx;
        state->filter = filter;
        state->private_data = private_data;

        /*
         * We have to defer the callback here, as we might be called from
         * within a different tevent_context than state->ev
         */
        tevent_req_defer_callback(req, state->ev);

        state->tevent_handle = messaging_dgm_register_tevent_context(
                state, ev);
        if (tevent_req_nomem(state->tevent_handle, req)) {
                return tevent_req_post(req, ev);
        }

        /*
         * We add ourselves to the "new_waiters" array, not the "waiters"
         * array. If we are called from within messaging_read_done,
         * messaging_dispatch_rec will be in an active for-loop on
         * "waiters". We must be careful not to mess with this array, because
         * it could mean that a single event is being delivered twice.
         */

        new_waiters_len = talloc_array_length(msg_ctx->new_waiters);

        if (new_waiters_len == msg_ctx->num_new_waiters) {
                struct tevent_req **tmp;

                tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
                                     struct tevent_req *, new_waiters_len+1);
                if (tevent_req_nomem(tmp, req)) {
                        return tevent_req_post(req, ev);
                }
                msg_ctx->new_waiters = tmp;
        }

        msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
        msg_ctx->num_new_waiters += 1;
        tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);

        return req;
}

static void messaging_filtered_read_cleanup(struct tevent_req *req,
                                            enum tevent_req_state req_state)
{
        struct messaging_filtered_read_state *state = tevent_req_data(
                req, struct messaging_filtered_read_state);
        struct messaging_context *msg_ctx = state->msg_ctx;
        unsigned i;

        tevent_req_set_cleanup_fn(req, NULL);

        TALLOC_FREE(state->tevent_handle);

        /*
         * Just set the [new_]waiters entry to NULL, be careful not to mess
         * with the other "waiters" array contents. We are often called from
         * within "messaging_dispatch_rec", which loops over
         * "waiters". Messing with the "waiters" array will mess up that
         * for-loop.
         */

        for (i=0; i<msg_ctx->num_waiters; i++) {
                if (msg_ctx->waiters[i] == req) {
                        msg_ctx->waiters[i] = NULL;
                        return;
                }
        }

        for (i=0; i<msg_ctx->num_new_waiters; i++) {
                if (msg_ctx->new_waiters[i] == req) {
                        msg_ctx->new_waiters[i] = NULL;
                        return;
                }
        }
}

static void messaging_filtered_read_done(struct tevent_req *req,
                                         struct messaging_rec *rec)
{
        struct messaging_filtered_read_state *state = tevent_req_data(
                req, struct messaging_filtered_read_state);

        state->rec = messaging_rec_dup(state, rec);
        if (tevent_req_nomem(state->rec, req)) {
                return;
        }
        tevent_req_done(req);
}

int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
                                 struct messaging_rec **presult)
{
        struct messaging_filtered_read_state *state = tevent_req_data(
                req, struct messaging_filtered_read_state);
        int err;

        if (tevent_req_is_unix_error(req, &err)) {
                tevent_req_received(req);
                return err;
        }
        *presult = talloc_move(mem_ctx, &state->rec);
        return 0;
}
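
/**
   Example (a hedged sketch): wait synchronously for the next message
   matching a custom predicate, using the filtered-read pair. The
   filter runs at dispatch time; returning true claims the record.
   MSG_EXAMPLE is a stand-in type.

   @code
   static bool my_filter(struct messaging_rec *rec, void *private_data)
   {
           return rec->msg_type == MSG_EXAMPLE;
   }

   struct tevent_req *req = messaging_filtered_read_send(
           mem_ctx, ev, msg_ctx, my_filter, NULL);
   if (req != NULL && tevent_req_poll(req, ev)) {
           struct messaging_rec *rec;
           int err = messaging_filtered_read_recv(req, mem_ctx, &rec);
           if (err == 0) {
                   // rec->buf holds the payload, rec->fds any passed fds.
           }
   }
   @endcode
*/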

struct messaging_read_state {
        uint32_t msg_type;
        struct messaging_rec *rec;
};

static bool messaging_read_filter(struct messaging_rec *rec,
                                  void *private_data);
static void messaging_read_done(struct tevent_req *subreq);

struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
                                       struct tevent_context *ev,
                                       struct messaging_context *msg,
                                       uint32_t msg_type)
{
        struct tevent_req *req, *subreq;
        struct messaging_read_state *state;

        req = tevent_req_create(mem_ctx, &state,
                                struct messaging_read_state);
        if (req == NULL) {
                return NULL;
        }
        state->msg_type = msg_type;

        subreq = messaging_filtered_read_send(state, ev, msg,
                                              messaging_read_filter, state);
        if (tevent_req_nomem(subreq, req)) {
                return tevent_req_post(req, ev);
        }
        tevent_req_set_callback(subreq, messaging_read_done, req);
        return req;
}

static bool messaging_read_filter(struct messaging_rec *rec,
                                  void *private_data)
{
        struct messaging_read_state *state = talloc_get_type_abort(
                private_data, struct messaging_read_state);

        if (rec->num_fds != 0) {
                return false;
        }

        return rec->msg_type == state->msg_type;
}

static void messaging_read_done(struct tevent_req *subreq)
{
        struct tevent_req *req = tevent_req_callback_data(
                subreq, struct tevent_req);
        struct messaging_read_state *state = tevent_req_data(
                req, struct messaging_read_state);
        int ret;

        ret = messaging_filtered_read_recv(subreq, state, &state->rec);
        TALLOC_FREE(subreq);
        if (tevent_req_error(req, ret)) {
                return;
        }
        tevent_req_done(req);
}

int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
                        struct messaging_rec **presult)
{
        struct messaging_read_state *state = tevent_req_data(
                req, struct messaging_read_state);
        int err;

        if (tevent_req_is_unix_error(req, &err)) {
                return err;
        }
        if (presult != NULL) {
                *presult = talloc_move(mem_ctx, &state->rec);
        }
        return 0;
}
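
/**
   Example (a hedged sketch): one-shot wait for a single message type.
   Note that messaging_read_filter above skips records carrying fds, so
   this path never sees fd-passing messages.

   @code
   struct tevent_req *req = messaging_read_send(mem_ctx, ev, msg_ctx,
                                                MSG_PONG);
   if (req != NULL && tevent_req_poll(req, ev)) {
           struct messaging_rec *rec;
           if (messaging_read_recv(req, mem_ctx, &rec) == 0) {
                   DEBUG(5, ("got pong, %u bytes\n",
                             (unsigned)rec->buf.length));
           }
   }
   @endcode
*/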

struct messaging_handler_state {
        struct tevent_context *ev;
        struct messaging_context *msg_ctx;
        uint32_t msg_type;
        bool (*handler)(struct messaging_context *msg_ctx,
                        struct messaging_rec **rec, void *private_data);
        void *private_data;
};

static void messaging_handler_got_msg(struct tevent_req *subreq);

struct tevent_req *messaging_handler_send(
        TALLOC_CTX *mem_ctx, struct tevent_context *ev,
        struct messaging_context *msg_ctx, uint32_t msg_type,
        bool (*handler)(struct messaging_context *msg_ctx,
                        struct messaging_rec **rec, void *private_data),
        void *private_data)
{
        struct tevent_req *req, *subreq;
        struct messaging_handler_state *state;

        req = tevent_req_create(mem_ctx, &state,
                                struct messaging_handler_state);
        if (req == NULL) {
                return NULL;
        }
        state->ev = ev;
        state->msg_ctx = msg_ctx;
        state->msg_type = msg_type;
        state->handler = handler;
        state->private_data = private_data;

        subreq = messaging_read_send(state, state->ev, state->msg_ctx,
                                     state->msg_type);
        if (tevent_req_nomem(subreq, req)) {
                return tevent_req_post(req, ev);
        }
        tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
        return req;
}

static void messaging_handler_got_msg(struct tevent_req *subreq)
{
        struct tevent_req *req = tevent_req_callback_data(
                subreq, struct tevent_req);
        struct messaging_handler_state *state = tevent_req_data(
                req, struct messaging_handler_state);
        struct messaging_rec *rec;
        int ret;
        bool ok;

        ret = messaging_read_recv(subreq, state, &rec);
        TALLOC_FREE(subreq);
        if (tevent_req_error(req, ret)) {
                return;
        }

        subreq = messaging_read_send(state, state->ev, state->msg_ctx,
                                     state->msg_type);
        if (tevent_req_nomem(subreq, req)) {
                return;
        }
        tevent_req_set_callback(subreq, messaging_handler_got_msg, req);

        ok = state->handler(state->msg_ctx, &rec, state->private_data);
        TALLOC_FREE(rec);
        if (ok) {
                /*
                 * Next round
                 */
                return;
        }
        TALLOC_FREE(subreq);
        tevent_req_done(req);
}

int messaging_handler_recv(struct tevent_req *req)
{
        return tevent_req_simple_recv_unix(req);
}
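
/**
   Example (a hedged sketch): a long-lived handler loop. The handler
   returns true to keep receiving, false to finish the request.

   @code
   static bool count_pings(struct messaging_context *msg_ctx,
                           struct messaging_rec **rec, void *private_data)
   {
           int *count = (int *)private_data;
           *count += 1;
           return (*count < 10);   // stop after ten messages
   }

   // int seen = 0;
   // struct tevent_req *req = messaging_handler_send(
   //         mem_ctx, ev, msg_ctx, MSG_PING, count_pings, &seen);
   @endcode
*/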

static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
{
        if (msg_ctx->num_new_waiters == 0) {
                return true;
        }

        if (talloc_array_length(msg_ctx->waiters) <
            (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
                struct tevent_req **tmp;
                tmp = talloc_realloc(
                        msg_ctx, msg_ctx->waiters, struct tevent_req *,
                        msg_ctx->num_waiters + msg_ctx->num_new_waiters);
                if (tmp == NULL) {
                        DEBUG(1, ("%s: talloc failed\n", __func__));
                        return false;
                }
                msg_ctx->waiters = tmp;
        }

        memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
               sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);

        msg_ctx->num_waiters += msg_ctx->num_new_waiters;
        msg_ctx->num_new_waiters = 0;

        return true;
}

/*
 * Dispatch one messaging_rec
 */
static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
                                   struct messaging_rec *rec)
{
        struct messaging_callback *cb, *next;
        unsigned i;
        size_t j;

        for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
                next = cb->next;
                if (cb->msg_type != rec->msg_type) {
                        continue;
                }

                /*
                 * the old style callbacks don't support fd passing
                 */
                for (j=0; j < rec->num_fds; j++) {
                        int fd = rec->fds[j];
                        close(fd);
                }
                rec->num_fds = 0;
                rec->fds = NULL;

                cb->fn(msg_ctx, cb->private_data, rec->msg_type,
                       rec->src, &rec->buf);

                /*
                 * we continue looking for matching messages after finding
                 * one. This matters for subsystems like the internal notify
                 * code which register more than one handler for the same
                 * message type
                 */
        }

        if (!messaging_append_new_waiters(msg_ctx)) {
                for (j=0; j < rec->num_fds; j++) {
                        int fd = rec->fds[j];
                        close(fd);
                }
                rec->num_fds = 0;
                rec->fds = NULL;
                return;
        }

        i = 0;
        while (i < msg_ctx->num_waiters) {
                struct tevent_req *req;
                struct messaging_filtered_read_state *state;

                req = msg_ctx->waiters[i];
                if (req == NULL) {
                        /*
                         * This got cleaned up. In the meantime,
                         * move everything down one. We need
                         * to keep the order of waiters, as
                         * other code may depend on this.
                         */
                        if (i < msg_ctx->num_waiters - 1) {
                                memmove(&msg_ctx->waiters[i],
                                        &msg_ctx->waiters[i+1],
                                        sizeof(struct tevent_req *) *
                                        (msg_ctx->num_waiters - i - 1));
                        }
                        msg_ctx->num_waiters -= 1;
                        continue;
                }

                state = tevent_req_data(
                        req, struct messaging_filtered_read_state);
                if (state->filter(rec, state->private_data)) {
                        messaging_filtered_read_done(req, rec);

                        /*
                         * Only the first one gets the fd-array
                         */
                        rec->num_fds = 0;
                        rec->fds = NULL;
                }

                i += 1;
        }

        /*
         * If the fd-array isn't used, just close it.
         */
        for (j=0; j < rec->num_fds; j++) {
                int fd = rec->fds[j];
                close(fd);
        }
        rec->num_fds = 0;
        rec->fds = NULL;
}

static int mess_parent_dgm_cleanup(void *private_data);
static void mess_parent_dgm_cleanup_done(struct tevent_req *req);

bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
{
        struct tevent_req *req;

        req = background_job_send(
                msg, msg->event_ctx, msg, NULL, 0,
                lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
                            60*15),
                mess_parent_dgm_cleanup, msg);
        if (req == NULL) {
                return false;
        }
        tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
        return true;
}

static int mess_parent_dgm_cleanup(void *private_data)
{
        int ret;

        ret = messaging_dgm_wipe();
        DEBUG(10, ("messaging_dgm_wipe returned %s\n",
                   ret ? strerror(ret) : "ok"));
        return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
                           60*15);
}
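
/**
   Example (a hedged sketch): the cleanup interval is read as a
   parametric smb.conf option via lp_parm_int(); 900 seconds (60*15)
   is the built-in default. An administrator could override it like
   this, assuming the usual "section:parameter" parametric syntax:

   @code
   # smb.conf
   [global]
           messaging:messaging dgm cleanup interval = 300
   @endcode
*/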

static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
{
        struct messaging_context *msg = tevent_req_callback_data(
                req, struct messaging_context);
        NTSTATUS status;

        status = background_job_recv(req);
        TALLOC_FREE(req);
        DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
                  nt_errstr(status)));

        req = background_job_send(
                msg, msg->event_ctx, msg, NULL, 0,
                lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
                            60*15),
                mess_parent_dgm_cleanup, msg);
        if (req == NULL) {
                DEBUG(1, ("background_job_send failed\n"));
                return;
        }
        tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
}

int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
{
        int ret;

        if (pid == 0) {
                ret = messaging_dgm_wipe();
        } else {
                ret = messaging_dgm_cleanup(pid);
        }

        return ret;
}

struct tevent_context *messaging_tevent_context(
        struct messaging_context *msg_ctx)
{
        return msg_ctx->event_ctx;
}

struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
{
        return msg_ctx->names_db;
}

/** @} **/