1 | """Regression tests for urllib"""
|
---|
2 |
|
---|
3 | import urllib
|
---|
4 | import httplib
|
---|
5 | import unittest
|
---|
6 | import os
|
---|
7 | import sys
|
---|
8 | import mimetools
|
---|
9 | import tempfile
|
---|
10 | import StringIO
|
---|
11 |
|
---|
12 | from test import test_support
|
---|
13 | from base64 import b64encode
|
---|
14 |
|
---|
15 |
|
---|
def hexescape(char):
    """Return *char* percent-escaped in the form RFC 2396 describes.

    The result is '%' followed by the character's code in upper-case
    hexadecimal, zero-padded to at least two digits (a newline gives '%0A').
    """
    return "%%%02X" % ord(char)
|
---|
22 |
|
---|
23 |
|
---|
24 | class FakeHTTPMixin(object):
|
---|
25 | def fakehttp(self, fakedata):
|
---|
26 | class FakeSocket(StringIO.StringIO):
|
---|
27 |
|
---|
28 | def sendall(self, data):
|
---|
29 | FakeHTTPConnection.buf = data
|
---|
30 |
|
---|
31 | def makefile(self, *args, **kwds):
|
---|
32 | return self
|
---|
33 |
|
---|
34 | def read(self, amt=None):
|
---|
35 | if self.closed:
|
---|
36 | return ""
|
---|
37 | return StringIO.StringIO.read(self, amt)
|
---|
38 |
|
---|
39 | def readline(self, length=None):
|
---|
40 | if self.closed:
|
---|
41 | return ""
|
---|
42 | return StringIO.StringIO.readline(self, length)
|
---|
43 |
|
---|
44 | class FakeHTTPConnection(httplib.HTTPConnection):
|
---|
45 |
|
---|
46 | # buffer to store data for verification in urlopen tests.
|
---|
47 | buf = ""
|
---|
48 |
|
---|
49 | def connect(self):
|
---|
50 | self.sock = FakeSocket(fakedata)
|
---|
51 |
|
---|
52 | assert httplib.HTTP._connection_class == httplib.HTTPConnection
|
---|
53 |
|
---|
54 | httplib.HTTP._connection_class = FakeHTTPConnection
|
---|
55 |
|
---|
56 | def unfakehttp(self):
|
---|
57 | httplib.HTTP._connection_class = httplib.HTTPConnection
|
---|
58 |
|
---|
59 |
|
---|
60 | class urlopen_FileTests(unittest.TestCase):
|
---|
61 | """Test urlopen() opening a temporary file.
|
---|
62 |
|
---|
63 | Try to test as much functionality as possible so as to cut down on reliance
|
---|
64 | on connecting to the Net for testing.
|
---|
65 |
|
---|
66 | """
|
---|
67 |
|
---|
68 | def setUp(self):
|
---|
69 | """Setup of a temp file to use for testing"""
|
---|
70 | self.text = "test_urllib: %s\n" % self.__class__.__name__
|
---|
71 | FILE = file(test_support.TESTFN, 'wb')
|
---|
72 | try:
|
---|
73 | FILE.write(self.text)
|
---|
74 | finally:
|
---|
75 | FILE.close()
|
---|
76 | self.pathname = test_support.TESTFN
|
---|
77 | self.returned_obj = urllib.urlopen("file:%s" % self.pathname)
|
---|
78 |
|
---|
79 | def tearDown(self):
|
---|
80 | """Shut down the open object"""
|
---|
81 | self.returned_obj.close()
|
---|
82 | os.remove(test_support.TESTFN)
|
---|
83 |
|
---|
84 | def test_interface(self):
|
---|
85 | # Make sure object returned by urlopen() has the specified methods
|
---|
86 | for attr in ("read", "readline", "readlines", "fileno",
|
---|
87 | "close", "info", "geturl", "getcode", "__iter__"):
|
---|
88 | self.assertTrue(hasattr(self.returned_obj, attr),
|
---|
89 | "object returned by urlopen() lacks %s attribute" %
|
---|
90 | attr)
|
---|
91 |
|
---|
92 | def test_read(self):
|
---|
93 | self.assertEqual(self.text, self.returned_obj.read())
|
---|
94 |
|
---|
95 | def test_readline(self):
|
---|
96 | self.assertEqual(self.text, self.returned_obj.readline())
|
---|
97 | self.assertEqual('', self.returned_obj.readline(),
|
---|
98 | "calling readline() after exhausting the file did not"
|
---|
99 | " return an empty string")
|
---|
100 |
|
---|
101 | def test_readlines(self):
|
---|
102 | lines_list = self.returned_obj.readlines()
|
---|
103 | self.assertEqual(len(lines_list), 1,
|
---|
104 | "readlines() returned the wrong number of lines")
|
---|
105 | self.assertEqual(lines_list[0], self.text,
|
---|
106 | "readlines() returned improper text")
|
---|
107 |
|
---|
108 | def test_fileno(self):
|
---|
109 | file_num = self.returned_obj.fileno()
|
---|
110 | self.assertIsInstance(file_num, int, "fileno() did not return an int")
|
---|
111 | self.assertEqual(os.read(file_num, len(self.text)), self.text,
|
---|
112 | "Reading on the file descriptor returned by fileno() "
|
---|
113 | "did not return the expected text")
|
---|
114 |
|
---|
115 | def test_close(self):
|
---|
116 | # Test close() by calling it hear and then having it be called again
|
---|
117 | # by the tearDown() method for the test
|
---|
118 | self.returned_obj.close()
|
---|
119 |
|
---|
120 | def test_info(self):
|
---|
121 | self.assertIsInstance(self.returned_obj.info(), mimetools.Message)
|
---|
122 |
|
---|
123 | def test_geturl(self):
|
---|
124 | self.assertEqual(self.returned_obj.geturl(), self.pathname)
|
---|
125 |
|
---|
126 | def test_getcode(self):
|
---|
127 | self.assertEqual(self.returned_obj.getcode(), None)
|
---|
128 |
|
---|
129 | def test_iter(self):
|
---|
130 | # Test iterator
|
---|
131 | # Don't need to count number of iterations since test would fail the
|
---|
132 | # instant it returned anything beyond the first line from the
|
---|
133 | # comparison
|
---|
134 | for line in self.returned_obj.__iter__():
|
---|
135 | self.assertEqual(line, self.text)
|
---|
136 |
|
---|
137 | def test_relativelocalfile(self):
|
---|
138 | self.assertRaises(ValueError,urllib.urlopen,'./' + self.pathname)
|
---|
139 |
|
---|
140 | class ProxyTests(unittest.TestCase):
|
---|
141 |
|
---|
142 | def setUp(self):
|
---|
143 | # Records changes to env vars
|
---|
144 | self.env = test_support.EnvironmentVarGuard()
|
---|
145 | # Delete all proxy related env vars
|
---|
146 | for k in os.environ.keys():
|
---|
147 | if 'proxy' in k.lower():
|
---|
148 | self.env.unset(k)
|
---|
149 |
|
---|
150 | def tearDown(self):
|
---|
151 | # Restore all proxy related env vars
|
---|
152 | self.env.__exit__()
|
---|
153 | del self.env
|
---|
154 |
|
---|
155 | def test_getproxies_environment_keep_no_proxies(self):
|
---|
156 | self.env.set('NO_PROXY', 'localhost')
|
---|
157 | proxies = urllib.getproxies_environment()
|
---|
158 | # getproxies_environment use lowered case truncated (no '_proxy') keys
|
---|
159 | self.assertEqual('localhost', proxies['no'])
|
---|
160 | # List of no_proxies with space.
|
---|
161 | self.env.set('NO_PROXY', 'localhost, anotherdomain.com, newdomain.com')
|
---|
162 | self.assertTrue(urllib.proxy_bypass_environment('anotherdomain.com'))
|
---|
163 |
|
---|
164 |
|
---|
class urlopen_HttpTests(unittest.TestCase, FakeHTTPMixin):
    """Exercise urlopen() over HTTP using canned, in-memory responses."""

    def test_read(self):
        self.fakehttp('Hello!')
        try:
            response = urllib.urlopen("http://python.org/")
            self.assertEqual(response.readline(), 'Hello!')
            self.assertEqual(response.readline(), '')
            self.assertEqual(response.geturl(), 'http://python.org/')
            self.assertEqual(response.getcode(), 200)
        finally:
            self.unfakehttp()

    def test_url_fragment(self):
        # Issue #11703: geturl() omits fragments in the original URL.
        url = 'http://docs.python.org/library/urllib.html#OK'
        self.fakehttp('Hello!')
        try:
            response = urllib.urlopen(url)
            self.assertEqual(response.geturl(), url)
        finally:
            self.unfakehttp()

    def test_read_bogus(self):
        # A 401 response must surface from urlopen() as IOError.
        unauthorized = '''HTTP/1.1 401 Authentication Required
Date: Wed, 02 Jan 2008 03:03:54 GMT
Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e
Connection: close
Content-Type: text/html; charset=iso-8859-1
'''
        self.fakehttp(unauthorized)
        try:
            self.assertRaises(IOError, urllib.urlopen, "http://python.org/")
        finally:
            self.unfakehttp()

    def test_invalid_redirect(self):
        # A 302 whose Location uses the file: scheme must raise IOError.
        redirect_to_file = """HTTP/1.1 302 Found
Date: Wed, 02 Jan 2008 03:03:54 GMT
Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e
Location: file:README
Connection: close
Content-Type: text/html; charset=iso-8859-1
"""
        self.fakehttp(redirect_to_file)
        try:
            self.assertRaises(IOError, urllib.urlopen, "http://python.org/")
        finally:
            self.unfakehttp()

    def test_empty_socket(self):
        # urlopen() raises IOError if the underlying socket does not send any
        # data. (#1680230)
        self.fakehttp('')
        try:
            self.assertRaises(IOError, urllib.urlopen, 'http://something')
        finally:
            self.unfakehttp()

    def test_missing_localfile(self):
        self.assertRaises(IOError, urllib.urlopen,
                          'file://localhost/a/missing/file.py')
        handle, temp_path = tempfile.mkstemp()
        temp_url = 'file://localhost/' + temp_path.replace(os.path.sep, '/')
        self.assertTrue(os.path.exists(temp_path))
        try:
            # While the file exists, opening its URL must succeed.
            opened = urllib.urlopen(temp_url)
            opened.close()
        finally:
            os.close(handle)
            os.unlink(temp_path)

        # Once the file is gone, the very same URL must fail.
        self.assertFalse(os.path.exists(temp_path))
        self.assertRaises(IOError, urllib.urlopen, temp_url)

    def test_ftp_nonexisting(self):
        self.assertRaises(IOError, urllib.urlopen,
                          'ftp://localhost/not/existing/file.py')

    def test_userpass_inurl(self):
        self.fakehttp('Hello!')
        try:
            fake_connection = httplib.HTTP._connection_class
            response = urllib.urlopen("http://user:pass@python.org/")
            expected_header = ("Authorization: Basic %s\r\n" %
                               b64encode('user:pass'))
            # The credentials from the URL must have been sent as a header.
            self.assertIn(expected_header, fake_connection.buf)
            self.assertEqual(response.readline(), "Hello!")
            self.assertEqual(response.readline(), "")
            self.assertEqual(response.geturl(), 'http://user:pass@python.org/')
            self.assertEqual(response.getcode(), 200)
        finally:
            self.unfakehttp()

    def test_userpass_with_spaces_inurl(self):
        self.fakehttp('Hello!')
        try:
            url = "http://a b:c d@python.org/"
            fake_connection = httplib.HTTP._connection_class
            expected_header = ("Authorization: Basic %s\r\n" %
                               b64encode('a b:c d'))
            response = urllib.urlopen(url)
            # The credentials from the URL must have been sent as a header.
            self.assertIn(expected_header, fake_connection.buf)
            self.assertEqual(response.readline(), "Hello!")
            self.assertEqual(response.readline(), "")
            # the spaces are quoted in URL so no match
            self.assertNotEqual(response.geturl(), url)
            self.assertEqual(response.getcode(), 200)
        finally:
            self.unfakehttp()
