1 | # We can test part of the module without zlib.
|
---|
2 | try:
|
---|
3 | import zlib
|
---|
4 | except ImportError:
|
---|
5 | zlib = None
|
---|
6 |
|
---|
7 | import os
|
---|
8 | import io
|
---|
9 | import sys
|
---|
10 | import time
|
---|
11 | import shutil
|
---|
12 | import struct
|
---|
13 | import zipfile
|
---|
14 | import unittest
|
---|
15 |
|
---|
16 | from StringIO import StringIO
|
---|
17 | from tempfile import TemporaryFile
|
---|
18 | from random import randint, random
|
---|
19 | from unittest import skipUnless
|
---|
20 |
|
---|
21 | from test.test_support import TESTFN, TESTFN_UNICODE, TESTFN_ENCODING, \
|
---|
22 | run_unittest, findfile, unlink
|
---|
# Probe whether the unicode test file name can be encoded with the test
# encoding; tests that need it are skipped when TESTFN_UNICODE ends up None.
try:
    TESTFN_UNICODE.encode(TESTFN_ENCODING)
except (UnicodeError, TypeError):
    # Either the file system encoding is None, or the file name
    # cannot be encoded in the file system encoding.
    TESTFN_UNICODE = None

# Second scratch path derived from TESTFN; the tests below use it as the
# archive file (and, in one test, as a scratch directory).
TESTFN2 = TESTFN + "2"
TESTFNDIR = TESTFN + "d"
# Number of text lines generated for the source file in the setUp() methods.
FIXEDTEST_SIZE = 1000

# (archive member name, member content) pairs written with writestr() by the
# extraction and close tests.
SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'),
                   ('ziptest2dir/_ziptest2', 'qawsedrftg'),
                   ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'),
                   ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')]
