source: python/trunk/Lib/test/test_tarfile.py

Last change on this file was 391, checked in by dmik, 11 years ago

python: Merge vendor 2.7.6 to trunk.

  • Property svn:eol-style set to native
File size: 56.7 KB
Line 
1# -*- coding: iso-8859-15 -*-
2
3import sys
4import os
5import shutil
6import StringIO
7from hashlib import md5
8import errno
9
10import unittest
11import tarfile
12
13from test import test_support
14
# Probe for the optional compression modules.  A name is rebound to None
# when its module cannot be used, so callers can simply test "if gzip:".
try:
    import gzip
except ImportError:
    gzip = None
else:
    # A gzip module lacking GzipFile is treated the same as a missing one.
    if not hasattr(gzip, "GzipFile"):
        gzip = None
try:
    import bz2
except ImportError:
    bz2 = None
25
def md5sum(data):
    # Return the hexadecimal MD5 digest of the byte string `data`.
    digest = md5(data)
    return digest.hexdigest()
28
# Directory the tests extract into and create scratch files in.
TEMPDIR = os.path.abspath(test_support.TESTFN)
# Reference archive located through test_support.findfile().
tarname = test_support.findfile("testtar.tar")
# Archive paths below TEMPDIR: the gzip and bz2 variants that the
# detection tests read, and a scratch archive rewritten by many tests.
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
tmpname = os.path.join(TEMPDIR, "tmp.tar")

# Expected MD5 digests of the data of the regular and the sparse members.
md5_regtype = "65f477c818ad9e15f7feab0c6d37742f"
md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6"
37
38
class ReadTest(unittest.TestCase):
    # Common fixture for the read tests: every test gets a freshly opened
    # archive in self.tar, which is closed again afterwards.

    # Archive path and open mode; subclasses may override either one.
    tarname = tarname
    mode = "r:"

    def setUp(self):
        open_args = {"mode": self.mode, "encoding": "iso8859-1"}
        self.tar = tarfile.open(self.tarname, **open_args)

    def tearDown(self):
        self.tar.close()
49
50
class UstarReadTest(ReadTest):
    # Tests for the file objects returned by TarFile.extractfile(), run
    # against the "ustar/" members of the test archive.

    def test_fileobj_regular_file(self):
        tarinfo = self.tar.getmember("ustar/regtype")
        fobj = self.tar.extractfile(tarinfo)
        data = fobj.read()
        self.assertEqual((len(data), md5sum(data)), (tarinfo.size, md5_regtype),
                "regular file extraction failed")

    def test_fileobj_readlines(self):
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        # Close the on-disk copy deterministically instead of leaking it.
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rU") as fobj1:
            lines1 = fobj1.readlines()
        fobj2 = self.tar.extractfile(tarinfo)
        try:
            lines2 = fobj2.readlines()
        finally:
            fobj2.close()

        self.assertEqual(lines1, lines2,
                "fileobj.readlines() failed")
        self.assertEqual(len(lines2), 114,
                "fileobj.readlines() failed")
        self.assertEqual(lines2[83],
                "I will gladly admit that Python is not the fastest running scripting language.\n",
                "fileobj.readlines() failed")

    def test_fileobj_iter(self):
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rU") as fobj1:
            lines1 = fobj1.readlines()
        fobj2 = self.tar.extractfile(tarinfo)
        try:
            lines2 = [line for line in fobj2]
        finally:
            fobj2.close()
        self.assertEqual(lines1, lines2,
                "fileobj.__iter__() failed")

    def test_fileobj_seek(self):
        self.tar.extract("ustar/regtype", TEMPDIR)
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj:
            data = fobj.read()

        tarinfo = self.tar.getmember("ustar/regtype")
        fobj = self.tar.extractfile(tarinfo)
        try:
            # Read everything once so the seeks below start from the end.
            fobj.read()
            fobj.seek(0)
            self.assertEqual(0, fobj.tell(),
                    "seek() to file's start failed")
            fobj.seek(2048, 0)
            self.assertEqual(2048, fobj.tell(),
                    "seek() to absolute position failed")
            fobj.seek(-1024, 1)
            self.assertEqual(1024, fobj.tell(),
                    "seek() to negative relative position failed")
            fobj.seek(1024, 1)
            self.assertEqual(2048, fobj.tell(),
                    "seek() to positive relative position failed")
            s = fobj.read(10)
            self.assertEqual(s, data[2048:2058],
                    "read() after seek failed")
            fobj.seek(0, 2)
            self.assertEqual(tarinfo.size, fobj.tell(),
                    "seek() to file's end failed")
            self.assertEqual(fobj.read(), "",
                    "read() at file's end did not return empty string")
            fobj.seek(-tarinfo.size, 2)
            self.assertEqual(0, fobj.tell(),
                    "relative seek() to file's start failed")
            fobj.seek(512)
            s1 = fobj.readlines()
            fobj.seek(512)
            s2 = fobj.readlines()
            self.assertEqual(s1, s2,
                    "readlines() after seek failed")
            fobj.seek(0)
            self.assertEqual(len(fobj.readline()), fobj.tell(),
                    "tell() after readline() failed")
            fobj.seek(512)
            self.assertEqual(len(fobj.readline()) + 512, fobj.tell(),
                    "tell() after seek() and readline() failed")
            fobj.seek(0)
            line = fobj.readline()
            self.assertEqual(fobj.read(), data[len(line):],
                    "read() after readline() failed")
        finally:
            fobj.close()

    # Test if symbolic and hard links are resolved by extractfile().  The
    # test link members each point to a regular member whose data is
    # supposed to be exported.
    def _test_fileobj_link(self, lnktype, regtype):
        a = self.tar.extractfile(lnktype)
        b = self.tar.extractfile(regtype)
        self.assertEqual(a.name, b.name)

    def test_fileobj_link1(self):
        self._test_fileobj_link("ustar/lnktype", "ustar/regtype")

    def test_fileobj_link2(self):
        self._test_fileobj_link("./ustar/linktest2/lnktype", "ustar/linktest1/regtype")

    def test_fileobj_symlink1(self):
        self._test_fileobj_link("ustar/symtype", "ustar/regtype")

    def test_fileobj_symlink2(self):
        self._test_fileobj_link("./ustar/linktest2/symtype", "ustar/linktest1/regtype")

    def test_issue14160(self):
        self._test_fileobj_link("symtype2", "ustar/regtype")
