1 | """Unit tests for the io module."""
|
---|
2 |
|
---|
3 | # Tests of io are scattered over the test suite:
|
---|
4 | # * test_bufio - tests file buffering
|
---|
5 | # * test_memoryio - tests BytesIO and StringIO
|
---|
6 | # * test_fileio - tests FileIO
|
---|
7 | # * test_file - tests the file interface
|
---|
8 | # * test_io - tests everything else in the io module
|
---|
9 | # * test_univnewlines - tests universal newline support
|
---|
10 | # * test_largefile - tests operations on a file greater than 2**32 bytes
|
---|
11 | # (only enabled with -ulargefile)
|
---|
12 |
|
---|
13 | ################################################################################
|
---|
14 | # ATTENTION TEST WRITERS!!!
|
---|
15 | ################################################################################
|
---|
16 | # When writing tests for io, it's important to test both the C and Python
|
---|
17 | # implementations. This is usually done by writing a base test that refers to
|
---|
18 | # the type it is testing as a attribute. Then it provides custom subclasses to
|
---|
19 | # test both implementations. This file has lots of examples.
|
---|
20 | ################################################################################
|
---|
21 |
|
---|
22 | from __future__ import print_function
|
---|
23 | from __future__ import unicode_literals
|
---|
24 |
|
---|
25 | import os
|
---|
26 | import sys
|
---|
27 | import time
|
---|
28 | import array
|
---|
29 | import random
|
---|
30 | import unittest
|
---|
31 | import weakref
|
---|
32 | import abc
|
---|
33 | import signal
|
---|
34 | import errno
|
---|
35 | from itertools import cycle, count
|
---|
36 | from collections import deque
|
---|
37 | from UserList import UserList
|
---|
38 | from test import test_support as support
|
---|
39 | import contextlib
|
---|
40 |
|
---|
41 | import codecs
|
---|
42 | import io # C implementation of io
|
---|
43 | import _pyio as pyio # Python implementation of io
|
---|
44 | try:
|
---|
45 | import threading
|
---|
46 | except ImportError:
|
---|
47 | threading = None
|
---|
48 | try:
|
---|
49 | import fcntl
|
---|
50 | except ImportError:
|
---|
51 | fcntl = None
|
---|
52 |
|
---|
53 | __metaclass__ = type
|
---|
54 | bytes = support.py3k_bytes
|
---|
55 |
|
---|
56 | def _default_chunk_size():
|
---|
57 | """Get the default TextIOWrapper chunk size"""
|
---|
58 | with io.open(__file__, "r", encoding="latin1") as f:
|
---|
59 | return f._CHUNK_SIZE
|
---|
60 |
|
---|
61 |
|
---|
62 | class MockRawIOWithoutRead:
|
---|
63 | """A RawIO implementation without read(), so as to exercise the default
|
---|
64 | RawIO.read() which calls readinto()."""
|
---|
65 |
|
---|
66 | def __init__(self, read_stack=()):
|
---|
67 | self._read_stack = list(read_stack)
|
---|
68 | self._write_stack = []
|
---|
69 | self._reads = 0
|
---|
70 | self._extraneous_reads = 0
|
---|
71 |
|
---|
72 | def write(self, b):
|
---|
73 | self._write_stack.append(bytes(b))
|
---|
74 | return len(b)
|
---|
75 |
|
---|
76 | def writable(self):
|
---|
77 | return True
|
---|
78 |
|
---|
79 | def fileno(self):
|
---|
80 | return 42
|
---|
81 |
|
---|
82 | def readable(self):
|
---|
83 | return True
|
---|
84 |
|
---|
85 | def seekable(self):
|
---|
86 | return True
|
---|
87 |
|
---|
88 | def seek(self, pos, whence):
|
---|
89 | return 0 # wrong but we gotta return something
|
---|
90 |
|
---|
91 | def tell(self):
|
---|
92 | return 0 # same comment as above
|
---|
93 |
|
---|
94 | def readinto(self, buf):
|
---|
95 | self._reads += 1
|
---|
96 | max_len = len(buf)
|
---|
97 | try:
|
---|
98 | data = self._read_stack[0]
|
---|
99 | except IndexError:
|
---|
100 | self._extraneous_reads += 1
|
---|
101 | return 0
|
---|
102 | if data is None:
|
---|
103 | del self._read_stack[0]
|
---|
104 | return None
|
---|
105 | n = len(data)
|
---|
106 | if len(data) <= max_len:
|
---|
107 | del self._read_stack[0]
|
---|
108 | buf[:n] = data
|
---|
109 | return n
|
---|
110 | else:
|
---|
111 | buf[:] = data[:max_len]
|
---|
112 | self._read_stack[0] = data[max_len:]
|
---|
113 | return max_len
|
---|
114 |
|
---|
115 | def truncate(self, pos=None):
|
---|
116 | return pos
|
---|
117 |
|
---|
118 | class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
|
---|
119 | pass
|
---|
120 |
|
---|
121 | class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
|
---|
122 | pass
|
---|
123 |
|
---|
124 |
|
---|
125 | class MockRawIO(MockRawIOWithoutRead):
|
---|
126 |
|
---|
127 | def read(self, n=None):
|
---|
128 | self._reads += 1
|
---|
129 | try:
|
---|
130 | return self._read_stack.pop(0)
|
---|
131 | except:
|
---|
132 | self._extraneous_reads += 1
|
---|
133 | return b""
|
---|
134 |
|
---|
135 | class CMockRawIO(MockRawIO, io.RawIOBase):
|
---|
136 | pass
|
---|
137 |
|
---|
138 | class PyMockRawIO(MockRawIO, pyio.RawIOBase):
|
---|
139 | pass
|
---|
140 |
|
---|
141 |
|
---|
142 | class MisbehavedRawIO(MockRawIO):
|
---|
143 | def write(self, b):
|
---|
144 | return MockRawIO.write(self, b) * 2
|
---|
145 |
|
---|
146 | def read(self, n=None):
|
---|
147 | return MockRawIO.read(self, n) * 2
|
---|
148 |
|
---|
149 | def seek(self, pos, whence):
|
---|
150 | return -123
|
---|
151 |
|
---|
152 | def tell(self):
|
---|
153 | return -456
|
---|
154 |
|
---|
155 | def readinto(self, buf):
|
---|
156 | MockRawIO.readinto(self, buf)
|
---|
157 | return len(buf) * 5
|
---|
158 |
|
---|
159 | class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
|
---|
160 | pass
|
---|
161 |
|
---|
162 | class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
|
---|
163 | pass
|
---|
164 |
|
---|
165 |
|
---|
166 | class CloseFailureIO(MockRawIO):
|
---|
167 | closed = 0
|
---|
168 |
|
---|
169 | def close(self):
|
---|
170 | if not self.closed:
|
---|
171 | self.closed = 1
|
---|
172 | raise IOError
|
---|
173 |
|
---|
174 | class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
|
---|
175 | pass
|
---|
176 |
|
---|
177 | class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
|
---|
178 | pass
|
---|
179 |
|
---|
180 |
|
---|
181 | class MockFileIO:
|
---|
182 |
|
---|
183 | def __init__(self, data):
|
---|
184 | self.read_history = []
|
---|
185 | super(MockFileIO, self).__init__(data)
|
---|
186 |
|
---|
187 | def read(self, n=None):
|
---|
188 | res = super(MockFileIO, self).read(n)
|
---|
189 | self.read_history.append(None if res is None else len(res))
|
---|
190 | return res
|
---|
191 |
|
---|
192 | def readinto(self, b):
|
---|
193 | res = super(MockFileIO, self).readinto(b)
|
---|
194 | self.read_history.append(res)
|
---|
195 | return res
|
---|
196 |
|
---|
197 | class CMockFileIO(MockFileIO, io.BytesIO):
|
---|
198 | pass
|
---|
199 |
|
---|
200 | class PyMockFileIO(MockFileIO, pyio.BytesIO):
|
---|
201 | pass
|
---|
202 |
|
---|
203 |
|
---|
204 | class MockNonBlockWriterIO:
|
---|
205 |
|
---|
206 | def __init__(self):
|
---|
207 | self._write_stack = []
|
---|
208 | self._blocker_char = None
|
---|
209 |
|
---|
210 | def pop_written(self):
|
---|
211 | s = b"".join(self._write_stack)
|
---|
212 | self._write_stack[:] = []
|
---|
213 | return s
|
---|
214 |
|
---|
215 | def block_on(self, char):
|
---|
216 | """Block when a given char is encountered."""
|
---|
217 | self._blocker_char = char
|
---|
218 |
|
---|
219 | def readable(self):
|
---|
220 | return True
|
---|
221 |
|
---|
222 | def seekable(self):
|
---|
223 | return True
|
---|
224 |
|
---|
225 | def writable(self):
|
---|
226 | return True
|
---|
227 |
|
---|
228 | def write(self, b):
|
---|
229 | b = bytes(b)
|
---|
230 | n = -1
|
---|
231 | if self._blocker_char:
|
---|
232 | try:
|
---|
233 | n = b.index(self._blocker_char)
|
---|
234 | except ValueError:
|
---|
235 | pass
|
---|
236 | else:
|
---|
237 | if n > 0:
|
---|
238 | # write data up to the first blocker
|
---|
239 | self._write_stack.append(b[:n])
|
---|
240 | return n
|
---|
241 | else:
|
---|
242 | # cancel blocker and indicate would block
|
---|
243 | self._blocker_char = None
|
---|
244 | return None
|
---|
245 | self._write_stack.append(b)
|
---|
246 | return len(b)
|
---|
247 |
|
---|
248 | class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
|
---|
249 | BlockingIOError = io.BlockingIOError
|
---|
250 |
|
---|
251 | class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
|
---|
252 | BlockingIOError = pyio.BlockingIOError
|
---|
253 |
|
---|
254 |
|
---|
255 | class IOTest(unittest.TestCase):
|
---|
256 |
|
---|
257 | def setUp(self):
|
---|
258 | support.unlink(support.TESTFN)
|
---|
259 |
|
---|
260 | def tearDown(self):
|
---|
261 | support.unlink(support.TESTFN)
|
---|
262 |
|
---|
263 | def write_ops(self, f):
|
---|
264 | self.assertEqual(f.write(b"blah."), 5)
|
---|
265 | f.truncate(0)
|
---|
266 | self.assertEqual(f.tell(), 5)
|
---|
267 | f.seek(0)
|
---|
268 |
|
---|
269 | self.assertEqual(f.write(b"blah."), 5)
|
---|
270 | self.assertEqual(f.seek(0), 0)
|
---|
271 | self.assertEqual(f.write(b"Hello."), 6)
|
---|
272 | self.assertEqual(f.tell(), 6)
|
---|
273 | self.assertEqual(f.seek(-1, 1), 5)
|
---|
274 | self.assertEqual(f.tell(), 5)
|
---|
275 | self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
|
---|
276 | self.assertEqual(f.seek(0), 0)
|
---|
277 | self.assertEqual(f.write(b"h"), 1)
|
---|
278 | self.assertEqual(f.seek(-1, 2), 13)
|
---|
279 | self.assertEqual(f.tell(), 13)
|
---|
280 |
|
---|
281 | self.assertEqual(f.truncate(12), 12)
|
---|
282 | self.assertEqual(f.tell(), 13)
|
---|
283 | self.assertRaises(TypeError, f.seek, 0.0)
|
---|
284 |
|
---|
285 | def read_ops(self, f, buffered=False):
|
---|
286 | data = f.read(5)
|
---|
287 | self.assertEqual(data, b"hello")
|
---|
288 | data = bytearray(data)
|
---|
289 | self.assertEqual(f.readinto(data), 5)
|
---|
290 | self.assertEqual(data, b" worl")
|
---|
291 | self.assertEqual(f.readinto(data), 2)
|
---|
292 | self.assertEqual(len(data), 5)
|
---|
293 | self.assertEqual(data[:2], b"d\n")
|
---|
294 | self.assertEqual(f.seek(0), 0)
|
---|
295 | self.assertEqual(f.read(20), b"hello world\n")
|
---|
296 | self.assertEqual(f.read(1), b"")
|
---|
297 | self.assertEqual(f.readinto(bytearray(b"x")), 0)
|
---|
298 | self.assertEqual(f.seek(-6, 2), 6)
|
---|
299 | self.assertEqual(f.read(5), b"world")
|
---|
300 | self.assertEqual(f.read(0), b"")
|
---|
301 | self.assertEqual(f.readinto(bytearray()), 0)
|
---|
302 | self.assertEqual(f.seek(-6, 1), 5)
|
---|
303 | self.assertEqual(f.read(5), b" worl")
|
---|
304 | self.assertEqual(f.tell(), 10)
|
---|
305 | self.assertRaises(TypeError, f.seek, 0.0)
|
---|
306 | if buffered:
|
---|
307 | f.seek(0)
|
---|
308 | self.assertEqual(f.read(), b"hello world\n")
|
---|
309 | f.seek(6)
|
---|
310 | self.assertEqual(f.read(), b"world\n")
|
---|
311 | self.assertEqual(f.read(), b"")
|
---|
312 |
|
---|
313 | LARGE = 2**31
|
---|
314 |
|
---|
315 | def large_file_ops(self, f):
|
---|
316 | assert f.readable()
|
---|
317 | assert f.writable()
|
---|
318 | self.assertEqual(f.seek(self.LARGE), self.LARGE)
|
---|
319 | self.assertEqual(f.tell(), self.LARGE)
|
---|
320 | self.assertEqual(f.write(b"xxx"), 3)
|
---|
321 | self.assertEqual(f.tell(), self.LARGE + 3)
|
---|
322 | self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
|
---|
323 | self.assertEqual(f.truncate(), self.LARGE + 2)
|
---|
324 | self.assertEqual(f.tell(), self.LARGE + 2)
|
---|
325 | self.assertEqual(f.seek(0, 2), self.LARGE + 2)
|
---|
326 | self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
|
---|
327 | self.assertEqual(f.tell(), self.LARGE + 2)
|
---|
328 | self.assertEqual(f.seek(0, 2), self.LARGE + 1)
|
---|
329 | self.assertEqual(f.seek(-1, 2), self.LARGE)
|
---|
330 | self.assertEqual(f.read(2), b"x")
|
---|
331 |
|
---|
332 | def test_invalid_operations(self):
|
---|
333 | # Try writing on a file opened in read mode and vice-versa.
|
---|
334 | for mode in ("w", "wb"):
|
---|
335 | with self.open(support.TESTFN, mode) as fp:
|
---|
336 | self.assertRaises(IOError, fp.read)
|
---|
337 | self.assertRaises(IOError, fp.readline)
|
---|
338 | with self.open(support.TESTFN, "rb") as fp:
|
---|
339 | self.assertRaises(IOError, fp.write, b"blah")
|
---|
340 | self.assertRaises(IOError, fp.writelines, [b"blah\n"])
|
---|
341 | with self.open(support.TESTFN, "r") as fp:
|
---|
342 | self.assertRaises(IOError, fp.write, "blah")
|
---|
343 | self.assertRaises(IOError, fp.writelines, ["blah\n"])
|
---|
344 |
|
---|
345 | def test_raw_file_io(self):
|
---|
346 | with self.open(support.TESTFN, "wb", buffering=0) as f:
|
---|
347 | self.assertEqual(f.readable(), False)
|
---|
348 | self.assertEqual(f.writable(), True)
|
---|
349 | self.assertEqual(f.seekable(), True)
|
---|
350 | self.write_ops(f)
|
---|
351 | with self.open(support.TESTFN, "rb", buffering=0) as f:
|
---|
352 | self.assertEqual(f.readable(), True)
|
---|
353 | self.assertEqual(f.writable(), False)
|
---|
354 | self.assertEqual(f.seekable(), True)
|
---|
355 | self.read_ops(f)
|
---|
356 |
|
---|
357 | def test_buffered_file_io(self):
|
---|
358 | with self.open(support.TESTFN, "wb") as f:
|
---|
359 | self.assertEqual(f.readable(), False)
|
---|
360 | self.assertEqual(f.writable(), True)
|
---|
361 | self.assertEqual(f.seekable(), True)
|
---|
362 | self.write_ops(f)
|
---|
363 | with self.open(support.TESTFN, "rb") as f:
|
---|
364 | self.assertEqual(f.readable(), True)
|
---|
365 | self.assertEqual(f.writable(), False)
|
---|
366 | self.assertEqual(f.seekable(), True)
|
---|
367 | self.read_ops(f, True)
|
---|
368 |
|
---|
369 | def test_readline(self):
|
---|
370 | with self.open(support.TESTFN, "wb") as f:
|
---|
371 | f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
|
---|
372 | with self.open(support.TESTFN, "rb") as f:
|
---|
373 | self.assertEqual(f.readline(), b"abc\n")
|
---|
374 | self.assertEqual(f.readline(10), b"def\n")
|
---|
375 | self.assertEqual(f.readline(2), b"xy")
|
---|
376 | self.assertEqual(f.readline(4), b"zzy\n")
|
---|
377 | self.assertEqual(f.readline(), b"foo\x00bar\n")
|
---|
378 | self.assertEqual(f.readline(None), b"another line")
|
---|
379 | self.assertRaises(TypeError, f.readline, 5.3)
|
---|
380 | with self.open(support.TESTFN, "r") as f:
|
---|
381 | self.assertRaises(TypeError, f.readline, 5.3)
|
---|
382 |
|
---|
383 | def test_raw_bytes_io(self):
|
---|
384 | f = self.BytesIO()
|
---|
385 | self.write_ops(f)
|
---|
386 | data = f.getvalue()
|
---|
387 | self.assertEqual(data, b"hello world\n")
|
---|
388 | f = self.BytesIO(data)
|
---|
389 | self.read_ops(f, True)
|
---|
390 |
|
---|
391 | def test_large_file_ops(self):
|
---|
392 | # On Windows and Mac OSX this test comsumes large resources; It takes
|
---|
393 | # a long time to build the >2GB file and takes >2GB of disk space
|
---|
394 | # therefore the resource must be enabled to run this test.
|
---|
395 | if sys.platform[:3] == 'win' or sys.platform == 'darwin':
|
---|
396 | if not support.is_resource_enabled("largefile"):
|
---|
397 | print("\nTesting large file ops skipped on %s." % sys.platform,
|
---|
398 | file=sys.stderr)
|
---|
399 | print("It requires %d bytes and a long time." % self.LARGE,
|
---|
400 | file=sys.stderr)
|
---|
401 | print("Use 'regrtest.py -u largefile test_io' to run it.",
|
---|
402 | file=sys.stderr)
|
---|
403 | return
|
---|
404 | with self.open(support.TESTFN, "w+b", 0) as f:
|
---|
405 | self.large_file_ops(f)
|
---|
406 | with self.open(support.TESTFN, "w+b") as f:
|
---|
407 | self.large_file_ops(f)
|
---|
408 |
|
---|
409 | def test_with_open(self):
|
---|
410 | for bufsize in (0, 1, 100):
|
---|
411 | f = None
|
---|
412 | with self.open(support.TESTFN, "wb", bufsize) as f:
|
---|
413 | f.write(b"xxx")
|
---|
414 | self.assertEqual(f.closed, True)
|
---|
415 | f = None
|
---|
416 | try:
|
---|
417 | with self.open(support.TESTFN, "wb", bufsize) as f:
|
---|
418 | 1 // 0
|
---|
419 | except ZeroDivisionError:
|
---|
420 | self.assertEqual(f.closed, True)
|
---|
421 | else:
|
---|
422 | self.fail("1 // 0 didn't raise an exception")
|
---|
423 |
|
---|
424 | # issue 5008
|
---|
425 | def test_append_mode_tell(self):
|
---|
426 | with self.open(support.TESTFN, "wb") as f:
|
---|
427 | f.write(b"xxx")
|
---|
428 | with self.open(support.TESTFN, "ab", buffering=0) as f:
|
---|
429 | self.assertEqual(f.tell(), 3)
|
---|
430 | with self.open(support.TESTFN, "ab") as f:
|
---|
431 | self.assertEqual(f.tell(), 3)
|
---|
432 | with self.open(support.TESTFN, "a") as f:
|
---|
433 | self.assertTrue(f.tell() > 0)
|
---|
434 |
|
---|
435 | def test_destructor(self):
|
---|
436 | record = []
|
---|
437 | class MyFileIO(self.FileIO):
|
---|
438 | def __del__(self):
|
---|
439 | record.append(1)
|
---|
440 | try:
|
---|
441 | f = super(MyFileIO, self).__del__
|
---|
442 | except AttributeError:
|
---|
443 | pass
|
---|
444 | else:
|
---|
445 | f()
|
---|
446 | def close(self):
|
---|
447 | record.append(2)
|
---|
448 | super(MyFileIO, self).close()
|
---|
449 | def flush(self):
|
---|
450 | record.append(3)
|
---|
451 | super(MyFileIO, self).flush()
|
---|
452 | f = MyFileIO(support.TESTFN, "wb")
|
---|
453 | f.write(b"xxx")
|
---|
454 | del f
|
---|
455 | support.gc_collect()
|
---|
456 | self.assertEqual(record, [1, 2, 3])
|
---|
457 | with self.open(support.TESTFN, "rb") as f:
|
---|
458 | self.assertEqual(f.read(), b"xxx")
|
---|
459 |
|
---|
460 | def _check_base_destructor(self, base):
|
---|
461 | record = []
|
---|
462 | class MyIO(base):
|
---|
463 | def __init__(self):
|
---|
464 | # This exercises the availability of attributes on object
|
---|
465 | # destruction.
|
---|
466 | # (in the C version, close() is called by the tp_dealloc
|
---|
467 | # function, not by __del__)
|
---|
468 | self.on_del = 1
|
---|
469 | self.on_close = 2
|
---|
470 | self.on_flush = 3
|
---|
471 | def __del__(self):
|
---|
472 | record.append(self.on_del)
|
---|
473 | try:
|
---|
474 | f = super(MyIO, self).__del__
|
---|
475 | except AttributeError:
|
---|
476 | pass
|
---|
477 | else:
|
---|
478 | f()
|
---|
479 | def close(self):
|
---|
480 | record.append(self.on_close)
|
---|
481 | super(MyIO, self).close()
|
---|
482 | def flush(self):
|
---|
483 | record.append(self.on_flush)
|
---|
484 | super(MyIO, self).flush()
|
---|
485 | f = MyIO()
|
---|
486 | del f
|
---|
487 | support.gc_collect()
|
---|
488 | self.assertEqual(record, [1, 2, 3])
|
---|
489 |
|
---|
490 | def test_IOBase_destructor(self):
|
---|
491 | self._check_base_destructor(self.IOBase)
|
---|
492 |
|
---|
493 | def test_RawIOBase_destructor(self):
|
---|
494 | self._check_base_destructor(self.RawIOBase)
|
---|
495 |
|
---|
496 | def test_BufferedIOBase_destructor(self):
|
---|
497 | self._check_base_destructor(self.BufferedIOBase)
|
---|
498 |
|
---|
499 | def test_TextIOBase_destructor(self):
|
---|
500 | self._check_base_destructor(self.TextIOBase)
|
---|
501 |
|
---|
502 | def test_close_flushes(self):
|
---|
503 | with self.open(support.TESTFN, "wb") as f:
|
---|
504 | f.write(b"xxx")
|
---|
505 | with self.open(support.TESTFN, "rb") as f:
|
---|
506 | self.assertEqual(f.read(), b"xxx")
|
---|
507 |
|
---|
508 | def test_array_writes(self):
|
---|
509 | a = array.array(b'i', range(10))
|
---|
510 | n = len(a.tostring())
|
---|
511 | with self.open(support.TESTFN, "wb", 0) as f:
|
---|
512 | self.assertEqual(f.write(a), n)
|
---|
513 | with self.open(support.TESTFN, "wb") as f:
|
---|
514 | self.assertEqual(f.write(a), n)
|
---|
515 |
|
---|
516 | def test_closefd(self):
|
---|
517 | self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
|
---|
518 | closefd=False)
|
---|
519 |
|
---|
520 | def test_read_closed(self):
|
---|
521 | with self.open(support.TESTFN, "w") as f:
|
---|
522 | f.write("egg\n")
|
---|
523 | with self.open(support.TESTFN, "r") as f:
|
---|
524 | file = self.open(f.fileno(), "r", closefd=False)
|
---|
525 | self.assertEqual(file.read(), "egg\n")
|
---|
526 | file.seek(0)
|
---|
527 | file.close()
|
---|
528 | self.assertRaises(ValueError, file.read)
|
---|
529 |
|
---|
530 | def test_no_closefd_with_filename(self):
|
---|
531 | # can't use closefd in combination with a file name
|
---|
532 | self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
|
---|
533 |
|
---|
534 | def test_closefd_attr(self):
|
---|
535 | with self.open(support.TESTFN, "wb") as f:
|
---|
536 | f.write(b"egg\n")
|
---|
537 | with self.open(support.TESTFN, "r") as f:
|
---|
538 | self.assertEqual(f.buffer.raw.closefd, True)
|
---|
539 | file = self.open(f.fileno(), "r", closefd=False)
|
---|
540 | self.assertEqual(file.buffer.raw.closefd, False)
|
---|
541 |
|
---|
542 | def test_garbage_collection(self):
|
---|
543 | # FileIO objects are collected, and collecting them flushes
|
---|
544 | # all data to disk.
|
---|
545 | f = self.FileIO(support.TESTFN, "wb")
|
---|
546 | f.write(b"abcxxx")
|
---|
547 | f.f = f
|
---|
548 | wr = weakref.ref(f)
|
---|
549 | del f
|
---|
550 | support.gc_collect()
|
---|
551 | self.assertTrue(wr() is None, wr)
|
---|
552 | with self.open(support.TESTFN, "rb") as f:
|
---|
553 | self.assertEqual(f.read(), b"abcxxx")
|
---|
554 |
|
---|
555 | def test_unbounded_file(self):
|
---|
556 | # Issue #1174606: reading from an unbounded stream such as /dev/zero.
|
---|
557 | zero = "/dev/zero"
|
---|
558 | if not os.path.exists(zero):
|
---|
559 | self.skipTest("{0} does not exist".format(zero))
|
---|
560 | if sys.maxsize > 0x7FFFFFFF:
|
---|
561 | self.skipTest("test can only run in a 32-bit address space")
|
---|
562 | if support.real_max_memuse < support._2G:
|
---|
563 | self.skipTest("test requires at least 2GB of memory")
|
---|
564 | with self.open(zero, "rb", buffering=0) as f:
|
---|
565 | self.assertRaises(OverflowError, f.read)
|
---|
566 | with self.open(zero, "rb") as f:
|
---|
567 | self.assertRaises(OverflowError, f.read)
|
---|
568 | with self.open(zero, "r") as f:
|
---|
569 | self.assertRaises(OverflowError, f.read)
|
---|
570 |
|
---|
571 | def test_flush_error_on_close(self):
|
---|
572 | f = self.open(support.TESTFN, "wb", buffering=0)
|
---|
573 | def bad_flush():
|
---|
574 | raise IOError()
|
---|
575 | f.flush = bad_flush
|
---|
576 | self.assertRaises(IOError, f.close) # exception not swallowed
|
---|
577 | self.assertTrue(f.closed)
|
---|
578 |
|
---|
579 | def test_multi_close(self):
|
---|
580 | f = self.open(support.TESTFN, "wb", buffering=0)
|
---|
581 | f.close()
|
---|
582 | f.close()
|
---|
583 | f.close()
|
---|
584 | self.assertRaises(ValueError, f.flush)
|
---|
585 |
|
---|
586 | def test_RawIOBase_read(self):
|
---|
587 | # Exercise the default RawIOBase.read() implementation (which calls
|
---|
588 | # readinto() internally).
|
---|
589 | rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
|
---|
590 | self.assertEqual(rawio.read(2), b"ab")
|
---|
591 | self.assertEqual(rawio.read(2), b"c")
|
---|
592 | self.assertEqual(rawio.read(2), b"d")
|
---|
593 | self.assertEqual(rawio.read(2), None)
|
---|
594 | self.assertEqual(rawio.read(2), b"ef")
|
---|
595 | self.assertEqual(rawio.read(2), b"g")
|
---|
596 | self.assertEqual(rawio.read(2), None)
|
---|
597 | self.assertEqual(rawio.read(2), b"")
|
---|
598 |
|
---|
599 | def test_fileio_closefd(self):
|
---|
600 | # Issue #4841
|
---|
601 | with self.open(__file__, 'rb') as f1, \
|
---|
602 | self.open(__file__, 'rb') as f2:
|
---|
603 | fileio = self.FileIO(f1.fileno(), closefd=False)
|
---|
604 | # .__init__() must not close f1
|
---|
605 | fileio.__init__(f2.fileno(), closefd=False)
|
---|
606 | f1.readline()
|
---|
607 | # .close() must not close f2
|
---|
608 | fileio.close()
|
---|
609 | f2.readline()
|
---|
610 |
|
---|
611 |
|
---|
612 | class CIOTest(IOTest):
|
---|
613 |
|
---|
614 | def test_IOBase_finalize(self):
|
---|
615 | # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
|
---|
616 | # class which inherits IOBase and an object of this class are caught
|
---|
617 | # in a reference cycle and close() is already in the method cache.
|
---|
618 | class MyIO(self.IOBase):
|
---|
619 | def close(self):
|
---|
620 | pass
|
---|
621 |
|
---|
622 | # create an instance to populate the method cache
|
---|
623 | MyIO()
|
---|
624 | obj = MyIO()
|
---|
625 | obj.obj = obj
|
---|
626 | wr = weakref.ref(obj)
|
---|
627 | del MyIO
|
---|
628 | del obj
|
---|
629 | support.gc_collect()
|
---|
630 | self.assertTrue(wr() is None, wr)
|
---|
631 |
|
---|
632 | class PyIOTest(IOTest):
|
---|
633 | test_array_writes = unittest.skip(
|
---|
634 | "len(array.array) returns number of elements rather than bytelength"
|
---|
635 | )(IOTest.test_array_writes)
|
---|
636 |
|
---|
637 |
|
---|
638 | class CommonBufferedTests:
|
---|
639 | # Tests common to BufferedReader, BufferedWriter and BufferedRandom
|
---|
640 |
|
---|
641 | def test_detach(self):
|
---|
642 | raw = self.MockRawIO()
|
---|
643 | buf = self.tp(raw)
|
---|
644 | self.assertIs(buf.detach(), raw)
|
---|
645 | self.assertRaises(ValueError, buf.detach)
|
---|
646 |
|
---|
647 | def test_fileno(self):
|
---|
648 | rawio = self.MockRawIO()
|
---|
649 | bufio = self.tp(rawio)
|
---|
650 |
|
---|
651 | self.assertEqual(42, bufio.fileno())
|
---|
652 |
|
---|
653 | def test_no_fileno(self):
|
---|
654 | # XXX will we always have fileno() function? If so, kill
|
---|
655 | # this test. Else, write it.
|
---|
656 | pass
|
---|
657 |
|
---|
658 | def test_invalid_args(self):
|
---|
659 | rawio = self.MockRawIO()
|
---|
660 | bufio = self.tp(rawio)
|
---|
661 | # Invalid whence
|
---|
662 | self.assertRaises(ValueError, bufio.seek, 0, -1)
|
---|
663 | self.assertRaises(ValueError, bufio.seek, 0, 3)
|
---|
664 |
|
---|
665 | def test_override_destructor(self):
|
---|
666 | tp = self.tp
|
---|
667 | record = []
|
---|
668 | class MyBufferedIO(tp):
|
---|
669 | def __del__(self):
|
---|
670 | record.append(1)
|
---|
671 | try:
|
---|
672 | f = super(MyBufferedIO, self).__del__
|
---|
673 | except AttributeError:
|
---|
674 | pass
|
---|
675 | else:
|
---|
676 | f()
|
---|
677 | def close(self):
|
---|
678 | record.append(2)
|
---|
679 | super(MyBufferedIO, self).close()
|
---|
680 | def flush(self):
|
---|
681 | record.append(3)
|
---|
682 | super(MyBufferedIO, self).flush()
|
---|
683 | rawio = self.MockRawIO()
|
---|
684 | bufio = MyBufferedIO(rawio)
|
---|
685 | writable = bufio.writable()
|
---|
686 | del bufio
|
---|
687 | support.gc_collect()
|
---|
688 | if writable:
|
---|
689 | self.assertEqual(record, [1, 2, 3])
|
---|
690 | else:
|
---|
691 | self.assertEqual(record, [1, 2])
|
---|
692 |
|
---|
693 | def test_context_manager(self):
|
---|
694 | # Test usability as a context manager
|
---|
695 | rawio = self.MockRawIO()
|
---|
696 | bufio = self.tp(rawio)
|
---|
697 | def _with():
|
---|
698 | with bufio:
|
---|
699 | pass
|
---|
700 | _with()
|
---|
701 | # bufio should now be closed, and using it a second time should raise
|
---|
702 | # a ValueError.
|
---|
703 | self.assertRaises(ValueError, _with)
|
---|
704 |
|
---|
705 | def test_error_through_destructor(self):
|
---|
706 | # Test that the exception state is not modified by a destructor,
|
---|
707 | # even if close() fails.
|
---|
708 | rawio = self.CloseFailureIO()
|
---|
709 | def f():
|
---|
710 | self.tp(rawio).xyzzy
|
---|
711 | with support.captured_output("stderr") as s:
|
---|
712 | self.assertRaises(AttributeError, f)
|
---|
713 | s = s.getvalue().strip()
|
---|
714 | if s:
|
---|
715 | # The destructor *may* have printed an unraisable error, check it
|
---|
716 | self.assertEqual(len(s.splitlines()), 1)
|
---|
717 | self.assertTrue(s.startswith("Exception IOError: "), s)
|
---|
718 | self.assertTrue(s.endswith(" ignored"), s)
|
---|
719 |
|
---|
720 | def test_repr(self):
|
---|
721 | raw = self.MockRawIO()
|
---|
722 | b = self.tp(raw)
|
---|
723 | clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
|
---|
724 | self.assertEqual(repr(b), "<%s>" % clsname)
|
---|
725 | raw.name = "dummy"
|
---|
726 | self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
|
---|
727 | raw.name = b"dummy"
|
---|
728 | self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
|
---|
729 |
|
---|
730 | def test_flush_error_on_close(self):
|
---|
731 | raw = self.MockRawIO()
|
---|
732 | def bad_flush():
|
---|
733 | raise IOError()
|
---|
734 | raw.flush = bad_flush
|
---|
735 | b = self.tp(raw)
|
---|
736 | self.assertRaises(IOError, b.close) # exception not swallowed
|
---|
737 | self.assertTrue(b.closed)
|
---|
738 |
|
---|
739 | def test_close_error_on_close(self):
|
---|
740 | raw = self.MockRawIO()
|
---|
741 | def bad_flush():
|
---|
742 | raise IOError('flush')
|
---|
743 | def bad_close():
|
---|
744 | raise IOError('close')
|
---|
745 | raw.close = bad_close
|
---|
746 | b = self.tp(raw)
|
---|
747 | b.flush = bad_flush
|
---|
748 | with self.assertRaises(IOError) as err: # exception not swallowed
|
---|
749 | b.close()
|
---|
750 | self.assertEqual(err.exception.args, ('close',))
|
---|
751 | self.assertFalse(b.closed)
|
---|
752 |
|
---|
753 | def test_multi_close(self):
|
---|
754 | raw = self.MockRawIO()
|
---|
755 | b = self.tp(raw)
|
---|
756 | b.close()
|
---|
757 | b.close()
|
---|
758 | b.close()
|
---|
759 | self.assertRaises(ValueError, b.flush)
|
---|
760 |
|
---|
761 | def test_readonly_attributes(self):
|
---|
762 | raw = self.MockRawIO()
|
---|
763 | buf = self.tp(raw)
|
---|
764 | x = self.MockRawIO()
|
---|
765 | with self.assertRaises((AttributeError, TypeError)):
|
---|
766 | buf.raw = x
|
---|
767 |
|
---|
768 |
|
---|
769 | class SizeofTest:
|
---|
770 |
|
---|
771 | @support.cpython_only
|
---|
772 | def test_sizeof(self):
|
---|
773 | bufsize1 = 4096
|
---|
774 | bufsize2 = 8192
|
---|
775 | rawio = self.MockRawIO()
|
---|
776 | bufio = self.tp(rawio, buffer_size=bufsize1)
|
---|
777 | size = sys.getsizeof(bufio) - bufsize1
|
---|
778 | rawio = self.MockRawIO()
|
---|
779 | bufio = self.tp(rawio, buffer_size=bufsize2)
|
---|
780 | self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
|
---|
781 |
|
---|
782 |
|
---|
783 | class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
|
---|
784 | read_mode = "rb"
|
---|
785 |
|
---|
786 | def test_constructor(self):
|
---|
787 | rawio = self.MockRawIO([b"abc"])
|
---|
788 | bufio = self.tp(rawio)
|
---|
789 | bufio.__init__(rawio)
|
---|
790 | bufio.__init__(rawio, buffer_size=1024)
|
---|
791 | bufio.__init__(rawio, buffer_size=16)
|
---|
792 | self.assertEqual(b"abc", bufio.read())
|
---|
793 | self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
|
---|
794 | self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
|
---|
795 | self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
|
---|
796 | rawio = self.MockRawIO([b"abc"])
|
---|
797 | bufio.__init__(rawio)
|
---|
798 | self.assertEqual(b"abc", bufio.read())
|
---|
799 |
|
---|
800 | def test_read(self):
|
---|
801 | for arg in (None, 7):
|
---|
802 | rawio = self.MockRawIO((b"abc", b"d", b"efg"))
|
---|
803 | bufio = self.tp(rawio)
|
---|
804 | self.assertEqual(b"abcdefg", bufio.read(arg))
|
---|
805 | # Invalid args
|
---|
806 | self.assertRaises(ValueError, bufio.read, -2)
|
---|
807 |
|
---|
808 | def test_read1(self):
|
---|
809 | rawio = self.MockRawIO((b"abc", b"d", b"efg"))
|
---|
810 | bufio = self.tp(rawio)
|
---|
811 | self.assertEqual(b"a", bufio.read(1))
|
---|
812 | self.assertEqual(b"b", bufio.read1(1))
|
---|
813 | self.assertEqual(rawio._reads, 1)
|
---|
814 | self.assertEqual(b"c", bufio.read1(100))
|
---|
815 | self.assertEqual(rawio._reads, 1)
|
---|
816 | self.assertEqual(b"d", bufio.read1(100))
|
---|
817 | self.assertEqual(rawio._reads, 2)
|
---|
818 | self.assertEqual(b"efg", bufio.read1(100))
|
---|
819 | self.assertEqual(rawio._reads, 3)
|
---|
820 | self.assertEqual(b"", bufio.read1(100))
|
---|
821 | self.assertEqual(rawio._reads, 4)
|
---|
822 | # Invalid args
|
---|
823 | self.assertRaises(ValueError, bufio.read1, -1)
|
---|
824 |
|
---|
825 | def test_readinto(self):
|
---|
826 | rawio = self.MockRawIO((b"abc", b"d", b"efg"))
|
---|
827 | bufio = self.tp(rawio)
|
---|
828 | b = bytearray(2)
|
---|
829 | self.assertEqual(bufio.readinto(b), 2)
|
---|
830 | self.assertEqual(b, b"ab")
|
---|
831 | self.assertEqual(bufio.readinto(b), 2)
|
---|
832 | self.assertEqual(b, b"cd")
|
---|
833 | self.assertEqual(bufio.readinto(b), 2)
|
---|
834 | self.assertEqual(b, b"ef")
|
---|
835 | self.assertEqual(bufio.readinto(b), 1)
|
---|
836 | self.assertEqual(b, b"gf")
|
---|
837 | self.assertEqual(bufio.readinto(b), 0)
|
---|
838 | self.assertEqual(b, b"gf")
|
---|
839 |
|
---|
840 | def test_readlines(self):
|
---|
841 | def bufio():
|
---|
842 | rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
|
---|
843 | return self.tp(rawio)
|
---|
844 | self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
|
---|
845 | self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
|
---|
846 | self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
|
---|
847 |
|
---|
848 | def test_buffering(self):
|
---|
849 | data = b"abcdefghi"
|
---|
850 | dlen = len(data)
|
---|
851 |
|
---|
852 | tests = [
|
---|
853 | [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
|
---|
854 | [ 100, [ 3, 3, 3], [ dlen ] ],
|
---|
855 | [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
|
---|
856 | ]
|
---|
857 |
|
---|
858 | for bufsize, buf_read_sizes, raw_read_sizes in tests:
|
---|
859 | rawio = self.MockFileIO(data)
|
---|
860 | bufio = self.tp(rawio, buffer_size=bufsize)
|
---|
861 | pos = 0
|
---|
862 | for nbytes in buf_read_sizes:
|
---|
863 | self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
|
---|
864 | pos += nbytes
|
---|
865 | # this is mildly implementation-dependent
|
---|
866 | self.assertEqual(rawio.read_history, raw_read_sizes)
|
---|
867 |
|
---|
868 | def test_read_non_blocking(self):
|
---|
869 | # Inject some None's in there to simulate EWOULDBLOCK
|
---|
870 | rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
|
---|
871 | bufio = self.tp(rawio)
|
---|
872 | self.assertEqual(b"abcd", bufio.read(6))
|
---|
873 | self.assertEqual(b"e", bufio.read(1))
|
---|
874 | self.assertEqual(b"fg", bufio.read())
|
---|
875 | self.assertEqual(b"", bufio.peek(1))
|
---|
876 | self.assertIsNone(bufio.read())
|
---|
877 | self.assertEqual(b"", bufio.read())
|
---|
878 |
|
---|
879 | rawio = self.MockRawIO((b"a", None, None))
|
---|
880 | self.assertEqual(b"a", rawio.readall())
|
---|
881 | self.assertIsNone(rawio.readall())
|
---|
882 |
|
---|
883 | def test_read_past_eof(self):
|
---|
884 | rawio = self.MockRawIO((b"abc", b"d", b"efg"))
|
---|
885 | bufio = self.tp(rawio)
|
---|
886 |
|
---|
887 | self.assertEqual(b"abcdefg", bufio.read(9000))
|
---|
888 |
|
---|
889 | def test_read_all(self):
|
---|
890 | rawio = self.MockRawIO((b"abc", b"d", b"efg"))
|
---|
891 | bufio = self.tp(rawio)
|
---|
892 |
|
---|
893 | self.assertEqual(b"abcdefg", bufio.read())
|
---|
894 |
|
---|
895 | @unittest.skipUnless(threading, 'Threading required for this test.')
|
---|
896 | @support.requires_resource('cpu')
|
---|
897 | def test_threads(self):
|
---|
898 | try:
|
---|
899 | # Write out many bytes with exactly the same number of 0's,
|
---|
900 | # 1's... 255's. This will help us check that concurrent reading
|
---|
901 | # doesn't duplicate or forget contents.
|
---|
902 | N = 1000
|
---|
903 | l = list(range(256)) * N
|
---|
904 | random.shuffle(l)
|
---|
905 | s = bytes(bytearray(l))
|
---|
906 | with self.open(support.TESTFN, "wb") as f:
|
---|
907 | f.write(s)
|
---|
908 | with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
|
---|
909 | bufio = self.tp(raw, 8)
|
---|
910 | errors = []
|
---|
911 | results = []
|
---|
912 | def f():
|
---|
913 | try:
|
---|
914 | # Intra-buffer read then buffer-flushing read
|
---|
915 | for n in cycle([1, 19]):
|
---|
916 | s = bufio.read(n)
|
---|
917 | if not s:
|
---|
918 | break
|
---|
919 | # list.append() is atomic
|
---|
920 | results.append(s)
|
---|
921 | except Exception as e:
|
---|
922 | errors.append(e)
|
---|
923 | raise
|
---|
924 | threads = [threading.Thread(target=f) for x in range(20)]
|
---|
925 | for t in threads:
|
---|
926 | t.start()
|
---|
927 | time.sleep(0.02) # yield
|
---|
928 | for t in threads:
|
---|
929 | t.join()
|
---|
930 | self.assertFalse(errors,
|
---|
931 | "the following exceptions were caught: %r" % errors)
|
---|
932 | s = b''.join(results)
|
---|
933 | for i in range(256):
|
---|
934 | c = bytes(bytearray([i]))
|
---|
935 | self.assertEqual(s.count(c), N)
|
---|
936 | finally:
|
---|
937 | support.unlink(support.TESTFN)
|
---|
938 |
|
---|
939 | def test_misbehaved_io(self):
|
---|
940 | rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
|
---|
941 | bufio = self.tp(rawio)
|
---|
942 | self.assertRaises(IOError, bufio.seek, 0)
|
---|
943 | self.assertRaises(IOError, bufio.tell)
|
---|
944 |
|
---|
945 | def test_no_extraneous_read(self):
|
---|
946 | # Issue #9550; when the raw IO object has satisfied the read request,
|
---|
947 | # we should not issue any additional reads, otherwise it may block
|
---|
948 | # (e.g. socket).
|
---|
949 | bufsize = 16
|
---|
950 | for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
|
---|
951 | rawio = self.MockRawIO([b"x" * n])
|
---|
952 | bufio = self.tp(rawio, bufsize)
|
---|
953 | self.assertEqual(bufio.read(n), b"x" * n)
|
---|
954 | # Simple case: one raw read is enough to satisfy the request.
|
---|
955 | self.assertEqual(rawio._extraneous_reads, 0,
|
---|
956 | "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
|
---|
957 | # A more complex case where two raw reads are needed to satisfy
|
---|
958 | # the request.
|
---|
959 | rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
|
---|
960 | bufio = self.tp(rawio, bufsize)
|
---|
961 | self.assertEqual(bufio.read(n), b"x" * n)
|
---|
962 | self.assertEqual(rawio._extraneous_reads, 0,
|
---|
963 | "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
|
---|
964 |
|
---|
965 |
|
---|
966 | class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
|
---|
967 | tp = io.BufferedReader
|
---|
968 |
|
---|
969 | def test_constructor(self):
|
---|
970 | BufferedReaderTest.test_constructor(self)
|
---|
971 | # The allocation can succeed on 32-bit builds, e.g. with more
|
---|
972 | # than 2GB RAM and a 64-bit kernel.
|
---|
973 | if sys.maxsize > 0x7FFFFFFF:
|
---|
974 | rawio = self.MockRawIO()
|
---|
975 | bufio = self.tp(rawio)
|
---|
976 | self.assertRaises((OverflowError, MemoryError, ValueError),
|
---|
977 | bufio.__init__, rawio, sys.maxsize)
|
---|
978 |
|
---|
979 | def test_initialization(self):
|
---|
980 | rawio = self.MockRawIO([b"abc"])
|
---|
981 | bufio = self.tp(rawio)
|
---|
982 | self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
|
---|
983 | self.assertRaises(ValueError, bufio.read)
|
---|
984 | self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
|
---|
985 | self.assertRaises(ValueError, bufio.read)
|
---|
986 | self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
|
---|
987 | self.assertRaises(ValueError, bufio.read)
|
---|
988 |
|
---|
989 | def test_misbehaved_io_read(self):
|
---|
990 | rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
|
---|
991 | bufio = self.tp(rawio)
|
---|
992 | # _pyio.BufferedReader seems to implement reading different, so that
|
---|
993 | # checking this is not so easy.
|
---|
994 | self.assertRaises(IOError, bufio.read, 10)
|
---|
995 |
|
---|
996 | def test_garbage_collection(self):
|
---|
997 | # C BufferedReader objects are collected.
|
---|
998 | # The Python version has __del__, so it ends into gc.garbage instead
|
---|
999 | rawio = self.FileIO(support.TESTFN, "w+b")
|
---|
1000 | f = self.tp(rawio)
|
---|
1001 | f.f = f
|
---|
1002 | wr = weakref.ref(f)
|
---|
1003 | del f
|
---|
1004 | support.gc_collect()
|
---|
1005 | self.assertTrue(wr() is None, wr)
|
---|
1006 |
|
---|
1007 | def test_args_error(self):
|
---|
1008 | # Issue #17275
|
---|
1009 | with self.assertRaisesRegexp(TypeError, "BufferedReader"):
|
---|
1010 | self.tp(io.BytesIO(), 1024, 1024, 1024)
|
---|
1011 |
|
---|
1012 |
|
---|
1013 | class PyBufferedReaderTest(BufferedReaderTest):
|
---|
1014 | tp = pyio.BufferedReader
|
---|
1015 |
|
---|
1016 |
|
---|
1017 | class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
|
---|
1018 | write_mode = "wb"
|
---|
1019 |
|
---|
1020 | def test_constructor(self):
|
---|
1021 | rawio = self.MockRawIO()
|
---|
1022 | bufio = self.tp(rawio)
|
---|
1023 | bufio.__init__(rawio)
|
---|
1024 | bufio.__init__(rawio, buffer_size=1024)
|
---|
1025 | bufio.__init__(rawio, buffer_size=16)
|
---|
1026 | self.assertEqual(3, bufio.write(b"abc"))
|
---|
1027 | bufio.flush()
|
---|
1028 | self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
|
---|
1029 | self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
|
---|
1030 | self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
|
---|
1031 | bufio.__init__(rawio)
|
---|
1032 | self.assertEqual(3, bufio.write(b"ghi"))
|
---|
1033 | bufio.flush()
|
---|
1034 | self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
|
---|
1035 |
|
---|
1036 | def test_detach_flush(self):
|
---|
1037 | raw = self.MockRawIO()
|
---|
1038 | buf = self.tp(raw)
|
---|
1039 | buf.write(b"howdy!")
|
---|
1040 | self.assertFalse(raw._write_stack)
|
---|
1041 | buf.detach()
|
---|
1042 | self.assertEqual(raw._write_stack, [b"howdy!"])
|
---|
1043 |
|
---|
1044 | def test_write(self):
|
---|
1045 | # Write to the buffered IO but don't overflow the buffer.
|
---|
1046 | writer = self.MockRawIO()
|
---|
1047 | bufio = self.tp(writer, 8)
|
---|
1048 | bufio.write(b"abc")
|
---|
1049 | self.assertFalse(writer._write_stack)
|
---|
1050 |
|
---|
1051 | def test_write_overflow(self):
|
---|
1052 | writer = self.MockRawIO()
|
---|
1053 | bufio = self.tp(writer, 8)
|
---|
1054 | contents = b"abcdefghijklmnop"
|
---|
1055 | for n in range(0, len(contents), 3):
|
---|
1056 | bufio.write(contents[n:n+3])
|
---|
1057 | flushed = b"".join(writer._write_stack)
|
---|
1058 | # At least (total - 8) bytes were implicitly flushed, perhaps more
|
---|
1059 | # depending on the implementation.
|
---|
1060 | self.assertTrue(flushed.startswith(contents[:-8]), flushed)
|
---|
1061 |
|
---|
1062 | def check_writes(self, intermediate_func):
|
---|
1063 | # Lots of writes, test the flushed output is as expected.
|
---|
1064 | contents = bytes(range(256)) * 1000
|
---|
1065 | n = 0
|
---|
1066 | writer = self.MockRawIO()
|
---|
1067 | bufio = self.tp(writer, 13)
|
---|
1068 | # Generator of write sizes: repeat each N 15 times then proceed to N+1
|
---|
1069 | def gen_sizes():
|
---|
1070 | for size in count(1):
|
---|
1071 | for i in range(15):
|
---|
1072 | yield size
|
---|
1073 | sizes = gen_sizes()
|
---|
1074 | while n < len(contents):
|
---|
1075 | size = min(next(sizes), len(contents) - n)
|
---|
1076 | self.assertEqual(bufio.write(contents[n:n+size]), size)
|
---|
1077 | intermediate_func(bufio)
|
---|
1078 | n += size
|
---|
1079 | bufio.flush()
|
---|
1080 | self.assertEqual(contents,
|
---|
1081 | b"".join(writer._write_stack))
|
---|
1082 |
|
---|
1083 | def test_writes(self):
|
---|
1084 | self.check_writes(lambda bufio: None)
|
---|
1085 |
|
---|
1086 | def test_writes_and_flushes(self):
|
---|
1087 | self.check_writes(lambda bufio: bufio.flush())
|
---|
1088 |
|
---|
1089 | def test_writes_and_seeks(self):
|
---|
1090 | def _seekabs(bufio):
|
---|
1091 | pos = bufio.tell()
|
---|
1092 | bufio.seek(pos + 1, 0)
|
---|
1093 | bufio.seek(pos - 1, 0)
|
---|
1094 | bufio.seek(pos, 0)
|
---|
1095 | self.check_writes(_seekabs)
|
---|
1096 | def _seekrel(bufio):
|
---|
1097 | pos = bufio.seek(0, 1)
|
---|
1098 | bufio.seek(+1, 1)
|
---|
1099 | bufio.seek(-1, 1)
|
---|
1100 | bufio.seek(pos, 0)
|
---|
1101 | self.check_writes(_seekrel)
|
---|
1102 |
|
---|
1103 | def test_writes_and_truncates(self):
|
---|
1104 | self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
|
---|
1105 |
|
---|
1106 | def test_write_non_blocking(self):
|
---|
1107 | raw = self.MockNonBlockWriterIO()
|
---|
1108 | bufio = self.tp(raw, 8)
|
---|
1109 |
|
---|
1110 | self.assertEqual(bufio.write(b"abcd"), 4)
|
---|
1111 | self.assertEqual(bufio.write(b"efghi"), 5)
|
---|
1112 | # 1 byte will be written, the rest will be buffered
|
---|
1113 | raw.block_on(b"k")
|
---|
1114 | self.assertEqual(bufio.write(b"jklmn"), 5)
|
---|
1115 |
|
---|
1116 | # 8 bytes will be written, 8 will be buffered and the rest will be lost
|
---|
1117 | raw.block_on(b"0")
|
---|
1118 | try:
|
---|
1119 | bufio.write(b"opqrwxyz0123456789")
|
---|
1120 | except self.BlockingIOError as e:
|
---|
1121 | written = e.characters_written
|
---|
1122 | else:
|
---|
1123 | self.fail("BlockingIOError should have been raised")
|
---|
1124 | self.assertEqual(written, 16)
|
---|
1125 | self.assertEqual(raw.pop_written(),
|
---|
1126 | b"abcdefghijklmnopqrwxyz")
|
---|
1127 |
|
---|
1128 | self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
|
---|
1129 | s = raw.pop_written()
|
---|
1130 | # Previously buffered bytes were flushed
|
---|
1131 | self.assertTrue(s.startswith(b"01234567A"), s)
|
---|
1132 |
|
---|
1133 | def test_write_and_rewind(self):
|
---|
1134 | raw = io.BytesIO()
|
---|
1135 | bufio = self.tp(raw, 4)
|
---|
1136 | self.assertEqual(bufio.write(b"abcdef"), 6)
|
---|
1137 | self.assertEqual(bufio.tell(), 6)
|
---|
1138 | bufio.seek(0, 0)
|
---|
1139 | self.assertEqual(bufio.write(b"XY"), 2)
|
---|
1140 | bufio.seek(6, 0)
|
---|
1141 | self.assertEqual(raw.getvalue(), b"XYcdef")
|
---|
1142 | self.assertEqual(bufio.write(b"123456"), 6)
|
---|
1143 | bufio.flush()
|
---|
1144 | self.assertEqual(raw.getvalue(), b"XYcdef123456")
|
---|
1145 |
|
---|
1146 | def test_flush(self):
|
---|
1147 | writer = self.MockRawIO()
|
---|
1148 | bufio = self.tp(writer, 8)
|
---|
1149 | bufio.write(b"abc")
|
---|
1150 | bufio.flush()
|
---|
1151 | self.assertEqual(b"abc", writer._write_stack[0])
|
---|
1152 |
|
---|
1153 | def test_writelines(self):
|
---|
1154 | l = [b'ab', b'cd', b'ef']
|
---|
1155 | writer = self.MockRawIO()
|
---|
1156 | bufio = self.tp(writer, 8)
|
---|
1157 | bufio.writelines(l)
|
---|
1158 | bufio.flush()
|
---|
1159 | self.assertEqual(b''.join(writer._write_stack), b'abcdef')
|
---|
1160 |
|
---|
1161 | def test_writelines_userlist(self):
|
---|
1162 | l = UserList([b'ab', b'cd', b'ef'])
|
---|
1163 | writer = self.MockRawIO()
|
---|
1164 | bufio = self.tp(writer, 8)
|
---|
1165 | bufio.writelines(l)
|
---|
1166 | bufio.flush()
|
---|
1167 | self.assertEqual(b''.join(writer._write_stack), b'abcdef')
|
---|
1168 |
|
---|
1169 | def test_writelines_error(self):
|
---|
1170 | writer = self.MockRawIO()
|
---|
1171 | bufio = self.tp(writer, 8)
|
---|
1172 | self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
|
---|
1173 | self.assertRaises(TypeError, bufio.writelines, None)
|
---|
1174 |
|
---|
1175 | def test_destructor(self):
|
---|
1176 | writer = self.MockRawIO()
|
---|
1177 | bufio = self.tp(writer, 8)
|
---|
1178 | bufio.write(b"abc")
|
---|
1179 | del bufio
|
---|
1180 | support.gc_collect()
|
---|
1181 | self.assertEqual(b"abc", writer._write_stack[0])
|
---|
1182 |
|
---|
1183 | def test_truncate(self):
|
---|
1184 | # Truncate implicitly flushes the buffer.
|
---|
1185 | with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
|
---|
1186 | bufio = self.tp(raw, 8)
|
---|
1187 | bufio.write(b"abcdef")
|
---|
1188 | self.assertEqual(bufio.truncate(3), 3)
|
---|
1189 | self.assertEqual(bufio.tell(), 6)
|
---|
1190 | with self.open(support.TESTFN, "rb", buffering=0) as f:
|
---|
1191 | self.assertEqual(f.read(), b"abc")
|
---|
1192 |
|
---|
1193 | @unittest.skipUnless(threading, 'Threading required for this test.')
|
---|
1194 | @support.requires_resource('cpu')
|
---|
1195 | def test_threads(self):
|
---|
1196 | try:
|
---|
1197 | # Write out many bytes from many threads and test they were
|
---|
1198 | # all flushed.
|
---|
1199 | N = 1000
|
---|
1200 | contents = bytes(range(256)) * N
|
---|
1201 | sizes = cycle([1, 19])
|
---|
1202 | n = 0
|
---|
1203 | queue = deque()
|
---|
1204 | while n < len(contents):
|
---|
1205 | size = next(sizes)
|
---|
1206 | queue.append(contents[n:n+size])
|
---|
1207 | n += size
|
---|
1208 | del contents
|
---|
1209 | # We use a real file object because it allows us to
|
---|
1210 | # exercise situations where the GIL is released before
|
---|
1211 | # writing the buffer to the raw streams. This is in addition
|
---|
1212 | # to concurrency issues due to switching threads in the middle
|
---|
1213 | # of Python code.
|
---|
1214 | with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
|
---|
1215 | bufio = self.tp(raw, 8)
|
---|
1216 | errors = []
|
---|
1217 | def f():
|
---|
1218 | try:
|
---|
1219 | while True:
|
---|
1220 | try:
|
---|
1221 | s = queue.popleft()
|
---|
1222 | except IndexError:
|
---|
1223 | return
|
---|
1224 | bufio.write(s)
|
---|
1225 | except Exception as e:
|
---|
1226 | errors.append(e)
|
---|
1227 | raise
|
---|
1228 | threads = [threading.Thread(target=f) for x in range(20)]
|
---|
1229 | for t in threads:
|
---|
1230 | t.start()
|
---|
1231 | time.sleep(0.02) # yield
|
---|
1232 | for t in threads:
|
---|
1233 | t.join()
|
---|
1234 | self.assertFalse(errors,
|
---|
1235 | "the following exceptions were caught: %r" % errors)
|
---|
1236 | bufio.close()
|
---|
1237 | with self.open(support.TESTFN, "rb") as f:
|
---|
1238 | s = f.read()
|
---|
1239 | for i in range(256):
|
---|
1240 | self.assertEqual(s.count(bytes([i])), N)
|
---|
1241 | finally:
|
---|
1242 | support.unlink(support.TESTFN)
|
---|
1243 |
|
---|
1244 | def test_misbehaved_io(self):
|
---|
1245 | rawio = self.MisbehavedRawIO()
|
---|
1246 | bufio = self.tp(rawio, 5)
|
---|
1247 | self.assertRaises(IOError, bufio.seek, 0)
|
---|
1248 | self.assertRaises(IOError, bufio.tell)
|
---|
1249 | self.assertRaises(IOError, bufio.write, b"abcdef")
|
---|
1250 |
|
---|
1251 | def test_max_buffer_size_deprecation(self):
|
---|
1252 | with support.check_warnings(("max_buffer_size is deprecated",
|
---|
1253 | DeprecationWarning)):
|
---|
1254 | self.tp(self.MockRawIO(), 8, 12)
|
---|
1255 |
|
---|
1256 | def test_write_error_on_close(self):
|
---|
1257 | raw = self.MockRawIO()
|
---|
1258 | def bad_write(b):
|
---|
1259 | raise IOError()
|
---|
1260 | raw.write = bad_write
|
---|
1261 | b = self.tp(raw)
|
---|
1262 | b.write(b'spam')
|
---|
1263 | self.assertRaises(IOError, b.close) # exception not swallowed
|
---|
1264 | self.assertTrue(b.closed)
|
---|
1265 |
|
---|
1266 |
|
---|
1267 | class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
|
---|
1268 | tp = io.BufferedWriter
|
---|
1269 |
|
---|
1270 | def test_constructor(self):
|
---|
1271 | BufferedWriterTest.test_constructor(self)
|
---|
1272 | # The allocation can succeed on 32-bit builds, e.g. with more
|
---|
1273 | # than 2GB RAM and a 64-bit kernel.
|
---|
1274 | if sys.maxsize > 0x7FFFFFFF:
|
---|
1275 | rawio = self.MockRawIO()
|
---|
1276 | bufio = self.tp(rawio)
|
---|
1277 | self.assertRaises((OverflowError, MemoryError, ValueError),
|
---|
1278 | bufio.__init__, rawio, sys.maxsize)
|
---|
1279 |
|
---|
1280 | def test_initialization(self):
|
---|
1281 | rawio = self.MockRawIO()
|
---|
1282 | bufio = self.tp(rawio)
|
---|
1283 | self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
|
---|
1284 | self.assertRaises(ValueError, bufio.write, b"def")
|
---|
1285 | self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
|
---|
1286 | self.assertRaises(ValueError, bufio.write, b"def")
|
---|
1287 | self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
|
---|
1288 | self.assertRaises(ValueError, bufio.write, b"def")
|
---|
1289 |
|
---|
1290 | def test_garbage_collection(self):
|
---|
1291 | # C BufferedWriter objects are collected, and collecting them flushes
|
---|
1292 | # all data to disk.
|
---|
1293 | # The Python version has __del__, so it ends into gc.garbage instead
|
---|
1294 | rawio = self.FileIO(support.TESTFN, "w+b")
|
---|
1295 | f = self.tp(rawio)
|
---|
1296 | f.write(b"123xxx")
|
---|
1297 | f.x = f
|
---|
1298 | wr = weakref.ref(f)
|
---|
1299 | del f
|
---|
1300 | support.gc_collect()
|
---|
1301 | self.assertTrue(wr() is None, wr)
|
---|
1302 | with self.open(support.TESTFN, "rb") as f:
|
---|
1303 | self.assertEqual(f.read(), b"123xxx")
|
---|
1304 |
|
---|
1305 | def test_args_error(self):
|
---|
1306 | # Issue #17275
|
---|
1307 | with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
|
---|
1308 | self.tp(io.BytesIO(), 1024, 1024, 1024)
|
---|
1309 |
|
---|
1310 |
|
---|
1311 | class PyBufferedWriterTest(BufferedWriterTest):
|
---|
1312 | tp = pyio.BufferedWriter
|
---|
1313 |
|
---|
1314 | class BufferedRWPairTest(unittest.TestCase):
|
---|
1315 |
|
---|
1316 | def test_constructor(self):
|
---|
1317 | pair = self.tp(self.MockRawIO(), self.MockRawIO())
|
---|
1318 | self.assertFalse(pair.closed)
|
---|
1319 |
|
---|
1320 | def test_detach(self):
|
---|
1321 | pair = self.tp(self.MockRawIO(), self.MockRawIO())
|
---|
1322 | self.assertRaises(self.UnsupportedOperation, pair.detach)
|
---|
1323 |
|
---|
1324 | def test_constructor_max_buffer_size_deprecation(self):
|
---|
1325 | with support.check_warnings(("max_buffer_size is deprecated",
|
---|
1326 | DeprecationWarning)):
|
---|
1327 | self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
|
---|
1328 |
|
---|
1329 | def test_constructor_with_not_readable(self):
|
---|
1330 | class NotReadable(MockRawIO):
|
---|
1331 | def readable(self):
|
---|
1332 | return False
|
---|
1333 |
|
---|
1334 | self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
|
---|
1335 |
|
---|
1336 | def test_constructor_with_not_writeable(self):
|
---|
1337 | class NotWriteable(MockRawIO):
|
---|
1338 | def writable(self):
|
---|
1339 | return False
|
---|
1340 |
|
---|
1341 | self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
|
---|
1342 |
|
---|
1343 | def test_read(self):
|
---|
1344 | pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
|
---|
1345 |
|
---|
1346 | self.assertEqual(pair.read(3), b"abc")
|
---|
1347 | self.assertEqual(pair.read(1), b"d")
|
---|
1348 | self.assertEqual(pair.read(), b"ef")
|
---|
1349 | pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
|
---|
1350 | self.assertEqual(pair.read(None), b"abc")
|
---|
1351 |
|
---|
1352 | def test_readlines(self):
|
---|
1353 | pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
|
---|
1354 | self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
|
---|
1355 | self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
|
---|
1356 | self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
|
---|
1357 |
|
---|
1358 | def test_read1(self):
|
---|
1359 | # .read1() is delegated to the underlying reader object, so this test
|
---|
1360 | # can be shallow.
|
---|
1361 | pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
|
---|
1362 |
|
---|
1363 | self.assertEqual(pair.read1(3), b"abc")
|
---|
1364 |
|
---|
1365 | def test_readinto(self):
|
---|
1366 | pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
|
---|
1367 |
|
---|
1368 | data = bytearray(5)
|
---|
1369 | self.assertEqual(pair.readinto(data), 5)
|
---|
1370 | self.assertEqual(data, b"abcde")
|
---|
1371 |
|
---|
1372 | def test_write(self):
|
---|
1373 | w = self.MockRawIO()
|
---|
1374 | pair = self.tp(self.MockRawIO(), w)
|
---|
1375 |
|
---|
1376 | pair.write(b"abc")
|
---|
1377 | pair.flush()
|
---|
1378 | pair.write(b"def")
|
---|
1379 | pair.flush()
|
---|
1380 | self.assertEqual(w._write_stack, [b"abc", b"def"])
|
---|
1381 |
|
---|
1382 | def test_peek(self):
|
---|
1383 | pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
|
---|
1384 |
|
---|
1385 | self.assertTrue(pair.peek(3).startswith(b"abc"))
|
---|
1386 | self.assertEqual(pair.read(3), b"abc")
|
---|
1387 |
|
---|
1388 | def test_readable(self):
|
---|
1389 | pair = self.tp(self.MockRawIO(), self.MockRawIO())
|
---|
1390 | self.assertTrue(pair.readable())
|
---|
1391 |
|
---|
1392 | def test_writeable(self):
|
---|
1393 | pair = self.tp(self.MockRawIO(), self.MockRawIO())
|
---|
1394 | self.assertTrue(pair.writable())
|
---|
1395 |
|
---|
1396 | def test_seekable(self):
|
---|
1397 | # BufferedRWPairs are never seekable, even if their readers and writers
|
---|
1398 | # are.
|
---|
1399 | pair = self.tp(self.MockRawIO(), self.MockRawIO())
|
---|
1400 | self.assertFalse(pair.seekable())
|
---|
1401 |
|
---|
1402 | # .flush() is delegated to the underlying writer object and has been
|
---|
1403 | # tested in the test_write method.
|
---|
1404 |
|
---|
1405 | def test_close_and_closed(self):
|
---|
1406 | pair = self.tp(self.MockRawIO(), self.MockRawIO())
|
---|
1407 | self.assertFalse(pair.closed)
|
---|
1408 | pair.close()
|
---|
1409 | self.assertTrue(pair.closed)
|
---|
1410 |
|
---|
1411 | def test_isatty(self):
|
---|
1412 | class SelectableIsAtty(MockRawIO):
|
---|
1413 | def __init__(self, isatty):
|
---|
1414 | MockRawIO.__init__(self)
|
---|
1415 | self._isatty = isatty
|
---|
1416 |
|
---|
1417 | def isatty(self):
|
---|
1418 | return self._isatty
|
---|
1419 |
|
---|
1420 | pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
|
---|
1421 | self.assertFalse(pair.isatty())
|
---|
1422 |
|
---|
1423 | pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
|
---|
1424 | self.assertTrue(pair.isatty())
|
---|
1425 |
|
---|
1426 | pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
|
---|
1427 | self.assertTrue(pair.isatty())
|
---|
1428 |
|
---|
1429 | pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
|
---|
1430 | self.assertTrue(pair.isatty())
|
---|
1431 |
|
---|
1432 | class CBufferedRWPairTest(BufferedRWPairTest):
|
---|
1433 | tp = io.BufferedRWPair
|
---|
1434 |
|
---|
1435 | class PyBufferedRWPairTest(BufferedRWPairTest):
|
---|
1436 | tp = pyio.BufferedRWPair
|
---|
1437 |
|
---|
1438 |
|
---|
1439 | class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
|
---|
1440 | read_mode = "rb+"
|
---|
1441 | write_mode = "wb+"
|
---|
1442 |
|
---|
1443 | def test_constructor(self):
|
---|
1444 | BufferedReaderTest.test_constructor(self)
|
---|
1445 | BufferedWriterTest.test_constructor(self)
|
---|
1446 |
|
---|
1447 | def test_read_and_write(self):
|
---|
1448 | raw = self.MockRawIO((b"asdf", b"ghjk"))
|
---|
1449 | rw = self.tp(raw, 8)
|
---|
1450 |
|
---|
1451 | self.assertEqual(b"as", rw.read(2))
|
---|
1452 | rw.write(b"ddd")
|
---|
1453 | rw.write(b"eee")
|
---|
1454 | self.assertFalse(raw._write_stack) # Buffer writes
|
---|
1455 | self.assertEqual(b"ghjk", rw.read())
|
---|
1456 | self.assertEqual(b"dddeee", raw._write_stack[0])
|
---|
1457 |
|
---|
1458 | def test_seek_and_tell(self):
|
---|
1459 | raw = self.BytesIO(b"asdfghjkl")
|
---|
1460 | rw = self.tp(raw)
|
---|
1461 |
|
---|
1462 | self.assertEqual(b"as", rw.read(2))
|
---|
1463 | self.assertEqual(2, rw.tell())
|
---|
1464 | rw.seek(0, 0)
|
---|
1465 | self.assertEqual(b"asdf", rw.read(4))
|
---|
1466 |
|
---|
1467 | rw.write(b"123f")
|
---|
1468 | rw.seek(0, 0)
|
---|
1469 | self.assertEqual(b"asdf123fl", rw.read())
|
---|
1470 | self.assertEqual(9, rw.tell())
|
---|
1471 | rw.seek(-4, 2)
|
---|
1472 | self.assertEqual(5, rw.tell())
|
---|
1473 | rw.seek(2, 1)
|
---|
1474 | self.assertEqual(7, rw.tell())
|
---|
1475 | self.assertEqual(b"fl", rw.read(11))
|
---|
1476 | rw.flush()
|
---|
1477 | self.assertEqual(b"asdf123fl", raw.getvalue())
|
---|
1478 |
|
---|
1479 | self.assertRaises(TypeError, rw.seek, 0.0)
|
---|
1480 |
|
---|
1481 | def check_flush_and_read(self, read_func):
|
---|
1482 | raw = self.BytesIO(b"abcdefghi")
|
---|
1483 | bufio = self.tp(raw)
|
---|
1484 |
|
---|
1485 | self.assertEqual(b"ab", read_func(bufio, 2))
|
---|
1486 | bufio.write(b"12")
|
---|
1487 | self.assertEqual(b"ef", read_func(bufio, 2))
|
---|
1488 | self.assertEqual(6, bufio.tell())
|
---|
1489 | bufio.flush()
|
---|
1490 | self.assertEqual(6, bufio.tell())
|
---|
1491 | self.assertEqual(b"ghi", read_func(bufio))
|
---|
1492 | raw.seek(0, 0)
|
---|
1493 | raw.write(b"XYZ")
|
---|
1494 | # flush() resets the read buffer
|
---|
1495 | bufio.flush()
|
---|
1496 | bufio.seek(0, 0)
|
---|
1497 | self.assertEqual(b"XYZ", read_func(bufio, 3))
|
---|
1498 |
|
---|
1499 | def test_flush_and_read(self):
|
---|
1500 | self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
|
---|
1501 |
|
---|
1502 | def test_flush_and_readinto(self):
|
---|
1503 | def _readinto(bufio, n=-1):
|
---|
1504 | b = bytearray(n if n >= 0 else 9999)
|
---|
1505 | n = bufio.readinto(b)
|
---|
1506 | return bytes(b[:n])
|
---|
1507 | self.check_flush_and_read(_readinto)
|
---|
1508 |
|
---|
1509 | def test_flush_and_peek(self):
|
---|
1510 | def _peek(bufio, n=-1):
|
---|
1511 | # This relies on the fact that the buffer can contain the whole
|
---|
1512 | # raw stream, otherwise peek() can return less.
|
---|
1513 | b = bufio.peek(n)
|
---|
1514 | if n != -1:
|
---|
1515 | b = b[:n]
|
---|
1516 | bufio.seek(len(b), 1)
|
---|
1517 | return b
|
---|
1518 | self.check_flush_and_read(_peek)
|
---|
1519 |
|
---|
1520 | def test_flush_and_write(self):
|
---|
1521 | raw = self.BytesIO(b"abcdefghi")
|
---|
1522 | bufio = self.tp(raw)
|
---|
1523 |
|
---|
1524 | bufio.write(b"123")
|
---|
1525 | bufio.flush()
|
---|
1526 | bufio.write(b"45")
|
---|
1527 | bufio.flush()
|
---|
1528 | bufio.seek(0, 0)
|
---|
1529 | self.assertEqual(b"12345fghi", raw.getvalue())
|
---|
1530 | self.assertEqual(b"12345fghi", bufio.read())
|
---|
1531 |
|
---|
1532 | def test_threads(self):
|
---|
1533 | BufferedReaderTest.test_threads(self)
|
---|
1534 | BufferedWriterTest.test_threads(self)
|
---|
1535 |
|
---|
1536 | def test_writes_and_peek(self):
|
---|
1537 | def _peek(bufio):
|
---|
1538 | bufio.peek(1)
|
---|
1539 | self.check_writes(_peek)
|
---|
1540 | def _peek(bufio):
|
---|
1541 | pos = bufio.tell()
|
---|
1542 | bufio.seek(-1, 1)
|
---|
1543 | bufio.peek(1)
|
---|
1544 | bufio.seek(pos, 0)
|
---|
1545 | self.check_writes(_peek)
|
---|
1546 |
|
---|
1547 | def test_writes_and_reads(self):
|
---|
1548 | def _read(bufio):
|
---|
1549 | bufio.seek(-1, 1)
|
---|
1550 | bufio.read(1)
|
---|
1551 | self.check_writes(_read)
|
---|
1552 |
|
---|
1553 | def test_writes_and_read1s(self):
|
---|
1554 | def _read1(bufio):
|
---|
1555 | bufio.seek(-1, 1)
|
---|
1556 | bufio.read1(1)
|
---|
1557 | self.check_writes(_read1)
|
---|
1558 |
|
---|
1559 | def test_writes_and_readintos(self):
|
---|
1560 | def _read(bufio):
|
---|
1561 | bufio.seek(-1, 1)
|
---|
1562 | bufio.readinto(bytearray(1))
|
---|
1563 | self.check_writes(_read)
|
---|
1564 |
|
---|
1565 | def test_write_after_readahead(self):
|
---|
1566 | # Issue #6629: writing after the buffer was filled by readahead should
|
---|
1567 | # first rewind the raw stream.
|
---|
1568 | for overwrite_size in [1, 5]:
|
---|
1569 | raw = self.BytesIO(b"A" * 10)
|
---|
1570 | bufio = self.tp(raw, 4)
|
---|
1571 | # Trigger readahead
|
---|
1572 | self.assertEqual(bufio.read(1), b"A")
|
---|
1573 | self.assertEqual(bufio.tell(), 1)
|
---|
1574 | # Overwriting should rewind the raw stream if it needs so
|
---|
1575 | bufio.write(b"B" * overwrite_size)
|
---|
1576 | self.assertEqual(bufio.tell(), overwrite_size + 1)
|
---|
1577 | # If the write size was smaller than the buffer size, flush() and
|
---|
1578 | # check that rewind happens.
|
---|
1579 | bufio.flush()
|
---|
1580 | self.assertEqual(bufio.tell(), overwrite_size + 1)
|
---|
1581 | s = raw.getvalue()
|
---|
1582 | self.assertEqual(s,
|
---|
1583 | b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
|
---|
1584 |
|
---|
1585 | def test_write_rewind_write(self):
|
---|
1586 | # Various combinations of reading / writing / seeking backwards / writing again
|
---|
1587 | def mutate(bufio, pos1, pos2):
|
---|
1588 | assert pos2 >= pos1
|
---|
1589 | # Fill the buffer
|
---|
1590 | bufio.seek(pos1)
|
---|
1591 | bufio.read(pos2 - pos1)
|
---|
1592 | bufio.write(b'\x02')
|
---|
1593 | # This writes earlier than the previous write, but still inside
|
---|
1594 | # the buffer.
|
---|
1595 | bufio.seek(pos1)
|
---|
1596 | bufio.write(b'\x01')
|
---|
1597 |
|
---|
1598 | b = b"\x80\x81\x82\x83\x84"
|
---|
1599 | for i in range(0, len(b)):
|
---|
1600 | for j in range(i, len(b)):
|
---|
1601 | raw = self.BytesIO(b)
|
---|
1602 | bufio = self.tp(raw, 100)
|
---|
1603 | mutate(bufio, i, j)
|
---|
1604 | bufio.flush()
|
---|
1605 | expected = bytearray(b)
|
---|
1606 | expected[j] = 2
|
---|
1607 | expected[i] = 1
|
---|
1608 | self.assertEqual(raw.getvalue(), expected,
|
---|
1609 | "failed result for i=%d, j=%d" % (i, j))
|
---|
1610 |
|
---|
1611 | def test_truncate_after_read_or_write(self):
|
---|
1612 | raw = self.BytesIO(b"A" * 10)
|
---|
1613 | bufio = self.tp(raw, 100)
|
---|
1614 | self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
|
---|
1615 | self.assertEqual(bufio.truncate(), 2)
|
---|
1616 | self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
|
---|
1617 | self.assertEqual(bufio.truncate(), 4)
|
---|
1618 |
|
---|
1619 | def test_misbehaved_io(self):
|
---|
1620 | BufferedReaderTest.test_misbehaved_io(self)
|
---|
1621 | BufferedWriterTest.test_misbehaved_io(self)
|
---|
1622 |
|
---|
1623 | def test_interleaved_read_write(self):
|
---|
1624 | # Test for issue #12213
|
---|
1625 | with self.BytesIO(b'abcdefgh') as raw:
|
---|
1626 | with self.tp(raw, 100) as f:
|
---|
1627 | f.write(b"1")
|
---|
1628 | self.assertEqual(f.read(1), b'b')
|
---|
1629 | f.write(b'2')
|
---|
1630 | self.assertEqual(f.read1(1), b'd')
|
---|
1631 | f.write(b'3')
|
---|
1632 | buf = bytearray(1)
|
---|
1633 | f.readinto(buf)
|
---|
1634 | self.assertEqual(buf, b'f')
|
---|
1635 | f.write(b'4')
|
---|
1636 | self.assertEqual(f.peek(1), b'h')
|
---|
1637 | f.flush()
|
---|
1638 | self.assertEqual(raw.getvalue(), b'1b2d3f4h')
|
---|
1639 |
|
---|
1640 | with self.BytesIO(b'abc') as raw:
|
---|
1641 | with self.tp(raw, 100) as f:
|
---|
1642 | self.assertEqual(f.read(1), b'a')
|
---|
1643 | f.write(b"2")
|
---|
1644 | self.assertEqual(f.read(1), b'c')
|
---|
1645 | f.flush()
|
---|
1646 | self.assertEqual(raw.getvalue(), b'a2c')
|
---|
1647 |
|
---|
1648 | def test_interleaved_readline_write(self):
|
---|
1649 | with self.BytesIO(b'ab\ncdef\ng\n') as raw:
|
---|
1650 | with self.tp(raw) as f:
|
---|
1651 | f.write(b'1')
|
---|
1652 | self.assertEqual(f.readline(), b'b\n')
|
---|
1653 | f.write(b'2')
|
---|
1654 | self.assertEqual(f.readline(), b'def\n')
|
---|
1655 | f.write(b'3')
|
---|
1656 | self.assertEqual(f.readline(), b'\n')
|
---|
1657 | f.flush()
|
---|
1658 | self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
|
---|
1659 |
|
---|
1660 |
|
---|
1661 | class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
|
---|
1662 | BufferedRandomTest, SizeofTest):
|
---|
1663 | tp = io.BufferedRandom
|
---|
1664 |
|
---|
1665 | def test_constructor(self):
|
---|
1666 | BufferedRandomTest.test_constructor(self)
|
---|
1667 | # The allocation can succeed on 32-bit builds, e.g. with more
|
---|
1668 | # than 2GB RAM and a 64-bit kernel.
|
---|
1669 | if sys.maxsize > 0x7FFFFFFF:
|
---|
1670 | rawio = self.MockRawIO()
|
---|
1671 | bufio = self.tp(rawio)
|
---|
1672 | self.assertRaises((OverflowError, MemoryError, ValueError),
|
---|
1673 | bufio.__init__, rawio, sys.maxsize)
|
---|
1674 |
|
---|
1675 | def test_garbage_collection(self):
|
---|
1676 | CBufferedReaderTest.test_garbage_collection(self)
|
---|
1677 | CBufferedWriterTest.test_garbage_collection(self)
|
---|
1678 |
|
---|
1679 | def test_args_error(self):
|
---|
1680 | # Issue #17275
|
---|
1681 | with self.assertRaisesRegexp(TypeError, "BufferedRandom"):
|
---|
1682 | self.tp(io.BytesIO(), 1024, 1024, 1024)
|
---|
1683 |
|
---|
1684 |
|
---|
1685 | class PyBufferedRandomTest(BufferedRandomTest):
|
---|
1686 | tp = pyio.BufferedRandom
|
---|
1687 |
|
---|
1688 |
|
---|
1689 | # To fully exercise seek/tell, the StatefulIncrementalDecoder has these
|
---|
1690 | # properties:
|
---|
1691 | # - A single output character can correspond to many bytes of input.
|
---|
1692 | # - The number of input bytes to complete the character can be
|
---|
1693 | # undetermined until the last input byte is received.
|
---|
1694 | # - The number of input bytes can vary depending on previous input.
|
---|
1695 | # - A single input byte can correspond to many characters of output.
|
---|
1696 | # - The number of output characters can be undetermined until the
|
---|
1697 | # last input byte is received.
|
---|
1698 | # - The number of output characters can vary depending on previous input.
|
---|
1699 |
|
---|
1700 | class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
|
---|
1701 | """
|
---|
1702 | For testing seek/tell behavior with a stateful, buffering decoder.
|
---|
1703 |
|
---|
1704 | Input is a sequence of words. Words may be fixed-length (length set
|
---|
1705 | by input) or variable-length (period-terminated). In variable-length
|
---|
1706 | mode, extra periods are ignored. Possible words are:
|
---|
1707 | - 'i' followed by a number sets the input length, I (maximum 99).
|
---|
1708 | When I is set to 0, words are space-terminated.
|
---|
1709 | - 'o' followed by a number sets the output length, O (maximum 99).
|
---|
1710 | - Any other word is converted into a word followed by a period on
|
---|
1711 | the output. The output word consists of the input word truncated
|
---|
1712 | or padded out with hyphens to make its length equal to O. If O
|
---|
1713 | is 0, the word is output verbatim without truncating or padding.
|
---|
1714 | I and O are initially set to 1. When I changes, any buffered input is
|
---|
1715 | re-scanned according to the new I. EOF also terminates the last word.
|
---|
1716 | """
|
---|
1717 |
|
---|
1718 | def __init__(self, errors='strict'):
|
---|
1719 | codecs.IncrementalDecoder.__init__(self, errors)
|
---|
1720 | self.reset()
|
---|
1721 |
|
---|
1722 | def __repr__(self):
|
---|
1723 | return '<SID %x>' % id(self)
|
---|
1724 |
|
---|
1725 | def reset(self):
|
---|
1726 | self.i = 1
|
---|
1727 | self.o = 1
|
---|
1728 | self.buffer = bytearray()
|
---|
1729 |
|
---|
1730 | def getstate(self):
|
---|
1731 | i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
|
---|
1732 | return bytes(self.buffer), i*100 + o
|
---|
1733 |
|
---|
1734 | def setstate(self, state):
|
---|
1735 | buffer, io = state
|
---|
1736 | self.buffer = bytearray(buffer)
|
---|
1737 | i, o = divmod(io, 100)
|
---|
1738 | self.i, self.o = i ^ 1, o ^ 1
|
---|
1739 |
|
---|
1740 | def decode(self, input, final=False):
|
---|
1741 | output = ''
|
---|
1742 | for b in input:
|
---|
1743 | if self.i == 0: # variable-length, terminated with period
|
---|
1744 | if b == '.':
|
---|
1745 | if self.buffer:
|
---|
1746 | output += self.process_word()
|
---|
1747 | else:
|
---|
1748 | self.buffer.append(b)
|
---|
1749 | else: # fixed-length, terminate after self.i bytes
|
---|
1750 | self.buffer.append(b)
|
---|
1751 | if len(self.buffer) == self.i:
|
---|
1752 | output += self.process_word()
|
---|
1753 | if final and self.buffer: # EOF terminates the last word
|
---|
1754 | output += self.process_word()
|
---|
1755 | return output
|
---|
1756 |
|
---|
1757 | def process_word(self):
|
---|
1758 | output = ''
|
---|
1759 | if self.buffer[0] == ord('i'):
|
---|
1760 | self.i = min(99, int(self.buffer[1:] or 0)) # set input length
|
---|
1761 | elif self.buffer[0] == ord('o'):
|
---|
1762 | self.o = min(99, int(self.buffer[1:] or 0)) # set output length
|
---|
1763 | else:
|
---|
1764 | output = self.buffer.decode('ascii')
|
---|
1765 | if len(output) < self.o:
|
---|
1766 | output += '-'*self.o # pad out with hyphens
|
---|
1767 | if self.o:
|
---|
1768 | output = output[:self.o] # truncate to output length
|
---|
1769 | output += '.'
|
---|
1770 | self.buffer = bytearray()
|
---|
1771 | return output
|
---|
1772 |
|
---|
1773 | codecEnabled = False
|
---|
1774 |
|
---|
1775 | @classmethod
|
---|
1776 | def lookupTestDecoder(cls, name):
|
---|
1777 | if cls.codecEnabled and name == 'test_decoder':
|
---|
1778 | latin1 = codecs.lookup('latin-1')
|
---|
1779 | return codecs.CodecInfo(
|
---|
1780 | name='test_decoder', encode=latin1.encode, decode=None,
|
---|
1781 | incrementalencoder=None,
|
---|
1782 | streamreader=None, streamwriter=None,
|
---|
1783 | incrementaldecoder=cls)
|
---|
1784 |
|
---|
1785 | # Register the previous decoder for testing.
|
---|
1786 | # Disabled by default, tests will enable it.
|
---|
1787 | codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
|
---|
1788 |
|
---|
1789 |
|
---|
1790 | class StatefulIncrementalDecoderTest(unittest.TestCase):
|
---|
1791 | """
|
---|
1792 | Make sure the StatefulIncrementalDecoder actually works.
|
---|
1793 | """
|
---|
1794 |
|
---|
1795 | test_cases = [
|
---|
1796 | # I=1, O=1 (fixed-length input == fixed-length output)
|
---|
1797 | (b'abcd', False, 'a.b.c.d.'),
|
---|
1798 | # I=0, O=0 (variable-length input, variable-length output)
|
---|
1799 | (b'oiabcd', True, 'abcd.'),
|
---|
1800 | # I=0, O=0 (should ignore extra periods)
|
---|
1801 | (b'oi...abcd...', True, 'abcd.'),
|
---|
1802 | # I=0, O=6 (variable-length input, fixed-length output)
|
---|
1803 | (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
|
---|
1804 | # I=2, O=6 (fixed-length input < fixed-length output)
|
---|
1805 | (b'i.i2.o6xyz', True, 'xy----.z-----.'),
|
---|
1806 | # I=6, O=3 (fixed-length input > fixed-length output)
|
---|
1807 | (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
|
---|
1808 | # I=0, then 3; O=29, then 15 (with longer output)
|
---|
1809 | (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
|
---|
1810 | 'a----------------------------.' +
|
---|
1811 | 'b----------------------------.' +
|
---|
1812 | 'cde--------------------------.' +
|
---|
1813 | 'abcdefghijabcde.' +
|
---|
1814 | 'a.b------------.' +
|
---|
1815 | '.c.------------.' +
|
---|
1816 | 'd.e------------.' +
|
---|
1817 | 'k--------------.' +
|
---|
1818 | 'l--------------.' +
|
---|
1819 | 'm--------------.')
|
---|
1820 | ]
|
---|
1821 |
|
---|
1822 | def test_decoder(self):
|
---|
1823 | # Try a few one-shot test cases.
|
---|
1824 | for input, eof, output in self.test_cases:
|
---|
1825 | d = StatefulIncrementalDecoder()
|
---|
1826 | self.assertEqual(d.decode(input, eof), output)
|
---|
1827 |
|
---|
1828 | # Also test an unfinished decode, followed by forcing EOF.
|
---|
1829 | d = StatefulIncrementalDecoder()
|
---|
1830 | self.assertEqual(d.decode(b'oiabcd'), '')
|
---|
1831 | self.assertEqual(d.decode(b'', 1), 'abcd.')
|
---|
1832 |
|
---|
1833 | class TextIOWrapperTest(unittest.TestCase):
|
---|
1834 |
|
---|
1835 | def setUp(self):
|
---|
1836 | self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
|
---|
1837 | self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
|
---|
1838 | support.unlink(support.TESTFN)
|
---|
1839 |
|
---|
1840 | def tearDown(self):
|
---|
1841 | support.unlink(support.TESTFN)
|
---|
1842 |
|
---|
1843 | def test_constructor(self):
|
---|
1844 | r = self.BytesIO(b"\xc3\xa9\n\n")
|
---|
1845 | b = self.BufferedReader(r, 1000)
|
---|
1846 | t = self.TextIOWrapper(b)
|
---|
1847 | t.__init__(b, encoding="latin1", newline="\r\n")
|
---|
1848 | self.assertEqual(t.encoding, "latin1")
|
---|
1849 | self.assertEqual(t.line_buffering, False)
|
---|
1850 | t.__init__(b, encoding="utf8", line_buffering=True)
|
---|
1851 | self.assertEqual(t.encoding, "utf8")
|
---|
1852 | self.assertEqual(t.line_buffering, True)
|
---|
1853 | self.assertEqual("\xe9\n", t.readline())
|
---|
1854 | self.assertRaises(TypeError, t.__init__, b, newline=42)
|
---|
1855 | self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
|
---|
1856 |
|
---|
1857 | def test_detach(self):
|
---|
1858 | r = self.BytesIO()
|
---|
1859 | b = self.BufferedWriter(r)
|
---|
1860 | t = self.TextIOWrapper(b)
|
---|
1861 | self.assertIs(t.detach(), b)
|
---|
1862 |
|
---|
1863 | t = self.TextIOWrapper(b, encoding="ascii")
|
---|
1864 | t.write("howdy")
|
---|
1865 | self.assertFalse(r.getvalue())
|
---|
1866 | t.detach()
|
---|
1867 | self.assertEqual(r.getvalue(), b"howdy")
|
---|
1868 | self.assertRaises(ValueError, t.detach)
|
---|
1869 |
|
---|
1870 | def test_repr(self):
|
---|
1871 | raw = self.BytesIO("hello".encode("utf-8"))
|
---|
1872 | b = self.BufferedReader(raw)
|
---|
1873 | t = self.TextIOWrapper(b, encoding="utf-8")
|
---|
1874 | modname = self.TextIOWrapper.__module__
|
---|
1875 | self.assertEqual(repr(t),
|
---|
1876 | "<%s.TextIOWrapper encoding='utf-8'>" % modname)
|
---|
1877 | raw.name = "dummy"
|
---|
1878 | self.assertEqual(repr(t),
|
---|
1879 | "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
|
---|
1880 | raw.name = b"dummy"
|
---|
1881 | self.assertEqual(repr(t),
|
---|
1882 | "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
|
---|
1883 |
|
---|
1884 | def test_line_buffering(self):
|
---|
1885 | r = self.BytesIO()
|
---|
1886 | b = self.BufferedWriter(r, 1000)
|
---|
1887 | t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
|
---|
1888 | t.write("X")
|
---|
1889 | self.assertEqual(r.getvalue(), b"") # No flush happened
|
---|
1890 | t.write("Y\nZ")
|
---|
1891 | self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
|
---|
1892 | t.write("A\rB")
|
---|
1893 | self.assertEqual(r.getvalue(), b"XY\nZA\rB")
|
---|
1894 |
|
---|
1895 | def test_encoding(self):
|
---|
1896 | # Check the encoding attribute is always set, and valid
|
---|
1897 | b = self.BytesIO()
|
---|
1898 | t = self.TextIOWrapper(b, encoding="utf8")
|
---|
1899 | self.assertEqual(t.encoding, "utf8")
|
---|
1900 | t = self.TextIOWrapper(b)
|
---|
1901 | self.assertTrue(t.encoding is not None)
|
---|
1902 | codecs.lookup(t.encoding)
|
---|
1903 |
|
---|
1904 | def test_encoding_errors_reading(self):
|
---|
1905 | # (1) default
|
---|
1906 | b = self.BytesIO(b"abc\n\xff\n")
|
---|
1907 | t = self.TextIOWrapper(b, encoding="ascii")
|
---|
1908 | self.assertRaises(UnicodeError, t.read)
|
---|
1909 | # (2) explicit strict
|
---|
1910 | b = self.BytesIO(b"abc\n\xff\n")
|
---|
1911 | t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
|
---|
1912 | self.assertRaises(UnicodeError, t.read)
|
---|
1913 | # (3) ignore
|
---|
1914 | b = self.BytesIO(b"abc\n\xff\n")
|
---|
1915 | t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
|
---|
1916 | self.assertEqual(t.read(), "abc\n\n")
|
---|
1917 | # (4) replace
|
---|
1918 | b = self.BytesIO(b"abc\n\xff\n")
|
---|
1919 | t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
|
---|
1920 | self.assertEqual(t.read(), "abc\n\ufffd\n")
|
---|
1921 |
|
---|
1922 | def test_encoding_errors_writing(self):
|
---|
1923 | # (1) default
|
---|
1924 | b = self.BytesIO()
|
---|
1925 | t = self.TextIOWrapper(b, encoding="ascii")
|
---|
1926 | self.assertRaises(UnicodeError, t.write, "\xff")
|
---|
1927 | # (2) explicit strict
|
---|
1928 | b = self.BytesIO()
|
---|
1929 | t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
|
---|
1930 | self.assertRaises(UnicodeError, t.write, "\xff")
|
---|
1931 | # (3) ignore
|
---|
1932 | b = self.BytesIO()
|
---|
1933 | t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
|
---|
1934 | newline="\n")
|
---|
1935 | t.write("abc\xffdef\n")
|
---|
1936 | t.flush()
|
---|
1937 | self.assertEqual(b.getvalue(), b"abcdef\n")
|
---|
1938 | # (4) replace
|
---|
1939 | b = self.BytesIO()
|
---|
1940 | t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
|
---|
1941 | newline="\n")
|
---|
1942 | t.write("abc\xffdef\n")
|
---|
1943 | t.flush()
|
---|
1944 | self.assertEqual(b.getvalue(), b"abc?def\n")
|
---|
1945 |
|
---|
1946 | def test_newlines(self):
|
---|
1947 | input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
|
---|
1948 |
|
---|
1949 | tests = [
|
---|
1950 | [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
|
---|
1951 | [ '', input_lines ],
|
---|
1952 | [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
|
---|
1953 | [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
|
---|
1954 | [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
|
---|
1955 | ]
|
---|
1956 | encodings = (
|
---|
1957 | 'utf-8', 'latin-1',
|
---|
1958 | 'utf-16', 'utf-16-le', 'utf-16-be',
|
---|
1959 | 'utf-32', 'utf-32-le', 'utf-32-be',
|
---|
1960 | )
|
---|
1961 |
|
---|
1962 | # Try a range of buffer sizes to test the case where \r is the last
|
---|
1963 | # character in TextIOWrapper._pending_line.
|
---|
1964 | for encoding in encodings:
|
---|
1965 | # XXX: str.encode() should return bytes
|
---|
1966 | data = bytes(''.join(input_lines).encode(encoding))
|
---|
1967 | for do_reads in (False, True):
|
---|
1968 | for bufsize in range(1, 10):
|
---|
1969 | for newline, exp_lines in tests:
|
---|
1970 | bufio = self.BufferedReader(self.BytesIO(data), bufsize)
|
---|
1971 | textio = self.TextIOWrapper(bufio, newline=newline,
|
---|
1972 | encoding=encoding)
|
---|
1973 | if do_reads:
|
---|
1974 | got_lines = []
|
---|
1975 | while True:
|
---|
1976 | c2 = textio.read(2)
|
---|
1977 | if c2 == '':
|
---|
1978 | break
|
---|
1979 | self.assertEqual(len(c2), 2)
|
---|
1980 | got_lines.append(c2 + textio.readline())
|
---|
1981 | else:
|
---|
1982 | got_lines = list(textio)
|
---|
1983 |
|
---|
1984 | for got_line, exp_line in zip(got_lines, exp_lines):
|
---|
1985 | self.assertEqual(got_line, exp_line)
|
---|
1986 | self.assertEqual(len(got_lines), len(exp_lines))
|
---|
1987 |
|
---|
1988 | def test_newlines_input(self):
|
---|
1989 | testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
|
---|
1990 | normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
|
---|
1991 | for newline, expected in [
|
---|
1992 | (None, normalized.decode("ascii").splitlines(True)),
|
---|
1993 | ("", testdata.decode("ascii").splitlines(True)),
|
---|
1994 | ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
|
---|
1995 | ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
|
---|
1996 | ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
|
---|
1997 | ]:
|
---|
1998 | buf = self.BytesIO(testdata)
|
---|
1999 | txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
|
---|
2000 | self.assertEqual(txt.readlines(), expected)
|
---|
2001 | txt.seek(0)
|
---|
2002 | self.assertEqual(txt.read(), "".join(expected))
|
---|
2003 |
|
---|
2004 | def test_newlines_output(self):
|
---|
2005 | testdict = {
|
---|
2006 | "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
|
---|
2007 | "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
|
---|
2008 | "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
|
---|
2009 | "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
|
---|
2010 | }
|
---|
2011 | tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
|
---|
2012 | for newline, expected in tests:
|
---|
2013 | buf = self.BytesIO()
|
---|
2014 | txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
|
---|
2015 | txt.write("AAA\nB")
|
---|
2016 | txt.write("BB\nCCC\n")
|
---|
2017 | txt.write("X\rY\r\nZ")
|
---|
2018 | txt.flush()
|
---|
2019 | self.assertEqual(buf.closed, False)
|
---|
2020 | self.assertEqual(buf.getvalue(), expected)
|
---|
2021 |
|
---|
2022 | def test_destructor(self):
|
---|
2023 | l = []
|
---|
2024 | base = self.BytesIO
|
---|
2025 | class MyBytesIO(base):
|
---|
2026 | def close(self):
|
---|
2027 | l.append(self.getvalue())
|
---|
2028 | base.close(self)
|
---|
2029 | b = MyBytesIO()
|
---|
2030 | t = self.TextIOWrapper(b, encoding="ascii")
|
---|
2031 | t.write("abc")
|
---|
2032 | del t
|
---|
2033 | support.gc_collect()
|
---|
2034 | self.assertEqual([b"abc"], l)
|
---|
2035 |
|
---|
2036 | def test_override_destructor(self):
|
---|
2037 | record = []
|
---|
2038 | class MyTextIO(self.TextIOWrapper):
|
---|
2039 | def __del__(self):
|
---|
2040 | record.append(1)
|
---|
2041 | try:
|
---|
2042 | f = super(MyTextIO, self).__del__
|
---|
2043 | except AttributeError:
|
---|
2044 | pass
|
---|
2045 | else:
|
---|
2046 | f()
|
---|
2047 | def close(self):
|
---|
2048 | record.append(2)
|
---|
2049 | super(MyTextIO, self).close()
|
---|
2050 | def flush(self):
|
---|
2051 | record.append(3)
|
---|
2052 | super(MyTextIO, self).flush()
|
---|
2053 | b = self.BytesIO()
|
---|
2054 | t = MyTextIO(b, encoding="ascii")
|
---|
2055 | del t
|
---|
2056 | support.gc_collect()
|
---|
2057 | self.assertEqual(record, [1, 2, 3])
|
---|
2058 |
|
---|
2059 | def test_error_through_destructor(self):
|
---|
2060 | # Test that the exception state is not modified by a destructor,
|
---|
2061 | # even if close() fails.
|
---|
2062 | rawio = self.CloseFailureIO()
|
---|
2063 | def f():
|
---|
2064 | self.TextIOWrapper(rawio).xyzzy
|
---|
2065 | with support.captured_output("stderr") as s:
|
---|
2066 | self.assertRaises(AttributeError, f)
|
---|
2067 | s = s.getvalue().strip()
|
---|
2068 | if s:
|
---|
2069 | # The destructor *may* have printed an unraisable error, check it
|
---|
2070 | self.assertEqual(len(s.splitlines()), 1)
|
---|
2071 | self.assertTrue(s.startswith("Exception IOError: "), s)
|
---|
2072 | self.assertTrue(s.endswith(" ignored"), s)
|
---|
2073 |
|
---|
2074 | # Systematic tests of the text I/O API
|
---|
2075 |
|
---|
2076 | def test_basic_io(self):
|
---|
2077 | for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
|
---|
2078 | for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
|
---|
2079 | f = self.open(support.TESTFN, "w+", encoding=enc)
|
---|
2080 | f._CHUNK_SIZE = chunksize
|
---|
2081 | self.assertEqual(f.write("abc"), 3)
|
---|
2082 | f.close()
|
---|
2083 | f = self.open(support.TESTFN, "r+", encoding=enc)
|
---|
2084 | f._CHUNK_SIZE = chunksize
|
---|
2085 | self.assertEqual(f.tell(), 0)
|
---|
2086 | self.assertEqual(f.read(), "abc")
|
---|
2087 | cookie = f.tell()
|
---|
2088 | self.assertEqual(f.seek(0), 0)
|
---|
2089 | self.assertEqual(f.read(None), "abc")
|
---|
2090 | f.seek(0)
|
---|
2091 | self.assertEqual(f.read(2), "ab")
|
---|
2092 | self.assertEqual(f.read(1), "c")
|
---|
2093 | self.assertEqual(f.read(1), "")
|
---|
2094 | self.assertEqual(f.read(), "")
|
---|
2095 | self.assertEqual(f.tell(), cookie)
|
---|
2096 | self.assertEqual(f.seek(0), 0)
|
---|
2097 | self.assertEqual(f.seek(0, 2), cookie)
|
---|
2098 | self.assertEqual(f.write("def"), 3)
|
---|
2099 | self.assertEqual(f.seek(cookie), cookie)
|
---|
2100 | self.assertEqual(f.read(), "def")
|
---|
2101 | if enc.startswith("utf"):
|
---|
2102 | self.multi_line_test(f, enc)
|
---|
2103 | f.close()
|
---|
2104 |
|
---|
2105 | def multi_line_test(self, f, enc):
|
---|
2106 | f.seek(0)
|
---|
2107 | f.truncate()
|
---|
2108 | sample = "s\xff\u0fff\uffff"
|
---|
2109 | wlines = []
|
---|
2110 | for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
|
---|
2111 | chars = []
|
---|
2112 | for i in range(size):
|
---|
2113 | chars.append(sample[i % len(sample)])
|
---|
2114 | line = "".join(chars) + "\n"
|
---|
2115 | wlines.append((f.tell(), line))
|
---|
2116 | f.write(line)
|
---|
2117 | f.seek(0)
|
---|
2118 | rlines = []
|
---|
2119 | while True:
|
---|
2120 | pos = f.tell()
|
---|
2121 | line = f.readline()
|
---|
2122 | if not line:
|
---|
2123 | break
|
---|
2124 | rlines.append((pos, line))
|
---|
2125 | self.assertEqual(rlines, wlines)
|
---|
2126 |
|
---|
2127 | def test_telling(self):
|
---|
2128 | f = self.open(support.TESTFN, "w+", encoding="utf8")
|
---|
2129 | p0 = f.tell()
|
---|
2130 | f.write("\xff\n")
|
---|
2131 | p1 = f.tell()
|
---|
2132 | f.write("\xff\n")
|
---|
2133 | p2 = f.tell()
|
---|
2134 | f.seek(0)
|
---|
2135 | self.assertEqual(f.tell(), p0)
|
---|
2136 | self.assertEqual(f.readline(), "\xff\n")
|
---|
2137 | self.assertEqual(f.tell(), p1)
|
---|
2138 | self.assertEqual(f.readline(), "\xff\n")
|
---|
2139 | self.assertEqual(f.tell(), p2)
|
---|
2140 | f.seek(0)
|
---|
2141 | for line in f:
|
---|
2142 | self.assertEqual(line, "\xff\n")
|
---|
2143 | self.assertRaises(IOError, f.tell)
|
---|
2144 | self.assertEqual(f.tell(), p2)
|
---|
2145 | f.close()
|
---|
2146 |
|
---|
2147 | def test_seeking(self):
|
---|
2148 | chunk_size = _default_chunk_size()
|
---|
2149 | prefix_size = chunk_size - 2
|
---|
2150 | u_prefix = "a" * prefix_size
|
---|
2151 | prefix = bytes(u_prefix.encode("utf-8"))
|
---|
2152 | self.assertEqual(len(u_prefix), len(prefix))
|
---|
2153 | u_suffix = "\u8888\n"
|
---|
2154 | suffix = bytes(u_suffix.encode("utf-8"))
|
---|
2155 | line = prefix + suffix
|
---|
2156 | f = self.open(support.TESTFN, "wb")
|
---|
2157 | f.write(line*2)
|
---|
2158 | f.close()
|
---|
2159 | f = self.open(support.TESTFN, "r", encoding="utf-8")
|
---|
2160 | s = f.read(prefix_size)
|
---|
2161 | self.assertEqual(s, prefix.decode("ascii"))
|
---|
2162 | self.assertEqual(f.tell(), prefix_size)
|
---|
2163 | self.assertEqual(f.readline(), u_suffix)
|
---|
2164 |
|
---|
2165 | def test_seeking_too(self):
|
---|
2166 | # Regression test for a specific bug
|
---|
2167 | data = b'\xe0\xbf\xbf\n'
|
---|
2168 | f = self.open(support.TESTFN, "wb")
|
---|
2169 | f.write(data)
|
---|
2170 | f.close()
|
---|
2171 | f = self.open(support.TESTFN, "r", encoding="utf-8")
|
---|
2172 | f._CHUNK_SIZE # Just test that it exists
|
---|
2173 | f._CHUNK_SIZE = 2
|
---|
2174 | f.readline()
|
---|
2175 | f.tell()
|
---|
2176 |
|
---|
2177 | def test_seek_and_tell(self):
|
---|
2178 | #Test seek/tell using the StatefulIncrementalDecoder.
|
---|
2179 | # Make test faster by doing smaller seeks
|
---|
2180 | CHUNK_SIZE = 128
|
---|
2181 |
|
---|
2182 | def test_seek_and_tell_with_data(data, min_pos=0):
|
---|
2183 | """Tell/seek to various points within a data stream and ensure
|
---|
2184 | that the decoded data returned by read() is consistent."""
|
---|
2185 | f = self.open(support.TESTFN, 'wb')
|
---|
2186 | f.write(data)
|
---|
2187 | f.close()
|
---|
2188 | f = self.open(support.TESTFN, encoding='test_decoder')
|
---|
2189 | f._CHUNK_SIZE = CHUNK_SIZE
|
---|
2190 | decoded = f.read()
|
---|
2191 | f.close()
|
---|
2192 |
|
---|
2193 | for i in range(min_pos, len(decoded) + 1): # seek positions
|
---|
2194 | for j in [1, 5, len(decoded) - i]: # read lengths
|
---|
2195 | f = self.open(support.TESTFN, encoding='test_decoder')
|
---|
2196 | self.assertEqual(f.read(i), decoded[:i])
|
---|
2197 | cookie = f.tell()
|
---|
2198 | self.assertEqual(f.read(j), decoded[i:i + j])
|
---|
2199 | f.seek(cookie)
|
---|
2200 | self.assertEqual(f.read(), decoded[i:])
|
---|
2201 | f.close()
|
---|
2202 |
|
---|
2203 | # Enable the test decoder.
|
---|
2204 | StatefulIncrementalDecoder.codecEnabled = 1
|
---|
2205 |
|
---|
2206 | # Run the tests.
|
---|
2207 | try:
|
---|
2208 | # Try each test case.
|
---|
2209 | for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
|
---|
2210 | test_seek_and_tell_with_data(input)
|
---|
2211 |
|
---|
2212 | # Position each test case so that it crosses a chunk boundary.
|
---|
2213 | for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
|
---|
2214 | offset = CHUNK_SIZE - len(input)//2
|
---|
2215 | prefix = b'.'*offset
|
---|
2216 | # Don't bother seeking into the prefix (takes too long).
|
---|
2217 | min_pos = offset*2
|
---|
2218 | test_seek_and_tell_with_data(prefix + input, min_pos)
|
---|
2219 |
|
---|
2220 | # Ensure our test decoder won't interfere with subsequent tests.
|
---|
2221 | finally:
|
---|
2222 | StatefulIncrementalDecoder.codecEnabled = 0
|
---|
2223 |
|
---|
2224 | def test_encoded_writes(self):
|
---|
2225 | data = "1234567890"
|
---|
2226 | tests = ("utf-16",
|
---|
2227 | "utf-16-le",
|
---|
2228 | "utf-16-be",
|
---|
2229 | "utf-32",
|
---|
2230 | "utf-32-le",
|
---|
2231 | "utf-32-be")
|
---|
2232 | for encoding in tests:
|
---|
2233 | buf = self.BytesIO()
|
---|
2234 | f = self.TextIOWrapper(buf, encoding=encoding)
|
---|
2235 | # Check if the BOM is written only once (see issue1753).
|
---|
2236 | f.write(data)
|
---|
2237 | f.write(data)
|
---|
2238 | f.seek(0)
|
---|
2239 | self.assertEqual(f.read(), data * 2)
|
---|
2240 | f.seek(0)
|
---|
2241 | self.assertEqual(f.read(), data * 2)
|
---|
2242 | self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
|
---|
2243 |
|
---|
2244 | def test_unreadable(self):
|
---|
2245 | class UnReadable(self.BytesIO):
|
---|
2246 | def readable(self):
|
---|
2247 | return False
|
---|
2248 | txt = self.TextIOWrapper(UnReadable())
|
---|
2249 | self.assertRaises(IOError, txt.read)
|
---|
2250 |
|
---|
2251 | def test_read_one_by_one(self):
|
---|
2252 | txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
|
---|
2253 | reads = ""
|
---|
2254 | while True:
|
---|
2255 | c = txt.read(1)
|
---|
2256 | if not c:
|
---|
2257 | break
|
---|
2258 | reads += c
|
---|
2259 | self.assertEqual(reads, "AA\nBB")
|
---|
2260 |
|
---|
2261 | def test_readlines(self):
|
---|
2262 | txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
|
---|
2263 | self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
|
---|
2264 | txt.seek(0)
|
---|
2265 | self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
|
---|
2266 | txt.seek(0)
|
---|
2267 | self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
|
---|
2268 |
|
---|
2269 | # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
|
---|
2270 | def test_read_by_chunk(self):
|
---|
2271 | # make sure "\r\n" straddles 128 char boundary.
|
---|
2272 | txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
|
---|
2273 | reads = ""
|
---|
2274 | while True:
|
---|
2275 | c = txt.read(128)
|
---|
2276 | if not c:
|
---|
2277 | break
|
---|
2278 | reads += c
|
---|
2279 | self.assertEqual(reads, "A"*127+"\nB")
|
---|
2280 |
|
---|
2281 | def test_writelines(self):
|
---|
2282 | l = ['ab', 'cd', 'ef']
|
---|
2283 | buf = self.BytesIO()
|
---|
2284 | txt = self.TextIOWrapper(buf)
|
---|
2285 | txt.writelines(l)
|
---|
2286 | txt.flush()
|
---|
2287 | self.assertEqual(buf.getvalue(), b'abcdef')
|
---|
2288 |
|
---|
2289 | def test_writelines_userlist(self):
|
---|
2290 | l = UserList(['ab', 'cd', 'ef'])
|
---|
2291 | buf = self.BytesIO()
|
---|
2292 | txt = self.TextIOWrapper(buf)
|
---|
2293 | txt.writelines(l)
|
---|
2294 | txt.flush()
|
---|
2295 | self.assertEqual(buf.getvalue(), b'abcdef')
|
---|
2296 |
|
---|
2297 | def test_writelines_error(self):
|
---|
2298 | txt = self.TextIOWrapper(self.BytesIO())
|
---|
2299 | self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
|
---|
2300 | self.assertRaises(TypeError, txt.writelines, None)
|
---|
2301 | self.assertRaises(TypeError, txt.writelines, b'abc')
|
---|
2302 |
|
---|
2303 | def test_issue1395_1(self):
|
---|
2304 | txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
|
---|
2305 |
|
---|
2306 | # read one char at a time
|
---|
2307 | reads = ""
|
---|
2308 | while True:
|
---|
2309 | c = txt.read(1)
|
---|
2310 | if not c:
|
---|
2311 | break
|
---|
2312 | reads += c
|
---|
2313 | self.assertEqual(reads, self.normalized)
|
---|
2314 |
|
---|
2315 | def test_issue1395_2(self):
|
---|
2316 | txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
|
---|
2317 | txt._CHUNK_SIZE = 4
|
---|
2318 |
|
---|
2319 | reads = ""
|
---|
2320 | while True:
|
---|
2321 | c = txt.read(4)
|
---|
2322 | if not c:
|
---|
2323 | break
|
---|
2324 | reads += c
|
---|
2325 | self.assertEqual(reads, self.normalized)
|
---|
2326 |
|
---|
2327 | def test_issue1395_3(self):
|
---|
2328 | txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
|
---|
2329 | txt._CHUNK_SIZE = 4
|
---|
2330 |
|
---|
2331 | reads = txt.read(4)
|
---|
2332 | reads += txt.read(4)
|
---|
2333 | reads += txt.readline()
|
---|
2334 | reads += txt.readline()
|
---|
2335 | reads += txt.readline()
|
---|
2336 | self.assertEqual(reads, self.normalized)
|
---|
2337 |
|
---|
2338 | def test_issue1395_4(self):
|
---|
2339 | txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
|
---|
2340 | txt._CHUNK_SIZE = 4
|
---|
2341 |
|
---|
2342 | reads = txt.read(4)
|
---|
2343 | reads += txt.read()
|
---|
2344 | self.assertEqual(reads, self.normalized)
|
---|
2345 |
|
---|
2346 | def test_issue1395_5(self):
|
---|
2347 | txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
|
---|
2348 | txt._CHUNK_SIZE = 4
|
---|
2349 |
|
---|
2350 | reads = txt.read(4)
|
---|
2351 | pos = txt.tell()
|
---|
2352 | txt.seek(0)
|
---|
2353 | txt.seek(pos)
|
---|
2354 | self.assertEqual(txt.read(4), "BBB\n")
|
---|
2355 |
|
---|
2356 | def test_issue2282(self):
|
---|
2357 | buffer = self.BytesIO(self.testdata)
|
---|
2358 | txt = self.TextIOWrapper(buffer, encoding="ascii")
|
---|
2359 |
|
---|
2360 | self.assertEqual(buffer.seekable(), txt.seekable())
|
---|
2361 |
|
---|
2362 | def test_append_bom(self):
|
---|
2363 | # The BOM is not written again when appending to a non-empty file
|
---|
2364 | filename = support.TESTFN
|
---|
2365 | for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
|
---|
2366 | with self.open(filename, 'w', encoding=charset) as f:
|
---|
2367 | f.write('aaa')
|
---|
2368 | pos = f.tell()
|
---|
2369 | with self.open(filename, 'rb') as f:
|
---|
2370 | self.assertEqual(f.read(), 'aaa'.encode(charset))
|
---|
2371 |
|
---|
2372 | with self.open(filename, 'a', encoding=charset) as f:
|
---|
2373 | f.write('xxx')
|
---|
2374 | with self.open(filename, 'rb') as f:
|
---|
2375 | self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
|
---|
2376 |
|
---|
2377 | def test_seek_bom(self):
|
---|
2378 | # Same test, but when seeking manually
|
---|
2379 | filename = support.TESTFN
|
---|
2380 | for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
|
---|
2381 | with self.open(filename, 'w', encoding=charset) as f:
|
---|
2382 | f.write('aaa')
|
---|
2383 | pos = f.tell()
|
---|
2384 | with self.open(filename, 'r+', encoding=charset) as f:
|
---|
2385 | f.seek(pos)
|
---|
2386 | f.write('zzz')
|
---|
2387 | f.seek(0)
|
---|
2388 | f.write('bbb')
|
---|
2389 | with self.open(filename, 'rb') as f:
|
---|
2390 | self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
|
---|
2391 |
|
---|
2392 | def test_errors_property(self):
|
---|
2393 | with self.open(support.TESTFN, "w") as f:
|
---|
2394 | self.assertEqual(f.errors, "strict")
|
---|
2395 | with self.open(support.TESTFN, "w", errors="replace") as f:
|
---|
2396 | self.assertEqual(f.errors, "replace")
|
---|
2397 |
|
---|
2398 | @unittest.skipUnless(threading, 'Threading required for this test.')
|
---|
2399 | def test_threads_write(self):
|
---|
2400 | # Issue6750: concurrent writes could duplicate data
|
---|
2401 | event = threading.Event()
|
---|
2402 | with self.open(support.TESTFN, "w", buffering=1) as f:
|
---|
2403 | def run(n):
|
---|
2404 | text = "Thread%03d\n" % n
|
---|
2405 | event.wait()
|
---|
2406 | f.write(text)
|
---|
2407 | threads = [threading.Thread(target=lambda n=x: run(n))
|
---|
2408 | for x in range(20)]
|
---|
2409 | for t in threads:
|
---|
2410 | t.start()
|
---|
2411 | time.sleep(0.02)
|
---|
2412 | event.set()
|
---|
2413 | for t in threads:
|
---|
2414 | t.join()
|
---|
2415 | with self.open(support.TESTFN) as f:
|
---|
2416 | content = f.read()
|
---|
2417 | for n in range(20):
|
---|
2418 | self.assertEqual(content.count("Thread%03d\n" % n), 1)
|
---|
2419 |
|
---|
2420 | def test_flush_error_on_close(self):
|
---|
2421 | txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
|
---|
2422 | def bad_flush():
|
---|
2423 | raise IOError()
|
---|
2424 | txt.flush = bad_flush
|
---|
2425 | self.assertRaises(IOError, txt.close) # exception not swallowed
|
---|
2426 | self.assertTrue(txt.closed)
|
---|
2427 |
|
---|
2428 | def test_multi_close(self):
|
---|
2429 | txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
|
---|
2430 | txt.close()
|
---|
2431 | txt.close()
|
---|
2432 | txt.close()
|
---|
2433 | self.assertRaises(ValueError, txt.flush)
|
---|
2434 |
|
---|
2435 | def test_readonly_attributes(self):
|
---|
2436 | txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
|
---|
2437 | buf = self.BytesIO(self.testdata)
|
---|
2438 | with self.assertRaises((AttributeError, TypeError)):
|
---|
2439 | txt.buffer = buf
|
---|
2440 |
|
---|
2441 | def test_read_nonbytes(self):
|
---|
2442 | # Issue #17106
|
---|
2443 | # Crash when underlying read() returns non-bytes
|
---|
2444 | class NonbytesStream(self.StringIO):
|
---|
2445 | read1 = self.StringIO.read
|
---|
2446 | class NonbytesStream(self.StringIO):
|
---|
2447 | read1 = self.StringIO.read
|
---|
2448 | t = self.TextIOWrapper(NonbytesStream('a'))
|
---|
2449 | with self.maybeRaises(TypeError):
|
---|
2450 | t.read(1)
|
---|
2451 | t = self.TextIOWrapper(NonbytesStream('a'))
|
---|
2452 | with self.maybeRaises(TypeError):
|
---|
2453 | t.readline()
|
---|
2454 | t = self.TextIOWrapper(NonbytesStream('a'))
|
---|
2455 | self.assertEqual(t.read(), u'a')
|
---|
2456 |
|
---|
2457 | def test_illegal_decoder(self):
|
---|
2458 | # Issue #17106
|
---|
2459 | # Crash when decoder returns non-string
|
---|
2460 | t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
|
---|
2461 | encoding='quopri_codec')
|
---|
2462 | with self.maybeRaises(TypeError):
|
---|
2463 | t.read(1)
|
---|
2464 | t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
|
---|
2465 | encoding='quopri_codec')
|
---|
2466 | with self.maybeRaises(TypeError):
|
---|
2467 | t.readline()
|
---|
2468 | t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
|
---|
2469 | encoding='quopri_codec')
|
---|
2470 | with self.maybeRaises(TypeError):
|
---|
2471 | t.read()
|
---|
2472 |
|
---|
2473 |
|
---|
2474 | class CTextIOWrapperTest(TextIOWrapperTest):
|
---|
2475 |
|
---|
2476 | def test_initialization(self):
|
---|
2477 | r = self.BytesIO(b"\xc3\xa9\n\n")
|
---|
2478 | b = self.BufferedReader(r, 1000)
|
---|
2479 | t = self.TextIOWrapper(b)
|
---|
2480 | self.assertRaises(TypeError, t.__init__, b, newline=42)
|
---|
2481 | self.assertRaises(ValueError, t.read)
|
---|
2482 | self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
|
---|
2483 | self.assertRaises(ValueError, t.read)
|
---|
2484 |
|
---|
2485 | def test_garbage_collection(self):
|
---|
2486 | # C TextIOWrapper objects are collected, and collecting them flushes
|
---|
2487 | # all data to disk.
|
---|
2488 | # The Python version has __del__, so it ends in gc.garbage instead.
|
---|
2489 | rawio = io.FileIO(support.TESTFN, "wb")
|
---|
2490 | b = self.BufferedWriter(rawio)
|
---|
2491 | t = self.TextIOWrapper(b, encoding="ascii")
|
---|
2492 | t.write("456def")
|
---|
2493 | t.x = t
|
---|
2494 | wr = weakref.ref(t)
|
---|
2495 | del t
|
---|
2496 | support.gc_collect()
|
---|
2497 | self.assertTrue(wr() is None, wr)
|
---|
2498 | with self.open(support.TESTFN, "rb") as f:
|
---|
2499 | self.assertEqual(f.read(), b"456def")
|
---|
2500 |
|
---|
2501 | def test_rwpair_cleared_before_textio(self):
|
---|
2502 | # Issue 13070: TextIOWrapper's finalization would crash when called
|
---|
2503 | # after the reference to the underlying BufferedRWPair's writer got
|
---|
2504 | # cleared by the GC.
|
---|
2505 | for i in range(1000):
|
---|
2506 | b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
|
---|
2507 | t1 = self.TextIOWrapper(b1, encoding="ascii")
|
---|
2508 | b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
|
---|
2509 | t2 = self.TextIOWrapper(b2, encoding="ascii")
|
---|
2510 | # circular references
|
---|
2511 | t1.buddy = t2
|
---|
2512 | t2.buddy = t1
|
---|
2513 | support.gc_collect()
|
---|
2514 |
|
---|
2515 | maybeRaises = unittest.TestCase.assertRaises
|
---|
2516 |
|
---|
2517 |
|
---|
2518 | class PyTextIOWrapperTest(TextIOWrapperTest):
|
---|
2519 | @contextlib.contextmanager
|
---|
2520 | def maybeRaises(self, *args, **kwds):
|
---|
2521 | yield
|
---|
2522 |
|
---|
2523 |
|
---|
2524 | class IncrementalNewlineDecoderTest(unittest.TestCase):
|
---|
2525 |
|
---|
2526 | def check_newline_decoding_utf8(self, decoder):
|
---|
2527 | # UTF-8 specific tests for a newline decoder
|
---|
2528 | def _check_decode(b, s, **kwargs):
|
---|
2529 | # We exercise getstate() / setstate() as well as decode()
|
---|
2530 | state = decoder.getstate()
|
---|
2531 | self.assertEqual(decoder.decode(b, **kwargs), s)
|
---|
2532 | decoder.setstate(state)
|
---|
2533 | self.assertEqual(decoder.decode(b, **kwargs), s)
|
---|
2534 |
|
---|
2535 | _check_decode(b'\xe8\xa2\x88', "\u8888")
|
---|
2536 |
|
---|
2537 | _check_decode(b'\xe8', "")
|
---|
2538 | _check_decode(b'\xa2', "")
|
---|
2539 | _check_decode(b'\x88', "\u8888")
|
---|
2540 |
|
---|
2541 | _check_decode(b'\xe8', "")
|
---|
2542 | _check_decode(b'\xa2', "")
|
---|
2543 | _check_decode(b'\x88', "\u8888")
|
---|
2544 |
|
---|
2545 | _check_decode(b'\xe8', "")
|
---|
2546 | self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
|
---|
2547 |
|
---|
2548 | decoder.reset()
|
---|
2549 | _check_decode(b'\n', "\n")
|
---|
2550 | _check_decode(b'\r', "")
|
---|
2551 | _check_decode(b'', "\n", final=True)
|
---|
2552 | _check_decode(b'\r', "\n", final=True)
|
---|
2553 |
|
---|
2554 | _check_decode(b'\r', "")
|
---|
2555 | _check_decode(b'a', "\na")
|
---|
2556 |
|
---|
2557 | _check_decode(b'\r\r\n', "\n\n")
|
---|
2558 | _check_decode(b'\r', "")
|
---|
2559 | _check_decode(b'\r', "\n")
|
---|
2560 | _check_decode(b'\na', "\na")
|
---|
2561 |
|
---|
2562 | _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
|
---|
2563 | _check_decode(b'\xe8\xa2\x88', "\u8888")
|
---|
2564 | _check_decode(b'\n', "\n")
|
---|
2565 | _check_decode(b'\xe8\xa2\x88\r', "\u8888")
|
---|
2566 | _check_decode(b'\n', "\n")
|
---|
2567 |
|
---|
2568 | def check_newline_decoding(self, decoder, encoding):
|
---|
2569 | result = []
|
---|
2570 | if encoding is not None:
|
---|
2571 | encoder = codecs.getincrementalencoder(encoding)()
|
---|
2572 | def _decode_bytewise(s):
|
---|
2573 | # Decode one byte at a time
|
---|
2574 | for b in encoder.encode(s):
|
---|
2575 | result.append(decoder.decode(b))
|
---|
2576 | else:
|
---|
2577 | encoder = None
|
---|
2578 | def _decode_bytewise(s):
|
---|
2579 | # Decode one char at a time
|
---|
2580 | for c in s:
|
---|
2581 | result.append(decoder.decode(c))
|
---|
2582 | self.assertEqual(decoder.newlines, None)
|
---|
2583 | _decode_bytewise("abc\n\r")
|
---|
2584 | self.assertEqual(decoder.newlines, '\n')
|
---|
2585 | _decode_bytewise("\nabc")
|
---|
2586 | self.assertEqual(decoder.newlines, ('\n', '\r\n'))
|
---|
2587 | _decode_bytewise("abc\r")
|
---|
2588 | self.assertEqual(decoder.newlines, ('\n', '\r\n'))
|
---|
2589 | _decode_bytewise("abc")
|
---|
2590 | self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
|
---|
2591 | _decode_bytewise("abc\r")
|
---|
2592 | self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
|
---|
2593 | decoder.reset()
|
---|
2594 | input = "abc"
|
---|
2595 | if encoder is not None:
|
---|
2596 | encoder.reset()
|
---|
2597 | input = encoder.encode(input)
|
---|
2598 | self.assertEqual(decoder.decode(input), "abc")
|
---|
2599 | self.assertEqual(decoder.newlines, None)
|
---|
2600 |
|
---|
2601 | def test_newline_decoder(self):
|
---|
2602 | encodings = (
|
---|
2603 | # None meaning the IncrementalNewlineDecoder takes unicode input
|
---|
2604 | # rather than bytes input
|
---|
2605 | None, 'utf-8', 'latin-1',
|
---|
2606 | 'utf-16', 'utf-16-le', 'utf-16-be',
|
---|
2607 | 'utf-32', 'utf-32-le', 'utf-32-be',
|
---|
2608 | )
|
---|
2609 | for enc in encodings:
|
---|
2610 | decoder = enc and codecs.getincrementaldecoder(enc)()
|
---|
2611 | decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
|
---|
2612 | self.check_newline_decoding(decoder, enc)
|
---|
2613 | decoder = codecs.getincrementaldecoder("utf-8")()
|
---|
2614 | decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
|
---|
2615 | self.check_newline_decoding_utf8(decoder)
|
---|
2616 |
|
---|
2617 | def test_newline_bytes(self):
|
---|
2618 | # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
|
---|
2619 | def _check(dec):
|
---|
2620 | self.assertEqual(dec.newlines, None)
|
---|
2621 | self.assertEqual(dec.decode("\u0D00"), "\u0D00")
|
---|
2622 | self.assertEqual(dec.newlines, None)
|
---|
2623 | self.assertEqual(dec.decode("\u0A00"), "\u0A00")
|
---|
2624 | self.assertEqual(dec.newlines, None)
|
---|
2625 | dec = self.IncrementalNewlineDecoder(None, translate=False)
|
---|
2626 | _check(dec)
|
---|
2627 | dec = self.IncrementalNewlineDecoder(None, translate=True)
|
---|
2628 | _check(dec)
|
---|
2629 |
|
---|
2630 | class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
|
---|
2631 | pass
|
---|
2632 |
|
---|
2633 | class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
|
---|
2634 | pass
|
---|
2635 |
|
---|
2636 |
|
---|
2637 | # XXX Tests for open()
|
---|
2638 |
|
---|
2639 | class MiscIOTest(unittest.TestCase):
|
---|
2640 |
|
---|
2641 | def tearDown(self):
|
---|
2642 | support.unlink(support.TESTFN)
|
---|
2643 |
|
---|
2644 | def test___all__(self):
|
---|
2645 | for name in self.io.__all__:
|
---|
2646 | obj = getattr(self.io, name, None)
|
---|
2647 | self.assertTrue(obj is not None, name)
|
---|
2648 | if name == "open":
|
---|
2649 | continue
|
---|
2650 | elif "error" in name.lower() or name == "UnsupportedOperation":
|
---|
2651 | self.assertTrue(issubclass(obj, Exception), name)
|
---|
2652 | elif not name.startswith("SEEK_"):
|
---|
2653 | self.assertTrue(issubclass(obj, self.IOBase))
|
---|
2654 |
|
---|
2655 | def test_attributes(self):
|
---|
2656 | f = self.open(support.TESTFN, "wb", buffering=0)
|
---|
2657 | self.assertEqual(f.mode, "wb")
|
---|
2658 | f.close()
|
---|
2659 |
|
---|
2660 | f = self.open(support.TESTFN, "U")
|
---|
2661 | self.assertEqual(f.name, support.TESTFN)
|
---|
2662 | self.assertEqual(f.buffer.name, support.TESTFN)
|
---|
2663 | self.assertEqual(f.buffer.raw.name, support.TESTFN)
|
---|
2664 | self.assertEqual(f.mode, "U")
|
---|
2665 | self.assertEqual(f.buffer.mode, "rb")
|
---|
2666 | self.assertEqual(f.buffer.raw.mode, "rb")
|
---|
2667 | f.close()
|
---|
2668 |
|
---|
2669 | f = self.open(support.TESTFN, "w+")
|
---|
2670 | self.assertEqual(f.mode, "w+")
|
---|
2671 | self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
|
---|
2672 | self.assertEqual(f.buffer.raw.mode, "rb+")
|
---|
2673 |
|
---|
2674 | g = self.open(f.fileno(), "wb", closefd=False)
|
---|
2675 | self.assertEqual(g.mode, "wb")
|
---|
2676 | self.assertEqual(g.raw.mode, "wb")
|
---|
2677 | self.assertEqual(g.name, f.fileno())
|
---|
2678 | self.assertEqual(g.raw.name, f.fileno())
|
---|
2679 | f.close()
|
---|
2680 | g.close()
|
---|
2681 |
|
---|
2682 | def test_io_after_close(self):
|
---|
2683 | for kwargs in [
|
---|
2684 | {"mode": "w"},
|
---|
2685 | {"mode": "wb"},
|
---|
2686 | {"mode": "w", "buffering": 1},
|
---|
2687 | {"mode": "w", "buffering": 2},
|
---|
2688 | {"mode": "wb", "buffering": 0},
|
---|
2689 | {"mode": "r"},
|
---|
2690 | {"mode": "rb"},
|
---|
2691 | {"mode": "r", "buffering": 1},
|
---|
2692 | {"mode": "r", "buffering": 2},
|
---|
2693 | {"mode": "rb", "buffering": 0},
|
---|
2694 | {"mode": "w+"},
|
---|
2695 | {"mode": "w+b"},
|
---|
2696 | {"mode": "w+", "buffering": 1},
|
---|
2697 | {"mode": "w+", "buffering": 2},
|
---|
2698 | {"mode": "w+b", "buffering": 0},
|
---|
2699 | ]:
|
---|
2700 | f = self.open(support.TESTFN, **kwargs)
|
---|
2701 | f.close()
|
---|
2702 | self.assertRaises(ValueError, f.flush)
|
---|
2703 | self.assertRaises(ValueError, f.fileno)
|
---|
2704 | self.assertRaises(ValueError, f.isatty)
|
---|
2705 | self.assertRaises(ValueError, f.__iter__)
|
---|
2706 | if hasattr(f, "peek"):
|
---|
2707 | self.assertRaises(ValueError, f.peek, 1)
|
---|
2708 | self.assertRaises(ValueError, f.read)
|
---|
2709 | if hasattr(f, "read1"):
|
---|
2710 | self.assertRaises(ValueError, f.read1, 1024)
|
---|
2711 | if hasattr(f, "readall"):
|
---|
2712 | self.assertRaises(ValueError, f.readall)
|
---|
2713 | if hasattr(f, "readinto"):
|
---|
2714 | self.assertRaises(ValueError, f.readinto, bytearray(1024))
|
---|
2715 | self.assertRaises(ValueError, f.readline)
|
---|
2716 | self.assertRaises(ValueError, f.readlines)
|
---|
2717 | self.assertRaises(ValueError, f.seek, 0)
|
---|
2718 | self.assertRaises(ValueError, f.tell)
|
---|
2719 | self.assertRaises(ValueError, f.truncate)
|
---|
2720 | self.assertRaises(ValueError, f.write,
|
---|
2721 | b"" if "b" in kwargs['mode'] else "")
|
---|
2722 | self.assertRaises(ValueError, f.writelines, [])
|
---|
2723 | self.assertRaises(ValueError, next, f)
|
---|
2724 |
|
---|
2725 | def test_blockingioerror(self):
|
---|
2726 | # Various BlockingIOError issues
|
---|
2727 | self.assertRaises(TypeError, self.BlockingIOError)
|
---|
2728 | self.assertRaises(TypeError, self.BlockingIOError, 1)
|
---|
2729 | self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
|
---|
2730 | self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
|
---|
2731 | b = self.BlockingIOError(1, "")
|
---|
2732 | self.assertEqual(b.characters_written, 0)
|
---|
2733 | class C(unicode):
|
---|
2734 | pass
|
---|
2735 | c = C("")
|
---|
2736 | b = self.BlockingIOError(1, c)
|
---|
2737 | c.b = b
|
---|
2738 | b.c = c
|
---|
2739 | wr = weakref.ref(c)
|
---|
2740 | del c, b
|
---|
2741 | support.gc_collect()
|
---|
2742 | self.assertTrue(wr() is None, wr)
|
---|
2743 |
|
---|
2744 | def test_abcs(self):
|
---|
2745 | # Test the visible base classes are ABCs.
|
---|
2746 | self.assertIsInstance(self.IOBase, abc.ABCMeta)
|
---|
2747 | self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
|
---|
2748 | self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
|
---|
2749 | self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
|
---|
2750 |
|
---|
2751 | def _check_abc_inheritance(self, abcmodule):
|
---|
2752 | with self.open(support.TESTFN, "wb", buffering=0) as f:
|
---|
2753 | self.assertIsInstance(f, abcmodule.IOBase)
|
---|
2754 | self.assertIsInstance(f, abcmodule.RawIOBase)
|
---|
2755 | self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
|
---|
2756 | self.assertNotIsInstance(f, abcmodule.TextIOBase)
|
---|
2757 | with self.open(support.TESTFN, "wb") as f:
|
---|
2758 | self.assertIsInstance(f, abcmodule.IOBase)
|
---|
2759 | self.assertNotIsInstance(f, abcmodule.RawIOBase)
|
---|
2760 | self.assertIsInstance(f, abcmodule.BufferedIOBase)
|
---|
2761 | self.assertNotIsInstance(f, abcmodule.TextIOBase)
|
---|
2762 | with self.open(support.TESTFN, "w") as f:
|
---|
2763 | self.assertIsInstance(f, abcmodule.IOBase)
|
---|
2764 | self.assertNotIsInstance(f, abcmodule.RawIOBase)
|
---|
2765 | self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
|
---|
2766 | self.assertIsInstance(f, abcmodule.TextIOBase)
|
---|
2767 |
|
---|
2768 | def test_abc_inheritance(self):
|
---|
2769 | # Test implementations inherit from their respective ABCs
|
---|
2770 | self._check_abc_inheritance(self)
|
---|
2771 |
|
---|
2772 | def test_abc_inheritance_official(self):
|
---|
2773 | # Test implementations inherit from the official ABCs of the
|
---|
2774 | # baseline "io" module.
|
---|
2775 | self._check_abc_inheritance(io)
|
---|
2776 |
|
---|
2777 | @unittest.skipUnless(fcntl, 'fcntl required for this test')
|
---|
2778 | def test_nonblock_pipe_write_bigbuf(self):
|
---|
2779 | self._test_nonblock_pipe_write(16*1024)
|
---|
2780 |
|
---|
2781 | @unittest.skipUnless(fcntl, 'fcntl required for this test')
|
---|
2782 | def test_nonblock_pipe_write_smallbuf(self):
|
---|
2783 | self._test_nonblock_pipe_write(1024)
|
---|
2784 |
|
---|
2785 | def _set_non_blocking(self, fd):
|
---|
2786 | flags = fcntl.fcntl(fd, fcntl.F_GETFL)
|
---|
2787 | self.assertNotEqual(flags, -1)
|
---|
2788 | res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
|
---|
2789 | self.assertEqual(res, 0)
|
---|
2790 |
|
---|
2791 | def _test_nonblock_pipe_write(self, bufsize):
|
---|
2792 | sent = []
|
---|
2793 | received = []
|
---|
2794 | r, w = os.pipe()
|
---|
2795 | self._set_non_blocking(r)
|
---|
2796 | self._set_non_blocking(w)
|
---|
2797 |
|
---|
2798 | # To exercise all code paths in the C implementation we need
|
---|
2799 | # to play with buffer sizes. For instance, if we choose a
|
---|
2800 | # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
|
---|
2801 | # then we will never get a partial write of the buffer.
|
---|
2802 | rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
|
---|
2803 | wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
|
---|
2804 |
|
---|
2805 | with rf, wf:
|
---|
2806 | for N in 9999, 73, 7574:
|
---|
2807 | try:
|
---|
2808 | i = 0
|
---|
2809 | while True:
|
---|
2810 | msg = bytes([i % 26 + 97]) * N
|
---|
2811 | sent.append(msg)
|
---|
2812 | wf.write(msg)
|
---|
2813 | i += 1
|
---|
2814 |
|
---|
2815 | except self.BlockingIOError as e:
|
---|
2816 | self.assertEqual(e.args[0], errno.EAGAIN)
|
---|
2817 | sent[-1] = sent[-1][:e.characters_written]
|
---|
2818 | received.append(rf.read())
|
---|
2819 | msg = b'BLOCKED'
|
---|
2820 | wf.write(msg)
|
---|
2821 | sent.append(msg)
|
---|
2822 |
|
---|
2823 | while True:
|
---|
2824 | try:
|
---|
2825 | wf.flush()
|
---|
2826 | break
|
---|
2827 | except self.BlockingIOError as e:
|
---|
2828 | self.assertEqual(e.args[0], errno.EAGAIN)
|
---|
2829 | self.assertEqual(e.characters_written, 0)
|
---|
2830 | received.append(rf.read())
|
---|
2831 |
|
---|
2832 | received += iter(rf.read, None)
|
---|
2833 |
|
---|
2834 | sent, received = b''.join(sent), b''.join(received)
|
---|
2835 | self.assertTrue(sent == received)
|
---|
2836 | self.assertTrue(wf.closed)
|
---|
2837 | self.assertTrue(rf.closed)
|
---|
2838 |
|
---|
2839 | class CMiscIOTest(MiscIOTest):
|
---|
2840 | io = io
|
---|
2841 |
|
---|
2842 | class PyMiscIOTest(MiscIOTest):
|
---|
2843 | io = pyio
|
---|
2844 |
|
---|
2845 |
|
---|
2846 | @unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
|
---|
2847 | class SignalsTest(unittest.TestCase):
|
---|
2848 |
|
---|
2849 | def setUp(self):
|
---|
2850 | self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
|
---|
2851 |
|
---|
2852 | def tearDown(self):
|
---|
2853 | signal.signal(signal.SIGALRM, self.oldalrm)
|
---|
2854 |
|
---|
2855 | def alarm_interrupt(self, sig, frame):
|
---|
2856 | 1 // 0
|
---|
2857 |
|
---|
2858 | @unittest.skipUnless(threading, 'Threading required for this test.')
|
---|
2859 | @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
|
---|
2860 | 'issue #12429: skip test on FreeBSD <= 7')
|
---|
2861 | def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
|
---|
2862 | """Check that a partial write, when it gets interrupted, properly
|
---|
2863 | invokes the signal handler, and bubbles up the exception raised
|
---|
2864 | in the latter."""
|
---|
2865 | read_results = []
|
---|
2866 | def _read():
|
---|
2867 | s = os.read(r, 1)
|
---|
2868 | read_results.append(s)
|
---|
2869 | t = threading.Thread(target=_read)
|
---|
2870 | t.daemon = True
|
---|
2871 | r, w = os.pipe()
|
---|
2872 | try:
|
---|
2873 | wio = self.io.open(w, **fdopen_kwargs)
|
---|
2874 | t.start()
|
---|
2875 | signal.alarm(1)
|
---|
2876 | # Fill the pipe enough that the write will be blocking.
|
---|
2877 | # It will be interrupted by the timer armed above. Since the
|
---|
2878 | # other thread has read one byte, the low-level write will
|
---|
2879 | # return with a successful (partial) result rather than an EINTR.
|
---|
2880 | # The buffered IO layer must check for pending signal
|
---|
2881 | # handlers, which in this case will invoke alarm_interrupt().
|
---|
2882 | self.assertRaises(ZeroDivisionError,
|
---|
2883 | wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
|
---|
2884 | t.join()
|
---|
2885 | # We got one byte, get another one and check that it isn't a
|
---|
2886 | # repeat of the first one.
|
---|
2887 | read_results.append(os.read(r, 1))
|
---|
2888 | self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
|
---|
2889 | finally:
|
---|
2890 | os.close(w)
|
---|
2891 | os.close(r)
|
---|
2892 | # This is deliberate. If we didn't close the file descriptor
|
---|
2893 | # before closing wio, wio would try to flush its internal
|
---|
2894 | # buffer, and block again.
|
---|
2895 | try:
|
---|
2896 | wio.close()
|
---|
2897 | except IOError as e:
|
---|
2898 | if e.errno != errno.EBADF:
|
---|
2899 | raise
|
---|
2900 |
|
---|
2901 | def test_interrupted_write_unbuffered(self):
|
---|
2902 | self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
|
---|
2903 |
|
---|
2904 | def test_interrupted_write_buffered(self):
|
---|
2905 | self.check_interrupted_write(b"xy", b"xy", mode="wb")
|
---|
2906 |
|
---|
2907 | def test_interrupted_write_text(self):
|
---|
2908 | self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
|
---|
2909 |
|
---|
2910 | def check_reentrant_write(self, data, **fdopen_kwargs):
|
---|
2911 | def on_alarm(*args):
|
---|
2912 | # Will be called reentrantly from the same thread
|
---|
2913 | wio.write(data)
|
---|
2914 | 1//0
|
---|
2915 | signal.signal(signal.SIGALRM, on_alarm)
|
---|
2916 | r, w = os.pipe()
|
---|
2917 | wio = self.io.open(w, **fdopen_kwargs)
|
---|
2918 | try:
|
---|
2919 | signal.alarm(1)
|
---|
2920 | # Either the reentrant call to wio.write() fails with RuntimeError,
|
---|
2921 | # or the signal handler raises ZeroDivisionError.
|
---|
2922 | with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
|
---|
2923 | while 1:
|
---|
2924 | for i in range(100):
|
---|
2925 | wio.write(data)
|
---|
2926 | wio.flush()
|
---|
2927 | # Make sure the buffer doesn't fill up and block further writes
|
---|
2928 | os.read(r, len(data) * 100)
|
---|
2929 | exc = cm.exception
|
---|
2930 | if isinstance(exc, RuntimeError):
|
---|
2931 | self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
|
---|
2932 | finally:
|
---|
2933 | wio.close()
|
---|
2934 | os.close(r)
|
---|
2935 |
|
---|
2936 | def test_reentrant_write_buffered(self):
|
---|
2937 | self.check_reentrant_write(b"xy", mode="wb")
|
---|
2938 |
|
---|
2939 | def test_reentrant_write_text(self):
|
---|
2940 | self.check_reentrant_write("xy", mode="w", encoding="ascii")
|
---|
2941 |
|
---|
2942 | def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
|
---|
2943 | """Check that a buffered read, when it gets interrupted (either
|
---|
2944 | returning a partial result or EINTR), properly invokes the signal
|
---|
2945 | handler and retries if the latter returned successfully."""
|
---|
2946 | r, w = os.pipe()
|
---|
2947 | fdopen_kwargs["closefd"] = False
|
---|
2948 | def alarm_handler(sig, frame):
|
---|
2949 | os.write(w, b"bar")
|
---|
2950 | signal.signal(signal.SIGALRM, alarm_handler)
|
---|
2951 | try:
|
---|
2952 | rio = self.io.open(r, **fdopen_kwargs)
|
---|
2953 | os.write(w, b"foo")
|
---|
2954 | signal.alarm(1)
|
---|
2955 | # Expected behaviour:
|
---|
2956 | # - first raw read() returns partial b"foo"
|
---|
2957 | # - second raw read() returns EINTR
|
---|
2958 | # - third raw read() returns b"bar"
|
---|
2959 | self.assertEqual(decode(rio.read(6)), "foobar")
|
---|
2960 | finally:
|
---|
2961 | rio.close()
|
---|
2962 | os.close(w)
|
---|
2963 | os.close(r)
|
---|
2964 |
|
---|
2965 | def test_interrupterd_read_retry_buffered(self):
|
---|
2966 | self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
|
---|
2967 | mode="rb")
|
---|
2968 |
|
---|
2969 | def test_interrupterd_read_retry_text(self):
|
---|
2970 | self.check_interrupted_read_retry(lambda x: x,
|
---|
2971 | mode="r")
|
---|
2972 |
|
---|
2973 | @unittest.skipUnless(threading, 'Threading required for this test.')
|
---|
2974 | def check_interrupted_write_retry(self, item, **fdopen_kwargs):
|
---|
2975 | """Check that a buffered write, when it gets interrupted (either
|
---|
2976 | returning a partial result or EINTR), properly invokes the signal
|
---|
2977 | handler and retries if the latter returned successfully."""
|
---|
2978 | select = support.import_module("select")
|
---|
2979 | # A quantity that exceeds the buffer size of an anonymous pipe's
|
---|
2980 | # write end.
|
---|
2981 | N = support.PIPE_MAX_SIZE
|
---|
2982 | r, w = os.pipe()
|
---|
2983 | fdopen_kwargs["closefd"] = False
|
---|
2984 | # We need a separate thread to read from the pipe and allow the
|
---|
2985 | # write() to finish. This thread is started after the SIGALRM is
|
---|
2986 | # received (forcing a first EINTR in write()).
|
---|
2987 | read_results = []
|
---|
2988 | write_finished = False
|
---|
2989 | def _read():
|
---|
2990 | while not write_finished:
|
---|
2991 | while r in select.select([r], [], [], 1.0)[0]:
|
---|
2992 | s = os.read(r, 1024)
|
---|
2993 | read_results.append(s)
|
---|
2994 | t = threading.Thread(target=_read)
|
---|
2995 | t.daemon = True
|
---|
2996 | def alarm1(sig, frame):
|
---|
2997 | signal.signal(signal.SIGALRM, alarm2)
|
---|
2998 | signal.alarm(1)
|
---|
2999 | def alarm2(sig, frame):
|
---|
3000 | t.start()
|
---|
3001 | signal.signal(signal.SIGALRM, alarm1)
|
---|
3002 | try:
|
---|
3003 | wio = self.io.open(w, **fdopen_kwargs)
|
---|
3004 | signal.alarm(1)
|
---|
3005 | # Expected behaviour:
|
---|
3006 | # - first raw write() is partial (because of the limited pipe buffer
|
---|
3007 | # and the first alarm)
|
---|
3008 | # - second raw write() returns EINTR (because of the second alarm)
|
---|
3009 | # - subsequent write()s are successful (either partial or complete)
|
---|
3010 | self.assertEqual(N, wio.write(item * N))
|
---|
3011 | wio.flush()
|
---|
3012 | write_finished = True
|
---|
3013 | t.join()
|
---|
3014 | self.assertEqual(N, sum(len(x) for x in read_results))
|
---|
3015 | finally:
|
---|
3016 | write_finished = True
|
---|
3017 | os.close(w)
|
---|
3018 | os.close(r)
|
---|
3019 | # This is deliberate. If we didn't close the file descriptor
|
---|
3020 | # before closing wio, wio would try to flush its internal
|
---|
3021 | # buffer, and could block (in case of failure).
|
---|
3022 | try:
|
---|
3023 | wio.close()
|
---|
3024 | except IOError as e:
|
---|
3025 | if e.errno != errno.EBADF:
|
---|
3026 | raise
|
---|
3027 |
|
---|
3028 | def test_interrupterd_write_retry_buffered(self):
|
---|
3029 | self.check_interrupted_write_retry(b"x", mode="wb")
|
---|
3030 |
|
---|
3031 | def test_interrupterd_write_retry_text(self):
|
---|
3032 | self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
|
---|
3033 |
|
---|
3034 |
|
---|
3035 | class CSignalsTest(SignalsTest):
|
---|
3036 | io = io
|
---|
3037 |
|
---|
3038 | class PySignalsTest(SignalsTest):
|
---|
3039 | io = pyio
|
---|
3040 |
|
---|
3041 | # Handling reentrancy issues would slow down _pyio even more, so the
|
---|
3042 | # tests are disabled.
|
---|
3043 | test_reentrant_write_buffered = None
|
---|
3044 | test_reentrant_write_text = None
|
---|
3045 |
|
---|
3046 |
|
---|
3047 | def test_main():
|
---|
3048 | tests = (CIOTest, PyIOTest,
|
---|
3049 | CBufferedReaderTest, PyBufferedReaderTest,
|
---|
3050 | CBufferedWriterTest, PyBufferedWriterTest,
|
---|
3051 | CBufferedRWPairTest, PyBufferedRWPairTest,
|
---|
3052 | CBufferedRandomTest, PyBufferedRandomTest,
|
---|
3053 | StatefulIncrementalDecoderTest,
|
---|
3054 | CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
|
---|
3055 | CTextIOWrapperTest, PyTextIOWrapperTest,
|
---|
3056 | CMiscIOTest, PyMiscIOTest,
|
---|
3057 | CSignalsTest, PySignalsTest,
|
---|
3058 | )
|
---|
3059 |
|
---|
3060 | # Put the namespaces of the IO module we are testing and some useful mock
|
---|
3061 | # classes in the __dict__ of each test.
|
---|
3062 | mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
|
---|
3063 | MockNonBlockWriterIO, MockRawIOWithoutRead)
|
---|
3064 | all_members = io.__all__ + ["IncrementalNewlineDecoder"]
|
---|
3065 | c_io_ns = dict((name, getattr(io, name)) for name in all_members)
|
---|
3066 | py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
|
---|
3067 | globs = globals()
|
---|
3068 | c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
|
---|
3069 | py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
|
---|
3070 | # Avoid turning open into a bound method.
|
---|
3071 | py_io_ns["open"] = pyio.OpenWrapper
|
---|
3072 | for test in tests:
|
---|
3073 | if test.__name__.startswith("C"):
|
---|
3074 | for name, obj in c_io_ns.items():
|
---|
3075 | setattr(test, name, obj)
|
---|
3076 | elif test.__name__.startswith("Py"):
|
---|
3077 | for name, obj in py_io_ns.items():
|
---|
3078 | setattr(test, name, obj)
|
---|
3079 |
|
---|
3080 | support.run_unittest(*tests)
|
---|
3081 |
|
---|
3082 | if __name__ == "__main__":
|
---|
3083 | test_main()
|
---|