1 | #!/usr/bin/env python
|
---|
2 |
|
---|
3 | import urlparse
|
---|
4 | import urllib2
|
---|
5 | import BaseHTTPServer
|
---|
6 | import unittest
|
---|
7 | import hashlib
|
---|
8 |
|
---|
9 | from test import test_support
|
---|
10 |
|
---|
11 | mimetools = test_support.import_module('mimetools', deprecated=True)
|
---|
12 | threading = test_support.import_module('threading')
|
---|
13 |
|
---|
14 | # Loopback http server infrastructure
|
---|
15 |
|
---|
class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
    """HTTPServer with socket timeouts applied, for loopback-only tests.

    The listening socket gets a 1 second timeout and every accepted
    connection gets a 10 second timeout.
    """

    def __init__(self, server_address, RequestHandlerClass):
        BaseHTTPServer.HTTPServer.__init__(
            self, server_address, RequestHandlerClass)

        # Keep the listening socket's timeout short so the thread driving
        # this server can be stopped easily.
        self.socket.settimeout(1.0)

    def get_request(self):
        """Accept one connection; overrides the BaseHTTPServer method.

        Returns the usual ``(socket, client_address)`` pair, with a
        timeout already set on the accepted socket.
        """
        conn, peer = self.socket.accept()

        # Loopback traffic should never take this long, so the timeout
        # should not affect normal operation; it is here to make
        # deadlocks less likely.
        conn.settimeout(10.0)

        return (conn, peer)
|
---|
41 |
|
---|
class LoopbackHttpServerThread(threading.Thread):
    """Stoppable thread that runs a loopback http server.

    The server is created in the constructor, bound to 127.0.0.1 on
    port 0 (so the OS picks a free port); the port actually in use is
    published as ``self.port``.  ``self.ready`` is an Event that is set
    as soon as ``run()`` begins executing.
    """

    def __init__(self, request_handler):
        """Create the server; *request_handler* builds per-request handlers.

        ``protocol_version`` is forced to "HTTP/1.0" on *request_handler*.
        """
        threading.Thread.__init__(self)
        # Not called ``_stop``: newer threading.Thread implementations use
        # that attribute name internally, and shadowing it breaks join().
        self._stop_server = False
        self.ready = threading.Event()
        request_handler.protocol_version = "HTTP/1.0"
        self.httpd = LoopbackHttpServer(('127.0.0.1', 0),
                                        request_handler)
        self.port = self.httpd.server_port

    def stop(self):
        """Stops the webserver if it's currently running.

        Blocks until the serving thread has exited, then closes the
        listening socket so it is not leaked between tests.
        """
        # Set the stop flag; run() notices it after the current
        # handle_request() call returns.
        self._stop_server = True

        try:
            self.join()
        finally:
            self.httpd.server_close()

    def run(self):
        """Serve one request at a time until stop() is called."""
        self.ready.set()
        while not self._stop_server:
            self.httpd.handle_request()