|
---|
279 |
|
---|
280 |
|
---|
281 | class urlretrieve_FileTests(unittest.TestCase):
|
---|
282 | """Test urllib.urlretrieve() on local files"""
|
---|
283 |
|
---|
284 | def setUp(self):
|
---|
285 | # Create a list of temporary files. Each item in the list is a file
|
---|
286 | # name (absolute path or relative to the current working directory).
|
---|
287 | # All files in this list will be deleted in the tearDown method. Note,
|
---|
288 | # this only helps to makes sure temporary files get deleted, but it
|
---|
289 | # does nothing about trying to close files that may still be open. It
|
---|
290 | # is the responsibility of the developer to properly close files even
|
---|
291 | # when exceptional conditions occur.
|
---|
292 | self.tempFiles = []
|
---|
293 |
|
---|
294 | # Create a temporary file.
|
---|
295 | self.registerFileForCleanUp(test_support.TESTFN)
|
---|
296 | self.text = 'testing urllib.urlretrieve'
|
---|
297 | try:
|
---|
298 | FILE = file(test_support.TESTFN, 'wb')
|
---|
299 | FILE.write(self.text)
|
---|
300 | FILE.close()
|
---|
301 | finally:
|
---|
302 | try: FILE.close()
|
---|
303 | except: pass
|
---|
304 |
|
---|
305 | def tearDown(self):
|
---|
306 | # Delete the temporary files.
|
---|
307 | for each in self.tempFiles:
|
---|
308 | try: os.remove(each)
|
---|
309 | except: pass
|
---|
310 |
|
---|
311 | def constructLocalFileUrl(self, filePath):
|
---|
312 | return "file://%s" % urllib.pathname2url(os.path.abspath(filePath))
|
---|
313 |
|
---|
314 | def createNewTempFile(self, data=""):
|
---|
315 | """Creates a new temporary file containing the specified data,
|
---|
316 | registers the file for deletion during the test fixture tear down, and
|
---|
317 | returns the absolute path of the file."""
|
---|
318 |
|
---|
319 | newFd, newFilePath = tempfile.mkstemp()
|
---|
320 | try:
|
---|
321 | self.registerFileForCleanUp(newFilePath)
|
---|
322 | newFile = os.fdopen(newFd, "wb")
|
---|
323 | newFile.write(data)
|
---|
324 | newFile.close()
|
---|
325 | finally:
|
---|
326 | try: newFile.close()
|
---|
327 | except: pass
|
---|
328 | return newFilePath
|
---|
329 |
|
---|
330 | def registerFileForCleanUp(self, fileName):
|
---|
331 | self.tempFiles.append(fileName)
|
---|
332 |
|
---|
333 | def test_basic(self):
|
---|
334 | # Make sure that a local file just gets its own location returned and
|
---|
335 | # a headers value is returned.
|
---|
336 | result = urllib.urlretrieve("file:%s" % test_support.TESTFN)
|
---|
337 | self.assertEqual(result[0], test_support.TESTFN)
|
---|
338 | self.assertIsInstance(result[1], mimetools.Message,
|
---|
339 | "did not get a mimetools.Message instance as "
|
---|
340 | "second returned value")
|
---|
341 |
|
---|
342 | def test_copy(self):
|
---|
343 | # Test that setting the filename argument works.
|
---|
344 | second_temp = "%s.2" % test_support.TESTFN
|
---|
345 | self.registerFileForCleanUp(second_temp)
|
---|
346 | result = urllib.urlretrieve(self.constructLocalFileUrl(
|
---|
347 | test_support.TESTFN), second_temp)
|
---|
348 | self.assertEqual(second_temp, result[0])
|
---|
349 | self.assertTrue(os.path.exists(second_temp), "copy of the file was not "
|
---|
350 | "made")
|
---|
351 | FILE = file(second_temp, 'rb')
|
---|
352 | try:
|
---|
353 | text = FILE.read()
|
---|
354 | FILE.close()
|
---|
355 | finally:
|
---|
356 | try: FILE.close()
|
---|
357 | except: pass
|
---|
358 | self.assertEqual(self.text, text)
|
---|
359 |
|
---|
360 | def test_reporthook(self):
|
---|
361 | # Make sure that the reporthook works.
|
---|
362 | def hooktester(count, block_size, total_size, count_holder=[0]):
|
---|
363 | self.assertIsInstance(count, int)
|
---|
364 | self.assertIsInstance(block_size, int)
|
---|
365 | self.assertIsInstance(total_size, int)
|
---|
366 | self.assertEqual(count, count_holder[0])
|
---|
367 | count_holder[0] = count_holder[0] + 1
|
---|
368 | second_temp = "%s.2" % test_support.TESTFN
|
---|
369 | self.registerFileForCleanUp(second_temp)
|
---|
370 | urllib.urlretrieve(self.constructLocalFileUrl(test_support.TESTFN),
|
---|
371 | second_temp, hooktester)
|
---|
372 |
|
---|
373 | def test_reporthook_0_bytes(self):
|
---|
374 | # Test on zero length file. Should call reporthook only 1 time.
|
---|
375 | report = []
|
---|
376 | def hooktester(count, block_size, total_size, _report=report):
|
---|
377 | _report.append((count, block_size, total_size))
|
---|
378 | srcFileName = self.createNewTempFile()
|
---|
379 | urllib.urlretrieve(self.constructLocalFileUrl(srcFileName),
|
---|
380 | test_support.TESTFN, hooktester)
|
---|
381 | self.assertEqual(len(report), 1)
|
---|
382 | self.assertEqual(report[0][2], 0)
|
---|
383 |
|
---|
384 | def test_reporthook_5_bytes(self):
|
---|
385 | # Test on 5 byte file. Should call reporthook only 2 times (once when
|
---|
386 | # the "network connection" is established and once when the block is
|
---|
387 | # read). Since the block size is 8192 bytes, only one block read is
|
---|
388 | # required to read the entire file.
|
---|
389 | report = []
|
---|
390 | def hooktester(count, block_size, total_size, _report=report):
|
---|
391 | _report.append((count, block_size, total_size))
|
---|
392 | srcFileName = self.createNewTempFile("x" * 5)
|
---|
393 | urllib.urlretrieve(self.constructLocalFileUrl(srcFileName),
|
---|
394 | test_support.TESTFN, hooktester)
|
---|
395 | self.assertEqual(len(report), 2)
|
---|
396 | self.assertEqual(report[0][1], 8192)
|
---|
397 | self.assertEqual(report[0][2], 5)
|
---|
398 |
|
---|
399 | def test_reporthook_8193_bytes(self):
|
---|
400 | # Test on 8193 byte file. Should call reporthook only 3 times (once
|
---|
401 | # when the "network connection" is established, once for the next 8192
|
---|
402 | # bytes, and once for the last byte).
|
---|
403 | report = []
|
---|
404 | def hooktester(count, block_size, total_size, _report=report):
|
---|
405 | _report.append((count, block_size, total_size))
|
---|
406 | srcFileName = self.createNewTempFile("x" * 8193)
|
---|
407 | urllib.urlretrieve(self.constructLocalFileUrl(srcFileName),
|
---|
408 | test_support.TESTFN, hooktester)
|
---|
409 | self.assertEqual(len(report), 3)
|
---|
410 | self.assertEqual(report[0][1], 8192)
|
---|
411 | self.assertEqual(report[0][2], 8193)
|
---|
412 |
|
---|
413 |
|
---|
class urlretrieve_HttpTests(unittest.TestCase, FakeHTTPMixin):
    """Exercise urllib.urlretrieve() against canned HTTP responses."""

    def test_short_content_raises_ContentTooShortError(self):
        # Content-Length announces 100 bytes but the body is only "FF\n".
        truncated = '''HTTP/1.1 200 OK
Date: Wed, 02 Jan 2008 03:03:54 GMT
Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e
Connection: close
Content-Length: 100
Content-Type: text/html; charset=iso-8859-1

FF
'''

        def ignore_progress(par1, par2, par3):
            pass

        self.fakehttp(truncated)
        try:
            self.assertRaises(urllib.ContentTooShortError, urllib.urlretrieve,
                              'http://example.com',
                              reporthook=ignore_progress)
        finally:
            self.unfakehttp()

    def test_short_content_raises_ContentTooShortError_without_reporthook(self):
        # Same truncated response, this time retrieved without a reporthook.
        truncated = '''HTTP/1.1 200 OK
Date: Wed, 02 Jan 2008 03:03:54 GMT
Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e
Connection: close
Content-Length: 100
Content-Type: text/html; charset=iso-8859-1

FF
'''
        self.fakehttp(truncated)
        try:
            self.assertRaises(urllib.ContentTooShortError,
                              urllib.urlretrieve, 'http://example.com/')
        finally:
            self.unfakehttp()
