Ignore:
Timestamp:
Mar 19, 2014, 11:31:01 PM (11 years ago)
Author:
dmik
Message:

python: Merge vendor 2.7.6 to trunk.

Location:
python/trunk
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • python/trunk

  • python/trunk/Lib/test/test_shutil.py

    r2 r391  
    88import os
    99import os.path
     10import errno
     11from os.path import splitdrive
     12from distutils.spawn import find_executable, spawn
     13from shutil import (_make_tarball, _make_zipfile, make_archive,
     14                    register_archive_format, unregister_archive_format,
     15                    get_archive_formats)
     16import tarfile
     17import warnings
     18
    1019from test import test_support
    11 from test.test_support import TESTFN
     20from test.test_support import TESTFN, check_warnings, captured_stdout
     21
     22TESTFN2 = TESTFN + "2"
     23
     24try:
     25    import grp
     26    import pwd
     27    UID_GID_SUPPORT = True
     28except ImportError:
     29    UID_GID_SUPPORT = False
     30
     31try:
     32    import zlib
     33except ImportError:
     34    zlib = None
     35
     36try:
     37    import zipfile
     38    ZIP_SUPPORT = True
     39except ImportError:
     40    ZIP_SUPPORT = find_executable('zip')
    1241
    1342class TestShutil(unittest.TestCase):
     43
     44    def setUp(self):
     45        super(TestShutil, self).setUp()
     46        self.tempdirs = []
     47
     48    def tearDown(self):
     49        super(TestShutil, self).tearDown()
     50        while self.tempdirs:
     51            d = self.tempdirs.pop()
     52            shutil.rmtree(d, os.name in ('nt', 'cygwin'))
     53
     54    def write_file(self, path, content='xxx'):
     55        """Writes a file in the given path.
     56
     57
     58        path can be a string or a sequence.
     59        """
     60        if isinstance(path, (list, tuple)):
     61            path = os.path.join(*path)
     62        f = open(path, 'w')
     63        try:
     64            f.write(content)
     65        finally:
     66            f.close()
     67
     68    def mkdtemp(self):
     69        """Create a temporary directory that will be cleaned up.
     70
     71        Returns the path of the directory.
     72        """
     73        d = tempfile.mkdtemp()
     74        self.tempdirs.append(d)
     75        return d
    1476    def test_rmtree_errors(self):
    1577        # filename is guaranteed not to exist
     
    46108
    47109    def check_args_to_onerror(self, func, arg, exc):
     110        # test_rmtree_errors deliberately runs rmtree
     111        # on a directory that is chmod 400, which will fail.
     112        # This function is run when shutil.rmtree fails.
     113        # 99.9% of the time it initially fails to remove
     114        # a file in the directory, so the first time through
     115        # func is os.remove.
     116        # However, some Linux machines running ZFS on
     117        # FUSE experienced a failure earlier in the process
     118        # at os.listdir.  The first failure may legally
     119        # be either.
    48120        if self.errorState == 0:
    49             self.assertEqual(func, os.remove)
    50             self.assertEqual(arg, self.childpath)
    51             self.failUnless(issubclass(exc[0], OSError))
     121            if func is os.remove:
     122                self.assertEqual(arg, self.childpath)
     123            else:
     124                self.assertIs(func, os.listdir,
     125                              "func must be either os.remove or os.listdir")
     126                self.assertEqual(arg, TESTFN)
     127            self.assertTrue(issubclass(exc[0], OSError))
    52128            self.errorState = 1
    53129        else:
    54130            self.assertEqual(func, os.rmdir)
    55131            self.assertEqual(arg, TESTFN)
    56             self.failUnless(issubclass(exc[0], OSError))
     132            self.assertTrue(issubclass(exc[0], OSError))
    57133            self.errorState = 2
    58134
     
    139215            write_data(join(src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')
    140216
     217
    141218            # testing glob-like patterns
    142219            try:
     
    202279                os.link(src, dst)
    203280                self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
    204                 self.assertEqual(open(src,'r').read(), 'cheddar')
     281                with open(src, 'r') as f:
     282                    self.assertEqual(f.read(), 'cheddar')
    205283                os.remove(dst)
    206284
     
    210288                os.symlink('cheese', dst)
    211289                self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
    212                 self.assertEqual(open(src,'r').read(), 'cheddar')
     290                with open(src, 'r') as f:
     291                    self.assertEqual(f.read(), 'cheddar')
    213292                os.remove(dst)
    214293            finally:
     
    229308            finally:
    230309                shutil.rmtree(TESTFN, ignore_errors=True)
     310
     311    if hasattr(os, "mkfifo"):
     312        # Issue #3002: copyfile and copytree block indefinitely on named pipes
     313        def test_copyfile_named_pipe(self):
     314            os.mkfifo(TESTFN)
     315            try:
     316                self.assertRaises(shutil.SpecialFileError,
     317                                  shutil.copyfile, TESTFN, TESTFN2)
     318                self.assertRaises(shutil.SpecialFileError,
     319                                  shutil.copyfile, __file__, TESTFN)
     320            finally:
     321                os.remove(TESTFN)
     322
     323        def test_copytree_named_pipe(self):
     324            os.mkdir(TESTFN)
     325            try:
     326                subdir = os.path.join(TESTFN, "subdir")
     327                os.mkdir(subdir)
     328                pipe = os.path.join(subdir, "mypipe")
     329                os.mkfifo(pipe)
     330                try:
     331                    shutil.copytree(TESTFN, TESTFN2)
     332                except shutil.Error as e:
     333                    errors = e.args[0]
     334                    self.assertEqual(len(errors), 1)
     335                    src, dst, error_msg = errors[0]
     336                    self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
     337                else:
     338                    self.fail("shutil.Error should have been raised")
     339            finally:
     340                shutil.rmtree(TESTFN, ignore_errors=True)
     341                shutil.rmtree(TESTFN2, ignore_errors=True)
     342
     343    @unittest.skipUnless(hasattr(os, 'chflags') and
     344                         hasattr(errno, 'EOPNOTSUPP') and
     345                         hasattr(errno, 'ENOTSUP'),
     346                         "requires os.chflags, EOPNOTSUPP & ENOTSUP")
     347    def test_copystat_handles_harmless_chflags_errors(self):
     348        tmpdir = self.mkdtemp()
     349        file1 = os.path.join(tmpdir, 'file1')
     350        file2 = os.path.join(tmpdir, 'file2')
     351        self.write_file(file1, 'xxx')
     352        self.write_file(file2, 'xxx')
     353
     354        def make_chflags_raiser(err):
     355            ex = OSError()
     356
     357            def _chflags_raiser(path, flags):
     358                ex.errno = err
     359                raise ex
     360            return _chflags_raiser
     361        old_chflags = os.chflags
     362        try:
     363            for err in errno.EOPNOTSUPP, errno.ENOTSUP:
     364                os.chflags = make_chflags_raiser(err)
     365                shutil.copystat(file1, file2)
     366            # assert others errors break it
     367            os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP)
     368            self.assertRaises(OSError, shutil.copystat, file1, file2)
     369        finally:
     370            os.chflags = old_chflags
     371
     372    @unittest.skipUnless(zlib, "requires zlib")
     373    def test_make_tarball(self):
     374        # creating something to tar
     375        tmpdir = self.mkdtemp()
     376        self.write_file([tmpdir, 'file1'], 'xxx')
     377        self.write_file([tmpdir, 'file2'], 'xxx')
     378        os.mkdir(os.path.join(tmpdir, 'sub'))
     379        self.write_file([tmpdir, 'sub', 'file3'], 'xxx')
     380
     381        tmpdir2 = self.mkdtemp()
     382        # force shutil to create the directory
     383        os.rmdir(tmpdir2)
     384        unittest.skipUnless(splitdrive(tmpdir)[0] == splitdrive(tmpdir2)[0],
     385                            "source and target should be on same drive")
     386
     387        base_name = os.path.join(tmpdir2, 'archive')
     388
     389        # working with relative paths to avoid tar warnings
     390        old_dir = os.getcwd()
     391        os.chdir(tmpdir)
     392        try:
     393            _make_tarball(splitdrive(base_name)[1], '.')
     394        finally:
     395            os.chdir(old_dir)
     396
     397        # check if the compressed tarball was created
     398        tarball = base_name + '.tar.gz'
     399        self.assertTrue(os.path.exists(tarball))
     400
     401        # trying an uncompressed one
     402        base_name = os.path.join(tmpdir2, 'archive')
     403        old_dir = os.getcwd()
     404        os.chdir(tmpdir)
     405        try:
     406            _make_tarball(splitdrive(base_name)[1], '.', compress=None)
     407        finally:
     408            os.chdir(old_dir)
     409        tarball = base_name + '.tar'
     410        self.assertTrue(os.path.exists(tarball))
     411
     412    def _tarinfo(self, path):
     413        tar = tarfile.open(path)
     414        try:
     415            names = tar.getnames()
     416            names.sort()
     417            return tuple(names)
     418        finally:
     419            tar.close()
     420
     421    def _create_files(self):
     422        # creating something to tar
     423        tmpdir = self.mkdtemp()
     424        dist = os.path.join(tmpdir, 'dist')
     425        os.mkdir(dist)
     426        self.write_file([dist, 'file1'], 'xxx')
     427        self.write_file([dist, 'file2'], 'xxx')
     428        os.mkdir(os.path.join(dist, 'sub'))
     429        self.write_file([dist, 'sub', 'file3'], 'xxx')
     430        os.mkdir(os.path.join(dist, 'sub2'))
     431        tmpdir2 = self.mkdtemp()
     432        base_name = os.path.join(tmpdir2, 'archive')
     433        return tmpdir, tmpdir2, base_name
     434
     435    @unittest.skipUnless(zlib, "Requires zlib")
     436    @unittest.skipUnless(find_executable('tar') and find_executable('gzip'),
     437                         'Need the tar command to run')
     438    def test_tarfile_vs_tar(self):
     439        tmpdir, tmpdir2, base_name =  self._create_files()
     440        old_dir = os.getcwd()
     441        os.chdir(tmpdir)
     442        try:
     443            _make_tarball(base_name, 'dist')
     444        finally:
     445            os.chdir(old_dir)
     446
     447        # check if the compressed tarball was created
     448        tarball = base_name + '.tar.gz'
     449        self.assertTrue(os.path.exists(tarball))
     450
     451        # now create another tarball using `tar`
     452        tarball2 = os.path.join(tmpdir, 'archive2.tar.gz')
     453        tar_cmd = ['tar', '-cf', 'archive2.tar', 'dist']
     454        gzip_cmd = ['gzip', '-f9', 'archive2.tar']
     455        old_dir = os.getcwd()
     456        os.chdir(tmpdir)
     457        try:
     458            with captured_stdout() as s:
     459                spawn(tar_cmd)
     460                spawn(gzip_cmd)
     461        finally:
     462            os.chdir(old_dir)
     463
     464        self.assertTrue(os.path.exists(tarball2))
     465        # let's compare both tarballs
     466        self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))
     467
     468        # trying an uncompressed one
     469        base_name = os.path.join(tmpdir2, 'archive')
     470        old_dir = os.getcwd()
     471        os.chdir(tmpdir)
     472        try:
     473            _make_tarball(base_name, 'dist', compress=None)
     474        finally:
     475            os.chdir(old_dir)
     476        tarball = base_name + '.tar'
     477        self.assertTrue(os.path.exists(tarball))
     478
     479        # now for a dry_run
     480        base_name = os.path.join(tmpdir2, 'archive')
     481        old_dir = os.getcwd()
     482        os.chdir(tmpdir)
     483        try:
     484            _make_tarball(base_name, 'dist', compress=None, dry_run=True)
     485        finally:
     486            os.chdir(old_dir)
     487        tarball = base_name + '.tar'
     488        self.assertTrue(os.path.exists(tarball))
     489
     490    @unittest.skipUnless(zlib, "Requires zlib")
     491    @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run')
     492    def test_make_zipfile(self):
     493        # creating something to tar
     494        tmpdir = self.mkdtemp()
     495        self.write_file([tmpdir, 'file1'], 'xxx')
     496        self.write_file([tmpdir, 'file2'], 'xxx')
     497
     498        tmpdir2 = self.mkdtemp()
     499        # force shutil to create the directory
     500        os.rmdir(tmpdir2)
     501        base_name = os.path.join(tmpdir2, 'archive')
     502        _make_zipfile(base_name, tmpdir)
     503
     504        # check if the compressed tarball was created
     505        tarball = base_name + '.zip'
     506        self.assertTrue(os.path.exists(tarball))
     507
     508
     509    def test_make_archive(self):
     510        tmpdir = self.mkdtemp()
     511        base_name = os.path.join(tmpdir, 'archive')
     512        self.assertRaises(ValueError, make_archive, base_name, 'xxx')
     513
     514    @unittest.skipUnless(zlib, "Requires zlib")
     515    def test_make_archive_owner_group(self):
     516        # testing make_archive with owner and group, with various combinations
     517        # this works even if there's not gid/uid support
     518        if UID_GID_SUPPORT:
     519            group = grp.getgrgid(0)[0]
     520            owner = pwd.getpwuid(0)[0]
     521        else:
     522            group = owner = 'root'
     523
     524        base_dir, root_dir, base_name =  self._create_files()
     525        base_name = os.path.join(self.mkdtemp() , 'archive')
     526        res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
     527                           group=group)
     528        self.assertTrue(os.path.exists(res))
     529
     530        res = make_archive(base_name, 'zip', root_dir, base_dir)
     531        self.assertTrue(os.path.exists(res))
     532
     533        res = make_archive(base_name, 'tar', root_dir, base_dir,
     534                           owner=owner, group=group)
     535        self.assertTrue(os.path.exists(res))
     536
     537        res = make_archive(base_name, 'tar', root_dir, base_dir,
     538                           owner='kjhkjhkjg', group='oihohoh')
     539        self.assertTrue(os.path.exists(res))
     540
     541    @unittest.skipUnless(zlib, "Requires zlib")
     542    @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
     543    def test_tarfile_root_owner(self):
     544        tmpdir, tmpdir2, base_name =  self._create_files()
     545        old_dir = os.getcwd()
     546        os.chdir(tmpdir)
     547        group = grp.getgrgid(0)[0]
     548        owner = pwd.getpwuid(0)[0]
     549        try:
     550            archive_name = _make_tarball(base_name, 'dist', compress=None,
     551                                         owner=owner, group=group)
     552        finally:
     553            os.chdir(old_dir)
     554
     555        # check if the compressed tarball was created
     556        self.assertTrue(os.path.exists(archive_name))
     557
     558        # now checks the rights
     559        archive = tarfile.open(archive_name)
     560        try:
     561            for member in archive.getmembers():
     562                self.assertEqual(member.uid, 0)
     563                self.assertEqual(member.gid, 0)
     564        finally:
     565            archive.close()
     566
     567    def test_make_archive_cwd(self):
     568        current_dir = os.getcwd()
     569        def _breaks(*args, **kw):
     570            raise RuntimeError()
     571
     572        register_archive_format('xxx', _breaks, [], 'xxx file')
     573        try:
     574            try:
     575                make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
     576            except Exception:
     577                pass
     578            self.assertEqual(os.getcwd(), current_dir)
     579        finally:
     580            unregister_archive_format('xxx')
     581
     582    def test_register_archive_format(self):
     583
     584        self.assertRaises(TypeError, register_archive_format, 'xxx', 1)
     585        self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
     586                          1)
     587        self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
     588                          [(1, 2), (1, 2, 3)])
     589
     590        register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
     591        formats = [name for name, params in get_archive_formats()]
     592        self.assertIn('xxx', formats)
     593
     594        unregister_archive_format('xxx')
     595        formats = [name for name, params in get_archive_formats()]
     596        self.assertNotIn('xxx', formats)
    231597
    232598
     
    260626
    261627    def _check_move_file(self, src, dst, real_dst):
    262         contents = open(src, "rb").read()
     628        with open(src, "rb") as f:
     629            contents = f.read()
    263630        shutil.move(src, dst)
    264         self.assertEqual(contents, open(real_dst, "rb").read())
     631        with open(real_dst, "rb") as f:
     632            self.assertEqual(contents, f.read())
    265633        self.assertFalse(os.path.exists(src))
    266634
     
    350718                src = os.path.join(TESTFN, src)
    351719                dst = os.path.join(TESTFN, dst)
    352                 self.assert_(shutil.destinsrc(src, dst),
    353                              msg='destinsrc() wrongly concluded that '
     720                self.assertTrue(shutil._destinsrc(src, dst),
     721                             msg='_destinsrc() wrongly concluded that '
    354722                             'dst (%s) is not in src (%s)' % (dst, src))
    355723        finally:
     
    362730                src = os.path.join(TESTFN, src)
    363731                dst = os.path.join(TESTFN, dst)
    364                 self.failIf(shutil.destinsrc(src, dst),
    365                             msg='destinsrc() wrongly concluded that '
     732                self.assertFalse(shutil._destinsrc(src, dst),
     733                            msg='_destinsrc() wrongly concluded that '
    366734                            'dst (%s) is in src (%s)' % (dst, src))
    367735        finally:
    368736            shutil.rmtree(TESTFN, ignore_errors=True)
    369737
     738
     739class TestCopyFile(unittest.TestCase):
     740
     741    _delete = False
     742
     743    class Faux(object):
     744        _entered = False
     745        _exited_with = None
     746        _raised = False
     747        def __init__(self, raise_in_exit=False, suppress_at_exit=True):
     748            self._raise_in_exit = raise_in_exit
     749            self._suppress_at_exit = suppress_at_exit
     750        def read(self, *args):
     751            return ''
     752        def __enter__(self):
     753            self._entered = True
     754        def __exit__(self, exc_type, exc_val, exc_tb):
     755            self._exited_with = exc_type, exc_val, exc_tb
     756            if self._raise_in_exit:
     757                self._raised = True
     758                raise IOError("Cannot close")
     759            return self._suppress_at_exit
     760
     761    def tearDown(self):
     762        if self._delete:
     763            del shutil.open
     764
     765    def _set_shutil_open(self, func):
     766        shutil.open = func
     767        self._delete = True
     768
     769    def test_w_source_open_fails(self):
     770        def _open(filename, mode='r'):
     771            if filename == 'srcfile':
     772                raise IOError('Cannot open "srcfile"')
     773            assert 0  # shouldn't reach here.
     774
     775        self._set_shutil_open(_open)
     776
     777        self.assertRaises(IOError, shutil.copyfile, 'srcfile', 'destfile')
     778
     779    def test_w_dest_open_fails(self):
     780
     781        srcfile = self.Faux()
     782
     783        def _open(filename, mode='r'):
     784            if filename == 'srcfile':
     785                return srcfile
     786            if filename == 'destfile':
     787                raise IOError('Cannot open "destfile"')
     788            assert 0  # shouldn't reach here.
     789
     790        self._set_shutil_open(_open)
     791
     792        shutil.copyfile('srcfile', 'destfile')
     793        self.assertTrue(srcfile._entered)
     794        self.assertTrue(srcfile._exited_with[0] is IOError)
     795        self.assertEqual(srcfile._exited_with[1].args,
     796                         ('Cannot open "destfile"',))
     797
     798    def test_w_dest_close_fails(self):
     799
     800        srcfile = self.Faux()
     801        destfile = self.Faux(True)
     802
     803        def _open(filename, mode='r'):
     804            if filename == 'srcfile':
     805                return srcfile
     806            if filename == 'destfile':
     807                return destfile
     808            assert 0  # shouldn't reach here.
     809
     810        self._set_shutil_open(_open)
     811
     812        shutil.copyfile('srcfile', 'destfile')
     813        self.assertTrue(srcfile._entered)
     814        self.assertTrue(destfile._entered)
     815        self.assertTrue(destfile._raised)
     816        self.assertTrue(srcfile._exited_with[0] is IOError)
     817        self.assertEqual(srcfile._exited_with[1].args,
     818                         ('Cannot close',))
     819
     820    def test_w_source_close_fails(self):
     821
     822        srcfile = self.Faux(True)
     823        destfile = self.Faux()
     824
     825        def _open(filename, mode='r'):
     826            if filename == 'srcfile':
     827                return srcfile
     828            if filename == 'destfile':
     829                return destfile
     830            assert 0  # shouldn't reach here.
     831
     832        self._set_shutil_open(_open)
     833
     834        self.assertRaises(IOError,
     835                          shutil.copyfile, 'srcfile', 'destfile')
     836        self.assertTrue(srcfile._entered)
     837        self.assertTrue(destfile._entered)
     838        self.assertFalse(destfile._raised)
     839        self.assertTrue(srcfile._exited_with[0] is None)
     840        self.assertTrue(srcfile._raised)
     841
     842    def test_move_dir_caseinsensitive(self):
     843        # Renames a folder to the same name
     844        # but a different case.
     845
     846        self.src_dir = tempfile.mkdtemp()
     847        dst_dir = os.path.join(
     848                os.path.dirname(self.src_dir),
     849                os.path.basename(self.src_dir).upper())
     850        self.assertNotEqual(self.src_dir, dst_dir)
     851
     852        try:
     853            shutil.move(self.src_dir, dst_dir)
     854            self.assertTrue(os.path.isdir(dst_dir))
     855        finally:
     856            if os.path.exists(dst_dir):
     857                os.rmdir(dst_dir)
     858
     859
     860
    370861def test_main():
    371     test_support.run_unittest(TestShutil, TestMove)
     862    test_support.run_unittest(TestShutil, TestMove, TestCopyFile)
    372863
    373864if __name__ == '__main__':
Note: See TracChangeset for help on using the changeset viewer.