source: vendor/current/ctdb/common/comm.c

Last change on this file was 988, checked in by Silvan Scherrer, 9 years ago

Samba Server: update vendor to version 4.4.3

File size: 8.8 KB
Line 
1/*
2 Communication endpoint implementation
3
4 Copyright (C) Amitay Isaacs 2015
5
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
18*/
19
20#include "replace.h"
21#include "system/network.h"
22#include "system/filesys.h"
23
24#include <talloc.h>
25#include <tdb.h>
26
27#include "lib/util/tevent_unix.h"
28
29#include "pkt_read.h"
30#include "pkt_write.h"
31#include "comm.h"
32
33static bool set_nonblocking(int fd)
34{
35 int v;
36
37 v = fcntl(fd, F_GETFL, 0);
38 if (v == -1) {
39 return false;
40 }
41 if (fcntl(fd, F_SETFL, v | O_NONBLOCK) == -1) {
42 return false;
43 }
44 return true;
45}
46
47/*
48 * Communication endpoint around a socket
49 */
50
51#define SMALL_PKT_SIZE 1024
52
53struct comm_context {
54 int fd;
55 comm_read_handler_fn read_handler;
56 void *read_private_data;
57 comm_dead_handler_fn dead_handler;
58 void *dead_private_data;
59 uint8_t small_pkt[SMALL_PKT_SIZE];
60 struct tevent_req *read_req, *write_req;
61 struct tevent_fd *fde;
62 struct tevent_queue *queue;
63};
64
65static void comm_fd_handler(struct tevent_context *ev,
66 struct tevent_fd *fde,
67 uint16_t flags, void *private_data);
68static struct tevent_req *comm_read_send(TALLOC_CTX *mem_ctx,
69 struct tevent_context *ev,
70 struct comm_context *comm,
71 uint8_t *buf, size_t buflen);
72static void comm_read_failed(struct tevent_req *req);
73
74
75int comm_setup(TALLOC_CTX *mem_ctx, struct tevent_context *ev, int fd,
76 comm_read_handler_fn read_handler, void *read_private_data,
77 comm_dead_handler_fn dead_handler, void *dead_private_data,
78 struct comm_context **result)
79{
80 struct comm_context *comm;
81
82 if (fd < 0) {
83 return EINVAL;
84 }
85
86 if (dead_handler == NULL) {
87 return EINVAL;
88 }
89
90 /* Socket queue relies on non-blocking sockets. */
91 if (!set_nonblocking(fd)) {
92 return EIO;
93 }
94
95 comm = talloc_zero(mem_ctx, struct comm_context);
96 if (comm == NULL) {
97 return ENOMEM;
98 }
99
100 comm->fd = fd;
101 comm->read_handler = read_handler;
102 comm->read_private_data = read_private_data;
103 comm->dead_handler = dead_handler;
104 comm->dead_private_data = dead_private_data;
105
106 comm->queue = tevent_queue_create(comm, "comm write queue");
107 if (comm->queue == NULL) {
108 goto fail;
109 }
110
111 /* Set up to write packets */
112 comm->fde = tevent_add_fd(ev, comm, fd, TEVENT_FD_READ,
113 comm_fd_handler, comm);
114 if (comm->fde == NULL) {
115 goto fail;
116 }
117
118 /* Set up to read packets */
119 if (read_handler != NULL) {
120 struct tevent_req *req;
121
122 req = comm_read_send(comm, ev, comm, comm->small_pkt,
123 SMALL_PKT_SIZE);
124 if (req == NULL) {
125 goto fail;
126 }
127
128 tevent_req_set_callback(req, comm_read_failed, comm);
129 comm->read_req = req;
130 }
131
132 *result = comm;
133 return 0;
134
135fail:
136 talloc_free(comm);
137 return ENOMEM;
138}
139
140
141/*
142 * Read packets
143 */
144
145struct comm_read_state {
146 struct tevent_context *ev;
147 struct comm_context *comm;
148 uint8_t *buf;
149 size_t buflen;
150 struct tevent_req *subreq;
151};
152
153static ssize_t comm_read_more(uint8_t *buf, size_t buflen, void *private_data);
154static void comm_read_done(struct tevent_req *subreq);
155
156static struct tevent_req *comm_read_send(TALLOC_CTX *mem_ctx,
157 struct tevent_context *ev,
158 struct comm_context *comm,
159 uint8_t *buf, size_t buflen)
160{
161 struct tevent_req *req, *subreq;
162 struct comm_read_state *state;
163
164 req = tevent_req_create(mem_ctx, &state, struct comm_read_state);
165 if (req == NULL) {
166 return NULL;
167 }
168
169 state->ev = ev;
170 state->comm = comm;
171 state->buf = buf;
172 state->buflen = buflen;
173
174 subreq = pkt_read_send(state, state->ev, comm->fd, sizeof(uint32_t),
175 state->buf, state->buflen,
176 comm_read_more, NULL);
177 if (tevent_req_nomem(subreq, req)) {
178 return tevent_req_post(req, ev);
179 }
180 state->subreq = subreq;
181
182 tevent_req_set_callback(subreq, comm_read_done, req);
183 return req;
184}
185
186static ssize_t comm_read_more(uint8_t *buf, size_t buflen, void *private_data)
187{
188 uint32_t packet_len;
189
190 if (buflen < sizeof(uint32_t)) {
191 return sizeof(uint32_t) - buflen;
192 }
193
194 packet_len = *(uint32_t *)buf;
195
196 return packet_len - buflen;
197}
198
199static void comm_read_done(struct tevent_req *subreq)
200{
201 struct tevent_req *req = tevent_req_callback_data(
202 subreq, struct tevent_req);
203 struct comm_read_state *state = tevent_req_data(
204 req, struct comm_read_state);
205 struct comm_context *comm = state->comm;
206 ssize_t nread;
207 uint8_t *buf;
208 bool free_buf;
209 int err = 0;
210
211 nread = pkt_read_recv(subreq, state, &buf, &free_buf, &err);
212 TALLOC_FREE(subreq);
213 state->subreq = NULL;
214 if (nread == -1) {
215 tevent_req_error(req, err);
216 return;
217 }
218
219 comm->read_handler(buf, nread, comm->read_private_data);
220
221 if (free_buf) {
222 talloc_free(buf);
223 }
224
225 subreq = pkt_read_send(state, state->ev, comm->fd, sizeof(uint32_t),
226 state->buf, state->buflen,
227 comm_read_more, NULL);
228 if (tevent_req_nomem(subreq, req)) {
229 return;
230 }
231 state->subreq = subreq;
232
233 tevent_req_set_callback(subreq, comm_read_done, req);
234}
235
236static void comm_read_recv(struct tevent_req *req, int *perr)
237{
238 int err;
239
240 if (tevent_req_is_unix_error(req, &err)) {
241 if (perr != NULL) {
242 *perr = err;
243 }
244 }
245}
246
247static void comm_read_failed(struct tevent_req *req)
248{
249 struct comm_context *comm = tevent_req_callback_data(
250 req, struct comm_context);
251
252 comm_read_recv(req, NULL);
253 TALLOC_FREE(req);
254 comm->read_req = NULL;
255 if (comm->dead_handler != NULL) {
256 comm->dead_handler(comm->dead_private_data);
257 }
258}
259
260
261/*
262 * Write packets
263 */
264
265struct comm_write_state {
266 struct tevent_context *ev;
267 struct comm_context *comm;
268 struct tevent_req *subreq;
269 uint8_t *buf;
270 size_t buflen, nwritten;
271};
272
273static void comm_write_trigger(struct tevent_req *req, void *private_data);
274static void comm_write_done(struct tevent_req *subreq);
275
276struct tevent_req *comm_write_send(TALLOC_CTX *mem_ctx,
277 struct tevent_context *ev,
278 struct comm_context *comm,
279 uint8_t *buf, size_t buflen)
280{
281 struct tevent_req *req;
282 struct comm_write_state *state;
283
284 req = tevent_req_create(mem_ctx, &state, struct comm_write_state);
285 if (req == NULL) {
286 return NULL;
287 }
288
289 state->ev = ev;
290 state->comm = comm;
291 state->buf = buf;
292 state->buflen = buflen;
293
294 if (!tevent_queue_add_entry(comm->queue, ev, req,
295 comm_write_trigger, NULL)) {
296 talloc_free(req);
297 return NULL;
298 }
299
300 return req;
301}
302
303static void comm_write_trigger(struct tevent_req *req, void *private_data)
304{
305 struct comm_write_state *state = tevent_req_data(
306 req, struct comm_write_state);
307 struct comm_context *comm = state->comm;
308 struct tevent_req *subreq;
309
310 comm->write_req = req;
311
312 subreq = pkt_write_send(state, state->ev, comm->fd,
313 state->buf, state->buflen);
314 if (tevent_req_nomem(subreq, req)) {
315 return;
316 }
317
318 state->subreq = subreq;
319 tevent_req_set_callback(subreq, comm_write_done, req);
320 TEVENT_FD_WRITEABLE(comm->fde);
321}
322
323static void comm_write_done(struct tevent_req *subreq)
324{
325 struct tevent_req *req = tevent_req_callback_data(
326 subreq, struct tevent_req);
327 struct comm_write_state *state = tevent_req_data(
328 req, struct comm_write_state);
329 struct comm_context *comm = state->comm;
330 ssize_t nwritten;
331 int err = 0;
332
333 TEVENT_FD_NOT_WRITEABLE(comm->fde);
334 nwritten = pkt_write_recv(subreq, &err);
335 TALLOC_FREE(subreq);
336 state->subreq = NULL;
337 comm->write_req = NULL;
338 if (nwritten == -1) {
339 if (err == EPIPE) {
340 comm->dead_handler(comm->dead_private_data);
341 }
342 tevent_req_error(req, err);
343 return;
344 }
345
346 state->nwritten = nwritten;
347 tevent_req_done(req);
348}
349
350bool comm_write_recv(struct tevent_req *req, int *perr)
351{
352 struct comm_write_state *state = tevent_req_data(
353 req, struct comm_write_state);
354 int err;
355
356 if (tevent_req_is_unix_error(req, &err)) {
357 if (perr != NULL) {
358 *perr = err;
359 }
360 return false;
361 }
362
363 if (state->nwritten != state->buflen) {
364 *perr = EIO;
365 return false;
366 }
367
368 *perr = 0;
369 return true;
370}
371
372static void comm_fd_handler(struct tevent_context *ev,
373 struct tevent_fd *fde,
374 uint16_t flags, void *private_data)
375{
376 struct comm_context *comm = talloc_get_type_abort(
377 private_data, struct comm_context);
378
379 if (flags & TEVENT_FD_READ) {
380 struct comm_read_state *read_state;
381
382 if (comm->read_req == NULL) {
383 /* This should never happen */
384 abort();
385 }
386
387 read_state = tevent_req_data(comm->read_req,
388 struct comm_read_state);
389 pkt_read_handler(ev, fde, flags, read_state->subreq);
390 }
391
392 if (flags & TEVENT_FD_WRITE) {
393 struct comm_write_state *write_state;
394
395 if (comm->write_req == NULL) {
396 /* This should never happen */
397 abort();
398 }
399
400 write_state = tevent_req_data(comm->write_req,
401 struct comm_write_state);
402 pkt_write_handler(ev, fde, flags, write_state->subreq);
403 }
404}
Note: See TracBrowser for help on using the repository browser.