38 |
|
---|
39 |
|
---|
class TestsWithSourceFile(unittest.TestCase):
    # Exercises zipfile.ZipFile on archives built from a generated source
    # file (TESTFN).  Most checks are run against three archive targets: a
    # path (TESTFN2), a TemporaryFile and an in-memory StringIO.
    #
    # Test methods deliberately carry no new docstrings: unittest would use
    # them as the test description in verbose output.

    def setUp(self):
        # FIXEDTEST_SIZE lines; self.data is those lines, each terminated
        # by '\n'.
        self.line_gen = ["Zipfile test line %d. random float: %f" % (i, random())
                         for i in xrange(FIXEDTEST_SIZE)]
        self.data = '\n'.join(self.line_gen) + '\n'

        # Make a source file with some lines
        with open(TESTFN, "wb") as fp:
            fp.write(self.data)

    def make_test_archive(self, f, compression):
        """Write three members, each holding self.data, into archive *f*.

        *f* is handed to zipfile.ZipFile unchanged (callers pass a path or a
        file-like object); *compression* is the ZipFile compression argument.
        """
        # Create the ZIP archive
        with zipfile.ZipFile(f, "w", compression) as zipfp:
            zipfp.write(TESTFN, "another.name")
            zipfp.write(TESTFN, TESTFN)
            zipfp.writestr("strfile", self.data)

    def zip_test(self, f, compression):
        """Build the test archive in *f* and verify its contents and listing."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            self.assertEqual(zipfp.read(TESTFN), self.data)
            self.assertEqual(zipfp.read("another.name"), self.data)
            self.assertEqual(zipfp.read("strfile"), self.data)

            # Print the ZIP directory
            fp = StringIO()
            stdout = sys.stdout
            try:
                # printdir() writes to sys.stdout, so capture it temporarily
                sys.stdout = fp
                zipfp.printdir()
            finally:
                sys.stdout = stdout

            directory = fp.getvalue()
            lines = directory.splitlines()
            self.assertEqual(len(lines), 4)  # Number of files + header

            self.assertIn('File Name', lines[0])
            self.assertIn('Modified', lines[0])
            self.assertIn('Size', lines[0])

            fn, date, time_, size = lines[1].split()
            self.assertEqual(fn, 'another.name')
            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
            self.assertEqual(size, str(len(self.data)))

            # Check the namelist
            names = zipfp.namelist()
            self.assertEqual(len(names), 3)
            self.assertIn(TESTFN, names)
            self.assertIn("another.name", names)
            self.assertIn("strfile", names)

            # Check infolist
            infos = zipfp.infolist()
            names = [i.filename for i in infos]
            self.assertEqual(len(names), 3)
            self.assertIn(TESTFN, names)
            self.assertIn("another.name", names)
            self.assertIn("strfile", names)
            for i in infos:
                self.assertEqual(i.file_size, len(self.data))

            # check getinfo
            for nm in (TESTFN, "another.name", "strfile"):
                info = zipfp.getinfo(nm)
                self.assertEqual(info.filename, nm)
                self.assertEqual(info.file_size, len(self.data))

            # Check that testzip doesn't raise an exception
            zipfp.testzip()

    def test_stored(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_test(f, zipfile.ZIP_STORED)

    def zip_open_test(self, f, compression):
        """Read two members through ZipFile.open() in fixed 256-byte chunks."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            zipdata1 = []
            with zipfp.open(TESTFN) as zipopen1:
                while True:
                    read_data = zipopen1.read(256)
                    if not read_data:
                        break
                    zipdata1.append(read_data)

            zipdata2 = []
            with zipfp.open("another.name") as zipopen2:
                while True:
                    read_data = zipopen2.read(256)
                    if not read_data:
                        break
                    zipdata2.append(read_data)

            self.assertEqual(''.join(zipdata1), self.data)
            self.assertEqual(''.join(zipdata2), self.data)

    def test_open_stored(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_open_test(f, zipfile.ZIP_STORED)

    def test_open_via_zip_info(self):
        # Two members share the name "name"; opening by ZipInfo must reach
        # both of them.
        # Create the ZIP archive
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
            zipfp.writestr("name", "foo")
            zipfp.writestr("name", "bar")

        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
            infos = zipfp.infolist()
            data = ""
            for info in infos:
                with zipfp.open(info) as f:
                    data += f.read()
            self.assertTrue(data == "foobar" or data == "barfoo")
            data = ""
            for info in infos:
                data += zipfp.read(info)
            self.assertTrue(data == "foobar" or data == "barfoo")

    def zip_random_open_test(self, f, compression):
        """Read a member through ZipFile.open() in randomly sized chunks."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            zipdata1 = []
            with zipfp.open(TESTFN) as zipopen1:
                while True:
                    read_data = zipopen1.read(randint(1, 1024))
                    if not read_data:
                        break
                    zipdata1.append(read_data)

            self.assertEqual(''.join(zipdata1), self.data)

    def test_random_open_stored(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_random_open_test(f, zipfile.ZIP_STORED)

    def test_univeral_readaheads(self):
        f = StringIO()

        data = 'a\r\n' * 16 * 1024
        with zipfile.ZipFile(f, 'w', zipfile.ZIP_STORED) as zipfp:
            zipfp.writestr(TESTFN, data)

        data2 = ''
        with zipfile.ZipFile(f, 'r') as zipfp:
            with zipfp.open(TESTFN, 'rU') as zipopen:
                for line in zipopen:
                    data2 += line

        # The lines read back in 'rU' mode must equal the original once
        # every '\n' is expanded to '\r\n' again.
        self.assertEqual(data, data2.replace('\n', '\r\n'))

    def zip_readline_read_test(self, f, compression):
        """Alternate readline() and read(100) calls on one open member."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp:
            with zipfp.open(TESTFN) as zipopen:
                data = ''
                while True:
                    read = zipopen.readline()
                    if not read:
                        break
                    data += read

                    read = zipopen.read(100)
                    if not read:
                        break
                    data += read

        self.assertEqual(data, self.data)

    def zip_readline_test(self, f, compression):
        """Check that readline() returns the source lines one by one."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp:
            with zipfp.open(TESTFN) as zipopen:
                for line in self.line_gen:
                    linedata = zipopen.readline()
                    self.assertEqual(linedata, line + '\n')

    def zip_readlines_test(self, f, compression):
        """Check that readlines() returns every source line."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp:
            with zipfp.open(TESTFN) as zo:
                ziplines = zo.readlines()
            # zip() stops at the shorter sequence, so a truncated result
            # would go unnoticed without comparing the line counts.
            self.assertEqual(len(ziplines), len(self.line_gen))
            for line, zipline in zip(self.line_gen, ziplines):
                self.assertEqual(zipline, line + '\n')

    def zip_iterlines_test(self, f, compression):
        """Check that iterating an open member yields the source lines."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp:
            # Open the member in a 'with' block so that it is closed again.
            with zipfp.open(TESTFN) as zipopen:
                for line, zipline in zip(self.line_gen, zipopen):
                    self.assertEqual(zipline, line + '\n')

    def test_readline_read_stored(self):
        # Issue #7610: calls to readline() interleaved with calls to read().
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_readline_read_test(f, zipfile.ZIP_STORED)

    def test_readline_stored(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_readline_test(f, zipfile.ZIP_STORED)

    def test_readlines_stored(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_readlines_test(f, zipfile.ZIP_STORED)

    def test_iterlines_stored(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_iterlines_test(f, zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_deflated(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_open_deflated(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_open_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_random_open_deflated(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_random_open_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_readline_read_deflated(self):
        # Issue #7610: calls to readline() interleaved with calls to read().
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_readline_read_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_readline_deflated(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_readline_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_readlines_deflated(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_readlines_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_iterlines_deflated(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_iterlines_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_low_compression(self):
        """Check for cases where compressed data is larger than original."""
        # Create the ZIP archive
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) as zipfp:
            zipfp.writestr("strfile", '12')

        # Get an open object for strfile
        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_DEFLATED) as zipfp:
            with zipfp.open("strfile") as openobj:
                self.assertEqual(openobj.read(1), '1')
                self.assertEqual(openobj.read(1), '2')

    def test_absolute_arcnames(self):
        # The leading slash of the archive name must not be stored.
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
            zipfp.write(TESTFN, "/absolute")

        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
            self.assertEqual(zipfp.namelist(), ["absolute"])

    def test_append_to_zip_file(self):
        """Test appending to an existing zipfile."""
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
            zipfp.write(TESTFN, TESTFN)

        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
            zipfp.writestr("strfile", self.data)
            self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"])

    def test_append_to_non_zip_file(self):
        """Test appending to an existing file that is not a zipfile."""
        # NOTE: this test fails if len(d) < 22 because of the first
        # line "fpin.seek(-22, 2)" in _EndRecData
        data = 'I am not a ZipFile!'*10
        with open(TESTFN2, 'wb') as f:
            f.write(data)

        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
            zipfp.write(TESTFN, TESTFN)

        with open(TESTFN2, 'rb') as f:
            # Skip the non-zip prefix before handing the file to ZipFile
            f.seek(len(data))
            with zipfile.ZipFile(f, "r") as zipfp:
                self.assertEqual(zipfp.namelist(), [TESTFN])

    def test_ignores_newline_at_end(self):
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
            zipfp.write(TESTFN, TESTFN)
        with open(TESTFN2, 'a') as f:
            f.write("\r\n\00\00\00")
        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
            self.assertIsInstance(zipfp, zipfile.ZipFile)

    def test_ignores_stuff_appended_past_comments(self):
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
            zipfp.comment = b"this is a comment"
            zipfp.write(TESTFN, TESTFN)
        with open(TESTFN2, 'a') as f:
            f.write("abcdef\r\n")
        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
            self.assertIsInstance(zipfp, zipfile.ZipFile)
            self.assertEqual(zipfp.comment, b"this is a comment")

    def test_write_default_name(self):
        """Check that calling ZipFile.write without arcname specified
        produces the expected result."""
        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
            zipfp.write(TESTFN)
            # TESTFN was written in binary mode by setUp(); read it back the
            # same way and close the handle deterministically.
            with open(TESTFN, "rb") as src:
                self.assertEqual(zipfp.read(TESTFN), src.read())

    @skipUnless(zlib, "requires zlib")
    def test_per_file_compression(self):
        """Check that files within a Zip archive can have different
        compression options."""
        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
            zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED)
            zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED)
            sinfo = zipfp.getinfo('storeme')
            dinfo = zipfp.getinfo('deflateme')
            self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED)
            self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED)

    def test_write_to_readonly(self):
        """Check that trying to call write() on a readonly ZipFile object
        raises a RuntimeError."""
        with zipfile.ZipFile(TESTFN2, mode="w") as zipfp:
            zipfp.writestr("somefile.txt", "bogus")

        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
            self.assertRaises(RuntimeError, zipfp.write, TESTFN)

    def test_extract(self):
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
            for fpath, fdata in SMALL_TEST_DATA:
                zipfp.writestr(fpath, fdata)

        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
            for fpath, fdata in SMALL_TEST_DATA:
                writtenfile = zipfp.extract(fpath)

                # make sure it was written to the right place
                correctfile = os.path.join(os.getcwd(), fpath)
                correctfile = os.path.normpath(correctfile)

                self.assertEqual(writtenfile, correctfile)

                # make sure correct data is in correct file; the handle is
                # closed before the file is removed
                with open(writtenfile, "rb") as extracted:
                    self.assertEqual(fdata, extracted.read())
                os.remove(writtenfile)

        # remove the test file subdirectories
        shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir'))

    def test_extract_all(self):
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
            for fpath, fdata in SMALL_TEST_DATA:
                zipfp.writestr(fpath, fdata)

        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
            zipfp.extractall()
            for fpath, fdata in SMALL_TEST_DATA:
                outfile = os.path.join(os.getcwd(), fpath)

                # close the handle before the file is removed
                with open(outfile, "rb") as extracted:
                    self.assertEqual(fdata, extracted.read())
                os.remove(outfile)

        # remove the test file subdirectories
        shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir'))

    def check_file(self, filename, content):
        """Assert that *filename* is a regular file whose bytes equal *content*."""
        self.assertTrue(os.path.isfile(filename))
        with open(filename, 'rb') as f:
            self.assertEqual(f.read(), content)

    @skipUnless(TESTFN_UNICODE, "No Unicode filesystem semantics on this platform.")
    def test_extract_unicode_filenames(self):
        fnames = [u'foo.txt', os.path.basename(TESTFN_UNICODE)]
        content = 'Test for unicode filename'
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
            for fname in fnames:
                zipfp.writestr(fname, content)

        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
            for fname in fnames:
                writtenfile = zipfp.extract(fname)

                # make sure it was written to the right place
                correctfile = os.path.join(os.getcwd(), fname)
                correctfile = os.path.normpath(correctfile)
                self.assertEqual(writtenfile, correctfile)

                self.check_file(writtenfile, content)
                os.remove(writtenfile)

    def test_extract_hackers_arcnames(self):
        # Each pair is (member name stored in the archive, relative path the
        # member is expected to be extracted to).
        hacknames = [
            ('../foo/bar', 'foo/bar'),
            ('foo/../bar', 'foo/bar'),
            ('foo/../../bar', 'foo/bar'),
            ('foo/bar/..', 'foo/bar'),
            ('./../foo/bar', 'foo/bar'),
            ('/foo/bar', 'foo/bar'),
            ('/foo/../bar', 'foo/bar'),
            ('/foo/../../bar', 'foo/bar'),
        ]
        if os.path.sep == '\\':
            hacknames.extend([
                (r'..\foo\bar', 'foo/bar'),
                (r'..\/foo\/bar', 'foo/bar'),
                (r'foo/\..\/bar', 'foo/bar'),
                (r'foo\/../\bar', 'foo/bar'),
                (r'C:foo/bar', 'foo/bar'),
                (r'C:/foo/bar', 'foo/bar'),
                (r'C://foo/bar', 'foo/bar'),
                (r'C:\foo\bar', 'foo/bar'),
                (r'//conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
                (r'\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
                (r'///conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
                (r'\\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
                (r'//conky//mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
                (r'\\conky\\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
                (r'//?/C:/foo/bar', '_/C_/foo/bar'),
                (r'\\?\C:\foo\bar', '_/C_/foo/bar'),
                (r'C:/../C:/foo/bar', 'C_/foo/bar'),
                (r'a:b\c<d>e|f"g?h*i', 'b/c_d_e_f_g_h_i'),
                ('../../foo../../ba..r', 'foo/ba..r'),
            ])
        else:  # Unix
            hacknames.extend([
                ('//foo/bar', 'foo/bar'),
                ('../../foo../../ba..r', 'foo../ba..r'),
                (r'foo/..\bar', r'foo/..\bar'),
            ])

        for arcname, fixedname in hacknames:
            content = b'foobar' + arcname.encode()
            with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp:
                zinfo = zipfile.ZipInfo()
                # preserve backslashes
                zinfo.filename = arcname
                zinfo.external_attr = 0o600 << 16
                zipfp.writestr(zinfo, content)

            arcname = arcname.replace(os.sep, "/")
            targetpath = os.path.join('target', 'subdir', 'subsub')
            correctfile = os.path.join(targetpath, *fixedname.split('/'))

            # extract() into an explicit target directory
            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
                writtenfile = zipfp.extract(arcname, targetpath)
                self.assertEqual(writtenfile, correctfile,
                                 msg="extract %r" % arcname)
            self.check_file(correctfile, content)
            shutil.rmtree('target')

            # extractall() into an explicit target directory
            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
                zipfp.extractall(targetpath)
            self.check_file(correctfile, content)
            shutil.rmtree('target')

            correctfile = os.path.join(os.getcwd(), *fixedname.split('/'))

            # extract() into the current working directory
            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
                writtenfile = zipfp.extract(arcname)
                self.assertEqual(writtenfile, correctfile,
                                 msg="extract %r" % arcname)
            self.check_file(correctfile, content)
            shutil.rmtree(fixedname.split('/')[0])

            # extractall() into the current working directory
            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
                zipfp.extractall()
            self.check_file(correctfile, content)
            shutil.rmtree(fixedname.split('/')[0])

            os.remove(TESTFN2)

    def test_writestr_compression(self):
        # Use a 'with' block so the archive is closed even when an assertion
        # fails.
        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
            zipfp.writestr("a.txt", "hello world",
                           compress_type=zipfile.ZIP_STORED)
            if zlib:
                zipfp.writestr("b.txt", "hello world",
                               compress_type=zipfile.ZIP_DEFLATED)

            info = zipfp.getinfo('a.txt')
            self.assertEqual(info.compress_type, zipfile.ZIP_STORED)

            if zlib:
                info = zipfp.getinfo('b.txt')
                self.assertEqual(info.compress_type, zipfile.ZIP_DEFLATED)

    def zip_test_writestr_permissions(self, f, compression):
        # Make sure that writestr creates files with mode 0600,
        # when it is passed a name rather than a ZipInfo instance.

        self.make_test_archive(f, compression)
        with zipfile.ZipFile(f, "r") as zipfp:
            zinfo = zipfp.getinfo('strfile')
            # 0o600 is the octal spelling already used elsewhere in this
            # class; it has the same value as the legacy literal 0600.
            self.assertEqual(zinfo.external_attr, 0o600 << 16)

    def test_writestr_permissions(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED)

    def test_close(self):
        """Check that the zipfile is closed after the 'with' block."""
        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
            for fpath, fdata in SMALL_TEST_DATA:
                zipfp.writestr(fpath, fdata)
                self.assertTrue(zipfp.fp is not None, 'zipfp is not open')
        self.assertTrue(zipfp.fp is None, 'zipfp is not closed')

        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
            self.assertTrue(zipfp.fp is not None, 'zipfp is not open')
        self.assertTrue(zipfp.fp is None, 'zipfp is not closed')

    def test_close_on_exception(self):
        """Check that the zipfile is closed if an exception is raised in the
        'with' block."""
        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
            for fpath, fdata in SMALL_TEST_DATA:
                zipfp.writestr(fpath, fdata)

        try:
            with zipfile.ZipFile(TESTFN2, "r") as zipfp2:
                raise zipfile.BadZipfile()
        except zipfile.BadZipfile:
            self.assertTrue(zipfp2.fp is None, 'zipfp is not closed')

    def test_add_file_before_1980(self):
        # Set atime and mtime to 1970-01-01
        os.utime(TESTFN, (0, 0))
        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
            self.assertRaises(ValueError, zipfp.write, TESTFN)

    def tearDown(self):
        unlink(TESTFN)
        unlink(TESTFN2)
596 |
|
---|
597 |
|
---|
class TestZip64InSmallFiles(unittest.TestCase):
    # The ZIP64 functionality is exercised here without large files by
    # lowering zipfile.ZIP64_LIMIT for the duration of each test;
    # see test_zipfile64 for proper tests.

    def setUp(self):
        # Remember the real limit so that tearDown() can put it back.
        self._limit = zipfile.ZIP64_LIMIT
        zipfile.ZIP64_LIMIT = 5

        rows = ["Test of zipfile line %d." % idx
                for idx in range(0, FIXEDTEST_SIZE)]
        self.data = '\n'.join(rows)

        # Create the source file holding those lines
        with open(TESTFN, "wb") as source:
            source.write(self.data)

    def large_file_exception_test(self, f, compression):
        """write() must raise LargeZipFile when the file exceeds the limit."""
        with zipfile.ZipFile(f, "w", compression) as archive:
            with self.assertRaises(zipfile.LargeZipFile):
                archive.write(TESTFN, "another.name")

    def large_file_exception_test2(self, f, compression):
        """writestr() must raise LargeZipFile when the data exceeds the limit."""
        with zipfile.ZipFile(f, "w", compression) as archive:
            with self.assertRaises(zipfile.LargeZipFile):
                archive.writestr("another.name", self.data)

    def test_large_file_exception(self):
        checks = (self.large_file_exception_test,
                  self.large_file_exception_test2)
        for target in (TESTFN2, TemporaryFile(), StringIO()):
            for check in checks:
                check(target, zipfile.ZIP_STORED)

    def zip_test(self, f, compression):
        """Write three members with allowZip64=True and verify the archive."""
        members = (TESTFN, "another.name", "strfile")

        # Build the archive
        with zipfile.ZipFile(f, "w", compression, allowZip64=True) as archive:
            archive.write(TESTFN, "another.name")
            archive.write(TESTFN, TESTFN)
            archive.writestr("strfile", self.data)

        # Read it back
        with zipfile.ZipFile(f, "r", compression) as archive:
            for member in members:
                self.assertEqual(archive.read(member), self.data)

            # Capture what printdir() writes to sys.stdout
            captured = StringIO()
            saved_stdout = sys.stdout
            sys.stdout = captured
            try:
                archive.printdir()
            finally:
                sys.stdout = saved_stdout

            listing = captured.getvalue().splitlines()
            self.assertEqual(len(listing), 4)  # Number of files + header

            header = listing[0]
            for column in ('File Name', 'Modified', 'Size'):
                self.assertIn(column, header)

            name, day, clock, size = listing[1].split()
            self.assertEqual(name, 'another.name')
            self.assertTrue(time.strptime(day, '%Y-%m-%d'))
            self.assertTrue(time.strptime(clock, '%H:%M:%S'))
            self.assertEqual(size, str(len(self.data)))

            # namelist() reports exactly the three members
            listed = archive.namelist()
            self.assertEqual(len(listed), 3)
            for member in members:
                self.assertIn(member, listed)

            # infolist() reports the same names and the right sizes
            entries = archive.infolist()
            listed = [entry.filename for entry in entries]
            self.assertEqual(len(listed), 3)
            for member in members:
                self.assertIn(member, listed)
            for entry in entries:
                self.assertEqual(entry.file_size, len(self.data))

            # getinfo() resolves each member by name
            for member in members:
                entry = archive.getinfo(member)
                self.assertEqual(entry.filename, member)
                self.assertEqual(entry.file_size, len(self.data))

            # testzip() must not raise an exception
            archive.testzip()

    def test_stored(self):
        for target in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_test(target, zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_deflated(self):
        for target in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_test(target, zipfile.ZIP_DEFLATED)

    def test_absolute_arcnames(self):
        writer = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED,
                                 allowZip64=True)
        with writer:
            writer.write(TESTFN, "/absolute")

        reader = zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED)
        with reader:
            self.assertEqual(reader.namelist(), ["absolute"])

    def tearDown(self):
        # Restore the limit patched in setUp(), then drop the scratch files.
        zipfile.ZIP64_LIMIT = self._limit
        for path in (TESTFN, TESTFN2):
            unlink(path)
712 |
|
---|
713 |
|
---|
class PyZipFileTests(unittest.TestCase):
    # Tests for zipfile.PyZipFile.writepy().  The assertions accept either a
    # .pyc or a .pyo member name for each module written.

    def test_write_pyfile(self):
        with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp:
            fn = __file__
            # Map a compiled file name back to its .py source name
            if fn.endswith(('.pyc', '.pyo')):
                fn = fn[:-1]

            zipfp.writepy(fn)

            bn = os.path.basename(fn)
            self.assertNotIn(bn, zipfp.namelist())
            self.assertTrue(bn + 'o' in zipfp.namelist() or
                            bn + 'c' in zipfp.namelist())

        with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp:
            fn = __file__
            if fn.endswith(('.pyc', '.pyo')):
                fn = fn[:-1]

            # Same check, with the module stored below "testpackage"
            zipfp.writepy(fn, "testpackage")

            bn = "%s/%s" % ("testpackage", os.path.basename(fn))
            self.assertNotIn(bn, zipfp.namelist())
            self.assertTrue(bn + 'o' in zipfp.namelist() or
                            bn + 'c' in zipfp.namelist())

    def test_write_python_package(self):
        import email
        packagedir = os.path.dirname(email.__file__)

        with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp:
            zipfp.writepy(packagedir)

            # Check for a couple of modules at different levels of the
            # hierarchy
            names = zipfp.namelist()
            self.assertTrue('email/__init__.pyo' in names or
                            'email/__init__.pyc' in names)
            self.assertTrue('email/mime/text.pyo' in names or
                            'email/mime/text.pyc' in names)

    def test_write_python_directory(self):
        os.mkdir(TESTFN2)
        try:
            with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp:
                fp.write("print(42)\n")

            with open(os.path.join(TESTFN2, "mod2.py"), "w") as fp:
                fp.write("print(42 * 42)\n")

            with open(os.path.join(TESTFN2, "mod2.txt"), "w") as fp:
                fp.write("bla bla bla\n")

            # Use a 'with' block so the archive is closed even when an
            # assertion fails.
            with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp:
                zipfp.writepy(TESTFN2)

                names = zipfp.namelist()
                self.assertTrue('mod1.pyc' in names or 'mod1.pyo' in names)
                self.assertTrue('mod2.pyc' in names or 'mod2.pyo' in names)
                self.assertNotIn('mod2.txt', names)

        finally:
            shutil.rmtree(TESTFN2)

    def test_write_non_pyfile(self):
        # Close the scratch file before writepy() looks at it
        with open(TESTFN, 'w') as fp:
            fp.write('most definitely not a python file')
        try:
            with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp:
                self.assertRaises(RuntimeError, zipfp.writepy, TESTFN)
        finally:
            # Remove the scratch file even when the assertion fails
            os.remove(TESTFN)
783 |
|
---|
784 |
|
---|
class OtherTests(unittest.TestCase):
    """Miscellaneous ZipFile tests: bad input, bad modes, comments, CRCs."""

    # Pre-built archives, keyed by compression method, each holding one
    # member named 'afile'.  They are consumed by the *_with_bad_crc checks
    # below, which expect testzip() to report 'afile' and read() to raise
    # BadZipfile.
    zips_with_bad_crc = {
        zipfile.ZIP_STORED: (
            b'PK\003\004\024\0\0\0\0\0 \213\212;:r'
            b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af'
            b'ilehello,AworldP'
            b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:'
            b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0'
            b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi'
            b'lePK\005\006\0\0\0\0\001\0\001\0003\000'
            b'\0\0/\0\0\0\0\0'),
        zipfile.ZIP_DEFLATED: (
            b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA'
            b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
            b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0'
            b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n'
            b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05'
            b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00'
            b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00'
            b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00'),
    }

    def test_unicode_filenames(self):
        """Check that unicode member names are kept on write and on read."""
        with zipfile.ZipFile(TESTFN, "w") as zf:
            zf.writestr(u"foo.txt", "Test for unicode filename")
            zf.writestr(u"\xf6.txt", "Test for unicode filename")
            self.assertIsInstance(zf.infolist()[0].filename, unicode)

        with zipfile.ZipFile(TESTFN, "r") as zf:
            self.assertEqual(zf.filelist[0].filename, "foo.txt")
            self.assertEqual(zf.filelist[1].filename, u"\xf6.txt")

    def test_create_non_existent_file_for_append(self):
        """Check that mode 'a' creates the archive when it does not exist."""
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)

        filename = 'testfile.txt'
        content = 'hello, world. this is some content.'

        try:
            with zipfile.ZipFile(TESTFN, 'a') as zf:
                zf.writestr(filename, content)
        except IOError:
            self.fail('Could not append data to a non-existent zip file.')

        self.assertTrue(os.path.exists(TESTFN))

        with zipfile.ZipFile(TESTFN, 'r') as zf:
            self.assertEqual(zf.read(filename), content)

    def test_close_erroneous_file(self):
        # This test checks that the ZipFile constructor closes the file object
        # it opens if there's an error in the file.  If it doesn't, the
        # traceback holds a reference to the ZipFile object and, indirectly,
        # the file object.
        # On Windows, this causes the os.unlink() call to fail because the
        # underlying file is still open.  This is SF bug #412214.
        #
        with open(TESTFN, "w") as fp:
            fp.write("this is not a legal zip file\n")
        try:
            zf = zipfile.ZipFile(TESTFN)
        except zipfile.BadZipfile:
            pass
        else:
            # Should the constructor ever accept this input, do not leave
            # the file open for tearDown's unlink().
            zf.close()

    def test_is_zip_erroneous_file(self):
        """Check that is_zipfile() correctly identifies non-zip files."""
        # - passing a filename
        with open(TESTFN, "w") as fp:
            fp.write("this is not a legal zip file\n")
        chk = zipfile.is_zipfile(TESTFN)
        self.assertFalse(chk)
        # - passing a file object
        with open(TESTFN, "rb") as fp:
            chk = zipfile.is_zipfile(fp)
            self.assertFalse(chk)
        # - passing a file-like object
        fp = StringIO()
        fp.write("this is not a legal zip file\n")
        chk = zipfile.is_zipfile(fp)
        self.assertFalse(chk)
        fp.seek(0, 0)
        chk = zipfile.is_zipfile(fp)
        self.assertFalse(chk)

    def test_damaged_zipfile(self):
        """Check that zipfiles with missing bytes at the end raise
        BadZipfile."""
        # - Create a valid zip file
        fp = io.BytesIO()
        with zipfile.ZipFile(fp, mode="w") as zipf:
            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
        zipfiledata = fp.getvalue()

        # - Now create copies of it missing the last N bytes and make sure
        #   a BadZipfile exception is raised when we try to open it
        for N in range(len(zipfiledata)):
            fp = io.BytesIO(zipfiledata[:N])
            self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, fp)

    def test_is_zip_valid_file(self):
        """Check that is_zipfile() correctly identifies zip files."""
        # - passing a filename
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
        chk = zipfile.is_zipfile(TESTFN)
        self.assertTrue(chk)
        # - passing a file object
        with open(TESTFN, "rb") as fp:
            chk = zipfile.is_zipfile(fp)
            self.assertTrue(chk)
            fp.seek(0, 0)
            zip_contents = fp.read()
        # - passing a file-like object
        fp = StringIO()
        fp.write(zip_contents)
        chk = zipfile.is_zipfile(fp)
        self.assertTrue(chk)
        fp.seek(0, 0)
        chk = zipfile.is_zipfile(fp)
        self.assertTrue(chk)

    def test_non_existent_file_raises_IOError(self):
        # make sure we don't raise an AttributeError when a partially-constructed
        # ZipFile instance is finalized; this tests for regression on SF tracker
        # bug #403871.

        # The bug we're testing for caused an AttributeError to be raised
        # when a ZipFile instance was created for a file that did not
        # exist; the .fp member was not initialized but was needed by the
        # __del__() method.  Since the AttributeError is in the __del__(),
        # it is ignored, but the user should be sufficiently annoyed by
        # the message on the output that regression will be noticed
        # quickly.
        self.assertRaises(IOError, zipfile.ZipFile, TESTFN)

    def test_empty_file_raises_BadZipFile(self):
        """Check that an empty file and a short non-zip file are rejected."""
        with open(TESTFN, 'w') as f:
            pass
        self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN)

        with open(TESTFN, 'w') as fp:
            fp.write("short file")
        self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN)

    def test_closed_zip_raises_RuntimeError(self):
        """Verify that testzip() doesn't swallow inappropriate exceptions."""
        data = StringIO()
        with zipfile.ZipFile(data, mode="w") as zipf:
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")

        # This is correct; calling .read on a closed ZipFile should raise
        # a RuntimeError, and so should calling .testzip.  An earlier
        # version of .testzip would swallow this exception (and any other)
        # and report that the first file in the archive was corrupt.
        self.assertRaises(RuntimeError, zipf.read, "foo.txt")
        self.assertRaises(RuntimeError, zipf.open, "foo.txt")
        self.assertRaises(RuntimeError, zipf.testzip)
        self.assertRaises(RuntimeError, zipf.writestr, "bogus.txt", "bogus")
        # Close the scratch file explicitly instead of relying on garbage
        # collection of an anonymous file object.
        with open(TESTFN, 'w') as fp:
            fp.write('zipfile test data')
        self.assertRaises(RuntimeError, zipf.write, TESTFN)

    def test_bad_constructor_mode(self):
        """Check that bad modes passed to ZipFile constructor are caught."""
        self.assertRaises(RuntimeError, zipfile.ZipFile, TESTFN, "q")

    def test_bad_open_mode(self):
        """Check that bad modes passed to ZipFile.open are caught."""
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")

        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
            # read the data to make sure the file is there
            zipf.read("foo.txt")
            self.assertRaises(RuntimeError, zipf.open, "foo.txt", "q")

    def test_read0(self):
        """Check that calling read(0) on a ZipExtFile object returns an empty
        string and doesn't advance file pointer."""
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
            # read the data to make sure the file is there
            with zipf.open("foo.txt") as f:
                for i in xrange(FIXEDTEST_SIZE):
                    self.assertEqual(f.read(0), '')

                self.assertEqual(f.read(), "O, for a Muse of Fire!")

    def test_open_non_existent_item(self):
        """Check that attempting to call open() for an item that doesn't
        exist in the archive raises a KeyError."""
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            self.assertRaises(KeyError, zipf.open, "foo.txt", "r")

    def test_bad_compression_mode(self):
        """Check that bad compression methods passed to the ZipFile
        constructor are caught."""
        self.assertRaises(RuntimeError, zipfile.ZipFile, TESTFN, "w", -1)

    def test_unsupported_compression(self):
        """Check that opening a member with an unsupported compression
        method raises NotImplementedError."""
        # data is declared as shrunk, but actually deflated
        data = (b'PK\x03\x04.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00'
        b'\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00x\x03\x00PK\x01'
        b'\x02.\x03.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00\x00\x02\x00\x00'
        b'\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
        b'\x80\x01\x00\x00\x00\x00xPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00'
        b'/\x00\x00\x00!\x00\x00\x00\x00\x00')
        with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf:
            self.assertRaises(NotImplementedError, zipf.open, 'x')

    def test_null_byte_in_filename(self):
        """Check that a filename containing a null byte is properly
        terminated."""
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            zipf.writestr("foo.txt\x00qqq", "O, for a Muse of Fire!")
            self.assertEqual(zipf.namelist(), ['foo.txt'])

    def test_struct_sizes(self):
        """Check that ZIP internal structure sizes are calculated correctly."""
        self.assertEqual(zipfile.sizeEndCentDir, 22)
        self.assertEqual(zipfile.sizeCentralDir, 46)
        self.assertEqual(zipfile.sizeEndCentDir64, 56)
        self.assertEqual(zipfile.sizeEndCentDir64Locator, 20)

    def test_comments(self):
        """Check that comments on the archive are handled properly."""

        # check default comment is empty
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            self.assertEqual(zipf.comment, '')
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")

        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
            self.assertEqual(zipf.comment, '')

        # check a simple short comment
        comment = 'Bravely taking to his feet, he beat a very brave retreat.'
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            zipf.comment = comment
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
            self.assertEqual(zipf.comment, comment)

        # check a comment of max length
        comment2 = ''.join(['%d' % (i**3 % 10) for i in xrange((1 << 16)-1)])
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            zipf.comment = comment2
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")

        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
            self.assertEqual(zipf.comment, comment2)

        # check a comment that is too long is truncated
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            zipf.comment = comment2 + 'oops'
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
            self.assertEqual(zipf.comment, comment2)

    def test_change_comment_in_empty_archive(self):
        """Check that a comment set on an empty archive in 'a' mode is
        read back."""
        with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
            self.assertFalse(zipf.filelist)
            zipf.comment = b"this is a comment"
        with zipfile.ZipFile(TESTFN, "r") as zipf:
            self.assertEqual(zipf.comment, b"this is a comment")

    def test_change_comment_in_nonempty_archive(self):
        """Check that a comment set on an existing, non-empty archive in
        'a' mode is read back."""
        with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf:
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
        with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
            self.assertTrue(zipf.filelist)
            zipf.comment = b"this is a comment"
        with zipfile.ZipFile(TESTFN, "r") as zipf:
            self.assertEqual(zipf.comment, b"this is a comment")

    def check_testzip_with_bad_crc(self, compression):
        """Tests that files with bad CRCs return their name from testzip."""
        zipdata = self.zips_with_bad_crc[compression]

        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            # testzip returns the name of the first corrupt file, or None
            self.assertEqual('afile', zipf.testzip())

    def test_testzip_with_bad_crc_stored(self):
        self.check_testzip_with_bad_crc(zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_testzip_with_bad_crc_deflated(self):
        self.check_testzip_with_bad_crc(zipfile.ZIP_DEFLATED)

    def check_read_with_bad_crc(self, compression):
        """Tests that files with bad CRCs raise a BadZipfile exception when read."""
        zipdata = self.zips_with_bad_crc[compression]

        # Using ZipFile.read()
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            self.assertRaises(zipfile.BadZipfile, zipf.read, 'afile')

        # Using ZipExtFile.read()
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            with zipf.open('afile', 'r') as corrupt_file:
                self.assertRaises(zipfile.BadZipfile, corrupt_file.read)

        # Same with small reads (in order to exercise the buffering logic)
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            with zipf.open('afile', 'r') as corrupt_file:
                corrupt_file.MIN_READ_SIZE = 2
                with self.assertRaises(zipfile.BadZipfile):
                    while corrupt_file.read(2):
                        pass

    def test_read_with_bad_crc_stored(self):
        self.check_read_with_bad_crc(zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_read_with_bad_crc_deflated(self):
        self.check_read_with_bad_crc(zipfile.ZIP_DEFLATED)

    def check_read_return_size(self, compression):
        # Issue #9837: ZipExtFile.read() shouldn't return more bytes
        # than requested.
        for test_size in (1, 4095, 4096, 4097, 16384):
            # The member is one byte longer than the amount requested.
            file_size = test_size + 1
            junk = b''.join(struct.pack('B', randint(0, 255))
                            for x in range(file_size))
            with zipfile.ZipFile(io.BytesIO(), "w", compression) as zipf:
                zipf.writestr('foo', junk)
                with zipf.open('foo', 'r') as fp:
                    buf = fp.read(test_size)
                    self.assertEqual(len(buf), test_size)

    def test_read_return_size_stored(self):
        self.check_read_return_size(zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_read_return_size_deflated(self):
        self.check_read_return_size(zipfile.ZIP_DEFLATED)

    def test_empty_zipfile(self):
        # Check that creating a file in 'w' or 'a' mode and closing without
        # adding any files to the archives creates a valid empty ZIP file
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            pass
        try:
            # Close the reopened archive right away so no handle on TESTFN
            # outlives the test.
            zipfile.ZipFile(TESTFN, mode="r").close()
        except zipfile.BadZipfile:
            self.fail("Unable to create empty ZIP file in 'w' mode")

        with zipfile.ZipFile(TESTFN, mode="a") as zipf:
            pass
        try:
            zipfile.ZipFile(TESTFN, mode="r").close()
        except zipfile.BadZipfile:
            # Narrowed from a bare ``except:`` to match the 'w' case above;
            # any other exception now surfaces as itself.
            self.fail("Unable to create empty ZIP file in 'a' mode")

    def test_open_empty_file(self):
        # Issue 1710703: Check that opening a file with less than 22 bytes
        # raises a BadZipfile exception (rather than the previously unhelpful
        # IOError)
        with open(TESTFN, 'w') as f:
            pass
        self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN, 'r')

    def test_create_zipinfo_before_1980(self):
        """Check that ZipInfo rejects a date_time earlier than 1980."""
        self.assertRaises(ValueError,
                          zipfile.ZipInfo, 'seventies', (1979, 1, 1, 0, 0, 0))

    def tearDown(self):
        """Remove both scratch file names after every test."""
        unlink(TESTFN)
        unlink(TESTFN2)
|
---|
1154 |
|
---|
1155 |
|
---|
class DecryptionTests(unittest.TestCase):
    """Check that ZIP decryption works. Since the library does not
    support encryption at the moment, we use a pre-generated encrypted
    ZIP file."""

    # First pre-generated archive: one member, 'test.txt'.
    data = (
    'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00'
    '\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y'
    '\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl'
    'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00'
    '\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81'
    '\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00'
    '\x00\x00L\x00\x00\x00\x00\x00' )
    # Second pre-generated archive: one member, 'zero'.
    data2 = (
    'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02'
    '\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04'
    '\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0'
    'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03'
    '\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00'
    '\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze'
    'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01'
    '\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' )

    # Expected plaintext of 'test.txt' and of 'zero' respectively.
    plain = 'zipfile.py encryption test'
    plain2 = '\x00'*512

    def setUp(self):
        """Write both archives to disk and open each one for reading."""
        def materialize(path, payload):
            # Dump the raw archive bytes, then open the result read-only.
            with open(path, "wb") as out:
                out.write(payload)
            return zipfile.ZipFile(path, "r")

        self.zip = materialize(TESTFN, self.data)
        self.zip2 = materialize(TESTFN2, self.data2)

    def tearDown(self):
        """Close each archive and delete its backing file."""
        for archive, path in ((self.zip, TESTFN), (self.zip2, TESTFN2)):
            archive.close()
            os.unlink(path)

    def test_no_password(self):
        # Reading an encrypted member without any password set
        # must raise RuntimeError.
        for archive, member in ((self.zip, "test.txt"), (self.zip2, "zero")):
            self.assertRaises(RuntimeError, archive.read, member)

    def test_bad_password(self):
        # Reading with a wrong password must raise RuntimeError as well.
        for archive, member in ((self.zip, "test.txt"), (self.zip2, "zero")):
            archive.setpassword("perl")
            self.assertRaises(RuntimeError, archive.read, member)

    @skipUnless(zlib, "requires zlib")
    def test_good_password(self):
        # With the right password each member yields its known plaintext.
        cases = ((self.zip, "python", "test.txt", self.plain),
                 (self.zip2, "12345", "zero", self.plain2))
        for archive, password, member, expected in cases:
            archive.setpassword(password)
            self.assertEqual(archive.read(member), expected)
|
---|
1214 |
|
---|
1215 |
|
---|
class TestsWithRandomBinaryFiles(unittest.TestCase):
    """Round-trip randomly generated binary data through ZIP archives.

    Each test writes the data into an archive held in three kinds of
    target (a path, a TemporaryFile, an in-memory file) and reads it back.
    """

    def setUp(self):
        """Build a random binary payload and store it in TESTFN."""
        datacount = randint(16, 64)*1024 + randint(1, 1024)
        self.data = ''.join(struct.pack('<f', random()*randint(-1000, 1000))
                            for i in xrange(datacount))

        # Make a source file holding the binary payload
        with open(TESTFN, "wb") as fp:
            fp.write(self.data)

    def tearDown(self):
        """Remove the source file and the on-disk archive."""
        unlink(TESTFN)
        unlink(TESTFN2)

    def _archive_targets(self, memfile):
        """Yield TESTFN2, a TemporaryFile and *memfile*, in that order.

        The TemporaryFile is closed as soon as the consumer asks for the
        next target, so it no longer lingers until garbage collection.
        """
        yield TESTFN2
        with TemporaryFile() as tmp:
            yield tmp
        yield memfile

    @staticmethod
    def _drain(fileobj, next_size):
        """Read *fileobj* to exhaustion in chunks of ``next_size()`` bytes
        and return the concatenated data."""
        chunks = []
        while True:
            chunk = fileobj.read(next_size())
            if not chunk:
                break
            chunks.append(chunk)
        return ''.join(chunks)

    def make_test_archive(self, f, compression):
        """Create an archive in *f* holding TESTFN under two member names."""
        # Create the ZIP archive
        with zipfile.ZipFile(f, "w", compression) as zipfp:
            zipfp.write(TESTFN, "another.name")
            zipfp.write(TESTFN, TESTFN)

    def zip_test(self, f, compression):
        """Write the archive, then compare whole-member read() results."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            testdata = zipfp.read(TESTFN)
            self.assertEqual(len(testdata), len(self.data))
            self.assertEqual(testdata, self.data)
            self.assertEqual(zipfp.read("another.name"), self.data)

    def test_stored(self):
        for f in self._archive_targets(StringIO()):
            self.zip_test(f, zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_deflated(self):
        for f in self._archive_targets(io.BytesIO()):
            self.zip_test(f, zipfile.ZIP_DEFLATED)

    def zip_open_test(self, f, compression):
        """Write the archive, then read both members via open() in fixed
        256-byte chunks."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            with zipfp.open(TESTFN) as zipopen1:
                testdata1 = self._drain(zipopen1, lambda: 256)

            with zipfp.open("another.name") as zipopen2:
                testdata2 = self._drain(zipopen2, lambda: 256)

            self.assertEqual(len(testdata1), len(self.data))
            self.assertEqual(testdata1, self.data)

            self.assertEqual(len(testdata2), len(self.data))
            self.assertEqual(testdata2, self.data)

    def test_open_stored(self):
        for f in self._archive_targets(StringIO()):
            self.zip_open_test(f, zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_open_deflated(self):
        for f in self._archive_targets(io.BytesIO()):
            self.zip_open_test(f, zipfile.ZIP_DEFLATED)

    def zip_random_open_test(self, f, compression):
        """Write the archive, then read one member via open() in chunks of
        random size between 1 and 1024 bytes."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            with zipfp.open(TESTFN) as zipopen1:
                testdata = self._drain(zipopen1, lambda: randint(1, 1024))

            self.assertEqual(len(testdata), len(self.data))
            self.assertEqual(testdata, self.data)

    def test_random_open_stored(self):
        for f in self._archive_targets(StringIO()):
            self.zip_random_open_test(f, zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_random_open_deflated(self):
        for f in self._archive_targets(io.BytesIO()):
            self.zip_random_open_test(f, zipfile.ZIP_DEFLATED)
|
---|
1318 |
|
---|
1319 |
|
---|
@skipUnless(zlib, "requires zlib")
class TestsWithMultipleOpens(unittest.TestCase):
    """Check that several open() handles on one ZipFile do not interfere."""

    def setUp(self):
        """Create a deflated archive with two members, 'ones' and 'twos'."""
        # Create the ZIP archive
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) as zipfp:
            zipfp.writestr('ones', '1'*FIXEDTEST_SIZE)
            zipfp.writestr('twos', '2'*FIXEDTEST_SIZE)

    def test_same_file(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
            # Both handles refer to the same member; close them
            # deterministically like the sibling tests do.
            with zipf.open('ones') as zopen1, zipf.open('ones') as zopen2:
                data1 = zopen1.read(500)
                data2 = zopen2.read(500)
                data1 += zopen1.read(500)
                data2 += zopen2.read(500)
            self.assertEqual(data1, data2)
            # Also pin the content, so two equally wrong reads cannot pass.
            self.assertEqual(data1, '1'*FIXEDTEST_SIZE)

    def test_different_file(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
            with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2:
                # Both handles are open at once, but each member is read to
                # completion before the other is touched; the alternating
                # pattern is covered by test_interleaved.
                data1 = zopen1.read(500)
                data1 += zopen1.read(500)
                data2 = zopen2.read(500)
                data2 += zopen2.read(500)
            self.assertEqual(data1, '1'*FIXEDTEST_SIZE)
            self.assertEqual(data2, '2'*FIXEDTEST_SIZE)

    def test_interleaved(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
            with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2:
                # Alternate between the two handles on every read.
                data1 = zopen1.read(500)
                data2 = zopen2.read(500)
                data1 += zopen1.read(500)
                data2 += zopen2.read(500)
            self.assertEqual(data1, '1'*FIXEDTEST_SIZE)
            self.assertEqual(data2, '2'*FIXEDTEST_SIZE)

    def tearDown(self):
        """Remove the archive created by setUp."""
        unlink(TESTFN2)
|
---|
1366 |
|
---|
1367 |
|
---|
class TestWithDirectory(unittest.TestCase):
    """Tests for extracting and storing directory entries."""

    def setUp(self):
        """Create the scratch directory TESTFN2 used by every test."""
        os.mkdir(TESTFN2)

    def test_extract_dir(self):
        """extractall() of zipdir.zip must create a/, a/b/ and a/b/c."""
        with zipfile.ZipFile(findfile("zipdir.zip")) as zipf:
            zipf.extractall(TESTFN2)
        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a")))
        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b")))
        self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c")))

    def test_bug_6050(self):
        # Extraction should succeed if directories already exist
        os.mkdir(os.path.join(TESTFN2, "a"))
        self.test_extract_dir()

    def test_store_dir(self):
        """Writing a directory must produce a member name ending in 'x/'."""
        os.mkdir(os.path.join(TESTFN2, "x"))
        # Close the archive before tearDown unlinks TESTFN; it used to be
        # left open here.
        with zipfile.ZipFile(TESTFN, "w") as zipf:
            zipf.write(os.path.join(TESTFN2, "x"), "x")
            self.assertTrue(zipf.filelist[0].filename.endswith("x/"))

    def tearDown(self):
        """Delete the scratch directory and, if present, the archive."""
        shutil.rmtree(TESTFN2)
        if os.path.exists(TESTFN):
            unlink(TESTFN)
|
---|
1394 |
|
---|
1395 |
|
---|
1396 | class UniversalNewlineTests(unittest.TestCase):
|
---|
def setUp(self):
    """Create one source file per line separator ('\\r', '\\r\\n', '\\n').

    ``self.arcdata`` maps each separator to the text of ``self.line_gen``
    joined and terminated by that separator; ``self.arcfiles`` maps each
    separator to the path of the file holding that text.
    """
    self.line_gen = ["Test of zipfile line %d." % i
                     for i in xrange(FIXEDTEST_SIZE)]
    self.seps = ('\r', '\r\n', '\n')
    self.arcdata, self.arcfiles = {}, {}
    for n, s in enumerate(self.seps):
        self.arcdata[s] = s.join(self.line_gen) + s
        self.arcfiles[s] = '%s-%d' % (TESTFN, n)
        # Close each file explicitly; the anonymous open(...).write(...)
        # form left that to garbage collection.
        with open(self.arcfiles[s], "wb") as fp:
            fp.write(self.arcdata[s])
|
---|
1406 |
|
---|
def make_test_archive(self, f, compression):
    """Write every file listed in ``self.arcfiles`` into a new archive.

    *f* is the archive target handed to ZipFile and *compression* is the
    compression method; each file is stored under its own path as the
    member name.
    """
    archive = zipfile.ZipFile(f, "w", compression)
    with archive:
        for source_path in self.arcfiles.values():
            archive.write(source_path, source_path)
|
---|
1412 |
|
---|
def read_test(self, f, compression):
    """Build the archive in *f*, then check read() on every member.

    Each member is opened in "rU" mode and its full read() result is
    compared with the text written for that separator.
    """
    self.make_test_archive(f, compression)

    # Re-open the archive for reading
    with zipfile.ZipFile(f, "r") as archive:
        for sep in self.arcfiles:
            member = archive.open(self.arcfiles[sep], "rU")
            with member:
                contents = member.read()
            self.assertEqual(self.arcdata[sep], contents)
|
---|
1422 |
|
---|
def readline_read_test(self, f, compression):
    """Build the archive in *f*, then alternate readline() and read(5).

    Every member is opened in "rU" mode and consumed by calling
    readline() and read(5) in turn until one of them returns nothing; the
    collected data must equal the '\\n'-separated text whichever separator
    the member was written with.
    """
    self.make_test_archive(f, compression)

    # Read the ZIP archive.  The ``with`` block closes it even when an
    # assertion fails; the manual close() at the end used to be skipped
    # in that case.
    with zipfile.ZipFile(f, "r") as zipfp:
        for fn in self.arcfiles.values():
            with zipfp.open(fn, "rU") as zipopen:
                pieces = []
                while True:
                    read = zipopen.readline()
                    if not read:
                        break
                    pieces.append(read)

                    read = zipopen.read(5)
                    if not read:
                        break
                    pieces.append(read)

            self.assertEqual(''.join(pieces), self.arcdata['\n'])
|
---|
1445 |
|
---|
1446 | def readline_test(self, f, compression):
|
---|
1447 | self.make_test_archive(f, compression)
|
---|
1448 |
|
---|
1449 | # Read the ZIP archive
|
---|
1450 | with zipfile.ZipFile(f, "r") as zipfp:
|
---|
1451 | for sep, fn in self.arcfiles.items():
|
---|
1452 | with zipfp.open(fn, "rU") as zipopen:
|
---|
1453 | for line in self.line_gen:
|
---|
1454 | linedata = zipopen.readline()
|
---|
1455 | self.assertEqual(linedata, line + '\n')
|
---|
1456 |
|
---|
1457 | def readlines_test(self, f, compression):
|
---|
1458 | self.make_test_archive(f, compression)
|
---|
1459 |
|
---|
1460 | # Read the ZIP archive
|
---|
1461 | with zipfile.ZipFile(f, "r") as zipfp:
|
---|
1462 | for sep, fn in self.arcfiles.items():
|
---|
1463 | with zipfp.open(fn, "rU") as fp:
|
---|
1464 | ziplines = fp.readlines()
|
---|
1465 | for line, zipline in zip(self.line_gen, ziplines):
|
---|
1466 | self.assertEqual(zipline, line + '\n')
|
---|
1467 |
|
---|
1468 | def iterlines_test(self, f, compression):
|
---|
1469 | self.make_test_archive(f, compression)
|
---|
1470 |
|
---|
1471 | # Read the ZIP archive
|
---|
1472 | with zipfile.ZipFile(f, "r") as zipfp:
|
---|
1473 | for sep, fn in self.arcfiles.items():
|
---|
1474 | for line, zipline in zip(self.line_gen, zipfp.open(fn, "rU")):
|
---|
1475 | self.assertEqual(zipline, line + '\n')
|
---|
1476 |
|
---|
1477 | def test_read_stored(self):
|
---|
1478 | for f in (TESTFN2, TemporaryFile(), StringIO()):
|
---|
1479 | self.read_test(f, zipfile.ZIP_STORED)
|
---|
1480 |
|
---|
1481 | def test_readline_read_stored(self):
|
---|
1482 | # Issue #7610: calls to readline() interleaved with calls to read().
|
---|
1483 | for f in (TESTFN2, TemporaryFile(), StringIO()):
|
---|
1484 | self.readline_read_test(f, zipfile.ZIP_STORED)
|
---|
1485 |
|
---|
1486 | def test_readline_stored(self):
|
---|
1487 | for f in (TESTFN2, TemporaryFile(), StringIO()):
|
---|
1488 | self.readline_test(f, zipfile.ZIP_STORED)
|
---|
1489 |
|
---|
1490 | def test_readlines_stored(self):
|
---|
1491 | for f in (TESTFN2, TemporaryFile(), StringIO()):
|
---|
1492 | self.readlines_test(f, zipfile.ZIP_STORED)
|
---|
1493 |
|
---|
1494 | def test_iterlines_stored(self):
|
---|
1495 | for f in (TESTFN2, TemporaryFile(), StringIO()):
|
---|
1496 | self.iterlines_test(f, zipfile.ZIP_STORED)
|
---|
1497 |
|
---|
1498 | @skipUnless(zlib, "requires zlib")
|
---|
1499 | def test_read_deflated(self):
|
---|
1500 | for f in (TESTFN2, TemporaryFile(), StringIO()):
|
---|
1501 | self.read_test(f, zipfile.ZIP_DEFLATED)
|
---|
1502 |
|
---|
1503 | @skipUnless(zlib, "requires zlib")
|
---|
1504 | def test_readline_read_deflated(self):
|
---|
1505 | # Issue #7610: calls to readline() interleaved with calls to read().
|
---|
1506 | for f in (TESTFN2, TemporaryFile(), StringIO()):
|
---|
1507 | self.readline_read_test(f, zipfile.ZIP_DEFLATED)
|
---|
1508 |
|
---|
1509 | @skipUnless(zlib, "requires zlib")
|
---|
1510 | def test_readline_deflated(self):
|
---|
1511 | for f in (TESTFN2, TemporaryFile(), StringIO()):
|
---|
1512 | self.readline_test(f, zipfile.ZIP_DEFLATED)
|
---|
1513 |
|
---|
1514 | @skipUnless(zlib, "requires zlib")
|
---|
1515 | def test_readlines_deflated(self):
|
---|
1516 | for f in (TESTFN2, TemporaryFile(), StringIO()):
|
---|
1517 | self.readlines_test(f, zipfile.ZIP_DEFLATED)
|
---|
1518 |
|
---|
1519 | @skipUnless(zlib, "requires zlib")
|
---|
1520 | def test_iterlines_deflated(self):
|
---|
1521 | for f in (TESTFN2, TemporaryFile(), StringIO()):
|
---|
1522 | self.iterlines_test(f, zipfile.ZIP_DEFLATED)
|
---|
1523 |
|
---|
1524 | def tearDown(self):
|
---|
1525 | for sep, fn in self.arcfiles.items():
|
---|
1526 | os.remove(fn)
|
---|
1527 | unlink(TESTFN)
|
---|
1528 | unlink(TESTFN2)
|
---|
1529 |
|
---|
1530 |
|
---|
def test_main():
    """Hand every test case class of this module to run_unittest."""
    test_classes = (
        TestsWithSourceFile,
        TestZip64InSmallFiles,
        OtherTests,
        PyZipFileTests,
        DecryptionTests,
        TestsWithMultipleOpens,
        TestWithDirectory,
        UniversalNewlineTests,
        TestsWithRandomBinaryFiles,
    )
    run_unittest(*test_classes)
|
---|
1536 |
|
---|
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()
|
---|