source: trunk/synergy/lib/net/CSocketMultiplexer.cpp@ 3020

Last change on this file since 3020 was 2749, checked in by bird, 19 years ago

synergy v1.3.1 sources (zip).

File size: 8.0 KB
Line 
1/*
2 * synergy -- mouse and keyboard sharing utility
3 * Copyright (C) 2004 Chris Schoeneman
4 *
5 * This package is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * found in the file COPYING that should have accompanied this file.
8 *
9 * This package is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 */
14
15#include "CSocketMultiplexer.h"
16#include "ISocketMultiplexerJob.h"
17#include "CCondVar.h"
18#include "CLock.h"
19#include "CMutex.h"
20#include "CThread.h"
21#include "CLog.h"
22#include "TMethodJob.h"
23#include "CArch.h"
24#include "XArch.h"
25#include "stdvector.h"
26
27//
28// CSocketMultiplexer
29//
30
31CSocketMultiplexer* CSocketMultiplexer::s_instance = NULL;
32
33CSocketMultiplexer::CSocketMultiplexer() :
34 m_mutex(new CMutex),
35 m_thread(NULL),
36 m_update(false),
37 m_jobsReady(new CCondVar<bool>(m_mutex, false)),
38 m_jobListLock(new CCondVar<bool>(m_mutex, false)),
39 m_jobListLockLocked(new CCondVar<bool>(m_mutex, false)),
40 m_jobListLocker(NULL),
41 m_jobListLockLocker(NULL)
42{
43 assert(s_instance == NULL);
44
45 // this pointer just has to be unique and not NULL. it will
46 // never be dereferenced. it's used to identify cursor nodes
47 // in the jobs list.
48 m_cursorMark = reinterpret_cast<ISocketMultiplexerJob*>(this);
49
50 // start thread
51 m_thread = new CThread(new TMethodJob<CSocketMultiplexer>(
52 this, &CSocketMultiplexer::serviceThread));
53
54 s_instance = this;
55}
56
57CSocketMultiplexer::~CSocketMultiplexer()
58{
59 m_thread->cancel();
60 m_thread->unblockPollSocket();
61 m_thread->wait();
62 delete m_thread;
63 delete m_jobsReady;
64 delete m_jobListLock;
65 delete m_jobListLockLocked;
66 delete m_jobListLocker;
67 delete m_jobListLockLocker;
68 delete m_mutex;
69
70 // clean up jobs
71 for (CSocketJobMap::iterator i = m_socketJobMap.begin();
72 i != m_socketJobMap.end(); ++i) {
73 delete *(i->second);
74 }
75
76 s_instance = NULL;
77}
78
79CSocketMultiplexer*
80CSocketMultiplexer::getInstance()
81{
82 return s_instance;
83}
84
85void
86CSocketMultiplexer::addSocket(ISocket* socket, ISocketMultiplexerJob* job)
87{
88 assert(socket != NULL);
89 assert(job != NULL);
90
91 // prevent other threads from locking the job list
92 lockJobListLock();
93
94 // break thread out of poll
95 m_thread->unblockPollSocket();
96
97 // lock the job list
98 lockJobList();
99
100 // insert/replace job
101 CSocketJobMap::iterator i = m_socketJobMap.find(socket);
102 if (i == m_socketJobMap.end()) {
103 // we *must* put the job at the end so the order of jobs in
104 // the list continue to match the order of jobs in pfds in
105 // serviceThread().
106 CJobCursor j = m_socketJobs.insert(m_socketJobs.end(), job);
107 m_update = true;
108 m_socketJobMap.insert(std::make_pair(socket, j));
109 }
110 else {
111 CJobCursor j = i->second;
112 if (*j != job) {
113 delete *j;
114 *j = job;
115 }
116 m_update = true;
117 }
118
119 // unlock the job list
120 unlockJobList();
121}
122
123void
124CSocketMultiplexer::removeSocket(ISocket* socket)
125{
126 assert(socket != NULL);
127
128 // prevent other threads from locking the job list
129 lockJobListLock();
130
131 // break thread out of poll
132 m_thread->unblockPollSocket();
133
134 // lock the job list
135 lockJobList();
136
137 // remove job. rather than removing it from the map we put NULL
138 // in the list instead so the order of jobs in the list continues
139 // to match the order of jobs in pfds in serviceThread().
140 CSocketJobMap::iterator i = m_socketJobMap.find(socket);
141 if (i != m_socketJobMap.end()) {
142 if (*(i->second) != NULL) {
143 delete *(i->second);
144 *(i->second) = NULL;
145 m_update = true;
146 }
147 }
148
149 // unlock the job list
150 unlockJobList();
151}
152
153void
154CSocketMultiplexer::serviceThread(void*)
155{
156 std::vector<IArchNetwork::CPollEntry> pfds;
157 IArchNetwork::CPollEntry pfd;
158
159 // service the connections
160 for (;;) {
161 CThread::testCancel();
162
163 // wait until there are jobs to handle
164 {
165 CLock lock(m_mutex);
166 while (!(bool)*m_jobsReady) {
167 m_jobsReady->wait();
168 }
169 }
170
171 // lock the job list
172 lockJobListLock();
173 lockJobList();
174
175 // collect poll entries
176 if (m_update) {
177 m_update = false;
178 pfds.clear();
179 pfds.reserve(m_socketJobMap.size());
180
181 CJobCursor cursor = newCursor();
182 CJobCursor jobCursor = nextCursor(cursor);
183 while (jobCursor != m_socketJobs.end()) {
184 ISocketMultiplexerJob* job = *jobCursor;
185 if (job != NULL) {
186 pfd.m_socket = job->getSocket();
187 pfd.m_events = 0;
188 if (job->isReadable()) {
189 pfd.m_events |= IArchNetwork::kPOLLIN;
190 }
191 if (job->isWritable()) {
192 pfd.m_events |= IArchNetwork::kPOLLOUT;
193 }
194 pfds.push_back(pfd);
195 }
196 jobCursor = nextCursor(cursor);
197 }
198 deleteCursor(cursor);
199 }
200
201 int status;
202 try {
203 // check for status
204 status = ARCH->pollSocket(&pfds[0], pfds.size(), -1);
205 }
206 catch (XArchNetwork& e) {
207 LOG((CLOG_WARN "error in socket multiplexer: %s", e.what().c_str()));
208 status = 0;
209 }
210
211 if (status != 0) {
212 // iterate over socket jobs, invoking each and saving the
213 // new job.
214 UInt32 i = 0;
215 CJobCursor cursor = newCursor();
216 CJobCursor jobCursor = nextCursor(cursor);
217 while (i < pfds.size() && jobCursor != m_socketJobs.end()) {
218 if (*jobCursor != NULL) {
219 // get poll state
220 unsigned short revents = pfds[i].m_revents;
221 bool read = ((revents & IArchNetwork::kPOLLIN) != 0);
222 bool write = ((revents & IArchNetwork::kPOLLOUT) != 0);
223 bool error = ((revents & (IArchNetwork::kPOLLERR |
224 IArchNetwork::kPOLLNVAL)) != 0);
225
226 // run job
227 ISocketMultiplexerJob* job = *jobCursor;
228 ISocketMultiplexerJob* newJob = job->run(read, write, error);
229
230 // save job, if different
231 if (newJob != job) {
232 CLock lock(m_mutex);
233 delete job;
234 *jobCursor = newJob;
235 m_update = true;
236 }
237 ++i;
238 }
239
240 // next job
241 jobCursor = nextCursor(cursor);
242 }
243 deleteCursor(cursor);
244 }
245
246 // delete any removed socket jobs
247 for (CSocketJobMap::iterator i = m_socketJobMap.begin();
248 i != m_socketJobMap.end();) {
249 if (*(i->second) == NULL) {
250 m_socketJobMap.erase(i++);
251 m_update = true;
252 }
253 else {
254 ++i;
255 }
256 }
257
258 // unlock the job list
259 unlockJobList();
260 }
261}
262
263CSocketMultiplexer::CJobCursor
264CSocketMultiplexer::newCursor()
265{
266 CLock lock(m_mutex);
267 return m_socketJobs.insert(m_socketJobs.begin(), m_cursorMark);
268}
269
270CSocketMultiplexer::CJobCursor
271CSocketMultiplexer::nextCursor(CJobCursor cursor)
272{
273 CLock lock(m_mutex);
274 CJobCursor j = m_socketJobs.end();
275 CJobCursor i = cursor;
276 while (++i != m_socketJobs.end()) {
277 if (*i != m_cursorMark) {
278 // found a real job (as opposed to a cursor)
279 j = i;
280
281 // move our cursor just past the job
282 m_socketJobs.splice(++i, m_socketJobs, cursor);
283 break;
284 }
285 }
286 return j;
287}
288
289void
290CSocketMultiplexer::deleteCursor(CJobCursor cursor)
291{
292 CLock lock(m_mutex);
293 m_socketJobs.erase(cursor);
294}
295
296void
297CSocketMultiplexer::lockJobListLock()
298{
299 CLock lock(m_mutex);
300
301 // wait for the lock on the lock
302 while (*m_jobListLockLocked) {
303 m_jobListLockLocked->wait();
304 }
305
306 // take ownership of the lock on the lock
307 *m_jobListLockLocked = true;
308 m_jobListLockLocker = new CThread(CThread::getCurrentThread());
309}
310
311void
312CSocketMultiplexer::lockJobList()
313{
314 CLock lock(m_mutex);
315
316 // make sure we're the one that called lockJobListLock()
317 assert(*m_jobListLockLocker == CThread::getCurrentThread());
318
319 // wait for the job list lock
320 while (*m_jobListLock) {
321 m_jobListLock->wait();
322 }
323
324 // take ownership of the lock
325 *m_jobListLock = true;
326 m_jobListLocker = m_jobListLockLocker;
327 m_jobListLockLocker = NULL;
328
329 // release the lock on the lock
330 *m_jobListLockLocked = false;
331 m_jobListLockLocked->broadcast();
332}
333
334void
335CSocketMultiplexer::unlockJobList()
336{
337 CLock lock(m_mutex);
338
339 // make sure we're the one that called lockJobList()
340 assert(*m_jobListLocker == CThread::getCurrentThread());
341
342 // release the lock
343 delete m_jobListLocker;
344 m_jobListLocker = NULL;
345 *m_jobListLock = false;
346 m_jobListLock->signal();
347
348 // set new jobs ready state
349 bool isReady = !m_socketJobMap.empty();
350 if (*m_jobsReady != isReady) {
351 *m_jobsReady = isReady;
352 m_jobsReady->signal();
353 }
354}
Note: See TracBrowser for help on using the repository browser.