1 | # Copyright (C) 2003 Python Software Foundation
|
---|
2 |
|
---|
3 | import unittest
|
---|
4 | import shutil
|
---|
5 | import tempfile
|
---|
6 | import sys
|
---|
7 | import stat
|
---|
8 | import os
|
---|
9 | import os.path
|
---|
10 | import errno
|
---|
11 | from os.path import splitdrive
|
---|
12 | from distutils.spawn import find_executable, spawn
|
---|
13 | from shutil import (_make_tarball, _make_zipfile, make_archive,
|
---|
14 | register_archive_format, unregister_archive_format,
|
---|
15 | get_archive_formats)
|
---|
16 | import tarfile
|
---|
17 | import warnings
|
---|
18 |
|
---|
19 | from test import test_support
|
---|
20 | from test.test_support import TESTFN, check_warnings, captured_stdout
|
---|
21 |
|
---|
# Second scratch path, derived from the TESTFN name that test_support
# provides; used where a test needs a source and a destination.
TESTFN2 = TESTFN + "2"

# grp and pwd are imported in the same try block, so UID_GID_SUPPORT is
# only True when both imports succeed.
try:
    import grp
    import pwd
    UID_GID_SUPPORT = True
except ImportError:
    UID_GID_SUPPORT = False

# zlib is optional: the name is bound to None when the import fails, so
# it can be used directly as a skipUnless() condition.
try:
    import zlib
except ImportError:
    zlib = None

# ZIP_SUPPORT is True when the zipfile module imports; otherwise it is
# whatever find_executable('zip') returns for an external zip command.
try:
    import zipfile
    ZIP_SUPPORT = True
except ImportError:
    ZIP_SUPPORT = find_executable('zip')
