/* * synergy -- mouse and keyboard sharing utility * Copyright (C) 2002 Chris Schoeneman * Copyright (C) 2006 Knut St. Osmundsen * * This package is free software you can redistribute it and/or * modify it under the terms of the GNU General Public License * found in the file COPYING that should have accompanied this file. * * This package is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. */ #include "CArchMultithreadOS2.h" #include "CArch.h" #include "XArch.h" #include // // note -- implementation of condition variable taken from: // http://www.cs.wustl.edu/~schmidt/win32-cv-1.html // titled "Strategies for Implementing POSIX Condition Variables // on Win32." it also provides an implementation that doesn't // suffer from the incorrectness problem described in our // corresponding header but it is slower, still unfair, and // can cause busy waiting. // // // CArchThreadImpl // class CArchThreadImpl { public: CArchThreadImpl(); ~CArchThreadImpl(); public: int m_refCount; int m_id; IArchMultithread::ThreadFunc m_func; void* m_userData; HEV m_cancel; bool m_cancelling; HEV m_exit; void* m_result; void* m_networkData; }; CArchThreadImpl::CArchThreadImpl() : m_refCount(1), m_id(0), m_func(NULL), m_userData(NULL), m_cancel(NULLHANDLE), m_cancelling(false), m_exit(NULLHANDLE), m_result(NULL), m_networkData(NULL) { APIRET rc = DosCreateEventSem(NULL, &m_exit, 0, FALSE); assert(rc == NO_ERROR); rc = DosCreateEventSem(NULL, &m_cancel, 0, FALSE); assert(rc == NO_ERROR); } CArchThreadImpl::~CArchThreadImpl() { DosCloseEventSem(m_exit); DosCloseEventSem(m_cancel); } // // CArchMultithreadWindows // CArchMultithreadOS2* CArchMultithreadOS2::s_instance = NULL; CArchMultithreadOS2::CArchMultithreadOS2() { assert(s_instance == NULL); s_instance = this; // no signal handlers for (size_t i = 0; i < kNUM_SIGNALS; ++i) { m_signalFunc[i] = NULL; m_signalUserData[i] = NULL; } // create mutex for thread list m_threadMutex = newMutex(); // create thread for calling (main) thread and add it to our // list. no need to lock the mutex since we're the only thread. m_mainThread = new CArchThreadImpl; m_mainThread->m_id = _gettid(); insert(m_mainThread); } CArchMultithreadOS2::~CArchMultithreadOS2() { s_instance = NULL; // clean up thread list for (CThreadList::iterator index = m_threadList.begin(); index != m_threadList.end(); ++index) { delete *index; } // done with mutex delete m_threadMutex; } void CArchMultithreadOS2::setNetworkDataForCurrentThread(void* data) { lockMutex(m_threadMutex); CArchThreadImpl* thread = findNoRef(_gettid()); thread->m_networkData = data; unlockMutex(m_threadMutex); } void* CArchMultithreadOS2::getNetworkDataForThread(CArchThread thread) { lockMutex(m_threadMutex); void* data = thread->m_networkData; unlockMutex(m_threadMutex); return data; } HEV CArchMultithreadOS2::getCancelEventForCurrentThread() { lockMutex(m_threadMutex); CArchThreadImpl* thread = findNoRef(_gettid()); unlockMutex(m_threadMutex); return thread->m_cancel; } CArchMultithreadOS2* CArchMultithreadOS2::getInstance() { return s_instance; } CArchCond CArchMultithreadOS2::newCondVar() { CArchCondImpl* cond = new CArchCondImpl; APIRET rc = DosCreateEventSem(NULL, &cond->m_events[CArchCondImpl::kSignal], /*DCE_AUTORESET*/0, FALSE); assert(rc == NO_ERROR); rc = DosCreateEventSem(NULL, &cond->m_events[CArchCondImpl::kBroadcast], 0, FALSE); assert(rc == NO_ERROR); cond->m_waitCountMutex = newMutex(); cond->m_waitCount = 0; return cond; } void CArchMultithreadOS2::closeCondVar(CArchCond cond) { DosCloseEventSem(cond->m_events[CArchCondImpl::kSignal]); DosCloseEventSem(cond->m_events[CArchCondImpl::kBroadcast]); closeMutex(cond->m_waitCountMutex); delete cond; } void CArchMultithreadOS2::signalCondVar(CArchCond cond) { // is anybody waiting? lockMutex(cond->m_waitCountMutex); if (cond->m_waitCount > 0) DosPostEventSem(cond->m_events[CArchCondImpl::kSignal]); unlockMutex(cond->m_waitCountMutex); } void CArchMultithreadOS2::broadcastCondVar(CArchCond cond) { // is anybody waiting? lockMutex(cond->m_waitCountMutex); if (cond->m_waitCount > 0) DosPostEventSem(cond->m_events[CArchCondImpl::kBroadcast]); unlockMutex(cond->m_waitCountMutex); } bool CArchMultithreadOS2::waitCondVar(CArchCond cond, CArchMutex mutex, double timeout) { // prepare to wait const ULONG os2Timeout = (timeout < 0.0) ? SEM_INDEFINITE_WAIT : static_cast(1000.0 * timeout); // make a list of the condition variable events and the cancel event // for the current thread. SEMRECORD handles[3]; handles[0].hsemCur = (HSEM)cond->m_events[CArchCondImpl::kSignal]; handles[0].ulUser = 0; handles[1].hsemCur = (HSEM)cond->m_events[CArchCondImpl::kBroadcast]; handles[1].ulUser = 1; handles[2].hsemCur = (HSEM)getCancelEventForCurrentThread(); handles[2].ulUser = 2; HMUX hmux = NULLHANDLE; APIRET rc = DosCreateMuxWaitSem(NULL, &hmux, 3, &handles[0], DCMW_WAIT_ANY); if (rc != NO_ERROR) { assert(rc == NO_ERROR); return false; } // update waiter count lockMutex(cond->m_waitCountMutex); ++cond->m_waitCount; unlockMutex(cond->m_waitCountMutex); // release mutex. this should be atomic with the wait so that it's // impossible for another thread to signal us between the unlock and // the wait, which would lead to a lost signal on broadcasts. // however, we're using a manual reset event for broadcasts which // stays set until we reset it, so we don't lose the broadcast. unlockMutex(mutex); // wait for a signal or broadcast // we're using a a manual reset semaphore, that complicates matters. ULONG iSem = ~0UL; for (;;) { rc = DosWaitMuxWaitSem(hmux, os2Timeout, &iSem); lockMutex(cond->m_waitCountMutex); if (rc != NO_ERROR) { iSem = ~0UL; break; } // cancel takes priority if ( iSem != 2 && DosWaitEventSem((HEV)handles[2].hsemCur, 0) == NO_ERROR) iSem = 2; // see if we got here first. if (iSem != 0) break; ULONG cPosts = 0; rc = DosResetEventSem((HEV)handles[0].hsemCur, &cPosts); if (rc == NO_ERROR && cPosts > 0) { while (--cPosts > 0) DosPostEventSem((HEV)handles[0].hsemCur); break; } } DosCloseMuxWaitSem(hmux); // update the waiter count and reset the broadcast event if we're the last waiter if ((iSem == 1 && cond->m_waitCount == 0)) { ULONG ulIgnore; DosResetEventSem(cond->m_events[CArchCondImpl::kBroadcast], &ulIgnore); } unlockMutex(cond->m_waitCountMutex); // reacquire the mutex lockMutex(mutex); // cancel thread if necessary if (iSem == 2) { ARCH->testCancelThread(); } // return success or failure return (rc == NO_ERROR && (iSem == 0 || iSem == 1)); } CArchMutex CArchMultithreadOS2::newMutex() { CArchMutexImpl* mutex = new CArchMutexImpl; _fmutex_create2(&mutex->m_mutex, 0, __FUNCTION__); return mutex; } void CArchMultithreadOS2::closeMutex(CArchMutex mutex) { _fmutex_close(&mutex->m_mutex); delete mutex; } void CArchMultithreadOS2::lockMutex(CArchMutex mutex) { _fmutex_request(&mutex->m_mutex, 0); } void CArchMultithreadOS2::unlockMutex(CArchMutex mutex) { _fmutex_release(&mutex->m_mutex); } CArchThread CArchMultithreadOS2::newThread(ThreadFunc func, void* data) { lockMutex(m_threadMutex); // create thread impl for new thread CArchThreadImpl* thread = new CArchThreadImpl; thread->m_func = func; thread->m_userData = data; // create thread thread->m_id = _beginthread(threadFunc, NULL, 256*1024, thread); // check if thread was started if (thread->m_id < 0) { // failed to start thread so clean up delete thread; thread = NULL; } else { // add thread to list insert(thread); // increment ref count to account for the thread itself refThread(thread); } // note that the child thread will wait until we release this mutex unlockMutex(m_threadMutex); return thread; } CArchThread CArchMultithreadOS2::newCurrentThread() { lockMutex(m_threadMutex); CArchThreadImpl* thread = find(_gettid()); unlockMutex(m_threadMutex); assert(thread != NULL); return thread; } void CArchMultithreadOS2::closeThread(CArchThread thread) { assert(thread != NULL); // decrement ref count and clean up thread if no more references if (--thread->m_refCount == 0) { // remove thread from list lockMutex(m_threadMutex); assert(findNoRefOrCreate(thread->m_id) == thread); erase(thread); unlockMutex(m_threadMutex); // done with thread delete thread; } } CArchThread CArchMultithreadOS2::copyThread(CArchThread thread) { refThread(thread); return thread; } void CArchMultithreadOS2::cancelThread(CArchThread thread) { assert(thread != NULL); // set cancel flag DosPostEventSem(thread->m_cancel); } void CArchMultithreadOS2::setPriorityOfThread(CArchThread thread, int n) { #if 0 /* not implemented on posix, so ignore. */ struct CPriorityInfo { public: ULONG m_class; int m_level; }; static const CPriorityInfo s_pClass[] = { { IDLE_PRIORITY_CLASS, THREAD_PRIORITY_IDLE }, { IDLE_PRIORITY_CLASS, THREAD_PRIORITY_LOWEST }, { IDLE_PRIORITY_CLASS, THREAD_PRIORITY_BELOW_NORMAL }, { IDLE_PRIORITY_CLASS, THREAD_PRIORITY_NORMAL }, { IDLE_PRIORITY_CLASS, THREAD_PRIORITY_ABOVE_NORMAL }, { IDLE_PRIORITY_CLASS, THREAD_PRIORITY_HIGHEST }, { NORMAL_PRIORITY_CLASS, THREAD_PRIORITY_LOWEST }, { NORMAL_PRIORITY_CLASS, THREAD_PRIORITY_BELOW_NORMAL }, { NORMAL_PRIORITY_CLASS, THREAD_PRIORITY_NORMAL }, { NORMAL_PRIORITY_CLASS, THREAD_PRIORITY_ABOVE_NORMAL }, { NORMAL_PRIORITY_CLASS, THREAD_PRIORITY_HIGHEST }, { HIGH_PRIORITY_CLASS, THREAD_PRIORITY_LOWEST }, { HIGH_PRIORITY_CLASS, THREAD_PRIORITY_BELOW_NORMAL }, { HIGH_PRIORITY_CLASS, THREAD_PRIORITY_NORMAL }, { HIGH_PRIORITY_CLASS, THREAD_PRIORITY_ABOVE_NORMAL }, { HIGH_PRIORITY_CLASS, THREAD_PRIORITY_HIGHEST }, { REALTIME_PRIORITY_CLASS, THREAD_PRIORITY_IDLE }, { REALTIME_PRIORITY_CLASS, THREAD_PRIORITY_LOWEST }, { REALTIME_PRIORITY_CLASS, THREAD_PRIORITY_BELOW_NORMAL }, { REALTIME_PRIORITY_CLASS, THREAD_PRIORITY_NORMAL }, { REALTIME_PRIORITY_CLASS, THREAD_PRIORITY_ABOVE_NORMAL }, { REALTIME_PRIORITY_CLASS, THREAD_PRIORITY_HIGHEST }, { REALTIME_PRIORITY_CLASS, THREAD_PRIORITY_TIME_CRITICAL} }; #if defined(_DEBUG) // don't use really high priorities when debugging static const size_t s_pMax = 13; #else static const size_t s_pMax = sizeof(s_pClass) / sizeof(s_pClass[0]) - 1; #endif static const size_t s_pBase = 8; // index of normal priority assert(thread != NULL); size_t index; if (n > 0 && s_pBase < (size_t)n) { // lowest priority index = 0; } else { index = (size_t)((int)s_pBase - n); if (index > s_pMax) { // highest priority index = s_pMax; } } SetPriorityClass(GetCurrentProcess(), s_pClass[index].m_class); SetThreadPriority(thread->m_thread, s_pClass[index].m_level); #endif /* */ } void CArchMultithreadOS2::testCancelThread() { // find current thread lockMutex(m_threadMutex); CArchThreadImpl* thread = findNoRef(_gettid()); unlockMutex(m_threadMutex); // test cancel on thread testCancelThreadImpl(thread); } bool CArchMultithreadOS2::wait(CArchThread target, double timeout) { assert(target != NULL); lockMutex(m_threadMutex); // find current thread CArchThreadImpl* self = findNoRef(_gettid()); // ignore wait if trying to wait on ourself if (target == self) { unlockMutex(m_threadMutex); return false; } // ref the target so it can't go away while we're watching it refThread(target); unlockMutex(m_threadMutex); // convert timeout ULONG os2Timeout; if (timeout < 0.0) { os2Timeout = SEM_INDEFINITE_WAIT; } else { os2Timeout = (ULONG)(1000.0 * timeout); } // fixme! // wait for this thread to be cancelled or woken up or for the // target thread to terminate. SEMRECORD handles[2]; handles[0].hsemCur = (HSEM)target->m_exit; handles[0].ulUser = 0; handles[1].hsemCur = (HSEM)self->m_cancel; handles[1].ulUser = 1; HMUX hmux = NULLHANDLE; APIRET rc = DosCreateMuxWaitSem(NULL, &hmux, 2, &handles[0], DCMW_WAIT_ANY); if (rc != NO_ERROR) { assert(rc == NO_ERROR); return false; } ULONG iSem = ~0UL; rc = DosWaitMuxWaitSem(hmux, os2Timeout, &iSem); if (rc != NO_ERROR) { iSem = ~0UL; } DosCloseMuxWaitSem(hmux); // cancel takes priority if ( rc == NO_ERROR && iSem != 1 && DosWaitEventSem((HEV)handles[1].hsemCur, 0) == NO_ERROR) { iSem = 1; } // release target closeThread(target); // handle result switch (iSem) { case 0: // target thread terminated return true; case 1: // this thread was cancelled. does not return. testCancelThreadImpl(self); default: // timeout or error return false; } } bool CArchMultithreadOS2::isSameThread(CArchThread thread1, CArchThread thread2) { return (thread1 == thread2); } bool CArchMultithreadOS2::isExitedThread(CArchThread thread) { // poll exit event APIRET rc = DosWaitEventSem(thread->m_exit, 0); return (rc == NO_ERROR); } void* CArchMultithreadOS2::getResultOfThread(CArchThread thread) { lockMutex(m_threadMutex); void* result = thread->m_result; unlockMutex(m_threadMutex); return result; } IArchMultithread::ThreadID CArchMultithreadOS2::getIDOfThread(CArchThread thread) { return static_cast(thread->m_id); } void CArchMultithreadOS2::setSignalHandler( ESignal signal, SignalFunc func, void* userData) { lockMutex(m_threadMutex); m_signalFunc[signal] = func; m_signalUserData[signal] = userData; unlockMutex(m_threadMutex); } void CArchMultithreadOS2::raiseSignal(ESignal signal) { lockMutex(m_threadMutex); if (m_signalFunc[signal] != NULL) { m_signalFunc[signal](signal, m_signalUserData[signal]); ARCH->unblockPollSocket(m_mainThread); } else if (signal == kINTERRUPT || signal == kTERMINATE) { ARCH->cancelThread(m_mainThread); } unlockMutex(m_threadMutex); } CArchThreadImpl* CArchMultithreadOS2::find(int id) { CArchThreadImpl* impl = findNoRef(id); if (impl != NULL) { refThread(impl); } return impl; } CArchThreadImpl* CArchMultithreadOS2::findNoRef(int id) { CArchThreadImpl* impl = findNoRefOrCreate(id); if (impl == NULL) { // create thread for calling thread which isn't in our list and // add it to the list. this won't normally happen but it can if // the system calls us under a new thread, like it does when we // run as a service. impl = new CArchThreadImpl; impl->m_id = _gettid(); insert(impl); } return impl; } CArchThreadImpl* CArchMultithreadOS2::findNoRefOrCreate(int id) { // linear search for (CThreadList::const_iterator index = m_threadList.begin(); index != m_threadList.end(); ++index) { if ((*index)->m_id == id) { return *index; } } return NULL; } void CArchMultithreadOS2::insert(CArchThreadImpl* thread) { assert(thread != NULL); // thread shouldn't already be on the list assert(findNoRefOrCreate(thread->m_id) == NULL); // append to list m_threadList.push_back(thread); } void CArchMultithreadOS2::erase(CArchThreadImpl* thread) { for (CThreadList::iterator index = m_threadList.begin(); index != m_threadList.end(); ++index) { if (*index == thread) { m_threadList.erase(index); break; } } } void CArchMultithreadOS2::refThread(CArchThreadImpl* thread) { assert(thread != NULL); assert(findNoRefOrCreate(thread->m_id) != NULL); ++thread->m_refCount; } void CArchMultithreadOS2::testCancelThreadImpl(CArchThreadImpl* thread) { assert(thread != NULL); // poll cancel event. return if not set. APIRET rc = DosWaitEventSem(thread->m_cancel, 0); if (rc != NO_ERROR) { return; } // update cancel state lockMutex(m_threadMutex); bool cancel = !thread->m_cancelling; thread->m_cancelling = true; ULONG ulIgnored; DosResetEventSem(thread->m_cancel, &ulIgnored); unlockMutex(m_threadMutex); // unwind thread's stack if cancelling if (cancel) { throw XThreadCancel(); } } void CArchMultithreadOS2::threadFunc(void* vrep) { // get the thread CArchThreadImpl* thread = reinterpret_cast(vrep); // run thread s_instance->doThreadFunc(thread); // terminate the thread _endthread(); } void CArchMultithreadOS2::doThreadFunc(CArchThread thread) { // wait for parent to initialize this object lockMutex(m_threadMutex); unlockMutex(m_threadMutex); void* result = NULL; try { // go result = (*thread->m_func)(thread->m_userData); } catch (XThreadCancel&) { // client called cancel() } catch (...) { // note -- don't catch (...) to avoid masking bugs DosPostEventSem(thread->m_exit); closeThread(thread); throw; } // thread has exited lockMutex(m_threadMutex); thread->m_result = result; unlockMutex(m_threadMutex); DosPostEventSem(thread->m_exit); // done with thread closeThread(thread); }