1 | #!/usr/bin/env python
|
---|
2 |
|
---|
3 | import unittest
|
---|
4 | from test import test_support
|
---|
5 | from test.test_urllib2 import sanepathname2url
|
---|
6 |
|
---|
7 | import socket
|
---|
8 | import urllib2
|
---|
9 | import os
|
---|
10 | import sys
|
---|
11 |
|
---|
12 | TIMEOUT = 60 # seconds
|
---|
13 |
|
---|
14 |
|
---|
15 | def _retry_thrice(func, exc, *args, **kwargs):
|
---|
16 | for i in range(3):
|
---|
17 | try:
|
---|
18 | return func(*args, **kwargs)
|
---|
19 | except exc, last_exc:
|
---|
20 | continue
|
---|
21 | except:
|
---|
22 | raise
|
---|
23 | raise last_exc
|
---|
24 |
|
---|
25 | def _wrap_with_retry_thrice(func, exc):
|
---|
26 | def wrapped(*args, **kwargs):
|
---|
27 | return _retry_thrice(func, exc, *args, **kwargs)
|
---|
28 | return wrapped
|
---|
29 |
|
---|
30 | # Connecting to remote hosts is flaky. Make it more robust by retrying
|
---|
31 | # the connection several times.
|
---|
32 | _urlopen_with_retry = _wrap_with_retry_thrice(urllib2.urlopen, urllib2.URLError)
|
---|
33 |
|
---|
34 |
|
---|
35 | class AuthTests(unittest.TestCase):
|
---|
36 | """Tests urllib2 authentication features."""
|
---|
37 |
|
---|
38 | ## Disabled at the moment since there is no page under python.org which
|
---|
39 | ## could be used to HTTP authentication.
|
---|
40 | #
|
---|
41 | # def test_basic_auth(self):
|
---|
42 | # import httplib
|
---|
43 | #
|
---|
44 | # test_url = "http://www.python.org/test/test_urllib2/basic_auth"
|
---|
45 | # test_hostport = "www.python.org"
|
---|
46 | # test_realm = 'Test Realm'
|
---|
47 | # test_user = 'test.test_urllib2net'
|
---|
48 | # test_password = 'blah'
|
---|
49 | #
|
---|
50 | # # failure
|
---|
51 | # try:
|
---|
52 | # _urlopen_with_retry(test_url)
|
---|
53 | # except urllib2.HTTPError, exc:
|
---|
54 | # self.assertEqual(exc.code, 401)
|
---|
55 | # else:
|
---|
56 | # self.fail("urlopen() should have failed with 401")
|
---|
57 | #
|
---|
58 | # # success
|
---|
59 | # auth_handler = urllib2.HTTPBasicAuthHandler()
|
---|
60 | # auth_handler.add_password(test_realm, test_hostport,
|
---|
61 | # test_user, test_password)
|
---|
62 | # opener = urllib2.build_opener(auth_handler)
|
---|
63 | # f = opener.open('http://localhost/')
|
---|
64 | # response = _urlopen_with_retry("http://www.python.org/")
|
---|
65 | #
|
---|
66 | # # The 'userinfo' URL component is deprecated by RFC 3986 for security
|
---|
67 | # # reasons, let's not implement it! (it's already implemented for proxy
|
---|
68 | # # specification strings (that is, URLs or authorities specifying a
|
---|
69 | # # proxy), so we must keep that)
|
---|
70 | # self.assertRaises(httplib.InvalidURL,
|
---|
71 | # urllib2.urlopen, "http://evil:thing@example.com")
|
---|
72 |
|
---|
73 |
|
---|
74 | class CloseSocketTest(unittest.TestCase):
|
---|
75 |
|
---|
76 | def test_close(self):
|
---|
77 | import httplib
|
---|
78 |
|
---|
79 | # calling .close() on urllib2's response objects should close the
|
---|
80 | # underlying socket
|
---|
81 |
|
---|
82 | # delve deep into response to fetch socket._socketobject
|
---|
83 | response = _urlopen_with_retry("http://www.python.org/")
|
---|
84 | abused_fileobject = response.fp
|
---|
85 | self.assertTrue(abused_fileobject.__class__ is socket._fileobject)
|
---|
86 | httpresponse = abused_fileobject._sock
|
---|
87 | self.assertTrue(httpresponse.__class__ is httplib.HTTPResponse)
|
---|
88 | fileobject = httpresponse.fp
|
---|
89 | self.assertTrue(fileobject.__class__ is socket._fileobject)
|
---|
90 |
|
---|
91 | self.assertTrue(not fileobject.closed)
|
---|
92 | response.close()
|
---|
93 | self.assertTrue(fileobject.closed)
|
---|
94 |
|
---|
95 | class OtherNetworkTests(unittest.TestCase):
|
---|
96 | def setUp(self):
|
---|
97 | if 0: # for debugging
|
---|
98 | import logging
|
---|
99 | logger = logging.getLogger("test_urllib2net")
|
---|
100 | logger.addHandler(logging.StreamHandler())
|
---|
101 |
|
---|
102 | # XXX The rest of these tests aren't very good -- they don't check much.
|
---|
103 | # They do sometimes catch some major disasters, though.
|
---|
104 |
|
---|
105 | def test_ftp(self):
|
---|
106 | urls = [
|
---|
107 | 'ftp://ftp.kernel.org/pub/linux/kernel/README',
|
---|
108 | 'ftp://ftp.kernel.org/pub/linux/kernel/non-existent-file',
|
---|
109 | #'ftp://ftp.kernel.org/pub/leenox/kernel/test',
|
---|
110 | 'ftp://gatekeeper.research.compaq.com/pub/DEC/SRC'
|
---|
111 | '/research-reports/00README-Legal-Rules-Regs',
|
---|
112 | ]
|
---|
113 | self._test_urls(urls, self._extra_handlers())
|
---|
114 |
|
---|
115 | def test_file(self):
|
---|
116 | TESTFN = test_support.TESTFN
|
---|
117 | f = open(TESTFN, 'w')
|
---|
118 | try:
|
---|
119 | f.write('hi there\n')
|
---|
120 | f.close()
|
---|
121 | urls = [
|
---|
122 | 'file:'+sanepathname2url(os.path.abspath(TESTFN)),
|
---|
123 | ('file:///nonsensename/etc/passwd', None, urllib2.URLError),
|
---|
124 | ]
|
---|
125 | self._test_urls(urls, self._extra_handlers(), retry=True)
|
---|
126 | finally:
|
---|
127 | os.remove(TESTFN)
|
---|
128 |
|
---|
129 | self.assertRaises(ValueError, urllib2.urlopen,'./relative_path/to/file')
|
---|
130 |
|
---|
131 | # XXX Following test depends on machine configurations that are internal
|
---|
132 | # to CNRI. Need to set up a public server with the right authentication
|
---|
133 | # configuration for test purposes.
|
---|
134 |
|
---|
135 | ## def test_cnri(self):
|
---|
136 | ## if socket.gethostname() == 'bitdiddle':
|
---|
137 | ## localhost = 'bitdiddle.cnri.reston.va.us'
|
---|
138 | ## elif socket.gethostname() == 'bitdiddle.concentric.net':
|
---|
139 | ## localhost = 'localhost'
|
---|
140 | ## else:
|
---|
141 | ## localhost = None
|
---|
142 | ## if localhost is not None:
|
---|
143 | ## urls = [
|
---|
144 | ## 'file://%s/etc/passwd' % localhost,
|
---|
145 | ## 'http://%s/simple/' % localhost,
|
---|
146 | ## 'http://%s/digest/' % localhost,
|
---|
147 | ## 'http://%s/not/found.h' % localhost,
|
---|
148 | ## ]
|
---|
149 |
|
---|
150 | ## bauth = HTTPBasicAuthHandler()
|
---|
151 | ## bauth.add_password('basic_test_realm', localhost, 'jhylton',
|
---|
152 | ## 'password')
|
---|
153 | ## dauth = HTTPDigestAuthHandler()
|
---|
154 | ## dauth.add_password('digest_test_realm', localhost, 'jhylton',
|
---|
155 | ## 'password')
|
---|
156 |
|
---|
157 | ## self._test_urls(urls, self._extra_handlers()+[bauth, dauth])
|
---|
158 |
|
---|
159 | def test_urlwithfrag(self):
|
---|
160 | urlwith_frag = "http://docs.python.org/2/glossary.html#glossary"
|
---|
161 | with test_support.transient_internet(urlwith_frag):
|
---|
162 | req = urllib2.Request(urlwith_frag)
|
---|
163 | res = urllib2.urlopen(req)
|
---|
164 | self.assertEqual(res.geturl(),
|
---|
165 | "http://docs.python.org/2/glossary.html#glossary")
|
---|
166 |
|
---|
167 | def test_fileno(self):
|
---|
168 | req = urllib2.Request("http://www.python.org")
|
---|
169 | opener = urllib2.build_opener()
|
---|
170 | res = opener.open(req)
|
---|
171 | try:
|
---|
172 | res.fileno()
|
---|
173 | except AttributeError:
|
---|
174 | self.fail("HTTPResponse object should return a valid fileno")
|
---|
175 | finally:
|
---|
176 | res.close()
|
---|
177 |
|
---|
178 | def test_custom_headers(self):
|
---|
179 | url = "http://www.example.com"
|
---|
180 | with test_support.transient_internet(url):
|
---|
181 | opener = urllib2.build_opener()
|
---|
182 | request = urllib2.Request(url)
|
---|
183 | self.assertFalse(request.header_items())
|
---|
184 | opener.open(request)
|
---|
185 | self.assertTrue(request.header_items())
|
---|
186 | self.assertTrue(request.has_header('User-agent'))
|
---|
187 | request.add_header('User-Agent','Test-Agent')
|
---|
188 | opener.open(request)
|
---|
189 | self.assertEqual(request.get_header('User-agent'),'Test-Agent')
|
---|
190 |
|
---|
191 | def test_sites_no_connection_close(self):
|
---|
192 | # Some sites do not send Connection: close header.
|
---|
193 | # Verify that those work properly. (#issue12576)
|
---|
194 |
|
---|
195 | URL = 'http://www.imdb.com' # No Connection:close
|
---|
196 | with test_support.transient_internet(URL):
|
---|
197 | req = urllib2.urlopen(URL)
|
---|
198 | res = req.read()
|
---|
199 | self.assertTrue(res)
|
---|
200 |
|
---|
201 | def _test_urls(self, urls, handlers, retry=True):
|
---|
202 | import time
|
---|
203 | import logging
|
---|
204 | debug = logging.getLogger("test_urllib2").debug
|
---|
205 |
|
---|
206 | urlopen = urllib2.build_opener(*handlers).open
|
---|
207 | if retry:
|
---|
208 | urlopen = _wrap_with_retry_thrice(urlopen, urllib2.URLError)
|
---|
209 |
|
---|
210 | for url in urls:
|
---|
211 | if isinstance(url, tuple):
|
---|
212 | url, req, expected_err = url
|
---|
213 | else:
|
---|
214 | req = expected_err = None
|
---|
215 | with test_support.transient_internet(url):
|
---|
216 | debug(url)
|
---|
217 | try:
|
---|
218 | f = urlopen(url, req, TIMEOUT)
|
---|
219 | except EnvironmentError as err:
|
---|
220 | debug(err)
|
---|
221 | if expected_err:
|
---|
222 | msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" %
|
---|
223 | (expected_err, url, req, type(err), err))
|
---|
224 | self.assertIsInstance(err, expected_err, msg)
|
---|
225 | except urllib2.URLError as err:
|
---|
226 | if isinstance(err[0], socket.timeout):
|
---|
227 | print >>sys.stderr, "<timeout: %s>" % url
|
---|
228 | continue
|
---|
229 | else:
|
---|
230 | raise
|
---|
231 | else:
|
---|
232 | try:
|
---|
233 | with test_support.transient_internet(url):
|
---|
234 | buf = f.read()
|
---|
235 | debug("read %d bytes" % len(buf))
|
---|
236 | except socket.timeout:
|
---|
237 | print >>sys.stderr, "<timeout: %s>" % url
|
---|
238 | f.close()
|
---|
239 | debug("******** next url coming up...")
|
---|
240 | time.sleep(0.1)
|
---|
241 |
|
---|
242 | def _extra_handlers(self):
|
---|
243 | handlers = []
|
---|
244 |
|
---|
245 | cfh = urllib2.CacheFTPHandler()
|
---|
246 | self.addCleanup(cfh.clear_cache)
|
---|
247 | cfh.setTimeout(1)
|
---|
248 | handlers.append(cfh)
|
---|
249 |
|
---|
250 | return handlers
|
---|
251 |
|
---|
252 |
|
---|
253 | class TimeoutTest(unittest.TestCase):
|
---|
254 | def test_http_basic(self):
|
---|
255 | self.assertTrue(socket.getdefaulttimeout() is None)
|
---|
256 | url = "http://www.python.org"
|
---|
257 | with test_support.transient_internet(url, timeout=None):
|
---|
258 | u = _urlopen_with_retry(url)
|
---|
259 | self.assertTrue(u.fp._sock.fp._sock.gettimeout() is None)
|
---|
260 |
|
---|
261 | def test_http_default_timeout(self):
|
---|
262 | self.assertTrue(socket.getdefaulttimeout() is None)
|
---|
263 | url = "http://www.python.org"
|
---|
264 | with test_support.transient_internet(url):
|
---|
265 | socket.setdefaulttimeout(60)
|
---|
266 | try:
|
---|
267 | u = _urlopen_with_retry(url)
|
---|
268 | finally:
|
---|
269 | socket.setdefaulttimeout(None)
|
---|
270 | self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 60)
|
---|
271 |
|
---|
272 | def test_http_no_timeout(self):
|
---|
273 | self.assertTrue(socket.getdefaulttimeout() is None)
|
---|
274 | url = "http://www.python.org"
|
---|
275 | with test_support.transient_internet(url):
|
---|
276 | socket.setdefaulttimeout(60)
|
---|
277 | try:
|
---|
278 | u = _urlopen_with_retry(url, timeout=None)
|
---|
279 | finally:
|
---|
280 | socket.setdefaulttimeout(None)
|
---|
281 | self.assertTrue(u.fp._sock.fp._sock.gettimeout() is None)
|
---|
282 |
|
---|
283 | def test_http_timeout(self):
|
---|
284 | url = "http://www.python.org"
|
---|
285 | with test_support.transient_internet(url):
|
---|
286 | u = _urlopen_with_retry(url, timeout=120)
|
---|
287 | self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 120)
|
---|
288 |
|
---|
289 | FTP_HOST = "ftp://ftp.mirror.nl/pub/gnu/"
|
---|
290 |
|
---|
291 | def test_ftp_basic(self):
|
---|
292 | self.assertTrue(socket.getdefaulttimeout() is None)
|
---|
293 | with test_support.transient_internet(self.FTP_HOST, timeout=None):
|
---|
294 | u = _urlopen_with_retry(self.FTP_HOST)
|
---|
295 | self.assertTrue(u.fp.fp._sock.gettimeout() is None)
|
---|
296 |
|
---|
297 | def test_ftp_default_timeout(self):
|
---|
298 | self.assertTrue(socket.getdefaulttimeout() is None)
|
---|
299 | with test_support.transient_internet(self.FTP_HOST):
|
---|
300 | socket.setdefaulttimeout(60)
|
---|
301 | try:
|
---|
302 | u = _urlopen_with_retry(self.FTP_HOST)
|
---|
303 | finally:
|
---|
304 | socket.setdefaulttimeout(None)
|
---|
305 | self.assertEqual(u.fp.fp._sock.gettimeout(), 60)
|
---|
306 |
|
---|
307 | def test_ftp_no_timeout(self):
|
---|
308 | self.assertTrue(socket.getdefaulttimeout() is None)
|
---|
309 | with test_support.transient_internet(self.FTP_HOST):
|
---|
310 | socket.setdefaulttimeout(60)
|
---|
311 | try:
|
---|
312 | u = _urlopen_with_retry(self.FTP_HOST, timeout=None)
|
---|
313 | finally:
|
---|
314 | socket.setdefaulttimeout(None)
|
---|
315 | self.assertTrue(u.fp.fp._sock.gettimeout() is None)
|
---|
316 |
|
---|
317 | def test_ftp_timeout(self):
|
---|
318 | with test_support.transient_internet(self.FTP_HOST):
|
---|
319 | u = _urlopen_with_retry(self.FTP_HOST, timeout=60)
|
---|
320 | self.assertEqual(u.fp.fp._sock.gettimeout(), 60)
|
---|
321 |
|
---|
322 |
|
---|
323 | def test_main():
|
---|
324 | test_support.requires("network")
|
---|
325 | test_support.run_unittest(AuthTests,
|
---|
326 | OtherNetworkTests,
|
---|
327 | CloseSocketTest,
|
---|
328 | TimeoutTest,
|
---|
329 | )
|
---|
330 |
|
---|
331 | if __name__ == "__main__":
|
---|
332 | test_main()
|
---|