41 |
|
---|
class TestShutil(unittest.TestCase):
    """Tests for shutil's rmtree, copytree, copyfile, copystat and archiving.

    Directories created through self.mkdtemp() are recorded in
    self.tempdirs and removed again by tearDown().
    """

    def setUp(self):
        super(TestShutil, self).setUp()
        # Directories handed out by mkdtemp(); tearDown() removes them.
        self.tempdirs = []

    def tearDown(self):
        super(TestShutil, self).tearDown()
        # rmtree's second positional argument is ignore_errors: removal
        # failures are only tolerated when os.name is 'nt' or 'cygwin'.
        ignore_errors = os.name in ('nt', 'cygwin')
        while self.tempdirs:
            shutil.rmtree(self.tempdirs.pop(), ignore_errors)

    def write_file(self, path, content='xxx'):
        """Write content to the file at path, creating or truncating it.

        path can be a string or a sequence of path components, which are
        joined with os.path.join().
        """
        if isinstance(path, (list, tuple)):
            path = os.path.join(*path)
        with open(path, 'w') as f:
            f.write(content)

    def read_file(self, path):
        """Return the whole content of the file at path.

        path can be a string or a sequence of path components, which are
        joined with os.path.join().
        """
        if isinstance(path, (list, tuple)):
            path = os.path.join(*path)
        with open(path) as f:
            return f.read()

    def mkdtemp(self):
        """Create a temporary directory that will be cleaned up.

        Returns the path of the directory.
        """
        d = tempfile.mkdtemp()
        self.tempdirs.append(d)
        return d

    def _call_in_dir(self, directory, func, *args, **kwargs):
        """Return func(*args, **kwargs), called with directory as the cwd.

        The previous working directory is restored afterwards, whether
        or not func raises.
        """
        old_dir = os.getcwd()
        os.chdir(directory)
        try:
            return func(*args, **kwargs)
        finally:
            os.chdir(old_dir)

    def test_rmtree_errors(self):
        # mktemp() only returns a name; nothing is created at that path,
        # so rmtree is expected to fail on it.
        filename = tempfile.mktemp()
        self.assertRaises(OSError, shutil.rmtree, filename)

    # See bug #1071513 for why we don't run this on cygwin
    # and bug #1076467 for why we don't run this as root.
    if (hasattr(os, 'chmod') and sys.platform[:6] != 'cygwin'
        and not (hasattr(os, 'geteuid') and os.geteuid() == 0)):
        def test_on_error(self):
            self.errorState = 0
            os.mkdir(TESTFN)
            self.childpath = os.path.join(TESTFN, 'a')
            with open(self.childpath, 'w'):
                pass
            old_dir_mode = os.stat(TESTFN).st_mode
            old_child_mode = os.stat(self.childpath).st_mode
            try:
                # Make unwritable.
                os.chmod(self.childpath, stat.S_IREAD)
                os.chmod(TESTFN, stat.S_IREAD)

                shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
                # Test whether onerror has actually been called.
                self.assertEqual(self.errorState, 2,
                                 "Expected call to onerror function did not "
                                 "happen.")
            finally:
                # Restore the permissions and clean up even when an
                # assertion failed, so a read-only TESTFN is not left
                # behind for the tests that run afterwards.
                if os.path.exists(TESTFN):
                    # Make writable again (directory first, so that the
                    # child becomes reachable).
                    os.chmod(TESTFN, old_dir_mode)
                    os.chmod(self.childpath, old_child_mode)
                    # Clean up.
                    shutil.rmtree(TESTFN)

    def check_args_to_onerror(self, func, arg, exc):
        # onerror callback for test_on_error, which deliberately runs
        # rmtree on a directory that was made read-only, which will fail.
        # This function is run when shutil.rmtree fails.
        # 99.9% of the time it initially fails to remove
        # a file in the directory, so the first time through
        # func is os.remove.
        # However, some Linux machines running ZFS on
        # FUSE experienced a failure earlier in the process
        # at os.listdir.  The first failure may legally
        # be either.
        if self.errorState == 0:
            if func is os.remove:
                self.assertEqual(arg, self.childpath)
            else:
                self.assertIs(func, os.listdir,
                              "func must be either os.remove or os.listdir")
                self.assertEqual(arg, TESTFN)
            self.assertTrue(issubclass(exc[0], OSError))
            self.errorState = 1
        else:
            # Second call: removing the directory itself failed.
            self.assertEqual(func, os.rmdir)
            self.assertEqual(arg, TESTFN)
            self.assertTrue(issubclass(exc[0], OSError))
            self.errorState = 2

    def test_rmtree_dont_delete_file(self):
        # When called on a file instead of a directory, don't delete it.
        handle, path = tempfile.mkstemp()
        os.close(handle)
        try:
            self.assertRaises(OSError, shutil.rmtree, path)
            self.assertTrue(os.path.isfile(path))
        finally:
            if os.path.exists(path):
                os.remove(path)

    def test_copytree_simple(self):
        join = os.path.join
        # Both directories are registered with mkdtemp() and therefore
        # removed by tearDown().
        src_dir = self.mkdtemp()
        dst_dir = join(self.mkdtemp(), 'destination')

        self.write_file((src_dir, 'test.txt'), '123')
        os.mkdir(join(src_dir, 'test_dir'))
        self.write_file((src_dir, 'test_dir', 'test.txt'), '456')

        shutil.copytree(src_dir, dst_dir)
        self.assertTrue(os.path.isfile(join(dst_dir, 'test.txt')))
        self.assertTrue(os.path.isdir(join(dst_dir, 'test_dir')))
        self.assertTrue(os.path.isfile(join(dst_dir, 'test_dir',
                                            'test.txt')))
        self.assertEqual(self.read_file((dst_dir, 'test.txt')), '123')
        self.assertEqual(self.read_file((dst_dir, 'test_dir', 'test.txt')),
                         '456')

    def test_copytree_with_exclude(self):
        # creating data
        join = os.path.join
        exists = os.path.exists
        # src_dir and the parent of dst_dir are removed by tearDown();
        # dst_dir itself is removed after each phase because copytree
        # is given the same destination path again.
        src_dir = self.mkdtemp()
        dst_dir = join(self.mkdtemp(), 'destination')
        self.write_file((src_dir, 'test.txt'), '123')
        self.write_file((src_dir, 'test.tmp'), '123')
        os.mkdir(join(src_dir, 'test_dir'))
        self.write_file((src_dir, 'test_dir', 'test.txt'), '456')
        os.mkdir(join(src_dir, 'test_dir2'))
        self.write_file((src_dir, 'test_dir2', 'test.txt'), '456')
        os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
        os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
        self.write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
        self.write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')

        # testing glob-like patterns
        try:
            patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
            shutil.copytree(src_dir, dst_dir, ignore=patterns)
            # checking the result: some elements should not be copied
            self.assertTrue(exists(join(dst_dir, 'test.txt')))
            self.assertFalse(exists(join(dst_dir, 'test.tmp')))
            self.assertFalse(exists(join(dst_dir, 'test_dir2')))
        finally:
            if exists(dst_dir):
                shutil.rmtree(dst_dir)
        try:
            patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
            shutil.copytree(src_dir, dst_dir, ignore=patterns)
            # checking the result: some elements should not be copied
            self.assertFalse(exists(join(dst_dir, 'test.tmp')))
            self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2')))
            self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
        finally:
            if exists(dst_dir):
                shutil.rmtree(dst_dir)

        # testing callable-style
        try:
            def _filter(src, names):
                # Ignore directories named exactly 'subdir' and every
                # entry whose extension is '.py'.
                res = []
                for name in names:
                    path = join(src, name)
                    # basename() rather than str.split(): the latter
                    # splits on whitespace, not on path separators.
                    if (os.path.isdir(path) and
                            os.path.basename(path) == 'subdir'):
                        res.append(name)
                    # Equality rather than "in ('.py')": that is a
                    # substring test against a plain string, which the
                    # empty extension of every directory satisfies.
                    elif os.path.splitext(path)[-1] == '.py':
                        res.append(name)
                return res

            shutil.copytree(src_dir, dst_dir, ignore=_filter)

            # checking the result: some elements should not be copied
            self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2',
                                         'test.py')))
            self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
            # ... while everything the filter does not name is copied.
            self.assertTrue(exists(join(dst_dir, 'test_dir2', 'subdir2')))
            self.assertTrue(exists(join(dst_dir, 'test_dir2', 'test.txt')))
        finally:
            if exists(dst_dir):
                shutil.rmtree(dst_dir)

    if hasattr(os, "symlink"):
        def test_dont_copy_file_onto_link_to_itself(self):
            # bug 851123.
            os.mkdir(TESTFN)
            src = os.path.join(TESTFN, 'cheese')
            dst = os.path.join(TESTFN, 'shop')
            try:
                with open(src, 'w') as f:
                    f.write('cheddar')

                # dst is a hard link to src: copying must be refused and
                # the content must survive.
                os.link(src, dst)
                self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
                with open(src, 'r') as f:
                    self.assertEqual(f.read(), 'cheddar')
                os.remove(dst)

                # Using `src` here would mean we end up with a symlink pointing
                # to TESTFN/TESTFN/cheese, while it should point at
                # TESTFN/cheese.
                os.symlink('cheese', dst)
                self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
                with open(src, 'r') as f:
                    self.assertEqual(f.read(), 'cheddar')
                os.remove(dst)
            finally:
                try:
                    shutil.rmtree(TESTFN)
                except OSError:
                    pass

        def test_rmtree_on_symlink(self):
            # bug 1669.
            os.mkdir(TESTFN)
            try:
                src = os.path.join(TESTFN, 'cheese')
                dst = os.path.join(TESTFN, 'shop')
                os.mkdir(src)
                os.symlink(src, dst)
                self.assertRaises(OSError, shutil.rmtree, dst)
            finally:
                shutil.rmtree(TESTFN, ignore_errors=True)

    if hasattr(os, "mkfifo"):
        # Issue #3002: copyfile and copytree block indefinitely on named pipes
        def test_copyfile_named_pipe(self):
            os.mkfifo(TESTFN)
            try:
                # The pipe is rejected both as source and as destination.
                self.assertRaises(shutil.SpecialFileError,
                                  shutil.copyfile, TESTFN, TESTFN2)
                self.assertRaises(shutil.SpecialFileError,
                                  shutil.copyfile, __file__, TESTFN)
            finally:
                os.remove(TESTFN)

        def test_copytree_named_pipe(self):
            os.mkdir(TESTFN)
            try:
                subdir = os.path.join(TESTFN, "subdir")
                os.mkdir(subdir)
                pipe = os.path.join(subdir, "mypipe")
                os.mkfifo(pipe)
                try:
                    shutil.copytree(TESTFN, TESTFN2)
                except shutil.Error as e:
                    # The first argument of the Error is a list of
                    # (src, dst, message) entries.
                    errors = e.args[0]
                    self.assertEqual(len(errors), 1)
                    src, dst, error_msg = errors[0]
                    self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
                else:
                    self.fail("shutil.Error should have been raised")
            finally:
                shutil.rmtree(TESTFN, ignore_errors=True)
                shutil.rmtree(TESTFN2, ignore_errors=True)

    @unittest.skipUnless(hasattr(os, 'chflags') and
                         hasattr(errno, 'EOPNOTSUPP') and
                         hasattr(errno, 'ENOTSUP'),
                         "requires os.chflags, EOPNOTSUPP & ENOTSUP")
    def test_copystat_handles_harmless_chflags_errors(self):
        tmpdir = self.mkdtemp()
        file1 = os.path.join(tmpdir, 'file1')
        file2 = os.path.join(tmpdir, 'file2')
        self.write_file(file1, 'xxx')
        self.write_file(file2, 'xxx')

        def make_chflags_raiser(err):
            # Build a stand-in for os.chflags that always raises an
            # OSError carrying the given errno.
            ex = OSError()

            def _chflags_raiser(path, flags):
                ex.errno = err
                raise ex
            return _chflags_raiser

        old_chflags = os.chflags
        try:
            for err in errno.EOPNOTSUPP, errno.ENOTSUP:
                os.chflags = make_chflags_raiser(err)
                shutil.copystat(file1, file2)
            # assert others errors break it
            os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP)
            self.assertRaises(OSError, shutil.copystat, file1, file2)
        finally:
            os.chflags = old_chflags

    @unittest.skipUnless(zlib, "requires zlib")
    def test_make_tarball(self):
        # creating something to tar
        tmpdir = self.mkdtemp()
        self.write_file([tmpdir, 'file1'], 'xxx')
        self.write_file([tmpdir, 'file2'], 'xxx')
        os.mkdir(os.path.join(tmpdir, 'sub'))
        self.write_file([tmpdir, 'sub', 'file3'], 'xxx')

        tmpdir2 = self.mkdtemp()
        # skipTest() rather than a bare unittest.skipUnless(...) call:
        # the latter only builds a decorator and never skips anything.
        if splitdrive(tmpdir)[0] != splitdrive(tmpdir2)[0]:
            self.skipTest("source and target should be on same drive")
        # force shutil to create the directory
        os.rmdir(tmpdir2)

        base_name = os.path.join(tmpdir2, 'archive')

        # working with relative paths to avoid tar warnings
        self._call_in_dir(tmpdir, _make_tarball,
                          splitdrive(base_name)[1], '.')

        # check if the compressed tarball was created
        tarball = base_name + '.tar.gz'
        self.assertTrue(os.path.exists(tarball))

        # trying an uncompressed one
        self._call_in_dir(tmpdir, _make_tarball,
                          splitdrive(base_name)[1], '.', compress=None)
        tarball = base_name + '.tar'
        self.assertTrue(os.path.exists(tarball))

    def _tarinfo(self, path):
        """Return the sorted member names of the tar archive at path."""
        tar = tarfile.open(path)
        try:
            return tuple(sorted(tar.getnames()))
        finally:
            tar.close()

    def _create_files(self):
        """Create a small 'dist' tree to archive.

        Returns (tmpdir, tmpdir2, base_name): tmpdir contains the 'dist'
        directory, tmpdir2 is a second, empty temporary directory and
        base_name is the path 'archive' inside tmpdir2.
        """
        # creating something to tar
        tmpdir = self.mkdtemp()
        dist = os.path.join(tmpdir, 'dist')
        os.mkdir(dist)
        self.write_file([dist, 'file1'], 'xxx')
        self.write_file([dist, 'file2'], 'xxx')
        os.mkdir(os.path.join(dist, 'sub'))
        self.write_file([dist, 'sub', 'file3'], 'xxx')
        os.mkdir(os.path.join(dist, 'sub2'))
        tmpdir2 = self.mkdtemp()
        base_name = os.path.join(tmpdir2, 'archive')
        return tmpdir, tmpdir2, base_name

    @unittest.skipUnless(zlib, "Requires zlib")
    @unittest.skipUnless(find_executable('tar') and find_executable('gzip'),
                         'Need the tar command to run')
    def test_tarfile_vs_tar(self):
        tmpdir, tmpdir2, base_name = self._create_files()
        self._call_in_dir(tmpdir, _make_tarball, base_name, 'dist')

        # check if the compressed tarball was created
        tarball = base_name + '.tar.gz'
        self.assertTrue(os.path.exists(tarball))

        # now create another tarball using `tar`
        tarball2 = os.path.join(tmpdir, 'archive2.tar.gz')
        tar_cmd = ['tar', '-cf', 'archive2.tar', 'dist']
        gzip_cmd = ['gzip', '-f9', 'archive2.tar']

        def _run_external_tools():
            with captured_stdout():
                spawn(tar_cmd)
                spawn(gzip_cmd)

        self._call_in_dir(tmpdir, _run_external_tools)

        self.assertTrue(os.path.exists(tarball2))
        # let's compare both tarballs
        self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))

        # trying an uncompressed one
        self._call_in_dir(tmpdir, _make_tarball, base_name, 'dist',
                          compress=None)
        tarball = base_name + '.tar'
        self.assertTrue(os.path.exists(tarball))

        # now for a dry_run
        # NOTE: the .tar file checked below was already created by the
        # previous step, so this mainly verifies that the dry run
        # completes without error.
        self._call_in_dir(tmpdir, _make_tarball, base_name, 'dist',
                          compress=None, dry_run=True)
        tarball = base_name + '.tar'
        self.assertTrue(os.path.exists(tarball))

    @unittest.skipUnless(zlib, "Requires zlib")
    @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run')
    def test_make_zipfile(self):
        # creating something to zip
        tmpdir = self.mkdtemp()
        self.write_file([tmpdir, 'file1'], 'xxx')
        self.write_file([tmpdir, 'file2'], 'xxx')

        tmpdir2 = self.mkdtemp()
        # force shutil to create the directory
        os.rmdir(tmpdir2)
        base_name = os.path.join(tmpdir2, 'archive')
        _make_zipfile(base_name, tmpdir)

        # check if the zip archive was created
        archive = base_name + '.zip'
        self.assertTrue(os.path.exists(archive))

    def test_make_archive(self):
        # 'xxx' is not a registered archive format.
        tmpdir = self.mkdtemp()
        base_name = os.path.join(tmpdir, 'archive')
        self.assertRaises(ValueError, make_archive, base_name, 'xxx')

    @unittest.skipUnless(zlib, "Requires zlib")
    def test_make_archive_owner_group(self):
        # testing make_archive with owner and group, with various combinations
        # this works even if there's not gid/uid support
        if UID_GID_SUPPORT:
            group = grp.getgrgid(0)[0]
            owner = pwd.getpwuid(0)[0]
        else:
            group = owner = 'root'

        base_dir, root_dir, base_name = self._create_files()
        base_name = os.path.join(self.mkdtemp(), 'archive')
        res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
                           group=group)
        self.assertTrue(os.path.exists(res))

        res = make_archive(base_name, 'zip', root_dir, base_dir)
        self.assertTrue(os.path.exists(res))

        res = make_archive(base_name, 'tar', root_dir, base_dir,
                           owner=owner, group=group)
        self.assertTrue(os.path.exists(res))

        # Made-up owner and group names must not prevent archive creation.
        res = make_archive(base_name, 'tar', root_dir, base_dir,
                           owner='kjhkjhkjg', group='oihohoh')
        self.assertTrue(os.path.exists(res))

    @unittest.skipUnless(zlib, "Requires zlib")
    @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
    def test_tarfile_root_owner(self):
        tmpdir, tmpdir2, base_name = self._create_files()
        # Look the names up before changing directory, so that a failing
        # lookup cannot leave the process in tmpdir.
        group = grp.getgrgid(0)[0]
        owner = pwd.getpwuid(0)[0]
        archive_name = self._call_in_dir(tmpdir, _make_tarball,
                                         base_name, 'dist', compress=None,
                                         owner=owner, group=group)

        # check if the tarball was created
        self.assertTrue(os.path.exists(archive_name))

        # now checks the rights
        archive = tarfile.open(archive_name)
        try:
            for member in archive.getmembers():
                self.assertEqual(member.uid, 0)
                self.assertEqual(member.gid, 0)
        finally:
            archive.close()

    def test_make_archive_cwd(self):
        # make_archive must restore the working directory even when the
        # archiver function raises.
        current_dir = os.getcwd()

        def _breaks(*args, **kw):
            raise RuntimeError()

        register_archive_format('xxx', _breaks, [], 'xxx file')
        try:
            try:
                make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
            except Exception:
                pass
            self.assertEqual(os.getcwd(), current_dir)
        finally:
            unregister_archive_format('xxx')

    def test_register_archive_format(self):
        # The archiver is only registered here, never called.
        archiver = lambda: None

        self.assertRaises(TypeError, register_archive_format, 'xxx', 1)
        self.assertRaises(TypeError, register_archive_format, 'xxx', archiver,
                          1)
        self.assertRaises(TypeError, register_archive_format, 'xxx', archiver,
                          [(1, 2), (1, 2, 3)])

        register_archive_format('xxx', archiver, [(1, 2)], 'xxx file')
        try:
            formats = [name for name, params in get_archive_formats()]
            self.assertIn('xxx', formats)
        finally:
            # Unregister even if the assertion failed, so the format
            # does not leak into other tests.
            unregister_archive_format('xxx')
        formats = [name for name, params in get_archive_formats()]
        self.assertNotIn('xxx', formats)