159
160
class CommonReadTest(ReadTest):
    # Read tests that only use self.mode and the scratch archive tmpname.

    def test_empty_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test checks if tarfile.open() is able to open an empty tar
        # archive successfully. Note that an empty tar archive is not the
        # same as an empty file!
        tarfile.open(tmpname, self.mode.replace("r", "w")).close()
        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tar.getnames()
                members = tar.getmembers()
            finally:
                # Always release the archive, even if reading it fails.
                tar.close()
        except tarfile.ReadError:
            self.fail("tarfile.open() failed on empty archive")
        self.assertListEqual(members, [])

    def test_null_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test guarantees that tarfile.open() does not treat an empty
        # file as an empty tar archive.
        open(tmpname, "wb").close()
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)

    def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        if self.mode.endswith(":gz"):
            _open = gzip.GzipFile
        elif self.mode.endswith(":bz2"):
            _open = bz2.BZ2File
        else:
            _open = open

        for char in ('\0', 'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            fobj = _open(tmpname, "wb")
            try:
                fobj.write(char * 1024)
                fobj.write(tarfile.TarInfo("foo").tobuf())
            finally:
                fobj.close()

            tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
            try:
                self.assertListEqual(tar.getnames(), ["foo"],
                        "ignore_zeros=True should have skipped the %r-blocks" % char)
            finally:
                tar.close()
205
206
class MiscReadTest(CommonReadTest):
    # Assorted read tests: name handling of file objects, archives opened
    # at an offset, special member types and extraction.

    def test_no_name_argument(self):
        with open(self.tarname, "rb") as fobj:
            tar = tarfile.open(fileobj=fobj, mode=self.mode)
            try:
                self.assertEqual(tar.name, os.path.abspath(fobj.name))
            finally:
                tar.close()

    def test_no_name_attribute(self):
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = StringIO.StringIO(data)
        self.assertRaises(AttributeError, getattr, fobj, "name")
        tar = tarfile.open(fileobj=fobj, mode=self.mode)
        try:
            self.assertEqual(tar.name, None)
        finally:
            tar.close()

    def test_empty_name_attribute(self):
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = StringIO.StringIO(data)
        fobj.name = ""
        tar = tarfile.open(fileobj=fobj, mode=self.mode)
        try:
            self.assertEqual(tar.name, None)
        finally:
            tar.close()

    def test_fileobj_with_offset(self):
        # Skip the first member and store values from the second member
        # of the testtar.
        tar = tarfile.open(self.tarname, mode=self.mode)
        try:
            tar.next()
            t = tar.next()
            name = t.name
            offset = t.offset
            data = tar.extractfile(t).read()
        finally:
            tar.close()

        # Open the testtar and seek to the offset of the second member.
        if self.mode.endswith(":gz"):
            _open = gzip.GzipFile
        elif self.mode.endswith(":bz2"):
            _open = bz2.BZ2File
        else:
            _open = open
        fobj = _open(self.tarname, "rb")
        try:
            fobj.seek(offset)

            # Test if the tarfile starts with the second member.  Use the
            # module-level open() rather than the classmethod reached
            # through the already closed instance.
            tar = tarfile.open(self.tarname, mode="r:", fileobj=fobj)
            try:
                t = tar.next()
                self.assertEqual(t.name, name)
                # Read to the end of fileobj and test if seeking back to
                # the beginning works.
                tar.getmembers()
                self.assertEqual(tar.extractfile(t).read(), data,
                        "seek back did not work")
            finally:
                tar.close()
        finally:
            fobj.close()

    def test_fail_comp(self):
        # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
        if self.mode == "r:":
            return
        self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
        with open(tarname, "rb") as fobj:
            self.assertRaises(tarfile.ReadError, tarfile.open,
                    fileobj=fobj, mode=self.mode)

    def test_v7_dirtype(self):
        # Test old style dirtype member (bug #1336623):
        # Old V7 tars create directory members using an AREGTYPE
        # header with a "/" appended to the filename field.
        tarinfo = self.tar.getmember("misc/dirtype-old-v7")
        self.assertEqual(tarinfo.type, tarfile.DIRTYPE,
                "v7 dirtype failed")

    def test_xstar_type(self):
        # The xstar format stores extra atime and ctime fields inside the
        # space reserved for the prefix field. The prefix field must be
        # ignored in this case, otherwise it will mess up the name.
        try:
            self.tar.getmember("misc/regtype-xstar")
        except KeyError:
            self.fail("failed to find misc/regtype-xstar (mangled prefix?)")

    def test_check_members(self):
        for tarinfo in self.tar:
            self.assertEqual(int(tarinfo.mtime), 0o7606136617,
                    "wrong mtime for %s" % tarinfo.name)
            if not tarinfo.name.startswith("ustar/"):
                continue
            self.assertEqual(tarinfo.uname, "tarfile",
                    "wrong uname for %s" % tarinfo.name)

    def test_find_members(self):
        self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof",
                "could not find all members")

    def test_extract_hardlink(self):
        # Test hardlink extraction (e.g. bug #857297).
        with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar:
            tar.extract("ustar/regtype", TEMPDIR)
            self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/regtype"))

            tar.extract("ustar/lnktype", TEMPDIR)
            self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/lnktype"))
            with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f:
                data = f.read()
            self.assertEqual(md5sum(data), md5_regtype)

            tar.extract("ustar/symtype", TEMPDIR)
            self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/symtype"))
            with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f:
                data = f.read()
            self.assertEqual(md5sum(data), md5_regtype)

    def test_extractall(self):
        # Test if extractall() correctly restores directory permissions
        # and times (see issue1735).
        tar = tarfile.open(tarname, encoding="iso8859-1")
        try:
            directories = [t for t in tar if t.isdir()]
            tar.extractall(TEMPDIR, directories)
            for tarinfo in directories:
                path = os.path.join(TEMPDIR, tarinfo.name)
                if sys.platform != "win32":
                    # Win32 has no support for fine grained permissions.
                    self.assertEqual(tarinfo.mode & 0o777,
                            os.stat(path).st_mode & 0o777)
                self.assertEqual(tarinfo.mtime, os.path.getmtime(path))
        finally:
            tar.close()

    def test_init_close_fobj(self):
        # Issue #7341: Close the internal file object in the TarFile
        # constructor in case of an error. For the test we rely on
        # the fact that opening an empty file raises a ReadError.
        empty = os.path.join(TEMPDIR, "empty")
        # Create the empty file without leaving a handle open.
        with open(empty, "wb"):
            pass

        try:
            tar = object.__new__(tarfile.TarFile)
            try:
                tar.__init__(empty)
            except tarfile.ReadError:
                self.assertTrue(tar.fileobj.closed)
            else:
                self.fail("ReadError not raised")
        finally:
            os.remove(empty)

    def test_parallel_iteration(self):
        # Issue #16601: Restarting iteration over tarfile continued
        # from where it left off.
        with tarfile.open(self.tarname) as tar:
            for m1, m2 in zip(tar, tar):
                self.assertEqual(m1.offset, m2.offset)
                self.assertEqual(m1.name, m2.name)
355
356
class StreamReadTest(CommonReadTest):
    # Read tests with the archive opened in stream mode ("r|").

    mode="r|"

    def test_fileobj_regular_file(self):
        tarinfo = self.tar.next() # get "regtype" (can't use getmember)
        fobj = self.tar.extractfile(tarinfo)
        data = fobj.read()
        self.assertEqual((len(data), md5sum(data)), (tarinfo.size, md5_regtype),
                "regular file extraction failed")

    def test_provoke_stream_error(self):
        tarinfos = self.tar.getmembers()
        f = self.tar.extractfile(tarinfos[0]) # read the first member
        self.assertRaises(tarfile.StreamError, f.read)

    def test_compare_members(self):
        # Walk a regularly opened archive and the stream in lockstep and
        # compare the data of every member.
        tar1 = tarfile.open(tarname, encoding="iso8859-1")
        tar2 = self.tar

        try:
            while True:
                t1 = tar1.next()
                t2 = tar2.next()
                if t1 is None:
                    break
                self.assertTrue(t2 is not None, "stream.next() failed.")

                if t2.islnk() or t2.issym():
                    self.assertRaises(tarfile.StreamError, tar2.extractfile, t2)
                    continue

                v1 = tar1.extractfile(t1)
                v2 = tar2.extractfile(t2)
                if v1 is None:
                    continue
                self.assertTrue(v2 is not None, "stream.extractfile() failed")
                self.assertTrue(v1.read() == v2.read(), "stream extraction failed")
        finally:
            # Close the reference archive even when an assertion fails.
            tar1.close()
