Changeset 988 for vendor/current/source3/lib/messages.c
- Timestamp:
- Nov 24, 2016, 1:14:11 PM (9 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
vendor/current/source3/lib/messages.c
r740 r988 6 6 Copyright (C) 2002 by Jeremy Allison 7 7 Copyright (C) 2007 by Volker Lendecke 8 8 9 9 This program is free software; you can redistribute it and/or modify 10 10 it under the terms of the GNU General Public License as published by 11 11 the Free Software Foundation; either version 3 of the License, or 12 12 (at your option) any later version. 13 13 14 14 This program is distributed in the hope that it will be useful, 15 15 but WITHOUT ANY WARRANTY; without even the implied warranty of 16 16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 17 17 GNU General Public License for more details. 18 18 19 19 You should have received a copy of the GNU General Public License 20 20 along with this program. If not, see <http://www.gnu.org/licenses/>. … … 25 25 @{ 26 26 @file messages.c 27 27 28 28 @brief Module for internal messaging between Samba daemons. 29 29 … … 47 47 48 48 #include "includes.h" 49 #include "dbwrap .h"49 #include "dbwrap/dbwrap.h" 50 50 #include "serverid.h" 51 51 #include "messages.h" 52 #include "lib/util/tevent_unix.h" 53 #include "lib/background.h" 54 #include "lib/messages_dgm.h" 55 #include "lib/util/iov_buf.h" 56 #include "lib/util/server_id_db.h" 57 #include "lib/messages_dgm_ref.h" 58 #include "lib/messages_util.h" 52 59 53 60 struct messaging_callback { 54 61 struct messaging_callback *prev, *next; 55 uint32 msg_type;62 uint32_t msg_type; 56 63 void (*fn)(struct messaging_context *msg, void *private_data, 57 64 uint32_t msg_type, … … 60 67 }; 61 68 69 struct messaging_context { 70 struct server_id id; 71 struct tevent_context *event_ctx; 72 struct messaging_callback *callbacks; 73 74 struct tevent_req **new_waiters; 75 unsigned num_new_waiters; 76 77 struct tevent_req **waiters; 78 unsigned num_waiters; 79 80 void *msg_dgm_ref; 81 struct messaging_backend *remote; 82 83 struct server_id_db *names_db; 84 }; 85 86 static void messaging_dispatch_rec(struct messaging_context *msg_ctx, 87 struct messaging_rec *rec); 88 62 89 /**************************************************************************** 63 90 A useful function for testing the message system. … … 70 97 DATA_BLOB *data) 71 98 { 72 const char *msg = data->data ? (const char *)data->data : "none"; 73 74 DEBUG(1,("INFO: Received PING message from PID %s [%s]\n", 75 procid_str_static(&src), msg)); 99 struct server_id_buf idbuf; 100 101 DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n", 102 server_id_str_buf(src, &idbuf), (int)data->length, 103 data->data ? (char *)data->data : "")); 104 76 105 messaging_send(msg_ctx, src, MSG_PONG, data); 77 106 } … … 87 116 struct messaging_context *msg_ctx; 88 117 int msg_type; 89 uint32 msg_flag;118 uint32_t msg_flag; 90 119 const void *buf; 91 120 size_t len; … … 113 142 114 143 status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type, 115 ( uint8*)msg_all->buf, msg_all->len);144 (const uint8_t *)msg_all->buf, msg_all->len); 116 145 117 146 if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) { 118 119 /* If the pid was not found delete the entry from connections.tdb */ 120 121 DEBUG(2, ("pid %s doesn't exist\n", procid_str_static(id))); 122 123 rec->delete_rec(rec); 147 struct server_id_buf idbuf; 148 149 /* 150 * If the pid was not found delete the entry from 151 * serverid.tdb 152 */ 153 154 DEBUG(2, ("pid %s doesn't exist\n", 155 server_id_str_buf(*id, &idbuf))); 156 157 dbwrap_record_delete(rec); 124 158 } 125 159 msg_all->n_sent++; … … 147 181 148 182 msg_all.msg_type = msg_type; 149 if (msg_type < 1000)183 if (msg_type < 0x100) { 150 184 msg_all.msg_flag = FLAG_MSG_GENERAL; 151 else if (msg_type > 1000 && msg_type < 2000)185 } else if (msg_type > 0x100 && msg_type < 0x200) { 152 186 msg_all.msg_flag = FLAG_MSG_NMBD; 153 else if (msg_type > 2000 && msg_type < 2100) 154 msg_all.msg_flag = FLAG_MSG_PRINT_NOTIFY; 155 else if (msg_type > 2100 && msg_type < 3000) 187 } else if (msg_type > 0x200 && msg_type < 0x300) { 156 188 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL; 157 else if (msg_type > 3000 && msg_type < 4000)189 } else if (msg_type > 0x300 && msg_type < 0x400) { 158 190 msg_all.msg_flag = FLAG_MSG_SMBD; 159 else if (msg_type > 4000 && msg_type < 5000) 191 } else if (msg_type > 0x400 && msg_type < 0x600) { 192 msg_all.msg_flag = FLAG_MSG_WINBIND; 193 } else if (msg_type > 4000 && msg_type < 5000) { 160 194 msg_all.msg_flag = FLAG_MSG_DBWRAP; 161 else 162 return False; 195 } else { 196 return false; 197 } 163 198 164 199 msg_all.buf = buf; … … 170 205 if (n_sent) 171 206 *n_sent = msg_all.n_sent; 172 return True; 173 } 174 175 struct event_context *messaging_event_context(struct messaging_context *msg_ctx) 176 { 177 return msg_ctx->event_ctx; 207 return true; 208 } 209 210 static void messaging_recv_cb(const uint8_t *msg, size_t msg_len, 211 int *fds, size_t num_fds, 212 void *private_data) 213 { 214 struct messaging_context *msg_ctx = talloc_get_type_abort( 215 private_data, struct messaging_context); 216 struct server_id_buf idbuf; 217 struct messaging_rec rec; 218 int64_t fds64[MIN(num_fds, INT8_MAX)]; 219 size_t i; 220 221 if (msg_len < MESSAGE_HDR_LENGTH) { 222 DEBUG(1, ("message too short: %u\n", (unsigned)msg_len)); 223 goto close_fail; 224 } 225 226 if (num_fds > INT8_MAX) { 227 DEBUG(1, ("too many fds: %u\n", (unsigned)num_fds)); 228 goto close_fail; 229 } 230 231 /* 232 * "consume" the fds by copying them and setting 233 * the original variable to -1 234 */ 235 for (i=0; i < num_fds; i++) { 236 fds64[i] = fds[i]; 237 fds[i] = -1; 238 } 239 240 rec = (struct messaging_rec) { 241 .msg_version = MESSAGE_VERSION, 242 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH, 243 .buf.length = msg_len - MESSAGE_HDR_LENGTH, 244 .num_fds = num_fds, 245 .fds = fds64, 246 }; 247 248 message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg); 249 250 DEBUG(10, ("%s: Received message 0x%x len %u (num_fds:%u) from %s\n", 251 __func__, (unsigned)rec.msg_type, 252 (unsigned)rec.buf.length, 253 (unsigned)num_fds, 254 server_id_str_buf(rec.src, &idbuf))); 255 256 messaging_dispatch_rec(msg_ctx, &rec); 257 return; 258 259 close_fail: 260 for (i=0; i < num_fds; i++) { 261 close(fds[i]); 262 } 263 } 264 265 static int messaging_context_destructor(struct messaging_context *ctx) 266 { 267 unsigned i; 268 269 for (i=0; i<ctx->num_new_waiters; i++) { 270 if (ctx->new_waiters[i] != NULL) { 271 tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL); 272 ctx->new_waiters[i] = NULL; 273 } 274 } 275 for (i=0; i<ctx->num_waiters; i++) { 276 if (ctx->waiters[i] != NULL) { 277 tevent_req_set_cleanup_fn(ctx->waiters[i], NULL); 278 ctx->waiters[i] = NULL; 279 } 280 } 281 282 return 0; 283 } 284 285 static const char *private_path(const char *name) 286 { 287 return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name); 178 288 } 179 289 180 290 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 181 struct server_id server_id, 182 struct event_context *ev) 291 struct tevent_context *ev) 183 292 { 184 293 struct messaging_context *ctx; 185 NTSTATUS status; 186 187 if (!(ctx = TALLOC_ZERO_P(mem_ctx, struct messaging_context))) { 294 int ret; 295 const char *lck_path; 296 const char *priv_path; 297 bool ok; 298 299 if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) { 188 300 return NULL; 189 301 } 190 302 191 ctx->id = server_id; 303 ctx->id = (struct server_id) { 304 .pid = getpid(), .vnn = NONCLUSTER_VNN 305 }; 306 192 307 ctx->event_ctx = ev; 193 308 194 status = messaging_tdb_init(ctx, ctx, &ctx->local); 195 196 if (!NT_STATUS_IS_OK(status)) { 197 DEBUG(2, ("messaging_tdb_init failed: %s\n", 198 nt_errstr(status))); 309 sec_init(); 310 311 lck_path = lock_path("msg.lock"); 312 if (lck_path == NULL) { 199 313 TALLOC_FREE(ctx); 200 314 return NULL; 201 315 } 202 316 203 #ifdef CLUSTER_SUPPORT 317 ok = directory_create_or_exist_strict(lck_path, sec_initial_uid(), 318 0755); 319 if (!ok) { 320 DEBUG(10, ("%s: Could not create lock directory: %s\n", 321 __func__, strerror(errno))); 322 TALLOC_FREE(ctx); 323 return NULL; 324 } 325 326 priv_path = private_path("msg.sock"); 327 if (priv_path == NULL) { 328 TALLOC_FREE(ctx); 329 return NULL; 330 } 331 332 ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(), 333 0700); 334 if (!ok) { 335 DEBUG(10, ("%s: Could not create msg directory: %s\n", 336 __func__, strerror(errno))); 337 TALLOC_FREE(ctx); 338 return NULL; 339 } 340 341 ctx->msg_dgm_ref = messaging_dgm_ref( 342 ctx, ctx->event_ctx, &ctx->id.unique_id, 343 priv_path, lck_path, messaging_recv_cb, ctx, &ret); 344 345 if (ctx->msg_dgm_ref == NULL) { 346 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret))); 347 TALLOC_FREE(ctx); 348 return NULL; 349 } 350 351 talloc_set_destructor(ctx, messaging_context_destructor); 352 204 353 if (lp_clustering()) { 205 status= messaging_ctdbd_init(ctx, ctx, &ctx->remote);206 207 if ( !NT_STATUS_IS_OK(status)) {208 DEBUG(2, ("messaging_ctdb _init failed: %s\n",209 nt_errstr(status)));354 ret = messaging_ctdbd_init(ctx, ctx, &ctx->remote); 355 356 if (ret != 0) { 357 DEBUG(2, ("messaging_ctdbd_init failed: %s\n", 358 strerror(ret))); 210 359 TALLOC_FREE(ctx); 211 360 return NULL; … … 213 362 } 214 363 ctx->id.vnn = get_my_vnn(); 215 #endif 364 365 ctx->names_db = server_id_db_init( 366 ctx, ctx->id, lp_lock_directory(), 0, 367 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST); 368 if (ctx->names_db == NULL) { 369 DEBUG(10, ("%s: server_id_db_init failed\n", __func__)); 370 TALLOC_FREE(ctx); 371 return NULL; 372 } 216 373 217 374 messaging_register(ctx, NULL, MSG_PING, ping_message); … … 234 391 * re-init after a fork 235 392 */ 236 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx, 237 struct server_id id) 238 { 239 NTSTATUS status; 240 241 TALLOC_FREE(msg_ctx->local); 242 243 msg_ctx->id = id; 244 245 status = messaging_tdb_init(msg_ctx, msg_ctx, &msg_ctx->local); 246 if (!NT_STATUS_IS_OK(status)) { 247 DEBUG(0, ("messaging_tdb_init failed: %s\n", 248 nt_errstr(status))); 249 return status; 250 } 251 252 #ifdef CLUSTER_SUPPORT 393 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx) 394 { 395 int ret; 396 397 TALLOC_FREE(msg_ctx->msg_dgm_ref); 398 399 msg_ctx->id = (struct server_id) { 400 .pid = getpid(), .vnn = msg_ctx->id.vnn 401 }; 402 403 msg_ctx->msg_dgm_ref = messaging_dgm_ref( 404 msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id, 405 private_path("msg.sock"), lock_path("msg.lock"), 406 messaging_recv_cb, msg_ctx, &ret); 407 408 if (msg_ctx->msg_dgm_ref == NULL) { 409 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret))); 410 return map_nt_error_from_unix(ret); 411 } 412 253 413 TALLOC_FREE(msg_ctx->remote); 254 414 255 415 if (lp_clustering()) { 256 status= messaging_ctdbd_init(msg_ctx, msg_ctx,257 258 259 if ( !NT_STATUS_IS_OK(status)) {260 DEBUG(1, ("messaging_ctdb _init failed: %s\n",261 nt_errstr(status)));262 return status;263 } 264 } 265 266 #endif 416 ret = messaging_ctdbd_init(msg_ctx, msg_ctx, 417 &msg_ctx->remote); 418 419 if (ret != 0) { 420 DEBUG(1, ("messaging_ctdbd_init failed: %s\n", 421 strerror(ret))); 422 return map_nt_error_from_unix(ret); 423 } 424 } 425 426 server_id_db_reinit(msg_ctx->names_db, msg_ctx->id); 267 427 268 428 return NT_STATUS_OK; … … 284 444 { 285 445 struct messaging_callback *cb; 446 447 DEBUG(5, ("Registering messaging pointer for type %u - " 448 "private_data=%p\n", 449 (unsigned)msg_type, private_data)); 286 450 287 451 /* … … 344 508 const DATA_BLOB *data) 345 509 { 346 #ifdef CLUSTER_SUPPORT 347 if (!procid_is_local(&server)) { 348 return msg_ctx->remote->send_fn(msg_ctx, server, 349 msg_type, data, 350 msg_ctx->remote); 351 } 352 #endif 353 return msg_ctx->local->send_fn(msg_ctx, server, msg_type, data, 354 msg_ctx->local); 510 struct iovec iov; 511 512 iov.iov_base = data->data; 513 iov.iov_len = data->length; 514 515 return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0); 355 516 } 356 517 357 518 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx, 358 519 struct server_id server, uint32_t msg_type, 359 const uint8 *buf, size_t len)520 const uint8_t *buf, size_t len) 360 521 { 361 522 DATA_BLOB blob = data_blob_const(buf, len); 362 523 return messaging_send(msg_ctx, server, msg_type, &blob); 524 } 525 526 int messaging_send_iov_from(struct messaging_context *msg_ctx, 527 struct server_id src, struct server_id dst, 528 uint32_t msg_type, 529 const struct iovec *iov, int iovlen, 530 const int *fds, size_t num_fds) 531 { 532 int ret; 533 uint8_t hdr[MESSAGE_HDR_LENGTH]; 534 struct iovec iov2[iovlen+1]; 535 536 if (server_id_is_disconnected(&dst)) { 537 return EINVAL; 538 } 539 540 if (num_fds > INT8_MAX) { 541 return EINVAL; 542 } 543 544 if (!procid_is_local(&dst)) { 545 if (num_fds > 0) { 546 return ENOSYS; 547 } 548 549 ret = msg_ctx->remote->send_fn(src, dst, 550 msg_type, iov, iovlen, 551 NULL, 0, 552 msg_ctx->remote); 553 return ret; 554 } 555 556 message_hdr_put(hdr, msg_type, src, dst); 557 iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) }; 558 memcpy(&iov2[1], iov, iovlen * sizeof(*iov)); 559 560 become_root(); 561 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds); 562 unbecome_root(); 563 564 return ret; 565 } 566 567 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx, 568 struct server_id dst, uint32_t msg_type, 569 const struct iovec *iov, int iovlen, 570 const int *fds, size_t num_fds) 571 { 572 int ret; 573 574 ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type, 575 iov, iovlen, fds, num_fds); 576 if (ret != 0) { 577 return map_nt_error_from_unix(ret); 578 } 579 return NT_STATUS_OK; 580 } 581 582 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx, 583 struct messaging_rec *rec) 584 { 585 struct messaging_rec *result; 586 size_t fds_size = sizeof(int64_t) * rec->num_fds; 587 588 result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2, 589 rec->buf.length + fds_size); 590 if (result == NULL) { 591 return NULL; 592 } 593 *result = *rec; 594 595 /* Doesn't fail, see talloc_pooled_object */ 596 597 result->buf.data = talloc_memdup(result, rec->buf.data, 598 rec->buf.length); 599 600 result->fds = NULL; 601 if (result->num_fds > 0) { 602 result->fds = talloc_memdup(result, rec->fds, fds_size); 603 } 604 605 return result; 606 } 607 608 struct messaging_filtered_read_state { 609 struct tevent_context *ev; 610 struct messaging_context *msg_ctx; 611 void *tevent_handle; 612 613 bool (*filter)(struct messaging_rec *rec, void *private_data); 614 void *private_data; 615 616 struct messaging_rec *rec; 617 }; 618 619 static void messaging_filtered_read_cleanup(struct tevent_req *req, 620 enum tevent_req_state req_state); 621 622 struct tevent_req *messaging_filtered_read_send( 623 TALLOC_CTX *mem_ctx, struct tevent_context *ev, 624 struct messaging_context *msg_ctx, 625 bool (*filter)(struct messaging_rec *rec, void *private_data), 626 void *private_data) 627 { 628 struct tevent_req *req; 629 struct messaging_filtered_read_state *state; 630 size_t new_waiters_len; 631 632 req = tevent_req_create(mem_ctx, &state, 633 struct messaging_filtered_read_state); 634 if (req == NULL) { 635 return NULL; 636 } 637 state->ev = ev; 638 state->msg_ctx = msg_ctx; 639 state->filter = filter; 640 state->private_data = private_data; 641 642 /* 643 * We have to defer the callback here, as we might be called from 644 * within a different tevent_context than state->ev 645 */ 646 tevent_req_defer_callback(req, state->ev); 647 648 state->tevent_handle = messaging_dgm_register_tevent_context( 649 state, ev); 650 if (tevent_req_nomem(state->tevent_handle, req)) { 651 return tevent_req_post(req, ev); 652 } 653 654 /* 655 * We add ourselves to the "new_waiters" array, not the "waiters" 656 * array. If we are called from within messaging_read_done, 657 * messaging_dispatch_rec will be in an active for-loop on 658 * "waiters". We must be careful not to mess with this array, because 659 * it could mean that a single event is being delivered twice. 660 */ 661 662 new_waiters_len = talloc_array_length(msg_ctx->new_waiters); 663 664 if (new_waiters_len == msg_ctx->num_new_waiters) { 665 struct tevent_req **tmp; 666 667 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters, 668 struct tevent_req *, new_waiters_len+1); 669 if (tevent_req_nomem(tmp, req)) { 670 return tevent_req_post(req, ev); 671 } 672 msg_ctx->new_waiters = tmp; 673 } 674 675 msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req; 676 msg_ctx->num_new_waiters += 1; 677 tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup); 678 679 return req; 680 } 681 682 static void messaging_filtered_read_cleanup(struct tevent_req *req, 683 enum tevent_req_state req_state) 684 { 685 struct messaging_filtered_read_state *state = tevent_req_data( 686 req, struct messaging_filtered_read_state); 687 struct messaging_context *msg_ctx = state->msg_ctx; 688 unsigned i; 689 690 tevent_req_set_cleanup_fn(req, NULL); 691 692 TALLOC_FREE(state->tevent_handle); 693 694 /* 695 * Just set the [new_]waiters entry to NULL, be careful not to mess 696 * with the other "waiters" array contents. We are often called from 697 * within "messaging_dispatch_rec", which loops over 698 * "waiters". Messing with the "waiters" array will mess up that 699 * for-loop. 700 */ 701 702 for (i=0; i<msg_ctx->num_waiters; i++) { 703 if (msg_ctx->waiters[i] == req) { 704 msg_ctx->waiters[i] = NULL; 705 return; 706 } 707 } 708 709 for (i=0; i<msg_ctx->num_new_waiters; i++) { 710 if (msg_ctx->new_waiters[i] == req) { 711 msg_ctx->new_waiters[i] = NULL; 712 return; 713 } 714 } 715 } 716 717 static void messaging_filtered_read_done(struct tevent_req *req, 718 struct messaging_rec *rec) 719 { 720 struct messaging_filtered_read_state *state = tevent_req_data( 721 req, struct messaging_filtered_read_state); 722 723 state->rec = messaging_rec_dup(state, rec); 724 if (tevent_req_nomem(state->rec, req)) { 725 return; 726 } 727 tevent_req_done(req); 728 } 729 730 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx, 731 struct messaging_rec **presult) 732 { 733 struct messaging_filtered_read_state *state = tevent_req_data( 734 req, struct messaging_filtered_read_state); 735 int err; 736 737 if (tevent_req_is_unix_error(req, &err)) { 738 tevent_req_received(req); 739 return err; 740 } 741 *presult = talloc_move(mem_ctx, &state->rec); 742 return 0; 743 } 744 745 struct messaging_read_state { 746 uint32_t msg_type; 747 struct messaging_rec *rec; 748 }; 749 750 static bool messaging_read_filter(struct messaging_rec *rec, 751 void *private_data); 752 static void messaging_read_done(struct tevent_req *subreq); 753 754 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx, 755 struct tevent_context *ev, 756 struct messaging_context *msg, 757 uint32_t msg_type) 758 { 759 struct tevent_req *req, *subreq; 760 struct messaging_read_state *state; 761 762 req = tevent_req_create(mem_ctx, &state, 763 struct messaging_read_state); 764 if (req == NULL) { 765 return NULL; 766 } 767 state->msg_type = msg_type; 768 769 subreq = messaging_filtered_read_send(state, ev, msg, 770 messaging_read_filter, state); 771 if (tevent_req_nomem(subreq, req)) { 772 return tevent_req_post(req, ev); 773 } 774 tevent_req_set_callback(subreq, messaging_read_done, req); 775 return req; 776 } 777 778 static bool messaging_read_filter(struct messaging_rec *rec, 779 void *private_data) 780 { 781 struct messaging_read_state *state = talloc_get_type_abort( 782 private_data, struct messaging_read_state); 783 784 if (rec->num_fds != 0) { 785 return false; 786 } 787 788 return rec->msg_type == state->msg_type; 789 } 790 791 static void messaging_read_done(struct tevent_req *subreq) 792 { 793 struct tevent_req *req = tevent_req_callback_data( 794 subreq, struct tevent_req); 795 struct messaging_read_state *state = tevent_req_data( 796 req, struct messaging_read_state); 797 int ret; 798 799 ret = messaging_filtered_read_recv(subreq, state, &state->rec); 800 TALLOC_FREE(subreq); 801 if (tevent_req_error(req, ret)) { 802 return; 803 } 804 tevent_req_done(req); 805 } 806 807 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx, 808 struct messaging_rec **presult) 809 { 810 struct messaging_read_state *state = tevent_req_data( 811 req, struct messaging_read_state); 812 int err; 813 814 if (tevent_req_is_unix_error(req, &err)) { 815 return err; 816 } 817 if (presult != NULL) { 818 *presult = talloc_move(mem_ctx, &state->rec); 819 } 820 return 0; 821 } 822 823 struct messaging_handler_state { 824 struct tevent_context *ev; 825 struct messaging_context *msg_ctx; 826 uint32_t msg_type; 827 bool (*handler)(struct messaging_context *msg_ctx, 828 struct messaging_rec **rec, void *private_data); 829 void *private_data; 830 }; 831 832 static void messaging_handler_got_msg(struct tevent_req *subreq); 833 834 struct tevent_req *messaging_handler_send( 835 TALLOC_CTX *mem_ctx, struct tevent_context *ev, 836 struct messaging_context *msg_ctx, uint32_t msg_type, 837 bool (*handler)(struct messaging_context *msg_ctx, 838 struct messaging_rec **rec, void *private_data), 839 void *private_data) 840 { 841 struct tevent_req *req, *subreq; 842 struct messaging_handler_state *state; 843 844 req = tevent_req_create(mem_ctx, &state, 845 struct messaging_handler_state); 846 if (req == NULL) { 847 return NULL; 848 } 849 state->ev = ev; 850 state->msg_ctx = msg_ctx; 851 state->msg_type = msg_type; 852 state->handler = handler; 853 state->private_data = private_data; 854 855 subreq = messaging_read_send(state, state->ev, state->msg_ctx, 856 state->msg_type); 857 if (tevent_req_nomem(subreq, req)) { 858 return tevent_req_post(req, ev); 859 } 860 tevent_req_set_callback(subreq, messaging_handler_got_msg, req); 861 return req; 862 } 863 864 static void messaging_handler_got_msg(struct tevent_req *subreq) 865 { 866 struct tevent_req *req = tevent_req_callback_data( 867 subreq, struct tevent_req); 868 struct messaging_handler_state *state = tevent_req_data( 869 req, struct messaging_handler_state); 870 struct messaging_rec *rec; 871 int ret; 872 bool ok; 873 874 ret = messaging_read_recv(subreq, state, &rec); 875 TALLOC_FREE(subreq); 876 if (tevent_req_error(req, ret)) { 877 return; 878 } 879 880 subreq = messaging_read_send(state, state->ev, state->msg_ctx, 881 state->msg_type); 882 if (tevent_req_nomem(subreq, req)) { 883 return; 884 } 885 tevent_req_set_callback(subreq, messaging_handler_got_msg, req); 886 887 ok = state->handler(state->msg_ctx, &rec, state->private_data); 888 TALLOC_FREE(rec); 889 if (ok) { 890 /* 891 * Next round 892 */ 893 return; 894 } 895 TALLOC_FREE(subreq); 896 tevent_req_done(req); 897 } 898 899 int messaging_handler_recv(struct tevent_req *req) 900 { 901 return tevent_req_simple_recv_unix(req); 902 } 903 904 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx) 905 { 906 if (msg_ctx->num_new_waiters == 0) { 907 return true; 908 } 909 910 if (talloc_array_length(msg_ctx->waiters) < 911 (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) { 912 struct tevent_req **tmp; 913 tmp = talloc_realloc( 914 msg_ctx, msg_ctx->waiters, struct tevent_req *, 915 msg_ctx->num_waiters + msg_ctx->num_new_waiters); 916 if (tmp == NULL) { 917 DEBUG(1, ("%s: talloc failed\n", __func__)); 918 return false; 919 } 920 msg_ctx->waiters = tmp; 921 } 922 923 memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters, 924 sizeof(struct tevent_req *) * msg_ctx->num_new_waiters); 925 926 msg_ctx->num_waiters += msg_ctx->num_new_waiters; 927 msg_ctx->num_new_waiters = 0; 928 929 return true; 363 930 } 364 931 … … 366 933 Dispatch one messaging_rec 367 934 */ 368 void messaging_dispatch_rec(struct messaging_context *msg_ctx,369 935 static void messaging_dispatch_rec(struct messaging_context *msg_ctx, 936 struct messaging_rec *rec) 370 937 { 371 938 struct messaging_callback *cb, *next; 939 unsigned i; 940 size_t j; 372 941 373 942 for (cb = msg_ctx->callbacks; cb != NULL; cb = next) { 374 943 next = cb->next; 375 if (cb->msg_type == rec->msg_type) { 376 cb->fn(msg_ctx, cb->private_data, rec->msg_type, 377 rec->src, &rec->buf); 378 /* we continue looking for matching messages 379 after finding one. This matters for 380 subsystems like the internal notify code 381 which register more than one handler for 382 the same message type */ 383 } 384 } 385 return; 944 if (cb->msg_type != rec->msg_type) { 945 continue; 946 } 947 948 /* 949 * the old style callbacks don't support fd passing 950 */ 951 for (j=0; j < rec->num_fds; j++) { 952 int fd = rec->fds[j]; 953 close(fd); 954 } 955 rec->num_fds = 0; 956 rec->fds = NULL; 957 958 cb->fn(msg_ctx, cb->private_data, rec->msg_type, 959 rec->src, &rec->buf); 960 961 /* 962 * we continue looking for matching messages after finding 963 * one. This matters for subsystems like the internal notify 964 * code which register more than one handler for the same 965 * message type 966 */ 967 } 968 969 if (!messaging_append_new_waiters(msg_ctx)) { 970 for (j=0; j < rec->num_fds; j++) { 971 int fd = rec->fds[j]; 972 close(fd); 973 } 974 rec->num_fds = 0; 975 rec->fds = NULL; 976 return; 977 } 978 979 i = 0; 980 while (i < msg_ctx->num_waiters) { 981 struct tevent_req *req; 982 struct messaging_filtered_read_state *state; 983 984 req = msg_ctx->waiters[i]; 985 if (req == NULL) { 986 /* 987 * This got cleaned up. In the meantime, 988 * move everything down one. We need 989 * to keep the order of waiters, as 990 * other code may depend on this. 991 */ 992 if (i < msg_ctx->num_waiters - 1) { 993 memmove(&msg_ctx->waiters[i], 994 &msg_ctx->waiters[i+1], 995 sizeof(struct tevent_req *) * 996 (msg_ctx->num_waiters - i - 1)); 997 } 998 msg_ctx->num_waiters -= 1; 999 continue; 1000 } 1001 1002 state = tevent_req_data( 1003 req, struct messaging_filtered_read_state); 1004 if (state->filter(rec, state->private_data)) { 1005 messaging_filtered_read_done(req, rec); 1006 1007 /* 1008 * Only the first one gets the fd-array 1009 */ 1010 rec->num_fds = 0; 1011 rec->fds = NULL; 1012 } 1013 1014 i += 1; 1015 } 1016 1017 /* 1018 * If the fd-array isn't used, just close it. 1019 */ 1020 for (j=0; j < rec->num_fds; j++) { 1021 int fd = rec->fds[j]; 1022 close(fd); 1023 } 1024 rec->num_fds = 0; 1025 rec->fds = NULL; 1026 } 1027 1028 static int mess_parent_dgm_cleanup(void *private_data); 1029 static void mess_parent_dgm_cleanup_done(struct tevent_req *req); 1030 1031 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg) 1032 { 1033 struct tevent_req *req; 1034 1035 req = background_job_send( 1036 msg, msg->event_ctx, msg, NULL, 0, 1037 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval", 1038 60*15), 1039 mess_parent_dgm_cleanup, msg); 1040 if (req == NULL) { 1041 return false; 1042 } 1043 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg); 1044 return true; 1045 } 1046 1047 static int mess_parent_dgm_cleanup(void *private_data) 1048 { 1049 int ret; 1050 1051 ret = messaging_dgm_wipe(); 1052 DEBUG(10, ("messaging_dgm_wipe returned %s\n", 1053 ret ? strerror(ret) : "ok")); 1054 return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval", 1055 60*15); 1056 } 1057 1058 static void mess_parent_dgm_cleanup_done(struct tevent_req *req) 1059 { 1060 struct messaging_context *msg = tevent_req_callback_data( 1061 req, struct messaging_context); 1062 NTSTATUS status; 1063 1064 status = background_job_recv(req); 1065 TALLOC_FREE(req); 1066 DEBUG(1, ("messaging dgm cleanup job ended with %s\n", 1067 nt_errstr(status))); 1068 1069 req = background_job_send( 1070 msg, msg->event_ctx, msg, NULL, 0, 1071 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval", 1072 60*15), 1073 mess_parent_dgm_cleanup, msg); 1074 if (req == NULL) { 1075 DEBUG(1, ("background_job_send failed\n")); 1076 return; 1077 } 1078 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg); 1079 } 1080 1081 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid) 1082 { 1083 int ret; 1084 1085 if (pid == 0) { 1086 ret = messaging_dgm_wipe(); 1087 } else { 1088 ret = messaging_dgm_cleanup(pid); 1089 } 1090 1091 return ret; 1092 } 1093 1094 struct tevent_context *messaging_tevent_context( 1095 struct messaging_context *msg_ctx) 1096 { 1097 return msg_ctx->event_ctx; 1098 } 1099 1100 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx) 1101 { 1102 return msg_ctx->names_db; 386 1103 } 387 1104
Note:
See TracChangeset
for help on using the changeset viewer.