|
---|
68 |
|
---|
69 | # Authentication infrastructure
|
---|
70 |
|
---|
class DigestAuthHandler:
    """Handler for performing digest authentication.

    Keeps a username -> password table, a realm name, a qop value and
    the list of nonces it has issued but not yet seen used.  A request
    handler passes itself to handle_request(), which either accepts the
    request or writes a 407 challenge through that handler.
    """

    def __init__(self):
        self._request_num = 0
        self._nonces = []
        self._users = {}
        self._realm_name = "Test Realm"
        self._qop = "auth"

    def set_qop(self, qop):
        """Set the qop value advertised in challenges."""
        self._qop = qop

    def set_users(self, users):
        """Set the username -> password dict; an empty dict disables auth."""
        assert isinstance(users, dict)
        self._users = users

    def set_realm(self, realm):
        """Set the realm name advertised in challenges."""
        self._realm_name = realm

    @staticmethod
    def _md5_hex(text):
        """Return the hexadecimal MD5 digest of *text*.

        Anything that is not already a byte string is encoded as ASCII
        first, because hashlib only hashes bytes.
        """
        if not isinstance(text, bytes):
            text = text.encode("ascii")
        return hashlib.md5(text).hexdigest()

    def _generate_nonce(self):
        """Create, remember and return a fresh nonce.

        The nonce is the MD5 of a per-handler request counter, so it is
        predictable; that is fine for a test fixture.
        """
        self._request_num += 1
        nonce = self._md5_hex(str(self._request_num))
        self._nonces.append(nonce)
        return nonce

    def _create_auth_dict(self, auth_str):
        """Parse an authorization header value into a name -> value dict.

        Everything up to the first space (the scheme token) is dropped
        and the remainder is split on commas into ``name=value`` pairs.
        Surrounding whitespace and one pair of enclosing double quotes
        are removed from each value.  Segments without an ``=`` (such as
        the empty one left by a trailing comma) are ignored.

        Limitation: values that themselves contain a comma are not
        supported, because the split is not quote-aware.
        """
        first_space_index = auth_str.find(" ")
        auth_str = auth_str[first_space_index+1:]

        auth_dict = {}
        for part in auth_str.split(","):
            if "=" not in part:
                continue
            # Split on the first "=" only: values such as a URI with a
            # query string legitimately contain further "=" characters.
            name, value = part.split("=", 1)
            value = value.strip()
            if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
                value = value[1:-1]
            auth_dict[name.strip()] = value
        return auth_dict

    def _validate_auth(self, auth_dict, password, method, uri):
        """Return True if auth_dict["response"] matches the expected digest.

        The digest is computed from *password*, *method* and *uri*
        together with the username, realm, nonce, nc, cnonce and qop
        fields of *auth_dict*.  If any of those fields, or the response
        itself, is missing the credentials are treated as invalid and
        False is returned.
        """
        final_dict = {}
        final_dict.update(auth_dict)
        final_dict["password"] = password
        final_dict["method"] = method
        final_dict["uri"] = uri
        try:
            HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
            final_dict["HA1"] = self._md5_hex(HA1_str)
            HA2_str = "%(method)s:%(uri)s" % final_dict
            final_dict["HA2"] = self._md5_hex(HA2_str)
            response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
                           "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
            expected = auth_dict["response"]
        except KeyError:
            return False

        return self._md5_hex(response_str) == expected

    def _return_auth_challenge(self, request_handler):
        """Send a 407 response carrying a new digest challenge.

        Always returns False so callers can ``return`` the result
        directly as "authentication failed".
        """
        request_handler.send_response(407, "Proxy Authentication Required")
        request_handler.send_header("Content-Type", "text/html")
        request_handler.send_header(
            'Proxy-Authenticate', 'Digest realm="%s", '
            'qop="%s",'
            'nonce="%s", ' % \
            (self._realm_name, self._qop, self._generate_nonce()))
        # XXX: Not sure if we're supposed to add this next header or
        # not.
        #request_handler.send_header('Connection', 'close')
        request_handler.end_headers()
        request_handler.wfile.write("Proxy Authentication Required.")
        return False

    def handle_request(self, request_handler):
        """Performs digest authentication on the given HTTP request
        handler.  Returns True if authentication was successful, False
        otherwise.

        If no users have been set, then digest auth is effectively
        disabled and this method will always return True.

        On failure a 407 challenge has already been written through
        *request_handler* by the time False is returned.
        """
        if not self._users:
            return True

        if 'Proxy-Authorization' not in request_handler.headers:
            return self._return_auth_challenge(request_handler)

        auth_dict = self._create_auth_dict(
            request_handler.headers['Proxy-Authorization'])

        # An absent username is handled like an unknown one.
        username = auth_dict.get("username")
        if username not in self._users:
            return self._return_auth_challenge(request_handler)
        password = self._users[username]

        # Each issued nonce may be used exactly once.
        nonce = auth_dict.get("nonce")
        if nonce not in self._nonces:
            return self._return_auth_challenge(request_handler)
        self._nonces.remove(nonce)

        # MSIE uses short_path in its validation, but Python's
        # urllib2 uses the full path, so we're going to see if
        # either of them works here.
        for path in (request_handler.path, request_handler.short_path):
            if self._validate_auth(auth_dict,
                                   password,
                                   request_handler.command,
                                   path):
                return True

        return self._return_auth_challenge(request_handler)