396
397
class DetectReadTest(unittest.TestCase):
    # Checks which archive/mode combinations tarfile.open() accepts and
    # which ones it rejects with a ReadError.

    def _testfunc_file(self, name, mode):
        # Open the archive by name; a ReadError fails the test.
        try:
            tar = tarfile.open(name, mode)
        except tarfile.ReadError:
            self.fail("tarfile.open(%r, %r) raised ReadError" % (name, mode))
        else:
            tar.close()

    def _testfunc_fileobj(self, name, mode):
        # Same as _testfunc_file(), but hand over an open file object.
        with open(name, "rb") as fobj:
            try:
                tar = tarfile.open(name, mode, fileobj=fobj)
            except tarfile.ReadError:
                self.fail("tarfile.open(%r, %r, fileobj=...) raised ReadError"
                        % (name, mode))
            else:
                tar.close()

    def _test_modes(self, testfunc):
        testfunc(tarname, "r")
        testfunc(tarname, "r:")
        testfunc(tarname, "r:*")
        testfunc(tarname, "r|")
        testfunc(tarname, "r|*")

        if gzip:
            self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:gz")
            self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|gz")
            self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r:")
            self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r|")

            testfunc(gzipname, "r")
            testfunc(gzipname, "r:*")
            testfunc(gzipname, "r:gz")
            testfunc(gzipname, "r|*")
            testfunc(gzipname, "r|gz")

        if bz2:
            self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:bz2")
            self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|bz2")
            self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r:")
            self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r|")

            testfunc(bz2name, "r")
            testfunc(bz2name, "r:*")
            testfunc(bz2name, "r:bz2")
            testfunc(bz2name, "r|*")
            testfunc(bz2name, "r|bz2")

    def test_detect_file(self):
        self._test_modes(self._testfunc_file)

    def test_detect_fileobj(self):
        self._test_modes(self._testfunc_fileobj)

    def test_detect_stream_bz2(self):
        # Originally, tarfile's stream detection looked for the string
        # "BZh91" at the start of the file. This is incorrect because
        # the '9' represents the blocksize (900kB). If the file was
        # compressed using another blocksize autodetection fails.
        if not bz2:
            # Report a skip instead of silently passing.
            self.skipTest("needs bz2")

        with open(tarname, "rb") as fobj:
            data = fobj.read()

        # Compress with blocksize 100kB, the file starts with "BZh11".
        with bz2.BZ2File(tmpname, "wb", compresslevel=1) as fobj:
            fobj.write(data)

        self._testfunc_file(tmpname, "r|*")
465
466
class MemberReadTest(ReadTest):
    # Looks up individual members of the test archive and checks their
    # header fields and, where given, the MD5 digest of their data.

    def _test_member(self, tarinfo, chksum=None, **kwargs):
        # Compare the member's data digest with `chksum` (if given) and
        # every keyword argument with the TarInfo attribute of that name.
        if chksum is not None:
            self.assertEqual(md5sum(self.tar.extractfile(tarinfo).read()), chksum,
                    "wrong md5sum for %s" % tarinfo.name)

        # Values expected for every member of the test archive.
        kwargs["mtime"] = 0o7606136617
        kwargs["uid"] = 1000
        kwargs["gid"] = 100
        if "old-v7" not in tarinfo.name:
            # V7 tar can't handle alphabetic owners.
            kwargs["uname"] = "tarfile"
            kwargs["gname"] = "tarfile"
        for k, v in kwargs.iteritems():
            self.assertEqual(getattr(tarinfo, k), v,
                    "wrong value in %s field of %s" % (k, tarinfo.name))

    def test_find_regtype(self):
        tarinfo = self.tar.getmember("ustar/regtype")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_conttype(self):
        tarinfo = self.tar.getmember("ustar/conttype")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_dirtype(self):
        tarinfo = self.tar.getmember("ustar/dirtype")
        self._test_member(tarinfo, size=0)

    def test_find_dirtype_with_size(self):
        tarinfo = self.tar.getmember("ustar/dirtype-with-size")
        self._test_member(tarinfo, size=255)

    def test_find_lnktype(self):
        tarinfo = self.tar.getmember("ustar/lnktype")
        self._test_member(tarinfo, size=0, linkname="ustar/regtype")

    def test_find_symtype(self):
        tarinfo = self.tar.getmember("ustar/symtype")
        self._test_member(tarinfo, size=0, linkname="regtype")

    def test_find_blktype(self):
        tarinfo = self.tar.getmember("ustar/blktype")
        self._test_member(tarinfo, size=0, devmajor=3, devminor=0)

    def test_find_chrtype(self):
        tarinfo = self.tar.getmember("ustar/chrtype")
        self._test_member(tarinfo, size=0, devmajor=1, devminor=3)

    def test_find_fifotype(self):
        tarinfo = self.tar.getmember("ustar/fifotype")
        self._test_member(tarinfo, size=0)

    def test_find_sparse(self):
        tarinfo = self.tar.getmember("ustar/sparse")
        self._test_member(tarinfo, size=86016, chksum=md5_sparse)

    def test_find_umlauts(self):
        tarinfo = self.tar.getmember("ustar/umlauts-ÄÖÜäöüß")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_ustar_longname(self):
        name = "ustar/" + "12345/" * 39 + "1234567/longname"
        self.assertIn(name, self.tar.getnames())

    def test_find_regtype_oldv7(self):
        tarinfo = self.tar.getmember("misc/regtype-old-v7")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_pax_umlauts(self):
        # Close the archive opened by setUp() before replacing it;
        # tearDown() closes the replacement.
        self.tar.close()
        self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1")
        tarinfo = self.tar.getmember("pax/umlauts-ÄÖÜäöüß")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)
541
542
class LongnameTest(ReadTest):
    # Tests for members with very long names.  The tests read
    # self.subdir and self.longnametype, which this class does not
    # define itself; the subclasses provide them.

    def test_read_longname(self):
        # Test reading of longname (bug #1471427).
        longname = self.subdir + "/" + "123/" * 125 + "longname"
        try:
            tarinfo = self.tar.getmember(longname)
        except KeyError:
            self.fail("longname not found")
        self.assertTrue(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype")

    def test_read_longlink(self):
        longname = self.subdir + "/" + "123/" * 125 + "longname"
        longlink = self.subdir + "/" + "123/" * 125 + "longlink"
        try:
            tarinfo = self.tar.getmember(longlink)
        except KeyError:
            self.fail("longlink not found")
        self.assertEqual(tarinfo.linkname, longname, "linkname wrong")

    def test_truncated_longname(self):
        # An archive cut off after three blocks of the longname member
        # must be rejected with a ReadError.
        longname = self.subdir + "/" + "123/" * 125 + "longname"
        tarinfo = self.tar.getmember(longname)
        offset = tarinfo.offset
        self.tar.fileobj.seek(offset)
        fobj = StringIO.StringIO(self.tar.fileobj.read(3 * 512))
        self.assertRaises(tarfile.ReadError, tarfile.open, name="foo.tar", fileobj=fobj)

    def test_header_offset(self):
        # Test if the start offset of the TarInfo object includes
        # the preceding extended header.
        longname = self.subdir + "/" + "123/" * 125 + "longname"
        offset = self.tar.getmember(longname).offset
        # The archive is binary data: read it in binary mode so no newline
        # translation can shift the offset, and close the file afterwards.
        with open(tarname, "rb") as fobj:
            fobj.seek(offset)
            tarinfo = tarfile.TarInfo.frombuf(fobj.read(512))
        self.assertEqual(tarinfo.type, self.longnametype)
580
581
class GNUReadTest(LongnameTest):
    # Runs the longname tests on the "gnu" members and adds a sparse
    # file check.

    subdir = "gnu"
    longnametype = tarfile.GNUTYPE_LONGNAME

    def test_sparse_file(self):
        # The ustar and the gnu sparse member must yield the same data.
        ustar_fobj = self.tar.extractfile(self.tar.getmember("ustar/sparse"))
        gnu_fobj = self.tar.extractfile(self.tar.getmember("gnu/sparse"))
        ustar_data = ustar_fobj.read()
        gnu_data = gnu_fobj.read()
        self.assertTrue(ustar_data == gnu_data,
                "sparse file extraction failed")
594
595
class PaxReadTest(LongnameTest):
    # Runs the longname tests on the "pax" members and checks values
    # that come from pax headers.

    subdir = "pax"
    longnametype = tarfile.XHDTYPE

    def test_pax_global_headers(self):
        tar = tarfile.open(tarname, encoding="iso8859-1")
        # Make sure the extra archive is closed whatever the outcome.
        self.addCleanup(tar.close)

        tarinfo = tar.getmember("pax/regtype1")
        self.assertEqual(tarinfo.uname, "foo")
        self.assertEqual(tarinfo.gname, "bar")
        self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"ÄÖÜäöüß")

        tarinfo = tar.getmember("pax/regtype2")
        self.assertEqual(tarinfo.uname, "")
        self.assertEqual(tarinfo.gname, "bar")
        self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"ÄÖÜäöüß")

        tarinfo = tar.getmember("pax/regtype3")
        self.assertEqual(tarinfo.uname, "tarfile")
        self.assertEqual(tarinfo.gname, "tarfile")
        self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"ÄÖÜäöüß")

    def test_pax_number_fields(self):
        # All following number fields are read from the pax header.
        tar = tarfile.open(tarname, encoding="iso8859-1")
        self.addCleanup(tar.close)
        tarinfo = tar.getmember("pax/regtype4")
        self.assertEqual(tarinfo.size, 7011)
        self.assertEqual(tarinfo.uid, 123)
        self.assertEqual(tarinfo.gid, 123)
        self.assertEqual(tarinfo.mtime, 1041808783.0)
        self.assertEqual(type(tarinfo.mtime), float)
        self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0)
        self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0)
