Changeset 391 for python/trunk/Lib/test/test_tempfile.py
- Timestamp:
- Mar 19, 2014, 11:31:01 PM (11 years ago)
- Location:
- python/trunk
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
python/trunk
-
Property svn:mergeinfo
set to
/python/vendor/Python-2.7.6 merged eligible /python/vendor/current merged eligible
-
Property svn:mergeinfo
set to
-
python/trunk/Lib/test/test_tempfile.py
r10 r391 1 1 # tempfile.py unit tests. 2 2 import tempfile 3 import errno 4 import io 3 5 import os 6 import signal 7 import shutil 4 8 import sys 5 9 import re 6 import errno7 10 import warnings 11 import contextlib 8 12 9 13 import unittest 10 from test import test_support 14 from test import test_support as support 11 15 12 16 warnings.filterwarnings("ignore", … … 25 29 # TEST_FILES may need to be tweaked for systems depending on the maximum 26 30 # number of files that can be opened at one time (see ulimit -n) 27 if sys.platform == 'mac': 28 TEST_FILES = 32 29 elif sys.platform in ('openbsd3', 'openbsd4'): 31 if sys.platform in ('openbsd3', 'openbsd4'): 30 32 TEST_FILES = 48 31 33 else: … … 60 62 61 63 nbase = nbase[len(pre):len(nbase)-len(suf)] 62 self.assert _(self.str_check.match(nbase),64 self.assertTrue(self.str_check.match(nbase), 63 65 "random string '%s' does not match /^[a-zA-Z0-9_-]{6}$/" 64 66 % nbase) … … 89 91 if key[0] != '_' and key not in expected: 90 92 unexp.append(key) 91 self. failUnless(len(unexp) == 0,93 self.assertTrue(len(unexp) == 0, 92 94 "unexpected keys: %s" % unexp) 93 95 … … 114 116 s = r.next() 115 117 self.nameCheck(s, '', '', '') 116 self. failIf(s indict)118 self.assertNotIn(s, dict) 117 119 dict[s] = 1 118 120 … … 130 132 self.failOnException("iteration") 131 133 134 @unittest.skipUnless(hasattr(os, 'fork'), 135 "os.fork is required for this test") 136 def test_process_awareness(self): 137 # ensure that the random source differs between 138 # child and parent. 139 read_fd, write_fd = os.pipe() 140 pid = None 141 try: 142 pid = os.fork() 143 if not pid: 144 os.close(read_fd) 145 os.write(write_fd, next(self.r).encode("ascii")) 146 os.close(write_fd) 147 # bypass the normal exit handlers- leave those to 148 # the parent. 149 os._exit(0) 150 parent_value = next(self.r) 151 child_value = os.read(read_fd, len(parent_value)).decode("ascii") 152 finally: 153 if pid: 154 # best effort to ensure the process can't bleed out 155 # via any bugs above 156 try: 157 os.kill(pid, signal.SIGKILL) 158 except EnvironmentError: 159 pass 160 os.close(read_fd) 161 os.close(write_fd) 162 self.assertNotEqual(child_value, parent_value) 163 164 132 165 test_classes.append(test__RandomNameSequence) 133 166 … … 141 174 cand = tempfile._candidate_tempdir_list() 142 175 143 self. failIf(len(cand) == 0)176 self.assertFalse(len(cand) == 0) 144 177 for c in cand: 145 self.assert_(isinstance(c, basestring), 146 "%s is not a string" % c) 178 self.assertIsInstance(c, basestring) 147 179 148 180 def test_wanted_dirs(self): … … 150 182 151 183 # Make sure the interesting environment variables are all set. 152 with test_support.EnvironmentVarGuard() as env:184 with support.EnvironmentVarGuard() as env: 153 185 for envname in 'TMPDIR', 'TEMP', 'TMP': 154 186 dirname = os.getenv(envname) 155 187 if not dirname: 156 env .set(envname, os.path.abspath(envname))188 env[envname] = os.path.abspath(envname) 157 189 158 190 cand = tempfile._candidate_tempdir_list() … … 161 193 dirname = os.getenv(envname) 162 194 if not dirname: raise ValueError 163 self.assert _(dirname incand)195 self.assertIn(dirname, cand) 164 196 165 197 try: … … 168 200 dirname = os.curdir 169 201 170 self.assert _(dirname incand)202 self.assertIn(dirname, cand) 171 203 172 204 # Not practical to try to verify the presence of OS-specific … … 175 207 test_classes.append(test__candidate_tempdir_list) 176 208 177 178 # We test _get_default_tempdir by testing gettempdir. 209 # We test _get_default_tempdir some more by testing gettempdir. 210 211 class TestGetDefaultTempdir(TC): 212 """Test _get_default_tempdir().""" 213 214 def test_no_files_left_behind(self): 215 # use a private empty directory 216 our_temp_directory = tempfile.mkdtemp() 217 try: 218 # force _get_default_tempdir() to consider our empty directory 219 def our_candidate_list(): 220 return [our_temp_directory] 221 222 with support.swap_attr(tempfile, "_candidate_tempdir_list", 223 our_candidate_list): 224 # verify our directory is empty after _get_default_tempdir() 225 tempfile._get_default_tempdir() 226 self.assertEqual(os.listdir(our_temp_directory), []) 227 228 def raise_OSError(*args, **kwargs): 229 raise OSError(-1) 230 231 with support.swap_attr(io, "open", raise_OSError): 232 # test again with failing io.open() 233 with self.assertRaises(IOError) as cm: 234 tempfile._get_default_tempdir() 235 self.assertEqual(cm.exception.errno, errno.ENOENT) 236 self.assertEqual(os.listdir(our_temp_directory), []) 237 238 open = io.open 239 def bad_writer(*args, **kwargs): 240 fp = open(*args, **kwargs) 241 fp.write = raise_OSError 242 return fp 243 244 with support.swap_attr(io, "open", bad_writer): 245 # test again with failing write() 246 with self.assertRaises(IOError) as cm: 247 tempfile._get_default_tempdir() 248 self.assertEqual(cm.exception.errno, errno.ENOENT) 249 self.assertEqual(os.listdir(our_temp_directory), []) 250 finally: 251 shutil.rmtree(our_temp_directory) 252 253 test_classes.append(TestGetDefaultTempdir) 179 254 180 255 … … 185 260 # _get_candidate_names returns a _RandomNameSequence object 186 261 obj = tempfile._get_candidate_names() 187 self.assert _(isinstance(obj, tempfile._RandomNameSequence))262 self.assertIsInstance(obj, tempfile._RandomNameSequence) 188 263 189 264 def test_same_thing(self): … … 192 267 b = tempfile._get_candidate_names() 193 268 194 self.assert _(a is b)269 self.assertTrue(a is b) 195 270 196 271 test_classes.append(test__get_candidate_names) 272 273 274 @contextlib.contextmanager 275 def _inside_empty_temp_dir(): 276 dir = tempfile.mkdtemp() 277 try: 278 with support.swap_attr(tempfile, 'tempdir', dir): 279 yield 280 finally: 281 support.rmtree(dir) 282 283 284 def _mock_candidate_names(*names): 285 return support.swap_attr(tempfile, 286 '_get_candidate_names', 287 lambda: iter(names)) 197 288 198 289 … … 255 346 # _mkstemp_inner creates files with the proper mode 256 347 if not has_stat: 257 return # ugh, can't use TestSkipped.348 return # ugh, can't use SkipTest. 258 349 259 350 file = self.do_create() 260 351 mode = stat.S_IMODE(os.stat(file.name).st_mode) 261 352 expected = 0600 262 if sys.platform in ('win32', 'os2emx', 'os2knix' , 'mac'):353 if sys.platform in ('win32', 'os2emx', 'os2knix'): 263 354 # There's no distinction among 'user', 'group' and 'world'; 264 355 # replicate the 'user' bits. … … 270 361 # _mkstemp_inner file handles are not inherited by child processes 271 362 if not has_spawnl: 272 return # ugh, can't use TestSkipped.273 274 if test_support.verbose:363 return # ugh, can't use SkipTest. 364 365 if support.verbose: 275 366 v="v" 276 367 else: … … 301 392 302 393 retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd) 303 self. failIf(retval < 0,394 self.assertFalse(retval < 0, 304 395 "child process caught fatal signal %d" % -retval) 305 self. failIf(retval > 0, "child process reports failure %d"%retval)396 self.assertFalse(retval > 0, "child process reports failure %d"%retval) 306 397 307 398 def test_textmode(self): 308 399 # _mkstemp_inner can create files in text mode 309 400 if not has_textmode: 310 return # ugh, can't use TestSkipped.401 return # ugh, can't use SkipTest. 311 402 312 403 self.do_create(bin=0).write("blat\n") 313 404 # XXX should test that the file really is a text file 405 406 def default_mkstemp_inner(self): 407 return tempfile._mkstemp_inner(tempfile.gettempdir(), 408 tempfile.template, 409 '', 410 tempfile._bin_openflags) 411 412 def test_collision_with_existing_file(self): 413 # _mkstemp_inner tries another name when a file with 414 # the chosen name already exists 415 with _inside_empty_temp_dir(), \ 416 _mock_candidate_names('aaa', 'aaa', 'bbb'): 417 (fd1, name1) = self.default_mkstemp_inner() 418 os.close(fd1) 419 self.assertTrue(name1.endswith('aaa')) 420 421 (fd2, name2) = self.default_mkstemp_inner() 422 os.close(fd2) 423 self.assertTrue(name2.endswith('bbb')) 424 425 def test_collision_with_existing_directory(self): 426 # _mkstemp_inner tries another name when a directory with 427 # the chosen name already exists 428 with _inside_empty_temp_dir(), \ 429 _mock_candidate_names('aaa', 'aaa', 'bbb'): 430 dir = tempfile.mkdtemp() 431 self.assertTrue(dir.endswith('aaa')) 432 433 (fd, name) = self.default_mkstemp_inner() 434 os.close(fd) 435 self.assertTrue(name.endswith('bbb')) 314 436 315 437 test_classes.append(test__mkstemp_inner) … … 323 445 p = tempfile.gettempprefix() 324 446 325 self.assert _(isinstance(p, basestring))326 self.assert _(len(p) > 0)447 self.assertIsInstance(p, basestring) 448 self.assertTrue(len(p) > 0) 327 449 328 450 def test_usable_template(self): … … 355 477 356 478 dir = tempfile.gettempdir() 357 self.assert _(os.path.isabs(dir) or dir == os.curdir,479 self.assertTrue(os.path.isabs(dir) or dir == os.curdir, 358 480 "%s is not an absolute path" % dir) 359 self.assert _(os.path.isdir(dir),481 self.assertTrue(os.path.isdir(dir), 360 482 "%s is not a directory" % dir) 361 483 … … 378 500 b = tempfile.gettempdir() 379 501 380 self.assert _(a is b)502 self.assertTrue(a is b) 381 503 382 504 test_classes.append(test_gettempdir) … … 472 594 # mkdtemp creates directories with the proper mode 473 595 if not has_stat: 474 return # ugh, can't use TestSkipped.596 return # ugh, can't use SkipTest. 475 597 476 598 dir = self.do_create() … … 479 601 mode &= 0777 # Mask off sticky bits inherited from /tmp 480 602 expected = 0700 481 if sys.platform in ('win32', 'os2emx', 'os2knix' , 'mac'):603 if sys.platform in ('win32', 'os2emx', 'os2knix'): 482 604 # There's no distinction among 'user', 'group' and 'world'; 483 605 # replicate the 'user' bits. … … 487 609 finally: 488 610 os.rmdir(dir) 611 612 def test_collision_with_existing_file(self): 613 # mkdtemp tries another name when a file with 614 # the chosen name already exists 615 with _inside_empty_temp_dir(), \ 616 _mock_candidate_names('aaa', 'aaa', 'bbb'): 617 file = tempfile.NamedTemporaryFile(delete=False) 618 file.close() 619 self.assertTrue(file.name.endswith('aaa')) 620 dir = tempfile.mkdtemp() 621 self.assertTrue(dir.endswith('bbb')) 622 623 def test_collision_with_existing_directory(self): 624 # mkdtemp tries another name when a directory with 625 # the chosen name already exists 626 with _inside_empty_temp_dir(), \ 627 _mock_candidate_names('aaa', 'aaa', 'bbb'): 628 dir1 = tempfile.mkdtemp() 629 self.assertTrue(dir1.endswith('aaa')) 630 dir2 = tempfile.mkdtemp() 631 self.assertTrue(dir2.endswith('bbb')) 489 632 490 633 test_classes.append(test_mkdtemp) … … 581 724 # NamedTemporaryFile creates files with names 582 725 f = tempfile.NamedTemporaryFile() 583 self. failUnless(os.path.exists(f.name),726 self.assertTrue(os.path.exists(f.name), 584 727 "NamedTemporaryFile %s does not exist" % f.name) 585 728 … … 591 734 f.write('blat') 592 735 f.close() 593 self. failIf(os.path.exists(f.name),736 self.assertFalse(os.path.exists(f.name), 594 737 "NamedTemporaryFile %s exists after close" % f.name) 595 738 finally: … … 605 748 f.write('blat') 606 749 f.close() 607 self. failUnless(os.path.exists(f.name),750 self.assertTrue(os.path.exists(f.name), 608 751 "NamedTemporaryFile %s missing after close" % f.name) 609 752 finally: … … 626 769 # A NamedTemporaryFile can be used as a context manager 627 770 with tempfile.NamedTemporaryFile() as f: 628 self. failUnless(os.path.exists(f.name))629 self. failIf(os.path.exists(f.name))771 self.assertTrue(os.path.exists(f.name)) 772 self.assertFalse(os.path.exists(f.name)) 630 773 def use_closed(): 631 774 with f: 632 775 pass 633 self. failUnlessRaises(ValueError, use_closed)776 self.assertRaises(ValueError, use_closed) 634 777 635 778 # How to test the mode and bufsize parameters? … … 654 797 # SpooledTemporaryFile can create files 655 798 f = self.do_create() 656 self. failIf(f._rolled)799 self.assertFalse(f._rolled) 657 800 f = self.do_create(max_size=100, pre="a", suf=".txt") 658 self. failIf(f._rolled)801 self.assertFalse(f._rolled) 659 802 660 803 def test_del_on_close(self): … … 663 806 try: 664 807 f = tempfile.SpooledTemporaryFile(max_size=10, dir=dir) 665 self. failIf(f._rolled)808 self.assertFalse(f._rolled) 666 809 f.write('blat ' * 5) 667 self. failUnless(f._rolled)810 self.assertTrue(f._rolled) 668 811 filename = f.name 669 812 f.close() 670 self. failIf(os.path.exists(filename),813 self.assertFalse(os.path.exists(filename), 671 814 "SpooledTemporaryFile %s exists after close" % filename) 672 815 finally: … … 676 819 # A SpooledTemporaryFile can be written to multiple within the max_size 677 820 f = self.do_create(max_size=30) 678 self. failIf(f._rolled)821 self.assertFalse(f._rolled) 679 822 for i in range(5): 680 823 f.seek(0, 0) 681 824 f.write('x' * 20) 682 self. failIf(f._rolled)825 self.assertFalse(f._rolled) 683 826 684 827 def test_write_sequential(self): … … 686 829 # over afterward 687 830 f = self.do_create(max_size=30) 688 self. failIf(f._rolled)831 self.assertFalse(f._rolled) 689 832 f.write('x' * 20) 690 self. failIf(f._rolled)833 self.assertFalse(f._rolled) 691 834 f.write('x' * 10) 692 self. failIf(f._rolled)835 self.assertFalse(f._rolled) 693 836 f.write('x') 694 self.failUnless(f._rolled) 837 self.assertTrue(f._rolled) 838 839 def test_writelines(self): 840 # Verify writelines with a SpooledTemporaryFile 841 f = self.do_create() 842 f.writelines((b'x', b'y', b'z')) 843 f.seek(0) 844 buf = f.read() 845 self.assertEqual(buf, b'xyz') 846 847 def test_writelines_sequential(self): 848 # A SpooledTemporaryFile should hold exactly max_size bytes, and roll 849 # over afterward 850 f = self.do_create(max_size=35) 851 f.writelines((b'x' * 20, b'x' * 10, b'x' * 5)) 852 self.assertFalse(f._rolled) 853 f.write(b'x') 854 self.assertTrue(f._rolled) 855 856 def test_xreadlines(self): 857 f = self.do_create(max_size=20) 858 f.write(b'abc\n' * 5) 859 f.seek(0) 860 self.assertFalse(f._rolled) 861 self.assertEqual(list(f.xreadlines()), [b'abc\n'] * 5) 862 f.write(b'x\ny') 863 self.assertTrue(f._rolled) 864 f.seek(0) 865 self.assertEqual(list(f.xreadlines()), [b'abc\n'] * 5 + [b'x\n', b'y']) 695 866 696 867 def test_sparse(self): … … 698 869 # when that occurs 699 870 f = self.do_create(max_size=30) 700 self. failIf(f._rolled)871 self.assertFalse(f._rolled) 701 872 f.seek(100, 0) 702 self. failIf(f._rolled)873 self.assertFalse(f._rolled) 703 874 f.write('x') 704 self. failUnless(f._rolled)875 self.assertTrue(f._rolled) 705 876 706 877 def test_fileno(self): 707 878 # A SpooledTemporaryFile should roll over to a real file on fileno() 708 879 f = self.do_create(max_size=30) 709 self. failIf(f._rolled)710 self. failUnless(f.fileno() > 0)711 self. failUnless(f._rolled)880 self.assertFalse(f._rolled) 881 self.assertTrue(f.fileno() > 0) 882 self.assertTrue(f._rolled) 712 883 713 884 def test_multiple_close_before_rollover(self): … … 715 886 f = tempfile.SpooledTemporaryFile() 716 887 f.write('abc\n') 717 self. failIf(f._rolled)888 self.assertFalse(f._rolled) 718 889 f.close() 719 890 try: … … 727 898 f = tempfile.SpooledTemporaryFile(max_size=1) 728 899 f.write('abc\n') 729 self. failUnless(f._rolled)900 self.assertTrue(f._rolled) 730 901 f.close() 731 902 try: … … 747 918 write("b" * 35) 748 919 seek(0, 0) 749 self.failUnless(read(70) == 'a'*35 + 'b'*35) 920 self.assertTrue(read(70) == 'a'*35 + 'b'*35) 921 922 def test_properties(self): 923 f = tempfile.SpooledTemporaryFile(max_size=10) 924 f.write(b'x' * 10) 925 self.assertFalse(f._rolled) 926 self.assertEqual(f.mode, 'w+b') 927 self.assertIsNone(f.name) 928 with self.assertRaises(AttributeError): 929 f.newlines 930 with self.assertRaises(AttributeError): 931 f.encoding 932 933 f.write(b'x') 934 self.assertTrue(f._rolled) 935 self.assertEqual(f.mode, 'w+b') 936 self.assertIsNotNone(f.name) 937 with self.assertRaises(AttributeError): 938 f.newlines 939 with self.assertRaises(AttributeError): 940 f.encoding 750 941 751 942 def test_context_manager_before_rollover(self): 752 943 # A SpooledTemporaryFile can be used as a context manager 753 944 with tempfile.SpooledTemporaryFile(max_size=1) as f: 754 self. failIf(f._rolled)755 self. failIf(f.closed)756 self. failUnless(f.closed)945 self.assertFalse(f._rolled) 946 self.assertFalse(f.closed) 947 self.assertTrue(f.closed) 757 948 def use_closed(): 758 949 with f: 759 950 pass 760 self. failUnlessRaises(ValueError, use_closed)951 self.assertRaises(ValueError, use_closed) 761 952 762 953 def test_context_manager_during_rollover(self): 763 954 # A SpooledTemporaryFile can be used as a context manager 764 955 with tempfile.SpooledTemporaryFile(max_size=1) as f: 765 self. failIf(f._rolled)956 self.assertFalse(f._rolled) 766 957 f.write('abc\n') 767 958 f.flush() 768 self. failUnless(f._rolled)769 self. failIf(f.closed)770 self. failUnless(f.closed)959 self.assertTrue(f._rolled) 960 self.assertFalse(f.closed) 961 self.assertTrue(f.closed) 771 962 def use_closed(): 772 963 with f: 773 964 pass 774 self. failUnlessRaises(ValueError, use_closed)965 self.assertRaises(ValueError, use_closed) 775 966 776 967 def test_context_manager_after_rollover(self): … … 779 970 f.write('abc\n') 780 971 f.flush() 781 self. failUnless(f._rolled)972 self.assertTrue(f._rolled) 782 973 with f: 783 self. failIf(f.closed)784 self. failUnless(f.closed)974 self.assertFalse(f.closed) 975 self.assertTrue(f.closed) 785 976 def use_closed(): 786 977 with f: 787 978 pass 788 self. failUnlessRaises(ValueError, use_closed)979 self.assertRaises(ValueError, use_closed) 789 980 790 981 … … 838 1029 839 1030 def test_main(): 840 test_support.run_unittest(*test_classes)1031 support.run_unittest(*test_classes) 841 1032 842 1033 if __name__ == "__main__":
Note:
See TracChangeset
for help on using the changeset viewer.