source: python/trunk/Lib/test/test_socket.py

Last change on this file was 391, checked in by dmik, 11 years ago

python: Merge vendor 2.7.6 to trunk.

  • Property svn:eol-style set to native
File size: 58.9 KB
Line 
1#!/usr/bin/env python
2
3import unittest
4from test import test_support
5
6import errno
7import socket
8import select
9import _testcapi
10import time
11import traceback
12import Queue
13import sys
14import os
15import array
16import contextlib
17from weakref import proxy
18import signal
19import math
20
21def try_address(host, port=0, family=socket.AF_INET):
22 """Try to bind a socket on the given host:port and return True
23 if that has been possible."""
24 try:
25 sock = socket.socket(family, socket.SOCK_STREAM)
26 sock.bind((host, port))
27 except (socket.error, socket.gaierror):
28 return False
29 else:
30 sock.close()
31 return True
32
33HOST = test_support.HOST
34MSG = b'Michael Gilfix was here\n'
35SUPPORTS_IPV6 = socket.has_ipv6 and try_address('::1', family=socket.AF_INET6)
36
37try:
38 import thread
39 import threading
40except ImportError:
41 thread = None
42 threading = None
43
44HOST = test_support.HOST
45MSG = 'Michael Gilfix was here\n'
46
47class SocketTCPTest(unittest.TestCase):
48
49 def setUp(self):
50 self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
51 self.port = test_support.bind_port(self.serv)
52 self.serv.listen(1)
53
54 def tearDown(self):
55 self.serv.close()
56 self.serv = None
57
58class SocketUDPTest(unittest.TestCase):
59
60 def setUp(self):
61 self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
62 self.port = test_support.bind_port(self.serv)
63
64 def tearDown(self):
65 self.serv.close()
66 self.serv = None
67
68class ThreadableTest:
69 """Threadable Test class
70
71 The ThreadableTest class makes it easy to create a threaded
72 client/server pair from an existing unit test. To create a
73 new threaded class from an existing unit test, use multiple
74 inheritance:
75
76 class NewClass (OldClass, ThreadableTest):
77 pass
78
79 This class defines two new fixture functions with obvious
80 purposes for overriding:
81
82 clientSetUp ()
83 clientTearDown ()
84
85 Any new test functions within the class must then define
86 tests in pairs, where the test name is preceeded with a
87 '_' to indicate the client portion of the test. Ex:
88
89 def testFoo(self):
90 # Server portion
91
92 def _testFoo(self):
93 # Client portion
94
95 Any exceptions raised by the clients during their tests
96 are caught and transferred to the main thread to alert
97 the testing framework.
98
99 Note, the server setup function cannot call any blocking
100 functions that rely on the client thread during setup,
101 unless serverExplicitReady() is called just before
102 the blocking call (such as in setting up a client/server
103 connection and performing the accept() in setUp().
104 """
105
106 def __init__(self):
107 # Swap the true setup function
108 self.__setUp = self.setUp
109 self.__tearDown = self.tearDown
110 self.setUp = self._setUp
111 self.tearDown = self._tearDown
112
113 def serverExplicitReady(self):
114 """This method allows the server to explicitly indicate that
115 it wants the client thread to proceed. This is useful if the
116 server is about to execute a blocking routine that is
117 dependent upon the client thread during its setup routine."""
118 self.server_ready.set()
119
120 def _setUp(self):
121 self.server_ready = threading.Event()
122 self.client_ready = threading.Event()
123 self.done = threading.Event()
124 self.queue = Queue.Queue(1)
125
126 # Do some munging to start the client test.
127 methodname = self.id()
128 i = methodname.rfind('.')
129 methodname = methodname[i+1:]
130 test_method = getattr(self, '_' + methodname)
131 self.client_thread = thread.start_new_thread(
132 self.clientRun, (test_method,))
133
134 self.__setUp()
135 if not self.server_ready.is_set():
136 self.server_ready.set()
137 self.client_ready.wait()
138
139 def _tearDown(self):
140 self.__tearDown()
141 self.done.wait()
142
143 if not self.queue.empty():
144 msg = self.queue.get()
145 self.fail(msg)
146
147 def clientRun(self, test_func):
148 self.server_ready.wait()
149 self.clientSetUp()
150 self.client_ready.set()
151 if not callable(test_func):
152 raise TypeError("test_func must be a callable function.")
153 try:
154 test_func()
155 except Exception, strerror:
156 self.queue.put(strerror)
157 self.clientTearDown()
158
159 def clientSetUp(self):
160 raise NotImplementedError("clientSetUp must be implemented.")
161
162 def clientTearDown(self):
163 self.done.set()
164 thread.exit()
165
166class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
167
168 def __init__(self, methodName='runTest'):
169 SocketTCPTest.__init__(self, methodName=methodName)
170 ThreadableTest.__init__(self)
171
172 def clientSetUp(self):
173 self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
174
175 def clientTearDown(self):
176 self.cli.close()
177 self.cli = None
178 ThreadableTest.clientTearDown(self)
179
180class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
181
182 def __init__(self, methodName='runTest'):
183 SocketUDPTest.__init__(self, methodName=methodName)
184 ThreadableTest.__init__(self)
185
186 def clientSetUp(self):
187 self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
188
189 def clientTearDown(self):
190 self.cli.close()
191 self.cli = None
192 ThreadableTest.clientTearDown(self)
193
194class SocketConnectedTest(ThreadedTCPSocketTest):
195
196 def __init__(self, methodName='runTest'):
197 ThreadedTCPSocketTest.__init__(self, methodName=methodName)
198
199 def setUp(self):
200 ThreadedTCPSocketTest.setUp(self)
201 # Indicate explicitly we're ready for the client thread to
202 # proceed and then perform the blocking call to accept
203 self.serverExplicitReady()
204 conn, addr = self.serv.accept()
205 self.cli_conn = conn
206
207 def tearDown(self):
208 self.cli_conn.close()
209 self.cli_conn = None
210 ThreadedTCPSocketTest.tearDown(self)
211
212 def clientSetUp(self):
213 ThreadedTCPSocketTest.clientSetUp(self)
214 self.cli.connect((HOST, self.port))
215 self.serv_conn = self.cli
216
217 def clientTearDown(self):
218 self.serv_conn.close()
219 self.serv_conn = None
220 ThreadedTCPSocketTest.clientTearDown(self)
221
222class SocketPairTest(unittest.TestCase, ThreadableTest):
223
224 def __init__(self, methodName='runTest'):
225 unittest.TestCase.__init__(self, methodName=methodName)
226 ThreadableTest.__init__(self)
227
228 def setUp(self):
229 self.serv, self.cli = socket.socketpair()
230
231 def tearDown(self):
232 self.serv.close()
233 self.serv = None
234
235 def clientSetUp(self):
236 pass
237
238 def clientTearDown(self):
239 self.cli.close()
240 self.cli = None
241 ThreadableTest.clientTearDown(self)
242
243
244#######################################################################
245## Begin Tests
246
247class GeneralModuleTests(unittest.TestCase):
248
249 def test_weakref(self):
250 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
251 p = proxy(s)
252 self.assertEqual(p.fileno(), s.fileno())
253 s.close()
254 s = None
255 try:
256 p.fileno()
257 except ReferenceError:
258 pass
259 else:
260 self.fail('Socket proxy still exists')
261
262 def testSocketError(self):
263 # Testing socket module exceptions
264 def raise_error(*args, **kwargs):
265 raise socket.error
266 def raise_herror(*args, **kwargs):
267 raise socket.herror
268 def raise_gaierror(*args, **kwargs):
269 raise socket.gaierror
270 self.assertRaises(socket.error, raise_error,
271 "Error raising socket exception.")
272 self.assertRaises(socket.error, raise_herror,
273 "Error raising socket exception.")
274 self.assertRaises(socket.error, raise_gaierror,
275 "Error raising socket exception.")
276
277 def testSendtoErrors(self):
278 # Testing that sendto doens't masks failures. See #10169.
279 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
280 self.addCleanup(s.close)
281 s.bind(('', 0))
282 sockname = s.getsockname()
283 # 2 args
284 with self.assertRaises(UnicodeEncodeError):
285 s.sendto(u'\u2620', sockname)
286 with self.assertRaises(TypeError) as cm:
287 s.sendto(5j, sockname)
288 self.assertIn('not complex', str(cm.exception))
289 with self.assertRaises(TypeError) as cm:
290 s.sendto('foo', None)
291 self.assertIn('not NoneType', str(cm.exception))
292 # 3 args
293 with self.assertRaises(UnicodeEncodeError):
294 s.sendto(u'\u2620', 0, sockname)
295 with self.assertRaises(TypeError) as cm:
296 s.sendto(5j, 0, sockname)
297 self.assertIn('not complex', str(cm.exception))
298 with self.assertRaises(TypeError) as cm:
299 s.sendto('foo', 0, None)
300 self.assertIn('not NoneType', str(cm.exception))
301 with self.assertRaises(TypeError) as cm:
302 s.sendto('foo', 'bar', sockname)
303 self.assertIn('an integer is required', str(cm.exception))
304 with self.assertRaises(TypeError) as cm:
305 s.sendto('foo', None, None)
306 self.assertIn('an integer is required', str(cm.exception))
307 # wrong number of args
308 with self.assertRaises(TypeError) as cm:
309 s.sendto('foo')
310 self.assertIn('(1 given)', str(cm.exception))
311 with self.assertRaises(TypeError) as cm:
312 s.sendto('foo', 0, sockname, 4)
313 self.assertIn('(4 given)', str(cm.exception))
314
315
316 def testCrucialConstants(self):
317 # Testing for mission critical constants
318 socket.AF_INET
319 socket.SOCK_STREAM
320 socket.SOCK_DGRAM
321 socket.SOCK_RAW
322 socket.SOCK_RDM
323 socket.SOCK_SEQPACKET
324 socket.SOL_SOCKET
325 socket.SO_REUSEADDR
326
327 def testHostnameRes(self):
328 # Testing hostname resolution mechanisms
329 hostname = socket.gethostname()
330 try:
331 ip = socket.gethostbyname(hostname)
332 except socket.error:
333 # Probably name lookup wasn't set up right; skip this test
334 return
335 self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")
336 try:
337 hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
338 except socket.error:
339 # Probably a similar problem as above; skip this test
340 return
341 all_host_names = [hostname, hname] + aliases
342 fqhn = socket.getfqdn(ip)
343 if not fqhn in all_host_names:
344 self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
345
346 def testRefCountGetNameInfo(self):
347 # Testing reference count for getnameinfo
348 if hasattr(sys, "getrefcount"):
349 try:
350 # On some versions, this loses a reference
351 orig = sys.getrefcount(__name__)
352 socket.getnameinfo(__name__,0)
353 except TypeError:
354 self.assertEqual(sys.getrefcount(__name__), orig,
355 "socket.getnameinfo loses a reference")
356
357 def testInterpreterCrash(self):
358 # Making sure getnameinfo doesn't crash the interpreter
359 try:
360 # On some versions, this crashes the interpreter.
361 socket.getnameinfo(('x', 0, 0, 0), 0)
362 except socket.error:
363 pass
364
365 def testNtoH(self):
366 # This just checks that htons etc. are their own inverse,
367 # when looking at the lower 16 or 32 bits.
368 sizes = {socket.htonl: 32, socket.ntohl: 32,
369 socket.htons: 16, socket.ntohs: 16}
370 for func, size in sizes.items():
371 mask = (1L<<size) - 1
372 for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
373 self.assertEqual(i & mask, func(func(i&mask)) & mask)
374
375 swapped = func(mask)
376 self.assertEqual(swapped & mask, mask)
377 self.assertRaises(OverflowError, func, 1L<<34)
378
379 def testNtoHErrors(self):
380 good_values = [ 1, 2, 3, 1L, 2L, 3L ]
381 bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
382 for k in good_values:
383 socket.ntohl(k)
384 socket.ntohs(k)
385 socket.htonl(k)
386 socket.htons(k)
387 for k in bad_values:
388 self.assertRaises(OverflowError, socket.ntohl, k)
389 self.assertRaises(OverflowError, socket.ntohs, k)
390 self.assertRaises(OverflowError, socket.htonl, k)
391 self.assertRaises(OverflowError, socket.htons, k)
392
393 def testGetServBy(self):
394 eq = self.assertEqual
395 # Find one service that exists, then check all the related interfaces.
396 # I've ordered this by protocols that have both a tcp and udp
397 # protocol, at least for modern Linuxes.
398 if (sys.platform.startswith('linux') or
399 sys.platform.startswith('freebsd') or
400 sys.platform.startswith('netbsd') or
401 sys.platform == 'darwin'):
402 # avoid the 'echo' service on this platform, as there is an
403 # assumption breaking non-standard port/protocol entry
404 services = ('daytime', 'qotd', 'domain')
405 else:
406 services = ('echo', 'daytime', 'domain')
407 for service in services:
408 try:
409 port = socket.getservbyname(service, 'tcp')
410 break
411 except socket.error:
412 pass
413 else:
414 raise socket.error
415 # Try same call with optional protocol omitted
416 port2 = socket.getservbyname(service)
417 eq(port, port2)
418 # Try udp, but don't barf if it doesn't exist
419 try:
420 udpport = socket.getservbyname(service, 'udp')
421 except socket.error:
422 udpport = None
423 else:
424 eq(udpport, port)
425 # Now make sure the lookup by port returns the same service name
426 eq(socket.getservbyport(port2), service)
427 eq(socket.getservbyport(port, 'tcp'), service)
428 if udpport is not None:
429 eq(socket.getservbyport(udpport, 'udp'), service)
430 # Make sure getservbyport does not accept out of range ports.
431 self.assertRaises(OverflowError, socket.getservbyport, -1)
432 self.assertRaises(OverflowError, socket.getservbyport, 65536)
433
434 def testDefaultTimeout(self):
435 # Testing default timeout
436 # The default timeout should initially be None
437 self.assertEqual(socket.getdefaulttimeout(), None)
438 s = socket.socket()
439 self.assertEqual(s.gettimeout(), None)
440 s.close()
441
442 # Set the default timeout to 10, and see if it propagates
443 socket.setdefaulttimeout(10)
444 self.assertEqual(socket.getdefaulttimeout(), 10)
445 s = socket.socket()
446 self.assertEqual(s.gettimeout(), 10)
447 s.close()
448
449 # Reset the default timeout to None, and see if it propagates
450 socket.setdefaulttimeout(None)
451 self.assertEqual(socket.getdefaulttimeout(), None)
452 s = socket.socket()
453 self.assertEqual(s.gettimeout(), None)
454 s.close()
455
456 # Check that setting it to an invalid value raises ValueError
457 self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
458
459 # Check that setting it to an invalid type raises TypeError
460 self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
461
462 def testIPv4_inet_aton_fourbytes(self):
463 if not hasattr(socket, 'inet_aton'):
464 return # No inet_aton, nothing to check
465 # Test that issue1008086 and issue767150 are fixed.
466 # It must return 4 bytes.
467 self.assertEqual('\x00'*4, socket.inet_aton('0.0.0.0'))
468 self.assertEqual('\xff'*4, socket.inet_aton('255.255.255.255'))
469
470 def testIPv4toString(self):
471 if not hasattr(socket, 'inet_pton'):
472 return # No inet_pton() on this platform
473 from socket import inet_aton as f, inet_pton, AF_INET
474 g = lambda a: inet_pton(AF_INET, a)
475
476 self.assertEqual('\x00\x00\x00\x00', f('0.0.0.0'))
477 self.assertEqual('\xff\x00\xff\x00', f('255.0.255.0'))
478 self.assertEqual('\xaa\xaa\xaa\xaa', f('170.170.170.170'))
479 self.assertEqual('\x01\x02\x03\x04', f('1.2.3.4'))
480 self.assertEqual('\xff\xff\xff\xff', f('255.255.255.255'))
481
482 self.assertEqual('\x00\x00\x00\x00', g('0.0.0.0'))
483 self.assertEqual('\xff\x00\xff\x00', g('255.0.255.0'))
484 self.assertEqual('\xaa\xaa\xaa\xaa', g('170.170.170.170'))
485 self.assertEqual('\xff\xff\xff\xff', g('255.255.255.255'))
486
487 def testIPv6toString(self):
488 if not hasattr(socket, 'inet_pton'):
489 return # No inet_pton() on this platform
490 try:
491 from socket import inet_pton, AF_INET6, has_ipv6
492 if not has_ipv6:
493 return
494 except ImportError:
495 return
496 f = lambda a: inet_pton(AF_INET6, a)
497
498 self.assertEqual('\x00' * 16, f('::'))
499 self.assertEqual('\x00' * 16, f('0::0'))
500 self.assertEqual('\x00\x01' + '\x00' * 14, f('1::'))
501 self.assertEqual(
502 '\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
503 f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
504 )
505
506 def testStringToIPv4(self):
507 if not hasattr(socket, 'inet_ntop'):
508 return # No inet_ntop() on this platform
509 from socket import inet_ntoa as f, inet_ntop, AF_INET
510 g = lambda a: inet_ntop(AF_INET, a)
511
512 self.assertEqual('1.0.1.0', f('\x01\x00\x01\x00'))
513 self.assertEqual('170.85.170.85', f('\xaa\x55\xaa\x55'))
514 self.assertEqual('255.255.255.255', f('\xff\xff\xff\xff'))
515 self.assertEqual('1.2.3.4', f('\x01\x02\x03\x04'))
516
517 self.assertEqual('1.0.1.0', g('\x01\x00\x01\x00'))
518 self.assertEqual('170.85.170.85', g('\xaa\x55\xaa\x55'))
519 self.assertEqual('255.255.255.255', g('\xff\xff\xff\xff'))
520
521 def testStringToIPv6(self):
522 if not hasattr(socket, 'inet_ntop'):
523 return # No inet_ntop() on this platform
524 try:
525 from socket import inet_ntop, AF_INET6, has_ipv6
526 if not has_ipv6:
527 return
528 except ImportError:
529 return
530 f = lambda a: inet_ntop(AF_INET6, a)
531
532 self.assertEqual('::', f('\x00' * 16))
533 self.assertEqual('::1', f('\x00' * 15 + '\x01'))
534 self.assertEqual(
535 'aef:b01:506:1001:ffff:9997:55:170',
536 f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
537 )
538
539 # XXX The following don't test module-level functionality...
540
541 def _get_unused_port(self, bind_address='0.0.0.0'):
542 """Use a temporary socket to elicit an unused ephemeral port.
543
544 Args:
545 bind_address: Hostname or IP address to search for a port on.
546
547 Returns: A most likely to be unused port.
548 """
549 tempsock = socket.socket()
550 tempsock.bind((bind_address, 0))
551 host, port = tempsock.getsockname()
552 tempsock.close()
553 return port
554
555 def testSockName(self):
556 # Testing getsockname()
557 port = self._get_unused_port()
558 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
559 self.addCleanup(sock.close)
560 sock.bind(("0.0.0.0", port))
561 name = sock.getsockname()
562 # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
563 # it reasonable to get the host's addr in addition to 0.0.0.0.
564 # At least for eCos. This is required for the S/390 to pass.
565 try:
566 my_ip_addr = socket.gethostbyname(socket.gethostname())
567 except socket.error:
568 # Probably name lookup wasn't set up right; skip this test
569 return
570 self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
571 self.assertEqual(name[1], port)
572
573 def testGetSockOpt(self):
574 # Testing getsockopt()
575 # We know a socket should start without reuse==0
576 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
577 self.addCleanup(sock.close)
578 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
579 self.assertFalse(reuse != 0, "initial mode is reuse")
580
581 def testSetSockOpt(self):
582 # Testing setsockopt()
583 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
584 self.addCleanup(sock.close)
585 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
586 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
587 self.assertFalse(reuse == 0, "failed to set reuse mode")
588
589 def testSendAfterClose(self):
590 # testing send() after close() with timeout
591 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
592 sock.settimeout(1)
593 sock.close()
594 self.assertRaises(socket.error, sock.send, "spam")
595
596 def testNewAttributes(self):
597 # testing .family, .type and .protocol
598 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
599 self.assertEqual(sock.family, socket.AF_INET)
600 self.assertEqual(sock.type, socket.SOCK_STREAM)
601 self.assertEqual(sock.proto, 0)
602 sock.close()
603
604 def test_getsockaddrarg(self):
605 host = '0.0.0.0'
606 port = self._get_unused_port(bind_address=host)
607 big_port = port + 65536
608 neg_port = port - 65536
609 sock = socket.socket()
610 try:
611 self.assertRaises(OverflowError, sock.bind, (host, big_port))
612 self.assertRaises(OverflowError, sock.bind, (host, neg_port))
613 sock.bind((host, port))
614 finally:
615 sock.close()
616
617 @unittest.skipUnless(os.name == "nt", "Windows specific")
618 def test_sock_ioctl(self):
619 self.assertTrue(hasattr(socket.socket, 'ioctl'))
620 self.assertTrue(hasattr(socket, 'SIO_RCVALL'))
621 self.assertTrue(hasattr(socket, 'RCVALL_ON'))
622 self.assertTrue(hasattr(socket, 'RCVALL_OFF'))
623 self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS'))
624 s = socket.socket()
625 self.addCleanup(s.close)
626 self.assertRaises(ValueError, s.ioctl, -1, None)
627 s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100))
628
629 def testGetaddrinfo(self):
630 try:
631 socket.getaddrinfo('localhost', 80)
632 except socket.gaierror as err:
633 if err.errno == socket.EAI_SERVICE:
634 # see http://bugs.python.org/issue1282647
635 self.skipTest("buggy libc version")
636 raise
637 # len of every sequence is supposed to be == 5
638 for info in socket.getaddrinfo(HOST, None):
639 self.assertEqual(len(info), 5)
640 # host can be a domain name, a string representation of an
641 # IPv4/v6 address or None
642 socket.getaddrinfo('localhost', 80)
643 socket.getaddrinfo('127.0.0.1', 80)
644 socket.getaddrinfo(None, 80)
645 if SUPPORTS_IPV6:
646 socket.getaddrinfo('::1', 80)
647 # port can be a string service name such as "http", a numeric
648 # port number (int or long), or None
649 socket.getaddrinfo(HOST, "http")
650 socket.getaddrinfo(HOST, 80)
651 socket.getaddrinfo(HOST, 80L)
652 socket.getaddrinfo(HOST, None)
653 # test family and socktype filters
654 infos = socket.getaddrinfo(HOST, None, socket.AF_INET)
655 for family, _, _, _, _ in infos:
656 self.assertEqual(family, socket.AF_INET)
657 infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
658 for _, socktype, _, _, _ in infos:
659 self.assertEqual(socktype, socket.SOCK_STREAM)
660 # test proto and flags arguments
661 socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
662 socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)
663 # a server willing to support both IPv4 and IPv6 will
664 # usually do this
665 socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
666 socket.AI_PASSIVE)
667
668 # Issue 17269
669 if hasattr(socket, 'AI_NUMERICSERV'):
670 socket.getaddrinfo("localhost", None, 0, 0, 0, socket.AI_NUMERICSERV)
671
672 def check_sendall_interrupted(self, with_timeout):
673 # socketpair() is not stricly required, but it makes things easier.
674 if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'):
675 self.skipTest("signal.alarm and socket.socketpair required for this test")
676 # Our signal handlers clobber the C errno by calling a math function
677 # with an invalid domain value.
678 def ok_handler(*args):
679 self.assertRaises(ValueError, math.acosh, 0)
680 def raising_handler(*args):
681 self.assertRaises(ValueError, math.acosh, 0)
682 1 // 0
683 c, s = socket.socketpair()
684 old_alarm = signal.signal(signal.SIGALRM, raising_handler)
685 try:
686 if with_timeout:
687 # Just above the one second minimum for signal.alarm
688 c.settimeout(1.5)
689 with self.assertRaises(ZeroDivisionError):
690 signal.alarm(1)
691 c.sendall(b"x" * test_support.SOCK_MAX_SIZE)
692 if with_timeout:
693 signal.signal(signal.SIGALRM, ok_handler)
694 signal.alarm(1)
695 self.assertRaises(socket.timeout, c.sendall,
696 b"x" * test_support.SOCK_MAX_SIZE)
697 finally:
698 signal.signal(signal.SIGALRM, old_alarm)
699 c.close()
700 s.close()
701
702 def test_sendall_interrupted(self):
703 self.check_sendall_interrupted(False)
704
705 def test_sendall_interrupted_with_timeout(self):
706 self.check_sendall_interrupted(True)
707
708 def test_listen_backlog(self):
709 for backlog in 0, -1:
710 srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
711 srv.bind((HOST, 0))
712 srv.listen(backlog)
713 srv.close()
714
715 # Issue 15989
716 srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
717 srv.bind((HOST, 0))
718 self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1)
719 srv.close()
720
721 @unittest.skipUnless(SUPPORTS_IPV6, 'IPv6 required for this test.')
722 def test_flowinfo(self):
723 self.assertRaises(OverflowError, socket.getnameinfo,
724 ('::1',0, 0xffffffff), 0)
725 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
726 try:
727 self.assertRaises(OverflowError, s.bind, ('::1', 0, -10))
728 finally:
729 s.close()
730
731
732@unittest.skipUnless(thread, 'Threading required for this test.')
733class BasicTCPTest(SocketConnectedTest):
734
735 def __init__(self, methodName='runTest'):
736 SocketConnectedTest.__init__(self, methodName=methodName)
737
738 def testRecv(self):
739 # Testing large receive over TCP
740 msg = self.cli_conn.recv(1024)
741 self.assertEqual(msg, MSG)
742
743 def _testRecv(self):
744 self.serv_conn.send(MSG)
745
746 def testOverFlowRecv(self):
747 # Testing receive in chunks over TCP
748 seg1 = self.cli_conn.recv(len(MSG) - 3)
749 seg2 = self.cli_conn.recv(1024)
750 msg = seg1 + seg2
751 self.assertEqual(msg, MSG)
752
753 def _testOverFlowRecv(self):
754 self.serv_conn.send(MSG)
755
756 def testRecvFrom(self):
757 # Testing large recvfrom() over TCP
758 msg, addr = self.cli_conn.recvfrom(1024)
759 self.assertEqual(msg, MSG)
760
761 def _testRecvFrom(self):
762 self.serv_conn.send(MSG)
763
764 def testOverFlowRecvFrom(self):
765 # Testing recvfrom() in chunks over TCP
766 seg1, addr = self.cli_conn.recvfrom(len(MSG)-3)
767 seg2, addr = self.cli_conn.recvfrom(1024)
768 msg = seg1 + seg2
769 self.assertEqual(msg, MSG)
770
771 def _testOverFlowRecvFrom(self):
772 self.serv_conn.send(MSG)
773
774 def testSendAll(self):
775 # Testing sendall() with a 2048 byte string over TCP
776 msg = ''
777 while 1:
778 read = self.cli_conn.recv(1024)
779 if not read:
780 break
781 msg += read
782 self.assertEqual(msg, 'f' * 2048)
783
784 def _testSendAll(self):
785 big_chunk = 'f' * 2048
786 self.serv_conn.sendall(big_chunk)
787
788 def testFromFd(self):
789 # Testing fromfd()
790 if not hasattr(socket, "fromfd"):
791 return # On Windows, this doesn't exist
792 fd = self.cli_conn.fileno()
793 sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
794 self.addCleanup(sock.close)
795 msg = sock.recv(1024)
796 self.assertEqual(msg, MSG)
797
798 def _testFromFd(self):
799 self.serv_conn.send(MSG)
800
801 def testDup(self):
802 # Testing dup()
803 sock = self.cli_conn.dup()
804 self.addCleanup(sock.close)
805 msg = sock.recv(1024)
806 self.assertEqual(msg, MSG)
807
808 def _testDup(self):
809 self.serv_conn.send(MSG)
810
811 def testShutdown(self):
812 # Testing shutdown()
813 msg = self.cli_conn.recv(1024)
814 self.assertEqual(msg, MSG)
815 # wait for _testShutdown to finish: on OS X, when the server
816 # closes the connection the client also becomes disconnected,
817 # and the client's shutdown call will fail. (Issue #4397.)
818 self.done.wait()
819
820 def _testShutdown(self):
821 self.serv_conn.send(MSG)
822 # Issue 15989
823 self.assertRaises(OverflowError, self.serv_conn.shutdown,
824 _testcapi.INT_MAX + 1)
825 self.assertRaises(OverflowError, self.serv_conn.shutdown,
826 2 + (_testcapi.UINT_MAX + 1))
827 self.serv_conn.shutdown(2)
828
829@unittest.skipUnless(thread, 'Threading required for this test.')
830class BasicUDPTest(ThreadedUDPSocketTest):
831
832 def __init__(self, methodName='runTest'):
833 ThreadedUDPSocketTest.__init__(self, methodName=methodName)
834
835 def testSendtoAndRecv(self):
836 # Testing sendto() and Recv() over UDP
837 msg = self.serv.recv(len(MSG))
838 self.assertEqual(msg, MSG)
839
840 def _testSendtoAndRecv(self):
841 self.cli.sendto(MSG, 0, (HOST, self.port))
842
843 def testRecvFrom(self):
844 # Testing recvfrom() over UDP
845 msg, addr = self.serv.recvfrom(len(MSG))
846 self.assertEqual(msg, MSG)
847
848 def _testRecvFrom(self):
849 self.cli.sendto(MSG, 0, (HOST, self.port))
850
851 def testRecvFromNegative(self):
852 # Negative lengths passed to recvfrom should give ValueError.
853 self.assertRaises(ValueError, self.serv.recvfrom, -1)
854
855 def _testRecvFromNegative(self):
856 self.cli.sendto(MSG, 0, (HOST, self.port))
857
858@unittest.skipUnless(thread, 'Threading required for this test.')
859class TCPCloserTest(ThreadedTCPSocketTest):
860
861 def testClose(self):
862 conn, addr = self.serv.accept()
863 conn.close()
864
865 sd = self.cli
866 read, write, err = select.select([sd], [], [], 1.0)
867 self.assertEqual(read, [sd])
868 self.assertEqual(sd.recv(1), '')
869
870 def _testClose(self):
871 self.cli.connect((HOST, self.port))
872 time.sleep(1.0)
873
874@unittest.skipUnless(thread, 'Threading required for this test.')
875class BasicSocketPairTest(SocketPairTest):
876
877 def __init__(self, methodName='runTest'):
878 SocketPairTest.__init__(self, methodName=methodName)
879
880 def testRecv(self):
881 msg = self.serv.recv(1024)
882 self.assertEqual(msg, MSG)
883
884 def _testRecv(self):
885 self.cli.send(MSG)
886
887 def testSend(self):
888 self.serv.send(MSG)
889
890 def _testSend(self):
891 msg = self.cli.recv(1024)
892 self.assertEqual(msg, MSG)
893
894@unittest.skipUnless(thread, 'Threading required for this test.')
895class NonBlockingTCPTests(ThreadedTCPSocketTest):
896
897 def __init__(self, methodName='runTest'):
898 ThreadedTCPSocketTest.__init__(self, methodName=methodName)
899
900 def testSetBlocking(self):
901 # Testing whether set blocking works
902 self.serv.setblocking(True)
903 self.assertIsNone(self.serv.gettimeout())
904 self.serv.setblocking(False)
905 self.assertEqual(self.serv.gettimeout(), 0.0)
906 start = time.time()
907 try:
908 self.serv.accept()
909 except socket.error:
910 pass
911 end = time.time()
912 self.assertTrue((end - start) < 1.0, "Error setting non-blocking mode.")
913 # Issue 15989
914 if _testcapi.UINT_MAX < _testcapi.ULONG_MAX:
915 self.serv.setblocking(_testcapi.UINT_MAX + 1)
916 self.assertIsNone(self.serv.gettimeout())
917
918 def _testSetBlocking(self):
919 pass
920
921 def testAccept(self):
922 # Testing non-blocking accept
923 self.serv.setblocking(0)
924 try:
925 conn, addr = self.serv.accept()
926 except socket.error:
927 pass
928 else:
929 self.fail("Error trying to do non-blocking accept.")
930 read, write, err = select.select([self.serv], [], [])
931 if self.serv in read:
932 conn, addr = self.serv.accept()
933 conn.close()
934 else:
935 self.fail("Error trying to do accept after select.")
936
937 def _testAccept(self):
938 time.sleep(0.1)
939 self.cli.connect((HOST, self.port))
940
941 def testConnect(self):
942 # Testing non-blocking connect
943 conn, addr = self.serv.accept()
944 conn.close()
945
946 def _testConnect(self):
947 self.cli.settimeout(10)
948 self.cli.connect((HOST, self.port))
949
950 def testRecv(self):
951 # Testing non-blocking recv
952 conn, addr = self.serv.accept()
953 conn.setblocking(0)
954 try:
955 msg = conn.recv(len(MSG))
956 except socket.error:
957 pass
958 else:
959 self.fail("Error trying to do non-blocking recv.")
960 read, write, err = select.select([conn], [], [])
961 if conn in read:
962 msg = conn.recv(len(MSG))
963 conn.close()
964 self.assertEqual(msg, MSG)
965 else:
966 self.fail("Error during select call to non-blocking socket.")
967
968 def _testRecv(self):
969 self.cli.connect((HOST, self.port))
970 time.sleep(0.1)
971 self.cli.send(MSG)
972
973@unittest.skipUnless(thread, 'Threading required for this test.')
974class FileObjectClassTestCase(SocketConnectedTest):
975
976 bufsize = -1 # Use default buffer size
977
978 def __init__(self, methodName='runTest'):
979 SocketConnectedTest.__init__(self, methodName=methodName)
980
981 def setUp(self):
982 SocketConnectedTest.setUp(self)
983 self.serv_file = self.cli_conn.makefile('rb', self.bufsize)
984
985 def tearDown(self):
986 self.serv_file.close()
987 self.assertTrue(self.serv_file.closed)
988 SocketConnectedTest.tearDown(self)
989 self.serv_file = None
990
991 def clientSetUp(self):
992 SocketConnectedTest.clientSetUp(self)
993 self.cli_file = self.serv_conn.makefile('wb')
994
995 def clientTearDown(self):
996 self.cli_file.close()
997 self.assertTrue(self.cli_file.closed)
998 self.cli_file = None
999 SocketConnectedTest.clientTearDown(self)
1000
1001 def testSmallRead(self):
1002 # Performing small file read test
1003 first_seg = self.serv_file.read(len(MSG)-3)
1004 second_seg = self.serv_file.read(3)
1005 msg = first_seg + second_seg
1006 self.assertEqual(msg, MSG)
1007
1008 def _testSmallRead(self):
1009 self.cli_file.write(MSG)
1010 self.cli_file.flush()
1011
1012 def testFullRead(self):
1013 # read until EOF
1014 msg = self.serv_file.read()
1015 self.assertEqual(msg, MSG)
1016
1017 def _testFullRead(self):
1018 self.cli_file.write(MSG)
1019 self.cli_file.close()
1020
1021 def testUnbufferedRead(self):
1022 # Performing unbuffered file read test
1023 buf = ''
1024 while 1:
1025 char = self.serv_file.read(1)
1026 if not char:
1027 break
1028 buf += char
1029 self.assertEqual(buf, MSG)
1030
1031 def _testUnbufferedRead(self):
1032 self.cli_file.write(MSG)
1033 self.cli_file.flush()
1034
1035 def testReadline(self):
1036 # Performing file readline test
1037 line = self.serv_file.readline()
1038 self.assertEqual(line, MSG)
1039
1040 def _testReadline(self):
1041 self.cli_file.write(MSG)
1042 self.cli_file.flush()
1043
1044 def testReadlineAfterRead(self):
1045 a_baloo_is = self.serv_file.read(len("A baloo is"))
1046 self.assertEqual("A baloo is", a_baloo_is)
1047 _a_bear = self.serv_file.read(len(" a bear"))
1048 self.assertEqual(" a bear", _a_bear)
1049 line = self.serv_file.readline()
1050 self.assertEqual("\n", line)
1051 line = self.serv_file.readline()
1052 self.assertEqual("A BALOO IS A BEAR.\n", line)
1053 line = self.serv_file.readline()
1054 self.assertEqual(MSG, line)
1055
1056 def _testReadlineAfterRead(self):
1057 self.cli_file.write("A baloo is a bear\n")
1058 self.cli_file.write("A BALOO IS A BEAR.\n")
1059 self.cli_file.write(MSG)
1060 self.cli_file.flush()
1061
1062 def testReadlineAfterReadNoNewline(self):
1063 end_of_ = self.serv_file.read(len("End Of "))
1064 self.assertEqual("End Of ", end_of_)
1065 line = self.serv_file.readline()
1066 self.assertEqual("Line", line)
1067
1068 def _testReadlineAfterReadNoNewline(self):
1069 self.cli_file.write("End Of Line")
1070
1071 def testClosedAttr(self):
1072 self.assertTrue(not self.serv_file.closed)
1073
1074 def _testClosedAttr(self):
1075 self.assertTrue(not self.cli_file.closed)
1076
1077
1078class FileObjectInterruptedTestCase(unittest.TestCase):
1079 """Test that the file object correctly handles EINTR internally."""
1080
1081 class MockSocket(object):
1082 def __init__(self, recv_funcs=()):
1083 # A generator that returns callables that we'll call for each
1084 # call to recv().
1085 self._recv_step = iter(recv_funcs)
1086
1087 def recv(self, size):
1088 return self._recv_step.next()()
1089
1090 @staticmethod
1091 def _raise_eintr():
1092 raise socket.error(errno.EINTR)
1093
1094 def _test_readline(self, size=-1, **kwargs):
1095 mock_sock = self.MockSocket(recv_funcs=[
1096 lambda : "This is the first line\nAnd the sec",
1097 self._raise_eintr,
1098 lambda : "ond line is here\n",
1099 lambda : "",
1100 ])
1101 fo = socket._fileobject(mock_sock, **kwargs)
1102 self.assertEqual(fo.readline(size), "This is the first line\n")
1103 self.assertEqual(fo.readline(size), "And the second line is here\n")
1104
1105 def _test_read(self, size=-1, **kwargs):
1106 mock_sock = self.MockSocket(recv_funcs=[
1107 lambda : "This is the first line\nAnd the sec",
1108 self._raise_eintr,
1109 lambda : "ond line is here\n",
1110 lambda : "",
1111 ])
1112 fo = socket._fileobject(mock_sock, **kwargs)
1113 self.assertEqual(fo.read(size), "This is the first line\n"
1114 "And the second line is here\n")
1115
1116 def test_default(self):
1117 self._test_readline()
1118 self._test_readline(size=100)
1119 self._test_read()
1120 self._test_read(size=100)
1121
1122 def test_with_1k_buffer(self):
1123 self._test_readline(bufsize=1024)
1124 self._test_readline(size=100, bufsize=1024)
1125 self._test_read(bufsize=1024)
1126 self._test_read(size=100, bufsize=1024)
1127
1128 def _test_readline_no_buffer(self, size=-1):
1129 mock_sock = self.MockSocket(recv_funcs=[
1130 lambda : "aa",
1131 lambda : "\n",
1132 lambda : "BB",
1133 self._raise_eintr,
1134 lambda : "bb",
1135 lambda : "",
1136 ])
1137 fo = socket._fileobject(mock_sock, bufsize=0)
1138 self.assertEqual(fo.readline(size), "aa\n")
1139 self.assertEqual(fo.readline(size), "BBbb")
1140
1141 def test_no_buffer(self):
1142 self._test_readline_no_buffer()
1143 self._test_readline_no_buffer(size=4)
1144 self._test_read(bufsize=0)
1145 self._test_read(size=100, bufsize=0)
1146
1147
1148class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):
1149
1150 """Repeat the tests from FileObjectClassTestCase with bufsize==0.
1151
1152 In this case (and in this case only), it should be possible to
1153 create a file object, read a line from it, create another file
1154 object, read another line from it, without loss of data in the
1155 first file object's buffer. Note that httplib relies on this
1156 when reading multiple requests from the same socket."""
1157
1158 bufsize = 0 # Use unbuffered mode
1159
1160 def testUnbufferedReadline(self):
1161 # Read a line, create a new file object, read another line with it
1162 line = self.serv_file.readline() # first line
1163 self.assertEqual(line, "A. " + MSG) # first line
1164 self.serv_file = self.cli_conn.makefile('rb', 0)
1165 line = self.serv_file.readline() # second line
1166 self.assertEqual(line, "B. " + MSG) # second line
1167
1168 def _testUnbufferedReadline(self):
1169 self.cli_file.write("A. " + MSG)
1170 self.cli_file.write("B. " + MSG)
1171 self.cli_file.flush()
1172
1173class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
1174
1175 bufsize = 1 # Default-buffered for reading; line-buffered for writing
1176
1177 class SocketMemo(object):
1178 """A wrapper to keep track of sent data, needed to examine write behaviour"""
1179 def __init__(self, sock):
1180 self._sock = sock
1181 self.sent = []
1182
1183 def send(self, data, flags=0):
1184 n = self._sock.send(data, flags)
1185 self.sent.append(data[:n])
1186 return n
1187
1188 def sendall(self, data, flags=0):
1189 self._sock.sendall(data, flags)
1190 self.sent.append(data)
1191
1192 def __getattr__(self, attr):
1193 return getattr(self._sock, attr)
1194
1195 def getsent(self):
1196 return [e.tobytes() if isinstance(e, memoryview) else e for e in self.sent]
1197
1198 def setUp(self):
1199 FileObjectClassTestCase.setUp(self)
1200 self.serv_file._sock = self.SocketMemo(self.serv_file._sock)
1201
1202 def testLinebufferedWrite(self):
1203 # Write two lines, in small chunks
1204 msg = MSG.strip()
1205 print >> self.serv_file, msg,
1206 print >> self.serv_file, msg
1207
1208 # second line:
1209 print >> self.serv_file, msg,
1210 print >> self.serv_file, msg,
1211 print >> self.serv_file, msg
1212
1213 # third line
1214 print >> self.serv_file, ''
1215
1216 self.serv_file.flush()
1217
1218 msg1 = "%s %s\n"%(msg, msg)
1219 msg2 = "%s %s %s\n"%(msg, msg, msg)
1220 msg3 = "\n"
1221 self.assertEqual(self.serv_file._sock.getsent(), [msg1, msg2, msg3])
1222
1223 def _testLinebufferedWrite(self):
1224 msg = MSG.strip()
1225 msg1 = "%s %s\n"%(msg, msg)
1226 msg2 = "%s %s %s\n"%(msg, msg, msg)
1227 msg3 = "\n"
1228 l1 = self.cli_file.readline()
1229 self.assertEqual(l1, msg1)
1230 l2 = self.cli_file.readline()
1231 self.assertEqual(l2, msg2)
1232 l3 = self.cli_file.readline()
1233 self.assertEqual(l3, msg3)
1234
1235
1236class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
1237
1238 bufsize = 2 # Exercise the buffering code
1239
1240
1241class NetworkConnectionTest(object):
1242 """Prove network connection."""
1243 def clientSetUp(self):
1244 # We're inherited below by BasicTCPTest2, which also inherits
1245 # BasicTCPTest, which defines self.port referenced below.
1246 self.cli = socket.create_connection((HOST, self.port))
1247 self.serv_conn = self.cli
1248
1249class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
1250 """Tests that NetworkConnection does not break existing TCP functionality.
1251 """
1252
1253class NetworkConnectionNoServer(unittest.TestCase):
1254 class MockSocket(socket.socket):
1255 def connect(self, *args):
1256 raise socket.timeout('timed out')
1257
1258 @contextlib.contextmanager
1259 def mocked_socket_module(self):
1260 """Return a socket which times out on connect"""
1261 old_socket = socket.socket
1262 socket.socket = self.MockSocket
1263 try:
1264 yield
1265 finally:
1266 socket.socket = old_socket
1267
1268 def test_connect(self):
1269 port = test_support.find_unused_port()
1270 cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1271 self.addCleanup(cli.close)
1272 with self.assertRaises(socket.error) as cm:
1273 cli.connect((HOST, port))
1274 self.assertEqual(cm.exception.errno, errno.ECONNREFUSED)
1275
1276 def test_create_connection(self):
1277 # Issue #9792: errors raised by create_connection() should have
1278 # a proper errno attribute.
1279 port = test_support.find_unused_port()
1280 with self.assertRaises(socket.error) as cm:
1281 socket.create_connection((HOST, port))
1282
1283 # Issue #16257: create_connection() calls getaddrinfo() against
1284 # 'localhost'. This may result in an IPV6 addr being returned
1285 # as well as an IPV4 one:
1286 # >>> socket.getaddrinfo('localhost', port, 0, SOCK_STREAM)
1287 # >>> [(2, 2, 0, '', ('127.0.0.1', 41230)),
1288 # (26, 2, 0, '', ('::1', 41230, 0, 0))]
1289 #
1290 # create_connection() enumerates through all the addresses returned
1291 # and if it doesn't successfully bind to any of them, it propagates
1292 # the last exception it encountered.
1293 #
1294 # On Solaris, ENETUNREACH is returned in this circumstance instead
1295 # of ECONNREFUSED. So, if that errno exists, add it to our list of
1296 # expected errnos.
1297 expected_errnos = [ errno.ECONNREFUSED, ]
1298 if hasattr(errno, 'ENETUNREACH'):
1299 expected_errnos.append(errno.ENETUNREACH)
1300
1301 self.assertIn(cm.exception.errno, expected_errnos)
1302
1303 def test_create_connection_timeout(self):
1304 # Issue #9792: create_connection() should not recast timeout errors
1305 # as generic socket errors.
1306 with self.mocked_socket_module():
1307 with self.assertRaises(socket.timeout):
1308 socket.create_connection((HOST, 1234))
1309
1310
1311@unittest.skipUnless(thread, 'Threading required for this test.')
1312class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
1313
1314 def __init__(self, methodName='runTest'):
1315 SocketTCPTest.__init__(self, methodName=methodName)
1316 ThreadableTest.__init__(self)
1317
1318 def clientSetUp(self):
1319 self.source_port = test_support.find_unused_port()
1320
1321 def clientTearDown(self):
1322 self.cli.close()
1323 self.cli = None
1324 ThreadableTest.clientTearDown(self)
1325
1326 def _justAccept(self):
1327 conn, addr = self.serv.accept()
1328 conn.close()
1329
1330 testFamily = _justAccept
1331 def _testFamily(self):
1332 self.cli = socket.create_connection((HOST, self.port), timeout=30)
1333 self.addCleanup(self.cli.close)
1334 self.assertEqual(self.cli.family, 2)
1335
1336 testSourceAddress = _justAccept
1337 def _testSourceAddress(self):
1338 self.cli = socket.create_connection((HOST, self.port), timeout=30,
1339 source_address=('', self.source_port))
1340 self.addCleanup(self.cli.close)
1341 self.assertEqual(self.cli.getsockname()[1], self.source_port)
1342 # The port number being used is sufficient to show that the bind()
1343 # call happened.
1344
1345 testTimeoutDefault = _justAccept
1346 def _testTimeoutDefault(self):
1347 # passing no explicit timeout uses socket's global default
1348 self.assertTrue(socket.getdefaulttimeout() is None)
1349 socket.setdefaulttimeout(42)
1350 try:
1351 self.cli = socket.create_connection((HOST, self.port))
1352 self.addCleanup(self.cli.close)
1353 finally:
1354 socket.setdefaulttimeout(None)
1355 self.assertEqual(self.cli.gettimeout(), 42)
1356
1357 testTimeoutNone = _justAccept
1358 def _testTimeoutNone(self):
1359 # None timeout means the same as sock.settimeout(None)
1360 self.assertTrue(socket.getdefaulttimeout() is None)
1361 socket.setdefaulttimeout(30)
1362 try:
1363 self.cli = socket.create_connection((HOST, self.port), timeout=None)
1364 self.addCleanup(self.cli.close)
1365 finally:
1366 socket.setdefaulttimeout(None)
1367 self.assertEqual(self.cli.gettimeout(), None)
1368
1369 testTimeoutValueNamed = _justAccept
1370 def _testTimeoutValueNamed(self):
1371 self.cli = socket.create_connection((HOST, self.port), timeout=30)
1372 self.assertEqual(self.cli.gettimeout(), 30)
1373
1374 testTimeoutValueNonamed = _justAccept
1375 def _testTimeoutValueNonamed(self):
1376 self.cli = socket.create_connection((HOST, self.port), 30)
1377 self.addCleanup(self.cli.close)
1378 self.assertEqual(self.cli.gettimeout(), 30)
1379
1380@unittest.skipUnless(thread, 'Threading required for this test.')
1381class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
1382
1383 def __init__(self, methodName='runTest'):
1384 SocketTCPTest.__init__(self, methodName=methodName)
1385 ThreadableTest.__init__(self)
1386
1387 def clientSetUp(self):
1388 pass
1389
1390 def clientTearDown(self):
1391 self.cli.close()
1392 self.cli = None
1393 ThreadableTest.clientTearDown(self)
1394
1395 def testInsideTimeout(self):
1396 conn, addr = self.serv.accept()
1397 self.addCleanup(conn.close)
1398 time.sleep(3)
1399 conn.send("done!")
1400 testOutsideTimeout = testInsideTimeout
1401
1402 def _testInsideTimeout(self):
1403 self.cli = sock = socket.create_connection((HOST, self.port))
1404 data = sock.recv(5)
1405 self.assertEqual(data, "done!")
1406
1407 def _testOutsideTimeout(self):
1408 self.cli = sock = socket.create_connection((HOST, self.port), timeout=1)
1409 self.assertRaises(socket.timeout, lambda: sock.recv(5))
1410
1411
1412class Urllib2FileobjectTest(unittest.TestCase):
1413
1414 # urllib2.HTTPHandler has "borrowed" socket._fileobject, and requires that
1415 # it close the socket if the close c'tor argument is true
1416
1417 def testClose(self):
1418 class MockSocket:
1419 closed = False
1420 def flush(self): pass
1421 def close(self): self.closed = True
1422
1423 # must not close unless we request it: the original use of _fileobject
1424 # by module socket requires that the underlying socket not be closed until
1425 # the _socketobject that created the _fileobject is closed
1426 s = MockSocket()
1427 f = socket._fileobject(s)
1428 f.close()
1429 self.assertTrue(not s.closed)
1430
1431 s = MockSocket()
1432 f = socket._fileobject(s, close=True)
1433 f.close()
1434 self.assertTrue(s.closed)
1435
1436class TCPTimeoutTest(SocketTCPTest):
1437
1438 def testTCPTimeout(self):
1439 def raise_timeout(*args, **kwargs):
1440 self.serv.settimeout(1.0)
1441 self.serv.accept()
1442 self.assertRaises(socket.timeout, raise_timeout,
1443 "Error generating a timeout exception (TCP)")
1444
1445 def testTimeoutZero(self):
1446 ok = False
1447 try:
1448 self.serv.settimeout(0.0)
1449 foo = self.serv.accept()
1450 except socket.timeout:
1451 self.fail("caught timeout instead of error (TCP)")
1452 except socket.error:
1453 ok = True
1454 except:
1455 self.fail("caught unexpected exception (TCP)")
1456 if not ok:
1457 self.fail("accept() returned success when we did not expect it")
1458
1459 def testInterruptedTimeout(self):
1460 # XXX I don't know how to do this test on MSWindows or any other
1461 # plaform that doesn't support signal.alarm() or os.kill(), though
1462 # the bug should have existed on all platforms.
1463 if not hasattr(signal, "alarm"):
1464 return # can only test on *nix
1465 self.serv.settimeout(5.0) # must be longer than alarm
1466 class Alarm(Exception):
1467 pass
1468 def alarm_handler(signal, frame):
1469 raise Alarm
1470 old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
1471 try:
1472 signal.alarm(2) # POSIX allows alarm to be up to 1 second early
1473 try:
1474 foo = self.serv.accept()
1475 except socket.timeout:
1476 self.fail("caught timeout instead of Alarm")
1477 except Alarm:
1478 pass
1479 except:
1480 self.fail("caught other exception instead of Alarm:"
1481 " %s(%s):\n%s" %
1482 (sys.exc_info()[:2] + (traceback.format_exc(),)))
1483 else:
1484 self.fail("nothing caught")
1485 finally:
1486 signal.alarm(0) # shut off alarm
1487 except Alarm:
1488 self.fail("got Alarm in wrong place")
1489 finally:
1490 # no alarm can be pending. Safe to restore old handler.
1491 signal.signal(signal.SIGALRM, old_alarm)
1492
1493class UDPTimeoutTest(SocketUDPTest):
1494
1495 def testUDPTimeout(self):
1496 def raise_timeout(*args, **kwargs):
1497 self.serv.settimeout(1.0)
1498 self.serv.recv(1024)
1499 self.assertRaises(socket.timeout, raise_timeout,
1500 "Error generating a timeout exception (UDP)")
1501
1502 def testTimeoutZero(self):
1503 ok = False
1504 try:
1505 self.serv.settimeout(0.0)
1506 foo = self.serv.recv(1024)
1507 except socket.timeout:
1508 self.fail("caught timeout instead of error (UDP)")
1509 except socket.error:
1510 ok = True
1511 except:
1512 self.fail("caught unexpected exception (UDP)")
1513 if not ok:
1514 self.fail("recv() returned success when we did not expect it")
1515
1516class TestExceptions(unittest.TestCase):
1517
1518 def testExceptionTree(self):
1519 self.assertTrue(issubclass(socket.error, Exception))
1520 self.assertTrue(issubclass(socket.herror, socket.error))
1521 self.assertTrue(issubclass(socket.gaierror, socket.error))
1522 self.assertTrue(issubclass(socket.timeout, socket.error))
1523
1524class TestLinuxAbstractNamespace(unittest.TestCase):
1525
1526 UNIX_PATH_MAX = 108
1527
1528 def testLinuxAbstractNamespace(self):
1529 address = "\x00python-test-hello\x00\xff"
1530 s1 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1531 s1.bind(address)
1532 s1.listen(1)
1533 s2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1534 s2.connect(s1.getsockname())
1535 s1.accept()
1536 self.assertEqual(s1.getsockname(), address)
1537 self.assertEqual(s2.getpeername(), address)
1538
1539 def testMaxName(self):
1540 address = "\x00" + "h" * (self.UNIX_PATH_MAX - 1)
1541 s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1542 s.bind(address)
1543 self.assertEqual(s.getsockname(), address)
1544
1545 def testNameOverflow(self):
1546 address = "\x00" + "h" * self.UNIX_PATH_MAX
1547 s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1548 self.assertRaises(socket.error, s.bind, address)
1549
1550
1551@unittest.skipUnless(thread, 'Threading required for this test.')
1552class BufferIOTest(SocketConnectedTest):
1553 """
1554 Test the buffer versions of socket.recv() and socket.send().
1555 """
1556 def __init__(self, methodName='runTest'):
1557 SocketConnectedTest.__init__(self, methodName=methodName)
1558
1559 def testRecvIntoArray(self):
1560 buf = array.array('c', ' '*1024)
1561 nbytes = self.cli_conn.recv_into(buf)
1562 self.assertEqual(nbytes, len(MSG))
1563 msg = buf.tostring()[:len(MSG)]
1564 self.assertEqual(msg, MSG)
1565
1566 def _testRecvIntoArray(self):
1567 with test_support.check_py3k_warnings():
1568 buf = buffer(MSG)
1569 self.serv_conn.send(buf)
1570
1571 def testRecvIntoBytearray(self):
1572 buf = bytearray(1024)
1573 nbytes = self.cli_conn.recv_into(buf)
1574 self.assertEqual(nbytes, len(MSG))
1575 msg = buf[:len(MSG)]
1576 self.assertEqual(msg, MSG)
1577
1578 _testRecvIntoBytearray = _testRecvIntoArray
1579
1580 def testRecvIntoMemoryview(self):
1581 buf = bytearray(1024)
1582 nbytes = self.cli_conn.recv_into(memoryview(buf))
1583 self.assertEqual(nbytes, len(MSG))
1584 msg = buf[:len(MSG)]
1585 self.assertEqual(msg, MSG)
1586
1587 _testRecvIntoMemoryview = _testRecvIntoArray
1588
1589 def testRecvFromIntoArray(self):
1590 buf = array.array('c', ' '*1024)
1591 nbytes, addr = self.cli_conn.recvfrom_into(buf)
1592 self.assertEqual(nbytes, len(MSG))
1593 msg = buf.tostring()[:len(MSG)]
1594 self.assertEqual(msg, MSG)
1595
1596 def _testRecvFromIntoArray(self):
1597 with test_support.check_py3k_warnings():
1598 buf = buffer(MSG)
1599 self.serv_conn.send(buf)
1600
1601 def testRecvFromIntoBytearray(self):
1602 buf = bytearray(1024)
1603 nbytes, addr = self.cli_conn.recvfrom_into(buf)
1604 self.assertEqual(nbytes, len(MSG))
1605 msg = buf[:len(MSG)]
1606 self.assertEqual(msg, MSG)
1607
1608 _testRecvFromIntoBytearray = _testRecvFromIntoArray
1609
1610 def testRecvFromIntoMemoryview(self):
1611 buf = bytearray(1024)
1612 nbytes, addr = self.cli_conn.recvfrom_into(memoryview(buf))
1613 self.assertEqual(nbytes, len(MSG))
1614 msg = buf[:len(MSG)]
1615 self.assertEqual(msg, MSG)
1616
1617 _testRecvFromIntoMemoryview = _testRecvFromIntoArray
1618
1619
1620TIPC_STYPE = 2000
1621TIPC_LOWER = 200
1622TIPC_UPPER = 210
1623
1624def isTipcAvailable():
1625 """Check if the TIPC module is loaded
1626
1627 The TIPC module is not loaded automatically on Ubuntu and probably
1628 other Linux distros.
1629 """
1630 if not hasattr(socket, "AF_TIPC"):
1631 return False
1632 if not os.path.isfile("/proc/modules"):
1633 return False
1634 with open("/proc/modules") as f:
1635 for line in f:
1636 if line.startswith("tipc "):
1637 return True
1638 if test_support.verbose:
1639 print "TIPC module is not loaded, please 'sudo modprobe tipc'"
1640 return False
1641
1642class TIPCTest (unittest.TestCase):
1643 def testRDM(self):
1644 srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
1645 cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
1646
1647 srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1648 srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
1649 TIPC_LOWER, TIPC_UPPER)
1650 srv.bind(srvaddr)
1651
1652 sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
1653 TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) / 2, 0)
1654 cli.sendto(MSG, sendaddr)
1655
1656 msg, recvaddr = srv.recvfrom(1024)
1657
1658 self.assertEqual(cli.getsockname(), recvaddr)
1659 self.assertEqual(msg, MSG)
1660
1661
1662class TIPCThreadableTest (unittest.TestCase, ThreadableTest):
1663 def __init__(self, methodName = 'runTest'):
1664 unittest.TestCase.__init__(self, methodName = methodName)
1665 ThreadableTest.__init__(self)
1666
1667 def setUp(self):
1668 self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
1669 self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1670 srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
1671 TIPC_LOWER, TIPC_UPPER)
1672 self.srv.bind(srvaddr)
1673 self.srv.listen(5)
1674 self.serverExplicitReady()
1675 self.conn, self.connaddr = self.srv.accept()
1676
1677 def clientSetUp(self):
1678 # The is a hittable race between serverExplicitReady() and the
1679 # accept() call; sleep a little while to avoid it, otherwise
1680 # we could get an exception
1681 time.sleep(0.1)
1682 self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
1683 addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
1684 TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) / 2, 0)
1685 self.cli.connect(addr)
1686 self.cliaddr = self.cli.getsockname()
1687
1688 def testStream(self):
1689 msg = self.conn.recv(1024)
1690 self.assertEqual(msg, MSG)
1691 self.assertEqual(self.cliaddr, self.connaddr)
1692
1693 def _testStream(self):
1694 self.cli.send(MSG)
1695 self.cli.close()
1696
1697
1698def test_main():
1699 tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest,
1700 TestExceptions, BufferIOTest, BasicTCPTest2, BasicUDPTest,
1701 UDPTimeoutTest ]
1702
1703 tests.extend([
1704 NonBlockingTCPTests,
1705 FileObjectClassTestCase,
1706 FileObjectInterruptedTestCase,
1707 UnbufferedFileObjectClassTestCase,
1708 LineBufferedFileObjectClassTestCase,
1709 SmallBufferedFileObjectClassTestCase,
1710 Urllib2FileobjectTest,
1711 NetworkConnectionNoServer,
1712 NetworkConnectionAttributesTest,
1713 NetworkConnectionBehaviourTest,
1714 ])
1715 if hasattr(socket, "socketpair"):
1716 tests.append(BasicSocketPairTest)
1717 if sys.platform == 'linux2':
1718 tests.append(TestLinuxAbstractNamespace)
1719 if isTipcAvailable():
1720 tests.append(TIPCTest)
1721 tests.append(TIPCThreadableTest)
1722
1723 thread_info = test_support.threading_setup()
1724 test_support.run_unittest(*tests)
1725 test_support.threading_cleanup(*thread_info)
1726
1727if __name__ == "__main__":
1728 test_main()
Note: See TracBrowser for help on using the repository browser.