source: trunk/synergy/lib/arch/CArchMultithreadPosix.cpp@ 2960

Last change on this file since 2960 was 2749, checked in by bird, 19 years ago

synergy v1.3.1 sources (zip).

File size: 17.5 KB
Line 
1/*
2 * synergy -- mouse and keyboard sharing utility
3 * Copyright (C) 2002 Chris Schoeneman
4 *
5 * This package is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * found in the file COPYING that should have accompanied this file.
8 *
9 * This package is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 */
14
15#include "CArchMultithreadPosix.h"
16#include "CArch.h"
17#include "XArch.h"
18#include <signal.h>
19#if TIME_WITH_SYS_TIME
20# include <sys/time.h>
21# include <time.h>
22#else
23# if HAVE_SYS_TIME_H
24# include <sys/time.h>
25# else
26# include <time.h>
27# endif
28#endif
29#include <cerrno>
30
31#define SIGWAKEUP SIGUSR1
32
33#if !HAVE_PTHREAD_SIGNAL
34 // boy, is this platform broken. forget about pthread signal
35 // handling and let signals through to every process. synergy
36 // will not terminate cleanly when it gets SIGTERM or SIGINT.
37# define pthread_sigmask sigprocmask
38# define pthread_kill(tid_, sig_) kill(0, (sig_))
39# define sigwait(set_, sig_)
40# undef HAVE_POSIX_SIGWAIT
41# define HAVE_POSIX_SIGWAIT 1
42#endif
43
// reset \c sigset so that it holds exactly SIGHUP, SIGINT, SIGTERM
// and SIGUSR2.  whatever \c sigset held before is discarded.
static
void
setSignalSet(sigset_t* sigset)
{
    static const int s_handled[] = { SIGHUP, SIGINT, SIGTERM, SIGUSR2 };
    static const size_t s_count  = sizeof(s_handled) / sizeof(s_handled[0]);

    sigemptyset(sigset);
    for (size_t n = 0; n < s_count; ++n) {
        sigaddset(sigset, s_handled[n]);
    }
}
54
55//
56// CArchThreadImpl
57//
58
59class CArchThreadImpl {
60public:
61 CArchThreadImpl();
62
63public:
64 int m_refCount;
65 IArchMultithread::ThreadID m_id;
66 pthread_t m_thread;
67 IArchMultithread::ThreadFunc m_func;
68 void* m_userData;
69 bool m_cancel;
70 bool m_cancelling;
71 bool m_exited;
72 void* m_result;
73 void* m_networkData;
74};
75
76CArchThreadImpl::CArchThreadImpl() :
77 m_refCount(1),
78 m_id(0),
79 m_func(NULL),
80 m_userData(NULL),
81 m_cancel(false),
82 m_cancelling(false),
83 m_exited(false),
84 m_result(NULL),
85 m_networkData(NULL)
86{
87 // do nothing
88}
89
90
91//
92// CArchMultithreadPosix
93//
94
95CArchMultithreadPosix* CArchMultithreadPosix::s_instance = NULL;
96
97CArchMultithreadPosix::CArchMultithreadPosix() :
98 m_newThreadCalled(false),
99 m_nextID(0)
100{
101 assert(s_instance == NULL);
102
103 s_instance = this;
104
105 // no signal handlers
106 for (size_t i = 0; i < kNUM_SIGNALS; ++i) {
107 m_signalFunc[i] = NULL;
108 m_signalUserData[i] = NULL;
109 }
110
111 // create mutex for thread list
112 m_threadMutex = newMutex();
113
114 // create thread for calling (main) thread and add it to our
115 // list. no need to lock the mutex since we're the only thread.
116 m_mainThread = new CArchThreadImpl;
117 m_mainThread->m_thread = pthread_self();
118 insert(m_mainThread);
119
120 // install SIGWAKEUP handler. this causes SIGWAKEUP to interrupt
121 // system calls. we use that when cancelling a thread to force it
122 // to wake up immediately if it's blocked in a system call. we
123 // won't need this until another thread is created but it's fine
124 // to install it now.
125 struct sigaction act;
126 sigemptyset(&act.sa_mask);
127# if defined(SA_INTERRUPT)
128 act.sa_flags = SA_INTERRUPT;
129# else
130 act.sa_flags = 0;
131# endif
132 act.sa_handler = &threadCancel;
133 sigaction(SIGWAKEUP, &act, NULL);
134
135 // set desired signal dispositions. let SIGWAKEUP through but
136 // ignore SIGPIPE (we'll handle EPIPE).
137 sigset_t sigset;
138 sigemptyset(&sigset);
139 sigaddset(&sigset, SIGWAKEUP);
140 pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
141 sigemptyset(&sigset);
142 sigaddset(&sigset, SIGPIPE);
143 pthread_sigmask(SIG_BLOCK, &sigset, NULL);
144}
145
146CArchMultithreadPosix::~CArchMultithreadPosix()
147{
148 assert(s_instance != NULL);
149
150 closeMutex(m_threadMutex);
151 s_instance = NULL;
152}
153
154void
155CArchMultithreadPosix::setNetworkDataForCurrentThread(void* data)
156{
157 lockMutex(m_threadMutex);
158 CArchThreadImpl* thread = find(pthread_self());
159 thread->m_networkData = data;
160 unlockMutex(m_threadMutex);
161}
162
163void*
164CArchMultithreadPosix::getNetworkDataForThread(CArchThread thread)
165{
166 lockMutex(m_threadMutex);
167 void* data = thread->m_networkData;
168 unlockMutex(m_threadMutex);
169 return data;
170}
171
172CArchMultithreadPosix*
173CArchMultithreadPosix::getInstance()
174{
175 return s_instance;
176}
177
178CArchCond
179CArchMultithreadPosix::newCondVar()
180{
181 CArchCondImpl* cond = new CArchCondImpl;
182 int status = pthread_cond_init(&cond->m_cond, NULL);
183 (void)status;
184 assert(status == 0);
185 return cond;
186}
187
188void
189CArchMultithreadPosix::closeCondVar(CArchCond cond)
190{
191 int status = pthread_cond_destroy(&cond->m_cond);
192 (void)status;
193 assert(status == 0);
194 delete cond;
195}
196
197void
198CArchMultithreadPosix::signalCondVar(CArchCond cond)
199{
200 int status = pthread_cond_signal(&cond->m_cond);
201 (void)status;
202 assert(status == 0);
203}
204
205void
206CArchMultithreadPosix::broadcastCondVar(CArchCond cond)
207{
208 int status = pthread_cond_broadcast(&cond->m_cond);
209 (void)status;
210 assert(status == 0);
211}
212
213bool
214CArchMultithreadPosix::waitCondVar(CArchCond cond,
215 CArchMutex mutex, double timeout)
216{
217 // we can't wait on a condition variable and also wake it up for
218 // cancellation since we don't use posix cancellation. so we
219 // must wake up periodically to check for cancellation. we
220 // can't simply go back to waiting after the check since the
221 // condition may have changed and we'll have lost the signal.
222 // so we have to return to the caller. since the caller will
223 // always check for spurious wakeups the only drawback here is
224 // performance: we're waking up a lot more than desired.
225 static const double maxCancellationLatency = 0.1;
226 if (timeout < 0.0 || timeout > maxCancellationLatency) {
227 timeout = maxCancellationLatency;
228 }
229
230 // see if we should cancel this thread
231 testCancelThread();
232
233 // get final time
234 struct timeval now;
235 gettimeofday(&now, NULL);
236 struct timespec finalTime;
237 finalTime.tv_sec = now.tv_sec;
238 finalTime.tv_nsec = now.tv_usec * 1000;
239 long timeout_sec = (long)timeout;
240 long timeout_nsec = (long)(1.0e+9 * (timeout - timeout_sec));
241 finalTime.tv_sec += timeout_sec;
242 finalTime.tv_nsec += timeout_nsec;
243 if (finalTime.tv_nsec >= 1000000000) {
244 finalTime.tv_nsec -= 1000000000;
245 finalTime.tv_sec += 1;
246 }
247
248 // wait
249 int status = pthread_cond_timedwait(&cond->m_cond,
250 &mutex->m_mutex, &finalTime);
251
252 // check for cancel again
253 testCancelThread();
254
255 switch (status) {
256 case 0:
257 // success
258 return true;
259
260 case ETIMEDOUT:
261 return false;
262
263 default:
264 assert(0 && "condition variable wait error");
265 return false;
266 }
267}
268
269CArchMutex
270CArchMultithreadPosix::newMutex()
271{
272 pthread_mutexattr_t attr;
273 int status = pthread_mutexattr_init(&attr);
274 assert(status == 0);
275 CArchMutexImpl* mutex = new CArchMutexImpl;
276 status = pthread_mutex_init(&mutex->m_mutex, &attr);
277 assert(status == 0);
278 return mutex;
279}
280
281void
282CArchMultithreadPosix::closeMutex(CArchMutex mutex)
283{
284 int status = pthread_mutex_destroy(&mutex->m_mutex);
285 (void)status;
286 assert(status == 0);
287 delete mutex;
288}
289
290void
291CArchMultithreadPosix::lockMutex(CArchMutex mutex)
292{
293 int status = pthread_mutex_lock(&mutex->m_mutex);
294
295 switch (status) {
296 case 0:
297 // success
298 return;
299
300 case EDEADLK:
301 assert(0 && "lock already owned");
302 break;
303
304 case EAGAIN:
305 assert(0 && "too many recursive locks");
306 break;
307
308 default:
309 assert(0 && "unexpected error");
310 break;
311 }
312}
313
314void
315CArchMultithreadPosix::unlockMutex(CArchMutex mutex)
316{
317 int status = pthread_mutex_unlock(&mutex->m_mutex);
318
319 switch (status) {
320 case 0:
321 // success
322 return;
323
324 case EPERM:
325 assert(0 && "thread doesn't own a lock");
326 break;
327
328 default:
329 assert(0 && "unexpected error");
330 break;
331 }
332}
333
334CArchThread
335CArchMultithreadPosix::newThread(ThreadFunc func, void* data)
336{
337 assert(func != NULL);
338
339 // initialize signal handler. we do this here instead of the
340 // constructor so we can avoid daemonizing (using fork())
341 // when there are multiple threads. clients can safely
342 // use condition variables and mutexes before creating a
343 // new thread and they can safely use the only thread
344 // they have access to, the main thread, so they really
345 // can't tell the difference.
346 if (!m_newThreadCalled) {
347 m_newThreadCalled = true;
348#if HAVE_PTHREAD_SIGNAL
349 startSignalHandler();
350#endif
351 }
352
353 lockMutex(m_threadMutex);
354
355 // create thread impl for new thread
356 CArchThreadImpl* thread = new CArchThreadImpl;
357 thread->m_func = func;
358 thread->m_userData = data;
359
360 // create the thread. pthread_create() on RedHat 7.2 smp fails
361 // if passed a NULL attr so use a default attr.
362 pthread_attr_t attr;
363 int status = pthread_attr_init(&attr);
364 if (status == 0) {
365 status = pthread_create(&thread->m_thread, &attr,
366 &CArchMultithreadPosix::threadFunc, thread);
367 pthread_attr_destroy(&attr);
368 }
369
370 // check if thread was started
371 if (status != 0) {
372 // failed to start thread so clean up
373 delete thread;
374 thread = NULL;
375 }
376 else {
377 // add thread to list
378 insert(thread);
379
380 // increment ref count to account for the thread itself
381 refThread(thread);
382 }
383
384 // note that the child thread will wait until we release this mutex
385 unlockMutex(m_threadMutex);
386
387 return thread;
388}
389
390CArchThread
391CArchMultithreadPosix::newCurrentThread()
392{
393 lockMutex(m_threadMutex);
394 CArchThreadImpl* thread = find(pthread_self());
395 unlockMutex(m_threadMutex);
396 assert(thread != NULL);
397 return thread;
398}
399
400void
401CArchMultithreadPosix::closeThread(CArchThread thread)
402{
403 assert(thread != NULL);
404
405 // decrement ref count and clean up thread if no more references
406 if (--thread->m_refCount == 0) {
407 // detach from thread (unless it's the main thread)
408 if (thread->m_func != NULL) {
409 pthread_detach(thread->m_thread);
410 }
411
412 // remove thread from list
413 lockMutex(m_threadMutex);
414 assert(findNoRef(thread->m_thread) == thread);
415 erase(thread);
416 unlockMutex(m_threadMutex);
417
418 // done with thread
419 delete thread;
420 }
421}
422
423CArchThread
424CArchMultithreadPosix::copyThread(CArchThread thread)
425{
426 refThread(thread);
427 return thread;
428}
429
430void
431CArchMultithreadPosix::cancelThread(CArchThread thread)
432{
433 assert(thread != NULL);
434
435 // set cancel and wakeup flags if thread can be cancelled
436 bool wakeup = false;
437 lockMutex(m_threadMutex);
438 if (!thread->m_exited && !thread->m_cancelling) {
439 thread->m_cancel = true;
440 wakeup = true;
441 }
442 unlockMutex(m_threadMutex);
443
444 // force thread to exit system calls if wakeup is true
445 if (wakeup) {
446 pthread_kill(thread->m_thread, SIGWAKEUP);
447 }
448}
449
450void
451CArchMultithreadPosix::setPriorityOfThread(CArchThread thread, int /*n*/)
452{
453 assert(thread != NULL);
454
455 // FIXME
456}
457
458void
459CArchMultithreadPosix::testCancelThread()
460{
461 // find current thread
462 lockMutex(m_threadMutex);
463 CArchThreadImpl* thread = findNoRef(pthread_self());
464 unlockMutex(m_threadMutex);
465
466 // test cancel on thread
467 testCancelThreadImpl(thread);
468}
469
470bool
471CArchMultithreadPosix::wait(CArchThread target, double timeout)
472{
473 assert(target != NULL);
474
475 lockMutex(m_threadMutex);
476
477 // find current thread
478 CArchThreadImpl* self = findNoRef(pthread_self());
479
480 // ignore wait if trying to wait on ourself
481 if (target == self) {
482 unlockMutex(m_threadMutex);
483 return false;
484 }
485
486 // ref the target so it can't go away while we're watching it
487 refThread(target);
488
489 unlockMutex(m_threadMutex);
490
491 try {
492 // do first test regardless of timeout
493 testCancelThreadImpl(self);
494 if (isExitedThread(target)) {
495 closeThread(target);
496 return true;
497 }
498
499 // wait and repeat test if there's a timeout
500 if (timeout != 0.0) {
501 const double start = ARCH->time();
502 do {
503 // wait a little
504 ARCH->sleep(0.05);
505
506 // repeat test
507 testCancelThreadImpl(self);
508 if (isExitedThread(target)) {
509 closeThread(target);
510 return true;
511 }
512
513 // repeat wait and test until timed out
514 } while (timeout < 0.0 || (ARCH->time() - start) <= timeout);
515 }
516
517 closeThread(target);
518 return false;
519 }
520 catch (...) {
521 closeThread(target);
522 throw;
523 }
524}
525
526bool
527CArchMultithreadPosix::isSameThread(CArchThread thread1, CArchThread thread2)
528{
529 return (thread1 == thread2);
530}
531
532bool
533CArchMultithreadPosix::isExitedThread(CArchThread thread)
534{
535 lockMutex(m_threadMutex);
536 bool exited = thread->m_exited;
537 unlockMutex(m_threadMutex);
538 return exited;
539}
540
541void*
542CArchMultithreadPosix::getResultOfThread(CArchThread thread)
543{
544 lockMutex(m_threadMutex);
545 void* result = thread->m_result;
546 unlockMutex(m_threadMutex);
547 return result;
548}
549
550IArchMultithread::ThreadID
551CArchMultithreadPosix::getIDOfThread(CArchThread thread)
552{
553 return thread->m_id;
554}
555
556void
557CArchMultithreadPosix::setSignalHandler(
558 ESignal signal, SignalFunc func, void* userData)
559{
560 lockMutex(m_threadMutex);
561 m_signalFunc[signal] = func;
562 m_signalUserData[signal] = userData;
563 unlockMutex(m_threadMutex);
564}
565
566void
567CArchMultithreadPosix::raiseSignal(ESignal signal)
568{
569 lockMutex(m_threadMutex);
570 if (m_signalFunc[signal] != NULL) {
571 m_signalFunc[signal](signal, m_signalUserData[signal]);
572 pthread_kill(m_mainThread->m_thread, SIGWAKEUP);
573 }
574 else if (signal == kINTERRUPT || signal == kTERMINATE) {
575 ARCH->cancelThread(m_mainThread);
576 }
577 unlockMutex(m_threadMutex);
578}
579
580void
581CArchMultithreadPosix::startSignalHandler()
582{
583 // set signal mask. the main thread blocks these signals and
584 // the signal handler thread will listen for them.
585 sigset_t sigset, oldsigset;
586 setSignalSet(&sigset);
587 pthread_sigmask(SIG_BLOCK, &sigset, &oldsigset);
588
589 // fire up the INT and TERM signal handler thread. we could
590 // instead arrange to catch and handle these signals but
591 // we'd be unable to cancel the main thread since no pthread
592 // calls are allowed in a signal handler.
593 pthread_attr_t attr;
594 int status = pthread_attr_init(&attr);
595 if (status == 0) {
596 status = pthread_create(&m_signalThread, &attr,
597 &CArchMultithreadPosix::threadSignalHandler,
598 NULL);
599 pthread_attr_destroy(&attr);
600 }
601 if (status != 0) {
602 // can't create thread to wait for signal so don't block
603 // the signals.
604 pthread_sigmask(SIG_UNBLOCK, &oldsigset, NULL);
605 }
606}
607
608CArchThreadImpl*
609CArchMultithreadPosix::find(pthread_t thread)
610{
611 CArchThreadImpl* impl = findNoRef(thread);
612 if (impl != NULL) {
613 refThread(impl);
614 }
615 return impl;
616}
617
618CArchThreadImpl*
619CArchMultithreadPosix::findNoRef(pthread_t thread)
620{
621 // linear search
622 for (CThreadList::const_iterator index = m_threadList.begin();
623 index != m_threadList.end(); ++index) {
624 if ((*index)->m_thread == thread) {
625 return *index;
626 }
627 }
628 return NULL;
629}
630
631void
632CArchMultithreadPosix::insert(CArchThreadImpl* thread)
633{
634 assert(thread != NULL);
635
636 // thread shouldn't already be on the list
637 assert(findNoRef(thread->m_thread) == NULL);
638
639 // set thread id. note that we don't worry about m_nextID
640 // wrapping back to 0 and duplicating thread ID's since the
641 // likelihood of synergy running that long is vanishingly
642 // small.
643 thread->m_id = ++m_nextID;
644
645 // append to list
646 m_threadList.push_back(thread);
647}
648
649void
650CArchMultithreadPosix::erase(CArchThreadImpl* thread)
651{
652 for (CThreadList::iterator index = m_threadList.begin();
653 index != m_threadList.end(); ++index) {
654 if (*index == thread) {
655 m_threadList.erase(index);
656 break;
657 }
658 }
659}
660
661void
662CArchMultithreadPosix::refThread(CArchThreadImpl* thread)
663{
664 assert(thread != NULL);
665 assert(findNoRef(thread->m_thread) != NULL);
666 ++thread->m_refCount;
667}
668
669void
670CArchMultithreadPosix::testCancelThreadImpl(CArchThreadImpl* thread)
671{
672 assert(thread != NULL);
673
674 // update cancel state
675 lockMutex(m_threadMutex);
676 bool cancel = false;
677 if (thread->m_cancel && !thread->m_cancelling) {
678 thread->m_cancelling = true;
679 thread->m_cancel = false;
680 cancel = true;
681 }
682 unlockMutex(m_threadMutex);
683
684 // unwind thread's stack if cancelling
685 if (cancel) {
686 throw XThreadCancel();
687 }
688}
689
690void*
691CArchMultithreadPosix::threadFunc(void* vrep)
692{
693 // get the thread
694 CArchThreadImpl* thread = reinterpret_cast<CArchThreadImpl*>(vrep);
695
696 // setup pthreads
697 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
698 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
699
700 // run thread
701 s_instance->doThreadFunc(thread);
702
703 // terminate the thread
704 return NULL;
705}
706
707void
708CArchMultithreadPosix::doThreadFunc(CArchThread thread)
709{
710 // default priority is slightly below normal
711 setPriorityOfThread(thread, 1);
712
713 // wait for parent to initialize this object
714 lockMutex(m_threadMutex);
715 unlockMutex(m_threadMutex);
716
717 void* result = NULL;
718 try {
719 // go
720 result = (*thread->m_func)(thread->m_userData);
721 }
722
723 catch (XThreadCancel&) {
724 // client called cancel()
725 }
726 catch (...) {
727 // note -- don't catch (...) to avoid masking bugs
728 lockMutex(m_threadMutex);
729 thread->m_exited = true;
730 unlockMutex(m_threadMutex);
731 closeThread(thread);
732 throw;
733 }
734
735 // thread has exited
736 lockMutex(m_threadMutex);
737 thread->m_result = result;
738 thread->m_exited = true;
739 unlockMutex(m_threadMutex);
740
741 // done with thread
742 closeThread(thread);
743}
744
745void
746CArchMultithreadPosix::threadCancel(int)
747{
748 // do nothing
749}
750
751void*
752CArchMultithreadPosix::threadSignalHandler(void*)
753{
754 // detach
755 pthread_detach(pthread_self());
756
757 // add signal to mask
758 sigset_t sigset;
759 setSignalSet(&sigset);
760
761 // also wait on SIGABRT. on linux (others?) this thread (process)
762 // will persist after all the other threads evaporate due to an
763 // assert unless we wait on SIGABRT. that means our resources (like
764 // the socket we're listening on) are not released and never will be
765 // until the lingering thread is killed. i don't know why sigwait()
766 // should protect the thread from being killed. note that sigwait()
767 // doesn't actually return if we receive SIGABRT and, for some
768 // reason, we don't have to block SIGABRT.
769 sigaddset(&sigset, SIGABRT);
770
771 // we exit the loop via thread cancellation in sigwait()
772 for (;;) {
773 // wait
774#if HAVE_POSIX_SIGWAIT
775 int signal = 0;
776 sigwait(&sigset, &signal);
777#else
778 sigwait(&sigset);
779#endif
780
781 // if we get here then the signal was raised
782 switch (signal) {
783 case SIGINT:
784 ARCH->raiseSignal(kINTERRUPT);
785 break;
786
787 case SIGTERM:
788 ARCH->raiseSignal(kTERMINATE);
789 break;
790
791 case SIGHUP:
792 ARCH->raiseSignal(kHANGUP);
793 break;
794
795 case SIGUSR2:
796 ARCH->raiseSignal(kUSER);
797 break;
798
799 default:
800 // ignore
801 break;
802 }
803 }
804
805 return NULL;
806}
Note: See TracBrowser for help on using the repository browser.