source: branches/samba-3.5.x/lib/async_req/async_sock.c

Last change on this file was 698, checked in by Silvan Scherrer, 13 years ago

Samba Server 3.5: some high-mem workarounds removed

File size: 15.3 KB
Line 
1/*
2 Unix SMB/CIFS implementation.
3 async socket syscalls
4 Copyright (C) Volker Lendecke 2008
5
6 ** NOTE! The following LGPL license applies to the async_sock
7 ** library. This does NOT imply that all of Samba is released
8 ** under the LGPL
9
10 This library is free software; you can redistribute it and/or
11 modify it under the terms of the GNU Lesser General Public
12 License as published by the Free Software Foundation; either
13 version 3 of the License, or (at your option) any later version.
14
15 This library is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 Library General Public License for more details.
19
20 You should have received a copy of the GNU Lesser General Public License
21 along with this program. If not, see <http://www.gnu.org/licenses/>.
22*/
23
24#include "replace.h"
25#include "system/network.h"
26#include "system/filesys.h"
27#include <talloc.h>
28#include <tevent.h>
29#include "lib/async_req/async_sock.h"
30
31/* Note: lib/util/ is currently GPL */
32#include "lib/util/tevent_unix.h"
33#include "lib/util/util.h"
34
35#ifndef TALLOC_FREE
36#define TALLOC_FREE(ctx) do { talloc_free(ctx); ctx=NULL; } while(0)
37#endif
38
39struct async_send_state {
40 int fd;
41 const void *buf;
42 size_t len;
43 int flags;
44 ssize_t sent;
45};
46
47static void async_send_handler(struct tevent_context *ev,
48 struct tevent_fd *fde,
49 uint16_t flags, void *private_data);
50
51struct tevent_req *async_send_send(TALLOC_CTX *mem_ctx,
52 struct tevent_context *ev,
53 int fd, const void *buf, size_t len,
54 int flags)
55{
56 struct tevent_req *result;
57 struct async_send_state *state;
58 struct tevent_fd *fde;
59
60 result = tevent_req_create(mem_ctx, &state, struct async_send_state);
61 if (result == NULL) {
62 return result;
63 }
64 state->fd = fd;
65 state->buf = buf;
66 state->len = len;
67 state->flags = flags;
68
69 fde = tevent_add_fd(ev, state, fd, TEVENT_FD_WRITE, async_send_handler,
70 result);
71 if (fde == NULL) {
72 TALLOC_FREE(result);
73 return NULL;
74 }
75 return result;
76}
77
78static void async_send_handler(struct tevent_context *ev,
79 struct tevent_fd *fde,
80 uint16_t flags, void *private_data)
81{
82 struct tevent_req *req = talloc_get_type_abort(
83 private_data, struct tevent_req);
84 struct async_send_state *state =
85 tevent_req_data(req, struct async_send_state);
86
87 state->sent = send(state->fd, state->buf, state->len, state->flags);
88 if ((state->sent == -1) && (errno == EINTR)) {
89 /* retry */
90 return;
91 }
92 if (state->sent == -1) {
93 tevent_req_error(req, errno);
94 return;
95 }
96 tevent_req_done(req);
97}
98
99ssize_t async_send_recv(struct tevent_req *req, int *perrno)
100{
101 struct async_send_state *state =
102 tevent_req_data(req, struct async_send_state);
103
104 if (tevent_req_is_unix_error(req, perrno)) {
105 return -1;
106 }
107 return state->sent;
108}
109
110struct async_recv_state {
111 int fd;
112 void *buf;
113 size_t len;
114 int flags;
115 ssize_t received;
116};
117
118static void async_recv_handler(struct tevent_context *ev,
119 struct tevent_fd *fde,
120 uint16_t flags, void *private_data);
121
122struct tevent_req *async_recv_send(TALLOC_CTX *mem_ctx,
123 struct tevent_context *ev,
124 int fd, void *buf, size_t len, int flags)
125{
126 struct tevent_req *result;
127 struct async_recv_state *state;
128 struct tevent_fd *fde;
129
130 result = tevent_req_create(mem_ctx, &state, struct async_recv_state);
131 if (result == NULL) {
132 return result;
133 }
134 state->fd = fd;
135 state->buf = buf;
136 state->len = len;
137 state->flags = flags;
138
139 fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, async_recv_handler,
140 result);
141 if (fde == NULL) {
142 TALLOC_FREE(result);
143 return NULL;
144 }
145 return result;
146}
147
148static void async_recv_handler(struct tevent_context *ev,
149 struct tevent_fd *fde,
150 uint16_t flags, void *private_data)
151{
152 struct tevent_req *req = talloc_get_type_abort(
153 private_data, struct tevent_req);
154 struct async_recv_state *state =
155 tevent_req_data(req, struct async_recv_state);
156
157 state->received = recv(state->fd, state->buf, state->len,
158 state->flags);
159 if ((state->received == -1) && (errno == EINTR)) {
160 /* retry */
161 return;
162 }
163 if (state->received == 0) {
164 tevent_req_error(req, EPIPE);
165 return;
166 }
167 if (state->received == -1) {
168 tevent_req_error(req, errno);
169 return;
170 }
171 tevent_req_done(req);
172}
173
174ssize_t async_recv_recv(struct tevent_req *req, int *perrno)
175{
176 struct async_recv_state *state =
177 tevent_req_data(req, struct async_recv_state);
178
179 if (tevent_req_is_unix_error(req, perrno)) {
180 return -1;
181 }
182 return state->received;
183}
184
185struct async_connect_state {
186 int fd;
187 int result;
188 int sys_errno;
189 long old_sockflags;
190 socklen_t address_len;
191 struct sockaddr_storage address;
192};
193
194static void async_connect_connected(struct tevent_context *ev,
195 struct tevent_fd *fde, uint16_t flags,
196 void *priv);
197
198/**
199 * @brief async version of connect(2)
200 * @param[in] mem_ctx The memory context to hang the result off
201 * @param[in] ev The event context to work from
202 * @param[in] fd The socket to recv from
203 * @param[in] address Where to connect?
204 * @param[in] address_len Length of *address
205 * @retval The async request
206 *
207 * This function sets the socket into non-blocking state to be able to call
208 * connect in an async state. This will be reset when the request is finished.
209 */
210
211struct tevent_req *async_connect_send(TALLOC_CTX *mem_ctx,
212 struct tevent_context *ev,
213 int fd, const struct sockaddr *address,
214 socklen_t address_len)
215{
216 struct tevent_req *result;
217 struct async_connect_state *state;
218 struct tevent_fd *fde;
219
220 result = tevent_req_create(
221 mem_ctx, &state, struct async_connect_state);
222 if (result == NULL) {
223 return NULL;
224 }
225
226 /**
227 * We have to set the socket to nonblocking for async connect(2). Keep
228 * the old sockflags around.
229 */
230
231 state->fd = fd;
232 state->sys_errno = 0;
233
234 state->old_sockflags = fcntl(fd, F_GETFL, 0);
235 if (state->old_sockflags == -1) {
236 goto post_errno;
237 }
238
239 state->address_len = address_len;
240 if (address_len > sizeof(state->address)) {
241 errno = EINVAL;
242 goto post_errno;
243 }
244 memcpy(&state->address, address, address_len);
245
246 set_blocking(fd, false);
247
248 state->result = connect(fd, address, address_len);
249 if (state->result == 0) {
250 tevent_req_done(result);
251 goto done;
252 }
253
254 /**
255 * A number of error messages show that something good is progressing
256 * and that we have to wait for readability.
257 *
258 * If none of them are present, bail out.
259 */
260
261 if (!(errno == EINPROGRESS || errno == EALREADY ||
262#ifdef EISCONN
263 errno == EISCONN ||
264#endif
265 errno == EAGAIN || errno == EINTR)) {
266 state->sys_errno = errno;
267 goto post_errno;
268 }
269
270 fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ | TEVENT_FD_WRITE,
271 async_connect_connected, result);
272 if (fde == NULL) {
273 state->sys_errno = ENOMEM;
274 goto post_errno;
275 }
276 return result;
277
278 post_errno:
279 tevent_req_error(result, state->sys_errno);
280 done:
281 fcntl(fd, F_SETFL, state->old_sockflags);
282#ifdef __OS2__ // if the old flags have blocking set we need to set it also (fcntl bug)
283 if (!(state->old_sockflags & O_NONBLOCK)) {
284 set_blocking(state->fd, true);
285 }
286#endif
287 return tevent_req_post(result, ev);
288}
289
290/**
291 * fde event handler for connect(2)
292 * @param[in] ev The event context that sent us here
293 * @param[in] fde The file descriptor event associated with the connect
294 * @param[in] flags Indicate read/writeability of the socket
295 * @param[in] priv private data, "struct async_req *" in this case
296 */
297
298static void async_connect_connected(struct tevent_context *ev,
299 struct tevent_fd *fde, uint16_t flags,
300 void *priv)
301{
302 struct tevent_req *req = talloc_get_type_abort(
303 priv, struct tevent_req);
304 struct async_connect_state *state =
305 tevent_req_data(req, struct async_connect_state);
306
307 /*
308 * Stevens, Network Programming says that if there's a
309 * successful connect, the socket is only writable. Upon an
310 * error, it's both readable and writable.
311 */
312 if ((flags & (TEVENT_FD_READ|TEVENT_FD_WRITE))
313 == (TEVENT_FD_READ|TEVENT_FD_WRITE)) {
314 int ret;
315
316 ret = connect(state->fd,
317 (struct sockaddr *)(void *)&state->address,
318 state->address_len);
319 if (ret == 0) {
320 TALLOC_FREE(fde);
321 tevent_req_done(req);
322 return;
323 }
324
325 if (errno == EINPROGRESS) {
326 /* Try again later, leave the fde around */
327 return;
328 }
329 TALLOC_FREE(fde);
330 tevent_req_error(req, errno);
331 return;
332 }
333
334 state->sys_errno = 0;
335 tevent_req_done(req);
336}
337
338int async_connect_recv(struct tevent_req *req, int *perrno)
339{
340 struct async_connect_state *state =
341 tevent_req_data(req, struct async_connect_state);
342 int err;
343
344 fcntl(state->fd, F_SETFL, state->old_sockflags);
345#ifdef __OS2__ // if the old flags have blocking set we need to set it also (fcntl bug)
346 if (!(state->old_sockflags & O_NONBLOCK)) {
347 set_blocking(state->fd, true);
348 }
349#endif
350 if (tevent_req_is_unix_error(req, &err)) {
351 *perrno = err;
352 return -1;
353 }
354
355 if (state->sys_errno == 0) {
356 return 0;
357 }
358
359 *perrno = state->sys_errno;
360 return -1;
361}
362
363struct writev_state {
364 struct tevent_context *ev;
365 int fd;
366 struct iovec *iov;
367 int count;
368 size_t total_size;
369 uint16_t flags;
370};
371
372static void writev_trigger(struct tevent_req *req, void *private_data);
373static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
374 uint16_t flags, void *private_data);
375
376struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
377 struct tevent_queue *queue, int fd,
378 bool err_on_readability,
379 struct iovec *iov, int count)
380{
381 struct tevent_req *req;
382 struct writev_state *state;
383
384 req = tevent_req_create(mem_ctx, &state, struct writev_state);
385 if (req == NULL) {
386 return NULL;
387 }
388 state->ev = ev;
389 state->fd = fd;
390 state->total_size = 0;
391 state->count = count;
392 state->iov = (struct iovec *)talloc_memdup(
393 state, iov, sizeof(struct iovec) * count);
394 if (state->iov == NULL) {
395 goto fail;
396 }
397 state->flags = TEVENT_FD_WRITE;
398 if (err_on_readability) {
399 state->flags |= TEVENT_FD_READ;
400 }
401
402 if (queue == NULL) {
403 struct tevent_fd *fde;
404 fde = tevent_add_fd(state->ev, state, state->fd,
405 state->flags, writev_handler, req);
406 if (tevent_req_nomem(fde, req)) {
407 return tevent_req_post(req, ev);
408 }
409 return req;
410 }
411
412 if (!tevent_queue_add(queue, ev, req, writev_trigger, NULL)) {
413 goto fail;
414 }
415 return req;
416 fail:
417 TALLOC_FREE(req);
418 return NULL;
419}
420
421static void writev_trigger(struct tevent_req *req, void *private_data)
422{
423 struct writev_state *state = tevent_req_data(req, struct writev_state);
424 struct tevent_fd *fde;
425
426 fde = tevent_add_fd(state->ev, state, state->fd, state->flags,
427 writev_handler, req);
428 if (fde == NULL) {
429 tevent_req_error(req, ENOMEM);
430 }
431}
432
433static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
434 uint16_t flags, void *private_data)
435{
436 struct tevent_req *req = talloc_get_type_abort(
437 private_data, struct tevent_req);
438 struct writev_state *state =
439 tevent_req_data(req, struct writev_state);
440 size_t to_write, written;
441 int i;
442
443 to_write = 0;
444
445 if ((state->flags & TEVENT_FD_READ) && (flags & TEVENT_FD_READ)) {
446 tevent_req_error(req, EPIPE);
447 return;
448 }
449
450 for (i=0; i<state->count; i++) {
451 to_write += state->iov[i].iov_len;
452 }
453
454 written = writev(state->fd, state->iov, state->count);
455 if ((written == -1) && (errno == EINTR)) {
456 /* retry */
457 return;
458 }
459 if (written == -1) {
460 tevent_req_error(req, errno);
461 return;
462 }
463 if (written == 0) {
464 tevent_req_error(req, EPIPE);
465 return;
466 }
467 state->total_size += written;
468
469 if (written == to_write) {
470 tevent_req_done(req);
471 return;
472 }
473
474 /*
475 * We've written less than we were asked to, drop stuff from
476 * state->iov.
477 */
478
479 while (written > 0) {
480 if (written < state->iov[0].iov_len) {
481 state->iov[0].iov_base =
482 (char *)state->iov[0].iov_base + written;
483 state->iov[0].iov_len -= written;
484 break;
485 }
486 written -= state->iov[0].iov_len;
487 state->iov += 1;
488 state->count -= 1;
489 }
490}
491
492ssize_t writev_recv(struct tevent_req *req, int *perrno)
493{
494 struct writev_state *state =
495 tevent_req_data(req, struct writev_state);
496
497 if (tevent_req_is_unix_error(req, perrno)) {
498 return -1;
499 }
500 return state->total_size;
501}
502
503struct read_packet_state {
504 int fd;
505 uint8_t *buf;
506 size_t nread;
507 ssize_t (*more)(uint8_t *buf, size_t buflen, void *private_data);
508 void *private_data;
509};
510
511static void read_packet_handler(struct tevent_context *ev,
512 struct tevent_fd *fde,
513 uint16_t flags, void *private_data);
514
515struct tevent_req *read_packet_send(TALLOC_CTX *mem_ctx,
516 struct tevent_context *ev,
517 int fd, size_t initial,
518 ssize_t (*more)(uint8_t *buf,
519 size_t buflen,
520 void *private_data),
521 void *private_data)
522{
523 struct tevent_req *result;
524 struct read_packet_state *state;
525 struct tevent_fd *fde;
526
527 result = tevent_req_create(mem_ctx, &state, struct read_packet_state);
528 if (result == NULL) {
529 return NULL;
530 }
531 state->fd = fd;
532 state->nread = 0;
533 state->more = more;
534 state->private_data = private_data;
535
536 state->buf = talloc_array(state, uint8_t, initial);
537 if (state->buf == NULL) {
538 goto fail;
539 }
540
541 fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, read_packet_handler,
542 result);
543 if (fde == NULL) {
544 goto fail;
545 }
546 return result;
547 fail:
548 TALLOC_FREE(result);
549 return NULL;
550}
551
552static void read_packet_handler(struct tevent_context *ev,
553 struct tevent_fd *fde,
554 uint16_t flags, void *private_data)
555{
556 struct tevent_req *req = talloc_get_type_abort(
557 private_data, struct tevent_req);
558 struct read_packet_state *state =
559 tevent_req_data(req, struct read_packet_state);
560 size_t total = talloc_get_size(state->buf);
561 ssize_t nread, more;
562 uint8_t *tmp;
563
564 nread = recv(state->fd, state->buf+state->nread, total-state->nread,
565 0);
566 if ((nread == -1) && (errno == EINTR)) {
567 /* retry */
568 return;
569 }
570 if (nread == -1) {
571 tevent_req_error(req, errno);
572 return;
573 }
574 if (nread == 0) {
575 tevent_req_error(req, EPIPE);
576 return;
577 }
578
579 state->nread += nread;
580 if (state->nread < total) {
581 /* Come back later */
582 return;
583 }
584
585 /*
586 * We got what was initially requested. See if "more" asks for -- more.
587 */
588 if (state->more == NULL) {
589 /* Nobody to ask, this is a async read_data */
590 tevent_req_done(req);
591 return;
592 }
593
594 more = state->more(state->buf, total, state->private_data);
595 if (more == -1) {
596 /* We got an invalid packet, tell the caller */
597 tevent_req_error(req, EIO);
598 return;
599 }
600 if (more == 0) {
601 /* We're done, full packet received */
602 tevent_req_done(req);
603 return;
604 }
605
606 tmp = talloc_realloc(state, state->buf, uint8_t, total+more);
607 if (tevent_req_nomem(tmp, req)) {
608 return;
609 }
610 state->buf = tmp;
611}
612
613ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
614 uint8_t **pbuf, int *perrno)
615{
616 struct read_packet_state *state =
617 tevent_req_data(req, struct read_packet_state);
618
619 if (tevent_req_is_unix_error(req, perrno)) {
620 return -1;
621 }
622 *pbuf = talloc_move(mem_ctx, &state->buf);
623 return talloc_get_size(*pbuf);
624}
Note: See TracBrowser for help on using the repository browser.