source: python/trunk/Lib/test/test_httpservers.py

Last change on this file was 391, checked in by dmik, 11 years ago

python: Merge vendor 2.7.6 to trunk.

  • Property svn:eol-style set to native
File size: 19.1 KB
Line 
1"""Unittests for the various HTTPServer modules.
2
3Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>,
4Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest.
5"""
6
7import os
8import sys
9import re
10import base64
11import shutil
12import urllib
13import httplib
14import tempfile
15import unittest
16import CGIHTTPServer
17
18
19from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
20from SimpleHTTPServer import SimpleHTTPRequestHandler
21from CGIHTTPServer import CGIHTTPRequestHandler
22from StringIO import StringIO
23from test import test_support
24
25
26threading = test_support.import_module('threading')
27
28
class NoLogRequestHandler:
    """Mixin whose ``log_message`` discards every log record."""

    def log_message(self, *args):
        """Accept any arguments and do nothing, keeping stderr quiet."""
        return None
33
34class SocketlessRequestHandler(SimpleHTTPRequestHandler):
35 def __init__(self):
36 self.get_called = False
37 self.protocol_version = "HTTP/1.1"
38
39 def do_GET(self):
40 self.get_called = True
41 self.send_response(200)
42 self.send_header('Content-Type', 'text/html')
43 self.end_headers()
44 self.wfile.write(b'<html><body>Data</body></html>\r\n')
45
46 def log_message(self, fmt, *args):
47 pass
48
49
50class TestServerThread(threading.Thread):
51 def __init__(self, test_object, request_handler):
52 threading.Thread.__init__(self)
53 self.request_handler = request_handler
54 self.test_object = test_object
55
56 def run(self):
57 self.server = HTTPServer(('', 0), self.request_handler)
58 self.test_object.PORT = self.server.socket.getsockname()[1]
59 self.test_object.server_started.set()
60 self.test_object = None
61 try:
62 self.server.serve_forever(0.05)
63 finally:
64 self.server.server_close()
65
66 def stop(self):
67 self.server.shutdown()
68
69
class BaseTestCase(unittest.TestCase):
    """Fixture that starts a TestServerThread around every test.

    Subclasses provide the handler class through a ``request_handler``
    attribute, which setUp passes to the server thread.
    """

    def setUp(self):
        self._threads = test_support.threading_setup()
        # Replace os.environ with a guard object; tearDown calls its
        # __exit__() (presumably restoring the original environment).
        os.environ = test_support.EnvironmentVarGuard()
        self.server_started = threading.Event()
        self.thread = TestServerThread(self, self.request_handler)
        self.thread.start()
        # Wait until the server thread has set self.PORT.
        self.server_started.wait()

    def tearDown(self):
        self.thread.stop()
        os.environ.__exit__()
        test_support.threading_cleanup(*self._threads)

    def request(self, uri, method='GET', body=None, headers=None):
        """Send one request to the test server and return the response.

        Arguments:
            uri: request target passed to ``HTTPConnection.request``.
            method: HTTP method name (default ``'GET'``).
            body: optional request body.
            headers: optional mapping of extra headers; ``None`` means no
                extra headers.

        The connection is stored on ``self.connection``.
        """
        # Avoid a shared mutable default: build a fresh dict per call.
        if headers is None:
            headers = {}
        self.connection = httplib.HTTPConnection('localhost', self.PORT)
        self.connection.request(method, uri, body, headers)
        return self.connection.getresponse()