|
---|
451 |
|
---|
class QuotingTests(unittest.TestCase):
    """Tests for urllib.quote() and urllib.quote_plus()

    According to RFC 2396 ("Uniform Resource Identifiers"), to escape a
    character you write it as '%' + <2 character US-ASCII hex value>.  The
    Python code of ``'%' + hex(ord(<character>))[2:]`` escapes a character
    properly.  Case does not matter on the hex letters.

    The various character sets specified are:

    Reserved characters : ";/?:@&=+$,"
        Have special meaning in URIs and must be escaped if not being used for
        their special meaning
    Data characters : letters, digits, and "-_.!~*'()"
        Unreserved and do not need to be escaped; can be, though, if desired
    Control characters : 0x00 - 0x1F, 0x7F
        Have no use in URIs so must be escaped
    space : 0x20
        Must be escaped
    Delimiters : '<>#%"'
        Must be escaped
    Unwise : "{}|\\^[]`"
        Must be escaped

    """

    def test_never_quote(self):
        # quote() and quote_plus() must leave letters, digits and "_.-" alone.
        do_not_quote = ("ABCDEFGHIJKLMNOPQRSTUVWXYZ"
                        "abcdefghijklmnopqrstuvwxyz"
                        "0123456789"
                        "_.-")
        quoted = urllib.quote(do_not_quote)
        self.assertEqual(do_not_quote, quoted,
                         "using quote(): %s != %s" % (do_not_quote, quoted))
        quoted = urllib.quote_plus(do_not_quote)
        self.assertEqual(do_not_quote, quoted,
                         "using quote_plus(): %s != %s" %
                         (do_not_quote, quoted))

    def test_default_safe(self):
        # '/' must be the default value of quote()'s 'safe' parameter.
        default_safe = urllib.quote.func_defaults[0]
        self.assertEqual(default_safe, '/')

    def test_safe(self):
        # Characters named in 'safe' must pass through unquoted.
        quote_by_default = "<>"
        quoted = urllib.quote(quote_by_default, safe=quote_by_default)
        self.assertEqual(quote_by_default, quoted,
                         "using quote(): %s != %s" %
                         (quote_by_default, quoted))
        quoted = urllib.quote_plus(quote_by_default, safe=quote_by_default)
        self.assertEqual(quote_by_default, quoted,
                         "using quote_plus(): %s != %s" %
                         (quote_by_default, quoted))

    def test_default_quoting(self):
        # Every character that must be quoted is quoted by default.  The space
        # is left out here because it has its own test.
        should_quote = (''.join(map(chr, range(32)))  # 0x00 - 0x1F
                        + '<>#%"{}|\\^[]`'
                        + chr(127))                   # 0x7F
        for char in should_quote:
            escaped = hexescape(char)
            result = urllib.quote(char)
            self.assertEqual(escaped, result,
                             "using quote(): %s should be escaped to %s, not %s" %
                             (char, escaped, result))
            result = urllib.quote_plus(char)
            self.assertEqual(escaped, result,
                             "using quote_plus(): "
                             "%s should be escapes to %s, not %s" %
                             (char, escaped, result))
        # Quoting must also work when only part of the string needs it.
        partial_quote = "ab[]cd"
        expected = "ab%5B%5Dcd"
        result = urllib.quote(partial_quote)
        self.assertEqual(expected, result,
                         "using quote(): %s != %s" % (expected, result))
        result = urllib.quote_plus(partial_quote)
        self.assertEqual(expected, result,
                         "using quote_plus(): %s != %s" % (expected, result))
        self.assertRaises(TypeError, urllib.quote, None)

    def test_quoting_space(self):
        # quote() hex-escapes a space while quote_plus() turns it into '+'.
        escaped_space = hexescape(' ')
        result = urllib.quote(' ')
        self.assertEqual(result, escaped_space,
                         "using quote(): %s != %s" % (result, escaped_space))
        result = urllib.quote_plus(' ')
        self.assertEqual(result, '+',
                         "using quote_plus(): %s != +" % result)
        given = "a b cd e f"
        expect = given.replace(' ', escaped_space)
        result = urllib.quote(given)
        self.assertEqual(expect, result,
                         "using quote(): %s != %s" % (expect, result))
        expect = given.replace(' ', '+')
        result = urllib.quote_plus(given)
        self.assertEqual(expect, result,
                         "using quote_plus(): %s != %s" % (expect, result))

    def test_quoting_plus(self):
        text = 'alpha+beta gamma'
        self.assertEqual(urllib.quote_plus(text),
                         'alpha%2Bbeta+gamma')
        # With '+' declared safe, the literal plus is left untouched.
        self.assertEqual(urllib.quote_plus(text, '+'),
                         'alpha+beta+gamma')