630
631
class WriteTestBase(unittest.TestCase):
    # Put all write tests in here that are supposed to be tested
    # in all possible mode combinations.  self.mode is read here but
    # not defined; subclasses set it.

    def test_fileobj_no_close(self):
        # Closing the archive must leave a caller-supplied file object open.
        buf = StringIO.StringIO()
        archive = tarfile.open(fileobj=buf, mode=self.mode)
        archive.addfile(tarfile.TarInfo("foo"))
        archive.close()
        still_open = buf.closed is False
        self.assertTrue(still_open, "external fileobjs must never closed")
642
643
class WriteTest(WriteTestBase):
    # Write tests using the plain, uncompressed "w:" mode.  Archives are
    # opened in "with" blocks so they get closed even when a check fails.

    mode = "w:"

    def test_100_char_name(self):
        # The name field in a tar header stores strings of at most 100 chars.
        # If a string is shorter than 100 chars it has to be padded with '\0',
        # which implies that a string of exactly 100 chars is stored without
        # a trailing '\0'.
        name = "0123456789" * 10
        with tarfile.open(tmpname, self.mode) as tar:
            t = tarfile.TarInfo(name)
            tar.addfile(t)

        with tarfile.open(tmpname) as tar:
            self.assertEqual(tar.getnames()[0], name,
                    "failed to store 100 char filename")

    def test_tar_size(self):
        # Test for bug #1013882.
        with tarfile.open(tmpname, self.mode) as tar:
            path = os.path.join(TEMPDIR, "file")
            with open(path, "wb") as fobj:
                fobj.write("aaa")
            tar.add(path)
        self.assertTrue(os.path.getsize(tmpname) > 0,
                "tarfile is empty")

    # The test_*_size tests test for bug #1167128.
    def test_file_size(self):
        with tarfile.open(tmpname, self.mode) as tar:
            path = os.path.join(TEMPDIR, "file")
            with open(path, "wb"):
                pass
            tarinfo = tar.gettarinfo(path)
            self.assertEqual(tarinfo.size, 0)

            with open(path, "wb") as fobj:
                fobj.write("aaa")
            tarinfo = tar.gettarinfo(path)
            self.assertEqual(tarinfo.size, 3)

    def test_directory_size(self):
        path = os.path.join(TEMPDIR, "directory")
        os.mkdir(path)
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 0)
        finally:
            os.rmdir(path)

    @unittest.skipUnless(hasattr(os, "link"), "needs os.link")
    def test_link_size(self):
        link = os.path.join(TEMPDIR, "link")
        target = os.path.join(TEMPDIR, "link_target")
        with open(target, "wb") as fobj:
            fobj.write("aaa")
        os.link(target, link)
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                # Record the link target in the inodes list.
                tar.gettarinfo(target)
                tarinfo = tar.gettarinfo(link)
                self.assertEqual(tarinfo.size, 0)
        finally:
            os.remove(target)
            os.remove(link)

    @unittest.skipUnless(hasattr(os, "symlink"), "needs os.symlink")
    def test_symlink_size(self):
        path = os.path.join(TEMPDIR, "symlink")
        os.symlink("link_target", path)
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 0)
        finally:
            os.remove(path)

    def test_add_self(self):
        # Test for #1257255.
        dstname = os.path.abspath(tmpname)

        with tarfile.open(tmpname, self.mode) as tar:
            self.assertEqual(tar.name, dstname, "archive name must be absolute")

            tar.add(dstname)
            self.assertEqual(tar.getnames(), [], "added the archive to itself")

            cwd = os.getcwd()
            os.chdir(TEMPDIR)
            try:
                tar.add(dstname)
            finally:
                # Restore the working directory even if add() raises.
                os.chdir(cwd)
            self.assertEqual(tar.getnames(), [], "added the archive to itself")

    def test_exclude(self):
        tempdir = os.path.join(TEMPDIR, "exclude")
        os.mkdir(tempdir)
        try:
            for name in ("foo", "bar", "baz"):
                name = os.path.join(tempdir, name)
                open(name, "wb").close()

            # Exclude every regular file, so only the directory is stored.
            exclude = os.path.isfile

            with tarfile.open(tmpname, self.mode, encoding="iso8859-1") as tar:
                with test_support.check_warnings(("use the filter argument",
                                                  DeprecationWarning)):
                    tar.add(tempdir, arcname="empty_dir", exclude=exclude)

            with tarfile.open(tmpname, "r") as tar:
                self.assertEqual(len(tar.getmembers()), 1)
                self.assertEqual(tar.getnames()[0], "empty_dir")
        finally:
            shutil.rmtree(tempdir)

    def test_filter(self):
        tempdir = os.path.join(TEMPDIR, "filter")
        os.mkdir(tempdir)
        try:
            for name in ("foo", "bar", "baz"):
                name = os.path.join(tempdir, name)
                open(name, "wb").close()

            def filter(tarinfo):
                # Drop "bar" and rewrite the owner of everything else.
                if os.path.basename(tarinfo.name) == "bar":
                    return
                tarinfo.uid = 123
                tarinfo.uname = "foo"
                return tarinfo

            with tarfile.open(tmpname, self.mode, encoding="iso8859-1") as tar:
                tar.add(tempdir, arcname="empty_dir", filter=filter)

            with tarfile.open(tmpname, "r") as tar:
                for tarinfo in tar:
                    self.assertEqual(tarinfo.uid, 123)
                    self.assertEqual(tarinfo.uname, "foo")
                self.assertEqual(len(tar.getmembers()), 3)
        finally:
            shutil.rmtree(tempdir)

    # Guarantee that stored pathnames are not modified. Don't
    # remove ./ or ../ or double slashes. Still make absolute
    # pathnames relative.
    # For details see bug #6054.
    def _test_pathname(self, path, cmp_path=None, dir=False):
        # Create a tarfile with an empty member named path
        # and compare the stored name with the original.
        foo = os.path.join(TEMPDIR, "foo")
        if not dir:
            open(foo, "w").close()
        else:
            os.mkdir(foo)

        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tar.add(foo, arcname=path)

            with tarfile.open(tmpname, "r") as tar:
                t = tar.next()
        finally:
            # Remove the scratch entry even if writing or reading failed.
            if not dir:
                os.remove(foo)
            else:
                os.rmdir(foo)

        self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))

    def test_pathnames(self):
        self._test_pathname("foo")
        self._test_pathname(os.path.join("foo", ".", "bar"))
        self._test_pathname(os.path.join("foo", "..", "bar"))
        self._test_pathname(os.path.join(".", "foo"))
        self._test_pathname(os.path.join(".", "foo", "."))
        self._test_pathname(os.path.join(".", "foo", ".", "bar"))
        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
        self._test_pathname(os.path.join("..", "foo"))
        self._test_pathname(os.path.join("..", "foo", ".."))
        self._test_pathname(os.path.join("..", "foo", ".", "bar"))
        self._test_pathname(os.path.join("..", "foo", "..", "bar"))

        self._test_pathname("foo" + os.sep + os.sep + "bar")
        self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True)

    def test_abs_pathnames(self):
        if sys.platform == "win32":
            self._test_pathname("C:\\foo", "foo")
        else:
            self._test_pathname("/foo", "foo")
            self._test_pathname("///foo", "foo")

    def test_cwd(self):
        # Test adding the current working directory.
        cwd = os.getcwd()
        os.chdir(TEMPDIR)
        try:
            open("foo", "w").close()

            with tarfile.open(tmpname, self.mode) as tar:
                tar.add(".")

            with tarfile.open(tmpname, "r") as tar:
                for t in tar:
                    self.assertTrue(t.name == "." or t.name.startswith("./"))
        finally:
            os.chdir(cwd)

    @unittest.skipUnless(hasattr(os, 'symlink'), "needs os.symlink")
    def test_extractall_symlinks(self):
        # Test if extractall works properly when tarfile contains symlinks
        tempdir = os.path.join(TEMPDIR, "testsymlinks")
        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
        os.mkdir(tempdir)
        try:
            source_file = os.path.join(tempdir,'source')
            target_file = os.path.join(tempdir,'symlink')
            with open(source_file,'w') as f:
                f.write('something\n')
            os.symlink(source_file, target_file)
            tar = tarfile.open(temparchive,'w')
            tar.add(source_file, arcname=os.path.basename(source_file))
            tar.add(target_file, arcname=os.path.basename(target_file))
            tar.close()
            # Let's extract it to the location which contains the symlink
            tar = tarfile.open(temparchive,'r')
            # this should not raise OSError: [Errno 17] File exists
            try:
                tar.extractall(path=tempdir)
            except OSError:
                self.fail("extractall failed with symlinked files")
            finally:
                tar.close()
        finally:
            os.unlink(temparchive)
            shutil.rmtree(tempdir)

    @unittest.skipUnless(hasattr(os, 'symlink'), "needs os.symlink")
    def test_extractall_broken_symlinks(self):
        # Test if extractall works properly when tarfile contains broken
        # symlinks
        tempdir = os.path.join(TEMPDIR, "testsymlinks")
        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
        os.mkdir(tempdir)
        try:
            source_file = os.path.join(tempdir,'source')
            target_file = os.path.join(tempdir,'symlink')
            with open(source_file,'w') as f:
                f.write('something\n')
            os.symlink(source_file, target_file)
            tar = tarfile.open(temparchive,'w')
            tar.add(target_file, arcname=os.path.basename(target_file))
            tar.close()
            # remove the real file
            os.unlink(source_file)
            # Let's extract it to the location which contains the symlink
            tar = tarfile.open(temparchive,'r')
            # this should not raise OSError: [Errno 17] File exists
            try:
                tar.extractall(path=tempdir)
            except OSError:
                self.fail("extractall failed with broken symlinked files")
            finally:
                tar.close()
        finally:
            os.unlink(temparchive)
            shutil.rmtree(tempdir)

    @unittest.skipUnless(hasattr(os, 'link'), "needs os.link")
    def test_extractall_hardlinks(self):
        # Test if extractall works properly when tarfile contains hard links
        tempdir = os.path.join(TEMPDIR, "testsymlinks")
        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
        os.mkdir(tempdir)
        try:
            source_file = os.path.join(tempdir,'source')
            target_file = os.path.join(tempdir,'symlink')
            with open(source_file,'w') as f:
                f.write('something\n')
            os.link(source_file, target_file)
            tar = tarfile.open(temparchive,'w')
            tar.add(source_file, arcname=os.path.basename(source_file))
            tar.add(target_file, arcname=os.path.basename(target_file))
            tar.close()
            # Let's extract it to the location which contains the hard link
            tar = tarfile.open(temparchive,'r')
            # this should not raise OSError: [Errno 17] File exists
            try:
                tar.extractall(path=tempdir)
            except OSError:
                self.fail("extractall failed with linked files")
            finally:
                tar.close()
        finally:
            os.unlink(temparchive)
            shutil.rmtree(tempdir)