88
class BaseHTTPRequestHandlerTestCase(unittest.TestCase):
    """Test the functionality of the BaseHTTPServer focussing on
    BaseHTTPRequestHandler.
    """

    # Status line of a successful response.  The dot is escaped so that it
    # only matches a literal '.', not an arbitrary character.
    HTTPResponseMatch = re.compile(r'HTTP/1\.[0-9]+ 200 OK')

    def setUp (self):
        self.handler = SocketlessRequestHandler()

    def send_typical_request(self, message):
        """Feed *message* to the handler and return its output as lines.

        The handler's ``rfile``/``wfile`` are replaced by in-memory
        StringIO objects, one request is handled, and the written output
        is returned via ``readlines()``.
        """
        input_msg = StringIO(message)
        output = StringIO()
        self.handler.rfile = input_msg
        self.handler.wfile = output
        self.handler.handle_one_request()
        output.seek(0)
        return output.readlines()

    def verify_get_called(self):
        """Assert that the handler's do_GET was invoked."""
        self.assertTrue(self.handler.get_called)

    def verify_expected_headers(self, headers):
        """Assert each of Server, Date and Content-Type occurs exactly once."""
        for fieldName in 'Server: ', 'Date: ', 'Content-Type: ':
            self.assertEqual(sum(h.startswith(fieldName) for h in headers), 1)

    def verify_http_server_response(self, response):
        """Assert that *response* contains an 'HTTP/1.x 200 OK' status."""
        match = self.HTTPResponseMatch.search(response)
        self.assertIsNotNone(match)

    def test_http_1_1(self):
        result = self.send_typical_request('GET / HTTP/1.1\r\n\r\n')
        self.verify_http_server_response(result[0])
        self.verify_expected_headers(result[1:-1])
        self.verify_get_called()
        self.assertEqual(result[-1], '<html><body>Data</body></html>\r\n')

    def test_http_1_0(self):
        result = self.send_typical_request('GET / HTTP/1.0\r\n\r\n')
        self.verify_http_server_response(result[0])
        self.verify_expected_headers(result[1:-1])
        self.verify_get_called()
        self.assertEqual(result[-1], '<html><body>Data</body></html>\r\n')

    def test_http_0_9(self):
        # For an HTTP/0.9 request the test expects only the body line back.
        result = self.send_typical_request('GET / HTTP/0.9\r\n\r\n')
        self.assertEqual(len(result), 1)
        self.assertEqual(result[0], '<html><body>Data</body></html>\r\n')
        self.verify_get_called()

    def test_with_continue_1_0(self):
        result = self.send_typical_request('GET / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n')
        self.verify_http_server_response(result[0])
        self.verify_expected_headers(result[1:-1])
        self.verify_get_called()
        self.assertEqual(result[-1], '<html><body>Data</body></html>\r\n')

    def test_request_length(self):
        # Issue #10714: huge request lines are discarded, to avoid Denial
        # of Service attacks.
        result = self.send_typical_request(b'GET ' + b'x' * 65537)
        self.assertEqual(result[0], b'HTTP/1.1 414 Request-URI Too Long\r\n')
        self.assertFalse(self.handler.get_called)
152
153
class BaseHTTPServerTestCase(BaseTestCase):
    """Drive BaseHTTPRequestHandler through a running server."""

    class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler):
        # Handler defining only the custom methods below; it has no do_GET.
        protocol_version = 'HTTP/1.1'
        default_request_version = 'HTTP/1.1'

        def _reply(self, code, connection):
            # Send a header-only text/html response with a Connection header.
            self.send_response(code)
            self.send_header('Content-Type', 'text/html')
            self.send_header('Connection', connection)
            self.end_headers()

        def do_TEST(self):
            self._reply(204, 'close')

        def do_KEEP(self):
            self._reply(204, 'keep-alive')

        def do_KEYERROR(self):
            # NOTE(review): 999 is presumably missing from the handler's
            # table of known status codes -- confirm against BaseHTTPServer.
            self.send_error(999)

        def do_CUSTOM(self):
            self._reply(999, 'close')

    def setUp(self):
        BaseTestCase.setUp(self)
        self.con = httplib.HTTPConnection('localhost', self.PORT)
        self.con.connect()

    def _check_request(self, method, expected_status):
        # Issue *method* for '/' through request() and check the status.
        self.con.request(method, '/')
        response = self.con.getresponse()
        self.assertEqual(response.status, expected_status)

    def _check_raw_request(self, expected_status, method='GET', url='/',
                           version=None, headers=()):
        # Build the request with putrequest()/putheader()/endheaders(),
        # optionally overriding the version string sent on the request line.
        if version is not None:
            self.con._http_vsn_str = version
        self.con.putrequest(method, url)
        for name, value in headers:
            self.con.putheader(name, value)
        self.con.endheaders()
        response = self.con.getresponse()
        self.assertEqual(response.status, expected_status)

    def test_command(self):
        self._check_request('GET', 501)

    def test_request_line_trimming(self):
        self._check_raw_request(501, version='HTTP/1.1\n')

    def test_version_bogus(self):
        self._check_raw_request(400, version='FUBAR')

    def test_version_digits(self):
        self._check_raw_request(400, version='HTTP/9.9.9')

    def test_version_none_get(self):
        self._check_raw_request(501, version='')

    def test_version_none(self):
        self._check_raw_request(400, method='PUT', version='')

    def test_version_invalid(self):
        self.con._http_vsn = 99
        self._check_raw_request(505, version='HTTP/9.9')

    def test_send_blank(self):
        self._check_raw_request(400, method='', url='', version='')

    def test_header_close(self):
        self._check_raw_request(501, headers=[('Connection', 'close')])

    def test_head_keep_alive(self):
        self._check_raw_request(501, version='HTTP/1.1',
                                headers=[('Connection', 'keep-alive')])

    def test_handler(self):
        self._check_request('TEST', 204)

    def test_return_header_keep_alive(self):
        self.con.request('KEEP', '/')
        response = self.con.getresponse()
        self.assertEqual(response.getheader('Connection'), 'keep-alive')
        # Follow up with TEST, whose reply carries 'Connection: close'.
        self.con.request('TEST', '/')
        self.addCleanup(self.con.close)

    def test_internal_key_error(self):
        self._check_request('KEYERROR', 999)

    def test_return_custom_status(self):
        self._check_request('CUSTOM', 999)