|
---|
558 |
|
---|
class UnquotingTests(unittest.TestCase):
    """Tests for unquote() and unquote_plus()

    See the doc string for QuotingTests for details on quoting and such.

    """

    def test_unquoting(self):
        # Make sure unquoting of all ASCII values works
        escape_list = []
        for num in range(128):
            given = hexescape(chr(num))
            expect = chr(num)
            result = urllib.unquote(given)
            self.assertEqual(expect, result,
                             "using unquote(): %s != %s" % (expect, result))
            result = urllib.unquote_plus(given)
            self.assertEqual(expect, result,
                             "using unquote_plus(): %s != %s" %
                             (expect, result))
            escape_list.append(given)
        escape_string = ''.join(escape_list)
        # Decoding all escapes at once must leave exactly one '%': the one
        # that the escape for chr(37) itself decodes to.
        result = urllib.unquote(escape_string)
        self.assertEqual(result.count('%'), 1,
                         "using unquote(): not all characters escaped: "
                         "%s" % result)
        # The same must hold for unquote_plus().
        result = urllib.unquote_plus(escape_string)
        self.assertEqual(result.count('%'), 1,
                         "using unquote_plus(): not all characters escaped: "
                         "%s" % result)

    def test_unquoting_badpercent(self):
        # Malformed percent-escapes must come back unchanged.
        for given in ('%xab', '%x', '%'):
            expect = given
            result = urllib.unquote(given)
            self.assertEqual(expect, result, "using unquote(): %r != %r"
                             % (expect, result))

    def test_unquoting_mixed_case(self):
        # Test unquoting on mixed-case hex digits in the percent-escapes
        given = '%Ab%eA'
        expect = '\xab\xea'
        result = urllib.unquote(given)
        self.assertEqual(expect, result, "using unquote(): %r != %r"
                         % (expect, result))

    def test_unquoting_parts(self):
        # Make sure unquoting works when have non-quoted characters
        # interspersed
        given = 'ab%sd' % hexescape('c')
        expect = "abcd"
        result = urllib.unquote(given)
        self.assertEqual(expect, result,
                         "using unquote(): %s != %s" % (expect, result))
        result = urllib.unquote_plus(given)
        self.assertEqual(expect, result,
                         "using unquote_plus(): %s != %s" % (expect, result))

    def test_unquoting_plus(self):
        # Test difference between unquote() and unquote_plus()
        given = "are+there+spaces..."
        # unquote() leaves '+' alone ...
        expect = given
        result = urllib.unquote(given)
        self.assertEqual(expect, result,
                         "using unquote(): %s != %s" % (expect, result))
        # ... while unquote_plus() turns each '+' into a space.
        expect = given.replace('+', ' ')
        result = urllib.unquote_plus(given)
        self.assertEqual(expect, result,
                         "using unquote_plus(): %s != %s" % (expect, result))

    def test_unquote_with_unicode(self):
        # For unicode input each escape decodes to the code point of that
        # byte value; the result stays unicode.
        r = urllib.unquote(u'br%C3%BCckner_sapporo_20050930.doc')
        self.assertEqual(r, u'br\xc3\xbcckner_sapporo_20050930.doc')