957
class StreamWriteTest(WriteTestBase):
    # Write tests using the stream mode "w|".

    mode = "w|"

    def test_stream_padding(self):
        # Test for bug #1543303.
        tar = tarfile.open(tmpname, self.mode)
        tar.close()

        # Read back the uncompressed bytes of the empty archive.
        if self.mode.endswith("gz"):
            fobj = gzip.GzipFile(tmpname)
            try:
                data = fobj.read()
            finally:
                fobj.close()
        elif self.mode.endswith("bz2"):
            dec = bz2.BZ2Decompressor()
            with open(tmpname, "rb") as fobj:
                data = fobj.read()
            data = dec.decompress(data)
            self.assertTrue(len(dec.unused_data) == 0,
                    "found trailing data")
        else:
            with open(tmpname, "rb") as fobj:
                data = fobj.read()

        self.assertEqual(data.count("\0"), tarfile.RECORDSIZE,
                "incorrect zero padding")

    def test_file_mode(self):
        # Test for issue #8464: Create files with correct
        # permissions.
        if sys.platform == "win32" or not hasattr(os, "umask"):
            # Report a skip instead of silently passing.
            self.skipTest("needs a usable os.umask")

        if os.path.exists(tmpname):
            os.remove(tmpname)

        original_umask = os.umask(0o022)
        try:
            tar = tarfile.open(tmpname, self.mode)
            tar.close()
            mode = os.stat(tmpname).st_mode & 0o777
            self.assertEqual(mode, 0o644, "wrong file permissions")
        finally:
            os.umask(original_umask)

    def test_issue13639(self):
        # Opening the archive through a unicode filename must not raise
        # UnicodeDecodeError.
        try:
            with tarfile.open(unicode(tmpname, sys.getfilesystemencoding()), self.mode):
                pass
        except UnicodeDecodeError:
            self.fail("_Stream failed to write unicode filename")
