1 | # Adapted from test_file.py by Daniel Stutzbach
|
---|
2 |
|
---|
3 | from __future__ import unicode_literals
|
---|
4 |
|
---|
5 | import sys
|
---|
6 | import os
|
---|
7 | import errno
|
---|
8 | import unittest
|
---|
9 | from array import array
|
---|
10 | from weakref import proxy
|
---|
11 | from functools import wraps
|
---|
12 | from UserList import UserList
|
---|
13 | import _testcapi
|
---|
14 |
|
---|
15 | from test.test_support import TESTFN, check_warnings, run_unittest, make_bad_fd
|
---|
16 | from test.test_support import py3k_bytes as bytes
|
---|
17 | from test.script_helper import run_python
|
---|
18 |
|
---|
19 | from _io import FileIO as _FileIO
|
---|
20 |
|
---|
class AutoFileTests(unittest.TestCase):
    # File tests for which a test file is automatically set up: setUp() opens
    # TESTFN for writing as self.f, and tearDown() closes and removes it.

    def setUp(self):
        # Each test starts with TESTFN opened in mode 'w' as self.f.
        self.f = _FileIO(TESTFN, 'w')

    def tearDown(self):
        # testWeakRefs sets self.f to None, so guard before closing.
        if self.f:
            self.f.close()
        os.remove(TESTFN)

    def testWeakRefs(self):
        # verify weak references: a proxy forwards calls while the file
        # object is alive and raises ReferenceError once it is gone
        p = proxy(self.f)
        p.write(bytes(range(10)))
        self.assertEqual(self.f.tell(), p.tell())
        self.f.close()
        # Drop the test's own reference to the file object.
        self.f = None
        self.assertRaises(ReferenceError, getattr, p, 'tell')

    def testSeekTell(self):
        # tell() must track absolute (default), relative (whence=1) and
        # end-relative (whence=2) seeks over a 20-byte file.
        self.f.write(bytes(range(20)))
        self.assertEqual(self.f.tell(), 20)
        self.f.seek(0)
        self.assertEqual(self.f.tell(), 0)
        self.f.seek(10)
        self.assertEqual(self.f.tell(), 10)
        self.f.seek(5, 1)
        self.assertEqual(self.f.tell(), 15)
        self.f.seek(-5, 1)
        self.assertEqual(self.f.tell(), 10)
        self.f.seek(-5, 2)
        self.assertEqual(self.f.tell(), 15)

    def testAttributes(self):
        # verify expected attributes exist
        f = self.f

        self.assertEqual(f.mode, "wb")
        self.assertEqual(f.closed, False)

        # verify the attributes are readonly
        for attr in 'mode', 'closed':
            self.assertRaises((AttributeError, TypeError),
                              setattr, f, attr, 'oops')

    def testReadinto(self):
        # verify readinto: write two bytes, reopen for reading and check that
        # exactly those bytes land at the start of the array
        self.f.write(b"\x01\x02")
        self.f.close()
        a = array(b'b', b'x'*10)
        self.f = _FileIO(TESTFN, 'r')
        n = self.f.readinto(a)
        self.assertEqual(array(b'b', [1, 2]), a[:n])

    def testWritelinesList(self):
        # writelines() with a plain list writes the items back to back.
        l = [b'123', b'456']
        self.f.writelines(l)
        self.f.close()
        self.f = _FileIO(TESTFN, 'rb')
        buf = self.f.read()
        self.assertEqual(buf, b'123456')

    def testWritelinesUserList(self):
        # Same as testWritelinesList, but with a UserList instead of a list.
        l = UserList([b'123', b'456'])
        self.f.writelines(l)
        self.f.close()
        self.f = _FileIO(TESTFN, 'rb')
        buf = self.f.read()
        self.assertEqual(buf, b'123456')

    def testWritelinesError(self):
        # Non-bytes items and a non-iterable argument are both TypeErrors.
        self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
        self.assertRaises(TypeError, self.f.writelines, None)

    def test_none_args(self):
        # read(), readline() and readlines() accept None as their argument.
        self.f.write(b"hi\nbye\nabc")
        self.f.close()
        self.f = _FileIO(TESTFN, 'r')
        self.assertEqual(self.f.read(None), b"hi\nbye\nabc")
        self.f.seek(0)
        self.assertEqual(self.f.readline(None), b"hi\n")
        self.assertEqual(self.f.readlines(None), [b"bye\n", b"abc"])

    def testRepr(self):
        # repr() shows the name while it is set, the fd once the name
        # attribute has been deleted, and "[closed]" after close().
        self.assertEqual(repr(self.f), "<_io.FileIO name=%r mode='%s'>"
                                       % (self.f.name, self.f.mode))
        del self.f.name
        self.assertEqual(repr(self.f), "<_io.FileIO fd=%r mode='%s'>"
                                       % (self.f.fileno(), self.f.mode))
        self.f.close()
        self.assertEqual(repr(self.f), "<_io.FileIO [closed]>")

    def testErrors(self):
        f = self.f
        self.assertFalse(f.isatty())
        self.assertFalse(f.closed)
        # setUp opened the file write-only, so reading must be rejected.
        self.assertRaises(ValueError, f.read, 10)
        f.close()
        self.assertTrue(f.closed)
        f = _FileIO(TESTFN, 'r')
        try:
            self.assertRaises(TypeError, f.readinto, "")
            self.assertFalse(f.closed)
        finally:
            # Close even when an assertion above fails so the handle is not
            # leaked (tearDown only knows about self.f).
            f.close()
        self.assertTrue(f.closed)

    def testMethods(self):
        # Every listed method must raise ValueError once the file is closed.
        methods = ['fileno', 'isatty', 'read', 'readinto',
                   'seek', 'tell', 'truncate', 'write', 'seekable',
                   'readable', 'writable']
        if sys.platform.startswith('atheos'):
            methods.remove('truncate')

        self.f.close()
        self.assertTrue(self.f.closed)

        for methodname in methods:
            method = getattr(self.f, methodname)
            # should raise on closed file
            self.assertRaises(ValueError, method)

    def testOpendir(self):
        # Issue 3703: opening a directory should fill the errno
        # Windows always returns "[Errno 13]: Permission denied"
        # Unix calls dircheck() and returns "[Errno 21]: Is a directory"
        try:
            _FileIO('.', 'r')
        except IOError as e:
            self.assertNotEqual(e.errno, 0)
            self.assertEqual(e.filename, ".")
        else:
            self.fail("Should have raised IOError")

    @unittest.skipIf(os.name == 'nt', "test only works on a POSIX-like system")
    def testOpenDirFD(self):
        # Wrapping a directory's file descriptor must fail with EISDIR.
        fd = os.open('.', os.O_RDONLY)
        with self.assertRaises(IOError) as cm:
            _FileIO(fd, 'r')
        os.close(fd)
        self.assertEqual(cm.exception.errno, errno.EISDIR)

    #A set of functions testing that we get expected behaviour if someone has
    #manually closed the internal file descriptor.  First, a decorator:
    def ClosedFD(func):
        # Decorator: run func(self, f) after self.f's descriptor has been
        # closed behind its back.  Makes no assertion about whether func
        # raises; it only cleans up afterwards.
        @wraps(func)
        def wrapper(self):
            #forcibly close the fd before invoking the problem function
            f = self.f
            os.close(f.fileno())
            try:
                func(self, f)
            finally:
                # The descriptor is already gone, so close() may fail;
                # ignore that so tearDown can proceed.
                try:
                    self.f.close()
                except IOError:
                    pass
        return wrapper

    def ClosedFDRaises(func):
        # Decorator: like ClosedFD, but additionally requires func to raise
        # IOError with errno EBADF.
        @wraps(func)
        def wrapper(self):
            #forcibly close the fd before invoking the problem function
            f = self.f
            os.close(f.fileno())
            try:
                func(self, f)
            except IOError as e:
                self.assertEqual(e.errno, errno.EBADF)
            else:
                self.fail("Should have raised IOError")
            finally:
                # Same best-effort cleanup as in ClosedFD.
                try:
                    self.f.close()
                except IOError:
                    pass
        return wrapper

    @ClosedFDRaises
    def testErrnoOnClose(self, f):
        f.close()

    @ClosedFDRaises
    def testErrnoOnClosedWrite(self, f):
        # unicode_literals is in effect, so spell the payload as bytes
        # explicitly: FileIO is a binary stream.
        f.write(b'a')

    @ClosedFDRaises
    def testErrnoOnClosedSeek(self, f):
        f.seek(0)

    @ClosedFDRaises
    def testErrnoOnClosedTell(self, f):
        f.tell()

    @ClosedFDRaises
    def testErrnoOnClosedTruncate(self, f):
        f.truncate(0)

    @ClosedFD
    def testErrnoOnClosedSeekable(self, f):
        f.seekable()

    @ClosedFD
    def testErrnoOnClosedReadable(self, f):
        f.readable()

    @ClosedFD
    def testErrnoOnClosedWritable(self, f):
        f.writable()

    @ClosedFD
    def testErrnoOnClosedFileno(self, f):
        f.fileno()

    @ClosedFD
    def testErrnoOnClosedIsatty(self, f):
        self.assertEqual(f.isatty(), False)

    def ReopenForRead(self):
        # Helper for the read tests below: replace self.f with a read-only
        # FileIO on TESTFN whose descriptor has already been closed, and
        # return it.
        try:
            self.f.close()
        except IOError:
            pass
        self.f = _FileIO(TESTFN, 'r')
        os.close(self.f.fileno())
        return self.f

    @ClosedFDRaises
    def testErrnoOnClosedRead(self, f):
        f = self.ReopenForRead()
        f.read(1)

    @ClosedFDRaises
    def testErrnoOnClosedReadall(self, f):
        f = self.ReopenForRead()
        f.readall()

    @ClosedFDRaises
    def testErrnoOnClosedReadinto(self, f):
        f = self.ReopenForRead()
        a = array(b'b', b'x'*10)
        f.readinto(a)