|
---|
644 |
|
---|
class urlencode_Tests(unittest.TestCase):
    """Tests for urlencode()"""

    def help_inputtype(self, given, test_type):
        """Check urlencode(given) for any supported kind of input.

        'given' must produce exactly these pairs:
            * 1st, 1
            * 2nd, 2
            * 3rd, 3

        Nothing is assumed about their order: the docs promise none, and the
        input may be a dictionary.

        """
        expected_length = (5 * 3) + 2  # 5 chars per thing and amps
        result = urllib.urlencode(given)
        for expected in ("1st=1", "2nd=2", "3rd=3"):
            self.assertIn(expected, result,
                          "testing %s: %s not found in %s" %
                          (test_type, expected, result))
        amp_count = result.count('&')
        self.assertEqual(amp_count, 2,
                         "testing %s: expected 2 '&'s; got %s" %
                         (test_type, amp_count))
        # The first '&' must sit between the digit ending one pair and the
        # digit starting the next.
        amp_location = result.index('&')
        before = result[amp_location - 1]
        after = result[amp_location + 1]
        self.assertTrue(before.isdigit() and after.isdigit(),
                        "testing %s: '&' not located in proper place in %s" %
                        (test_type, result))
        self.assertEqual(len(result), expected_length,
                         "testing %s: "
                         "unexpected number of characters: %s != %s" %
                         (test_type, len(result), expected_length))

    def test_using_mapping(self):
        # A mapping object is accepted as the argument.
        pairs = {"1st": '1', "2nd": '2', "3rd": '3'}
        self.help_inputtype(pairs, "using dict as input type")

    def test_using_sequence(self):
        # A sequence of two-item sequences is accepted as the argument.
        pairs = [('1st', '1'), ('2nd', '2'), ('3rd', '3')]
        self.help_inputtype(pairs,
                            "using sequence of two-item tuples as input")

    def test_quoting(self):
        # Keys and values must both be quoted the way quote_plus() does it.
        expect = "%s=%s" % (hexescape('&'), hexescape('='))
        self.assertEqual(expect, urllib.urlencode({"&": "="}))
        expect = "key+name=A+bunch+of+pluses"
        self.assertEqual(expect,
                         urllib.urlencode({"key name": "A bunch of pluses"}))

    def test_doseq(self):
        # Test that passing True for 'doseq' parameter works correctly
        values = ['1', '2', '3']
        given = {'sequence': values}
        # Without doseq the list is quoted as a single str() value.
        expect = "sequence=%s" % urllib.quote_plus(str(['1', '2', '3']))
        self.assertEqual(expect, urllib.urlencode(given))
        # With doseq every element gets its own key=value pair.
        result = urllib.urlencode(given, True)
        for value in values:
            self.assertIn("sequence=%s" % value, result)
        self.assertEqual(result.count('&'), 2,
                         "Expected 2 '&'s, got %s" % result.count('&'))