276
277
278class SimpleHTTPServerTestCase(BaseTestCase):
279 class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler):
280 pass
281
282 def setUp(self):
283 BaseTestCase.setUp(self)
284 self.cwd = os.getcwd()
285 basetempdir = tempfile.gettempdir()
286 os.chdir(basetempdir)
287 self.data = 'We are the knights who say Ni!'
288 self.tempdir = tempfile.mkdtemp(dir=basetempdir)
289 self.tempdir_name = os.path.basename(self.tempdir)
290 temp = open(os.path.join(self.tempdir, 'test'), 'wb')
291 temp.write(self.data)
292 temp.close()
293
294 def tearDown(self):
295 try:
296 os.chdir(self.cwd)
297 try:
298 shutil.rmtree(self.tempdir)
299 except OSError:
300 pass
301 finally:
302 BaseTestCase.tearDown(self)
303
304 def check_status_and_reason(self, response, status, data=None):
305 body = response.read()
306 self.assertTrue(response)
307 self.assertEqual(response.status, status)
308 self.assertIsNotNone(response.reason)
309 if data:
310 self.assertEqual(data, body)
311
312 def test_get(self):
313 #constructs the path relative to the root directory of the HTTPServer
314 response = self.request(self.tempdir_name + '/test')
315 self.check_status_and_reason(response, 200, data=self.data)
316 # check for trailing "/" which should return 404. See Issue17324
317 response = self.request(self.tempdir_name + '/test/')
318 self.check_status_and_reason(response, 404)
319 response = self.request(self.tempdir_name + '/')
320 self.check_status_and_reason(response, 200)
321 response = self.request(self.tempdir_name)
322 self.check_status_and_reason(response, 301)
323 response = self.request('/ThisDoesNotExist')
324 self.check_status_and_reason(response, 404)
325 response = self.request('/' + 'ThisDoesNotExist' + '/')
326 self.check_status_and_reason(response, 404)
327 f = open(os.path.join(self.tempdir_name, 'index.html'), 'w')
328 response = self.request('/' + self.tempdir_name + '/')
329 self.check_status_and_reason(response, 200)
330
331 # chmod() doesn't work as expected on Windows, and filesystem
332 # permissions are ignored by root on Unix.
333 if os.name == 'posix' and os.geteuid() != 0:
334 os.chmod(self.tempdir, 0)
335 response = self.request(self.tempdir_name + '/')
336 self.check_status_and_reason(response, 404)
337 os.chmod(self.tempdir, 0755)
338
339 def test_head(self):
340 response = self.request(
341 self.tempdir_name + '/test', method='HEAD')
342 self.check_status_and_reason(response, 200)
343 self.assertEqual(response.getheader('content-length'),
344 str(len(self.data)))
345 self.assertEqual(response.getheader('content-type'),
346 'application/octet-stream')
347
348 def test_invalid_requests(self):
349 response = self.request('/', method='FOO')
350 self.check_status_and_reason(response, 501)
351 # requests must be case sensitive,so this should fail too
352 response = self.request('/', method='get')
353 self.check_status_and_reason(response, 501)
354 response = self.request('/', method='GETs')
355 self.check_status_and_reason(response, 501)
356
357
# Template for a minimal CGI script.  The single %s placeholder receives the
# interpreter path used for the shebang line.
cgi_file1 = """\
#!%s

print "Content-type: text/html"
print
print "Hello World"
"""