1009
1010
class GNUWriteTest(unittest.TestCase):
    # This testcase checks for correct creation of GNU Longname
    # and Longlink extended headers (cp. bug #812325).

    def _length(self, s):
        # Return len(s) + 1 rounded up to a multiple of 512.
        blocks, remainder = divmod(len(s) + 1, 512)
        if remainder:
            blocks += 1
        return blocks * 512

    def _calc_size(self, name, link=None):
        # Return the number of bytes the member is expected to occupy
        # in the archive.

        # Initial tar header
        count = 512

        if len(name) > tarfile.LENGTH_NAME:
            # GNU longname extended header + longname
            count += 512
            count += self._length(name)
        if link is not None and len(link) > tarfile.LENGTH_LINK:
            # GNU longlink extended header + longlink
            count += 512
            count += self._length(link)
        return count

    def _test(self, name, link=None):
        # Write a single member with the given name (and link name)
        # in GNU format, check the archive offset, then read it back.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w")
        try:
            tar.format = tarfile.GNU_FORMAT
            tar.addfile(tarinfo)

            v1 = self._calc_size(name, link)
            v2 = tar.offset
            self.assertTrue(v1 == v2, "GNU longname/longlink creation failed")
        finally:
            # Close the writer even if the size check fails.
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            member = tar.next()
            self.assertIsNotNone(member,
                                 "unable to read longname member")
            self.assertEqual(tarinfo.name, member.name,
                             "unable to read longname member")
            self.assertEqual(tarinfo.linkname, member.linkname,
                             "unable to read longname member")
        finally:
            # Do not leave tmpname open for the following tests.
            tar.close()

    def test_longname_1023(self):
        self._test(("longnam/" * 127) + "longnam")

    def test_longname_1024(self):
        self._test(("longnam/" * 127) + "longname")

    def test_longname_1025(self):
        self._test(("longnam/" * 127) + "longname_")

    def test_longlink_1023(self):
        self._test("name", ("longlnk/" * 127) + "longlnk")

    def test_longlink_1024(self):
        self._test("name", ("longlnk/" * 127) + "longlink")

    def test_longlink_1025(self):
        self._test("name", ("longlnk/" * 127) + "longlink_")

    def test_longnamelink_1023(self):
        self._test(("longnam/" * 127) + "longnam",
                   ("longlnk/" * 127) + "longlnk")

    def test_longnamelink_1024(self):
        self._test(("longnam/" * 127) + "longname",
                   ("longlnk/" * 127) + "longlink")

    def test_longnamelink_1025(self):
        self._test(("longnam/" * 127) + "longname_",
                   ("longlnk/" * 127) + "longlink_")
1089
1090
class HardlinkTest(unittest.TestCase):
    # Test the creation of LNKTYPE (hardlink) members in an archive.

    def setUp(self):
        # Create a small file "foo", hard-link it as "bar" and add
        # "foo" to a fresh archive.
        self.foo = os.path.join(TEMPDIR, "foo")
        self.bar = os.path.join(TEMPDIR, "bar")

        with open(self.foo, "wb") as handle:
            handle.write("foo")

        os.link(self.foo, self.bar)

        self.tar = tarfile.open(tmpname, "w")
        self.tar.add(self.foo)

    def tearDown(self):
        # Close the archive and remove both names of the file.
        self.tar.close()
        for path in (self.foo, self.bar):
            os.remove(path)

    def test_add_twice(self):
        # The same name will be added as a REGTYPE every
        # time regardless of st_nlink.
        info = self.tar.gettarinfo(self.foo)
        self.assertTrue(info.type == tarfile.REGTYPE,
                        "add file as regular failed")

    def test_add_hardlink(self):
        # The second name of an already added file is a LNKTYPE.
        info = self.tar.gettarinfo(self.bar)
        self.assertTrue(info.type == tarfile.LNKTYPE,
                        "add file as hardlink failed")

    def test_dereference_hardlink(self):
        # With dereference enabled the second name is a REGTYPE.
        self.tar.dereference = True
        info = self.tar.gettarinfo(self.bar)
        self.assertTrue(info.type == tarfile.REGTYPE,
                        "dereferencing hardlink failed")
1129
1130
class PaxWriteTest(GNUWriteTest):
    # Runs the long name/link tests inherited from GNUWriteTest
    # against the pax format and adds pax header tests.

    def _test(self, name, link=None):
        # See GNUWriteTest.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
        try:
            tar.addfile(tarinfo)
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            if link:
                l = tar.getmembers()[0].linkname
                self.assertTrue(link == l, "PAX longlink creation failed")
            else:
                n = tar.getmembers()[0].name
                self.assertTrue(name == n, "PAX longname creation failed")
        finally:
            # Do not leave tmpname open for the following tests.
            tar.close()

    def test_pax_global_header(self):
        pax_headers = {
                u"foo": u"bar",
                u"uid": u"0",
                u"mtime": u"1.23",
                u"test": u"äöü",
                u"äöü": u"test"}

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                pax_headers=pax_headers)
        try:
            tar.addfile(tarfile.TarInfo("test"))
        finally:
            tar.close()

        # Test if the global header was written correctly.
        tar = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            self.assertEqual(tar.pax_headers, pax_headers)
            self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers)

            # Test if all the fields are unicode.
            for key, val in tar.pax_headers.iteritems():
                self.assertTrue(type(key) is unicode)
                self.assertTrue(type(val) is unicode)
                if key in tarfile.PAX_NUMBER_FIELDS:
                    try:
                        tarfile.PAX_NUMBER_FIELDS[key](val)
                    except (TypeError, ValueError):
                        self.fail("unable to convert pax header field")
        finally:
            tar.close()

    def test_pax_extended_header(self):
        # The fields from the pax header have priority over the
        # TarInfo.
        pax_headers = {u"path": u"foo", u"uid": u"123"}

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1")
        try:
            t = tarfile.TarInfo()
            t.name = u"äöü"     # non-ASCII
            t.uid = 8**8        # too large
            t.pax_headers = pax_headers
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            t = tar.getmembers()[0]
            self.assertEqual(t.pax_headers, pax_headers)
            self.assertEqual(t.name, "foo")
            self.assertEqual(t.uid, 123)
        finally:
            tar.close()
1198
1199
class UstarUnicodeTest(unittest.TestCase):
    # All *UnicodeTests FIXME

    # Archive format under test; subclasses override this.
    format = tarfile.USTAR_FORMAT

    def test_iso8859_1_filename(self):
        self._test_unicode_filename("iso8859-1")

    def test_utf7_filename(self):
        self._test_unicode_filename("utf7")

    def test_utf8_filename(self):
        self._test_unicode_filename("utf8")

    def _test_unicode_filename(self, encoding):
        # Write a member with a unicode name and check that it is read
        # back as a byte string in the given encoding.
        tar = tarfile.open(tmpname, "w", format=self.format, encoding=encoding, errors="strict")
        try:
            name = u"äöü"
            tar.addfile(tarfile.TarInfo(name))
        finally:
            tar.close()

        tar = tarfile.open(tmpname, encoding=encoding)
        try:
            self.assertTrue(type(tar.getnames()[0]) is not unicode)
            self.assertEqual(tar.getmembers()[0].name, name.encode(encoding))
        finally:
            tar.close()

    def test_unicode_filename_error(self):
        tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict")
        try:
            tarinfo = tarfile.TarInfo()

            tarinfo.name = "äöü"
            if self.format == tarfile.PAX_FORMAT:
                self.assertRaises(UnicodeError, tar.addfile, tarinfo)
            else:
                tar.addfile(tarinfo)

            tarinfo.name = u"äöü"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)

            tarinfo.name = "foo"
            tarinfo.uname = u"äöü"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)
        finally:
            # The archive was previously left open on tmpname.
            tar.close()

    def test_unicode_argument(self):
        # All string fields of the members must be byte strings.
        tar = tarfile.open(tarname, "r", encoding="iso8859-1", errors="strict")
        try:
            for t in tar:
                self.assertTrue(type(t.name) is str)
                self.assertTrue(type(t.linkname) is str)
                self.assertTrue(type(t.uname) is str)
                self.assertTrue(type(t.gname) is str)
        finally:
            tar.close()

    def test_uname_unicode(self):
        # uname/gname given as unicode or as byte string must both be
        # read back as the same byte string.
        for name in (u"äöü", "äöü"):
            t = tarfile.TarInfo("foo")
            t.uname = name
            t.gname = name

            fobj = StringIO.StringIO()
            tar = tarfile.open("foo.tar", mode="w", fileobj=fobj, format=self.format, encoding="iso8859-1")
            try:
                tar.addfile(t)
            finally:
                tar.close()
            fobj.seek(0)

            tar = tarfile.open("foo.tar", fileobj=fobj, encoding="iso8859-1")
            try:
                t = tar.getmember("foo")
                self.assertEqual(t.uname, "äöü")
                self.assertEqual(t.gname, "äöü")
            finally:
                tar.close()