597 |
|
---|
598 |
|
---|
class TestMove(unittest.TestCase):
    """Tests for shutil.move() and the shutil._destinsrc() helper.

    setUp() creates a source directory holding one file and an empty
    destination directory.  It also tries to create a directory next to
    this test file, hoping that it lives on another filesystem than the
    system temporary directory; tests needing it are skipped when that
    directory could not be created.
    """

    def setUp(self):
        filename = "foo"
        self.src_dir = tempfile.mkdtemp()
        self.dst_dir = tempfile.mkdtemp()
        self.src_file = os.path.join(self.src_dir, filename)
        self.dst_file = os.path.join(self.dst_dir, filename)
        # Try to create a dir in the current directory, hoping that it is
        # not located on the same filesystem as the system tmp dir.
        try:
            self.dir_other_fs = tempfile.mkdtemp(
                dir=os.path.dirname(__file__))
        except OSError:
            # Always define both attributes, so tests can rely on them.
            self.dir_other_fs = None
            self.file_other_fs = None
        else:
            self.file_other_fs = os.path.join(self.dir_other_fs,
                                              filename)
        # The file is opened in binary mode, hence the bytes literal.
        with open(self.src_file, "wb") as f:
            f.write(b"spam")

    def tearDown(self):
        # Best-effort cleanup: a directory may already have been moved
        # away by the test.  ignore_errors keeps that tolerance without
        # swallowing KeyboardInterrupt or SystemExit.
        for d in (self.src_dir, self.dst_dir, self.dir_other_fs):
            if d:
                shutil.rmtree(d, ignore_errors=True)

    def _require_other_fs(self):
        """Skip the running test unless setUp() created dir_other_fs."""
        if not self.dir_other_fs:
            self.skipTest("could not create a directory next to the test "
                          "file")

    def _check_move_file(self, src, dst, real_dst):
        """Move the file src to dst and check that it arrived at real_dst.

        real_dst is the path where the content is expected afterwards;
        it differs from dst when dst is an existing directory.
        """
        with open(src, "rb") as f:
            contents = f.read()
        shutil.move(src, dst)
        with open(real_dst, "rb") as f:
            self.assertEqual(contents, f.read())
        self.assertFalse(os.path.exists(src))

    def _check_move_dir(self, src, dst, real_dst):
        """Move the directory src to dst and check the listing of real_dst.

        real_dst is the path where the directory is expected afterwards;
        it differs from dst when dst is an existing directory.
        """
        contents = sorted(os.listdir(src))
        shutil.move(src, dst)
        self.assertEqual(contents, sorted(os.listdir(real_dst)))
        self.assertFalse(os.path.exists(src))

    def test_move_file(self):
        # Move a file to another location on the same filesystem.
        self._check_move_file(self.src_file, self.dst_file, self.dst_file)

    def test_move_file_to_dir(self):
        # Move a file inside an existing dir on the same filesystem.
        self._check_move_file(self.src_file, self.dst_dir, self.dst_file)

    def test_move_file_other_fs(self):
        # Move a file to another location on another filesystem.
        self._require_other_fs()
        self._check_move_file(self.src_file, self.file_other_fs,
                              self.file_other_fs)

    def test_move_file_to_dir_other_fs(self):
        # Move a file to an existing dir on another filesystem.
        self._require_other_fs()
        self._check_move_file(self.src_file, self.dir_other_fs,
                              self.file_other_fs)

    def test_move_dir(self):
        # Move a dir to another location on the same filesystem.
        # mktemp() is used on purpose: the destination must not exist.
        dst_dir = tempfile.mktemp()
        try:
            self._check_move_dir(self.src_dir, dst_dir, dst_dir)
        finally:
            shutil.rmtree(dst_dir, ignore_errors=True)

    def test_move_dir_other_fs(self):
        # Move a dir to another location on another filesystem.
        self._require_other_fs()
        dst_dir = tempfile.mktemp(dir=self.dir_other_fs)
        try:
            self._check_move_dir(self.src_dir, dst_dir, dst_dir)
        finally:
            shutil.rmtree(dst_dir, ignore_errors=True)

    def test_move_dir_to_dir(self):
        # Move a dir inside an existing dir on the same filesystem.
        self._check_move_dir(self.src_dir, self.dst_dir,
            os.path.join(self.dst_dir, os.path.basename(self.src_dir)))

    def test_move_dir_to_dir_other_fs(self):
        # Move a dir inside an existing dir on another filesystem.
        self._require_other_fs()
        self._check_move_dir(self.src_dir, self.dir_other_fs,
            os.path.join(self.dir_other_fs, os.path.basename(self.src_dir)))

    def test_existing_file_inside_dest_dir(self):
        # A file with the same name inside the destination dir already exists.
        with open(self.dst_file, "wb"):
            pass
        self.assertRaises(shutil.Error, shutil.move, self.src_file,
                          self.dst_dir)

    def test_dont_move_dir_in_itself(self):
        # Moving a dir inside itself raises an Error.
        dst = os.path.join(self.src_dir, "bar")
        self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst)

    def test_destinsrc_false_negative(self):
        # dst lies inside src: _destinsrc() must report it.
        os.mkdir(TESTFN)
        try:
            for src, dst in [('srcdir', 'srcdir/dest')]:
                src = os.path.join(TESTFN, src)
                dst = os.path.join(TESTFN, dst)
                self.assertTrue(shutil._destinsrc(src, dst),
                                msg='_destinsrc() wrongly concluded that '
                                'dst (%s) is not in src (%s)' % (dst, src))
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)

    def test_destinsrc_false_positive(self):
        # dst merely shares a name prefix with src: it is not inside it.
        os.mkdir(TESTFN)
        try:
            for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]:
                src = os.path.join(TESTFN, src)
                dst = os.path.join(TESTFN, dst)
                self.assertFalse(shutil._destinsrc(src, dst),
                                 msg='_destinsrc() wrongly concluded that '
                                 'dst (%s) is in src (%s)' % (dst, src))
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)
737 |
|
---|
738 |
|
---|
class TestCopyFile(unittest.TestCase):
    """Tests for how shutil.copyfile() behaves when open/close fails.

    The tests install a replacement for open() as the attribute
    shutil.open and hand out Faux objects in place of real files;
    tearDown() removes that attribute again.
    """

    # Set to True on the instance once shutil.open has been installed,
    # so that tearDown() knows it has to delete it.
    _delete = False

    class Faux(object):
        """Stand-in for a file object that records context-manager use.

        raise_in_exit makes __exit__ raise IOError("Cannot close");
        suppress_at_exit is the value __exit__ returns otherwise.
        """
        _entered = False
        _exited_with = None
        _raised = False

        def __init__(self, raise_in_exit=False, suppress_at_exit=True):
            self._raise_in_exit = raise_in_exit
            self._suppress_at_exit = suppress_at_exit

        def read(self, *args):
            return ''

        def __enter__(self):
            # NOTE: nothing is returned, so "with ... as f" binds None.
            self._entered = True

        def __exit__(self, exc_type, exc_val, exc_tb):
            self._exited_with = exc_type, exc_val, exc_tb
            if self._raise_in_exit:
                self._raised = True
                raise IOError("Cannot close")
            return self._suppress_at_exit

    def tearDown(self):
        if self._delete:
            del shutil.open

    def _set_shutil_open(self, func):
        """Install func as shutil.open and arrange for its removal."""
        shutil.open = func
        self._delete = True

    def test_w_source_open_fails(self):
        def _open(filename, mode='r'):
            if filename == 'srcfile':
                raise IOError('Cannot open "srcfile"')
            # Explicit raise rather than "assert 0", which is stripped
            # when Python runs with -O.
            raise AssertionError("unexpected open of %r" % (filename,))

        self._set_shutil_open(_open)

        self.assertRaises(IOError, shutil.copyfile, 'srcfile', 'destfile')

    def test_w_dest_open_fails(self):

        srcfile = self.Faux()

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                raise IOError('Cannot open "destfile"')
            raise AssertionError("unexpected open of %r" % (filename,))

        self._set_shutil_open(_open)

        # srcfile.__exit__ returns True, so the IOError is suppressed
        # and copyfile() returns normally.
        shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertTrue(srcfile._exited_with[0] is IOError)
        self.assertEqual(srcfile._exited_with[1].args,
                         ('Cannot open "destfile"',))

    def test_w_dest_close_fails(self):

        srcfile = self.Faux()
        destfile = self.Faux(True)

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                return destfile
            raise AssertionError("unexpected open of %r" % (filename,))

        self._set_shutil_open(_open)

        # The IOError raised by destfile.__exit__ reaches
        # srcfile.__exit__, which suppresses it.
        shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertTrue(destfile._entered)
        self.assertTrue(destfile._raised)
        self.assertTrue(srcfile._exited_with[0] is IOError)
        self.assertEqual(srcfile._exited_with[1].args,
                         ('Cannot close',))

    def test_w_source_close_fails(self):

        srcfile = self.Faux(True)
        destfile = self.Faux()

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                return destfile
            raise AssertionError("unexpected open of %r" % (filename,))

        self._set_shutil_open(_open)

        # Nothing is left to suppress the IOError raised by
        # srcfile.__exit__, so it escapes from copyfile().
        self.assertRaises(IOError,
                          shutil.copyfile, 'srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertTrue(destfile._entered)
        self.assertFalse(destfile._raised)
        self.assertTrue(srcfile._exited_with[0] is None)
        self.assertTrue(srcfile._raised)

    def test_move_dir_caseinsensitive(self):
        # Renames a folder to the same name
        # but a different case.

        src_dir = tempfile.mkdtemp()
        dst_dir = os.path.join(
                os.path.dirname(src_dir),
                os.path.basename(src_dir).upper())

        try:
            self.assertNotEqual(src_dir, dst_dir)
            shutil.move(src_dir, dst_dir)
            self.assertTrue(os.path.isdir(dst_dir))
        finally:
            # Remove whichever name still exists: dst_dir after a
            # successful move, src_dir when the move failed.  On a
            # case-insensitive filesystem both names denote the same
            # directory and the first rmdir() removes it.
            for path in (dst_dir, src_dir):
                if os.path.exists(path):
                    os.rmdir(path)
858 |
|
---|
859 |
|
---|
860 |
|
---|
def test_main():
    """Run every test case class of this module through test_support."""
    test_cases = (TestShutil, TestMove, TestCopyFile)
    test_support.run_unittest(*test_cases)


if __name__ == '__main__':
    # Executed as a script: run the module's tests directly.
    test_main()