|
---|
713 |
|
---|
class Pathname_Tests(unittest.TestCase):
    """Test pathname2url() and url2pathname()"""

    def test_basic(self):
        # Make sure simple tests pass: a path with nothing to quote converts
        # to a '/'-separated URL and back to the same native path.
        expected_path = os.path.join("parts", "of", "a", "path")
        expected_url = "parts/of/a/path"
        result = urllib.pathname2url(expected_path)
        self.assertEqual(expected_url, result,
                         "pathname2url() failed; %s != %s" %
                         (result, expected_url))
        result = urllib.url2pathname(expected_url)
        self.assertEqual(expected_path, result,
                         "url2pathname() failed; %s != %s" %
                         (result, expected_path))

    def test_quoting(self):
        # Test automatic quoting and unquoting works for pathname2url() and
        # url2pathname() respectively
        given = os.path.join("needs", "quot=ing", "here")
        expect = "needs/%s/here" % urllib.quote("quot=ing")
        result = urllib.pathname2url(given)
        self.assertEqual(expect, result,
                         "pathname2url() failed; %s != %s" %
                         (expect, result))
        # Feeding the quoted URL back must give the original path again.
        expect = given
        result = urllib.url2pathname(result)
        self.assertEqual(expect, result,
                         "url2pathname() failed; %s != %s" %
                         (expect, result))
        # A space in a path component is expected to match quote()'s output.
        given = os.path.join("make sure", "using_quote")
        expect = "%s/using_quote" % urllib.quote("make sure")
        result = urllib.pathname2url(given)
        self.assertEqual(expect, result,
                         "pathname2url() failed; %s != %s" %
                         (expect, result))
        # A '+' in the URL must come back as a literal '+', not a space.
        given = "make+sure/using_unquote"
        expect = os.path.join("make+sure", "using_unquote")
        result = urllib.url2pathname(given)
        self.assertEqual(expect, result,
                         "url2pathname() failed; %s != %s" %
                         (expect, result))

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the nturl2path library')
    def test_ntpath(self):
        # Every one of these spellings of the drive root must map to 'C:\'.
        given = ('/C:/', '///C:/', '/C|//')
        expect = 'C:\\'
        for url in given:
            result = urllib.url2pathname(url)
            self.assertEqual(expect, result,
                             'nturl2path.url2pathname() failed; %s != %s' %
                             (expect, result))
        # The '|' drive separator followed by a path component.
        given = '///C|/path'
        expect = 'C:\\path'
        result = urllib.url2pathname(given)
        self.assertEqual(expect, result,
                         'nturl2path.url2pathname() failed; %s != %s' %
                         (expect, result))