1267
1268
class GNUUnicodeTest(UstarUnicodeTest):
    # Runs the inherited unicode tests against the GNU format.

    format = tarfile.GNU_FORMAT
1272
1273
class PaxUnicodeTest(UstarUnicodeTest):
    # Runs the inherited unicode tests against the pax format and
    # adds tests for the unicode error handlers.

    format = tarfile.PAX_FORMAT

    def _create_unicode_name(self, name):
        # Write an archive whose only member gets its path from a pax
        # header.
        tar = tarfile.open(tmpname, "w", format=self.format)
        try:
            t = tarfile.TarInfo()
            t.pax_headers["path"] = name
            tar.addfile(t)
        finally:
            tar.close()

    def test_error_handlers(self):
        # Test if the unicode error handlers work correctly for characters
        # that cannot be expressed in a given encoding.
        self._create_unicode_name(u"äöü")

        for handler, name in (("utf-8", u"äöü".encode("utf8")),
                    ("replace", "???"), ("ignore", "")):
            tar = tarfile.open(tmpname, format=self.format, encoding="ascii",
                    errors=handler)
            try:
                self.assertEqual(tar.getnames()[0], name)
            finally:
                # Close each reader before opening the next one.
                tar.close()

        self.assertRaises(UnicodeError, tarfile.open, tmpname,
                encoding="ascii", errors="strict")

    def test_error_handler_utf8(self):
        # Create a pathname that has one component representable using
        # iso8859-1 and the other only in iso8859-15.
        self._create_unicode_name(u"äöü/€")

        tar = tarfile.open(tmpname, format=self.format, encoding="iso8859-1",
                errors="utf-8")
        try:
            self.assertEqual(tar.getnames()[0], "äöü/" + u"€".encode("utf8"))
        finally:
            tar.close()
1307
1308
class AppendTest(unittest.TestCase):
    # Test append mode (cp. patch #1652681).

    def setUp(self):
        # Every test starts without an existing archive.
        self.tarname = tmpname
        if os.path.exists(self.tarname):
            os.remove(self.tarname)

    def _add_testfile(self, fileobj=None):
        # Append an empty member "bar" to the archive.
        tar = tarfile.open(self.tarname, "a", fileobj=fileobj)
        try:
            tar.addfile(tarfile.TarInfo("bar"))
        finally:
            tar.close()

    def _create_testtar(self, mode="w:"):
        # Create an archive with a single member "foo" whose data is
        # copied from "ustar/regtype" of the reference archive.
        src = tarfile.open(tarname, encoding="iso8859-1")
        try:
            t = src.getmember("ustar/regtype")
            t.name = "foo"
            f = src.extractfile(t)
            try:
                tar = tarfile.open(self.tarname, mode)
                try:
                    tar.addfile(t, f)
                finally:
                    tar.close()
            finally:
                f.close()
        finally:
            src.close()

    def _test(self, names=None, fileobj=None):
        # Check that the archive contains exactly the given member
        # names (default: ["bar"]).
        if names is None:
            # Build the default per call instead of sharing one
            # mutable list between calls.
            names = ["bar"]
        tar = tarfile.open(self.tarname, fileobj=fileobj)
        try:
            self.assertEqual(tar.getnames(), names)
        finally:
            tar.close()

    def test_non_existing(self):
        self._add_testfile()
        self._test()

    def test_empty(self):
        tarfile.open(self.tarname, "w:").close()
        self._add_testfile()
        self._test()

    def test_empty_fileobj(self):
        fobj = StringIO.StringIO("\0" * 1024)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(fileobj=fobj)

    def test_fileobj(self):
        self._create_testtar()
        # The archive is binary data: read it in binary mode so that
        # platforms with text-mode translation do not corrupt it.
        with open(self.tarname, "rb") as archive:
            data = archive.read()
        fobj = StringIO.StringIO(data)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(names=["foo", "bar"], fileobj=fobj)

    def test_existing(self):
        self._create_testtar()
        self._add_testfile()
        self._test(names=["foo", "bar"])

    def test_append_gz(self):
        if gzip is None:
            return
        self._create_testtar("w:gz")
        self.assertRaises(tarfile.ReadError, tarfile.open, self.tarname, "a")

    def test_append_bz2(self):
        if bz2 is None:
            return
        self._create_testtar("w:bz2")
        self.assertRaises(tarfile.ReadError, tarfile.open, self.tarname, "a")

    # Append mode is supposed to fail if the tarfile to append to
    # does not end with a zero block.
    def _test_error(self, data):
        with open(self.tarname, "wb") as archive:
            archive.write(data)
        self.assertRaises(tarfile.ReadError, self._add_testfile)

    def test_null(self):
        self._test_error("")

    def test_incomplete(self):
        self._test_error("\0" * 13)

    def test_premature_eof(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data)

    def test_trailing_garbage(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data + "\0" * 13)

    def test_invalid(self):
        self._test_error("a" * 512)
1397
1398
class LimitsTest(unittest.TestCase):
    # Check which names, link names and uids each archive format
    # accepts when a header is built with TarInfo.tobuf().

    def test_ustar_limits(self):
        # 100 char name
        tarinfo = tarfile.TarInfo("0123456789" * 10)
        tarinfo.tobuf(tarfile.USTAR_FORMAT)

        # 101 char name that cannot be stored
        tarinfo = tarfile.TarInfo("0123456789" * 10 + "0")
        self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)

        # 256 char name with a slash at pos 156
        tarinfo = tarfile.TarInfo("123/" * 62 + "longname")
        tarinfo.tobuf(tarfile.USTAR_FORMAT)

        # 256 char name that cannot be stored
        tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname")
        self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)

        # 512 char name
        tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
        self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)

        # 512 char linkname
        tarinfo = tarfile.TarInfo("longlink")
        tarinfo.linkname = "123/" * 126 + "longname"
        self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)

        # uid > 8 digits
        tarinfo = tarfile.TarInfo("name")
        # 0o literals are accepted by Python 2.6+; value is 8 ** 7.
        tarinfo.uid = 0o10000000
        self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)

    def test_gnu_limits(self):
        # Long names and link names are accepted by the GNU format.
        tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
        tarinfo.tobuf(tarfile.GNU_FORMAT)

        tarinfo = tarfile.TarInfo("longlink")
        tarinfo.linkname = "123/" * 126 + "longname"
        tarinfo.tobuf(tarfile.GNU_FORMAT)

        # uid >= 256 ** 7
        tarinfo = tarfile.TarInfo("name")
        tarinfo.uid = 0o4000000000000000000
        self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT)

    def test_pax_limits(self):
        # None of the values rejected above raises in the pax format.
        tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
        tarinfo.tobuf(tarfile.PAX_FORMAT)

        tarinfo = tarfile.TarInfo("longlink")
        tarinfo.linkname = "123/" * 126 + "longname"
        tarinfo.tobuf(tarfile.PAX_FORMAT)

        tarinfo = tarfile.TarInfo("name")
        # Same value as in test_gnu_limits (256 ** 7).
        tarinfo.uid = 0o4000000000000000000
        tarinfo.tobuf(tarfile.PAX_FORMAT)
