Changeset 391 for python/trunk/Lib/test/test_socket.py
- Timestamp:
- Mar 19, 2014, 11:31:01 PM (11 years ago)
- Location:
- python/trunk
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
python/trunk
-
Property svn:mergeinfo
set to
/python/vendor/Python-2.7.6 merged eligible /python/vendor/current merged eligible
-
Property svn:mergeinfo
set to
-
python/trunk/Lib/test/test_socket.py
r2 r391 7 7 import socket 8 8 import select 9 import thread, threading9 import _testcapi 10 10 import time 11 11 import traceback … … 14 14 import os 15 15 import array 16 import contextlib 16 17 from weakref import proxy 17 18 import signal 19 import math 20 21 def try_address(host, port=0, family=socket.AF_INET): 22 """Try to bind a socket on the given host:port and return True 23 if that has been possible.""" 24 try: 25 sock = socket.socket(family, socket.SOCK_STREAM) 26 sock.bind((host, port)) 27 except (socket.error, socket.gaierror): 28 return False 29 else: 30 sock.close() 31 return True 32 33 HOST = test_support.HOST 34 MSG = b'Michael Gilfix was here\n' 35 SUPPORTS_IPV6 = socket.has_ipv6 and try_address('::1', family=socket.AF_INET6) 36 37 try: 38 import thread 39 import threading 40 except ImportError: 41 thread = None 42 threading = None 18 43 19 44 HOST = test_support.HOST … … 122 147 def clientRun(self, test_func): 123 148 self.server_ready.wait() 149 self.clientSetUp() 124 150 self.client_ready.set() 125 self.clientSetUp()126 151 if not callable(test_func): 127 raise TypeError , "test_func must be a callable function"152 raise TypeError("test_func must be a callable function.") 128 153 try: 129 154 test_func() … … 133 158 134 159 def clientSetUp(self): 135 raise NotImplementedError , "clientSetUp must be implemented."160 raise NotImplementedError("clientSetUp must be implemented.") 136 161 137 162 def clientTearDown(self): … … 161 186 def clientSetUp(self): 162 187 self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 188 189 def clientTearDown(self): 190 self.cli.close() 191 self.cli = None 192 ThreadableTest.clientTearDown(self) 163 193 164 194 class SocketConnectedTest(ThreadedTCPSocketTest): … … 238 268 def raise_gaierror(*args, **kwargs): 239 269 raise socket.gaierror 240 self. failUnlessRaises(socket.error, raise_error,270 self.assertRaises(socket.error, raise_error, 241 271 "Error raising socket exception.") 242 self. failUnlessRaises(socket.error, raise_herror,272 self.assertRaises(socket.error, raise_herror, 243 273 "Error raising socket exception.") 244 self. failUnlessRaises(socket.error, raise_gaierror,274 self.assertRaises(socket.error, raise_gaierror, 245 275 "Error raising socket exception.") 276 277 def testSendtoErrors(self): 278 # Testing that sendto doens't masks failures. See #10169. 279 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 280 self.addCleanup(s.close) 281 s.bind(('', 0)) 282 sockname = s.getsockname() 283 # 2 args 284 with self.assertRaises(UnicodeEncodeError): 285 s.sendto(u'\u2620', sockname) 286 with self.assertRaises(TypeError) as cm: 287 s.sendto(5j, sockname) 288 self.assertIn('not complex', str(cm.exception)) 289 with self.assertRaises(TypeError) as cm: 290 s.sendto('foo', None) 291 self.assertIn('not NoneType', str(cm.exception)) 292 # 3 args 293 with self.assertRaises(UnicodeEncodeError): 294 s.sendto(u'\u2620', 0, sockname) 295 with self.assertRaises(TypeError) as cm: 296 s.sendto(5j, 0, sockname) 297 self.assertIn('not complex', str(cm.exception)) 298 with self.assertRaises(TypeError) as cm: 299 s.sendto('foo', 0, None) 300 self.assertIn('not NoneType', str(cm.exception)) 301 with self.assertRaises(TypeError) as cm: 302 s.sendto('foo', 'bar', sockname) 303 self.assertIn('an integer is required', str(cm.exception)) 304 with self.assertRaises(TypeError) as cm: 305 s.sendto('foo', None, None) 306 self.assertIn('an integer is required', str(cm.exception)) 307 # wrong number of args 308 with self.assertRaises(TypeError) as cm: 309 s.sendto('foo') 310 self.assertIn('(1 given)', str(cm.exception)) 311 with self.assertRaises(TypeError) as cm: 312 s.sendto('foo', 0, sockname, 4) 313 self.assertIn('(4 given)', str(cm.exception)) 314 246 315 247 316 def testCrucialConstants(self): … … 264 333 # Probably name lookup wasn't set up right; skip this test 265 334 return 266 self.assert _(ip.find('.') >= 0, "Error resolving host to ip.")335 self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.") 267 336 try: 268 337 hname, aliases, ipaddrs = socket.gethostbyaddr(ip) … … 283 352 socket.getnameinfo(__name__,0) 284 353 except TypeError: 285 if sys.getrefcount(__name__) <> orig:286 self.fail("socket.getnameinfo loses a reference")354 self.assertEqual(sys.getrefcount(__name__), orig, 355 "socket.getnameinfo loses a reference") 287 356 288 357 def testInterpreterCrash(self): … … 327 396 # I've ordered this by protocols that have both a tcp and udp 328 397 # protocol, at least for modern Linuxes. 329 if sys.platform in ('linux2', 'freebsd4', 'freebsd5', 'freebsd6', 330 'freebsd7', 'freebsd8', 'darwin'): 398 if (sys.platform.startswith('linux') or 399 sys.platform.startswith('freebsd') or 400 sys.platform.startswith('netbsd') or 401 sys.platform == 'darwin'): 331 402 # avoid the 'echo' service on this platform, as there is an 332 403 # assumption breaking non-standard port/protocol entry … … 345 416 port2 = socket.getservbyname(service) 346 417 eq(port, port2) 347 # Try udp, but don't barf i tit doesn't exist418 # Try udp, but don't barf if it doesn't exist 348 419 try: 349 420 udpport = socket.getservbyname(service, 'udp') … … 357 428 if udpport is not None: 358 429 eq(socket.getservbyport(udpport, 'udp'), service) 430 # Make sure getservbyport does not accept out of range ports. 431 self.assertRaises(OverflowError, socket.getservbyport, -1) 432 self.assertRaises(OverflowError, socket.getservbyport, 65536) 359 433 360 434 def testDefaultTimeout(self): … … 391 465 # Test that issue1008086 and issue767150 are fixed. 392 466 # It must return 4 bytes. 393 self.assertEqual s('\x00'*4, socket.inet_aton('0.0.0.0'))394 self.assertEqual s('\xff'*4, socket.inet_aton('255.255.255.255'))467 self.assertEqual('\x00'*4, socket.inet_aton('0.0.0.0')) 468 self.assertEqual('\xff'*4, socket.inet_aton('255.255.255.255')) 395 469 396 470 def testIPv4toString(self): … … 400 474 g = lambda a: inet_pton(AF_INET, a) 401 475 402 self.assertEqual s('\x00\x00\x00\x00', f('0.0.0.0'))403 self.assertEqual s('\xff\x00\xff\x00', f('255.0.255.0'))404 self.assertEqual s('\xaa\xaa\xaa\xaa', f('170.170.170.170'))405 self.assertEqual s('\x01\x02\x03\x04', f('1.2.3.4'))406 self.assertEqual s('\xff\xff\xff\xff', f('255.255.255.255'))407 408 self.assertEqual s('\x00\x00\x00\x00', g('0.0.0.0'))409 self.assertEqual s('\xff\x00\xff\x00', g('255.0.255.0'))410 self.assertEqual s('\xaa\xaa\xaa\xaa', g('170.170.170.170'))411 self.assertEqual s('\xff\xff\xff\xff', g('255.255.255.255'))476 self.assertEqual('\x00\x00\x00\x00', f('0.0.0.0')) 477 self.assertEqual('\xff\x00\xff\x00', f('255.0.255.0')) 478 self.assertEqual('\xaa\xaa\xaa\xaa', f('170.170.170.170')) 479 self.assertEqual('\x01\x02\x03\x04', f('1.2.3.4')) 480 self.assertEqual('\xff\xff\xff\xff', f('255.255.255.255')) 481 482 self.assertEqual('\x00\x00\x00\x00', g('0.0.0.0')) 483 self.assertEqual('\xff\x00\xff\x00', g('255.0.255.0')) 484 self.assertEqual('\xaa\xaa\xaa\xaa', g('170.170.170.170')) 485 self.assertEqual('\xff\xff\xff\xff', g('255.255.255.255')) 412 486 413 487 def testIPv6toString(self): … … 422 496 f = lambda a: inet_pton(AF_INET6, a) 423 497 424 self.assertEqual s('\x00' * 16, f('::'))425 self.assertEqual s('\x00' * 16, f('0::0'))426 self.assertEqual s('\x00\x01' + '\x00' * 14, f('1::'))427 self.assertEqual s(498 self.assertEqual('\x00' * 16, f('::')) 499 self.assertEqual('\x00' * 16, f('0::0')) 500 self.assertEqual('\x00\x01' + '\x00' * 14, f('1::')) 501 self.assertEqual( 428 502 '\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae', 429 503 f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae') … … 436 510 g = lambda a: inet_ntop(AF_INET, a) 437 511 438 self.assertEqual s('1.0.1.0', f('\x01\x00\x01\x00'))439 self.assertEqual s('170.85.170.85', f('\xaa\x55\xaa\x55'))440 self.assertEqual s('255.255.255.255', f('\xff\xff\xff\xff'))441 self.assertEqual s('1.2.3.4', f('\x01\x02\x03\x04'))442 443 self.assertEqual s('1.0.1.0', g('\x01\x00\x01\x00'))444 self.assertEqual s('170.85.170.85', g('\xaa\x55\xaa\x55'))445 self.assertEqual s('255.255.255.255', g('\xff\xff\xff\xff'))512 self.assertEqual('1.0.1.0', f('\x01\x00\x01\x00')) 513 self.assertEqual('170.85.170.85', f('\xaa\x55\xaa\x55')) 514 self.assertEqual('255.255.255.255', f('\xff\xff\xff\xff')) 515 self.assertEqual('1.2.3.4', f('\x01\x02\x03\x04')) 516 517 self.assertEqual('1.0.1.0', g('\x01\x00\x01\x00')) 518 self.assertEqual('170.85.170.85', g('\xaa\x55\xaa\x55')) 519 self.assertEqual('255.255.255.255', g('\xff\xff\xff\xff')) 446 520 447 521 def testStringToIPv6(self): … … 456 530 f = lambda a: inet_ntop(AF_INET6, a) 457 531 458 self.assertEqual s('::', f('\x00' * 16))459 self.assertEqual s('::1', f('\x00' * 15 + '\x01'))460 self.assertEqual s(532 self.assertEqual('::', f('\x00' * 16)) 533 self.assertEqual('::1', f('\x00' * 15 + '\x01')) 534 self.assertEqual( 461 535 'aef:b01:506:1001:ffff:9997:55:170', 462 536 f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70') … … 465 539 # XXX The following don't test module-level functionality... 466 540 541 def _get_unused_port(self, bind_address='0.0.0.0'): 542 """Use a temporary socket to elicit an unused ephemeral port. 543 544 Args: 545 bind_address: Hostname or IP address to search for a port on. 546 547 Returns: A most likely to be unused port. 548 """ 549 tempsock = socket.socket() 550 tempsock.bind((bind_address, 0)) 551 host, port = tempsock.getsockname() 552 tempsock.close() 553 return port 554 467 555 def testSockName(self): 468 # Testing getsockname(). Use a temporary socket to elicit an unused 469 # ephemeral port that we can use later in the test. 470 tempsock = socket.socket() 471 tempsock.bind(("0.0.0.0", 0)) 472 (host, port) = tempsock.getsockname() 473 tempsock.close() 474 del tempsock 475 556 # Testing getsockname() 557 port = self._get_unused_port() 476 558 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 559 self.addCleanup(sock.close) 477 560 sock.bind(("0.0.0.0", port)) 478 561 name = sock.getsockname() … … 480 563 # it reasonable to get the host's addr in addition to 0.0.0.0. 481 564 # At least for eCos. This is required for the S/390 to pass. 482 my_ip_addr = socket.gethostbyname(socket.gethostname()) 483 self.assert_(name[0] in ("0.0.0.0", my_ip_addr), '%s invalid' % name[0]) 565 try: 566 my_ip_addr = socket.gethostbyname(socket.gethostname()) 567 except socket.error: 568 # Probably name lookup wasn't set up right; skip this test 569 return 570 self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0]) 484 571 self.assertEqual(name[1], port) 485 572 … … 488 575 # We know a socket should start without reuse==0 489 576 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 577 self.addCleanup(sock.close) 490 578 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) 491 self. failIf(reuse != 0, "initial mode is reuse")579 self.assertFalse(reuse != 0, "initial mode is reuse") 492 580 493 581 def testSetSockOpt(self): 494 582 # Testing setsockopt() 495 583 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 584 self.addCleanup(sock.close) 496 585 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 497 586 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) 498 self. failIf(reuse == 0, "failed to set reuse mode")587 self.assertFalse(reuse == 0, "failed to set reuse mode") 499 588 500 589 def testSendAfterClose(self): … … 513 602 sock.close() 514 603 604 def test_getsockaddrarg(self): 605 host = '0.0.0.0' 606 port = self._get_unused_port(bind_address=host) 607 big_port = port + 65536 608 neg_port = port - 65536 609 sock = socket.socket() 610 try: 611 self.assertRaises(OverflowError, sock.bind, (host, big_port)) 612 self.assertRaises(OverflowError, sock.bind, (host, neg_port)) 613 sock.bind((host, port)) 614 finally: 615 sock.close() 616 617 @unittest.skipUnless(os.name == "nt", "Windows specific") 515 618 def test_sock_ioctl(self): 516 if os.name != "nt": 517 return 518 self.assert_(hasattr(socket.socket, 'ioctl')) 519 self.assert_(hasattr(socket, 'SIO_RCVALL')) 520 self.assert_(hasattr(socket, 'RCVALL_ON')) 521 self.assert_(hasattr(socket, 'RCVALL_OFF')) 522 523 619 self.assertTrue(hasattr(socket.socket, 'ioctl')) 620 self.assertTrue(hasattr(socket, 'SIO_RCVALL')) 621 self.assertTrue(hasattr(socket, 'RCVALL_ON')) 622 self.assertTrue(hasattr(socket, 'RCVALL_OFF')) 623 self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS')) 624 s = socket.socket() 625 self.addCleanup(s.close) 626 self.assertRaises(ValueError, s.ioctl, -1, None) 627 s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100)) 628 629 def testGetaddrinfo(self): 630 try: 631 socket.getaddrinfo('localhost', 80) 632 except socket.gaierror as err: 633 if err.errno == socket.EAI_SERVICE: 634 # see http://bugs.python.org/issue1282647 635 self.skipTest("buggy libc version") 636 raise 637 # len of every sequence is supposed to be == 5 638 for info in socket.getaddrinfo(HOST, None): 639 self.assertEqual(len(info), 5) 640 # host can be a domain name, a string representation of an 641 # IPv4/v6 address or None 642 socket.getaddrinfo('localhost', 80) 643 socket.getaddrinfo('127.0.0.1', 80) 644 socket.getaddrinfo(None, 80) 645 if SUPPORTS_IPV6: 646 socket.getaddrinfo('::1', 80) 647 # port can be a string service name such as "http", a numeric 648 # port number (int or long), or None 649 socket.getaddrinfo(HOST, "http") 650 socket.getaddrinfo(HOST, 80) 651 socket.getaddrinfo(HOST, 80L) 652 socket.getaddrinfo(HOST, None) 653 # test family and socktype filters 654 infos = socket.getaddrinfo(HOST, None, socket.AF_INET) 655 for family, _, _, _, _ in infos: 656 self.assertEqual(family, socket.AF_INET) 657 infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM) 658 for _, socktype, _, _, _ in infos: 659 self.assertEqual(socktype, socket.SOCK_STREAM) 660 # test proto and flags arguments 661 socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP) 662 socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE) 663 # a server willing to support both IPv4 and IPv6 will 664 # usually do this 665 socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, 666 socket.AI_PASSIVE) 667 668 # Issue 17269 669 if hasattr(socket, 'AI_NUMERICSERV'): 670 socket.getaddrinfo("localhost", None, 0, 0, 0, socket.AI_NUMERICSERV) 671 672 def check_sendall_interrupted(self, with_timeout): 673 # socketpair() is not stricly required, but it makes things easier. 674 if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'): 675 self.skipTest("signal.alarm and socket.socketpair required for this test") 676 # Our signal handlers clobber the C errno by calling a math function 677 # with an invalid domain value. 678 def ok_handler(*args): 679 self.assertRaises(ValueError, math.acosh, 0) 680 def raising_handler(*args): 681 self.assertRaises(ValueError, math.acosh, 0) 682 1 // 0 683 c, s = socket.socketpair() 684 old_alarm = signal.signal(signal.SIGALRM, raising_handler) 685 try: 686 if with_timeout: 687 # Just above the one second minimum for signal.alarm 688 c.settimeout(1.5) 689 with self.assertRaises(ZeroDivisionError): 690 signal.alarm(1) 691 c.sendall(b"x" * test_support.SOCK_MAX_SIZE) 692 if with_timeout: 693 signal.signal(signal.SIGALRM, ok_handler) 694 signal.alarm(1) 695 self.assertRaises(socket.timeout, c.sendall, 696 b"x" * test_support.SOCK_MAX_SIZE) 697 finally: 698 signal.signal(signal.SIGALRM, old_alarm) 699 c.close() 700 s.close() 701 702 def test_sendall_interrupted(self): 703 self.check_sendall_interrupted(False) 704 705 def test_sendall_interrupted_with_timeout(self): 706 self.check_sendall_interrupted(True) 707 708 def test_listen_backlog(self): 709 for backlog in 0, -1: 710 srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 711 srv.bind((HOST, 0)) 712 srv.listen(backlog) 713 srv.close() 714 715 # Issue 15989 716 srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 717 srv.bind((HOST, 0)) 718 self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1) 719 srv.close() 720 721 @unittest.skipUnless(SUPPORTS_IPV6, 'IPv6 required for this test.') 722 def test_flowinfo(self): 723 self.assertRaises(OverflowError, socket.getnameinfo, 724 ('::1',0, 0xffffffff), 0) 725 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 726 try: 727 self.assertRaises(OverflowError, s.bind, ('::1', 0, -10)) 728 finally: 729 s.close() 730 731 732 @unittest.skipUnless(thread, 'Threading required for this test.') 524 733 class BasicTCPTest(SocketConnectedTest): 525 734 … … 583 792 fd = self.cli_conn.fileno() 584 793 sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) 794 self.addCleanup(sock.close) 585 795 msg = sock.recv(1024) 586 796 self.assertEqual(msg, MSG) 587 797 588 798 def _testFromFd(self): 799 self.serv_conn.send(MSG) 800 801 def testDup(self): 802 # Testing dup() 803 sock = self.cli_conn.dup() 804 self.addCleanup(sock.close) 805 msg = sock.recv(1024) 806 self.assertEqual(msg, MSG) 807 808 def _testDup(self): 589 809 self.serv_conn.send(MSG) 590 810 … … 600 820 def _testShutdown(self): 601 821 self.serv_conn.send(MSG) 822 # Issue 15989 823 self.assertRaises(OverflowError, self.serv_conn.shutdown, 824 _testcapi.INT_MAX + 1) 825 self.assertRaises(OverflowError, self.serv_conn.shutdown, 826 2 + (_testcapi.UINT_MAX + 1)) 602 827 self.serv_conn.shutdown(2) 603 828 829 @unittest.skipUnless(thread, 'Threading required for this test.') 604 830 class BasicUDPTest(ThreadedUDPSocketTest): 605 831 … … 630 856 self.cli.sendto(MSG, 0, (HOST, self.port)) 631 857 858 @unittest.skipUnless(thread, 'Threading required for this test.') 632 859 class TCPCloserTest(ThreadedTCPSocketTest): 633 860 … … 645 872 time.sleep(1.0) 646 873 874 @unittest.skipUnless(thread, 'Threading required for this test.') 647 875 class BasicSocketPairTest(SocketPairTest): 648 876 … … 664 892 self.assertEqual(msg, MSG) 665 893 894 @unittest.skipUnless(thread, 'Threading required for this test.') 666 895 class NonBlockingTCPTests(ThreadedTCPSocketTest): 667 896 … … 671 900 def testSetBlocking(self): 672 901 # Testing whether set blocking works 673 self.serv.setblocking(0) 902 self.serv.setblocking(True) 903 self.assertIsNone(self.serv.gettimeout()) 904 self.serv.setblocking(False) 905 self.assertEqual(self.serv.gettimeout(), 0.0) 674 906 start = time.time() 675 907 try: … … 678 910 pass 679 911 end = time.time() 680 self.assert_((end - start) < 1.0, "Error setting non-blocking mode.") 912 self.assertTrue((end - start) < 1.0, "Error setting non-blocking mode.") 913 # Issue 15989 914 if _testcapi.UINT_MAX < _testcapi.ULONG_MAX: 915 self.serv.setblocking(_testcapi.UINT_MAX + 1) 916 self.assertIsNone(self.serv.gettimeout()) 681 917 682 918 def _testSetBlocking(self): … … 695 931 if self.serv in read: 696 932 conn, addr = self.serv.accept() 933 conn.close() 697 934 else: 698 935 self.fail("Error trying to do accept after select.") … … 705 942 # Testing non-blocking connect 706 943 conn, addr = self.serv.accept() 944 conn.close() 707 945 708 946 def _testConnect(self): … … 723 961 if conn in read: 724 962 msg = conn.recv(len(MSG)) 963 conn.close() 725 964 self.assertEqual(msg, MSG) 726 965 else: … … 732 971 self.cli.send(MSG) 733 972 973 @unittest.skipUnless(thread, 'Threading required for this test.') 734 974 class FileObjectClassTestCase(SocketConnectedTest): 735 975 … … 745 985 def tearDown(self): 746 986 self.serv_file.close() 747 self.assert_(self.serv_file.closed) 987 self.assertTrue(self.serv_file.closed) 988 SocketConnectedTest.tearDown(self) 748 989 self.serv_file = None 749 SocketConnectedTest.tearDown(self)750 990 751 991 def clientSetUp(self): … … 755 995 def clientTearDown(self): 756 996 self.cli_file.close() 757 self.assert _(self.cli_file.closed)997 self.assertTrue(self.cli_file.closed) 758 998 self.cli_file = None 759 999 SocketConnectedTest.clientTearDown(self) … … 830 1070 831 1071 def testClosedAttr(self): 832 self.assert _(not self.serv_file.closed)1072 self.assertTrue(not self.serv_file.closed) 833 1073 834 1074 def _testClosedAttr(self): 835 self.assert_(not self.cli_file.closed) 1075 self.assertTrue(not self.cli_file.closed) 1076 1077 1078 class FileObjectInterruptedTestCase(unittest.TestCase): 1079 """Test that the file object correctly handles EINTR internally.""" 1080 1081 class MockSocket(object): 1082 def __init__(self, recv_funcs=()): 1083 # A generator that returns callables that we'll call for each 1084 # call to recv(). 1085 self._recv_step = iter(recv_funcs) 1086 1087 def recv(self, size): 1088 return self._recv_step.next()() 1089 1090 @staticmethod 1091 def _raise_eintr(): 1092 raise socket.error(errno.EINTR) 1093 1094 def _test_readline(self, size=-1, **kwargs): 1095 mock_sock = self.MockSocket(recv_funcs=[ 1096 lambda : "This is the first line\nAnd the sec", 1097 self._raise_eintr, 1098 lambda : "ond line is here\n", 1099 lambda : "", 1100 ]) 1101 fo = socket._fileobject(mock_sock, **kwargs) 1102 self.assertEqual(fo.readline(size), "This is the first line\n") 1103 self.assertEqual(fo.readline(size), "And the second line is here\n") 1104 1105 def _test_read(self, size=-1, **kwargs): 1106 mock_sock = self.MockSocket(recv_funcs=[ 1107 lambda : "This is the first line\nAnd the sec", 1108 self._raise_eintr, 1109 lambda : "ond line is here\n", 1110 lambda : "", 1111 ]) 1112 fo = socket._fileobject(mock_sock, **kwargs) 1113 self.assertEqual(fo.read(size), "This is the first line\n" 1114 "And the second line is here\n") 1115 1116 def test_default(self): 1117 self._test_readline() 1118 self._test_readline(size=100) 1119 self._test_read() 1120 self._test_read(size=100) 1121 1122 def test_with_1k_buffer(self): 1123 self._test_readline(bufsize=1024) 1124 self._test_readline(size=100, bufsize=1024) 1125 self._test_read(bufsize=1024) 1126 self._test_read(size=100, bufsize=1024) 1127 1128 def _test_readline_no_buffer(self, size=-1): 1129 mock_sock = self.MockSocket(recv_funcs=[ 1130 lambda : "aa", 1131 lambda : "\n", 1132 lambda : "BB", 1133 self._raise_eintr, 1134 lambda : "bb", 1135 lambda : "", 1136 ]) 1137 fo = socket._fileobject(mock_sock, bufsize=0) 1138 self.assertEqual(fo.readline(size), "aa\n") 1139 self.assertEqual(fo.readline(size), "BBbb") 1140 1141 def test_no_buffer(self): 1142 self._test_readline_no_buffer() 1143 self._test_readline_no_buffer(size=4) 1144 self._test_read(bufsize=0) 1145 self._test_read(size=100, bufsize=0) 1146 836 1147 837 1148 class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase): … … 864 1175 bufsize = 1 # Default-buffered for reading; line-buffered for writing 865 1176 1177 class SocketMemo(object): 1178 """A wrapper to keep track of sent data, needed to examine write behaviour""" 1179 def __init__(self, sock): 1180 self._sock = sock 1181 self.sent = [] 1182 1183 def send(self, data, flags=0): 1184 n = self._sock.send(data, flags) 1185 self.sent.append(data[:n]) 1186 return n 1187 1188 def sendall(self, data, flags=0): 1189 self._sock.sendall(data, flags) 1190 self.sent.append(data) 1191 1192 def __getattr__(self, attr): 1193 return getattr(self._sock, attr) 1194 1195 def getsent(self): 1196 return [e.tobytes() if isinstance(e, memoryview) else e for e in self.sent] 1197 1198 def setUp(self): 1199 FileObjectClassTestCase.setUp(self) 1200 self.serv_file._sock = self.SocketMemo(self.serv_file._sock) 1201 1202 def testLinebufferedWrite(self): 1203 # Write two lines, in small chunks 1204 msg = MSG.strip() 1205 print >> self.serv_file, msg, 1206 print >> self.serv_file, msg 1207 1208 # second line: 1209 print >> self.serv_file, msg, 1210 print >> self.serv_file, msg, 1211 print >> self.serv_file, msg 1212 1213 # third line 1214 print >> self.serv_file, '' 1215 1216 self.serv_file.flush() 1217 1218 msg1 = "%s %s\n"%(msg, msg) 1219 msg2 = "%s %s %s\n"%(msg, msg, msg) 1220 msg3 = "\n" 1221 self.assertEqual(self.serv_file._sock.getsent(), [msg1, msg2, msg3]) 1222 1223 def _testLinebufferedWrite(self): 1224 msg = MSG.strip() 1225 msg1 = "%s %s\n"%(msg, msg) 1226 msg2 = "%s %s %s\n"%(msg, msg, msg) 1227 msg3 = "\n" 1228 l1 = self.cli_file.readline() 1229 self.assertEqual(l1, msg1) 1230 l2 = self.cli_file.readline() 1231 self.assertEqual(l2, msg2) 1232 l3 = self.cli_file.readline() 1233 self.assertEqual(l3, msg3) 1234 866 1235 867 1236 class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase): … … 883 1252 884 1253 class NetworkConnectionNoServer(unittest.TestCase): 885 def testWithoutServer(self): 1254 class MockSocket(socket.socket): 1255 def connect(self, *args): 1256 raise socket.timeout('timed out') 1257 1258 @contextlib.contextmanager 1259 def mocked_socket_module(self): 1260 """Return a socket which times out on connect""" 1261 old_socket = socket.socket 1262 socket.socket = self.MockSocket 1263 try: 1264 yield 1265 finally: 1266 socket.socket = old_socket 1267 1268 def test_connect(self): 886 1269 port = test_support.find_unused_port() 887 self.failUnlessRaises( 888 socket.error, 889 lambda: socket.create_connection((HOST, port)) 890 ) 891 1270 cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1271 self.addCleanup(cli.close) 1272 with self.assertRaises(socket.error) as cm: 1273 cli.connect((HOST, port)) 1274 self.assertEqual(cm.exception.errno, errno.ECONNREFUSED) 1275 1276 def test_create_connection(self): 1277 # Issue #9792: errors raised by create_connection() should have 1278 # a proper errno attribute. 1279 port = test_support.find_unused_port() 1280 with self.assertRaises(socket.error) as cm: 1281 socket.create_connection((HOST, port)) 1282 1283 # Issue #16257: create_connection() calls getaddrinfo() against 1284 # 'localhost'. This may result in an IPV6 addr being returned 1285 # as well as an IPV4 one: 1286 # >>> socket.getaddrinfo('localhost', port, 0, SOCK_STREAM) 1287 # >>> [(2, 2, 0, '', ('127.0.0.1', 41230)), 1288 # (26, 2, 0, '', ('::1', 41230, 0, 0))] 1289 # 1290 # create_connection() enumerates through all the addresses returned 1291 # and if it doesn't successfully bind to any of them, it propagates 1292 # the last exception it encountered. 1293 # 1294 # On Solaris, ENETUNREACH is returned in this circumstance instead 1295 # of ECONNREFUSED. So, if that errno exists, add it to our list of 1296 # expected errnos. 1297 expected_errnos = [ errno.ECONNREFUSED, ] 1298 if hasattr(errno, 'ENETUNREACH'): 1299 expected_errnos.append(errno.ENETUNREACH) 1300 1301 self.assertIn(cm.exception.errno, expected_errnos) 1302 1303 def test_create_connection_timeout(self): 1304 # Issue #9792: create_connection() should not recast timeout errors 1305 # as generic socket errors. 1306 with self.mocked_socket_module(): 1307 with self.assertRaises(socket.timeout): 1308 socket.create_connection((HOST, 1234)) 1309 1310 1311 @unittest.skipUnless(thread, 'Threading required for this test.') 892 1312 class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest): 893 1313 … … 897 1317 898 1318 def clientSetUp(self): 899 pass1319 self.source_port = test_support.find_unused_port() 900 1320 901 1321 def clientTearDown(self): … … 906 1326 def _justAccept(self): 907 1327 conn, addr = self.serv.accept() 1328 conn.close() 908 1329 909 1330 testFamily = _justAccept 910 1331 def _testFamily(self): 911 1332 self.cli = socket.create_connection((HOST, self.port), timeout=30) 1333 self.addCleanup(self.cli.close) 912 1334 self.assertEqual(self.cli.family, 2) 1335 1336 testSourceAddress = _justAccept 1337 def _testSourceAddress(self): 1338 self.cli = socket.create_connection((HOST, self.port), timeout=30, 1339 source_address=('', self.source_port)) 1340 self.addCleanup(self.cli.close) 1341 self.assertEqual(self.cli.getsockname()[1], self.source_port) 1342 # The port number being used is sufficient to show that the bind() 1343 # call happened. 913 1344 914 1345 testTimeoutDefault = _justAccept 915 1346 def _testTimeoutDefault(self): 916 1347 # passing no explicit timeout uses socket's global default 917 self.assert _(socket.getdefaulttimeout() is None)1348 self.assertTrue(socket.getdefaulttimeout() is None) 918 1349 socket.setdefaulttimeout(42) 919 1350 try: 920 1351 self.cli = socket.create_connection((HOST, self.port)) 1352 self.addCleanup(self.cli.close) 921 1353 finally: 922 1354 socket.setdefaulttimeout(None) 923 self.assertEqual s(self.cli.gettimeout(), 42)1355 self.assertEqual(self.cli.gettimeout(), 42) 924 1356 925 1357 testTimeoutNone = _justAccept 926 1358 def _testTimeoutNone(self): 927 1359 # None timeout means the same as sock.settimeout(None) 928 self.assert _(socket.getdefaulttimeout() is None)1360 self.assertTrue(socket.getdefaulttimeout() is None) 929 1361 socket.setdefaulttimeout(30) 930 1362 try: 931 1363 self.cli = socket.create_connection((HOST, self.port), timeout=None) 1364 self.addCleanup(self.cli.close) 932 1365 finally: 933 1366 socket.setdefaulttimeout(None) … … 942 1375 def _testTimeoutValueNonamed(self): 943 1376 self.cli = socket.create_connection((HOST, self.port), 30) 1377 self.addCleanup(self.cli.close) 944 1378 self.assertEqual(self.cli.gettimeout(), 30) 945 1379 1380 @unittest.skipUnless(thread, 'Threading required for this test.') 946 1381 class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest): 947 1382 … … 960 1395 def testInsideTimeout(self): 961 1396 conn, addr = self.serv.accept() 1397 self.addCleanup(conn.close) 962 1398 time.sleep(3) 963 1399 conn.send("done!") … … 971 1407 def _testOutsideTimeout(self): 972 1408 self.cli = sock = socket.create_connection((HOST, self.port), timeout=1) 973 self. failUnlessRaises(socket.timeout, lambda: sock.recv(5))1409 self.assertRaises(socket.timeout, lambda: sock.recv(5)) 974 1410 975 1411 … … 991 1427 f = socket._fileobject(s) 992 1428 f.close() 993 self.assert _(not s.closed)1429 self.assertTrue(not s.closed) 994 1430 995 1431 s = MockSocket() 996 1432 f = socket._fileobject(s, close=True) 997 1433 f.close() 998 self.assert _(s.closed)1434 self.assertTrue(s.closed) 999 1435 1000 1436 class TCPTimeoutTest(SocketTCPTest): … … 1004 1440 self.serv.settimeout(1.0) 1005 1441 self.serv.accept() 1006 self. failUnlessRaises(socket.timeout, raise_timeout,1442 self.assertRaises(socket.timeout, raise_timeout, 1007 1443 "Error generating a timeout exception (TCP)") 1008 1444 … … 1055 1491 signal.signal(signal.SIGALRM, old_alarm) 1056 1492 1057 class UDPTimeoutTest(Socket TCPTest):1493 class UDPTimeoutTest(SocketUDPTest): 1058 1494 1059 1495 def testUDPTimeout(self): … … 1061 1497 self.serv.settimeout(1.0) 1062 1498 self.serv.recv(1024) 1063 self. failUnlessRaises(socket.timeout, raise_timeout,1499 self.assertRaises(socket.timeout, raise_timeout, 1064 1500 "Error generating a timeout exception (UDP)") 1065 1501 … … 1081 1517 1082 1518 def testExceptionTree(self): 1083 self.assert _(issubclass(socket.error, Exception))1084 self.assert _(issubclass(socket.herror, socket.error))1085 self.assert _(issubclass(socket.gaierror, socket.error))1086 self.assert _(issubclass(socket.timeout, socket.error))1519 self.assertTrue(issubclass(socket.error, Exception)) 1520 self.assertTrue(issubclass(socket.herror, socket.error)) 1521 self.assertTrue(issubclass(socket.gaierror, socket.error)) 1522 self.assertTrue(issubclass(socket.timeout, socket.error)) 1087 1523 1088 1524 class TestLinuxAbstractNamespace(unittest.TestCase): … … 1113 1549 1114 1550 1551 @unittest.skipUnless(thread, 'Threading required for this test.') 1115 1552 class BufferIOTest(SocketConnectedTest): 1116 1553 """ … … 1120 1557 SocketConnectedTest.__init__(self, methodName=methodName) 1121 1558 1122 def testRecvInto (self):1559 def testRecvIntoArray(self): 1123 1560 buf = array.array('c', ' '*1024) 1124 1561 nbytes = self.cli_conn.recv_into(buf) … … 1127 1564 self.assertEqual(msg, MSG) 1128 1565 1129 def _testRecvInto(self): 1130 buf = buffer(MSG) 1566 def _testRecvIntoArray(self): 1567 with test_support.check_py3k_warnings(): 1568 buf = buffer(MSG) 1131 1569 self.serv_conn.send(buf) 1132 1570 1133 def testRecvFromInto(self): 1571 def testRecvIntoBytearray(self): 1572 buf = bytearray(1024) 1573 nbytes = self.cli_conn.recv_into(buf) 1574 self.assertEqual(nbytes, len(MSG)) 1575 msg = buf[:len(MSG)] 1576 self.assertEqual(msg, MSG) 1577 1578 _testRecvIntoBytearray = _testRecvIntoArray 1579 1580 def testRecvIntoMemoryview(self): 1581 buf = bytearray(1024) 1582 nbytes = self.cli_conn.recv_into(memoryview(buf)) 1583 self.assertEqual(nbytes, len(MSG)) 1584 msg = buf[:len(MSG)] 1585 self.assertEqual(msg, MSG) 1586 1587 _testRecvIntoMemoryview = _testRecvIntoArray 1588 1589 def testRecvFromIntoArray(self): 1134 1590 buf = array.array('c', ' '*1024) 1135 1591 nbytes, addr = self.cli_conn.recvfrom_into(buf) … … 1138 1594 self.assertEqual(msg, MSG) 1139 1595 1140 def _testRecvFromInto(self): 1141 buf = buffer(MSG) 1596 def _testRecvFromIntoArray(self): 1597 with test_support.check_py3k_warnings(): 1598 buf = buffer(MSG) 1142 1599 self.serv_conn.send(buf) 1600 1601 def testRecvFromIntoBytearray(self): 1602 buf = bytearray(1024) 1603 nbytes, addr = self.cli_conn.recvfrom_into(buf) 1604 self.assertEqual(nbytes, len(MSG)) 1605 msg = buf[:len(MSG)] 1606 self.assertEqual(msg, MSG) 1607 1608 _testRecvFromIntoBytearray = _testRecvFromIntoArray 1609 1610 def testRecvFromIntoMemoryview(self): 1611 buf = bytearray(1024) 1612 nbytes, addr = self.cli_conn.recvfrom_into(memoryview(buf)) 1613 self.assertEqual(nbytes, len(MSG)) 1614 msg = buf[:len(MSG)] 1615 self.assertEqual(msg, MSG) 1616 1617 _testRecvFromIntoMemoryview = _testRecvFromIntoArray 1143 1618 1144 1619 … … 1223 1698 def test_main(): 1224 1699 tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest, 1225 TestExceptions, BufferIOTest, BasicTCPTest2] 1226 if sys.platform != 'mac': 1227 tests.extend([ BasicUDPTest, UDPTimeoutTest ]) 1700 TestExceptions, BufferIOTest, BasicTCPTest2, BasicUDPTest, 1701 UDPTimeoutTest ] 1228 1702 1229 1703 tests.extend([ 1230 1704 NonBlockingTCPTests, 1231 1705 FileObjectClassTestCase, 1706 FileObjectInterruptedTestCase, 1232 1707 UnbufferedFileObjectClassTestCase, 1233 1708 LineBufferedFileObjectClassTestCase,
Note:
See TracChangeset
for help on using the changeset viewer.