source: trunk/server/lib/async_req/async_sock.c

Last change on this file was 862, checked in by Silvan Scherrer, 11 years ago

Samba Server: update trunk to 3.6.23

File size: 16.0 KB
Line 
1/*
2 Unix SMB/CIFS implementation.
3 async socket syscalls
4 Copyright (C) Volker Lendecke 2008
5
6 ** NOTE! The following LGPL license applies to the async_sock
7 ** library. This does NOT imply that all of Samba is released
8 ** under the LGPL
9
10 This library is free software; you can redistribute it and/or
11 modify it under the terms of the GNU Lesser General Public
12 License as published by the Free Software Foundation; either
13 version 3 of the License, or (at your option) any later version.
14
15 This library is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 Library General Public License for more details.
19
20 You should have received a copy of the GNU Lesser General Public License
21 along with this program. If not, see <http://www.gnu.org/licenses/>.
22*/
23
24#include "replace.h"
25#include "system/network.h"
26#include "system/filesys.h"
27#include <talloc.h>
28#include <tevent.h>
29#include "lib/async_req/async_sock.h"
30
31/* Note: lib/util/ is currently GPL */
32#include "lib/util/tevent_unix.h"
33#include "lib/util/util.h"
34
35#ifndef TALLOC_FREE
36#define TALLOC_FREE(ctx) do { talloc_free(ctx); ctx=NULL; } while(0)
37#endif
38
39struct sendto_state {
40 int fd;
41 const void *buf;
42 size_t len;
43 int flags;
44 const struct sockaddr_storage *addr;
45 socklen_t addr_len;
46 ssize_t sent;
47};
48
49static void sendto_handler(struct tevent_context *ev,
50 struct tevent_fd *fde,
51 uint16_t flags, void *private_data);
52
53struct tevent_req *sendto_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
54 int fd, const void *buf, size_t len, int flags,
55 const struct sockaddr_storage *addr)
56{
57 struct tevent_req *result;
58 struct sendto_state *state;
59 struct tevent_fd *fde;
60
61 result = tevent_req_create(mem_ctx, &state, struct sendto_state);
62 if (result == NULL) {
63 return result;
64 }
65 state->fd = fd;
66 state->buf = buf;
67 state->len = len;
68 state->flags = flags;
69 state->addr = addr;
70
71 switch (addr->ss_family) {
72 case AF_INET:
73 state->addr_len = sizeof(struct sockaddr_in);
74 break;
75#if defined(HAVE_IPV6)
76 case AF_INET6:
77 state->addr_len = sizeof(struct sockaddr_in6);
78 break;
79#endif
80 case AF_UNIX:
81 state->addr_len = sizeof(struct sockaddr_un);
82 break;
83 default:
84 state->addr_len = sizeof(struct sockaddr_storage);
85 break;
86 }
87
88 fde = tevent_add_fd(ev, state, fd, TEVENT_FD_WRITE, sendto_handler,
89 result);
90 if (fde == NULL) {
91 TALLOC_FREE(result);
92 return NULL;
93 }
94 return result;
95}
96
97static void sendto_handler(struct tevent_context *ev,
98 struct tevent_fd *fde,
99 uint16_t flags, void *private_data)
100{
101 struct tevent_req *req = talloc_get_type_abort(
102 private_data, struct tevent_req);
103 struct sendto_state *state =
104 tevent_req_data(req, struct sendto_state);
105
106 state->sent = sendto(state->fd, state->buf, state->len, state->flags,
107 (struct sockaddr *)state->addr, state->addr_len);
108 if ((state->sent == -1) && (errno == EINTR)) {
109 /* retry */
110 return;
111 }
112 if (state->sent == -1) {
113 tevent_req_error(req, errno);
114 return;
115 }
116 tevent_req_done(req);
117}
118
119ssize_t sendto_recv(struct tevent_req *req, int *perrno)
120{
121 struct sendto_state *state =
122 tevent_req_data(req, struct sendto_state);
123
124 if (tevent_req_is_unix_error(req, perrno)) {
125 return -1;
126 }
127 return state->sent;
128}
129
130struct recvfrom_state {
131 int fd;
132 void *buf;
133 size_t len;
134 int flags;
135 struct sockaddr_storage *addr;
136 socklen_t *addr_len;
137 ssize_t received;
138};
139
140static void recvfrom_handler(struct tevent_context *ev,
141 struct tevent_fd *fde,
142 uint16_t flags, void *private_data);
143
144struct tevent_req *recvfrom_send(TALLOC_CTX *mem_ctx,
145 struct tevent_context *ev,
146 int fd, void *buf, size_t len, int flags,
147 struct sockaddr_storage *addr,
148 socklen_t *addr_len)
149{
150 struct tevent_req *result;
151 struct recvfrom_state *state;
152 struct tevent_fd *fde;
153
154 result = tevent_req_create(mem_ctx, &state, struct recvfrom_state);
155 if (result == NULL) {
156 return result;
157 }
158 state->fd = fd;
159 state->buf = buf;
160 state->len = len;
161 state->flags = flags;
162 state->addr = addr;
163 state->addr_len = addr_len;
164
165 fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, recvfrom_handler,
166 result);
167 if (fde == NULL) {
168 TALLOC_FREE(result);
169 return NULL;
170 }
171 return result;
172}
173
174static void recvfrom_handler(struct tevent_context *ev,
175 struct tevent_fd *fde,
176 uint16_t flags, void *private_data)
177{
178 struct tevent_req *req = talloc_get_type_abort(
179 private_data, struct tevent_req);
180 struct recvfrom_state *state =
181 tevent_req_data(req, struct recvfrom_state);
182
183 state->received = recvfrom(state->fd, state->buf, state->len,
184 state->flags, (struct sockaddr *)state->addr,
185 state->addr_len);
186 if ((state->received == -1) && (errno == EINTR)) {
187 /* retry */
188 return;
189 }
190 if (state->received == 0) {
191 tevent_req_error(req, EPIPE);
192 return;
193 }
194 if (state->received == -1) {
195 tevent_req_error(req, errno);
196 return;
197 }
198 tevent_req_done(req);
199}
200
201ssize_t recvfrom_recv(struct tevent_req *req, int *perrno)
202{
203 struct recvfrom_state *state =
204 tevent_req_data(req, struct recvfrom_state);
205
206 if (tevent_req_is_unix_error(req, perrno)) {
207 return -1;
208 }
209 return state->received;
210}
211
212struct async_connect_state {
213 int fd;
214 int result;
215 int sys_errno;
216 long old_sockflags;
217 socklen_t address_len;
218 struct sockaddr_storage address;
219};
220
221static void async_connect_connected(struct tevent_context *ev,
222 struct tevent_fd *fde, uint16_t flags,
223 void *priv);
224
225/**
226 * @brief async version of connect(2)
227 * @param[in] mem_ctx The memory context to hang the result off
228 * @param[in] ev The event context to work from
229 * @param[in] fd The socket to recv from
230 * @param[in] address Where to connect?
231 * @param[in] address_len Length of *address
232 * @retval The async request
233 *
234 * This function sets the socket into non-blocking state to be able to call
235 * connect in an async state. This will be reset when the request is finished.
236 */
237
238struct tevent_req *async_connect_send(TALLOC_CTX *mem_ctx,
239 struct tevent_context *ev,
240 int fd, const struct sockaddr *address,
241 socklen_t address_len)
242{
243 struct tevent_req *result;
244 struct async_connect_state *state;
245 struct tevent_fd *fde;
246
247 result = tevent_req_create(
248 mem_ctx, &state, struct async_connect_state);
249 if (result == NULL) {
250 return NULL;
251 }
252
253 /**
254 * We have to set the socket to nonblocking for async connect(2). Keep
255 * the old sockflags around.
256 */
257
258 state->fd = fd;
259 state->sys_errno = 0;
260
261 state->old_sockflags = fcntl(fd, F_GETFL, 0);
262 if (state->old_sockflags == -1) {
263 goto post_errno;
264 }
265
266 state->address_len = address_len;
267 if (address_len > sizeof(state->address)) {
268 errno = EINVAL;
269 goto post_errno;
270 }
271 memcpy(&state->address, address, address_len);
272
273 set_blocking(fd, false);
274
275 state->result = connect(fd, address, address_len);
276 if (state->result == 0) {
277 tevent_req_done(result);
278 goto done;
279 }
280
281 /**
282 * A number of error messages show that something good is progressing
283 * and that we have to wait for readability.
284 *
285 * If none of them are present, bail out.
286 */
287
288 if (!(errno == EINPROGRESS || errno == EALREADY ||
289#ifdef EISCONN
290 errno == EISCONN ||
291#endif
292 errno == EAGAIN || errno == EINTR)) {
293 state->sys_errno = errno;
294 goto post_errno;
295 }
296
297 fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ | TEVENT_FD_WRITE,
298 async_connect_connected, result);
299 if (fde == NULL) {
300 state->sys_errno = ENOMEM;
301 goto post_errno;
302 }
303 return result;
304
305 post_errno:
306 tevent_req_error(result, state->sys_errno);
307 done:
308 fcntl(fd, F_SETFL, state->old_sockflags);
309#ifdef __OS2__ // if the old flags have blocking set we need to set it also (fcntl bug)
310 if (!(state->old_sockflags & O_NONBLOCK)) {
311 set_blocking(state->fd, true);
312 }
313#endif
314 return tevent_req_post(result, ev);
315}
316
317/**
318 * fde event handler for connect(2)
319 * @param[in] ev The event context that sent us here
320 * @param[in] fde The file descriptor event associated with the connect
321 * @param[in] flags Indicate read/writeability of the socket
322 * @param[in] priv private data, "struct async_req *" in this case
323 */
324
325static void async_connect_connected(struct tevent_context *ev,
326 struct tevent_fd *fde, uint16_t flags,
327 void *priv)
328{
329 struct tevent_req *req = talloc_get_type_abort(
330 priv, struct tevent_req);
331 struct async_connect_state *state =
332 tevent_req_data(req, struct async_connect_state);
333 int ret;
334
335 ret = connect(state->fd, (struct sockaddr *)(void *)&state->address,
336 state->address_len);
337 if (ret == 0) {
338 state->sys_errno = 0;
339 TALLOC_FREE(fde);
340 tevent_req_done(req);
341 return;
342 }
343 if (errno == EINPROGRESS) {
344 /* Try again later, leave the fde around */
345 return;
346 }
347 state->sys_errno = errno;
348 TALLOC_FREE(fde);
349 tevent_req_error(req, errno);
350 return;
351}
352
353int async_connect_recv(struct tevent_req *req, int *perrno)
354{
355 struct async_connect_state *state =
356 tevent_req_data(req, struct async_connect_state);
357 int err;
358
359 fcntl(state->fd, F_SETFL, state->old_sockflags);
360#ifdef __OS2__ // if the old flags have blocking set we need to set it also (fcntl bug)
361 if (!(state->old_sockflags & O_NONBLOCK)) {
362 set_blocking(state->fd, true);
363 }
364#endif
365
366 if (tevent_req_is_unix_error(req, &err)) {
367 *perrno = err;
368 return -1;
369 }
370
371 if (state->sys_errno == 0) {
372 return 0;
373 }
374
375 *perrno = state->sys_errno;
376 return -1;
377}
378
379struct writev_state {
380 struct tevent_context *ev;
381 int fd;
382 struct iovec *iov;
383 int count;
384 size_t total_size;
385 uint16_t flags;
386 bool err_on_readability;
387};
388
389static void writev_trigger(struct tevent_req *req, void *private_data);
390static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
391 uint16_t flags, void *private_data);
392
393struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
394 struct tevent_queue *queue, int fd,
395 bool err_on_readability,
396 struct iovec *iov, int count)
397{
398 struct tevent_req *req;
399 struct writev_state *state;
400
401 req = tevent_req_create(mem_ctx, &state, struct writev_state);
402 if (req == NULL) {
403 return NULL;
404 }
405 state->ev = ev;
406 state->fd = fd;
407 state->total_size = 0;
408 state->count = count;
409 state->iov = (struct iovec *)talloc_memdup(
410 state, iov, sizeof(struct iovec) * count);
411 if (state->iov == NULL) {
412 goto fail;
413 }
414 state->flags = TEVENT_FD_WRITE|TEVENT_FD_READ;
415 state->err_on_readability = err_on_readability;
416
417 if (queue == NULL) {
418 struct tevent_fd *fde;
419 fde = tevent_add_fd(state->ev, state, state->fd,
420 state->flags, writev_handler, req);
421 if (tevent_req_nomem(fde, req)) {
422 return tevent_req_post(req, ev);
423 }
424 return req;
425 }
426
427 if (!tevent_queue_add(queue, ev, req, writev_trigger, NULL)) {
428 goto fail;
429 }
430 return req;
431 fail:
432 TALLOC_FREE(req);
433 return NULL;
434}
435
436static void writev_trigger(struct tevent_req *req, void *private_data)
437{
438 struct writev_state *state = tevent_req_data(req, struct writev_state);
439 struct tevent_fd *fde;
440
441 fde = tevent_add_fd(state->ev, state, state->fd, state->flags,
442 writev_handler, req);
443 if (fde == NULL) {
444 tevent_req_error(req, ENOMEM);
445 }
446}
447
448static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
449 uint16_t flags, void *private_data)
450{
451 struct tevent_req *req = talloc_get_type_abort(
452 private_data, struct tevent_req);
453 struct writev_state *state =
454 tevent_req_data(req, struct writev_state);
455 size_t to_write, written;
456 int i;
457
458 to_write = 0;
459
460 if ((state->flags & TEVENT_FD_READ) && (flags & TEVENT_FD_READ)) {
461 int ret, value;
462
463 if (state->err_on_readability) {
464 /* Readable and the caller wants an error on read. */
465 tevent_req_error(req, EPIPE);
466 return;
467 }
468
469 /* Might be an error. Check if there are bytes to read */
470 ret = ioctl(state->fd, FIONREAD, &value);
471 /* FIXME - should we also check
472 for ret == 0 and value == 0 here ? */
473 if (ret == -1) {
474 /* There's an error. */
475 tevent_req_error(req, EPIPE);
476 return;
477 }
478 /* A request for TEVENT_FD_READ will succeed from now and
479 forevermore until the bytes are read so if there was
480 an error we'll wait until we do read, then get it in
481 the read callback function. Until then, remove TEVENT_FD_READ
482 from the flags we're waiting for. */
483 state->flags &= ~TEVENT_FD_READ;
484 TEVENT_FD_NOT_READABLE(fde);
485
486 /* If not writable, we're done. */
487 if (!(flags & TEVENT_FD_WRITE)) {
488 return;
489 }
490 }
491
492 for (i=0; i<state->count; i++) {
493 to_write += state->iov[i].iov_len;
494 }
495
496 written = writev(state->fd, state->iov, state->count);
497 if ((written == -1) && (errno == EINTR)) {
498 /* retry */
499 return;
500 }
501 if (written == -1) {
502 tevent_req_error(req, errno);
503 return;
504 }
505 if (written == 0) {
506 tevent_req_error(req, EPIPE);
507 return;
508 }
509 state->total_size += written;
510
511 if (written == to_write) {
512 tevent_req_done(req);
513 return;
514 }
515
516 /*
517 * We've written less than we were asked to, drop stuff from
518 * state->iov.
519 */
520
521 while (written > 0) {
522 if (written < state->iov[0].iov_len) {
523 state->iov[0].iov_base =
524 (char *)state->iov[0].iov_base + written;
525 state->iov[0].iov_len -= written;
526 break;
527 }
528 written -= state->iov[0].iov_len;
529 state->iov += 1;
530 state->count -= 1;
531 }
532}
533
534ssize_t writev_recv(struct tevent_req *req, int *perrno)
535{
536 struct writev_state *state =
537 tevent_req_data(req, struct writev_state);
538
539 if (tevent_req_is_unix_error(req, perrno)) {
540 return -1;
541 }
542 return state->total_size;
543}
544
545struct read_packet_state {
546 int fd;
547 uint8_t *buf;
548 size_t nread;
549 ssize_t (*more)(uint8_t *buf, size_t buflen, void *private_data);
550 void *private_data;
551};
552
553static void read_packet_handler(struct tevent_context *ev,
554 struct tevent_fd *fde,
555 uint16_t flags, void *private_data);
556
557struct tevent_req *read_packet_send(TALLOC_CTX *mem_ctx,
558 struct tevent_context *ev,
559 int fd, size_t initial,
560 ssize_t (*more)(uint8_t *buf,
561 size_t buflen,
562 void *private_data),
563 void *private_data)
564{
565 struct tevent_req *result;
566 struct read_packet_state *state;
567 struct tevent_fd *fde;
568
569 result = tevent_req_create(mem_ctx, &state, struct read_packet_state);
570 if (result == NULL) {
571 return NULL;
572 }
573 state->fd = fd;
574 state->nread = 0;
575 state->more = more;
576 state->private_data = private_data;
577
578 state->buf = talloc_array(state, uint8_t, initial);
579 if (state->buf == NULL) {
580 goto fail;
581 }
582
583 fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, read_packet_handler,
584 result);
585 if (fde == NULL) {
586 goto fail;
587 }
588 return result;
589 fail:
590 TALLOC_FREE(result);
591 return NULL;
592}
593
594static void read_packet_handler(struct tevent_context *ev,
595 struct tevent_fd *fde,
596 uint16_t flags, void *private_data)
597{
598 struct tevent_req *req = talloc_get_type_abort(
599 private_data, struct tevent_req);
600 struct read_packet_state *state =
601 tevent_req_data(req, struct read_packet_state);
602 size_t total = talloc_get_size(state->buf);
603 ssize_t nread, more;
604 uint8_t *tmp;
605
606 nread = recv(state->fd, state->buf+state->nread, total-state->nread,
607 0);
608 if ((nread == -1) && (errno == EINTR)) {
609 /* retry */
610 return;
611 }
612 if (nread == -1) {
613 tevent_req_error(req, errno);
614 return;
615 }
616 if (nread == 0) {
617 tevent_req_error(req, EPIPE);
618 return;
619 }
620
621 state->nread += nread;
622 if (state->nread < total) {
623 /* Come back later */
624 return;
625 }
626
627 /*
628 * We got what was initially requested. See if "more" asks for -- more.
629 */
630 if (state->more == NULL) {
631 /* Nobody to ask, this is a async read_data */
632 tevent_req_done(req);
633 return;
634 }
635
636 more = state->more(state->buf, total, state->private_data);
637 if (more == -1) {
638 /* We got an invalid packet, tell the caller */
639 tevent_req_error(req, EIO);
640 return;
641 }
642 if (more == 0) {
643 /* We're done, full packet received */
644 tevent_req_done(req);
645 return;
646 }
647
648 if (total + more < total) {
649 tevent_req_error(req, EMSGSIZE);
650 return;
651 }
652
653 tmp = talloc_realloc(state, state->buf, uint8_t, total+more);
654 if (tevent_req_nomem(tmp, req)) {
655 return;
656 }
657 state->buf = tmp;
658}
659
660ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
661 uint8_t **pbuf, int *perrno)
662{
663 struct read_packet_state *state =
664 tevent_req_data(req, struct read_packet_state);
665
666 if (tevent_req_is_unix_error(req, perrno)) {
667 return -1;
668 }
669 *pbuf = talloc_move(mem_ctx, &state->buf);
670 return talloc_get_size(*pbuf);
671}
Note: See TracBrowser for help on using the repository browser.