Changeset 391 for python/trunk/Lib/test/test_io.py
- Timestamp:
- Mar 19, 2014, 11:31:01 PM (11 years ago)
- Location:
- python/trunk
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
python/trunk
-
Property svn:mergeinfo
set to
/python/vendor/Python-2.7.6 merged eligible /python/vendor/current merged eligible
-
Property svn:mergeinfo
set to
-
python/trunk/Lib/test/test_io.py
r2 r391 1 """Unit tests for io.py.""" 1 """Unit tests for the io module.""" 2 3 # Tests of io are scattered over the test suite: 4 # * test_bufio - tests file buffering 5 # * test_memoryio - tests BytesIO and StringIO 6 # * test_fileio - tests FileIO 7 # * test_file - tests the file interface 8 # * test_io - tests everything else in the io module 9 # * test_univnewlines - tests universal newline support 10 # * test_largefile - tests operations on a file greater than 2**32 bytes 11 # (only enabled with -ulargefile) 12 13 ################################################################################ 14 # ATTENTION TEST WRITERS!!! 15 ################################################################################ 16 # When writing tests for io, it's important to test both the C and Python 17 # implementations. This is usually done by writing a base test that refers to 18 # the type it is testing as a attribute. Then it provides custom subclasses to 19 # test both implementations. This file has lots of examples. 20 ################################################################################ 21 2 22 from __future__ import print_function 3 23 from __future__ import unicode_literals … … 7 27 import time 8 28 import array 9 import threading10 29 import random 11 30 import unittest 12 from itertools import chain, cycle 13 from test import test_support 31 import weakref 32 import abc 33 import signal 34 import errno 35 from itertools import cycle, count 36 from collections import deque 37 from UserList import UserList 38 from test import test_support as support 39 import contextlib 14 40 15 41 import codecs 16 import io # The module under test 17 18 19 class MockRawIO(io.RawIOBase): 42 import io # C implementation of io 43 import _pyio as pyio # Python implementation of io 44 try: 45 import threading 46 except ImportError: 47 threading = None 48 try: 49 import fcntl 50 except ImportError: 51 fcntl = None 52 53 __metaclass__ = type 54 bytes = support.py3k_bytes 55 56 def _default_chunk_size(): 57 """Get the default TextIOWrapper chunk size""" 58 with io.open(__file__, "r", encoding="latin1") as f: 59 return f._CHUNK_SIZE 60 61 62 class MockRawIOWithoutRead: 63 """A RawIO implementation without read(), so as to exercise the default 64 RawIO.read() which calls readinto().""" 20 65 21 66 def __init__(self, read_stack=()): 22 67 self._read_stack = list(read_stack) 23 68 self._write_stack = [] 69 self._reads = 0 70 self._extraneous_reads = 0 71 72 def write(self, b): 73 self._write_stack.append(bytes(b)) 74 return len(b) 75 76 def writable(self): 77 return True 78 79 def fileno(self): 80 return 42 81 82 def readable(self): 83 return True 84 85 def seekable(self): 86 return True 87 88 def seek(self, pos, whence): 89 return 0 # wrong but we gotta return something 90 91 def tell(self): 92 return 0 # same comment as above 93 94 def readinto(self, buf): 95 self._reads += 1 96 max_len = len(buf) 97 try: 98 data = self._read_stack[0] 99 except IndexError: 100 self._extraneous_reads += 1 101 return 0 102 if data is None: 103 del self._read_stack[0] 104 return None 105 n = len(data) 106 if len(data) <= max_len: 107 del self._read_stack[0] 108 buf[:n] = data 109 return n 110 else: 111 buf[:] = data[:max_len] 112 self._read_stack[0] = data[max_len:] 113 return max_len 114 115 def truncate(self, pos=None): 116 return pos 117 118 class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase): 119 pass 120 121 class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase): 122 pass 123 124 125 class MockRawIO(MockRawIOWithoutRead): 24 126 25 127 def read(self, n=None): 128 self._reads += 1 26 129 try: 27 130 return self._read_stack.pop(0) 28 131 except: 132 self._extraneous_reads += 1 29 133 return b"" 30 134 135 class CMockRawIO(MockRawIO, io.RawIOBase): 136 pass 137 138 class PyMockRawIO(MockRawIO, pyio.RawIOBase): 139 pass 140 141 142 class MisbehavedRawIO(MockRawIO): 31 143 def write(self, b): 32 self._write_stack.append(b[:]) 33 return len(b) 144 return MockRawIO.write(self, b) * 2 145 146 def read(self, n=None): 147 return MockRawIO.read(self, n) * 2 148 149 def seek(self, pos, whence): 150 return -123 151 152 def tell(self): 153 return -456 154 155 def readinto(self, buf): 156 MockRawIO.readinto(self, buf) 157 return len(buf) * 5 158 159 class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase): 160 pass 161 162 class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase): 163 pass 164 165 166 class CloseFailureIO(MockRawIO): 167 closed = 0 168 169 def close(self): 170 if not self.closed: 171 self.closed = 1 172 raise IOError 173 174 class CCloseFailureIO(CloseFailureIO, io.RawIOBase): 175 pass 176 177 class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase): 178 pass 179 180 181 class MockFileIO: 182 183 def __init__(self, data): 184 self.read_history = [] 185 super(MockFileIO, self).__init__(data) 186 187 def read(self, n=None): 188 res = super(MockFileIO, self).read(n) 189 self.read_history.append(None if res is None else len(res)) 190 return res 191 192 def readinto(self, b): 193 res = super(MockFileIO, self).readinto(b) 194 self.read_history.append(res) 195 return res 196 197 class CMockFileIO(MockFileIO, io.BytesIO): 198 pass 199 200 class PyMockFileIO(MockFileIO, pyio.BytesIO): 201 pass 202 203 204 class MockNonBlockWriterIO: 205 206 def __init__(self): 207 self._write_stack = [] 208 self._blocker_char = None 209 210 def pop_written(self): 211 s = b"".join(self._write_stack) 212 self._write_stack[:] = [] 213 return s 214 215 def block_on(self, char): 216 """Block when a given char is encountered.""" 217 self._blocker_char = char 218 219 def readable(self): 220 return True 221 222 def seekable(self): 223 return True 34 224 35 225 def writable(self): 36 226 return True 37 227 38 def fileno(self):39 return 4240 41 def readable(self):42 return True43 44 def seekable(self):45 return True46 47 def seek(self, pos, whence):48 pass49 50 def tell(self):51 return 4252 53 54 class MockFileIO(io.BytesIO):55 56 def __init__(self, data):57 self.read_history = []58 io.BytesIO.__init__(self, data)59 60 def read(self, n=None):61 res = io.BytesIO.read(self, n)62 self.read_history.append(None if res is None else len(res))63 return res64 65 66 class MockNonBlockWriterIO(io.RawIOBase):67 68 def __init__(self, blocking_script):69 self._blocking_script = list(blocking_script)70 self._write_stack = []71 72 228 def write(self, b): 73 self._write_stack.append(b[:]) 74 n = self._blocking_script.pop(0) 75 if (n < 0): 76 raise io.BlockingIOError(0, "test blocking", -n) 77 else: 78 return n 79 80 def writable(self): 81 return True 229 b = bytes(b) 230 n = -1 231 if self._blocker_char: 232 try: 233 n = b.index(self._blocker_char) 234 except ValueError: 235 pass 236 else: 237 if n > 0: 238 # write data up to the first blocker 239 self._write_stack.append(b[:n]) 240 return n 241 else: 242 # cancel blocker and indicate would block 243 self._blocker_char = None 244 return None 245 self._write_stack.append(b) 246 return len(b) 247 248 class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase): 249 BlockingIOError = io.BlockingIOError 250 251 class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase): 252 BlockingIOError = pyio.BlockingIOError 82 253 83 254 84 255 class IOTest(unittest.TestCase): 85 256 257 def setUp(self): 258 support.unlink(support.TESTFN) 259 86 260 def tearDown(self): 87 test_support.unlink(test_support.TESTFN)261 support.unlink(support.TESTFN) 88 262 89 263 def write_ops(self, f): 90 91 264 self.assertEqual(f.write(b"blah."), 5) 92 265 f.truncate(0) … … 157 330 self.assertEqual(f.read(2), b"x") 158 331 332 def test_invalid_operations(self): 333 # Try writing on a file opened in read mode and vice-versa. 334 for mode in ("w", "wb"): 335 with self.open(support.TESTFN, mode) as fp: 336 self.assertRaises(IOError, fp.read) 337 self.assertRaises(IOError, fp.readline) 338 with self.open(support.TESTFN, "rb") as fp: 339 self.assertRaises(IOError, fp.write, b"blah") 340 self.assertRaises(IOError, fp.writelines, [b"blah\n"]) 341 with self.open(support.TESTFN, "r") as fp: 342 self.assertRaises(IOError, fp.write, "blah") 343 self.assertRaises(IOError, fp.writelines, ["blah\n"]) 344 159 345 def test_raw_file_io(self): 160 f = io.open(test_support.TESTFN, "wb", buffering=0) 161 self.assertEqual(f.readable(), False) 162 self.assertEqual(f.writable(), True) 163 self.assertEqual(f.seekable(), True) 164 self.write_ops(f) 165 f.close() 166 f = io.open(test_support.TESTFN, "rb", buffering=0) 167 self.assertEqual(f.readable(), True) 168 self.assertEqual(f.writable(), False) 169 self.assertEqual(f.seekable(), True) 170 self.read_ops(f) 171 f.close() 346 with self.open(support.TESTFN, "wb", buffering=0) as f: 347 self.assertEqual(f.readable(), False) 348 self.assertEqual(f.writable(), True) 349 self.assertEqual(f.seekable(), True) 350 self.write_ops(f) 351 with self.open(support.TESTFN, "rb", buffering=0) as f: 352 self.assertEqual(f.readable(), True) 353 self.assertEqual(f.writable(), False) 354 self.assertEqual(f.seekable(), True) 355 self.read_ops(f) 172 356 173 357 def test_buffered_file_io(self): 174 f = io.open(test_support.TESTFN, "wb") 175 self.assertEqual(f.readable(), False) 176 self.assertEqual(f.writable(), True) 177 self.assertEqual(f.seekable(), True) 178 self.write_ops(f) 179 f.close() 180 f = io.open(test_support.TESTFN, "rb") 181 self.assertEqual(f.readable(), True) 182 self.assertEqual(f.writable(), False) 183 self.assertEqual(f.seekable(), True) 184 self.read_ops(f, True) 185 f.close() 358 with self.open(support.TESTFN, "wb") as f: 359 self.assertEqual(f.readable(), False) 360 self.assertEqual(f.writable(), True) 361 self.assertEqual(f.seekable(), True) 362 self.write_ops(f) 363 with self.open(support.TESTFN, "rb") as f: 364 self.assertEqual(f.readable(), True) 365 self.assertEqual(f.writable(), False) 366 self.assertEqual(f.seekable(), True) 367 self.read_ops(f, True) 186 368 187 369 def test_readline(self): 188 f = io.open(test_support.TESTFN, "wb") 189 f.write(b"abc\ndef\nxyzzy\nfoo") 190 f.close() 191 f = io.open(test_support.TESTFN, "rb") 192 self.assertEqual(f.readline(), b"abc\n") 193 self.assertEqual(f.readline(10), b"def\n") 194 self.assertEqual(f.readline(2), b"xy") 195 self.assertEqual(f.readline(4), b"zzy\n") 196 self.assertEqual(f.readline(), b"foo") 197 f.close() 370 with self.open(support.TESTFN, "wb") as f: 371 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line") 372 with self.open(support.TESTFN, "rb") as f: 373 self.assertEqual(f.readline(), b"abc\n") 374 self.assertEqual(f.readline(10), b"def\n") 375 self.assertEqual(f.readline(2), b"xy") 376 self.assertEqual(f.readline(4), b"zzy\n") 377 self.assertEqual(f.readline(), b"foo\x00bar\n") 378 self.assertEqual(f.readline(None), b"another line") 379 self.assertRaises(TypeError, f.readline, 5.3) 380 with self.open(support.TESTFN, "r") as f: 381 self.assertRaises(TypeError, f.readline, 5.3) 198 382 199 383 def test_raw_bytes_io(self): 200 f = io.BytesIO()384 f = self.BytesIO() 201 385 self.write_ops(f) 202 386 data = f.getvalue() 203 387 self.assertEqual(data, b"hello world\n") 204 f = io.BytesIO(data)388 f = self.BytesIO(data) 205 389 self.read_ops(f, True) 206 390 … … 209 393 # a long time to build the >2GB file and takes >2GB of disk space 210 394 # therefore the resource must be enabled to run this test. 211 if sys.platform[:3] in ('win', 'os2')or sys.platform == 'darwin':212 if not test_support.is_resource_enabled("largefile"):395 if sys.platform[:3] == 'win' or sys.platform == 'darwin': 396 if not support.is_resource_enabled("largefile"): 213 397 print("\nTesting large file ops skipped on %s." % sys.platform, 214 398 file=sys.stderr) … … 218 402 file=sys.stderr) 219 403 return 220 f = io.open(test_support.TESTFN, "w+b", 0) 221 self.large_file_ops(f) 222 f.close() 223 f = io.open(test_support.TESTFN, "w+b") 224 self.large_file_ops(f) 225 f.close() 404 with self.open(support.TESTFN, "w+b", 0) as f: 405 self.large_file_ops(f) 406 with self.open(support.TESTFN, "w+b") as f: 407 self.large_file_ops(f) 226 408 227 409 def test_with_open(self): 228 410 for bufsize in (0, 1, 100): 229 411 f = None 230 with open(test_support.TESTFN, "wb", bufsize) as f:412 with self.open(support.TESTFN, "wb", bufsize) as f: 231 413 f.write(b"xxx") 232 414 self.assertEqual(f.closed, True) 233 415 f = None 234 416 try: 235 with open(test_support.TESTFN, "wb", bufsize) as f:236 1 /0417 with self.open(support.TESTFN, "wb", bufsize) as f: 418 1 // 0 237 419 except ZeroDivisionError: 238 420 self.assertEqual(f.closed, True) 239 421 else: 240 self.fail("1 /0 didn't raise an exception")422 self.fail("1 // 0 didn't raise an exception") 241 423 242 424 # issue 5008 243 425 def test_append_mode_tell(self): 244 with io.open(test_support.TESTFN, "wb") as f:426 with self.open(support.TESTFN, "wb") as f: 245 427 f.write(b"xxx") 246 with io.open(test_support.TESTFN, "ab", buffering=0) as f:428 with self.open(support.TESTFN, "ab", buffering=0) as f: 247 429 self.assertEqual(f.tell(), 3) 248 with io.open(test_support.TESTFN, "ab") as f:430 with self.open(support.TESTFN, "ab") as f: 249 431 self.assertEqual(f.tell(), 3) 250 with io.open(test_support.TESTFN, "a") as f:251 self.assert _(f.tell() > 0)432 with self.open(support.TESTFN, "a") as f: 433 self.assertTrue(f.tell() > 0) 252 434 253 435 def test_destructor(self): 254 436 record = [] 255 class MyFileIO( io.FileIO):437 class MyFileIO(self.FileIO): 256 438 def __del__(self): 257 439 record.append(1) 258 io.FileIO.__del__(self) 440 try: 441 f = super(MyFileIO, self).__del__ 442 except AttributeError: 443 pass 444 else: 445 f() 259 446 def close(self): 260 447 record.append(2) 261 io.FileIO.close(self)448 super(MyFileIO, self).close() 262 449 def flush(self): 263 450 record.append(3) 264 io.FileIO.flush(self)265 f = MyFileIO( test_support.TESTFN, "w")266 f.write( "xxx")451 super(MyFileIO, self).flush() 452 f = MyFileIO(support.TESTFN, "wb") 453 f.write(b"xxx") 267 454 del f 455 support.gc_collect() 268 456 self.assertEqual(record, [1, 2, 3]) 457 with self.open(support.TESTFN, "rb") as f: 458 self.assertEqual(f.read(), b"xxx") 459 460 def _check_base_destructor(self, base): 461 record = [] 462 class MyIO(base): 463 def __init__(self): 464 # This exercises the availability of attributes on object 465 # destruction. 466 # (in the C version, close() is called by the tp_dealloc 467 # function, not by __del__) 468 self.on_del = 1 469 self.on_close = 2 470 self.on_flush = 3 471 def __del__(self): 472 record.append(self.on_del) 473 try: 474 f = super(MyIO, self).__del__ 475 except AttributeError: 476 pass 477 else: 478 f() 479 def close(self): 480 record.append(self.on_close) 481 super(MyIO, self).close() 482 def flush(self): 483 record.append(self.on_flush) 484 super(MyIO, self).flush() 485 f = MyIO() 486 del f 487 support.gc_collect() 488 self.assertEqual(record, [1, 2, 3]) 489 490 def test_IOBase_destructor(self): 491 self._check_base_destructor(self.IOBase) 492 493 def test_RawIOBase_destructor(self): 494 self._check_base_destructor(self.RawIOBase) 495 496 def test_BufferedIOBase_destructor(self): 497 self._check_base_destructor(self.BufferedIOBase) 498 499 def test_TextIOBase_destructor(self): 500 self._check_base_destructor(self.TextIOBase) 269 501 270 502 def test_close_flushes(self): 271 f = io.open(test_support.TESTFN, "wb") 272 f.write(b"xxx") 273 f.close() 274 f = io.open(test_support.TESTFN, "rb") 275 self.assertEqual(f.read(), b"xxx") 276 f.close() 277 278 def XXXtest_array_writes(self): 279 # XXX memory view not available yet 280 a = array.array('i', range(10)) 281 n = len(memoryview(a)) 282 f = io.open(test_support.TESTFN, "wb", 0) 283 self.assertEqual(f.write(a), n) 284 f.close() 285 f = io.open(test_support.TESTFN, "wb") 286 self.assertEqual(f.write(a), n) 287 f.close() 503 with self.open(support.TESTFN, "wb") as f: 504 f.write(b"xxx") 505 with self.open(support.TESTFN, "rb") as f: 506 self.assertEqual(f.read(), b"xxx") 507 508 def test_array_writes(self): 509 a = array.array(b'i', range(10)) 510 n = len(a.tostring()) 511 with self.open(support.TESTFN, "wb", 0) as f: 512 self.assertEqual(f.write(a), n) 513 with self.open(support.TESTFN, "wb") as f: 514 self.assertEqual(f.write(a), n) 288 515 289 516 def test_closefd(self): 290 self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w',517 self.assertRaises(ValueError, self.open, support.TESTFN, 'w', 291 518 closefd=False) 292 519 293 def test ReadClosed(self):294 with io.open(test_support.TESTFN, "w") as f:520 def test_read_closed(self): 521 with self.open(support.TESTFN, "w") as f: 295 522 f.write("egg\n") 296 with io.open(test_support.TESTFN, "r") as f:297 file = io.open(f.fileno(), "r", closefd=False)523 with self.open(support.TESTFN, "r") as f: 524 file = self.open(f.fileno(), "r", closefd=False) 298 525 self.assertEqual(file.read(), "egg\n") 299 526 file.seek(0) … … 303 530 def test_no_closefd_with_filename(self): 304 531 # can't use closefd in combination with a file name 305 self.assertRaises(ValueError, 306 io.open, test_support.TESTFN, "r", closefd=False) 532 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False) 307 533 308 534 def test_closefd_attr(self): 309 with io.open(test_support.TESTFN, "wb") as f:535 with self.open(support.TESTFN, "wb") as f: 310 536 f.write(b"egg\n") 311 with io.open(test_support.TESTFN, "r") as f:537 with self.open(support.TESTFN, "r") as f: 312 538 self.assertEqual(f.buffer.raw.closefd, True) 313 file = io.open(f.fileno(), "r", closefd=False)539 file = self.open(f.fileno(), "r", closefd=False) 314 540 self.assertEqual(file.buffer.raw.closefd, False) 315 541 316 317 class MemorySeekTestMixin: 318 319 def testInit(self): 320 buf = self.buftype("1234567890") 321 bytesIo = self.ioclass(buf) 322 323 def testRead(self): 324 buf = self.buftype("1234567890") 325 bytesIo = self.ioclass(buf) 326 327 self.assertEquals(buf[:1], bytesIo.read(1)) 328 self.assertEquals(buf[1:5], bytesIo.read(4)) 329 self.assertEquals(buf[5:], bytesIo.read(900)) 330 self.assertEquals(self.EOF, bytesIo.read()) 331 332 def testReadNoArgs(self): 333 buf = self.buftype("1234567890") 334 bytesIo = self.ioclass(buf) 335 336 self.assertEquals(buf, bytesIo.read()) 337 self.assertEquals(self.EOF, bytesIo.read()) 338 339 def testSeek(self): 340 buf = self.buftype("1234567890") 341 bytesIo = self.ioclass(buf) 342 343 bytesIo.read(5) 344 bytesIo.seek(0) 345 self.assertEquals(buf, bytesIo.read()) 346 347 bytesIo.seek(3) 348 self.assertEquals(buf[3:], bytesIo.read()) 349 self.assertRaises(TypeError, bytesIo.seek, 0.0) 350 351 def testTell(self): 352 buf = self.buftype("1234567890") 353 bytesIo = self.ioclass(buf) 354 355 self.assertEquals(0, bytesIo.tell()) 356 bytesIo.seek(5) 357 self.assertEquals(5, bytesIo.tell()) 358 bytesIo.seek(10000) 359 self.assertEquals(10000, bytesIo.tell()) 360 361 362 class BytesIOTest(MemorySeekTestMixin, unittest.TestCase): 363 @staticmethod 364 def buftype(s): 365 return s.encode("utf-8") 366 ioclass = io.BytesIO 367 EOF = b"" 368 369 370 class StringIOTest(MemorySeekTestMixin, unittest.TestCase): 371 buftype = str 372 ioclass = io.StringIO 373 EOF = "" 374 375 376 class BufferedReaderTest(unittest.TestCase): 377 378 def testRead(self): 379 rawio = MockRawIO((b"abc", b"d", b"efg")) 380 bufio = io.BufferedReader(rawio) 381 382 self.assertEquals(b"abcdef", bufio.read(6)) 383 384 def testBuffering(self): 542 def test_garbage_collection(self): 543 # FileIO objects are collected, and collecting them flushes 544 # all data to disk. 545 f = self.FileIO(support.TESTFN, "wb") 546 f.write(b"abcxxx") 547 f.f = f 548 wr = weakref.ref(f) 549 del f 550 support.gc_collect() 551 self.assertTrue(wr() is None, wr) 552 with self.open(support.TESTFN, "rb") as f: 553 self.assertEqual(f.read(), b"abcxxx") 554 555 def test_unbounded_file(self): 556 # Issue #1174606: reading from an unbounded stream such as /dev/zero. 557 zero = "/dev/zero" 558 if not os.path.exists(zero): 559 self.skipTest("{0} does not exist".format(zero)) 560 if sys.maxsize > 0x7FFFFFFF: 561 self.skipTest("test can only run in a 32-bit address space") 562 if support.real_max_memuse < support._2G: 563 self.skipTest("test requires at least 2GB of memory") 564 with self.open(zero, "rb", buffering=0) as f: 565 self.assertRaises(OverflowError, f.read) 566 with self.open(zero, "rb") as f: 567 self.assertRaises(OverflowError, f.read) 568 with self.open(zero, "r") as f: 569 self.assertRaises(OverflowError, f.read) 570 571 def test_flush_error_on_close(self): 572 f = self.open(support.TESTFN, "wb", buffering=0) 573 def bad_flush(): 574 raise IOError() 575 f.flush = bad_flush 576 self.assertRaises(IOError, f.close) # exception not swallowed 577 self.assertTrue(f.closed) 578 579 def test_multi_close(self): 580 f = self.open(support.TESTFN, "wb", buffering=0) 581 f.close() 582 f.close() 583 f.close() 584 self.assertRaises(ValueError, f.flush) 585 586 def test_RawIOBase_read(self): 587 # Exercise the default RawIOBase.read() implementation (which calls 588 # readinto() internally). 589 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None)) 590 self.assertEqual(rawio.read(2), b"ab") 591 self.assertEqual(rawio.read(2), b"c") 592 self.assertEqual(rawio.read(2), b"d") 593 self.assertEqual(rawio.read(2), None) 594 self.assertEqual(rawio.read(2), b"ef") 595 self.assertEqual(rawio.read(2), b"g") 596 self.assertEqual(rawio.read(2), None) 597 self.assertEqual(rawio.read(2), b"") 598 599 def test_fileio_closefd(self): 600 # Issue #4841 601 with self.open(__file__, 'rb') as f1, \ 602 self.open(__file__, 'rb') as f2: 603 fileio = self.FileIO(f1.fileno(), closefd=False) 604 # .__init__() must not close f1 605 fileio.__init__(f2.fileno(), closefd=False) 606 f1.readline() 607 # .close() must not close f2 608 fileio.close() 609 f2.readline() 610 611 612 class CIOTest(IOTest): 613 614 def test_IOBase_finalize(self): 615 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a 616 # class which inherits IOBase and an object of this class are caught 617 # in a reference cycle and close() is already in the method cache. 618 class MyIO(self.IOBase): 619 def close(self): 620 pass 621 622 # create an instance to populate the method cache 623 MyIO() 624 obj = MyIO() 625 obj.obj = obj 626 wr = weakref.ref(obj) 627 del MyIO 628 del obj 629 support.gc_collect() 630 self.assertTrue(wr() is None, wr) 631 632 class PyIOTest(IOTest): 633 test_array_writes = unittest.skip( 634 "len(array.array) returns number of elements rather than bytelength" 635 )(IOTest.test_array_writes) 636 637 638 class CommonBufferedTests: 639 # Tests common to BufferedReader, BufferedWriter and BufferedRandom 640 641 def test_detach(self): 642 raw = self.MockRawIO() 643 buf = self.tp(raw) 644 self.assertIs(buf.detach(), raw) 645 self.assertRaises(ValueError, buf.detach) 646 647 def test_fileno(self): 648 rawio = self.MockRawIO() 649 bufio = self.tp(rawio) 650 651 self.assertEqual(42, bufio.fileno()) 652 653 def test_no_fileno(self): 654 # XXX will we always have fileno() function? If so, kill 655 # this test. Else, write it. 656 pass 657 658 def test_invalid_args(self): 659 rawio = self.MockRawIO() 660 bufio = self.tp(rawio) 661 # Invalid whence 662 self.assertRaises(ValueError, bufio.seek, 0, -1) 663 self.assertRaises(ValueError, bufio.seek, 0, 3) 664 665 def test_override_destructor(self): 666 tp = self.tp 667 record = [] 668 class MyBufferedIO(tp): 669 def __del__(self): 670 record.append(1) 671 try: 672 f = super(MyBufferedIO, self).__del__ 673 except AttributeError: 674 pass 675 else: 676 f() 677 def close(self): 678 record.append(2) 679 super(MyBufferedIO, self).close() 680 def flush(self): 681 record.append(3) 682 super(MyBufferedIO, self).flush() 683 rawio = self.MockRawIO() 684 bufio = MyBufferedIO(rawio) 685 writable = bufio.writable() 686 del bufio 687 support.gc_collect() 688 if writable: 689 self.assertEqual(record, [1, 2, 3]) 690 else: 691 self.assertEqual(record, [1, 2]) 692 693 def test_context_manager(self): 694 # Test usability as a context manager 695 rawio = self.MockRawIO() 696 bufio = self.tp(rawio) 697 def _with(): 698 with bufio: 699 pass 700 _with() 701 # bufio should now be closed, and using it a second time should raise 702 # a ValueError. 703 self.assertRaises(ValueError, _with) 704 705 def test_error_through_destructor(self): 706 # Test that the exception state is not modified by a destructor, 707 # even if close() fails. 708 rawio = self.CloseFailureIO() 709 def f(): 710 self.tp(rawio).xyzzy 711 with support.captured_output("stderr") as s: 712 self.assertRaises(AttributeError, f) 713 s = s.getvalue().strip() 714 if s: 715 # The destructor *may* have printed an unraisable error, check it 716 self.assertEqual(len(s.splitlines()), 1) 717 self.assertTrue(s.startswith("Exception IOError: "), s) 718 self.assertTrue(s.endswith(" ignored"), s) 719 720 def test_repr(self): 721 raw = self.MockRawIO() 722 b = self.tp(raw) 723 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__) 724 self.assertEqual(repr(b), "<%s>" % clsname) 725 raw.name = "dummy" 726 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname) 727 raw.name = b"dummy" 728 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname) 729 730 def test_flush_error_on_close(self): 731 raw = self.MockRawIO() 732 def bad_flush(): 733 raise IOError() 734 raw.flush = bad_flush 735 b = self.tp(raw) 736 self.assertRaises(IOError, b.close) # exception not swallowed 737 self.assertTrue(b.closed) 738 739 def test_close_error_on_close(self): 740 raw = self.MockRawIO() 741 def bad_flush(): 742 raise IOError('flush') 743 def bad_close(): 744 raise IOError('close') 745 raw.close = bad_close 746 b = self.tp(raw) 747 b.flush = bad_flush 748 with self.assertRaises(IOError) as err: # exception not swallowed 749 b.close() 750 self.assertEqual(err.exception.args, ('close',)) 751 self.assertFalse(b.closed) 752 753 def test_multi_close(self): 754 raw = self.MockRawIO() 755 b = self.tp(raw) 756 b.close() 757 b.close() 758 b.close() 759 self.assertRaises(ValueError, b.flush) 760 761 def test_readonly_attributes(self): 762 raw = self.MockRawIO() 763 buf = self.tp(raw) 764 x = self.MockRawIO() 765 with self.assertRaises((AttributeError, TypeError)): 766 buf.raw = x 767 768 769 class SizeofTest: 770 771 @support.cpython_only 772 def test_sizeof(self): 773 bufsize1 = 4096 774 bufsize2 = 8192 775 rawio = self.MockRawIO() 776 bufio = self.tp(rawio, buffer_size=bufsize1) 777 size = sys.getsizeof(bufio) - bufsize1 778 rawio = self.MockRawIO() 779 bufio = self.tp(rawio, buffer_size=bufsize2) 780 self.assertEqual(sys.getsizeof(bufio), size + bufsize2) 781 782 783 class BufferedReaderTest(unittest.TestCase, CommonBufferedTests): 784 read_mode = "rb" 785 786 def test_constructor(self): 787 rawio = self.MockRawIO([b"abc"]) 788 bufio = self.tp(rawio) 789 bufio.__init__(rawio) 790 bufio.__init__(rawio, buffer_size=1024) 791 bufio.__init__(rawio, buffer_size=16) 792 self.assertEqual(b"abc", bufio.read()) 793 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 794 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 795 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 796 rawio = self.MockRawIO([b"abc"]) 797 bufio.__init__(rawio) 798 self.assertEqual(b"abc", bufio.read()) 799 800 def test_read(self): 801 for arg in (None, 7): 802 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 803 bufio = self.tp(rawio) 804 self.assertEqual(b"abcdefg", bufio.read(arg)) 805 # Invalid args 806 self.assertRaises(ValueError, bufio.read, -2) 807 808 def test_read1(self): 809 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 810 bufio = self.tp(rawio) 811 self.assertEqual(b"a", bufio.read(1)) 812 self.assertEqual(b"b", bufio.read1(1)) 813 self.assertEqual(rawio._reads, 1) 814 self.assertEqual(b"c", bufio.read1(100)) 815 self.assertEqual(rawio._reads, 1) 816 self.assertEqual(b"d", bufio.read1(100)) 817 self.assertEqual(rawio._reads, 2) 818 self.assertEqual(b"efg", bufio.read1(100)) 819 self.assertEqual(rawio._reads, 3) 820 self.assertEqual(b"", bufio.read1(100)) 821 self.assertEqual(rawio._reads, 4) 822 # Invalid args 823 self.assertRaises(ValueError, bufio.read1, -1) 824 825 def test_readinto(self): 826 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 827 bufio = self.tp(rawio) 828 b = bytearray(2) 829 self.assertEqual(bufio.readinto(b), 2) 830 self.assertEqual(b, b"ab") 831 self.assertEqual(bufio.readinto(b), 2) 832 self.assertEqual(b, b"cd") 833 self.assertEqual(bufio.readinto(b), 2) 834 self.assertEqual(b, b"ef") 835 self.assertEqual(bufio.readinto(b), 1) 836 self.assertEqual(b, b"gf") 837 self.assertEqual(bufio.readinto(b), 0) 838 self.assertEqual(b, b"gf") 839 840 def test_readlines(self): 841 def bufio(): 842 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef")) 843 return self.tp(rawio) 844 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"]) 845 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"]) 846 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"]) 847 848 def test_buffering(self): 385 849 data = b"abcdefghi" 386 850 dlen = len(data) … … 393 857 394 858 for bufsize, buf_read_sizes, raw_read_sizes in tests: 395 rawio = MockFileIO(data)396 bufio = io.BufferedReader(rawio, buffer_size=bufsize)859 rawio = self.MockFileIO(data) 860 bufio = self.tp(rawio, buffer_size=bufsize) 397 861 pos = 0 398 862 for nbytes in buf_read_sizes: 399 self.assertEqual s(bufio.read(nbytes), data[pos:pos+nbytes])863 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes]) 400 864 pos += nbytes 401 self.assertEquals(rawio.read_history, raw_read_sizes) 402 403 def testReadNonBlocking(self): 865 # this is mildly implementation-dependent 866 self.assertEqual(rawio.read_history, raw_read_sizes) 867 868 def test_read_non_blocking(self): 404 869 # Inject some None's in there to simulate EWOULDBLOCK 405 rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None)) 406 bufio = io.BufferedReader(rawio) 407 408 self.assertEquals(b"abcd", bufio.read(6)) 409 self.assertEquals(b"e", bufio.read(1)) 410 self.assertEquals(b"fg", bufio.read()) 411 self.assert_(None is bufio.read()) 412 self.assertEquals(b"", bufio.read()) 413 414 def testReadToEof(self): 415 rawio = MockRawIO((b"abc", b"d", b"efg")) 416 bufio = io.BufferedReader(rawio) 417 418 self.assertEquals(b"abcdefg", bufio.read(9000)) 419 420 def testReadNoArgs(self): 421 rawio = MockRawIO((b"abc", b"d", b"efg")) 422 bufio = io.BufferedReader(rawio) 423 424 self.assertEquals(b"abcdefg", bufio.read()) 425 426 def testFileno(self): 427 rawio = MockRawIO((b"abc", b"d", b"efg")) 428 bufio = io.BufferedReader(rawio) 429 430 self.assertEquals(42, bufio.fileno()) 431 432 def testFilenoNoFileno(self): 433 # XXX will we always have fileno() function? If so, kill 434 # this test. Else, write it. 435 pass 436 437 def testThreads(self): 870 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None)) 871 bufio = self.tp(rawio) 872 self.assertEqual(b"abcd", bufio.read(6)) 873 self.assertEqual(b"e", bufio.read(1)) 874 self.assertEqual(b"fg", bufio.read()) 875 self.assertEqual(b"", bufio.peek(1)) 876 self.assertIsNone(bufio.read()) 877 self.assertEqual(b"", bufio.read()) 878 879 rawio = self.MockRawIO((b"a", None, None)) 880 self.assertEqual(b"a", rawio.readall()) 881 self.assertIsNone(rawio.readall()) 882 883 def test_read_past_eof(self): 884 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 885 bufio = self.tp(rawio) 886 887 self.assertEqual(b"abcdefg", bufio.read(9000)) 888 889 def test_read_all(self): 890 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 891 bufio = self.tp(rawio) 892 893 self.assertEqual(b"abcdefg", bufio.read()) 894 895 @unittest.skipUnless(threading, 'Threading required for this test.') 896 @support.requires_resource('cpu') 897 def test_threads(self): 438 898 try: 439 899 # Write out many bytes with exactly the same number of 0's, … … 441 901 # doesn't duplicate or forget contents. 442 902 N = 1000 443 l = range(256) * N903 l = list(range(256)) * N 444 904 random.shuffle(l) 445 905 s = bytes(bytearray(l)) 446 with io.open(test_support.TESTFN, "wb") as f:906 with self.open(support.TESTFN, "wb") as f: 447 907 f.write(s) 448 with io.open(test_support.TESTFN, "rb", buffering=0) as raw:449 bufio = io.BufferedReader(raw, 8)908 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw: 909 bufio = self.tp(raw, 8) 450 910 errors = [] 451 911 results = [] … … 475 935 self.assertEqual(s.count(c), N) 476 936 finally: 477 test_support.unlink(test_support.TESTFN) 478 479 480 481 class BufferedWriterTest(unittest.TestCase): 482 483 def testWrite(self): 937 support.unlink(support.TESTFN) 938 939 def test_misbehaved_io(self): 940 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg")) 941 bufio = self.tp(rawio) 942 self.assertRaises(IOError, bufio.seek, 0) 943 self.assertRaises(IOError, bufio.tell) 944 945 def test_no_extraneous_read(self): 946 # Issue #9550; when the raw IO object has satisfied the read request, 947 # we should not issue any additional reads, otherwise it may block 948 # (e.g. socket). 949 bufsize = 16 950 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2): 951 rawio = self.MockRawIO([b"x" * n]) 952 bufio = self.tp(rawio, bufsize) 953 self.assertEqual(bufio.read(n), b"x" * n) 954 # Simple case: one raw read is enough to satisfy the request. 955 self.assertEqual(rawio._extraneous_reads, 0, 956 "failed for {}: {} != 0".format(n, rawio._extraneous_reads)) 957 # A more complex case where two raw reads are needed to satisfy 958 # the request. 959 rawio = self.MockRawIO([b"x" * (n - 1), b"x"]) 960 bufio = self.tp(rawio, bufsize) 961 self.assertEqual(bufio.read(n), b"x" * n) 962 self.assertEqual(rawio._extraneous_reads, 0, 963 "failed for {}: {} != 0".format(n, rawio._extraneous_reads)) 964 965 966 class CBufferedReaderTest(BufferedReaderTest, SizeofTest): 967 tp = io.BufferedReader 968 969 def test_constructor(self): 970 BufferedReaderTest.test_constructor(self) 971 # The allocation can succeed on 32-bit builds, e.g. with more 972 # than 2GB RAM and a 64-bit kernel. 973 if sys.maxsize > 0x7FFFFFFF: 974 rawio = self.MockRawIO() 975 bufio = self.tp(rawio) 976 self.assertRaises((OverflowError, MemoryError, ValueError), 977 bufio.__init__, rawio, sys.maxsize) 978 979 def test_initialization(self): 980 rawio = self.MockRawIO([b"abc"]) 981 bufio = self.tp(rawio) 982 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 983 self.assertRaises(ValueError, bufio.read) 984 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 985 self.assertRaises(ValueError, bufio.read) 986 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 987 self.assertRaises(ValueError, bufio.read) 988 989 def test_misbehaved_io_read(self): 990 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg")) 991 bufio = self.tp(rawio) 992 # _pyio.BufferedReader seems to implement reading different, so that 993 # checking this is not so easy. 994 self.assertRaises(IOError, bufio.read, 10) 995 996 def test_garbage_collection(self): 997 # C BufferedReader objects are collected. 998 # The Python version has __del__, so it ends into gc.garbage instead 999 rawio = self.FileIO(support.TESTFN, "w+b") 1000 f = self.tp(rawio) 1001 f.f = f 1002 wr = weakref.ref(f) 1003 del f 1004 support.gc_collect() 1005 self.assertTrue(wr() is None, wr) 1006 1007 def test_args_error(self): 1008 # Issue #17275 1009 with self.assertRaisesRegexp(TypeError, "BufferedReader"): 1010 self.tp(io.BytesIO(), 1024, 1024, 1024) 1011 1012 1013 class PyBufferedReaderTest(BufferedReaderTest): 1014 tp = pyio.BufferedReader 1015 1016 1017 class BufferedWriterTest(unittest.TestCase, CommonBufferedTests): 1018 write_mode = "wb" 1019 1020 def test_constructor(self): 1021 rawio = self.MockRawIO() 1022 bufio = self.tp(rawio) 1023 bufio.__init__(rawio) 1024 bufio.__init__(rawio, buffer_size=1024) 1025 bufio.__init__(rawio, buffer_size=16) 1026 self.assertEqual(3, bufio.write(b"abc")) 1027 bufio.flush() 1028 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1029 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1030 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1031 bufio.__init__(rawio) 1032 self.assertEqual(3, bufio.write(b"ghi")) 1033 bufio.flush() 1034 self.assertEqual(b"".join(rawio._write_stack), b"abcghi") 1035 1036 def test_detach_flush(self): 1037 raw = self.MockRawIO() 1038 buf = self.tp(raw) 1039 buf.write(b"howdy!") 1040 self.assertFalse(raw._write_stack) 1041 buf.detach() 1042 self.assertEqual(raw._write_stack, [b"howdy!"]) 1043 1044 def test_write(self): 484 1045 # Write to the buffered IO but don't overflow the buffer. 485 writer = MockRawIO() 486 bufio = io.BufferedWriter(writer, 8) 487 1046 writer = self.MockRawIO() 1047 bufio = self.tp(writer, 8) 488 1048 bufio.write(b"abc") 489 490 1049 self.assertFalse(writer._write_stack) 491 1050 492 def testWriteOverflow(self): 493 writer = MockRawIO() 494 bufio = io.BufferedWriter(writer, 8) 495 496 bufio.write(b"abc") 497 bufio.write(b"defghijkl") 498 499 self.assertEquals(b"abcdefghijkl", writer._write_stack[0]) 500 501 def testWriteNonBlocking(self): 502 raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12)) 503 bufio = io.BufferedWriter(raw, 8, 16) 504 505 bufio.write(b"asdf") 506 bufio.write(b"asdfa") 507 self.assertEquals(b"asdfasdfa", raw._write_stack[0]) 508 509 bufio.write(b"asdfasdfasdf") 510 self.assertEquals(b"asdfasdfasdf", raw._write_stack[1]) 511 bufio.write(b"asdfasdfasdf") 512 self.assertEquals(b"dfasdfasdf", raw._write_stack[2]) 513 self.assertEquals(b"asdfasdfasdf", raw._write_stack[3]) 514 515 bufio.write(b"asdfasdfasdf") 516 517 # XXX I don't like this test. It relies too heavily on how the 518 # algorithm actually works, which we might change. Refactor 519 # later. 520 521 def testFileno(self): 522 rawio = MockRawIO((b"abc", b"d", b"efg")) 523 bufio = io.BufferedWriter(rawio) 524 525 self.assertEquals(42, bufio.fileno()) 526 527 def testFlush(self): 528 writer = MockRawIO() 529 bufio = io.BufferedWriter(writer, 8) 530 1051 def test_write_overflow(self): 1052 writer = self.MockRawIO() 1053 bufio = self.tp(writer, 8) 1054 contents = b"abcdefghijklmnop" 1055 for n in range(0, len(contents), 3): 1056 bufio.write(contents[n:n+3]) 1057 flushed = b"".join(writer._write_stack) 1058 # At least (total - 8) bytes were implicitly flushed, perhaps more 1059 # depending on the implementation. 1060 self.assertTrue(flushed.startswith(contents[:-8]), flushed) 1061 1062 def check_writes(self, intermediate_func): 1063 # Lots of writes, test the flushed output is as expected. 1064 contents = bytes(range(256)) * 1000 1065 n = 0 1066 writer = self.MockRawIO() 1067 bufio = self.tp(writer, 13) 1068 # Generator of write sizes: repeat each N 15 times then proceed to N+1 1069 def gen_sizes(): 1070 for size in count(1): 1071 for i in range(15): 1072 yield size 1073 sizes = gen_sizes() 1074 while n < len(contents): 1075 size = min(next(sizes), len(contents) - n) 1076 self.assertEqual(bufio.write(contents[n:n+size]), size) 1077 intermediate_func(bufio) 1078 n += size 1079 bufio.flush() 1080 self.assertEqual(contents, 1081 b"".join(writer._write_stack)) 1082 1083 def test_writes(self): 1084 self.check_writes(lambda bufio: None) 1085 1086 def test_writes_and_flushes(self): 1087 self.check_writes(lambda bufio: bufio.flush()) 1088 1089 def test_writes_and_seeks(self): 1090 def _seekabs(bufio): 1091 pos = bufio.tell() 1092 bufio.seek(pos + 1, 0) 1093 bufio.seek(pos - 1, 0) 1094 bufio.seek(pos, 0) 1095 self.check_writes(_seekabs) 1096 def _seekrel(bufio): 1097 pos = bufio.seek(0, 1) 1098 bufio.seek(+1, 1) 1099 bufio.seek(-1, 1) 1100 bufio.seek(pos, 0) 1101 self.check_writes(_seekrel) 1102 1103 def test_writes_and_truncates(self): 1104 self.check_writes(lambda bufio: bufio.truncate(bufio.tell())) 1105 1106 def test_write_non_blocking(self): 1107 raw = self.MockNonBlockWriterIO() 1108 bufio = self.tp(raw, 8) 1109 1110 self.assertEqual(bufio.write(b"abcd"), 4) 1111 self.assertEqual(bufio.write(b"efghi"), 5) 1112 # 1 byte will be written, the rest will be buffered 1113 raw.block_on(b"k") 1114 self.assertEqual(bufio.write(b"jklmn"), 5) 1115 1116 # 8 bytes will be written, 8 will be buffered and the rest will be lost 1117 raw.block_on(b"0") 1118 try: 1119 bufio.write(b"opqrwxyz0123456789") 1120 except self.BlockingIOError as e: 1121 written = e.characters_written 1122 else: 1123 self.fail("BlockingIOError should have been raised") 1124 self.assertEqual(written, 16) 1125 self.assertEqual(raw.pop_written(), 1126 b"abcdefghijklmnopqrwxyz") 1127 1128 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9) 1129 s = raw.pop_written() 1130 # Previously buffered bytes were flushed 1131 self.assertTrue(s.startswith(b"01234567A"), s) 1132 1133 def test_write_and_rewind(self): 1134 raw = io.BytesIO() 1135 bufio = self.tp(raw, 4) 1136 self.assertEqual(bufio.write(b"abcdef"), 6) 1137 self.assertEqual(bufio.tell(), 6) 1138 bufio.seek(0, 0) 1139 self.assertEqual(bufio.write(b"XY"), 2) 1140 bufio.seek(6, 0) 1141 self.assertEqual(raw.getvalue(), b"XYcdef") 1142 self.assertEqual(bufio.write(b"123456"), 6) 1143 bufio.flush() 1144 self.assertEqual(raw.getvalue(), b"XYcdef123456") 1145 1146 def test_flush(self): 1147 writer = self.MockRawIO() 1148 bufio = self.tp(writer, 8) 531 1149 bufio.write(b"abc") 532 1150 bufio.flush() 533 534 self.assertEquals(b"abc", writer._write_stack[0]) 535 536 def testThreads(self): 537 # BufferedWriter should not raise exceptions or crash 538 # when called from multiple threads. 1151 self.assertEqual(b"abc", writer._write_stack[0]) 1152 1153 def test_writelines(self): 1154 l = [b'ab', b'cd', b'ef'] 1155 writer = self.MockRawIO() 1156 bufio = self.tp(writer, 8) 1157 bufio.writelines(l) 1158 bufio.flush() 1159 self.assertEqual(b''.join(writer._write_stack), b'abcdef') 1160 1161 def test_writelines_userlist(self): 1162 l = UserList([b'ab', b'cd', b'ef']) 1163 writer = self.MockRawIO() 1164 bufio = self.tp(writer, 8) 1165 bufio.writelines(l) 1166 bufio.flush() 1167 self.assertEqual(b''.join(writer._write_stack), b'abcdef') 1168 1169 def test_writelines_error(self): 1170 writer = self.MockRawIO() 1171 bufio = self.tp(writer, 8) 1172 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3]) 1173 self.assertRaises(TypeError, bufio.writelines, None) 1174 1175 def test_destructor(self): 1176 writer = self.MockRawIO() 1177 bufio = self.tp(writer, 8) 1178 bufio.write(b"abc") 1179 del bufio 1180 support.gc_collect() 1181 self.assertEqual(b"abc", writer._write_stack[0]) 1182 1183 def test_truncate(self): 1184 # Truncate implicitly flushes the buffer. 1185 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw: 1186 bufio = self.tp(raw, 8) 1187 bufio.write(b"abcdef") 1188 self.assertEqual(bufio.truncate(3), 3) 1189 self.assertEqual(bufio.tell(), 6) 1190 with self.open(support.TESTFN, "rb", buffering=0) as f: 1191 self.assertEqual(f.read(), b"abc") 1192 1193 @unittest.skipUnless(threading, 'Threading required for this test.') 1194 @support.requires_resource('cpu') 1195 def test_threads(self): 539 1196 try: 1197 # Write out many bytes from many threads and test they were 1198 # all flushed. 1199 N = 1000 1200 contents = bytes(range(256)) * N 1201 sizes = cycle([1, 19]) 1202 n = 0 1203 queue = deque() 1204 while n < len(contents): 1205 size = next(sizes) 1206 queue.append(contents[n:n+size]) 1207 n += size 1208 del contents 540 1209 # We use a real file object because it allows us to 541 1210 # exercise situations where the GIL is released before … … 543 1212 # to concurrency issues due to switching threads in the middle 544 1213 # of Python code. 545 with io.open(test_support.TESTFN, "wb", buffering=0) as raw:546 bufio = io.BufferedWriter(raw, 8)1214 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw: 1215 bufio = self.tp(raw, 8) 547 1216 errors = [] 548 1217 def f(): 549 1218 try: 550 # Write enough bytes to flush the buffer 551 s = b"a" * 19 552 for i in range(50): 1219 while True: 1220 try: 1221 s = queue.popleft() 1222 except IndexError: 1223 return 553 1224 bufio.write(s) 554 1225 except Exception as e: … … 563 1234 self.assertFalse(errors, 564 1235 "the following exceptions were caught: %r" % errors) 1236 bufio.close() 1237 with self.open(support.TESTFN, "rb") as f: 1238 s = f.read() 1239 for i in range(256): 1240 self.assertEqual(s.count(bytes([i])), N) 565 1241 finally: 566 test_support.unlink(test_support.TESTFN) 567 1242 support.unlink(support.TESTFN) 1243 1244 def test_misbehaved_io(self): 1245 rawio = self.MisbehavedRawIO() 1246 bufio = self.tp(rawio, 5) 1247 self.assertRaises(IOError, bufio.seek, 0) 1248 self.assertRaises(IOError, bufio.tell) 1249 self.assertRaises(IOError, bufio.write, b"abcdef") 1250 1251 def test_max_buffer_size_deprecation(self): 1252 with support.check_warnings(("max_buffer_size is deprecated", 1253 DeprecationWarning)): 1254 self.tp(self.MockRawIO(), 8, 12) 1255 1256 def test_write_error_on_close(self): 1257 raw = self.MockRawIO() 1258 def bad_write(b): 1259 raise IOError() 1260 raw.write = bad_write 1261 b = self.tp(raw) 1262 b.write(b'spam') 1263 self.assertRaises(IOError, b.close) # exception not swallowed 1264 self.assertTrue(b.closed) 1265 1266 1267 class CBufferedWriterTest(BufferedWriterTest, SizeofTest): 1268 tp = io.BufferedWriter 1269 1270 def test_constructor(self): 1271 BufferedWriterTest.test_constructor(self) 1272 # The allocation can succeed on 32-bit builds, e.g. with more 1273 # than 2GB RAM and a 64-bit kernel. 1274 if sys.maxsize > 0x7FFFFFFF: 1275 rawio = self.MockRawIO() 1276 bufio = self.tp(rawio) 1277 self.assertRaises((OverflowError, MemoryError, ValueError), 1278 bufio.__init__, rawio, sys.maxsize) 1279 1280 def test_initialization(self): 1281 rawio = self.MockRawIO() 1282 bufio = self.tp(rawio) 1283 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1284 self.assertRaises(ValueError, bufio.write, b"def") 1285 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1286 self.assertRaises(ValueError, bufio.write, b"def") 1287 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1288 self.assertRaises(ValueError, bufio.write, b"def") 1289 1290 def test_garbage_collection(self): 1291 # C BufferedWriter objects are collected, and collecting them flushes 1292 # all data to disk. 1293 # The Python version has __del__, so it ends into gc.garbage instead 1294 rawio = self.FileIO(support.TESTFN, "w+b") 1295 f = self.tp(rawio) 1296 f.write(b"123xxx") 1297 f.x = f 1298 wr = weakref.ref(f) 1299 del f 1300 support.gc_collect() 1301 self.assertTrue(wr() is None, wr) 1302 with self.open(support.TESTFN, "rb") as f: 1303 self.assertEqual(f.read(), b"123xxx") 1304 1305 def test_args_error(self): 1306 # Issue #17275 1307 with self.assertRaisesRegexp(TypeError, "BufferedWriter"): 1308 self.tp(io.BytesIO(), 1024, 1024, 1024) 1309 1310 1311 class PyBufferedWriterTest(BufferedWriterTest): 1312 tp = pyio.BufferedWriter 568 1313 569 1314 class BufferedRWPairTest(unittest.TestCase): 570 1315 571 def testRWPair(self): 572 r = MockRawIO(()) 573 w = MockRawIO() 574 pair = io.BufferedRWPair(r, w) 1316 def test_constructor(self): 1317 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 575 1318 self.assertFalse(pair.closed) 576 1319 577 # XXX More Tests 578 579 580 class BufferedRandomTest(unittest.TestCase): 581 582 def testReadAndWrite(self): 583 raw = MockRawIO((b"asdf", b"ghjk")) 584 rw = io.BufferedRandom(raw, 8, 12) 1320 def test_detach(self): 1321 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1322 self.assertRaises(self.UnsupportedOperation, pair.detach) 1323 1324 def test_constructor_max_buffer_size_deprecation(self): 1325 with support.check_warnings(("max_buffer_size is deprecated", 1326 DeprecationWarning)): 1327 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12) 1328 1329 def test_constructor_with_not_readable(self): 1330 class NotReadable(MockRawIO): 1331 def readable(self): 1332 return False 1333 1334 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO()) 1335 1336 def test_constructor_with_not_writeable(self): 1337 class NotWriteable(MockRawIO): 1338 def writable(self): 1339 return False 1340 1341 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable()) 1342 1343 def test_read(self): 1344 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1345 1346 self.assertEqual(pair.read(3), b"abc") 1347 self.assertEqual(pair.read(1), b"d") 1348 self.assertEqual(pair.read(), b"ef") 1349 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO()) 1350 self.assertEqual(pair.read(None), b"abc") 1351 1352 def test_readlines(self): 1353 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO()) 1354 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"]) 1355 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"]) 1356 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"]) 1357 1358 def test_read1(self): 1359 # .read1() is delegated to the underlying reader object, so this test 1360 # can be shallow. 1361 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1362 1363 self.assertEqual(pair.read1(3), b"abc") 1364 1365 def test_readinto(self): 1366 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1367 1368 data = bytearray(5) 1369 self.assertEqual(pair.readinto(data), 5) 1370 self.assertEqual(data, b"abcde") 1371 1372 def test_write(self): 1373 w = self.MockRawIO() 1374 pair = self.tp(self.MockRawIO(), w) 1375 1376 pair.write(b"abc") 1377 pair.flush() 1378 pair.write(b"def") 1379 pair.flush() 1380 self.assertEqual(w._write_stack, [b"abc", b"def"]) 1381 1382 def test_peek(self): 1383 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1384 1385 self.assertTrue(pair.peek(3).startswith(b"abc")) 1386 self.assertEqual(pair.read(3), b"abc") 1387 1388 def test_readable(self): 1389 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1390 self.assertTrue(pair.readable()) 1391 1392 def test_writeable(self): 1393 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1394 self.assertTrue(pair.writable()) 1395 1396 def test_seekable(self): 1397 # BufferedRWPairs are never seekable, even if their readers and writers 1398 # are. 1399 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1400 self.assertFalse(pair.seekable()) 1401 1402 # .flush() is delegated to the underlying writer object and has been 1403 # tested in the test_write method. 1404 1405 def test_close_and_closed(self): 1406 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1407 self.assertFalse(pair.closed) 1408 pair.close() 1409 self.assertTrue(pair.closed) 1410 1411 def test_isatty(self): 1412 class SelectableIsAtty(MockRawIO): 1413 def __init__(self, isatty): 1414 MockRawIO.__init__(self) 1415 self._isatty = isatty 1416 1417 def isatty(self): 1418 return self._isatty 1419 1420 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False)) 1421 self.assertFalse(pair.isatty()) 1422 1423 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False)) 1424 self.assertTrue(pair.isatty()) 1425 1426 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True)) 1427 self.assertTrue(pair.isatty()) 1428 1429 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True)) 1430 self.assertTrue(pair.isatty()) 1431 1432 class CBufferedRWPairTest(BufferedRWPairTest): 1433 tp = io.BufferedRWPair 1434 1435 class PyBufferedRWPairTest(BufferedRWPairTest): 1436 tp = pyio.BufferedRWPair 1437 1438 1439 class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest): 1440 read_mode = "rb+" 1441 write_mode = "wb+" 1442 1443 def test_constructor(self): 1444 BufferedReaderTest.test_constructor(self) 1445 BufferedWriterTest.test_constructor(self) 1446 1447 def test_read_and_write(self): 1448 raw = self.MockRawIO((b"asdf", b"ghjk")) 1449 rw = self.tp(raw, 8) 585 1450 586 1451 self.assertEqual(b"as", rw.read(2)) … … 588 1453 rw.write(b"eee") 589 1454 self.assertFalse(raw._write_stack) # Buffer writes 590 self.assertEqual(b"ghjk", rw.read()) # This read forces write flush591 self.assertEqual s(b"dddeee", raw._write_stack[0])592 593 def test SeekAndTell(self):594 raw = io.BytesIO(b"asdfghjkl")595 rw = io.BufferedRandom(raw)596 597 self.assertEqual s(b"as", rw.read(2))598 self.assertEqual s(2, rw.tell())1455 self.assertEqual(b"ghjk", rw.read()) 1456 self.assertEqual(b"dddeee", raw._write_stack[0]) 1457 1458 def test_seek_and_tell(self): 1459 raw = self.BytesIO(b"asdfghjkl") 1460 rw = self.tp(raw) 1461 1462 self.assertEqual(b"as", rw.read(2)) 1463 self.assertEqual(2, rw.tell()) 599 1464 rw.seek(0, 0) 600 self.assertEqual s(b"asdf", rw.read(4))601 602 rw.write(b" asdf")1465 self.assertEqual(b"asdf", rw.read(4)) 1466 1467 rw.write(b"123f") 603 1468 rw.seek(0, 0) 604 self.assertEqual s(b"asdfasdfl", rw.read())605 self.assertEqual s(9, rw.tell())1469 self.assertEqual(b"asdf123fl", rw.read()) 1470 self.assertEqual(9, rw.tell()) 606 1471 rw.seek(-4, 2) 607 self.assertEqual s(5, rw.tell())1472 self.assertEqual(5, rw.tell()) 608 1473 rw.seek(2, 1) 609 self.assertEquals(7, rw.tell()) 610 self.assertEquals(b"fl", rw.read(11)) 1474 self.assertEqual(7, rw.tell()) 1475 self.assertEqual(b"fl", rw.read(11)) 1476 rw.flush() 1477 self.assertEqual(b"asdf123fl", raw.getvalue()) 1478 611 1479 self.assertRaises(TypeError, rw.seek, 0.0) 1480 1481 def check_flush_and_read(self, read_func): 1482 raw = self.BytesIO(b"abcdefghi") 1483 bufio = self.tp(raw) 1484 1485 self.assertEqual(b"ab", read_func(bufio, 2)) 1486 bufio.write(b"12") 1487 self.assertEqual(b"ef", read_func(bufio, 2)) 1488 self.assertEqual(6, bufio.tell()) 1489 bufio.flush() 1490 self.assertEqual(6, bufio.tell()) 1491 self.assertEqual(b"ghi", read_func(bufio)) 1492 raw.seek(0, 0) 1493 raw.write(b"XYZ") 1494 # flush() resets the read buffer 1495 bufio.flush() 1496 bufio.seek(0, 0) 1497 self.assertEqual(b"XYZ", read_func(bufio, 3)) 1498 1499 def test_flush_and_read(self): 1500 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args)) 1501 1502 def test_flush_and_readinto(self): 1503 def _readinto(bufio, n=-1): 1504 b = bytearray(n if n >= 0 else 9999) 1505 n = bufio.readinto(b) 1506 return bytes(b[:n]) 1507 self.check_flush_and_read(_readinto) 1508 1509 def test_flush_and_peek(self): 1510 def _peek(bufio, n=-1): 1511 # This relies on the fact that the buffer can contain the whole 1512 # raw stream, otherwise peek() can return less. 1513 b = bufio.peek(n) 1514 if n != -1: 1515 b = b[:n] 1516 bufio.seek(len(b), 1) 1517 return b 1518 self.check_flush_and_read(_peek) 1519 1520 def test_flush_and_write(self): 1521 raw = self.BytesIO(b"abcdefghi") 1522 bufio = self.tp(raw) 1523 1524 bufio.write(b"123") 1525 bufio.flush() 1526 bufio.write(b"45") 1527 bufio.flush() 1528 bufio.seek(0, 0) 1529 self.assertEqual(b"12345fghi", raw.getvalue()) 1530 self.assertEqual(b"12345fghi", bufio.read()) 1531 1532 def test_threads(self): 1533 BufferedReaderTest.test_threads(self) 1534 BufferedWriterTest.test_threads(self) 1535 1536 def test_writes_and_peek(self): 1537 def _peek(bufio): 1538 bufio.peek(1) 1539 self.check_writes(_peek) 1540 def _peek(bufio): 1541 pos = bufio.tell() 1542 bufio.seek(-1, 1) 1543 bufio.peek(1) 1544 bufio.seek(pos, 0) 1545 self.check_writes(_peek) 1546 1547 def test_writes_and_reads(self): 1548 def _read(bufio): 1549 bufio.seek(-1, 1) 1550 bufio.read(1) 1551 self.check_writes(_read) 1552 1553 def test_writes_and_read1s(self): 1554 def _read1(bufio): 1555 bufio.seek(-1, 1) 1556 bufio.read1(1) 1557 self.check_writes(_read1) 1558 1559 def test_writes_and_readintos(self): 1560 def _read(bufio): 1561 bufio.seek(-1, 1) 1562 bufio.readinto(bytearray(1)) 1563 self.check_writes(_read) 1564 1565 def test_write_after_readahead(self): 1566 # Issue #6629: writing after the buffer was filled by readahead should 1567 # first rewind the raw stream. 1568 for overwrite_size in [1, 5]: 1569 raw = self.BytesIO(b"A" * 10) 1570 bufio = self.tp(raw, 4) 1571 # Trigger readahead 1572 self.assertEqual(bufio.read(1), b"A") 1573 self.assertEqual(bufio.tell(), 1) 1574 # Overwriting should rewind the raw stream if it needs so 1575 bufio.write(b"B" * overwrite_size) 1576 self.assertEqual(bufio.tell(), overwrite_size + 1) 1577 # If the write size was smaller than the buffer size, flush() and 1578 # check that rewind happens. 1579 bufio.flush() 1580 self.assertEqual(bufio.tell(), overwrite_size + 1) 1581 s = raw.getvalue() 1582 self.assertEqual(s, 1583 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size)) 1584 1585 def test_write_rewind_write(self): 1586 # Various combinations of reading / writing / seeking backwards / writing again 1587 def mutate(bufio, pos1, pos2): 1588 assert pos2 >= pos1 1589 # Fill the buffer 1590 bufio.seek(pos1) 1591 bufio.read(pos2 - pos1) 1592 bufio.write(b'\x02') 1593 # This writes earlier than the previous write, but still inside 1594 # the buffer. 1595 bufio.seek(pos1) 1596 bufio.write(b'\x01') 1597 1598 b = b"\x80\x81\x82\x83\x84" 1599 for i in range(0, len(b)): 1600 for j in range(i, len(b)): 1601 raw = self.BytesIO(b) 1602 bufio = self.tp(raw, 100) 1603 mutate(bufio, i, j) 1604 bufio.flush() 1605 expected = bytearray(b) 1606 expected[j] = 2 1607 expected[i] = 1 1608 self.assertEqual(raw.getvalue(), expected, 1609 "failed result for i=%d, j=%d" % (i, j)) 1610 1611 def test_truncate_after_read_or_write(self): 1612 raw = self.BytesIO(b"A" * 10) 1613 bufio = self.tp(raw, 100) 1614 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled 1615 self.assertEqual(bufio.truncate(), 2) 1616 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases 1617 self.assertEqual(bufio.truncate(), 4) 1618 1619 def test_misbehaved_io(self): 1620 BufferedReaderTest.test_misbehaved_io(self) 1621 BufferedWriterTest.test_misbehaved_io(self) 1622 1623 def test_interleaved_read_write(self): 1624 # Test for issue #12213 1625 with self.BytesIO(b'abcdefgh') as raw: 1626 with self.tp(raw, 100) as f: 1627 f.write(b"1") 1628 self.assertEqual(f.read(1), b'b') 1629 f.write(b'2') 1630 self.assertEqual(f.read1(1), b'd') 1631 f.write(b'3') 1632 buf = bytearray(1) 1633 f.readinto(buf) 1634 self.assertEqual(buf, b'f') 1635 f.write(b'4') 1636 self.assertEqual(f.peek(1), b'h') 1637 f.flush() 1638 self.assertEqual(raw.getvalue(), b'1b2d3f4h') 1639 1640 with self.BytesIO(b'abc') as raw: 1641 with self.tp(raw, 100) as f: 1642 self.assertEqual(f.read(1), b'a') 1643 f.write(b"2") 1644 self.assertEqual(f.read(1), b'c') 1645 f.flush() 1646 self.assertEqual(raw.getvalue(), b'a2c') 1647 1648 def test_interleaved_readline_write(self): 1649 with self.BytesIO(b'ab\ncdef\ng\n') as raw: 1650 with self.tp(raw) as f: 1651 f.write(b'1') 1652 self.assertEqual(f.readline(), b'b\n') 1653 f.write(b'2') 1654 self.assertEqual(f.readline(), b'def\n') 1655 f.write(b'3') 1656 self.assertEqual(f.readline(), b'\n') 1657 f.flush() 1658 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n') 1659 1660 1661 class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, 1662 BufferedRandomTest, SizeofTest): 1663 tp = io.BufferedRandom 1664 1665 def test_constructor(self): 1666 BufferedRandomTest.test_constructor(self) 1667 # The allocation can succeed on 32-bit builds, e.g. with more 1668 # than 2GB RAM and a 64-bit kernel. 1669 if sys.maxsize > 0x7FFFFFFF: 1670 rawio = self.MockRawIO() 1671 bufio = self.tp(rawio) 1672 self.assertRaises((OverflowError, MemoryError, ValueError), 1673 bufio.__init__, rawio, sys.maxsize) 1674 1675 def test_garbage_collection(self): 1676 CBufferedReaderTest.test_garbage_collection(self) 1677 CBufferedWriterTest.test_garbage_collection(self) 1678 1679 def test_args_error(self): 1680 # Issue #17275 1681 with self.assertRaisesRegexp(TypeError, "BufferedRandom"): 1682 self.tp(io.BytesIO(), 1024, 1024, 1024) 1683 1684 1685 class PyBufferedRandomTest(BufferedRandomTest): 1686 tp = pyio.BufferedRandom 1687 612 1688 613 1689 # To fully exercise seek/tell, the StatefulIncrementalDecoder has these … … 744 1820 ] 745 1821 746 def test Decoder(self):1822 def test_decoder(self): 747 1823 # Try a few one-shot test cases. 748 1824 for input, eof, output in self.test_cases: 749 1825 d = StatefulIncrementalDecoder() 750 self.assertEqual s(d.decode(input, eof), output)1826 self.assertEqual(d.decode(input, eof), output) 751 1827 752 1828 # Also test an unfinished decode, followed by forcing EOF. 753 1829 d = StatefulIncrementalDecoder() 754 self.assertEqual s(d.decode(b'oiabcd'), '')755 self.assertEqual s(d.decode(b'', 1), 'abcd.')1830 self.assertEqual(d.decode(b'oiabcd'), '') 1831 self.assertEqual(d.decode(b'', 1), 'abcd.') 756 1832 757 1833 class TextIOWrapperTest(unittest.TestCase): … … 760 1836 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n" 761 1837 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii") 1838 support.unlink(support.TESTFN) 762 1839 763 1840 def tearDown(self): 764 test_support.unlink(test_support.TESTFN) 765 766 def testLineBuffering(self): 767 r = io.BytesIO() 768 b = io.BufferedWriter(r, 1000) 769 t = io.TextIOWrapper(b, newline="\n", line_buffering=True) 770 t.write(u"X") 771 self.assertEquals(r.getvalue(), b"") # No flush happened 772 t.write(u"Y\nZ") 773 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed 774 t.write(u"A\rB") 775 self.assertEquals(r.getvalue(), b"XY\nZA\rB") 776 777 def testEncodingErrorsReading(self): 1841 support.unlink(support.TESTFN) 1842 1843 def test_constructor(self): 1844 r = self.BytesIO(b"\xc3\xa9\n\n") 1845 b = self.BufferedReader(r, 1000) 1846 t = self.TextIOWrapper(b) 1847 t.__init__(b, encoding="latin1", newline="\r\n") 1848 self.assertEqual(t.encoding, "latin1") 1849 self.assertEqual(t.line_buffering, False) 1850 t.__init__(b, encoding="utf8", line_buffering=True) 1851 self.assertEqual(t.encoding, "utf8") 1852 self.assertEqual(t.line_buffering, True) 1853 self.assertEqual("\xe9\n", t.readline()) 1854 self.assertRaises(TypeError, t.__init__, b, newline=42) 1855 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy') 1856 1857 def test_detach(self): 1858 r = self.BytesIO() 1859 b = self.BufferedWriter(r) 1860 t = self.TextIOWrapper(b) 1861 self.assertIs(t.detach(), b) 1862 1863 t = self.TextIOWrapper(b, encoding="ascii") 1864 t.write("howdy") 1865 self.assertFalse(r.getvalue()) 1866 t.detach() 1867 self.assertEqual(r.getvalue(), b"howdy") 1868 self.assertRaises(ValueError, t.detach) 1869 1870 def test_repr(self): 1871 raw = self.BytesIO("hello".encode("utf-8")) 1872 b = self.BufferedReader(raw) 1873 t = self.TextIOWrapper(b, encoding="utf-8") 1874 modname = self.TextIOWrapper.__module__ 1875 self.assertEqual(repr(t), 1876 "<%s.TextIOWrapper encoding='utf-8'>" % modname) 1877 raw.name = "dummy" 1878 self.assertEqual(repr(t), 1879 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname) 1880 raw.name = b"dummy" 1881 self.assertEqual(repr(t), 1882 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname) 1883 1884 def test_line_buffering(self): 1885 r = self.BytesIO() 1886 b = self.BufferedWriter(r, 1000) 1887 t = self.TextIOWrapper(b, newline="\n", line_buffering=True) 1888 t.write("X") 1889 self.assertEqual(r.getvalue(), b"") # No flush happened 1890 t.write("Y\nZ") 1891 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed 1892 t.write("A\rB") 1893 self.assertEqual(r.getvalue(), b"XY\nZA\rB") 1894 1895 def test_encoding(self): 1896 # Check the encoding attribute is always set, and valid 1897 b = self.BytesIO() 1898 t = self.TextIOWrapper(b, encoding="utf8") 1899 self.assertEqual(t.encoding, "utf8") 1900 t = self.TextIOWrapper(b) 1901 self.assertTrue(t.encoding is not None) 1902 codecs.lookup(t.encoding) 1903 1904 def test_encoding_errors_reading(self): 778 1905 # (1) default 779 b = io.BytesIO(b"abc\n\xff\n")780 t = io.TextIOWrapper(b, encoding="ascii")1906 b = self.BytesIO(b"abc\n\xff\n") 1907 t = self.TextIOWrapper(b, encoding="ascii") 781 1908 self.assertRaises(UnicodeError, t.read) 782 1909 # (2) explicit strict 783 b = io.BytesIO(b"abc\n\xff\n")784 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")1910 b = self.BytesIO(b"abc\n\xff\n") 1911 t = self.TextIOWrapper(b, encoding="ascii", errors="strict") 785 1912 self.assertRaises(UnicodeError, t.read) 786 1913 # (3) ignore 787 b = io.BytesIO(b"abc\n\xff\n")788 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")789 self.assertEqual s(t.read(), "abc\n\n")1914 b = self.BytesIO(b"abc\n\xff\n") 1915 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore") 1916 self.assertEqual(t.read(), "abc\n\n") 790 1917 # (4) replace 791 b = io.BytesIO(b"abc\n\xff\n")792 t = io.TextIOWrapper(b, encoding="ascii", errors="replace")793 self.assertEqual s(t.read(), u"abc\n\ufffd\n")794 795 def test EncodingErrorsWriting(self):1918 b = self.BytesIO(b"abc\n\xff\n") 1919 t = self.TextIOWrapper(b, encoding="ascii", errors="replace") 1920 self.assertEqual(t.read(), "abc\n\ufffd\n") 1921 1922 def test_encoding_errors_writing(self): 796 1923 # (1) default 797 b = io.BytesIO()798 t = io.TextIOWrapper(b, encoding="ascii")799 self.assertRaises(UnicodeError, t.write, u"\xff")1924 b = self.BytesIO() 1925 t = self.TextIOWrapper(b, encoding="ascii") 1926 self.assertRaises(UnicodeError, t.write, "\xff") 800 1927 # (2) explicit strict 801 b = io.BytesIO()802 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")803 self.assertRaises(UnicodeError, t.write, u"\xff")1928 b = self.BytesIO() 1929 t = self.TextIOWrapper(b, encoding="ascii", errors="strict") 1930 self.assertRaises(UnicodeError, t.write, "\xff") 804 1931 # (3) ignore 805 b = io.BytesIO()806 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",1932 b = self.BytesIO() 1933 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore", 807 1934 newline="\n") 808 t.write( u"abc\xffdef\n")1935 t.write("abc\xffdef\n") 809 1936 t.flush() 810 self.assertEqual s(b.getvalue(), b"abcdef\n")1937 self.assertEqual(b.getvalue(), b"abcdef\n") 811 1938 # (4) replace 812 b = io.BytesIO()813 t = io.TextIOWrapper(b, encoding="ascii", errors="replace",1939 b = self.BytesIO() 1940 t = self.TextIOWrapper(b, encoding="ascii", errors="replace", 814 1941 newline="\n") 815 t.write( u"abc\xffdef\n")1942 t.write("abc\xffdef\n") 816 1943 t.flush() 817 self.assertEquals(b.getvalue(), b"abc?def\n") 818 819 def testNewlinesInput(self): 820 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG" 821 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n") 822 for newline, expected in [ 823 (None, normalized.decode("ascii").splitlines(True)), 824 ("", testdata.decode("ascii").splitlines(True)), 825 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), 826 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), 827 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]), 828 ]: 829 buf = io.BytesIO(testdata) 830 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline) 831 self.assertEquals(txt.readlines(), expected) 832 txt.seek(0) 833 self.assertEquals(txt.read(), "".join(expected)) 834 835 def testNewlinesOutput(self): 836 testdict = { 837 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ", 838 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ", 839 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ", 840 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ", 841 } 842 tests = [(None, testdict[os.linesep])] + sorted(testdict.items()) 843 for newline, expected in tests: 844 buf = io.BytesIO() 845 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline) 846 txt.write("AAA\nB") 847 txt.write("BB\nCCC\n") 848 txt.write("X\rY\r\nZ") 849 txt.flush() 850 self.assertEquals(buf.closed, False) 851 self.assertEquals(buf.getvalue(), expected) 852 853 def testNewlines(self): 1944 self.assertEqual(b.getvalue(), b"abc?def\n") 1945 1946 def test_newlines(self): 854 1947 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ] 855 1948 … … 875 1968 for bufsize in range(1, 10): 876 1969 for newline, exp_lines in tests: 877 bufio = io.BufferedReader(io.BytesIO(data), bufsize)878 textio = io.TextIOWrapper(bufio, newline=newline,1970 bufio = self.BufferedReader(self.BytesIO(data), bufsize) 1971 textio = self.TextIOWrapper(bufio, newline=newline, 879 1972 encoding=encoding) 880 1973 if do_reads: … … 884 1977 if c2 == '': 885 1978 break 886 self.assertEqual s(len(c2), 2)1979 self.assertEqual(len(c2), 2) 887 1980 got_lines.append(c2 + textio.readline()) 888 1981 else: … … 890 1983 891 1984 for got_line, exp_line in zip(got_lines, exp_lines): 892 self.assertEqual s(got_line, exp_line)893 self.assertEqual s(len(got_lines), len(exp_lines))894 895 def test NewlinesInput(self):896 testdata = b"AAA\nBB B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"1985 self.assertEqual(got_line, exp_line) 1986 self.assertEqual(len(got_lines), len(exp_lines)) 1987 1988 def test_newlines_input(self): 1989 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG" 897 1990 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n") 898 1991 for newline, expected in [ 899 1992 (None, normalized.decode("ascii").splitlines(True)), 900 1993 ("", testdata.decode("ascii").splitlines(True)), 901 ("\n", ["AAA\n", "BB B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),902 ("\r\n", ["AAA\nBB B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),903 ("\r", ["AAA\nBB B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),1994 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), 1995 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), 1996 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]), 904 1997 ]: 905 buf = io.BytesIO(testdata)906 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)907 self.assertEqual s(txt.readlines(), expected)1998 buf = self.BytesIO(testdata) 1999 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) 2000 self.assertEqual(txt.readlines(), expected) 908 2001 txt.seek(0) 909 self.assertEquals(txt.read(), "".join(expected)) 910 911 def testNewlinesOutput(self): 912 data = u"AAA\nBBB\rCCC\n" 913 data_lf = b"AAA\nBBB\rCCC\n" 914 data_cr = b"AAA\rBBB\rCCC\r" 915 data_crlf = b"AAA\r\nBBB\rCCC\r\n" 916 save_linesep = os.linesep 917 try: 918 for os.linesep, newline, expected in [ 919 ("\n", None, data_lf), 920 ("\r\n", None, data_crlf), 921 ("\n", "", data_lf), 922 ("\r\n", "", data_lf), 923 ("\n", "\n", data_lf), 924 ("\r\n", "\n", data_lf), 925 ("\n", "\r", data_cr), 926 ("\r\n", "\r", data_cr), 927 ("\n", "\r\n", data_crlf), 928 ("\r\n", "\r\n", data_crlf), 929 ]: 930 buf = io.BytesIO() 931 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline) 932 txt.write(data) 933 txt.close() 934 self.assertEquals(buf.closed, True) 935 self.assertRaises(ValueError, buf.getvalue) 936 finally: 937 os.linesep = save_linesep 2002 self.assertEqual(txt.read(), "".join(expected)) 2003 2004 def test_newlines_output(self): 2005 testdict = { 2006 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ", 2007 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ", 2008 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ", 2009 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ", 2010 } 2011 tests = [(None, testdict[os.linesep])] + sorted(testdict.items()) 2012 for newline, expected in tests: 2013 buf = self.BytesIO() 2014 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) 2015 txt.write("AAA\nB") 2016 txt.write("BB\nCCC\n") 2017 txt.write("X\rY\r\nZ") 2018 txt.flush() 2019 self.assertEqual(buf.closed, False) 2020 self.assertEqual(buf.getvalue(), expected) 2021 2022 def test_destructor(self): 2023 l = [] 2024 base = self.BytesIO 2025 class MyBytesIO(base): 2026 def close(self): 2027 l.append(self.getvalue()) 2028 base.close(self) 2029 b = MyBytesIO() 2030 t = self.TextIOWrapper(b, encoding="ascii") 2031 t.write("abc") 2032 del t 2033 support.gc_collect() 2034 self.assertEqual([b"abc"], l) 2035 2036 def test_override_destructor(self): 2037 record = [] 2038 class MyTextIO(self.TextIOWrapper): 2039 def __del__(self): 2040 record.append(1) 2041 try: 2042 f = super(MyTextIO, self).__del__ 2043 except AttributeError: 2044 pass 2045 else: 2046 f() 2047 def close(self): 2048 record.append(2) 2049 super(MyTextIO, self).close() 2050 def flush(self): 2051 record.append(3) 2052 super(MyTextIO, self).flush() 2053 b = self.BytesIO() 2054 t = MyTextIO(b, encoding="ascii") 2055 del t 2056 support.gc_collect() 2057 self.assertEqual(record, [1, 2, 3]) 2058 2059 def test_error_through_destructor(self): 2060 # Test that the exception state is not modified by a destructor, 2061 # even if close() fails. 2062 rawio = self.CloseFailureIO() 2063 def f(): 2064 self.TextIOWrapper(rawio).xyzzy 2065 with support.captured_output("stderr") as s: 2066 self.assertRaises(AttributeError, f) 2067 s = s.getvalue().strip() 2068 if s: 2069 # The destructor *may* have printed an unraisable error, check it 2070 self.assertEqual(len(s.splitlines()), 1) 2071 self.assertTrue(s.startswith("Exception IOError: "), s) 2072 self.assertTrue(s.endswith(" ignored"), s) 938 2073 939 2074 # Systematic tests of the text I/O API 940 2075 941 def test BasicIO(self):2076 def test_basic_io(self): 942 2077 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65): 943 2078 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le": 944 f = io.open(test_support.TESTFN, "w+", encoding=enc)2079 f = self.open(support.TESTFN, "w+", encoding=enc) 945 2080 f._CHUNK_SIZE = chunksize 946 self.assertEqual s(f.write(u"abc"), 3)2081 self.assertEqual(f.write("abc"), 3) 947 2082 f.close() 948 f = io.open(test_support.TESTFN, "r+", encoding=enc)2083 f = self.open(support.TESTFN, "r+", encoding=enc) 949 2084 f._CHUNK_SIZE = chunksize 950 self.assertEqual s(f.tell(), 0)951 self.assertEqual s(f.read(), u"abc")2085 self.assertEqual(f.tell(), 0) 2086 self.assertEqual(f.read(), "abc") 952 2087 cookie = f.tell() 953 self.assertEquals(f.seek(0), 0) 954 self.assertEquals(f.read(2), u"ab") 955 self.assertEquals(f.read(1), u"c") 956 self.assertEquals(f.read(1), u"") 957 self.assertEquals(f.read(), u"") 958 self.assertEquals(f.tell(), cookie) 959 self.assertEquals(f.seek(0), 0) 960 self.assertEquals(f.seek(0, 2), cookie) 961 self.assertEquals(f.write(u"def"), 3) 962 self.assertEquals(f.seek(cookie), cookie) 963 self.assertEquals(f.read(), u"def") 2088 self.assertEqual(f.seek(0), 0) 2089 self.assertEqual(f.read(None), "abc") 2090 f.seek(0) 2091 self.assertEqual(f.read(2), "ab") 2092 self.assertEqual(f.read(1), "c") 2093 self.assertEqual(f.read(1), "") 2094 self.assertEqual(f.read(), "") 2095 self.assertEqual(f.tell(), cookie) 2096 self.assertEqual(f.seek(0), 0) 2097 self.assertEqual(f.seek(0, 2), cookie) 2098 self.assertEqual(f.write("def"), 3) 2099 self.assertEqual(f.seek(cookie), cookie) 2100 self.assertEqual(f.read(), "def") 964 2101 if enc.startswith("utf"): 965 2102 self.multi_line_test(f, enc) … … 969 2106 f.seek(0) 970 2107 f.truncate() 971 sample = u"s\xff\u0fff\uffff"2108 sample = "s\xff\u0fff\uffff" 972 2109 wlines = [] 973 2110 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000): … … 975 2112 for i in range(size): 976 2113 chars.append(sample[i % len(sample)]) 977 line = u"".join(chars) + u"\n"2114 line = "".join(chars) + "\n" 978 2115 wlines.append((f.tell(), line)) 979 2116 f.write(line) … … 986 2123 break 987 2124 rlines.append((pos, line)) 988 self.assertEqual s(rlines, wlines)989 990 def test Telling(self):991 f = io.open(test_support.TESTFN, "w+", encoding="utf8")2125 self.assertEqual(rlines, wlines) 2126 2127 def test_telling(self): 2128 f = self.open(support.TESTFN, "w+", encoding="utf8") 992 2129 p0 = f.tell() 993 f.write( u"\xff\n")2130 f.write("\xff\n") 994 2131 p1 = f.tell() 995 f.write( u"\xff\n")2132 f.write("\xff\n") 996 2133 p2 = f.tell() 997 2134 f.seek(0) 998 self.assertEqual s(f.tell(), p0)999 self.assertEqual s(f.readline(), u"\xff\n")1000 self.assertEqual s(f.tell(), p1)1001 self.assertEqual s(f.readline(), u"\xff\n")1002 self.assertEqual s(f.tell(), p2)2135 self.assertEqual(f.tell(), p0) 2136 self.assertEqual(f.readline(), "\xff\n") 2137 self.assertEqual(f.tell(), p1) 2138 self.assertEqual(f.readline(), "\xff\n") 2139 self.assertEqual(f.tell(), p2) 1003 2140 f.seek(0) 1004 2141 for line in f: 1005 self.assertEqual s(line, u"\xff\n")2142 self.assertEqual(line, "\xff\n") 1006 2143 self.assertRaises(IOError, f.tell) 1007 self.assertEqual s(f.tell(), p2)2144 self.assertEqual(f.tell(), p2) 1008 2145 f.close() 1009 2146 1010 def test Seeking(self):1011 chunk_size = io.TextIOWrapper._CHUNK_SIZE2147 def test_seeking(self): 2148 chunk_size = _default_chunk_size() 1012 2149 prefix_size = chunk_size - 2 1013 2150 u_prefix = "a" * prefix_size 1014 2151 prefix = bytes(u_prefix.encode("utf-8")) 1015 self.assertEqual s(len(u_prefix), len(prefix))2152 self.assertEqual(len(u_prefix), len(prefix)) 1016 2153 u_suffix = "\u8888\n" 1017 2154 suffix = bytes(u_suffix.encode("utf-8")) 1018 2155 line = prefix + suffix 1019 f = io.open(test_support.TESTFN, "wb")2156 f = self.open(support.TESTFN, "wb") 1020 2157 f.write(line*2) 1021 2158 f.close() 1022 f = io.open(test_support.TESTFN, "r", encoding="utf-8")2159 f = self.open(support.TESTFN, "r", encoding="utf-8") 1023 2160 s = f.read(prefix_size) 1024 self.assertEqual s(s, unicode(prefix,"ascii"))1025 self.assertEqual s(f.tell(), prefix_size)1026 self.assertEqual s(f.readline(), u_suffix)1027 1028 def test SeekingToo(self):2161 self.assertEqual(s, prefix.decode("ascii")) 2162 self.assertEqual(f.tell(), prefix_size) 2163 self.assertEqual(f.readline(), u_suffix) 2164 2165 def test_seeking_too(self): 1029 2166 # Regression test for a specific bug 1030 2167 data = b'\xe0\xbf\xbf\n' 1031 f = io.open(test_support.TESTFN, "wb")2168 f = self.open(support.TESTFN, "wb") 1032 2169 f.write(data) 1033 2170 f.close() 1034 f = io.open(test_support.TESTFN, "r", encoding="utf-8")2171 f = self.open(support.TESTFN, "r", encoding="utf-8") 1035 2172 f._CHUNK_SIZE # Just test that it exists 1036 2173 f._CHUNK_SIZE = 2 … … 1038 2175 f.tell() 1039 2176 1040 def testSeekAndTell(self): 1041 """Test seek/tell using the StatefulIncrementalDecoder.""" 1042 1043 def testSeekAndTellWithData(data, min_pos=0): 2177 def test_seek_and_tell(self): 2178 #Test seek/tell using the StatefulIncrementalDecoder. 2179 # Make test faster by doing smaller seeks 2180 CHUNK_SIZE = 128 2181 2182 def test_seek_and_tell_with_data(data, min_pos=0): 1044 2183 """Tell/seek to various points within a data stream and ensure 1045 2184 that the decoded data returned by read() is consistent.""" 1046 f = io.open(test_support.TESTFN, 'wb')2185 f = self.open(support.TESTFN, 'wb') 1047 2186 f.write(data) 1048 2187 f.close() 1049 f = io.open(test_support.TESTFN, encoding='test_decoder') 2188 f = self.open(support.TESTFN, encoding='test_decoder') 2189 f._CHUNK_SIZE = CHUNK_SIZE 1050 2190 decoded = f.read() 1051 2191 f.close() … … 1053 2193 for i in range(min_pos, len(decoded) + 1): # seek positions 1054 2194 for j in [1, 5, len(decoded) - i]: # read lengths 1055 f = io.open(test_support.TESTFN, encoding='test_decoder')1056 self.assertEqual s(f.read(i), decoded[:i])2195 f = self.open(support.TESTFN, encoding='test_decoder') 2196 self.assertEqual(f.read(i), decoded[:i]) 1057 2197 cookie = f.tell() 1058 self.assertEqual s(f.read(j), decoded[i:i + j])2198 self.assertEqual(f.read(j), decoded[i:i + j]) 1059 2199 f.seek(cookie) 1060 self.assertEqual s(f.read(), decoded[i:])2200 self.assertEqual(f.read(), decoded[i:]) 1061 2201 f.close() 1062 2202 … … 1068 2208 # Try each test case. 1069 2209 for input, _, _ in StatefulIncrementalDecoderTest.test_cases: 1070 test SeekAndTellWithData(input)2210 test_seek_and_tell_with_data(input) 1071 2211 1072 2212 # Position each test case so that it crosses a chunk boundary. 1073 CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE1074 2213 for input, _, _ in StatefulIncrementalDecoderTest.test_cases: 1075 2214 offset = CHUNK_SIZE - len(input)//2 … … 1077 2216 # Don't bother seeking into the prefix (takes too long). 1078 2217 min_pos = offset*2 1079 test SeekAndTellWithData(prefix + input, min_pos)2218 test_seek_and_tell_with_data(prefix + input, min_pos) 1080 2219 1081 2220 # Ensure our test decoder won't interfere with subsequent tests. … … 1083 2222 StatefulIncrementalDecoder.codecEnabled = 0 1084 2223 1085 def test EncodedWrites(self):1086 data = u"1234567890"2224 def test_encoded_writes(self): 2225 data = "1234567890" 1087 2226 tests = ("utf-16", 1088 2227 "utf-16-le", … … 1092 2231 "utf-32-be") 1093 2232 for encoding in tests: 1094 buf = io.BytesIO()1095 f = io.TextIOWrapper(buf, encoding=encoding)2233 buf = self.BytesIO() 2234 f = self.TextIOWrapper(buf, encoding=encoding) 1096 2235 # Check if the BOM is written only once (see issue1753). 1097 2236 f.write(data) 1098 2237 f.write(data) 1099 2238 f.seek(0) 1100 self.assertEquals(f.read(), data * 2) 1101 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding)) 1102 1103 def timingTest(self): 1104 timer = time.time 1105 enc = "utf8" 1106 line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n" 1107 nlines = 10000 1108 nchars = len(line) 1109 nbytes = len(line.encode(enc)) 1110 for chunk_size in (32, 64, 128, 256): 1111 f = io.open(test_support.TESTFN, "w+", encoding=enc) 1112 f._CHUNK_SIZE = chunk_size 1113 t0 = timer() 1114 for i in range(nlines): 1115 f.write(line) 1116 f.flush() 1117 t1 = timer() 2239 self.assertEqual(f.read(), data * 2) 1118 2240 f.seek(0) 1119 for line in f: 1120 pass 1121 t2 = timer() 1122 f.seek(0) 1123 while f.readline(): 1124 pass 1125 t3 = timer() 1126 f.seek(0) 1127 while f.readline(): 1128 f.tell() 1129 t4 = timer() 1130 f.close() 1131 if test_support.verbose: 1132 print("\nTiming test: %d lines of %d characters (%d bytes)" % 1133 (nlines, nchars, nbytes)) 1134 print("File chunk size: %6s" % f._CHUNK_SIZE) 1135 print("Writing: %6.3f seconds" % (t1-t0)) 1136 print("Reading using iteration: %6.3f seconds" % (t2-t1)) 1137 print("Reading using readline(): %6.3f seconds" % (t3-t2)) 1138 print("Using readline()+tell(): %6.3f seconds" % (t4-t3)) 1139 1140 def testReadOneByOne(self): 1141 txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB")) 2241 self.assertEqual(f.read(), data * 2) 2242 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding)) 2243 2244 def test_unreadable(self): 2245 class UnReadable(self.BytesIO): 2246 def readable(self): 2247 return False 2248 txt = self.TextIOWrapper(UnReadable()) 2249 self.assertRaises(IOError, txt.read) 2250 2251 def test_read_one_by_one(self): 2252 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB")) 1142 2253 reads = "" 1143 2254 while True: … … 1146 2257 break 1147 2258 reads += c 1148 self.assertEquals(reads, "AA\nBB") 2259 self.assertEqual(reads, "AA\nBB") 2260 2261 def test_readlines(self): 2262 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC")) 2263 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"]) 2264 txt.seek(0) 2265 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"]) 2266 txt.seek(0) 2267 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"]) 1149 2268 1150 2269 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128. 1151 def test ReadByChunk(self):2270 def test_read_by_chunk(self): 1152 2271 # make sure "\r\n" straddles 128 char boundary. 1153 txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))2272 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB")) 1154 2273 reads = "" 1155 2274 while True: … … 1158 2277 break 1159 2278 reads += c 1160 self.assertEquals(reads, "A"*127+"\nB") 2279 self.assertEqual(reads, "A"*127+"\nB") 2280 2281 def test_writelines(self): 2282 l = ['ab', 'cd', 'ef'] 2283 buf = self.BytesIO() 2284 txt = self.TextIOWrapper(buf) 2285 txt.writelines(l) 2286 txt.flush() 2287 self.assertEqual(buf.getvalue(), b'abcdef') 2288 2289 def test_writelines_userlist(self): 2290 l = UserList(['ab', 'cd', 'ef']) 2291 buf = self.BytesIO() 2292 txt = self.TextIOWrapper(buf) 2293 txt.writelines(l) 2294 txt.flush() 2295 self.assertEqual(buf.getvalue(), b'abcdef') 2296 2297 def test_writelines_error(self): 2298 txt = self.TextIOWrapper(self.BytesIO()) 2299 self.assertRaises(TypeError, txt.writelines, [1, 2, 3]) 2300 self.assertRaises(TypeError, txt.writelines, None) 2301 self.assertRaises(TypeError, txt.writelines, b'abc') 1161 2302 1162 2303 def test_issue1395_1(self): 1163 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")2304 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 1164 2305 1165 2306 # read one char at a time … … 1170 2311 break 1171 2312 reads += c 1172 self.assertEqual s(reads, self.normalized)2313 self.assertEqual(reads, self.normalized) 1173 2314 1174 2315 def test_issue1395_2(self): 1175 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")2316 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 1176 2317 txt._CHUNK_SIZE = 4 1177 2318 … … 1182 2323 break 1183 2324 reads += c 1184 self.assertEqual s(reads, self.normalized)2325 self.assertEqual(reads, self.normalized) 1185 2326 1186 2327 def test_issue1395_3(self): 1187 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")2328 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 1188 2329 txt._CHUNK_SIZE = 4 1189 2330 … … 1193 2334 reads += txt.readline() 1194 2335 reads += txt.readline() 1195 self.assertEqual s(reads, self.normalized)2336 self.assertEqual(reads, self.normalized) 1196 2337 1197 2338 def test_issue1395_4(self): 1198 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")2339 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 1199 2340 txt._CHUNK_SIZE = 4 1200 2341 1201 2342 reads = txt.read(4) 1202 2343 reads += txt.read() 1203 self.assertEqual s(reads, self.normalized)2344 self.assertEqual(reads, self.normalized) 1204 2345 1205 2346 def test_issue1395_5(self): 1206 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")2347 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 1207 2348 txt._CHUNK_SIZE = 4 1208 2349 … … 1211 2352 txt.seek(0) 1212 2353 txt.seek(pos) 1213 self.assertEqual s(txt.read(4), "BBB\n")2354 self.assertEqual(txt.read(4), "BBB\n") 1214 2355 1215 2356 def test_issue2282(self): 1216 buffer = io.BytesIO(self.testdata)1217 txt = io.TextIOWrapper(buffer, encoding="ascii")2357 buffer = self.BytesIO(self.testdata) 2358 txt = self.TextIOWrapper(buffer, encoding="ascii") 1218 2359 1219 2360 self.assertEqual(buffer.seekable(), txt.seekable()) 1220 2361 1221 def check_newline_decoder_utf8(self, decoder): 2362 def test_append_bom(self): 2363 # The BOM is not written again when appending to a non-empty file 2364 filename = support.TESTFN 2365 for charset in ('utf-8-sig', 'utf-16', 'utf-32'): 2366 with self.open(filename, 'w', encoding=charset) as f: 2367 f.write('aaa') 2368 pos = f.tell() 2369 with self.open(filename, 'rb') as f: 2370 self.assertEqual(f.read(), 'aaa'.encode(charset)) 2371 2372 with self.open(filename, 'a', encoding=charset) as f: 2373 f.write('xxx') 2374 with self.open(filename, 'rb') as f: 2375 self.assertEqual(f.read(), 'aaaxxx'.encode(charset)) 2376 2377 def test_seek_bom(self): 2378 # Same test, but when seeking manually 2379 filename = support.TESTFN 2380 for charset in ('utf-8-sig', 'utf-16', 'utf-32'): 2381 with self.open(filename, 'w', encoding=charset) as f: 2382 f.write('aaa') 2383 pos = f.tell() 2384 with self.open(filename, 'r+', encoding=charset) as f: 2385 f.seek(pos) 2386 f.write('zzz') 2387 f.seek(0) 2388 f.write('bbb') 2389 with self.open(filename, 'rb') as f: 2390 self.assertEqual(f.read(), 'bbbzzz'.encode(charset)) 2391 2392 def test_errors_property(self): 2393 with self.open(support.TESTFN, "w") as f: 2394 self.assertEqual(f.errors, "strict") 2395 with self.open(support.TESTFN, "w", errors="replace") as f: 2396 self.assertEqual(f.errors, "replace") 2397 2398 @unittest.skipUnless(threading, 'Threading required for this test.') 2399 def test_threads_write(self): 2400 # Issue6750: concurrent writes could duplicate data 2401 event = threading.Event() 2402 with self.open(support.TESTFN, "w", buffering=1) as f: 2403 def run(n): 2404 text = "Thread%03d\n" % n 2405 event.wait() 2406 f.write(text) 2407 threads = [threading.Thread(target=lambda n=x: run(n)) 2408 for x in range(20)] 2409 for t in threads: 2410 t.start() 2411 time.sleep(0.02) 2412 event.set() 2413 for t in threads: 2414 t.join() 2415 with self.open(support.TESTFN) as f: 2416 content = f.read() 2417 for n in range(20): 2418 self.assertEqual(content.count("Thread%03d\n" % n), 1) 2419 2420 def test_flush_error_on_close(self): 2421 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 2422 def bad_flush(): 2423 raise IOError() 2424 txt.flush = bad_flush 2425 self.assertRaises(IOError, txt.close) # exception not swallowed 2426 self.assertTrue(txt.closed) 2427 2428 def test_multi_close(self): 2429 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 2430 txt.close() 2431 txt.close() 2432 txt.close() 2433 self.assertRaises(ValueError, txt.flush) 2434 2435 def test_readonly_attributes(self): 2436 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 2437 buf = self.BytesIO(self.testdata) 2438 with self.assertRaises((AttributeError, TypeError)): 2439 txt.buffer = buf 2440 2441 def test_read_nonbytes(self): 2442 # Issue #17106 2443 # Crash when underlying read() returns non-bytes 2444 class NonbytesStream(self.StringIO): 2445 read1 = self.StringIO.read 2446 class NonbytesStream(self.StringIO): 2447 read1 = self.StringIO.read 2448 t = self.TextIOWrapper(NonbytesStream('a')) 2449 with self.maybeRaises(TypeError): 2450 t.read(1) 2451 t = self.TextIOWrapper(NonbytesStream('a')) 2452 with self.maybeRaises(TypeError): 2453 t.readline() 2454 t = self.TextIOWrapper(NonbytesStream('a')) 2455 self.assertEqual(t.read(), u'a') 2456 2457 def test_illegal_decoder(self): 2458 # Issue #17106 2459 # Crash when decoder returns non-string 2460 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n', 2461 encoding='quopri_codec') 2462 with self.maybeRaises(TypeError): 2463 t.read(1) 2464 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n', 2465 encoding='quopri_codec') 2466 with self.maybeRaises(TypeError): 2467 t.readline() 2468 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n', 2469 encoding='quopri_codec') 2470 with self.maybeRaises(TypeError): 2471 t.read() 2472 2473 2474 class CTextIOWrapperTest(TextIOWrapperTest): 2475 2476 def test_initialization(self): 2477 r = self.BytesIO(b"\xc3\xa9\n\n") 2478 b = self.BufferedReader(r, 1000) 2479 t = self.TextIOWrapper(b) 2480 self.assertRaises(TypeError, t.__init__, b, newline=42) 2481 self.assertRaises(ValueError, t.read) 2482 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy') 2483 self.assertRaises(ValueError, t.read) 2484 2485 def test_garbage_collection(self): 2486 # C TextIOWrapper objects are collected, and collecting them flushes 2487 # all data to disk. 2488 # The Python version has __del__, so it ends in gc.garbage instead. 2489 rawio = io.FileIO(support.TESTFN, "wb") 2490 b = self.BufferedWriter(rawio) 2491 t = self.TextIOWrapper(b, encoding="ascii") 2492 t.write("456def") 2493 t.x = t 2494 wr = weakref.ref(t) 2495 del t 2496 support.gc_collect() 2497 self.assertTrue(wr() is None, wr) 2498 with self.open(support.TESTFN, "rb") as f: 2499 self.assertEqual(f.read(), b"456def") 2500 2501 def test_rwpair_cleared_before_textio(self): 2502 # Issue 13070: TextIOWrapper's finalization would crash when called 2503 # after the reference to the underlying BufferedRWPair's writer got 2504 # cleared by the GC. 2505 for i in range(1000): 2506 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()) 2507 t1 = self.TextIOWrapper(b1, encoding="ascii") 2508 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()) 2509 t2 = self.TextIOWrapper(b2, encoding="ascii") 2510 # circular references 2511 t1.buddy = t2 2512 t2.buddy = t1 2513 support.gc_collect() 2514 2515 maybeRaises = unittest.TestCase.assertRaises 2516 2517 2518 class PyTextIOWrapperTest(TextIOWrapperTest): 2519 @contextlib.contextmanager 2520 def maybeRaises(self, *args, **kwds): 2521 yield 2522 2523 2524 class IncrementalNewlineDecoderTest(unittest.TestCase): 2525 2526 def check_newline_decoding_utf8(self, decoder): 1222 2527 # UTF-8 specific tests for a newline decoder 1223 2528 def _check_decode(b, s, **kwargs): 1224 2529 # We exercise getstate() / setstate() as well as decode() 1225 2530 state = decoder.getstate() 1226 self.assertEqual s(decoder.decode(b, **kwargs), s)2531 self.assertEqual(decoder.decode(b, **kwargs), s) 1227 2532 decoder.setstate(state) 1228 self.assertEqual s(decoder.decode(b, **kwargs), s)2533 self.assertEqual(decoder.decode(b, **kwargs), s) 1229 2534 1230 2535 _check_decode(b'\xe8\xa2\x88', "\u8888") … … 1261 2566 _check_decode(b'\n', "\n") 1262 2567 1263 def check_newline_decod er(self, decoder, encoding):2568 def check_newline_decoding(self, decoder, encoding): 1264 2569 result = [] 1265 encoder = codecs.getincrementalencoder(encoding)() 1266 def _decode_bytewise(s): 1267 for b in encoder.encode(s): 1268 result.append(decoder.decode(b)) 1269 self.assertEquals(decoder.newlines, None) 2570 if encoding is not None: 2571 encoder = codecs.getincrementalencoder(encoding)() 2572 def _decode_bytewise(s): 2573 # Decode one byte at a time 2574 for b in encoder.encode(s): 2575 result.append(decoder.decode(b)) 2576 else: 2577 encoder = None 2578 def _decode_bytewise(s): 2579 # Decode one char at a time 2580 for c in s: 2581 result.append(decoder.decode(c)) 2582 self.assertEqual(decoder.newlines, None) 1270 2583 _decode_bytewise("abc\n\r") 1271 self.assertEqual s(decoder.newlines, '\n')2584 self.assertEqual(decoder.newlines, '\n') 1272 2585 _decode_bytewise("\nabc") 1273 self.assertEqual s(decoder.newlines, ('\n', '\r\n'))2586 self.assertEqual(decoder.newlines, ('\n', '\r\n')) 1274 2587 _decode_bytewise("abc\r") 1275 self.assertEqual s(decoder.newlines, ('\n', '\r\n'))2588 self.assertEqual(decoder.newlines, ('\n', '\r\n')) 1276 2589 _decode_bytewise("abc") 1277 self.assertEqual s(decoder.newlines, ('\r', '\n', '\r\n'))2590 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n')) 1278 2591 _decode_bytewise("abc\r") 1279 self.assertEqual s("".join(result), "abc\n\nabcabc\nabcabc")2592 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc") 1280 2593 decoder.reset() 1281 self.assertEquals(decoder.decode("abc".encode(encoding)), "abc") 1282 self.assertEquals(decoder.newlines, None) 2594 input = "abc" 2595 if encoder is not None: 2596 encoder.reset() 2597 input = encoder.encode(input) 2598 self.assertEqual(decoder.decode(input), "abc") 2599 self.assertEqual(decoder.newlines, None) 1283 2600 1284 2601 def test_newline_decoder(self): 1285 2602 encodings = ( 1286 'utf-8', 'latin-1', 2603 # None meaning the IncrementalNewlineDecoder takes unicode input 2604 # rather than bytes input 2605 None, 'utf-8', 'latin-1', 1287 2606 'utf-16', 'utf-16-le', 'utf-16-be', 1288 2607 'utf-32', 'utf-32-le', 'utf-32-be', 1289 2608 ) 1290 2609 for enc in encodings: 1291 decoder = codecs.getincrementaldecoder(enc)()1292 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)1293 self.check_newline_decod er(decoder, enc)2610 decoder = enc and codecs.getincrementaldecoder(enc)() 2611 decoder = self.IncrementalNewlineDecoder(decoder, translate=True) 2612 self.check_newline_decoding(decoder, enc) 1294 2613 decoder = codecs.getincrementaldecoder("utf-8")() 1295 decoder = io.IncrementalNewlineDecoder(decoder, translate=True) 1296 self.check_newline_decoder_utf8(decoder) 2614 decoder = self.IncrementalNewlineDecoder(decoder, translate=True) 2615 self.check_newline_decoding_utf8(decoder) 2616 2617 def test_newline_bytes(self): 2618 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder 2619 def _check(dec): 2620 self.assertEqual(dec.newlines, None) 2621 self.assertEqual(dec.decode("\u0D00"), "\u0D00") 2622 self.assertEqual(dec.newlines, None) 2623 self.assertEqual(dec.decode("\u0A00"), "\u0A00") 2624 self.assertEqual(dec.newlines, None) 2625 dec = self.IncrementalNewlineDecoder(None, translate=False) 2626 _check(dec) 2627 dec = self.IncrementalNewlineDecoder(None, translate=True) 2628 _check(dec) 2629 2630 class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest): 2631 pass 2632 2633 class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest): 2634 pass 1297 2635 1298 2636 … … 1302 2640 1303 2641 def tearDown(self): 1304 test_support.unlink(test_support.TESTFN)1305 1306 def test Import__all__(self):1307 for name in io.__all__:1308 obj = getattr( io, name, None)1309 self.assert _(obj is not None, name)2642 support.unlink(support.TESTFN) 2643 2644 def test___all__(self): 2645 for name in self.io.__all__: 2646 obj = getattr(self.io, name, None) 2647 self.assertTrue(obj is not None, name) 1310 2648 if name == "open": 1311 2649 continue 1312 elif "error" in name.lower(): 1313 self.assert_(issubclass(obj, Exception), name) 1314 else: 1315 self.assert_(issubclass(obj, io.IOBase)) 1316 2650 elif "error" in name.lower() or name == "UnsupportedOperation": 2651 self.assertTrue(issubclass(obj, Exception), name) 2652 elif not name.startswith("SEEK_"): 2653 self.assertTrue(issubclass(obj, self.IOBase)) 1317 2654 1318 2655 def test_attributes(self): 1319 f = io.open(test_support.TESTFN, "wb", buffering=0)1320 self.assertEqual s(f.mode, "wb")2656 f = self.open(support.TESTFN, "wb", buffering=0) 2657 self.assertEqual(f.mode, "wb") 1321 2658 f.close() 1322 2659 1323 f = io.open(test_support.TESTFN, "U")1324 self.assertEqual s(f.name, test_support.TESTFN)1325 self.assertEqual s(f.buffer.name, test_support.TESTFN)1326 self.assertEqual s(f.buffer.raw.name, test_support.TESTFN)1327 self.assertEqual s(f.mode, "U")1328 self.assertEqual s(f.buffer.mode, "rb")1329 self.assertEqual s(f.buffer.raw.mode, "rb")2660 f = self.open(support.TESTFN, "U") 2661 self.assertEqual(f.name, support.TESTFN) 2662 self.assertEqual(f.buffer.name, support.TESTFN) 2663 self.assertEqual(f.buffer.raw.name, support.TESTFN) 2664 self.assertEqual(f.mode, "U") 2665 self.assertEqual(f.buffer.mode, "rb") 2666 self.assertEqual(f.buffer.raw.mode, "rb") 1330 2667 f.close() 1331 2668 1332 f = io.open(test_support.TESTFN, "w+")1333 self.assertEqual s(f.mode, "w+")1334 self.assertEqual s(f.buffer.mode, "rb+") # Does it really matter?1335 self.assertEqual s(f.buffer.raw.mode, "rb+")1336 1337 g = io.open(f.fileno(), "wb", closefd=False)1338 self.assertEqual s(g.mode, "wb")1339 self.assertEqual s(g.raw.mode, "wb")1340 self.assertEqual s(g.name, f.fileno())1341 self.assertEqual s(g.raw.name, f.fileno())2669 f = self.open(support.TESTFN, "w+") 2670 self.assertEqual(f.mode, "w+") 2671 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter? 2672 self.assertEqual(f.buffer.raw.mode, "rb+") 2673 2674 g = self.open(f.fileno(), "wb", closefd=False) 2675 self.assertEqual(g.mode, "wb") 2676 self.assertEqual(g.raw.mode, "wb") 2677 self.assertEqual(g.name, f.fileno()) 2678 self.assertEqual(g.raw.name, f.fileno()) 1342 2679 f.close() 1343 2680 g.close() 1344 2681 2682 def test_io_after_close(self): 2683 for kwargs in [ 2684 {"mode": "w"}, 2685 {"mode": "wb"}, 2686 {"mode": "w", "buffering": 1}, 2687 {"mode": "w", "buffering": 2}, 2688 {"mode": "wb", "buffering": 0}, 2689 {"mode": "r"}, 2690 {"mode": "rb"}, 2691 {"mode": "r", "buffering": 1}, 2692 {"mode": "r", "buffering": 2}, 2693 {"mode": "rb", "buffering": 0}, 2694 {"mode": "w+"}, 2695 {"mode": "w+b"}, 2696 {"mode": "w+", "buffering": 1}, 2697 {"mode": "w+", "buffering": 2}, 2698 {"mode": "w+b", "buffering": 0}, 2699 ]: 2700 f = self.open(support.TESTFN, **kwargs) 2701 f.close() 2702 self.assertRaises(ValueError, f.flush) 2703 self.assertRaises(ValueError, f.fileno) 2704 self.assertRaises(ValueError, f.isatty) 2705 self.assertRaises(ValueError, f.__iter__) 2706 if hasattr(f, "peek"): 2707 self.assertRaises(ValueError, f.peek, 1) 2708 self.assertRaises(ValueError, f.read) 2709 if hasattr(f, "read1"): 2710 self.assertRaises(ValueError, f.read1, 1024) 2711 if hasattr(f, "readall"): 2712 self.assertRaises(ValueError, f.readall) 2713 if hasattr(f, "readinto"): 2714 self.assertRaises(ValueError, f.readinto, bytearray(1024)) 2715 self.assertRaises(ValueError, f.readline) 2716 self.assertRaises(ValueError, f.readlines) 2717 self.assertRaises(ValueError, f.seek, 0) 2718 self.assertRaises(ValueError, f.tell) 2719 self.assertRaises(ValueError, f.truncate) 2720 self.assertRaises(ValueError, f.write, 2721 b"" if "b" in kwargs['mode'] else "") 2722 self.assertRaises(ValueError, f.writelines, []) 2723 self.assertRaises(ValueError, next, f) 2724 2725 def test_blockingioerror(self): 2726 # Various BlockingIOError issues 2727 self.assertRaises(TypeError, self.BlockingIOError) 2728 self.assertRaises(TypeError, self.BlockingIOError, 1) 2729 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4) 2730 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None) 2731 b = self.BlockingIOError(1, "") 2732 self.assertEqual(b.characters_written, 0) 2733 class C(unicode): 2734 pass 2735 c = C("") 2736 b = self.BlockingIOError(1, c) 2737 c.b = b 2738 b.c = c 2739 wr = weakref.ref(c) 2740 del c, b 2741 support.gc_collect() 2742 self.assertTrue(wr() is None, wr) 2743 2744 def test_abcs(self): 2745 # Test the visible base classes are ABCs. 2746 self.assertIsInstance(self.IOBase, abc.ABCMeta) 2747 self.assertIsInstance(self.RawIOBase, abc.ABCMeta) 2748 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta) 2749 self.assertIsInstance(self.TextIOBase, abc.ABCMeta) 2750 2751 def _check_abc_inheritance(self, abcmodule): 2752 with self.open(support.TESTFN, "wb", buffering=0) as f: 2753 self.assertIsInstance(f, abcmodule.IOBase) 2754 self.assertIsInstance(f, abcmodule.RawIOBase) 2755 self.assertNotIsInstance(f, abcmodule.BufferedIOBase) 2756 self.assertNotIsInstance(f, abcmodule.TextIOBase) 2757 with self.open(support.TESTFN, "wb") as f: 2758 self.assertIsInstance(f, abcmodule.IOBase) 2759 self.assertNotIsInstance(f, abcmodule.RawIOBase) 2760 self.assertIsInstance(f, abcmodule.BufferedIOBase) 2761 self.assertNotIsInstance(f, abcmodule.TextIOBase) 2762 with self.open(support.TESTFN, "w") as f: 2763 self.assertIsInstance(f, abcmodule.IOBase) 2764 self.assertNotIsInstance(f, abcmodule.RawIOBase) 2765 self.assertNotIsInstance(f, abcmodule.BufferedIOBase) 2766 self.assertIsInstance(f, abcmodule.TextIOBase) 2767 2768 def test_abc_inheritance(self): 2769 # Test implementations inherit from their respective ABCs 2770 self._check_abc_inheritance(self) 2771 2772 def test_abc_inheritance_official(self): 2773 # Test implementations inherit from the official ABCs of the 2774 # baseline "io" module. 2775 self._check_abc_inheritance(io) 2776 2777 @unittest.skipUnless(fcntl, 'fcntl required for this test') 2778 def test_nonblock_pipe_write_bigbuf(self): 2779 self._test_nonblock_pipe_write(16*1024) 2780 2781 @unittest.skipUnless(fcntl, 'fcntl required for this test') 2782 def test_nonblock_pipe_write_smallbuf(self): 2783 self._test_nonblock_pipe_write(1024) 2784 2785 def _set_non_blocking(self, fd): 2786 flags = fcntl.fcntl(fd, fcntl.F_GETFL) 2787 self.assertNotEqual(flags, -1) 2788 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) 2789 self.assertEqual(res, 0) 2790 2791 def _test_nonblock_pipe_write(self, bufsize): 2792 sent = [] 2793 received = [] 2794 r, w = os.pipe() 2795 self._set_non_blocking(r) 2796 self._set_non_blocking(w) 2797 2798 # To exercise all code paths in the C implementation we need 2799 # to play with buffer sizes. For instance, if we choose a 2800 # buffer size less than or equal to _PIPE_BUF (4096 on Linux) 2801 # then we will never get a partial write of the buffer. 2802 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize) 2803 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize) 2804 2805 with rf, wf: 2806 for N in 9999, 73, 7574: 2807 try: 2808 i = 0 2809 while True: 2810 msg = bytes([i % 26 + 97]) * N 2811 sent.append(msg) 2812 wf.write(msg) 2813 i += 1 2814 2815 except self.BlockingIOError as e: 2816 self.assertEqual(e.args[0], errno.EAGAIN) 2817 sent[-1] = sent[-1][:e.characters_written] 2818 received.append(rf.read()) 2819 msg = b'BLOCKED' 2820 wf.write(msg) 2821 sent.append(msg) 2822 2823 while True: 2824 try: 2825 wf.flush() 2826 break 2827 except self.BlockingIOError as e: 2828 self.assertEqual(e.args[0], errno.EAGAIN) 2829 self.assertEqual(e.characters_written, 0) 2830 received.append(rf.read()) 2831 2832 received += iter(rf.read, None) 2833 2834 sent, received = b''.join(sent), b''.join(received) 2835 self.assertTrue(sent == received) 2836 self.assertTrue(wf.closed) 2837 self.assertTrue(rf.closed) 2838 2839 class CMiscIOTest(MiscIOTest): 2840 io = io 2841 2842 class PyMiscIOTest(MiscIOTest): 2843 io = pyio 2844 2845 2846 @unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.') 2847 class SignalsTest(unittest.TestCase): 2848 2849 def setUp(self): 2850 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt) 2851 2852 def tearDown(self): 2853 signal.signal(signal.SIGALRM, self.oldalrm) 2854 2855 def alarm_interrupt(self, sig, frame): 2856 1 // 0 2857 2858 @unittest.skipUnless(threading, 'Threading required for this test.') 2859 @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'), 2860 'issue #12429: skip test on FreeBSD <= 7') 2861 def check_interrupted_write(self, item, bytes, **fdopen_kwargs): 2862 """Check that a partial write, when it gets interrupted, properly 2863 invokes the signal handler, and bubbles up the exception raised 2864 in the latter.""" 2865 read_results = [] 2866 def _read(): 2867 s = os.read(r, 1) 2868 read_results.append(s) 2869 t = threading.Thread(target=_read) 2870 t.daemon = True 2871 r, w = os.pipe() 2872 try: 2873 wio = self.io.open(w, **fdopen_kwargs) 2874 t.start() 2875 signal.alarm(1) 2876 # Fill the pipe enough that the write will be blocking. 2877 # It will be interrupted by the timer armed above. Since the 2878 # other thread has read one byte, the low-level write will 2879 # return with a successful (partial) result rather than an EINTR. 2880 # The buffered IO layer must check for pending signal 2881 # handlers, which in this case will invoke alarm_interrupt(). 2882 self.assertRaises(ZeroDivisionError, 2883 wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1)) 2884 t.join() 2885 # We got one byte, get another one and check that it isn't a 2886 # repeat of the first one. 2887 read_results.append(os.read(r, 1)) 2888 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]]) 2889 finally: 2890 os.close(w) 2891 os.close(r) 2892 # This is deliberate. If we didn't close the file descriptor 2893 # before closing wio, wio would try to flush its internal 2894 # buffer, and block again. 2895 try: 2896 wio.close() 2897 except IOError as e: 2898 if e.errno != errno.EBADF: 2899 raise 2900 2901 def test_interrupted_write_unbuffered(self): 2902 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0) 2903 2904 def test_interrupted_write_buffered(self): 2905 self.check_interrupted_write(b"xy", b"xy", mode="wb") 2906 2907 def test_interrupted_write_text(self): 2908 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii") 2909 2910 def check_reentrant_write(self, data, **fdopen_kwargs): 2911 def on_alarm(*args): 2912 # Will be called reentrantly from the same thread 2913 wio.write(data) 2914 1//0 2915 signal.signal(signal.SIGALRM, on_alarm) 2916 r, w = os.pipe() 2917 wio = self.io.open(w, **fdopen_kwargs) 2918 try: 2919 signal.alarm(1) 2920 # Either the reentrant call to wio.write() fails with RuntimeError, 2921 # or the signal handler raises ZeroDivisionError. 2922 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm: 2923 while 1: 2924 for i in range(100): 2925 wio.write(data) 2926 wio.flush() 2927 # Make sure the buffer doesn't fill up and block further writes 2928 os.read(r, len(data) * 100) 2929 exc = cm.exception 2930 if isinstance(exc, RuntimeError): 2931 self.assertTrue(str(exc).startswith("reentrant call"), str(exc)) 2932 finally: 2933 wio.close() 2934 os.close(r) 2935 2936 def test_reentrant_write_buffered(self): 2937 self.check_reentrant_write(b"xy", mode="wb") 2938 2939 def test_reentrant_write_text(self): 2940 self.check_reentrant_write("xy", mode="w", encoding="ascii") 2941 2942 def check_interrupted_read_retry(self, decode, **fdopen_kwargs): 2943 """Check that a buffered read, when it gets interrupted (either 2944 returning a partial result or EINTR), properly invokes the signal 2945 handler and retries if the latter returned successfully.""" 2946 r, w = os.pipe() 2947 fdopen_kwargs["closefd"] = False 2948 def alarm_handler(sig, frame): 2949 os.write(w, b"bar") 2950 signal.signal(signal.SIGALRM, alarm_handler) 2951 try: 2952 rio = self.io.open(r, **fdopen_kwargs) 2953 os.write(w, b"foo") 2954 signal.alarm(1) 2955 # Expected behaviour: 2956 # - first raw read() returns partial b"foo" 2957 # - second raw read() returns EINTR 2958 # - third raw read() returns b"bar" 2959 self.assertEqual(decode(rio.read(6)), "foobar") 2960 finally: 2961 rio.close() 2962 os.close(w) 2963 os.close(r) 2964 2965 def test_interrupterd_read_retry_buffered(self): 2966 self.check_interrupted_read_retry(lambda x: x.decode('latin1'), 2967 mode="rb") 2968 2969 def test_interrupterd_read_retry_text(self): 2970 self.check_interrupted_read_retry(lambda x: x, 2971 mode="r") 2972 2973 @unittest.skipUnless(threading, 'Threading required for this test.') 2974 def check_interrupted_write_retry(self, item, **fdopen_kwargs): 2975 """Check that a buffered write, when it gets interrupted (either 2976 returning a partial result or EINTR), properly invokes the signal 2977 handler and retries if the latter returned successfully.""" 2978 select = support.import_module("select") 2979 # A quantity that exceeds the buffer size of an anonymous pipe's 2980 # write end. 2981 N = support.PIPE_MAX_SIZE 2982 r, w = os.pipe() 2983 fdopen_kwargs["closefd"] = False 2984 # We need a separate thread to read from the pipe and allow the 2985 # write() to finish. This thread is started after the SIGALRM is 2986 # received (forcing a first EINTR in write()). 2987 read_results = [] 2988 write_finished = False 2989 def _read(): 2990 while not write_finished: 2991 while r in select.select([r], [], [], 1.0)[0]: 2992 s = os.read(r, 1024) 2993 read_results.append(s) 2994 t = threading.Thread(target=_read) 2995 t.daemon = True 2996 def alarm1(sig, frame): 2997 signal.signal(signal.SIGALRM, alarm2) 2998 signal.alarm(1) 2999 def alarm2(sig, frame): 3000 t.start() 3001 signal.signal(signal.SIGALRM, alarm1) 3002 try: 3003 wio = self.io.open(w, **fdopen_kwargs) 3004 signal.alarm(1) 3005 # Expected behaviour: 3006 # - first raw write() is partial (because of the limited pipe buffer 3007 # and the first alarm) 3008 # - second raw write() returns EINTR (because of the second alarm) 3009 # - subsequent write()s are successful (either partial or complete) 3010 self.assertEqual(N, wio.write(item * N)) 3011 wio.flush() 3012 write_finished = True 3013 t.join() 3014 self.assertEqual(N, sum(len(x) for x in read_results)) 3015 finally: 3016 write_finished = True 3017 os.close(w) 3018 os.close(r) 3019 # This is deliberate. If we didn't close the file descriptor 3020 # before closing wio, wio would try to flush its internal 3021 # buffer, and could block (in case of failure). 3022 try: 3023 wio.close() 3024 except IOError as e: 3025 if e.errno != errno.EBADF: 3026 raise 3027 3028 def test_interrupterd_write_retry_buffered(self): 3029 self.check_interrupted_write_retry(b"x", mode="wb") 3030 3031 def test_interrupterd_write_retry_text(self): 3032 self.check_interrupted_write_retry("x", mode="w", encoding="latin1") 3033 3034 3035 class CSignalsTest(SignalsTest): 3036 io = io 3037 3038 class PySignalsTest(SignalsTest): 3039 io = pyio 3040 3041 # Handling reentrancy issues would slow down _pyio even more, so the 3042 # tests are disabled. 3043 test_reentrant_write_buffered = None 3044 test_reentrant_write_text = None 3045 1345 3046 1346 3047 def test_main(): 1347 test_support.run_unittest(IOTest, BytesIOTest, StringIOTest, 1348 BufferedReaderTest, BufferedWriterTest, 1349 BufferedRWPairTest, BufferedRandomTest, 1350 StatefulIncrementalDecoderTest, 1351 TextIOWrapperTest, MiscIOTest) 3048 tests = (CIOTest, PyIOTest, 3049 CBufferedReaderTest, PyBufferedReaderTest, 3050 CBufferedWriterTest, PyBufferedWriterTest, 3051 CBufferedRWPairTest, PyBufferedRWPairTest, 3052 CBufferedRandomTest, PyBufferedRandomTest, 3053 StatefulIncrementalDecoderTest, 3054 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest, 3055 CTextIOWrapperTest, PyTextIOWrapperTest, 3056 CMiscIOTest, PyMiscIOTest, 3057 CSignalsTest, PySignalsTest, 3058 ) 3059 3060 # Put the namespaces of the IO module we are testing and some useful mock 3061 # classes in the __dict__ of each test. 3062 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO, 3063 MockNonBlockWriterIO, MockRawIOWithoutRead) 3064 all_members = io.__all__ + ["IncrementalNewlineDecoder"] 3065 c_io_ns = dict((name, getattr(io, name)) for name in all_members) 3066 py_io_ns = dict((name, getattr(pyio, name)) for name in all_members) 3067 globs = globals() 3068 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks) 3069 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks) 3070 # Avoid turning open into a bound method. 3071 py_io_ns["open"] = pyio.OpenWrapper 3072 for test in tests: 3073 if test.__name__.startswith("C"): 3074 for name, obj in c_io_ns.items(): 3075 setattr(test, name, obj) 3076 elif test.__name__.startswith("Py"): 3077 for name, obj in py_io_ns.items(): 3078 setattr(test, name, obj) 3079 3080 support.run_unittest(*tests) 1352 3081 1353 3082 if __name__ == "__main__": 1354 unittest.main()3083 test_main()
Note:
See TracChangeset
for help on using the changeset viewer.