source: trunk/ORBit2-2.14.0/linc2/src/linc.c

Last change on this file was 92, checked in by cinc, 19 years ago

Orbit2 modified for use with NOM

File size: 12.2 KB
Line 
1/*
2 * linc.c: This file is part of the linc library.
3 *
4 * Authors:
5 * Elliot Lee (sopwith@redhat.com)
6 * Michael Meeks (michael@ximian.com)
7 * Mark McLouglin (mark@skynet.ie) & others
8 *
9 * Copyright 2001, Red Hat, Inc., Ximian, Inc.,
10 * Sun Microsystems, Inc.
11 */
12#include <config.h>
13
14#include <stdio.h>
15#include <errno.h>
16#ifdef HAVE_UNISTD_H
17# include <unistd.h>
18#endif
19#include <signal.h>
20#include <fcntl.h>
21#include "linc-private.h"
22#include "linc-compat.h"
23
24#include <glib/gstdio.h>
25
26/* whether we do locking or not */
27static gboolean link_is_thread_safe = TRUE;
28/* an inferior loop/context for std. processing */
29GMainLoop *link_loop = NULL;
30static GMainContext *link_context = NULL;
31/* an inferior context for the I/O thread */
32static GThread *link_io_thread = NULL;
33static GMainLoop *link_thread_loop = NULL;
34static GMainContext *link_thread_context = NULL;
35static gboolean link_is_io_in_thread = FALSE;
36
37/* a big global lock for link */
38static GMutex *link_main_lock;
39static GCond *link_main_cond;
40/* command dispatch to the I/O loop */
41static GMutex *link_cmd_queue_lock = NULL;
42static GCond *link_cmd_queue_cond = NULL;
43static GList *link_cmd_queue = NULL;
44
45static int link_wakeup_fds[2] = { -1, -1 };
46#define LINK_WAKEUP_POLL link_wakeup_fds [0]
47#define LINK_WAKEUP_WRITE link_wakeup_fds [1]
48static GSource *link_main_source = NULL;
49
50#ifdef LINK_SSL_SUPPORT
51SSL_METHOD *link_ssl_method;
52SSL_CTX *link_ssl_ctx;
53#endif
54
55static void link_dispatch_command (gpointer data, gboolean immediate);
56
57gboolean
58link_thread_io (void)
59{
60 gboolean result;
61
62 /* FIXME: re-factor this to avoid locking */
63 result = link_io_thread != NULL;
64
65 return result;
66}
67
68gboolean
69link_thread_safe (void)
70{
71 return link_is_thread_safe;
72}
73
74static gboolean
75cmd_is_sync (LinkCommand *cmd)
76{
77 return (cmd->type == LINK_COMMAND_SET_IO_THREAD) ||
78 (cmd->type == LINK_COMMAND_CNX_UNREF);
79}
80
81static gboolean
82link_mainloop_handle_input (GIOChannel *source,
83 GIOCondition condition,
84 gpointer data)
85{
86 char c;
87 GList *l, *queue;
88
89 g_mutex_lock (link_cmd_queue_lock);
90
91#ifdef HAVE_WINSOCK2_H
92 recv (LINK_WAKEUP_POLL, &c, sizeof (c), 0);
93#else
94 read (LINK_WAKEUP_POLL, &c, sizeof (c));
95#endif
96 queue = link_cmd_queue;
97 link_cmd_queue = NULL;
98
99 g_mutex_unlock (link_cmd_queue_lock);
100
101 for (l = queue; l; l = l->next) {
102 gboolean sync;
103
104 sync = cmd_is_sync (l->data);
105
106 link_dispatch_command (l->data, FALSE);
107
108 if (sync) {
109 ((LinkSyncCommand *)l->data)->complete = TRUE;
110 g_cond_signal (link_cmd_queue_cond);
111 }
112 }
113
114 g_list_free (queue);
115
116 return TRUE;
117}
118
119void
120link_exec_command (LinkCommand *cmd)
121{
122 int res = 0;
123
124 if (link_in_io_thread ()) {
125 link_dispatch_command (cmd, TRUE);
126 return;
127 }
128
129 LINK_MUTEX_LOCK (link_cmd_queue_lock);
130
131 if (LINK_WAKEUP_WRITE == -1) { /* shutdown main loop */
132 LINK_MUTEX_UNLOCK (link_cmd_queue_lock);
133 link_dispatch_command (cmd, TRUE);
134 return;
135 }
136
137 if (!link_cmd_queue) {
138 char c = 'A'; /* magic */
139#ifdef HAVE_WINSOCK2_H
140 while ((res = send (LINK_WAKEUP_WRITE, &c, sizeof (c), 0)) == SOCKET_ERROR &&
141 (WSAGetLastError () == WSAEWOULDBLOCK));
142#else
143 while ((res = write (LINK_WAKEUP_WRITE, &c, sizeof (c))) < 0 &&
144 (errno == EAGAIN || errno == EINTR));
145#endif
146 }
147
148 link_cmd_queue = g_list_append (link_cmd_queue, cmd);
149
150 if (cmd_is_sync (cmd))
151 while (!((LinkSyncCommand *)cmd)->complete)
152 g_cond_wait (link_cmd_queue_cond,
153 link_cmd_queue_lock);
154
155 LINK_MUTEX_UNLOCK (link_cmd_queue_lock);
156
157 if (res < 0)
158 g_error ("Failed to write to linc wakeup socket %d 0x%x(%d) (%d)",
159 res, errno, errno, LINK_WAKEUP_WRITE);
160}
161
162#if defined (CONNECTION_DEBUG) && defined (CONNECTION_DEBUG_FLAG)
163gboolean link_connection_debug_flag = FALSE;
164#endif
165
166/**
167 * link_init:
168 * @thread_safe: if we want thread safety enabled.
169 *
170 * Initialize linc.
171 **/
172void
173link_init (gboolean thread_safe)
174{
175#if defined (CONNECTION_DEBUG) && defined (CONNECTION_DEBUG_FLAG)
176 if (getenv ("LINK_CONNECTION_DEBUG"))
177 link_connection_debug_flag = TRUE;
178 if (link_connection_debug_flag &&
179 getenv ("LINK_PER_PROCESS_STDERR") &&
180 fileno (stderr) >= 0) {
181 char *stderr_file = g_build_filename (g_get_tmp_dir (),
182 g_strdup_printf ("link_debug.%d", getpid ()),
183 NULL);
184 int fd;
185 fd = g_open (stderr_file, O_WRONLY|O_CREAT, 0666);
186 if (fd >= 0) {
187 char *prgname = g_get_prgname ();
188 d_printf ("Redirecting stderr of %s to %s\n",
189 (prgname ? prgname : "this process"), stderr_file);
190 dup2 (fd, fileno (stderr));
191 close (fd);
192 }
193 d_printf ("stderr redirected here\n");
194 }
195#endif
196
197 if (thread_safe && !g_thread_supported ())
198 g_thread_init (NULL);
199
200 link_is_thread_safe = (thread_safe && g_thread_supported());
201
202 g_type_init ();
203
204#ifdef SIGPIPE
205 /*
206 * Link's raison d'etre is for ORBit2 and Bonobo
207 *
208 * In Bonobo, components and containers must not crash if the
209 * remote end crashes. If a remote server crashes and then we
210 * try to make a CORBA call on it, we may get a SIGPIPE. So,
211 * for lack of a better solution, we ignore SIGPIPE here. This
212 * is open for reconsideration in the future.
213 *
214 * When SIGPIPE is ignored, write() calls which would
215 * ordinarily trigger a signal will instead return -1 and set
216 * errno to EPIPE. So linc will be able to catch these
217 * errors instead of letting them kill the component.
218 *
219 * Possibilities are the MSG_PEEK trick, where you test if the
220 * connection is dead right before doing the writev(). That
221 * approach has two problems:
222 *
223 * 1. There is the possibility of a race condition, where
224 * the remote end calls right after the test, and right
225 * before the writev().
226 *
227 * 2. An extra system call per write might be regarded by
228 * some as a performance hit.
229 *
230 * Another possibility is to surround the call to writev() in
231 * link_connection_writev (linc-connection.c) with something like
232 * this:
233 *
234 * link_ignore_sigpipe = 1;
235 *
236 * result = writev ( ... );
237 *
238 * link_ignore_sigpipe = 0;
239 *
240 * The SIGPIPE signal handler will check the global
241 * link_ignore_sigpipe variable and ignore the signal if it
242 * is 1. If it is 0, it can proxy to the user's original
243 * signal handler. This is a real possibility.
244 */
245 signal (SIGPIPE, SIG_IGN);
246#endif
247
248 link_context = g_main_context_new ();
249 link_loop = g_main_loop_new (link_context, TRUE);
250
251#ifdef LINK_SSL_SUPPORT
252 SSLeay_add_ssl_algorithms ();
253 link_ssl_method = SSLv23_method ();
254 link_ssl_ctx = SSL_CTX_new (link_ssl_method);
255#endif
256
257 link_main_lock = link_mutex_new ();
258 link_cmd_queue_lock = link_mutex_new ();
259 if (link_is_thread_safe) {
260 link_main_cond = g_cond_new ();
261 link_cmd_queue_cond = g_cond_new ();
262 }
263
264#ifdef HAVE_WINSOCK2_H
265 {
266 WSADATA wsadata;
267 if (WSAStartup (MAKEWORD (2, 0), &wsadata) != 0)
268 g_error ("Windows Sockets could not be initialized");
269 }
270#endif
271}
272
273/**
274 * link_main_iteration:
275 * @block_for_reply: whether we should wait for a reply
276 *
277 * This routine iterates the linc mainloop, which has
278 * only the linc sources registered against it.
279 **/
280void
281link_main_iteration (gboolean block_for_reply)
282{
283 g_main_context_iteration (
284 link_context, block_for_reply);
285}
286
287/**
288 * link_main_pending:
289 *
290 * determines if the linc mainloop has any pending work to process.
291 *
292 * Return value: TRUE if the linc mainloop has any pending work to process.
293 **/
294gboolean
295link_main_pending (void)
296{
297 return g_main_context_pending (link_context);
298}
299
300/**
301 * link_main_loop_run:
302 *
303 * Runs the linc mainloop; blocking until the loop is exited.
304 **/
305void
306link_main_loop_run (void)
307{
308 g_main_loop_run (link_loop);
309}
310
311/**
312 * link_mutex_new:
313 *
314 * Creates a mutex, iff threads are supported, initialized etc.
315 *
316 * Return value: a new GMutex, or NULL if one is not required.
317 **/
318GMutex *
319link_mutex_new (void)
320{
321 if (link_is_thread_safe)
322 return g_mutex_new ();
323 else
324 return NULL;
325}
326
327gboolean
328link_in_io_thread (void)
329{
330 return (!link_io_thread ||
331 g_thread_self() == link_io_thread);
332}
333
334GMainContext *
335link_main_get_context (void)
336{
337 return link_context;
338}
339
340/*
341 * This method is unreliable, and for use
342 * only for debugging.
343 */
344gboolean
345link_mutex_is_locked (GMutex *lock)
346{
347#ifdef __GLIBC__
348 gboolean result = TRUE;
349
350 if (lock && g_mutex_trylock (lock)) {
351 result = FALSE;
352 g_mutex_unlock (lock);
353 }
354
355 return result;
356#else
357 /*
358 * On at least Solaris & BSD if we link our
359 * app without -lthread, and pull in ORBit2
360 * with threading enabled, we get NOP pthread
361 * operations. This is fine mostly, but we get
362 * bogus return values from trylock which screws
363 * our debugging.
364 */
365 d_printf ("hosed system is_lock-ing\n");
366 return TRUE;
367#endif
368}
369
370void
371link_shutdown (void)
372{
373 if (link_loop) /* break into the linc loop */
374 g_main_loop_quit (link_loop);
375
376 if (link_thread_loop)
377 g_main_loop_quit (link_thread_loop);
378
379 if (link_io_thread) {
380 g_thread_join (link_io_thread);
381 link_io_thread = NULL;
382 }
383}
384
385GMainContext *
386link_thread_io_context (void)
387{
388 return link_thread_context;
389}
390
391static gpointer
392link_io_thread_fn (gpointer data)
393{
394 g_main_loop_run (link_thread_loop);
395
396 /* FIXME: need to be able to quit without waiting ... */
397
398 /* Asked to quit - so ...
399 * a) stop accepting inputs [ kill servers ]
400 * b) flush outgoing queued data etc. (oneways)
401 * c) unref all leakable resources.
402 */
403
404 link_connections_close ();
405
406 /* A tad of shutdown */
407 LINK_MUTEX_LOCK (link_cmd_queue_lock);
408 if (LINK_WAKEUP_WRITE >= 0) {
409#ifdef HAVE_WINSOCK2_H
410 closesocket (LINK_WAKEUP_WRITE);
411 closesocket (LINK_WAKEUP_POLL);
412#else
413 close (LINK_WAKEUP_WRITE);
414 close (LINK_WAKEUP_POLL);
415#endif
416 LINK_WAKEUP_WRITE = -1;
417 LINK_WAKEUP_POLL = -1;
418 }
419 LINK_MUTEX_UNLOCK (link_cmd_queue_lock);
420
421 if (link_main_source) {
422 g_source_destroy (link_main_source);
423 g_source_unref (link_main_source);
424 link_main_source = NULL;
425 }
426
427 return NULL;
428}
429
430static void
431link_exec_set_io_thread (gpointer data, gboolean immediate)
432{
433 GError *error = NULL;
434 gboolean to_io_thread = TRUE;
435
436 if (link_is_io_in_thread)
437 return;
438
439 link_lock ();
440 g_mutex_lock (link_cmd_queue_lock);
441
442 link_is_io_in_thread = TRUE;
443
444 link_thread_context = g_main_context_new ();
445 link_thread_loop = g_main_loop_new (link_thread_context, TRUE);
446
447 link_connections_move_io_T (to_io_thread);
448 link_servers_move_io_T (to_io_thread);
449
450 if (link_pipe (link_wakeup_fds) < 0)
451 g_error ("Can't create CORBA main-thread wakeup pipe");
452
453 link_main_source = link_source_create_watch
454 (link_thread_context, LINK_WAKEUP_POLL,
455 NULL, (G_IO_IN | G_IO_PRI),
456 link_mainloop_handle_input, NULL);
457
458 link_io_thread = g_thread_create_full
459 (link_io_thread_fn, NULL, 256 * 1024, TRUE, FALSE,
460 G_THREAD_PRIORITY_NORMAL, &error);
461
462 if (!link_io_thread || error)
463 g_error ("Failed to create linc worker thread");
464
465 g_main_loop_quit (link_loop);
466
467 g_mutex_unlock (link_cmd_queue_lock);
468 link_unlock ();
469}
470
471void
472link_set_io_thread (gboolean io_in_thread)
473{
474 LinkSyncCommand cmd = { { 0 }, 0 };
475
476#ifdef G_ENABLE_DEBUG
477 g_warning ("FIXME: guard from double entry");
478#endif
479
480 if (link_is_io_in_thread)
481 return;
482
483 cmd.cmd.type = LINK_COMMAND_SET_IO_THREAD;
484
485 link_exec_command (&cmd.cmd);
486}
487
488static void
489link_dispatch_command (gpointer data, gboolean immediate)
490{
491 LinkCommand *cmd = data;
492 switch (cmd->type) {
493 case LINK_COMMAND_SET_CONDITION:
494 link_connection_exec_set_condition (data, immediate);
495 break;
496 case LINK_COMMAND_DISCONNECT:
497 link_connection_exec_disconnect (data, immediate);
498 break;
499 case LINK_COMMAND_SET_IO_THREAD:
500 link_exec_set_io_thread (data, immediate);
501 break;
502 case LINK_COMMAND_CNX_UNREF:
503 link_connection_exec_cnx_unref (data, immediate);
504 break;
505 default:
506 g_error ("Unimplemented (%d)", cmd->type);
507 break;
508 }
509}
510
511void
512link_lock (void)
513{
514 if (link_main_lock)
515 g_mutex_lock (link_main_lock);
516}
517
518void
519link_unlock (void)
520{
521 if (link_main_lock)
522 g_mutex_unlock (link_main_lock);
523}
524
525void
526link_signal (void)
527{
528 if (link_is_thread_safe && link_is_io_in_thread) {
529 g_assert (link_main_cond != NULL);
530 g_assert (link_is_locked ());
531 g_cond_signal (link_main_cond);
532 }
533}
534
535void
536link_wait (void)
537{
538 if (!(link_is_thread_safe && link_is_io_in_thread)) {
539 link_unlock ();
540 link_main_iteration (TRUE);
541 link_lock ();
542 } else {
543 g_assert (link_main_cond != NULL);
544 g_cond_wait (link_main_cond, link_main_lock);
545 }
546}
547
548
549
550gboolean
551link_is_locked (void)
552{
553 return link_mutex_is_locked (link_main_lock);
554}
Note: See TracBrowser for help on using the repository browser.