Ignore:
Timestamp:
Mar 19, 2014, 11:31:01 PM (11 years ago)
Author:
dmik
Message:

python: Merge vendor 2.7.6 to trunk.

Location:
python/trunk
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • python/trunk

  • python/trunk/Lib/test/test_io.py

    r2 r391  
    1 """Unit tests for io.py."""
     1"""Unit tests for the io module."""
     2
     3# Tests of io are scattered over the test suite:
     4# * test_bufio - tests file buffering
     5# * test_memoryio - tests BytesIO and StringIO
     6# * test_fileio - tests FileIO
     7# * test_file - tests the file interface
     8# * test_io - tests everything else in the io module
     9# * test_univnewlines - tests universal newline support
     10# * test_largefile - tests operations on a file greater than 2**32 bytes
     11#     (only enabled with -ulargefile)
     12
     13################################################################################
     14# ATTENTION TEST WRITERS!!!
     15################################################################################
     16# When writing tests for io, it's important to test both the C and Python
     17# implementations. This is usually done by writing a base test that refers to
     18# the type it is testing as a attribute. Then it provides custom subclasses to
     19# test both implementations. This file has lots of examples.
     20################################################################################
     21
    222from __future__ import print_function
    323from __future__ import unicode_literals
     
    727import time
    828import array
    9 import threading
    1029import random
    1130import unittest
    12 from itertools import chain, cycle
    13 from test import test_support
     31import weakref
     32import abc
     33import signal
     34import errno
     35from itertools import cycle, count
     36from collections import deque
     37from UserList import UserList
     38from test import test_support as support
     39import contextlib
    1440
    1541import codecs
    16 import io  # The module under test
    17 
    18 
    19 class MockRawIO(io.RawIOBase):
     42import io  # C implementation of io
     43import _pyio as pyio # Python implementation of io
     44try:
     45    import threading
     46except ImportError:
     47    threading = None
     48try:
     49    import fcntl
     50except ImportError:
     51    fcntl = None
     52
     53__metaclass__ = type
     54bytes = support.py3k_bytes
     55
     56def _default_chunk_size():
     57    """Get the default TextIOWrapper chunk size"""
     58    with io.open(__file__, "r", encoding="latin1") as f:
     59        return f._CHUNK_SIZE
     60
     61
     62class MockRawIOWithoutRead:
     63    """A RawIO implementation without read(), so as to exercise the default
     64    RawIO.read() which calls readinto()."""
    2065
    2166    def __init__(self, read_stack=()):
    2267        self._read_stack = list(read_stack)
    2368        self._write_stack = []
     69        self._reads = 0
     70        self._extraneous_reads = 0
     71
     72    def write(self, b):
     73        self._write_stack.append(bytes(b))
     74        return len(b)
     75
     76    def writable(self):
     77        return True
     78
     79    def fileno(self):
     80        return 42
     81
     82    def readable(self):
     83        return True
     84
     85    def seekable(self):
     86        return True
     87
     88    def seek(self, pos, whence):
     89        return 0   # wrong but we gotta return something
     90
     91    def tell(self):
     92        return 0   # same comment as above
     93
     94    def readinto(self, buf):
     95        self._reads += 1
     96        max_len = len(buf)
     97        try:
     98            data = self._read_stack[0]
     99        except IndexError:
     100            self._extraneous_reads += 1
     101            return 0
     102        if data is None:
     103            del self._read_stack[0]
     104            return None
     105        n = len(data)
     106        if len(data) <= max_len:
     107            del self._read_stack[0]
     108            buf[:n] = data
     109            return n
     110        else:
     111            buf[:] = data[:max_len]
     112            self._read_stack[0] = data[max_len:]
     113            return max_len
     114
     115    def truncate(self, pos=None):
     116        return pos
     117
     118class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
     119    pass
     120
     121class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
     122    pass
     123
     124
     125class MockRawIO(MockRawIOWithoutRead):
    24126
    25127    def read(self, n=None):
     128        self._reads += 1
    26129        try:
    27130            return self._read_stack.pop(0)
    28131        except:
     132            self._extraneous_reads += 1
    29133            return b""
    30134
     135class CMockRawIO(MockRawIO, io.RawIOBase):
     136    pass
     137
     138class PyMockRawIO(MockRawIO, pyio.RawIOBase):
     139    pass
     140
     141
     142class MisbehavedRawIO(MockRawIO):
    31143    def write(self, b):
    32         self._write_stack.append(b[:])
    33         return len(b)
     144        return MockRawIO.write(self, b) * 2
     145
     146    def read(self, n=None):
     147        return MockRawIO.read(self, n) * 2
     148
     149    def seek(self, pos, whence):
     150        return -123
     151
     152    def tell(self):
     153        return -456
     154
     155    def readinto(self, buf):
     156        MockRawIO.readinto(self, buf)
     157        return len(buf) * 5
     158
     159class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
     160    pass
     161
     162class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
     163    pass
     164
     165
     166class CloseFailureIO(MockRawIO):
     167    closed = 0
     168
     169    def close(self):
     170        if not self.closed:
     171            self.closed = 1
     172            raise IOError
     173
     174class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
     175    pass
     176
     177class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
     178    pass
     179
     180
     181class MockFileIO:
     182
     183    def __init__(self, data):
     184        self.read_history = []
     185        super(MockFileIO, self).__init__(data)
     186
     187    def read(self, n=None):
     188        res = super(MockFileIO, self).read(n)
     189        self.read_history.append(None if res is None else len(res))
     190        return res
     191
     192    def readinto(self, b):
     193        res = super(MockFileIO, self).readinto(b)
     194        self.read_history.append(res)
     195        return res
     196
     197class CMockFileIO(MockFileIO, io.BytesIO):
     198    pass
     199
     200class PyMockFileIO(MockFileIO, pyio.BytesIO):
     201    pass
     202
     203
     204class MockNonBlockWriterIO:
     205
     206    def __init__(self):
     207        self._write_stack = []
     208        self._blocker_char = None
     209
     210    def pop_written(self):
     211        s = b"".join(self._write_stack)
     212        self._write_stack[:] = []
     213        return s
     214
     215    def block_on(self, char):
     216        """Block when a given char is encountered."""
     217        self._blocker_char = char
     218
     219    def readable(self):
     220        return True
     221
     222    def seekable(self):
     223        return True
    34224
    35225    def writable(self):
    36226        return True
    37227
    38     def fileno(self):
    39         return 42
    40 
    41     def readable(self):
    42         return True
    43 
    44     def seekable(self):
    45         return True
    46 
    47     def seek(self, pos, whence):
    48         pass
    49 
    50     def tell(self):
    51         return 42
    52 
    53 
    54 class MockFileIO(io.BytesIO):
    55 
    56     def __init__(self, data):
    57         self.read_history = []
    58         io.BytesIO.__init__(self, data)
    59 
    60     def read(self, n=None):
    61         res = io.BytesIO.read(self, n)
    62         self.read_history.append(None if res is None else len(res))
    63         return res
    64 
    65 
    66 class MockNonBlockWriterIO(io.RawIOBase):
    67 
    68     def __init__(self, blocking_script):
    69         self._blocking_script = list(blocking_script)
    70         self._write_stack = []
    71 
    72228    def write(self, b):
    73         self._write_stack.append(b[:])
    74         n = self._blocking_script.pop(0)
    75         if (n < 0):
    76             raise io.BlockingIOError(0, "test blocking", -n)
    77         else:
    78             return n
    79 
    80     def writable(self):
    81         return True
     229        b = bytes(b)
     230        n = -1
     231        if self._blocker_char:
     232            try:
     233                n = b.index(self._blocker_char)
     234            except ValueError:
     235                pass
     236            else:
     237                if n > 0:
     238                    # write data up to the first blocker
     239                    self._write_stack.append(b[:n])
     240                    return n
     241                else:
     242                    # cancel blocker and indicate would block
     243                    self._blocker_char = None
     244                    return None
     245        self._write_stack.append(b)
     246        return len(b)
     247
     248class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
     249    BlockingIOError = io.BlockingIOError
     250
     251class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
     252    BlockingIOError = pyio.BlockingIOError
    82253
    83254
    84255class IOTest(unittest.TestCase):
    85256
     257    def setUp(self):
     258        support.unlink(support.TESTFN)
     259
    86260    def tearDown(self):
    87         test_support.unlink(test_support.TESTFN)
     261        support.unlink(support.TESTFN)
    88262
    89263    def write_ops(self, f):
    90 
    91264        self.assertEqual(f.write(b"blah."), 5)
    92265        f.truncate(0)
     
    157330        self.assertEqual(f.read(2), b"x")
    158331
     332    def test_invalid_operations(self):
     333        # Try writing on a file opened in read mode and vice-versa.
     334        for mode in ("w", "wb"):
     335            with self.open(support.TESTFN, mode) as fp:
     336                self.assertRaises(IOError, fp.read)
     337                self.assertRaises(IOError, fp.readline)
     338        with self.open(support.TESTFN, "rb") as fp:
     339            self.assertRaises(IOError, fp.write, b"blah")
     340            self.assertRaises(IOError, fp.writelines, [b"blah\n"])
     341        with self.open(support.TESTFN, "r") as fp:
     342            self.assertRaises(IOError, fp.write, "blah")
     343            self.assertRaises(IOError, fp.writelines, ["blah\n"])
     344
    159345    def test_raw_file_io(self):
    160         f = io.open(test_support.TESTFN, "wb", buffering=0)
    161         self.assertEqual(f.readable(), False)
    162         self.assertEqual(f.writable(), True)
    163         self.assertEqual(f.seekable(), True)
    164         self.write_ops(f)
    165         f.close()
    166         f = io.open(test_support.TESTFN, "rb", buffering=0)
    167         self.assertEqual(f.readable(), True)
    168         self.assertEqual(f.writable(), False)
    169         self.assertEqual(f.seekable(), True)
    170         self.read_ops(f)
    171         f.close()
     346        with self.open(support.TESTFN, "wb", buffering=0) as f:
     347            self.assertEqual(f.readable(), False)
     348            self.assertEqual(f.writable(), True)
     349            self.assertEqual(f.seekable(), True)
     350            self.write_ops(f)
     351        with self.open(support.TESTFN, "rb", buffering=0) as f:
     352            self.assertEqual(f.readable(), True)
     353            self.assertEqual(f.writable(), False)
     354            self.assertEqual(f.seekable(), True)
     355            self.read_ops(f)
    172356
    173357    def test_buffered_file_io(self):
    174         f = io.open(test_support.TESTFN, "wb")
    175         self.assertEqual(f.readable(), False)
    176         self.assertEqual(f.writable(), True)
    177         self.assertEqual(f.seekable(), True)
    178         self.write_ops(f)
    179         f.close()
    180         f = io.open(test_support.TESTFN, "rb")
    181         self.assertEqual(f.readable(), True)
    182         self.assertEqual(f.writable(), False)
    183         self.assertEqual(f.seekable(), True)
    184         self.read_ops(f, True)
    185         f.close()
     358        with self.open(support.TESTFN, "wb") as f:
     359            self.assertEqual(f.readable(), False)
     360            self.assertEqual(f.writable(), True)
     361            self.assertEqual(f.seekable(), True)
     362            self.write_ops(f)
     363        with self.open(support.TESTFN, "rb") as f:
     364            self.assertEqual(f.readable(), True)
     365            self.assertEqual(f.writable(), False)
     366            self.assertEqual(f.seekable(), True)
     367            self.read_ops(f, True)
    186368
    187369    def test_readline(self):
    188         f = io.open(test_support.TESTFN, "wb")
    189         f.write(b"abc\ndef\nxyzzy\nfoo")
    190         f.close()
    191         f = io.open(test_support.TESTFN, "rb")
    192         self.assertEqual(f.readline(), b"abc\n")
    193         self.assertEqual(f.readline(10), b"def\n")
    194         self.assertEqual(f.readline(2), b"xy")
    195         self.assertEqual(f.readline(4), b"zzy\n")
    196         self.assertEqual(f.readline(), b"foo")
    197         f.close()
     370        with self.open(support.TESTFN, "wb") as f:
     371            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
     372        with self.open(support.TESTFN, "rb") as f:
     373            self.assertEqual(f.readline(), b"abc\n")
     374            self.assertEqual(f.readline(10), b"def\n")
     375            self.assertEqual(f.readline(2), b"xy")
     376            self.assertEqual(f.readline(4), b"zzy\n")
     377            self.assertEqual(f.readline(), b"foo\x00bar\n")
     378            self.assertEqual(f.readline(None), b"another line")
     379            self.assertRaises(TypeError, f.readline, 5.3)
     380        with self.open(support.TESTFN, "r") as f:
     381            self.assertRaises(TypeError, f.readline, 5.3)
    198382
    199383    def test_raw_bytes_io(self):
    200         f = io.BytesIO()
     384        f = self.BytesIO()
    201385        self.write_ops(f)
    202386        data = f.getvalue()
    203387        self.assertEqual(data, b"hello world\n")
    204         f = io.BytesIO(data)
     388        f = self.BytesIO(data)
    205389        self.read_ops(f, True)
    206390
     
    209393        # a long time to build the >2GB file and takes >2GB of disk space
    210394        # therefore the resource must be enabled to run this test.
    211         if sys.platform[:3] in ('win', 'os2') or sys.platform == 'darwin':
    212             if not test_support.is_resource_enabled("largefile"):
     395        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
     396            if not support.is_resource_enabled("largefile"):
    213397                print("\nTesting large file ops skipped on %s." % sys.platform,
    214398                      file=sys.stderr)
     
    218402                      file=sys.stderr)
    219403                return
    220         f = io.open(test_support.TESTFN, "w+b", 0)
    221         self.large_file_ops(f)
    222         f.close()
    223         f = io.open(test_support.TESTFN, "w+b")
    224         self.large_file_ops(f)
    225         f.close()
     404        with self.open(support.TESTFN, "w+b", 0) as f:
     405            self.large_file_ops(f)
     406        with self.open(support.TESTFN, "w+b") as f:
     407            self.large_file_ops(f)
    226408
    227409    def test_with_open(self):
    228410        for bufsize in (0, 1, 100):
    229411            f = None
    230             with open(test_support.TESTFN, "wb", bufsize) as f:
     412            with self.open(support.TESTFN, "wb", bufsize) as f:
    231413                f.write(b"xxx")
    232414            self.assertEqual(f.closed, True)
    233415            f = None
    234416            try:
    235                 with open(test_support.TESTFN, "wb", bufsize) as f:
    236                     1/0
     417                with self.open(support.TESTFN, "wb", bufsize) as f:
     418                    1 // 0
    237419            except ZeroDivisionError:
    238420                self.assertEqual(f.closed, True)
    239421            else:
    240                 self.fail("1/0 didn't raise an exception")
     422                self.fail("1 // 0 didn't raise an exception")
    241423
    242424    # issue 5008
    243425    def test_append_mode_tell(self):
    244         with io.open(test_support.TESTFN, "wb") as f:
     426        with self.open(support.TESTFN, "wb") as f:
    245427            f.write(b"xxx")
    246         with io.open(test_support.TESTFN, "ab", buffering=0) as f:
     428        with self.open(support.TESTFN, "ab", buffering=0) as f:
    247429            self.assertEqual(f.tell(), 3)
    248         with io.open(test_support.TESTFN, "ab") as f:
     430        with self.open(support.TESTFN, "ab") as f:
    249431            self.assertEqual(f.tell(), 3)
    250         with io.open(test_support.TESTFN, "a") as f:
    251             self.assert_(f.tell() > 0)
     432        with self.open(support.TESTFN, "a") as f:
     433            self.assertTrue(f.tell() > 0)
    252434
    253435    def test_destructor(self):
    254436        record = []
    255         class MyFileIO(io.FileIO):
     437        class MyFileIO(self.FileIO):
    256438            def __del__(self):
    257439                record.append(1)
    258                 io.FileIO.__del__(self)
     440                try:
     441                    f = super(MyFileIO, self).__del__
     442                except AttributeError:
     443                    pass
     444                else:
     445                    f()
    259446            def close(self):
    260447                record.append(2)
    261                 io.FileIO.close(self)
     448                super(MyFileIO, self).close()
    262449            def flush(self):
    263450                record.append(3)
    264                 io.FileIO.flush(self)
    265         f = MyFileIO(test_support.TESTFN, "w")
    266         f.write("xxx")
     451                super(MyFileIO, self).flush()
     452        f = MyFileIO(support.TESTFN, "wb")
     453        f.write(b"xxx")
    267454        del f
     455        support.gc_collect()
    268456        self.assertEqual(record, [1, 2, 3])
     457        with self.open(support.TESTFN, "rb") as f:
     458            self.assertEqual(f.read(), b"xxx")
     459
     460    def _check_base_destructor(self, base):
     461        record = []
     462        class MyIO(base):
     463            def __init__(self):
     464                # This exercises the availability of attributes on object
     465                # destruction.
     466                # (in the C version, close() is called by the tp_dealloc
     467                # function, not by __del__)
     468                self.on_del = 1
     469                self.on_close = 2
     470                self.on_flush = 3
     471            def __del__(self):
     472                record.append(self.on_del)
     473                try:
     474                    f = super(MyIO, self).__del__
     475                except AttributeError:
     476                    pass
     477                else:
     478                    f()
     479            def close(self):
     480                record.append(self.on_close)
     481                super(MyIO, self).close()
     482            def flush(self):
     483                record.append(self.on_flush)
     484                super(MyIO, self).flush()
     485        f = MyIO()
     486        del f
     487        support.gc_collect()
     488        self.assertEqual(record, [1, 2, 3])
     489
     490    def test_IOBase_destructor(self):
     491        self._check_base_destructor(self.IOBase)
     492
     493    def test_RawIOBase_destructor(self):
     494        self._check_base_destructor(self.RawIOBase)
     495
     496    def test_BufferedIOBase_destructor(self):
     497        self._check_base_destructor(self.BufferedIOBase)
     498
     499    def test_TextIOBase_destructor(self):
     500        self._check_base_destructor(self.TextIOBase)
    269501
    270502    def test_close_flushes(self):
    271         f = io.open(test_support.TESTFN, "wb")
    272         f.write(b"xxx")
    273         f.close()
    274         f = io.open(test_support.TESTFN, "rb")
    275         self.assertEqual(f.read(), b"xxx")
    276         f.close()
    277 
    278     def XXXtest_array_writes(self):
    279         # XXX memory view not available yet
    280         a = array.array('i', range(10))
    281         n = len(memoryview(a))
    282         f = io.open(test_support.TESTFN, "wb", 0)
    283         self.assertEqual(f.write(a), n)
    284         f.close()
    285         f = io.open(test_support.TESTFN, "wb")
    286         self.assertEqual(f.write(a), n)
    287         f.close()
     503        with self.open(support.TESTFN, "wb") as f:
     504            f.write(b"xxx")
     505        with self.open(support.TESTFN, "rb") as f:
     506            self.assertEqual(f.read(), b"xxx")
     507
     508    def test_array_writes(self):
     509        a = array.array(b'i', range(10))
     510        n = len(a.tostring())
     511        with self.open(support.TESTFN, "wb", 0) as f:
     512            self.assertEqual(f.write(a), n)
     513        with self.open(support.TESTFN, "wb") as f:
     514            self.assertEqual(f.write(a), n)
    288515
    289516    def test_closefd(self):
    290         self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w',
     517        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
    291518                          closefd=False)
    292519
    293     def testReadClosed(self):
    294         with io.open(test_support.TESTFN, "w") as f:
     520    def test_read_closed(self):
     521        with self.open(support.TESTFN, "w") as f:
    295522            f.write("egg\n")
    296         with io.open(test_support.TESTFN, "r") as f:
    297             file = io.open(f.fileno(), "r", closefd=False)
     523        with self.open(support.TESTFN, "r") as f:
     524            file = self.open(f.fileno(), "r", closefd=False)
    298525            self.assertEqual(file.read(), "egg\n")
    299526            file.seek(0)
     
    303530    def test_no_closefd_with_filename(self):
    304531        # can't use closefd in combination with a file name
    305         self.assertRaises(ValueError,
    306                           io.open, test_support.TESTFN, "r", closefd=False)
     532        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
    307533
    308534    def test_closefd_attr(self):
    309         with io.open(test_support.TESTFN, "wb") as f:
     535        with self.open(support.TESTFN, "wb") as f:
    310536            f.write(b"egg\n")
    311         with io.open(test_support.TESTFN, "r") as f:
     537        with self.open(support.TESTFN, "r") as f:
    312538            self.assertEqual(f.buffer.raw.closefd, True)
    313             file = io.open(f.fileno(), "r", closefd=False)
     539            file = self.open(f.fileno(), "r", closefd=False)
    314540            self.assertEqual(file.buffer.raw.closefd, False)
    315541
    316 
    317 class MemorySeekTestMixin:
    318 
    319     def testInit(self):
    320         buf = self.buftype("1234567890")
    321         bytesIo = self.ioclass(buf)
    322 
    323     def testRead(self):
    324         buf = self.buftype("1234567890")
    325         bytesIo = self.ioclass(buf)
    326 
    327         self.assertEquals(buf[:1], bytesIo.read(1))
    328         self.assertEquals(buf[1:5], bytesIo.read(4))
    329         self.assertEquals(buf[5:], bytesIo.read(900))
    330         self.assertEquals(self.EOF, bytesIo.read())
    331 
    332     def testReadNoArgs(self):
    333         buf = self.buftype("1234567890")
    334         bytesIo = self.ioclass(buf)
    335 
    336         self.assertEquals(buf, bytesIo.read())
    337         self.assertEquals(self.EOF, bytesIo.read())
    338 
    339     def testSeek(self):
    340         buf = self.buftype("1234567890")
    341         bytesIo = self.ioclass(buf)
    342 
    343         bytesIo.read(5)
    344         bytesIo.seek(0)
    345         self.assertEquals(buf, bytesIo.read())
    346 
    347         bytesIo.seek(3)
    348         self.assertEquals(buf[3:], bytesIo.read())
    349         self.assertRaises(TypeError, bytesIo.seek, 0.0)
    350 
    351     def testTell(self):
    352         buf = self.buftype("1234567890")
    353         bytesIo = self.ioclass(buf)
    354 
    355         self.assertEquals(0, bytesIo.tell())
    356         bytesIo.seek(5)
    357         self.assertEquals(5, bytesIo.tell())
    358         bytesIo.seek(10000)
    359         self.assertEquals(10000, bytesIo.tell())
    360 
    361 
    362 class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
    363     @staticmethod
    364     def buftype(s):
    365         return s.encode("utf-8")
    366     ioclass = io.BytesIO
    367     EOF = b""
    368 
    369 
    370 class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
    371     buftype = str
    372     ioclass = io.StringIO
    373     EOF = ""
    374 
    375 
    376 class BufferedReaderTest(unittest.TestCase):
    377 
    378     def testRead(self):
    379         rawio = MockRawIO((b"abc", b"d", b"efg"))
    380         bufio = io.BufferedReader(rawio)
    381 
    382         self.assertEquals(b"abcdef", bufio.read(6))
    383 
    384     def testBuffering(self):
     542    def test_garbage_collection(self):
     543        # FileIO objects are collected, and collecting them flushes
     544        # all data to disk.
     545        f = self.FileIO(support.TESTFN, "wb")
     546        f.write(b"abcxxx")
     547        f.f = f
     548        wr = weakref.ref(f)
     549        del f
     550        support.gc_collect()
     551        self.assertTrue(wr() is None, wr)
     552        with self.open(support.TESTFN, "rb") as f:
     553            self.assertEqual(f.read(), b"abcxxx")
     554
     555    def test_unbounded_file(self):
     556        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
     557        zero = "/dev/zero"
     558        if not os.path.exists(zero):
     559            self.skipTest("{0} does not exist".format(zero))
     560        if sys.maxsize > 0x7FFFFFFF:
     561            self.skipTest("test can only run in a 32-bit address space")
     562        if support.real_max_memuse < support._2G:
     563            self.skipTest("test requires at least 2GB of memory")
     564        with self.open(zero, "rb", buffering=0) as f:
     565            self.assertRaises(OverflowError, f.read)
     566        with self.open(zero, "rb") as f:
     567            self.assertRaises(OverflowError, f.read)
     568        with self.open(zero, "r") as f:
     569            self.assertRaises(OverflowError, f.read)
     570
     571    def test_flush_error_on_close(self):
     572        f = self.open(support.TESTFN, "wb", buffering=0)
     573        def bad_flush():
     574            raise IOError()
     575        f.flush = bad_flush
     576        self.assertRaises(IOError, f.close) # exception not swallowed
     577        self.assertTrue(f.closed)
     578
     579    def test_multi_close(self):
     580        f = self.open(support.TESTFN, "wb", buffering=0)
     581        f.close()
     582        f.close()
     583        f.close()
     584        self.assertRaises(ValueError, f.flush)
     585
     586    def test_RawIOBase_read(self):
     587        # Exercise the default RawIOBase.read() implementation (which calls
     588        # readinto() internally).
     589        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
     590        self.assertEqual(rawio.read(2), b"ab")
     591        self.assertEqual(rawio.read(2), b"c")
     592        self.assertEqual(rawio.read(2), b"d")
     593        self.assertEqual(rawio.read(2), None)
     594        self.assertEqual(rawio.read(2), b"ef")
     595        self.assertEqual(rawio.read(2), b"g")
     596        self.assertEqual(rawio.read(2), None)
     597        self.assertEqual(rawio.read(2), b"")
     598
     599    def test_fileio_closefd(self):
     600        # Issue #4841
     601        with self.open(__file__, 'rb') as f1, \
     602             self.open(__file__, 'rb') as f2:
     603            fileio = self.FileIO(f1.fileno(), closefd=False)
     604            # .__init__() must not close f1
     605            fileio.__init__(f2.fileno(), closefd=False)
     606            f1.readline()
     607            # .close() must not close f2
     608            fileio.close()
     609            f2.readline()
     610
     611
     612class CIOTest(IOTest):
     613
     614    def test_IOBase_finalize(self):
     615        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
     616        # class which inherits IOBase and an object of this class are caught
     617        # in a reference cycle and close() is already in the method cache.
     618        class MyIO(self.IOBase):
     619            def close(self):
     620                pass
     621
     622        # create an instance to populate the method cache
     623        MyIO()
     624        obj = MyIO()
     625        obj.obj = obj
     626        wr = weakref.ref(obj)
     627        del MyIO
     628        del obj
     629        support.gc_collect()
     630        self.assertTrue(wr() is None, wr)
     631
     632class PyIOTest(IOTest):
     633    test_array_writes = unittest.skip(
     634        "len(array.array) returns number of elements rather than bytelength"
     635    )(IOTest.test_array_writes)
     636
     637
     638class CommonBufferedTests:
     639    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
     640
     641    def test_detach(self):
     642        raw = self.MockRawIO()
     643        buf = self.tp(raw)
     644        self.assertIs(buf.detach(), raw)
     645        self.assertRaises(ValueError, buf.detach)
     646
     647    def test_fileno(self):
     648        rawio = self.MockRawIO()
     649        bufio = self.tp(rawio)
     650
     651        self.assertEqual(42, bufio.fileno())
     652
     653    def test_no_fileno(self):
     654        # XXX will we always have fileno() function? If so, kill
     655        # this test. Else, write it.
     656        pass
     657
     658    def test_invalid_args(self):
     659        rawio = self.MockRawIO()
     660        bufio = self.tp(rawio)
     661        # Invalid whence
     662        self.assertRaises(ValueError, bufio.seek, 0, -1)
     663        self.assertRaises(ValueError, bufio.seek, 0, 3)
     664
     665    def test_override_destructor(self):
     666        tp = self.tp
     667        record = []
     668        class MyBufferedIO(tp):
     669            def __del__(self):
     670                record.append(1)
     671                try:
     672                    f = super(MyBufferedIO, self).__del__
     673                except AttributeError:
     674                    pass
     675                else:
     676                    f()
     677            def close(self):
     678                record.append(2)
     679                super(MyBufferedIO, self).close()
     680            def flush(self):
     681                record.append(3)
     682                super(MyBufferedIO, self).flush()
     683        rawio = self.MockRawIO()
     684        bufio = MyBufferedIO(rawio)
     685        writable = bufio.writable()
     686        del bufio
     687        support.gc_collect()
     688        if writable:
     689            self.assertEqual(record, [1, 2, 3])
     690        else:
     691            self.assertEqual(record, [1, 2])
     692
     693    def test_context_manager(self):
     694        # Test usability as a context manager
     695        rawio = self.MockRawIO()
     696        bufio = self.tp(rawio)
     697        def _with():
     698            with bufio:
     699                pass
     700        _with()
     701        # bufio should now be closed, and using it a second time should raise
     702        # a ValueError.
     703        self.assertRaises(ValueError, _with)
     704
     705    def test_error_through_destructor(self):
     706        # Test that the exception state is not modified by a destructor,
     707        # even if close() fails.
     708        rawio = self.CloseFailureIO()
     709        def f():
     710            self.tp(rawio).xyzzy
     711        with support.captured_output("stderr") as s:
     712            self.assertRaises(AttributeError, f)
     713        s = s.getvalue().strip()
     714        if s:
     715            # The destructor *may* have printed an unraisable error, check it
     716            self.assertEqual(len(s.splitlines()), 1)
     717            self.assertTrue(s.startswith("Exception IOError: "), s)
     718            self.assertTrue(s.endswith(" ignored"), s)
     719
     720    def test_repr(self):
     721        raw = self.MockRawIO()
     722        b = self.tp(raw)
     723        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
     724        self.assertEqual(repr(b), "<%s>" % clsname)
     725        raw.name = "dummy"
     726        self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
     727        raw.name = b"dummy"
     728        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
     729
     730    def test_flush_error_on_close(self):
     731        raw = self.MockRawIO()
     732        def bad_flush():
     733            raise IOError()
     734        raw.flush = bad_flush
     735        b = self.tp(raw)
     736        self.assertRaises(IOError, b.close) # exception not swallowed
     737        self.assertTrue(b.closed)
     738
     739    def test_close_error_on_close(self):
     740        raw = self.MockRawIO()
     741        def bad_flush():
     742            raise IOError('flush')
     743        def bad_close():
     744            raise IOError('close')
     745        raw.close = bad_close
     746        b = self.tp(raw)
     747        b.flush = bad_flush
     748        with self.assertRaises(IOError) as err: # exception not swallowed
     749            b.close()
     750        self.assertEqual(err.exception.args, ('close',))
     751        self.assertFalse(b.closed)
     752
     753    def test_multi_close(self):
     754        raw = self.MockRawIO()
     755        b = self.tp(raw)
     756        b.close()
     757        b.close()
     758        b.close()
     759        self.assertRaises(ValueError, b.flush)
     760
     761    def test_readonly_attributes(self):
     762        raw = self.MockRawIO()
     763        buf = self.tp(raw)
     764        x = self.MockRawIO()
     765        with self.assertRaises((AttributeError, TypeError)):
     766            buf.raw = x
     767
     768
     769class SizeofTest:
     770
     771    @support.cpython_only
     772    def test_sizeof(self):
     773        bufsize1 = 4096
     774        bufsize2 = 8192
     775        rawio = self.MockRawIO()
     776        bufio = self.tp(rawio, buffer_size=bufsize1)
     777        size = sys.getsizeof(bufio) - bufsize1
     778        rawio = self.MockRawIO()
     779        bufio = self.tp(rawio, buffer_size=bufsize2)
     780        self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
     781
     782
     783class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
     784    read_mode = "rb"
     785
     786    def test_constructor(self):
     787        rawio = self.MockRawIO([b"abc"])
     788        bufio = self.tp(rawio)
     789        bufio.__init__(rawio)
     790        bufio.__init__(rawio, buffer_size=1024)
     791        bufio.__init__(rawio, buffer_size=16)
     792        self.assertEqual(b"abc", bufio.read())
     793        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
     794        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
     795        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
     796        rawio = self.MockRawIO([b"abc"])
     797        bufio.__init__(rawio)
     798        self.assertEqual(b"abc", bufio.read())
     799
     800    def test_read(self):
     801        for arg in (None, 7):
     802            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
     803            bufio = self.tp(rawio)
     804            self.assertEqual(b"abcdefg", bufio.read(arg))
     805        # Invalid args
     806        self.assertRaises(ValueError, bufio.read, -2)
     807
     808    def test_read1(self):
     809        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
     810        bufio = self.tp(rawio)
     811        self.assertEqual(b"a", bufio.read(1))
     812        self.assertEqual(b"b", bufio.read1(1))
     813        self.assertEqual(rawio._reads, 1)
     814        self.assertEqual(b"c", bufio.read1(100))
     815        self.assertEqual(rawio._reads, 1)
     816        self.assertEqual(b"d", bufio.read1(100))
     817        self.assertEqual(rawio._reads, 2)
     818        self.assertEqual(b"efg", bufio.read1(100))
     819        self.assertEqual(rawio._reads, 3)
     820        self.assertEqual(b"", bufio.read1(100))
     821        self.assertEqual(rawio._reads, 4)
     822        # Invalid args
     823        self.assertRaises(ValueError, bufio.read1, -1)
     824
     825    def test_readinto(self):
     826        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
     827        bufio = self.tp(rawio)
     828        b = bytearray(2)
     829        self.assertEqual(bufio.readinto(b), 2)
     830        self.assertEqual(b, b"ab")
     831        self.assertEqual(bufio.readinto(b), 2)
     832        self.assertEqual(b, b"cd")
     833        self.assertEqual(bufio.readinto(b), 2)
     834        self.assertEqual(b, b"ef")
     835        self.assertEqual(bufio.readinto(b), 1)
     836        self.assertEqual(b, b"gf")
     837        self.assertEqual(bufio.readinto(b), 0)
     838        self.assertEqual(b, b"gf")
     839
     840    def test_readlines(self):
     841        def bufio():
     842            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
     843            return self.tp(rawio)
     844        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
     845        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
     846        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
     847
     848    def test_buffering(self):
    385849        data = b"abcdefghi"
    386850        dlen = len(data)
     
    393857
    394858        for bufsize, buf_read_sizes, raw_read_sizes in tests:
    395             rawio = MockFileIO(data)
    396             bufio = io.BufferedReader(rawio, buffer_size=bufsize)
     859            rawio = self.MockFileIO(data)
     860            bufio = self.tp(rawio, buffer_size=bufsize)
    397861            pos = 0
    398862            for nbytes in buf_read_sizes:
    399                 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
     863                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
    400864                pos += nbytes
    401             self.assertEquals(rawio.read_history, raw_read_sizes)
    402 
    403     def testReadNonBlocking(self):
     865            # this is mildly implementation-dependent
     866            self.assertEqual(rawio.read_history, raw_read_sizes)
     867
     868    def test_read_non_blocking(self):
    404869        # Inject some None's in there to simulate EWOULDBLOCK
    405         rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
    406         bufio = io.BufferedReader(rawio)
    407 
    408         self.assertEquals(b"abcd", bufio.read(6))
    409         self.assertEquals(b"e", bufio.read(1))
    410         self.assertEquals(b"fg", bufio.read())
    411         self.assert_(None is bufio.read())
    412         self.assertEquals(b"", bufio.read())
    413 
    414     def testReadToEof(self):
    415         rawio = MockRawIO((b"abc", b"d", b"efg"))
    416         bufio = io.BufferedReader(rawio)
    417 
    418         self.assertEquals(b"abcdefg", bufio.read(9000))
    419 
    420     def testReadNoArgs(self):
    421         rawio = MockRawIO((b"abc", b"d", b"efg"))
    422         bufio = io.BufferedReader(rawio)
    423 
    424         self.assertEquals(b"abcdefg", bufio.read())
    425 
    426     def testFileno(self):
    427         rawio = MockRawIO((b"abc", b"d", b"efg"))
    428         bufio = io.BufferedReader(rawio)
    429 
    430         self.assertEquals(42, bufio.fileno())
    431 
    432     def testFilenoNoFileno(self):
    433         # XXX will we always have fileno() function? If so, kill
    434         # this test. Else, write it.
    435         pass
    436 
    437     def testThreads(self):
     870        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
     871        bufio = self.tp(rawio)
     872        self.assertEqual(b"abcd", bufio.read(6))
     873        self.assertEqual(b"e", bufio.read(1))
     874        self.assertEqual(b"fg", bufio.read())
     875        self.assertEqual(b"", bufio.peek(1))
     876        self.assertIsNone(bufio.read())
     877        self.assertEqual(b"", bufio.read())
     878
     879        rawio = self.MockRawIO((b"a", None, None))
     880        self.assertEqual(b"a", rawio.readall())
     881        self.assertIsNone(rawio.readall())
     882
     883    def test_read_past_eof(self):
     884        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
     885        bufio = self.tp(rawio)
     886
     887        self.assertEqual(b"abcdefg", bufio.read(9000))
     888
     889    def test_read_all(self):
     890        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
     891        bufio = self.tp(rawio)
     892
     893        self.assertEqual(b"abcdefg", bufio.read())
     894
     895    @unittest.skipUnless(threading, 'Threading required for this test.')
     896    @support.requires_resource('cpu')
     897    def test_threads(self):
    438898        try:
    439899            # Write out many bytes with exactly the same number of 0's,
     
    441901            # doesn't duplicate or forget contents.
    442902            N = 1000
    443             l = range(256) * N
     903            l = list(range(256)) * N
    444904            random.shuffle(l)
    445905            s = bytes(bytearray(l))
    446             with io.open(test_support.TESTFN, "wb") as f:
     906            with self.open(support.TESTFN, "wb") as f:
    447907                f.write(s)
    448             with io.open(test_support.TESTFN, "rb", buffering=0) as raw:
    449                 bufio = io.BufferedReader(raw, 8)
     908            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
     909                bufio = self.tp(raw, 8)
    450910                errors = []
    451911                results = []
     
    475935                    self.assertEqual(s.count(c), N)
    476936        finally:
    477             test_support.unlink(test_support.TESTFN)
    478 
    479 
    480 
    481 class BufferedWriterTest(unittest.TestCase):
    482 
    483     def testWrite(self):
     937            support.unlink(support.TESTFN)
     938
     939    def test_misbehaved_io(self):
     940        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
     941        bufio = self.tp(rawio)
     942        self.assertRaises(IOError, bufio.seek, 0)
     943        self.assertRaises(IOError, bufio.tell)
     944
     945    def test_no_extraneous_read(self):
     946        # Issue #9550; when the raw IO object has satisfied the read request,
     947        # we should not issue any additional reads, otherwise it may block
     948        # (e.g. socket).
     949        bufsize = 16
     950        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
     951            rawio = self.MockRawIO([b"x" * n])
     952            bufio = self.tp(rawio, bufsize)
     953            self.assertEqual(bufio.read(n), b"x" * n)
     954            # Simple case: one raw read is enough to satisfy the request.
     955            self.assertEqual(rawio._extraneous_reads, 0,
     956                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
     957            # A more complex case where two raw reads are needed to satisfy
     958            # the request.
     959            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
     960            bufio = self.tp(rawio, bufsize)
     961            self.assertEqual(bufio.read(n), b"x" * n)
     962            self.assertEqual(rawio._extraneous_reads, 0,
     963                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
     964
     965
     966class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
     967    tp = io.BufferedReader
     968
     969    def test_constructor(self):
     970        BufferedReaderTest.test_constructor(self)
     971        # The allocation can succeed on 32-bit builds, e.g. with more
     972        # than 2GB RAM and a 64-bit kernel.
     973        if sys.maxsize > 0x7FFFFFFF:
     974            rawio = self.MockRawIO()
     975            bufio = self.tp(rawio)
     976            self.assertRaises((OverflowError, MemoryError, ValueError),
     977                bufio.__init__, rawio, sys.maxsize)
     978
     979    def test_initialization(self):
     980        rawio = self.MockRawIO([b"abc"])
     981        bufio = self.tp(rawio)
     982        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
     983        self.assertRaises(ValueError, bufio.read)
     984        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
     985        self.assertRaises(ValueError, bufio.read)
     986        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
     987        self.assertRaises(ValueError, bufio.read)
     988
     989    def test_misbehaved_io_read(self):
     990        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
     991        bufio = self.tp(rawio)
     992        # _pyio.BufferedReader seems to implement reading different, so that
     993        # checking this is not so easy.
     994        self.assertRaises(IOError, bufio.read, 10)
     995
     996    def test_garbage_collection(self):
     997        # C BufferedReader objects are collected.
     998        # The Python version has __del__, so it ends into gc.garbage instead
     999        rawio = self.FileIO(support.TESTFN, "w+b")
     1000        f = self.tp(rawio)
     1001        f.f = f
     1002        wr = weakref.ref(f)
     1003        del f
     1004        support.gc_collect()
     1005        self.assertTrue(wr() is None, wr)
     1006
     1007    def test_args_error(self):
     1008        # Issue #17275
     1009        with self.assertRaisesRegexp(TypeError, "BufferedReader"):
     1010            self.tp(io.BytesIO(), 1024, 1024, 1024)
     1011
     1012
     1013class PyBufferedReaderTest(BufferedReaderTest):
     1014    tp = pyio.BufferedReader
     1015
     1016
     1017class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
     1018    write_mode = "wb"
     1019
     1020    def test_constructor(self):
     1021        rawio = self.MockRawIO()
     1022        bufio = self.tp(rawio)
     1023        bufio.__init__(rawio)
     1024        bufio.__init__(rawio, buffer_size=1024)
     1025        bufio.__init__(rawio, buffer_size=16)
     1026        self.assertEqual(3, bufio.write(b"abc"))
     1027        bufio.flush()
     1028        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
     1029        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
     1030        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
     1031        bufio.__init__(rawio)
     1032        self.assertEqual(3, bufio.write(b"ghi"))
     1033        bufio.flush()
     1034        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
     1035
     1036    def test_detach_flush(self):
     1037        raw = self.MockRawIO()
     1038        buf = self.tp(raw)
     1039        buf.write(b"howdy!")
     1040        self.assertFalse(raw._write_stack)
     1041        buf.detach()
     1042        self.assertEqual(raw._write_stack, [b"howdy!"])
     1043
     1044    def test_write(self):
    4841045        # Write to the buffered IO but don't overflow the buffer.
    485         writer = MockRawIO()
    486         bufio = io.BufferedWriter(writer, 8)
    487 
     1046        writer = self.MockRawIO()
     1047        bufio = self.tp(writer, 8)
    4881048        bufio.write(b"abc")
    489 
    4901049        self.assertFalse(writer._write_stack)
    4911050
    492     def testWriteOverflow(self):
    493         writer = MockRawIO()
    494         bufio = io.BufferedWriter(writer, 8)
    495 
    496         bufio.write(b"abc")
    497         bufio.write(b"defghijkl")
    498 
    499         self.assertEquals(b"abcdefghijkl", writer._write_stack[0])
    500 
    501     def testWriteNonBlocking(self):
    502         raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
    503         bufio = io.BufferedWriter(raw, 8, 16)
    504 
    505         bufio.write(b"asdf")
    506         bufio.write(b"asdfa")
    507         self.assertEquals(b"asdfasdfa", raw._write_stack[0])
    508 
    509         bufio.write(b"asdfasdfasdf")
    510         self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
    511         bufio.write(b"asdfasdfasdf")
    512         self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
    513         self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])
    514 
    515         bufio.write(b"asdfasdfasdf")
    516 
    517         # XXX I don't like this test. It relies too heavily on how the
    518         # algorithm actually works, which we might change. Refactor
    519         # later.
    520 
    521     def testFileno(self):
    522         rawio = MockRawIO((b"abc", b"d", b"efg"))
    523         bufio = io.BufferedWriter(rawio)
    524 
    525         self.assertEquals(42, bufio.fileno())
    526 
    527     def testFlush(self):
    528         writer = MockRawIO()
    529         bufio = io.BufferedWriter(writer, 8)
    530 
     1051    def test_write_overflow(self):
     1052        writer = self.MockRawIO()
     1053        bufio = self.tp(writer, 8)
     1054        contents = b"abcdefghijklmnop"
     1055        for n in range(0, len(contents), 3):
     1056            bufio.write(contents[n:n+3])
     1057        flushed = b"".join(writer._write_stack)
     1058        # At least (total - 8) bytes were implicitly flushed, perhaps more
     1059        # depending on the implementation.
     1060        self.assertTrue(flushed.startswith(contents[:-8]), flushed)
     1061
     1062    def check_writes(self, intermediate_func):
     1063        # Lots of writes, test the flushed output is as expected.
     1064        contents = bytes(range(256)) * 1000
     1065        n = 0
     1066        writer = self.MockRawIO()
     1067        bufio = self.tp(writer, 13)
     1068        # Generator of write sizes: repeat each N 15 times then proceed to N+1
     1069        def gen_sizes():
     1070            for size in count(1):
     1071                for i in range(15):
     1072                    yield size
     1073        sizes = gen_sizes()
     1074        while n < len(contents):
     1075            size = min(next(sizes), len(contents) - n)
     1076            self.assertEqual(bufio.write(contents[n:n+size]), size)
     1077            intermediate_func(bufio)
     1078            n += size
     1079        bufio.flush()
     1080        self.assertEqual(contents,
     1081            b"".join(writer._write_stack))
     1082
     1083    def test_writes(self):
     1084        self.check_writes(lambda bufio: None)
     1085
     1086    def test_writes_and_flushes(self):
     1087        self.check_writes(lambda bufio: bufio.flush())
     1088
     1089    def test_writes_and_seeks(self):
     1090        def _seekabs(bufio):
     1091            pos = bufio.tell()
     1092            bufio.seek(pos + 1, 0)
     1093            bufio.seek(pos - 1, 0)
     1094            bufio.seek(pos, 0)
     1095        self.check_writes(_seekabs)
     1096        def _seekrel(bufio):
     1097            pos = bufio.seek(0, 1)
     1098            bufio.seek(+1, 1)
     1099            bufio.seek(-1, 1)
     1100            bufio.seek(pos, 0)
     1101        self.check_writes(_seekrel)
     1102
     1103    def test_writes_and_truncates(self):
     1104        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
     1105
     1106    def test_write_non_blocking(self):
     1107        raw = self.MockNonBlockWriterIO()
     1108        bufio = self.tp(raw, 8)
     1109
     1110        self.assertEqual(bufio.write(b"abcd"), 4)
     1111        self.assertEqual(bufio.write(b"efghi"), 5)
     1112        # 1 byte will be written, the rest will be buffered
     1113        raw.block_on(b"k")
     1114        self.assertEqual(bufio.write(b"jklmn"), 5)
     1115
     1116        # 8 bytes will be written, 8 will be buffered and the rest will be lost
     1117        raw.block_on(b"0")
     1118        try:
     1119            bufio.write(b"opqrwxyz0123456789")
     1120        except self.BlockingIOError as e:
     1121            written = e.characters_written
     1122        else:
     1123            self.fail("BlockingIOError should have been raised")
     1124        self.assertEqual(written, 16)
     1125        self.assertEqual(raw.pop_written(),
     1126            b"abcdefghijklmnopqrwxyz")
     1127
     1128        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
     1129        s = raw.pop_written()
     1130        # Previously buffered bytes were flushed
     1131        self.assertTrue(s.startswith(b"01234567A"), s)
     1132
     1133    def test_write_and_rewind(self):
     1134        raw = io.BytesIO()
     1135        bufio = self.tp(raw, 4)
     1136        self.assertEqual(bufio.write(b"abcdef"), 6)
     1137        self.assertEqual(bufio.tell(), 6)
     1138        bufio.seek(0, 0)
     1139        self.assertEqual(bufio.write(b"XY"), 2)
     1140        bufio.seek(6, 0)
     1141        self.assertEqual(raw.getvalue(), b"XYcdef")
     1142        self.assertEqual(bufio.write(b"123456"), 6)
     1143        bufio.flush()
     1144        self.assertEqual(raw.getvalue(), b"XYcdef123456")
     1145
     1146    def test_flush(self):
     1147        writer = self.MockRawIO()
     1148        bufio = self.tp(writer, 8)
    5311149        bufio.write(b"abc")
    5321150        bufio.flush()
    533 
    534         self.assertEquals(b"abc", writer._write_stack[0])
    535 
    536     def testThreads(self):
    537         # BufferedWriter should not raise exceptions or crash
    538         # when called from multiple threads.
     1151        self.assertEqual(b"abc", writer._write_stack[0])
     1152
     1153    def test_writelines(self):
     1154        l = [b'ab', b'cd', b'ef']
     1155        writer = self.MockRawIO()
     1156        bufio = self.tp(writer, 8)
     1157        bufio.writelines(l)
     1158        bufio.flush()
     1159        self.assertEqual(b''.join(writer._write_stack), b'abcdef')
     1160
     1161    def test_writelines_userlist(self):
     1162        l = UserList([b'ab', b'cd', b'ef'])
     1163        writer = self.MockRawIO()
     1164        bufio = self.tp(writer, 8)
     1165        bufio.writelines(l)
     1166        bufio.flush()
     1167        self.assertEqual(b''.join(writer._write_stack), b'abcdef')
     1168
     1169    def test_writelines_error(self):
     1170        writer = self.MockRawIO()
     1171        bufio = self.tp(writer, 8)
     1172        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
     1173        self.assertRaises(TypeError, bufio.writelines, None)
     1174
     1175    def test_destructor(self):
     1176        writer = self.MockRawIO()
     1177        bufio = self.tp(writer, 8)
     1178        bufio.write(b"abc")
     1179        del bufio
     1180        support.gc_collect()
     1181        self.assertEqual(b"abc", writer._write_stack[0])
     1182
     1183    def test_truncate(self):
     1184        # Truncate implicitly flushes the buffer.
     1185        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
     1186            bufio = self.tp(raw, 8)
     1187            bufio.write(b"abcdef")
     1188            self.assertEqual(bufio.truncate(3), 3)
     1189            self.assertEqual(bufio.tell(), 6)
     1190        with self.open(support.TESTFN, "rb", buffering=0) as f:
     1191            self.assertEqual(f.read(), b"abc")
     1192
     1193    @unittest.skipUnless(threading, 'Threading required for this test.')
     1194    @support.requires_resource('cpu')
     1195    def test_threads(self):
    5391196        try:
     1197            # Write out many bytes from many threads and test they were
     1198            # all flushed.
     1199            N = 1000
     1200            contents = bytes(range(256)) * N
     1201            sizes = cycle([1, 19])
     1202            n = 0
     1203            queue = deque()
     1204            while n < len(contents):
     1205                size = next(sizes)
     1206                queue.append(contents[n:n+size])
     1207                n += size
     1208            del contents
    5401209            # We use a real file object because it allows us to
    5411210            # exercise situations where the GIL is released before
     
    5431212            # to concurrency issues due to switching threads in the middle
    5441213            # of Python code.
    545             with io.open(test_support.TESTFN, "wb", buffering=0) as raw:
    546                 bufio = io.BufferedWriter(raw, 8)
     1214            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
     1215                bufio = self.tp(raw, 8)
    5471216                errors = []
    5481217                def f():
    5491218                    try:
    550                         # Write enough bytes to flush the buffer
    551                         s = b"a" * 19
    552                         for i in range(50):
     1219                        while True:
     1220                            try:
     1221                                s = queue.popleft()
     1222                            except IndexError:
     1223                                return
    5531224                            bufio.write(s)
    5541225                    except Exception as e:
     
    5631234                self.assertFalse(errors,
    5641235                    "the following exceptions were caught: %r" % errors)
     1236                bufio.close()
     1237            with self.open(support.TESTFN, "rb") as f:
     1238                s = f.read()
     1239            for i in range(256):
     1240                self.assertEqual(s.count(bytes([i])), N)
    5651241        finally:
    566             test_support.unlink(test_support.TESTFN)
    567 
     1242            support.unlink(support.TESTFN)
     1243
     1244    def test_misbehaved_io(self):
     1245        rawio = self.MisbehavedRawIO()
     1246        bufio = self.tp(rawio, 5)
     1247        self.assertRaises(IOError, bufio.seek, 0)
     1248        self.assertRaises(IOError, bufio.tell)
     1249        self.assertRaises(IOError, bufio.write, b"abcdef")
     1250
     1251    def test_max_buffer_size_deprecation(self):
     1252        with support.check_warnings(("max_buffer_size is deprecated",
     1253                                     DeprecationWarning)):
     1254            self.tp(self.MockRawIO(), 8, 12)
     1255
     1256    def test_write_error_on_close(self):
     1257        raw = self.MockRawIO()
     1258        def bad_write(b):
     1259            raise IOError()
     1260        raw.write = bad_write
     1261        b = self.tp(raw)
     1262        b.write(b'spam')
     1263        self.assertRaises(IOError, b.close) # exception not swallowed
     1264        self.assertTrue(b.closed)
     1265
     1266
     1267class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
     1268    tp = io.BufferedWriter
     1269
     1270    def test_constructor(self):
     1271        BufferedWriterTest.test_constructor(self)
     1272        # The allocation can succeed on 32-bit builds, e.g. with more
     1273        # than 2GB RAM and a 64-bit kernel.
     1274        if sys.maxsize > 0x7FFFFFFF:
     1275            rawio = self.MockRawIO()
     1276            bufio = self.tp(rawio)
     1277            self.assertRaises((OverflowError, MemoryError, ValueError),
     1278                bufio.__init__, rawio, sys.maxsize)
     1279
     1280    def test_initialization(self):
     1281        rawio = self.MockRawIO()
     1282        bufio = self.tp(rawio)
     1283        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
     1284        self.assertRaises(ValueError, bufio.write, b"def")
     1285        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
     1286        self.assertRaises(ValueError, bufio.write, b"def")
     1287        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
     1288        self.assertRaises(ValueError, bufio.write, b"def")
     1289
     1290    def test_garbage_collection(self):
     1291        # C BufferedWriter objects are collected, and collecting them flushes
     1292        # all data to disk.
     1293        # The Python version has __del__, so it ends into gc.garbage instead
     1294        rawio = self.FileIO(support.TESTFN, "w+b")
     1295        f = self.tp(rawio)
     1296        f.write(b"123xxx")
     1297        f.x = f
     1298        wr = weakref.ref(f)
     1299        del f
     1300        support.gc_collect()
     1301        self.assertTrue(wr() is None, wr)
     1302        with self.open(support.TESTFN, "rb") as f:
     1303            self.assertEqual(f.read(), b"123xxx")
     1304
     1305    def test_args_error(self):
     1306        # Issue #17275
     1307        with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
     1308            self.tp(io.BytesIO(), 1024, 1024, 1024)
     1309
     1310
     1311class PyBufferedWriterTest(BufferedWriterTest):
     1312    tp = pyio.BufferedWriter
    5681313
    5691314class BufferedRWPairTest(unittest.TestCase):
    5701315
    571     def testRWPair(self):
    572         r = MockRawIO(())
    573         w = MockRawIO()
    574         pair = io.BufferedRWPair(r, w)
     1316    def test_constructor(self):
     1317        pair = self.tp(self.MockRawIO(), self.MockRawIO())
    5751318        self.assertFalse(pair.closed)
    5761319
    577         # XXX More Tests
    578 
    579 
    580 class BufferedRandomTest(unittest.TestCase):
    581 
    582     def testReadAndWrite(self):
    583         raw = MockRawIO((b"asdf", b"ghjk"))
    584         rw = io.BufferedRandom(raw, 8, 12)
     1320    def test_detach(self):
     1321        pair = self.tp(self.MockRawIO(), self.MockRawIO())
     1322        self.assertRaises(self.UnsupportedOperation, pair.detach)
     1323
     1324    def test_constructor_max_buffer_size_deprecation(self):
     1325        with support.check_warnings(("max_buffer_size is deprecated",
     1326                                     DeprecationWarning)):
     1327            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
     1328
     1329    def test_constructor_with_not_readable(self):
     1330        class NotReadable(MockRawIO):
     1331            def readable(self):
     1332                return False
     1333
     1334        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
     1335
     1336    def test_constructor_with_not_writeable(self):
     1337        class NotWriteable(MockRawIO):
     1338            def writable(self):
     1339                return False
     1340
     1341        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
     1342
     1343    def test_read(self):
     1344        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
     1345
     1346        self.assertEqual(pair.read(3), b"abc")
     1347        self.assertEqual(pair.read(1), b"d")
     1348        self.assertEqual(pair.read(), b"ef")
     1349        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
     1350        self.assertEqual(pair.read(None), b"abc")
     1351
     1352    def test_readlines(self):
     1353        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
     1354        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
     1355        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
     1356        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
     1357
     1358    def test_read1(self):
     1359        # .read1() is delegated to the underlying reader object, so this test
     1360        # can be shallow.
     1361        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
     1362
     1363        self.assertEqual(pair.read1(3), b"abc")
     1364
     1365    def test_readinto(self):
     1366        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
     1367
     1368        data = bytearray(5)
     1369        self.assertEqual(pair.readinto(data), 5)
     1370        self.assertEqual(data, b"abcde")
     1371
     1372    def test_write(self):
     1373        w = self.MockRawIO()
     1374        pair = self.tp(self.MockRawIO(), w)
     1375
     1376        pair.write(b"abc")
     1377        pair.flush()
     1378        pair.write(b"def")
     1379        pair.flush()
     1380        self.assertEqual(w._write_stack, [b"abc", b"def"])
     1381
     1382    def test_peek(self):
     1383        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
     1384
     1385        self.assertTrue(pair.peek(3).startswith(b"abc"))
     1386        self.assertEqual(pair.read(3), b"abc")
     1387
     1388    def test_readable(self):
     1389        pair = self.tp(self.MockRawIO(), self.MockRawIO())
     1390        self.assertTrue(pair.readable())
     1391
     1392    def test_writeable(self):
     1393        pair = self.tp(self.MockRawIO(), self.MockRawIO())
     1394        self.assertTrue(pair.writable())
     1395
     1396    def test_seekable(self):
     1397        # BufferedRWPairs are never seekable, even if their readers and writers
     1398        # are.
     1399        pair = self.tp(self.MockRawIO(), self.MockRawIO())
     1400        self.assertFalse(pair.seekable())
     1401
     1402    # .flush() is delegated to the underlying writer object and has been
     1403    # tested in the test_write method.
     1404
     1405    def test_close_and_closed(self):
     1406        pair = self.tp(self.MockRawIO(), self.MockRawIO())
     1407        self.assertFalse(pair.closed)
     1408        pair.close()
     1409        self.assertTrue(pair.closed)
     1410
     1411    def test_isatty(self):
     1412        class SelectableIsAtty(MockRawIO):
     1413            def __init__(self, isatty):
     1414                MockRawIO.__init__(self)
     1415                self._isatty = isatty
     1416
     1417            def isatty(self):
     1418                return self._isatty
     1419
     1420        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
     1421        self.assertFalse(pair.isatty())
     1422
     1423        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
     1424        self.assertTrue(pair.isatty())
     1425
     1426        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
     1427        self.assertTrue(pair.isatty())
     1428
     1429        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
     1430        self.assertTrue(pair.isatty())
     1431
     1432class CBufferedRWPairTest(BufferedRWPairTest):
     1433    tp = io.BufferedRWPair
     1434
     1435class PyBufferedRWPairTest(BufferedRWPairTest):
     1436    tp = pyio.BufferedRWPair
     1437
     1438
     1439class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
     1440    read_mode = "rb+"
     1441    write_mode = "wb+"
     1442
     1443    def test_constructor(self):
     1444        BufferedReaderTest.test_constructor(self)
     1445        BufferedWriterTest.test_constructor(self)
     1446
     1447    def test_read_and_write(self):
     1448        raw = self.MockRawIO((b"asdf", b"ghjk"))
     1449        rw = self.tp(raw, 8)
    5851450
    5861451        self.assertEqual(b"as", rw.read(2))
     
    5881453        rw.write(b"eee")
    5891454        self.assertFalse(raw._write_stack) # Buffer writes
    590         self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
    591         self.assertEquals(b"dddeee", raw._write_stack[0])
    592 
    593     def testSeekAndTell(self):
    594         raw = io.BytesIO(b"asdfghjkl")
    595         rw = io.BufferedRandom(raw)
    596 
    597         self.assertEquals(b"as", rw.read(2))
    598         self.assertEquals(2, rw.tell())
     1455        self.assertEqual(b"ghjk", rw.read())
     1456        self.assertEqual(b"dddeee", raw._write_stack[0])
     1457
     1458    def test_seek_and_tell(self):
     1459        raw = self.BytesIO(b"asdfghjkl")
     1460        rw = self.tp(raw)
     1461
     1462        self.assertEqual(b"as", rw.read(2))
     1463        self.assertEqual(2, rw.tell())
    5991464        rw.seek(0, 0)
    600         self.assertEquals(b"asdf", rw.read(4))
    601 
    602         rw.write(b"asdf")
     1465        self.assertEqual(b"asdf", rw.read(4))
     1466
     1467        rw.write(b"123f")
    6031468        rw.seek(0, 0)
    604         self.assertEquals(b"asdfasdfl", rw.read())
    605         self.assertEquals(9, rw.tell())
     1469        self.assertEqual(b"asdf123fl", rw.read())
     1470        self.assertEqual(9, rw.tell())
    6061471        rw.seek(-4, 2)
    607         self.assertEquals(5, rw.tell())
     1472        self.assertEqual(5, rw.tell())
    6081473        rw.seek(2, 1)
    609         self.assertEquals(7, rw.tell())
    610         self.assertEquals(b"fl", rw.read(11))
     1474        self.assertEqual(7, rw.tell())
     1475        self.assertEqual(b"fl", rw.read(11))
     1476        rw.flush()
     1477        self.assertEqual(b"asdf123fl", raw.getvalue())
     1478
    6111479        self.assertRaises(TypeError, rw.seek, 0.0)
     1480
     1481    def check_flush_and_read(self, read_func):
     1482        raw = self.BytesIO(b"abcdefghi")
     1483        bufio = self.tp(raw)
     1484
     1485        self.assertEqual(b"ab", read_func(bufio, 2))
     1486        bufio.write(b"12")
     1487        self.assertEqual(b"ef", read_func(bufio, 2))
     1488        self.assertEqual(6, bufio.tell())
     1489        bufio.flush()
     1490        self.assertEqual(6, bufio.tell())
     1491        self.assertEqual(b"ghi", read_func(bufio))
     1492        raw.seek(0, 0)
     1493        raw.write(b"XYZ")
     1494        # flush() resets the read buffer
     1495        bufio.flush()
     1496        bufio.seek(0, 0)
     1497        self.assertEqual(b"XYZ", read_func(bufio, 3))
     1498
     1499    def test_flush_and_read(self):
     1500        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
     1501
     1502    def test_flush_and_readinto(self):
     1503        def _readinto(bufio, n=-1):
     1504            b = bytearray(n if n >= 0 else 9999)
     1505            n = bufio.readinto(b)
     1506            return bytes(b[:n])
     1507        self.check_flush_and_read(_readinto)
     1508
     1509    def test_flush_and_peek(self):
     1510        def _peek(bufio, n=-1):
     1511            # This relies on the fact that the buffer can contain the whole
     1512            # raw stream, otherwise peek() can return less.
     1513            b = bufio.peek(n)
     1514            if n != -1:
     1515                b = b[:n]
     1516            bufio.seek(len(b), 1)
     1517            return b
     1518        self.check_flush_and_read(_peek)
     1519
     1520    def test_flush_and_write(self):
     1521        raw = self.BytesIO(b"abcdefghi")
     1522        bufio = self.tp(raw)
     1523
     1524        bufio.write(b"123")
     1525        bufio.flush()
     1526        bufio.write(b"45")
     1527        bufio.flush()
     1528        bufio.seek(0, 0)
     1529        self.assertEqual(b"12345fghi", raw.getvalue())
     1530        self.assertEqual(b"12345fghi", bufio.read())
     1531
     1532    def test_threads(self):
     1533        BufferedReaderTest.test_threads(self)
     1534        BufferedWriterTest.test_threads(self)
     1535
     1536    def test_writes_and_peek(self):
     1537        def _peek(bufio):
     1538            bufio.peek(1)
     1539        self.check_writes(_peek)
     1540        def _peek(bufio):
     1541            pos = bufio.tell()
     1542            bufio.seek(-1, 1)
     1543            bufio.peek(1)
     1544            bufio.seek(pos, 0)
     1545        self.check_writes(_peek)
     1546
     1547    def test_writes_and_reads(self):
     1548        def _read(bufio):
     1549            bufio.seek(-1, 1)
     1550            bufio.read(1)
     1551        self.check_writes(_read)
     1552
     1553    def test_writes_and_read1s(self):
     1554        def _read1(bufio):
     1555            bufio.seek(-1, 1)
     1556            bufio.read1(1)
     1557        self.check_writes(_read1)
     1558
     1559    def test_writes_and_readintos(self):
     1560        def _read(bufio):
     1561            bufio.seek(-1, 1)
     1562            bufio.readinto(bytearray(1))
     1563        self.check_writes(_read)
     1564
     1565    def test_write_after_readahead(self):
     1566        # Issue #6629: writing after the buffer was filled by readahead should
     1567        # first rewind the raw stream.
     1568        for overwrite_size in [1, 5]:
     1569            raw = self.BytesIO(b"A" * 10)
     1570            bufio = self.tp(raw, 4)
     1571            # Trigger readahead
     1572            self.assertEqual(bufio.read(1), b"A")
     1573            self.assertEqual(bufio.tell(), 1)
     1574            # Overwriting should rewind the raw stream if it needs so
     1575            bufio.write(b"B" * overwrite_size)
     1576            self.assertEqual(bufio.tell(), overwrite_size + 1)
     1577            # If the write size was smaller than the buffer size, flush() and
     1578            # check that rewind happens.
     1579            bufio.flush()
     1580            self.assertEqual(bufio.tell(), overwrite_size + 1)
     1581            s = raw.getvalue()
     1582            self.assertEqual(s,
     1583                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
     1584
     1585    def test_write_rewind_write(self):
     1586        # Various combinations of reading / writing / seeking backwards / writing again
     1587        def mutate(bufio, pos1, pos2):
     1588            assert pos2 >= pos1
     1589            # Fill the buffer
     1590            bufio.seek(pos1)
     1591            bufio.read(pos2 - pos1)
     1592            bufio.write(b'\x02')
     1593            # This writes earlier than the previous write, but still inside
     1594            # the buffer.
     1595            bufio.seek(pos1)
     1596            bufio.write(b'\x01')
     1597
     1598        b = b"\x80\x81\x82\x83\x84"
     1599        for i in range(0, len(b)):
     1600            for j in range(i, len(b)):
     1601                raw = self.BytesIO(b)
     1602                bufio = self.tp(raw, 100)
     1603                mutate(bufio, i, j)
     1604                bufio.flush()
     1605                expected = bytearray(b)
     1606                expected[j] = 2
     1607                expected[i] = 1
     1608                self.assertEqual(raw.getvalue(), expected,
     1609                                 "failed result for i=%d, j=%d" % (i, j))
     1610
     1611    def test_truncate_after_read_or_write(self):
     1612        raw = self.BytesIO(b"A" * 10)
     1613        bufio = self.tp(raw, 100)
     1614        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
     1615        self.assertEqual(bufio.truncate(), 2)
     1616        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
     1617        self.assertEqual(bufio.truncate(), 4)
     1618
     1619    def test_misbehaved_io(self):
     1620        BufferedReaderTest.test_misbehaved_io(self)
     1621        BufferedWriterTest.test_misbehaved_io(self)
     1622
     1623    def test_interleaved_read_write(self):
     1624        # Test for issue #12213
     1625        with self.BytesIO(b'abcdefgh') as raw:
     1626            with self.tp(raw, 100) as f:
     1627                f.write(b"1")
     1628                self.assertEqual(f.read(1), b'b')
     1629                f.write(b'2')
     1630                self.assertEqual(f.read1(1), b'd')
     1631                f.write(b'3')
     1632                buf = bytearray(1)
     1633                f.readinto(buf)
     1634                self.assertEqual(buf, b'f')
     1635                f.write(b'4')
     1636                self.assertEqual(f.peek(1), b'h')
     1637                f.flush()
     1638                self.assertEqual(raw.getvalue(), b'1b2d3f4h')
     1639
     1640        with self.BytesIO(b'abc') as raw:
     1641            with self.tp(raw, 100) as f:
     1642                self.assertEqual(f.read(1), b'a')
     1643                f.write(b"2")
     1644                self.assertEqual(f.read(1), b'c')
     1645                f.flush()
     1646                self.assertEqual(raw.getvalue(), b'a2c')
     1647
     1648    def test_interleaved_readline_write(self):
     1649        with self.BytesIO(b'ab\ncdef\ng\n') as raw:
     1650            with self.tp(raw) as f:
     1651                f.write(b'1')
     1652                self.assertEqual(f.readline(), b'b\n')
     1653                f.write(b'2')
     1654                self.assertEqual(f.readline(), b'def\n')
     1655                f.write(b'3')
     1656                self.assertEqual(f.readline(), b'\n')
     1657                f.flush()
     1658                self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
     1659
     1660
     1661class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
     1662                          BufferedRandomTest, SizeofTest):
     1663    tp = io.BufferedRandom
     1664
     1665    def test_constructor(self):
     1666        BufferedRandomTest.test_constructor(self)
     1667        # The allocation can succeed on 32-bit builds, e.g. with more
     1668        # than 2GB RAM and a 64-bit kernel.
     1669        if sys.maxsize > 0x7FFFFFFF:
     1670            rawio = self.MockRawIO()
     1671            bufio = self.tp(rawio)
     1672            self.assertRaises((OverflowError, MemoryError, ValueError),
     1673                bufio.__init__, rawio, sys.maxsize)
     1674
     1675    def test_garbage_collection(self):
     1676        CBufferedReaderTest.test_garbage_collection(self)
     1677        CBufferedWriterTest.test_garbage_collection(self)
     1678
     1679    def test_args_error(self):
     1680        # Issue #17275
     1681        with self.assertRaisesRegexp(TypeError, "BufferedRandom"):
     1682            self.tp(io.BytesIO(), 1024, 1024, 1024)
     1683
     1684
     1685class PyBufferedRandomTest(BufferedRandomTest):
     1686    tp = pyio.BufferedRandom
     1687
    6121688
    6131689# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
     
    7441820    ]
    7451821
    746     def testDecoder(self):
     1822    def test_decoder(self):
    7471823        # Try a few one-shot test cases.
    7481824        for input, eof, output in self.test_cases:
    7491825            d = StatefulIncrementalDecoder()
    750             self.assertEquals(d.decode(input, eof), output)
     1826            self.assertEqual(d.decode(input, eof), output)
    7511827
    7521828        # Also test an unfinished decode, followed by forcing EOF.
    7531829        d = StatefulIncrementalDecoder()
    754         self.assertEquals(d.decode(b'oiabcd'), '')
    755         self.assertEquals(d.decode(b'', 1), 'abcd.')
     1830        self.assertEqual(d.decode(b'oiabcd'), '')
     1831        self.assertEqual(d.decode(b'', 1), 'abcd.')
    7561832
    7571833class TextIOWrapperTest(unittest.TestCase):
     
    7601836        self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    7611837        self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
     1838        support.unlink(support.TESTFN)
    7621839
    7631840    def tearDown(self):
    764         test_support.unlink(test_support.TESTFN)
    765 
    766     def testLineBuffering(self):
    767         r = io.BytesIO()
    768         b = io.BufferedWriter(r, 1000)
    769         t = io.TextIOWrapper(b, newline="\n", line_buffering=True)
    770         t.write(u"X")
    771         self.assertEquals(r.getvalue(), b"")  # No flush happened
    772         t.write(u"Y\nZ")
    773         self.assertEquals(r.getvalue(), b"XY\nZ")  # All got flushed
    774         t.write(u"A\rB")
    775         self.assertEquals(r.getvalue(), b"XY\nZA\rB")
    776 
    777     def testEncodingErrorsReading(self):
     1841        support.unlink(support.TESTFN)
     1842
     1843    def test_constructor(self):
     1844        r = self.BytesIO(b"\xc3\xa9\n\n")
     1845        b = self.BufferedReader(r, 1000)
     1846        t = self.TextIOWrapper(b)
     1847        t.__init__(b, encoding="latin1", newline="\r\n")
     1848        self.assertEqual(t.encoding, "latin1")
     1849        self.assertEqual(t.line_buffering, False)
     1850        t.__init__(b, encoding="utf8", line_buffering=True)
     1851        self.assertEqual(t.encoding, "utf8")
     1852        self.assertEqual(t.line_buffering, True)
     1853        self.assertEqual("\xe9\n", t.readline())
     1854        self.assertRaises(TypeError, t.__init__, b, newline=42)
     1855        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
     1856
     1857    def test_detach(self):
     1858        r = self.BytesIO()
     1859        b = self.BufferedWriter(r)
     1860        t = self.TextIOWrapper(b)
     1861        self.assertIs(t.detach(), b)
     1862
     1863        t = self.TextIOWrapper(b, encoding="ascii")
     1864        t.write("howdy")
     1865        self.assertFalse(r.getvalue())
     1866        t.detach()
     1867        self.assertEqual(r.getvalue(), b"howdy")
     1868        self.assertRaises(ValueError, t.detach)
     1869
     1870    def test_repr(self):
     1871        raw = self.BytesIO("hello".encode("utf-8"))
     1872        b = self.BufferedReader(raw)
     1873        t = self.TextIOWrapper(b, encoding="utf-8")
     1874        modname = self.TextIOWrapper.__module__
     1875        self.assertEqual(repr(t),
     1876                         "<%s.TextIOWrapper encoding='utf-8'>" % modname)
     1877        raw.name = "dummy"
     1878        self.assertEqual(repr(t),
     1879                         "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
     1880        raw.name = b"dummy"
     1881        self.assertEqual(repr(t),
     1882                         "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
     1883
     1884    def test_line_buffering(self):
     1885        r = self.BytesIO()
     1886        b = self.BufferedWriter(r, 1000)
     1887        t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
     1888        t.write("X")
     1889        self.assertEqual(r.getvalue(), b"")  # No flush happened
     1890        t.write("Y\nZ")
     1891        self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
     1892        t.write("A\rB")
     1893        self.assertEqual(r.getvalue(), b"XY\nZA\rB")
     1894
     1895    def test_encoding(self):
     1896        # Check the encoding attribute is always set, and valid
     1897        b = self.BytesIO()
     1898        t = self.TextIOWrapper(b, encoding="utf8")
     1899        self.assertEqual(t.encoding, "utf8")
     1900        t = self.TextIOWrapper(b)
     1901        self.assertTrue(t.encoding is not None)
     1902        codecs.lookup(t.encoding)
     1903
     1904    def test_encoding_errors_reading(self):
    7781905        # (1) default
    779         b = io.BytesIO(b"abc\n\xff\n")
    780         t = io.TextIOWrapper(b, encoding="ascii")
     1906        b = self.BytesIO(b"abc\n\xff\n")
     1907        t = self.TextIOWrapper(b, encoding="ascii")
    7811908        self.assertRaises(UnicodeError, t.read)
    7821909        # (2) explicit strict
    783         b = io.BytesIO(b"abc\n\xff\n")
    784         t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
     1910        b = self.BytesIO(b"abc\n\xff\n")
     1911        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
    7851912        self.assertRaises(UnicodeError, t.read)
    7861913        # (3) ignore
    787         b = io.BytesIO(b"abc\n\xff\n")
    788         t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
    789         self.assertEquals(t.read(), "abc\n\n")
     1914        b = self.BytesIO(b"abc\n\xff\n")
     1915        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
     1916        self.assertEqual(t.read(), "abc\n\n")
    7901917        # (4) replace
    791         b = io.BytesIO(b"abc\n\xff\n")
    792         t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
    793         self.assertEquals(t.read(), u"abc\n\ufffd\n")
    794 
    795     def testEncodingErrorsWriting(self):
     1918        b = self.BytesIO(b"abc\n\xff\n")
     1919        t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
     1920        self.assertEqual(t.read(), "abc\n\ufffd\n")
     1921
     1922    def test_encoding_errors_writing(self):
    7961923        # (1) default
    797         b = io.BytesIO()
    798         t = io.TextIOWrapper(b, encoding="ascii")
    799         self.assertRaises(UnicodeError, t.write, u"\xff")
     1924        b = self.BytesIO()
     1925        t = self.TextIOWrapper(b, encoding="ascii")
     1926        self.assertRaises(UnicodeError, t.write, "\xff")
    8001927        # (2) explicit strict
    801         b = io.BytesIO()
    802         t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
    803         self.assertRaises(UnicodeError, t.write, u"\xff")
     1928        b = self.BytesIO()
     1929        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
     1930        self.assertRaises(UnicodeError, t.write, "\xff")
    8041931        # (3) ignore
    805         b = io.BytesIO()
    806         t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",
     1932        b = self.BytesIO()
     1933        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
    8071934                             newline="\n")
    808         t.write(u"abc\xffdef\n")
     1935        t.write("abc\xffdef\n")
    8091936        t.flush()
    810         self.assertEquals(b.getvalue(), b"abcdef\n")
     1937        self.assertEqual(b.getvalue(), b"abcdef\n")
    8111938        # (4) replace
    812         b = io.BytesIO()
    813         t = io.TextIOWrapper(b, encoding="ascii", errors="replace",
     1939        b = self.BytesIO()
     1940        t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
    8141941                             newline="\n")
    815         t.write(u"abc\xffdef\n")
     1942        t.write("abc\xffdef\n")
    8161943        t.flush()
    817         self.assertEquals(b.getvalue(), b"abc?def\n")
    818 
    819     def testNewlinesInput(self):
    820         testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    821         normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    822         for newline, expected in [
    823             (None, normalized.decode("ascii").splitlines(True)),
    824             ("", testdata.decode("ascii").splitlines(True)),
    825             ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
    826             ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
    827             ("\r",  ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    828             ]:
    829             buf = io.BytesIO(testdata)
    830             txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
    831             self.assertEquals(txt.readlines(), expected)
    832             txt.seek(0)
    833             self.assertEquals(txt.read(), "".join(expected))
    834 
    835     def testNewlinesOutput(self):
    836         testdict = {
    837             "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
    838             "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
    839             "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
    840             "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
    841             }
    842         tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
    843         for newline, expected in tests:
    844             buf = io.BytesIO()
    845             txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
    846             txt.write("AAA\nB")
    847             txt.write("BB\nCCC\n")
    848             txt.write("X\rY\r\nZ")
    849             txt.flush()
    850             self.assertEquals(buf.closed, False)
    851             self.assertEquals(buf.getvalue(), expected)
    852 
    853     def testNewlines(self):
     1944        self.assertEqual(b.getvalue(), b"abc?def\n")
     1945
     1946    def test_newlines(self):
    8541947        input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
    8551948
     
    8751968                for bufsize in range(1, 10):
    8761969                    for newline, exp_lines in tests:
    877                         bufio = io.BufferedReader(io.BytesIO(data), bufsize)
    878                         textio = io.TextIOWrapper(bufio, newline=newline,
     1970                        bufio = self.BufferedReader(self.BytesIO(data), bufsize)
     1971                        textio = self.TextIOWrapper(bufio, newline=newline,
    8791972                                                  encoding=encoding)
    8801973                        if do_reads:
     
    8841977                                if c2 == '':
    8851978                                    break
    886                                 self.assertEquals(len(c2), 2)
     1979                                self.assertEqual(len(c2), 2)
    8871980                                got_lines.append(c2 + textio.readline())
    8881981                        else:
     
    8901983
    8911984                        for got_line, exp_line in zip(got_lines, exp_lines):
    892                             self.assertEquals(got_line, exp_line)
    893                         self.assertEquals(len(got_lines), len(exp_lines))
    894 
    895     def testNewlinesInput(self):
    896         testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
     1985                            self.assertEqual(got_line, exp_line)
     1986                        self.assertEqual(len(got_lines), len(exp_lines))
     1987
     1988    def test_newlines_input(self):
     1989        testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    8971990        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    8981991        for newline, expected in [
    8991992            (None, normalized.decode("ascii").splitlines(True)),
    9001993            ("", testdata.decode("ascii").splitlines(True)),
    901             ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
    902             ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
    903             ("\r",  ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
     1994            ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
     1995            ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
     1996            ("\r",  ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    9041997            ]:
    905             buf = io.BytesIO(testdata)
    906             txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
    907             self.assertEquals(txt.readlines(), expected)
     1998            buf = self.BytesIO(testdata)
     1999            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
     2000            self.assertEqual(txt.readlines(), expected)
    9082001            txt.seek(0)
    909             self.assertEquals(txt.read(), "".join(expected))
    910 
    911     def testNewlinesOutput(self):
    912         data = u"AAA\nBBB\rCCC\n"
    913         data_lf = b"AAA\nBBB\rCCC\n"
    914         data_cr = b"AAA\rBBB\rCCC\r"
    915         data_crlf = b"AAA\r\nBBB\rCCC\r\n"
    916         save_linesep = os.linesep
    917         try:
    918             for os.linesep, newline, expected in [
    919                 ("\n", None, data_lf),
    920                 ("\r\n", None, data_crlf),
    921                 ("\n", "", data_lf),
    922                 ("\r\n", "", data_lf),
    923                 ("\n", "\n", data_lf),
    924                 ("\r\n", "\n", data_lf),
    925                 ("\n", "\r", data_cr),
    926                 ("\r\n", "\r", data_cr),
    927                 ("\n", "\r\n", data_crlf),
    928                 ("\r\n", "\r\n", data_crlf),
    929                 ]:
    930                 buf = io.BytesIO()
    931                 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
    932                 txt.write(data)
    933                 txt.close()
    934                 self.assertEquals(buf.closed, True)
    935                 self.assertRaises(ValueError, buf.getvalue)
    936         finally:
    937             os.linesep = save_linesep
     2002            self.assertEqual(txt.read(), "".join(expected))
     2003
     2004    def test_newlines_output(self):
     2005        testdict = {
     2006            "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
     2007            "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
     2008            "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
     2009            "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
     2010            }
     2011        tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
     2012        for newline, expected in tests:
     2013            buf = self.BytesIO()
     2014            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
     2015            txt.write("AAA\nB")
     2016            txt.write("BB\nCCC\n")
     2017            txt.write("X\rY\r\nZ")
     2018            txt.flush()
     2019            self.assertEqual(buf.closed, False)
     2020            self.assertEqual(buf.getvalue(), expected)
     2021
     2022    def test_destructor(self):
     2023        l = []
     2024        base = self.BytesIO
     2025        class MyBytesIO(base):
     2026            def close(self):
     2027                l.append(self.getvalue())
     2028                base.close(self)
     2029        b = MyBytesIO()
     2030        t = self.TextIOWrapper(b, encoding="ascii")
     2031        t.write("abc")
     2032        del t
     2033        support.gc_collect()
     2034        self.assertEqual([b"abc"], l)
     2035
     2036    def test_override_destructor(self):
     2037        record = []
     2038        class MyTextIO(self.TextIOWrapper):
     2039            def __del__(self):
     2040                record.append(1)
     2041                try:
     2042                    f = super(MyTextIO, self).__del__
     2043                except AttributeError:
     2044                    pass
     2045                else:
     2046                    f()
     2047            def close(self):
     2048                record.append(2)
     2049                super(MyTextIO, self).close()
     2050            def flush(self):
     2051                record.append(3)
     2052                super(MyTextIO, self).flush()
     2053        b = self.BytesIO()
     2054        t = MyTextIO(b, encoding="ascii")
     2055        del t
     2056        support.gc_collect()
     2057        self.assertEqual(record, [1, 2, 3])
     2058
     2059    def test_error_through_destructor(self):
     2060        # Test that the exception state is not modified by a destructor,
     2061        # even if close() fails.
     2062        rawio = self.CloseFailureIO()
     2063        def f():
     2064            self.TextIOWrapper(rawio).xyzzy
     2065        with support.captured_output("stderr") as s:
     2066            self.assertRaises(AttributeError, f)
     2067        s = s.getvalue().strip()
     2068        if s:
     2069            # The destructor *may* have printed an unraisable error, check it
     2070            self.assertEqual(len(s.splitlines()), 1)
     2071            self.assertTrue(s.startswith("Exception IOError: "), s)
     2072            self.assertTrue(s.endswith(" ignored"), s)
    9382073
    9392074    # Systematic tests of the text I/O API
    9402075
    941     def testBasicIO(self):
     2076    def test_basic_io(self):
    9422077        for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
    9432078            for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
    944                 f = io.open(test_support.TESTFN, "w+", encoding=enc)
     2079                f = self.open(support.TESTFN, "w+", encoding=enc)
    9452080                f._CHUNK_SIZE = chunksize
    946                 self.assertEquals(f.write(u"abc"), 3)
     2081                self.assertEqual(f.write("abc"), 3)
    9472082                f.close()
    948                 f = io.open(test_support.TESTFN, "r+", encoding=enc)
     2083                f = self.open(support.TESTFN, "r+", encoding=enc)
    9492084                f._CHUNK_SIZE = chunksize
    950                 self.assertEquals(f.tell(), 0)
    951                 self.assertEquals(f.read(), u"abc")
     2085                self.assertEqual(f.tell(), 0)
     2086                self.assertEqual(f.read(), "abc")
    9522087                cookie = f.tell()
    953                 self.assertEquals(f.seek(0), 0)
    954                 self.assertEquals(f.read(2), u"ab")
    955                 self.assertEquals(f.read(1), u"c")
    956                 self.assertEquals(f.read(1), u"")
    957                 self.assertEquals(f.read(), u"")
    958                 self.assertEquals(f.tell(), cookie)
    959                 self.assertEquals(f.seek(0), 0)
    960                 self.assertEquals(f.seek(0, 2), cookie)
    961                 self.assertEquals(f.write(u"def"), 3)
    962                 self.assertEquals(f.seek(cookie), cookie)
    963                 self.assertEquals(f.read(), u"def")
     2088                self.assertEqual(f.seek(0), 0)
     2089                self.assertEqual(f.read(None), "abc")
     2090                f.seek(0)
     2091                self.assertEqual(f.read(2), "ab")
     2092                self.assertEqual(f.read(1), "c")
     2093                self.assertEqual(f.read(1), "")
     2094                self.assertEqual(f.read(), "")
     2095                self.assertEqual(f.tell(), cookie)
     2096                self.assertEqual(f.seek(0), 0)
     2097                self.assertEqual(f.seek(0, 2), cookie)
     2098                self.assertEqual(f.write("def"), 3)
     2099                self.assertEqual(f.seek(cookie), cookie)
     2100                self.assertEqual(f.read(), "def")
    9642101                if enc.startswith("utf"):
    9652102                    self.multi_line_test(f, enc)
     
    9692106        f.seek(0)
    9702107        f.truncate()
    971         sample = u"s\xff\u0fff\uffff"
     2108        sample = "s\xff\u0fff\uffff"
    9722109        wlines = []
    9732110        for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
     
    9752112            for i in range(size):
    9762113                chars.append(sample[i % len(sample)])
    977             line = u"".join(chars) + u"\n"
     2114            line = "".join(chars) + "\n"
    9782115            wlines.append((f.tell(), line))
    9792116            f.write(line)
     
    9862123                break
    9872124            rlines.append((pos, line))
    988         self.assertEquals(rlines, wlines)
    989 
    990     def testTelling(self):
    991         f = io.open(test_support.TESTFN, "w+", encoding="utf8")
     2125        self.assertEqual(rlines, wlines)
     2126
     2127    def test_telling(self):
     2128        f = self.open(support.TESTFN, "w+", encoding="utf8")
    9922129        p0 = f.tell()
    993         f.write(u"\xff\n")
     2130        f.write("\xff\n")
    9942131        p1 = f.tell()
    995         f.write(u"\xff\n")
     2132        f.write("\xff\n")
    9962133        p2 = f.tell()
    9972134        f.seek(0)
    998         self.assertEquals(f.tell(), p0)
    999         self.assertEquals(f.readline(), u"\xff\n")
    1000         self.assertEquals(f.tell(), p1)
    1001         self.assertEquals(f.readline(), u"\xff\n")
    1002         self.assertEquals(f.tell(), p2)
     2135        self.assertEqual(f.tell(), p0)
     2136        self.assertEqual(f.readline(), "\xff\n")
     2137        self.assertEqual(f.tell(), p1)
     2138        self.assertEqual(f.readline(), "\xff\n")
     2139        self.assertEqual(f.tell(), p2)
    10032140        f.seek(0)
    10042141        for line in f:
    1005             self.assertEquals(line, u"\xff\n")
     2142            self.assertEqual(line, "\xff\n")
    10062143            self.assertRaises(IOError, f.tell)
    1007         self.assertEquals(f.tell(), p2)
     2144        self.assertEqual(f.tell(), p2)
    10082145        f.close()
    10092146
    1010     def testSeeking(self):
    1011         chunk_size = io.TextIOWrapper._CHUNK_SIZE
     2147    def test_seeking(self):
     2148        chunk_size = _default_chunk_size()
    10122149        prefix_size = chunk_size - 2
    10132150        u_prefix = "a" * prefix_size
    10142151        prefix = bytes(u_prefix.encode("utf-8"))
    1015         self.assertEquals(len(u_prefix), len(prefix))
     2152        self.assertEqual(len(u_prefix), len(prefix))
    10162153        u_suffix = "\u8888\n"
    10172154        suffix = bytes(u_suffix.encode("utf-8"))
    10182155        line = prefix + suffix
    1019         f = io.open(test_support.TESTFN, "wb")
     2156        f = self.open(support.TESTFN, "wb")
    10202157        f.write(line*2)
    10212158        f.close()
    1022         f = io.open(test_support.TESTFN, "r", encoding="utf-8")
     2159        f = self.open(support.TESTFN, "r", encoding="utf-8")
    10232160        s = f.read(prefix_size)
    1024         self.assertEquals(s, unicode(prefix, "ascii"))
    1025         self.assertEquals(f.tell(), prefix_size)
    1026         self.assertEquals(f.readline(), u_suffix)
    1027 
    1028     def testSeekingToo(self):
     2161        self.assertEqual(s, prefix.decode("ascii"))
     2162        self.assertEqual(f.tell(), prefix_size)
     2163        self.assertEqual(f.readline(), u_suffix)
     2164
     2165    def test_seeking_too(self):
    10292166        # Regression test for a specific bug
    10302167        data = b'\xe0\xbf\xbf\n'
    1031         f = io.open(test_support.TESTFN, "wb")
     2168        f = self.open(support.TESTFN, "wb")
    10322169        f.write(data)
    10332170        f.close()
    1034         f = io.open(test_support.TESTFN, "r", encoding="utf-8")
     2171        f = self.open(support.TESTFN, "r", encoding="utf-8")
    10352172        f._CHUNK_SIZE  # Just test that it exists
    10362173        f._CHUNK_SIZE = 2
     
    10382175        f.tell()
    10392176
    1040     def testSeekAndTell(self):
    1041         """Test seek/tell using the StatefulIncrementalDecoder."""
    1042 
    1043         def testSeekAndTellWithData(data, min_pos=0):
     2177    def test_seek_and_tell(self):
     2178        #Test seek/tell using the StatefulIncrementalDecoder.
     2179        # Make test faster by doing smaller seeks
     2180        CHUNK_SIZE = 128
     2181
     2182        def test_seek_and_tell_with_data(data, min_pos=0):
    10442183            """Tell/seek to various points within a data stream and ensure
    10452184            that the decoded data returned by read() is consistent."""
    1046             f = io.open(test_support.TESTFN, 'wb')
     2185            f = self.open(support.TESTFN, 'wb')
    10472186            f.write(data)
    10482187            f.close()
    1049             f = io.open(test_support.TESTFN, encoding='test_decoder')
     2188            f = self.open(support.TESTFN, encoding='test_decoder')
     2189            f._CHUNK_SIZE = CHUNK_SIZE
    10502190            decoded = f.read()
    10512191            f.close()
     
    10532193            for i in range(min_pos, len(decoded) + 1): # seek positions
    10542194                for j in [1, 5, len(decoded) - i]: # read lengths
    1055                     f = io.open(test_support.TESTFN, encoding='test_decoder')
    1056                     self.assertEquals(f.read(i), decoded[:i])
     2195                    f = self.open(support.TESTFN, encoding='test_decoder')
     2196                    self.assertEqual(f.read(i), decoded[:i])
    10572197                    cookie = f.tell()
    1058                     self.assertEquals(f.read(j), decoded[i:i + j])
     2198                    self.assertEqual(f.read(j), decoded[i:i + j])
    10592199                    f.seek(cookie)
    1060                     self.assertEquals(f.read(), decoded[i:])
     2200                    self.assertEqual(f.read(), decoded[i:])
    10612201                    f.close()
    10622202
     
    10682208            # Try each test case.
    10692209            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
    1070                 testSeekAndTellWithData(input)
     2210                test_seek_and_tell_with_data(input)
    10712211
    10722212            # Position each test case so that it crosses a chunk boundary.
    1073             CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE
    10742213            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
    10752214                offset = CHUNK_SIZE - len(input)//2
     
    10772216                # Don't bother seeking into the prefix (takes too long).
    10782217                min_pos = offset*2
    1079                 testSeekAndTellWithData(prefix + input, min_pos)
     2218                test_seek_and_tell_with_data(prefix + input, min_pos)
    10802219
    10812220        # Ensure our test decoder won't interfere with subsequent tests.
     
    10832222            StatefulIncrementalDecoder.codecEnabled = 0
    10842223
    1085     def testEncodedWrites(self):
    1086         data = u"1234567890"
     2224    def test_encoded_writes(self):
     2225        data = "1234567890"
    10872226        tests = ("utf-16",
    10882227                 "utf-16-le",
     
    10922231                 "utf-32-be")
    10932232        for encoding in tests:
    1094             buf = io.BytesIO()
    1095             f = io.TextIOWrapper(buf, encoding=encoding)
     2233            buf = self.BytesIO()
     2234            f = self.TextIOWrapper(buf, encoding=encoding)
    10962235            # Check if the BOM is written only once (see issue1753).
    10972236            f.write(data)
    10982237            f.write(data)
    10992238            f.seek(0)
    1100             self.assertEquals(f.read(), data * 2)
    1101             self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
    1102 
    1103     def timingTest(self):
    1104         timer = time.time
    1105         enc = "utf8"
    1106         line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
    1107         nlines = 10000
    1108         nchars = len(line)
    1109         nbytes = len(line.encode(enc))
    1110         for chunk_size in (32, 64, 128, 256):
    1111             f = io.open(test_support.TESTFN, "w+", encoding=enc)
    1112             f._CHUNK_SIZE = chunk_size
    1113             t0 = timer()
    1114             for i in range(nlines):
    1115                 f.write(line)
    1116             f.flush()
    1117             t1 = timer()
     2239            self.assertEqual(f.read(), data * 2)
    11182240            f.seek(0)
    1119             for line in f:
    1120                 pass
    1121             t2 = timer()
    1122             f.seek(0)
    1123             while f.readline():
    1124                 pass
    1125             t3 = timer()
    1126             f.seek(0)
    1127             while f.readline():
    1128                 f.tell()
    1129             t4 = timer()
    1130             f.close()
    1131             if test_support.verbose:
    1132                 print("\nTiming test: %d lines of %d characters (%d bytes)" %
    1133                       (nlines, nchars, nbytes))
    1134                 print("File chunk size:          %6s" % f._CHUNK_SIZE)
    1135                 print("Writing:                  %6.3f seconds" % (t1-t0))
    1136                 print("Reading using iteration:  %6.3f seconds" % (t2-t1))
    1137                 print("Reading using readline(): %6.3f seconds" % (t3-t2))
    1138                 print("Using readline()+tell():  %6.3f seconds" % (t4-t3))
    1139 
    1140     def testReadOneByOne(self):
    1141         txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
     2241            self.assertEqual(f.read(), data * 2)
     2242            self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
     2243
     2244    def test_unreadable(self):
     2245        class UnReadable(self.BytesIO):
     2246            def readable(self):
     2247                return False
     2248        txt = self.TextIOWrapper(UnReadable())
     2249        self.assertRaises(IOError, txt.read)
     2250
     2251    def test_read_one_by_one(self):
     2252        txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    11422253        reads = ""
    11432254        while True:
     
    11462257                break
    11472258            reads += c
    1148         self.assertEquals(reads, "AA\nBB")
     2259        self.assertEqual(reads, "AA\nBB")
     2260
     2261    def test_readlines(self):
     2262        txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
     2263        self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
     2264        txt.seek(0)
     2265        self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
     2266        txt.seek(0)
     2267        self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
    11492268
    11502269    # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
    1151     def testReadByChunk(self):
     2270    def test_read_by_chunk(self):
    11522271        # make sure "\r\n" straddles 128 char boundary.
    1153         txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
     2272        txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
    11542273        reads = ""
    11552274        while True:
     
    11582277                break
    11592278            reads += c
    1160         self.assertEquals(reads, "A"*127+"\nB")
     2279        self.assertEqual(reads, "A"*127+"\nB")
     2280
     2281    def test_writelines(self):
     2282        l = ['ab', 'cd', 'ef']
     2283        buf = self.BytesIO()
     2284        txt = self.TextIOWrapper(buf)
     2285        txt.writelines(l)
     2286        txt.flush()
     2287        self.assertEqual(buf.getvalue(), b'abcdef')
     2288
     2289    def test_writelines_userlist(self):
     2290        l = UserList(['ab', 'cd', 'ef'])
     2291        buf = self.BytesIO()
     2292        txt = self.TextIOWrapper(buf)
     2293        txt.writelines(l)
     2294        txt.flush()
     2295        self.assertEqual(buf.getvalue(), b'abcdef')
     2296
     2297    def test_writelines_error(self):
     2298        txt = self.TextIOWrapper(self.BytesIO())
     2299        self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
     2300        self.assertRaises(TypeError, txt.writelines, None)
     2301        self.assertRaises(TypeError, txt.writelines, b'abc')
    11612302
    11622303    def test_issue1395_1(self):
    1163         txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
     2304        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    11642305
    11652306        # read one char at a time
     
    11702311                break
    11712312            reads += c
    1172         self.assertEquals(reads, self.normalized)
     2313        self.assertEqual(reads, self.normalized)
    11732314
    11742315    def test_issue1395_2(self):
    1175         txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
     2316        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    11762317        txt._CHUNK_SIZE = 4
    11772318
     
    11822323                break
    11832324            reads += c
    1184         self.assertEquals(reads, self.normalized)
     2325        self.assertEqual(reads, self.normalized)
    11852326
    11862327    def test_issue1395_3(self):
    1187         txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
     2328        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    11882329        txt._CHUNK_SIZE = 4
    11892330
     
    11932334        reads += txt.readline()
    11942335        reads += txt.readline()
    1195         self.assertEquals(reads, self.normalized)
     2336        self.assertEqual(reads, self.normalized)
    11962337
    11972338    def test_issue1395_4(self):
    1198         txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
     2339        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    11992340        txt._CHUNK_SIZE = 4
    12002341
    12012342        reads = txt.read(4)
    12022343        reads += txt.read()
    1203         self.assertEquals(reads, self.normalized)
     2344        self.assertEqual(reads, self.normalized)
    12042345
    12052346    def test_issue1395_5(self):
    1206         txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
     2347        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    12072348        txt._CHUNK_SIZE = 4
    12082349
     
    12112352        txt.seek(0)
    12122353        txt.seek(pos)
    1213         self.assertEquals(txt.read(4), "BBB\n")
     2354        self.assertEqual(txt.read(4), "BBB\n")
    12142355
    12152356    def test_issue2282(self):
    1216         buffer = io.BytesIO(self.testdata)
    1217         txt = io.TextIOWrapper(buffer, encoding="ascii")
     2357        buffer = self.BytesIO(self.testdata)
     2358        txt = self.TextIOWrapper(buffer, encoding="ascii")
    12182359
    12192360        self.assertEqual(buffer.seekable(), txt.seekable())
    12202361
    1221     def check_newline_decoder_utf8(self, decoder):
     2362    def test_append_bom(self):
     2363        # The BOM is not written again when appending to a non-empty file
     2364        filename = support.TESTFN
     2365        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
     2366            with self.open(filename, 'w', encoding=charset) as f:
     2367                f.write('aaa')
     2368                pos = f.tell()
     2369            with self.open(filename, 'rb') as f:
     2370                self.assertEqual(f.read(), 'aaa'.encode(charset))
     2371
     2372            with self.open(filename, 'a', encoding=charset) as f:
     2373                f.write('xxx')
     2374            with self.open(filename, 'rb') as f:
     2375                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
     2376
     2377    def test_seek_bom(self):
     2378        # Same test, but when seeking manually
     2379        filename = support.TESTFN
     2380        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
     2381            with self.open(filename, 'w', encoding=charset) as f:
     2382                f.write('aaa')
     2383                pos = f.tell()
     2384            with self.open(filename, 'r+', encoding=charset) as f:
     2385                f.seek(pos)
     2386                f.write('zzz')
     2387                f.seek(0)
     2388                f.write('bbb')
     2389            with self.open(filename, 'rb') as f:
     2390                self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
     2391
     2392    def test_errors_property(self):
     2393        with self.open(support.TESTFN, "w") as f:
     2394            self.assertEqual(f.errors, "strict")
     2395        with self.open(support.TESTFN, "w", errors="replace") as f:
     2396            self.assertEqual(f.errors, "replace")
     2397
     2398    @unittest.skipUnless(threading, 'Threading required for this test.')
     2399    def test_threads_write(self):
     2400        # Issue6750: concurrent writes could duplicate data
     2401        event = threading.Event()
     2402        with self.open(support.TESTFN, "w", buffering=1) as f:
     2403            def run(n):
     2404                text = "Thread%03d\n" % n
     2405                event.wait()
     2406                f.write(text)
     2407            threads = [threading.Thread(target=lambda n=x: run(n))
     2408                       for x in range(20)]
     2409            for t in threads:
     2410                t.start()
     2411            time.sleep(0.02)
     2412            event.set()
     2413            for t in threads:
     2414                t.join()
     2415        with self.open(support.TESTFN) as f:
     2416            content = f.read()
     2417            for n in range(20):
     2418                self.assertEqual(content.count("Thread%03d\n" % n), 1)
     2419
     2420    def test_flush_error_on_close(self):
     2421        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
     2422        def bad_flush():
     2423            raise IOError()
     2424        txt.flush = bad_flush
     2425        self.assertRaises(IOError, txt.close) # exception not swallowed
     2426        self.assertTrue(txt.closed)
     2427
     2428    def test_multi_close(self):
     2429        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
     2430        txt.close()
     2431        txt.close()
     2432        txt.close()
     2433        self.assertRaises(ValueError, txt.flush)
     2434
     2435    def test_readonly_attributes(self):
     2436        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
     2437        buf = self.BytesIO(self.testdata)
     2438        with self.assertRaises((AttributeError, TypeError)):
     2439            txt.buffer = buf
     2440
     2441    def test_read_nonbytes(self):
     2442        # Issue #17106
     2443        # Crash when underlying read() returns non-bytes
     2444        class NonbytesStream(self.StringIO):
     2445            read1 = self.StringIO.read
     2446        class NonbytesStream(self.StringIO):
     2447            read1 = self.StringIO.read
     2448        t = self.TextIOWrapper(NonbytesStream('a'))
     2449        with self.maybeRaises(TypeError):
     2450            t.read(1)
     2451        t = self.TextIOWrapper(NonbytesStream('a'))
     2452        with self.maybeRaises(TypeError):
     2453            t.readline()
     2454        t = self.TextIOWrapper(NonbytesStream('a'))
     2455        self.assertEqual(t.read(), u'a')
     2456
     2457    def test_illegal_decoder(self):
     2458        # Issue #17106
     2459        # Crash when decoder returns non-string
     2460        t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
     2461                               encoding='quopri_codec')
     2462        with self.maybeRaises(TypeError):
     2463            t.read(1)
     2464        t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
     2465                               encoding='quopri_codec')
     2466        with self.maybeRaises(TypeError):
     2467            t.readline()
     2468        t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
     2469                               encoding='quopri_codec')
     2470        with self.maybeRaises(TypeError):
     2471            t.read()
     2472
     2473
     2474class CTextIOWrapperTest(TextIOWrapperTest):
     2475
     2476    def test_initialization(self):
     2477        r = self.BytesIO(b"\xc3\xa9\n\n")
     2478        b = self.BufferedReader(r, 1000)
     2479        t = self.TextIOWrapper(b)
     2480        self.assertRaises(TypeError, t.__init__, b, newline=42)
     2481        self.assertRaises(ValueError, t.read)
     2482        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
     2483        self.assertRaises(ValueError, t.read)
     2484
     2485    def test_garbage_collection(self):
     2486        # C TextIOWrapper objects are collected, and collecting them flushes
     2487        # all data to disk.
     2488        # The Python version has __del__, so it ends in gc.garbage instead.
     2489        rawio = io.FileIO(support.TESTFN, "wb")
     2490        b = self.BufferedWriter(rawio)
     2491        t = self.TextIOWrapper(b, encoding="ascii")
     2492        t.write("456def")
     2493        t.x = t
     2494        wr = weakref.ref(t)
     2495        del t
     2496        support.gc_collect()
     2497        self.assertTrue(wr() is None, wr)
     2498        with self.open(support.TESTFN, "rb") as f:
     2499            self.assertEqual(f.read(), b"456def")
     2500
     2501    def test_rwpair_cleared_before_textio(self):
     2502        # Issue 13070: TextIOWrapper's finalization would crash when called
     2503        # after the reference to the underlying BufferedRWPair's writer got
     2504        # cleared by the GC.
     2505        for i in range(1000):
     2506            b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
     2507            t1 = self.TextIOWrapper(b1, encoding="ascii")
     2508            b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
     2509            t2 = self.TextIOWrapper(b2, encoding="ascii")
     2510            # circular references
     2511            t1.buddy = t2
     2512            t2.buddy = t1
     2513        support.gc_collect()
     2514
     2515    maybeRaises = unittest.TestCase.assertRaises
     2516
     2517
     2518class PyTextIOWrapperTest(TextIOWrapperTest):
     2519    @contextlib.contextmanager
     2520    def maybeRaises(self, *args, **kwds):
     2521        yield
     2522
     2523
     2524class IncrementalNewlineDecoderTest(unittest.TestCase):
     2525
     2526    def check_newline_decoding_utf8(self, decoder):
    12222527        # UTF-8 specific tests for a newline decoder
    12232528        def _check_decode(b, s, **kwargs):
    12242529            # We exercise getstate() / setstate() as well as decode()
    12252530            state = decoder.getstate()
    1226             self.assertEquals(decoder.decode(b, **kwargs), s)
     2531            self.assertEqual(decoder.decode(b, **kwargs), s)
    12272532            decoder.setstate(state)
    1228             self.assertEquals(decoder.decode(b, **kwargs), s)
     2533            self.assertEqual(decoder.decode(b, **kwargs), s)
    12292534
    12302535        _check_decode(b'\xe8\xa2\x88', "\u8888")
     
    12612566        _check_decode(b'\n', "\n")
    12622567
    1263     def check_newline_decoder(self, decoder, encoding):
     2568    def check_newline_decoding(self, decoder, encoding):
    12642569        result = []
    1265         encoder = codecs.getincrementalencoder(encoding)()
    1266         def _decode_bytewise(s):
    1267             for b in encoder.encode(s):
    1268                 result.append(decoder.decode(b))
    1269         self.assertEquals(decoder.newlines, None)
     2570        if encoding is not None:
     2571            encoder = codecs.getincrementalencoder(encoding)()
     2572            def _decode_bytewise(s):
     2573                # Decode one byte at a time
     2574                for b in encoder.encode(s):
     2575                    result.append(decoder.decode(b))
     2576        else:
     2577            encoder = None
     2578            def _decode_bytewise(s):
     2579                # Decode one char at a time
     2580                for c in s:
     2581                    result.append(decoder.decode(c))
     2582        self.assertEqual(decoder.newlines, None)
    12702583        _decode_bytewise("abc\n\r")
    1271         self.assertEquals(decoder.newlines, '\n')
     2584        self.assertEqual(decoder.newlines, '\n')
    12722585        _decode_bytewise("\nabc")
    1273         self.assertEquals(decoder.newlines, ('\n', '\r\n'))
     2586        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
    12742587        _decode_bytewise("abc\r")
    1275         self.assertEquals(decoder.newlines, ('\n', '\r\n'))
     2588        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
    12762589        _decode_bytewise("abc")
    1277         self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
     2590        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
    12782591        _decode_bytewise("abc\r")
    1279         self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
     2592        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
    12802593        decoder.reset()
    1281         self.assertEquals(decoder.decode("abc".encode(encoding)), "abc")
    1282         self.assertEquals(decoder.newlines, None)
     2594        input = "abc"
     2595        if encoder is not None:
     2596            encoder.reset()
     2597            input = encoder.encode(input)
     2598        self.assertEqual(decoder.decode(input), "abc")
     2599        self.assertEqual(decoder.newlines, None)
    12832600
    12842601    def test_newline_decoder(self):
    12852602        encodings = (
    1286             'utf-8', 'latin-1',
     2603            # None meaning the IncrementalNewlineDecoder takes unicode input
     2604            # rather than bytes input
     2605            None, 'utf-8', 'latin-1',
    12872606            'utf-16', 'utf-16-le', 'utf-16-be',
    12882607            'utf-32', 'utf-32-le', 'utf-32-be',
    12892608        )
    12902609        for enc in encodings:
    1291             decoder = codecs.getincrementaldecoder(enc)()
    1292             decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
    1293             self.check_newline_decoder(decoder, enc)
     2610            decoder = enc and codecs.getincrementaldecoder(enc)()
     2611            decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
     2612            self.check_newline_decoding(decoder, enc)
    12942613        decoder = codecs.getincrementaldecoder("utf-8")()
    1295         decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
    1296         self.check_newline_decoder_utf8(decoder)
     2614        decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
     2615        self.check_newline_decoding_utf8(decoder)
     2616
     2617    def test_newline_bytes(self):
     2618        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
     2619        def _check(dec):
     2620            self.assertEqual(dec.newlines, None)
     2621            self.assertEqual(dec.decode("\u0D00"), "\u0D00")
     2622            self.assertEqual(dec.newlines, None)
     2623            self.assertEqual(dec.decode("\u0A00"), "\u0A00")
     2624            self.assertEqual(dec.newlines, None)
     2625        dec = self.IncrementalNewlineDecoder(None, translate=False)
     2626        _check(dec)
     2627        dec = self.IncrementalNewlineDecoder(None, translate=True)
     2628        _check(dec)
     2629
     2630class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
     2631    pass
     2632
     2633class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
     2634    pass
    12972635
    12982636
     
    13022640
    13032641    def tearDown(self):
    1304         test_support.unlink(test_support.TESTFN)
    1305 
    1306     def testImport__all__(self):
    1307         for name in io.__all__:
    1308             obj = getattr(io, name, None)
    1309             self.assert_(obj is not None, name)
     2642        support.unlink(support.TESTFN)
     2643
     2644    def test___all__(self):
     2645        for name in self.io.__all__:
     2646            obj = getattr(self.io, name, None)
     2647            self.assertTrue(obj is not None, name)
    13102648            if name == "open":
    13112649                continue
    1312             elif "error" in name.lower():
    1313                 self.assert_(issubclass(obj, Exception), name)
    1314             else:
    1315                 self.assert_(issubclass(obj, io.IOBase))
    1316 
     2650            elif "error" in name.lower() or name == "UnsupportedOperation":
     2651                self.assertTrue(issubclass(obj, Exception), name)
     2652            elif not name.startswith("SEEK_"):
     2653                self.assertTrue(issubclass(obj, self.IOBase))
    13172654
    13182655    def test_attributes(self):
    1319         f = io.open(test_support.TESTFN, "wb", buffering=0)
    1320         self.assertEquals(f.mode, "wb")
     2656        f = self.open(support.TESTFN, "wb", buffering=0)
     2657        self.assertEqual(f.mode, "wb")
    13212658        f.close()
    13222659
    1323         f = io.open(test_support.TESTFN, "U")
    1324         self.assertEquals(f.name,            test_support.TESTFN)
    1325         self.assertEquals(f.buffer.name,     test_support.TESTFN)
    1326         self.assertEquals(f.buffer.raw.name, test_support.TESTFN)
    1327         self.assertEquals(f.mode,            "U")
    1328         self.assertEquals(f.buffer.mode,     "rb")
    1329         self.assertEquals(f.buffer.raw.mode, "rb")
     2660        f = self.open(support.TESTFN, "U")
     2661        self.assertEqual(f.name,            support.TESTFN)
     2662        self.assertEqual(f.buffer.name,     support.TESTFN)
     2663        self.assertEqual(f.buffer.raw.name, support.TESTFN)
     2664        self.assertEqual(f.mode,            "U")
     2665        self.assertEqual(f.buffer.mode,     "rb")
     2666        self.assertEqual(f.buffer.raw.mode, "rb")
    13302667        f.close()
    13312668
    1332         f = io.open(test_support.TESTFN, "w+")
    1333         self.assertEquals(f.mode,            "w+")
    1334         self.assertEquals(f.buffer.mode,     "rb+") # Does it really matter?
    1335         self.assertEquals(f.buffer.raw.mode, "rb+")
    1336 
    1337         g = io.open(f.fileno(), "wb", closefd=False)
    1338         self.assertEquals(g.mode,     "wb")
    1339         self.assertEquals(g.raw.mode, "wb")
    1340         self.assertEquals(g.name,     f.fileno())
    1341         self.assertEquals(g.raw.name, f.fileno())
     2669        f = self.open(support.TESTFN, "w+")
     2670        self.assertEqual(f.mode,            "w+")
     2671        self.assertEqual(f.buffer.mode,     "rb+") # Does it really matter?
     2672        self.assertEqual(f.buffer.raw.mode, "rb+")
     2673
     2674        g = self.open(f.fileno(), "wb", closefd=False)
     2675        self.assertEqual(g.mode,     "wb")
     2676        self.assertEqual(g.raw.mode, "wb")
     2677        self.assertEqual(g.name,     f.fileno())
     2678        self.assertEqual(g.raw.name, f.fileno())
    13422679        f.close()
    13432680        g.close()
    13442681
     2682    def test_io_after_close(self):
     2683        for kwargs in [
     2684                {"mode": "w"},
     2685                {"mode": "wb"},
     2686                {"mode": "w", "buffering": 1},
     2687                {"mode": "w", "buffering": 2},
     2688                {"mode": "wb", "buffering": 0},
     2689                {"mode": "r"},
     2690                {"mode": "rb"},
     2691                {"mode": "r", "buffering": 1},
     2692                {"mode": "r", "buffering": 2},
     2693                {"mode": "rb", "buffering": 0},
     2694                {"mode": "w+"},
     2695                {"mode": "w+b"},
     2696                {"mode": "w+", "buffering": 1},
     2697                {"mode": "w+", "buffering": 2},
     2698                {"mode": "w+b", "buffering": 0},
     2699            ]:
     2700            f = self.open(support.TESTFN, **kwargs)
     2701            f.close()
     2702            self.assertRaises(ValueError, f.flush)
     2703            self.assertRaises(ValueError, f.fileno)
     2704            self.assertRaises(ValueError, f.isatty)
     2705            self.assertRaises(ValueError, f.__iter__)
     2706            if hasattr(f, "peek"):
     2707                self.assertRaises(ValueError, f.peek, 1)
     2708            self.assertRaises(ValueError, f.read)
     2709            if hasattr(f, "read1"):
     2710                self.assertRaises(ValueError, f.read1, 1024)
     2711            if hasattr(f, "readall"):
     2712                self.assertRaises(ValueError, f.readall)
     2713            if hasattr(f, "readinto"):
     2714                self.assertRaises(ValueError, f.readinto, bytearray(1024))
     2715            self.assertRaises(ValueError, f.readline)
     2716            self.assertRaises(ValueError, f.readlines)
     2717            self.assertRaises(ValueError, f.seek, 0)
     2718            self.assertRaises(ValueError, f.tell)
     2719            self.assertRaises(ValueError, f.truncate)
     2720            self.assertRaises(ValueError, f.write,
     2721                              b"" if "b" in kwargs['mode'] else "")
     2722            self.assertRaises(ValueError, f.writelines, [])
     2723            self.assertRaises(ValueError, next, f)
     2724
     2725    def test_blockingioerror(self):
     2726        # Various BlockingIOError issues
     2727        self.assertRaises(TypeError, self.BlockingIOError)
     2728        self.assertRaises(TypeError, self.BlockingIOError, 1)
     2729        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
     2730        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
     2731        b = self.BlockingIOError(1, "")
     2732        self.assertEqual(b.characters_written, 0)
     2733        class C(unicode):
     2734            pass
     2735        c = C("")
     2736        b = self.BlockingIOError(1, c)
     2737        c.b = b
     2738        b.c = c
     2739        wr = weakref.ref(c)
     2740        del c, b
     2741        support.gc_collect()
     2742        self.assertTrue(wr() is None, wr)
     2743
     2744    def test_abcs(self):
     2745        # Test the visible base classes are ABCs.
     2746        self.assertIsInstance(self.IOBase, abc.ABCMeta)
     2747        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
     2748        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
     2749        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
     2750
     2751    def _check_abc_inheritance(self, abcmodule):
     2752        with self.open(support.TESTFN, "wb", buffering=0) as f:
     2753            self.assertIsInstance(f, abcmodule.IOBase)
     2754            self.assertIsInstance(f, abcmodule.RawIOBase)
     2755            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
     2756            self.assertNotIsInstance(f, abcmodule.TextIOBase)
     2757        with self.open(support.TESTFN, "wb") as f:
     2758            self.assertIsInstance(f, abcmodule.IOBase)
     2759            self.assertNotIsInstance(f, abcmodule.RawIOBase)
     2760            self.assertIsInstance(f, abcmodule.BufferedIOBase)
     2761            self.assertNotIsInstance(f, abcmodule.TextIOBase)
     2762        with self.open(support.TESTFN, "w") as f:
     2763            self.assertIsInstance(f, abcmodule.IOBase)
     2764            self.assertNotIsInstance(f, abcmodule.RawIOBase)
     2765            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
     2766            self.assertIsInstance(f, abcmodule.TextIOBase)
     2767
     2768    def test_abc_inheritance(self):
     2769        # Test implementations inherit from their respective ABCs
     2770        self._check_abc_inheritance(self)
     2771
     2772    def test_abc_inheritance_official(self):
     2773        # Test implementations inherit from the official ABCs of the
     2774        # baseline "io" module.
     2775        self._check_abc_inheritance(io)
     2776
     2777    @unittest.skipUnless(fcntl, 'fcntl required for this test')
     2778    def test_nonblock_pipe_write_bigbuf(self):
     2779        self._test_nonblock_pipe_write(16*1024)
     2780
     2781    @unittest.skipUnless(fcntl, 'fcntl required for this test')
     2782    def test_nonblock_pipe_write_smallbuf(self):
     2783        self._test_nonblock_pipe_write(1024)
     2784
     2785    def _set_non_blocking(self, fd):
     2786        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
     2787        self.assertNotEqual(flags, -1)
     2788        res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
     2789        self.assertEqual(res, 0)
     2790
     2791    def _test_nonblock_pipe_write(self, bufsize):
     2792        sent = []
     2793        received = []
     2794        r, w = os.pipe()
     2795        self._set_non_blocking(r)
     2796        self._set_non_blocking(w)
     2797
     2798        # To exercise all code paths in the C implementation we need
     2799        # to play with buffer sizes.  For instance, if we choose a
     2800        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
     2801        # then we will never get a partial write of the buffer.
     2802        rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
     2803        wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
     2804
     2805        with rf, wf:
     2806            for N in 9999, 73, 7574:
     2807                try:
     2808                    i = 0
     2809                    while True:
     2810                        msg = bytes([i % 26 + 97]) * N
     2811                        sent.append(msg)
     2812                        wf.write(msg)
     2813                        i += 1
     2814
     2815                except self.BlockingIOError as e:
     2816                    self.assertEqual(e.args[0], errno.EAGAIN)
     2817                    sent[-1] = sent[-1][:e.characters_written]
     2818                    received.append(rf.read())
     2819                    msg = b'BLOCKED'
     2820                    wf.write(msg)
     2821                    sent.append(msg)
     2822
     2823            while True:
     2824                try:
     2825                    wf.flush()
     2826                    break
     2827                except self.BlockingIOError as e:
     2828                    self.assertEqual(e.args[0], errno.EAGAIN)
     2829                    self.assertEqual(e.characters_written, 0)
     2830                    received.append(rf.read())
     2831
     2832            received += iter(rf.read, None)
     2833
     2834        sent, received = b''.join(sent), b''.join(received)
     2835        self.assertTrue(sent == received)
     2836        self.assertTrue(wf.closed)
     2837        self.assertTrue(rf.closed)
     2838
     2839class CMiscIOTest(MiscIOTest):
     2840    io = io
     2841
     2842class PyMiscIOTest(MiscIOTest):
     2843    io = pyio
     2844
     2845
     2846@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
     2847class SignalsTest(unittest.TestCase):
     2848
     2849    def setUp(self):
     2850        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
     2851
     2852    def tearDown(self):
     2853        signal.signal(signal.SIGALRM, self.oldalrm)
     2854
     2855    def alarm_interrupt(self, sig, frame):
     2856        1 // 0
     2857
     2858    @unittest.skipUnless(threading, 'Threading required for this test.')
     2859    @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
     2860                     'issue #12429: skip test on FreeBSD <= 7')
     2861    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
     2862        """Check that a partial write, when it gets interrupted, properly
     2863        invokes the signal handler, and bubbles up the exception raised
     2864        in the latter."""
     2865        read_results = []
     2866        def _read():
     2867            s = os.read(r, 1)
     2868            read_results.append(s)
     2869        t = threading.Thread(target=_read)
     2870        t.daemon = True
     2871        r, w = os.pipe()
     2872        try:
     2873            wio = self.io.open(w, **fdopen_kwargs)
     2874            t.start()
     2875            signal.alarm(1)
     2876            # Fill the pipe enough that the write will be blocking.
     2877            # It will be interrupted by the timer armed above.  Since the
     2878            # other thread has read one byte, the low-level write will
     2879            # return with a successful (partial) result rather than an EINTR.
     2880            # The buffered IO layer must check for pending signal
     2881            # handlers, which in this case will invoke alarm_interrupt().
     2882            self.assertRaises(ZeroDivisionError,
     2883                        wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
     2884            t.join()
     2885            # We got one byte, get another one and check that it isn't a
     2886            # repeat of the first one.
     2887            read_results.append(os.read(r, 1))
     2888            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
     2889        finally:
     2890            os.close(w)
     2891            os.close(r)
     2892            # This is deliberate. If we didn't close the file descriptor
     2893            # before closing wio, wio would try to flush its internal
     2894            # buffer, and block again.
     2895            try:
     2896                wio.close()
     2897            except IOError as e:
     2898                if e.errno != errno.EBADF:
     2899                    raise
     2900
     2901    def test_interrupted_write_unbuffered(self):
     2902        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
     2903
     2904    def test_interrupted_write_buffered(self):
     2905        self.check_interrupted_write(b"xy", b"xy", mode="wb")
     2906
     2907    def test_interrupted_write_text(self):
     2908        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
     2909
     2910    def check_reentrant_write(self, data, **fdopen_kwargs):
     2911        def on_alarm(*args):
     2912            # Will be called reentrantly from the same thread
     2913            wio.write(data)
     2914            1//0
     2915        signal.signal(signal.SIGALRM, on_alarm)
     2916        r, w = os.pipe()
     2917        wio = self.io.open(w, **fdopen_kwargs)
     2918        try:
     2919            signal.alarm(1)
     2920            # Either the reentrant call to wio.write() fails with RuntimeError,
     2921            # or the signal handler raises ZeroDivisionError.
     2922            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
     2923                while 1:
     2924                    for i in range(100):
     2925                        wio.write(data)
     2926                        wio.flush()
     2927                    # Make sure the buffer doesn't fill up and block further writes
     2928                    os.read(r, len(data) * 100)
     2929            exc = cm.exception
     2930            if isinstance(exc, RuntimeError):
     2931                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
     2932        finally:
     2933            wio.close()
     2934            os.close(r)
     2935
     2936    def test_reentrant_write_buffered(self):
     2937        self.check_reentrant_write(b"xy", mode="wb")
     2938
     2939    def test_reentrant_write_text(self):
     2940        self.check_reentrant_write("xy", mode="w", encoding="ascii")
     2941
     2942    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
     2943        """Check that a buffered read, when it gets interrupted (either
     2944        returning a partial result or EINTR), properly invokes the signal
     2945        handler and retries if the latter returned successfully."""
     2946        r, w = os.pipe()
     2947        fdopen_kwargs["closefd"] = False
     2948        def alarm_handler(sig, frame):
     2949            os.write(w, b"bar")
     2950        signal.signal(signal.SIGALRM, alarm_handler)
     2951        try:
     2952            rio = self.io.open(r, **fdopen_kwargs)
     2953            os.write(w, b"foo")
     2954            signal.alarm(1)
     2955            # Expected behaviour:
     2956            # - first raw read() returns partial b"foo"
     2957            # - second raw read() returns EINTR
     2958            # - third raw read() returns b"bar"
     2959            self.assertEqual(decode(rio.read(6)), "foobar")
     2960        finally:
     2961            rio.close()
     2962            os.close(w)
     2963            os.close(r)
     2964
     2965    def test_interrupterd_read_retry_buffered(self):
     2966        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
     2967                                          mode="rb")
     2968
     2969    def test_interrupterd_read_retry_text(self):
     2970        self.check_interrupted_read_retry(lambda x: x,
     2971                                          mode="r")
     2972
     2973    @unittest.skipUnless(threading, 'Threading required for this test.')
     2974    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
     2975        """Check that a buffered write, when it gets interrupted (either
     2976        returning a partial result or EINTR), properly invokes the signal
     2977        handler and retries if the latter returned successfully."""
     2978        select = support.import_module("select")
     2979        # A quantity that exceeds the buffer size of an anonymous pipe's
     2980        # write end.
     2981        N = support.PIPE_MAX_SIZE
     2982        r, w = os.pipe()
     2983        fdopen_kwargs["closefd"] = False
     2984        # We need a separate thread to read from the pipe and allow the
     2985        # write() to finish.  This thread is started after the SIGALRM is
     2986        # received (forcing a first EINTR in write()).
     2987        read_results = []
     2988        write_finished = False
     2989        def _read():
     2990            while not write_finished:
     2991                while r in select.select([r], [], [], 1.0)[0]:
     2992                    s = os.read(r, 1024)
     2993                    read_results.append(s)
     2994        t = threading.Thread(target=_read)
     2995        t.daemon = True
     2996        def alarm1(sig, frame):
     2997            signal.signal(signal.SIGALRM, alarm2)
     2998            signal.alarm(1)
     2999        def alarm2(sig, frame):
     3000            t.start()
     3001        signal.signal(signal.SIGALRM, alarm1)
     3002        try:
     3003            wio = self.io.open(w, **fdopen_kwargs)
     3004            signal.alarm(1)
     3005            # Expected behaviour:
     3006            # - first raw write() is partial (because of the limited pipe buffer
     3007            #   and the first alarm)
     3008            # - second raw write() returns EINTR (because of the second alarm)
     3009            # - subsequent write()s are successful (either partial or complete)
     3010            self.assertEqual(N, wio.write(item * N))
     3011            wio.flush()
     3012            write_finished = True
     3013            t.join()
     3014            self.assertEqual(N, sum(len(x) for x in read_results))
     3015        finally:
     3016            write_finished = True
     3017            os.close(w)
     3018            os.close(r)
     3019            # This is deliberate. If we didn't close the file descriptor
     3020            # before closing wio, wio would try to flush its internal
     3021            # buffer, and could block (in case of failure).
     3022            try:
     3023                wio.close()
     3024            except IOError as e:
     3025                if e.errno != errno.EBADF:
     3026                    raise
     3027
     3028    def test_interrupterd_write_retry_buffered(self):
     3029        self.check_interrupted_write_retry(b"x", mode="wb")
     3030
     3031    def test_interrupterd_write_retry_text(self):
     3032        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
     3033
     3034
     3035class CSignalsTest(SignalsTest):
     3036    io = io
     3037
     3038class PySignalsTest(SignalsTest):
     3039    io = pyio
     3040
     3041    # Handling reentrancy issues would slow down _pyio even more, so the
     3042    # tests are disabled.
     3043    test_reentrant_write_buffered = None
     3044    test_reentrant_write_text = None
     3045
    13453046
    13463047def test_main():
    1347     test_support.run_unittest(IOTest, BytesIOTest, StringIOTest,
    1348                               BufferedReaderTest, BufferedWriterTest,
    1349                               BufferedRWPairTest, BufferedRandomTest,
    1350                               StatefulIncrementalDecoderTest,
    1351                               TextIOWrapperTest, MiscIOTest)
     3048    tests = (CIOTest, PyIOTest,
     3049             CBufferedReaderTest, PyBufferedReaderTest,
     3050             CBufferedWriterTest, PyBufferedWriterTest,
     3051             CBufferedRWPairTest, PyBufferedRWPairTest,
     3052             CBufferedRandomTest, PyBufferedRandomTest,
     3053             StatefulIncrementalDecoderTest,
     3054             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
     3055             CTextIOWrapperTest, PyTextIOWrapperTest,
     3056             CMiscIOTest, PyMiscIOTest,
     3057             CSignalsTest, PySignalsTest,
     3058             )
     3059
     3060    # Put the namespaces of the IO module we are testing and some useful mock
     3061    # classes in the __dict__ of each test.
     3062    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
     3063             MockNonBlockWriterIO, MockRawIOWithoutRead)
     3064    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
     3065    c_io_ns = dict((name, getattr(io, name)) for name in all_members)
     3066    py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
     3067    globs = globals()
     3068    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
     3069    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
     3070    # Avoid turning open into a bound method.
     3071    py_io_ns["open"] = pyio.OpenWrapper
     3072    for test in tests:
     3073        if test.__name__.startswith("C"):
     3074            for name, obj in c_io_ns.items():
     3075                setattr(test, name, obj)
     3076        elif test.__name__.startswith("Py"):
     3077            for name, obj in py_io_ns.items():
     3078                setattr(test, name, obj)
     3079
     3080    support.run_unittest(*tests)
    13523081
    13533082if __name__ == "__main__":
    1354     unittest.main()
     3083    test_main()
Note: See TracChangeset for help on using the changeset viewer.