source: python/trunk/Lib/test/test_ssl.py

Last change on this file was 391, checked in by dmik, 11 years ago

python: Merge vendor 2.7.6 to trunk.

  • Property svn:eol-style set to native
File size: 60.9 KB
Line 
1# Test the support for SSL and sockets
2
3import sys
4import unittest
5from test import test_support
6import asyncore
7import socket
8import select
9import time
10import gc
11import os
12import errno
13import pprint
14import urllib, urlparse
15import traceback
16import weakref
17import functools
18import platform
19
20from BaseHTTPServer import HTTPServer
21from SimpleHTTPServer import SimpleHTTPRequestHandler
22
23ssl = test_support.import_module("ssl")
24
25HOST = test_support.HOST
26CERTFILE = None
27SVN_PYTHON_ORG_ROOT_CERT = None
28NULLBYTECERT = None
29
30def handle_error(prefix):
31 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
32 if test_support.verbose:
33 sys.stdout.write(prefix + exc_format)
34
35
36class BasicTests(unittest.TestCase):
37
38 def test_sslwrap_simple(self):
39 # A crude test for the legacy API
40 try:
41 ssl.sslwrap_simple(socket.socket(socket.AF_INET))
42 except IOError, e:
43 if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
44 pass
45 else:
46 raise
47 try:
48 ssl.sslwrap_simple(socket.socket(socket.AF_INET)._sock)
49 except IOError, e:
50 if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
51 pass
52 else:
53 raise
54
55# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
56def skip_if_broken_ubuntu_ssl(func):
57 if hasattr(ssl, 'PROTOCOL_SSLv2'):
58 # We need to access the lower-level wrapper in order to create an
59 # implicit SSL context without trying to connect or listen.
60 try:
61 import _ssl
62 except ImportError:
63 # The returned function won't get executed, just ignore the error
64 pass
65 @functools.wraps(func)
66 def f(*args, **kwargs):
67 try:
68 s = socket.socket(socket.AF_INET)
69 _ssl.sslwrap(s._sock, 0, None, None,
70 ssl.CERT_NONE, ssl.PROTOCOL_SSLv2, None, None)
71 except ssl.SSLError as e:
72 if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
73 platform.linux_distribution() == ('debian', 'squeeze/sid', '')
74 and 'Invalid SSL protocol variant specified' in str(e)):
75 raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
76 return func(*args, **kwargs)
77 return f
78 else:
79 return func
80
81
82class BasicSocketTests(unittest.TestCase):
83
84 def test_constants(self):
85 #ssl.PROTOCOL_SSLv2
86 ssl.PROTOCOL_SSLv23
87 ssl.PROTOCOL_SSLv3
88 ssl.PROTOCOL_TLSv1
89 ssl.CERT_NONE
90 ssl.CERT_OPTIONAL
91 ssl.CERT_REQUIRED
92
93 def test_random(self):
94 v = ssl.RAND_status()
95 if test_support.verbose:
96 sys.stdout.write("\n RAND_status is %d (%s)\n"
97 % (v, (v and "sufficient randomness") or
98 "insufficient randomness"))
99 self.assertRaises(TypeError, ssl.RAND_egd, 1)
100 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
101 ssl.RAND_add("this is a random string", 75.0)
102
103 def test_parse_cert(self):
104 # note that this uses an 'unofficial' function in _ssl.c,
105 # provided solely for this test, to exercise the certificate
106 # parsing code
107 p = ssl._ssl._test_decode_cert(CERTFILE, False)
108 if test_support.verbose:
109 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
110 self.assertEqual(p['subject'],
111 ((('countryName', 'XY'),),
112 (('localityName', 'Castle Anthrax'),),
113 (('organizationName', 'Python Software Foundation'),),
114 (('commonName', 'localhost'),))
115 )
116 self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
117 # Issue #13034: the subjectAltName in some certificates
118 # (notably projects.developer.nokia.com:443) wasn't parsed
119 p = ssl._ssl._test_decode_cert(NOKIACERT)
120 if test_support.verbose:
121 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
122 self.assertEqual(p['subjectAltName'],
123 (('DNS', 'projects.developer.nokia.com'),
124 ('DNS', 'projects.forum.nokia.com'))
125 )
126
127 def test_parse_cert_CVE_2013_4238(self):
128 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
129 if test_support.verbose:
130 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
131 subject = ((('countryName', 'US'),),
132 (('stateOrProvinceName', 'Oregon'),),
133 (('localityName', 'Beaverton'),),
134 (('organizationName', 'Python Software Foundation'),),
135 (('organizationalUnitName', 'Python Core Development'),),
136 (('commonName', 'null.python.org\x00example.org'),),
137 (('emailAddress', 'python-dev@python.org'),))
138 self.assertEqual(p['subject'], subject)
139 self.assertEqual(p['issuer'], subject)
140 if ssl.OPENSSL_VERSION_INFO >= (0, 9, 8):
141 san = (('DNS', 'altnull.python.org\x00example.com'),
142 ('email', 'null@python.org\x00user@example.org'),
143 ('URI', 'http://null.python.org\x00http://example.org'),
144 ('IP Address', '192.0.2.1'),
145 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
146 else:
147 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
148 san = (('DNS', 'altnull.python.org\x00example.com'),
149 ('email', 'null@python.org\x00user@example.org'),
150 ('URI', 'http://null.python.org\x00http://example.org'),
151 ('IP Address', '192.0.2.1'),
152 ('IP Address', '<invalid>'))
153
154 self.assertEqual(p['subjectAltName'], san)
155
156 def test_DER_to_PEM(self):
157 with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f:
158 pem = f.read()
159 d1 = ssl.PEM_cert_to_DER_cert(pem)
160 p2 = ssl.DER_cert_to_PEM_cert(d1)
161 d2 = ssl.PEM_cert_to_DER_cert(p2)
162 self.assertEqual(d1, d2)
163 if not p2.startswith(ssl.PEM_HEADER + '\n'):
164 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
165 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
166 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
167
168 def test_openssl_version(self):
169 n = ssl.OPENSSL_VERSION_NUMBER
170 t = ssl.OPENSSL_VERSION_INFO
171 s = ssl.OPENSSL_VERSION
172 self.assertIsInstance(n, (int, long))
173 self.assertIsInstance(t, tuple)
174 self.assertIsInstance(s, str)
175 # Some sanity checks follow
176 # >= 0.9
177 self.assertGreaterEqual(n, 0x900000)
178 # < 2.0
179 self.assertLess(n, 0x20000000)
180 major, minor, fix, patch, status = t
181 self.assertGreaterEqual(major, 0)
182 self.assertLess(major, 2)
183 self.assertGreaterEqual(minor, 0)
184 self.assertLess(minor, 256)
185 self.assertGreaterEqual(fix, 0)
186 self.assertLess(fix, 256)
187 self.assertGreaterEqual(patch, 0)
188 self.assertLessEqual(patch, 26)
189 self.assertGreaterEqual(status, 0)
190 self.assertLessEqual(status, 15)
191 # Version string as returned by OpenSSL, the format might change
192 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
193 (s, t))
194
195 def test_ciphers(self):
196 if not test_support.is_resource_enabled('network'):
197 return
198 remote = ("svn.python.org", 443)
199 with test_support.transient_internet(remote[0]):
200 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
201 cert_reqs=ssl.CERT_NONE, ciphers="ALL")
202 s.connect(remote)
203 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
204 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT")
205 s.connect(remote)
206 # Error checking occurs when connecting, because the SSL context
207 # isn't created before.
208 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
209 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
210 with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
211 s.connect(remote)
212
213 @test_support.cpython_only
214 def test_refcycle(self):
215 # Issue #7943: an SSL object doesn't create reference cycles with
216 # itself.
217 s = socket.socket(socket.AF_INET)
218 ss = ssl.wrap_socket(s)
219 wr = weakref.ref(ss)
220 del ss
221 self.assertEqual(wr(), None)
222
223 def test_wrapped_unconnected(self):
224 # The _delegate_methods in socket.py are correctly delegated to by an
225 # unconnected SSLSocket, so they will raise a socket.error rather than
226 # something unexpected like TypeError.
227 s = socket.socket(socket.AF_INET)
228 ss = ssl.wrap_socket(s)
229 self.assertRaises(socket.error, ss.recv, 1)
230 self.assertRaises(socket.error, ss.recv_into, bytearray(b'x'))
231 self.assertRaises(socket.error, ss.recvfrom, 1)
232 self.assertRaises(socket.error, ss.recvfrom_into, bytearray(b'x'), 1)
233 self.assertRaises(socket.error, ss.send, b'x')
234 self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0))
235
236
237class NetworkedTests(unittest.TestCase):
238
239 def test_connect(self):
240 with test_support.transient_internet("svn.python.org"):
241 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
242 cert_reqs=ssl.CERT_NONE)
243 s.connect(("svn.python.org", 443))
244 c = s.getpeercert()
245 if c:
246 self.fail("Peer cert %s shouldn't be here!")
247 s.close()
248
249 # this should fail because we have no verification certs
250 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
251 cert_reqs=ssl.CERT_REQUIRED)
252 try:
253 s.connect(("svn.python.org", 443))
254 except ssl.SSLError:
255 pass
256 finally:
257 s.close()
258
259 # this should succeed because we specify the root cert
260 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
261 cert_reqs=ssl.CERT_REQUIRED,
262 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
263 try:
264 s.connect(("svn.python.org", 443))
265 finally:
266 s.close()
267
268 def test_connect_ex(self):
269 # Issue #11326: check connect_ex() implementation
270 with test_support.transient_internet("svn.python.org"):
271 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
272 cert_reqs=ssl.CERT_REQUIRED,
273 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
274 try:
275 self.assertEqual(0, s.connect_ex(("svn.python.org", 443)))
276 self.assertTrue(s.getpeercert())
277 finally:
278 s.close()
279
280 def test_non_blocking_connect_ex(self):
281 # Issue #11326: non-blocking connect_ex() should allow handshake
282 # to proceed after the socket gets ready.
283 with test_support.transient_internet("svn.python.org"):
284 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
285 cert_reqs=ssl.CERT_REQUIRED,
286 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
287 do_handshake_on_connect=False)
288 try:
289 s.setblocking(False)
290 rc = s.connect_ex(('svn.python.org', 443))
291 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
292 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
293 # Wait for connect to finish
294 select.select([], [s], [], 5.0)
295 # Non-blocking handshake
296 while True:
297 try:
298 s.do_handshake()
299 break
300 except ssl.SSLError as err:
301 if err.args[0] == ssl.SSL_ERROR_WANT_READ:
302 select.select([s], [], [], 5.0)
303 elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
304 select.select([], [s], [], 5.0)
305 else:
306 raise
307 # SSL established
308 self.assertTrue(s.getpeercert())
309 finally:
310 s.close()
311
312 def test_timeout_connect_ex(self):
313 # Issue #12065: on a timeout, connect_ex() should return the original
314 # errno (mimicking the behaviour of non-SSL sockets).
315 with test_support.transient_internet("svn.python.org"):
316 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
317 cert_reqs=ssl.CERT_REQUIRED,
318 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
319 do_handshake_on_connect=False)
320 try:
321 s.settimeout(0.0000001)
322 rc = s.connect_ex(('svn.python.org', 443))
323 if rc == 0:
324 self.skipTest("svn.python.org responded too quickly")
325 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
326 finally:
327 s.close()
328
329 def test_connect_ex_error(self):
330 with test_support.transient_internet("svn.python.org"):
331 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
332 cert_reqs=ssl.CERT_REQUIRED,
333 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
334 try:
335 self.assertEqual(errno.ECONNREFUSED,
336 s.connect_ex(("svn.python.org", 444)))
337 finally:
338 s.close()
339
340 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
341 def test_makefile_close(self):
342 # Issue #5238: creating a file-like object with makefile() shouldn't
343 # delay closing the underlying "real socket" (here tested with its
344 # file descriptor, hence skipping the test under Windows).
345 with test_support.transient_internet("svn.python.org"):
346 ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
347 ss.connect(("svn.python.org", 443))
348 fd = ss.fileno()
349 f = ss.makefile()
350 f.close()
351 # The fd is still open
352 os.read(fd, 0)
353 # Closing the SSL socket should close the fd too
354 ss.close()
355 gc.collect()
356 with self.assertRaises(OSError) as e:
357 os.read(fd, 0)
358 self.assertEqual(e.exception.errno, errno.EBADF)
359
360 def test_non_blocking_handshake(self):
361 with test_support.transient_internet("svn.python.org"):
362 s = socket.socket(socket.AF_INET)
363 s.connect(("svn.python.org", 443))
364 s.setblocking(False)
365 s = ssl.wrap_socket(s,
366 cert_reqs=ssl.CERT_NONE,
367 do_handshake_on_connect=False)
368 count = 0
369 while True:
370 try:
371 count += 1
372 s.do_handshake()
373 break
374 except ssl.SSLError, err:
375 if err.args[0] == ssl.SSL_ERROR_WANT_READ:
376 select.select([s], [], [])
377 elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
378 select.select([], [s], [])
379 else:
380 raise
381 s.close()
382 if test_support.verbose:
383 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
384
385 def test_get_server_certificate(self):
386 with test_support.transient_internet("svn.python.org"):
387 pem = ssl.get_server_certificate(("svn.python.org", 443))
388 if not pem:
389 self.fail("No server certificate on svn.python.org:443!")
390
391 try:
392 pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE)
393 except ssl.SSLError:
394 #should fail
395 pass
396 else:
397 self.fail("Got server certificate %s for svn.python.org!" % pem)
398
399 pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
400 if not pem:
401 self.fail("No server certificate on svn.python.org:443!")
402 if test_support.verbose:
403 sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem)
404
405 def test_algorithms(self):
406 # Issue #8484: all algorithms should be available when verifying a
407 # certificate.
408 # SHA256 was added in OpenSSL 0.9.8
409 if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
410 self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
411 self.skipTest("remote host needs SNI, only available on Python 3.2+")
412 # NOTE: https://sha2.hboeck.de is another possible test host
413 remote = ("sha256.tbs-internet.com", 443)
414 sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
415 with test_support.transient_internet("sha256.tbs-internet.com"):
416 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
417 cert_reqs=ssl.CERT_REQUIRED,
418 ca_certs=sha256_cert,)
419 try:
420 s.connect(remote)
421 if test_support.verbose:
422 sys.stdout.write("\nCipher with %r is %r\n" %
423 (remote, s.cipher()))
424 sys.stdout.write("Certificate is:\n%s\n" %
425 pprint.pformat(s.getpeercert()))
426 finally:
427 s.close()
428
429
430try:
431 import threading
432except ImportError:
433 _have_threads = False
434else:
435 _have_threads = True
436
437 class ThreadedEchoServer(threading.Thread):
438
439 class ConnectionHandler(threading.Thread):
440
441 """A mildly complicated class, because we want it to work both
442 with and without the SSL wrapper around the socket connection, so
443 that we can test the STARTTLS functionality."""
444
445 def __init__(self, server, connsock):
446 self.server = server
447 self.running = False
448 self.sock = connsock
449 self.sock.setblocking(1)
450 self.sslconn = None
451 threading.Thread.__init__(self)
452 self.daemon = True
453
454 def show_conn_details(self):
455 if self.server.certreqs == ssl.CERT_REQUIRED:
456 cert = self.sslconn.getpeercert()
457 if test_support.verbose and self.server.chatty:
458 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
459 cert_binary = self.sslconn.getpeercert(True)
460 if test_support.verbose and self.server.chatty:
461 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
462 cipher = self.sslconn.cipher()
463 if test_support.verbose and self.server.chatty:
464 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
465
466 def wrap_conn(self):
467 try:
468 self.sslconn = ssl.wrap_socket(self.sock, server_side=True,
469 certfile=self.server.certificate,
470 ssl_version=self.server.protocol,
471 ca_certs=self.server.cacerts,
472 cert_reqs=self.server.certreqs,
473 ciphers=self.server.ciphers)
474 except ssl.SSLError as e:
475 # XXX Various errors can have happened here, for example
476 # a mismatching protocol version, an invalid certificate,
477 # or a low-level bug. This should be made more discriminating.
478 self.server.conn_errors.append(e)
479 if self.server.chatty:
480 handle_error("\n server: bad connection attempt from " +
481 str(self.sock.getpeername()) + ":\n")
482 self.close()
483 self.running = False
484 self.server.stop()
485 return False
486 else:
487 return True
488
489 def read(self):
490 if self.sslconn:
491 return self.sslconn.read()
492 else:
493 return self.sock.recv(1024)
494
495 def write(self, bytes):
496 if self.sslconn:
497 return self.sslconn.write(bytes)
498 else:
499 return self.sock.send(bytes)
500
501 def close(self):
502 if self.sslconn:
503 self.sslconn.close()
504 else:
505 self.sock._sock.close()
506
507 def run(self):
508 self.running = True
509 if not self.server.starttls_server:
510 if isinstance(self.sock, ssl.SSLSocket):
511 self.sslconn = self.sock
512 elif not self.wrap_conn():
513 return
514 self.show_conn_details()
515 while self.running:
516 try:
517 msg = self.read()
518 if not msg:
519 # eof, so quit this handler
520 self.running = False
521 self.close()
522 elif msg.strip() == 'over':
523 if test_support.verbose and self.server.connectionchatty:
524 sys.stdout.write(" server: client closed connection\n")
525 self.close()
526 return
527 elif self.server.starttls_server and msg.strip() == 'STARTTLS':
528 if test_support.verbose and self.server.connectionchatty:
529 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
530 self.write("OK\n")
531 if not self.wrap_conn():
532 return
533 elif self.server.starttls_server and self.sslconn and msg.strip() == 'ENDTLS':
534 if test_support.verbose and self.server.connectionchatty:
535 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
536 self.write("OK\n")
537 self.sslconn.unwrap()
538 self.sslconn = None
539 if test_support.verbose and self.server.connectionchatty:
540 sys.stdout.write(" server: connection is now unencrypted...\n")
541 else:
542 if (test_support.verbose and
543 self.server.connectionchatty):
544 ctype = (self.sslconn and "encrypted") or "unencrypted"
545 sys.stdout.write(" server: read %s (%s), sending back %s (%s)...\n"
546 % (repr(msg), ctype, repr(msg.lower()), ctype))
547 self.write(msg.lower())
548 except ssl.SSLError:
549 if self.server.chatty:
550 handle_error("Test server failure:\n")
551 self.close()
552 self.running = False
553 # normally, we'd just stop here, but for the test
554 # harness, we want to stop the server
555 self.server.stop()
556
557 def __init__(self, certificate, ssl_version=None,
558 certreqs=None, cacerts=None,
559 chatty=True, connectionchatty=False, starttls_server=False,
560 wrap_accepting_socket=False, ciphers=None):
561
562 if ssl_version is None:
563 ssl_version = ssl.PROTOCOL_TLSv1
564 if certreqs is None:
565 certreqs = ssl.CERT_NONE
566 self.certificate = certificate
567 self.protocol = ssl_version
568 self.certreqs = certreqs
569 self.cacerts = cacerts
570 self.ciphers = ciphers
571 self.chatty = chatty
572 self.connectionchatty = connectionchatty
573 self.starttls_server = starttls_server
574 self.sock = socket.socket()
575 self.flag = None
576 if wrap_accepting_socket:
577 self.sock = ssl.wrap_socket(self.sock, server_side=True,
578 certfile=self.certificate,
579 cert_reqs = self.certreqs,
580 ca_certs = self.cacerts,
581 ssl_version = self.protocol,
582 ciphers = self.ciphers)
583 if test_support.verbose and self.chatty:
584 sys.stdout.write(' server: wrapped server socket as %s\n' % str(self.sock))
585 self.port = test_support.bind_port(self.sock)
586 self.active = False
587 self.conn_errors = []
588 threading.Thread.__init__(self)
589 self.daemon = True
590
591 def __enter__(self):
592 self.start(threading.Event())
593 self.flag.wait()
594 return self
595
596 def __exit__(self, *args):
597 self.stop()
598 self.join()
599
600 def start(self, flag=None):
601 self.flag = flag
602 threading.Thread.start(self)
603
604 def run(self):
605 self.sock.settimeout(0.05)
606 self.sock.listen(5)
607 self.active = True
608 if self.flag:
609 # signal an event
610 self.flag.set()
611 while self.active:
612 try:
613 newconn, connaddr = self.sock.accept()
614 if test_support.verbose and self.chatty:
615 sys.stdout.write(' server: new connection from '
616 + str(connaddr) + '\n')
617 handler = self.ConnectionHandler(self, newconn)
618 handler.start()
619 handler.join()
620 except socket.timeout:
621 pass
622 except KeyboardInterrupt:
623 self.stop()
624 self.sock.close()
625
626 def stop(self):
627 self.active = False
628
629 class AsyncoreEchoServer(threading.Thread):
630
631 class EchoServer(asyncore.dispatcher):
632
633 class ConnectionHandler(asyncore.dispatcher_with_send):
634
635 def __init__(self, conn, certfile):
636 asyncore.dispatcher_with_send.__init__(self, conn)
637 self.socket = ssl.wrap_socket(conn, server_side=True,
638 certfile=certfile,
639 do_handshake_on_connect=False)
640 self._ssl_accepting = True
641
642 def readable(self):
643 if isinstance(self.socket, ssl.SSLSocket):
644 while self.socket.pending() > 0:
645 self.handle_read_event()
646 return True
647
648 def _do_ssl_handshake(self):
649 try:
650 self.socket.do_handshake()
651 except ssl.SSLError, err:
652 if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
653 ssl.SSL_ERROR_WANT_WRITE):
654 return
655 elif err.args[0] == ssl.SSL_ERROR_EOF:
656 return self.handle_close()
657 raise
658 except socket.error, err:
659 if err.args[0] == errno.ECONNABORTED:
660 return self.handle_close()
661 else:
662 self._ssl_accepting = False
663
664 def handle_read(self):
665 if self._ssl_accepting:
666 self._do_ssl_handshake()
667 else:
668 data = self.recv(1024)
669 if data and data.strip() != 'over':
670 self.send(data.lower())
671
672 def handle_close(self):
673 self.close()
674 if test_support.verbose:
675 sys.stdout.write(" server: closed connection %s\n" % self.socket)
676
677 def handle_error(self):
678 raise
679
680 def __init__(self, certfile):
681 self.certfile = certfile
682 asyncore.dispatcher.__init__(self)
683 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
684 self.port = test_support.bind_port(self.socket)
685 self.listen(5)
686
687 def handle_accept(self):
688 sock_obj, addr = self.accept()
689 if test_support.verbose:
690 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
691 self.ConnectionHandler(sock_obj, self.certfile)
692
693 def handle_error(self):
694 raise
695
696 def __init__(self, certfile):
697 self.flag = None
698 self.active = False
699 self.server = self.EchoServer(certfile)
700 self.port = self.server.port
701 threading.Thread.__init__(self)
702 self.daemon = True
703
704 def __str__(self):
705 return "<%s %s>" % (self.__class__.__name__, self.server)
706
707 def __enter__(self):
708 self.start(threading.Event())
709 self.flag.wait()
710 return self
711
712 def __exit__(self, *args):
713 if test_support.verbose:
714 sys.stdout.write(" cleanup: stopping server.\n")
715 self.stop()
716 if test_support.verbose:
717 sys.stdout.write(" cleanup: joining server thread.\n")
718 self.join()
719 if test_support.verbose:
720 sys.stdout.write(" cleanup: successfully joined.\n")
721
722 def start(self, flag=None):
723 self.flag = flag
724 threading.Thread.start(self)
725
726 def run(self):
727 self.active = True
728 if self.flag:
729 self.flag.set()
730 while self.active:
731 asyncore.loop(0.05)
732
733 def stop(self):
734 self.active = False
735 self.server.close()
736
737 class SocketServerHTTPSServer(threading.Thread):
738
739 class HTTPSServer(HTTPServer):
740
741 def __init__(self, server_address, RequestHandlerClass, certfile):
742 HTTPServer.__init__(self, server_address, RequestHandlerClass)
743 # we assume the certfile contains both private key and certificate
744 self.certfile = certfile
745 self.allow_reuse_address = True
746
747 def __str__(self):
748 return ('<%s %s:%s>' %
749 (self.__class__.__name__,
750 self.server_name,
751 self.server_port))
752
753 def get_request(self):
754 # override this to wrap socket with SSL
755 sock, addr = self.socket.accept()
756 sslconn = ssl.wrap_socket(sock, server_side=True,
757 certfile=self.certfile)
758 return sslconn, addr
759
760 class RootedHTTPRequestHandler(SimpleHTTPRequestHandler):
761 # need to override translate_path to get a known root,
762 # instead of using os.curdir, since the test could be
763 # run from anywhere
764
765 server_version = "TestHTTPS/1.0"
766
767 root = None
768
769 def translate_path(self, path):
770 """Translate a /-separated PATH to the local filename syntax.
771
772 Components that mean special things to the local file system
773 (e.g. drive or directory names) are ignored. (XXX They should
774 probably be diagnosed.)
775
776 """
777 # abandon query parameters
778 path = urlparse.urlparse(path)[2]
779 path = os.path.normpath(urllib.unquote(path))
780 words = path.split('/')
781 words = filter(None, words)
782 path = self.root
783 for word in words:
784 drive, word = os.path.splitdrive(word)
785 head, word = os.path.split(word)
786 if word in self.root: continue
787 path = os.path.join(path, word)
788 return path
789
790 def log_message(self, format, *args):
791
792 # we override this to suppress logging unless "verbose"
793
794 if test_support.verbose:
795 sys.stdout.write(" server (%s:%d %s):\n [%s] %s\n" %
796 (self.server.server_address,
797 self.server.server_port,
798 self.request.cipher(),
799 self.log_date_time_string(),
800 format%args))
801
802
803 def __init__(self, certfile):
804 self.flag = None
805 self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0]
806 self.server = self.HTTPSServer(
807 (HOST, 0), self.RootedHTTPRequestHandler, certfile)
808 self.port = self.server.server_port
809 threading.Thread.__init__(self)
810 self.daemon = True
811
812 def __str__(self):
813 return "<%s %s>" % (self.__class__.__name__, self.server)
814
815 def start(self, flag=None):
816 self.flag = flag
817 threading.Thread.start(self)
818
819 def run(self):
820 if self.flag:
821 self.flag.set()
822 self.server.serve_forever(0.05)
823
824 def stop(self):
825 self.server.shutdown()
826
827
828 def bad_cert_test(certfile):
829 """
830 Launch a server with CERT_REQUIRED, and check that trying to
831 connect to it with the given client certificate fails.
832 """
833 server = ThreadedEchoServer(CERTFILE,
834 certreqs=ssl.CERT_REQUIRED,
835 cacerts=CERTFILE, chatty=False)
836 with server:
837 try:
838 s = ssl.wrap_socket(socket.socket(),
839 certfile=certfile,
840 ssl_version=ssl.PROTOCOL_TLSv1)
841 s.connect((HOST, server.port))
842 except ssl.SSLError, x:
843 if test_support.verbose:
844 sys.stdout.write("\nSSLError is %s\n" % x[1])
845 except socket.error, x:
846 if test_support.verbose:
847 sys.stdout.write("\nsocket.error is %s\n" % x[1])
848 else:
849 raise AssertionError("Use of invalid cert should have failed!")
850
851 def server_params_test(certfile, protocol, certreqs, cacertsfile,
852 client_certfile, client_protocol=None, indata="FOO\n",
853 ciphers=None, chatty=True, connectionchatty=False,
854 wrap_accepting_socket=False):
855 """
856 Launch a server, connect a client to it and try various reads
857 and writes.
858 """
859 server = ThreadedEchoServer(certfile,
860 certreqs=certreqs,
861 ssl_version=protocol,
862 cacerts=cacertsfile,
863 ciphers=ciphers,
864 chatty=chatty,
865 connectionchatty=connectionchatty,
866 wrap_accepting_socket=wrap_accepting_socket)
867 with server:
868 # try to connect
869 if client_protocol is None:
870 client_protocol = protocol
871 s = ssl.wrap_socket(socket.socket(),
872 certfile=client_certfile,
873 ca_certs=cacertsfile,
874 ciphers=ciphers,
875 cert_reqs=certreqs,
876 ssl_version=client_protocol)
877 s.connect((HOST, server.port))
878 for arg in [indata, bytearray(indata), memoryview(indata)]:
879 if connectionchatty:
880 if test_support.verbose:
881 sys.stdout.write(
882 " client: sending %s...\n" % (repr(arg)))
883 s.write(arg)
884 outdata = s.read()
885 if connectionchatty:
886 if test_support.verbose:
887 sys.stdout.write(" client: read %s\n" % repr(outdata))
888 if outdata != indata.lower():
889 raise AssertionError(
890 "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
891 % (outdata[:min(len(outdata),20)], len(outdata),
892 indata[:min(len(indata),20)].lower(), len(indata)))
893 s.write("over\n")
894 if connectionchatty:
895 if test_support.verbose:
896 sys.stdout.write(" client: closing connection.\n")
897 s.close()
898
899 def try_protocol_combo(server_protocol,
900 client_protocol,
901 expect_success,
902 certsreqs=None):
903 if certsreqs is None:
904 certsreqs = ssl.CERT_NONE
905 certtype = {
906 ssl.CERT_NONE: "CERT_NONE",
907 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
908 ssl.CERT_REQUIRED: "CERT_REQUIRED",
909 }[certsreqs]
910 if test_support.verbose:
911 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
912 sys.stdout.write(formatstr %
913 (ssl.get_protocol_name(client_protocol),
914 ssl.get_protocol_name(server_protocol),
915 certtype))
916 try:
917 # NOTE: we must enable "ALL" ciphers, otherwise an SSLv23 client
918 # will send an SSLv3 hello (rather than SSLv2) starting from
919 # OpenSSL 1.0.0 (see issue #8322).
920 server_params_test(CERTFILE, server_protocol, certsreqs,
921 CERTFILE, CERTFILE, client_protocol,
922 ciphers="ALL", chatty=False)
923 # Protocol mismatch can result in either an SSLError, or a
924 # "Connection reset by peer" error.
925 except ssl.SSLError:
926 if expect_success:
927 raise
928 except socket.error as e:
929 if expect_success or e.errno != errno.ECONNRESET:
930 raise
931 else:
932 if not expect_success:
933 raise AssertionError(
934 "Client protocol %s succeeded with server protocol %s!"
935 % (ssl.get_protocol_name(client_protocol),
936 ssl.get_protocol_name(server_protocol)))
937
938
939 class ThreadedTests(unittest.TestCase):
940
941 def test_rude_shutdown(self):
942 """A brutal shutdown of an SSL server should raise an IOError
943 in the client when attempting handshake.
944 """
945 listener_ready = threading.Event()
946 listener_gone = threading.Event()
947
948 s = socket.socket()
949 port = test_support.bind_port(s, HOST)
950
951 # `listener` runs in a thread. It sits in an accept() until
952 # the main thread connects. Then it rudely closes the socket,
953 # and sets Event `listener_gone` to let the main thread know
954 # the socket is gone.
955 def listener():
956 s.listen(5)
957 listener_ready.set()
958 s.accept()
959 s.close()
960 listener_gone.set()
961
962 def connector():
963 listener_ready.wait()
964 c = socket.socket()
965 c.connect((HOST, port))
966 listener_gone.wait()
967 try:
968 ssl_sock = ssl.wrap_socket(c)
969 except IOError:
970 pass
971 else:
972 self.fail('connecting to closed SSL socket should have failed')
973
974 t = threading.Thread(target=listener)
975 t.start()
976 try:
977 connector()
978 finally:
979 t.join()
980
981 @skip_if_broken_ubuntu_ssl
982 def test_echo(self):
983 """Basic test of an SSL client connecting to a server"""
984 if test_support.verbose:
985 sys.stdout.write("\n")
986 server_params_test(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE,
987 CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1,
988 chatty=True, connectionchatty=True)
989
990 def test_getpeercert(self):
991 if test_support.verbose:
992 sys.stdout.write("\n")
993 s2 = socket.socket()
994 server = ThreadedEchoServer(CERTFILE,
995 certreqs=ssl.CERT_NONE,
996 ssl_version=ssl.PROTOCOL_SSLv23,
997 cacerts=CERTFILE,
998 chatty=False)
999 with server:
1000 s = ssl.wrap_socket(socket.socket(),
1001 certfile=CERTFILE,
1002 ca_certs=CERTFILE,
1003 cert_reqs=ssl.CERT_REQUIRED,
1004 ssl_version=ssl.PROTOCOL_SSLv23)
1005 s.connect((HOST, server.port))
1006 cert = s.getpeercert()
1007 self.assertTrue(cert, "Can't get peer certificate.")
1008 cipher = s.cipher()
1009 if test_support.verbose:
1010 sys.stdout.write(pprint.pformat(cert) + '\n')
1011 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
1012 if 'subject' not in cert:
1013 self.fail("No subject field in certificate: %s." %
1014 pprint.pformat(cert))
1015 if ((('organizationName', 'Python Software Foundation'),)
1016 not in cert['subject']):
1017 self.fail(
1018 "Missing or invalid 'organizationName' field in certificate subject; "
1019 "should be 'Python Software Foundation'.")
1020 s.close()
1021
1022 def test_empty_cert(self):
1023 """Connecting with an empty cert file"""
1024 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
1025 "nullcert.pem"))
1026 def test_malformed_cert(self):
1027 """Connecting with a badly formatted certificate (syntax error)"""
1028 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
1029 "badcert.pem"))
1030 def test_nonexisting_cert(self):
1031 """Connecting with a non-existing cert file"""
1032 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
1033 "wrongcert.pem"))
1034 def test_malformed_key(self):
1035 """Connecting with a badly formatted key (syntax error)"""
1036 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
1037 "badkey.pem"))
1038
1039 @skip_if_broken_ubuntu_ssl
1040 def test_protocol_sslv2(self):
1041 """Connecting to an SSLv2 server with various client options"""
1042 if test_support.verbose:
1043 sys.stdout.write("\n")
1044 if not hasattr(ssl, 'PROTOCOL_SSLv2'):
1045 self.skipTest("PROTOCOL_SSLv2 needed")
1046 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
1047 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
1048 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
1049 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True)
1050 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
1051 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
1052
1053 @skip_if_broken_ubuntu_ssl
1054 def test_protocol_sslv23(self):
1055 """Connecting to an SSLv23 server with various client options"""
1056 if test_support.verbose:
1057 sys.stdout.write("\n")
1058 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True)
1059 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
1060 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True)
1061
1062 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
1063 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
1064 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
1065
1066 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
1067 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
1068 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
1069
1070 @skip_if_broken_ubuntu_ssl
1071 def test_protocol_sslv3(self):
1072 """Connecting to an SSLv3 server with various client options"""
1073 if test_support.verbose:
1074 sys.stdout.write("\n")
1075 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True)
1076 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
1077 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
1078 if hasattr(ssl, 'PROTOCOL_SSLv2'):
1079 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
1080 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
1081
1082 @skip_if_broken_ubuntu_ssl
1083 def test_protocol_tlsv1(self):
1084 """Connecting to a TLSv1 server with various client options"""
1085 if test_support.verbose:
1086 sys.stdout.write("\n")
1087 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True)
1088 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
1089 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
1090 if hasattr(ssl, 'PROTOCOL_SSLv2'):
1091 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
1092 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
1093
1094 def test_starttls(self):
1095 """Switching from clear text to encrypted and back again."""
1096 msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4", "ENDTLS", "msg 5", "msg 6")
1097
1098 server = ThreadedEchoServer(CERTFILE,
1099 ssl_version=ssl.PROTOCOL_TLSv1,
1100 starttls_server=True,
1101 chatty=True,
1102 connectionchatty=True)
1103 wrapped = False
1104 with server:
1105 s = socket.socket()
1106 s.setblocking(1)
1107 s.connect((HOST, server.port))
1108 if test_support.verbose:
1109 sys.stdout.write("\n")
1110 for indata in msgs:
1111 if test_support.verbose:
1112 sys.stdout.write(
1113 " client: sending %s...\n" % repr(indata))
1114 if wrapped:
1115 conn.write(indata)
1116 outdata = conn.read()
1117 else:
1118 s.send(indata)
1119 outdata = s.recv(1024)
1120 if (indata == "STARTTLS" and
1121 outdata.strip().lower().startswith("ok")):
1122 # STARTTLS ok, switch to secure mode
1123 if test_support.verbose:
1124 sys.stdout.write(
1125 " client: read %s from server, starting TLS...\n"
1126 % repr(outdata))
1127 conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
1128 wrapped = True
1129 elif (indata == "ENDTLS" and
1130 outdata.strip().lower().startswith("ok")):
1131 # ENDTLS ok, switch back to clear text
1132 if test_support.verbose:
1133 sys.stdout.write(
1134 " client: read %s from server, ending TLS...\n"
1135 % repr(outdata))
1136 s = conn.unwrap()
1137 wrapped = False
1138 else:
1139 if test_support.verbose:
1140 sys.stdout.write(
1141 " client: read %s from server\n" % repr(outdata))
1142 if test_support.verbose:
1143 sys.stdout.write(" client: closing connection.\n")
1144 if wrapped:
1145 conn.write("over\n")
1146 else:
1147 s.send("over\n")
1148 s.close()
1149
1150 def test_socketserver(self):
1151 """Using a SocketServer to create and manage SSL connections."""
1152 server = SocketServerHTTPSServer(CERTFILE)
1153 flag = threading.Event()
1154 server.start(flag)
1155 # wait for it to start
1156 flag.wait()
1157 # try to connect
1158 try:
1159 if test_support.verbose:
1160 sys.stdout.write('\n')
1161 with open(CERTFILE, 'rb') as f:
1162 d1 = f.read()
1163 d2 = ''
1164 # now fetch the same data from the HTTPS server
1165 url = 'https://127.0.0.1:%d/%s' % (
1166 server.port, os.path.split(CERTFILE)[1])
1167 with test_support.check_py3k_warnings():
1168 f = urllib.urlopen(url)
1169 dlen = f.info().getheader("content-length")
1170 if dlen and (int(dlen) > 0):
1171 d2 = f.read(int(dlen))
1172 if test_support.verbose:
1173 sys.stdout.write(
1174 " client: read %d bytes from remote server '%s'\n"
1175 % (len(d2), server))
1176 f.close()
1177 self.assertEqual(d1, d2)
1178 finally:
1179 server.stop()
1180 server.join()
1181
1182 def test_wrapped_accept(self):
1183 """Check the accept() method on SSL sockets."""
1184 if test_support.verbose:
1185 sys.stdout.write("\n")
1186 server_params_test(CERTFILE, ssl.PROTOCOL_SSLv23, ssl.CERT_REQUIRED,
1187 CERTFILE, CERTFILE, ssl.PROTOCOL_SSLv23,
1188 chatty=True, connectionchatty=True,
1189 wrap_accepting_socket=True)
1190
1191 def test_asyncore_server(self):
1192 """Check the example asyncore integration."""
1193 indata = "TEST MESSAGE of mixed case\n"
1194
1195 if test_support.verbose:
1196 sys.stdout.write("\n")
1197 server = AsyncoreEchoServer(CERTFILE)
1198 with server:
1199 s = ssl.wrap_socket(socket.socket())
1200 s.connect(('127.0.0.1', server.port))
1201 if test_support.verbose:
1202 sys.stdout.write(
1203 " client: sending %s...\n" % (repr(indata)))
1204 s.write(indata)
1205 outdata = s.read()
1206 if test_support.verbose:
1207 sys.stdout.write(" client: read %s\n" % repr(outdata))
1208 if outdata != indata.lower():
1209 self.fail(
1210 "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
1211 % (outdata[:min(len(outdata),20)], len(outdata),
1212 indata[:min(len(indata),20)].lower(), len(indata)))
1213 s.write("over\n")
1214 if test_support.verbose:
1215 sys.stdout.write(" client: closing connection.\n")
1216 s.close()
1217
1218 def test_recv_send(self):
1219 """Test recv(), send() and friends."""
1220 if test_support.verbose:
1221 sys.stdout.write("\n")
1222
1223 server = ThreadedEchoServer(CERTFILE,
1224 certreqs=ssl.CERT_NONE,
1225 ssl_version=ssl.PROTOCOL_TLSv1,
1226 cacerts=CERTFILE,
1227 chatty=True,
1228 connectionchatty=False)
1229 with server:
1230 s = ssl.wrap_socket(socket.socket(),
1231 server_side=False,
1232 certfile=CERTFILE,
1233 ca_certs=CERTFILE,
1234 cert_reqs=ssl.CERT_NONE,
1235 ssl_version=ssl.PROTOCOL_TLSv1)
1236 s.connect((HOST, server.port))
1237 # helper methods for standardising recv* method signatures
1238 def _recv_into():
1239 b = bytearray("\0"*100)
1240 count = s.recv_into(b)
1241 return b[:count]
1242
1243 def _recvfrom_into():
1244 b = bytearray("\0"*100)
1245 count, addr = s.recvfrom_into(b)
1246 return b[:count]
1247
1248 # (name, method, whether to expect success, *args)
1249 send_methods = [
1250 ('send', s.send, True, []),
1251 ('sendto', s.sendto, False, ["some.address"]),
1252 ('sendall', s.sendall, True, []),
1253 ]
1254 recv_methods = [
1255 ('recv', s.recv, True, []),
1256 ('recvfrom', s.recvfrom, False, ["some.address"]),
1257 ('recv_into', _recv_into, True, []),
1258 ('recvfrom_into', _recvfrom_into, False, []),
1259 ]
1260 data_prefix = u"PREFIX_"
1261
1262 for meth_name, send_meth, expect_success, args in send_methods:
1263 indata = data_prefix + meth_name
1264 try:
1265 send_meth(indata.encode('ASCII', 'strict'), *args)
1266 outdata = s.read()
1267 outdata = outdata.decode('ASCII', 'strict')
1268 if outdata != indata.lower():
1269 self.fail(
1270 "While sending with <<%s>> bad data "
1271 "<<%r>> (%d) received; "
1272 "expected <<%r>> (%d)\n" % (
1273 meth_name, outdata[:20], len(outdata),
1274 indata[:20], len(indata)
1275 )
1276 )
1277 except ValueError as e:
1278 if expect_success:
1279 self.fail(
1280 "Failed to send with method <<%s>>; "
1281 "expected to succeed.\n" % (meth_name,)
1282 )
1283 if not str(e).startswith(meth_name):
1284 self.fail(
1285 "Method <<%s>> failed with unexpected "
1286 "exception message: %s\n" % (
1287 meth_name, e
1288 )
1289 )
1290
1291 for meth_name, recv_meth, expect_success, args in recv_methods:
1292 indata = data_prefix + meth_name
1293 try:
1294 s.send(indata.encode('ASCII', 'strict'))
1295 outdata = recv_meth(*args)
1296 outdata = outdata.decode('ASCII', 'strict')
1297 if outdata != indata.lower():
1298 self.fail(
1299 "While receiving with <<%s>> bad data "
1300 "<<%r>> (%d) received; "
1301 "expected <<%r>> (%d)\n" % (
1302 meth_name, outdata[:20], len(outdata),
1303 indata[:20], len(indata)
1304 )
1305 )
1306 except ValueError as e:
1307 if expect_success:
1308 self.fail(
1309 "Failed to receive with method <<%s>>; "
1310 "expected to succeed.\n" % (meth_name,)
1311 )
1312 if not str(e).startswith(meth_name):
1313 self.fail(
1314 "Method <<%s>> failed with unexpected "
1315 "exception message: %s\n" % (
1316 meth_name, e
1317 )
1318 )
1319 # consume data
1320 s.read()
1321
1322 s.write("over\n".encode("ASCII", "strict"))
1323 s.close()
1324
1325 def test_handshake_timeout(self):
1326 # Issue #5103: SSL handshake must respect the socket timeout
1327 server = socket.socket(socket.AF_INET)
1328 host = "127.0.0.1"
1329 port = test_support.bind_port(server)
1330 started = threading.Event()
1331 finish = False
1332
1333 def serve():
1334 server.listen(5)
1335 started.set()
1336 conns = []
1337 while not finish:
1338 r, w, e = select.select([server], [], [], 0.1)
1339 if server in r:
1340 # Let the socket hang around rather than having
1341 # it closed by garbage collection.
1342 conns.append(server.accept()[0])
1343
1344 t = threading.Thread(target=serve)
1345 t.start()
1346 started.wait()
1347
1348 try:
1349 try:
1350 c = socket.socket(socket.AF_INET)
1351 c.settimeout(0.2)
1352 c.connect((host, port))
1353 # Will attempt handshake and time out
1354 self.assertRaisesRegexp(ssl.SSLError, "timed out",
1355 ssl.wrap_socket, c)
1356 finally:
1357 c.close()
1358 try:
1359 c = socket.socket(socket.AF_INET)
1360 c.settimeout(0.2)
1361 c = ssl.wrap_socket(c)
1362 # Will attempt handshake and time out
1363 self.assertRaisesRegexp(ssl.SSLError, "timed out",
1364 c.connect, (host, port))
1365 finally:
1366 c.close()
1367 finally:
1368 finish = True
1369 t.join()
1370 server.close()
1371
1372 def test_default_ciphers(self):
1373 with ThreadedEchoServer(CERTFILE,
1374 ssl_version=ssl.PROTOCOL_SSLv23,
1375 chatty=False) as server:
1376 sock = socket.socket()
1377 try:
1378 # Force a set of weak ciphers on our client socket
1379 try:
1380 s = ssl.wrap_socket(sock,
1381 ssl_version=ssl.PROTOCOL_SSLv23,
1382 ciphers="DES")
1383 except ssl.SSLError:
1384 self.skipTest("no DES cipher available")
1385 with self.assertRaises((OSError, ssl.SSLError)):
1386 s.connect((HOST, server.port))
1387 finally:
1388 sock.close()
1389 self.assertIn("no shared cipher", str(server.conn_errors[0]))
1390
1391
1392def test_main(verbose=False):
1393 global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, NOKIACERT, NULLBYTECERT
1394 CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
1395 "keycert.pem")
1396 SVN_PYTHON_ORG_ROOT_CERT = os.path.join(
1397 os.path.dirname(__file__) or os.curdir,
1398 "https_svn_python_org_root.pem")
1399 NOKIACERT = os.path.join(os.path.dirname(__file__) or os.curdir,
1400 "nokia.pem")
1401 NULLBYTECERT = os.path.join(os.path.dirname(__file__) or os.curdir,
1402 "nullbytecert.pem")
1403
1404 if (not os.path.exists(CERTFILE) or
1405 not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT) or
1406 not os.path.exists(NOKIACERT) or
1407 not os.path.exists(NULLBYTECERT)):
1408 raise test_support.TestFailed("Can't read certificate files!")
1409
1410 tests = [BasicTests, BasicSocketTests]
1411
1412 if test_support.is_resource_enabled('network'):
1413 tests.append(NetworkedTests)
1414
1415 if _have_threads:
1416 thread_info = test_support.threading_setup()
1417 if thread_info and test_support.is_resource_enabled('network'):
1418 tests.append(ThreadedTests)
1419
1420 try:
1421 test_support.run_unittest(*tests)
1422 finally:
1423 if _have_threads:
1424 test_support.threading_cleanup(*thread_info)
1425
1426if __name__ == "__main__":
1427 test_main()
Note: See TracBrowser for help on using the repository browser.