773 |
|
---|
class Utility_Tests(unittest.TestCase):
    """Testcase to test the various utility functions in the urllib."""

    def test_splitpasswd(self):
        """Check splitpasswd() on a range of 'user:password' strings.

        Some of the passwords are not sensible, but they are included to
        conform to RFC2617 and to address issue4675.
        """
        # (input, expected (user, password) pair).  Only the first ':'
        # separates the two parts; whitespace and control characters in
        # either part are kept as they are.
        samples = (
            ('user:ab', ('user', 'ab')),
            ('user:a\nb', ('user', 'a\nb')),
            ('user:a\tb', ('user', 'a\tb')),
            ('user:a\rb', ('user', 'a\rb')),
            ('user:a\fb', ('user', 'a\fb')),
            ('user:a\vb', ('user', 'a\vb')),
            ('user:a:b', ('user', 'a:b')),
            ('user:a b', ('user', 'a b')),
            ('user 2:ab', ('user 2', 'ab')),
            ('user+1:a+b', ('user+1', 'a+b')),
        )
        for userinfo, expected in samples:
            self.assertEqual(expected, urllib.splitpasswd(userinfo))
791 |
|
---|
792 |
|
---|
class URLopener_Tests(unittest.TestCase):
    """Testcase to test the open method of URLopener class."""

    def test_quoted_open(self):
        # An opener whose handler for the made-up 'spam' scheme hands back
        # the URL it was given, so the quoting done by open() is visible.
        class DummyURLopener(urllib.URLopener):
            def open_spam(self, url):
                return url

        # A space in the URL is expected to arrive as %20.
        quoted = DummyURLopener().open('spam://example/ /')
        self.assertEqual(quoted, '//example/%20/')

        # test the safe characters are not quoted by urlopen
        untouched = DummyURLopener().open(
            "spam://c:|windows%/:=&?~#+!$,;'@()*[]|/path/")
        self.assertEqual(untouched,
                         "//c:|windows%/:=&?~#+!$,;'@()*[]|/path/")
