source: trunk/ORBit2-2.14.0/linc2/src/linc-connection.c

Last change on this file was 92, checked in by cinc, 19 years ago

Orbit2 modified for use with NOM

File size: 35.8 KB
Line 
1/*
2 * linc-connection.c: This file is part of the linc library.
3 *
4 * Authors:
5 * Elliot Lee (sopwith@redhat.com)
6 * Michael Meeks (michael@ximian.com)
7 * Mark McLouglin (mark@skynet.ie) & others
8 *
9 * Copyright 2001, Red Hat, Inc., Ximian, Inc.,
10 * Sun Microsystems, Inc.
11 */
12#include <config.h>
13#include <stdarg.h>
14#include <fcntl.h>
15#include <errno.h>
16#include <string.h>
17#include <stdio.h>
18
19#ifdef LINK_SSL_SUPPORT
20# include <openssl/ssl.h>
21#endif
22
23#include "linc-private.h"
24#include "linc-compat.h"
25#include <linc/linc-config.h>
26#include <linc/linc-connection.h>
27
28static GObjectClass *parent_class = NULL;
29
30enum {
31 BROKEN,
32 BLOCKING,
33 LAST_SIGNAL
34};
35static guint signals [LAST_SIGNAL];
36static GList *cnx_list = NULL;
37
38#define CNX_LOCK(cnx) G_STMT_START { link_lock(); } G_STMT_END
39#define CNX_UNLOCK(cnx) G_STMT_START { link_unlock(); } G_STMT_END
40#define CNX_LIST_LOCK() CNX_LOCK(0); /* for now */
41#define CNX_LIST_UNLOCK() CNX_UNLOCK(0); /* for now */
42#define CNX_AND_LIST_LOCK(cnx) CNX_LOCK(cnx); /* for now */
43#define CNX_AND_LIST_UNLOCK(cnx) CNX_UNLOCK(cnx); /* for now */
44#define CNX_IS_LOCKED(cnx) link_is_locked()
45
46static gboolean link_connection_io_handler (GIOChannel *gioc,
47 GIOCondition condition,
48 gpointer data);
49
50#define link_connection_ref_T(cnx) g_object_ref (cnx)
51
52gpointer
53link_connection_ref (gpointer cnx)
54{
55 CNX_AND_LIST_LOCK (cnx);
56 g_object_ref (cnx);
57 CNX_AND_LIST_UNLOCK (cnx);
58
59 return cnx;
60}
61
62/* Only call if we are _certain_ that we don't hold the last ref */
63static void
64link_connection_unref_T_ (gpointer cnx)
65{
66 g_assert (((GObject *)cnx)->ref_count > 1);
67 g_object_unref (cnx);
68}
69
70static void
71link_connection_unref_unlock (gpointer cnx)
72{
73 gboolean tail_unref = FALSE;
74
75 if (((GObject *)cnx)->ref_count > 1)
76 g_object_unref (cnx);
77
78 else {
79 cnx_list = g_list_remove (cnx_list, cnx);
80 tail_unref = TRUE;
81 }
82
83 CNX_AND_LIST_UNLOCK (cnx);
84
85 if (tail_unref) {
86 LinkCommandCnxUnref cmd[1];
87
88 cmd->cmd.cmd.type = LINK_COMMAND_CNX_UNREF;
89 cmd->cmd.complete = FALSE;
90 cmd->cnx = cnx;
91 link_exec_command ((LinkCommand *) cmd);
92 }
93}
94
95void
96link_connection_exec_cnx_unref (LinkCommandCnxUnref *cmd, gboolean immediate)
97{
98 d_printf ("Exec defered unref on %p", cmd->cnx);
99
100 if (immediate) /* In I/O thread - with just 1 ref left */
101 g_object_unref (cmd->cnx);
102 else {
103 CNX_AND_LIST_LOCK (cmd->cnx);
104 link_connection_unref_unlock (cmd->cnx);
105 }
106}
107
108void
109link_connection_unref (gpointer cnx)
110{
111 g_return_if_fail (cnx != NULL);
112
113 CNX_AND_LIST_LOCK (cnx);
114
115 link_connection_unref_unlock (cnx);
116}
117
118static void
119link_close_fd (LinkConnection *cnx)
120{
121 if (cnx->priv->fd >= 0) {
122 d_printf ("link_close_fd: closing %d\n", cnx->priv->fd);
123 LINK_CLOSE_SOCKET (cnx->priv->fd);
124 }
125 cnx->priv->fd = -1;
126}
127
128typedef struct {
129 LinkBrokenCallback fn;
130 gpointer user_data;
131} BrokenCallback;
132
133static void
134link_connection_emit_broken (LinkConnection *cnx, GSList *callbacks)
135{
136 GSList *l;
137
138 for (l = callbacks; l; l = l->next) {
139 BrokenCallback *bc = l->data;
140 bc->fn (cnx, bc->user_data);
141 g_free (bc);
142 }
143 g_slist_free (callbacks);
144}
145
146static gboolean
147link_connection_broken_idle (gpointer data)
148{
149 GSList *callbacks;
150 LinkConnection *cnx = data;
151
152 d_printf ("Connection %p broken idle ...\n", data);
153
154 CNX_LOCK (cnx);
155 callbacks = cnx->idle_broken_callbacks;
156 cnx->idle_broken_callbacks = NULL;
157 cnx->inhibit_reconnect = FALSE;
158 link_signal ();
159 CNX_UNLOCK (cnx);
160
161 link_connection_emit_broken (cnx, callbacks);
162
163 link_connection_unref (cnx);
164
165 return FALSE;
166}
167
168static void
169link_source_remove (LinkConnection *cnx)
170{
171 if (cnx->priv->tag) {
172 LinkWatch *thewatch = cnx->priv->tag;
173 cnx->priv->tag = NULL;
174 link_io_remove_watch (thewatch);
175 d_printf ("Removed watch on %d\n", cnx->priv->fd);
176 }
177}
178
179static void
180link_source_add (LinkConnection *cnx,
181 GIOCondition condition)
182{
183 g_assert (cnx->priv->tag == NULL);
184
185 cnx->priv->tag = link_io_add_watch_fd (
186 cnx->priv->fd, condition,
187 link_connection_io_handler, cnx);
188
189 d_printf ("Added watch on %d (0x%x)\n",
190 cnx->priv->fd, condition);
191}
192
193typedef struct {
194 guchar *data;
195
196 struct iovec *vecs;
197 int nvecs;
198 struct iovec single_vec;
199} QueuedWrite;
200
201static void
202queued_write_free (QueuedWrite *qw)
203{
204 g_free (qw->data);
205 g_free (qw);
206}
207
208static void
209queue_free (LinkConnection *cnx)
210{
211 GList *l;
212
213 for (l = cnx->priv->write_queue; l; l = l->next)
214 queued_write_free (l->data);
215
216 g_list_free (cnx->priv->write_queue);
217 cnx->priv->write_queue = NULL;
218}
219
220static void
221dispatch_callbacks_drop_lock (LinkConnection *cnx)
222{
223 GSList *callbacks;
224
225 callbacks = cnx->idle_broken_callbacks;
226 cnx->idle_broken_callbacks = NULL;
227
228 CNX_UNLOCK (cnx);
229 link_connection_emit_broken (cnx, callbacks);
230 CNX_LOCK (cnx);
231}
232
233/*
234 * link_connection_class_state_changed:
235 * @cnx: a #LinkConnection
236 * @status: a #LinkConnectionStatus value.
237 *
238 * Set up linc's #GSources if the connection is in the #LINK_CONNECTED
239 * or #LINK_CONNECTING state.
240 *
241 * Remove the #GSources if the state has channged to #LINK_DISCONNECTED,
242 * close the socket and a gobject broken signal which may be caught by
243 * the application.
244 *
245 * Also perform SSL specific operations if the connection has move into
246 * the #LINK_CONNECTED state.
247 */
248static void
249link_connection_state_changed_T_R (LinkConnection *cnx,
250 LinkConnectionStatus status)
251{
252 gboolean changed;
253 LinkConnectionClass *klass;
254
255 g_assert (CNX_IS_LOCKED (cnx));
256
257 d_printf ("State changing from '%s' to '%s' on fd %d\n",
258 STATE_NAME (cnx->status), STATE_NAME (status),
259 cnx->priv->fd);
260
261 changed = cnx->status != status;
262
263 cnx->status = status;
264
265 switch (status) {
266 case LINK_CONNECTED:
267#ifdef LINK_SSL_SUPPORT
268 if (cnx->options & LINK_CONNECTION_SSL) {
269 if (cnx->was_initiated)
270 SSL_connect (cnx->priv->ssl);
271 else
272 SSL_accept (cnx->priv->ssl);
273 }
274#endif
275 if (!cnx->priv->tag)
276 link_source_add (cnx, LINK_ERR_CONDS | LINK_IN_CONDS);
277 break;
278
279 case LINK_CONNECTING:
280
281 if (cnx->priv->tag) /* re-connecting */
282 link_watch_set_condition (
283 cnx->priv->tag,
284 G_IO_OUT | LINK_ERR_CONDS);
285 else
286 link_source_add (cnx, G_IO_OUT | LINK_ERR_CONDS);
287 break;
288
289 case LINK_DISCONNECTED:
290 link_source_remove (cnx);
291 link_close_fd (cnx);
292 queue_free (cnx);
293 /* don't free pending queue - we could get re-connected */
294 if (changed) {
295
296 if (!cnx->priv->was_disconnected) {
297 d_printf ("Emitting the broken signal on %p\n", cnx);
298 CNX_UNLOCK (cnx);
299 g_signal_emit (cnx, signals [BROKEN], 0);
300 CNX_LOCK (cnx);
301 }
302
303 if (cnx->idle_broken_callbacks) {
304 if (!link_thread_io ()) {
305 d_printf ("Immediate broken callbacks at immediately\n");
306
307 dispatch_callbacks_drop_lock (cnx);
308 } else {
309 d_printf ("Queuing broken callbacks at idle\n");
310
311 cnx->inhibit_reconnect = TRUE;
312 link_connection_ref_T (cnx);
313 g_idle_add (link_connection_broken_idle, cnx);
314 }
315 }
316 }
317 break;
318 }
319
320 klass = (LinkConnectionClass *)G_OBJECT_GET_CLASS (cnx);
321
322 if (klass->state_changed) {
323 link_signal ();
324 CNX_UNLOCK (cnx);
325 klass->state_changed (cnx, status);
326 CNX_LOCK (cnx);
327 }
328}
329
330static void
331queue_signal_T_R (LinkConnection *cnx,
332 glong delta)
333{
334 gulong old_size;
335 gulong new_size;
336
337 d_printf ("Queue signal %ld bytes, delta %ld, max %ld\n",
338 cnx->priv->write_queue_bytes, delta,
339 cnx->priv->max_buffer_bytes);
340
341 old_size = cnx->priv->write_queue_bytes;
342 cnx->priv->write_queue_bytes += delta;
343 new_size = cnx->priv->write_queue_bytes;
344
345 if (cnx->options & LINK_CONNECTION_BLOCK_SIGNAL) {
346 if (new_size == 0 ||
347 (old_size < (cnx->priv->max_buffer_bytes >> 1) &&
348 new_size >= (cnx->priv->max_buffer_bytes >> 1)) ||
349 new_size >= cnx->priv->max_buffer_bytes) {
350 CNX_UNLOCK (cnx);
351 g_signal_emit (cnx, signals [BLOCKING], 0, new_size);
352 CNX_LOCK (cnx);
353 }
354 }
355
356 if (cnx->priv->max_buffer_bytes &&
357 cnx->priv->write_queue_bytes >= cnx->priv->max_buffer_bytes)
358 link_connection_state_changed_T_R (cnx, LINK_DISCONNECTED);
359}
360
361static gulong
362calc_size (struct iovec *src_vecs,
363 int nvecs)
364{
365 int i;
366 gulong total_size = 0;
367
368 for (i = 0; i < nvecs; i++)
369 total_size += src_vecs [i].iov_len;
370
371 return total_size;
372}
373
374static void
375queue_flattened_T_R (LinkConnection *cnx,
376 struct iovec *src_vecs,
377 int nvecs,
378 gboolean update_poll)
379{
380 int i;
381 guchar *p;
382 gulong total_size;
383 gboolean new_queue;
384 QueuedWrite *qw = g_new (QueuedWrite, 1);
385
386 total_size = calc_size (src_vecs, nvecs);
387
388 p = g_malloc (total_size);
389
390 qw->data = p;
391 qw->vecs = &qw->single_vec;
392 qw->nvecs = 1;
393
394 qw->vecs->iov_base = p;
395 qw->vecs->iov_len = total_size;
396
397 for (i = 0; i < nvecs; i++) {
398 memcpy (p, src_vecs [i].iov_base, src_vecs [i].iov_len);
399 p += src_vecs [i].iov_len;
400 }
401 g_assert (p == (qw->data + total_size));
402
403 d_printf ("Queueing write of %ld bytes on fd %d\n",
404 total_size, cnx->priv->fd);
405
406 new_queue = cnx->priv->write_queue == NULL;
407 cnx->priv->write_queue = g_list_append (cnx->priv->write_queue, qw);
408 queue_signal_T_R (cnx, total_size);
409
410 if (update_poll && new_queue) {
411 LinkCommandSetCondition *cmd;
412
413 cmd = g_new (LinkCommandSetCondition, 1);
414 cmd->cmd.type = LINK_COMMAND_SET_CONDITION;
415 cmd->cnx = link_connection_ref_T (cnx);
416 cmd->condition = (LINK_ERR_CONDS | LINK_IN_CONDS | G_IO_OUT);
417 link_exec_command (&cmd->cmd);
418 }
419}
420
421static void
422link_connection_from_fd_T (LinkConnection *cnx,
423 int fd,
424 const LinkProtocolInfo *proto,
425 gchar *remote_host_info,
426 gchar *remote_serv_info,
427 gboolean was_initiated,
428 LinkConnectionStatus status,
429 LinkConnectionOptions options)
430{
431 cnx->was_initiated = was_initiated;
432 cnx->is_auth = (proto->flags & LINK_PROTOCOL_SECURE);
433 cnx->proto = proto;
434 cnx->options = options;
435 cnx->priv->fd = fd;
436
437 g_free (cnx->remote_host_info);
438 cnx->remote_host_info = remote_host_info;
439 g_free (cnx->remote_serv_info);
440 cnx->remote_serv_info = remote_serv_info;
441
442 d_printf ("Cnx from fd (%d) '%s', '%s', '%s'\n",
443 fd, proto->name,
444 remote_host_info ? remote_host_info : "<Null>",
445 remote_serv_info ? remote_serv_info : "<Null>");
446
447 if (proto->setup)
448 proto->setup (fd, options);
449
450#ifdef LINK_SSL_SUPPORT
451 if (options & LINK_CONNECTION_SSL) {
452 cnx->priv->ssl = SSL_new (link_ssl_ctx);
453 SSL_set_fd (cnx->priv->ssl, fd);
454 }
455#endif
456 g_assert (CNX_IS_LOCKED (0));
457 link_connection_state_changed_T_R (cnx, status);
458
459 if (!g_list_find (cnx_list, cnx))
460 cnx_list = g_list_prepend (cnx_list, cnx);
461}
462
463/*
464 * link_connection_from_fd:
465 * @cnx: a #LinkConnection.
466 * @fd: a connected/connecting file descriptor.
467 * @proto: a #LinkProtocolInfo.
468 * @remote_host_info: protocol dependant host information; gallocation swallowed
469 * @remote_serv_info: protocol dependant service information(e.g. port number). gallocation swallowed
470 * @was_initiated: #TRUE if the connection was initiated by us.
471 * @status: a #LinkConnectionStatus value.
472 * @options: combination of #LinkConnectionOptions.
473 *
474 * Fill in @cnx, call protocol specific initialisation methonds and then
475 * call link_connection_state_changed.
476 *
477 * Return Value: #TRUE if the function succeeds, #FALSE otherwise.
478 */
479void
480link_connection_from_fd (LinkConnection *cnx,
481 int fd,
482 const LinkProtocolInfo *proto,
483 gchar *remote_host_info,
484 gchar *remote_serv_info,
485 gboolean was_initiated,
486 LinkConnectionStatus status,
487 LinkConnectionOptions options)
488{
489 CNX_LOCK (cnx);
490
491 link_connection_from_fd_T (cnx, fd, proto,
492 remote_serv_info, remote_serv_info,
493 was_initiated, status, options);
494 CNX_UNLOCK (cnx);
495}
496
497static gboolean
498link_connection_do_initiate (LinkConnection *cnx,
499 const char *proto_name,
500 const char *host,
501 const char *service,
502 LinkConnectionOptions options)
503{
504 const LinkProtocolInfo *proto;
505 int rv;
506 int fd;
507 gboolean retval = FALSE;
508 struct sockaddr *saddr;
509 LinkSockLen saddr_len;
510
511 proto = link_protocol_find (proto_name);
512
513 if (!proto)
514 return FALSE;
515
516 saddr = link_protocol_get_sockaddr (
517 proto, host, service, &saddr_len);
518
519 if (!saddr && (strcmp (proto_name, "IPv6") ==0)) {/* Falling back to IPv4 */
520 proto = link_protocol_find ("IPv4");
521
522 saddr = link_protocol_get_sockaddr (
523 proto, host, service, &saddr_len);
524 }
525
526 if (!saddr)
527 return FALSE;
528
529 fd = socket (proto->family, SOCK_STREAM,
530 proto->stream_proto_num);
531#ifdef HAVE_WINSOCK2_H
532 if (fd == INVALID_SOCKET) {
533 fd = -1;
534 link_map_winsock_error_to_errno ();
535 }
536#endif
537
538 if (fd < 0) {
539 d_printf ("socket() failed: %s\n", link_strerror (errno));
540 goto out;
541 }
542
543 if (options & LINK_CONNECTION_NONBLOCKING) {
544#ifdef HAVE_WINSOCK2_H
545 u_long yes = 1;
546 if (ioctlsocket (fd, FIONBIO, &yes) == SOCKET_ERROR) {
547 link_map_winsock_error_to_errno ();
548 d_printf ("ioctlsocket(FIONBIO) failed: %s\n",
549 link_strerror (errno));
550 goto out;
551 }
552#else
553 if (fcntl (fd, F_SETFL, O_NONBLOCK) < 0)
554 goto out;
555#endif
556 }
557
558#if defined (F_SETFD) && defined (FD_CLOEXEC)
559 if (fcntl (fd, F_SETFD, FD_CLOEXEC) < 0)
560 goto out;
561#endif
562
563 LINK_TEMP_FAILURE_RETRY_SOCKET (connect (fd, saddr, saddr_len), rv);
564#ifdef HAVE_WINSOCK2_H
565 if (rv == SOCKET_ERROR) {
566 if ((options & LINK_CONNECTION_NONBLOCKING) &&
567 WSAGetLastError () == WSAEWOULDBLOCK) {
568 /* connect() for nonblocking sockets always
569 * fails with WSAEWOULDBLOCK. We have to
570 * select() to wait for actual status.
571 */
572 fd_set write_fds, except_fds;
573
574 FD_ZERO (&write_fds);
575 FD_SET (fd, &write_fds);
576
577 FD_ZERO (&except_fds);
578 FD_SET (fd, &except_fds);
579
580 rv = select (1, NULL, &write_fds, &except_fds, NULL);
581 if (rv == SOCKET_ERROR) {
582 rv = -1;
583 link_map_winsock_error_to_errno ();
584 } else if (FD_ISSET (fd, &write_fds)) {
585 rv = 0;
586 } else if (FD_ISSET (fd, &except_fds)) {
587 rv = -1;
588 errno = WSAECONNREFUSED;
589 }
590 } else {
591 rv = -1;
592 link_map_winsock_error_to_errno ();
593 }
594 }
595#endif
596 if (rv && errno != EINPROGRESS)
597 goto out;
598
599 d_printf ("initiate 'connect' on new fd %d\n", fd);
600
601 g_assert (CNX_IS_LOCKED (0));
602 link_connection_from_fd_T
603 (cnx, fd, proto,
604 g_strdup (host), g_strdup (service),
605 TRUE, rv ? LINK_CONNECTING : LINK_CONNECTED,
606 options);
607 retval = TRUE;
608
609 out:
610 if (!retval && fd >= 0) {
611 d_printf ("initiation failed: %s\n", link_strerror (errno));
612 d_printf ("closing %d\n", fd);
613 LINK_CLOSE_SOCKET (fd);
614 }
615
616 g_free (saddr);
617
618 return retval;
619}
620
621static LinkConnectionStatus
622link_connection_wait_connected_T (LinkConnection *cnx)
623{
624 while (cnx && cnx->status == LINK_CONNECTING)
625 link_wait ();
626
627 return cnx ? cnx->status : LINK_DISCONNECTED;
628}
629
630LinkConnectionStatus
631link_connection_try_reconnect (LinkConnection *cnx)
632{
633 LinkConnectionStatus status;
634
635 g_return_val_if_fail (LINK_IS_CONNECTION (cnx), LINK_DISCONNECTED);
636
637 CNX_LOCK (cnx);
638
639 d_printf ("Try for reconnection on %p: %d\n",
640 cnx, cnx->inhibit_reconnect);
641
642 while (cnx->inhibit_reconnect) {
643 if (g_main_context_acquire (NULL)) {
644 d_printf ("Dispatch callbacks in 'main' (mainloop owning) thread\n");
645 cnx->inhibit_reconnect = FALSE;
646 dispatch_callbacks_drop_lock (cnx);
647 g_main_context_release (NULL);
648 } else
649 link_wait ();
650 }
651
652 if (cnx->status != LINK_DISCONNECTED)
653 g_warning ("trying to re-connect connected cnx.");
654 else
655 link_connection_do_initiate
656 (cnx, cnx->proto->name, cnx->remote_host_info,
657 cnx->remote_serv_info, cnx->options);
658
659 cnx->priv->was_disconnected = TRUE;
660 status = link_connection_wait_connected_T (cnx);
661 cnx->priv->was_disconnected = FALSE;
662
663 CNX_UNLOCK (cnx);
664
665 return status;
666}
667
668/**
669 * link_connection_initiate_list:
670 * @derived_type: a #LinkConnection derived type
671 * @proto_name: the name of the protocol to use.
672 * @host: protocol dependant host information.
673 * @service: protocol dependant service information(e.g. port number).
674 * @options: combination of #LinkConnectionOptions.
675 * @opt_construct_fn: optional constructor fn for new cnx's or NULL
676 * @user_data: optional user data for constructor
677 *
678 * Looks up a connection in our cnx. list to see if we already
679 * have a matching connection; if so returns it, otherwise
680 * constructs a new cnx. and retursn that
681 *
682 * Return value: an incremented cnx ref.
683 **/
684LinkConnection *
685link_connection_initiate (GType derived_type,
686 const char *proto_name,
687 const char *remote_host_info,
688 const char *remote_serv_info,
689 LinkConnectionOptions options,
690 const char *first_property,
691 ...)
692{
693 va_list args;
694 GList *l;
695 gboolean initiated = TRUE;
696 LinkConnection *cnx = NULL;
697 const LinkProtocolInfo *proto;
698
699 va_start (args, first_property);
700
701 proto = link_protocol_find (proto_name);
702
703 CNX_LIST_LOCK();
704
705 /* FIXME: hash this if it's slow */
706 for (l = cnx_list; l; l = l->next) {
707 cnx = l->data;
708
709 if (cnx->was_initiated && cnx->proto == proto &&
710 cnx->status != LINK_DISCONNECTED &&
711 ((cnx->options & LINK_CONNECTION_SSL) == (options & LINK_CONNECTION_SSL)) &&
712 !strcmp (remote_host_info, cnx->remote_host_info) &&
713 !strcmp (remote_serv_info, cnx->remote_serv_info)) {
714 cnx = link_connection_ref_T (cnx);
715 break;
716 }
717 }
718
719 cnx = l ? l->data : NULL;
720
721 if (!cnx) {
722 cnx = LINK_CONNECTION
723 (g_object_new_valist (derived_type, first_property, args));
724
725 initiated = link_connection_do_initiate
726 (cnx, proto_name, remote_host_info,
727 remote_serv_info, options);
728 }
729
730 CNX_LIST_UNLOCK();
731
732 if (!initiated) {
733 link_connection_unref (cnx);
734 cnx = NULL;
735 }
736
737 va_end (args);
738
739 return cnx;
740}
741
742/*
743 * link_connection_state_changed:
744 * @cnx: a #LinkConnection.
745 * @status: a #LinkConnectionStatus.
746 *
747 * A wrapper for the #LinkConnectionClass's state change method.
748 */
749void
750link_connection_state_changed (LinkConnection *cnx,
751 LinkConnectionStatus status)
752{
753 CNX_LOCK (cnx);
754 link_connection_state_changed_T_R (cnx, status);
755 CNX_UNLOCK (cnx);
756}
757
758/**
759 * link_connection_read:
760 * @cnx: the connection to write to
761 * @buf: a pointer to the start of an array of bytes to read data into
762 * @len: the length of the array in bytes to read ingo
763 * @block_for_full_read: whether to block for a full read
764 *
765 * Warning, block_for_full_read is of limited usefullness.
766 *
767 * Return value: number of bytes written on success; negative on error.
768 **/
769glong
770link_connection_read (LinkConnection *cnx,
771 guchar *buf,
772 int len,
773 gboolean block_for_full_read)
774{
775 int bytes_read = 0;
776
777 d_printf ("Read up to %d bytes from fd %d\n", len, cnx->priv->fd);
778
779 if (!len)
780 return 0;
781
782 CNX_LOCK (cnx);
783
784 if (cnx->status != LINK_CONNECTED)
785 goto fatal_error;
786
787 do {
788 int n;
789
790#ifdef LINK_SSL_SUPPORT
791 if (cnx->options & LINK_CONNECTION_SSL)
792 n = SSL_read (cnx->priv->ssl, buf, len);
793 else
794#endif
795#ifdef HAVE_WINSOCK2_H
796 if ((n = recv (cnx->priv->fd, buf, len, 0)) == SOCKET_ERROR) {
797 n = -1;
798 link_map_winsock_error_to_errno ();
799 d_printf ("recv failed: %s\n",
800 link_strerror (errno));
801 }
802#else
803 LINK_TEMP_FAILURE_RETRY_SYSCALL (read (cnx->priv->fd,
804 buf,
805 len), n);
806#endif
807 g_assert (n <= len);
808
809 if (n < 0) {
810#ifdef LINK_SSL_SUPPORT
811 if (cnx->options & LINK_CONNECTION_SSL) {
812 gulong rv;
813
814 rv = SSL_get_error (cnx->priv->ssl, n);
815
816 if ((rv == SSL_ERROR_WANT_READ ||
817 rv == SSL_ERROR_WANT_WRITE) &&
818 (cnx->options & LINK_CONNECTION_NONBLOCKING))
819 goto out;
820 else
821 goto fatal_error;
822 } else
823#endif
824 {
825 if (errno == EINTR)
826 continue;
827
828 else if (errno == EAGAIN &&
829 (cnx->options & LINK_CONNECTION_NONBLOCKING))
830 goto out;
831
832 else if (errno == EBADF) {
833 g_warning ("Serious fd usage error %d", cnx->priv->fd);
834 goto fatal_error;
835
836 } else
837 goto fatal_error;
838 }
839
840 } else if (n == 0) {
841 d_printf ("we got EOF on fd %d\n", cnx->priv->fd);
842 bytes_read = LINK_IO_FATAL_ERROR;
843 goto out;
844 } else {
845 buf += n;
846 len -= n;
847 bytes_read += n;
848#ifdef CONNECTION_DEBUG
849 cnx->priv->total_read_bytes += n;
850#endif
851 }
852 } while (len > 0 && block_for_full_read);
853
854 d_printf ("we read %d bytes (total %"G_GUINT64_FORMAT")\n",
855 bytes_read, cnx->priv->total_read_bytes);
856
857 out:
858 CNX_UNLOCK (cnx);
859 return bytes_read;
860
861 fatal_error:
862 CNX_UNLOCK (cnx);
863 return LINK_IO_FATAL_ERROR;
864}
865
866/* Determine the maximum size of the iovec vector */
867
868#if defined (MAXIOV) /* HPUX */
869# define LINK_IOV_MAX (MAXIOV)
870#elif defined (IOV_MAX) /* AIX */
871# define LINK_IOV_MAX (IOV_MAX)
872#elif defined (_SC_IOV_MAX) /* SGI */
873# define LINK_IOV_MAX_INIT (sysconf (_SC_IOV_MAX))
874#elif defined (__APPLE__)
875/* Even though the write(2) man page mentions it, UIO_MAXIOV is only
876 * available if KERNEL is defined on MacOS X 10.1
877 */
878# define LINK_IOV_MAX 1024
879#elif defined (UIO_MAXIOV) /* Glibc */
880# define LINK_IOV_MAX (UIO_MAXIOV)
881#else /* Safe Guess */
882# define LINK_IOV_MAX 16
883#endif
884
885/* If the value requires initialization, define the function here */
886#if defined (LINK_IOV_MAX_INIT)
887# define LINK_IOV_MAX link_iov_max
888 static guint link_iov_max = 0;
889 static inline void
890 link_iov_max_init ()
891 {
892 if (link_iov_max == 0)
893 {
894 gint max;
895 G_LOCK_DEFINE_STATIC (link_iov_max);
896 G_LOCK (link_iov_max);
897 if (link_iov_max == 0)
898 {
899 max = LINK_IOV_MAX_INIT;
900 if (max <= 0)
901 max = 16;
902 link_iov_max = max;
903 }
904 G_UNLOCK (link_iov_max);
905 }
906 }
907#else
908# define link_iov_max_init()
909#endif
910
911static glong
912write_data_T (LinkConnection *cnx, QueuedWrite *qw)
913{
914 glong bytes_written = 0;
915
916 g_return_val_if_fail (cnx->status == LINK_CONNECTED,
917 LINK_IO_FATAL_ERROR);
918
919 link_iov_max_init ();
920
921 while ((qw->nvecs > 0) && (qw->vecs->iov_len > 0)) {
922 int n;
923
924 d_printf ("write_data %ld bytes to fd %d - ",
925 calc_size (qw->vecs, qw->nvecs), cnx->priv->fd);
926
927#ifdef LINK_SSL_SUPPORT
928 if (cnx->options & LINK_CONNECTION_SSL)
929 n = SSL_write (cnx->priv->ssl, qw->vecs->iov_base,
930 qw->vecs->iov_len);
931 else
932#endif
933#ifdef HAVE_WINSOCK2_H
934 {
935 if (WSASend (cnx->priv->fd, qw->vecs,
936 MIN (qw->nvecs, LINK_IOV_MAX),
937 (LPDWORD) &n, 0, NULL, NULL) == SOCKET_ERROR) {
938 if (WSAGetLastError () == WSAEWOULDBLOCK)
939 link_win32_watch_set_write_wouldblock (cnx->priv->tag, TRUE);
940 n = -1;
941 link_map_winsock_error_to_errno ();
942 d_printf ("WSASend failed: %s\n",
943 link_strerror (errno));
944 } else {
945 link_win32_watch_set_write_wouldblock (cnx->priv->tag, FALSE);
946 }
947 }
948#else
949 LINK_TEMP_FAILURE_RETRY_SOCKET (writev (cnx->priv->fd,
950 qw->vecs,
951 MIN (qw->nvecs, LINK_IOV_MAX)), n);
952#endif
953 d_printf ("wrote %d bytes (total %"G_GUINT64_FORMAT")\n",
954 n,
955 (cnx->priv->total_written_bytes += ((n > 0) ? n : 0),
956 cnx->priv->total_written_bytes));
957
958 if (n < 0) {
959#ifdef LINK_SSL_SUPPORT
960 if (cnx->options & LINK_CONNECTION_SSL) {
961 gulong rv;
962
963 rv = SSL_get_error (cnx->priv->ssl, n);
964
965 if ((rv == SSL_ERROR_WANT_READ ||
966 rv == SSL_ERROR_WANT_WRITE) &&
967 cnx->options & LINK_CONNECTION_NONBLOCKING)
968 return LINK_IO_QUEUED_DATA;
969 else
970 return LINK_IO_FATAL_ERROR;
971 } else
972#endif
973 {
974 if (errno == EINTR)
975 continue;
976
977 else if (errno == EAGAIN &&
978 (cnx->options & LINK_CONNECTION_NONBLOCKING))
979 return LINK_IO_QUEUED_DATA;
980
981 else if (errno == EBADF)
982 g_warning ("Serious fd usage error %d", cnx->priv->fd);
983
984 return LINK_IO_FATAL_ERROR; /* Unhandlable error */
985 }
986
987 } else if (n == 0) /* CHECK: is this really an error condition */
988 return LINK_IO_FATAL_ERROR;
989
990 else {
991 bytes_written += n;
992
993 while (qw->nvecs > 0 && n >= qw->vecs->iov_len) {
994 n -= qw->vecs->iov_len;
995 qw->nvecs--;
996 qw->vecs++;
997 }
998
999 if (n) {
1000 qw->vecs->iov_len -= n;
1001 qw->vecs->iov_base = (guchar *)qw->vecs->iov_base + n;
1002 }
1003 }
1004 }
1005
1006 return bytes_written;
1007}
1008
1009static gboolean
1010link_connection_should_block (LinkConnection *cnx,
1011 const LinkWriteOpts *opt_write_opts)
1012{
1013 if (!opt_write_opts)
1014 return TRUE;
1015
1016 if (opt_write_opts->block_on_write)
1017 return TRUE;
1018
1019 return FALSE;
1020}
1021
1022/* Always called in the I/O thread */
1023static void
1024link_connection_flush_write_queue_T_R (LinkConnection *cnx)
1025{
1026 gboolean done_writes = TRUE;
1027
1028 if (cnx->priv->write_queue) {
1029 glong status;
1030 QueuedWrite *qw = cnx->priv->write_queue->data;
1031
1032 status = write_data_T (cnx, qw);
1033
1034 d_printf ("Wrote queue %ld on fd %d\n", status, cnx->priv->fd);
1035
1036 if (status >= LINK_IO_OK) {
1037 cnx->priv->write_queue = g_list_delete_link
1038 (cnx->priv->write_queue, cnx->priv->write_queue);
1039 queued_write_free (qw);
1040
1041 queue_signal_T_R (cnx, -status);
1042
1043 done_writes = (cnx->priv->write_queue == NULL);
1044
1045 } else {
1046 if (status == LINK_IO_FATAL_ERROR) {
1047 d_printf ("Fatal error on queued write\n");
1048 link_connection_state_changed_T_R (cnx, LINK_DISCONNECTED);
1049
1050 } else {
1051 d_printf ("Write blocked\n");
1052 done_writes = FALSE;
1053 }
1054 }
1055 }
1056
1057 d_printf ("Blocked write queue %s\n", done_writes ?
1058 "flushed & empty" : "still active");
1059
1060 if (done_writes) /* drop G_IO_OUT */
1061 link_watch_set_condition
1062 (cnx->priv->tag,
1063 LINK_ERR_CONDS | LINK_IN_CONDS);
1064 else
1065 link_watch_set_condition
1066 (cnx->priv->tag,
1067 LINK_ERR_CONDS | LINK_IN_CONDS | G_IO_OUT);
1068}
1069
1070void
1071link_connection_exec_set_condition (LinkCommandSetCondition *cmd, gboolean immediate)
1072{
1073 d_printf ("Exec defered set condition on %p -> 0x%x\n",
1074 cmd->cnx, cmd->condition);
1075
1076 if (!immediate)
1077 CNX_LOCK (cmd->cnx);
1078
1079 link_watch_set_condition (cmd->cnx->priv->tag, cmd->condition);
1080
1081 if (!immediate)
1082 link_connection_unref_unlock (cmd->cnx);
1083
1084 else /* special */
1085 link_connection_unref_T_ (cmd->cnx);
1086
1087 g_free (cmd);
1088}
1089
1090/**
1091 * link_connection_writev:
1092 * @cnx: the connection to write to
1093 * @vecs: a structure of iovecs to write - this is altered.
1094 * @nvecs: the number of populated iovecs
1095 * @opt_write_opts: optional write options, or NULL
1096 *
1097 * This routine writes data to the abstract connection.
1098 * FIXME: it allows re-enterancy via link_connection_iterate
1099 * in certain cases.
1100 * FIXME: on this basis, the connection can die underneath
1101 * our feet.
1102 *
1103 * Return value: 0 on success, non 0 on error.
1104 **/
1105LinkIOStatus
1106link_connection_writev (LinkConnection *cnx,
1107 struct iovec *vecs,
1108 int nvecs,
1109 const LinkWriteOpts *opt_write_opts)
1110{
1111 QueuedWrite qw;
1112 int status;
1113
1114 CNX_LOCK (cnx);
1115 link_connection_ref_T (cnx);
1116
1117 if (link_thread_safe ()) {
1118 d_printf ("Thread safe writev\n");
1119 if (cnx->status == LINK_CONNECTING) {
1120 queue_flattened_T_R (cnx, vecs, nvecs, TRUE);
1121 link_connection_unref_unlock (cnx);
1122 return LINK_IO_QUEUED_DATA;
1123 }
1124 } else if (cnx->options & LINK_CONNECTION_NONBLOCKING)
1125 link_connection_wait_connected (cnx);
1126
1127 if (cnx->status == LINK_DISCONNECTED) {
1128 link_connection_unref_unlock (cnx);
1129 return LINK_IO_FATAL_ERROR;
1130 }
1131
1132 if (cnx->priv->write_queue) {
1133 /* FIXME: we should really retry the write here, but we'll
1134 * get a POLLOUT for this lot at some stage anyway */
1135 queue_flattened_T_R (cnx, vecs, nvecs, FALSE);
1136 link_connection_unref_unlock (cnx);
1137 return LINK_IO_QUEUED_DATA;
1138 }
1139
1140 qw.vecs = vecs;
1141 qw.nvecs = nvecs;
1142
1143 continue_write:
1144 status = write_data_T (cnx, &qw);
1145
1146 if (status == LINK_IO_QUEUED_DATA) {
1147 if (link_thread_safe ()) {
1148 queue_flattened_T_R (cnx, qw.vecs, qw.nvecs, TRUE);
1149 link_connection_unref_unlock (cnx);
1150 return LINK_IO_QUEUED_DATA;
1151 }
1152
1153 /* Queue data & listen for buffer space */
1154 link_watch_set_condition
1155 (cnx->priv->tag,
1156 LINK_ERR_CONDS | LINK_IN_CONDS | G_IO_OUT);
1157
1158 if (!link_connection_should_block (cnx, opt_write_opts)) {
1159 queue_flattened_T_R (cnx, qw.vecs, qw.nvecs, FALSE);
1160 link_connection_unref_unlock (cnx);
1161 return LINK_IO_QUEUED_DATA;
1162
1163 } else {
1164 link_main_iteration (TRUE);
1165 goto continue_write;
1166 }
1167
1168 } else if (status >= LINK_IO_OK)
1169 status = LINK_IO_OK;
1170
1171 link_connection_unref_unlock (cnx);
1172
1173 return status;
1174}
1175
1176/**
1177 * link_connection_write:
1178 * @cnx: the connection to write to
1179 * @buf: a pointer to the start of an array of bytes
1180 * @len: the length of the array in bytes
1181 * @opt_write_opts: optional write options, or NULL
1182 *
1183 * Writes a contiguous block of data to the abstract connection.
1184 *
1185 * FIXME: it allows re-enterancy via link_connection_iterate
1186 * in certain cases.
1187 * FIXME: on this basis, the connection can die underneath
1188 * our feet eg. between the main_iteration and the
1189 * g_return_if_fail.
1190 *
1191 * Return value: 0 on success, non 0 on error.
1192 **/
1193LinkIOStatus
1194link_connection_write (LinkConnection *cnx,
1195 const guchar *buf,
1196 gulong len,
1197 const LinkWriteOpts *opt_write_opts)
1198{
1199 struct iovec vec;
1200
1201 vec.iov_base = (guchar *) buf;
1202 vec.iov_len = len;
1203
1204 return link_connection_writev (cnx, &vec, 1, opt_write_opts);
1205}
1206
1207static void
1208link_connection_dispose (GObject *obj)
1209{
1210 LinkConnection *cnx = (LinkConnection *)obj;
1211
1212 d_printf ("dispose connection %p\n", obj);
1213
1214 link_source_remove (cnx);
1215 queue_free (cnx);
1216
1217 parent_class->dispose (obj);
1218}
1219
1220static void
1221link_connection_finalize (GObject *obj)
1222{
1223 GSList *l;
1224 LinkConnection *cnx = (LinkConnection *)obj;
1225
1226 link_close_fd (cnx);
1227
1228 for (l = cnx->idle_broken_callbacks; l; l = l->next)
1229 g_free (l->data);
1230 g_slist_free (cnx->idle_broken_callbacks);
1231
1232 g_free (cnx->remote_host_info);
1233 g_free (cnx->remote_serv_info);
1234
1235 g_free (cnx->priv);
1236
1237#ifdef G_ENABLE_DEBUG
1238 g_assert (g_list_find(cnx_list, cnx) == NULL);
1239#endif
1240
1241 parent_class->finalize (obj);
1242}
1243
1244static void
1245link_connection_init (LinkConnection *cnx)
1246{
1247 d_printf ("create new connection %p\n", cnx);
1248
1249 cnx->priv = g_new0 (LinkConnectionPrivate, 1);
1250 cnx->priv->fd = -1;
1251 cnx->priv->was_disconnected = FALSE;
1252#ifdef CONNECTION_DEBUG
1253 cnx->priv->total_read_bytes = 0;
1254 cnx->priv->total_written_bytes = 0;
1255#endif
1256}
1257
1258static void
1259link_connection_class_init (LinkConnectionClass *klass)
1260{
1261 GObjectClass *object_class = (GObjectClass *) klass;
1262
1263 object_class->dispose = link_connection_dispose;
1264 object_class->finalize = link_connection_finalize;
1265
1266 signals [BROKEN] =
1267 g_signal_new ("broken",
1268 G_TYPE_FROM_CLASS (object_class),
1269 G_SIGNAL_RUN_LAST,
1270 G_STRUCT_OFFSET (LinkConnectionClass, broken),
1271 NULL, NULL,
1272 g_cclosure_marshal_VOID__VOID,
1273 G_TYPE_NONE, 0);
1274
1275 signals [BLOCKING] =
1276 g_signal_new ("blocking",
1277 G_TYPE_FROM_CLASS (object_class),
1278 G_SIGNAL_RUN_LAST,
1279 G_STRUCT_OFFSET (LinkConnectionClass, blocking),
1280 NULL, NULL,
1281 g_cclosure_marshal_VOID__ULONG,
1282 G_TYPE_NONE, 1, G_TYPE_ULONG);
1283
1284 parent_class = g_type_class_peek_parent (klass);
1285}
1286
1287GType
1288link_connection_get_type (void)
1289{
1290 static GType object_type = 0;
1291
1292 if (!object_type) {
1293 static const GTypeInfo object_info = {
1294 sizeof (LinkConnectionClass),
1295 (GBaseInitFunc) NULL,
1296 (GBaseFinalizeFunc) NULL,
1297 (GClassInitFunc) link_connection_class_init,
1298 NULL, /* class_finalize */
1299 NULL, /* class_data */
1300 sizeof (LinkConnection),
1301 0, /* n_preallocs */
1302 (GInstanceInitFunc) link_connection_init,
1303 };
1304
1305 object_type = g_type_register_static (G_TYPE_OBJECT,
1306 "LinkConnection",
1307 &object_info,
1308 0);
1309 }
1310
1311 return object_type;
1312}
1313
1314
1315LinkWriteOpts *
1316link_write_options_new (gboolean block_on_write)
1317{
1318 LinkWriteOpts *write_opts = g_new0 (LinkWriteOpts, 1);
1319
1320 write_opts->block_on_write = block_on_write;
1321
1322 return write_opts;
1323}
1324
1325void
1326link_write_options_free (LinkWriteOpts *write_opts)
1327{
1328 g_free (write_opts);
1329}
1330
1331void
1332link_connection_set_max_buffer (LinkConnection *cnx,
1333 gulong max_buffer_bytes)
1334{
1335 g_return_if_fail (cnx != NULL);
1336
1337 CNX_LOCK (cnx);
1338 /* FIXME: we might want to check the current buffer size */
1339 cnx->priv->max_buffer_bytes = max_buffer_bytes;
1340
1341 CNX_UNLOCK (cnx);
1342}
1343
1344static gboolean
1345link_connection_io_handler (GIOChannel *gioc,
1346 GIOCondition condition,
1347 gpointer data)
1348{
1349 LinkConnection *cnx = data;
1350 LinkConnectionClass *klass;
1351
1352 d_printf ("link_connection_io_handler fd %d, 0x%x\n",
1353 cnx->priv->fd, condition);
1354
1355 CNX_LOCK (cnx);
1356 link_connection_ref_T (cnx);
1357
1358 klass = (LinkConnectionClass *) G_TYPE_INSTANCE_GET_CLASS (
1359 data, LINK_TYPE_CONNECTION, LinkConnection);
1360
1361 if (cnx->status == LINK_CONNECTED &&
1362 condition & LINK_IN_CONDS && klass->handle_input) {
1363
1364 d_printf ("Handle input on fd %d\n", cnx->priv->fd);
1365
1366 CNX_UNLOCK (cnx);
1367 klass->handle_input (cnx);
1368 CNX_LOCK (cnx);
1369 }
1370
1371 if (cnx->status == LINK_CONNECTED && condition & G_IO_OUT) {
1372 d_printf ("IO Out - buffer space free ...\n");
1373 link_connection_flush_write_queue_T_R (cnx);
1374 }
1375
1376 if (condition & (LINK_ERR_CONDS | G_IO_OUT)) {
1377 int rv, n;
1378 LinkSockLen n_size = sizeof (n);
1379
1380 switch (cnx->status) {
1381 case LINK_CONNECTING:
1382 n = 0;
1383 rv = getsockopt (cnx->priv->fd, SOL_SOCKET, SO_ERROR, (char *) &n, &n_size);
1384 if (!rv && !n && condition == G_IO_OUT) {
1385 d_printf ("State changed to connected on %d\n", cnx->priv->fd);
1386
1387 link_watch_set_condition (
1388 cnx->priv->tag,
1389 LINK_ERR_CONDS | LINK_IN_CONDS);
1390
1391 link_connection_state_changed_T_R (cnx, LINK_CONNECTED);
1392
1393 if (cnx->priv->write_queue) {
1394 d_printf ("Connected, with queued writes, start flush ...\n");
1395 link_connection_flush_write_queue_T_R (cnx);
1396 }
1397 } else {
1398 d_printf ("Error connecting %d %d %d on fd %d\n",
1399 rv, n, errno, cnx->priv->fd);
1400 link_connection_state_changed_T_R (cnx, LINK_DISCONNECTED);
1401 }
1402 break;
1403 case LINK_CONNECTED: {
1404 if (condition & LINK_ERR_CONDS) {
1405 d_printf ("Disconnect on err: %d\n", cnx->priv->fd);
1406 link_connection_state_changed_T_R (cnx, LINK_DISCONNECTED);
1407 }
1408 break;
1409 }
1410 default:
1411 break;
1412 }
1413 }
1414
1415 link_connection_unref_unlock (cnx);
1416
1417 return TRUE;
1418}
1419
1420LinkConnectionStatus
1421link_connection_get_status (LinkConnection *cnx)
1422{
1423 LinkConnectionStatus status;
1424
1425 CNX_LOCK (cnx);
1426 status = cnx->status;
1427 CNX_UNLOCK (cnx);
1428
1429 d_printf ("Get status on %p = %d\n", cnx, status);
1430
1431 return status;
1432}
1433
1434void
1435link_connection_exec_disconnect (LinkCommandDisconnect *cmd, gboolean immediate)
1436{
1437 d_printf ("Exec defered disconnect on %p\n", cmd->cnx);
1438
1439 link_connection_state_changed (cmd->cnx, LINK_DISCONNECTED);
1440
1441 link_connection_unref (cmd->cnx);
1442 g_free (cmd);
1443}
1444
1445void
1446link_connection_disconnect (LinkConnection *cnx)
1447{
1448 LinkCommandDisconnect *cmd;
1449
1450 cmd = g_new (LinkCommandDisconnect, 1);
1451 cmd->cmd.type = LINK_COMMAND_DISCONNECT;
1452 cmd->cnx = link_connection_ref (cnx);
1453
1454 link_exec_command ((LinkCommand *) cmd);
1455}
1456
1457LinkConnectionStatus
1458link_connection_wait_connected (LinkConnection *cnx)
1459{
1460 LinkConnectionStatus status;
1461
1462 CNX_LOCK (cnx);
1463
1464 status = link_connection_wait_connected_T (cnx);
1465
1466 CNX_UNLOCK (cnx);
1467
1468 return status;
1469}
1470
1471void
1472link_connections_move_io_T (gboolean to_io_thread)
1473{
1474 GList *l;
1475 for (l = cnx_list; l; l = l->next) {
1476 LinkConnection *cnx = l->data;
1477 link_watch_move_io (cnx->priv->tag, to_io_thread);
1478 }
1479}
1480
1481void
1482link_connection_add_broken_cb (LinkConnection *cnx,
1483 LinkBrokenCallback fn,
1484 gpointer user_data)
1485{
1486 BrokenCallback *bc = g_new0 (BrokenCallback, 1);
1487
1488 g_return_if_fail (fn != NULL);
1489
1490 bc->fn = fn;
1491 bc->user_data = user_data;
1492
1493 cnx->idle_broken_callbacks = g_slist_prepend (cnx->idle_broken_callbacks, bc);
1494}
1495
1496static gboolean
1497broken_callback_match (BrokenCallback *bc,
1498 LinkBrokenCallback fn,
1499 gpointer user_data)
1500{
1501 return ( (!fn || bc->fn == fn) &&
1502 (!user_data || bc->user_data == user_data) );
1503}
1504
1505void
1506link_connection_remove_broken_cb (LinkConnection *cnx,
1507 LinkBrokenCallback opt_fn,
1508 gpointer opt_user_data)
1509{
1510 GSList *l, *next;
1511
1512 CNX_LOCK (cnx);
1513
1514 for (l = cnx->idle_broken_callbacks; l; l = next) {
1515 next = l->next;
1516 if (broken_callback_match (l->data, opt_fn, opt_user_data)) {
1517 g_free (l->data);
1518 cnx->idle_broken_callbacks =
1519 g_slist_delete_link (cnx->idle_broken_callbacks,
1520 l);
1521 }
1522 }
1523
1524 CNX_UNLOCK (cnx);
1525}
1526
1527void
1528link_connections_close (void)
1529{
1530 GList *cnx, *l;
1531
1532 if (!link_in_io_thread ())
1533 return;
1534
1535 CNX_LIST_LOCK();
1536 cnx = cnx_list;
1537 cnx_list = NULL;
1538 CNX_LIST_UNLOCK();
1539
1540 if (!cnx)
1541 return;
1542
1543#ifdef G_ENABLE_DEBUG
1544 g_warning ("FIXME: Need to shutdown linc connections ...");
1545#endif
1546 for (l = cnx; l; l = l->next)
1547 g_object_run_dispose (l->data);
1548
1549 g_list_free (cnx);
1550}
Note: See TracBrowser for help on using the repository browser.