Ignore:
Timestamp:
Mar 19, 2014, 11:31:01 PM (11 years ago)
Author:
dmik
Message:

python: Merge vendor 2.7.6 to trunk.

Location:
python/trunk
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • python/trunk

  • python/trunk/Lib/test/test_tempfile.py

    r10 r391  
    11# tempfile.py unit tests.
    22import tempfile
     3import errno
     4import io
    35import os
     6import signal
     7import shutil
    48import sys
    59import re
    6 import errno
    710import warnings
     11import contextlib
    812
    913import unittest
    10 from test import test_support
     14from test import test_support as support
    1115
    1216warnings.filterwarnings("ignore",
     
    2529# TEST_FILES may need to be tweaked for systems depending on the maximum
    2630# number of files that can be opened at one time (see ulimit -n)
    27 if sys.platform == 'mac':
    28     TEST_FILES = 32
    29 elif sys.platform in ('openbsd3', 'openbsd4'):
     31if sys.platform in ('openbsd3', 'openbsd4'):
    3032    TEST_FILES = 48
    3133else:
     
    6062
    6163        nbase = nbase[len(pre):len(nbase)-len(suf)]
    62         self.assert_(self.str_check.match(nbase),
     64        self.assertTrue(self.str_check.match(nbase),
    6365                     "random string '%s' does not match /^[a-zA-Z0-9_-]{6}$/"
    6466                     % nbase)
     
    8991            if key[0] != '_' and key not in expected:
    9092                unexp.append(key)
    91         self.failUnless(len(unexp) == 0,
     93        self.assertTrue(len(unexp) == 0,
    9294                        "unexpected keys: %s" % unexp)
    9395
     
    114116            s = r.next()
    115117            self.nameCheck(s, '', '', '')
    116             self.failIf(s in dict)
     118            self.assertNotIn(s, dict)
    117119            dict[s] = 1
    118120
     
    130132            self.failOnException("iteration")
    131133
     134    @unittest.skipUnless(hasattr(os, 'fork'),
     135        "os.fork is required for this test")
     136    def test_process_awareness(self):
     137        # ensure that the random source differs between
     138        # child and parent.
     139        read_fd, write_fd = os.pipe()
     140        pid = None
     141        try:
     142            pid = os.fork()
     143            if not pid:
     144                os.close(read_fd)
     145                os.write(write_fd, next(self.r).encode("ascii"))
     146                os.close(write_fd)
     147                # bypass the normal exit handlers- leave those to
     148                # the parent.
     149                os._exit(0)
     150            parent_value = next(self.r)
     151            child_value = os.read(read_fd, len(parent_value)).decode("ascii")
     152        finally:
     153            if pid:
     154                # best effort to ensure the process can't bleed out
     155                # via any bugs above
     156                try:
     157                    os.kill(pid, signal.SIGKILL)
     158                except EnvironmentError:
     159                    pass
     160            os.close(read_fd)
     161            os.close(write_fd)
     162        self.assertNotEqual(child_value, parent_value)
     163
     164
    132165test_classes.append(test__RandomNameSequence)
    133166
     
    141174        cand = tempfile._candidate_tempdir_list()
    142175
    143         self.failIf(len(cand) == 0)
     176        self.assertFalse(len(cand) == 0)
    144177        for c in cand:
    145             self.assert_(isinstance(c, basestring),
    146                          "%s is not a string" % c)
     178            self.assertIsInstance(c, basestring)
    147179
    148180    def test_wanted_dirs(self):
     
    150182
    151183        # Make sure the interesting environment variables are all set.
    152         with test_support.EnvironmentVarGuard() as env:
     184        with support.EnvironmentVarGuard() as env:
    153185            for envname in 'TMPDIR', 'TEMP', 'TMP':
    154186                dirname = os.getenv(envname)
    155187                if not dirname:
    156                     env.set(envname, os.path.abspath(envname))
     188                    env[envname] = os.path.abspath(envname)
    157189
    158190            cand = tempfile._candidate_tempdir_list()
     
    161193                dirname = os.getenv(envname)
    162194                if not dirname: raise ValueError
    163                 self.assert_(dirname in cand)
     195                self.assertIn(dirname, cand)
    164196
    165197            try:
     
    168200                dirname = os.curdir
    169201
    170             self.assert_(dirname in cand)
     202            self.assertIn(dirname, cand)
    171203
    172204            # Not practical to try to verify the presence of OS-specific
     
    175207test_classes.append(test__candidate_tempdir_list)
    176208
    177 
    178 # We test _get_default_tempdir by testing gettempdir.
     209# We test _get_default_tempdir some more by testing gettempdir.
     210
     211class TestGetDefaultTempdir(TC):
     212    """Test _get_default_tempdir()."""
     213
     214    def test_no_files_left_behind(self):
     215        # use a private empty directory
     216        our_temp_directory = tempfile.mkdtemp()
     217        try:
     218            # force _get_default_tempdir() to consider our empty directory
     219            def our_candidate_list():
     220                return [our_temp_directory]
     221
     222            with support.swap_attr(tempfile, "_candidate_tempdir_list",
     223                                   our_candidate_list):
     224                # verify our directory is empty after _get_default_tempdir()
     225                tempfile._get_default_tempdir()
     226                self.assertEqual(os.listdir(our_temp_directory), [])
     227
     228                def raise_OSError(*args, **kwargs):
     229                    raise OSError(-1)
     230
     231                with support.swap_attr(io, "open", raise_OSError):
     232                    # test again with failing io.open()
     233                    with self.assertRaises(IOError) as cm:
     234                        tempfile._get_default_tempdir()
     235                    self.assertEqual(cm.exception.errno, errno.ENOENT)
     236                    self.assertEqual(os.listdir(our_temp_directory), [])
     237
     238                open = io.open
     239                def bad_writer(*args, **kwargs):
     240                    fp = open(*args, **kwargs)
     241                    fp.write = raise_OSError
     242                    return fp
     243
     244                with support.swap_attr(io, "open", bad_writer):
     245                    # test again with failing write()
     246                    with self.assertRaises(IOError) as cm:
     247                        tempfile._get_default_tempdir()
     248                    self.assertEqual(cm.exception.errno, errno.ENOENT)
     249                    self.assertEqual(os.listdir(our_temp_directory), [])
     250        finally:
     251            shutil.rmtree(our_temp_directory)
     252
     253test_classes.append(TestGetDefaultTempdir)
    179254
    180255
     
    185260        # _get_candidate_names returns a _RandomNameSequence object
    186261        obj = tempfile._get_candidate_names()
    187         self.assert_(isinstance(obj, tempfile._RandomNameSequence))
     262        self.assertIsInstance(obj, tempfile._RandomNameSequence)
    188263
    189264    def test_same_thing(self):
     
    192267        b = tempfile._get_candidate_names()
    193268
    194         self.assert_(a is b)
     269        self.assertTrue(a is b)
    195270
    196271test_classes.append(test__get_candidate_names)
     272
     273
     274@contextlib.contextmanager
     275def _inside_empty_temp_dir():
     276    dir = tempfile.mkdtemp()
     277    try:
     278        with support.swap_attr(tempfile, 'tempdir', dir):
     279            yield
     280    finally:
     281        support.rmtree(dir)
     282
     283
     284def _mock_candidate_names(*names):
     285    return support.swap_attr(tempfile,
     286                             '_get_candidate_names',
     287                             lambda: iter(names))
    197288
    198289
     
    255346        # _mkstemp_inner creates files with the proper mode
    256347        if not has_stat:
    257             return            # ugh, can't use TestSkipped.
     348            return            # ugh, can't use SkipTest.
    258349
    259350        file = self.do_create()
    260351        mode = stat.S_IMODE(os.stat(file.name).st_mode)
    261352        expected = 0600
    262         if sys.platform in ('win32', 'os2emx', 'os2knix', 'mac'):
     353        if sys.platform in ('win32', 'os2emx', 'os2knix'):
    263354            # There's no distinction among 'user', 'group' and 'world';
    264355            # replicate the 'user' bits.
     
    270361        # _mkstemp_inner file handles are not inherited by child processes
    271362        if not has_spawnl:
    272             return            # ugh, can't use TestSkipped.
    273 
    274         if test_support.verbose:
     363            return            # ugh, can't use SkipTest.
     364
     365        if support.verbose:
    275366            v="v"
    276367        else:
     
    301392
    302393        retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd)
    303         self.failIf(retval < 0,
     394        self.assertFalse(retval < 0,
    304395                    "child process caught fatal signal %d" % -retval)
    305         self.failIf(retval > 0, "child process reports failure %d"%retval)
     396        self.assertFalse(retval > 0, "child process reports failure %d"%retval)
    306397
    307398    def test_textmode(self):
    308399        # _mkstemp_inner can create files in text mode
    309400        if not has_textmode:
    310             return            # ugh, can't use TestSkipped.
     401            return            # ugh, can't use SkipTest.
    311402
    312403        self.do_create(bin=0).write("blat\n")
    313404        # XXX should test that the file really is a text file
     405
     406    def default_mkstemp_inner(self):
     407        return tempfile._mkstemp_inner(tempfile.gettempdir(),
     408                                       tempfile.template,
     409                                       '',
     410                                       tempfile._bin_openflags)
     411
     412    def test_collision_with_existing_file(self):
     413        # _mkstemp_inner tries another name when a file with
     414        # the chosen name already exists
     415        with _inside_empty_temp_dir(), \
     416             _mock_candidate_names('aaa', 'aaa', 'bbb'):
     417            (fd1, name1) = self.default_mkstemp_inner()
     418            os.close(fd1)
     419            self.assertTrue(name1.endswith('aaa'))
     420
     421            (fd2, name2) = self.default_mkstemp_inner()
     422            os.close(fd2)
     423            self.assertTrue(name2.endswith('bbb'))
     424
     425    def test_collision_with_existing_directory(self):
     426        # _mkstemp_inner tries another name when a directory with
     427        # the chosen name already exists
     428        with _inside_empty_temp_dir(), \
     429             _mock_candidate_names('aaa', 'aaa', 'bbb'):
     430            dir = tempfile.mkdtemp()
     431            self.assertTrue(dir.endswith('aaa'))
     432
     433            (fd, name) = self.default_mkstemp_inner()
     434            os.close(fd)
     435            self.assertTrue(name.endswith('bbb'))
    314436
    315437test_classes.append(test__mkstemp_inner)
     
    323445        p = tempfile.gettempprefix()
    324446
    325         self.assert_(isinstance(p, basestring))
    326         self.assert_(len(p) > 0)
     447        self.assertIsInstance(p, basestring)
     448        self.assertTrue(len(p) > 0)
    327449
    328450    def test_usable_template(self):
     
    355477
    356478        dir = tempfile.gettempdir()
    357         self.assert_(os.path.isabs(dir) or dir == os.curdir,
     479        self.assertTrue(os.path.isabs(dir) or dir == os.curdir,
    358480                     "%s is not an absolute path" % dir)
    359         self.assert_(os.path.isdir(dir),
     481        self.assertTrue(os.path.isdir(dir),
    360482                     "%s is not a directory" % dir)
    361483
     
    378500        b = tempfile.gettempdir()
    379501
    380         self.assert_(a is b)
     502        self.assertTrue(a is b)
    381503
    382504test_classes.append(test_gettempdir)
     
    472594        # mkdtemp creates directories with the proper mode
    473595        if not has_stat:
    474             return            # ugh, can't use TestSkipped.
     596            return            # ugh, can't use SkipTest.
    475597
    476598        dir = self.do_create()
     
    479601            mode &= 0777 # Mask off sticky bits inherited from /tmp
    480602            expected = 0700
    481             if sys.platform in ('win32', 'os2emx', 'os2knix', 'mac'):
     603            if sys.platform in ('win32', 'os2emx', 'os2knix'):
    482604                # There's no distinction among 'user', 'group' and 'world';
    483605                # replicate the 'user' bits.
     
    487609        finally:
    488610            os.rmdir(dir)
     611
     612    def test_collision_with_existing_file(self):
     613        # mkdtemp tries another name when a file with
     614        # the chosen name already exists
     615        with _inside_empty_temp_dir(), \
     616             _mock_candidate_names('aaa', 'aaa', 'bbb'):
     617            file = tempfile.NamedTemporaryFile(delete=False)
     618            file.close()
     619            self.assertTrue(file.name.endswith('aaa'))
     620            dir = tempfile.mkdtemp()
     621            self.assertTrue(dir.endswith('bbb'))
     622
     623    def test_collision_with_existing_directory(self):
     624        # mkdtemp tries another name when a directory with
     625        # the chosen name already exists
     626        with _inside_empty_temp_dir(), \
     627             _mock_candidate_names('aaa', 'aaa', 'bbb'):
     628            dir1 = tempfile.mkdtemp()
     629            self.assertTrue(dir1.endswith('aaa'))
     630            dir2 = tempfile.mkdtemp()
     631            self.assertTrue(dir2.endswith('bbb'))
    489632
    490633test_classes.append(test_mkdtemp)
     
    581724        # NamedTemporaryFile creates files with names
    582725        f = tempfile.NamedTemporaryFile()
    583         self.failUnless(os.path.exists(f.name),
     726        self.assertTrue(os.path.exists(f.name),
    584727                        "NamedTemporaryFile %s does not exist" % f.name)
    585728
     
    591734            f.write('blat')
    592735            f.close()
    593             self.failIf(os.path.exists(f.name),
     736            self.assertFalse(os.path.exists(f.name),
    594737                        "NamedTemporaryFile %s exists after close" % f.name)
    595738        finally:
     
    605748            f.write('blat')
    606749            f.close()
    607             self.failUnless(os.path.exists(f.name),
     750            self.assertTrue(os.path.exists(f.name),
    608751                        "NamedTemporaryFile %s missing after close" % f.name)
    609752        finally:
     
    626769        # A NamedTemporaryFile can be used as a context manager
    627770        with tempfile.NamedTemporaryFile() as f:
    628             self.failUnless(os.path.exists(f.name))
    629         self.failIf(os.path.exists(f.name))
     771            self.assertTrue(os.path.exists(f.name))
     772        self.assertFalse(os.path.exists(f.name))
    630773        def use_closed():
    631774            with f:
    632775                pass
    633         self.failUnlessRaises(ValueError, use_closed)
     776        self.assertRaises(ValueError, use_closed)
    634777
    635778    # How to test the mode and bufsize parameters?
     
    654797        # SpooledTemporaryFile can create files
    655798        f = self.do_create()
    656         self.failIf(f._rolled)
     799        self.assertFalse(f._rolled)
    657800        f = self.do_create(max_size=100, pre="a", suf=".txt")
    658         self.failIf(f._rolled)
     801        self.assertFalse(f._rolled)
    659802
    660803    def test_del_on_close(self):
     
    663806        try:
    664807            f = tempfile.SpooledTemporaryFile(max_size=10, dir=dir)
    665             self.failIf(f._rolled)
     808            self.assertFalse(f._rolled)
    666809            f.write('blat ' * 5)
    667             self.failUnless(f._rolled)
     810            self.assertTrue(f._rolled)
    668811            filename = f.name
    669812            f.close()
    670             self.failIf(os.path.exists(filename),
     813            self.assertFalse(os.path.exists(filename),
    671814                        "SpooledTemporaryFile %s exists after close" % filename)
    672815        finally:
     
    676819        # A SpooledTemporaryFile can be written to multiple within the max_size
    677820        f = self.do_create(max_size=30)
    678         self.failIf(f._rolled)
     821        self.assertFalse(f._rolled)
    679822        for i in range(5):
    680823            f.seek(0, 0)
    681824            f.write('x' * 20)
    682         self.failIf(f._rolled)
     825        self.assertFalse(f._rolled)
    683826
    684827    def test_write_sequential(self):
     
    686829        # over afterward
    687830        f = self.do_create(max_size=30)
    688         self.failIf(f._rolled)
     831        self.assertFalse(f._rolled)
    689832        f.write('x' * 20)
    690         self.failIf(f._rolled)
     833        self.assertFalse(f._rolled)
    691834        f.write('x' * 10)
    692         self.failIf(f._rolled)
     835        self.assertFalse(f._rolled)
    693836        f.write('x')
    694         self.failUnless(f._rolled)
     837        self.assertTrue(f._rolled)
     838
     839    def test_writelines(self):
     840        # Verify writelines with a SpooledTemporaryFile
     841        f = self.do_create()
     842        f.writelines((b'x', b'y', b'z'))
     843        f.seek(0)
     844        buf = f.read()
     845        self.assertEqual(buf, b'xyz')
     846
     847    def test_writelines_sequential(self):
     848        # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
     849        # over afterward
     850        f = self.do_create(max_size=35)
     851        f.writelines((b'x' * 20, b'x' * 10, b'x' * 5))
     852        self.assertFalse(f._rolled)
     853        f.write(b'x')
     854        self.assertTrue(f._rolled)
     855
     856    def test_xreadlines(self):
     857        f = self.do_create(max_size=20)
     858        f.write(b'abc\n' * 5)
     859        f.seek(0)
     860        self.assertFalse(f._rolled)
     861        self.assertEqual(list(f.xreadlines()), [b'abc\n'] * 5)
     862        f.write(b'x\ny')
     863        self.assertTrue(f._rolled)
     864        f.seek(0)
     865        self.assertEqual(list(f.xreadlines()), [b'abc\n'] * 5 + [b'x\n', b'y'])
    695866
    696867    def test_sparse(self):
     
    698869        # when that occurs
    699870        f = self.do_create(max_size=30)
    700         self.failIf(f._rolled)
     871        self.assertFalse(f._rolled)
    701872        f.seek(100, 0)
    702         self.failIf(f._rolled)
     873        self.assertFalse(f._rolled)
    703874        f.write('x')
    704         self.failUnless(f._rolled)
     875        self.assertTrue(f._rolled)
    705876
    706877    def test_fileno(self):
    707878        # A SpooledTemporaryFile should roll over to a real file on fileno()
    708879        f = self.do_create(max_size=30)
    709         self.failIf(f._rolled)
    710         self.failUnless(f.fileno() > 0)
    711         self.failUnless(f._rolled)
     880        self.assertFalse(f._rolled)
     881        self.assertTrue(f.fileno() > 0)
     882        self.assertTrue(f._rolled)
    712883
    713884    def test_multiple_close_before_rollover(self):
     
    715886        f = tempfile.SpooledTemporaryFile()
    716887        f.write('abc\n')
    717         self.failIf(f._rolled)
     888        self.assertFalse(f._rolled)
    718889        f.close()
    719890        try:
     
    727898        f = tempfile.SpooledTemporaryFile(max_size=1)
    728899        f.write('abc\n')
    729         self.failUnless(f._rolled)
     900        self.assertTrue(f._rolled)
    730901        f.close()
    731902        try:
     
    747918        write("b" * 35)
    748919        seek(0, 0)
    749         self.failUnless(read(70) == 'a'*35 + 'b'*35)
     920        self.assertTrue(read(70) == 'a'*35 + 'b'*35)
     921
     922    def test_properties(self):
     923        f = tempfile.SpooledTemporaryFile(max_size=10)
     924        f.write(b'x' * 10)
     925        self.assertFalse(f._rolled)
     926        self.assertEqual(f.mode, 'w+b')
     927        self.assertIsNone(f.name)
     928        with self.assertRaises(AttributeError):
     929            f.newlines
     930        with self.assertRaises(AttributeError):
     931            f.encoding
     932
     933        f.write(b'x')
     934        self.assertTrue(f._rolled)
     935        self.assertEqual(f.mode, 'w+b')
     936        self.assertIsNotNone(f.name)
     937        with self.assertRaises(AttributeError):
     938            f.newlines
     939        with self.assertRaises(AttributeError):
     940            f.encoding
    750941
    751942    def test_context_manager_before_rollover(self):
    752943        # A SpooledTemporaryFile can be used as a context manager
    753944        with tempfile.SpooledTemporaryFile(max_size=1) as f:
    754             self.failIf(f._rolled)
    755             self.failIf(f.closed)
    756         self.failUnless(f.closed)
     945            self.assertFalse(f._rolled)
     946            self.assertFalse(f.closed)
     947        self.assertTrue(f.closed)
    757948        def use_closed():
    758949            with f:
    759950                pass
    760         self.failUnlessRaises(ValueError, use_closed)
     951        self.assertRaises(ValueError, use_closed)
    761952
    762953    def test_context_manager_during_rollover(self):
    763954        # A SpooledTemporaryFile can be used as a context manager
    764955        with tempfile.SpooledTemporaryFile(max_size=1) as f:
    765             self.failIf(f._rolled)
     956            self.assertFalse(f._rolled)
    766957            f.write('abc\n')
    767958            f.flush()
    768             self.failUnless(f._rolled)
    769             self.failIf(f.closed)
    770         self.failUnless(f.closed)
     959            self.assertTrue(f._rolled)
     960            self.assertFalse(f.closed)
     961        self.assertTrue(f.closed)
    771962        def use_closed():
    772963            with f:
    773964                pass
    774         self.failUnlessRaises(ValueError, use_closed)
     965        self.assertRaises(ValueError, use_closed)
    775966
    776967    def test_context_manager_after_rollover(self):
     
    779970        f.write('abc\n')
    780971        f.flush()
    781         self.failUnless(f._rolled)
     972        self.assertTrue(f._rolled)
    782973        with f:
    783             self.failIf(f.closed)
    784         self.failUnless(f.closed)
     974            self.assertFalse(f.closed)
     975        self.assertTrue(f.closed)
    785976        def use_closed():
    786977            with f:
    787978                pass
    788         self.failUnlessRaises(ValueError, use_closed)
     979        self.assertRaises(ValueError, use_closed)
    789980
    790981
     
    8381029
    8391030def test_main():
    840     test_support.run_unittest(*test_classes)
     1031    support.run_unittest(*test_classes)
    8411032
    8421033if __name__ == "__main__":
Note: See TracChangeset for help on using the changeset viewer.