1 | # tempfile.py unit tests.
|
---|
2 | import tempfile
|
---|
3 | import errno
|
---|
4 | import io
|
---|
5 | import os
|
---|
6 | import signal
|
---|
7 | import shutil
|
---|
8 | import sys
|
---|
9 | import re
|
---|
10 | import warnings
|
---|
11 | import contextlib
|
---|
12 |
|
---|
13 | import unittest
|
---|
14 | from test import test_support as support
|
---|
15 |
|
---|
16 | warnings.filterwarnings("ignore",
|
---|
17 | category=RuntimeWarning,
|
---|
18 | message="mktemp", module=__name__)
|
---|
19 |
|
---|
20 | if hasattr(os, 'stat'):
|
---|
21 | import stat
|
---|
22 | has_stat = 1
|
---|
23 | else:
|
---|
24 | has_stat = 0
|
---|
25 |
|
---|
26 | has_textmode = (tempfile._text_openflags != tempfile._bin_openflags)
|
---|
27 | has_spawnl = hasattr(os, 'spawnl')
|
---|
28 |
|
---|
29 | # TEST_FILES may need to be tweaked for systems depending on the maximum
|
---|
30 | # number of files that can be opened at one time (see ulimit -n)
|
---|
31 | if sys.platform in ('openbsd3', 'openbsd4'):
|
---|
32 | TEST_FILES = 48
|
---|
33 | else:
|
---|
34 | TEST_FILES = 100
|
---|
35 |
|
---|
36 | # This is organized as one test for each chunk of code in tempfile.py,
|
---|
37 | # in order of their appearance in the file. Testing which requires
|
---|
38 | # threads is not done here.
|
---|
39 |
|
---|
40 | # Common functionality.
|
---|
41 | class TC(unittest.TestCase):
|
---|
42 |
|
---|
43 | str_check = re.compile(r"[a-zA-Z0-9_-]{6}$")
|
---|
44 |
|
---|
45 | def failOnException(self, what, ei=None):
|
---|
46 | if ei is None:
|
---|
47 | ei = sys.exc_info()
|
---|
48 | self.fail("%s raised %s: %s" % (what, ei[0], ei[1]))
|
---|
49 |
|
---|
50 | def nameCheck(self, name, dir, pre, suf):
|
---|
51 | (ndir, nbase) = os.path.split(name)
|
---|
52 | npre = nbase[:len(pre)]
|
---|
53 | nsuf = nbase[len(nbase)-len(suf):]
|
---|
54 |
|
---|
55 | # check for equality of the absolute paths!
|
---|
56 | self.assertEqual(os.path.abspath(ndir), os.path.abspath(dir),
|
---|
57 | "file '%s' not in directory '%s'" % (name, dir))
|
---|
58 | self.assertEqual(npre, pre,
|
---|
59 | "file '%s' does not begin with '%s'" % (nbase, pre))
|
---|
60 | self.assertEqual(nsuf, suf,
|
---|
61 | "file '%s' does not end with '%s'" % (nbase, suf))
|
---|
62 |
|
---|
63 | nbase = nbase[len(pre):len(nbase)-len(suf)]
|
---|
64 | self.assertTrue(self.str_check.match(nbase),
|
---|
65 | "random string '%s' does not match /^[a-zA-Z0-9_-]{6}$/"
|
---|
66 | % nbase)
|
---|
67 |
|
---|
68 | test_classes = []
|
---|
69 |
|
---|
70 | class test_exports(TC):
|
---|
71 | def test_exports(self):
|
---|
72 | # There are no surprising symbols in the tempfile module
|
---|
73 | dict = tempfile.__dict__
|
---|
74 |
|
---|
75 | expected = {
|
---|
76 | "NamedTemporaryFile" : 1,
|
---|
77 | "TemporaryFile" : 1,
|
---|
78 | "mkstemp" : 1,
|
---|
79 | "mkdtemp" : 1,
|
---|
80 | "mktemp" : 1,
|
---|
81 | "TMP_MAX" : 1,
|
---|
82 | "gettempprefix" : 1,
|
---|
83 | "gettempdir" : 1,
|
---|
84 | "tempdir" : 1,
|
---|
85 | "template" : 1,
|
---|
86 | "SpooledTemporaryFile" : 1
|
---|
87 | }
|
---|
88 |
|
---|
89 | unexp = []
|
---|
90 | for key in dict:
|
---|
91 | if key[0] != '_' and key not in expected:
|
---|
92 | unexp.append(key)
|
---|
93 | self.assertTrue(len(unexp) == 0,
|
---|
94 | "unexpected keys: %s" % unexp)
|
---|
95 |
|
---|
96 | test_classes.append(test_exports)
|
---|
97 |
|
---|
98 |
|
---|
99 | class test__RandomNameSequence(TC):
|
---|
100 | """Test the internal iterator object _RandomNameSequence."""
|
---|
101 |
|
---|
102 | def setUp(self):
|
---|
103 | self.r = tempfile._RandomNameSequence()
|
---|
104 |
|
---|
105 | def test_get_six_char_str(self):
|
---|
106 | # _RandomNameSequence returns a six-character string
|
---|
107 | s = self.r.next()
|
---|
108 | self.nameCheck(s, '', '', '')
|
---|
109 |
|
---|
110 | def test_many(self):
|
---|
111 | # _RandomNameSequence returns no duplicate strings (stochastic)
|
---|
112 |
|
---|
113 | dict = {}
|
---|
114 | r = self.r
|
---|
115 | for i in xrange(TEST_FILES):
|
---|
116 | s = r.next()
|
---|
117 | self.nameCheck(s, '', '', '')
|
---|
118 | self.assertNotIn(s, dict)
|
---|
119 | dict[s] = 1
|
---|
120 |
|
---|
121 | def test_supports_iter(self):
|
---|
122 | # _RandomNameSequence supports the iterator protocol
|
---|
123 |
|
---|
124 | i = 0
|
---|
125 | r = self.r
|
---|
126 | try:
|
---|
127 | for s in r:
|
---|
128 | i += 1
|
---|
129 | if i == 20:
|
---|
130 | break
|
---|
131 | except:
|
---|
132 | self.failOnException("iteration")
|
---|
133 |
|
---|
134 | @unittest.skipUnless(hasattr(os, 'fork'),
|
---|
135 | "os.fork is required for this test")
|
---|
136 | def test_process_awareness(self):
|
---|
137 | # ensure that the random source differs between
|
---|
138 | # child and parent.
|
---|
139 | read_fd, write_fd = os.pipe()
|
---|
140 | pid = None
|
---|
141 | try:
|
---|
142 | pid = os.fork()
|
---|
143 | if not pid:
|
---|
144 | os.close(read_fd)
|
---|
145 | os.write(write_fd, next(self.r).encode("ascii"))
|
---|
146 | os.close(write_fd)
|
---|
147 | # bypass the normal exit handlers- leave those to
|
---|
148 | # the parent.
|
---|
149 | os._exit(0)
|
---|
150 | parent_value = next(self.r)
|
---|
151 | child_value = os.read(read_fd, len(parent_value)).decode("ascii")
|
---|
152 | finally:
|
---|
153 | if pid:
|
---|
154 | # best effort to ensure the process can't bleed out
|
---|
155 | # via any bugs above
|
---|
156 | try:
|
---|
157 | os.kill(pid, signal.SIGKILL)
|
---|
158 | except EnvironmentError:
|
---|
159 | pass
|
---|
160 | os.close(read_fd)
|
---|
161 | os.close(write_fd)
|
---|
162 | self.assertNotEqual(child_value, parent_value)
|
---|
163 |
|
---|
164 |
|
---|
165 | test_classes.append(test__RandomNameSequence)
|
---|
166 |
|
---|
167 |
|
---|
168 | class test__candidate_tempdir_list(TC):
|
---|
169 | """Test the internal function _candidate_tempdir_list."""
|
---|
170 |
|
---|
171 | def test_nonempty_list(self):
|
---|
172 | # _candidate_tempdir_list returns a nonempty list of strings
|
---|
173 |
|
---|
174 | cand = tempfile._candidate_tempdir_list()
|
---|
175 |
|
---|
176 | self.assertFalse(len(cand) == 0)
|
---|
177 | for c in cand:
|
---|
178 | self.assertIsInstance(c, basestring)
|
---|
179 |
|
---|
180 | def test_wanted_dirs(self):
|
---|
181 | # _candidate_tempdir_list contains the expected directories
|
---|
182 |
|
---|
183 | # Make sure the interesting environment variables are all set.
|
---|
184 | with support.EnvironmentVarGuard() as env:
|
---|
185 | for envname in 'TMPDIR', 'TEMP', 'TMP':
|
---|
186 | dirname = os.getenv(envname)
|
---|
187 | if not dirname:
|
---|
188 | env[envname] = os.path.abspath(envname)
|
---|
189 |
|
---|
190 | cand = tempfile._candidate_tempdir_list()
|
---|
191 |
|
---|
192 | for envname in 'TMPDIR', 'TEMP', 'TMP':
|
---|
193 | dirname = os.getenv(envname)
|
---|
194 | if not dirname: raise ValueError
|
---|
195 | self.assertIn(dirname, cand)
|
---|
196 |
|
---|
197 | try:
|
---|
198 | dirname = os.getcwd()
|
---|
199 | except (AttributeError, os.error):
|
---|
200 | dirname = os.curdir
|
---|
201 |
|
---|
202 | self.assertIn(dirname, cand)
|
---|
203 |
|
---|
204 | # Not practical to try to verify the presence of OS-specific
|
---|
205 | # paths in this list.
|
---|
206 |
|
---|
207 | test_classes.append(test__candidate_tempdir_list)
|
---|
208 |
|
---|
209 | # We test _get_default_tempdir some more by testing gettempdir.
|
---|
210 |
|
---|
211 | class TestGetDefaultTempdir(TC):
|
---|
212 | """Test _get_default_tempdir()."""
|
---|
213 |
|
---|
214 | def test_no_files_left_behind(self):
|
---|
215 | # use a private empty directory
|
---|
216 | our_temp_directory = tempfile.mkdtemp()
|
---|
217 | try:
|
---|
218 | # force _get_default_tempdir() to consider our empty directory
|
---|
219 | def our_candidate_list():
|
---|
220 | return [our_temp_directory]
|
---|
221 |
|
---|
222 | with support.swap_attr(tempfile, "_candidate_tempdir_list",
|
---|
223 | our_candidate_list):
|
---|
224 | # verify our directory is empty after _get_default_tempdir()
|
---|
225 | tempfile._get_default_tempdir()
|
---|
226 | self.assertEqual(os.listdir(our_temp_directory), [])
|
---|
227 |
|
---|
228 | def raise_OSError(*args, **kwargs):
|
---|
229 | raise OSError(-1)
|
---|
230 |
|
---|
231 | with support.swap_attr(io, "open", raise_OSError):
|
---|
232 | # test again with failing io.open()
|
---|
233 | with self.assertRaises(IOError) as cm:
|
---|
234 | tempfile._get_default_tempdir()
|
---|
235 | self.assertEqual(cm.exception.errno, errno.ENOENT)
|
---|
236 | self.assertEqual(os.listdir(our_temp_directory), [])
|
---|
237 |
|
---|
238 | open = io.open
|
---|
239 | def bad_writer(*args, **kwargs):
|
---|
240 | fp = open(*args, **kwargs)
|
---|
241 | fp.write = raise_OSError
|
---|
242 | return fp
|
---|
243 |
|
---|
244 | with support.swap_attr(io, "open", bad_writer):
|
---|
245 | # test again with failing write()
|
---|
246 | with self.assertRaises(IOError) as cm:
|
---|
247 | tempfile._get_default_tempdir()
|
---|
248 | self.assertEqual(cm.exception.errno, errno.ENOENT)
|
---|
249 | self.assertEqual(os.listdir(our_temp_directory), [])
|
---|
250 | finally:
|
---|
251 | shutil.rmtree(our_temp_directory)
|
---|
252 |
|
---|
253 | test_classes.append(TestGetDefaultTempdir)
|
---|
254 |
|
---|
255 |
|
---|
256 | class test__get_candidate_names(TC):
|
---|
257 | """Test the internal function _get_candidate_names."""
|
---|
258 |
|
---|
259 | def test_retval(self):
|
---|
260 | # _get_candidate_names returns a _RandomNameSequence object
|
---|
261 | obj = tempfile._get_candidate_names()
|
---|
262 | self.assertIsInstance(obj, tempfile._RandomNameSequence)
|
---|
263 |
|
---|
264 | def test_same_thing(self):
|
---|
265 | # _get_candidate_names always returns the same object
|
---|
266 | a = tempfile._get_candidate_names()
|
---|
267 | b = tempfile._get_candidate_names()
|
---|
268 |
|
---|
269 | self.assertTrue(a is b)
|
---|
270 |
|
---|
271 | test_classes.append(test__get_candidate_names)
|
---|
272 |
|
---|
273 |
|
---|
274 | @contextlib.contextmanager
|
---|
275 | def _inside_empty_temp_dir():
|
---|
276 | dir = tempfile.mkdtemp()
|
---|
277 | try:
|
---|
278 | with support.swap_attr(tempfile, 'tempdir', dir):
|
---|
279 | yield
|
---|
280 | finally:
|
---|
281 | support.rmtree(dir)
|
---|
282 |
|
---|
283 |
|
---|
284 | def _mock_candidate_names(*names):
|
---|
285 | return support.swap_attr(tempfile,
|
---|
286 | '_get_candidate_names',
|
---|
287 | lambda: iter(names))
|
---|
288 |
|
---|
289 |
|
---|
290 | class test__mkstemp_inner(TC):
|
---|
291 | """Test the internal function _mkstemp_inner."""
|
---|
292 |
|
---|
293 | class mkstemped:
|
---|
294 | _bflags = tempfile._bin_openflags
|
---|
295 | _tflags = tempfile._text_openflags
|
---|
296 | _close = os.close
|
---|
297 | _unlink = os.unlink
|
---|
298 |
|
---|
299 | def __init__(self, dir, pre, suf, bin):
|
---|
300 | if bin: flags = self._bflags
|
---|
301 | else: flags = self._tflags
|
---|
302 |
|
---|
303 | (self.fd, self.name) = tempfile._mkstemp_inner(dir, pre, suf, flags)
|
---|
304 |
|
---|
305 | def write(self, str):
|
---|
306 | os.write(self.fd, str)
|
---|
307 |
|
---|
308 | def __del__(self):
|
---|
309 | self._close(self.fd)
|
---|
310 | self._unlink(self.name)
|
---|
311 |
|
---|
312 | def do_create(self, dir=None, pre="", suf="", bin=1):
|
---|
313 | if dir is None:
|
---|
314 | dir = tempfile.gettempdir()
|
---|
315 | try:
|
---|
316 | file = self.mkstemped(dir, pre, suf, bin)
|
---|
317 | except:
|
---|
318 | self.failOnException("_mkstemp_inner")
|
---|
319 |
|
---|
320 | self.nameCheck(file.name, dir, pre, suf)
|
---|
321 | return file
|
---|
322 |
|
---|
323 | def test_basic(self):
|
---|
324 | # _mkstemp_inner can create files
|
---|
325 | self.do_create().write("blat")
|
---|
326 | self.do_create(pre="a").write("blat")
|
---|
327 | self.do_create(suf="b").write("blat")
|
---|
328 | self.do_create(pre="a", suf="b").write("blat")
|
---|
329 | self.do_create(pre="aa", suf=".txt").write("blat")
|
---|
330 |
|
---|
331 | def test_basic_many(self):
|
---|
332 | # _mkstemp_inner can create many files (stochastic)
|
---|
333 | extant = range(TEST_FILES)
|
---|
334 | for i in extant:
|
---|
335 | extant[i] = self.do_create(pre="aa")
|
---|
336 |
|
---|
337 | def test_choose_directory(self):
|
---|
338 | # _mkstemp_inner can create files in a user-selected directory
|
---|
339 | dir = tempfile.mkdtemp()
|
---|
340 | try:
|
---|
341 | self.do_create(dir=dir).write("blat")
|
---|
342 | finally:
|
---|
343 | os.rmdir(dir)
|
---|
344 |
|
---|
345 | def test_file_mode(self):
|
---|
346 | # _mkstemp_inner creates files with the proper mode
|
---|
347 | if not has_stat:
|
---|
348 | return # ugh, can't use SkipTest.
|
---|
349 |
|
---|
350 | file = self.do_create()
|
---|
351 | mode = stat.S_IMODE(os.stat(file.name).st_mode)
|
---|
352 | expected = 0600
|
---|
353 | if sys.platform in ('win32', 'os2emx', 'os2knix'):
|
---|
354 | # There's no distinction among 'user', 'group' and 'world';
|
---|
355 | # replicate the 'user' bits.
|
---|
356 | user = expected >> 6
|
---|
357 | expected = user * (1 + 8 + 64)
|
---|
358 | self.assertEqual(mode, expected)
|
---|
359 |
|
---|
360 | def test_noinherit(self):
|
---|
361 | # _mkstemp_inner file handles are not inherited by child processes
|
---|
362 | if not has_spawnl:
|
---|
363 | return # ugh, can't use SkipTest.
|
---|
364 |
|
---|
365 | if support.verbose:
|
---|
366 | v="v"
|
---|
367 | else:
|
---|
368 | v="q"
|
---|
369 |
|
---|
370 | file = self.do_create()
|
---|
371 | fd = "%d" % file.fd
|
---|
372 |
|
---|
373 | try:
|
---|
374 | me = __file__
|
---|
375 | except NameError:
|
---|
376 | me = sys.argv[0]
|
---|
377 |
|
---|
378 | # We have to exec something, so that FD_CLOEXEC will take
|
---|
379 | # effect. The core of this test is therefore in
|
---|
380 | # tf_inherit_check.py, which see.
|
---|
381 | tester = os.path.join(os.path.dirname(os.path.abspath(me)),
|
---|
382 | "tf_inherit_check.py")
|
---|
383 |
|
---|
384 | # On Windows a spawn* /path/ with embedded spaces shouldn't be quoted,
|
---|
385 | # but an arg with embedded spaces should be decorated with double
|
---|
386 | # quotes on each end
|
---|
387 | if sys.platform in ('win32',):
|
---|
388 | decorated = '"%s"' % sys.executable
|
---|
389 | tester = '"%s"' % tester
|
---|
390 | else:
|
---|
391 | decorated = sys.executable
|
---|
392 |
|
---|
393 | retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd)
|
---|
394 | self.assertFalse(retval < 0,
|
---|
395 | "child process caught fatal signal %d" % -retval)
|
---|
396 | self.assertFalse(retval > 0, "child process reports failure %d"%retval)
|
---|
397 |
|
---|
398 | def test_textmode(self):
|
---|
399 | # _mkstemp_inner can create files in text mode
|
---|
400 | if not has_textmode:
|
---|
401 | return # ugh, can't use SkipTest.
|
---|
402 |
|
---|
403 | self.do_create(bin=0).write("blat\n")
|
---|
404 | # XXX should test that the file really is a text file
|
---|
405 |
|
---|
406 | def default_mkstemp_inner(self):
|
---|
407 | return tempfile._mkstemp_inner(tempfile.gettempdir(),
|
---|
408 | tempfile.template,
|
---|
409 | '',
|
---|
410 | tempfile._bin_openflags)
|
---|
411 |
|
---|
412 | def test_collision_with_existing_file(self):
|
---|
413 | # _mkstemp_inner tries another name when a file with
|
---|
414 | # the chosen name already exists
|
---|
415 | with _inside_empty_temp_dir(), \
|
---|
416 | _mock_candidate_names('aaa', 'aaa', 'bbb'):
|
---|
417 | (fd1, name1) = self.default_mkstemp_inner()
|
---|
418 | os.close(fd1)
|
---|
419 | self.assertTrue(name1.endswith('aaa'))
|
---|
420 |
|
---|
421 | (fd2, name2) = self.default_mkstemp_inner()
|
---|
422 | os.close(fd2)
|
---|
423 | self.assertTrue(name2.endswith('bbb'))
|
---|
424 |
|
---|
425 | def test_collision_with_existing_directory(self):
|
---|
426 | # _mkstemp_inner tries another name when a directory with
|
---|
427 | # the chosen name already exists
|
---|
428 | with _inside_empty_temp_dir(), \
|
---|
429 | _mock_candidate_names('aaa', 'aaa', 'bbb'):
|
---|
430 | dir = tempfile.mkdtemp()
|
---|
431 | self.assertTrue(dir.endswith('aaa'))
|
---|
432 |
|
---|
433 | (fd, name) = self.default_mkstemp_inner()
|
---|
434 | os.close(fd)
|
---|
435 | self.assertTrue(name.endswith('bbb'))
|
---|
436 |
|
---|
437 | test_classes.append(test__mkstemp_inner)
|
---|
438 |
|
---|
439 |
|
---|
440 | class test_gettempprefix(TC):
|
---|
441 | """Test gettempprefix()."""
|
---|
442 |
|
---|
443 | def test_sane_template(self):
|
---|
444 | # gettempprefix returns a nonempty prefix string
|
---|
445 | p = tempfile.gettempprefix()
|
---|
446 |
|
---|
447 | self.assertIsInstance(p, basestring)
|
---|
448 | self.assertTrue(len(p) > 0)
|
---|
449 |
|
---|
450 | def test_usable_template(self):
|
---|
451 | # gettempprefix returns a usable prefix string
|
---|
452 |
|
---|
453 | # Create a temp directory, avoiding use of the prefix.
|
---|
454 | # Then attempt to create a file whose name is
|
---|
455 | # prefix + 'xxxxxx.xxx' in that directory.
|
---|
456 | p = tempfile.gettempprefix() + "xxxxxx.xxx"
|
---|
457 | d = tempfile.mkdtemp(prefix="")
|
---|
458 | try:
|
---|
459 | p = os.path.join(d, p)
|
---|
460 | try:
|
---|
461 | fd = os.open(p, os.O_RDWR | os.O_CREAT)
|
---|
462 | except:
|
---|
463 | self.failOnException("os.open")
|
---|
464 | os.close(fd)
|
---|
465 | os.unlink(p)
|
---|
466 | finally:
|
---|
467 | os.rmdir(d)
|
---|
468 |
|
---|
469 | test_classes.append(test_gettempprefix)
|
---|
470 |
|
---|
471 |
|
---|
472 | class test_gettempdir(TC):
|
---|
473 | """Test gettempdir()."""
|
---|
474 |
|
---|
475 | def test_directory_exists(self):
|
---|
476 | # gettempdir returns a directory which exists
|
---|
477 |
|
---|
478 | dir = tempfile.gettempdir()
|
---|
479 | self.assertTrue(os.path.isabs(dir) or dir == os.curdir,
|
---|
480 | "%s is not an absolute path" % dir)
|
---|
481 | self.assertTrue(os.path.isdir(dir),
|
---|
482 | "%s is not a directory" % dir)
|
---|
483 |
|
---|
484 | def test_directory_writable(self):
|
---|
485 | # gettempdir returns a directory writable by the user
|
---|
486 |
|
---|
487 | # sneaky: just instantiate a NamedTemporaryFile, which
|
---|
488 | # defaults to writing into the directory returned by
|
---|
489 | # gettempdir.
|
---|
490 | try:
|
---|
491 | file = tempfile.NamedTemporaryFile()
|
---|
492 | file.write("blat")
|
---|
493 | file.close()
|
---|
494 | except:
|
---|
495 | self.failOnException("create file in %s" % tempfile.gettempdir())
|
---|
496 |
|
---|
497 | def test_same_thing(self):
|
---|
498 | # gettempdir always returns the same object
|
---|
499 | a = tempfile.gettempdir()
|
---|
500 | b = tempfile.gettempdir()
|
---|
501 |
|
---|
502 | self.assertTrue(a is b)
|
---|
503 |
|
---|
504 | test_classes.append(test_gettempdir)
|
---|
505 |
|
---|
506 |
|
---|
507 | class test_mkstemp(TC):
|
---|
508 | """Test mkstemp()."""
|
---|
509 |
|
---|
510 | def do_create(self, dir=None, pre="", suf=""):
|
---|
511 | if dir is None:
|
---|
512 | dir = tempfile.gettempdir()
|
---|
513 | try:
|
---|
514 | (fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf)
|
---|
515 | (ndir, nbase) = os.path.split(name)
|
---|
516 | adir = os.path.abspath(dir)
|
---|
517 | self.assertEqual(adir, ndir,
|
---|
518 | "Directory '%s' incorrectly returned as '%s'" % (adir, ndir))
|
---|
519 | except:
|
---|
520 | self.failOnException("mkstemp")
|
---|
521 |
|
---|
522 | try:
|
---|
523 | self.nameCheck(name, dir, pre, suf)
|
---|
524 | finally:
|
---|
525 | os.close(fd)
|
---|
526 | os.unlink(name)
|
---|
527 |
|
---|
528 | def test_basic(self):
|
---|
529 | # mkstemp can create files
|
---|
530 | self.do_create()
|
---|
531 | self.do_create(pre="a")
|
---|
532 | self.do_create(suf="b")
|
---|
533 | self.do_create(pre="a", suf="b")
|
---|
534 | self.do_create(pre="aa", suf=".txt")
|
---|
535 | self.do_create(dir=".")
|
---|
536 |
|
---|
537 | def test_choose_directory(self):
|
---|
538 | # mkstemp can create directories in a user-selected directory
|
---|
539 | dir = tempfile.mkdtemp()
|
---|
540 | try:
|
---|
541 | self.do_create(dir=dir)
|
---|
542 | finally:
|
---|
543 | os.rmdir(dir)
|
---|
544 |
|
---|
545 | test_classes.append(test_mkstemp)
|
---|
546 |
|
---|
547 |
|
---|
548 | class test_mkdtemp(TC):
|
---|
549 | """Test mkdtemp()."""
|
---|
550 |
|
---|
551 | def do_create(self, dir=None, pre="", suf=""):
|
---|
552 | if dir is None:
|
---|
553 | dir = tempfile.gettempdir()
|
---|
554 | try:
|
---|
555 | name = tempfile.mkdtemp(dir=dir, prefix=pre, suffix=suf)
|
---|
556 | except:
|
---|
557 | self.failOnException("mkdtemp")
|
---|
558 |
|
---|
559 | try:
|
---|
560 | self.nameCheck(name, dir, pre, suf)
|
---|
561 | return name
|
---|
562 | except:
|
---|
563 | os.rmdir(name)
|
---|
564 | raise
|
---|
565 |
|
---|
566 | def test_basic(self):
|
---|
567 | # mkdtemp can create directories
|
---|
568 | os.rmdir(self.do_create())
|
---|
569 | os.rmdir(self.do_create(pre="a"))
|
---|
570 | os.rmdir(self.do_create(suf="b"))
|
---|
571 | os.rmdir(self.do_create(pre="a", suf="b"))
|
---|
572 | os.rmdir(self.do_create(pre="aa", suf=".txt"))
|
---|
573 |
|
---|
574 | def test_basic_many(self):
|
---|
575 | # mkdtemp can create many directories (stochastic)
|
---|
576 | extant = range(TEST_FILES)
|
---|
577 | try:
|
---|
578 | for i in extant:
|
---|
579 | extant[i] = self.do_create(pre="aa")
|
---|
580 | finally:
|
---|
581 | for i in extant:
|
---|
582 | if(isinstance(i, basestring)):
|
---|
583 | os.rmdir(i)
|
---|
584 |
|
---|
585 | def test_choose_directory(self):
|
---|
586 | # mkdtemp can create directories in a user-selected directory
|
---|
587 | dir = tempfile.mkdtemp()
|
---|
588 | try:
|
---|
589 | os.rmdir(self.do_create(dir=dir))
|
---|
590 | finally:
|
---|
591 | os.rmdir(dir)
|
---|
592 |
|
---|
593 | def test_mode(self):
|
---|
594 | # mkdtemp creates directories with the proper mode
|
---|
595 | if not has_stat:
|
---|
596 | return # ugh, can't use SkipTest.
|
---|
597 |
|
---|
598 | dir = self.do_create()
|
---|
599 | try:
|
---|
600 | mode = stat.S_IMODE(os.stat(dir).st_mode)
|
---|
601 | mode &= 0777 # Mask off sticky bits inherited from /tmp
|
---|
602 | expected = 0700
|
---|
603 | if sys.platform in ('win32', 'os2emx', 'os2knix'):
|
---|
604 | # There's no distinction among 'user', 'group' and 'world';
|
---|
605 | # replicate the 'user' bits.
|
---|
606 | user = expected >> 6
|
---|
607 | expected = user * (1 + 8 + 64)
|
---|
608 | self.assertEqual(mode, expected)
|
---|
609 | finally:
|
---|
610 | os.rmdir(dir)
|
---|
611 |
|
---|
612 | def test_collision_with_existing_file(self):
|
---|
613 | # mkdtemp tries another name when a file with
|
---|
614 | # the chosen name already exists
|
---|
615 | with _inside_empty_temp_dir(), \
|
---|
616 | _mock_candidate_names('aaa', 'aaa', 'bbb'):
|
---|
617 | file = tempfile.NamedTemporaryFile(delete=False)
|
---|
618 | file.close()
|
---|
619 | self.assertTrue(file.name.endswith('aaa'))
|
---|
620 | dir = tempfile.mkdtemp()
|
---|
621 | self.assertTrue(dir.endswith('bbb'))
|
---|
622 |
|
---|
623 | def test_collision_with_existing_directory(self):
|
---|
624 | # mkdtemp tries another name when a directory with
|
---|
625 | # the chosen name already exists
|
---|
626 | with _inside_empty_temp_dir(), \
|
---|
627 | _mock_candidate_names('aaa', 'aaa', 'bbb'):
|
---|
628 | dir1 = tempfile.mkdtemp()
|
---|
629 | self.assertTrue(dir1.endswith('aaa'))
|
---|
630 | dir2 = tempfile.mkdtemp()
|
---|
631 | self.assertTrue(dir2.endswith('bbb'))
|
---|
632 |
|
---|
633 | test_classes.append(test_mkdtemp)
|
---|
634 |
|
---|
635 |
|
---|
636 | class test_mktemp(TC):
|
---|
637 | """Test mktemp()."""
|
---|
638 |
|
---|
639 | # For safety, all use of mktemp must occur in a private directory.
|
---|
640 | # We must also suppress the RuntimeWarning it generates.
|
---|
641 | def setUp(self):
|
---|
642 | self.dir = tempfile.mkdtemp()
|
---|
643 |
|
---|
644 | def tearDown(self):
|
---|
645 | if self.dir:
|
---|
646 | os.rmdir(self.dir)
|
---|
647 | self.dir = None
|
---|
648 |
|
---|
649 | class mktemped:
|
---|
650 | _unlink = os.unlink
|
---|
651 | _bflags = tempfile._bin_openflags
|
---|
652 |
|
---|
653 | def __init__(self, dir, pre, suf):
|
---|
654 | self.name = tempfile.mktemp(dir=dir, prefix=pre, suffix=suf)
|
---|
655 | # Create the file. This will raise an exception if it's
|
---|
656 | # mysteriously appeared in the meanwhile.
|
---|
657 | os.close(os.open(self.name, self._bflags, 0600))
|
---|
658 |
|
---|
659 | def __del__(self):
|
---|
660 | self._unlink(self.name)
|
---|
661 |
|
---|
662 | def do_create(self, pre="", suf=""):
|
---|
663 | try:
|
---|
664 | file = self.mktemped(self.dir, pre, suf)
|
---|
665 | except:
|
---|
666 | self.failOnException("mktemp")
|
---|
667 |
|
---|
668 | self.nameCheck(file.name, self.dir, pre, suf)
|
---|
669 | return file
|
---|
670 |
|
---|
671 | def test_basic(self):
|
---|
672 | # mktemp can choose usable file names
|
---|
673 | self.do_create()
|
---|
674 | self.do_create(pre="a")
|
---|
675 | self.do_create(suf="b")
|
---|
676 | self.do_create(pre="a", suf="b")
|
---|
677 | self.do_create(pre="aa", suf=".txt")
|
---|
678 |
|
---|
679 | def test_many(self):
|
---|
680 | # mktemp can choose many usable file names (stochastic)
|
---|
681 | extant = range(TEST_FILES)
|
---|
682 | for i in extant:
|
---|
683 | extant[i] = self.do_create(pre="aa")
|
---|
684 |
|
---|
685 | ## def test_warning(self):
|
---|
686 | ## # mktemp issues a warning when used
|
---|
687 | ## warnings.filterwarnings("error",
|
---|
688 | ## category=RuntimeWarning,
|
---|
689 | ## message="mktemp")
|
---|
690 | ## self.assertRaises(RuntimeWarning,
|
---|
691 | ## tempfile.mktemp, dir=self.dir)
|
---|
692 |
|
---|
693 | test_classes.append(test_mktemp)
|
---|
694 |
|
---|
695 |
|
---|
696 | # We test _TemporaryFileWrapper by testing NamedTemporaryFile.
|
---|
697 |
|
---|
698 |
|
---|
699 | class test_NamedTemporaryFile(TC):
|
---|
700 | """Test NamedTemporaryFile()."""
|
---|
701 |
|
---|
702 | def do_create(self, dir=None, pre="", suf="", delete=True):
|
---|
703 | if dir is None:
|
---|
704 | dir = tempfile.gettempdir()
|
---|
705 | try:
|
---|
706 | file = tempfile.NamedTemporaryFile(dir=dir, prefix=pre, suffix=suf,
|
---|
707 | delete=delete)
|
---|
708 | except:
|
---|
709 | self.failOnException("NamedTemporaryFile")
|
---|
710 |
|
---|
711 | self.nameCheck(file.name, dir, pre, suf)
|
---|
712 | return file
|
---|
713 |
|
---|
714 |
|
---|
715 | def test_basic(self):
|
---|
716 | # NamedTemporaryFile can create files
|
---|
717 | self.do_create()
|
---|
718 | self.do_create(pre="a")
|
---|
719 | self.do_create(suf="b")
|
---|
720 | self.do_create(pre="a", suf="b")
|
---|
721 | self.do_create(pre="aa", suf=".txt")
|
---|
722 |
|
---|
723 | def test_creates_named(self):
|
---|
724 | # NamedTemporaryFile creates files with names
|
---|
725 | f = tempfile.NamedTemporaryFile()
|
---|
726 | self.assertTrue(os.path.exists(f.name),
|
---|
727 | "NamedTemporaryFile %s does not exist" % f.name)
|
---|
728 |
|
---|
729 | def test_del_on_close(self):
|
---|
730 | # A NamedTemporaryFile is deleted when closed
|
---|
731 | dir = tempfile.mkdtemp()
|
---|
732 | try:
|
---|
733 | f = tempfile.NamedTemporaryFile(dir=dir)
|
---|
734 | f.write('blat')
|
---|
735 | f.close()
|
---|
736 | self.assertFalse(os.path.exists(f.name),
|
---|
737 | "NamedTemporaryFile %s exists after close" % f.name)
|
---|
738 | finally:
|
---|
739 | os.rmdir(dir)
|
---|
740 |
|
---|
741 | def test_dis_del_on_close(self):
|
---|
742 | # Tests that delete-on-close can be disabled
|
---|
743 | dir = tempfile.mkdtemp()
|
---|
744 | tmp = None
|
---|
745 | try:
|
---|
746 | f = tempfile.NamedTemporaryFile(dir=dir, delete=False)
|
---|
747 | tmp = f.name
|
---|
748 | f.write('blat')
|
---|
749 | f.close()
|
---|
750 | self.assertTrue(os.path.exists(f.name),
|
---|
751 | "NamedTemporaryFile %s missing after close" % f.name)
|
---|
752 | finally:
|
---|
753 | if tmp is not None:
|
---|
754 | os.unlink(tmp)
|
---|
755 | os.rmdir(dir)
|
---|
756 |
|
---|
757 | def test_multiple_close(self):
|
---|
758 | # A NamedTemporaryFile can be closed many times without error
|
---|
759 | f = tempfile.NamedTemporaryFile()
|
---|
760 | f.write('abc\n')
|
---|
761 | f.close()
|
---|
762 | try:
|
---|
763 | f.close()
|
---|
764 | f.close()
|
---|
765 | except:
|
---|
766 | self.failOnException("close")
|
---|
767 |
|
---|
768 | def test_context_manager(self):
|
---|
769 | # A NamedTemporaryFile can be used as a context manager
|
---|
770 | with tempfile.NamedTemporaryFile() as f:
|
---|
771 | self.assertTrue(os.path.exists(f.name))
|
---|
772 | self.assertFalse(os.path.exists(f.name))
|
---|
773 | def use_closed():
|
---|
774 | with f:
|
---|
775 | pass
|
---|
776 | self.assertRaises(ValueError, use_closed)
|
---|
777 |
|
---|
778 | # How to test the mode and bufsize parameters?
|
---|
779 |
|
---|
780 | test_classes.append(test_NamedTemporaryFile)
|
---|
781 |
|
---|
782 | class test_SpooledTemporaryFile(TC):
|
---|
783 | """Test SpooledTemporaryFile()."""
|
---|
784 |
|
---|
785 | def do_create(self, max_size=0, dir=None, pre="", suf=""):
|
---|
786 | if dir is None:
|
---|
787 | dir = tempfile.gettempdir()
|
---|
788 | try:
|
---|
789 | file = tempfile.SpooledTemporaryFile(max_size=max_size, dir=dir, prefix=pre, suffix=suf)
|
---|
790 | except:
|
---|
791 | self.failOnException("SpooledTemporaryFile")
|
---|
792 |
|
---|
793 | return file
|
---|
794 |
|
---|
795 |
|
---|
796 | def test_basic(self):
|
---|
797 | # SpooledTemporaryFile can create files
|
---|
798 | f = self.do_create()
|
---|
799 | self.assertFalse(f._rolled)
|
---|
800 | f = self.do_create(max_size=100, pre="a", suf=".txt")
|
---|
801 | self.assertFalse(f._rolled)
|
---|
802 |
|
---|
803 | def test_del_on_close(self):
|
---|
804 | # A SpooledTemporaryFile is deleted when closed
|
---|
805 | dir = tempfile.mkdtemp()
|
---|
806 | try:
|
---|
807 | f = tempfile.SpooledTemporaryFile(max_size=10, dir=dir)
|
---|
808 | self.assertFalse(f._rolled)
|
---|
809 | f.write('blat ' * 5)
|
---|
810 | self.assertTrue(f._rolled)
|
---|
811 | filename = f.name
|
---|
812 | f.close()
|
---|
813 | self.assertFalse(os.path.exists(filename),
|
---|
814 | "SpooledTemporaryFile %s exists after close" % filename)
|
---|
815 | finally:
|
---|
816 | os.rmdir(dir)
|
---|
817 |
|
---|
818 | def test_rewrite_small(self):
|
---|
819 | # A SpooledTemporaryFile can be written to multiple within the max_size
|
---|
820 | f = self.do_create(max_size=30)
|
---|
821 | self.assertFalse(f._rolled)
|
---|
822 | for i in range(5):
|
---|
823 | f.seek(0, 0)
|
---|
824 | f.write('x' * 20)
|
---|
825 | self.assertFalse(f._rolled)
|
---|
826 |
|
---|
827 | def test_write_sequential(self):
|
---|
828 | # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
|
---|
829 | # over afterward
|
---|
830 | f = self.do_create(max_size=30)
|
---|
831 | self.assertFalse(f._rolled)
|
---|
832 | f.write('x' * 20)
|
---|
833 | self.assertFalse(f._rolled)
|
---|
834 | f.write('x' * 10)
|
---|
835 | self.assertFalse(f._rolled)
|
---|
836 | f.write('x')
|
---|
837 | self.assertTrue(f._rolled)
|
---|
838 |
|
---|
839 | def test_writelines(self):
|
---|
840 | # Verify writelines with a SpooledTemporaryFile
|
---|
841 | f = self.do_create()
|
---|
842 | f.writelines((b'x', b'y', b'z'))
|
---|
843 | f.seek(0)
|
---|
844 | buf = f.read()
|
---|
845 | self.assertEqual(buf, b'xyz')
|
---|
846 |
|
---|
847 | def test_writelines_sequential(self):
|
---|
848 | # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
|
---|
849 | # over afterward
|
---|
850 | f = self.do_create(max_size=35)
|
---|
851 | f.writelines((b'x' * 20, b'x' * 10, b'x' * 5))
|
---|
852 | self.assertFalse(f._rolled)
|
---|
853 | f.write(b'x')
|
---|
854 | self.assertTrue(f._rolled)
|
---|
855 |
|
---|
856 | def test_xreadlines(self):
|
---|
857 | f = self.do_create(max_size=20)
|
---|
858 | f.write(b'abc\n' * 5)
|
---|
859 | f.seek(0)
|
---|
860 | self.assertFalse(f._rolled)
|
---|
861 | self.assertEqual(list(f.xreadlines()), [b'abc\n'] * 5)
|
---|
862 | f.write(b'x\ny')
|
---|
863 | self.assertTrue(f._rolled)
|
---|
864 | f.seek(0)
|
---|
865 | self.assertEqual(list(f.xreadlines()), [b'abc\n'] * 5 + [b'x\n', b'y'])
|
---|
866 |
|
---|
867 | def test_sparse(self):
|
---|
868 | # A SpooledTemporaryFile that is written late in the file will extend
|
---|
869 | # when that occurs
|
---|
870 | f = self.do_create(max_size=30)
|
---|
871 | self.assertFalse(f._rolled)
|
---|
872 | f.seek(100, 0)
|
---|
873 | self.assertFalse(f._rolled)
|
---|
874 | f.write('x')
|
---|
875 | self.assertTrue(f._rolled)
|
---|
876 |
|
---|
877 | def test_fileno(self):
|
---|
878 | # A SpooledTemporaryFile should roll over to a real file on fileno()
|
---|
879 | f = self.do_create(max_size=30)
|
---|
880 | self.assertFalse(f._rolled)
|
---|
881 | self.assertTrue(f.fileno() > 0)
|
---|
882 | self.assertTrue(f._rolled)
|
---|
883 |
|
---|
884 | def test_multiple_close_before_rollover(self):
|
---|
885 | # A SpooledTemporaryFile can be closed many times without error
|
---|
886 | f = tempfile.SpooledTemporaryFile()
|
---|
887 | f.write('abc\n')
|
---|
888 | self.assertFalse(f._rolled)
|
---|
889 | f.close()
|
---|
890 | try:
|
---|
891 | f.close()
|
---|
892 | f.close()
|
---|
893 | except:
|
---|
894 | self.failOnException("close")
|
---|
895 |
|
---|
896 | def test_multiple_close_after_rollover(self):
|
---|
897 | # A SpooledTemporaryFile can be closed many times without error
|
---|
898 | f = tempfile.SpooledTemporaryFile(max_size=1)
|
---|
899 | f.write('abc\n')
|
---|
900 | self.assertTrue(f._rolled)
|
---|
901 | f.close()
|
---|
902 | try:
|
---|
903 | f.close()
|
---|
904 | f.close()
|
---|
905 | except:
|
---|
906 | self.failOnException("close")
|
---|
907 |
|
---|
908 | def test_bound_methods(self):
|
---|
909 | # It should be OK to steal a bound method from a SpooledTemporaryFile
|
---|
910 | # and use it independently; when the file rolls over, those bound
|
---|
911 | # methods should continue to function
|
---|
912 | f = self.do_create(max_size=30)
|
---|
913 | read = f.read
|
---|
914 | write = f.write
|
---|
915 | seek = f.seek
|
---|
916 |
|
---|
917 | write("a" * 35)
|
---|
918 | write("b" * 35)
|
---|
919 | seek(0, 0)
|
---|
920 | self.assertTrue(read(70) == 'a'*35 + 'b'*35)
|
---|
921 |
|
---|
922 | def test_properties(self):
|
---|
923 | f = tempfile.SpooledTemporaryFile(max_size=10)
|
---|
924 | f.write(b'x' * 10)
|
---|
925 | self.assertFalse(f._rolled)
|
---|
926 | self.assertEqual(f.mode, 'w+b')
|
---|
927 | self.assertIsNone(f.name)
|
---|
928 | with self.assertRaises(AttributeError):
|
---|
929 | f.newlines
|
---|
930 | with self.assertRaises(AttributeError):
|
---|
931 | f.encoding
|
---|
932 |
|
---|
933 | f.write(b'x')
|
---|
934 | self.assertTrue(f._rolled)
|
---|
935 | self.assertEqual(f.mode, 'w+b')
|
---|
936 | self.assertIsNotNone(f.name)
|
---|
937 | with self.assertRaises(AttributeError):
|
---|
938 | f.newlines
|
---|
939 | with self.assertRaises(AttributeError):
|
---|
940 | f.encoding
|
---|
941 |
|
---|
942 | def test_context_manager_before_rollover(self):
|
---|
943 | # A SpooledTemporaryFile can be used as a context manager
|
---|
944 | with tempfile.SpooledTemporaryFile(max_size=1) as f:
|
---|
945 | self.assertFalse(f._rolled)
|
---|
946 | self.assertFalse(f.closed)
|
---|
947 | self.assertTrue(f.closed)
|
---|
948 | def use_closed():
|
---|
949 | with f:
|
---|
950 | pass
|
---|
951 | self.assertRaises(ValueError, use_closed)
|
---|
952 |
|
---|
953 | def test_context_manager_during_rollover(self):
|
---|
954 | # A SpooledTemporaryFile can be used as a context manager
|
---|
955 | with tempfile.SpooledTemporaryFile(max_size=1) as f:
|
---|
956 | self.assertFalse(f._rolled)
|
---|
957 | f.write('abc\n')
|
---|
958 | f.flush()
|
---|
959 | self.assertTrue(f._rolled)
|
---|
960 | self.assertFalse(f.closed)
|
---|
961 | self.assertTrue(f.closed)
|
---|
962 | def use_closed():
|
---|
963 | with f:
|
---|
964 | pass
|
---|
965 | self.assertRaises(ValueError, use_closed)
|
---|
966 |
|
---|
967 | def test_context_manager_after_rollover(self):
|
---|
968 | # A SpooledTemporaryFile can be used as a context manager
|
---|
969 | f = tempfile.SpooledTemporaryFile(max_size=1)
|
---|
970 | f.write('abc\n')
|
---|
971 | f.flush()
|
---|
972 | self.assertTrue(f._rolled)
|
---|
973 | with f:
|
---|
974 | self.assertFalse(f.closed)
|
---|
975 | self.assertTrue(f.closed)
|
---|
976 | def use_closed():
|
---|
977 | with f:
|
---|
978 | pass
|
---|
979 | self.assertRaises(ValueError, use_closed)
|
---|
980 |
|
---|
981 |
|
---|
982 | test_classes.append(test_SpooledTemporaryFile)
|
---|
983 |
|
---|
984 |
|
---|
985 | class test_TemporaryFile(TC):
|
---|
986 | """Test TemporaryFile()."""
|
---|
987 |
|
---|
988 | def test_basic(self):
|
---|
989 | # TemporaryFile can create files
|
---|
990 | # No point in testing the name params - the file has no name.
|
---|
991 | try:
|
---|
992 | tempfile.TemporaryFile()
|
---|
993 | except:
|
---|
994 | self.failOnException("TemporaryFile")
|
---|
995 |
|
---|
996 | def test_has_no_name(self):
|
---|
997 | # TemporaryFile creates files with no names (on this system)
|
---|
998 | dir = tempfile.mkdtemp()
|
---|
999 | f = tempfile.TemporaryFile(dir=dir)
|
---|
1000 | f.write('blat')
|
---|
1001 |
|
---|
1002 | # Sneaky: because this file has no name, it should not prevent
|
---|
1003 | # us from removing the directory it was created in.
|
---|
1004 | try:
|
---|
1005 | os.rmdir(dir)
|
---|
1006 | except:
|
---|
1007 | ei = sys.exc_info()
|
---|
1008 | # cleanup
|
---|
1009 | f.close()
|
---|
1010 | os.rmdir(dir)
|
---|
1011 | self.failOnException("rmdir", ei)
|
---|
1012 |
|
---|
1013 | def test_multiple_close(self):
|
---|
1014 | # A TemporaryFile can be closed many times without error
|
---|
1015 | f = tempfile.TemporaryFile()
|
---|
1016 | f.write('abc\n')
|
---|
1017 | f.close()
|
---|
1018 | try:
|
---|
1019 | f.close()
|
---|
1020 | f.close()
|
---|
1021 | except:
|
---|
1022 | self.failOnException("close")
|
---|
1023 |
|
---|
1024 | # How to test the mode and bufsize parameters?
|
---|
1025 |
|
---|
1026 |
|
---|
1027 | if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile:
|
---|
1028 | test_classes.append(test_TemporaryFile)
|
---|
1029 |
|
---|
1030 | def test_main():
|
---|
1031 | support.run_unittest(*test_classes)
|
---|
1032 |
|
---|
1033 | if __name__ == "__main__":
|
---|
1034 | test_main()
|
---|