source: python/trunk/Lib/test/test_io.py

Last change on this file was 391, checked in by dmik, 11 years ago

python: Merge vendor 2.7.6 to trunk.

  • Property svn:eol-style set to native
File size: 110.5 KB
Line 
1"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
22from __future__ import print_function
23from __future__ import unicode_literals
24
25import os
26import sys
27import time
28import array
29import random
30import unittest
31import weakref
32import abc
33import signal
34import errno
35from itertools import cycle, count
36from collections import deque
37from UserList import UserList
38from test import test_support as support
39import contextlib
40
41import codecs
42import io # C implementation of io
43import _pyio as pyio # Python implementation of io
44try:
45 import threading
46except ImportError:
47 threading = None
48try:
49 import fcntl
50except ImportError:
51 fcntl = None
52
53__metaclass__ = type
54bytes = support.py3k_bytes
55
def _default_chunk_size():
    """Return the default chunk size used by TextIOWrapper.

    The value is taken from the private ``_CHUNK_SIZE`` attribute of a text
    stream opened on this very file.
    """
    stream = io.open(__file__, "r", encoding="latin1")
    try:
        return stream._CHUNK_SIZE
    finally:
        stream.close()
60
61
class MockRawIOWithoutRead:
    """Raw stream mock that provides readinto() but no read().

    Leaving read() out means a RawIOBase subclass mixing this in falls back
    to the default RawIOBase.read(), which is built on readinto().
    """

    def __init__(self, read_stack=()):
        # Counters inspected by the tests.
        self._reads = 0
        self._extraneous_reads = 0
        # Chunks handed to write(), in call order.
        self._write_stack = []
        # Chunks still to be served by readinto(); a None entry is served
        # as a None return value.
        self._read_stack = list(read_stack)

    def write(self, b):
        # Keep an immutable copy of what was written and accept all of it.
        self._write_stack.append(bytes(b))
        return len(b)

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def fileno(self):
        return 42

    def seek(self, pos, whence):
        # wrong but we gotta return something
        return 0

    def tell(self):
        # just as wrong as seek(), for the same reason
        return 0

    def truncate(self, pos=None):
        return pos

    def readinto(self, buf):
        """Copy the next queued chunk (or as much as fits) into *buf*.

        Returns the number of bytes copied, None when the next queued entry
        is None, or 0 when nothing is queued any more (which is also counted
        as an extraneous read).
        """
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0
        chunk = self._read_stack[0]
        if chunk is None:
            self._read_stack.pop(0)
            return None
        size = len(chunk)
        if size > capacity:
            # Serve what fits and leave the remainder queued.
            buf[:] = chunk[:capacity]
            self._read_stack[0] = chunk[capacity:]
            return capacity
        self._read_stack.pop(0)
        buf[:size] = chunk
        return size
117
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead layered on io.RawIOBase."""


class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead layered on pyio.RawIOBase."""
123
124
class MockRawIO(MockRawIOWithoutRead):
    """Raw stream mock whose read() serves chunks straight off the read stack."""

    def read(self, n=None):
        """Pop and return the next queued chunk; *n* is ignored.

        Returns b"" once the read stack is empty, and counts that call as an
        extraneous read.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted stack means "end of stream".  Anything else
            # (KeyboardInterrupt, a programming error, ...) must propagate
            # instead of being silently reported as EOF.
            self._extraneous_reads += 1
            return b""
134
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO layered on io.RawIOBase."""


class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO layered on pyio.RawIOBase."""
140
141
class MisbehavedRawIO(MockRawIO):
    """Raw stream mock whose methods all report implausible results."""

    def write(self, b):
        # Report twice as many bytes as MockRawIO.write() says it accepted.
        accepted = MockRawIO.write(self, b)
        return accepted * 2

    def read(self, n=None):
        # Return the queued chunk repeated twice.
        chunk = MockRawIO.read(self, n)
        return chunk * 2

    def seek(self, pos, whence):
        # Always a negative position.
        return -123

    def tell(self):
        # Always a negative position.
        return -456

    def readinto(self, buf):
        # Fill the buffer normally, then claim five times its length.
        MockRawIO.readinto(self, buf)
        return 5 * len(buf)
158
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO layered on io.RawIOBase."""


class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO layered on pyio.RawIOBase."""
164
165
class CloseFailureIO(MockRawIO):
    """Raw stream mock whose first close() raises IOError.

    The stream is marked closed before the error is raised, so any later
    close() call returns quietly.
    """

    closed = 0

    def close(self):
        if self.closed:
            return
        self.closed = 1
        raise IOError
173
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO layered on io.RawIOBase."""


class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO layered on pyio.RawIOBase."""
179
180
class MockFileIO:
    """Mixin recording the result size of every read()/readinto() call.

    It delegates to the next class in the MRO through super(), so it has to
    be combined with a real stream class to be usable.
    """

    def __init__(self, data):
        self.read_history = []
        super(MockFileIO, self).__init__(data)

    def read(self, n=None):
        chunk = super(MockFileIO, self).read(n)
        # A None result is recorded as None rather than as a length.
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        nread = super(MockFileIO, self).readinto(b)
        self.read_history.append(nread)
        return nread
196
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO recording reads served by io.BytesIO."""


class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO recording reads served by pyio.BytesIO."""
202
203
class MockNonBlockWriterIO:
    """Writer mock that can pretend a write would block.

    After block_on(char), a write containing *char* is accepted only up to
    the first occurrence; a write starting with *char* cancels the blocker
    and returns None.
    """

    def __init__(self):
        self._blocker_char = None
        self._write_stack = []

    def pop_written(self):
        # Return everything written so far and forget it.
        joined = b"".join(self._write_stack)
        del self._write_stack[:]
        return joined

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def writable(self):
        return True

    def readable(self):
        return True

    def seekable(self):
        return True

    def write(self, b):
        data = bytes(b)
        blocker = self._blocker_char
        if blocker:
            where = data.find(blocker)
            if where > 0:
                # write data up to the first blocker
                self._write_stack.append(data[:where])
                return where
            if where == 0:
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        # No blocker set, or the blocker does not occur: accept everything.
        self._write_stack.append(data)
        return len(data)
247
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO layered on io.RawIOBase."""

    BlockingIOError = io.BlockingIOError