1456
1457
class ContextManagerTest(unittest.TestCase):
    # Test the use of TarFile objects in "with" statements.

    def test_basic(self):
        with tarfile.open(tarname) as tar:
            self.assertFalse(tar.closed, "closed inside runtime context")
        self.assertTrue(tar.closed, "context manager failed")

    def test_closed(self):
        # The __enter__() method is supposed to raise IOError
        # if the TarFile object is already closed.
        tar = tarfile.open(tarname)
        tar.close()
        with self.assertRaises(IOError):
            with tar:
                pass

    def test_exception(self):
        # Test if the IOError exception is passed through properly.
        with self.assertRaises(Exception) as exc:
            with tarfile.open(tarname) as tar:
                raise IOError
        self.assertIsInstance(exc.exception, IOError,
                              "wrong exception raised in context manager")
        self.assertTrue(tar.closed, "context manager failed")

    def test_no_eof(self):
        # __exit__() must not write end-of-archive blocks if an
        # exception was raised.
        try:
            with tarfile.open(tmpname, "w") as tar:
                raise Exception
        except Exception:
            # Only the exception raised above is expected here.
            pass
        self.assertEqual(os.path.getsize(tmpname), 0,
                "context manager wrote an end-of-archive block")
        self.assertTrue(tar.closed, "context manager failed")

    def test_eof(self):
        # __exit__() must write end-of-archive blocks, i.e. call
        # TarFile.close() if there was no error.
        with tarfile.open(tmpname, "w"):
            pass
        self.assertNotEqual(os.path.getsize(tmpname), 0,
                "context manager wrote no end-of-archive block")

    def test_fileobj(self):
        # Test that __exit__() did not close the external file
        # object.
        fobj = open(tmpname, "wb")
        try:
            try:
                with tarfile.open(fileobj=fobj, mode="w") as tar:
                    raise Exception
            except Exception:
                # Only the exception raised above is expected here.
                pass
            self.assertFalse(fobj.closed, "external file object was closed")
            self.assertTrue(tar.closed, "context manager failed")
        finally:
            # Close the file even if one of the assertions fails.
            fobj.close()
1515
1516
class LinkEmulationTest(ReadTest):

    # Test for issue #8741 regression. On platforms that do not support
    # symbolic or hard links tarfile tries to extract these types of members as
    # the regular files they point to.
    def _test_link_extraction(self, name):
        # Extract the named link member and compare the checksum of
        # the resulting file with that of "ustar/regtype".
        self.tar.extract(name, TEMPDIR)
        with open(os.path.join(TEMPDIR, name), "rb") as fobj:
            data = fobj.read()
        self.assertEqual(md5sum(data), md5_regtype)

    def test_hardlink_extraction1(self):
        self._test_link_extraction("ustar/lnktype")

    def test_hardlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/lnktype")

    def test_symlink_extraction1(self):
        self._test_link_extraction("ustar/symtype")

    def test_symlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/symtype")
1538
1539
# Variants of the read and write tests that use gzip compression.

class GzipMiscReadTest(MiscReadTest):
    tarname = gzipname
    mode = "r:gz"


class GzipUstarReadTest(UstarReadTest):
    tarname = gzipname
    mode = "r:gz"


class GzipStreamReadTest(StreamReadTest):
    tarname = gzipname
    mode = "r|gz"


class GzipWriteTest(WriteTest):
    mode = "w:gz"


class GzipStreamWriteTest(StreamWriteTest):
    mode = "w|gz"
1553
1554
# Variants of the read and write tests that use bzip2 compression.

class Bz2MiscReadTest(MiscReadTest):
    tarname = bz2name
    mode = "r:bz2"


class Bz2UstarReadTest(UstarReadTest):
    tarname = bz2name
    mode = "r:bz2"


class Bz2StreamReadTest(StreamReadTest):
    tarname = bz2name
    mode = "r|bz2"


class Bz2WriteTest(WriteTest):
    mode = "w:bz2"


class Bz2StreamWriteTest(StreamWriteTest):
    mode = "w|bz2"
1568
class Bz2PartialReadTest(unittest.TestCase):
    # Issue5068: The _BZ2Proxy.read() method loops forever
    # on an empty or partial bzipped file.

    def _test_partial_input(self, mode):
        # Open every prefix of a bzipped tar header with the given
        # mode; a read() issued after EOF was reached (without a
        # seek() in between) is reported as an infinite loop.
        class MyStringIO(StringIO.StringIO):
            hit_eof = False

            def read(self, n):
                if self.hit_eof:
                    raise AssertionError("infinite loop detected in tarfile.open()")
                self.hit_eof = self.pos == self.len
                return StringIO.StringIO.read(self, n)

            def seek(self, *args):
                # Repositioning makes another read() legitimate.
                self.hit_eof = False
                return StringIO.StringIO.seek(self, *args)

        compressed = bz2.compress(tarfile.TarInfo("foo").tobuf())
        for cut in range(len(compressed) + 1):
            truncated = MyStringIO(compressed[:cut])
            try:
                tarfile.open(fileobj=truncated, mode=mode)
            except tarfile.ReadError:
                pass # we have no interest in ReadErrors

    def test_partial_input(self):
        self._test_partial_input("r")

    def test_partial_input_bz2(self):
        self._test_partial_input("r:bz2")
1597
1598
def test_main():
    # Create the temporary directory and the compressed copies of the
    # test archive, run all test classes and remove the directory
    # again.
    os.makedirs(TEMPDIR)

    # Everything after the directory creation runs inside the try
    # block so that TEMPDIR is removed even if preparing the
    # compressed archives fails.
    try:
        tests = [
            UstarReadTest,
            MiscReadTest,
            StreamReadTest,
            DetectReadTest,
            MemberReadTest,
            GNUReadTest,
            PaxReadTest,
            WriteTest,
            StreamWriteTest,
            GNUWriteTest,
            PaxWriteTest,
            UstarUnicodeTest,
            GNUUnicodeTest,
            PaxUnicodeTest,
            AppendTest,
            LimitsTest,
            ContextManagerTest,
        ]

        if hasattr(os, "link"):
            tests.append(HardlinkTest)
        else:
            tests.append(LinkEmulationTest)

        with open(tarname, "rb") as fobj:
            data = fobj.read()

        if gzip:
            # Create testtar.tar.gz and add gzip-specific tests.
            tar = gzip.open(gzipname, "wb")
            try:
                tar.write(data)
            finally:
                tar.close()

            tests += [
                GzipMiscReadTest,
                GzipUstarReadTest,
                GzipStreamReadTest,
                GzipWriteTest,
                GzipStreamWriteTest,
            ]

        if bz2:
            # Create testtar.tar.bz2 and add bz2-specific tests.
            tar = bz2.BZ2File(bz2name, "wb")
            try:
                tar.write(data)
            finally:
                tar.close()

            tests += [
                Bz2MiscReadTest,
                Bz2UstarReadTest,
                Bz2StreamReadTest,
                Bz2WriteTest,
                Bz2StreamWriteTest,
                Bz2PartialReadTest,
            ]

        test_support.run_unittest(*tests)
    finally:
        if os.path.exists(TEMPDIR):
            shutil.rmtree(TEMPDIR)

if __name__ == "__main__":
    test_main()
Note: See TracBrowser for help on using the repository browser.