Changeset 391 for python/trunk/Lib/test/test_ssl.py
- Timestamp:
- Mar 19, 2014, 11:31:01 PM (11 years ago)
- Location:
- python/trunk
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
python/trunk
-
Property svn:mergeinfo
set to
/python/vendor/Python-2.7.6 merged eligible /python/vendor/current merged eligible
-
Property svn:mergeinfo
set to
-
python/trunk/Lib/test/test_ssl.py
r2 r391 7 7 import socket 8 8 import select 9 import time 10 import gc 11 import os 9 12 import errno 10 import subprocess11 import time12 import os13 13 import pprint 14 14 import urllib, urlparse 15 import shutil16 15 import traceback 16 import weakref 17 import functools 18 import platform 17 19 18 20 from BaseHTTPServer import HTTPServer 19 21 from SimpleHTTPServer import SimpleHTTPRequestHandler 20 22 21 # Optionally test SSL support, if we have it in the tested platform 22 skip_expected = False 23 try: 24 import ssl 25 except ImportError: 26 skip_expected = True 23 ssl = test_support.import_module("ssl") 27 24 28 25 HOST = test_support.HOST 29 26 CERTFILE = None 30 27 SVN_PYTHON_ORG_ROOT_CERT = None 28 NULLBYTECERT = None 31 29 32 30 def handle_error(prefix): … … 35 33 sys.stdout.write(prefix + exc_format) 36 34 37 def testSimpleSSLwrap(self): 35 36 class BasicTests(unittest.TestCase): 37 38 def test_sslwrap_simple(self): 39 # A crude test for the legacy API 38 40 try: 39 41 ssl.sslwrap_simple(socket.socket(socket.AF_INET)) … … 51 53 raise 52 54 53 class BasicTests(unittest.TestCase): 54 55 def testSSLconnect(self): 56 if not test_support.is_resource_enabled('network'): 57 return 58 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 59 cert_reqs=ssl.CERT_NONE) 60 s.connect(("svn.python.org", 443)) 61 c = s.getpeercert() 62 if c: 63 raise test_support.TestFailed("Peer cert %s shouldn't be here!") 64 s.close() 65 66 # this should fail because we have no verification certs 67 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 68 cert_reqs=ssl.CERT_REQUIRED) 55 # Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2 56 def skip_if_broken_ubuntu_ssl(func): 57 if hasattr(ssl, 'PROTOCOL_SSLv2'): 58 # We need to access the lower-level wrapper in order to create an 59 # implicit SSL context without trying to connect or listen. 69 60 try: 70 s.connect(("svn.python.org", 443)) 71 except ssl.SSLError: 61 import _ssl 62 except ImportError: 63 # The returned function won't get executed, just ignore the error 72 64 pass 73 finally: 74 s.close() 75 76 def testCrucialConstants(self): 77 ssl.PROTOCOL_SSLv2 65 @functools.wraps(func) 66 def f(*args, **kwargs): 67 try: 68 s = socket.socket(socket.AF_INET) 69 _ssl.sslwrap(s._sock, 0, None, None, 70 ssl.CERT_NONE, ssl.PROTOCOL_SSLv2, None, None) 71 except ssl.SSLError as e: 72 if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and 73 platform.linux_distribution() == ('debian', 'squeeze/sid', '') 74 and 'Invalid SSL protocol variant specified' in str(e)): 75 raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour") 76 return func(*args, **kwargs) 77 return f 78 else: 79 return func 80 81 82 class BasicSocketTests(unittest.TestCase): 83 84 def test_constants(self): 85 #ssl.PROTOCOL_SSLv2 78 86 ssl.PROTOCOL_SSLv23 79 87 ssl.PROTOCOL_SSLv3 … … 83 91 ssl.CERT_REQUIRED 84 92 85 def test RAND(self):93 def test_random(self): 86 94 v = ssl.RAND_status() 87 95 if test_support.verbose: … … 89 97 % (v, (v and "sufficient randomness") or 90 98 "insufficient randomness")) 91 try: 92 ssl.RAND_egd(1) 93 except TypeError: 94 pass 95 else: 96 print "didn't raise TypeError" 99 self.assertRaises(TypeError, ssl.RAND_egd, 1) 100 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1) 97 101 ssl.RAND_add("this is a random string", 75.0) 98 102 99 def test ParseCert(self):103 def test_parse_cert(self): 100 104 # note that this uses an 'unofficial' function in _ssl.c, 101 105 # provided solely for this test, to exercise the certificate … … 104 108 if test_support.verbose: 105 109 sys.stdout.write("\n" + pprint.pformat(p) + "\n") 106 107 def testDERtoPEM(self): 108 109 pem = open(SVN_PYTHON_ORG_ROOT_CERT, 'r').read() 110 self.assertEqual(p['subject'], 111 ((('countryName', 'XY'),), 112 (('localityName', 'Castle Anthrax'),), 113 (('organizationName', 'Python Software Foundation'),), 114 (('commonName', 'localhost'),)) 115 ) 116 self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),)) 117 # Issue #13034: the subjectAltName in some certificates 118 # (notably projects.developer.nokia.com:443) wasn't parsed 119 p = ssl._ssl._test_decode_cert(NOKIACERT) 120 if test_support.verbose: 121 sys.stdout.write("\n" + pprint.pformat(p) + "\n") 122 self.assertEqual(p['subjectAltName'], 123 (('DNS', 'projects.developer.nokia.com'), 124 ('DNS', 'projects.forum.nokia.com')) 125 ) 126 127 def test_parse_cert_CVE_2013_4238(self): 128 p = ssl._ssl._test_decode_cert(NULLBYTECERT) 129 if test_support.verbose: 130 sys.stdout.write("\n" + pprint.pformat(p) + "\n") 131 subject = ((('countryName', 'US'),), 132 (('stateOrProvinceName', 'Oregon'),), 133 (('localityName', 'Beaverton'),), 134 (('organizationName', 'Python Software Foundation'),), 135 (('organizationalUnitName', 'Python Core Development'),), 136 (('commonName', 'null.python.org\x00example.org'),), 137 (('emailAddress', 'python-dev@python.org'),)) 138 self.assertEqual(p['subject'], subject) 139 self.assertEqual(p['issuer'], subject) 140 if ssl.OPENSSL_VERSION_INFO >= (0, 9, 8): 141 san = (('DNS', 'altnull.python.org\x00example.com'), 142 ('email', 'null@python.org\x00user@example.org'), 143 ('URI', 'http://null.python.org\x00http://example.org'), 144 ('IP Address', '192.0.2.1'), 145 ('IP Address', '2001:DB8:0:0:0:0:0:1\n')) 146 else: 147 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName 148 san = (('DNS', 'altnull.python.org\x00example.com'), 149 ('email', 'null@python.org\x00user@example.org'), 150 ('URI', 'http://null.python.org\x00http://example.org'), 151 ('IP Address', '192.0.2.1'), 152 ('IP Address', '<invalid>')) 153 154 self.assertEqual(p['subjectAltName'], san) 155 156 def test_DER_to_PEM(self): 157 with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f: 158 pem = f.read() 110 159 d1 = ssl.PEM_cert_to_DER_cert(pem) 111 160 p2 = ssl.DER_cert_to_PEM_cert(d1) 112 161 d2 = ssl.PEM_cert_to_DER_cert(p2) 113 if (d1 != d2): 114 raise test_support.TestFailed("PEM-to-DER or DER-to-PEM translation failed") 162 self.assertEqual(d1, d2) 163 if not p2.startswith(ssl.PEM_HEADER + '\n'): 164 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2) 165 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'): 166 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2) 167 168 def test_openssl_version(self): 169 n = ssl.OPENSSL_VERSION_NUMBER 170 t = ssl.OPENSSL_VERSION_INFO 171 s = ssl.OPENSSL_VERSION 172 self.assertIsInstance(n, (int, long)) 173 self.assertIsInstance(t, tuple) 174 self.assertIsInstance(s, str) 175 # Some sanity checks follow 176 # >= 0.9 177 self.assertGreaterEqual(n, 0x900000) 178 # < 2.0 179 self.assertLess(n, 0x20000000) 180 major, minor, fix, patch, status = t 181 self.assertGreaterEqual(major, 0) 182 self.assertLess(major, 2) 183 self.assertGreaterEqual(minor, 0) 184 self.assertLess(minor, 256) 185 self.assertGreaterEqual(fix, 0) 186 self.assertLess(fix, 256) 187 self.assertGreaterEqual(patch, 0) 188 self.assertLessEqual(patch, 26) 189 self.assertGreaterEqual(status, 0) 190 self.assertLessEqual(status, 15) 191 # Version string as returned by OpenSSL, the format might change 192 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)), 193 (s, t)) 194 195 def test_ciphers(self): 196 if not test_support.is_resource_enabled('network'): 197 return 198 remote = ("svn.python.org", 443) 199 with test_support.transient_internet(remote[0]): 200 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 201 cert_reqs=ssl.CERT_NONE, ciphers="ALL") 202 s.connect(remote) 203 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 204 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") 205 s.connect(remote) 206 # Error checking occurs when connecting, because the SSL context 207 # isn't created before. 208 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 209 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx") 210 with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"): 211 s.connect(remote) 212 213 @test_support.cpython_only 214 def test_refcycle(self): 215 # Issue #7943: an SSL object doesn't create reference cycles with 216 # itself. 217 s = socket.socket(socket.AF_INET) 218 ss = ssl.wrap_socket(s) 219 wr = weakref.ref(ss) 220 del ss 221 self.assertEqual(wr(), None) 222 223 def test_wrapped_unconnected(self): 224 # The _delegate_methods in socket.py are correctly delegated to by an 225 # unconnected SSLSocket, so they will raise a socket.error rather than 226 # something unexpected like TypeError. 227 s = socket.socket(socket.AF_INET) 228 ss = ssl.wrap_socket(s) 229 self.assertRaises(socket.error, ss.recv, 1) 230 self.assertRaises(socket.error, ss.recv_into, bytearray(b'x')) 231 self.assertRaises(socket.error, ss.recvfrom, 1) 232 self.assertRaises(socket.error, ss.recvfrom_into, bytearray(b'x'), 1) 233 self.assertRaises(socket.error, ss.send, b'x') 234 self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0)) 235 115 236 116 237 class NetworkedTests(unittest.TestCase): 117 238 118 def testConnect(self): 119 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 120 cert_reqs=ssl.CERT_NONE) 121 s.connect(("svn.python.org", 443)) 122 c = s.getpeercert() 123 if c: 124 raise test_support.TestFailed("Peer cert %s shouldn't be here!") 125 s.close() 126 127 # this should fail because we have no verification certs 128 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 129 cert_reqs=ssl.CERT_REQUIRED) 130 try: 239 def test_connect(self): 240 with test_support.transient_internet("svn.python.org"): 241 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 242 cert_reqs=ssl.CERT_NONE) 131 243 s.connect(("svn.python.org", 443)) 132 except ssl.SSLError:133 pass134 finally:244 c = s.getpeercert() 245 if c: 246 self.fail("Peer cert %s shouldn't be here!") 135 247 s.close() 136 248 137 # this should succeed because we specify the root cert 138 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 139 cert_reqs=ssl.CERT_REQUIRED, 140 ca_certs=SVN_PYTHON_ORG_ROOT_CERT) 141 try: 249 # this should fail because we have no verification certs 250 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 251 cert_reqs=ssl.CERT_REQUIRED) 252 try: 253 s.connect(("svn.python.org", 443)) 254 except ssl.SSLError: 255 pass 256 finally: 257 s.close() 258 259 # this should succeed because we specify the root cert 260 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 261 cert_reqs=ssl.CERT_REQUIRED, 262 ca_certs=SVN_PYTHON_ORG_ROOT_CERT) 263 try: 264 s.connect(("svn.python.org", 443)) 265 finally: 266 s.close() 267 268 def test_connect_ex(self): 269 # Issue #11326: check connect_ex() implementation 270 with test_support.transient_internet("svn.python.org"): 271 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 272 cert_reqs=ssl.CERT_REQUIRED, 273 ca_certs=SVN_PYTHON_ORG_ROOT_CERT) 274 try: 275 self.assertEqual(0, s.connect_ex(("svn.python.org", 443))) 276 self.assertTrue(s.getpeercert()) 277 finally: 278 s.close() 279 280 def test_non_blocking_connect_ex(self): 281 # Issue #11326: non-blocking connect_ex() should allow handshake 282 # to proceed after the socket gets ready. 283 with test_support.transient_internet("svn.python.org"): 284 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 285 cert_reqs=ssl.CERT_REQUIRED, 286 ca_certs=SVN_PYTHON_ORG_ROOT_CERT, 287 do_handshake_on_connect=False) 288 try: 289 s.setblocking(False) 290 rc = s.connect_ex(('svn.python.org', 443)) 291 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere 292 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK)) 293 # Wait for connect to finish 294 select.select([], [s], [], 5.0) 295 # Non-blocking handshake 296 while True: 297 try: 298 s.do_handshake() 299 break 300 except ssl.SSLError as err: 301 if err.args[0] == ssl.SSL_ERROR_WANT_READ: 302 select.select([s], [], [], 5.0) 303 elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE: 304 select.select([], [s], [], 5.0) 305 else: 306 raise 307 # SSL established 308 self.assertTrue(s.getpeercert()) 309 finally: 310 s.close() 311 312 def test_timeout_connect_ex(self): 313 # Issue #12065: on a timeout, connect_ex() should return the original 314 # errno (mimicking the behaviour of non-SSL sockets). 315 with test_support.transient_internet("svn.python.org"): 316 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 317 cert_reqs=ssl.CERT_REQUIRED, 318 ca_certs=SVN_PYTHON_ORG_ROOT_CERT, 319 do_handshake_on_connect=False) 320 try: 321 s.settimeout(0.0000001) 322 rc = s.connect_ex(('svn.python.org', 443)) 323 if rc == 0: 324 self.skipTest("svn.python.org responded too quickly") 325 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK)) 326 finally: 327 s.close() 328 329 def test_connect_ex_error(self): 330 with test_support.transient_internet("svn.python.org"): 331 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 332 cert_reqs=ssl.CERT_REQUIRED, 333 ca_certs=SVN_PYTHON_ORG_ROOT_CERT) 334 try: 335 self.assertEqual(errno.ECONNREFUSED, 336 s.connect_ex(("svn.python.org", 444))) 337 finally: 338 s.close() 339 340 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows") 341 def test_makefile_close(self): 342 # Issue #5238: creating a file-like object with makefile() shouldn't 343 # delay closing the underlying "real socket" (here tested with its 344 # file descriptor, hence skipping the test under Windows). 345 with test_support.transient_internet("svn.python.org"): 346 ss = ssl.wrap_socket(socket.socket(socket.AF_INET)) 347 ss.connect(("svn.python.org", 443)) 348 fd = ss.fileno() 349 f = ss.makefile() 350 f.close() 351 # The fd is still open 352 os.read(fd, 0) 353 # Closing the SSL socket should close the fd too 354 ss.close() 355 gc.collect() 356 with self.assertRaises(OSError) as e: 357 os.read(fd, 0) 358 self.assertEqual(e.exception.errno, errno.EBADF) 359 360 def test_non_blocking_handshake(self): 361 with test_support.transient_internet("svn.python.org"): 362 s = socket.socket(socket.AF_INET) 142 363 s.connect(("svn.python.org", 443)) 143 except ssl.SSLError, x: 144 raise test_support.TestFailed("Unexpected exception %s" % x) 145 finally: 364 s.setblocking(False) 365 s = ssl.wrap_socket(s, 366 cert_reqs=ssl.CERT_NONE, 367 do_handshake_on_connect=False) 368 count = 0 369 while True: 370 try: 371 count += 1 372 s.do_handshake() 373 break 374 except ssl.SSLError, err: 375 if err.args[0] == ssl.SSL_ERROR_WANT_READ: 376 select.select([s], [], []) 377 elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE: 378 select.select([], [s], []) 379 else: 380 raise 146 381 s.close() 147 148 149 def testNonBlockingHandshake(self): 150 s = socket.socket(socket.AF_INET) 151 s.connect(("svn.python.org", 443)) 152 s.setblocking(False) 153 s = ssl.wrap_socket(s, 154 cert_reqs=ssl.CERT_NONE, 155 do_handshake_on_connect=False) 156 count = 0 157 while True: 382 if test_support.verbose: 383 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count) 384 385 def test_get_server_certificate(self): 386 with test_support.transient_internet("svn.python.org"): 387 pem = ssl.get_server_certificate(("svn.python.org", 443)) 388 if not pem: 389 self.fail("No server certificate on svn.python.org:443!") 390 158 391 try: 159 count += 1 160 s.do_handshake() 161 break 162 except ssl.SSLError, err: 163 if err.args[0] == ssl.SSL_ERROR_WANT_READ: 164 select.select([s], [], []) 165 elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE: 166 select.select([], [s], []) 167 else: 168 raise 169 s.close() 170 if test_support.verbose: 171 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count) 172 173 def testFetchServerCert(self): 174 175 pem = ssl.get_server_certificate(("svn.python.org", 443)) 176 if not pem: 177 raise test_support.TestFailed("No server certificate on svn.python.org:443!") 178 179 try: 180 pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE) 181 except ssl.SSLError: 182 #should fail 183 pass 184 else: 185 raise test_support.TestFailed("Got server certificate %s for svn.python.org!" % pem) 186 187 pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT) 188 if not pem: 189 raise test_support.TestFailed("No server certificate on svn.python.org:443!") 190 if test_support.verbose: 191 sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem) 392 pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE) 393 except ssl.SSLError: 394 #should fail 395 pass 396 else: 397 self.fail("Got server certificate %s for svn.python.org!" % pem) 398 399 pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT) 400 if not pem: 401 self.fail("No server certificate on svn.python.org:443!") 402 if test_support.verbose: 403 sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem) 404 405 def test_algorithms(self): 406 # Issue #8484: all algorithms should be available when verifying a 407 # certificate. 408 # SHA256 was added in OpenSSL 0.9.8 409 if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15): 410 self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION) 411 self.skipTest("remote host needs SNI, only available on Python 3.2+") 412 # NOTE: https://sha2.hboeck.de is another possible test host 413 remote = ("sha256.tbs-internet.com", 443) 414 sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem") 415 with test_support.transient_internet("sha256.tbs-internet.com"): 416 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 417 cert_reqs=ssl.CERT_REQUIRED, 418 ca_certs=sha256_cert,) 419 try: 420 s.connect(remote) 421 if test_support.verbose: 422 sys.stdout.write("\nCipher with %r is %r\n" % 423 (remote, s.cipher())) 424 sys.stdout.write("Certificate is:\n%s\n" % 425 pprint.pformat(s.getpeercert())) 426 finally: 427 s.close() 192 428 193 429 … … 197 433 _have_threads = False 198 434 else: 199 200 435 _have_threads = True 201 436 … … 229 464 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n") 230 465 231 def wrap_conn 466 def wrap_conn(self): 232 467 try: 233 468 self.sslconn = ssl.wrap_socket(self.sock, server_side=True, … … 235 470 ssl_version=self.server.protocol, 236 471 ca_certs=self.server.cacerts, 237 cert_reqs=self.server.certreqs) 238 except: 472 cert_reqs=self.server.certreqs, 473 ciphers=self.server.ciphers) 474 except ssl.SSLError as e: 475 # XXX Various errors can have happened here, for example 476 # a mismatching protocol version, an invalid certificate, 477 # or a low-level bug. This should be made more discriminating. 478 self.server.conn_errors.append(e) 239 479 if self.server.chatty: 240 480 handle_error("\n server: bad connection attempt from " + 241 481 str(self.sock.getpeername()) + ":\n") 242 482 self.close() 243 if not self.server.expect_bad_connects: 244 # here, we want to stop the server, because this shouldn't 245 # happen in the context of our test case 246 self.running = False 247 # normally, we'd just stop here, but for the test 248 # harness, we want to stop the server 249 self.server.stop() 483 self.running = False 484 self.server.stop() 250 485 return False 251 252 486 else: 253 487 return True … … 271 505 self.sock._sock.close() 272 506 273 def run 507 def run(self): 274 508 self.running = True 275 509 if not self.server.starttls_server: … … 320 554 # harness, we want to stop the server 321 555 self.server.stop() 322 except:323 handle_error('')324 556 325 557 def __init__(self, certificate, ssl_version=None, 326 certreqs=None, cacerts=None, expect_bad_connects=False,558 certreqs=None, cacerts=None, 327 559 chatty=True, connectionchatty=False, starttls_server=False, 328 wrap_accepting_socket=False ):560 wrap_accepting_socket=False, ciphers=None): 329 561 330 562 if ssl_version is None: … … 336 568 self.certreqs = certreqs 337 569 self.cacerts = cacerts 338 self. expect_bad_connects = expect_bad_connects570 self.ciphers = ciphers 339 571 self.chatty = chatty 340 572 self.connectionchatty = connectionchatty … … 347 579 cert_reqs = self.certreqs, 348 580 ca_certs = self.cacerts, 349 ssl_version = self.protocol) 581 ssl_version = self.protocol, 582 ciphers = self.ciphers) 350 583 if test_support.verbose and self.chatty: 351 584 sys.stdout.write(' server: wrapped server socket as %s\n' % str(self.sock)) 352 585 self.port = test_support.bind_port(self.sock) 353 586 self.active = False 587 self.conn_errors = [] 354 588 threading.Thread.__init__(self) 355 589 self.daemon = True 356 590 357 def start (self, flag=None): 591 def __enter__(self): 592 self.start(threading.Event()) 593 self.flag.wait() 594 return self 595 596 def __exit__(self, *args): 597 self.stop() 598 self.join() 599 600 def start(self, flag=None): 358 601 self.flag = flag 359 602 threading.Thread.start(self) 360 603 361 def run 362 self.sock.settimeout(0. 5)604 def run(self): 605 self.sock.settimeout(0.05) 363 606 self.sock.listen(5) 364 607 self.active = True … … 374 617 handler = self.ConnectionHandler(self, newconn) 375 618 handler.start() 619 handler.join() 376 620 except socket.timeout: 377 621 pass 378 622 except KeyboardInterrupt: 379 623 self.stop() 380 except:381 if self.chatty:382 handle_error("Test server failure:\n")383 624 self.sock.close() 384 625 385 def stop 626 def stop(self): 386 627 self.active = False 387 628 388 629 class AsyncoreEchoServer(threading.Thread): 389 630 390 class EchoServer 391 392 class ConnectionHandler 631 class EchoServer(asyncore.dispatcher): 632 633 class ConnectionHandler(asyncore.dispatcher_with_send): 393 634 394 635 def __init__(self, conn, certfile): … … 396 637 self.socket = ssl.wrap_socket(conn, server_side=True, 397 638 certfile=certfile, 398 do_handshake_on_connect=True) 639 do_handshake_on_connect=False) 640 self._ssl_accepting = True 399 641 400 642 def readable(self): … … 404 646 return True 405 647 648 def _do_ssl_handshake(self): 649 try: 650 self.socket.do_handshake() 651 except ssl.SSLError, err: 652 if err.args[0] in (ssl.SSL_ERROR_WANT_READ, 653 ssl.SSL_ERROR_WANT_WRITE): 654 return 655 elif err.args[0] == ssl.SSL_ERROR_EOF: 656 return self.handle_close() 657 raise 658 except socket.error, err: 659 if err.args[0] == errno.ECONNABORTED: 660 return self.handle_close() 661 else: 662 self._ssl_accepting = False 663 406 664 def handle_read(self): 407 data = self.recv(1024) 408 self.send(data.lower()) 665 if self._ssl_accepting: 666 self._do_ssl_handshake() 667 else: 668 data = self.recv(1024) 669 if data and data.strip() != 'over': 670 self.send(data.lower()) 409 671 410 672 def handle_close(self): … … 443 705 return "<%s %s>" % (self.__class__.__name__, self.server) 444 706 445 def start (self, flag=None): 707 def __enter__(self): 708 self.start(threading.Event()) 709 self.flag.wait() 710 return self 711 712 def __exit__(self, *args): 713 if test_support.verbose: 714 sys.stdout.write(" cleanup: stopping server.\n") 715 self.stop() 716 if test_support.verbose: 717 sys.stdout.write(" cleanup: joining server thread.\n") 718 self.join() 719 if test_support.verbose: 720 sys.stdout.write(" cleanup: successfully joined.\n") 721 722 def start(self, flag=None): 446 723 self.flag = flag 447 724 threading.Thread.start(self) 448 725 449 def run 726 def run(self): 450 727 self.active = True 451 728 if self.flag: 452 729 self.flag.set() 453 730 while self.active: 454 try: 455 asyncore.loop(1) 456 except: 457 pass 458 459 def stop (self): 731 asyncore.loop(0.05) 732 733 def stop(self): 460 734 self.active = False 461 735 self.server.close() … … 466 740 467 741 def __init__(self, server_address, RequestHandlerClass, certfile): 468 469 742 HTTPServer.__init__(self, server_address, RequestHandlerClass) 470 743 # we assume the certfile contains both private key and certificate 471 744 self.certfile = certfile 472 self.active = False473 self.active_lock = threading.Lock()474 745 self.allow_reuse_address = True 475 746 … … 480 751 self.server_port)) 481 752 482 def get_request 753 def get_request(self): 483 754 # override this to wrap socket with SSL 484 755 sock, addr = self.socket.accept() … … 487 758 return sslconn, addr 488 759 489 # The methods overridden below this are mainly so that we490 # can run it in a thread and be able to stop it from another491 # You probably wouldn't need them in other uses.492 493 def server_activate(self):494 # We want to run this in a thread for testing purposes,495 # so we override this to set timeout, so that we get496 # a chance to stop the server497 self.socket.settimeout(0.5)498 HTTPServer.server_activate(self)499 500 def serve_forever(self):501 # We want this to run in a thread, so we use a slightly502 # modified version of "forever".503 self.active = True504 while 1:505 try:506 # We need to lock while handling the request.507 # Another thread can close the socket after self.active508 # has been checked and before the request is handled.509 # This causes an exception when using the closed socket.510 with self.active_lock:511 if not self.active:512 break513 self.handle_request()514 except socket.timeout:515 pass516 except KeyboardInterrupt:517 self.server_close()518 return519 except:520 sys.stdout.write(''.join(traceback.format_exception(*sys.exc_info())))521 break522 time.sleep(0.1)523 524 def server_close(self):525 # Again, we want this to run in a thread, so we need to override526 # close to clear the "active" flag, so that serve_forever() will527 # terminate.528 with self.active_lock:529 HTTPServer.server_close(self)530 self.active = False531 532 760 class RootedHTTPRequestHandler(SimpleHTTPRequestHandler): 533 534 761 # need to override translate_path to get a known root, 535 762 # instead of using os.curdir, since the test could be … … 576 803 def __init__(self, certfile): 577 804 self.flag = None 578 self.active = False579 805 self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0] 580 self.port = test_support.find_unused_port()581 806 self.server = self.HTTPSServer( 582 (HOST, self.port), self.RootedHTTPRequestHandler, certfile) 807 (HOST, 0), self.RootedHTTPRequestHandler, certfile) 808 self.port = self.server.server_port 583 809 threading.Thread.__init__(self) 584 810 self.daemon = True … … 587 813 return "<%s %s>" % (self.__class__.__name__, self.server) 588 814 589 def start 815 def start(self, flag=None): 590 816 self.flag = flag 591 817 threading.Thread.start(self) 592 818 593 def run (self): 594 self.active = True 819 def run(self): 595 820 if self.flag: 596 821 self.flag.set() 597 self.server.serve_forever() 598 self.active = False 599 600 def stop (self): 601 self.active = False 602 self.server.server_close() 603 604 605 def badCertTest (certfile): 822 self.server.serve_forever(0.05) 823 824 def stop(self): 825 self.server.shutdown() 826 827 828 def bad_cert_test(certfile): 829 """ 830 Launch a server with CERT_REQUIRED, and check that trying to 831 connect to it with the given client certificate fails. 832 """ 606 833 server = ThreadedEchoServer(CERTFILE, 607 834 certreqs=ssl.CERT_REQUIRED, 608 835 cacerts=CERTFILE, chatty=False) 609 flag = threading.Event() 610 server.start(flag) 611 # wait for it to start 612 flag.wait() 613 # try to connect 614 try: 836 with server: 615 837 try: 616 838 s = ssl.wrap_socket(socket.socket(), … … 625 847 sys.stdout.write("\nsocket.error is %s\n" % x[1]) 626 848 else: 627 raise test_support.TestFailed( 628 "Use of invalid cert should have failed!") 629 finally: 630 server.stop() 631 server.join() 632 633 def serverParamsTest (certfile, protocol, certreqs, cacertsfile, 634 client_certfile, client_protocol=None, indata="FOO\n", 635 chatty=True, connectionchatty=False, 636 wrap_accepting_socket=False): 637 849 raise AssertionError("Use of invalid cert should have failed!") 850 851 def server_params_test(certfile, protocol, certreqs, cacertsfile, 852 client_certfile, client_protocol=None, indata="FOO\n", 853 ciphers=None, chatty=True, connectionchatty=False, 854 wrap_accepting_socket=False): 855 """ 856 Launch a server, connect a client to it and try various reads 857 and writes. 858 """ 638 859 server = ThreadedEchoServer(certfile, 639 860 certreqs=certreqs, 640 861 ssl_version=protocol, 641 862 cacerts=cacertsfile, 863 ciphers=ciphers, 642 864 chatty=chatty, 643 865 connectionchatty=connectionchatty, 644 866 wrap_accepting_socket=wrap_accepting_socket) 645 flag = threading.Event() 646 server.start(flag) 647 # wait for it to start 648 flag.wait() 649 # try to connect 650 if client_protocol is None: 651 client_protocol = protocol 652 try: 653 try: 654 s = ssl.wrap_socket(socket.socket(), 655 certfile=client_certfile, 656 ca_certs=cacertsfile, 657 cert_reqs=certreqs, 658 ssl_version=client_protocol) 659 s.connect((HOST, server.port)) 660 except ssl.SSLError, x: 661 raise test_support.TestFailed("Unexpected SSL error: " + str(x)) 662 except Exception, x: 663 raise test_support.TestFailed("Unexpected exception: " + str(x)) 664 else: 867 with server: 868 # try to connect 869 if client_protocol is None: 870 client_protocol = protocol 871 s = ssl.wrap_socket(socket.socket(), 872 certfile=client_certfile, 873 ca_certs=cacertsfile, 874 ciphers=ciphers, 875 cert_reqs=certreqs, 876 ssl_version=client_protocol) 877 s.connect((HOST, server.port)) 878 for arg in [indata, bytearray(indata), memoryview(indata)]: 665 879 if connectionchatty: 666 880 if test_support.verbose: 667 881 sys.stdout.write( 668 " client: sending %s...\n" % (repr( indata)))669 s.write( indata)882 " client: sending %s...\n" % (repr(arg))) 883 s.write(arg) 670 884 outdata = s.read() 671 885 if connectionchatty: … … 673 887 sys.stdout.write(" client: read %s\n" % repr(outdata)) 674 888 if outdata != indata.lower(): 675 raise test_support.TestFailed(889 raise AssertionError( 676 890 "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n" 677 891 % (outdata[:min(len(outdata),20)], len(outdata), 678 892 indata[:min(len(indata),20)].lower(), len(indata))) 679 s.write("over\n") 680 if connectionchatty: 681 if test_support.verbose: 682 sys.stdout.write(" client: closing connection.\n") 683 s.close() 684 finally: 685 server.stop() 686 server.join() 687 688 def tryProtocolCombo (server_protocol, 689 client_protocol, 690 expectedToWork, 691 certsreqs=None): 692 893 s.write("over\n") 894 if connectionchatty: 895 if test_support.verbose: 896 sys.stdout.write(" client: closing connection.\n") 897 s.close() 898 899 def try_protocol_combo(server_protocol, 900 client_protocol, 901 expect_success, 902 certsreqs=None): 693 903 if certsreqs is None: 694 904 certsreqs = ssl.CERT_NONE 695 696 if certsreqs == ssl.CERT_NONE: 697 certtype = "CERT_NONE" 698 elif certsreqs == ssl.CERT_OPTIONAL: 699 certtype = "CERT_OPTIONAL" 700 elif certsreqs == ssl.CERT_REQUIRED: 701 certtype = "CERT_REQUIRED" 905 certtype = { 906 ssl.CERT_NONE: "CERT_NONE", 907 ssl.CERT_OPTIONAL: "CERT_OPTIONAL", 908 ssl.CERT_REQUIRED: "CERT_REQUIRED", 909 }[certsreqs] 702 910 if test_support.verbose: 703 formatstr = (expect edToWorkand " %s->%s %s\n") or " {%s->%s} %s\n"911 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n" 704 912 sys.stdout.write(formatstr % 705 913 (ssl.get_protocol_name(client_protocol), … … 707 915 certtype)) 708 916 try: 709 serverParamsTest(CERTFILE, server_protocol, certsreqs, 710 CERTFILE, CERTFILE, client_protocol, chatty=False) 711 except test_support.TestFailed: 712 if expectedToWork: 917 # NOTE: we must enable "ALL" ciphers, otherwise an SSLv23 client 918 # will send an SSLv3 hello (rather than SSLv2) starting from 919 # OpenSSL 1.0.0 (see issue #8322). 920 server_params_test(CERTFILE, server_protocol, certsreqs, 921 CERTFILE, CERTFILE, client_protocol, 922 ciphers="ALL", chatty=False) 923 # Protocol mismatch can result in either an SSLError, or a 924 # "Connection reset by peer" error. 925 except ssl.SSLError: 926 if expect_success: 927 raise 928 except socket.error as e: 929 if expect_success or e.errno != errno.ECONNRESET: 713 930 raise 714 931 else: 715 if not expect edToWork:716 raise test_support.TestFailed(932 if not expect_success: 933 raise AssertionError( 717 934 "Client protocol %s succeeded with server protocol %s!" 718 935 % (ssl.get_protocol_name(client_protocol), … … 722 939 class ThreadedTests(unittest.TestCase): 723 940 724 def testRudeShutdown(self): 725 941 def test_rude_shutdown(self): 942 """A brutal shutdown of an SSL server should raise an IOError 943 in the client when attempting handshake. 944 """ 726 945 listener_ready = threading.Event() 727 946 listener_gone = threading.Event() 728 port = test_support.find_unused_port() 729 730 # `listener` runs in a thread. It opens a socket listening on 731 # PORT, and sits in an accept() until the main thread connects. 732 # Then it rudely closes the socket, and sets Event `listener_gone` 733 # to let the main thread know the socket is gone. 947 948 s = socket.socket() 949 port = test_support.bind_port(s, HOST) 950 951 # `listener` runs in a thread. It sits in an accept() until 952 # the main thread connects. Then it rudely closes the socket, 953 # and sets Event `listener_gone` to let the main thread know 954 # the socket is gone. 734 955 def listener(): 735 s = socket.socket()736 s.bind((HOST, port))737 956 s.listen(5) 738 957 listener_ready.set() 739 958 s.accept() 740 s = None # reclaim the socket object, which also closes it959 s.close() 741 960 listener_gone.set() 742 961 743 962 def connector(): 744 963 listener_ready.wait() 745 s= socket.socket()746 s.connect((HOST, port))964 c = socket.socket() 965 c.connect((HOST, port)) 747 966 listener_gone.wait() 748 967 try: 749 ssl_sock = ssl.wrap_socket( s)968 ssl_sock = ssl.wrap_socket(c) 750 969 except IOError: 751 970 pass 752 971 else: 753 raise test_support.TestFailed( 754 'connecting to closed SSL socket should have failed') 972 self.fail('connecting to closed SSL socket should have failed') 755 973 756 974 t = threading.Thread(target=listener) 757 975 t.start() 758 connector() 759 t.join() 760 761 def testEcho (self): 762 976 try: 977 connector() 978 finally: 979 t.join() 980 981 @skip_if_broken_ubuntu_ssl 982 def test_echo(self): 983 """Basic test of an SSL client connecting to a server""" 763 984 if test_support.verbose: 764 985 sys.stdout.write("\n") 765 serverParamsTest(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE, 766 CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1, 767 chatty=True, connectionchatty=True) 768 769 def testReadCert(self): 770 986 server_params_test(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE, 987 CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1, 988 chatty=True, connectionchatty=True) 989 990 def test_getpeercert(self): 771 991 if test_support.verbose: 772 992 sys.stdout.write("\n") … … 777 997 cacerts=CERTFILE, 778 998 chatty=False) 779 flag = threading.Event() 780 server.start(flag) 781 # wait for it to start 782 flag.wait() 783 # try to connect 784 try: 785 try: 786 s = ssl.wrap_socket(socket.socket(), 787 certfile=CERTFILE, 788 ca_certs=CERTFILE, 789 cert_reqs=ssl.CERT_REQUIRED, 790 ssl_version=ssl.PROTOCOL_SSLv23) 791 s.connect((HOST, server.port)) 792 except ssl.SSLError, x: 793 raise test_support.TestFailed( 794 "Unexpected SSL error: " + str(x)) 795 except Exception, x: 796 raise test_support.TestFailed( 797 "Unexpected exception: " + str(x)) 798 else: 799 if not s: 800 raise test_support.TestFailed( 801 "Can't SSL-handshake with test server") 802 cert = s.getpeercert() 803 if not cert: 804 raise test_support.TestFailed( 805 "Can't get peer certificate.") 806 cipher = s.cipher() 807 if test_support.verbose: 808 sys.stdout.write(pprint.pformat(cert) + '\n') 809 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n') 810 if not cert.has_key('subject'): 811 raise test_support.TestFailed( 812 "No subject field in certificate: %s." % 813 pprint.pformat(cert)) 814 if ((('organizationName', 'Python Software Foundation'),) 815 not in cert['subject']): 816 raise test_support.TestFailed( 817 "Missing or invalid 'organizationName' field in certificate subject; " 818 "should be 'Python Software Foundation'.") 819 s.close() 820 finally: 821 server.stop() 822 server.join() 823 824 def testNULLcert(self): 825 badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir, 826 "nullcert.pem")) 827 def testMalformedCert(self): 828 badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir, 829 "badcert.pem")) 830 def testWrongCert(self): 831 badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir, 832 "wrongcert.pem")) 833 def testMalformedKey(self): 834 badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir, 835 "badkey.pem")) 836 837 def testProtocolSSL2(self): 999 with server: 1000 s = ssl.wrap_socket(socket.socket(), 1001 certfile=CERTFILE, 1002 ca_certs=CERTFILE, 1003 cert_reqs=ssl.CERT_REQUIRED, 1004 ssl_version=ssl.PROTOCOL_SSLv23) 1005 s.connect((HOST, server.port)) 1006 cert = s.getpeercert() 1007 self.assertTrue(cert, "Can't get peer certificate.") 1008 cipher = s.cipher() 1009 if test_support.verbose: 1010 sys.stdout.write(pprint.pformat(cert) + '\n') 1011 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n') 1012 if 'subject' not in cert: 1013 self.fail("No subject field in certificate: %s." % 1014 pprint.pformat(cert)) 1015 if ((('organizationName', 'Python Software Foundation'),) 1016 not in cert['subject']): 1017 self.fail( 1018 "Missing or invalid 'organizationName' field in certificate subject; " 1019 "should be 'Python Software Foundation'.") 1020 s.close() 1021 1022 def test_empty_cert(self): 1023 """Connecting with an empty cert file""" 1024 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, 1025 "nullcert.pem")) 1026 def test_malformed_cert(self): 1027 """Connecting with a badly formatted certificate (syntax error)""" 1028 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, 1029 "badcert.pem")) 1030 def test_nonexisting_cert(self): 1031 """Connecting with a non-existing cert file""" 1032 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, 1033 "wrongcert.pem")) 1034 def test_malformed_key(self): 1035 """Connecting with a badly formatted key (syntax error)""" 1036 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, 1037 "badkey.pem")) 1038 1039 @skip_if_broken_ubuntu_ssl 1040 def test_protocol_sslv2(self): 1041 """Connecting to an SSLv2 server with various client options""" 838 1042 if test_support.verbose: 839 1043 sys.stdout.write("\n") 840 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True) 841 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL) 842 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED) 843 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True) 844 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False) 845 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False) 846 847 def testProtocolSSL23(self): 1044 if not hasattr(ssl, 'PROTOCOL_SSLv2'): 1045 self.skipTest("PROTOCOL_SSLv2 needed") 1046 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True) 1047 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL) 1048 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED) 1049 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True) 1050 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False) 1051 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False) 1052 1053 @skip_if_broken_ubuntu_ssl 1054 def test_protocol_sslv23(self): 1055 """Connecting to an SSLv23 server with various client options""" 848 1056 if test_support.verbose: 849 1057 sys.stdout.write("\n") 850 try: 851 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True) 852 except test_support.TestFailed, x: 853 # this fails on some older versions of OpenSSL (0.9.7l, for instance) 854 if test_support.verbose: 855 sys.stdout.write( 856 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n" 857 % str(x)) 858 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True) 859 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True) 860 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True) 861 862 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL) 863 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL) 864 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL) 865 866 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED) 867 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED) 868 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED) 869 870 def testProtocolSSL3(self): 1058 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True) 1059 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True) 1060 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True) 1061 1062 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL) 1063 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL) 1064 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL) 1065 1066 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED) 1067 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED) 1068 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED) 1069 1070 @skip_if_broken_ubuntu_ssl 1071 def test_protocol_sslv3(self): 1072 """Connecting to an SSLv3 server with various client options""" 871 1073 if test_support.verbose: 872 1074 sys.stdout.write("\n") 873 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True) 874 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL) 875 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED) 876 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False) 877 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False) 878 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False) 879 880 def testProtocolTLS1(self): 1075 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True) 1076 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL) 1077 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED) 1078 if hasattr(ssl, 'PROTOCOL_SSLv2'): 1079 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False) 1080 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False) 1081 1082 @skip_if_broken_ubuntu_ssl 1083 def test_protocol_tlsv1(self): 1084 """Connecting to a TLSv1 server with various client options""" 881 1085 if test_support.verbose: 882 1086 sys.stdout.write("\n") 883 try ProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True)884 try ProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)885 try ProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)886 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)887 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)888 try ProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False)889 890 def test STARTTLS(self):891 1087 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True) 1088 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL) 1089 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED) 1090 if hasattr(ssl, 'PROTOCOL_SSLv2'): 1091 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False) 1092 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False) 1093 1094 def test_starttls(self): 1095 """Switching from clear text to encrypted and back again.""" 892 1096 msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4", "ENDTLS", "msg 5", "msg 6") 893 1097 … … 897 1101 chatty=True, 898 1102 connectionchatty=True) 899 flag = threading.Event()900 server.start(flag)901 # wait for it to start902 flag.wait()903 # try to connect904 1103 wrapped = False 905 try: 906 try: 907 s = socket.socket() 908 s.setblocking(1) 909 s.connect((HOST, server.port)) 910 except Exception, x: 911 raise test_support.TestFailed("Unexpected exception: " + str(x)) 912 else: 1104 with server: 1105 s = socket.socket() 1106 s.setblocking(1) 1107 s.connect((HOST, server.port)) 1108 if test_support.verbose: 1109 sys.stdout.write("\n") 1110 for indata in msgs: 913 1111 if test_support.verbose: 914 sys.stdout.write("\n") 915 for indata in msgs: 1112 sys.stdout.write( 1113 " client: sending %s...\n" % repr(indata)) 1114 if wrapped: 1115 conn.write(indata) 1116 outdata = conn.read() 1117 else: 1118 s.send(indata) 1119 outdata = s.recv(1024) 1120 if (indata == "STARTTLS" and 1121 outdata.strip().lower().startswith("ok")): 1122 # STARTTLS ok, switch to secure mode 916 1123 if test_support.verbose: 917 1124 sys.stdout.write( 918 " client: sending %s...\n" % repr(indata)) 919 if wrapped: 920 conn.write(indata) 921 outdata = conn.read() 922 else: 923 s.send(indata) 924 outdata = s.recv(1024) 925 if (indata == "STARTTLS" and 926 outdata.strip().lower().startswith("ok")): 927 if test_support.verbose: 928 sys.stdout.write( 929 " client: read %s from server, starting TLS...\n" 930 % repr(outdata)) 931 conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1) 932 wrapped = True 933 elif (indata == "ENDTLS" and 934 outdata.strip().lower().startswith("ok")): 935 if test_support.verbose: 936 sys.stdout.write( 937 " client: read %s from server, ending TLS...\n" 938 % repr(outdata)) 939 s = conn.unwrap() 940 wrapped = False 941 else: 942 if test_support.verbose: 943 sys.stdout.write( 944 " client: read %s from server\n" % repr(outdata)) 945 if test_support.verbose: 946 sys.stdout.write(" client: closing connection.\n") 947 if wrapped: 948 conn.write("over\n") 1125 " client: read %s from server, starting TLS...\n" 1126 % repr(outdata)) 1127 conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1) 1128 wrapped = True 1129 elif (indata == "ENDTLS" and 1130 outdata.strip().lower().startswith("ok")): 1131 # ENDTLS ok, switch back to clear text 1132 if test_support.verbose: 1133 sys.stdout.write( 1134 " client: read %s from server, ending TLS...\n" 1135 % repr(outdata)) 1136 s = conn.unwrap() 1137 wrapped = False 949 1138 else: 950 s.send("over\n") 951 s.close() 952 finally: 953 server.stop() 954 server.join() 955 956 def testSocketServer(self): 957 1139 if test_support.verbose: 1140 sys.stdout.write( 1141 " client: read %s from server\n" % repr(outdata)) 1142 if test_support.verbose: 1143 sys.stdout.write(" client: closing connection.\n") 1144 if wrapped: 1145 conn.write("over\n") 1146 else: 1147 s.send("over\n") 1148 s.close() 1149 1150 def test_socketserver(self): 1151 """Using a SocketServer to create and manage SSL connections.""" 958 1152 server = SocketServerHTTPSServer(CERTFILE) 959 1153 flag = threading.Event() … … 965 1159 if test_support.verbose: 966 1160 sys.stdout.write('\n') 967 d1 = open(CERTFILE, 'rb').read() 1161 with open(CERTFILE, 'rb') as f: 1162 d1 = f.read() 968 1163 d2 = '' 969 1164 # now fetch the same data from the HTTPS server 970 1165 url = 'https://127.0.0.1:%d/%s' % ( 971 1166 server.port, os.path.split(CERTFILE)[1]) 972 f = urllib.urlopen(url) 1167 with test_support.check_py3k_warnings(): 1168 f = urllib.urlopen(url) 973 1169 dlen = f.info().getheader("content-length") 974 1170 if dlen and (int(dlen) > 0): … … 979 1175 % (len(d2), server)) 980 1176 f.close() 981 except: 982 msg = ''.join(traceback.format_exception(*sys.exc_info())) 983 if test_support.verbose: 984 sys.stdout.write('\n' + msg) 985 raise test_support.TestFailed(msg) 986 else: 987 if not (d1 == d2): 988 raise test_support.TestFailed( 989 "Couldn't fetch data from HTTPS server") 1177 self.assertEqual(d1, d2) 990 1178 finally: 991 1179 server.stop() 992 1180 server.join() 993 1181 994 def test WrappedAccept(self):995 1182 def test_wrapped_accept(self): 1183 """Check the accept() method on SSL sockets.""" 996 1184 if test_support.verbose: 997 1185 sys.stdout.write("\n") 998 serverParamsTest(CERTFILE, ssl.PROTOCOL_SSLv23, ssl.CERT_REQUIRED, 999 CERTFILE, CERTFILE, ssl.PROTOCOL_SSLv23, 1000 chatty=True, connectionchatty=True, 1001 wrap_accepting_socket=True) 1002 1003 1004 def testAsyncoreServer (self): 1005 1186 server_params_test(CERTFILE, ssl.PROTOCOL_SSLv23, ssl.CERT_REQUIRED, 1187 CERTFILE, CERTFILE, ssl.PROTOCOL_SSLv23, 1188 chatty=True, connectionchatty=True, 1189 wrap_accepting_socket=True) 1190 1191 def test_asyncore_server(self): 1192 """Check the example asyncore integration.""" 1006 1193 indata = "TEST MESSAGE of mixed case\n" 1007 1194 … … 1009 1196 sys.stdout.write("\n") 1010 1197 server = AsyncoreEchoServer(CERTFILE) 1011 flag = threading.Event() 1012 server.start(flag) 1013 # wait for it to start 1014 flag.wait() 1015 # try to connect 1016 try: 1017 try: 1018 s = ssl.wrap_socket(socket.socket()) 1019 s.connect(('127.0.0.1', server.port)) 1020 except ssl.SSLError, x: 1021 raise test_support.TestFailed("Unexpected SSL error: " + str(x)) 1022 except Exception, x: 1023 raise test_support.TestFailed("Unexpected exception: " + str(x)) 1024 else: 1025 if test_support.verbose: 1026 sys.stdout.write( 1027 " client: sending %s...\n" % (repr(indata))) 1028 s.write(indata) 1029 outdata = s.read() 1030 if test_support.verbose: 1031 sys.stdout.write(" client: read %s\n" % repr(outdata)) 1032 if outdata != indata.lower(): 1033 raise test_support.TestFailed( 1034 "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n" 1035 % (outdata[:min(len(outdata),20)], len(outdata), 1036 indata[:min(len(indata),20)].lower(), len(indata))) 1037 s.write("over\n") 1038 if test_support.verbose: 1039 sys.stdout.write(" client: closing connection.\n") 1040 s.close() 1041 finally: 1042 server.stop() 1043 # wait for server thread to end 1044 server.join() 1045 1046 1047 def testAllRecvAndSendMethods(self): 1048 1198 with server: 1199 s = ssl.wrap_socket(socket.socket()) 1200 s.connect(('127.0.0.1', server.port)) 1201 if test_support.verbose: 1202 sys.stdout.write( 1203 " client: sending %s...\n" % (repr(indata))) 1204 s.write(indata) 1205 outdata = s.read() 1206 if test_support.verbose: 1207 sys.stdout.write(" client: read %s\n" % repr(outdata)) 1208 if outdata != indata.lower(): 1209 self.fail( 1210 "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n" 1211 % (outdata[:min(len(outdata),20)], len(outdata), 1212 indata[:min(len(indata),20)].lower(), len(indata))) 1213 s.write("over\n") 1214 if test_support.verbose: 1215 sys.stdout.write(" client: closing connection.\n") 1216 s.close() 1217 1218 def test_recv_send(self): 1219 """Test recv(), send() and friends.""" 1049 1220 if test_support.verbose: 1050 1221 sys.stdout.write("\n") … … 1056 1227 chatty=True, 1057 1228 connectionchatty=False) 1058 flag = threading.Event() 1059 server.start(flag) 1060 # wait for it to start 1061 flag.wait() 1062 # try to connect 1063 try: 1229 with server: 1064 1230 s = ssl.wrap_socket(socket.socket(), 1065 1231 server_side=False, … … 1069 1235 ssl_version=ssl.PROTOCOL_TLSv1) 1070 1236 s.connect((HOST, server.port)) 1071 except ssl.SSLError as x:1072 raise support.TestFailed("Unexpected SSL error: " + str(x))1073 except Exception as x:1074 raise support.TestFailed("Unexpected exception: " + str(x))1075 else:1076 1237 # helper methods for standardising recv* method signatures 1077 1238 def _recv_into(): … … 1106 1267 outdata = outdata.decode('ASCII', 'strict') 1107 1268 if outdata != indata.lower(): 1108 raise support.TestFailed(1269 self.fail( 1109 1270 "While sending with <<%s>> bad data " 1110 1271 "<<%r>> (%d) received; " … … 1116 1277 except ValueError as e: 1117 1278 if expect_success: 1118 raise support.TestFailed(1279 self.fail( 1119 1280 "Failed to send with method <<%s>>; " 1120 1281 "expected to succeed.\n" % (meth_name,) 1121 1282 ) 1122 1283 if not str(e).startswith(meth_name): 1123 raise support.TestFailed(1284 self.fail( 1124 1285 "Method <<%s>> failed with unexpected " 1125 1286 "exception message: %s\n" % ( … … 1135 1296 outdata = outdata.decode('ASCII', 'strict') 1136 1297 if outdata != indata.lower(): 1137 raise support.TestFailed(1298 self.fail( 1138 1299 "While receiving with <<%s>> bad data " 1139 1300 "<<%r>> (%d) received; " … … 1145 1306 except ValueError as e: 1146 1307 if expect_success: 1147 raise support.TestFailed(1308 self.fail( 1148 1309 "Failed to receive with method <<%s>>; " 1149 1310 "expected to succeed.\n" % (meth_name,) 1150 1311 ) 1151 1312 if not str(e).startswith(meth_name): 1152 raise support.TestFailed(1313 self.fail( 1153 1314 "Method <<%s>> failed with unexpected " 1154 1315 "exception message: %s\n" % ( … … 1161 1322 s.write("over\n".encode("ASCII", "strict")) 1162 1323 s.close() 1324 1325 def test_handshake_timeout(self): 1326 # Issue #5103: SSL handshake must respect the socket timeout 1327 server = socket.socket(socket.AF_INET) 1328 host = "127.0.0.1" 1329 port = test_support.bind_port(server) 1330 started = threading.Event() 1331 finish = False 1332 1333 def serve(): 1334 server.listen(5) 1335 started.set() 1336 conns = [] 1337 while not finish: 1338 r, w, e = select.select([server], [], [], 0.1) 1339 if server in r: 1340 # Let the socket hang around rather than having 1341 # it closed by garbage collection. 1342 conns.append(server.accept()[0]) 1343 1344 t = threading.Thread(target=serve) 1345 t.start() 1346 started.wait() 1347 1348 try: 1349 try: 1350 c = socket.socket(socket.AF_INET) 1351 c.settimeout(0.2) 1352 c.connect((host, port)) 1353 # Will attempt handshake and time out 1354 self.assertRaisesRegexp(ssl.SSLError, "timed out", 1355 ssl.wrap_socket, c) 1356 finally: 1357 c.close() 1358 try: 1359 c = socket.socket(socket.AF_INET) 1360 c.settimeout(0.2) 1361 c = ssl.wrap_socket(c) 1362 # Will attempt handshake and time out 1363 self.assertRaisesRegexp(ssl.SSLError, "timed out", 1364 c.connect, (host, port)) 1365 finally: 1366 c.close() 1163 1367 finally: 1164 server.stop() 1165 server.join() 1368 finish = True 1369 t.join() 1370 server.close() 1371 1372 def test_default_ciphers(self): 1373 with ThreadedEchoServer(CERTFILE, 1374 ssl_version=ssl.PROTOCOL_SSLv23, 1375 chatty=False) as server: 1376 sock = socket.socket() 1377 try: 1378 # Force a set of weak ciphers on our client socket 1379 try: 1380 s = ssl.wrap_socket(sock, 1381 ssl_version=ssl.PROTOCOL_SSLv23, 1382 ciphers="DES") 1383 except ssl.SSLError: 1384 self.skipTest("no DES cipher available") 1385 with self.assertRaises((OSError, ssl.SSLError)): 1386 s.connect((HOST, server.port)) 1387 finally: 1388 sock.close() 1389 self.assertIn("no shared cipher", str(server.conn_errors[0])) 1166 1390 1167 1391 1168 1392 def test_main(verbose=False): 1169 if skip_expected: 1170 raise test_support.TestSkipped("No SSL support") 1171 1172 global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT 1393 global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, NOKIACERT, NULLBYTECERT 1173 1394 CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, 1174 1395 "keycert.pem") … … 1176 1397 os.path.dirname(__file__) or os.curdir, 1177 1398 "https_svn_python_org_root.pem") 1399 NOKIACERT = os.path.join(os.path.dirname(__file__) or os.curdir, 1400 "nokia.pem") 1401 NULLBYTECERT = os.path.join(os.path.dirname(__file__) or os.curdir, 1402 "nullbytecert.pem") 1178 1403 1179 1404 if (not os.path.exists(CERTFILE) or 1180 not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT)): 1405 not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT) or 1406 not os.path.exists(NOKIACERT) or 1407 not os.path.exists(NULLBYTECERT)): 1181 1408 raise test_support.TestFailed("Can't read certificate files!") 1182 1409 1183 TESTPORT = test_support.find_unused_port() 1184 if not TESTPORT: 1185 raise test_support.TestFailed("Can't find open port to test servers on!") 1186 1187 tests = [BasicTests] 1410 tests = [BasicTests, BasicSocketTests] 1188 1411 1189 1412 if test_support.is_resource_enabled('network'): … … 1195 1418 tests.append(ThreadedTests) 1196 1419 1197 test_support.run_unittest(*tests) 1198 1199 if _have_threads: 1200 test_support.threading_cleanup(*thread_info) 1420 try: 1421 test_support.run_unittest(*tests) 1422 finally: 1423 if _have_threads: 1424 test_support.threading_cleanup(*thread_info) 1201 1425 1202 1426 if __name__ == "__main__":
Note:
See TracChangeset
for help on using the changeset viewer.