class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO layered on pyio.RawIOBase."""

    BlockingIOError = pyio.BlockingIOError
253
254
class IOTest(unittest.TestCase):
    """Tests of open(), FileIO, BytesIO and the IOBase destructors.

    The class reads self.open, self.FileIO, self.BytesIO, self.IOBase,
    self.RawIOBase, self.BufferedIOBase, self.TextIOBase and
    self.MockRawIOWithoutRead; none of them is defined here, so they have to
    be supplied from outside this class.
    """

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Run a fixed sequence of write/seek/truncate checks on *f*.

        The sequence leaves b"hello world\\n" as the stream's content.
        """
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        # truncate() must not move the stream position
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Run a fixed sequence of read/readinto/seek checks on *f*.

        *f* must contain b"hello world\\n".  The argument-less read() calls
        are only made when *buffered* is true.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large file tests.
    LARGE = 2**31

    def large_file_ops(self, f):
        """Run seek/write/truncate checks on *f* around offset LARGE."""
        # Real assertions rather than ``assert`` statements, so the checks
        # still run under ``python -O``.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(IOError, fp.read)
                self.assertRaises(IOError, fp.readline)
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(IOError, fp.write, b"blah")
            self.assertRaises(IOError, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(IOError, fp.write, "blah")
            self.assertRaises(IOError, fp.writelines, ["blah\n"])

    def test_raw_file_io(self):
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                # Report a real skip (as test_unbounded_file does) instead of
                # printing to stderr and letting the test count as a pass.
                self.skipTest(
                    "large file ops on %s require %d bytes and a long time; "
                    "use 'regrtest.py -u largefile test_io' to run it"
                    % (sys.platform, self.LARGE))
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1 // 0
            except ZeroDivisionError:
                # The file must be closed even when the body raised.
                self.assertEqual(f.closed, True)
            else:
                self.fail("1 // 0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertTrue(f.tell() > 0)

    def test_destructor(self):
        # Destroying the object must call __del__, close() and flush(), in
        # that order, and the written data must reach the file.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super(MyFileIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyFileIO, self).close()
            def flush(self):
                record.append(3)
                super(MyFileIO, self).flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        """Check that destroying a *base* subclass runs __del__, close, flush."""
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super(MyIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super(MyIO, self).close()
            def flush(self):
                record.append(self.on_flush)
                super(MyIO, self).flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array(b'i', range(10))
        n = len(a.tostring())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            # Second wrapper around the same descriptor; named ``wrapper``
            # so the ``file`` builtin is not shadowed.
            wrapper = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(wrapper.read(), "egg\n")
            wrapper.seek(0)
            wrapper.close()
            self.assertRaises(ValueError, wrapper.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            # Named ``wrapper`` so the ``file`` builtin is not shadowed.
            wrapper = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(wrapper.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Put the object in a reference cycle so only the gc can free it.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertTrue(wr() is None, wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed
        self.assertTrue(f.closed)

    def test_multi_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")

    def test_fileio_closefd(self):
        # Issue #4841
        with self.open(__file__, 'rb') as f1, \
             self.open(__file__, 'rb') as f2:
            fileio = self.FileIO(f1.fileno(), closefd=False)
            # .__init__() must not close f1
            fileio.__init__(f2.fileno(), closefd=False)
            f1.readline()
            # .close() must not close f2
            fileio.close()
            f2.readline()
610
611
class CIOTest(IOTest):
    """IOTest variant that adds a finalization regression test."""

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # A throwaway instance populates the method cache.
        MyIO()
        instance = MyIO()
        # Self-reference: the instance can only be freed by the cycle gc.
        instance.obj = instance
        ref = weakref.ref(instance)
        del MyIO, instance
        support.gc_collect()
        self.assertTrue(ref() is None, ref)
631
class PyIOTest(IOTest):
    """IOTest variant in which test_array_writes is skipped."""

    @unittest.skip(
        "len(array.array) returns number of elements rather than bytelength"
    )
    def test_array_writes(self):
        IOTest.test_array_writes(self)
636
637
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom.
    # The buffered class under test is read from self.tp; the raw mocks come
    # from self.MockRawIO and self.CloseFailureIO.

    def test_detach(self):
        underlying = self.MockRawIO()
        buffered = self.tp(underlying)
        self.assertIs(buffered.detach(), underlying)
        # A second detach() has nothing left to hand over.
        with self.assertRaises(ValueError):
            buffered.detach()

    def test_fileno(self):
        buffered = self.tp(self.MockRawIO())

        self.assertEqual(42, buffered.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        buffered = self.tp(self.MockRawIO())
        # Invalid whence
        for whence in (-1, 3):
            self.assertRaises(ValueError, buffered.seek, 0, whence)

    def test_override_destructor(self):
        base = self.tp
        calls = []

        class MyBufferedIO(base):
            def __del__(self):
                calls.append(1)
                try:
                    parent_del = super(MyBufferedIO, self).__del__
                except AttributeError:
                    pass
                else:
                    parent_del()

            def close(self):
                calls.append(2)
                super(MyBufferedIO, self).close()

            def flush(self):
                calls.append(3)
                super(MyBufferedIO, self).flush()

        buffered = MyBufferedIO(self.MockRawIO())
        can_write = buffered.writable()
        del buffered
        support.gc_collect()
        # flush() is only expected in the record for writable objects.
        expected = [1, 2, 3] if can_write else [1, 2]
        self.assertEqual(calls, expected)

    def test_context_manager(self):
        # Test usability as a context manager
        buffered = self.tp(self.MockRawIO())

        def enter_and_leave():
            with buffered:
                pass

        enter_and_leave()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, enter_and_leave)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        failing_raw = self.CloseFailureIO()

        def touch_missing_attribute():
            self.tp(failing_raw).xyzzy

        with support.captured_output("stderr") as captured:
            self.assertRaises(AttributeError, touch_missing_attribute)
        output = captured.getvalue().strip()
        if output:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(output.splitlines()), 1)
            self.assertTrue(output.startswith("Exception IOError: "), output)
            self.assertTrue(output.endswith(" ignored"), output)

    def test_repr(self):
        underlying = self.MockRawIO()
        buffered = self.tp(underlying)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        # No name on the raw object: nothing but the class name is shown.
        self.assertEqual(repr(buffered), "<%s>" % clsname)
        underlying.name = "dummy"
        self.assertEqual(repr(buffered), "<%s name=u'dummy'>" % clsname)
        underlying.name = b"dummy"
        self.assertEqual(repr(buffered), "<%s name='dummy'>" % clsname)

    def test_flush_error_on_close(self):
        def bad_flush():
            raise IOError()

        underlying = self.MockRawIO()
        underlying.flush = bad_flush
        buffered = self.tp(underlying)
        # exception not swallowed
        with self.assertRaises(IOError):
            buffered.close()
        self.assertTrue(buffered.closed)

    def test_close_error_on_close(self):
        def bad_flush():
            raise IOError('flush')

        def bad_close():
            raise IOError('close')

        underlying = self.MockRawIO()
        underlying.close = bad_close
        buffered = self.tp(underlying)
        buffered.flush = bad_flush
        with self.assertRaises(IOError) as err: # exception not swallowed
            buffered.close()
        # The error reported is the one raised by close(), not by flush().
        self.assertEqual(err.exception.args, ('close',))
        self.assertFalse(buffered.closed)

    def test_multi_close(self):
        buffered = self.tp(self.MockRawIO())
        for _ in range(3):
            buffered.close()
        self.assertRaises(ValueError, buffered.flush)

    def test_readonly_attributes(self):
        buffered = self.tp(self.MockRawIO())
        replacement = self.MockRawIO()
        with self.assertRaises((AttributeError, TypeError)):
            buffered.raw = replacement
767
768
class SizeofTest:
    # Mixin checking how sys.getsizeof() of self.tp objects depends on the
    # buffer size.

    @support.cpython_only
    def test_sizeof(self):
        small, large = 4096, 8192
        small_buffered = self.tp(self.MockRawIO(), buffer_size=small)
        # Size of the object once its buffer is discounted.
        overhead = sys.getsizeof(small_buffered) - small
        large_buffered = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(large_buffered), overhead + large)
781
782
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Tests of the buffered reader class found in self.tp.  The raw mocks
    # (self.MockRawIO, self.MockFileIO, self.MisbehavedRawIO) and self.open
    # are not defined in this class.
    read_mode = "rb"

    def test_constructor(self):
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        # Re-initialising an existing object must be accepted.
        bufio.__init__(rawio)
        for size in (1024, 16):
            bufio.__init__(rawio, buffer_size=size)
        self.assertEqual(b"abc", bufio.read())
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                bufio.__init__(rawio, buffer_size=bad_size)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_read(self):
        for arg in (None, 7):
            bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        with self.assertRaises(ValueError):
            bufio.read(-2)

    def test_read1(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        # (expected read1(100) result, raw reads issued so far)
        steps = ((b"c", 1), (b"d", 2), (b"efg", 3), (b"", 4))
        for expected, raw_reads in steps:
            self.assertEqual(expected, bufio.read1(100))
            self.assertEqual(rawio._reads, raw_reads)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
        target = bytearray(2)
        # (expected return value, expected buffer content afterwards)
        steps = ((2, b"ab"), (2, b"cd"), (2, b"ef"), (1, b"gf"), (0, b"gf"))
        for nread, content in steps:
            self.assertEqual(bufio.readinto(target), nread)
            self.assertEqual(target, content)

    def test_readlines(self):
        def fresh_reader():
            return self.tp(self.MockRawIO((b"abc\n", b"d\n", b"ef")))

        self.assertEqual(fresh_reader().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(fresh_reader().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(fresh_reader().readlines(None),
                         [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        payload = b"abcdefghi"
        total = len(payload)

        # (buffer size, sizes passed to bufio.read(), expected read history
        #  of the raw object)
        cases = [
            (100, [3, 1, 4, 8], [total, 0]),
            (100, [3, 3, 3], [total]),
            (4, [1, 2, 4, 2], [4, 4, 1]),
        ]

        for bufsize, requested_sizes, expected_history in cases:
            rawio = self.MockFileIO(payload)
            bufio = self.tp(rawio, buffer_size=bufsize)
            offset = 0
            for nbytes in requested_sizes:
                self.assertEqual(bufio.read(nbytes),
                                 payload[offset:offset + nbytes])
                offset += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, expected_history)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(bytearray(values))
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []

                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            piece = bufio.read(n)
                            if not piece:
                                break
                            # list.append() is atomic
                            results.append(piece)
                    except Exception as e:
                        errors.append(e)
                        raise

                threads = [threading.Thread(target=reader) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                everything = b''.join(results)
                # Every byte value must show up exactly N times.
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(everything.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        bufio = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            wanted = b"x" * n
            # Simple case: one raw read is enough to satisfy the request.
            rawio = self.MockRawIO([wanted])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), wanted)
            self.assertEqual(rawio._extraneous_reads, 0,
                "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), wanted)
            self.assertEqual(rawio._extraneous_reads, 0,
                "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
964
965
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    """BufferedReaderTest and SizeofTest run against io.BufferedReader."""

    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each rejected __init__() call, read() must raise ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # Opening in "w+b" mode creates TESTFN and this class has no
        # setUp/tearDown of its own, so register the removal up front.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Put the object in a reference cycle so only the gc can free it.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertTrue(wr() is None, wr)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegexp(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1011
1012
class PyBufferedReaderTest(BufferedReaderTest):
    """BufferedReaderTest run against pyio.BufferedReader."""

    tp = pyio.BufferedReader
1015
1016
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Test cases shared by the buffered writer implementations.  Concrete
    # subclasses bind ``tp`` to the type under test.

    # Mode used whenever a test opens a real file for writing.
    write_mode = "wb"

    def test_constructor(self):
        # __init__ may be called again on a live object; invalid buffer
        # sizes must be rejected with ValueError.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() must push buffered data down to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        # Write 16 bytes in 3-byte slices through an 8-byte buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # ``intermediate_func`` is called with the buffered object after
        # every single write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            # Never write past the end of ``contents``.
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents,
            b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seek around the current position between writes, first with
        # absolute and then with relative offsets, always ending up where
        # the write left off.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Overwrite the beginning of the stream after seeking back, then
        # append at the old end.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        l = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # Same as test_writelines, but with a UserList instead of a list.
        l = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        # Non-bytes items and a non-iterable argument both raise TypeError.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)

    def test_destructor(self):
        # Destroying the buffered object must write out pending data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The test creates TESTFN on disk; make sure it is removed again
        # even when one of the assertions fails.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            # Pre-slice the payload into chunks of alternating size 1 and 19.
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    # Worker: drain the shared queue into bufio, recording
                    # any exception for the main thread to report.
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Chunk order is not deterministic, so only the per-byte-value
            # counts are checked.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument must emit the deprecation
        # warning for max_buffer_size.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        raw = self.MockRawIO()
        def bad_write(b):
            raise IOError()
        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(IOError, b.close) # exception not swallowed
        self.assertTrue(b.closed)
1265
1266
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the BufferedWriterTest cases against the C implementation and adds
    # checks that are specific to it.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After every failed re-initialization the object must refuse to
        # write, again with ValueError.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        #
        # The FileIO below creates TESTFN on disk; register its removal up
        # front so the file never outlives the test.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Reference cycle: dropping the local name alone cannot free f.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1309
1310
# Runs the BufferedWriterTest cases with ``tp`` bound to the pure-Python
# (_pyio) BufferedWriter implementation.
class PyBufferedWriterTest(BufferedWriterTest):
    tp = pyio.BufferedWriter
1313
class BufferedRWPairTest(unittest.TestCase):
    # Test cases for BufferedRWPair.  Concrete subclasses bind ``tp`` to the
    # implementation under test.

    def test_constructor(self):
        rw = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw.closed)

    def test_detach(self):
        # detach() is not supported on a pair.
        rw = self.tp(self.MockRawIO(), self.MockRawIO())
        with self.assertRaises(self.UnsupportedOperation):
            rw.detach()

    def test_constructor_max_buffer_size_deprecation(self):
        expected_warning = ("max_buffer_size is deprecated",
                            DeprecationWarning)
        with support.check_warnings(expected_warning):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        # A reader that reports itself as not readable is rejected.
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        with self.assertRaises(IOError):
            self.tp(NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        # A writer that reports itself as not writable is rejected.
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        with self.assertRaises(IOError):
            self.tp(self.MockRawIO(), NotWriteable())

    def test_read(self):
        rw = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(rw.read(3), b"abc")
        self.assertEqual(rw.read(1), b"d")
        self.assertEqual(rw.read(), b"ef")

        # read(None) behaves like read() without an argument.
        rw = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(rw.read(None), b"abc")

    def test_readlines(self):
        def fresh_pair():
            # Every check starts from an unread stream.
            return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())

        all_lines = [b"abc\n", b"def\n", b"h"]
        self.assertEqual(fresh_pair().readlines(), all_lines)
        self.assertEqual(fresh_pair().readlines(), all_lines)
        self.assertEqual(fresh_pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        rw = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(rw.read1(3), b"abc")

    def test_readinto(self):
        rw = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        target = bytearray(5)
        self.assertEqual(rw.readinto(target), 5)
        self.assertEqual(target, b"abcde")

    def test_write(self):
        sink = self.MockRawIO()
        rw = self.tp(self.MockRawIO(), sink)

        # Each write is flushed separately, so the writer side records two
        # separate chunks.
        for chunk in (b"abc", b"def"):
            rw.write(chunk)
            rw.flush()
        self.assertEqual(sink._write_stack, [b"abc", b"def"])

    def test_peek(self):
        rw = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # peek() must not consume what it returns.
        peeked = rw.peek(3)
        self.assertTrue(peeked.startswith(b"abc"))
        self.assertEqual(rw.read(3), b"abc")

    def test_readable(self):
        rw = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rw.readable())

    def test_writeable(self):
        rw = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rw.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        rw = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        rw = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw.closed)
        rw.close()
        self.assertTrue(rw.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            # Raw stream whose isatty() answer is chosen at construction.
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # (reader is a tty, writer is a tty, assertion on pair.isatty()):
        # the pair is a tty as soon as either side is.
        scenarios = [
            (False, False, self.assertFalse),
            (True, False, self.assertTrue),
            (False, True, self.assertTrue),
            (True, True, self.assertTrue),
        ]
        for reader_tty, writer_tty, check in scenarios:
            rw = self.tp(SelectableIsAtty(reader_tty),
                         SelectableIsAtty(writer_tty))
            check(rw.isatty())
1431
# Runs the BufferedRWPairTest cases with ``tp`` bound to the C (io)
# BufferedRWPair implementation.
class CBufferedRWPairTest(BufferedRWPairTest):
    tp = io.BufferedRWPair
1434
# Runs the BufferedRWPairTest cases with ``tp`` bound to the pure-Python
# (_pyio) BufferedRWPair implementation.
class PyBufferedRWPairTest(BufferedRWPairTest):
    tp = pyio.BufferedRWPair
1437
1438
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Test cases for buffered objects that are both readable and writable.
    # Inherits the reader and the writer cases and adds tests that interleave
    # reads, writes and seeks.  Concrete subclasses bind ``tp``.

    # Modes used when the inherited tests open a real file.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Run the constructor checks of both parent test classes.
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrite bytes 4..7, then re-read the whole stream from the start.
        rw.write(b"123f")
        rw.seek(0, 0)
        self.assertEqual(b"asdf123fl", rw.read())
        self.assertEqual(9, rw.tell())
        # Seek relative to the end (whence=2) and to the current
        # position (whence=1).
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        rw.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        # A float offset is rejected.
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Shared body for the test_flush_and_* cases.  ``read_func`` is
        # called as read_func(bufio) or read_func(bufio, n) and must return
        # the bytes it consumed.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Modify the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # Without an explicit size, read into a buffer of 9999 bytes.
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            # peek() does not advance the position; do it explicitly so the
            # helper acts like a read.
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        # Run the threading tests of both parent test classes.
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        # First variant: peek at the current position after every write.
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        # Second variant: step back one byte, peek, then restore the
        # position.
        def _peek(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        # After every write, step back one byte and read it again.
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        # Same as test_writes_and_reads, using read1().
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        # Same as test_writes_and_reads, using readinto().
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        b = b"\x80\x81\x82\x83\x84"
        for i in range(0, len(b)):
            for j in range(i, len(b)):
                raw = self.BytesIO(b)
                bufio = self.tp(raw, 100)
                mutate(bufio, i, j)
                bufio.flush()
                # Build the expected content in the same order as the
                # writes, so that for i == j the later write (1) wins.
                expected = bytearray(b)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        # Run the misbehaved-IO tests of both parent test classes.
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as raw:
            with self.tp(raw, 100) as f:
                # Alternate one-byte writes with every flavour of read.
                f.write(b"1")
                self.assertEqual(f.read(1), b'b')
                f.write(b'2')
                self.assertEqual(f.read1(1), b'd')
                f.write(b'3')
                buf = bytearray(1)
                f.readinto(buf)
                self.assertEqual(buf, b'f')
                f.write(b'4')
                self.assertEqual(f.peek(1), b'h')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw:
            with self.tp(raw, 100) as f:
                self.assertEqual(f.read(1), b'a')
                f.write(b"2")
                self.assertEqual(f.read(1), b'c')
                f.flush()
                self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        # Alternate one-byte writes with readline() calls.
        with self.BytesIO(b'ab\ncdef\ng\n') as raw:
            with self.tp(raw) as f:
                f.write(b'1')
                self.assertEqual(f.readline(), b'b\n')
                f.write(b'2')
                self.assertEqual(f.readline(), b'def\n')
                f.write(b'3')
                self.assertEqual(f.readline(), b'\n')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1659
1660
class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
                          BufferedRandomTest, SizeofTest):
    # Runs the reader, writer and random-access test cases against the C
    # BufferedRandom implementation.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            buffered.__init__(raw, sys.maxsize)

    def test_garbage_collection(self):
        # Run the garbage collection checks of both C parent test classes.
        for parent in (CBufferedReaderTest, CBufferedWriterTest):
            parent.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        self.assertRaisesRegexp(TypeError, "BufferedRandom",
                                self.tp, io.BytesIO(), 1024, 1024, 1024)
1683
1684
# Runs the BufferedRandomTest cases with ``tp`` bound to the pure-Python
# (_pyio) BufferedRandom implementation.
class PyBufferedRandomTest(BufferedRandomTest):
    tp = pyio.BufferedRandom
1687
1688
1689# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1690# properties:
1691# - A single output character can correspond to many bytes of input.
1692# - The number of input bytes to complete the character can be
1693# undetermined until the last input byte is received.
1694# - The number of input bytes can vary depending on previous input.
1695# - A single input byte can correspond to many characters of output.
1696# - The number of output characters can be undetermined until the
1697# last input byte is received.
1698# - The number of output characters can vary depending on previous input.
1699
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and no buffered input."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered input as bytes, integer encoding I and O)."""
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously returned by getstate()."""
        buffer, io = state
        self.buffer = bytearray(buffer)
        # Undo the packing done in getstate().
        i, o = divmod(io, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed *input* to the decoder and return the decoded text.

        *input* may be any iterable of byte values: either integers (as
        produced by iterating a bytearray) or 1-character strings (as
        produced by iterating a Python 2 str).  If *final* is true, a
        pending incomplete word is terminated and emitted as well.
        """
        output = ''
        period = ord('.')
        for b in input:
            # Work on integer byte values throughout, so that the period
            # test below does not depend on what iterating *input* yields.
            if not isinstance(b, int):
                b = ord(b)
            if self.i == 0: # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer: # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        'i' and 'o' words only update the input/output length and produce
        no output.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # The codec lookup function below only answers while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function for the 'test_decoder' codec.

        Returns a CodecInfo that uses this class as incremental decoder,
        or None when the codec is disabled or another name is requested.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
1784
# Register the previous decoder for testing.
# Disabled by default, tests will enable it.
# The lookup function only answers for the name 'test_decoder', and only
# while StatefulIncrementalDecoder.codecEnabled is true.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1788
1789
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag passed to decode(), expected
    # decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for raw_input, is_final, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            decoded = decoder.decode(raw_input, is_final)
            self.assertEqual(decoded, expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        pending = decoder.decode(b'oiabcd')
        self.assertEqual(pending, '')
        flushed = decoder.decode(b'', 1)
        self.assertEqual(flushed, 'abcd.')
1832
1833class TextIOWrapperTest(unittest.TestCase):
1834
1835 def setUp(self):
1836 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1837 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
1838 support.unlink(support.TESTFN)
1839
    def tearDown(self):
        # Remove TESTFN again in case the test created it.
        support.unlink(support.TESTFN)
1842
1843 def test_constructor(self):
1844 r = self.BytesIO(b"\xc3\xa9\n\n")
1845 b = self.BufferedReader(r, 1000)
1846 t = self.TextIOWrapper(b)
1847 t.__init__(b, encoding="latin1", newline="\r\n")
1848 self.assertEqual(t.encoding, "latin1")
1849 self.assertEqual(t.line_buffering, False)
1850 t.__init__(b, encoding="utf8", line_buffering=True)
1851 self.assertEqual(t.encoding, "utf8")
1852 self.assertEqual(t.line_buffering, True)
1853 self.assertEqual("\xe9\n", t.readline())
1854 self.assertRaises(TypeError, t.__init__, b, newline=42)
1855 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1856
1857 def test_detach(self):
1858 r = self.BytesIO()
1859 b = self.BufferedWriter(r)
1860 t = self.TextIOWrapper(b)
1861 self.assertIs(t.detach(), b)
1862
1863 t = self.TextIOWrapper(b, encoding="ascii")
1864 t.write("howdy")
1865 self.assertFalse(r.getvalue())
1866 t.detach()
1867 self.assertEqual(r.getvalue(), b"howdy")
1868 self.assertRaises(ValueError, t.detach)
1869
1870 def test_repr(self):
1871 raw = self.BytesIO("hello".encode("utf-8"))
1872 b = self.BufferedReader(raw)
1873 t = self.TextIOWrapper(b, encoding="utf-8")
1874 modname = self.TextIOWrapper.__module__
1875 self.assertEqual(repr(t),
1876 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1877 raw.name = "dummy"
1878 self.assertEqual(repr(t),
1879 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1880 raw.name = b"dummy"
1881 self.assertEqual(repr(t),
1882 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1883
1884 def test_line_buffering(self):
1885 r = self.BytesIO()
1886 b = self.BufferedWriter(r, 1000)
1887 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1888 t.write("X")
1889 self.assertEqual(r.getvalue(), b"") # No flush happened
1890 t.write("Y\nZ")
1891 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
1892 t.write("A\rB")
1893 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
1894
1895 def test_encoding(self):
1896 # Check the encoding attribute is always set, and valid
1897 b = self.BytesIO()
1898 t = self.TextIOWrapper(b, encoding="utf8")
1899 self.assertEqual(t.encoding, "utf8")
1900 t = self.TextIOWrapper(b)
1901 self.assertTrue(t.encoding is not None)
1902 codecs.lookup(t.encoding)
1903
1904 def test_encoding_errors_reading(self):
1905 # (1) default
1906 b = self.BytesIO(b"abc\n\xff\n")
1907 t = self.TextIOWrapper(b, encoding="ascii")
1908 self.assertRaises(UnicodeError, t.read)
1909 # (2) explicit strict
1910 b = self.BytesIO(b"abc\n\xff\n")
1911 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1912 self.assertRaises(UnicodeError, t.read)
1913 # (3) ignore
1914 b = self.BytesIO(b"abc\n\xff\n")
1915 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
1916 self.assertEqual(t.read(), "abc\n\n")
1917 # (4) replace
1918 b = self.BytesIO(b"abc\n\xff\n")
1919 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
1920 self.assertEqual(t.read(), "abc\n\ufffd\n")
1921
1922 def test_encoding_errors_writing(self):
1923 # (1) default
1924 b = self.BytesIO()
1925 t = self.TextIOWrapper(b, encoding="ascii")
1926 self.assertRaises(UnicodeError, t.write, "\xff")
1927 # (2) explicit strict
1928 b = self.BytesIO()
1929 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1930 self.assertRaises(UnicodeError, t.write, "\xff")
1931 # (3) ignore
1932 b = self.BytesIO()
1933 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
1934 newline="\n")
1935 t.write("abc\xffdef\n")
1936 t.flush()
1937 self.assertEqual(b.getvalue(), b"abcdef\n")
1938 # (4) replace
1939 b = self.BytesIO()
1940 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
1941 newline="\n")
1942 t.write("abc\xffdef\n")
1943 t.flush()
1944 self.assertEqual(b.getvalue(), b"abc?def\n")
1945
1946 def test_newlines(self):
1947 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1948
1949 tests = [
1950 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1951 [ '', input_lines ],
1952 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1953 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1954 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1955 ]
1956 encodings = (
1957 'utf-8', 'latin-1',
1958 'utf-16', 'utf-16-le', 'utf-16-be',
1959 'utf-32', 'utf-32-le', 'utf-32-be',
1960 )
1961
1962 # Try a range of buffer sizes to test the case where \r is the last
1963 # character in TextIOWrapper._pending_line.
1964 for encoding in encodings:
1965 # XXX: str.encode() should return bytes
1966 data = bytes(''.join(input_lines).encode(encoding))
1967 for do_reads in (False, True):
1968 for bufsize in range(1, 10):
1969 for newline, exp_lines in tests:
1970 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1971 textio = self.TextIOWrapper(bufio, newline=newline,
1972 encoding=encoding)
1973 if do_reads:
1974 got_lines = []
1975 while True:
1976 c2 = textio.read(2)
1977 if c2 == '':
1978 break
1979 self.assertEqual(len(c2), 2)
1980 got_lines.append(c2 + textio.readline())
1981 else:
1982 got_lines = list(textio)
1983
1984 for got_line, exp_line in zip(got_lines, exp_lines):
1985 self.assertEqual(got_line, exp_line)
1986 self.assertEqual(len(got_lines), len(exp_lines))
1987
1988 def test_newlines_input(self):
1989 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
1990 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1991 for newline, expected in [
1992 (None, normalized.decode("ascii").splitlines(True)),
1993 ("", testdata.decode("ascii").splitlines(True)),
1994 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1995 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1996 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
1997 ]:
1998 buf = self.BytesIO(testdata)
1999 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2000 self.assertEqual(txt.readlines(), expected)
2001 txt.seek(0)
2002 self.assertEqual(txt.read(), "".join(expected))
2003
2004 def test_newlines_output(self):
2005 testdict = {
2006 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2007 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2008 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2009 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2010 }
2011 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2012 for newline, expected in tests:
2013 buf = self.BytesIO()
2014 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2015 txt.write("AAA\nB")
2016 txt.write("BB\nCCC\n")
2017 txt.write("X\rY\r\nZ")
2018 txt.flush()
2019 self.assertEqual(buf.closed, False)
2020 self.assertEqual(buf.getvalue(), expected)
2021
def test_destructor(self):
    # Dropping the last reference to a TextIOWrapper must close the
    # underlying buffer after the pending text has reached it.
    seen_at_close = []
    parent = self.BytesIO

    class MyBytesIO(parent):
        def close(self):
            # Record what the buffer holds at the moment it is closed.
            seen_at_close.append(self.getvalue())
            parent.close(self)

    raw = MyBytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
2035
def test_override_destructor(self):
    # A subclass overriding __del__, close() and flush() must see them
    # invoked in exactly that order when the object is collected.
    calls = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # The base class may not define __del__ at all.
            try:
                inherited = super(MyTextIO, self).__del__
            except AttributeError:
                pass
            else:
                inherited()

        def close(self):
            calls.append(2)
            super(MyTextIO, self).close()

        def flush(self):
            calls.append(3)
            super(MyTextIO, self).flush()

    raw = self.BytesIO()
    wrapper = MyTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2058
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    failing_raw = self.CloseFailureIO()

    def touch_missing_attribute():
        self.TextIOWrapper(failing_raw).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    message = captured.getvalue().strip()
    if not message:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(message.splitlines()), 1)
    self.assertTrue(message.startswith("Exception IOError: "), message)
    self.assertTrue(message.endswith(" ignored"), message)
2073
2074 # Systematic tests of the text I/O API
2075
def test_basic_io(self):
    # Write, reopen, read, seek and append through a text file for a
    # spread of decoder chunk sizes around the powers of two.
    chunk_sizes = (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65)
    for chunk_size in chunk_sizes:
        for codec in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
            writer = self.open(support.TESTFN, "w+", encoding=codec)
            writer._CHUNK_SIZE = chunk_size
            self.assertEqual(writer.write("abc"), 3)
            writer.close()

            stream = self.open(support.TESTFN, "r+", encoding=codec)
            stream._CHUNK_SIZE = chunk_size
            self.assertEqual(stream.tell(), 0)
            self.assertEqual(stream.read(), "abc")
            end_cookie = stream.tell()
            self.assertEqual(stream.seek(0), 0)
            self.assertEqual(stream.read(None), "abc")
            stream.seek(0)
            self.assertEqual(stream.read(2), "ab")
            self.assertEqual(stream.read(1), "c")
            # Reads at end of file keep returning the empty string.
            self.assertEqual(stream.read(1), "")
            self.assertEqual(stream.read(), "")
            self.assertEqual(stream.tell(), end_cookie)
            self.assertEqual(stream.seek(0), 0)
            self.assertEqual(stream.seek(0, 2), end_cookie)
            self.assertEqual(stream.write("def"), 3)
            self.assertEqual(stream.seek(end_cookie), end_cookie)
            self.assertEqual(stream.read(), "def")
            if codec.startswith("utf"):
                self.multi_line_test(stream, codec)
            stream.close()
2104
def multi_line_test(self, f, enc):
    # Helper: rewrite `f` with lines of assorted lengths, recording the
    # tell() cookie before each one, then read them back and check that
    # both the positions and the contents match.  `enc` is not used.
    f.seek(0)
    f.truncate()
    alphabet = "s\xff\u0fff\uffff"
    written = []
    for length in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        body = "".join(alphabet[k % len(alphabet)] for k in range(length))
        text = body + "\n"
        written.append((f.tell(), text))
        f.write(text)

    f.seek(0)
    read_back = []
    while True:
        cookie = f.tell()
        text = f.readline()
        if not text:
            break
        read_back.append((cookie, text))
    self.assertEqual(read_back, written)
2126
def test_telling(self):
    # tell() must report stable cookies between readline() calls, and
    # must refuse to work while the file is being iterated over.
    stream = self.open(support.TESTFN, "w+", encoding="utf8")
    line = "\xff\n"
    start = stream.tell()
    stream.write(line)
    after_first = stream.tell()
    stream.write(line)
    after_second = stream.tell()

    stream.seek(0)
    self.assertEqual(stream.tell(), start)
    self.assertEqual(stream.readline(), "\xff\n")
    self.assertEqual(stream.tell(), after_first)
    self.assertEqual(stream.readline(), "\xff\n")
    self.assertEqual(stream.tell(), after_second)

    stream.seek(0)
    for text in stream:
        self.assertEqual(text, "\xff\n")
        self.assertRaises(IOError, stream.tell)
    # Once iteration is exhausted, tell() works again.
    self.assertEqual(stream.tell(), after_second)
    stream.close()
2146
def test_seeking(self):
    # Read a prefix that stops two characters short of the default
    # chunk size, so that the multi-byte character which follows
    # straddles the chunk boundary, then check tell() and readline().
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = bytes(u_prefix.encode("utf-8"))
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = bytes(u_suffix.encode("utf-8"))
    line = prefix + suffix
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    # Use a context manager so the handle is released even when an
    # assertion fails; the reader used to be left open.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        s = f.read(prefix_size)
        self.assertEqual(s, prefix.decode("ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
2164
def test_seeking_too(self):
    # Regression test for a specific bug
    data = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as f:
        f.write(data)
    # Use a context manager so the handle is always released; the
    # reader used to be left open.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()
2176
def test_seek_and_tell(self):
    #Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def check_stream(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        out = self.open(support.TESTFN, 'wb')
        out.write(data)
        out.close()
        src = self.open(support.TESTFN, encoding='test_decoder')
        src._CHUNK_SIZE = CHUNK_SIZE
        decoded = src.read()
        src.close()

        total = len(decoded)
        for start in range(min_pos, total + 1): # seek positions
            for length in [1, 5, total - start]: # read lengths
                src = self.open(support.TESTFN, encoding='test_decoder')
                self.assertEqual(src.read(start), decoded[:start])
                cookie = src.tell()
                self.assertEqual(src.read(length),
                                 decoded[start:start + length])
                src.seek(cookie)
                self.assertEqual(src.read(), decoded[start:])
                src.close()

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for encoded, _, _ in StatefulIncrementalDecoderTest.test_cases:
            check_stream(encoded)

        # Position each test case so that it crosses a chunk boundary.
        for encoded, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(encoded)//2
            padding = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            check_stream(padding + encoded, offset*2)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
2223
2224 def test_encoded_writes(self):
2225 data = "1234567890"
2226 tests = ("utf-16",
2227 "utf-16-le",
2228 "utf-16-be",
2229 "utf-32",
2230 "utf-32-le",
2231 "utf-32-be")
2232 for encoding in tests:
2233 buf = self.BytesIO()
2234 f = self.TextIOWrapper(buf, encoding=encoding)
2235 # Check if the BOM is written only once (see issue1753).
2236 f.write(data)
2237 f.write(data)
2238 f.seek(0)
2239 self.assertEqual(f.read(), data * 2)
2240 f.seek(0)
2241 self.assertEqual(f.read(), data * 2)
2242 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
2243
2244 def test_unreadable(self):
2245 class UnReadable(self.BytesIO):
2246 def readable(self):
2247 return False
2248 txt = self.TextIOWrapper(UnReadable())
2249 self.assertRaises(IOError, txt.read)
2250
2251 def test_read_one_by_one(self):
2252 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
2253 reads = ""
2254 while True:
2255 c = txt.read(1)
2256 if not c:
2257 break
2258 reads += c
2259 self.assertEqual(reads, "AA\nBB")
2260
2261 def test_readlines(self):
2262 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2263 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2264 txt.seek(0)
2265 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2266 txt.seek(0)
2267 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2268
2269 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
2270 def test_read_by_chunk(self):
2271 # make sure "\r\n" straddles 128 char boundary.
2272 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
2273 reads = ""
2274 while True:
2275 c = txt.read(128)
2276 if not c:
2277 break
2278 reads += c
2279 self.assertEqual(reads, "A"*127+"\nB")
2280
2281 def test_writelines(self):
2282 l = ['ab', 'cd', 'ef']
2283 buf = self.BytesIO()
2284 txt = self.TextIOWrapper(buf)
2285 txt.writelines(l)
2286 txt.flush()
2287 self.assertEqual(buf.getvalue(), b'abcdef')
2288
def test_writelines_userlist(self):
    # writelines() must accept a list-like object that is not a list.
    sink = self.BytesIO()
    wrapper = self.TextIOWrapper(sink)
    wrapper.writelines(UserList(['ab', 'cd', 'ef']))
    wrapper.flush()
    self.assertEqual(sink.getvalue(), b'abcdef')
2296
2297 def test_writelines_error(self):
2298 txt = self.TextIOWrapper(self.BytesIO())
2299 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2300 self.assertRaises(TypeError, txt.writelines, None)
2301 self.assertRaises(TypeError, txt.writelines, b'abc')
2302
2303 def test_issue1395_1(self):
2304 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2305
2306 # read one char at a time
2307 reads = ""
2308 while True:
2309 c = txt.read(1)
2310 if not c:
2311 break
2312 reads += c
2313 self.assertEqual(reads, self.normalized)
2314
2315 def test_issue1395_2(self):
2316 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2317 txt._CHUNK_SIZE = 4
2318
2319 reads = ""
2320 while True:
2321 c = txt.read(4)
2322 if not c:
2323 break
2324 reads += c
2325 self.assertEqual(reads, self.normalized)
2326
2327 def test_issue1395_3(self):
2328 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2329 txt._CHUNK_SIZE = 4
2330
2331 reads = txt.read(4)
2332 reads += txt.read(4)
2333 reads += txt.readline()
2334 reads += txt.readline()
2335 reads += txt.readline()
2336 self.assertEqual(reads, self.normalized)
2337
2338 def test_issue1395_4(self):
2339 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2340 txt._CHUNK_SIZE = 4
2341
2342 reads = txt.read(4)
2343 reads += txt.read()
2344 self.assertEqual(reads, self.normalized)
2345
2346 def test_issue1395_5(self):
2347 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2348 txt._CHUNK_SIZE = 4
2349
2350 reads = txt.read(4)
2351 pos = txt.tell()
2352 txt.seek(0)
2353 txt.seek(pos)
2354 self.assertEqual(txt.read(4), "BBB\n")
2355
2356 def test_issue2282(self):
2357 buffer = self.BytesIO(self.testdata)
2358 txt = self.TextIOWrapper(buffer, encoding="ascii")
2359
2360 self.assertEqual(buffer.seekable(), txt.seekable())
2361
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN
    for codec in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=codec) as out:
            out.write('aaa')
            # The cookie is not used here, but tell() is still called.
            pos = out.tell()
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'aaa'.encode(codec))

        with self.open(path, 'a', encoding=codec) as out:
            out.write('xxx')
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'aaaxxx'.encode(codec))
2376
def test_seek_bom(self):
    # Same test, but when seeking manually
    path = support.TESTFN
    for codec in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=codec) as out:
            out.write('aaa')
            end_cookie = out.tell()
        with self.open(path, 'r+', encoding=codec) as stream:
            # Append at the old end first, then overwrite from the start.
            stream.seek(end_cookie)
            stream.write('zzz')
            stream.seek(0)
            stream.write('bbb')
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'bbbzzz'.encode(codec))
2391
def test_errors_property(self):
    # `errors` defaults to "strict" and echoes an explicit setting.
    path = support.TESTFN
    with self.open(path, "w") as default_mode:
        self.assertEqual(default_mode.errors, "strict")
    with self.open(path, "w", errors="replace") as replacing:
        self.assertEqual(replacing.errors, "replace")
2397
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    go = threading.Event()
    worker_ids = range(20)
    with self.open(support.TESTFN, "w", buffering=1) as out:
        def run(n):
            text = "Thread%03d\n" % n
            # Hold every worker back until all of them are started.
            go.wait()
            out.write(text)

        workers = [threading.Thread(target=run, args=(ident,))
                   for ident in worker_ids]
        for worker in workers:
            worker.start()
        time.sleep(0.02)
        go.set()
        for worker in workers:
            worker.join()
    with self.open(support.TESTFN) as src:
        content = src.read()
        # Each worker's line must appear exactly once.
        for ident in range(20):
            self.assertEqual(content.count("Thread%03d\n" % ident), 1)
2419
2420 def test_flush_error_on_close(self):
2421 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2422 def bad_flush():
2423 raise IOError()
2424 txt.flush = bad_flush
2425 self.assertRaises(IOError, txt.close) # exception not swallowed
2426 self.assertTrue(txt.closed)
2427
2428 def test_multi_close(self):
2429 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2430 txt.close()
2431 txt.close()
2432 txt.close()
2433 self.assertRaises(ValueError, txt.flush)
2434
2435 def test_readonly_attributes(self):
2436 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2437 buf = self.BytesIO(self.testdata)
2438 with self.assertRaises((AttributeError, TypeError)):
2439 txt.buffer = buf
2440
2441 def test_read_nonbytes(self):
2442 # Issue #17106
2443 # Crash when underlying read() returns non-bytes
2444 class NonbytesStream(self.StringIO):
2445 read1 = self.StringIO.read
2446 class NonbytesStream(self.StringIO):
2447 read1 = self.StringIO.read
2448 t = self.TextIOWrapper(NonbytesStream('a'))
2449 with self.maybeRaises(TypeError):
2450 t.read(1)
2451 t = self.TextIOWrapper(NonbytesStream('a'))
2452 with self.maybeRaises(TypeError):
2453 t.readline()
2454 t = self.TextIOWrapper(NonbytesStream('a'))
2455 self.assertEqual(t.read(), u'a')
2456
2457 def test_illegal_decoder(self):
2458 # Issue #17106
2459 # Crash when decoder returns non-string
2460 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2461 encoding='quopri_codec')
2462 with self.maybeRaises(TypeError):
2463 t.read(1)
2464 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2465 encoding='quopri_codec')
2466 with self.maybeRaises(TypeError):
2467 t.readline()
2468 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2469 encoding='quopri_codec')
2470 with self.maybeRaises(TypeError):
2471 t.read()
2472
2473
class CTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapper tests that apply to the C implementation only."""

    def test_initialization(self):
        # A failed re-initialisation leaves the wrapper unusable: read()
        # raises ValueError afterwards.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        for expected_error, bad_newline in ((TypeError, 42),
                                            (ValueError, 'xyzzy')):
            self.assertRaises(expected_error, wrapper.__init__, buffered,
                              newline=bad_newline)
            self.assertRaises(ValueError, wrapper.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        raw = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(raw)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Make the wrapper part of a reference cycle.
        wrapper.x = wrapper
        ref = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertTrue(ref() is None, ref)
        with self.open(support.TESTFN, "rb") as src:
            self.assertEqual(src.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()

    # Checks wrapped in maybeRaises() are enforced for this implementation.
    maybeRaises = unittest.TestCase.assertRaises
2516
2517
class PyTextIOWrapperTest(TextIOWrapperTest):
    """Runs the shared TextIOWrapper tests with a no-op maybeRaises()."""

    @contextlib.contextmanager
    def maybeRaises(self, *args, **kwds):
        # Accept and ignore the expected exception types: the guarded
        # block simply runs, and nothing is asserted about it.
        yield
2522
2523
2524class IncrementalNewlineDecoderTest(unittest.TestCase):
2525
2526 def check_newline_decoding_utf8(self, decoder):
2527 # UTF-8 specific tests for a newline decoder
2528 def _check_decode(b, s, **kwargs):
2529 # We exercise getstate() / setstate() as well as decode()
2530 state = decoder.getstate()
2531 self.assertEqual(decoder.decode(b, **kwargs), s)
2532 decoder.setstate(state)
2533 self.assertEqual(decoder.decode(b, **kwargs), s)
2534
2535 _check_decode(b'\xe8\xa2\x88', "\u8888")
2536
2537 _check_decode(b'\xe8', "")
2538 _check_decode(b'\xa2', "")
2539 _check_decode(b'\x88', "\u8888")
2540
2541 _check_decode(b'\xe8', "")
2542 _check_decode(b'\xa2', "")
2543 _check_decode(b'\x88', "\u8888")
2544
2545 _check_decode(b'\xe8', "")
2546 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2547
2548 decoder.reset()
2549 _check_decode(b'\n', "\n")
2550 _check_decode(b'\r', "")
2551 _check_decode(b'', "\n", final=True)
2552 _check_decode(b'\r', "\n", final=True)
2553
2554 _check_decode(b'\r', "")
2555 _check_decode(b'a', "\na")
2556
2557 _check_decode(b'\r\r\n', "\n\n")
2558 _check_decode(b'\r', "")
2559 _check_decode(b'\r', "\n")
2560 _check_decode(b'\na', "\na")
2561
2562 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2563 _check_decode(b'\xe8\xa2\x88', "\u8888")
2564 _check_decode(b'\n', "\n")
2565 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2566 _check_decode(b'\n', "\n")
2567
2568 def check_newline_decoding(self, decoder, encoding):
2569 result = []
2570 if encoding is not None:
2571 encoder = codecs.getincrementalencoder(encoding)()
2572 def _decode_bytewise(s):
2573 # Decode one byte at a time
2574 for b in encoder.encode(s):
2575 result.append(decoder.decode(b))
2576 else:
2577 encoder = None
2578 def _decode_bytewise(s):
2579 # Decode one char at a time
2580 for c in s:
2581 result.append(decoder.decode(c))
2582 self.assertEqual(decoder.newlines, None)
2583 _decode_bytewise("abc\n\r")
2584 self.assertEqual(decoder.newlines, '\n')
2585 _decode_bytewise("\nabc")
2586 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
2587 _decode_bytewise("abc\r")
2588 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
2589 _decode_bytewise("abc")
2590 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
2591 _decode_bytewise("abc\r")
2592 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
2593 decoder.reset()
2594 input = "abc"
2595 if encoder is not None:
2596 encoder.reset()
2597 input = encoder.encode(input)
2598 self.assertEqual(decoder.decode(input), "abc")
2599 self.assertEqual(decoder.newlines, None)
2600
2601 def test_newline_decoder(self):
2602 encodings = (
2603 # None meaning the IncrementalNewlineDecoder takes unicode input
2604 # rather than bytes input
2605 None, 'utf-8', 'latin-1',
2606 'utf-16', 'utf-16-le', 'utf-16-be',
2607 'utf-32', 'utf-32-le', 'utf-32-be',
2608 )
2609 for enc in encodings:
2610 decoder = enc and codecs.getincrementaldecoder(enc)()
2611 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2612 self.check_newline_decoding(decoder, enc)
2613 decoder = codecs.getincrementaldecoder("utf-8")()
2614 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2615 self.check_newline_decoding_utf8(decoder)
2616
2617 def test_newline_bytes(self):
2618 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2619 def _check(dec):
2620 self.assertEqual(dec.newlines, None)
2621 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2622 self.assertEqual(dec.newlines, None)
2623 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2624 self.assertEqual(dec.newlines, None)
2625 dec = self.IncrementalNewlineDecoder(None, translate=False)
2626 _check(dec)
2627 dec = self.IncrementalNewlineDecoder(None, translate=True)
2628 _check(dec)
2629
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Variant of the newline decoder tests; adds nothing of its own."""
2632
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Variant of the newline decoder tests; adds nothing of its own."""
2635
2636
2637# XXX Tests for open()
2638
2639class MiscIOTest(unittest.TestCase):
2640
2641 def tearDown(self):
2642 support.unlink(support.TESTFN)
2643
2644 def test___all__(self):
2645 for name in self.io.__all__:
2646 obj = getattr(self.io, name, None)
2647 self.assertTrue(obj is not None, name)
2648 if name == "open":
2649 continue
2650 elif "error" in name.lower() or name == "UnsupportedOperation":
2651 self.assertTrue(issubclass(obj, Exception), name)
2652 elif not name.startswith("SEEK_"):
2653 self.assertTrue(issubclass(obj, self.IOBase))
2654
2655 def test_attributes(self):
2656 f = self.open(support.TESTFN, "wb", buffering=0)
2657 self.assertEqual(f.mode, "wb")
2658 f.close()
2659
2660 f = self.open(support.TESTFN, "U")
2661 self.assertEqual(f.name, support.TESTFN)
2662 self.assertEqual(f.buffer.name, support.TESTFN)
2663 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2664 self.assertEqual(f.mode, "U")
2665 self.assertEqual(f.buffer.mode, "rb")
2666 self.assertEqual(f.buffer.raw.mode, "rb")
2667 f.close()
2668
2669 f = self.open(support.TESTFN, "w+")
2670 self.assertEqual(f.mode, "w+")
2671 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2672 self.assertEqual(f.buffer.raw.mode, "rb+")
2673
2674 g = self.open(f.fileno(), "wb", closefd=False)
2675 self.assertEqual(g.mode, "wb")
2676 self.assertEqual(g.raw.mode, "wb")
2677 self.assertEqual(g.name, f.fileno())
2678 self.assertEqual(g.raw.name, f.fileno())
2679 f.close()
2680 g.close()
2681
2682 def test_io_after_close(self):
2683 for kwargs in [
2684 {"mode": "w"},
2685 {"mode": "wb"},
2686 {"mode": "w", "buffering": 1},
2687 {"mode": "w", "buffering": 2},
2688 {"mode": "wb", "buffering": 0},
2689 {"mode": "r"},
2690 {"mode": "rb"},
2691 {"mode": "r", "buffering": 1},
2692 {"mode": "r", "buffering": 2},
2693 {"mode": "rb", "buffering": 0},
2694 {"mode": "w+"},
2695 {"mode": "w+b"},
2696 {"mode": "w+", "buffering": 1},
2697 {"mode": "w+", "buffering": 2},
2698 {"mode": "w+b", "buffering": 0},
2699 ]:
2700 f = self.open(support.TESTFN, **kwargs)
2701 f.close()
2702 self.assertRaises(ValueError, f.flush)
2703 self.assertRaises(ValueError, f.fileno)
2704 self.assertRaises(ValueError, f.isatty)
2705 self.assertRaises(ValueError, f.__iter__)
2706 if hasattr(f, "peek"):
2707 self.assertRaises(ValueError, f.peek, 1)
2708 self.assertRaises(ValueError, f.read)
2709 if hasattr(f, "read1"):
2710 self.assertRaises(ValueError, f.read1, 1024)
2711 if hasattr(f, "readall"):
2712 self.assertRaises(ValueError, f.readall)
2713 if hasattr(f, "readinto"):
2714 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2715 self.assertRaises(ValueError, f.readline)
2716 self.assertRaises(ValueError, f.readlines)
2717 self.assertRaises(ValueError, f.seek, 0)
2718 self.assertRaises(ValueError, f.tell)
2719 self.assertRaises(ValueError, f.truncate)
2720 self.assertRaises(ValueError, f.write,
2721 b"" if "b" in kwargs['mode'] else "")
2722 self.assertRaises(ValueError, f.writelines, [])
2723 self.assertRaises(ValueError, next, f)
2724
2725 def test_blockingioerror(self):
2726 # Various BlockingIOError issues
2727 self.assertRaises(TypeError, self.BlockingIOError)
2728 self.assertRaises(TypeError, self.BlockingIOError, 1)
2729 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2730 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2731 b = self.BlockingIOError(1, "")
2732 self.assertEqual(b.characters_written, 0)
2733 class C(unicode):
2734 pass
2735 c = C("")
2736 b = self.BlockingIOError(1, c)
2737 c.b = b
2738 b.c = c
2739 wr = weakref.ref(c)
2740 del c, b
2741 support.gc_collect()
2742 self.assertTrue(wr() is None, wr)
2743
2744 def test_abcs(self):
2745 # Test the visible base classes are ABCs.
2746 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2747 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2748 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2749 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
2750
2751 def _check_abc_inheritance(self, abcmodule):
2752 with self.open(support.TESTFN, "wb", buffering=0) as f:
2753 self.assertIsInstance(f, abcmodule.IOBase)
2754 self.assertIsInstance(f, abcmodule.RawIOBase)
2755 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2756 self.assertNotIsInstance(f, abcmodule.TextIOBase)
2757 with self.open(support.TESTFN, "wb") as f:
2758 self.assertIsInstance(f, abcmodule.IOBase)
2759 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2760 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2761 self.assertNotIsInstance(f, abcmodule.TextIOBase)
2762 with self.open(support.TESTFN, "w") as f:
2763 self.assertIsInstance(f, abcmodule.IOBase)
2764 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2765 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2766 self.assertIsInstance(f, abcmodule.TextIOBase)
2767
2768 def test_abc_inheritance(self):
2769 # Test implementations inherit from their respective ABCs
2770 self._check_abc_inheritance(self)
2771
2772 def test_abc_inheritance_official(self):
2773 # Test implementations inherit from the official ABCs of the
2774 # baseline "io" module.
2775 self._check_abc_inheritance(io)
2776
2777 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2778 def test_nonblock_pipe_write_bigbuf(self):
2779 self._test_nonblock_pipe_write(16*1024)
2780
2781 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2782 def test_nonblock_pipe_write_smallbuf(self):
2783 self._test_nonblock_pipe_write(1024)
2784
2785 def _set_non_blocking(self, fd):
2786 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2787 self.assertNotEqual(flags, -1)
2788 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2789 self.assertEqual(res, 0)
2790
2791 def _test_nonblock_pipe_write(self, bufsize):
2792 sent = []
2793 received = []
2794 r, w = os.pipe()
2795 self._set_non_blocking(r)
2796 self._set_non_blocking(w)
2797
2798 # To exercise all code paths in the C implementation we need
2799 # to play with buffer sizes. For instance, if we choose a
2800 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
2801 # then we will never get a partial write of the buffer.
2802 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
2803 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
2804
2805 with rf, wf:
2806 for N in 9999, 73, 7574:
2807 try:
2808 i = 0
2809 while True:
2810 msg = bytes([i % 26 + 97]) * N
2811 sent.append(msg)
2812 wf.write(msg)
2813 i += 1
2814
2815 except self.BlockingIOError as e:
2816 self.assertEqual(e.args[0], errno.EAGAIN)
2817 sent[-1] = sent[-1][:e.characters_written]
2818 received.append(rf.read())
2819 msg = b'BLOCKED'
2820 wf.write(msg)
2821 sent.append(msg)
2822
2823 while True:
2824 try:
2825 wf.flush()
2826 break
2827 except self.BlockingIOError as e:
2828 self.assertEqual(e.args[0], errno.EAGAIN)
2829 self.assertEqual(e.characters_written, 0)
2830 received.append(rf.read())
2831
2832 received += iter(rf.read, None)
2833
2834 sent, received = b''.join(sent), b''.join(received)
2835 self.assertTrue(sent == received)
2836 self.assertTrue(wf.closed)
2837 self.assertTrue(rf.closed)
2838
class CMiscIOTest(MiscIOTest):
    """Runs the MiscIOTest checks against the ``io`` module."""

    io = io
2841
class PyMiscIOTest(MiscIOTest):
    """Runs the MiscIOTest checks against the ``_pyio`` module."""

    io = pyio
2844
2845
2846@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2847class SignalsTest(unittest.TestCase):
2848
def setUp(self):
    # Install our SIGALRM handler, remembering the one it replaces so
    # that tearDown() can put it back.
    previous_handler = signal.signal(signal.SIGALRM, self.alarm_interrupt)
    self.oldalrm = previous_handler
2851
def tearDown(self):
    # Put back the SIGALRM handler saved by setUp().
    saved_handler = self.oldalrm
    signal.signal(signal.SIGALRM, saved_handler)
2854
def alarm_interrupt(self, sig, frame):
    # Signal handler: deliberately raise ZeroDivisionError so the
    # interrupted I/O call can be seen to propagate it.
    return 1 // 0
2857
@unittest.skipUnless(threading, 'Threading required for this test.')
@unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
                 'issue #12429: skip test on FreeBSD <= 7')
def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
    """Check that a partial write, when it gets interrupted, properly
    invokes the signal handler, and bubbles up the exception raised
    in the latter.

    `item` is repeated to build the data written, `bytes` is what the
    first two bytes read from the pipe are compared against, and
    `fdopen_kwargs` are passed on to self.io.open().
    """
    read_results = []
    def _read():
        s = os.read(r, 1)
        read_results.append(s)
    t = threading.Thread(target=_read)
    t.daemon = True
    r, w = os.pipe()
    # Bound up front so the cleanup below cannot hit an unbound name
    # (and hide the real error) if open() itself fails.
    wio = None
    try:
        wio = self.io.open(w, **fdopen_kwargs)
        t.start()
        signal.alarm(1)
        # Fill the pipe enough that the write will be blocking.
        # It will be interrupted by the timer armed above.  Since the
        # other thread has read one byte, the low-level write will
        # return with a successful (partial) result rather than an EINTR.
        # The buffered IO layer must check for pending signal
        # handlers, which in this case will invoke alarm_interrupt().
        self.assertRaises(ZeroDivisionError,
                    wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
        t.join()
        # We got one byte, get another one and check that it isn't a
        # repeat of the first one.
        read_results.append(os.read(r, 1))
        self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
    finally:
        # Cancel the alarm in case we got here before it fired, so it
        # cannot go off later in an unrelated test.
        signal.alarm(0)
        os.close(w)
        os.close(r)
        # This is deliberate. If we didn't close the file descriptor
        # before closing wio, wio would try to flush its internal
        # buffer, and block again.
        if wio is not None:
            try:
                wio.close()
            except IOError as e:
                if e.errno != errno.EBADF:
                    raise
2900
def test_interrupted_write_unbuffered(self):
    # Interrupted-write check on an unbuffered binary file.
    payload = b"xy"
    self.check_interrupted_write(payload, b"xy", mode="wb", buffering=0)
2903
def test_interrupted_write_buffered(self):
    # Interrupted-write check on a buffered binary file.
    payload = b"xy"
    self.check_interrupted_write(payload, b"xy", mode="wb")
2906
def test_interrupted_write_text(self):
    # Interrupted-write check on an ASCII text file.
    text = "xy"
    self.check_interrupted_write(text, b"xy", mode="w", encoding="ascii")
2909
def check_reentrant_write(self, data, **fdopen_kwargs):
    """Check what happens when a signal handler writes to a file that
    the interrupted code is itself writing to.

    `data` is what gets written each time; `fdopen_kwargs` are passed
    on to self.io.open().
    """
    def on_alarm(*args):
        # Will be called reentrantly from the same thread
        wio.write(data)
        1//0
    signal.signal(signal.SIGALRM, on_alarm)
    r, w = os.pipe()
    wio = self.io.open(w, **fdopen_kwargs)
    try:
        signal.alarm(1)
        # Either the reentrant call to wio.write() fails with RuntimeError,
        # or the signal handler raises ZeroDivisionError.
        with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
            while 1:
                for i in range(100):
                    wio.write(data)
                    wio.flush()
                # Make sure the buffer doesn't fill up and block further writes
                os.read(r, len(data) * 100)
        exc = cm.exception
        if isinstance(exc, RuntimeError):
            self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
    finally:
        # Cancel the alarm in case the loop ended for another reason
        # before it fired, so it cannot go off during cleanup or in a
        # later test.
        signal.alarm(0)
        wio.close()
        os.close(r)
2935
def test_reentrant_write_buffered(self):
    # Reentrant write check on a buffered binary stream.
    payload = b"xy"
    self.check_reentrant_write(payload, mode="wb")
2938
def test_reentrant_write_text(self):
    # Reentrant write check on an ASCII-encoded text stream.
    text = "xy"
    self.check_reentrant_write(text, mode="w", encoding="ascii")
2941
def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
    """Check that a buffered read, when it gets interrupted (either
    returning a partial result or EINTR), properly invokes the signal
    handler and retries if the latter returned successfully.

    `decode` converts the value returned by read() to text so that it
    can be compared with "foobar"; `fdopen_kwargs` are passed to
    self.io.open() (with closefd forced to False, so the pipe
    descriptors are always closed here).
    """
    r, w = os.pipe()
    fdopen_kwargs["closefd"] = False
    def alarm_handler(sig, frame):
        os.write(w, b"bar")
    signal.signal(signal.SIGALRM, alarm_handler)
    # Bound before the try block so that the cleanup below never hits an
    # unbound name if open() itself fails.
    rio = None
    try:
        rio = self.io.open(r, **fdopen_kwargs)
        os.write(w, b"foo")
        signal.alarm(1)
        # Expected behaviour:
        # - first raw read() returns partial b"foo"
        # - second raw read() returns EINTR
        # - third raw read() returns b"bar"
        self.assertEqual(decode(rio.read(6)), "foobar")
    finally:
        # Make sure no alarm is left pending once the pipe is gone.
        signal.alarm(0)
        try:
            if rio is not None:
                rio.close()
        finally:
            # closefd=False: the descriptors are ours to close, whatever
            # happened above.
            os.close(w)
            os.close(r)
2964
def test_interrupterd_read_retry_buffered(self):
    # Binary mode: read() returns bytes, which are decoded as latin1
    # before the comparison.
    def decode(raw):
        return raw.decode('latin1')
    self.check_interrupted_read_retry(decode, mode="rb")
2968
def test_interrupterd_read_retry_text(self):
    # Text mode: read() already returns text, so it is passed through
    # unchanged.
    def identity(text):
        return text
    self.check_interrupted_read_retry(identity, mode="r")
2972
@unittest.skipUnless(threading, 'Threading required for this test.')
def check_interrupted_write_retry(self, item, **fdopen_kwargs):
    """Check that a buffered write, when it gets interrupted (either
    returning a partial result or EINTR), properly invokes the signal
    handler and retries if the latter returned successfully.

    `item` is a one-element str/bytes that is repeated
    support.PIPE_MAX_SIZE times to build the payload; `fdopen_kwargs`
    are passed to self.io.open() (with closefd forced to False).
    """
    select = support.import_module("select")
    # A quantity that exceeds the buffer size of an anonymous pipe's
    # write end.
    N = support.PIPE_MAX_SIZE
    r, w = os.pipe()
    fdopen_kwargs["closefd"] = False
    # We need a separate thread to read from the pipe and allow the
    # write() to finish.  This thread is started after the SIGALRM is
    # received (forcing a first EINTR in write()).
    read_results = []
    write_finished = False
    def _read():
        while not write_finished:
            while r in select.select([r], [], [], 1.0)[0]:
                s = os.read(r, 1024)
                read_results.append(s)
    t = threading.Thread(target=_read)
    t.daemon = True
    def alarm1(sig, frame):
        signal.signal(signal.SIGALRM, alarm2)
        signal.alarm(1)
    def alarm2(sig, frame):
        t.start()
    signal.signal(signal.SIGALRM, alarm1)
    # Bound before the try block so that the cleanup below never hits an
    # unbound name if open() itself fails.
    wio = None
    try:
        wio = self.io.open(w, **fdopen_kwargs)
        signal.alarm(1)
        # Expected behaviour:
        # - first raw write() is partial (because of the limited pipe buffer
        #   and the first alarm)
        # - second raw write() returns EINTR (because of the second alarm)
        # - subsequent write()s are successful (either partial or complete)
        self.assertEqual(N, wio.write(item * N))
        wio.flush()
        write_finished = True
        t.join()
        self.assertEqual(N, sum(len(x) for x in read_results))
    finally:
        # Cancel any alarm still pending so that neither handler runs
        # after the pipe has been torn down.
        signal.alarm(0)
        write_finished = True
        os.close(w)
        os.close(r)
        # This is deliberate. If we didn't close the file descriptor
        # before closing wio, wio would try to flush its internal
        # buffer, and could block (in case of failure).
        if wio is not None:
            try:
                wio.close()
            except IOError as e:
                if e.errno != errno.EBADF:
                    raise
3027
def test_interrupterd_write_retry_buffered(self):
    # Buffered binary stream; the payload is built from a single byte.
    unit = b"x"
    self.check_interrupted_write_retry(unit, mode="wb")
3030
def test_interrupterd_write_retry_text(self):
    # latin1-encoded text stream; the payload is built from a single
    # character.
    unit = "x"
    self.check_interrupted_write_retry(unit, mode="w", encoding="latin1")
3033
3034
class CSignalsTest(SignalsTest):
    # Run the SignalsTest cases against the C implementation of io
    # (the tests reach the module under test through self.io).
    io = io
3037
class PySignalsTest(SignalsTest):
    # Run the SignalsTest cases against the pure-Python implementation.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # reentrant-write tests are disabled by overriding them with None.
    test_reentrant_write_buffered = test_reentrant_write_text = None
3045
3046
def test_main():
    # Prepare every test class with the io namespace it must exercise,
    # then hand all of them to the regrtest runner.
    tests = (
        CIOTest, PyIOTest,
        CBufferedReaderTest, PyBufferedReaderTest,
        CBufferedWriterTest, PyBufferedWriterTest,
        CBufferedRWPairTest, PyBufferedRWPairTest,
        CBufferedRandomTest, PyBufferedRandomTest,
        StatefulIncrementalDecoderTest,
        CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
        CTextIOWrapperTest, PyTextIOWrapperTest,
        CMiscIOTest, PyMiscIOTest,
        CSignalsTest, PySignalsTest,
    )

    # Put the namespaces of the IO module we are testing and some useful
    # mock classes in the __dict__ of each test.
    mocks = (
        MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
        MockNonBlockWriterIO, MockRawIOWithoutRead,
    )
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}

    # Each mock is published under its plain name, bound to the "C" or
    # "Py" prefixed variant found in this module's globals.
    globs = globals()
    for mock in mocks:
        c_io_ns[mock.__name__] = globs["C" + mock.__name__]
    for mock in mocks:
        py_io_ns[mock.__name__] = globs["Py" + mock.__name__]

    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper

    for test in tests:
        test_name = test.__name__
        if test_name.startswith("C"):
            namespace = c_io_ns
        elif test_name.startswith("Py"):
            namespace = py_io_ns
        else:
            # Implementation-independent test class: nothing to inject.
            continue
        for name, obj in namespace.items():
            setattr(test, name, obj)

    support.run_unittest(*tests)
3081
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()
Note: See TracBrowser for help on using the repository browser.