Changeset 391 for python/trunk/Lib/test/test_tarfile.py
- Timestamp:
- Mar 19, 2014, 11:31:01 PM (11 years ago)
- Location:
- python/trunk
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
python/trunk
-
Property svn:mergeinfo
set to
/python/vendor/Python-2.7.6 merged eligible /python/vendor/current merged eligible
-
Property svn:mergeinfo
set to
-
python/trunk/Lib/test/test_tarfile.py
r2 r391 4 4 import os 5 5 import shutil 6 import tempfile7 6 import StringIO 8 7 from hashlib import md5 … … 56 55 fobj = self.tar.extractfile(tarinfo) 57 56 data = fobj.read() 58 self.assert _((len(data), md5sum(data)) == (tarinfo.size, md5_regtype),57 self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype), 59 58 "regular file extraction failed") 60 59 … … 67 66 lines1 = fobj1.readlines() 68 67 lines2 = fobj2.readlines() 69 self.assert _(lines1 == lines2,68 self.assertTrue(lines1 == lines2, 70 69 "fileobj.readlines() failed") 71 self.assert _(len(lines2) == 114,70 self.assertTrue(len(lines2) == 114, 72 71 "fileobj.readlines() failed") 73 self.assert _(lines2[83] == \72 self.assertTrue(lines2[83] == 74 73 "I will gladly admit that Python is not the fastest running scripting language.\n", 75 74 "fileobj.readlines() failed") … … 82 81 lines1 = fobj1.readlines() 83 82 lines2 = [line for line in fobj2] 84 self.assert _(lines1 == lines2,83 self.assertTrue(lines1 == lines2, 85 84 "fileobj.__iter__() failed") 86 85 … … 96 95 text = fobj.read() 97 96 fobj.seek(0) 98 self.assert _(0 == fobj.tell(),97 self.assertTrue(0 == fobj.tell(), 99 98 "seek() to file's start failed") 100 99 fobj.seek(2048, 0) 101 self.assert _(2048 == fobj.tell(),100 self.assertTrue(2048 == fobj.tell(), 102 101 "seek() to absolute position failed") 103 102 fobj.seek(-1024, 1) 104 self.assert _(1024 == fobj.tell(),103 self.assertTrue(1024 == fobj.tell(), 105 104 "seek() to negative relative position failed") 106 105 fobj.seek(1024, 1) 107 self.assert _(2048 == fobj.tell(),106 self.assertTrue(2048 == fobj.tell(), 108 107 "seek() to positive relative position failed") 109 108 s = fobj.read(10) 110 self.assert _(s == data[2048:2058],109 self.assertTrue(s == data[2048:2058], 111 110 "read() after seek failed") 112 111 fobj.seek(0, 2) 113 self.assert _(tarinfo.size == fobj.tell(),112 self.assertTrue(tarinfo.size == fobj.tell(), 114 113 "seek() to file's end failed") 115 self.assert _(fobj.read() == "",114 self.assertTrue(fobj.read() == "", 116 115 "read() at file's end did not return empty string") 117 116 fobj.seek(-tarinfo.size, 2) 118 self.assert _(0 == fobj.tell(),117 self.assertTrue(0 == fobj.tell(), 119 118 "relative seek() to file's start failed") 120 119 fobj.seek(512) … … 122 121 fobj.seek(512) 123 122 s2 = fobj.readlines() 124 self.assert _(s1 == s2,123 self.assertTrue(s1 == s2, 125 124 "readlines() after seek failed") 126 125 fobj.seek(0) 127 self.assert _(len(fobj.readline()) == fobj.tell(),126 self.assertTrue(len(fobj.readline()) == fobj.tell(), 128 127 "tell() after readline() failed") 129 128 fobj.seek(512) 130 self.assert _(len(fobj.readline()) + 512 == fobj.tell(),129 self.assertTrue(len(fobj.readline()) + 512 == fobj.tell(), 131 130 "tell() after seek() and readline() failed") 132 131 fobj.seek(0) 133 132 line = fobj.readline() 134 self.assert _(fobj.read() == data[len(line):],133 self.assertTrue(fobj.read() == data[len(line):], 135 134 "read() after readline() failed") 136 135 fobj.close() 137 136 138 139 class MiscReadTest(ReadTest): 137 # Test if symbolic and hard links are resolved by extractfile(). The 138 # test link members each point to a regular member whose data is 139 # supposed to be exported. 140 def _test_fileobj_link(self, lnktype, regtype): 141 a = self.tar.extractfile(lnktype) 142 b = self.tar.extractfile(regtype) 143 self.assertEqual(a.name, b.name) 144 145 def test_fileobj_link1(self): 146 self._test_fileobj_link("ustar/lnktype", "ustar/regtype") 147 148 def test_fileobj_link2(self): 149 self._test_fileobj_link("./ustar/linktest2/lnktype", "ustar/linktest1/regtype") 150 151 def test_fileobj_symlink1(self): 152 self._test_fileobj_link("ustar/symtype", "ustar/regtype") 153 154 def test_fileobj_symlink2(self): 155 self._test_fileobj_link("./ustar/linktest2/symtype", "ustar/linktest1/regtype") 156 157 def test_issue14160(self): 158 self._test_fileobj_link("symtype2", "ustar/regtype") 159 160 161 class CommonReadTest(ReadTest): 162 163 def test_empty_tarfile(self): 164 # Test for issue6123: Allow opening empty archives. 165 # This test checks if tarfile.open() is able to open an empty tar 166 # archive successfully. Note that an empty tar archive is not the 167 # same as an empty file! 168 tarfile.open(tmpname, self.mode.replace("r", "w")).close() 169 try: 170 tar = tarfile.open(tmpname, self.mode) 171 tar.getnames() 172 except tarfile.ReadError: 173 self.fail("tarfile.open() failed on empty archive") 174 self.assertListEqual(tar.getmembers(), []) 175 176 def test_null_tarfile(self): 177 # Test for issue6123: Allow opening empty archives. 178 # This test guarantees that tarfile.open() does not treat an empty 179 # file as an empty tar archive. 180 open(tmpname, "wb").close() 181 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode) 182 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname) 183 184 def test_ignore_zeros(self): 185 # Test TarFile's ignore_zeros option. 186 if self.mode.endswith(":gz"): 187 _open = gzip.GzipFile 188 elif self.mode.endswith(":bz2"): 189 _open = bz2.BZ2File 190 else: 191 _open = open 192 193 for char in ('\0', 'a'): 194 # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a') 195 # are ignored correctly. 196 fobj = _open(tmpname, "wb") 197 fobj.write(char * 1024) 198 fobj.write(tarfile.TarInfo("foo").tobuf()) 199 fobj.close() 200 201 tar = tarfile.open(tmpname, mode="r", ignore_zeros=True) 202 self.assertListEqual(tar.getnames(), ["foo"], 203 "ignore_zeros=True should have skipped the %r-blocks" % char) 204 tar.close() 205 206 207 class MiscReadTest(CommonReadTest): 140 208 141 209 def test_no_name_argument(self): … … 203 271 # header with a "/" appended to the filename field. 204 272 tarinfo = self.tar.getmember("misc/dirtype-old-v7") 205 self.assert _(tarinfo.type == tarfile.DIRTYPE,273 self.assertTrue(tarinfo.type == tarfile.DIRTYPE, 206 274 "v7 dirtype failed") 207 275 … … 217 285 def test_check_members(self): 218 286 for tarinfo in self.tar: 219 self.assert _(int(tarinfo.mtime) == 07606136617,287 self.assertTrue(int(tarinfo.mtime) == 07606136617, 220 288 "wrong mtime for %s" % tarinfo.name) 221 289 if not tarinfo.name.startswith("ustar/"): 222 290 continue 223 self.assert _(tarinfo.uname == "tarfile",291 self.assertTrue(tarinfo.uname == "tarfile", 224 292 "wrong uname for %s" % tarinfo.name) 225 293 226 294 def test_find_members(self): 227 self.assert _(self.tar.getmembers()[-1].name == "misc/eof",295 self.assertTrue(self.tar.getmembers()[-1].name == "misc/eof", 228 296 "could not find all members") 229 297 230 298 def test_extract_hardlink(self): 231 299 # Test hardlink extraction (e.g. bug #857297). 232 tar = tarfile.open(tarname, errorlevel=1, encoding="iso8859-1")233 234 tar.extract("ustar/regtype", TEMPDIR)235 try: 300 with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar: 301 tar.extract("ustar/regtype", TEMPDIR) 302 self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/regtype")) 303 236 304 tar.extract("ustar/lnktype", TEMPDIR) 237 except EnvironmentError, e: 238 if e.errno == errno.ENOENT: 239 self.fail("hardlink not extracted properly") 240 241 data = open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb").read() 242 self.assertEqual(md5sum(data), md5_regtype) 243 244 try: 305 self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/lnktype")) 306 with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f: 307 data = f.read() 308 self.assertEqual(md5sum(data), md5_regtype) 309 245 310 tar.extract("ustar/symtype", TEMPDIR) 246 except EnvironmentError, e: 247 if e.errno == errno.ENOENT: 248 self.fail("symlink not extracted properly") 249 250 data = open(os.path.join(TEMPDIR, "ustar/symtype"), "rb").read() 251 self.assertEqual(md5sum(data), md5_regtype) 311 self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/symtype")) 312 with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f: 313 data = f.read() 314 self.assertEqual(md5sum(data), md5_regtype) 252 315 253 316 def test_extractall(self): … … 268 331 # Issue #7341: Close the internal file object in the TarFile 269 332 # constructor in case of an error. For the test we rely on 270 # the fact that opening an invalidfile raises a ReadError.271 invalid = os.path.join(TEMPDIR, "invalid")272 open( invalid, "wb").write("foo")333 # the fact that opening an empty file raises a ReadError. 334 empty = os.path.join(TEMPDIR, "empty") 335 open(empty, "wb").write("") 273 336 274 337 try: 275 338 tar = object.__new__(tarfile.TarFile) 276 339 try: 277 tar.__init__( invalid)340 tar.__init__(empty) 278 341 except tarfile.ReadError: 279 342 self.assertTrue(tar.fileobj.closed) … … 281 344 self.fail("ReadError not raised") 282 345 finally: 283 os.remove(invalid) 284 285 286 class StreamReadTest(ReadTest): 346 os.remove(empty) 347 348 def test_parallel_iteration(self): 349 # Issue #16601: Restarting iteration over tarfile continued 350 # from where it left off. 351 with tarfile.open(self.tarname) as tar: 352 for m1, m2 in zip(tar, tar): 353 self.assertEqual(m1.offset, m2.offset) 354 self.assertEqual(m1.name, m2.name) 355 356 357 class StreamReadTest(CommonReadTest): 287 358 288 359 mode="r|" … … 292 363 fobj = self.tar.extractfile(tarinfo) 293 364 data = fobj.read() 294 self.assert _((len(data), md5sum(data)) == (tarinfo.size, md5_regtype),365 self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype), 295 366 "regular file extraction failed") 296 367 … … 309 380 if t1 is None: 310 381 break 311 self.assert _(t2 is not None, "stream.next() failed.")382 self.assertTrue(t2 is not None, "stream.next() failed.") 312 383 313 384 if t2.islnk() or t2.issym(): … … 319 390 if v1 is None: 320 391 continue 321 self.assert _(v2 is not None, "stream.extractfile() failed")322 self.assert _(v1.read() == v2.read(), "stream extraction failed")392 self.assertTrue(v2 is not None, "stream.extractfile() failed") 393 self.assertTrue(v1.read() == v2.read(), "stream extraction failed") 323 394 324 395 tar1.close() … … 376 447 self._test_modes(self._testfunc_fileobj) 377 448 449 def test_detect_stream_bz2(self): 450 # Originally, tarfile's stream detection looked for the string 451 # "BZh91" at the start of the file. This is incorrect because 452 # the '9' represents the blocksize (900kB). If the file was 453 # compressed using another blocksize autodetection fails. 454 if not bz2: 455 return 456 457 with open(tarname, "rb") as fobj: 458 data = fobj.read() 459 460 # Compress with blocksize 100kB, the file starts with "BZh11". 461 with bz2.BZ2File(tmpname, "wb", compresslevel=1) as fobj: 462 fobj.write(data) 463 464 self._testfunc_file(tmpname, "r|*") 465 378 466 379 467 class MemberReadTest(ReadTest): … … 381 469 def _test_member(self, tarinfo, chksum=None, **kwargs): 382 470 if chksum is not None: 383 self.assert _(md5sum(self.tar.extractfile(tarinfo).read()) == chksum,471 self.assertTrue(md5sum(self.tar.extractfile(tarinfo).read()) == chksum, 384 472 "wrong md5sum for %s" % tarinfo.name) 385 473 … … 392 480 kwargs["gname"] = "tarfile" 393 481 for k, v in kwargs.iteritems(): 394 self.assert _(getattr(tarinfo, k) == v,482 self.assertTrue(getattr(tarinfo, k) == v, 395 483 "wrong value in %s field of %s" % (k, tarinfo.name)) 396 484 … … 441 529 def test_find_ustar_longname(self): 442 530 name = "ustar/" + "12345/" * 39 + "1234567/longname" 443 self.assert _(name inself.tar.getnames())531 self.assertIn(name, self.tar.getnames()) 444 532 445 533 def test_find_regtype_oldv7(self): … … 462 550 except KeyError: 463 551 self.fail("longname not found") 464 self.assert _(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype")552 self.assertTrue(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype") 465 553 466 554 def test_read_longlink(self): … … 471 559 except KeyError: 472 560 self.fail("longlink not found") 473 self.assert _(tarinfo.linkname == longname, "linkname wrong")561 self.assertTrue(tarinfo.linkname == longname, "linkname wrong") 474 562 475 563 def test_truncated_longname(self): … … 502 590 tarinfo2 = self.tar.getmember("gnu/sparse") 503 591 fobj2 = self.tar.extractfile(tarinfo2) 504 self.assert _(fobj1.read() == fobj2.read(),592 self.assertTrue(fobj1.read() == fobj2.read(), 505 593 "sparse file extraction failed") 506 594 … … 551 639 tar.addfile(tarfile.TarInfo("foo")) 552 640 tar.close() 553 self.assert _(fobj.closed is False, "external fileobjs must never closed")641 self.assertTrue(fobj.closed is False, "external fileobjs must never closed") 554 642 555 643 … … 570 658 571 659 tar = tarfile.open(tmpname) 572 self.assert _(tar.getnames()[0] == name,660 self.assertTrue(tar.getnames()[0] == name, 573 661 "failed to store 100 char filename") 574 662 tar.close() … … 583 671 tar.add(path) 584 672 tar.close() 585 self.assert _(os.path.getsize(tmpname) > 0,673 self.assertTrue(os.path.getsize(tmpname) > 0, 586 674 "tarfile is empty") 587 675 … … 618 706 link = os.path.join(TEMPDIR, "link") 619 707 target = os.path.join(TEMPDIR, "link_target") 620 open(target, "wb").close() 708 fobj = open(target, "wb") 709 fobj.write("aaa") 710 fobj.close() 621 711 os.link(target, link) 622 712 try: 623 713 tar = tarfile.open(tmpname, self.mode) 714 # Record the link target in the inodes list. 715 tar.gettarinfo(target) 624 716 tarinfo = tar.gettarinfo(link) 625 717 self.assertEqual(tarinfo.size, 0) … … 644 736 645 737 tar = tarfile.open(tmpname, self.mode) 646 self.assert _(tar.name == dstname, "archive name must be absolute")738 self.assertTrue(tar.name == dstname, "archive name must be absolute") 647 739 648 740 tar.add(dstname) 649 self.assert _(tar.getnames() == [], "added the archive to itself")741 self.assertTrue(tar.getnames() == [], "added the archive to itself") 650 742 651 743 cwd = os.getcwd() … … 653 745 tar.add(dstname) 654 746 os.chdir(cwd) 655 self.assert _(tar.getnames() == [], "added the archive to itself")747 self.assertTrue(tar.getnames() == [], "added the archive to itself") 656 748 657 749 def test_exclude(self): … … 663 755 open(name, "wb").close() 664 756 665 def exclude(name): 666 return os.path.isfile(name) 757 exclude = os.path.isfile 667 758 668 759 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") 669 tar.add(tempdir, arcname="empty_dir", exclude=exclude) 760 with test_support.check_warnings(("use the filter argument", 761 DeprecationWarning)): 762 tar.add(tempdir, arcname="empty_dir", exclude=exclude) 670 763 tar.close() 671 764 … … 676 769 shutil.rmtree(tempdir) 677 770 771 def test_filter(self): 772 tempdir = os.path.join(TEMPDIR, "filter") 773 os.mkdir(tempdir) 774 try: 775 for name in ("foo", "bar", "baz"): 776 name = os.path.join(tempdir, name) 777 open(name, "wb").close() 778 779 def filter(tarinfo): 780 if os.path.basename(tarinfo.name) == "bar": 781 return 782 tarinfo.uid = 123 783 tarinfo.uname = "foo" 784 return tarinfo 785 786 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") 787 tar.add(tempdir, arcname="empty_dir", filter=filter) 788 tar.close() 789 790 tar = tarfile.open(tmpname, "r") 791 for tarinfo in tar: 792 self.assertEqual(tarinfo.uid, 123) 793 self.assertEqual(tarinfo.uname, "foo") 794 self.assertEqual(len(tar.getmembers()), 3) 795 tar.close() 796 finally: 797 shutil.rmtree(tempdir) 798 799 # Guarantee that stored pathnames are not modified. Don't 800 # remove ./ or ../ or double slashes. Still make absolute 801 # pathnames relative. 802 # For details see bug #6054. 803 def _test_pathname(self, path, cmp_path=None, dir=False): 804 # Create a tarfile with an empty member named path 805 # and compare the stored name with the original. 806 foo = os.path.join(TEMPDIR, "foo") 807 if not dir: 808 open(foo, "w").close() 809 else: 810 os.mkdir(foo) 811 812 tar = tarfile.open(tmpname, self.mode) 813 tar.add(foo, arcname=path) 814 tar.close() 815 816 tar = tarfile.open(tmpname, "r") 817 t = tar.next() 818 tar.close() 819 820 if not dir: 821 os.remove(foo) 822 else: 823 os.rmdir(foo) 824 825 self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/")) 826 827 def test_pathnames(self): 828 self._test_pathname("foo") 829 self._test_pathname(os.path.join("foo", ".", "bar")) 830 self._test_pathname(os.path.join("foo", "..", "bar")) 831 self._test_pathname(os.path.join(".", "foo")) 832 self._test_pathname(os.path.join(".", "foo", ".")) 833 self._test_pathname(os.path.join(".", "foo", ".", "bar")) 834 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 835 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 836 self._test_pathname(os.path.join("..", "foo")) 837 self._test_pathname(os.path.join("..", "foo", "..")) 838 self._test_pathname(os.path.join("..", "foo", ".", "bar")) 839 self._test_pathname(os.path.join("..", "foo", "..", "bar")) 840 841 self._test_pathname("foo" + os.sep + os.sep + "bar") 842 self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True) 843 844 def test_abs_pathnames(self): 845 if sys.platform == "win32": 846 self._test_pathname("C:\\foo", "foo") 847 else: 848 self._test_pathname("/foo", "foo") 849 self._test_pathname("///foo", "foo") 850 851 def test_cwd(self): 852 # Test adding the current working directory. 853 cwd = os.getcwd() 854 os.chdir(TEMPDIR) 855 try: 856 open("foo", "w").close() 857 858 tar = tarfile.open(tmpname, self.mode) 859 tar.add(".") 860 tar.close() 861 862 tar = tarfile.open(tmpname, "r") 863 for t in tar: 864 self.assertTrue(t.name == "." or t.name.startswith("./")) 865 tar.close() 866 finally: 867 os.chdir(cwd) 868 869 @unittest.skipUnless(hasattr(os, 'symlink'), "needs os.symlink") 870 def test_extractall_symlinks(self): 871 # Test if extractall works properly when tarfile contains symlinks 872 tempdir = os.path.join(TEMPDIR, "testsymlinks") 873 temparchive = os.path.join(TEMPDIR, "testsymlinks.tar") 874 os.mkdir(tempdir) 875 try: 876 source_file = os.path.join(tempdir,'source') 877 target_file = os.path.join(tempdir,'symlink') 878 with open(source_file,'w') as f: 879 f.write('something\n') 880 os.symlink(source_file, target_file) 881 tar = tarfile.open(temparchive,'w') 882 tar.add(source_file, arcname=os.path.basename(source_file)) 883 tar.add(target_file, arcname=os.path.basename(target_file)) 884 tar.close() 885 # Let's extract it to the location which contains the symlink 886 tar = tarfile.open(temparchive,'r') 887 # this should not raise OSError: [Errno 17] File exists 888 try: 889 tar.extractall(path=tempdir) 890 except OSError: 891 self.fail("extractall failed with symlinked files") 892 finally: 893 tar.close() 894 finally: 895 os.unlink(temparchive) 896 shutil.rmtree(tempdir) 897 898 @unittest.skipUnless(hasattr(os, 'symlink'), "needs os.symlink") 899 def test_extractall_broken_symlinks(self): 900 # Test if extractall works properly when tarfile contains broken 901 # symlinks 902 tempdir = os.path.join(TEMPDIR, "testsymlinks") 903 temparchive = os.path.join(TEMPDIR, "testsymlinks.tar") 904 os.mkdir(tempdir) 905 try: 906 source_file = os.path.join(tempdir,'source') 907 target_file = os.path.join(tempdir,'symlink') 908 with open(source_file,'w') as f: 909 f.write('something\n') 910 os.symlink(source_file, target_file) 911 tar = tarfile.open(temparchive,'w') 912 tar.add(target_file, arcname=os.path.basename(target_file)) 913 tar.close() 914 # remove the real file 915 os.unlink(source_file) 916 # Let's extract it to the location which contains the symlink 917 tar = tarfile.open(temparchive,'r') 918 # this should not raise OSError: [Errno 17] File exists 919 try: 920 tar.extractall(path=tempdir) 921 except OSError: 922 self.fail("extractall failed with broken symlinked files") 923 finally: 924 tar.close() 925 finally: 926 os.unlink(temparchive) 927 shutil.rmtree(tempdir) 928 929 @unittest.skipUnless(hasattr(os, 'link'), "needs os.link") 930 def test_extractall_hardlinks(self): 931 # Test if extractall works properly when tarfile contains symlinks 932 tempdir = os.path.join(TEMPDIR, "testsymlinks") 933 temparchive = os.path.join(TEMPDIR, "testsymlinks.tar") 934 os.mkdir(tempdir) 935 try: 936 source_file = os.path.join(tempdir,'source') 937 target_file = os.path.join(tempdir,'symlink') 938 with open(source_file,'w') as f: 939 f.write('something\n') 940 os.link(source_file, target_file) 941 tar = tarfile.open(temparchive,'w') 942 tar.add(source_file, arcname=os.path.basename(source_file)) 943 tar.add(target_file, arcname=os.path.basename(target_file)) 944 tar.close() 945 # Let's extract it to the location which contains the symlink 946 tar = tarfile.open(temparchive,'r') 947 # this should not raise OSError: [Errno 17] File exists 948 try: 949 tar.extractall(path=tempdir) 950 except OSError: 951 self.fail("extractall failed with linked files") 952 finally: 953 tar.close() 954 finally: 955 os.unlink(temparchive) 956 shutil.rmtree(tempdir) 678 957 679 958 class StreamWriteTest(WriteTestBase): … … 694 973 data = open(tmpname, "rb").read() 695 974 data = dec.decompress(data) 696 self.assert _(len(dec.unused_data) == 0,975 self.assertTrue(len(dec.unused_data) == 0, 697 976 "found trailing data") 698 977 else: … … 701 980 fobj.close() 702 981 703 self.assert _(data.count("\0") == tarfile.RECORDSIZE,982 self.assertTrue(data.count("\0") == tarfile.RECORDSIZE, 704 983 "incorrect zero padding") 984 985 def test_file_mode(self): 986 # Test for issue #8464: Create files with correct 987 # permissions. 988 if sys.platform == "win32" or not hasattr(os, "umask"): 989 return 990 991 if os.path.exists(tmpname): 992 os.remove(tmpname) 993 994 original_umask = os.umask(0022) 995 try: 996 tar = tarfile.open(tmpname, self.mode) 997 tar.close() 998 mode = os.stat(tmpname).st_mode & 0777 999 self.assertEqual(mode, 0644, "wrong file permissions") 1000 finally: 1001 os.umask(original_umask) 1002 1003 def test_issue13639(self): 1004 try: 1005 with tarfile.open(unicode(tmpname, sys.getfilesystemencoding()), self.mode): 1006 pass 1007 except UnicodeDecodeError: 1008 self.fail("_Stream failed to write unicode filename") 705 1009 706 1010 … … 741 1045 v1 = self._calc_size(name, link) 742 1046 v2 = tar.offset 743 self.assert _(v1 == v2, "GNU longname/longlink creation failed")1047 self.assertTrue(v1 == v2, "GNU longname/longlink creation failed") 744 1048 745 1049 tar.close() … … 747 1051 tar = tarfile.open(tmpname) 748 1052 member = tar.next() 749 self.failIf(member is None, "unable to read longname member") 750 self.assert_(tarinfo.name == member.name and \ 751 tarinfo.linkname == member.linkname, \ 752 "unable to read longname member") 1053 self.assertIsNotNone(member, 1054 "unable to read longname member") 1055 self.assertEqual(tarinfo.name, member.name, 1056 "unable to read longname member") 1057 self.assertEqual(tarinfo.linkname, member.linkname, 1058 "unable to read longname member") 753 1059 754 1060 def test_longname_1023(self): … … 808 1114 # time regardless of st_nlink. 809 1115 tarinfo = self.tar.gettarinfo(self.foo) 810 self.assert _(tarinfo.type == tarfile.REGTYPE,1116 self.assertTrue(tarinfo.type == tarfile.REGTYPE, 811 1117 "add file as regular failed") 812 1118 813 1119 def test_add_hardlink(self): 814 1120 tarinfo = self.tar.gettarinfo(self.bar) 815 self.assert _(tarinfo.type == tarfile.LNKTYPE,1121 self.assertTrue(tarinfo.type == tarfile.LNKTYPE, 816 1122 "add file as hardlink failed") 817 1123 … … 819 1125 self.tar.dereference = True 820 1126 tarinfo = self.tar.gettarinfo(self.bar) 821 self.assert _(tarinfo.type == tarfile.REGTYPE,1127 self.assertTrue(tarinfo.type == tarfile.REGTYPE, 822 1128 "dereferencing hardlink failed") 823 1129 … … 839 1145 if link: 840 1146 l = tar.getmembers()[0].linkname 841 self.assert _(link == l, "PAX longlink creation failed")1147 self.assertTrue(link == l, "PAX longlink creation failed") 842 1148 else: 843 1149 n = tar.getmembers()[0].name 844 self.assert _(name == n, "PAX longname creation failed")1150 self.assertTrue(name == n, "PAX longname creation failed") 845 1151 846 1152 def test_pax_global_header(self): … … 852 1158 u"äöü": u"test"} 853 1159 854 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, \1160 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 855 1161 pax_headers=pax_headers) 856 1162 tar.addfile(tarfile.TarInfo("test")) … … 864 1170 # Test if all the fields are unicode. 865 1171 for key, val in tar.pax_headers.iteritems(): 866 self.assert _(type(key) is unicode)867 self.assert _(type(val) is unicode)1172 self.assertTrue(type(key) is unicode) 1173 self.assertTrue(type(val) is unicode) 868 1174 if key in tarfile.PAX_NUMBER_FIELDS: 869 1175 try: … … 913 1219 914 1220 tar = tarfile.open(tmpname, encoding=encoding) 915 self.assert _(type(tar.getnames()[0]) is not unicode)1221 self.assertTrue(type(tar.getnames()[0]) is not unicode) 916 1222 self.assertEqual(tar.getmembers()[0].name, name.encode(encoding)) 917 1223 tar.close() … … 937 1243 tar = tarfile.open(tarname, "r", encoding="iso8859-1", errors="strict") 938 1244 for t in tar: 939 self.assert _(type(t.name) is str)940 self.assert _(type(t.linkname) is str)941 self.assert _(type(t.uname) is str)942 self.assert _(type(t.gname) is str)1245 self.assertTrue(type(t.name) is str) 1246 self.assertTrue(type(t.linkname) is str) 1247 self.assertTrue(type(t.uname) is str) 1248 self.assertTrue(type(t.gname) is str) 943 1249 tar.close() 944 1250 … … 1032 1338 1033 1339 def test_empty(self): 1034 open(self.tarname, "w").close()1340 tarfile.open(self.tarname, "w:").close() 1035 1341 self._add_testfile() 1036 1342 self._test() 1037 1343 1038 1344 def test_empty_fileobj(self): 1039 fobj = StringIO.StringIO( )1345 fobj = StringIO.StringIO("\0" * 1024) 1040 1346 self._add_testfile(fobj) 1041 1347 fobj.seek(0) … … 1067 1373 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 1068 1374 1375 # Append mode is supposed to fail if the tarfile to append to 1376 # does not end with a zero block. 1377 def _test_error(self, data): 1378 open(self.tarname, "wb").write(data) 1379 self.assertRaises(tarfile.ReadError, self._add_testfile) 1380 1381 def test_null(self): 1382 self._test_error("") 1383 1384 def test_incomplete(self): 1385 self._test_error("\0" * 13) 1386 1387 def test_premature_eof(self): 1388 data = tarfile.TarInfo("foo").tobuf() 1389 self._test_error(data) 1390 1391 def test_trailing_garbage(self): 1392 data = tarfile.TarInfo("foo").tobuf() 1393 self._test_error(data + "\0" * 13) 1394 1395 def test_invalid(self): 1396 self._test_error("a" * 512) 1397 1069 1398 1070 1399 class LimitsTest(unittest.TestCase): … … 1125 1454 tarinfo.uid = 04000000000000000000L 1126 1455 tarinfo.tobuf(tarfile.PAX_FORMAT) 1456 1457 1458 class ContextManagerTest(unittest.TestCase): 1459 1460 def test_basic(self): 1461 with tarfile.open(tarname) as tar: 1462 self.assertFalse(tar.closed, "closed inside runtime context") 1463 self.assertTrue(tar.closed, "context manager failed") 1464 1465 def test_closed(self): 1466 # The __enter__() method is supposed to raise IOError 1467 # if the TarFile object is already closed. 1468 tar = tarfile.open(tarname) 1469 tar.close() 1470 with self.assertRaises(IOError): 1471 with tar: 1472 pass 1473 1474 def test_exception(self): 1475 # Test if the IOError exception is passed through properly. 1476 with self.assertRaises(Exception) as exc: 1477 with tarfile.open(tarname) as tar: 1478 raise IOError 1479 self.assertIsInstance(exc.exception, IOError, 1480 "wrong exception raised in context manager") 1481 self.assertTrue(tar.closed, "context manager failed") 1482 1483 def test_no_eof(self): 1484 # __exit__() must not write end-of-archive blocks if an 1485 # exception was raised. 1486 try: 1487 with tarfile.open(tmpname, "w") as tar: 1488 raise Exception 1489 except: 1490 pass 1491 self.assertEqual(os.path.getsize(tmpname), 0, 1492 "context manager wrote an end-of-archive block") 1493 self.assertTrue(tar.closed, "context manager failed") 1494 1495 def test_eof(self): 1496 # __exit__() must write end-of-archive blocks, i.e. call 1497 # TarFile.close() if there was no error. 1498 with tarfile.open(tmpname, "w"): 1499 pass 1500 self.assertNotEqual(os.path.getsize(tmpname), 0, 1501 "context manager wrote no end-of-archive block") 1502 1503 def test_fileobj(self): 1504 # Test that __exit__() did not close the external file 1505 # object. 1506 fobj = open(tmpname, "wb") 1507 try: 1508 with tarfile.open(fileobj=fobj, mode="w") as tar: 1509 raise Exception 1510 except: 1511 pass 1512 self.assertFalse(fobj.closed, "external file object was closed") 1513 self.assertTrue(tar.closed, "context manager failed") 1514 fobj.close() 1515 1516 1517 class LinkEmulationTest(ReadTest): 1518 1519 # Test for issue #8741 regression. On platforms that do not support 1520 # symbolic or hard links tarfile tries to extract these types of members as 1521 # the regular files they point to. 1522 def _test_link_extraction(self, name): 1523 self.tar.extract(name, TEMPDIR) 1524 data = open(os.path.join(TEMPDIR, name), "rb").read() 1525 self.assertEqual(md5sum(data), md5_regtype) 1526 1527 def test_hardlink_extraction1(self): 1528 self._test_link_extraction("ustar/lnktype") 1529 1530 def test_hardlink_extraction2(self): 1531 self._test_link_extraction("./ustar/linktest2/lnktype") 1532 1533 def test_symlink_extraction1(self): 1534 self._test_link_extraction("ustar/symtype") 1535 1536 def test_symlink_extraction2(self): 1537 self._test_link_extraction("./ustar/linktest2/symtype") 1127 1538 1128 1539 … … 1168 1579 self.hit_eof = self.pos == self.len 1169 1580 return StringIO.StringIO.read(self, n) 1581 def seek(self, *args): 1582 self.hit_eof = False 1583 return StringIO.StringIO.seek(self, *args) 1170 1584 1171 1585 data = bz2.compress(tarfile.TarInfo("foo").tobuf()) 1172 1586 for x in range(len(data) + 1): 1173 tarfile.open(fileobj=MyStringIO(data[:x]), mode=mode) 1587 try: 1588 tarfile.open(fileobj=MyStringIO(data[:x]), mode=mode) 1589 except tarfile.ReadError: 1590 pass # we have no interest in ReadErrors 1174 1591 1175 1592 def test_partial_input(self): … … 1200 1617 AppendTest, 1201 1618 LimitsTest, 1619 ContextManagerTest, 1202 1620 ] 1203 1621 1204 1622 if hasattr(os, "link"): 1205 1623 tests.append(HardlinkTest) 1624 else: 1625 tests.append(LinkEmulationTest) 1206 1626 1207 1627 fobj = open(tarname, "rb")
Note:
See TracChangeset
for help on using the changeset viewer.