# Template for a CGI script that prints the "spam", "eggs" and "bacon" form
# fields.  The doubled %% sequences are escapes: they become single % signs
# once the interpreter path has been substituted for the leading %s.
cgi_file2 = """\
#!%s
import cgi

print "Content-type: text/html"
print

form = cgi.FieldStorage()
print "%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"),
                          form.getfirst("bacon"))
"""
377
378
379@unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
380 "This test can't be run reliably as root (issue #13308).")
381class CGIHTTPServerTestCase(BaseTestCase):
382 class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler):
383 pass
384
385 def setUp(self):
386 BaseTestCase.setUp(self)
387 self.parent_dir = tempfile.mkdtemp()
388 self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin')
389 os.mkdir(self.cgi_dir)
390
391 # The shebang line should be pure ASCII: use symlink if possible.
392 # See issue #7668.
393 if hasattr(os, 'symlink'):
394 self.pythonexe = os.path.join(self.parent_dir, 'python')
395 os.symlink(sys.executable, self.pythonexe)
396 else:
397 self.pythonexe = sys.executable
398
399 self.nocgi_path = os.path.join(self.parent_dir, 'nocgi.py')
400 with open(self.nocgi_path, 'w') as fp:
401 fp.write(cgi_file1 % self.pythonexe)
402 os.chmod(self.nocgi_path, 0777)
403
404 self.file1_path = os.path.join(self.cgi_dir, 'file1.py')
405 with open(self.file1_path, 'w') as file1:
406 file1.write(cgi_file1 % self.pythonexe)
407 os.chmod(self.file1_path, 0777)
408
409 self.file2_path = os.path.join(self.cgi_dir, 'file2.py')
410 with open(self.file2_path, 'w') as file2:
411 file2.write(cgi_file2 % self.pythonexe)
412 os.chmod(self.file2_path, 0777)
413
414 self.cwd = os.getcwd()
415 os.chdir(self.parent_dir)
416
417 def tearDown(self):
418 try:
419 os.chdir(self.cwd)
420 if self.pythonexe != sys.executable:
421 os.remove(self.pythonexe)
422 os.remove(self.nocgi_path)
423 os.remove(self.file1_path)
424 os.remove(self.file2_path)
425 os.rmdir(self.cgi_dir)
426 os.rmdir(self.parent_dir)
427 finally:
428 BaseTestCase.tearDown(self)
429
430 def test_url_collapse_path(self):
431 # verify tail is the last portion and head is the rest on proper urls
432 test_vectors = {
433 '': '//',
434 '..': IndexError,
435 '/.//..': IndexError,
436 '/': '//',
437 '//': '//',
438 '/\\': '//\\',
439 '/.//': '//',
440 'cgi-bin/file1.py': '/cgi-bin/file1.py',
441 '/cgi-bin/file1.py': '/cgi-bin/file1.py',
442 'a': '//a',
443 '/a': '//a',
444 '//a': '//a',
445 './a': '//a',
446 './C:/': '/C:/',
447 '/a/b': '/a/b',
448 '/a/b/': '/a/b/',
449 '/a/b/.': '/a/b/',
450 '/a/b/c/..': '/a/b/',
451 '/a/b/c/../d': '/a/b/d',
452 '/a/b/c/../d/e/../f': '/a/b/d/f',
453 '/a/b/c/../d/e/../../f': '/a/b/f',
454 '/a/b/c/../d/e/.././././..//f': '/a/b/f',
455 '../a/b/c/../d/e/.././././..//f': IndexError,
456 '/a/b/c/../d/e/../../../f': '/a/f',
457 '/a/b/c/../d/e/../../../../f': '//f',
458 '/a/b/c/../d/e/../../../../../f': IndexError,
459 '/a/b/c/../d/e/../../../../f/..': '//',
460 '/a/b/c/../d/e/../../../../f/../.': '//',
461 }
462 for path, expected in test_vectors.iteritems():
463 if isinstance(expected, type) and issubclass(expected, Exception):
464 self.assertRaises(expected,
465 CGIHTTPServer._url_collapse_path, path)
466 else:
467 actual = CGIHTTPServer._url_collapse_path(path)
468 self.assertEqual(expected, actual,
469 msg='path = %r\nGot: %r\nWanted: %r' %
470 (path, actual, expected))
471
472 def test_headers_and_content(self):
473 res = self.request('/cgi-bin/file1.py')
474 self.assertEqual(('Hello World\n', 'text/html', 200),
475 (res.read(), res.getheader('Content-type'), res.status))
476
477 def test_issue19435(self):
478 res = self.request('///////////nocgi.py/../cgi-bin/nothere.sh')
479 self.assertEqual(res.status, 404)
480
481 def test_post(self):
482 params = urllib.urlencode({'spam' : 1, 'eggs' : 'python', 'bacon' : 123456})
483 headers = {'Content-type' : 'application/x-www-form-urlencoded'}
484 res = self.request('/cgi-bin/file2.py', 'POST', params, headers)
485
486 self.assertEqual(res.read(), '1, python, 123456\n')
487
488 def test_invaliduri(self):
489 res = self.request('/cgi-bin/invalid')
490 res.read()
491 self.assertEqual(res.status, 404)
492
493 def test_authorization(self):
494 headers = {'Authorization' : 'Basic %s' %
495 base64.b64encode('username:pass')}
496 res = self.request('/cgi-bin/file1.py', 'GET', headers=headers)
497 self.assertEqual(('Hello World\n', 'text/html', 200),
498 (res.read(), res.getheader('Content-type'), res.status))
499
500 def test_no_leading_slash(self):
501 # http://bugs.python.org/issue2254
502 res = self.request('cgi-bin/file1.py')
503 self.assertEqual(('Hello World\n', 'text/html', 200),
504 (res.read(), res.getheader('Content-type'), res.status))
505
506 def test_os_environ_is_not_altered(self):
507 signature = "Test CGI Server"
508 os.environ['SERVER_SOFTWARE'] = signature
509 res = self.request('/cgi-bin/file1.py')
510 self.assertEqual((b'Hello World\n', 'text/html', 200),
511 (res.read(), res.getheader('Content-type'), res.status))
512 self.assertEqual(os.environ['SERVER_SOFTWARE'], signature)
513
514
class SimpleHTTPRequestHandlerTestCase(unittest.TestCase):
    """Check how translate_path() maps request URLs to local paths."""

    def setUp(self):
        # Every URL used below is expected to map to <cwd>/filename.
        self.translated = os.path.join(os.getcwd(), 'filename')
        self.handler = SocketlessRequestHandler()

    def test_query_arguments(self):
        urls = (
            '/filename',
            '/filename?foo=bar',
            '/filename?a=b&spam=eggs#zot',
        )
        for url in urls:
            self.assertEqual(self.handler.translate_path(url),
                             self.translated)

    def test_start_with_double_slash(self):
        for url in ('//filename', '//filename?foo=bar'):
            self.assertEqual(self.handler.translate_path(url),
                             self.translated)
535
536
def test_main(verbose=None):
    """Run all test cases of this module, then restore the working directory.

    ``verbose`` is accepted but not used by this function.
    """
    # Read the directory before entering ``try``: if os.getcwd() raised
    # inside it, ``finally`` would hit an unbound ``cwd`` and hide the
    # original error behind a NameError.
    cwd = os.getcwd()
    try:
        test_support.run_unittest(BaseHTTPRequestHandlerTestCase,
                                  SimpleHTTPRequestHandlerTestCase,
                                  BaseHTTPServerTestCase,
                                  SimpleHTTPServerTestCase,
                                  CGIHTTPServerTestCase
                                 )
    finally:
        # Several test cases chdir(); always return to the starting point.
        os.chdir(cwd)
548
# Run the complete suite when this file is executed as a script.
if __name__ == '__main__':
    test_main()
Note: See TracBrowser for help on using the repository browser.