Changeset 391 for python/trunk/Lib/test/test_httpservers.py
- Timestamp:
- Mar 19, 2014, 11:31:01 PM (11 years ago)
- Location:
- python/trunk
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
python/trunk
-
Property svn:mergeinfo
set to
/python/vendor/Python-2.7.6 merged eligible /python/vendor/current merged eligible
-
Property svn:mergeinfo
set to
-
python/trunk/Lib/test/test_httpservers.py
r2 r391 5 5 """ 6 6 7 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer8 from SimpleHTTPServer import SimpleHTTPRequestHandler9 from CGIHTTPServer import CGIHTTPRequestHandler10 11 7 import os 12 8 import sys 9 import re 13 10 import base64 14 11 import shutil … … 16 13 import httplib 17 14 import tempfile 18 import threading19 20 15 import unittest 16 import CGIHTTPServer 17 18 19 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer 20 from SimpleHTTPServer import SimpleHTTPRequestHandler 21 from CGIHTTPServer import CGIHTTPRequestHandler 22 from StringIO import StringIO 21 23 from test import test_support 24 25 26 threading = test_support.import_module('threading') 22 27 23 28 … … 25 30 def log_message(self, *args): 26 31 # don't write log messages to stderr 32 pass 33 34 class SocketlessRequestHandler(SimpleHTTPRequestHandler): 35 def __init__(self): 36 self.get_called = False 37 self.protocol_version = "HTTP/1.1" 38 39 def do_GET(self): 40 self.get_called = True 41 self.send_response(200) 42 self.send_header('Content-Type', 'text/html') 43 self.end_headers() 44 self.wfile.write(b'<html><body>Data</body></html>\r\n') 45 46 def log_message(self, fmt, *args): 27 47 pass 28 48 … … 33 53 self.request_handler = request_handler 34 54 self.test_object = test_object 35 self.test_object.lock.acquire()36 55 37 56 def run(self): 38 57 self.server = HTTPServer(('', 0), self.request_handler) 39 58 self.test_object.PORT = self.server.socket.getsockname()[1] 40 self.test_object.lock.release() 59 self.test_object.server_started.set() 60 self.test_object = None 41 61 try: 42 self.server.serve_forever( )62 self.server.serve_forever(0.05) 43 63 finally: 44 64 self.server.server_close() … … 50 70 class BaseTestCase(unittest.TestCase): 51 71 def setUp(self): 52 self.lock = threading.Lock() 72 self._threads = test_support.threading_setup() 73 os.environ = test_support.EnvironmentVarGuard() 74 self.server_started = threading.Event() 53 75 self.thread = TestServerThread(self, self.request_handler) 54 76 self.thread.start() 55 self. lock.acquire()77 self.server_started.wait() 56 78 57 79 def tearDown(self): 58 self.lock.release()59 80 self.thread.stop() 81 os.environ.__exit__() 82 test_support.threading_cleanup(*self._threads) 60 83 61 84 def request(self, uri, method='GET', body=None, headers={}): … … 63 86 self.connection.request(method, uri, body, headers) 64 87 return self.connection.getresponse() 88 89 class BaseHTTPRequestHandlerTestCase(unittest.TestCase): 90 """Test the functionality of the BaseHTTPServer focussing on 91 BaseHTTPRequestHandler. 92 """ 93 94 HTTPResponseMatch = re.compile('HTTP/1.[0-9]+ 200 OK') 95 96 def setUp (self): 97 self.handler = SocketlessRequestHandler() 98 99 def send_typical_request(self, message): 100 input_msg = StringIO(message) 101 output = StringIO() 102 self.handler.rfile = input_msg 103 self.handler.wfile = output 104 self.handler.handle_one_request() 105 output.seek(0) 106 return output.readlines() 107 108 def verify_get_called(self): 109 self.assertTrue(self.handler.get_called) 110 111 def verify_expected_headers(self, headers): 112 for fieldName in 'Server: ', 'Date: ', 'Content-Type: ': 113 self.assertEqual(sum(h.startswith(fieldName) for h in headers), 1) 114 115 def verify_http_server_response(self, response): 116 match = self.HTTPResponseMatch.search(response) 117 self.assertTrue(match is not None) 118 119 def test_http_1_1(self): 120 result = self.send_typical_request('GET / HTTP/1.1\r\n\r\n') 121 self.verify_http_server_response(result[0]) 122 self.verify_expected_headers(result[1:-1]) 123 self.verify_get_called() 124 self.assertEqual(result[-1], '<html><body>Data</body></html>\r\n') 125 126 def test_http_1_0(self): 127 result = self.send_typical_request('GET / HTTP/1.0\r\n\r\n') 128 self.verify_http_server_response(result[0]) 129 self.verify_expected_headers(result[1:-1]) 130 self.verify_get_called() 131 self.assertEqual(result[-1], '<html><body>Data</body></html>\r\n') 132 133 def test_http_0_9(self): 134 result = self.send_typical_request('GET / HTTP/0.9\r\n\r\n') 135 self.assertEqual(len(result), 1) 136 self.assertEqual(result[0], '<html><body>Data</body></html>\r\n') 137 self.verify_get_called() 138 139 def test_with_continue_1_0(self): 140 result = self.send_typical_request('GET / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n') 141 self.verify_http_server_response(result[0]) 142 self.verify_expected_headers(result[1:-1]) 143 self.verify_get_called() 144 self.assertEqual(result[-1], '<html><body>Data</body></html>\r\n') 145 146 def test_request_length(self): 147 # Issue #10714: huge request lines are discarded, to avoid Denial 148 # of Service attacks. 149 result = self.send_typical_request(b'GET ' + b'x' * 65537) 150 self.assertEqual(result[0], b'HTTP/1.1 414 Request-URI Too Long\r\n') 151 self.assertFalse(self.handler.get_called) 65 152 66 153 … … 99 186 self.con.request('GET', '/') 100 187 res = self.con.getresponse() 101 self.assertEqual s(res.status, 501)188 self.assertEqual(res.status, 501) 102 189 103 190 def test_request_line_trimming(self): … … 106 193 self.con.endheaders() 107 194 res = self.con.getresponse() 108 self.assertEqual s(res.status, 501)195 self.assertEqual(res.status, 501) 109 196 110 197 def test_version_bogus(self): … … 113 200 self.con.endheaders() 114 201 res = self.con.getresponse() 115 self.assertEqual s(res.status, 400)202 self.assertEqual(res.status, 400) 116 203 117 204 def test_version_digits(self): … … 120 207 self.con.endheaders() 121 208 res = self.con.getresponse() 122 self.assertEqual s(res.status, 400)209 self.assertEqual(res.status, 400) 123 210 124 211 def test_version_none_get(self): … … 127 214 self.con.endheaders() 128 215 res = self.con.getresponse() 129 self.assertEqual s(res.status, 501)216 self.assertEqual(res.status, 501) 130 217 131 218 def test_version_none(self): … … 134 221 self.con.endheaders() 135 222 res = self.con.getresponse() 136 self.assertEqual s(res.status, 400)223 self.assertEqual(res.status, 400) 137 224 138 225 def test_version_invalid(self): … … 142 229 self.con.endheaders() 143 230 res = self.con.getresponse() 144 self.assertEqual s(res.status, 505)231 self.assertEqual(res.status, 505) 145 232 146 233 def test_send_blank(self): … … 149 236 self.con.endheaders() 150 237 res = self.con.getresponse() 151 self.assertEqual s(res.status, 400)238 self.assertEqual(res.status, 400) 152 239 153 240 def test_header_close(self): … … 156 243 self.con.endheaders() 157 244 res = self.con.getresponse() 158 self.assertEqual s(res.status, 501)245 self.assertEqual(res.status, 501) 159 246 160 247 def test_head_keep_alive(self): … … 164 251 self.con.endheaders() 165 252 res = self.con.getresponse() 166 self.assertEqual s(res.status, 501)253 self.assertEqual(res.status, 501) 167 254 168 255 def test_handler(self): 169 256 self.con.request('TEST', '/') 170 257 res = self.con.getresponse() 171 self.assertEqual s(res.status, 204)258 self.assertEqual(res.status, 204) 172 259 173 260 def test_return_header_keep_alive(self): 174 261 self.con.request('KEEP', '/') 175 262 res = self.con.getresponse() 176 self.assertEqual s(res.getheader('Connection'), 'keep-alive')263 self.assertEqual(res.getheader('Connection'), 'keep-alive') 177 264 self.con.request('TEST', '/') 265 self.addCleanup(self.con.close) 178 266 179 267 def test_internal_key_error(self): 180 268 self.con.request('KEYERROR', '/') 181 269 res = self.con.getresponse() 182 self.assertEqual s(res.status, 999)270 self.assertEqual(res.status, 999) 183 271 184 272 def test_return_custom_status(self): 185 273 self.con.request('CUSTOM', '/') 186 274 res = self.con.getresponse() 187 self.assertEqual s(res.status, 999)275 self.assertEqual(res.status, 999) 188 276 189 277 … … 209 297 try: 210 298 shutil.rmtree(self.tempdir) 211 except :299 except OSError: 212 300 pass 213 301 finally: … … 216 304 def check_status_and_reason(self, response, status, data=None): 217 305 body = response.read() 218 self.assert _(response)219 self.assertEqual s(response.status, status)220 self.assert _(response.reason != None)306 self.assertTrue(response) 307 self.assertEqual(response.status, status) 308 self.assertIsNotNone(response.reason) 221 309 if data: 222 310 self.assertEqual(data, body) … … 226 314 response = self.request(self.tempdir_name + '/test') 227 315 self.check_status_and_reason(response, 200, data=self.data) 316 # check for trailing "/" which should return 404. See Issue17324 317 response = self.request(self.tempdir_name + '/test/') 318 self.check_status_and_reason(response, 404) 228 319 response = self.request(self.tempdir_name + '/') 229 320 self.check_status_and_reason(response, 200) … … 237 328 response = self.request('/' + self.tempdir_name + '/') 238 329 self.check_status_and_reason(response, 200) 239 if os.name == 'posix': 240 # chmod won't work as expected on Windows platforms 330 331 # chmod() doesn't work as expected on Windows, and filesystem 332 # permissions are ignored by root on Unix. 333 if os.name == 'posix' and os.geteuid() != 0: 241 334 os.chmod(self.tempdir, 0) 242 335 response = self.request(self.tempdir_name + '/') … … 279 372 280 373 form = cgi.FieldStorage() 281 print "%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"), \282 form.getfirst("bacon"))374 print "%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"), 375 form.getfirst("bacon")) 283 376 """ 284 377 378 379 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0, 380 "This test can't be run reliably as root (issue #13308).") 285 381 class CGIHTTPServerTestCase(BaseTestCase): 286 382 class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler): … … 293 389 os.mkdir(self.cgi_dir) 294 390 391 # The shebang line should be pure ASCII: use symlink if possible. 392 # See issue #7668. 393 if hasattr(os, 'symlink'): 394 self.pythonexe = os.path.join(self.parent_dir, 'python') 395 os.symlink(sys.executable, self.pythonexe) 396 else: 397 self.pythonexe = sys.executable 398 399 self.nocgi_path = os.path.join(self.parent_dir, 'nocgi.py') 400 with open(self.nocgi_path, 'w') as fp: 401 fp.write(cgi_file1 % self.pythonexe) 402 os.chmod(self.nocgi_path, 0777) 403 295 404 self.file1_path = os.path.join(self.cgi_dir, 'file1.py') 296 405 with open(self.file1_path, 'w') as file1: 297 file1.write(cgi_file1 % s ys.executable)406 file1.write(cgi_file1 % self.pythonexe) 298 407 os.chmod(self.file1_path, 0777) 299 408 300 409 self.file2_path = os.path.join(self.cgi_dir, 'file2.py') 301 410 with open(self.file2_path, 'w') as file2: 302 file2.write(cgi_file2 % s ys.executable)411 file2.write(cgi_file2 % self.pythonexe) 303 412 os.chmod(self.file2_path, 0777) 304 413 … … 309 418 try: 310 419 os.chdir(self.cwd) 420 if self.pythonexe != sys.executable: 421 os.remove(self.pythonexe) 422 os.remove(self.nocgi_path) 311 423 os.remove(self.file1_path) 312 424 os.remove(self.file2_path) … … 316 428 BaseTestCase.tearDown(self) 317 429 430 def test_url_collapse_path(self): 431 # verify tail is the last portion and head is the rest on proper urls 432 test_vectors = { 433 '': '//', 434 '..': IndexError, 435 '/.//..': IndexError, 436 '/': '//', 437 '//': '//', 438 '/\\': '//\\', 439 '/.//': '//', 440 'cgi-bin/file1.py': '/cgi-bin/file1.py', 441 '/cgi-bin/file1.py': '/cgi-bin/file1.py', 442 'a': '//a', 443 '/a': '//a', 444 '//a': '//a', 445 './a': '//a', 446 './C:/': '/C:/', 447 '/a/b': '/a/b', 448 '/a/b/': '/a/b/', 449 '/a/b/.': '/a/b/', 450 '/a/b/c/..': '/a/b/', 451 '/a/b/c/../d': '/a/b/d', 452 '/a/b/c/../d/e/../f': '/a/b/d/f', 453 '/a/b/c/../d/e/../../f': '/a/b/f', 454 '/a/b/c/../d/e/.././././..//f': '/a/b/f', 455 '../a/b/c/../d/e/.././././..//f': IndexError, 456 '/a/b/c/../d/e/../../../f': '/a/f', 457 '/a/b/c/../d/e/../../../../f': '//f', 458 '/a/b/c/../d/e/../../../../../f': IndexError, 459 '/a/b/c/../d/e/../../../../f/..': '//', 460 '/a/b/c/../d/e/../../../../f/../.': '//', 461 } 462 for path, expected in test_vectors.iteritems(): 463 if isinstance(expected, type) and issubclass(expected, Exception): 464 self.assertRaises(expected, 465 CGIHTTPServer._url_collapse_path, path) 466 else: 467 actual = CGIHTTPServer._url_collapse_path(path) 468 self.assertEqual(expected, actual, 469 msg='path = %r\nGot: %r\nWanted: %r' % 470 (path, actual, expected)) 471 318 472 def test_headers_and_content(self): 319 473 res = self.request('/cgi-bin/file1.py') 320 self.assertEquals(('Hello World\n', 'text/html', 200), \ 321 (res.read(), res.getheader('Content-type'), res.status)) 474 self.assertEqual(('Hello World\n', 'text/html', 200), 475 (res.read(), res.getheader('Content-type'), res.status)) 476 477 def test_issue19435(self): 478 res = self.request('///////////nocgi.py/../cgi-bin/nothere.sh') 479 self.assertEqual(res.status, 404) 322 480 323 481 def test_post(self): … … 326 484 res = self.request('/cgi-bin/file2.py', 'POST', params, headers) 327 485 328 self.assertEqual s(res.read(), '1, python, 123456\n')486 self.assertEqual(res.read(), '1, python, 123456\n') 329 487 330 488 def test_invaliduri(self): 331 489 res = self.request('/cgi-bin/invalid') 332 490 res.read() 333 self.assertEqual s(res.status, 404)491 self.assertEqual(res.status, 404) 334 492 335 493 def test_authorization(self): 336 headers = {'Authorization' : 'Basic %s' % \337 base64.b64encode('username:pass')}494 headers = {'Authorization' : 'Basic %s' % 495 base64.b64encode('username:pass')} 338 496 res = self.request('/cgi-bin/file1.py', 'GET', headers=headers) 339 self.assertEquals(('Hello World\n', 'text/html', 200), \ 497 self.assertEqual(('Hello World\n', 'text/html', 200), 498 (res.read(), res.getheader('Content-type'), res.status)) 499 500 def test_no_leading_slash(self): 501 # http://bugs.python.org/issue2254 502 res = self.request('cgi-bin/file1.py') 503 self.assertEqual(('Hello World\n', 'text/html', 200), 340 504 (res.read(), res.getheader('Content-type'), res.status)) 341 505 506 def test_os_environ_is_not_altered(self): 507 signature = "Test CGI Server" 508 os.environ['SERVER_SOFTWARE'] = signature 509 res = self.request('/cgi-bin/file1.py') 510 self.assertEqual((b'Hello World\n', 'text/html', 200), 511 (res.read(), res.getheader('Content-type'), res.status)) 512 self.assertEqual(os.environ['SERVER_SOFTWARE'], signature) 513 514 515 class SimpleHTTPRequestHandlerTestCase(unittest.TestCase): 516 """ Test url parsing """ 517 def setUp(self): 518 self.translated = os.getcwd() 519 self.translated = os.path.join(self.translated, 'filename') 520 self.handler = SocketlessRequestHandler() 521 522 def test_query_arguments(self): 523 path = self.handler.translate_path('/filename') 524 self.assertEqual(path, self.translated) 525 path = self.handler.translate_path('/filename?foo=bar') 526 self.assertEqual(path, self.translated) 527 path = self.handler.translate_path('/filename?a=b&spam=eggs#zot') 528 self.assertEqual(path, self.translated) 529 530 def test_start_with_double_slash(self): 531 path = self.handler.translate_path('//filename') 532 self.assertEqual(path, self.translated) 533 path = self.handler.translate_path('//filename?foo=bar') 534 self.assertEqual(path, self.translated) 535 342 536 343 537 def test_main(verbose=None): 344 cwd = os.getcwd()345 env = os.environ.copy()346 538 try: 347 test_support.run_unittest(BaseHTTPServerTestCase, 539 cwd = os.getcwd() 540 test_support.run_unittest(BaseHTTPRequestHandlerTestCase, 541 SimpleHTTPRequestHandlerTestCase, 542 BaseHTTPServerTestCase, 348 543 SimpleHTTPServerTestCase, 349 544 CGIHTTPServerTestCase 350 545 ) 351 546 finally: 352 test_support.reap_children()353 os.environ.clear()354 os.environ.update(env)355 547 os.chdir(cwd) 356 548
Note:
See TracChangeset
for help on using the changeset viewer.