Ignore:
Timestamp:
Mar 19, 2014, 11:31:01 PM (11 years ago)
Author:
dmik
Message:

python: Merge vendor 2.7.6 to trunk.

Location:
python/trunk
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • python/trunk

  • python/trunk/Lib/test/test_file.py

    r2 r391  
     1# NOTE: this file tests the new `io` library backported from Python 3.x.
     2# Similar tests for the builtin file object can be found in test_file2k.py.
     3
     4from __future__ import print_function
     5
    16import sys
    27import os
    38import unittest
    4 import itertools
    5 import time
    6 import threading
    79from array import array
    810from weakref import proxy
    911
    10 from test import test_support
    11 from test.test_support import TESTFN, findfile, run_unittest
     12import io
     13import _pyio as pyio
     14
     15from test.test_support import TESTFN, run_unittest
    1216from UserList import UserList
    1317
     
    1620
    1721    def setUp(self):
    18         self.f = open(TESTFN, 'wb')
     22        self.f = self.open(TESTFN, 'wb')
    1923
    2024    def tearDown(self):
     
    2630        # verify weak references
    2731        p = proxy(self.f)
    28         p.write('teststring')
    29         self.assertEquals(self.f.tell(), p.tell())
     32        p.write(b'teststring')
     33        self.assertEqual(self.f.tell(), p.tell())
    3034        self.f.close()
    3135        self.f = None
     
    3539        # verify expected attributes exist
    3640        f = self.f
    37         softspace = f.softspace
    3841        f.name     # merely shouldn't blow up
    3942        f.mode     # ditto
    4043        f.closed   # ditto
    4144
    42         # verify softspace is writable
    43         f.softspace = softspace    # merely shouldn't blow up
    44 
    45         # verify the others aren't
    46         for attr in 'name', 'mode', 'closed':
    47             self.assertRaises((AttributeError, TypeError), setattr, f, attr, 'oops')
    48 
    4945    def testReadinto(self):
    5046        # verify readinto
    51         self.f.write('12')
     47        self.f.write(b'12')
    5248        self.f.close()
    53         a = array('c', 'x'*10)
    54         self.f = open(TESTFN, 'rb')
     49        a = array('b', b'x'*10)
     50        self.f = self.open(TESTFN, 'rb')
    5551        n = self.f.readinto(a)
    56         self.assertEquals('12', a.tostring()[:n])
     52        self.assertEqual(b'12', a.tostring()[:n])
     53
     54    def testReadinto_text(self):
     55        # verify readinto refuses text files
     56        a = array('b', b'x'*10)
     57        self.f.close()
     58        self.f = self.open(TESTFN, 'r')
     59        if hasattr(self.f, "readinto"):
     60            self.assertRaises(TypeError, self.f.readinto, a)
    5761
    5862    def testWritelinesUserList(self):
    5963        # verify writelines with instance sequence
    60         l = UserList(['1', '2'])
     64        l = UserList([b'1', b'2'])
    6165        self.f.writelines(l)
    6266        self.f.close()
    63         self.f = open(TESTFN, 'rb')
     67        self.f = self.open(TESTFN, 'rb')
    6468        buf = self.f.read()
    65         self.assertEquals(buf, '12')
     69        self.assertEqual(buf, b'12')
    6670
    6771    def testWritelinesIntegers(self):
     
    8286                          [NonString(), NonString()])
    8387
    84     def testRepr(self):
    85         # verify repr works
    86         self.assert_(repr(self.f).startswith("<open file '" + TESTFN))
    87 
    8888    def testErrors(self):
    89         self.f.close()
    90         self.f = open(TESTFN, 'rb')
    9189        f = self.f
    92         self.assertEquals(f.name, TESTFN)
    93         self.assert_(not f.isatty())
    94         self.assert_(not f.closed)
    95 
    96         self.assertRaises(TypeError, f.readinto, "")
     90        self.assertEqual(f.name, TESTFN)
     91        self.assertTrue(not f.isatty())
     92        self.assertTrue(not f.closed)
     93
     94        if hasattr(f, "readinto"):
     95            self.assertRaises((IOError, TypeError), f.readinto, "")
    9796        f.close()
    98         self.assert_(f.closed)
     97        self.assertTrue(f.closed)
    9998
    10099    def testMethods(self):
    101         methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
    102                    'readline', 'readlines', 'seek', 'tell', 'truncate',
    103                    'write', 'xreadlines', '__iter__']
    104         if sys.platform.startswith('atheos'):
    105             methods.remove('truncate')
     100        methods = [('fileno', ()),
     101                   ('flush', ()),
     102                   ('isatty', ()),
     103                   ('next', ()),
     104                   ('read', ()),
     105                   ('write', (b"",)),
     106                   ('readline', ()),
     107                   ('readlines', ()),
     108                   ('seek', (0,)),
     109                   ('tell', ()),
     110                   ('write', (b"",)),
     111                   ('writelines', ([],)),
     112                   ('__iter__', ()),
     113                   ]
     114        if not sys.platform.startswith('atheos'):
     115            methods.append(('truncate', ()))
    106116
    107117        # __exit__ should close the file
    108118        self.f.__exit__(None, None, None)
    109         self.assert_(self.f.closed)
    110 
    111         for methodname in methods:
     119        self.assertTrue(self.f.closed)
     120
     121        for methodname, args in methods:
    112122            method = getattr(self.f, methodname)
    113123            # should raise on closed file
    114             self.assertRaises(ValueError, method)
    115         self.assertRaises(ValueError, self.f.writelines, [])
     124            self.assertRaises(ValueError, method, *args)
    116125
    117126        # file is closed, __exit__ shouldn't do anything
    118         self.assertEquals(self.f.__exit__(None, None, None), None)
     127        self.assertEqual(self.f.__exit__(None, None, None), None)
    119128        # it must also return None if an exception was given
    120129        try:
    121             1/0
     130            1 // 0
    122131        except:
    123             self.assertEquals(self.f.__exit__(*sys.exc_info()), None)
     132            self.assertEqual(self.f.__exit__(*sys.exc_info()), None)
    124133
    125134    def testReadWhenWriting(self):
    126135        self.assertRaises(IOError, self.f.read)
    127136
    128     def testIssue5677(self):
    129         # Remark: Do not perform more than one test per open file,
    130         # since that does NOT catch the readline error on Windows.
    131         data = 'xxx'
    132         for mode in ['w', 'wb', 'a', 'ab']:
    133             for attr in ['read', 'readline', 'readlines']:
    134                 self.f = open(TESTFN, mode)
    135                 self.f.write(data)
    136                 self.assertRaises(IOError, getattr(self.f, attr))
    137                 self.f.close()
    138 
    139             self.f = open(TESTFN, mode)
    140             self.f.write(data)
    141             self.assertRaises(IOError, lambda: [line for line in self.f])
    142             self.f.close()
    143 
    144             self.f = open(TESTFN, mode)
    145             self.f.write(data)
    146             self.assertRaises(IOError, self.f.readinto, bytearray(len(data)))
    147             self.f.close()
    148 
    149         for mode in ['r', 'rb', 'U', 'Ub', 'Ur', 'rU', 'rbU', 'rUb']:
    150             self.f = open(TESTFN, mode)
    151             self.assertRaises(IOError, self.f.write, data)
    152             self.f.close()
    153 
    154             self.f = open(TESTFN, mode)
    155             self.assertRaises(IOError, self.f.writelines, [data, data])
    156             self.f.close()
    157 
    158             self.f = open(TESTFN, mode)
    159             self.assertRaises(IOError, self.f.truncate)
    160             self.f.close()
     137class CAutoFileTests(AutoFileTests):
     138    open = io.open
     139
     140class PyAutoFileTests(AutoFileTests):
     141    open = staticmethod(pyio.open)
     142
    161143
    162144class OtherFileTests(unittest.TestCase):
    163 
    164     def testOpenDir(self):
    165         this_dir = os.path.dirname(__file__)
    166         for mode in (None, "w"):
    167             try:
    168                 if mode:
    169                     f = open(this_dir, mode)
    170                 else:
    171                     f = open(this_dir)
    172             except IOError as e:
    173                 self.assertEqual(e.filename, this_dir)
    174             else:
    175                 self.fail("opening a directory didn't raise an IOError")
    176145
    177146    def testModeStrings(self):
     
    179148        for mode in ("", "aU", "wU+"):
    180149            try:
    181                 f = open(TESTFN, mode)
     150                f = self.open(TESTFN, mode)
    182151            except ValueError:
    183152                pass
     
    185154                f.close()
    186155                self.fail('%r is an invalid file mode' % mode)
    187 
    188         # Some invalid modes fail on Windows, but pass on Unix
    189         # Issue3965: avoid a crash on Windows when filename is unicode
    190         for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')):
    191             try:
    192                 f = open(name, "rr")
    193             except IOError:
    194                 pass
    195             else:
    196                 f.close()
    197 
    198     def testStdin(self):
    199         # This causes the interpreter to exit on OSF1 v5.1.
    200         if sys.platform != 'osf1V5':
    201             self.assertRaises(IOError, sys.stdin.seek, -1)
    202         else:
    203             print >>sys.__stdout__, (
    204                 '  Skipping sys.stdin.seek(-1), it may crash the interpreter.'
    205                 ' Test manually.')
    206         self.assertRaises(IOError, sys.stdin.truncate)
    207 
    208     def testUnicodeOpen(self):
    209         # verify repr works for unicode too
    210         f = open(unicode(TESTFN), "w")
    211         self.assert_(repr(f).startswith("<open file u'" + TESTFN))
    212         f.close()
    213         os.unlink(TESTFN)
    214156
    215157    def testBadModeArgument(self):
     
    217159        bad_mode = "qwerty"
    218160        try:
    219             f = open(TESTFN, bad_mode)
    220         except ValueError, msg:
    221             if msg[0] != 0:
     161            f = self.open(TESTFN, bad_mode)
     162        except ValueError as msg:
     163            if msg.args[0] != 0:
    222164                s = str(msg)
    223                 if s.find(TESTFN) != -1 or s.find(bad_mode) == -1:
     165                if TESTFN in s or bad_mode not in s:
    224166                    self.fail("bad error message for invalid mode: %s" % s)
    225             # if msg[0] == 0, we're probably on Windows where there may be
     167            # if msg.args[0] == 0, we're probably on Windows where there may be
    226168            # no obvious way to discover why open() failed.
    227169        else:
     
    234176        for s in (-1, 0, 1, 512):
    235177            try:
    236                 f = open(TESTFN, 'w', s)
    237                 f.write(str(s))
    238                 f.close()
    239                 f.close()
    240                 f = open(TESTFN, 'r', s)
    241                 d = int(f.read())
    242                 f.close()
    243                 f.close()
    244             except IOError, msg:
     178                f = self.open(TESTFN, 'wb', s)
     179                f.write(str(s).encode("ascii"))
     180                f.close()
     181                f.close()
     182                f = self.open(TESTFN, 'rb', s)
     183                d = int(f.read().decode("ascii"))
     184                f.close()
     185                f.close()
     186            except IOError as msg:
    245187                self.fail('error setting buffer size %d: %s' % (s, str(msg)))
    246             self.assertEquals(d, s)
     188            self.assertEqual(d, s)
    247189
    248190    def testTruncateOnWindows(self):
     191        # SF bug <http://www.python.org/sf/801631>
     192        # "file.truncate fault on windows"
     193
    249194        os.unlink(TESTFN)
    250 
    251         def bug801631():
    252             # SF bug <http://www.python.org/sf/801631>
    253             # "file.truncate fault on windows"
    254             f = open(TESTFN, 'wb')
    255             f.write('12345678901')   # 11 bytes
     195        f = self.open(TESTFN, 'wb')
     196
     197        try:
     198            f.write(b'12345678901')   # 11 bytes
    256199            f.close()
    257200
    258             f = open(TESTFN,'rb+')
     201            f = self.open(TESTFN,'rb+')
    259202            data = f.read(5)
    260             if data != '12345':
     203            if data != b'12345':
    261204                self.fail("Read on file opened for update failed %r" % data)
    262205            if f.tell() != 5:
     
    271214            if size != 5:
    272215                self.fail("File size after ftruncate wrong %d" % size)
    273 
    274         try:
    275             bug801631()
    276216        finally:
     217            f.close()
    277218            os.unlink(TESTFN)
    278219
    279220    def testIteration(self):
    280221        # Test the complex interaction when mixing file-iteration and the
    281         # various read* methods. Ostensibly, the mixture could just be tested
    282         # to work when it should work according to the Python language,
    283         # instead of fail when it should fail according to the current CPython
    284         # implementation.  People don't always program Python the way they
    285         # should, though, and the implemenation might change in subtle ways,
    286         # so we explicitly test for errors, too; the test will just have to
    287         # be updated when the implementation changes.
     222        # various read* methods.
    288223        dataoffset = 16384
    289         filler = "ham\n"
     224        filler = b"ham\n"
    290225        assert not dataoffset % len(filler), \
    291226            "dataoffset must be multiple of len(filler)"
    292227        nchunks = dataoffset // len(filler)
    293228        testlines = [
    294             "spam, spam and eggs\n",
    295             "eggs, spam, ham and spam\n",
    296             "saussages, spam, spam and eggs\n",
    297             "spam, ham, spam and eggs\n",
    298             "spam, spam, spam, spam, spam, ham, spam\n",
    299             "wonderful spaaaaaam.\n"
     229            b"spam, spam and eggs\n",
     230            b"eggs, spam, ham and spam\n",
     231            b"saussages, spam, spam and eggs\n",
     232            b"spam, ham, spam and eggs\n",
     233            b"spam, spam, spam, spam, spam, ham, spam\n",
     234            b"wonderful spaaaaaam.\n"
    300235        ]
    301236        methods = [("readline", ()), ("read", ()), ("readlines", ()),
    302                    ("readinto", (array("c", " "*100),))]
     237                   ("readinto", (array("b", b" "*100),))]
    303238
    304239        try:
    305240            # Prepare the testfile
    306             bag = open(TESTFN, "w")
     241            bag = self.open(TESTFN, "wb")
    307242            bag.write(filler * nchunks)
    308243            bag.writelines(testlines)
     
    310245            # Test for appropriate errors mixing read* and iteration
    311246            for methodname, args in methods:
    312                 f = open(TESTFN)
    313                 if f.next() != filler:
     247                f = self.open(TESTFN, 'rb')
     248                if next(f) != filler:
    314249                    self.fail, "Broken testfile"
    315250                meth = getattr(f, methodname)
    316                 try:
    317                     meth(*args)
    318                 except ValueError:
    319                     pass
    320                 else:
    321                     self.fail("%s%r after next() didn't raise ValueError" %
    322                                      (methodname, args))
     251                meth(*args)  # This simply shouldn't fail
    323252                f.close()
    324253
     
    330259            # exactly on the buffer boundary for any power-of-2 buffersize
    331260            # between 4 and 16384 (inclusive).
    332             f = open(TESTFN)
     261            f = self.open(TESTFN, 'rb')
    333262            for i in range(nchunks):
    334                 f.next()
     263                next(f)
    335264            testline = testlines.pop(0)
    336265            try:
     
    343272                          "failed. Got %r, expected %r" % (line, testline))
    344273            testline = testlines.pop(0)
    345             buf = array("c", "\x00" * len(testline))
     274            buf = array("b", b"\x00" * len(testline))
    346275            try:
    347276                f.readinto(buf)
     
    372301                          "failed. Got %r, expected %r" % (line, testline))
    373302            # Reading after iteration hit EOF shouldn't hurt either
    374             f = open(TESTFN)
     303            f = self.open(TESTFN, 'rb')
    375304            try:
    376305                for line in f:
     
    388317            os.unlink(TESTFN)
    389318
    390 class FileSubclassTests(unittest.TestCase):
    391 
    392     def testExit(self):
    393         # test that exiting with context calls subclass' close
    394         class C(file):
    395             def __init__(self, *args):
    396                 self.subclass_closed = False
    397                 file.__init__(self, *args)
    398             def close(self):
    399                 self.subclass_closed = True
    400                 file.close(self)
    401 
    402         with C(TESTFN, 'w') as f:
    403             pass
    404         self.failUnless(f.subclass_closed)
    405 
    406 
    407 class FileThreadingTests(unittest.TestCase):
    408     # These tests check the ability to call various methods of file objects
    409     # (including close()) concurrently without crashing the Python interpreter.
    410     # See #815646, #595601
    411 
    412     def setUp(self):
    413         self.f = None
    414         self.filename = TESTFN
    415         with open(self.filename, "w") as f:
    416             f.write("\n".join("0123456789"))
    417         self._count_lock = threading.Lock()
    418         self.close_count = 0
    419         self.close_success_count = 0
    420 
    421     def tearDown(self):
    422         if self.f:
    423             try:
    424                 self.f.close()
    425             except (EnvironmentError, ValueError):
    426                 pass
    427         try:
    428             os.remove(self.filename)
    429         except EnvironmentError:
    430             pass
    431 
    432     def _create_file(self):
    433         self.f = open(self.filename, "w+")
    434 
    435     def _close_file(self):
    436         with self._count_lock:
    437             self.close_count += 1
    438         self.f.close()
    439         with self._count_lock:
    440             self.close_success_count += 1
    441 
    442     def _close_and_reopen_file(self):
    443         self._close_file()
    444         # if close raises an exception thats fine, self.f remains valid so
    445         # we don't need to reopen.
    446         self._create_file()
    447 
    448     def _run_workers(self, func, nb_workers, duration=0.2):
    449         with self._count_lock:
    450             self.close_count = 0
    451             self.close_success_count = 0
    452         self.do_continue = True
    453         threads = []
    454         try:
    455             for i in range(nb_workers):
    456                 t = threading.Thread(target=func)
    457                 t.start()
    458                 threads.append(t)
    459             for _ in xrange(100):
    460                 time.sleep(duration/100)
    461                 with self._count_lock:
    462                     if self.close_count-self.close_success_count > nb_workers+1:
    463                         if test_support.verbose:
    464                             print 'Q',
    465                         break
    466             time.sleep(duration)
    467         finally:
    468             self.do_continue = False
    469             for t in threads:
    470                 t.join()
    471 
    472     def _test_close_open_io(self, io_func, nb_workers=5):
    473         def worker():
    474             self._create_file()
    475             funcs = itertools.cycle((
    476                 lambda: io_func(),
    477                 lambda: self._close_and_reopen_file(),
    478             ))
    479             for f in funcs:
    480                 if not self.do_continue:
    481                     break
    482                 try:
    483                     f()
    484                 except (IOError, ValueError):
    485                     pass
    486         self._run_workers(worker, nb_workers)
    487         if test_support.verbose:
    488             # Useful verbose statistics when tuning this test to take
    489             # less time to run but still ensuring that its still useful.
    490             #
    491             # the percent of close calls that raised an error
    492             percent = 100. - 100.*self.close_success_count/self.close_count
    493             print self.close_count, ('%.4f ' % percent),
    494 
    495     def test_close_open(self):
    496         def io_func():
    497             pass
    498         self._test_close_open_io(io_func)
    499 
    500     def test_close_open_flush(self):
    501         def io_func():
    502             self.f.flush()
    503         self._test_close_open_io(io_func)
    504 
    505     def test_close_open_iter(self):
    506         def io_func():
    507             list(iter(self.f))
    508         self._test_close_open_io(io_func)
    509 
    510     def test_close_open_isatty(self):
    511         def io_func():
    512             self.f.isatty()
    513         self._test_close_open_io(io_func)
    514 
    515     def test_close_open_print(self):
    516         def io_func():
    517             print >> self.f, ''
    518         self._test_close_open_io(io_func)
    519 
    520     def test_close_open_read(self):
    521         def io_func():
    522             self.f.read(0)
    523         self._test_close_open_io(io_func)
    524 
    525     def test_close_open_readinto(self):
    526         def io_func():
    527             a = array('c', 'xxxxx')
    528             self.f.readinto(a)
    529         self._test_close_open_io(io_func)
    530 
    531     def test_close_open_readline(self):
    532         def io_func():
    533             self.f.readline()
    534         self._test_close_open_io(io_func)
    535 
    536     def test_close_open_readlines(self):
    537         def io_func():
    538             self.f.readlines()
    539         self._test_close_open_io(io_func)
    540 
    541     def test_close_open_seek(self):
    542         def io_func():
    543             self.f.seek(0, 0)
    544         self._test_close_open_io(io_func)
    545 
    546     def test_close_open_tell(self):
    547         def io_func():
    548             self.f.tell()
    549         self._test_close_open_io(io_func)
    550 
    551     def test_close_open_truncate(self):
    552         def io_func():
    553             self.f.truncate()
    554         self._test_close_open_io(io_func)
    555 
    556     def test_close_open_write(self):
    557         def io_func():
    558             self.f.write('')
    559         self._test_close_open_io(io_func)
    560 
    561     def test_close_open_writelines(self):
    562         def io_func():
    563             self.f.writelines('')
    564         self._test_close_open_io(io_func)
    565 
    566 
    567 class StdoutTests(unittest.TestCase):
    568 
    569     def test_move_stdout_on_write(self):
    570         # Issue 3242: sys.stdout can be replaced (and freed) during a
    571         # print statement; prevent a segfault in this case
    572         save_stdout = sys.stdout
    573 
    574         class File:
    575             def write(self, data):
    576                 if '\n' in data:
    577                     sys.stdout = save_stdout
    578 
    579         try:
    580             sys.stdout = File()
    581             print "some text"
    582         finally:
    583             sys.stdout = save_stdout
    584 
    585     def test_del_stdout_before_print(self):
    586         # Issue 4597: 'print' with no argument wasn't reporting when
    587         # sys.stdout was deleted.
    588         save_stdout = sys.stdout
    589         del sys.stdout
    590         try:
    591             print
    592         except RuntimeError as e:
    593             self.assertEquals(str(e), "lost sys.stdout")
    594         else:
    595             self.fail("Expected RuntimeError")
    596         finally:
    597             sys.stdout = save_stdout
     319class COtherFileTests(OtherFileTests):
     320    open = io.open
     321
     322class PyOtherFileTests(OtherFileTests):
     323    open = staticmethod(pyio.open)
    598324
    599325
     
    602328    # So get rid of it no matter what.
    603329    try:
    604         run_unittest(AutoFileTests, OtherFileTests, FileSubclassTests,
    605             FileThreadingTests, StdoutTests)
     330        run_unittest(CAutoFileTests, PyAutoFileTests,
     331                     COtherFileTests, PyOtherFileTests)
    606332    finally:
    607333        if os.path.exists(TESTFN):
Note: See TracChangeset for help on using the changeset viewer.