Ignore:
Timestamp:
Mar 19, 2014, 11:31:01 PM (11 years ago)
Author:
dmik
Message:

python: Merge vendor 2.7.6 to trunk.

Location:
python/trunk
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • python/trunk

  • python/trunk/Lib/test/test_tarfile.py

    r2 r391  
    44import os
    55import shutil
    6 import tempfile
    76import StringIO
    87from hashlib import md5
     
    5655        fobj = self.tar.extractfile(tarinfo)
    5756        data = fobj.read()
    58         self.assert_((len(data), md5sum(data)) == (tarinfo.size, md5_regtype),
     57        self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype),
    5958                "regular file extraction failed")
    6059
     
    6766        lines1 = fobj1.readlines()
    6867        lines2 = fobj2.readlines()
    69         self.assert_(lines1 == lines2,
     68        self.assertTrue(lines1 == lines2,
    7069                "fileobj.readlines() failed")
    71         self.assert_(len(lines2) == 114,
     70        self.assertTrue(len(lines2) == 114,
    7271                "fileobj.readlines() failed")
    73         self.assert_(lines2[83] == \
     72        self.assertTrue(lines2[83] ==
    7473                "I will gladly admit that Python is not the fastest running scripting language.\n",
    7574                "fileobj.readlines() failed")
     
    8281        lines1 = fobj1.readlines()
    8382        lines2 = [line for line in fobj2]
    84         self.assert_(lines1 == lines2,
     83        self.assertTrue(lines1 == lines2,
    8584                     "fileobj.__iter__() failed")
    8685
     
    9695        text = fobj.read()
    9796        fobj.seek(0)
    98         self.assert_(0 == fobj.tell(),
     97        self.assertTrue(0 == fobj.tell(),
    9998                     "seek() to file's start failed")
    10099        fobj.seek(2048, 0)
    101         self.assert_(2048 == fobj.tell(),
     100        self.assertTrue(2048 == fobj.tell(),
    102101                     "seek() to absolute position failed")
    103102        fobj.seek(-1024, 1)
    104         self.assert_(1024 == fobj.tell(),
     103        self.assertTrue(1024 == fobj.tell(),
    105104                     "seek() to negative relative position failed")
    106105        fobj.seek(1024, 1)
    107         self.assert_(2048 == fobj.tell(),
     106        self.assertTrue(2048 == fobj.tell(),
    108107                     "seek() to positive relative position failed")
    109108        s = fobj.read(10)
    110         self.assert_(s == data[2048:2058],
     109        self.assertTrue(s == data[2048:2058],
    111110                     "read() after seek failed")
    112111        fobj.seek(0, 2)
    113         self.assert_(tarinfo.size == fobj.tell(),
     112        self.assertTrue(tarinfo.size == fobj.tell(),
    114113                     "seek() to file's end failed")
    115         self.assert_(fobj.read() == "",
     114        self.assertTrue(fobj.read() == "",
    116115                     "read() at file's end did not return empty string")
    117116        fobj.seek(-tarinfo.size, 2)
    118         self.assert_(0 == fobj.tell(),
     117        self.assertTrue(0 == fobj.tell(),
    119118                     "relative seek() to file's start failed")
    120119        fobj.seek(512)
     
    122121        fobj.seek(512)
    123122        s2 = fobj.readlines()
    124         self.assert_(s1 == s2,
     123        self.assertTrue(s1 == s2,
    125124                     "readlines() after seek failed")
    126125        fobj.seek(0)
    127         self.assert_(len(fobj.readline()) == fobj.tell(),
     126        self.assertTrue(len(fobj.readline()) == fobj.tell(),
    128127                     "tell() after readline() failed")
    129128        fobj.seek(512)
    130         self.assert_(len(fobj.readline()) + 512 == fobj.tell(),
     129        self.assertTrue(len(fobj.readline()) + 512 == fobj.tell(),
    131130                     "tell() after seek() and readline() failed")
    132131        fobj.seek(0)
    133132        line = fobj.readline()
    134         self.assert_(fobj.read() == data[len(line):],
     133        self.assertTrue(fobj.read() == data[len(line):],
    135134                     "read() after readline() failed")
    136135        fobj.close()
    137136
    138 
    139 class MiscReadTest(ReadTest):
     137    # Test if symbolic and hard links are resolved by extractfile().  The
     138    # test link members each point to a regular member whose data is
     139    # supposed to be exported.
     140    def _test_fileobj_link(self, lnktype, regtype):
     141        a = self.tar.extractfile(lnktype)
     142        b = self.tar.extractfile(regtype)
     143        self.assertEqual(a.name, b.name)
     144
     145    def test_fileobj_link1(self):
     146        self._test_fileobj_link("ustar/lnktype", "ustar/regtype")
     147
     148    def test_fileobj_link2(self):
     149        self._test_fileobj_link("./ustar/linktest2/lnktype", "ustar/linktest1/regtype")
     150
     151    def test_fileobj_symlink1(self):
     152        self._test_fileobj_link("ustar/symtype", "ustar/regtype")
     153
     154    def test_fileobj_symlink2(self):
     155        self._test_fileobj_link("./ustar/linktest2/symtype", "ustar/linktest1/regtype")
     156
     157    def test_issue14160(self):
     158        self._test_fileobj_link("symtype2", "ustar/regtype")
     159
     160
     161class CommonReadTest(ReadTest):
     162
     163    def test_empty_tarfile(self):
     164        # Test for issue6123: Allow opening empty archives.
     165        # This test checks if tarfile.open() is able to open an empty tar
     166        # archive successfully. Note that an empty tar archive is not the
     167        # same as an empty file!
     168        tarfile.open(tmpname, self.mode.replace("r", "w")).close()
     169        try:
     170            tar = tarfile.open(tmpname, self.mode)
     171            tar.getnames()
     172        except tarfile.ReadError:
     173            self.fail("tarfile.open() failed on empty archive")
     174        self.assertListEqual(tar.getmembers(), [])
     175
     176    def test_null_tarfile(self):
     177        # Test for issue6123: Allow opening empty archives.
     178        # This test guarantees that tarfile.open() does not treat an empty
     179        # file as an empty tar archive.
     180        open(tmpname, "wb").close()
     181        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
     182        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)
     183
     184    def test_ignore_zeros(self):
     185        # Test TarFile's ignore_zeros option.
     186        if self.mode.endswith(":gz"):
     187            _open = gzip.GzipFile
     188        elif self.mode.endswith(":bz2"):
     189            _open = bz2.BZ2File
     190        else:
     191            _open = open
     192
     193        for char in ('\0', 'a'):
     194            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
     195            # are ignored correctly.
     196            fobj = _open(tmpname, "wb")
     197            fobj.write(char * 1024)
     198            fobj.write(tarfile.TarInfo("foo").tobuf())
     199            fobj.close()
     200
     201            tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
     202            self.assertListEqual(tar.getnames(), ["foo"],
     203                    "ignore_zeros=True should have skipped the %r-blocks" % char)
     204            tar.close()
     205
     206
     207class MiscReadTest(CommonReadTest):
    140208
    141209    def test_no_name_argument(self):
     
    203271        # header with a "/" appended to the filename field.
    204272        tarinfo = self.tar.getmember("misc/dirtype-old-v7")
    205         self.assert_(tarinfo.type == tarfile.DIRTYPE,
     273        self.assertTrue(tarinfo.type == tarfile.DIRTYPE,
    206274                "v7 dirtype failed")
    207275
     
    217285    def test_check_members(self):
    218286        for tarinfo in self.tar:
    219             self.assert_(int(tarinfo.mtime) == 07606136617,
     287            self.assertTrue(int(tarinfo.mtime) == 07606136617,
    220288                    "wrong mtime for %s" % tarinfo.name)
    221289            if not tarinfo.name.startswith("ustar/"):
    222290                continue
    223             self.assert_(tarinfo.uname == "tarfile",
     291            self.assertTrue(tarinfo.uname == "tarfile",
    224292                    "wrong uname for %s" % tarinfo.name)
    225293
    226294    def test_find_members(self):
    227         self.assert_(self.tar.getmembers()[-1].name == "misc/eof",
     295        self.assertTrue(self.tar.getmembers()[-1].name == "misc/eof",
    228296                "could not find all members")
    229297
    230298    def test_extract_hardlink(self):
    231299        # Test hardlink extraction (e.g. bug #857297).
    232         tar = tarfile.open(tarname, errorlevel=1, encoding="iso8859-1")
    233 
    234         tar.extract("ustar/regtype", TEMPDIR)
    235         try:
     300        with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar:
     301            tar.extract("ustar/regtype", TEMPDIR)
     302            self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/regtype"))
     303
    236304            tar.extract("ustar/lnktype", TEMPDIR)
    237         except EnvironmentError, e:
    238             if e.errno == errno.ENOENT:
    239                 self.fail("hardlink not extracted properly")
    240 
    241         data = open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb").read()
    242         self.assertEqual(md5sum(data), md5_regtype)
    243 
    244         try:
     305            self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/lnktype"))
     306            with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f:
     307                data = f.read()
     308            self.assertEqual(md5sum(data), md5_regtype)
     309
    245310            tar.extract("ustar/symtype", TEMPDIR)
    246         except EnvironmentError, e:
    247             if e.errno == errno.ENOENT:
    248                 self.fail("symlink not extracted properly")
    249 
    250         data = open(os.path.join(TEMPDIR, "ustar/symtype"), "rb").read()
    251         self.assertEqual(md5sum(data), md5_regtype)
     311            self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/symtype"))
     312            with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f:
     313                data = f.read()
     314            self.assertEqual(md5sum(data), md5_regtype)
    252315
    253316    def test_extractall(self):
     
    268331        # Issue #7341: Close the internal file object in the TarFile
    269332        # constructor in case of an error. For the test we rely on
    270         # the fact that opening an invalid file raises a ReadError.
    271         invalid = os.path.join(TEMPDIR, "invalid")
    272         open(invalid, "wb").write("foo")
     333        # the fact that opening an empty file raises a ReadError.
     334        empty = os.path.join(TEMPDIR, "empty")
     335        open(empty, "wb").write("")
    273336
    274337        try:
    275338            tar = object.__new__(tarfile.TarFile)
    276339            try:
    277                 tar.__init__(invalid)
     340                tar.__init__(empty)
    278341            except tarfile.ReadError:
    279342                self.assertTrue(tar.fileobj.closed)
     
    281344                self.fail("ReadError not raised")
    282345        finally:
    283             os.remove(invalid)
    284 
    285 
    286 class StreamReadTest(ReadTest):
     346            os.remove(empty)
     347
     348    def test_parallel_iteration(self):
     349        # Issue #16601: Restarting iteration over tarfile continued
     350        # from where it left off.
     351        with tarfile.open(self.tarname) as tar:
     352            for m1, m2 in zip(tar, tar):
     353                self.assertEqual(m1.offset, m2.offset)
     354                self.assertEqual(m1.name, m2.name)
     355
     356
     357class StreamReadTest(CommonReadTest):
    287358
    288359    mode="r|"
     
    292363        fobj = self.tar.extractfile(tarinfo)
    293364        data = fobj.read()
    294         self.assert_((len(data), md5sum(data)) == (tarinfo.size, md5_regtype),
     365        self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype),
    295366                "regular file extraction failed")
    296367
     
    309380            if t1 is None:
    310381                break
    311             self.assert_(t2 is not None, "stream.next() failed.")
     382            self.assertTrue(t2 is not None, "stream.next() failed.")
    312383
    313384            if t2.islnk() or t2.issym():
     
    319390            if v1 is None:
    320391                continue
    321             self.assert_(v2 is not None, "stream.extractfile() failed")
    322             self.assert_(v1.read() == v2.read(), "stream extraction failed")
     392            self.assertTrue(v2 is not None, "stream.extractfile() failed")
     393            self.assertTrue(v1.read() == v2.read(), "stream extraction failed")
    323394
    324395        tar1.close()
     
    376447        self._test_modes(self._testfunc_fileobj)
    377448
     449    def test_detect_stream_bz2(self):
     450        # Originally, tarfile's stream detection looked for the string
     451        # "BZh91" at the start of the file. This is incorrect because
     452        # the '9' represents the blocksize (900kB). If the file was
     453        # compressed using another blocksize autodetection fails.
     454        if not bz2:
     455            return
     456
     457        with open(tarname, "rb") as fobj:
     458            data = fobj.read()
     459
     460        # Compress with blocksize 100kB, the file starts with "BZh11".
     461        with bz2.BZ2File(tmpname, "wb", compresslevel=1) as fobj:
     462            fobj.write(data)
     463
     464        self._testfunc_file(tmpname, "r|*")
     465
    378466
    379467class MemberReadTest(ReadTest):
     
    381469    def _test_member(self, tarinfo, chksum=None, **kwargs):
    382470        if chksum is not None:
    383             self.assert_(md5sum(self.tar.extractfile(tarinfo).read()) == chksum,
     471            self.assertTrue(md5sum(self.tar.extractfile(tarinfo).read()) == chksum,
    384472                    "wrong md5sum for %s" % tarinfo.name)
    385473
     
    392480            kwargs["gname"] = "tarfile"
    393481        for k, v in kwargs.iteritems():
    394             self.assert_(getattr(tarinfo, k) == v,
     482            self.assertTrue(getattr(tarinfo, k) == v,
    395483                    "wrong value in %s field of %s" % (k, tarinfo.name))
    396484
     
    441529    def test_find_ustar_longname(self):
    442530        name = "ustar/" + "12345/" * 39 + "1234567/longname"
    443         self.assert_(name in self.tar.getnames())
     531        self.assertIn(name, self.tar.getnames())
    444532
    445533    def test_find_regtype_oldv7(self):
     
    462550        except KeyError:
    463551            self.fail("longname not found")
    464         self.assert_(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype")
     552        self.assertTrue(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype")
    465553
    466554    def test_read_longlink(self):
     
    471559        except KeyError:
    472560            self.fail("longlink not found")
    473         self.assert_(tarinfo.linkname == longname, "linkname wrong")
     561        self.assertTrue(tarinfo.linkname == longname, "linkname wrong")
    474562
    475563    def test_truncated_longname(self):
     
    502590        tarinfo2 = self.tar.getmember("gnu/sparse")
    503591        fobj2 = self.tar.extractfile(tarinfo2)
    504         self.assert_(fobj1.read() == fobj2.read(),
     592        self.assertTrue(fobj1.read() == fobj2.read(),
    505593                "sparse file extraction failed")
    506594
     
    551639        tar.addfile(tarfile.TarInfo("foo"))
    552640        tar.close()
    553         self.assert_(fobj.closed is False, "external fileobjs must never closed")
     641        self.assertTrue(fobj.closed is False, "external fileobjs must never closed")
    554642
    555643
     
    570658
    571659        tar = tarfile.open(tmpname)
    572         self.assert_(tar.getnames()[0] == name,
     660        self.assertTrue(tar.getnames()[0] == name,
    573661                "failed to store 100 char filename")
    574662        tar.close()
     
    583671        tar.add(path)
    584672        tar.close()
    585         self.assert_(os.path.getsize(tmpname) > 0,
     673        self.assertTrue(os.path.getsize(tmpname) > 0,
    586674                "tarfile is empty")
    587675
     
    618706            link = os.path.join(TEMPDIR, "link")
    619707            target = os.path.join(TEMPDIR, "link_target")
    620             open(target, "wb").close()
     708            fobj = open(target, "wb")
     709            fobj.write("aaa")
     710            fobj.close()
    621711            os.link(target, link)
    622712            try:
    623713                tar = tarfile.open(tmpname, self.mode)
     714                # Record the link target in the inodes list.
     715                tar.gettarinfo(target)
    624716                tarinfo = tar.gettarinfo(link)
    625717                self.assertEqual(tarinfo.size, 0)
     
    644736
    645737        tar = tarfile.open(tmpname, self.mode)
    646         self.assert_(tar.name == dstname, "archive name must be absolute")
     738        self.assertTrue(tar.name == dstname, "archive name must be absolute")
    647739
    648740        tar.add(dstname)
    649         self.assert_(tar.getnames() == [], "added the archive to itself")
     741        self.assertTrue(tar.getnames() == [], "added the archive to itself")
    650742
    651743        cwd = os.getcwd()
     
    653745        tar.add(dstname)
    654746        os.chdir(cwd)
    655         self.assert_(tar.getnames() == [], "added the archive to itself")
     747        self.assertTrue(tar.getnames() == [], "added the archive to itself")
    656748
    657749    def test_exclude(self):
     
    663755                open(name, "wb").close()
    664756
    665             def exclude(name):
    666                 return os.path.isfile(name)
     757            exclude = os.path.isfile
    667758
    668759            tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
    669             tar.add(tempdir, arcname="empty_dir", exclude=exclude)
     760            with test_support.check_warnings(("use the filter argument",
     761                                              DeprecationWarning)):
     762                tar.add(tempdir, arcname="empty_dir", exclude=exclude)
    670763            tar.close()
    671764
     
    676769            shutil.rmtree(tempdir)
    677770
     771    def test_filter(self):
     772        tempdir = os.path.join(TEMPDIR, "filter")
     773        os.mkdir(tempdir)
     774        try:
     775            for name in ("foo", "bar", "baz"):
     776                name = os.path.join(tempdir, name)
     777                open(name, "wb").close()
     778
     779            def filter(tarinfo):
     780                if os.path.basename(tarinfo.name) == "bar":
     781                    return
     782                tarinfo.uid = 123
     783                tarinfo.uname = "foo"
     784                return tarinfo
     785
     786            tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
     787            tar.add(tempdir, arcname="empty_dir", filter=filter)
     788            tar.close()
     789
     790            tar = tarfile.open(tmpname, "r")
     791            for tarinfo in tar:
     792                self.assertEqual(tarinfo.uid, 123)
     793                self.assertEqual(tarinfo.uname, "foo")
     794            self.assertEqual(len(tar.getmembers()), 3)
     795            tar.close()
     796        finally:
     797            shutil.rmtree(tempdir)
     798
     799    # Guarantee that stored pathnames are not modified. Don't
     800    # remove ./ or ../ or double slashes. Still make absolute
     801    # pathnames relative.
     802    # For details see bug #6054.
     803    def _test_pathname(self, path, cmp_path=None, dir=False):
     804        # Create a tarfile with an empty member named path
     805        # and compare the stored name with the original.
     806        foo = os.path.join(TEMPDIR, "foo")
     807        if not dir:
     808            open(foo, "w").close()
     809        else:
     810            os.mkdir(foo)
     811
     812        tar = tarfile.open(tmpname, self.mode)
     813        tar.add(foo, arcname=path)
     814        tar.close()
     815
     816        tar = tarfile.open(tmpname, "r")
     817        t = tar.next()
     818        tar.close()
     819
     820        if not dir:
     821            os.remove(foo)
     822        else:
     823            os.rmdir(foo)
     824
     825        self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))
     826
     827    def test_pathnames(self):
     828        self._test_pathname("foo")
     829        self._test_pathname(os.path.join("foo", ".", "bar"))
     830        self._test_pathname(os.path.join("foo", "..", "bar"))
     831        self._test_pathname(os.path.join(".", "foo"))
     832        self._test_pathname(os.path.join(".", "foo", "."))
     833        self._test_pathname(os.path.join(".", "foo", ".", "bar"))
     834        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
     835        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
     836        self._test_pathname(os.path.join("..", "foo"))
     837        self._test_pathname(os.path.join("..", "foo", ".."))
     838        self._test_pathname(os.path.join("..", "foo", ".", "bar"))
     839        self._test_pathname(os.path.join("..", "foo", "..", "bar"))
     840
     841        self._test_pathname("foo" + os.sep + os.sep + "bar")
     842        self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True)
     843
     844    def test_abs_pathnames(self):
     845        if sys.platform == "win32":
     846            self._test_pathname("C:\\foo", "foo")
     847        else:
     848            self._test_pathname("/foo", "foo")
     849            self._test_pathname("///foo", "foo")
     850
     851    def test_cwd(self):
     852        # Test adding the current working directory.
     853        cwd = os.getcwd()
     854        os.chdir(TEMPDIR)
     855        try:
     856            open("foo", "w").close()
     857
     858            tar = tarfile.open(tmpname, self.mode)
     859            tar.add(".")
     860            tar.close()
     861
     862            tar = tarfile.open(tmpname, "r")
     863            for t in tar:
     864                self.assertTrue(t.name == "." or t.name.startswith("./"))
     865            tar.close()
     866        finally:
     867            os.chdir(cwd)
     868
     869    @unittest.skipUnless(hasattr(os, 'symlink'), "needs os.symlink")
     870    def test_extractall_symlinks(self):
     871        # Test if extractall works properly when tarfile contains symlinks
     872        tempdir = os.path.join(TEMPDIR, "testsymlinks")
     873        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
     874        os.mkdir(tempdir)
     875        try:
     876            source_file = os.path.join(tempdir,'source')
     877            target_file = os.path.join(tempdir,'symlink')
     878            with open(source_file,'w') as f:
     879                f.write('something\n')
     880            os.symlink(source_file, target_file)
     881            tar = tarfile.open(temparchive,'w')
     882            tar.add(source_file, arcname=os.path.basename(source_file))
     883            tar.add(target_file, arcname=os.path.basename(target_file))
     884            tar.close()
     885            # Let's extract it to the location which contains the symlink
     886            tar = tarfile.open(temparchive,'r')
     887            # this should not raise OSError: [Errno 17] File exists
     888            try:
     889                tar.extractall(path=tempdir)
     890            except OSError:
     891                self.fail("extractall failed with symlinked files")
     892            finally:
     893                tar.close()
     894        finally:
     895            os.unlink(temparchive)
     896            shutil.rmtree(tempdir)
     897
     898    @unittest.skipUnless(hasattr(os, 'symlink'), "needs os.symlink")
     899    def test_extractall_broken_symlinks(self):
     900        # Test if extractall works properly when tarfile contains broken
     901        # symlinks
     902        tempdir = os.path.join(TEMPDIR, "testsymlinks")
     903        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
     904        os.mkdir(tempdir)
     905        try:
     906            source_file = os.path.join(tempdir,'source')
     907            target_file = os.path.join(tempdir,'symlink')
     908            with open(source_file,'w') as f:
     909                f.write('something\n')
     910            os.symlink(source_file, target_file)
     911            tar = tarfile.open(temparchive,'w')
     912            tar.add(target_file, arcname=os.path.basename(target_file))
     913            tar.close()
     914            # remove the real file
     915            os.unlink(source_file)
     916            # Let's extract it to the location which contains the symlink
     917            tar = tarfile.open(temparchive,'r')
     918            # this should not raise OSError: [Errno 17] File exists
     919            try:
     920                tar.extractall(path=tempdir)
     921            except OSError:
     922                self.fail("extractall failed with broken symlinked files")
     923            finally:
     924                tar.close()
     925        finally:
     926            os.unlink(temparchive)
     927            shutil.rmtree(tempdir)
     928
     929    @unittest.skipUnless(hasattr(os, 'link'), "needs os.link")
     930    def test_extractall_hardlinks(self):
     931        # Test if extractall works properly when tarfile contains symlinks
     932        tempdir = os.path.join(TEMPDIR, "testsymlinks")
     933        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
     934        os.mkdir(tempdir)
     935        try:
     936            source_file = os.path.join(tempdir,'source')
     937            target_file = os.path.join(tempdir,'symlink')
     938            with open(source_file,'w') as f:
     939                f.write('something\n')
     940            os.link(source_file, target_file)
     941            tar = tarfile.open(temparchive,'w')
     942            tar.add(source_file, arcname=os.path.basename(source_file))
     943            tar.add(target_file, arcname=os.path.basename(target_file))
     944            tar.close()
     945            # Let's extract it to the location which contains the symlink
     946            tar = tarfile.open(temparchive,'r')
     947            # this should not raise OSError: [Errno 17] File exists
     948            try:
     949                tar.extractall(path=tempdir)
     950            except OSError:
     951                self.fail("extractall failed with linked files")
     952            finally:
     953                tar.close()
     954        finally:
     955            os.unlink(temparchive)
     956            shutil.rmtree(tempdir)
    678957
    679958class StreamWriteTest(WriteTestBase):
     
    694973            data = open(tmpname, "rb").read()
    695974            data = dec.decompress(data)
    696             self.assert_(len(dec.unused_data) == 0,
     975            self.assertTrue(len(dec.unused_data) == 0,
    697976                    "found trailing data")
    698977        else:
     
    701980            fobj.close()
    702981
    703         self.assert_(data.count("\0") == tarfile.RECORDSIZE,
     982        self.assertTrue(data.count("\0") == tarfile.RECORDSIZE,
    704983                         "incorrect zero padding")
     984
     985    def test_file_mode(self):
     986        # Test for issue #8464: Create files with correct
     987        # permissions.
     988        if sys.platform == "win32" or not hasattr(os, "umask"):
     989            return
     990
     991        if os.path.exists(tmpname):
     992            os.remove(tmpname)
     993
     994        original_umask = os.umask(0022)
     995        try:
     996            tar = tarfile.open(tmpname, self.mode)
     997            tar.close()
     998            mode = os.stat(tmpname).st_mode & 0777
     999            self.assertEqual(mode, 0644, "wrong file permissions")
     1000        finally:
     1001            os.umask(original_umask)
     1002
     1003    def test_issue13639(self):
     1004        try:
     1005            with tarfile.open(unicode(tmpname, sys.getfilesystemencoding()), self.mode):
     1006                pass
     1007        except UnicodeDecodeError:
     1008            self.fail("_Stream failed to write unicode filename")
    7051009
    7061010
     
    7411045        v1 = self._calc_size(name, link)
    7421046        v2 = tar.offset
    743         self.assert_(v1 == v2, "GNU longname/longlink creation failed")
     1047        self.assertTrue(v1 == v2, "GNU longname/longlink creation failed")
    7441048
    7451049        tar.close()
     
    7471051        tar = tarfile.open(tmpname)
    7481052        member = tar.next()
    749         self.failIf(member is None, "unable to read longname member")
    750         self.assert_(tarinfo.name == member.name and \
    751                      tarinfo.linkname == member.linkname, \
    752                      "unable to read longname member")
     1053        self.assertIsNotNone(member,
     1054                "unable to read longname member")
     1055        self.assertEqual(tarinfo.name, member.name,
     1056                "unable to read longname member")
     1057        self.assertEqual(tarinfo.linkname, member.linkname,
     1058                "unable to read longname member")
    7531059
    7541060    def test_longname_1023(self):
     
    8081114        # time regardless of st_nlink.
    8091115        tarinfo = self.tar.gettarinfo(self.foo)
    810         self.assert_(tarinfo.type == tarfile.REGTYPE,
     1116        self.assertTrue(tarinfo.type == tarfile.REGTYPE,
    8111117                "add file as regular failed")
    8121118
    8131119    def test_add_hardlink(self):
    8141120        tarinfo = self.tar.gettarinfo(self.bar)
    815         self.assert_(tarinfo.type == tarfile.LNKTYPE,
     1121        self.assertTrue(tarinfo.type == tarfile.LNKTYPE,
    8161122                "add file as hardlink failed")
    8171123
     
    8191125        self.tar.dereference = True
    8201126        tarinfo = self.tar.gettarinfo(self.bar)
    821         self.assert_(tarinfo.type == tarfile.REGTYPE,
     1127        self.assertTrue(tarinfo.type == tarfile.REGTYPE,
    8221128                "dereferencing hardlink failed")
    8231129
     
    8391145        if link:
    8401146            l = tar.getmembers()[0].linkname
    841             self.assert_(link == l, "PAX longlink creation failed")
     1147            self.assertTrue(link == l, "PAX longlink creation failed")
    8421148        else:
    8431149            n = tar.getmembers()[0].name
    844             self.assert_(name == n, "PAX longname creation failed")
     1150            self.assertTrue(name == n, "PAX longname creation failed")
    8451151
    8461152    def test_pax_global_header(self):
     
    8521158                u"äöü": u"test"}
    8531159
    854         tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, \
     1160        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
    8551161                pax_headers=pax_headers)
    8561162        tar.addfile(tarfile.TarInfo("test"))
     
    8641170        # Test if all the fields are unicode.
    8651171        for key, val in tar.pax_headers.iteritems():
    866             self.assert_(type(key) is unicode)
    867             self.assert_(type(val) is unicode)
     1172            self.assertTrue(type(key) is unicode)
     1173            self.assertTrue(type(val) is unicode)
    8681174            if key in tarfile.PAX_NUMBER_FIELDS:
    8691175                try:
     
    9131219
    9141220        tar = tarfile.open(tmpname, encoding=encoding)
    915         self.assert_(type(tar.getnames()[0]) is not unicode)
     1221        self.assertTrue(type(tar.getnames()[0]) is not unicode)
    9161222        self.assertEqual(tar.getmembers()[0].name, name.encode(encoding))
    9171223        tar.close()
     
    9371243        tar = tarfile.open(tarname, "r", encoding="iso8859-1", errors="strict")
    9381244        for t in tar:
    939             self.assert_(type(t.name) is str)
    940             self.assert_(type(t.linkname) is str)
    941             self.assert_(type(t.uname) is str)
    942             self.assert_(type(t.gname) is str)
     1245            self.assertTrue(type(t.name) is str)
     1246            self.assertTrue(type(t.linkname) is str)
     1247            self.assertTrue(type(t.uname) is str)
     1248            self.assertTrue(type(t.gname) is str)
    9431249        tar.close()
    9441250
     
    10321338
    10331339    def test_empty(self):
    1034         open(self.tarname, "w").close()
     1340        tarfile.open(self.tarname, "w:").close()
    10351341        self._add_testfile()
    10361342        self._test()
    10371343
    10381344    def test_empty_fileobj(self):
    1039         fobj = StringIO.StringIO()
     1345        fobj = StringIO.StringIO("\0" * 1024)
    10401346        self._add_testfile(fobj)
    10411347        fobj.seek(0)
     
    10671373        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a")
    10681374
     1375    # Append mode is supposed to fail if the tarfile to append to
     1376    # does not end with a zero block.
     1377    def _test_error(self, data):
     1378        open(self.tarname, "wb").write(data)
     1379        self.assertRaises(tarfile.ReadError, self._add_testfile)
     1380
     1381    def test_null(self):
     1382        self._test_error("")
     1383
     1384    def test_incomplete(self):
     1385        self._test_error("\0" * 13)
     1386
     1387    def test_premature_eof(self):
     1388        data = tarfile.TarInfo("foo").tobuf()
     1389        self._test_error(data)
     1390
     1391    def test_trailing_garbage(self):
     1392        data = tarfile.TarInfo("foo").tobuf()
     1393        self._test_error(data + "\0" * 13)
     1394
     1395    def test_invalid(self):
     1396        self._test_error("a" * 512)
     1397
    10691398
    10701399class LimitsTest(unittest.TestCase):
     
    11251454        tarinfo.uid = 04000000000000000000L
    11261455        tarinfo.tobuf(tarfile.PAX_FORMAT)
     1456
     1457
     1458class ContextManagerTest(unittest.TestCase):
     1459
     1460    def test_basic(self):
     1461        with tarfile.open(tarname) as tar:
     1462            self.assertFalse(tar.closed, "closed inside runtime context")
     1463        self.assertTrue(tar.closed, "context manager failed")
     1464
     1465    def test_closed(self):
     1466        # The __enter__() method is supposed to raise IOError
     1467        # if the TarFile object is already closed.
     1468        tar = tarfile.open(tarname)
     1469        tar.close()
     1470        with self.assertRaises(IOError):
     1471            with tar:
     1472                pass
     1473
     1474    def test_exception(self):
     1475        # Test if the IOError exception is passed through properly.
     1476        with self.assertRaises(Exception) as exc:
     1477            with tarfile.open(tarname) as tar:
     1478                raise IOError
     1479        self.assertIsInstance(exc.exception, IOError,
     1480                              "wrong exception raised in context manager")
     1481        self.assertTrue(tar.closed, "context manager failed")
     1482
     1483    def test_no_eof(self):
     1484        # __exit__() must not write end-of-archive blocks if an
     1485        # exception was raised.
     1486        try:
     1487            with tarfile.open(tmpname, "w") as tar:
     1488                raise Exception
     1489        except:
     1490            pass
     1491        self.assertEqual(os.path.getsize(tmpname), 0,
     1492                "context manager wrote an end-of-archive block")
     1493        self.assertTrue(tar.closed, "context manager failed")
     1494
     1495    def test_eof(self):
     1496        # __exit__() must write end-of-archive blocks, i.e. call
     1497        # TarFile.close() if there was no error.
     1498        with tarfile.open(tmpname, "w"):
     1499            pass
     1500        self.assertNotEqual(os.path.getsize(tmpname), 0,
     1501                "context manager wrote no end-of-archive block")
     1502
     1503    def test_fileobj(self):
     1504        # Test that __exit__() did not close the external file
     1505        # object.
     1506        fobj = open(tmpname, "wb")
     1507        try:
     1508            with tarfile.open(fileobj=fobj, mode="w") as tar:
     1509                raise Exception
     1510        except:
     1511            pass
     1512        self.assertFalse(fobj.closed, "external file object was closed")
     1513        self.assertTrue(tar.closed, "context manager failed")
     1514        fobj.close()
     1515
     1516
     1517class LinkEmulationTest(ReadTest):
     1518
     1519    # Test for issue #8741 regression. On platforms that do not support
     1520    # symbolic or hard links tarfile tries to extract these types of members as
     1521    # the regular files they point to.
     1522    def _test_link_extraction(self, name):
     1523        self.tar.extract(name, TEMPDIR)
     1524        data = open(os.path.join(TEMPDIR, name), "rb").read()
     1525        self.assertEqual(md5sum(data), md5_regtype)
     1526
     1527    def test_hardlink_extraction1(self):
     1528        self._test_link_extraction("ustar/lnktype")
     1529
     1530    def test_hardlink_extraction2(self):
     1531        self._test_link_extraction("./ustar/linktest2/lnktype")
     1532
     1533    def test_symlink_extraction1(self):
     1534        self._test_link_extraction("ustar/symtype")
     1535
     1536    def test_symlink_extraction2(self):
     1537        self._test_link_extraction("./ustar/linktest2/symtype")
    11271538
    11281539
     
    11681579                self.hit_eof = self.pos == self.len
    11691580                return StringIO.StringIO.read(self, n)
     1581            def seek(self, *args):
     1582                self.hit_eof = False
     1583                return StringIO.StringIO.seek(self, *args)
    11701584
    11711585        data = bz2.compress(tarfile.TarInfo("foo").tobuf())
    11721586        for x in range(len(data) + 1):
    1173             tarfile.open(fileobj=MyStringIO(data[:x]), mode=mode)
     1587            try:
     1588                tarfile.open(fileobj=MyStringIO(data[:x]), mode=mode)
     1589            except tarfile.ReadError:
     1590                pass # we have no interest in ReadErrors
    11741591
    11751592    def test_partial_input(self):
     
    12001617        AppendTest,
    12011618        LimitsTest,
     1619        ContextManagerTest,
    12021620    ]
    12031621
    12041622    if hasattr(os, "link"):
    12051623        tests.append(HardlinkTest)
     1624    else:
     1625        tests.append(LinkEmulationTest)
    12061626
    12071627    fobj = open(tarname, "rb")
Note: See TracChangeset for help on using the changeset viewer.