808 |
|
---|
809 |
|
---|
# The FTP wrapper tests below are just commented out.
# I can't really tell why they keep failing on Windows and SPARC.
# Everywhere else they work OK, but on those machines they sometimes
# fail in one of the tests and sometimes in another. I have a Linux box,
# and there the tests pass.
# If anybody has one of the problematic environments, please help!
# . Facundo
817 | #
|
---|
818 | # def server(evt):
|
---|
819 | # import socket, time
|
---|
820 | # serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
---|
821 | # serv.settimeout(3)
|
---|
822 | # serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
---|
823 | # serv.bind(("", 9093))
|
---|
824 | # serv.listen(5)
|
---|
825 | # try:
|
---|
826 | # conn, addr = serv.accept()
|
---|
827 | # conn.send("1 Hola mundo\n")
|
---|
828 | # cantdata = 0
|
---|
829 | # while cantdata < 13:
|
---|
830 | # data = conn.recv(13-cantdata)
|
---|
831 | # cantdata += len(data)
|
---|
832 | # time.sleep(.3)
|
---|
833 | # conn.send("2 No more lines\n")
|
---|
834 | # conn.close()
|
---|
835 | # except socket.timeout:
|
---|
836 | # pass
|
---|
837 | # finally:
|
---|
838 | # serv.close()
|
---|
839 | # evt.set()
|
---|
840 | #
|
---|
841 | # class FTPWrapperTests(unittest.TestCase):
|
---|
842 | #
|
---|
843 | # def setUp(self):
|
---|
844 | # import ftplib, time, threading
|
---|
845 | # ftplib.FTP.port = 9093
|
---|
846 | # self.evt = threading.Event()
|
---|
847 | # threading.Thread(target=server, args=(self.evt,)).start()
|
---|
848 | # time.sleep(.1)
|
---|
849 | #
|
---|
850 | # def tearDown(self):
|
---|
851 | # self.evt.wait()
|
---|
852 | #
|
---|
853 | # def testBasic(self):
|
---|
854 | # # connects
|
---|
855 | # ftp = urllib.ftpwrapper("myuser", "mypass", "localhost", 9093, [])
|
---|
856 | # ftp.close()
|
---|
857 | #
|
---|
858 | # def testTimeoutNone(self):
|
---|
859 | # # global default timeout is ignored
|
---|
860 | # import socket
|
---|
861 | # self.assertTrue(socket.getdefaulttimeout() is None)
|
---|
862 | # socket.setdefaulttimeout(30)
|
---|
863 | # try:
|
---|
864 | # ftp = urllib.ftpwrapper("myuser", "mypass", "localhost", 9093, [])
|
---|
865 | # finally:
|
---|
866 | # socket.setdefaulttimeout(None)
|
---|
867 | # self.assertEqual(ftp.ftp.sock.gettimeout(), 30)
|
---|
868 | # ftp.close()
|
---|
869 | #
|
---|
870 | # def testTimeoutDefault(self):
|
---|
871 | # # global default timeout is used
|
---|
872 | # import socket
|
---|
873 | # self.assertTrue(socket.getdefaulttimeout() is None)
|
---|
874 | # socket.setdefaulttimeout(30)
|
---|
875 | # try:
|
---|
876 | # ftp = urllib.ftpwrapper("myuser", "mypass", "localhost", 9093, [])
|
---|
877 | # finally:
|
---|
878 | # socket.setdefaulttimeout(None)
|
---|
879 | # self.assertEqual(ftp.ftp.sock.gettimeout(), 30)
|
---|
880 | # ftp.close()
|
---|
881 | #
|
---|
882 | # def testTimeoutValue(self):
|
---|
883 | # ftp = urllib.ftpwrapper("myuser", "mypass", "localhost", 9093, [],
|
---|
884 | # timeout=30)
|
---|
885 | # self.assertEqual(ftp.ftp.sock.gettimeout(), 30)
|
---|
886 | # ftp.close()
|
---|
887 |
|
---|
888 |
|
---|
889 |
|
---|
def test_main():
    """Run the test case classes of this module through test_support.

    DeprecationWarnings whose message matches the urllib.urlopen /
    "Python 3.0" pattern are ignored while the tests run.  The filter is
    installed inside warnings.catch_warnings(), so it only applies for the
    duration of the ``with`` block.
    """
    import warnings
    with warnings.catch_warnings():
        # Raw string: the message argument is a regular expression, and
        # "\." is not a valid escape in an ordinary string literal.
        warnings.filterwarnings('ignore', r".*urllib\.urlopen.*Python 3.0",
                                DeprecationWarning)
        test_support.run_unittest(
            urlopen_FileTests,
            urlopen_HttpTests,
            urlretrieve_FileTests,
            urlretrieve_HttpTests,
            ProxyTests,
            QuotingTests,
            UnquotingTests,
            urlencode_Tests,
            Pathname_Tests,
            Utility_Tests,
            URLopener_Tests,
            #FTPWrapperTests,
        )
909 |
|
---|
910 |
|
---|
911 |
|
---|
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()