1 | import sys
|
---|
2 | import os
|
---|
3 | import unittest
|
---|
4 | import itertools
|
---|
5 | import select
|
---|
6 | import signal
|
---|
7 | import subprocess
|
---|
8 | import time
|
---|
9 | from array import array
|
---|
10 | from weakref import proxy
|
---|
11 | try:
|
---|
12 | import threading
|
---|
13 | except ImportError:
|
---|
14 | threading = None
|
---|
15 |
|
---|
16 | from test import test_support
|
---|
17 | from test.test_support import TESTFN, run_unittest
|
---|
18 | from UserList import UserList
|
---|
19 |
|
---|
20 | class AutoFileTests(unittest.TestCase):
|
---|
21 | # file tests for which a test file is automatically set up
|
---|
22 |
|
---|
23 | def setUp(self):
|
---|
24 | self.f = open(TESTFN, 'wb')
|
---|
25 |
|
---|
26 | def tearDown(self):
|
---|
27 | if self.f:
|
---|
28 | self.f.close()
|
---|
29 | os.remove(TESTFN)
|
---|
30 |
|
---|
31 | def testWeakRefs(self):
|
---|
32 | # verify weak references
|
---|
33 | p = proxy(self.f)
|
---|
34 | p.write('teststring')
|
---|
35 | self.assertEqual(self.f.tell(), p.tell())
|
---|
36 | self.f.close()
|
---|
37 | self.f = None
|
---|
38 | self.assertRaises(ReferenceError, getattr, p, 'tell')
|
---|
39 |
|
---|
40 | def testAttributes(self):
|
---|
41 | # verify expected attributes exist
|
---|
42 | f = self.f
|
---|
43 | with test_support.check_py3k_warnings():
|
---|
44 | softspace = f.softspace
|
---|
45 | f.name # merely shouldn't blow up
|
---|
46 | f.mode # ditto
|
---|
47 | f.closed # ditto
|
---|
48 |
|
---|
49 | with test_support.check_py3k_warnings():
|
---|
50 | # verify softspace is writable
|
---|
51 | f.softspace = softspace # merely shouldn't blow up
|
---|
52 |
|
---|
53 | # verify the others aren't
|
---|
54 | for attr in 'name', 'mode', 'closed':
|
---|
55 | self.assertRaises((AttributeError, TypeError), setattr, f, attr, 'oops')
|
---|
56 |
|
---|
57 | def testReadinto(self):
|
---|
58 | # verify readinto
|
---|
59 | self.f.write('12')
|
---|
60 | self.f.close()
|
---|
61 | a = array('c', 'x'*10)
|
---|
62 | self.f = open(TESTFN, 'rb')
|
---|
63 | n = self.f.readinto(a)
|
---|
64 | self.assertEqual('12', a.tostring()[:n])
|
---|
65 |
|
---|
66 | def testWritelinesUserList(self):
|
---|
67 | # verify writelines with instance sequence
|
---|
68 | l = UserList(['1', '2'])
|
---|
69 | self.f.writelines(l)
|
---|
70 | self.f.close()
|
---|
71 | self.f = open(TESTFN, 'rb')
|
---|
72 | buf = self.f.read()
|
---|
73 | self.assertEqual(buf, '12')
|
---|
74 |
|
---|
75 | def testWritelinesIntegers(self):
|
---|
76 | # verify writelines with integers
|
---|
77 | self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
|
---|
78 |
|
---|
79 | def testWritelinesIntegersUserList(self):
|
---|
80 | # verify writelines with integers in UserList
|
---|
81 | l = UserList([1,2,3])
|
---|
82 | self.assertRaises(TypeError, self.f.writelines, l)
|
---|
83 |
|
---|
84 | def testWritelinesNonString(self):
|
---|
85 | # verify writelines with non-string object
|
---|
86 | class NonString:
|
---|
87 | pass
|
---|
88 |
|
---|
89 | self.assertRaises(TypeError, self.f.writelines,
|
---|
90 | [NonString(), NonString()])
|
---|
91 |
|
---|
92 | def testRepr(self):
|
---|
93 | # verify repr works
|
---|
94 | self.assertTrue(repr(self.f).startswith("<open file '" + TESTFN))
|
---|
95 | # see issue #14161
|
---|
96 | # Windows doesn't like \r\n\t" in the file name, but ' is ok
|
---|
97 | fname = 'xx\rxx\nxx\'xx"xx' if sys.platform != "win32" else "xx'xx"
|
---|
98 | with open(fname, 'w') as f:
|
---|
99 | self.addCleanup(os.remove, fname)
|
---|
100 | self.assertTrue(repr(f).startswith(
|
---|
101 | "<open file %r, mode 'w' at" % fname))
|
---|
102 |
|
---|
103 | def testErrors(self):
|
---|
104 | self.f.close()
|
---|
105 | self.f = open(TESTFN, 'rb')
|
---|
106 | f = self.f
|
---|
107 | self.assertEqual(f.name, TESTFN)
|
---|
108 | self.assertTrue(not f.isatty())
|
---|
109 | self.assertTrue(not f.closed)
|
---|
110 |
|
---|
111 | self.assertRaises(TypeError, f.readinto, "")
|
---|
112 | f.close()
|
---|
113 | self.assertTrue(f.closed)
|
---|
114 |
|
---|
115 | def testMethods(self):
|
---|
116 | methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
|
---|
117 | 'readline', 'readlines', 'seek', 'tell', 'truncate',
|
---|
118 | 'write', '__iter__']
|
---|
119 | deprecated_methods = ['xreadlines']
|
---|
120 | if sys.platform.startswith('atheos'):
|
---|
121 | methods.remove('truncate')
|
---|
122 |
|
---|
123 | # __exit__ should close the file
|
---|
124 | self.f.__exit__(None, None, None)
|
---|
125 | self.assertTrue(self.f.closed)
|
---|
126 |
|
---|
127 | for methodname in methods:
|
---|
128 | method = getattr(self.f, methodname)
|
---|
129 | # should raise on closed file
|
---|
130 | self.assertRaises(ValueError, method)
|
---|
131 | with test_support.check_py3k_warnings():
|
---|
132 | for methodname in deprecated_methods:
|
---|
133 | method = getattr(self.f, methodname)
|
---|
134 | self.assertRaises(ValueError, method)
|
---|
135 | self.assertRaises(ValueError, self.f.writelines, [])
|
---|
136 |
|
---|
137 | # file is closed, __exit__ shouldn't do anything
|
---|
138 | self.assertEqual(self.f.__exit__(None, None, None), None)
|
---|
139 | # it must also return None if an exception was given
|
---|
140 | try:
|
---|
141 | 1 // 0
|
---|
142 | except:
|
---|
143 | self.assertEqual(self.f.__exit__(*sys.exc_info()), None)
|
---|
144 |
|
---|
145 | def testReadWhenWriting(self):
|
---|
146 | self.assertRaises(IOError, self.f.read)
|
---|
147 |
|
---|
148 | def testNastyWritelinesGenerator(self):
|
---|
149 | def nasty():
|
---|
150 | for i in range(5):
|
---|
151 | if i == 3:
|
---|
152 | self.f.close()
|
---|
153 | yield str(i)
|
---|
154 | self.assertRaises(ValueError, self.f.writelines, nasty())
|
---|
155 |
|
---|
156 | def testIssue5677(self):
|
---|
157 | # Remark: Do not perform more than one test per open file,
|
---|
158 | # since that does NOT catch the readline error on Windows.
|
---|
159 | data = 'xxx'
|
---|
160 | for mode in ['w', 'wb', 'a', 'ab']:
|
---|
161 | for attr in ['read', 'readline', 'readlines']:
|
---|
162 | self.f = open(TESTFN, mode)
|
---|
163 | self.f.write(data)
|
---|
164 | self.assertRaises(IOError, getattr(self.f, attr))
|
---|
165 | self.f.close()
|
---|
166 |
|
---|
167 | self.f = open(TESTFN, mode)
|
---|
168 | self.f.write(data)
|
---|
169 | self.assertRaises(IOError, lambda: [line for line in self.f])
|
---|
170 | self.f.close()
|
---|
171 |
|
---|
172 | self.f = open(TESTFN, mode)
|
---|
173 | self.f.write(data)
|
---|
174 | self.assertRaises(IOError, self.f.readinto, bytearray(len(data)))
|
---|
175 | self.f.close()
|
---|
176 |
|
---|
177 | for mode in ['r', 'rb', 'U', 'Ub', 'Ur', 'rU', 'rbU', 'rUb']:
|
---|
178 | self.f = open(TESTFN, mode)
|
---|
179 | self.assertRaises(IOError, self.f.write, data)
|
---|
180 | self.f.close()
|
---|
181 |
|
---|
182 | self.f = open(TESTFN, mode)
|
---|
183 | self.assertRaises(IOError, self.f.writelines, [data, data])
|
---|
184 | self.f.close()
|
---|
185 |
|
---|
186 | self.f = open(TESTFN, mode)
|
---|
187 | self.assertRaises(IOError, self.f.truncate)
|
---|
188 | self.f.close()
|
---|
189 |
|
---|
190 | class OtherFileTests(unittest.TestCase):
|
---|
191 |
|
---|
192 | def testOpenDir(self):
|
---|
193 | this_dir = os.path.dirname(__file__) or os.curdir
|
---|
194 | for mode in (None, "w"):
|
---|
195 | try:
|
---|
196 | if mode:
|
---|
197 | f = open(this_dir, mode)
|
---|
198 | else:
|
---|
199 | f = open(this_dir)
|
---|
200 | except IOError as e:
|
---|
201 | self.assertEqual(e.filename, this_dir)
|
---|
202 | else:
|
---|
203 | self.fail("opening a directory didn't raise an IOError")
|
---|
204 |
|
---|
205 | def testModeStrings(self):
|
---|
206 | # check invalid mode strings
|
---|
207 | for mode in ("", "aU", "wU+"):
|
---|
208 | try:
|
---|
209 | f = open(TESTFN, mode)
|
---|
210 | except ValueError:
|
---|
211 | pass
|
---|
212 | else:
|
---|
213 | f.close()
|
---|
214 | self.fail('%r is an invalid file mode' % mode)
|
---|
215 |
|
---|
216 | # Some invalid modes fail on Windows, but pass on Unix
|
---|
217 | # Issue3965: avoid a crash on Windows when filename is unicode
|
---|
218 | for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')):
|
---|
219 | try:
|
---|
220 | f = open(name, "rr")
|
---|
221 | except (IOError, ValueError):
|
---|
222 | pass
|
---|
223 | else:
|
---|
224 | f.close()
|
---|
225 |
|
---|
226 | def testStdin(self):
|
---|
227 | # This causes the interpreter to exit on OSF1 v5.1.
|
---|
228 | if sys.platform != 'osf1V5':
|
---|
229 | self.assertRaises(IOError, sys.stdin.seek, -1)
|
---|
230 | else:
|
---|
231 | print >>sys.__stdout__, (
|
---|
232 | ' Skipping sys.stdin.seek(-1), it may crash the interpreter.'
|
---|
233 | ' Test manually.')
|
---|
234 | self.assertRaises(IOError, sys.stdin.truncate)
|
---|
235 |
|
---|
236 | def testUnicodeOpen(self):
|
---|
237 | # verify repr works for unicode too
|
---|
238 | f = open(unicode(TESTFN), "w")
|
---|
239 | self.assertTrue(repr(f).startswith("<open file u'" + TESTFN))
|
---|
240 | f.close()
|
---|
241 | os.unlink(TESTFN)
|
---|
242 |
|
---|
243 | def testBadModeArgument(self):
|
---|
244 | # verify that we get a sensible error message for bad mode argument
|
---|
245 | bad_mode = "qwerty"
|
---|
246 | try:
|
---|
247 | f = open(TESTFN, bad_mode)
|
---|
248 | except ValueError, msg:
|
---|
249 | if msg.args[0] != 0:
|
---|
250 | s = str(msg)
|
---|
251 | if TESTFN in s or bad_mode not in s:
|
---|
252 | self.fail("bad error message for invalid mode: %s" % s)
|
---|
253 | # if msg.args[0] == 0, we're probably on Windows where there may
|
---|
254 | # be no obvious way to discover why open() failed.
|
---|
255 | else:
|
---|
256 | f.close()
|
---|
257 | self.fail("no error for invalid mode: %s" % bad_mode)
|
---|
258 |
|
---|
259 | def testSetBufferSize(self):
|
---|
260 | # make sure that explicitly setting the buffer size doesn't cause
|
---|
261 | # misbehaviour especially with repeated close() calls
|
---|
262 | for s in (-1, 0, 1, 512):
|
---|
263 | try:
|
---|
264 | f = open(TESTFN, 'w', s)
|
---|
265 | f.write(str(s))
|
---|
266 | f.close()
|
---|
267 | f.close()
|
---|
268 | f = open(TESTFN, 'r', s)
|
---|
269 | d = int(f.read())
|
---|
270 | f.close()
|
---|
271 | f.close()
|
---|
272 | except IOError, msg:
|
---|
273 | self.fail('error setting buffer size %d: %s' % (s, str(msg)))
|
---|
274 | self.assertEqual(d, s)
|
---|
275 |
|
---|
276 | def testTruncateOnWindows(self):
|
---|
277 | os.unlink(TESTFN)
|
---|
278 |
|
---|
279 | def bug801631():
|
---|
280 | # SF bug <http://www.python.org/sf/801631>
|
---|
281 | # "file.truncate fault on windows"
|
---|
282 | f = open(TESTFN, 'wb')
|
---|
283 | f.write('12345678901') # 11 bytes
|
---|
284 | f.close()
|
---|
285 |
|
---|
286 | f = open(TESTFN,'rb+')
|
---|
287 | data = f.read(5)
|
---|
288 | if data != '12345':
|
---|
289 | self.fail("Read on file opened for update failed %r" % data)
|
---|
290 | if f.tell() != 5:
|
---|
291 | self.fail("File pos after read wrong %d" % f.tell())
|
---|
292 |
|
---|
293 | f.truncate()
|
---|
294 | if f.tell() != 5:
|
---|
295 | self.fail("File pos after ftruncate wrong %d" % f.tell())
|
---|
296 |
|
---|
297 | f.close()
|
---|
298 | size = os.path.getsize(TESTFN)
|
---|
299 | if size != 5:
|
---|
300 | self.fail("File size after ftruncate wrong %d" % size)
|
---|
301 |
|
---|
302 | try:
|
---|
303 | bug801631()
|
---|
304 | finally:
|
---|
305 | os.unlink(TESTFN)
|
---|
306 |
|
---|
307 | def testIteration(self):
|
---|
308 | # Test the complex interaction when mixing file-iteration and the
|
---|
309 | # various read* methods. Ostensibly, the mixture could just be tested
|
---|
310 | # to work when it should work according to the Python language,
|
---|
311 | # instead of fail when it should fail according to the current CPython
|
---|
312 | # implementation. People don't always program Python the way they
|
---|
313 | # should, though, and the implemenation might change in subtle ways,
|
---|
314 | # so we explicitly test for errors, too; the test will just have to
|
---|
315 | # be updated when the implementation changes.
|
---|
316 | dataoffset = 16384
|
---|
317 | filler = "ham\n"
|
---|
318 | assert not dataoffset % len(filler), \
|
---|
319 | "dataoffset must be multiple of len(filler)"
|
---|
320 | nchunks = dataoffset // len(filler)
|
---|
321 | testlines = [
|
---|
322 | "spam, spam and eggs\n",
|
---|
323 | "eggs, spam, ham and spam\n",
|
---|
324 | "saussages, spam, spam and eggs\n",
|
---|
325 | "spam, ham, spam and eggs\n",
|
---|
326 | "spam, spam, spam, spam, spam, ham, spam\n",
|
---|
327 | "wonderful spaaaaaam.\n"
|
---|
328 | ]
|
---|
329 | methods = [("readline", ()), ("read", ()), ("readlines", ()),
|
---|
330 | ("readinto", (array("c", " "*100),))]
|
---|
331 |
|
---|
332 | try:
|
---|
333 | # Prepare the testfile
|
---|
334 | bag = open(TESTFN, "w")
|
---|
335 | bag.write(filler * nchunks)
|
---|
336 | bag.writelines(testlines)
|
---|
337 | bag.close()
|
---|
338 | # Test for appropriate errors mixing read* and iteration
|
---|
339 | for methodname, args in methods:
|
---|
340 | f = open(TESTFN)
|
---|
341 | if f.next() != filler:
|
---|
342 | self.fail, "Broken testfile"
|
---|
343 | meth = getattr(f, methodname)
|
---|
344 | try:
|
---|
345 | meth(*args)
|
---|
346 | except ValueError:
|
---|
347 | pass
|
---|
348 | else:
|
---|
349 | self.fail("%s%r after next() didn't raise ValueError" %
|
---|
350 | (methodname, args))
|
---|
351 | f.close()
|
---|
352 |
|
---|
353 | # Test to see if harmless (by accident) mixing of read* and
|
---|
354 | # iteration still works. This depends on the size of the internal
|
---|
355 | # iteration buffer (currently 8192,) but we can test it in a
|
---|
356 | # flexible manner. Each line in the bag o' ham is 4 bytes
|
---|
357 | # ("h", "a", "m", "\n"), so 4096 lines of that should get us
|
---|
358 | # exactly on the buffer boundary for any power-of-2 buffersize
|
---|
359 | # between 4 and 16384 (inclusive).
|
---|
360 | f = open(TESTFN)
|
---|
361 | for i in range(nchunks):
|
---|
362 | f.next()
|
---|
363 | testline = testlines.pop(0)
|
---|
364 | try:
|
---|
365 | line = f.readline()
|
---|
366 | except ValueError:
|
---|
367 | self.fail("readline() after next() with supposedly empty "
|
---|
368 | "iteration-buffer failed anyway")
|
---|
369 | if line != testline:
|
---|
370 | self.fail("readline() after next() with empty buffer "
|
---|
371 | "failed. Got %r, expected %r" % (line, testline))
|
---|
372 | testline = testlines.pop(0)
|
---|
373 | buf = array("c", "\x00" * len(testline))
|
---|
374 | try:
|
---|
375 | f.readinto(buf)
|
---|
376 | except ValueError:
|
---|
377 | self.fail("readinto() after next() with supposedly empty "
|
---|
378 | "iteration-buffer failed anyway")
|
---|
379 | line = buf.tostring()
|
---|
380 | if line != testline:
|
---|
381 | self.fail("readinto() after next() with empty buffer "
|
---|
382 | "failed. Got %r, expected %r" % (line, testline))
|
---|
383 |
|
---|
384 | testline = testlines.pop(0)
|
---|
385 | try:
|
---|
386 | line = f.read(len(testline))
|
---|
387 | except ValueError:
|
---|
388 | self.fail("read() after next() with supposedly empty "
|
---|
389 | "iteration-buffer failed anyway")
|
---|
390 | if line != testline:
|
---|
391 | self.fail("read() after next() with empty buffer "
|
---|
392 | "failed. Got %r, expected %r" % (line, testline))
|
---|
393 | try:
|
---|
394 | lines = f.readlines()
|
---|
395 | except ValueError:
|
---|
396 | self.fail("readlines() after next() with supposedly empty "
|
---|
397 | "iteration-buffer failed anyway")
|
---|
398 | if lines != testlines:
|
---|
399 | self.fail("readlines() after next() with empty buffer "
|
---|
400 | "failed. Got %r, expected %r" % (line, testline))
|
---|
401 | # Reading after iteration hit EOF shouldn't hurt either
|
---|
402 | f = open(TESTFN)
|
---|
403 | try:
|
---|
404 | for line in f:
|
---|
405 | pass
|
---|
406 | try:
|
---|
407 | f.readline()
|
---|
408 | f.readinto(buf)
|
---|
409 | f.read()
|
---|
410 | f.readlines()
|
---|
411 | except ValueError:
|
---|
412 | self.fail("read* failed after next() consumed file")
|
---|
413 | finally:
|
---|
414 | f.close()
|
---|
415 | finally:
|
---|
416 | os.unlink(TESTFN)
|
---|
417 |
|
---|
418 | class FileSubclassTests(unittest.TestCase):
|
---|
419 |
|
---|
420 | def testExit(self):
|
---|
421 | # test that exiting with context calls subclass' close
|
---|
422 | class C(file):
|
---|
423 | def __init__(self, *args):
|
---|
424 | self.subclass_closed = False
|
---|
425 | file.__init__(self, *args)
|
---|
426 | def close(self):
|
---|
427 | self.subclass_closed = True
|
---|
428 | file.close(self)
|
---|
429 |
|
---|
430 | with C(TESTFN, 'w') as f:
|
---|
431 | pass
|
---|
432 | self.assertTrue(f.subclass_closed)
|
---|
433 |
|
---|
434 |
|
---|
435 | @unittest.skipUnless(threading, 'Threading required for this test.')
|
---|
436 | class FileThreadingTests(unittest.TestCase):
|
---|
437 | # These tests check the ability to call various methods of file objects
|
---|
438 | # (including close()) concurrently without crashing the Python interpreter.
|
---|
439 | # See #815646, #595601
|
---|
440 |
|
---|
441 | def setUp(self):
|
---|
442 | self._threads = test_support.threading_setup()
|
---|
443 | self.f = None
|
---|
444 | self.filename = TESTFN
|
---|
445 | with open(self.filename, "w") as f:
|
---|
446 | f.write("\n".join("0123456789"))
|
---|
447 | self._count_lock = threading.Lock()
|
---|
448 | self.close_count = 0
|
---|
449 | self.close_success_count = 0
|
---|
450 | self.use_buffering = False
|
---|
451 |
|
---|
452 | def tearDown(self):
|
---|
453 | if self.f:
|
---|
454 | try:
|
---|
455 | self.f.close()
|
---|
456 | except (EnvironmentError, ValueError):
|
---|
457 | pass
|
---|
458 | try:
|
---|
459 | os.remove(self.filename)
|
---|
460 | except EnvironmentError:
|
---|
461 | pass
|
---|
462 | test_support.threading_cleanup(*self._threads)
|
---|
463 |
|
---|
464 | def _create_file(self):
|
---|
465 | if self.use_buffering:
|
---|
466 | self.f = open(self.filename, "w+", buffering=1024*16)
|
---|
467 | else:
|
---|
468 | self.f = open(self.filename, "w+")
|
---|
469 |
|
---|
470 | def _close_file(self):
|
---|
471 | with self._count_lock:
|
---|
472 | self.close_count += 1
|
---|
473 | self.f.close()
|
---|
474 | with self._count_lock:
|
---|
475 | self.close_success_count += 1
|
---|
476 |
|
---|
477 | def _close_and_reopen_file(self):
|
---|
478 | self._close_file()
|
---|
479 | # if close raises an exception thats fine, self.f remains valid so
|
---|
480 | # we don't need to reopen.
|
---|
481 | self._create_file()
|
---|
482 |
|
---|
483 | def _run_workers(self, func, nb_workers, duration=0.2):
|
---|
484 | with self._count_lock:
|
---|
485 | self.close_count = 0
|
---|
486 | self.close_success_count = 0
|
---|
487 | self.do_continue = True
|
---|
488 | threads = []
|
---|
489 | try:
|
---|
490 | for i in range(nb_workers):
|
---|
491 | t = threading.Thread(target=func)
|
---|
492 | t.start()
|
---|
493 | threads.append(t)
|
---|
494 | for _ in xrange(100):
|
---|
495 | time.sleep(duration/100)
|
---|
496 | with self._count_lock:
|
---|
497 | if self.close_count-self.close_success_count > nb_workers+1:
|
---|
498 | if test_support.verbose:
|
---|
499 | print 'Q',
|
---|
500 | break
|
---|
501 | time.sleep(duration)
|
---|
502 | finally:
|
---|
503 | self.do_continue = False
|
---|
504 | for t in threads:
|
---|
505 | t.join()
|
---|
506 |
|
---|
507 | def _test_close_open_io(self, io_func, nb_workers=5):
|
---|
508 | def worker():
|
---|
509 | self._create_file()
|
---|
510 | funcs = itertools.cycle((
|
---|
511 | lambda: io_func(),
|
---|
512 | lambda: self._close_and_reopen_file(),
|
---|
513 | ))
|
---|
514 | for f in funcs:
|
---|
515 | if not self.do_continue:
|
---|
516 | break
|
---|
517 | try:
|
---|
518 | f()
|
---|
519 | except (IOError, ValueError):
|
---|
520 | pass
|
---|
521 | self._run_workers(worker, nb_workers)
|
---|
522 | if test_support.verbose:
|
---|
523 | # Useful verbose statistics when tuning this test to take
|
---|
524 | # less time to run but still ensuring that its still useful.
|
---|
525 | #
|
---|
526 | # the percent of close calls that raised an error
|
---|
527 | percent = 100. - 100.*self.close_success_count/self.close_count
|
---|
528 | print self.close_count, ('%.4f ' % percent),
|
---|
529 |
|
---|
530 | def test_close_open(self):
|
---|
531 | def io_func():
|
---|
532 | pass
|
---|
533 | self._test_close_open_io(io_func)
|
---|
534 |
|
---|
535 | def test_close_open_flush(self):
|
---|
536 | def io_func():
|
---|
537 | self.f.flush()
|
---|
538 | self._test_close_open_io(io_func)
|
---|
539 |
|
---|
540 | def test_close_open_iter(self):
|
---|
541 | def io_func():
|
---|
542 | list(iter(self.f))
|
---|
543 | self._test_close_open_io(io_func)
|
---|
544 |
|
---|
545 | def test_close_open_isatty(self):
|
---|
546 | def io_func():
|
---|
547 | self.f.isatty()
|
---|
548 | self._test_close_open_io(io_func)
|
---|
549 |
|
---|
550 | def test_close_open_print(self):
|
---|
551 | def io_func():
|
---|
552 | print >> self.f, ''
|
---|
553 | self._test_close_open_io(io_func)
|
---|
554 |
|
---|
555 | def test_close_open_print_buffered(self):
|
---|
556 | self.use_buffering = True
|
---|
557 | def io_func():
|
---|
558 | print >> self.f, ''
|
---|
559 | self._test_close_open_io(io_func)
|
---|
560 |
|
---|
561 | def test_close_open_read(self):
|
---|
562 | def io_func():
|
---|
563 | self.f.read(0)
|
---|
564 | self._test_close_open_io(io_func)
|
---|
565 |
|
---|
566 | def test_close_open_readinto(self):
|
---|
567 | def io_func():
|
---|
568 | a = array('c', 'xxxxx')
|
---|
569 | self.f.readinto(a)
|
---|
570 | self._test_close_open_io(io_func)
|
---|
571 |
|
---|
572 | def test_close_open_readline(self):
|
---|
573 | def io_func():
|
---|
574 | self.f.readline()
|
---|
575 | self._test_close_open_io(io_func)
|
---|
576 |
|
---|
577 | def test_close_open_readlines(self):
|
---|
578 | def io_func():
|
---|
579 | self.f.readlines()
|
---|
580 | self._test_close_open_io(io_func)
|
---|
581 |
|
---|
582 | def test_close_open_seek(self):
|
---|
583 | def io_func():
|
---|
584 | self.f.seek(0, 0)
|
---|
585 | self._test_close_open_io(io_func)
|
---|
586 |
|
---|
587 | def test_close_open_tell(self):
|
---|
588 | def io_func():
|
---|
589 | self.f.tell()
|
---|
590 | self._test_close_open_io(io_func)
|
---|
591 |
|
---|
592 | def test_close_open_truncate(self):
|
---|
593 | def io_func():
|
---|
594 | self.f.truncate()
|
---|
595 | self._test_close_open_io(io_func)
|
---|
596 |
|
---|
597 | def test_close_open_write(self):
|
---|
598 | def io_func():
|
---|
599 | self.f.write('')
|
---|
600 | self._test_close_open_io(io_func)
|
---|
601 |
|
---|
602 | def test_close_open_writelines(self):
|
---|
603 | def io_func():
|
---|
604 | self.f.writelines('')
|
---|
605 | self._test_close_open_io(io_func)
|
---|
606 |
|
---|
607 |
|
---|
608 | @unittest.skipUnless(os.name == 'posix', 'test requires a posix system.')
|
---|
609 | class TestFileSignalEINTR(unittest.TestCase):
|
---|
610 | def _test_reading(self, data_to_write, read_and_verify_code, method_name,
|
---|
611 | universal_newlines=False):
|
---|
612 | """Generic buffered read method test harness to verify EINTR behavior.
|
---|
613 |
|
---|
614 | Also validates that Python signal handlers are run during the read.
|
---|
615 |
|
---|
616 | Args:
|
---|
617 | data_to_write: String to write to the child process for reading
|
---|
618 | before sending it a signal, confirming the signal was handled,
|
---|
619 | writing a final newline char and closing the infile pipe.
|
---|
620 | read_and_verify_code: Single "line" of code to read from a file
|
---|
621 | object named 'infile' and validate the result. This will be
|
---|
622 | executed as part of a python subprocess fed data_to_write.
|
---|
623 | method_name: The name of the read method being tested, for use in
|
---|
624 | an error message on failure.
|
---|
625 | universal_newlines: If True, infile will be opened in universal
|
---|
626 | newline mode in the child process.
|
---|
627 | """
|
---|
628 | if universal_newlines:
|
---|
629 | # Test the \r\n -> \n conversion while we're at it.
|
---|
630 | data_to_write = data_to_write.replace('\n', '\r\n')
|
---|
631 | infile_setup_code = 'infile = os.fdopen(sys.stdin.fileno(), "rU")'
|
---|
632 | else:
|
---|
633 | infile_setup_code = 'infile = sys.stdin'
|
---|
634 | # Total pipe IO in this function is smaller than the minimum posix OS
|
---|
635 | # pipe buffer size of 512 bytes. No writer should block.
|
---|
636 | assert len(data_to_write) < 512, 'data_to_write must fit in pipe buf.'
|
---|
637 |
|
---|
638 | child_code = (
|
---|
639 | 'import os, signal, sys ;'
|
---|
640 | 'signal.signal('
|
---|
641 | 'signal.SIGINT, lambda s, f: sys.stderr.write("$\\n")) ;'
|
---|
642 | + infile_setup_code + ' ;' +
|
---|
643 | 'assert isinstance(infile, file) ;'
|
---|
644 | 'sys.stderr.write("Go.\\n") ;'
|
---|
645 | + read_and_verify_code)
|
---|
646 | reader_process = subprocess.Popen(
|
---|
647 | [sys.executable, '-c', child_code],
|
---|
648 | stdin=subprocess.PIPE, stdout=subprocess.PIPE,
|
---|
649 | stderr=subprocess.PIPE)
|
---|
650 | # Wait for the signal handler to be installed.
|
---|
651 | go = reader_process.stderr.read(4)
|
---|
652 | if go != 'Go.\n':
|
---|
653 | reader_process.kill()
|
---|
654 | self.fail('Error from %s process while awaiting "Go":\n%s' % (
|
---|
655 | method_name, go+reader_process.stderr.read()))
|
---|
656 | reader_process.stdin.write(data_to_write)
|
---|
657 | signals_sent = 0
|
---|
658 | rlist = []
|
---|
659 | # We don't know when the read_and_verify_code in our child is actually
|
---|
660 | # executing within the read system call we want to interrupt. This
|
---|
661 | # loop waits for a bit before sending the first signal to increase
|
---|
662 | # the likelihood of that. Implementations without correct EINTR
|
---|
663 | # and signal handling usually fail this test.
|
---|
664 | while not rlist:
|
---|
665 | rlist, _, _ = select.select([reader_process.stderr], (), (), 0.05)
|
---|
666 | reader_process.send_signal(signal.SIGINT)
|
---|
667 | # Give the subprocess time to handle it before we loop around and
|
---|
668 | # send another one. On OSX the second signal happening close to
|
---|
669 | # immediately after the first was causing the subprocess to crash
|
---|
670 | # via the OS's default SIGINT handler.
|
---|
671 | time.sleep(0.1)
|
---|
672 | signals_sent += 1
|
---|
673 | if signals_sent > 200:
|
---|
674 | reader_process.kill()
|
---|
675 | self.fail("failed to handle signal during %s." % method_name)
|
---|
676 | # This assumes anything unexpected that writes to stderr will also
|
---|
677 | # write a newline. That is true of the traceback printing code.
|
---|
678 | signal_line = reader_process.stderr.readline()
|
---|
679 | if signal_line != '$\n':
|
---|
680 | reader_process.kill()
|
---|
681 | self.fail('Error from %s process while awaiting signal:\n%s' % (
|
---|
682 | method_name, signal_line+reader_process.stderr.read()))
|
---|
683 | # We append a newline to our input so that a readline call can
|
---|
684 | # end on its own before the EOF is seen.
|
---|
685 | stdout, stderr = reader_process.communicate(input='\n')
|
---|
686 | if reader_process.returncode != 0:
|
---|
687 | self.fail('%s() process exited rc=%d.\nSTDOUT:\n%s\nSTDERR:\n%s' % (
|
---|
688 | method_name, reader_process.returncode, stdout, stderr))
|
---|
689 |
|
---|
690 | def test_readline(self, universal_newlines=False):
|
---|
691 | """file.readline must handle signals and not lose data."""
|
---|
692 | self._test_reading(
|
---|
693 | data_to_write='hello, world!',
|
---|
694 | read_and_verify_code=(
|
---|
695 | 'line = infile.readline() ;'
|
---|
696 | 'expected_line = "hello, world!\\n" ;'
|
---|
697 | 'assert line == expected_line, ('
|
---|
698 | '"read %r expected %r" % (line, expected_line))'
|
---|
699 | ),
|
---|
700 | method_name='readline',
|
---|
701 | universal_newlines=universal_newlines)
|
---|
702 |
|
---|
703 | def test_readline_with_universal_newlines(self):
|
---|
704 | self.test_readline(universal_newlines=True)
|
---|
705 |
|
---|
706 | def test_readlines(self, universal_newlines=False):
|
---|
707 | """file.readlines must handle signals and not lose data."""
|
---|
708 | self._test_reading(
|
---|
709 | data_to_write='hello\nworld!',
|
---|
710 | read_and_verify_code=(
|
---|
711 | 'lines = infile.readlines() ;'
|
---|
712 | 'expected_lines = ["hello\\n", "world!\\n"] ;'
|
---|
713 | 'assert lines == expected_lines, ('
|
---|
714 | '"readlines returned wrong data.\\n" '
|
---|
715 | '"got lines %r\\nexpected %r" '
|
---|
716 | '% (lines, expected_lines))'
|
---|
717 | ),
|
---|
718 | method_name='readlines',
|
---|
719 | universal_newlines=universal_newlines)
|
---|
720 |
|
---|
721 | def test_readlines_with_universal_newlines(self):
|
---|
722 | self.test_readlines(universal_newlines=True)
|
---|
723 |
|
---|
724 | def test_readall(self):
|
---|
725 | """Unbounded file.read() must handle signals and not lose data."""
|
---|
726 | self._test_reading(
|
---|
727 | data_to_write='hello, world!abcdefghijklm',
|
---|
728 | read_and_verify_code=(
|
---|
729 | 'data = infile.read() ;'
|
---|
730 | 'expected_data = "hello, world!abcdefghijklm\\n";'
|
---|
731 | 'assert data == expected_data, ('
|
---|
732 | '"read %r expected %r" % (data, expected_data))'
|
---|
733 | ),
|
---|
734 | method_name='unbounded read')
|
---|
735 |
|
---|
736 | def test_readinto(self):
|
---|
737 | """file.readinto must handle signals and not lose data."""
|
---|
738 | self._test_reading(
|
---|
739 | data_to_write='hello, world!',
|
---|
740 | read_and_verify_code=(
|
---|
741 | 'data = bytearray(50) ;'
|
---|
742 | 'num_read = infile.readinto(data) ;'
|
---|
743 | 'expected_data = "hello, world!\\n";'
|
---|
744 | 'assert data[:num_read] == expected_data, ('
|
---|
745 | '"read %r expected %r" % (data, expected_data))'
|
---|
746 | ),
|
---|
747 | method_name='readinto')
|
---|
748 |
|
---|
749 |
|
---|
750 | class StdoutTests(unittest.TestCase):
|
---|
751 |
|
---|
752 | def test_move_stdout_on_write(self):
|
---|
753 | # Issue 3242: sys.stdout can be replaced (and freed) during a
|
---|
754 | # print statement; prevent a segfault in this case
|
---|
755 | save_stdout = sys.stdout
|
---|
756 |
|
---|
757 | class File:
|
---|
758 | def write(self, data):
|
---|
759 | if '\n' in data:
|
---|
760 | sys.stdout = save_stdout
|
---|
761 |
|
---|
762 | try:
|
---|
763 | sys.stdout = File()
|
---|
764 | print "some text"
|
---|
765 | finally:
|
---|
766 | sys.stdout = save_stdout
|
---|
767 |
|
---|
768 | def test_del_stdout_before_print(self):
|
---|
769 | # Issue 4597: 'print' with no argument wasn't reporting when
|
---|
770 | # sys.stdout was deleted.
|
---|
771 | save_stdout = sys.stdout
|
---|
772 | del sys.stdout
|
---|
773 | try:
|
---|
774 | print
|
---|
775 | except RuntimeError as e:
|
---|
776 | self.assertEqual(str(e), "lost sys.stdout")
|
---|
777 | else:
|
---|
778 | self.fail("Expected RuntimeError")
|
---|
779 | finally:
|
---|
780 | sys.stdout = save_stdout
|
---|
781 |
|
---|
782 | def test_unicode(self):
|
---|
783 | import subprocess
|
---|
784 |
|
---|
785 | def get_message(encoding, *code):
|
---|
786 | code = '\n'.join(code)
|
---|
787 | env = os.environ.copy()
|
---|
788 | env['PYTHONIOENCODING'] = encoding
|
---|
789 | process = subprocess.Popen([sys.executable, "-c", code],
|
---|
790 | stdout=subprocess.PIPE, env=env)
|
---|
791 | stdout, stderr = process.communicate()
|
---|
792 | self.assertEqual(process.returncode, 0)
|
---|
793 | return stdout
|
---|
794 |
|
---|
795 | def check_message(text, encoding, expected):
|
---|
796 | stdout = get_message(encoding,
|
---|
797 | "import sys",
|
---|
798 | "sys.stdout.write(%r)" % text,
|
---|
799 | "sys.stdout.flush()")
|
---|
800 | self.assertEqual(stdout, expected)
|
---|
801 |
|
---|
802 | # test the encoding
|
---|
803 | check_message(u'15\u20ac', "iso-8859-15", "15\xa4")
|
---|
804 | check_message(u'15\u20ac', "utf-8", '15\xe2\x82\xac')
|
---|
805 | check_message(u'15\u20ac', "utf-16-le", '1\x005\x00\xac\x20')
|
---|
806 |
|
---|
807 | # test the error handler
|
---|
808 | check_message(u'15\u20ac', "iso-8859-1:ignore", "15")
|
---|
809 | check_message(u'15\u20ac', "iso-8859-1:replace", "15?")
|
---|
810 | check_message(u'15\u20ac', "iso-8859-1:backslashreplace", "15\\u20ac")
|
---|
811 |
|
---|
812 | # test the buffer API
|
---|
813 | for objtype in ('buffer', 'bytearray'):
|
---|
814 | stdout = get_message('ascii',
|
---|
815 | 'import sys',
|
---|
816 | r'sys.stdout.write(%s("\xe9"))' % objtype,
|
---|
817 | 'sys.stdout.flush()')
|
---|
818 | self.assertEqual(stdout, "\xe9")
|
---|
819 |
|
---|
820 |
|
---|
821 | def test_main():
|
---|
822 | # Historically, these tests have been sloppy about removing TESTFN.
|
---|
823 | # So get rid of it no matter what.
|
---|
824 | try:
|
---|
825 | run_unittest(AutoFileTests, OtherFileTests, FileSubclassTests,
|
---|
826 | FileThreadingTests, TestFileSignalEINTR, StdoutTests)
|
---|
827 | finally:
|
---|
828 | if os.path.exists(TESTFN):
|
---|
829 | os.unlink(TESTFN)
|
---|
830 |
|
---|
831 | if __name__ == '__main__':
|
---|
832 | test_main()
|
---|