|
---|
190 |
|
---|
191 | # Proxy test infrastructure
|
---|
192 |
|
---|
class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """This is a 'fake proxy' that makes it look like the entire
    internet has gone down due to a sudden zombie invasion.  Its main
    utility is in providing us with authentication support for
    testing.
    """

    def __init__(self, digest_auth_handler, *args, **kwargs):
        # The parent's __init__() already handles the request (and so
        # will try to call do_GET()), hence the auth handler has to be
        # stored first.
        self.digest_auth_handler = digest_auth_handler
        BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)

    def log_message(self, format, *args):
        """Discard log output; write ``format % args`` somewhere to debug."""
        pass

    def do_GET(self):
        """Answer a GET with a canned page once authentication succeeds.

        The path component of the requested URL is stored in
        ``self.short_path`` before the auth handler is consulted.  When
        the auth handler returns False nothing further is written here.
        """
        parsed = urlparse.urlparse(self.path, 'http')
        self.short_path = parsed.path

        if not self.digest_auth_handler.handle_request(self):
            return

        self.send_response(200, "OK")
        self.send_header("Content-Type", "text/html")
        self.end_headers()
        self.wfile.write("You've reached %s!<BR>" % self.path)
        self.wfile.write("Our apologies, but our server is down due to "
                         "a sudden zombie invasion.")
|
---|
222 |
|
---|
223 | # Test cases
|
---|
224 |
|
---|
class BaseTestCase(unittest.TestCase):
    """Shared base class that brackets each test with thread bookkeeping.

    setUp() stores whatever test_support.threading_setup() returns and
    tearDown() hands those values back to threading_cleanup().
    """

    def setUp(self):
        self._threads = test_support.threading_setup()

    def tearDown(self):
        saved_state = self._threads
        test_support.threading_cleanup(*saved_state)
|
---|
231 |
|
---|
232 |
|
---|
class ProxyAuthTests(BaseTestCase):
    """Digest proxy-authentication tests run against a local fake proxy.

    Every test gets its own FakeProxyHandler server thread and a urllib2
    opener that routes "http" requests through it.
    """

    URL = "http://localhost"

    USER = "tester"
    PASSWD = "test123"
    REALM = "TestRealm"

    def setUp(self):
        super(ProxyAuthTests, self).setUp()

        # Ignore proxy bypass settings in the environment: a no_proxy
        # entry matching localhost would make urllib2 skip the fake proxy
        # altogether.  The original environment is restored afterwards.
        import os
        def restore_environ(old_environ):
            os.environ.clear()
            os.environ.update(old_environ)
        self.addCleanup(restore_environ, os.environ.copy())
        os.environ['NO_PROXY'] = ''
        os.environ['no_proxy'] = ''

        self.digest_auth_handler = DigestAuthHandler()
        self.digest_auth_handler.set_users({self.USER: self.PASSWD})
        self.digest_auth_handler.set_realm(self.REALM)
        def create_fake_proxy_handler(*args, **kwargs):
            return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)

        self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
        self.server.start()
        self.server.ready.wait()
        proxy_url = "http://127.0.0.1:%d" % self.server.port
        handler = urllib2.ProxyHandler({"http" : proxy_url})
        self.proxy_digest_handler = urllib2.ProxyDigestAuthHandler()
        self.opener = urllib2.build_opener(handler, self.proxy_digest_handler)

    def tearDown(self):
        # The server thread must be gone before the base class checks
        # for leftover threads.
        self.server.stop()
        super(ProxyAuthTests, self).tearDown()

    def _read_and_close(self, response):
        """Read *response* to exhaustion, closing it even if a read fails."""
        try:
            while response.read():
                pass
        finally:
            response.close()

    def test_proxy_with_bad_password_raises_httperror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD+"bad")
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_with_no_password_raises_httperror(self):
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_qop_auth_works(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth")
        self._read_and_close(self.opener.open(self.URL))

    def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth-int")
        try:
            result = self.opener.open(self.URL)
        except urllib2.URLError:
            # It's okay if we don't support auth-int, but we certainly
            # shouldn't receive any kind of exception here other than
            # a URLError.
            pass
        else:
            self._read_and_close(result)
|
---|
298 |
|
---|
299 |
|
---|
def GetRequestHandler(responses):
    """Return a request handler class that replays canned *responses*.

    *responses* is a list of ``(status_code, headers, body)`` tuples;
    each handled request consumes the first remaining entry.  *headers*
    is a sequence of ``(name, value)`` pairs and every value is
    %-formatted with the handler's ``port`` attribute before being sent.

    The returned class records request paths (and POST bodies) in its
    ``requests`` list and the most recent request's headers in
    ``headers_received``.
    """

    class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):

        server_version = "TestHTTP/"
        requests = []
        headers_received = []
        port = 80

        def do_GET(self):
            payload = self.send_head()
            if payload:
                self.wfile.write(payload)

        def do_POST(self):
            length = int(self.headers['Content-Length'])
            post_data = self.rfile.read(length)
            # Answer exactly like a GET, then log the posted data after
            # the path that send_head() recorded.
            self.do_GET()
            self.requests.append(post_data)

        def send_head(self):
            """Send status line and headers; return the body, if any."""
            FakeHTTPRequestHandler.headers_received = self.headers
            self.requests.append(self.path)
            status, header_pairs, payload = responses.pop(0)

            self.send_response(status)
            for name, template in header_pairs:
                self.send_header(name, template % self.port)

            if not payload:
                self.end_headers()
                return None

            self.send_header('Content-type', 'text/plain')
            self.end_headers()
            return payload

        def log_message(self, *args):
            pass

    return FakeHTTPRequestHandler
