source: trunk/synergy/lib/net/CTCPSocket.cpp@ 3418

Last change on this file since 3418 was 2749, checked in by bird, 19 years ago

synergy v1.3.1 sources (zip).

File size: 11.2 KB
Line 
1/*
2 * synergy -- mouse and keyboard sharing utility
3 * Copyright (C) 2002 Chris Schoeneman
4 *
5 * This package is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * found in the file COPYING that should have accompanied this file.
8 *
9 * This package is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 */
14
15#include "CTCPSocket.h"
16#include "CNetworkAddress.h"
17#include "CSocketMultiplexer.h"
18#include "TSocketMultiplexerMethodJob.h"
19#include "XSocket.h"
20#include "CLock.h"
21#include "CLog.h"
22#include "IEventQueue.h"
23#include "IEventJob.h"
24#include "CArch.h"
25#include "XArch.h"
26#include <string.h>
27
28//
29// CTCPSocket
30//
31
32CTCPSocket::CTCPSocket() :
33 m_mutex(),
34 m_flushed(&m_mutex, true)
35{
36 try {
37 m_socket = ARCH->newSocket(IArchNetwork::kINET, IArchNetwork::kSTREAM);
38 }
39 catch (XArchNetwork& e) {
40 throw XSocketCreate(e.what());
41 }
42
43 init();
44}
45
46CTCPSocket::CTCPSocket(CArchSocket socket) :
47 m_mutex(),
48 m_socket(socket),
49 m_flushed(&m_mutex, true)
50{
51 assert(m_socket != NULL);
52
53 // socket starts in connected state
54 init();
55 onConnected();
56 setJob(newJob());
57}
58
59CTCPSocket::~CTCPSocket()
60{
61 try {
62 close();
63 }
64 catch (...) {
65 // ignore
66 }
67}
68
69void
70CTCPSocket::bind(const CNetworkAddress& addr)
71{
72 try {
73 ARCH->bindSocket(m_socket, addr.getAddress());
74 }
75 catch (XArchNetworkAddressInUse& e) {
76 throw XSocketAddressInUse(e.what());
77 }
78 catch (XArchNetwork& e) {
79 throw XSocketBind(e.what());
80 }
81}
82
83void
84CTCPSocket::close()
85{
86 // remove ourself from the multiplexer
87 setJob(NULL);
88
89 CLock lock(&m_mutex);
90
91 // clear buffers and enter disconnected state
92 if (m_connected) {
93 sendEvent(getDisconnectedEvent());
94 }
95 onDisconnected();
96
97 // close the socket
98 if (m_socket != NULL) {
99 CArchSocket socket = m_socket;
100 m_socket = NULL;
101 try {
102 ARCH->closeSocket(socket);
103 }
104 catch (XArchNetwork& e) {
105 // ignore, there's not much we can do
106 LOG((CLOG_WARN "error closing socket: %s", e.what().c_str()));
107 }
108 }
109}
110
111void*
112CTCPSocket::getEventTarget() const
113{
114 return const_cast<void*>(reinterpret_cast<const void*>(this));
115}
116
117UInt32
118CTCPSocket::read(void* buffer, UInt32 n)
119{
120 // copy data directly from our input buffer
121 CLock lock(&m_mutex);
122 UInt32 size = m_inputBuffer.getSize();
123 if (n > size) {
124 n = size;
125 }
126 if (buffer != NULL) {
127 memcpy(buffer, m_inputBuffer.peek(n), n);
128 }
129 m_inputBuffer.pop(n);
130
131 // if no more data and we cannot read or write then send disconnected
132 if (n > 0 && m_inputBuffer.getSize() == 0 && !m_readable && !m_writable) {
133 sendEvent(getDisconnectedEvent());
134 m_connected = false;
135 }
136
137 return n;
138}
139
140void
141CTCPSocket::write(const void* buffer, UInt32 n)
142{
143 bool wasEmpty;
144 {
145 CLock lock(&m_mutex);
146
147 // must not have shutdown output
148 if (!m_writable) {
149 sendEvent(getOutputErrorEvent());
150 return;
151 }
152
153 // ignore empty writes
154 if (n == 0) {
155 return;
156 }
157
158 // copy data to the output buffer
159 wasEmpty = (m_outputBuffer.getSize() == 0);
160 m_outputBuffer.write(buffer, n);
161
162 // there's data to write
163 m_flushed = false;
164 }
165
166 // make sure we're waiting to write
167 if (wasEmpty) {
168 setJob(newJob());
169 }
170}
171
172void
173CTCPSocket::flush()
174{
175 CLock lock(&m_mutex);
176 while (m_flushed == false) {
177 m_flushed.wait();
178 }
179}
180
181void
182CTCPSocket::shutdownInput()
183{
184 bool useNewJob = false;
185 {
186 CLock lock(&m_mutex);
187
188 // shutdown socket for reading
189 try {
190 ARCH->closeSocketForRead(m_socket);
191 }
192 catch (XArchNetwork&) {
193 // ignore
194 }
195
196 // shutdown buffer for reading
197 if (m_readable) {
198 sendEvent(getInputShutdownEvent());
199 onInputShutdown();
200 useNewJob = true;
201 }
202 }
203 if (useNewJob) {
204 setJob(newJob());
205 }
206}
207
208void
209CTCPSocket::shutdownOutput()
210{
211 bool useNewJob = false;
212 {
213 CLock lock(&m_mutex);
214
215 // shutdown socket for writing
216 try {
217 ARCH->closeSocketForWrite(m_socket);
218 }
219 catch (XArchNetwork&) {
220 // ignore
221 }
222
223 // shutdown buffer for writing
224 if (m_writable) {
225 sendEvent(getOutputShutdownEvent());
226 onOutputShutdown();
227 useNewJob = true;
228 }
229 }
230 if (useNewJob) {
231 setJob(newJob());
232 }
233}
234
235bool
236CTCPSocket::isReady() const
237{
238 CLock lock(&m_mutex);
239 return (m_inputBuffer.getSize() > 0);
240}
241
242UInt32
243CTCPSocket::getSize() const
244{
245 CLock lock(&m_mutex);
246 return m_inputBuffer.getSize();
247}
248
249void
250CTCPSocket::connect(const CNetworkAddress& addr)
251{
252 {
253 CLock lock(&m_mutex);
254
255 // fail on attempts to reconnect
256 if (m_socket == NULL || m_connected) {
257 sendConnectionFailedEvent("busy");
258 return;
259 }
260
261 try {
262 if (ARCH->connectSocket(m_socket, addr.getAddress())) {
263 sendEvent(getConnectedEvent());
264 onConnected();
265 }
266 else {
267 // connection is in progress
268 m_writable = true;
269 }
270 }
271 catch (XArchNetwork& e) {
272 throw XSocketConnect(e.what());
273 }
274 }
275 setJob(newJob());
276}
277
278void
279CTCPSocket::init()
280{
281 // default state
282 m_connected = false;
283 m_readable = false;
284 m_writable = false;
285
286 try {
287 // turn off Nagle algorithm. we send lots of very short messages
288 // that should be sent without (much) delay. for example, the
289 // mouse motion messages are much less useful if they're delayed.
290 ARCH->setNoDelayOnSocket(m_socket, true);
291 }
292 catch (XArchNetwork& e) {
293 try {
294 ARCH->closeSocket(m_socket);
295 m_socket = NULL;
296 }
297 catch (XArchNetwork&) {
298 // ignore
299 }
300 throw XSocketCreate(e.what());
301 }
302}
303
304void
305CTCPSocket::setJob(ISocketMultiplexerJob* job)
306{
307 // multiplexer will delete the old job
308 if (job == NULL) {
309 CSocketMultiplexer::getInstance()->removeSocket(this);
310 }
311 else {
312 CSocketMultiplexer::getInstance()->addSocket(this, job);
313 }
314}
315
316ISocketMultiplexerJob*
317CTCPSocket::newJob()
318{
319 // note -- must have m_mutex locked on entry
320
321 if (m_socket == NULL) {
322 return NULL;
323 }
324 else if (!m_connected) {
325 assert(!m_readable);
326 if (!(m_readable || m_writable)) {
327 return NULL;
328 }
329 return new TSocketMultiplexerMethodJob<CTCPSocket>(
330 this, &CTCPSocket::serviceConnecting,
331 m_socket, m_readable, m_writable);
332 }
333 else {
334 if (!(m_readable || (m_writable && (m_outputBuffer.getSize() > 0)))) {
335 return NULL;
336 }
337 return new TSocketMultiplexerMethodJob<CTCPSocket>(
338 this, &CTCPSocket::serviceConnected,
339 m_socket, m_readable,
340 m_writable && (m_outputBuffer.getSize() > 0));
341 }
342}
343
344void
345CTCPSocket::sendConnectionFailedEvent(const char* msg)
346{
347 CConnectionFailedInfo* info = (CConnectionFailedInfo*)malloc(
348 sizeof(CConnectionFailedInfo) + strlen(msg));
349 strcpy(info->m_what, msg);
350 EVENTQUEUE->addEvent(CEvent(getConnectionFailedEvent(),
351 getEventTarget(), info));
352}
353
354void
355CTCPSocket::sendEvent(CEvent::Type type)
356{
357 EVENTQUEUE->addEvent(CEvent(type, getEventTarget(), NULL));
358}
359
360void
361CTCPSocket::onConnected()
362{
363 m_connected = true;
364 m_readable = true;
365 m_writable = true;
366}
367
368void
369CTCPSocket::onInputShutdown()
370{
371 m_inputBuffer.pop(m_inputBuffer.getSize());
372 m_readable = false;
373}
374
375void
376CTCPSocket::onOutputShutdown()
377{
378 m_outputBuffer.pop(m_outputBuffer.getSize());
379 m_writable = false;
380
381 // we're now flushed
382 m_flushed = true;
383 m_flushed.broadcast();
384}
385
386void
387CTCPSocket::onDisconnected()
388{
389 // disconnected
390 onInputShutdown();
391 onOutputShutdown();
392 m_connected = false;
393}
394
395ISocketMultiplexerJob*
396CTCPSocket::serviceConnecting(ISocketMultiplexerJob* job,
397 bool, bool write, bool error)
398{
399 CLock lock(&m_mutex);
400
401 // should only check for errors if error is true but checking a new
402 // socket (and a socket that's connecting should be new) for errors
403 // should be safe and Mac OS X appears to have a bug where a
404 // non-blocking stream socket that fails to connect immediately is
405 // reported by select as being writable (i.e. connected) even when
406 // the connection has failed. this is easily demonstrated on OS X
407 // 10.3.4 by starting a synergy client and telling to connect to
408 // another system that's not running a synergy server. it will
409 // claim to have connected then quickly disconnect (i guess because
410 // read returns 0 bytes). unfortunately, synergy attempts to
411 // reconnect immediately, the process repeats and we end up
412 // spinning the CPU. luckily, OS X does set SO_ERROR on the
413 // socket correctly when the connection has failed so checking for
414 // errors works. (curiously, sometimes OS X doesn't report
415 // connection refused. when that happens it at least doesn't
416 // report the socket as being writable so synergy is able to time
417 // out the attempt.)
418 if (error || true) {
419 try {
420 // connection may have failed or succeeded
421 ARCH->throwErrorOnSocket(m_socket);
422 }
423 catch (XArchNetwork& e) {
424 sendConnectionFailedEvent(e.what().c_str());
425 onDisconnected();
426 return newJob();
427 }
428 }
429
430 if (write) {
431 sendEvent(getConnectedEvent());
432 onConnected();
433 return newJob();
434 }
435
436 return job;
437}
438
439ISocketMultiplexerJob*
440CTCPSocket::serviceConnected(ISocketMultiplexerJob* job,
441 bool read, bool write, bool error)
442{
443 CLock lock(&m_mutex);
444
445 if (error) {
446 sendEvent(getDisconnectedEvent());
447 onDisconnected();
448 return newJob();
449 }
450
451 bool needNewJob = false;
452
453 if (write) {
454 try {
455 // write data
456 UInt32 n = m_outputBuffer.getSize();
457 const void* buffer = m_outputBuffer.peek(n);
458 n = (UInt32)ARCH->writeSocket(m_socket, buffer, n);
459
460 // discard written data
461 if (n > 0) {
462 m_outputBuffer.pop(n);
463 if (m_outputBuffer.getSize() == 0) {
464 sendEvent(getOutputFlushedEvent());
465 m_flushed = true;
466 m_flushed.broadcast();
467 needNewJob = true;
468 }
469 }
470 }
471 catch (XArchNetworkShutdown&) {
472 // remote read end of stream hungup. our output side
473 // has therefore shutdown.
474 onOutputShutdown();
475 sendEvent(getOutputShutdownEvent());
476 if (!m_readable && m_inputBuffer.getSize() == 0) {
477 sendEvent(getDisconnectedEvent());
478 m_connected = false;
479 }
480 needNewJob = true;
481 }
482 catch (XArchNetworkDisconnected&) {
483 // stream hungup
484 onDisconnected();
485 sendEvent(getDisconnectedEvent());
486 needNewJob = true;
487 }
488 catch (XArchNetwork& e) {
489 // other write error
490 LOG((CLOG_WARN "error writing socket: %s", e.what().c_str()));
491 onDisconnected();
492 sendEvent(getOutputErrorEvent());
493 sendEvent(getDisconnectedEvent());
494 needNewJob = true;
495 }
496 }
497
498 if (read && m_readable) {
499 try {
500 UInt8 buffer[4096];
501 size_t n = ARCH->readSocket(m_socket, buffer, sizeof(buffer));
502 if (n > 0) {
503 bool wasEmpty = (m_inputBuffer.getSize() == 0);
504
505 // slurp up as much as possible
506 do {
507 m_inputBuffer.write(buffer, n);
508 n = ARCH->readSocket(m_socket, buffer, sizeof(buffer));
509 } while (n > 0);
510
511 // send input ready if input buffer was empty
512 if (wasEmpty) {
513 sendEvent(getInputReadyEvent());
514 }
515 }
516 else {
517 // remote write end of stream hungup. our input side
518 // has therefore shutdown but don't flush our buffer
519 // since there's still data to be read.
520 sendEvent(getInputShutdownEvent());
521 if (!m_writable && m_inputBuffer.getSize() == 0) {
522 sendEvent(getDisconnectedEvent());
523 m_connected = false;
524 }
525 m_readable = false;
526 needNewJob = true;
527 }
528 }
529 catch (XArchNetworkDisconnected&) {
530 // stream hungup
531 sendEvent(getDisconnectedEvent());
532 onDisconnected();
533 needNewJob = true;
534 }
535 catch (XArchNetwork& e) {
536 // ignore other read error
537 LOG((CLOG_WARN "error reading socket: %s", e.what().c_str()));
538 }
539 }
540
541 return needNewJob ? newJob() : job;
542}
Note: See TracBrowser for help on using the repository browser.