263 |
|
---|
class OtherFileTests(unittest.TestCase):
    # FileIO tests that create (and are responsible for removing) TESTFN
    # themselves instead of relying on setUp()/tearDown().

    def testAbles(self):
        # readable()/writable()/seekable() must reflect the open mode.
        try:
            f = _FileIO(TESTFN, "w")
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            f.close()

            f = _FileIO(TESTFN, "r")
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            f.close()

            f = _FileIO(TESTFN, "a+")
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.assertEqual(f.isatty(), False)
            f.close()

            if sys.platform != "win32":
                try:
                    f = _FileIO("/dev/tty", "a")
                except EnvironmentError:
                    # When run in a cron job there just aren't any
                    # ttys, so skip the test.  This also handles other
                    # OS'es that don't support /dev/tty.
                    pass
                else:
                    self.assertEqual(f.readable(), False)
                    self.assertEqual(f.writable(), True)
                    if sys.platform != "darwin" and \
                       'bsd' not in sys.platform and \
                       not sys.platform.startswith('sunos'):
                        # Somehow /dev/tty appears seekable on some BSDs
                        self.assertEqual(f.seekable(), False)
                    self.assertEqual(f.isatty(), True)
                    f.close()
        finally:
            os.unlink(TESTFN)

    def testInvalidModeStrings(self):
        # check invalid mode strings
        for mode in ("", "aU", "wU+", "rw", "rt"):
            try:
                f = _FileIO(TESTFN, mode)
            except ValueError:
                pass
            else:
                f.close()
                self.fail('%r is an invalid file mode' % mode)

    def testModeStrings(self):
        # test that the mode attribute is correct for various mode strings
        # given as init args
        try:
            for modes in [('w', 'wb'), ('wb', 'wb'), ('wb+', 'rb+'),
                          ('w+b', 'rb+'), ('a', 'ab'), ('ab', 'ab'),
                          ('ab+', 'ab+'), ('a+b', 'ab+'), ('r', 'rb'),
                          ('rb', 'rb'), ('rb+', 'rb+'), ('r+b', 'rb+')]:
                # read modes are last so that TESTFN will exist first
                with _FileIO(TESTFN, modes[0]) as f:
                    self.assertEqual(f.mode, modes[1])
        finally:
            if os.path.exists(TESTFN):
                os.unlink(TESTFN)

    def testUnicodeOpen(self):
        # Open TESTFN passed through str(); this only checks that creating
        # and closing the file succeeds.
        f = _FileIO(str(TESTFN), "w")
        f.close()
        os.unlink(TESTFN)

    def testBytesOpen(self):
        # Opening a bytes filename
        try:
            fn = TESTFN.encode("ascii")
        except UnicodeEncodeError:
            # Skip test
            return
        f = _FileIO(fn, "w")
        try:
            f.write(b"abc")
            f.close()
            # Read back through the builtin open() and the original
            # (non-encoded) name to confirm the same file was written.
            with open(TESTFN, "rb") as reader:
                self.assertEqual(reader.read(), b"abc")
        finally:
            os.unlink(TESTFN)

    def testInvalidFd(self):
        # Negative, unopened and out-of-range descriptors are all rejected.
        self.assertRaises(ValueError, _FileIO, -10)
        self.assertRaises(OSError, _FileIO, make_bad_fd())
        if sys.platform == 'win32':
            import msvcrt
            self.assertRaises(IOError, msvcrt.get_osfhandle, make_bad_fd())
        # Issue 15989
        self.assertRaises(TypeError, _FileIO, _testcapi.INT_MAX + 1)
        self.assertRaises(TypeError, _FileIO, _testcapi.INT_MIN - 1)

    def testBadModeArgument(self):
        # verify that we get a sensible error message for bad mode argument
        bad_mode = "qwerty"
        try:
            f = _FileIO(TESTFN, bad_mode)
        except ValueError as msg:
            if msg.args[0] != 0:
                s = str(msg)
                # The message should mention the bad mode, not the filename.
                if TESTFN in s or bad_mode not in s:
                    self.fail("bad error message for invalid mode: %s" % s)
            # if msg.args[0] == 0, we're probably on Windows where there may be
            # no obvious way to discover why open() failed.
        else:
            f.close()
            self.fail("no error for invalid mode: %s" % bad_mode)

    def testTruncate(self):
        # truncate(size) changes the file size (shrinking or growing) but
        # must leave the current position untouched.
        f = _FileIO(TESTFN, 'w')
        try:
            f.write(bytes(bytearray(range(10))))
            self.assertEqual(f.tell(), 10)
            f.truncate(5)
            self.assertEqual(f.tell(), 10)
            self.assertEqual(f.seek(0, os.SEEK_END), 5)
            f.truncate(15)
            self.assertEqual(f.tell(), 5)
            self.assertEqual(f.seek(0, os.SEEK_END), 15)
        finally:
            # Release the handle and remove the file even when an assertion
            # fails, matching the cleanup done by the other tests here.
            f.close()
            os.unlink(TESTFN)

    def testTruncateOnWindows(self):
        def bug801631():
            # SF bug <http://www.python.org/sf/801631>
            # "file.truncate fault on windows"
            f = _FileIO(TESTFN, 'w')
            f.write(bytes(range(11)))
            f.close()

            f = _FileIO(TESTFN,'r+')
            data = f.read(5)
            if data != bytes(range(5)):
                self.fail("Read on file opened for update failed %r" % data)
            if f.tell() != 5:
                self.fail("File pos after read wrong %d" % f.tell())

            # truncate() without an argument cuts at the current position.
            f.truncate()
            if f.tell() != 5:
                self.fail("File pos after ftruncate wrong %d" % f.tell())

            f.close()
            size = os.path.getsize(TESTFN)
            if size != 5:
                self.fail("File size after ftruncate wrong %d" % size)

        try:
            bug801631()
        finally:
            os.unlink(TESTFN)

    def testAppend(self):
        # Data written in append mode must land after the existing content.
        # Use _FileIO (the class under test) rather than the builtin open().
        try:
            f = _FileIO(TESTFN, 'wb')
            f.write(b'spam')
            f.close()
            f = _FileIO(TESTFN, 'ab')
            f.write(b'eggs')
            f.close()
            f = _FileIO(TESTFN, 'rb')
            d = f.read()
            f.close()
            self.assertEqual(d, b'spameggs')
        finally:
            # Best-effort cleanup: the file may not exist if the first open
            # failed, so only filesystem errors are ignored.
            try:
                os.unlink(TESTFN)
            except OSError:
                pass

    def testInvalidInit(self):
        # A non-string mode argument is a TypeError.
        self.assertRaises(TypeError, _FileIO, "1", 0, 0)

    def testWarnings(self):
        # Failing constructor calls must not emit any warnings.
        with check_warnings(quiet=True) as w:
            self.assertEqual(w.warnings, [])
            self.assertRaises(TypeError, _FileIO, [])
            self.assertEqual(w.warnings, [])
            self.assertRaises(ValueError, _FileIO, "/some/invalid/name", "rt")
            self.assertEqual(w.warnings, [])

    def test_surrogates(self):
        # Issue #8438: try to open a filename containing surrogates.
        # It should either fail because the file doesn't exist or the filename
        # can't be represented using the filesystem encoding, but not because
        # of a LookupError for the error handler "surrogateescape".
        filename = u'\udc80.txt'
        try:
            with _FileIO(filename):
                pass
        except (UnicodeEncodeError, IOError):
            pass
        # Spawn a separate Python process with a different "file system
        # default encoding", to exercise this further.
        env = dict(os.environ)
        env[b'LC_CTYPE'] = b'C'
        _, out = run_python('-c', 'import _io; _io.FileIO(%r)' % filename, env=env)
        if ('UnicodeEncodeError' not in out and not
                ( ('IOError: [Errno 2] No such file or directory' in out) or
                  ('IOError: [Errno 22] Invalid argument' in out) ) ):
            self.fail('Bad output: %r' % out)

    def testUnclosedFDOnException(self):
        # If the constructor fails after being handed an existing fd, it
        # must not close that fd: the caller still owns it.
        class MyException(Exception): pass
        class MyFileIO(_FileIO):
            def __setattr__(self, name, value):
                # Make the constructor fail when it tries to set "name".
                if name == "name":
                    raise MyException("blocked setting name")
                return super(MyFileIO, self).__setattr__(name, value)
        fd = os.open(__file__, os.O_RDONLY)
        self.assertRaises(MyException, MyFileIO, fd)
        os.close(fd)  # should not raise OSError(EBADF)
483 |
|
---|
def test_main():
    # Run both suites, then delete TESTFN no matter how the run ended:
    # historically these tests have been sloppy about removing it.
    suites = (AutoFileTests, OtherFileTests)
    try:
        run_unittest(*suites)
    finally:
        leftover = os.path.exists(TESTFN)
        if leftover:
            os.unlink(TESTFN)
492 |
|
---|
# Allow the module to be executed directly as a script.
if __name__ == '__main__':
    test_main()