|
---|
340 |
|
---|
341 |
|
---|
class TestUrlopen(BaseTestCase):
    """Tests urllib2.urlopen against a loopback HTTP server.

    These tests are not exhaustive.  Assuming that testing using files
    does a good job overall of some of the basic interface features.
    The optional 'proxies' argument is not exercised here.

    Each test starts its own server with start_server() and must stop
    it again (in a ``finally`` clause) before returning.
    """

    def setUp(self):
        # Install an opener with an empty proxy mapping for urlopen().
        proxy_handler = urllib2.ProxyHandler({})
        opener = urllib2.build_opener(proxy_handler)
        urllib2.install_opener(opener)
        super(TestUrlopen, self).setUp()

    def start_server(self, responses):
        """Start a server replaying *responses*; return its handler class.

        The server thread is stored in ``self.server`` and the handler
        class's ``port`` attribute is set to the port it listens on.
        """
        handler = GetRequestHandler(responses)

        self.server = LoopbackHttpServerThread(handler)
        self.server.start()
        self.server.ready.wait()
        port = self.server.port
        handler.port = port
        return handler

    def test_redirection(self):
        expected_response = 'We got here...'
        responses = [
            (302, [('Location', 'http://localhost:%s/somewhere_else')], ''),
            (200, [], expected_response)
        ]

        handler = self.start_server(responses)

        try:
            f = urllib2.urlopen('http://localhost:%s/' % handler.port)
            try:
                data = f.read()
            finally:
                f.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/', '/somewhere_else'])
        finally:
            self.server.stop()

    def test_404(self):
        expected_response = 'Bad bad bad...'
        handler = self.start_server([(404, [], expected_response)])

        try:
            try:
                urllib2.urlopen('http://localhost:%s/weeble' % handler.port)
            except urllib2.URLError as exc:
                # Keep our own reference rather than relying on the
                # except target outliving the clause.
                err = exc
            else:
                self.fail('404 should raise URLError')

            try:
                data = err.read()
            finally:
                err.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/weeble'])
        finally:
            self.server.stop()

    def test_200(self):
        expected_response = 'pycon 2008...'
        handler = self.start_server([(200, [], expected_response)])

        try:
            f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port)
            try:
                data = f.read()
            finally:
                f.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/bizarre'])
        finally:
            self.server.stop()

    def test_200_with_parameters(self):
        expected_response = 'pycon 2008...'
        handler = self.start_server([(200, [], expected_response)])

        try:
            f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port,
                                'get=with_feeling')
            try:
                data = f.read()
            finally:
                f.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests,
                             ['/bizarre', 'get=with_feeling'])
        finally:
            self.server.stop()

    def test_sending_headers(self):
        handler = self.start_server([(200, [], "we don't care")])

        try:
            req = urllib2.Request("http://localhost:%s/" % handler.port,
                                  headers={'Range': 'bytes=20-39'})
            response = urllib2.urlopen(req)
            try:
                self.assertEqual(handler.headers_received['Range'],
                                 'bytes=20-39')
            finally:
                response.close()
        finally:
            self.server.stop()

    def test_basic(self):
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            try:
                for attr in ("read", "close", "info", "geturl"):
                    self.assertTrue(hasattr(open_url, attr),
                                    "object returned from "
                                    "urlopen lacks the %s attribute" % attr)
                self.assertTrue(open_url.read(), "calling 'read' failed")
            finally:
                open_url.close()
        finally:
            self.server.stop()

    def test_info(self):
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            try:
                info_obj = open_url.info()
                self.assertIsInstance(info_obj, mimetools.Message,
                                      "object returned by 'info' is not an "
                                      "instance of mimetools.Message")
                self.assertEqual(info_obj.getsubtype(), "plain")
            finally:
                open_url.close()
        finally:
            self.server.stop()

    def test_geturl(self):
        # Make sure same URL as opened is returned by geturl.
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            try:
                url = open_url.geturl()
            finally:
                open_url.close()
            self.assertEqual(url, "http://localhost:%s" % handler.port)
        finally:
            self.server.stop()

    def test_bad_address(self):
        # Make sure proper exception is raised when connecting to a bogus
        # address.

        # as indicated by the comment below, this might fail with some ISP,
        # so we run the test only when -unetwork/-uall is specified to
        # mitigate the problem a bit (see #17564)
        test_support.requires('network')
        self.assertRaises(IOError,
                          # Given that both VeriSign and various ISPs have in
                          # the past or are presently hijacking various invalid
                          # domain name requests in an attempt to boost traffic
                          # to their own sites, finding a domain name to use
                          # for this test is difficult.  RFC2606 leads one to
                          # believe that '.invalid' should work, but experience
                          # seemed to indicate otherwise.  Single character
                          # TLDs are likely to remain invalid, so this seems to
                          # be the best choice. The trailing '.' prevents a
                          # related problem: The normal DNS resolver appends
                          # the domain names from the search path if there is
                          # no '.' at the end, and if one of those domains
                          # implements a '*' rule a result is returned.
                          # However, none of this will prevent the test from
                          # failing if the ISP hijacks all invalid domain
                          # requests.  The real solution would be to be able to
                          # parameterize the framework with a mock resolver.
                          urllib2.urlopen, "http://sadflkjsasf.i.nvali.d./")

    def test_iteration(self):
        expected_response = "pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        try:
            data = urllib2.urlopen("http://localhost:%s" % handler.port)
            try:
                for line in data:
                    self.assertEqual(line, expected_response)
            finally:
                data.close()
        finally:
            self.server.stop()

    def ztest_line_iteration(self):
        # The "z" prefix keeps unittest from collecting this test.
        lines = ["We\n", "got\n", "here\n", "verylong " * 8192 + "\n"]
        expected_response = "".join(lines)
        handler = self.start_server([(200, [], expected_response)])
        # Start at -1 so that receiving no lines at all fails the final
        # count assertion instead of raising NameError.
        index = -1
        try:
            data = urllib2.urlopen("http://localhost:%s" % handler.port)
            try:
                for index, line in enumerate(data):
                    self.assertEqual(line, lines[index],
                                     "Fetched line number %s doesn't match expected:\n"
                                     "    Expected length was %s, got %s" %
                                     (index, len(lines[index]), len(line)))
            finally:
                data.close()
        finally:
            self.server.stop()
        self.assertEqual(index + 1, len(lines))
|
---|
541 |
|
---|
def test_main():
    """Run the ProxyAuthTests and TestUrlopen cases via test_support."""
    # We will NOT depend on the network resource flag
    # (Lib/test/regrtest.py -u network) since all tests here are only
    # localhost.  However, if this is a bad rationale, then call
    # test_support.requires("network") before running the tests.
    cases = (ProxyAuthTests, TestUrlopen)
    test_support.run_unittest(*cases)

if __name__ == "__main__":
    test_main()
|
---|