Changeset 988 for vendor/current/source4/lib/messaging
- Timestamp:
- Nov 24, 2016, 1:14:11 PM (9 years ago)
- Location:
- vendor/current/source4/lib/messaging
- Files:
-
- 7 edited
Legend:
- Unmodified
- Added
- Removed
-
vendor/current/source4/lib/messaging/irpc.h
r740 r988 36 36 bool defer_reply; 37 37 bool no_reply; 38 struct messaging_context *msg_ctx;38 struct imessaging_context *msg_ctx; 39 39 struct irpc_list *irpc; 40 40 void *data; 41 struct tevent_context *ev;42 41 }; 43 42 … … 59 58 struct ndr_interface_table; 60 59 61 NTSTATUS irpc_register(struct messaging_context *msg_ctx,60 NTSTATUS irpc_register(struct imessaging_context *msg_ctx, 62 61 const struct ndr_interface_table *table, 63 62 int call, irpc_function_t fn, void *private_data); 64 63 65 64 struct dcerpc_binding_handle *irpc_binding_handle(TALLOC_CTX *mem_ctx, 66 structmessaging_context *msg_ctx,67 struct server_id server_id,68 const struct ndr_interface_table *table);65 struct imessaging_context *msg_ctx, 66 struct server_id server_id, 67 const struct ndr_interface_table *table); 69 68 struct dcerpc_binding_handle *irpc_binding_handle_by_name(TALLOC_CTX *mem_ctx, 70 structmessaging_context *msg_ctx,71 const char *dest_task,72 const struct ndr_interface_table *table);69 struct imessaging_context *msg_ctx, 70 const char *dest_task, 71 const struct ndr_interface_table *table); 73 72 void irpc_binding_handle_add_security_token(struct dcerpc_binding_handle *h, 74 73 struct security_token *token); 75 74 76 NTSTATUS irpc_add_name(struct messaging_context *msg_ctx, const char *name); 77 struct server_id *irpc_servers_byname(struct messaging_context *msg_ctx, TALLOC_CTX *mem_ctx, const char *name); 78 void irpc_remove_name(struct messaging_context *msg_ctx, const char *name); 75 NTSTATUS irpc_add_name(struct imessaging_context *msg_ctx, const char *name); 76 NTSTATUS irpc_servers_byname(struct imessaging_context *msg_ctx, 77 TALLOC_CTX *mem_ctx, const char *name, 78 unsigned *num_servers, 79 struct server_id **servers); 80 struct irpc_name_records *irpc_all_servers(struct imessaging_context *msg_ctx, 81 TALLOC_CTX *mem_ctx); 82 void irpc_remove_name(struct imessaging_context *msg_ctx, const char *name); 79 83 NTSTATUS irpc_send_reply(struct irpc_message *m, NTSTATUS status); 80 84 -
vendor/current/source4/lib/messaging/messaging.c
r740 r988 1 /* 1 /* 2 2 Unix SMB/CIFS implementation. 3 3 … … 5 5 6 6 Copyright (C) Andrew Tridgell 2004 7 7 8 8 This program is free software; you can redistribute it and/or modify 9 9 it under the terms of the GNU General Public License as published by 10 10 the Free Software Foundation; either version 3 of the License, or 11 11 (at your option) any later version. 12 12 13 13 This program is distributed in the hope that it will be useful, 14 14 but WITHOUT ANY WARRANTY; without even the implied warranty of 15 15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 16 16 GNU General Public License for more details. 17 17 18 18 You should have received a copy of the GNU General Public License 19 19 along with this program. If not, see <http://www.gnu.org/licenses/>. … … 28 28 #include "librpc/gen_ndr/ndr_irpc.h" 29 29 #include "lib/messaging/irpc.h" 30 #include "lib/util/tdb_wrap.h"31 30 #include "../lib/util/unix_privs.h" 32 31 #include "librpc/rpc/dcerpc.h" 33 #include <tdb.h>34 #include "../lib/util/util_tdb.h"35 32 #include "cluster/cluster.h" 36 33 #include "../lib/util/tevent_ntstatus.h" 34 #include "lib/param/param.h" 35 #include "lib/util/server_id_db.h" 36 #include "lib/util/talloc_report.h" 37 #include "../source3/lib/messages_dgm.h" 38 #include "../source3/lib/messages_dgm_ref.h" 39 #include "../source3/lib/messages_util.h" 40 #include <tdb.h> 37 41 38 42 /* change the message version with any incompatible changes in the protocol */ 39 #define MESSAGING_VERSION 143 #define IMESSAGING_VERSION 1 40 44 41 45 /* … … 43 47 */ 44 48 struct irpc_request { 45 struct messaging_context *msg_ctx;49 struct imessaging_context *msg_ctx; 46 50 int callid; 47 51 struct { … … 51 55 }; 52 56 53 struct messaging_context { 57 struct imessaging_context { 58 struct imessaging_context *prev, *next; 54 59 struct server_id server_id; 55 struct socket_context *sock; 56 const char *base_path; 57 const char *path; 60 const char *sock_dir; 61 const char *lock_dir; 58 62 struct dispatch_fn **dispatch; 59 63 uint32_t num_types; 60 64 struct idr_context *dispatch_tree; 61 struct messaging_rec *pending;62 struct messaging_rec *retry_queue;63 65 struct irpc_list *irpc; 64 66 struct idr_context *idr; 65 const char **names;67 struct server_id_db *names; 66 68 struct timeval start_time; 67 struct tevent_timer *retry_te; 68 struct { 69 struct tevent_context *ev; 70 struct tevent_fd *fde; 71 } event; 69 void *msg_dgm_ref; 72 70 }; 73 71 … … 82 80 83 81 /* an individual message */ 84 struct messaging_rec { 85 struct messaging_rec *next, *prev; 86 struct messaging_context *msg; 87 const char *path; 88 89 struct messaging_header { 90 uint32_t version; 91 uint32_t msg_type; 92 struct server_id from; 93 struct server_id to; 94 uint32_t length; 95 } *header; 96 97 DATA_BLOB packet; 98 uint32_t retries; 99 }; 100 101 102 static void irpc_handler(struct messaging_context *, void *, 82 83 static void irpc_handler(struct imessaging_context *, void *, 103 84 uint32_t, struct server_id, DATA_BLOB *); 104 85 … … 107 88 A useful function for testing the message system. 108 89 */ 109 static void ping_message(struct messaging_context *msg, void *private_data,90 static void ping_message(struct imessaging_context *msg, void *private_data, 110 91 uint32_t msg_type, struct server_id src, DATA_BLOB *data) 111 92 { 112 DEBUG(1,("INFO: Received PING message from server %u.%u [%.*s]\n", 113 (unsigned int)src.node, (unsigned int)src.id, (int)data->length, 93 struct server_id_buf idbuf; 94 DEBUG(1,("INFO: Received PING message from server %s [%.*s]\n", 95 server_id_str_buf(src, &idbuf), (int)data->length, 114 96 data->data?(const char *)data->data:"")); 115 messaging_send(msg, src, MSG_PONG, data); 97 imessaging_send(msg, src, MSG_PONG, data); 98 } 99 100 static void pool_message(struct imessaging_context *msg, void *private_data, 101 uint32_t msg_type, struct server_id src, 102 DATA_BLOB *data) 103 { 104 char *report; 105 106 report = talloc_report_str(msg, NULL); 107 108 if (report != NULL) { 109 DATA_BLOB blob = { .data = (uint8_t *)report, 110 .length = talloc_get_size(report) - 1}; 111 imessaging_send(msg, src, MSG_POOL_USAGE, &blob); 112 } 113 talloc_free(report); 116 114 } 117 115 … … 119 117 return uptime of messaging server via irpc 120 118 */ 121 static NTSTATUS irpc_uptime(struct irpc_message *msg, 119 static NTSTATUS irpc_uptime(struct irpc_message *msg, 122 120 struct irpc_uptime *r) 123 121 { 124 struct messaging_context *ctx = talloc_get_type(msg->private_data, structmessaging_context);122 struct imessaging_context *ctx = talloc_get_type(msg->private_data, struct imessaging_context); 125 123 *r->out.start_time = timeval_to_nttime(&ctx->start_time); 126 124 return NT_STATUS_OK; 127 125 } 128 126 129 /* 130 return the path to a messaging socket 131 */ 132 static char *messaging_path(struct messaging_context *msg, struct server_id server_id) 133 { 134 TALLOC_CTX *tmp_ctx = talloc_new(msg); 135 const char *id = cluster_id_string(tmp_ctx, server_id); 136 char *s; 137 if (id == NULL) { 138 return NULL; 139 } 140 s = talloc_asprintf(msg, "%s/msg.%s", msg->base_path, id); 141 talloc_steal(s, tmp_ctx); 142 return s; 143 } 144 145 /* 146 dispatch a fully received message 147 148 note that this deliberately can match more than one message handler 149 per message. That allows a single messasging context to register 150 (for example) a debug handler for more than one piece of code 151 */ 152 static void messaging_dispatch(struct messaging_context *msg, struct messaging_rec *rec) 153 { 154 struct dispatch_fn *d, *next; 155 127 static struct dispatch_fn *imessaging_find_dispatch( 128 struct imessaging_context *msg, uint32_t msg_type) 129 { 156 130 /* temporary IDs use an idtree, the rest use a array of pointers */ 157 if (rec->header->msg_type >= MSG_TMP_BASE) { 158 d = (struct dispatch_fn *)idr_find(msg->dispatch_tree, 159 rec->header->msg_type); 160 } else if (rec->header->msg_type < msg->num_types) { 161 d = msg->dispatch[rec->header->msg_type]; 162 } else { 163 d = NULL; 164 } 165 166 for (; d; d = next) { 167 DATA_BLOB data; 168 next = d->next; 169 data.data = rec->packet.data + sizeof(*rec->header); 170 data.length = rec->header->length; 171 d->fn(msg, d->private_data, d->msg_type, rec->header->from, &data); 172 } 173 rec->header->length = 0; 174 } 175 176 /* 177 handler for messages that arrive from other nodes in the cluster 178 */ 179 static void cluster_message_handler(struct messaging_context *msg, DATA_BLOB packet) 180 { 181 struct messaging_rec *rec; 182 183 rec = talloc(msg, struct messaging_rec); 184 if (rec == NULL) { 185 smb_panic("Unable to allocate messaging_rec"); 186 } 187 188 rec->msg = msg; 189 rec->path = msg->path; 190 rec->header = (struct messaging_header *)packet.data; 191 rec->packet = packet; 192 rec->retries = 0; 193 194 if (packet.length != sizeof(*rec->header) + rec->header->length) { 195 DEBUG(0,("messaging: bad message header size %d should be %d\n", 196 rec->header->length, (int)(packet.length - sizeof(*rec->header)))); 197 talloc_free(rec); 198 return; 199 } 200 201 messaging_dispatch(msg, rec); 202 talloc_free(rec); 203 } 204 205 206 207 /* 208 try to send the message 209 */ 210 static NTSTATUS try_send(struct messaging_rec *rec) 211 { 212 struct messaging_context *msg = rec->msg; 213 size_t nsent; 214 void *priv; 215 NTSTATUS status; 216 struct socket_address *path; 217 218 /* rec->path is the path of the *other* socket, where we want 219 * this to end up */ 220 path = socket_address_from_strings(msg, msg->sock->backend_name, 221 rec->path, 0); 222 if (!path) { 223 return NT_STATUS_NO_MEMORY; 224 } 225 226 /* we send with privileges so messages work from any context */ 227 priv = root_privileges(); 228 status = socket_sendto(msg->sock, &rec->packet, &nsent, path); 229 talloc_free(path); 230 talloc_free(priv); 231 232 return status; 233 } 234 235 /* 236 retry backed off messages 237 */ 238 static void msg_retry_timer(struct tevent_context *ev, struct tevent_timer *te, 239 struct timeval t, void *private_data) 240 { 241 struct messaging_context *msg = talloc_get_type(private_data, 242 struct messaging_context); 243 msg->retry_te = NULL; 244 245 /* put the messages back on the main queue */ 246 while (msg->retry_queue) { 247 struct messaging_rec *rec = msg->retry_queue; 248 DLIST_REMOVE(msg->retry_queue, rec); 249 DLIST_ADD_END(msg->pending, rec, struct messaging_rec *); 250 } 251 252 EVENT_FD_WRITEABLE(msg->event.fde); 253 } 254 255 /* 256 handle a socket write event 257 */ 258 static void messaging_send_handler(struct messaging_context *msg) 259 { 260 while (msg->pending) { 261 struct messaging_rec *rec = msg->pending; 262 NTSTATUS status; 263 status = try_send(rec); 264 if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) { 265 rec->retries++; 266 if (rec->retries > 3) { 267 /* we're getting continuous write errors - 268 backoff this record */ 269 DLIST_REMOVE(msg->pending, rec); 270 DLIST_ADD_END(msg->retry_queue, rec, 271 struct messaging_rec *); 272 if (msg->retry_te == NULL) { 273 msg->retry_te = 274 event_add_timed(msg->event.ev, msg, 275 timeval_current_ofs(1, 0), 276 msg_retry_timer, msg); 277 } 278 } 279 break; 280 } 281 rec->retries = 0; 282 if (!NT_STATUS_IS_OK(status)) { 283 TALLOC_CTX *tmp_ctx = talloc_new(msg); 284 DEBUG(1,("messaging: Lost message from %s to %s of type %u - %s\n", 285 cluster_id_string(tmp_ctx, rec->header->from), 286 cluster_id_string(tmp_ctx, rec->header->to), 287 rec->header->msg_type, 288 nt_errstr(status))); 289 talloc_free(tmp_ctx); 290 } 291 DLIST_REMOVE(msg->pending, rec); 292 talloc_free(rec); 293 } 294 if (msg->pending == NULL) { 295 EVENT_FD_NOT_WRITEABLE(msg->event.fde); 296 } 297 } 298 299 /* 300 handle a new incoming packet 301 */ 302 static void messaging_recv_handler(struct messaging_context *msg) 303 { 304 struct messaging_rec *rec; 305 NTSTATUS status; 306 DATA_BLOB packet; 307 size_t msize; 308 309 /* see how many bytes are in the next packet */ 310 status = socket_pending(msg->sock, &msize); 311 if (!NT_STATUS_IS_OK(status)) { 312 DEBUG(0,("socket_pending failed in messaging - %s\n", 313 nt_errstr(status))); 314 return; 315 } 316 317 packet = data_blob_talloc(msg, NULL, msize); 318 if (packet.data == NULL) { 319 /* assume this is temporary and retry */ 320 return; 321 } 322 323 status = socket_recv(msg->sock, packet.data, msize, &msize); 324 if (!NT_STATUS_IS_OK(status)) { 325 data_blob_free(&packet); 326 return; 327 } 328 329 if (msize < sizeof(*rec->header)) { 330 DEBUG(0,("messaging: bad message of size %d\n", (int)msize)); 331 data_blob_free(&packet); 332 return; 333 } 334 335 rec = talloc(msg, struct messaging_rec); 336 if (rec == NULL) { 337 smb_panic("Unable to allocate messaging_rec"); 338 } 339 340 talloc_steal(rec, packet.data); 341 rec->msg = msg; 342 rec->path = msg->path; 343 rec->header = (struct messaging_header *)packet.data; 344 rec->packet = packet; 345 rec->retries = 0; 346 347 if (msize != sizeof(*rec->header) + rec->header->length) { 348 DEBUG(0,("messaging: bad message header size %d should be %d\n", 349 rec->header->length, (int)(msize - sizeof(*rec->header)))); 350 talloc_free(rec); 351 return; 352 } 353 354 messaging_dispatch(msg, rec); 355 talloc_free(rec); 356 } 357 358 359 /* 360 handle a socket event 361 */ 362 static void messaging_handler(struct tevent_context *ev, struct tevent_fd *fde, 363 uint16_t flags, void *private_data) 364 { 365 struct messaging_context *msg = talloc_get_type(private_data, 366 struct messaging_context); 367 if (flags & EVENT_FD_WRITE) { 368 messaging_send_handler(msg); 369 } 370 if (flags & EVENT_FD_READ) { 371 messaging_recv_handler(msg); 372 } 373 } 374 131 if (msg_type >= MSG_TMP_BASE) { 132 return (struct dispatch_fn *)idr_find(msg->dispatch_tree, 133 msg_type); 134 } 135 if (msg_type < msg->num_types) { 136 return msg->dispatch[msg_type]; 137 } 138 return NULL; 139 } 375 140 376 141 /* 377 142 Register a dispatch function for a particular message type. 378 143 */ 379 NTSTATUS messaging_register(structmessaging_context *msg, void *private_data,144 NTSTATUS imessaging_register(struct imessaging_context *msg, void *private_data, 380 145 uint32_t msg_type, msg_callback_t fn) 381 146 { … … 410 175 above MSG_TMP_BASE 411 176 */ 412 NTSTATUS messaging_register_tmp(structmessaging_context *msg, void *private_data,177 NTSTATUS imessaging_register_tmp(struct imessaging_context *msg, void *private_data, 413 178 msg_callback_t fn, uint32_t *msg_type) 414 179 { … … 436 201 De-register the function for a particular message type. 437 202 */ 438 void messaging_deregister(structmessaging_context *msg, uint32_t msg_type, void *private_data)203 void imessaging_deregister(struct imessaging_context *msg, uint32_t msg_type, void *private_data) 439 204 { 440 205 struct dispatch_fn *d, *next; 441 206 442 207 if (msg_type >= msg->num_types) { 443 d = (struct dispatch_fn *)idr_find(msg->dispatch_tree, 208 d = (struct dispatch_fn *)idr_find(msg->dispatch_tree, 444 209 msg_type); 445 210 if (!d) return; … … 461 226 Send a message to a particular server 462 227 */ 463 NTSTATUS messaging_send(struct messaging_context *msg, struct server_id server,228 NTSTATUS imessaging_send(struct imessaging_context *msg, struct server_id server, 464 229 uint32_t msg_type, const DATA_BLOB *data) 465 230 { 466 struct messaging_rec *rec; 467 NTSTATUS status; 468 size_t dlength = data?data->length:0; 469 470 rec = talloc(msg, struct messaging_rec); 471 if (rec == NULL) { 472 return NT_STATUS_NO_MEMORY; 473 } 474 475 rec->packet = data_blob_talloc(rec, NULL, sizeof(*rec->header) + dlength); 476 if (rec->packet.data == NULL) { 477 talloc_free(rec); 478 return NT_STATUS_NO_MEMORY; 479 } 480 481 rec->retries = 0; 482 rec->msg = msg; 483 rec->header = (struct messaging_header *)rec->packet.data; 484 /* zero padding */ 485 ZERO_STRUCTP(rec->header); 486 rec->header->version = MESSAGING_VERSION; 487 rec->header->msg_type = msg_type; 488 rec->header->from = msg->server_id; 489 rec->header->to = server; 490 rec->header->length = dlength; 491 if (dlength != 0) { 492 memcpy(rec->packet.data + sizeof(*rec->header), 493 data->data, dlength); 494 } 231 uint8_t hdr[MESSAGE_HDR_LENGTH]; 232 struct iovec iov[2]; 233 int num_iov, ret; 234 pid_t pid; 235 void *priv; 495 236 496 237 if (!cluster_node_equal(&msg->server_id, &server)) { 497 /* the destination is on another node - dispatch via 498 the cluster layer */ 499 status = cluster_message_send(server, &rec->packet); 500 talloc_free(rec); 501 return status; 502 } 503 504 rec->path = messaging_path(msg, server); 505 talloc_steal(rec, rec->path); 506 507 if (msg->pending != NULL) { 508 status = STATUS_MORE_ENTRIES; 509 } else { 510 status = try_send(rec); 511 } 512 513 if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) { 514 if (msg->pending == NULL) { 515 EVENT_FD_WRITEABLE(msg->event.fde); 516 } 517 DLIST_ADD_END(msg->pending, rec, struct messaging_rec *); 238 /* No cluster in source4... */ 518 239 return NT_STATUS_OK; 519 240 } 520 241 521 talloc_free(rec); 522 523 return status; 242 message_hdr_put(hdr, msg_type, msg->server_id, server); 243 244 iov[0] = (struct iovec) { .iov_base = &hdr, .iov_len = sizeof(hdr) }; 245 num_iov = 1; 246 247 if (data != NULL) { 248 iov[1] = (struct iovec) { .iov_base = data->data, 249 .iov_len = data->length }; 250 num_iov += 1; 251 } 252 253 pid = server.pid; 254 if (pid == 0) { 255 pid = getpid(); 256 } 257 258 priv = root_privileges(); 259 ret = messaging_dgm_send(pid, iov, num_iov, NULL, 0); 260 TALLOC_FREE(priv); 261 if (ret != 0) { 262 return map_nt_error_from_unix_common(ret); 263 } 264 return NT_STATUS_OK; 524 265 } 525 266 … … 527 268 Send a message to a particular server, with the message containing a single pointer 528 269 */ 529 NTSTATUS messaging_send_ptr(struct messaging_context *msg, struct server_id server,270 NTSTATUS imessaging_send_ptr(struct imessaging_context *msg, struct server_id server, 530 271 uint32_t msg_type, void *ptr) 531 272 { … … 535 276 blob.length = sizeof(void *); 536 277 537 return messaging_send(msg, server, msg_type, &blob); 538 } 539 540 541 /* 542 destroy the messaging context 543 */ 544 static int messaging_destructor(struct messaging_context *msg) 545 { 546 unlink(msg->path); 547 while (msg->names && msg->names[0]) { 548 irpc_remove_name(msg, msg->names[0]); 278 return imessaging_send(msg, server, msg_type, &blob); 279 } 280 281 282 /* 283 remove our messaging socket and database entry 284 */ 285 int imessaging_cleanup(struct imessaging_context *msg) 286 { 287 if (!msg) { 288 return 0; 549 289 } 550 290 return 0; 551 291 } 552 292 293 static void imessaging_dgm_recv(const uint8_t *buf, size_t buf_len, 294 int *fds, size_t num_fds, 295 void *private_data); 296 553 297 /* 554 298 create the listening socket and setup the dispatcher 555 */ 556 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 557 const char *dir, 558 struct server_id server_id, 559 struct tevent_context *ev) 560 { 561 struct messaging_context *msg; 562 NTSTATUS status; 563 struct socket_address *path; 299 300 use auto_remove=true when you want a destructor to remove the 301 associated messaging socket and database entry on talloc free. Don't 302 use this in processes that may fork and a child may talloc free this 303 memory 304 */ 305 struct imessaging_context *imessaging_init(TALLOC_CTX *mem_ctx, 306 struct loadparm_context *lp_ctx, 307 struct server_id server_id, 308 struct tevent_context *ev, 309 bool auto_remove) 310 { 311 struct imessaging_context *msg; 312 bool ok; 313 int ret; 314 const char *lock_dir = NULL; 315 int tdb_flags = TDB_INCOMPATIBLE_HASH | TDB_CLEAR_IF_FIRST; 564 316 565 317 if (ev == NULL) { … … 567 319 } 568 320 569 msg = talloc_zero(mem_ctx, struct messaging_context);321 msg = talloc_zero(mem_ctx, struct imessaging_context); 570 322 if (msg == NULL) { 571 323 return NULL; 572 324 } 573 325 574 /* setup a handler for messages from other cluster nodes, if appropriate */575 status = cluster_message_init(msg, server_id, cluster_message_handler);576 if (!NT_STATUS_IS_OK(status)) {577 talloc_free(msg);578 return NULL;579 }580 581 326 /* create the messaging directory if needed */ 582 mkdir(dir, 0700); 583 584 msg->base_path = talloc_reference(msg, dir); 585 msg->path = messaging_path(msg, server_id); 327 328 lock_dir = lpcfg_lock_directory(lp_ctx); 329 if (lock_dir == NULL) { 330 goto fail; 331 } 332 333 msg->sock_dir = lpcfg_private_path(msg, lp_ctx, "msg.sock"); 334 if (msg->sock_dir == NULL) { 335 goto fail; 336 } 337 ok = directory_create_or_exist_strict(msg->sock_dir, geteuid(), 0700); 338 if (!ok) { 339 goto fail; 340 } 341 342 msg->lock_dir = lpcfg_lock_path(msg, lp_ctx, "msg.lock"); 343 if (msg->lock_dir == NULL) { 344 goto fail; 345 } 346 ok = directory_create_or_exist_strict(msg->lock_dir, geteuid(), 0755); 347 if (!ok) { 348 goto fail; 349 } 350 351 msg->msg_dgm_ref = messaging_dgm_ref( 352 msg, ev, &server_id.unique_id, msg->sock_dir, msg->lock_dir, 353 imessaging_dgm_recv, msg, &ret); 354 355 if (msg->msg_dgm_ref == NULL) { 356 goto fail; 357 } 358 586 359 msg->server_id = server_id; 587 360 msg->idr = idr_init(msg); 361 if (msg->idr == NULL) { 362 goto fail; 363 } 364 588 365 msg->dispatch_tree = idr_init(msg); 366 if (msg->dispatch_tree == NULL) { 367 goto fail; 368 } 369 589 370 msg->start_time = timeval_current(); 590 371 591 status = socket_create("unix", SOCKET_TYPE_DGRAM, &msg->sock, 0); 592 if (!NT_STATUS_IS_OK(status)) { 593 talloc_free(msg); 594 return NULL; 595 } 596 597 /* by stealing here we ensure that the socket is cleaned up (and even 598 deleted) on exit */ 599 talloc_steal(msg, msg->sock); 600 601 path = socket_address_from_strings(msg, msg->sock->backend_name, 602 msg->path, 0); 603 if (!path) { 604 talloc_free(msg); 605 return NULL; 606 } 607 608 status = socket_listen(msg->sock, path, 50, 0); 609 if (!NT_STATUS_IS_OK(status)) { 610 DEBUG(0,("Unable to setup messaging listener for '%s':%s\n", msg->path, nt_errstr(status))); 611 talloc_free(msg); 612 return NULL; 613 } 614 615 /* it needs to be non blocking for sends */ 616 set_blocking(socket_get_fd(msg->sock), false); 617 618 msg->event.ev = ev; 619 msg->event.fde = event_add_fd(ev, msg, socket_get_fd(msg->sock), 620 EVENT_FD_READ, messaging_handler, msg); 621 tevent_fd_set_auto_close(msg->event.fde); 622 623 talloc_set_destructor(msg, messaging_destructor); 624 625 messaging_register(msg, NULL, MSG_PING, ping_message); 626 messaging_register(msg, NULL, MSG_IRPC, irpc_handler); 372 tdb_flags |= lpcfg_tdb_flags(lp_ctx, 0); 373 374 msg->names = server_id_db_init(msg, server_id, lock_dir, 0, tdb_flags); 375 if (msg->names == NULL) { 376 goto fail; 377 } 378 379 if (auto_remove) { 380 talloc_set_destructor(msg, imessaging_cleanup); 381 } 382 383 imessaging_register(msg, NULL, MSG_PING, ping_message); 384 imessaging_register(msg, NULL, MSG_REQ_POOL_USAGE, pool_message); 385 imessaging_register(msg, NULL, MSG_IRPC, irpc_handler); 627 386 IRPC_REGISTER(msg, irpc, IRPC_UPTIME, irpc_uptime, msg); 628 387 629 388 return msg; 630 } 631 632 /* 633 A hack, for the short term until we get 'client only' messaging in place 634 */ 635 struct messaging_context *messaging_client_init(TALLOC_CTX *mem_ctx, 636 const char *dir, 389 fail: 390 talloc_free(msg); 391 return NULL; 392 } 393 394 static void imessaging_dgm_recv(const uint8_t *buf, size_t buf_len, 395 int *fds, size_t num_fds, 396 void *private_data) 397 { 398 struct imessaging_context *msg = talloc_get_type_abort( 399 private_data, struct imessaging_context); 400 uint32_t msg_type; 401 struct server_id src, dst; 402 struct server_id_buf srcbuf, dstbuf; 403 DATA_BLOB data; 404 405 if (buf_len < MESSAGE_HDR_LENGTH) { 406 /* Invalid message, ignore */ 407 return; 408 } 409 410 message_hdr_get(&msg_type, &src, &dst, buf); 411 412 data.data = discard_const_p(uint8_t, buf + MESSAGE_HDR_LENGTH); 413 data.length = buf_len - MESSAGE_HDR_LENGTH; 414 415 if ((cluster_id_equal(&dst, &msg->server_id)) || 416 ((dst.task_id == 0) && (msg->server_id.pid == 0))) { 417 struct dispatch_fn *d, *next; 418 419 DEBUG(10, ("%s: dst %s matches my id: %s, type=0x%x\n", 420 __func__, 421 server_id_str_buf(dst, &dstbuf), 422 server_id_str_buf(msg->server_id, &srcbuf), 423 (unsigned)msg_type)); 424 425 d = imessaging_find_dispatch(msg, msg_type); 426 427 for (; d; d = next) { 428 next = d->next; 429 d->fn(msg, d->private_data, d->msg_type, src, &data); 430 } 431 } else { 432 DEBUG(10, ("%s: Ignoring type=0x%x dst %s, I am %s, \n", 433 __func__, (unsigned)msg_type, 434 server_id_str_buf(dst, &dstbuf), 435 server_id_str_buf(msg->server_id, &srcbuf))); 436 } 437 } 438 439 /* 440 A hack, for the short term until we get 'client only' messaging in place 441 */ 442 struct imessaging_context *imessaging_client_init(TALLOC_CTX *mem_ctx, 443 struct loadparm_context *lp_ctx, 637 444 struct tevent_context *ev) 638 445 { 639 446 struct server_id id; 640 447 ZERO_STRUCT(id); 641 id.id = random() % 0x10000000; 642 return messaging_init(mem_ctx, dir, id, ev); 448 id.pid = getpid(); 449 id.task_id = generate_random(); 450 id.vnn = NONCLUSTER_VNN; 451 452 /* This is because we are not in the s3 serverid database */ 453 id.unique_id = SERVERID_UNIQUE_ID_NOT_TO_VERIFY; 454 455 return imessaging_init(mem_ctx, lp_ctx, id, ev, true); 643 456 } 644 457 /* … … 658 471 register a irpc server function 659 472 */ 660 NTSTATUS irpc_register(struct messaging_context *msg_ctx,661 const struct ndr_interface_table *table, 473 NTSTATUS irpc_register(struct imessaging_context *msg_ctx, 474 const struct ndr_interface_table *table, 662 475 int callnum, irpc_function_t fn, void *private_data) 663 476 { … … 689 502 handle an incoming irpc reply message 690 503 */ 691 static void irpc_handler_reply(struct messaging_context *msg_ctx, struct irpc_message *m)504 static void irpc_handler_reply(struct imessaging_context *msg_ctx, struct irpc_message *m) 692 505 { 693 506 struct irpc_request *irpc; … … 735 548 /* send the reply message */ 736 549 packet = ndr_push_blob(push); 737 status = messaging_send(m->msg_ctx, m->from, MSG_IRPC, &packet);550 status = imessaging_send(m->msg_ctx, m->from, MSG_IRPC, &packet); 738 551 if (!NT_STATUS_IS_OK(status)) goto failed; 739 552 … … 746 559 handle an incoming irpc request message 747 560 */ 748 static void irpc_handler_request(struct messaging_context *msg_ctx,561 static void irpc_handler_request(struct imessaging_context *msg_ctx, 749 562 struct irpc_message *m) 750 563 { … … 784 597 m->irpc = i; 785 598 m->data = r; 786 m->ev = msg_ctx->event.ev;787 599 788 600 m->header.status = i->fn(m, r); … … 810 622 handle an incoming irpc message 811 623 */ 812 static void irpc_handler(struct messaging_context *msg_ctx, void *private_data,624 static void irpc_handler(struct imessaging_context *msg_ctx, void *private_data, 813 625 uint32_t msg_type, struct server_id src, DATA_BLOB *packet) 814 626 { … … 855 667 856 668 /* 857 open the naming database 858 */ 859 static struct tdb_wrap *irpc_namedb_open(struct messaging_context *msg_ctx) 860 { 861 struct tdb_wrap *t; 862 char *path = talloc_asprintf(msg_ctx, "%s/names.tdb", msg_ctx->base_path); 863 if (path == NULL) { 669 add a string name that this irpc server can be called on 670 */ 671 NTSTATUS irpc_add_name(struct imessaging_context *msg_ctx, const char *name) 672 { 673 int ret; 674 675 ret = server_id_db_add(msg_ctx->names, name); 676 if (ret != 0) { 677 return map_nt_error_from_unix_common(ret); 678 } 679 return NT_STATUS_OK; 680 } 681 682 /* 683 return a list of server ids for a server name 684 */ 685 NTSTATUS irpc_servers_byname(struct imessaging_context *msg_ctx, 686 TALLOC_CTX *mem_ctx, const char *name, 687 unsigned *num_servers, 688 struct server_id **servers) 689 { 690 int ret; 691 692 ret = server_id_db_lookup(msg_ctx->names, name, mem_ctx, 693 num_servers, servers); 694 if (ret != 0) { 695 return map_nt_error_from_unix_common(ret); 696 } 697 return NT_STATUS_OK; 698 } 699 700 static int all_servers_func(const char *name, unsigned num_servers, 701 const struct server_id *servers, 702 void *private_data) 703 { 704 struct irpc_name_records *name_records = talloc_get_type( 705 private_data, struct irpc_name_records); 706 struct irpc_name_record *name_record; 707 int i; 708 709 name_records->names 710 = talloc_realloc(name_records, name_records->names, 711 struct irpc_name_record *, name_records->num_records+1); 712 if (!name_records->names) { 713 return -1; 714 } 715 716 name_records->names[name_records->num_records] = name_record 717 = talloc(name_records->names, 718 struct irpc_name_record); 719 if (!name_record) { 720 return -1; 721 } 722 723 name_records->num_records++; 724 725 name_record->name = talloc_strdup(name_record, name); 726 if (!name_record->name) { 727 return -1; 728 } 729 730 name_record->count = num_servers; 731 name_record->ids = talloc_array(name_record, struct server_id, 732 num_servers); 733 if (name_record->ids == NULL) { 734 return -1; 735 } 736 for (i=0;i<name_record->count;i++) { 737 name_record->ids[i] = servers[i]; 738 } 739 return 0; 740 } 741 742 /* 743 return a list of server ids for a server name 744 */ 745 struct irpc_name_records *irpc_all_servers(struct imessaging_context *msg_ctx, 746 TALLOC_CTX *mem_ctx) 747 { 748 int ret; 749 struct irpc_name_records *name_records = talloc_zero(mem_ctx, struct irpc_name_records); 750 if (name_records == NULL) { 864 751 return NULL; 865 752 } 866 t = tdb_wrap_open(msg_ctx, path, 0, 0, O_RDWR|O_CREAT, 0660); 867 talloc_free(path); 868 return t; 869 } 870 871 872 /* 873 add a string name that this irpc server can be called on 874 */ 875 NTSTATUS irpc_add_name(struct messaging_context *msg_ctx, const char *name) 876 { 877 struct tdb_wrap *t; 878 TDB_DATA rec; 879 int count; 880 NTSTATUS status = NT_STATUS_OK; 881 882 t = irpc_namedb_open(msg_ctx); 883 NT_STATUS_HAVE_NO_MEMORY(t); 884 885 if (tdb_lock_bystring(t->tdb, name) != 0) { 886 talloc_free(t); 887 return NT_STATUS_LOCK_NOT_GRANTED; 888 } 889 rec = tdb_fetch_bystring(t->tdb, name); 890 count = rec.dsize / sizeof(struct server_id); 891 rec.dptr = (unsigned char *)realloc_p(rec.dptr, struct server_id, count+1); 892 rec.dsize += sizeof(struct server_id); 893 if (rec.dptr == NULL) { 894 tdb_unlock_bystring(t->tdb, name); 895 talloc_free(t); 896 return NT_STATUS_NO_MEMORY; 897 } 898 ((struct server_id *)rec.dptr)[count] = msg_ctx->server_id; 899 if (tdb_store_bystring(t->tdb, name, rec, 0) != 0) { 900 status = NT_STATUS_INTERNAL_ERROR; 901 } 902 free(rec.dptr); 903 tdb_unlock_bystring(t->tdb, name); 904 talloc_free(t); 905 906 msg_ctx->names = str_list_add(msg_ctx->names, name); 907 talloc_steal(msg_ctx, msg_ctx->names); 908 909 return status; 910 } 911 912 /* 913 return a list of server ids for a server name 914 */ 915 struct server_id *irpc_servers_byname(struct messaging_context *msg_ctx, 916 TALLOC_CTX *mem_ctx, 917 const char *name) 918 { 919 struct tdb_wrap *t; 920 TDB_DATA rec; 921 int count, i; 922 struct server_id *ret; 923 924 t = irpc_namedb_open(msg_ctx); 925 if (t == NULL) { 753 754 ret = server_id_db_traverse_read(msg_ctx->names, all_servers_func, 755 name_records); 756 if (ret == -1) { 757 TALLOC_FREE(name_records); 926 758 return NULL; 927 759 } 928 760 929 if (tdb_lock_bystring(t->tdb, name) != 0) { 930 talloc_free(t); 931 return NULL; 932 } 933 rec = tdb_fetch_bystring(t->tdb, name); 934 if (rec.dptr == NULL) { 935 tdb_unlock_bystring(t->tdb, name); 936 talloc_free(t); 937 return NULL; 938 } 939 count = rec.dsize / sizeof(struct server_id); 940 ret = talloc_array(mem_ctx, struct server_id, count+1); 941 if (ret == NULL) { 942 tdb_unlock_bystring(t->tdb, name); 943 talloc_free(t); 944 return NULL; 945 } 946 for (i=0;i<count;i++) { 947 ret[i] = ((struct server_id *)rec.dptr)[i]; 948 } 949 ret[i] = cluster_id(0, 0); 950 free(rec.dptr); 951 tdb_unlock_bystring(t->tdb, name); 952 talloc_free(t); 953 954 return ret; 761 return name_records; 955 762 } 956 763 … … 958 765 remove a name from a messaging context 959 766 */ 960 void irpc_remove_name(struct messaging_context *msg_ctx, const char *name) 961 { 962 struct tdb_wrap *t; 963 TDB_DATA rec; 964 int count, i; 965 struct server_id *ids; 966 967 str_list_remove(msg_ctx->names, name); 968 969 t = irpc_namedb_open(msg_ctx); 970 if (t == NULL) { 971 return; 972 } 973 974 if (tdb_lock_bystring(t->tdb, name) != 0) { 975 talloc_free(t); 976 return; 977 } 978 rec = tdb_fetch_bystring(t->tdb, name); 979 if (rec.dptr == NULL) { 980 tdb_unlock_bystring(t->tdb, name); 981 talloc_free(t); 982 return; 983 } 984 count = rec.dsize / sizeof(struct server_id); 985 if (count == 0) { 986 free(rec.dptr); 987 tdb_unlock_bystring(t->tdb, name); 988 talloc_free(t); 989 return; 990 } 991 ids = (struct server_id *)rec.dptr; 992 for (i=0;i<count;i++) { 993 if (cluster_id_equal(&ids[i], &msg_ctx->server_id)) { 994 if (i < count-1) { 995 memmove(ids+i, ids+i+1, 996 sizeof(struct server_id) * (count-(i+1))); 997 } 998 rec.dsize -= sizeof(struct server_id); 999 break; 1000 } 1001 } 1002 tdb_store_bystring(t->tdb, name, rec, 0); 1003 free(rec.dptr); 1004 tdb_unlock_bystring(t->tdb, name); 1005 talloc_free(t); 1006 } 1007 1008 struct server_id messaging_get_server_id(struct messaging_context *msg_ctx) 767 void irpc_remove_name(struct imessaging_context *msg_ctx, const char *name) 768 { 769 server_id_db_remove(msg_ctx->names, name); 770 } 771 772 struct server_id imessaging_get_server_id(struct imessaging_context *msg_ctx) 1009 773 { 1010 774 return msg_ctx->server_id; … … 1012 776 1013 777 struct irpc_bh_state { 1014 struct messaging_context *msg_ctx;778 struct imessaging_context *msg_ctx; 1015 779 struct server_id server_id; 1016 780 const struct ndr_interface_table *table; … … 1085 849 ok = irpc_bh_is_connected(h); 1086 850 if (!ok) { 1087 tevent_req_nterror(req, NT_STATUS_ INVALID_CONNECTION);851 tevent_req_nterror(req, NT_STATUS_CONNECTION_DISCONNECTED); 1088 852 return tevent_req_post(req, ev); 1089 853 } … … 1138 902 /* and send it */ 1139 903 state->in_packet = ndr_push_blob(ndr); 1140 status = messaging_send(hs->msg_ctx, hs->server_id,904 status = imessaging_send(hs->msg_ctx, hs->server_id, 1141 905 MSG_IRPC, &state->in_packet); 1142 906 if (!NT_STATUS_IS_OK(status)) { … … 1177 941 m->ndr->data_size - m->ndr->offset); 1178 942 if ((m->ndr->data_size - m->ndr->offset) > 0 && !state->out_data.data) { 1179 tevent_req_ nomem(NULL,req);943 tevent_req_oom(req); 1180 944 return; 1181 945 } … … 1229 993 ok = irpc_bh_is_connected(h); 1230 994 if (!ok) { 1231 tevent_req_nterror(req, NT_STATUS_ INVALID_CONNECTION);995 tevent_req_nterror(req, NT_STATUS_CONNECTION_DISCONNECTED); 1232 996 return tevent_req_post(req, ev); 1233 997 } … … 1271 1035 /* initialise a irpc binding handle */ 1272 1036 struct dcerpc_binding_handle *irpc_binding_handle(TALLOC_CTX *mem_ctx, 1273 structmessaging_context *msg_ctx,1274 struct server_id server_id,1275 const struct ndr_interface_table *table)1037 struct imessaging_context *msg_ctx, 1038 struct server_id server_id, 1039 const struct ndr_interface_table *table) 1276 1040 { 1277 1041 struct dcerpc_binding_handle *h; … … 1293 1057 hs->timeout = IRPC_CALL_TIMEOUT; 1294 1058 1295 dcerpc_binding_handle_set_sync_ev(h, msg_ctx->event.ev);1296 1297 1059 return h; 1298 1060 } 1299 1061 1300 1062 struct dcerpc_binding_handle *irpc_binding_handle_by_name(TALLOC_CTX *mem_ctx, 1301 structmessaging_context *msg_ctx,1302 const char *dest_task,1303 const struct ndr_interface_table *table)1063 struct imessaging_context *msg_ctx, 1064 const char *dest_task, 1065 const struct ndr_interface_table *table) 1304 1066 { 1305 1067 struct dcerpc_binding_handle *h; 1068 unsigned num_sids; 1306 1069 struct server_id *sids; 1307 1070 struct server_id sid; 1071 NTSTATUS status; 1308 1072 1309 1073 /* find the server task */ 1310 sids = irpc_servers_byname(msg_ctx, mem_ctx, dest_task); 1311 if (sids == NULL) { 1312 errno = EADDRNOTAVAIL; 1313 return NULL; 1314 } 1315 if (sids[0].id == 0) { 1316 talloc_free(sids); 1074 1075 status = irpc_servers_byname(msg_ctx, mem_ctx, dest_task, 1076 &num_sids, &sids); 1077 if (!NT_STATUS_IS_OK(status)) { 1317 1078 errno = EADDRNOTAVAIL; 1318 1079 return NULL; -
vendor/current/source4/lib/messaging/messaging.h
r740 r988 19 19 */ 20 20 21 #ifndef _ MESSAGES_H_22 #define _ MESSAGES_H_21 #ifndef _SOURCE4_LIB_MESSAGING_MESSAGES_H_ 22 #define _SOURCE4_LIB_MESSAGING_MESSAGES_H_ 23 23 24 #include "librpc/gen_ndr/server_id4.h" 24 #include "librpc/gen_ndr/server_id.h" 25 #include "librpc/gen_ndr/messaging.h" 25 26 26 struct messaging_context; 27 28 /* general messages */ 29 #define MSG_DEBUG 1 30 #define MSG_PING 2 31 #define MSG_PONG 3 32 #define MSG_BRL_RETRY 4 33 #define MSG_PVFS_RETRY_OPEN 5 34 #define MSG_IRPC 6 35 #define MSG_PVFS_NOTIFY 7 36 #define MSG_NTVFS_OPLOCK_BREAK 8 37 #define MSG_DREPL_ALLOCATE_RID 9 38 39 /* temporary messaging endpoints are allocated above this line */ 40 #define MSG_TMP_BASE 1000 27 struct imessaging_context; 41 28 42 29 /* taskid for messaging of parent process */ 43 30 #define SAMBA_PARENT_TASKID 0 44 31 45 typedef void (*msg_callback_t)(struct messaging_context *msg, void *private_data,32 typedef void (*msg_callback_t)(struct imessaging_context *msg, void *private_data, 46 33 uint32_t msg_type, 47 34 struct server_id server_id, DATA_BLOB *data); 48 35 49 NTSTATUS messaging_send(structmessaging_context *msg, struct server_id server,36 NTSTATUS imessaging_send(struct imessaging_context *msg, struct server_id server, 50 37 uint32_t msg_type, const DATA_BLOB *data); 51 NTSTATUS messaging_register(structmessaging_context *msg, void *private_data,38 NTSTATUS imessaging_register(struct imessaging_context *msg, void *private_data, 52 39 uint32_t msg_type, 53 40 msg_callback_t fn); 54 NTSTATUS messaging_register_tmp(structmessaging_context *msg, void *private_data,41 NTSTATUS imessaging_register_tmp(struct imessaging_context *msg, void *private_data, 55 42 msg_callback_t fn, uint32_t *msg_type); 56 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 57 const char *dir, 58 struct server_id server_id, 43 struct imessaging_context *imessaging_init(TALLOC_CTX *mem_ctx, 44 struct loadparm_context *lp_ctx, 45 struct server_id server_id, 46 struct tevent_context *ev, 47 bool auto_remove); 48 int imessaging_cleanup(struct imessaging_context *msg); 49 struct imessaging_context *imessaging_client_init(TALLOC_CTX *mem_ctx, 50 struct loadparm_context *lp_ctx, 59 51 struct tevent_context *ev); 60 struct messaging_context *messaging_client_init(TALLOC_CTX *mem_ctx, 61 const char *dir, 62 struct tevent_context *ev); 63 NTSTATUS messaging_send_ptr(struct messaging_context *msg, struct server_id server, 52 NTSTATUS imessaging_send_ptr(struct imessaging_context *msg, struct server_id server, 64 53 uint32_t msg_type, void *ptr); 65 void messaging_deregister(structmessaging_context *msg, uint32_t msg_type, void *private_data);66 struct server_id messaging_get_server_id(structmessaging_context *msg_ctx);54 void imessaging_deregister(struct imessaging_context *msg, uint32_t msg_type, void *private_data); 55 struct server_id imessaging_get_server_id(struct imessaging_context *msg_ctx); 67 56 68 57 #endif -
vendor/current/source4/lib/messaging/pymessaging.c
r740 r988 22 22 #include <Python.h> 23 23 #include "includes.h" 24 #include " scripting/python/modules.h"24 #include "python/modules.h" 25 25 #include "libcli/util/pyerrors.h" 26 26 #include "librpc/rpc/pyrpc_util.h" 27 27 #include "librpc/ndr/libndr.h" 28 28 #include "lib/messaging/messaging.h" 29 #include "lib/messaging/irpc.h" 29 30 #include "lib/events/events.h" 30 31 #include "cluster/cluster.h" … … 32 33 #include "param/pyparam.h" 33 34 #include "librpc/rpc/dcerpc.h" 34 #include "librpc/gen_ndr/server_id4.h" 35 #include "librpc/gen_ndr/server_id.h" 36 #include <pytalloc.h> 35 37 36 38 void initmessaging(void); 37 39 38 extern PyTypeObject messaging_Type;40 extern PyTypeObject imessaging_Type; 39 41 40 42 static bool server_id_from_py(PyObject *object, struct server_id *server_id) 41 43 { 42 44 if (!PyTuple_Check(object)) { 43 PyErr_SetString(PyExc_ValueError, "Expected tuple"); 44 return false; 45 } 46 45 if (!py_check_dcerpc_type(object, "samba.dcerpc.server_id", "server_id")) { 46 47 PyErr_SetString(PyExc_ValueError, "Expected tuple or server_id"); 48 return false; 49 } 50 *server_id = *pytalloc_get_type(object, struct server_id); 51 return true; 52 } 47 53 if (PyTuple_Size(object) == 3) { 48 return PyArg_ParseTuple(object, "iii", &server_id->id, &server_id->id2, &server_id->node); 54 unsigned long long pid; 55 int task_id, vnn; 56 57 if (!PyArg_ParseTuple(object, "KII", &pid, &task_id, &vnn)) { 58 return false; 59 } 60 server_id->pid = pid; 61 server_id->task_id = task_id; 62 server_id->vnn = vnn; 63 return true; 49 64 } else { 50 int id, id2; 51 if (!PyArg_ParseTuple(object, "ii", &id, &id2)) 65 unsigned long long pid; 66 int task_id; 67 if (!PyArg_ParseTuple(object, "KI", &pid, &task_id)) 52 68 return false; 53 *server_id = cluster_id( id, id2);69 *server_id = cluster_id(pid, task_id); 54 70 return true; 55 71 } … … 59 75 PyObject_HEAD 60 76 TALLOC_CTX *mem_ctx; 61 struct messaging_context *msg_ctx;62 } messaging_Object;63 64 static PyObject *py_ messaging_connect(PyTypeObject *self, PyObject *args, PyObject *kwargs)77 struct imessaging_context *msg_ctx; 78 } imessaging_Object; 79 80 static PyObject *py_imessaging_connect(PyTypeObject *self, PyObject *args, PyObject *kwargs) 65 81 { 66 82 struct tevent_context *ev; 67 const char *kwnames[] = { "own_id", " messaging_path", NULL };83 const char *kwnames[] = { "own_id", "lp_ctx", NULL }; 68 84 PyObject *own_id = Py_None; 69 const char *messaging_path = NULL; 70 messaging_Object *ret; 71 72 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|Oz:connect", 73 discard_const_p(char *, kwnames), &own_id, &messaging_path)) { 74 return NULL; 75 } 76 77 ret = PyObject_New(messaging_Object, &messaging_Type); 85 PyObject *py_lp_ctx = Py_None; 86 imessaging_Object *ret; 87 struct loadparm_context *lp_ctx; 88 89 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OO:connect", 90 discard_const_p(char *, kwnames), &own_id, &py_lp_ctx)) { 91 return NULL; 92 } 93 94 ret = PyObject_New(imessaging_Object, &imessaging_Type); 78 95 if (ret == NULL) 79 96 return NULL; … … 81 98 ret->mem_ctx = talloc_new(NULL); 82 99 100 lp_ctx = lpcfg_from_py_object(ret->mem_ctx, py_lp_ctx); 101 if (lp_ctx == NULL) { 102 PyErr_SetString(PyExc_RuntimeError, "imessaging_connect unable to interpret loadparm_context"); 103 talloc_free(ret->mem_ctx); 104 return NULL; 105 } 106 83 107 ev = s4_event_context_init(ret->mem_ctx); 84 85 if (messaging_path == NULL) {86 messaging_path = lpcfg_messaging_path(ret->mem_ctx,87 py_default_loadparm_context(ret->mem_ctx));88 } else {89 messaging_path = talloc_strdup(ret->mem_ctx, messaging_path);90 }91 108 92 109 if (own_id != Py_None) { … … 96 113 return NULL; 97 114 98 ret->msg_ctx = messaging_init(ret->mem_ctx,99 messaging_path,100 server_id,101 ev);115 ret->msg_ctx = imessaging_init(ret->mem_ctx, 116 lp_ctx, 117 server_id, 118 ev, true); 102 119 } else { 103 ret->msg_ctx = messaging_client_init(ret->mem_ctx,104 messaging_path,105 ev);120 ret->msg_ctx = imessaging_client_init(ret->mem_ctx, 121 lp_ctx, 122 ev); 106 123 } 107 124 108 125 if (ret->msg_ctx == NULL) { 109 PyErr_SetString(PyExc_RuntimeError, " messaging_connect unable to create a messaging context");126 PyErr_SetString(PyExc_RuntimeError, "imessaging_connect unable to create a messaging context"); 110 127 talloc_free(ret->mem_ctx); 111 128 return NULL; … … 115 132 } 116 133 117 static void py_ messaging_dealloc(PyObject *self)118 { 119 messaging_Object *iface = (messaging_Object *)self;134 static void py_imessaging_dealloc(PyObject *self) 135 { 136 imessaging_Object *iface = (imessaging_Object *)self; 120 137 talloc_free(iface->msg_ctx); 121 138 self->ob_type->tp_free(self); 122 139 } 123 140 124 static PyObject *py_ messaging_send(PyObject *self, PyObject *args, PyObject *kwargs)125 { 126 messaging_Object *iface = (messaging_Object *)self;141 static PyObject *py_imessaging_send(PyObject *self, PyObject *args, PyObject *kwargs) 142 { 143 imessaging_Object *iface = (imessaging_Object *)self; 127 144 uint32_t msg_type; 128 145 DATA_BLOB data; … … 131 148 struct server_id server; 132 149 const char *kwnames[] = { "target", "msg_type", "data", NULL }; 133 int length;150 Py_ssize_t length; 134 151 135 152 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "Ois#:send", … … 144 161 return NULL; 145 162 146 status = messaging_send(iface->msg_ctx, server, msg_type, &data);163 status = imessaging_send(iface->msg_ctx, server, msg_type, &data); 147 164 if (NT_STATUS_IS_ERR(status)) { 148 165 PyErr_SetNTSTATUS(status); … … 153 170 } 154 171 155 static void py_msg_callback_wrapper(struct messaging_context *msg, void *private_data,172 static void py_msg_callback_wrapper(struct imessaging_context *msg, void *private_data, 156 173 uint32_t msg_type, 157 174 struct server_id server_id, DATA_BLOB *data) 158 175 { 159 PyObject *callback = (PyObject *)private_data; 160 161 PyObject_CallFunction(callback, discard_const_p(char, "i(iii)s#"), msg_type, 162 server_id.id, server_id.id2, server_id.node, 176 PyObject *py_server_id, *callback = (PyObject *)private_data; 177 178 struct server_id *p_server_id = talloc(NULL, struct server_id); 179 if (!p_server_id) { 180 PyErr_NoMemory(); 181 return; 182 } 183 *p_server_id = server_id; 184 185 py_server_id = py_return_ndr_struct("samba.dcerpc.server_id", "server_id", p_server_id, p_server_id); 186 talloc_unlink(NULL, p_server_id); 187 188 PyObject_CallFunction(callback, discard_const_p(char, "i(O)s#"), msg_type, 189 py_server_id, 163 190 data->data, data->length); 164 191 } 165 192 166 static PyObject *py_ messaging_register(PyObject *self, PyObject *args, PyObject *kwargs)167 { 168 messaging_Object *iface = (messaging_Object *)self;193 static PyObject *py_imessaging_register(PyObject *self, PyObject *args, PyObject *kwargs) 194 { 195 imessaging_Object *iface = (imessaging_Object *)self; 169 196 int msg_type = -1; 170 197 PyObject *callback; … … 181 208 if (msg_type == -1) { 182 209 uint32_t msg_type32 = msg_type; 183 status = messaging_register_tmp(iface->msg_ctx, callback,210 status = imessaging_register_tmp(iface->msg_ctx, callback, 184 211 py_msg_callback_wrapper, &msg_type32); 185 212 msg_type = msg_type32; 186 213 } else { 187 status = messaging_register(iface->msg_ctx, callback,214 status = imessaging_register(iface->msg_ctx, callback, 188 215 msg_type, py_msg_callback_wrapper); 189 216 } … … 196 223 } 197 224 198 static PyObject *py_ messaging_deregister(PyObject *self, PyObject *args, PyObject *kwargs)199 { 200 messaging_Object *iface = (messaging_Object *)self;225 static PyObject *py_imessaging_deregister(PyObject *self, PyObject *args, PyObject *kwargs) 226 { 227 imessaging_Object *iface = (imessaging_Object *)self; 201 228 int msg_type = -1; 202 229 PyObject *callback; … … 208 235 } 209 236 210 messaging_deregister(iface->msg_ctx, msg_type, callback);237 imessaging_deregister(iface->msg_ctx, msg_type, callback); 211 238 212 239 Py_DECREF(callback); … … 215 242 } 216 243 217 static PyMethodDef py_messaging_methods[] = { 218 { "send", (PyCFunction)py_messaging_send, METH_VARARGS|METH_KEYWORDS, 244 static PyObject *py_irpc_servers_byname(PyObject *self, PyObject *args, PyObject *kwargs) 245 { 246 imessaging_Object *iface = (imessaging_Object *)self; 247 char *server_name; 248 unsigned i, num_ids; 249 struct server_id *ids; 250 PyObject *pylist; 251 TALLOC_CTX *mem_ctx = talloc_new(NULL); 252 NTSTATUS status; 253 254 if (!mem_ctx) { 255 PyErr_NoMemory(); 256 return NULL; 257 } 258 259 if (!PyArg_ParseTuple(args, "s", &server_name)) { 260 TALLOC_FREE(mem_ctx); 261 return NULL; 262 } 263 264 status = irpc_servers_byname(iface->msg_ctx, mem_ctx, server_name, 265 &num_ids, &ids); 266 if (!NT_STATUS_IS_OK(status)) { 267 TALLOC_FREE(mem_ctx); 268 PyErr_SetString(PyExc_KeyError, "No such name"); 269 return NULL; 270 } 271 272 pylist = PyList_New(num_ids); 273 if (pylist == NULL) { 274 TALLOC_FREE(mem_ctx); 275 PyErr_NoMemory(); 276 return NULL; 277 } 278 for (i = 0; i < num_ids; i++) { 279 PyObject *py_server_id; 280 struct server_id *p_server_id = talloc(NULL, struct server_id); 281 if (!p_server_id) { 282 PyErr_NoMemory(); 283 return NULL; 284 } 285 *p_server_id = ids[i]; 286 287 py_server_id = py_return_ndr_struct("samba.dcerpc.server_id", "server_id", p_server_id, p_server_id); 288 if (!py_server_id) { 289 return NULL; 290 } 291 PyList_SetItem(pylist, i, py_server_id); 292 talloc_unlink(NULL, p_server_id); 293 } 294 TALLOC_FREE(mem_ctx); 295 return pylist; 296 } 297 298 static PyObject *py_irpc_all_servers(PyObject *self, PyObject *args, PyObject *kwargs) 299 { 300 imessaging_Object *iface = (imessaging_Object *)self; 301 PyObject *pylist; 302 int i; 303 struct irpc_name_records *records; 304 TALLOC_CTX *mem_ctx = talloc_new(NULL); 305 if (!mem_ctx) { 306 PyErr_NoMemory(); 307 return NULL; 308 } 309 310 records = irpc_all_servers(iface->msg_ctx, mem_ctx); 311 if (records == NULL) { 312 return NULL; 313 } 314 315 pylist = PyList_New(records->num_records); 316 if (pylist == NULL) { 317 TALLOC_FREE(mem_ctx); 318 PyErr_NoMemory(); 319 return NULL; 320 } 321 for (i = 0; i < records->num_records; i++) { 322 PyObject *py_name_record 323 = py_return_ndr_struct("samba.dcerpc.irpc", 324 "name_record", 325 records->names[i], 326 records->names[i]); 327 if (!py_name_record) { 328 return NULL; 329 } 330 PyList_SetItem(pylist, i, 331 py_name_record); 332 } 333 TALLOC_FREE(mem_ctx); 334 return pylist; 335 } 336 337 static PyMethodDef py_imessaging_methods[] = { 338 { "send", (PyCFunction)py_imessaging_send, METH_VARARGS|METH_KEYWORDS, 219 339 "S.send(target, msg_type, data) -> None\nSend a message" }, 220 { "register", (PyCFunction)py_ messaging_register, METH_VARARGS|METH_KEYWORDS,340 { "register", (PyCFunction)py_imessaging_register, METH_VARARGS|METH_KEYWORDS, 221 341 "S.register(callback, msg_type=None) -> msg_type\nRegister a message handler" }, 222 { "deregister", (PyCFunction)py_ messaging_deregister, METH_VARARGS|METH_KEYWORDS,342 { "deregister", (PyCFunction)py_imessaging_deregister, METH_VARARGS|METH_KEYWORDS, 223 343 "S.deregister(callback, msg_type) -> None\nDeregister a message handler" }, 344 { "irpc_servers_byname", (PyCFunction)py_irpc_servers_byname, METH_VARARGS, 345 "S.irpc_servers_byname(name) -> list\nGet list of server_id values that are registered for a particular name" }, 346 { "irpc_all_servers", (PyCFunction)py_irpc_all_servers, METH_NOARGS, 347 "S.irpc_servers_byname() -> list\nGet list of all registered names and the associated server_id values" }, 224 348 { NULL, NULL, 0, NULL } 225 349 }; 226 350 227 static PyObject *py_messaging_server_id(PyObject *obj, void *closure) 228 { 229 messaging_Object *iface = (messaging_Object *)obj; 230 struct server_id server_id = messaging_get_server_id(iface->msg_ctx); 231 232 return Py_BuildValue("(iii)", server_id.id, server_id.id2, 233 server_id.node); 234 } 235 236 static PyGetSetDef py_messaging_getset[] = { 237 { discard_const_p(char, "server_id"), py_messaging_server_id, NULL, 351 static PyObject *py_imessaging_server_id(PyObject *obj, void *closure) 352 { 353 imessaging_Object *iface = (imessaging_Object *)obj; 354 PyObject *py_server_id; 355 struct server_id server_id = imessaging_get_server_id(iface->msg_ctx); 356 struct server_id *p_server_id = talloc(NULL, struct server_id); 357 if (!p_server_id) { 358 PyErr_NoMemory(); 359 return NULL; 360 } 361 *p_server_id = server_id; 362 363 py_server_id = py_return_ndr_struct("samba.dcerpc.server_id", "server_id", p_server_id, p_server_id); 364 talloc_unlink(NULL, p_server_id); 365 366 return py_server_id; 367 } 368 369 static PyGetSetDef py_imessaging_getset[] = { 370 { discard_const_p(char, "server_id"), py_imessaging_server_id, NULL, 238 371 discard_const_p(char, "local server id") }, 239 372 { NULL }, … … 241 374 242 375 243 PyTypeObject messaging_Type = {376 PyTypeObject imessaging_Type = { 244 377 PyObject_HEAD_INIT(NULL) 0, 245 378 .tp_name = "messaging.Messaging", 246 .tp_basicsize = sizeof( messaging_Object),379 .tp_basicsize = sizeof(imessaging_Object), 247 380 .tp_flags = Py_TPFLAGS_DEFAULT|Py_TPFLAGS_BASETYPE, 248 .tp_new = py_messaging_connect, 249 .tp_dealloc = py_messaging_dealloc, 250 .tp_methods = py_messaging_methods, 251 .tp_getset = py_messaging_getset, 252 .tp_doc = "Messaging(own_id=None, messaging_path=None)\n" \ 253 "Create a new object that can be used to communicate with the peers in the specified messaging path.\n" \ 254 "If no path is specified, the default path from smb.conf will be used." 381 .tp_new = py_imessaging_connect, 382 .tp_dealloc = py_imessaging_dealloc, 383 .tp_methods = py_imessaging_methods, 384 .tp_getset = py_imessaging_getset, 385 .tp_doc = "Messaging(own_id=None)\n" \ 386 "Create a new object that can be used to communicate with the peers in the specified messaging path.\n" 255 387 }; 256 388 … … 259 391 PyObject *mod; 260 392 261 if (PyType_Ready(& messaging_Type) < 0)393 if (PyType_Ready(&imessaging_Type) < 0) 262 394 return; 263 395 … … 266 398 return; 267 399 268 Py_INCREF((PyObject *)& messaging_Type);269 PyModule_AddObject(mod, "Messaging", (PyObject *)& messaging_Type);270 } 400 Py_INCREF((PyObject *)&imessaging_Type); 401 PyModule_AddObject(mod, "Messaging", (PyObject *)&imessaging_Type); 402 } -
vendor/current/source4/lib/messaging/tests/irpc.c
r740 r988 28 28 #include "cluster/cluster.h" 29 29 #include "param/param.h" 30 #include "torture/local/proto.h" 30 31 31 32 const uint32_t MSG_ID1 = 1, MSG_ID2 = 2; … … 35 36 struct irpc_test_data 36 37 { 37 struct messaging_context *msg_ctx1, *msg_ctx2;38 struct imessaging_context *msg_ctx1, *msg_ctx2; 38 39 struct tevent_context *ev; 39 40 }; … … 74 75 static NTSTATUS irpc_EchoData(struct irpc_message *irpc, struct echo_EchoData *r) 75 76 { 77 struct irpc_test_data *data = talloc_get_type_abort(irpc->private_data, struct irpc_test_data); 76 78 irpc->defer_reply = true; 77 event_add_timed(irpc->ev, irpc, timeval_zero(), deferred_echodata, irpc);79 tevent_add_timer(data->ev, irpc, timeval_zero(), deferred_echodata, irpc); 78 80 return NT_STATUS_OK; 79 81 } … … 101 103 102 104 test_debug = true; 105 /* 106 * Note: this makes use of nested event loops 107 * as client and server use the same loop. 108 */ 109 dcerpc_binding_handle_set_sync_ev(irpc_handle, data->ev); 103 110 status = dcerpc_echo_AddOne_r(irpc_handle, test, &r); 104 111 test_debug = false; … … 135 142 r.in.len = strlen((char *)r.in.in_data); 136 143 144 /* 145 * Note: this makes use of nested event loops 146 * as client and server use the same loop. 147 */ 148 dcerpc_binding_handle_set_sync_ev(irpc_handle, data->ev); 137 149 status = dcerpc_echo_EchoData_r(irpc_handle, mem_ctx, &r); 138 150 torture_assert_ntstatus_ok(tctx, status, "EchoData failed"); … … 219 231 220 232 while (ping_count > pong_count + 20) { 221 event_loop_once(data->ev);233 tevent_loop_once(data->ev); 222 234 } 223 235 } … … 226 238 ping_count - pong_count, pong_count); 227 239 while (timeval_elapsed(&tv) < 30 && pong_count < ping_count) { 228 event_loop_once(data->ev);240 tevent_loop_once(data->ev); 229 241 } 230 242 … … 247 259 data->ev = tctx->ev; 248 260 torture_assert(tctx, data->msg_ctx1 = 249 messaging_init(tctx,250 lpcfg_messaging_path(tctx, tctx->lp_ctx),261 imessaging_init(tctx, 262 tctx->lp_ctx, 251 263 cluster_id(0, MSG_ID1), 252 data->ev ),264 data->ev, true), 253 265 "Failed to init first messaging context"); 254 266 255 267 torture_assert(tctx, data->msg_ctx2 = 256 messaging_init(tctx,257 lpcfg_messaging_path(tctx, tctx->lp_ctx),268 imessaging_init(tctx, 269 tctx->lp_ctx, 258 270 cluster_id(0, MSG_ID2), 259 data->ev ),271 data->ev, true), 260 272 "Failed to init second messaging context"); 261 273 262 274 /* register the server side function */ 263 IRPC_REGISTER(data->msg_ctx1, rpcecho, ECHO_ADDONE, irpc_AddOne, NULL);264 IRPC_REGISTER(data->msg_ctx2, rpcecho, ECHO_ADDONE, irpc_AddOne, NULL);265 266 IRPC_REGISTER(data->msg_ctx1, rpcecho, ECHO_ECHODATA, irpc_EchoData, NULL);267 IRPC_REGISTER(data->msg_ctx2, rpcecho, ECHO_ECHODATA, irpc_EchoData, NULL);275 IRPC_REGISTER(data->msg_ctx1, rpcecho, ECHO_ADDONE, irpc_AddOne, data); 276 IRPC_REGISTER(data->msg_ctx2, rpcecho, ECHO_ADDONE, irpc_AddOne, data); 277 278 IRPC_REGISTER(data->msg_ctx1, rpcecho, ECHO_ECHODATA, irpc_EchoData, data); 279 IRPC_REGISTER(data->msg_ctx2, rpcecho, ECHO_ECHODATA, irpc_EchoData, data); 268 280 269 281 return true; -
vendor/current/source4/lib/messaging/tests/messaging.c
r740 r988 26 26 #include "cluster/cluster.h" 27 27 #include "param/param.h" 28 28 #include "torture/local/proto.h" 29 29 30 30 static uint32_t msg_pong; 31 31 32 static void ping_message(struct messaging_context *msg, void *private_data,32 static void ping_message(struct imessaging_context *msg, void *private_data, 33 33 uint32_t msg_type, struct server_id src, DATA_BLOB *data) 34 34 { 35 35 NTSTATUS status; 36 status = messaging_send(msg, src, msg_pong, data);36 status = imessaging_send(msg, src, msg_pong, data); 37 37 if (!NT_STATUS_IS_OK(status)) { 38 38 printf("pong failed - %s\n", nt_errstr(status)); … … 40 40 } 41 41 42 static void pong_message(struct messaging_context *msg, void *private_data,42 static void pong_message(struct imessaging_context *msg, void *private_data, 43 43 uint32_t msg_type, struct server_id src, DATA_BLOB *data) 44 44 { … … 47 47 } 48 48 49 static void exit_message(struct messaging_context *msg, void *private_data,49 static void exit_message(struct imessaging_context *msg, void *private_data, 50 50 uint32_t msg_type, struct server_id src, DATA_BLOB *data) 51 51 { … … 60 60 { 61 61 struct tevent_context *ev; 62 struct messaging_context *msg_client_ctx;63 struct messaging_context *msg_server_ctx;62 struct imessaging_context *msg_client_ctx; 63 struct imessaging_context *msg_server_ctx; 64 64 int ping_count = 0; 65 65 int pong_count = 0; … … 72 72 ev = tctx->ev; 73 73 74 msg_server_ctx = messaging_init(tctx,75 lpcfg_messaging_path(tctx, tctx->lp_ctx), cluster_id(0, 1),76 ev);74 msg_server_ctx = imessaging_init(tctx, 75 tctx->lp_ctx, cluster_id(0, 1), 76 ev, true); 77 77 78 78 torture_assert(tctx, msg_server_ctx != NULL, "Failed to init ping messaging context"); 79 79 80 messaging_register_tmp(msg_server_ctx, NULL, ping_message, &msg_ping);81 messaging_register_tmp(msg_server_ctx, tctx, exit_message, &msg_exit);80 imessaging_register_tmp(msg_server_ctx, NULL, ping_message, &msg_ping); 81 imessaging_register_tmp(msg_server_ctx, tctx, exit_message, &msg_exit); 82 82 83 msg_client_ctx = messaging_init(tctx,84 lpcfg_messaging_path(tctx, tctx->lp_ctx),85 cluster_id(0, 2),86 ev);83 msg_client_ctx = imessaging_init(tctx, 84 tctx->lp_ctx, 85 cluster_id(0, 2), 86 ev, true); 87 87 88 88 torture_assert(tctx, msg_client_ctx != NULL, 89 "msg_client_ctx messaging_init() failed");89 "msg_client_ctx imessaging_init() failed"); 90 90 91 messaging_register_tmp(msg_client_ctx, &pong_count, pong_message, &msg_pong);91 imessaging_register_tmp(msg_client_ctx, &pong_count, pong_message, &msg_pong); 92 92 93 93 tv = timeval_current(); … … 101 101 data.length = strlen((const char *)data.data); 102 102 103 status1 = messaging_send(msg_client_ctx, cluster_id(0, 1), msg_ping, &data);104 status2 = messaging_send(msg_client_ctx, cluster_id(0, 1), msg_ping, NULL);103 status1 = imessaging_send(msg_client_ctx, cluster_id(0, 1), msg_ping, &data); 104 status2 = imessaging_send(msg_client_ctx, cluster_id(0, 1), msg_ping, NULL); 105 105 106 106 torture_assert_ntstatus_ok(tctx, status1, "msg1 failed"); … … 111 111 112 112 while (ping_count > pong_count + 20) { 113 event_loop_once(ev);113 tevent_loop_once(ev); 114 114 } 115 115 } … … 118 118 ping_count - pong_count, pong_count); 119 119 while (timeval_elapsed(&tv) < 30 && pong_count < ping_count) { 120 event_loop_once(ev);120 tevent_loop_once(ev); 121 121 } 122 122 123 123 torture_comment(tctx, "sending exit\n"); 124 messaging_send(msg_client_ctx, cluster_id(0, 1), msg_exit, NULL);124 imessaging_send(msg_client_ctx, cluster_id(0, 1), msg_exit, NULL); 125 125 126 126 torture_assert_int_equal(tctx, ping_count, pong_count, "ping test failed"); -
vendor/current/source4/lib/messaging/wscript_build
r740 r988 2 2 3 3 4 bld.SAMBA_ SUBSYSTEM('MESSAGING',4 bld.SAMBA_LIBRARY('MESSAGING', 5 5 source='messaging.c', 6 public_deps='samba-util tdb-wrap NDR_IRPC UNIX_PRIVS UTIL_TDB cluster ndr samba_socket dcerpc' 6 public_deps='samba-util NDR_IRPC UNIX_PRIVS cluster ndr dcerpc messages_util server_id_db talloc_report', 7 private_library=True 7 8 ) 8 9 9 10 10 bld.SAMBA_PYTHON('python_messaging', 11 11 source='pymessaging.c', 12 deps='MESSAGING events pyparam_util ',12 deps='MESSAGING events pyparam_util pytalloc-util', 13 13 realname='samba/messaging.so' 14 14 )
Note:
See TracChangeset
for help on using the changeset viewer.