Changeset 745 for trunk/server/source4/lib/messaging/messaging.c
- Timestamp:
- Nov 27, 2012, 4:43:17 PM (13 years ago)
- Location:
- trunk/server
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/server
- Property svn:mergeinfo changed
/vendor/current merged: 581,587,591,594,597,600,615,618,740
- Property svn:mergeinfo changed
-
trunk/server/source4/lib/messaging/messaging.c
r414 r745 28 28 #include "librpc/gen_ndr/ndr_irpc.h" 29 29 #include "lib/messaging/irpc.h" 30 #include " tdb_wrap.h"30 #include "lib/util/tdb_wrap.h" 31 31 #include "../lib/util/unix_privs.h" 32 32 #include "librpc/rpc/dcerpc.h" 33 #include "../tdb/include/tdb.h"33 #include <tdb.h> 34 34 #include "../lib/util/util_tdb.h" 35 35 #include "cluster/cluster.h" 36 #include "../lib/util/tevent_ntstatus.h" 36 37 37 38 /* change the message version with any incompatible changes in the protocol */ 38 39 #define MESSAGING_VERSION 1 40 41 /* 42 a pending irpc call 43 */ 44 struct irpc_request { 45 struct messaging_context *msg_ctx; 46 int callid; 47 struct { 48 void (*handler)(struct irpc_request *irpc, struct irpc_message *m); 49 void *private_data; 50 } incoming; 51 }; 39 52 40 53 struct messaging_context { … … 48 61 struct messaging_rec *pending; 49 62 struct messaging_rec *retry_queue; 50 struct smb_iconv_convenience *iconv_convenience;51 63 struct irpc_list *irpc; 52 64 struct idr_context *idr; … … 99 111 { 100 112 DEBUG(1,("INFO: Received PING message from server %u.%u [%.*s]\n", 101 (u int_t)src.node, (uint_t)src.id, (int)data->length,113 (unsigned int)src.node, (unsigned int)src.id, (int)data->length, 102 114 data->data?(const char *)data->data:"")); 103 115 messaging_send(msg, src, MSG_PONG, data); … … 120 132 static char *messaging_path(struct messaging_context *msg, struct server_id server_id) 121 133 { 122 return talloc_asprintf(msg, "%s/msg.%s", msg->base_path, 123 cluster_id_string(msg, server_id)); 134 TALLOC_CTX *tmp_ctx = talloc_new(msg); 135 const char *id = cluster_id_string(tmp_ctx, server_id); 136 char *s; 137 if (id == NULL) { 138 return NULL; 139 } 140 s = talloc_asprintf(msg, "%s/msg.%s", msg->base_path, id); 141 talloc_steal(s, tmp_ctx); 142 return s; 124 143 } 125 144 … … 262 281 rec->retries = 0; 263 282 if (!NT_STATUS_IS_OK(status)) { 283 TALLOC_CTX *tmp_ctx = talloc_new(msg); 264 284 DEBUG(1,("messaging: Lost message from %s to %s of type %u - %s\n", 265 cluster_id_string( debug_ctx(), rec->header->from),266 cluster_id_string( debug_ctx(), rec->header->to),285 cluster_id_string(tmp_ctx, rec->header->from), 286 cluster_id_string(tmp_ctx, rec->header->to), 267 287 rec->header->msg_type, 268 288 nt_errstr(status))); 289 talloc_free(tmp_ctx); 269 290 } 270 291 DLIST_REMOVE(msg->pending, rec); … … 441 462 */ 442 463 NTSTATUS messaging_send(struct messaging_context *msg, struct server_id server, 443 uint32_t msg_type, DATA_BLOB *data)464 uint32_t msg_type, const DATA_BLOB *data) 444 465 { 445 466 struct messaging_rec *rec; … … 536 557 const char *dir, 537 558 struct server_id server_id, 538 struct smb_iconv_convenience *iconv_convenience,539 559 struct tevent_context *ev) 540 560 { … … 565 585 msg->path = messaging_path(msg, server_id); 566 586 msg->server_id = server_id; 567 msg->iconv_convenience = iconv_convenience;568 587 msg->idr = idr_init(msg); 569 588 msg->dispatch_tree = idr_init(msg); … … 600 619 msg->event.fde = event_add_fd(ev, msg, socket_get_fd(msg->sock), 601 620 EVENT_FD_READ, messaging_handler, msg); 621 tevent_fd_set_auto_close(msg->event.fde); 602 622 603 623 talloc_set_destructor(msg, messaging_destructor); … … 615 635 struct messaging_context *messaging_client_init(TALLOC_CTX *mem_ctx, 616 636 const char *dir, 617 struct smb_iconv_convenience *iconv_convenience,618 637 struct tevent_context *ev) 619 638 { … … 621 640 ZERO_STRUCT(id); 622 641 id.id = random() % 0x10000000; 623 return messaging_init(mem_ctx, dir, id, iconv_convenience,ev);642 return messaging_init(mem_ctx, dir, id, ev); 624 643 } 625 644 /* … … 673 692 { 674 693 struct irpc_request *irpc; 675 enum ndr_err_code ndr_err;676 694 677 695 irpc = (struct irpc_request *)idr_find(msg_ctx->idr, m->header.callid); 678 696 if (irpc == NULL) return; 679 697 680 /* parse the reply data */ 681 ndr_err = irpc->table->calls[irpc->callnum].ndr_pull(m->ndr, NDR_OUT, irpc->r); 682 if (NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { 683 irpc->status = m->header.status; 684 talloc_steal(irpc->mem_ctx, m); 685 } else { 686 irpc->status = ndr_map_error2ntstatus(ndr_err); 687 talloc_steal(irpc, m); 688 } 689 irpc->done = true; 690 if (irpc->async.fn) { 691 irpc->async.fn(irpc); 692 } 698 irpc->incoming.handler(irpc, m); 693 699 } 694 700 … … 705 711 706 712 /* setup the reply */ 707 push = ndr_push_init_ctx(m->ndr , m->msg_ctx->iconv_convenience);713 push = ndr_push_init_ctx(m->ndr); 708 714 if (push == NULL) { 709 715 status = NT_STATUS_NO_MEMORY; … … 712 718 713 719 m->header.flags |= IRPC_FLAG_REPLY; 720 m->header.creds.token= NULL; 714 721 715 722 /* construct the packet */ … … 764 771 if (r == NULL) goto failed; 765 772 773 m->ndr->flags |= LIBNDR_FLAG_REF_ALLOC; 774 766 775 /* parse the request data */ 767 776 ndr_err = i->table->calls[i->callnum].ndr_pull(m->ndr, NDR_IN, r); … … 771 780 m->private_data= i->private_data; 772 781 m->defer_reply = false; 782 m->no_reply = false; 773 783 m->msg_ctx = msg_ctx; 774 784 m->irpc = i; … … 778 788 m->header.status = i->fn(m, r); 779 789 790 if (m->no_reply) { 791 /* the server function won't ever be replying to this request */ 792 talloc_free(m); 793 return; 794 } 795 780 796 if (m->defer_reply) { 781 797 /* the server function has asked to defer the reply to later */ … … 805 821 m->from = src; 806 822 807 m->ndr = ndr_pull_init_blob(packet, m , msg_ctx->iconv_convenience);823 m->ndr = ndr_pull_init_blob(packet, m); 808 824 if (m->ndr == NULL) goto failed; 809 825 … … 835 851 } 836 852 837 if (irpc->reject_free) {838 return -1;839 }840 853 return 0; 841 }842 843 /*844 timeout a irpc request845 */846 static void irpc_timeout(struct tevent_context *ev, struct tevent_timer *te,847 struct timeval t, void *private_data)848 {849 struct irpc_request *irpc = talloc_get_type(private_data, struct irpc_request);850 irpc->status = NT_STATUS_IO_TIMEOUT;851 irpc->done = true;852 if (irpc->async.fn) {853 irpc->async.fn(irpc);854 }855 }856 857 858 /*859 make a irpc call - async send860 */861 struct irpc_request *irpc_call_send(struct messaging_context *msg_ctx,862 struct server_id server_id,863 const struct ndr_interface_table *table,864 int callnum, void *r, TALLOC_CTX *ctx)865 {866 struct irpc_header header;867 struct ndr_push *ndr;868 NTSTATUS status;869 DATA_BLOB packet;870 struct irpc_request *irpc;871 enum ndr_err_code ndr_err;872 873 irpc = talloc(msg_ctx, struct irpc_request);874 if (irpc == NULL) goto failed;875 876 irpc->msg_ctx = msg_ctx;877 irpc->table = table;878 irpc->callnum = callnum;879 irpc->callid = idr_get_new(msg_ctx->idr, irpc, UINT16_MAX);880 if (irpc->callid == -1) goto failed;881 irpc->r = r;882 irpc->done = false;883 irpc->async.fn = NULL;884 irpc->mem_ctx = ctx;885 irpc->reject_free = false;886 887 talloc_set_destructor(irpc, irpc_destructor);888 889 /* setup the header */890 header.uuid = table->syntax_id.uuid;891 892 header.if_version = table->syntax_id.if_version;893 header.callid = irpc->callid;894 header.callnum = callnum;895 header.flags = 0;896 header.status = NT_STATUS_OK;897 898 /* construct the irpc packet */899 ndr = ndr_push_init_ctx(irpc, msg_ctx->iconv_convenience);900 if (ndr == NULL) goto failed;901 902 ndr_err = ndr_push_irpc_header(ndr, NDR_SCALARS|NDR_BUFFERS, &header);903 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed;904 905 ndr_err = table->calls[callnum].ndr_push(ndr, NDR_IN, r);906 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed;907 908 /* and send it */909 packet = ndr_push_blob(ndr);910 status = messaging_send(msg_ctx, server_id, MSG_IRPC, &packet);911 if (!NT_STATUS_IS_OK(status)) goto failed;912 913 event_add_timed(msg_ctx->event.ev, irpc,914 timeval_current_ofs(IRPC_CALL_TIMEOUT, 0),915 irpc_timeout, irpc);916 917 talloc_free(ndr);918 return irpc;919 920 failed:921 talloc_free(irpc);922 return NULL;923 }924 925 /*926 wait for a irpc reply927 */928 NTSTATUS irpc_call_recv(struct irpc_request *irpc)929 {930 NTSTATUS status;931 932 NT_STATUS_HAVE_NO_MEMORY(irpc);933 934 irpc->reject_free = true;935 936 while (!irpc->done) {937 if (event_loop_once(irpc->msg_ctx->event.ev) != 0) {938 return NT_STATUS_CONNECTION_DISCONNECTED;939 }940 }941 942 irpc->reject_free = false;943 944 status = irpc->status;945 talloc_free(irpc);946 return status;947 }948 949 /*950 perform a synchronous irpc request951 */952 NTSTATUS irpc_call(struct messaging_context *msg_ctx,953 struct server_id server_id,954 const struct ndr_interface_table *table,955 int callnum, void *r,956 TALLOC_CTX *mem_ctx)957 {958 struct irpc_request *irpc = irpc_call_send(msg_ctx, server_id,959 table, callnum, r, mem_ctx);960 return irpc_call_recv(irpc);961 854 } 962 855 … … 1117 1010 return msg_ctx->server_id; 1118 1011 } 1012 1013 struct irpc_bh_state { 1014 struct messaging_context *msg_ctx; 1015 struct server_id server_id; 1016 const struct ndr_interface_table *table; 1017 uint32_t timeout; 1018 struct security_token *token; 1019 }; 1020 1021 static bool irpc_bh_is_connected(struct dcerpc_binding_handle *h) 1022 { 1023 struct irpc_bh_state *hs = dcerpc_binding_handle_data(h, 1024 struct irpc_bh_state); 1025 1026 if (!hs->msg_ctx) { 1027 return false; 1028 } 1029 1030 return true; 1031 } 1032 1033 static uint32_t irpc_bh_set_timeout(struct dcerpc_binding_handle *h, 1034 uint32_t timeout) 1035 { 1036 struct irpc_bh_state *hs = dcerpc_binding_handle_data(h, 1037 struct irpc_bh_state); 1038 uint32_t old = hs->timeout; 1039 1040 hs->timeout = timeout; 1041 1042 return old; 1043 } 1044 1045 struct irpc_bh_raw_call_state { 1046 struct irpc_request *irpc; 1047 uint32_t opnum; 1048 DATA_BLOB in_data; 1049 DATA_BLOB in_packet; 1050 DATA_BLOB out_data; 1051 }; 1052 1053 static void irpc_bh_raw_call_incoming_handler(struct irpc_request *irpc, 1054 struct irpc_message *m); 1055 1056 static struct tevent_req *irpc_bh_raw_call_send(TALLOC_CTX *mem_ctx, 1057 struct tevent_context *ev, 1058 struct dcerpc_binding_handle *h, 1059 const struct GUID *object, 1060 uint32_t opnum, 1061 uint32_t in_flags, 1062 const uint8_t *in_data, 1063 size_t in_length) 1064 { 1065 struct irpc_bh_state *hs = 1066 dcerpc_binding_handle_data(h, 1067 struct irpc_bh_state); 1068 struct tevent_req *req; 1069 struct irpc_bh_raw_call_state *state; 1070 bool ok; 1071 struct irpc_header header; 1072 struct ndr_push *ndr; 1073 NTSTATUS status; 1074 enum ndr_err_code ndr_err; 1075 1076 req = tevent_req_create(mem_ctx, &state, 1077 struct irpc_bh_raw_call_state); 1078 if (req == NULL) { 1079 return NULL; 1080 } 1081 state->opnum = opnum; 1082 state->in_data.data = discard_const_p(uint8_t, in_data); 1083 state->in_data.length = in_length; 1084 1085 ok = irpc_bh_is_connected(h); 1086 if (!ok) { 1087 tevent_req_nterror(req, NT_STATUS_INVALID_CONNECTION); 1088 return tevent_req_post(req, ev); 1089 } 1090 1091 state->irpc = talloc_zero(state, struct irpc_request); 1092 if (tevent_req_nomem(state->irpc, req)) { 1093 return tevent_req_post(req, ev); 1094 } 1095 1096 state->irpc->msg_ctx = hs->msg_ctx; 1097 state->irpc->callid = idr_get_new(hs->msg_ctx->idr, 1098 state->irpc, UINT16_MAX); 1099 if (state->irpc->callid == -1) { 1100 tevent_req_nterror(req, NT_STATUS_INSUFFICIENT_RESOURCES); 1101 return tevent_req_post(req, ev); 1102 } 1103 state->irpc->incoming.handler = irpc_bh_raw_call_incoming_handler; 1104 state->irpc->incoming.private_data = req; 1105 1106 talloc_set_destructor(state->irpc, irpc_destructor); 1107 1108 /* setup the header */ 1109 header.uuid = hs->table->syntax_id.uuid; 1110 1111 header.if_version = hs->table->syntax_id.if_version; 1112 header.callid = state->irpc->callid; 1113 header.callnum = state->opnum; 1114 header.flags = 0; 1115 header.status = NT_STATUS_OK; 1116 header.creds.token= hs->token; 1117 1118 /* construct the irpc packet */ 1119 ndr = ndr_push_init_ctx(state->irpc); 1120 if (tevent_req_nomem(ndr, req)) { 1121 return tevent_req_post(req, ev); 1122 } 1123 1124 ndr_err = ndr_push_irpc_header(ndr, NDR_SCALARS|NDR_BUFFERS, &header); 1125 status = ndr_map_error2ntstatus(ndr_err); 1126 if (!NT_STATUS_IS_OK(status)) { 1127 tevent_req_nterror(req, status); 1128 return tevent_req_post(req, ev); 1129 } 1130 1131 ndr_err = ndr_push_bytes(ndr, in_data, in_length); 1132 status = ndr_map_error2ntstatus(ndr_err); 1133 if (!NT_STATUS_IS_OK(status)) { 1134 tevent_req_nterror(req, status); 1135 return tevent_req_post(req, ev); 1136 } 1137 1138 /* and send it */ 1139 state->in_packet = ndr_push_blob(ndr); 1140 status = messaging_send(hs->msg_ctx, hs->server_id, 1141 MSG_IRPC, &state->in_packet); 1142 if (!NT_STATUS_IS_OK(status)) { 1143 tevent_req_nterror(req, status); 1144 return tevent_req_post(req, ev); 1145 } 1146 1147 if (hs->timeout != IRPC_CALL_TIMEOUT_INF) { 1148 /* set timeout-callback in case caller wants that */ 1149 ok = tevent_req_set_endtime(req, ev, timeval_current_ofs(hs->timeout, 0)); 1150 if (!ok) { 1151 return tevent_req_post(req, ev); 1152 } 1153 } 1154 1155 return req; 1156 } 1157 1158 static void irpc_bh_raw_call_incoming_handler(struct irpc_request *irpc, 1159 struct irpc_message *m) 1160 { 1161 struct tevent_req *req = 1162 talloc_get_type_abort(irpc->incoming.private_data, 1163 struct tevent_req); 1164 struct irpc_bh_raw_call_state *state = 1165 tevent_req_data(req, 1166 struct irpc_bh_raw_call_state); 1167 1168 talloc_steal(state, m); 1169 1170 if (!NT_STATUS_IS_OK(m->header.status)) { 1171 tevent_req_nterror(req, m->header.status); 1172 return; 1173 } 1174 1175 state->out_data = data_blob_talloc(state, 1176 m->ndr->data + m->ndr->offset, 1177 m->ndr->data_size - m->ndr->offset); 1178 if ((m->ndr->data_size - m->ndr->offset) > 0 && !state->out_data.data) { 1179 tevent_req_nomem(NULL, req); 1180 return; 1181 } 1182 1183 tevent_req_done(req); 1184 } 1185 1186 static NTSTATUS irpc_bh_raw_call_recv(struct tevent_req *req, 1187 TALLOC_CTX *mem_ctx, 1188 uint8_t **out_data, 1189 size_t *out_length, 1190 uint32_t *out_flags) 1191 { 1192 struct irpc_bh_raw_call_state *state = 1193 tevent_req_data(req, 1194 struct irpc_bh_raw_call_state); 1195 NTSTATUS status; 1196 1197 if (tevent_req_is_nterror(req, &status)) { 1198 tevent_req_received(req); 1199 return status; 1200 } 1201 1202 *out_data = talloc_move(mem_ctx, &state->out_data.data); 1203 *out_length = state->out_data.length; 1204 *out_flags = 0; 1205 tevent_req_received(req); 1206 return NT_STATUS_OK; 1207 } 1208 1209 struct irpc_bh_disconnect_state { 1210 uint8_t _dummy; 1211 }; 1212 1213 static struct tevent_req *irpc_bh_disconnect_send(TALLOC_CTX *mem_ctx, 1214 struct tevent_context *ev, 1215 struct dcerpc_binding_handle *h) 1216 { 1217 struct irpc_bh_state *hs = dcerpc_binding_handle_data(h, 1218 struct irpc_bh_state); 1219 struct tevent_req *req; 1220 struct irpc_bh_disconnect_state *state; 1221 bool ok; 1222 1223 req = tevent_req_create(mem_ctx, &state, 1224 struct irpc_bh_disconnect_state); 1225 if (req == NULL) { 1226 return NULL; 1227 } 1228 1229 ok = irpc_bh_is_connected(h); 1230 if (!ok) { 1231 tevent_req_nterror(req, NT_STATUS_INVALID_CONNECTION); 1232 return tevent_req_post(req, ev); 1233 } 1234 1235 hs->msg_ctx = NULL; 1236 1237 tevent_req_done(req); 1238 return tevent_req_post(req, ev); 1239 } 1240 1241 static NTSTATUS irpc_bh_disconnect_recv(struct tevent_req *req) 1242 { 1243 NTSTATUS status; 1244 1245 if (tevent_req_is_nterror(req, &status)) { 1246 tevent_req_received(req); 1247 return status; 1248 } 1249 1250 tevent_req_received(req); 1251 return NT_STATUS_OK; 1252 } 1253 1254 static bool irpc_bh_ref_alloc(struct dcerpc_binding_handle *h) 1255 { 1256 return true; 1257 } 1258 1259 static const struct dcerpc_binding_handle_ops irpc_bh_ops = { 1260 .name = "wbint", 1261 .is_connected = irpc_bh_is_connected, 1262 .set_timeout = irpc_bh_set_timeout, 1263 .raw_call_send = irpc_bh_raw_call_send, 1264 .raw_call_recv = irpc_bh_raw_call_recv, 1265 .disconnect_send = irpc_bh_disconnect_send, 1266 .disconnect_recv = irpc_bh_disconnect_recv, 1267 1268 .ref_alloc = irpc_bh_ref_alloc, 1269 }; 1270 1271 /* initialise a irpc binding handle */ 1272 struct dcerpc_binding_handle *irpc_binding_handle(TALLOC_CTX *mem_ctx, 1273 struct messaging_context *msg_ctx, 1274 struct server_id server_id, 1275 const struct ndr_interface_table *table) 1276 { 1277 struct dcerpc_binding_handle *h; 1278 struct irpc_bh_state *hs; 1279 1280 h = dcerpc_binding_handle_create(mem_ctx, 1281 &irpc_bh_ops, 1282 NULL, 1283 table, 1284 &hs, 1285 struct irpc_bh_state, 1286 __location__); 1287 if (h == NULL) { 1288 return NULL; 1289 } 1290 hs->msg_ctx = msg_ctx; 1291 hs->server_id = server_id; 1292 hs->table = table; 1293 hs->timeout = IRPC_CALL_TIMEOUT; 1294 1295 dcerpc_binding_handle_set_sync_ev(h, msg_ctx->event.ev); 1296 1297 return h; 1298 } 1299 1300 struct dcerpc_binding_handle *irpc_binding_handle_by_name(TALLOC_CTX *mem_ctx, 1301 struct messaging_context *msg_ctx, 1302 const char *dest_task, 1303 const struct ndr_interface_table *table) 1304 { 1305 struct dcerpc_binding_handle *h; 1306 struct server_id *sids; 1307 struct server_id sid; 1308 1309 /* find the server task */ 1310 sids = irpc_servers_byname(msg_ctx, mem_ctx, dest_task); 1311 if (sids == NULL) { 1312 errno = EADDRNOTAVAIL; 1313 return NULL; 1314 } 1315 if (sids[0].id == 0) { 1316 talloc_free(sids); 1317 errno = EADDRNOTAVAIL; 1318 return NULL; 1319 } 1320 sid = sids[0]; 1321 talloc_free(sids); 1322 1323 h = irpc_binding_handle(mem_ctx, msg_ctx, 1324 sid, table); 1325 if (h == NULL) { 1326 return NULL; 1327 } 1328 1329 return h; 1330 } 1331 1332 void irpc_binding_handle_add_security_token(struct dcerpc_binding_handle *h, 1333 struct security_token *token) 1334 { 1335 struct irpc_bh_state *hs = 1336 dcerpc_binding_handle_data(h, 1337 struct irpc_bh_state); 1338 1339 hs->token = token; 1340 }
Note:
See TracChangeset
for help on using the changeset viewer.