Changeset 391 for python/trunk/Lib/test/test_file.py
- Timestamp:
- Mar 19, 2014, 11:31:01 PM (11 years ago)
- Location:
- python/trunk
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
python/trunk
-
Property svn:mergeinfo
set to
/python/vendor/Python-2.7.6 merged eligible /python/vendor/current merged eligible
-
Property svn:mergeinfo
set to
-
python/trunk/Lib/test/test_file.py
r2 r391 1 # NOTE: this file tests the new `io` library backported from Python 3.x. 2 # Similar tests for the builtin file object can be found in test_file2k.py. 3 4 from __future__ import print_function 5 1 6 import sys 2 7 import os 3 8 import unittest 4 import itertools5 import time6 import threading7 9 from array import array 8 10 from weakref import proxy 9 11 10 from test import test_support 11 from test.test_support import TESTFN, findfile, run_unittest 12 import io 13 import _pyio as pyio 14 15 from test.test_support import TESTFN, run_unittest 12 16 from UserList import UserList 13 17 … … 16 20 17 21 def setUp(self): 18 self.f = open(TESTFN, 'wb')22 self.f = self.open(TESTFN, 'wb') 19 23 20 24 def tearDown(self): … … 26 30 # verify weak references 27 31 p = proxy(self.f) 28 p.write( 'teststring')29 self.assertEqual s(self.f.tell(), p.tell())32 p.write(b'teststring') 33 self.assertEqual(self.f.tell(), p.tell()) 30 34 self.f.close() 31 35 self.f = None … … 35 39 # verify expected attributes exist 36 40 f = self.f 37 softspace = f.softspace38 41 f.name # merely shouldn't blow up 39 42 f.mode # ditto 40 43 f.closed # ditto 41 44 42 # verify softspace is writable43 f.softspace = softspace # merely shouldn't blow up44 45 # verify the others aren't46 for attr in 'name', 'mode', 'closed':47 self.assertRaises((AttributeError, TypeError), setattr, f, attr, 'oops')48 49 45 def testReadinto(self): 50 46 # verify readinto 51 self.f.write( '12')47 self.f.write(b'12') 52 48 self.f.close() 53 a = array(' c','x'*10)54 self.f = open(TESTFN, 'rb')49 a = array('b', b'x'*10) 50 self.f = self.open(TESTFN, 'rb') 55 51 n = self.f.readinto(a) 56 self.assertEquals('12', a.tostring()[:n]) 52 self.assertEqual(b'12', a.tostring()[:n]) 53 54 def testReadinto_text(self): 55 # verify readinto refuses text files 56 a = array('b', b'x'*10) 57 self.f.close() 58 self.f = self.open(TESTFN, 'r') 59 if hasattr(self.f, "readinto"): 60 self.assertRaises(TypeError, self.f.readinto, a) 57 61 58 62 def testWritelinesUserList(self): 59 63 # verify writelines with instance sequence 60 l = UserList([ '1','2'])64 l = UserList([b'1', b'2']) 61 65 self.f.writelines(l) 62 66 self.f.close() 63 self.f = open(TESTFN, 'rb')67 self.f = self.open(TESTFN, 'rb') 64 68 buf = self.f.read() 65 self.assertEqual s(buf,'12')69 self.assertEqual(buf, b'12') 66 70 67 71 def testWritelinesIntegers(self): … … 82 86 [NonString(), NonString()]) 83 87 84 def testRepr(self):85 # verify repr works86 self.assert_(repr(self.f).startswith("<open file '" + TESTFN))87 88 88 def testErrors(self): 89 self.f.close()90 self.f = open(TESTFN, 'rb')91 89 f = self.f 92 self.assertEquals(f.name, TESTFN) 93 self.assert_(not f.isatty()) 94 self.assert_(not f.closed) 95 96 self.assertRaises(TypeError, f.readinto, "") 90 self.assertEqual(f.name, TESTFN) 91 self.assertTrue(not f.isatty()) 92 self.assertTrue(not f.closed) 93 94 if hasattr(f, "readinto"): 95 self.assertRaises((IOError, TypeError), f.readinto, "") 97 96 f.close() 98 self.assert _(f.closed)97 self.assertTrue(f.closed) 99 98 100 99 def testMethods(self): 101 methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto', 102 'readline', 'readlines', 'seek', 'tell', 'truncate', 103 'write', 'xreadlines', '__iter__'] 104 if sys.platform.startswith('atheos'): 105 methods.remove('truncate') 100 methods = [('fileno', ()), 101 ('flush', ()), 102 ('isatty', ()), 103 ('next', ()), 104 ('read', ()), 105 ('write', (b"",)), 106 ('readline', ()), 107 ('readlines', ()), 108 ('seek', (0,)), 109 ('tell', ()), 110 ('write', (b"",)), 111 ('writelines', ([],)), 112 ('__iter__', ()), 113 ] 114 if not sys.platform.startswith('atheos'): 115 methods.append(('truncate', ())) 106 116 107 117 # __exit__ should close the file 108 118 self.f.__exit__(None, None, None) 109 self.assert _(self.f.closed)110 111 for methodname in methods:119 self.assertTrue(self.f.closed) 120 121 for methodname, args in methods: 112 122 method = getattr(self.f, methodname) 113 123 # should raise on closed file 114 self.assertRaises(ValueError, method) 115 self.assertRaises(ValueError, self.f.writelines, []) 124 self.assertRaises(ValueError, method, *args) 116 125 117 126 # file is closed, __exit__ shouldn't do anything 118 self.assertEqual s(self.f.__exit__(None, None, None), None)127 self.assertEqual(self.f.__exit__(None, None, None), None) 119 128 # it must also return None if an exception was given 120 129 try: 121 1 /0130 1 // 0 122 131 except: 123 self.assertEqual s(self.f.__exit__(*sys.exc_info()), None)132 self.assertEqual(self.f.__exit__(*sys.exc_info()), None) 124 133 125 134 def testReadWhenWriting(self): 126 135 self.assertRaises(IOError, self.f.read) 127 136 128 def testIssue5677(self): 129 # Remark: Do not perform more than one test per open file, 130 # since that does NOT catch the readline error on Windows. 131 data = 'xxx' 132 for mode in ['w', 'wb', 'a', 'ab']: 133 for attr in ['read', 'readline', 'readlines']: 134 self.f = open(TESTFN, mode) 135 self.f.write(data) 136 self.assertRaises(IOError, getattr(self.f, attr)) 137 self.f.close() 138 139 self.f = open(TESTFN, mode) 140 self.f.write(data) 141 self.assertRaises(IOError, lambda: [line for line in self.f]) 142 self.f.close() 143 144 self.f = open(TESTFN, mode) 145 self.f.write(data) 146 self.assertRaises(IOError, self.f.readinto, bytearray(len(data))) 147 self.f.close() 148 149 for mode in ['r', 'rb', 'U', 'Ub', 'Ur', 'rU', 'rbU', 'rUb']: 150 self.f = open(TESTFN, mode) 151 self.assertRaises(IOError, self.f.write, data) 152 self.f.close() 153 154 self.f = open(TESTFN, mode) 155 self.assertRaises(IOError, self.f.writelines, [data, data]) 156 self.f.close() 157 158 self.f = open(TESTFN, mode) 159 self.assertRaises(IOError, self.f.truncate) 160 self.f.close() 137 class CAutoFileTests(AutoFileTests): 138 open = io.open 139 140 class PyAutoFileTests(AutoFileTests): 141 open = staticmethod(pyio.open) 142 161 143 162 144 class OtherFileTests(unittest.TestCase): 163 164 def testOpenDir(self):165 this_dir = os.path.dirname(__file__)166 for mode in (None, "w"):167 try:168 if mode:169 f = open(this_dir, mode)170 else:171 f = open(this_dir)172 except IOError as e:173 self.assertEqual(e.filename, this_dir)174 else:175 self.fail("opening a directory didn't raise an IOError")176 145 177 146 def testModeStrings(self): … … 179 148 for mode in ("", "aU", "wU+"): 180 149 try: 181 f = open(TESTFN, mode)150 f = self.open(TESTFN, mode) 182 151 except ValueError: 183 152 pass … … 185 154 f.close() 186 155 self.fail('%r is an invalid file mode' % mode) 187 188 # Some invalid modes fail on Windows, but pass on Unix189 # Issue3965: avoid a crash on Windows when filename is unicode190 for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')):191 try:192 f = open(name, "rr")193 except IOError:194 pass195 else:196 f.close()197 198 def testStdin(self):199 # This causes the interpreter to exit on OSF1 v5.1.200 if sys.platform != 'osf1V5':201 self.assertRaises(IOError, sys.stdin.seek, -1)202 else:203 print >>sys.__stdout__, (204 ' Skipping sys.stdin.seek(-1), it may crash the interpreter.'205 ' Test manually.')206 self.assertRaises(IOError, sys.stdin.truncate)207 208 def testUnicodeOpen(self):209 # verify repr works for unicode too210 f = open(unicode(TESTFN), "w")211 self.assert_(repr(f).startswith("<open file u'" + TESTFN))212 f.close()213 os.unlink(TESTFN)214 156 215 157 def testBadModeArgument(self): … … 217 159 bad_mode = "qwerty" 218 160 try: 219 f = open(TESTFN, bad_mode)220 except ValueError ,msg:221 if msg [0] != 0:161 f = self.open(TESTFN, bad_mode) 162 except ValueError as msg: 163 if msg.args[0] != 0: 222 164 s = str(msg) 223 if s.find(TESTFN) != -1 or s.find(bad_mode) == -1:165 if TESTFN in s or bad_mode not in s: 224 166 self.fail("bad error message for invalid mode: %s" % s) 225 # if msg [0] == 0, we're probably on Windows where there may be167 # if msg.args[0] == 0, we're probably on Windows where there may be 226 168 # no obvious way to discover why open() failed. 227 169 else: … … 234 176 for s in (-1, 0, 1, 512): 235 177 try: 236 f = open(TESTFN, 'w', s)237 f.write(str(s) )238 f.close() 239 f.close() 240 f = open(TESTFN, 'r', s)241 d = int(f.read() )242 f.close() 243 f.close() 244 except IOError ,msg:178 f = self.open(TESTFN, 'wb', s) 179 f.write(str(s).encode("ascii")) 180 f.close() 181 f.close() 182 f = self.open(TESTFN, 'rb', s) 183 d = int(f.read().decode("ascii")) 184 f.close() 185 f.close() 186 except IOError as msg: 245 187 self.fail('error setting buffer size %d: %s' % (s, str(msg))) 246 self.assertEqual s(d, s)188 self.assertEqual(d, s) 247 189 248 190 def testTruncateOnWindows(self): 191 # SF bug <http://www.python.org/sf/801631> 192 # "file.truncate fault on windows" 193 249 194 os.unlink(TESTFN) 250 251 def bug801631(): 252 # SF bug <http://www.python.org/sf/801631> 253 # "file.truncate fault on windows" 254 f = open(TESTFN, 'wb') 255 f.write('12345678901') # 11 bytes 195 f = self.open(TESTFN, 'wb') 196 197 try: 198 f.write(b'12345678901') # 11 bytes 256 199 f.close() 257 200 258 f = open(TESTFN,'rb+')201 f = self.open(TESTFN,'rb+') 259 202 data = f.read(5) 260 if data != '12345':203 if data != b'12345': 261 204 self.fail("Read on file opened for update failed %r" % data) 262 205 if f.tell() != 5: … … 271 214 if size != 5: 272 215 self.fail("File size after ftruncate wrong %d" % size) 273 274 try:275 bug801631()276 216 finally: 217 f.close() 277 218 os.unlink(TESTFN) 278 219 279 220 def testIteration(self): 280 221 # Test the complex interaction when mixing file-iteration and the 281 # various read* methods. Ostensibly, the mixture could just be tested 282 # to work when it should work according to the Python language, 283 # instead of fail when it should fail according to the current CPython 284 # implementation. People don't always program Python the way they 285 # should, though, and the implemenation might change in subtle ways, 286 # so we explicitly test for errors, too; the test will just have to 287 # be updated when the implementation changes. 222 # various read* methods. 288 223 dataoffset = 16384 289 filler = "ham\n"224 filler = b"ham\n" 290 225 assert not dataoffset % len(filler), \ 291 226 "dataoffset must be multiple of len(filler)" 292 227 nchunks = dataoffset // len(filler) 293 228 testlines = [ 294 "spam, spam and eggs\n",295 "eggs, spam, ham and spam\n",296 "saussages, spam, spam and eggs\n",297 "spam, ham, spam and eggs\n",298 "spam, spam, spam, spam, spam, ham, spam\n",299 "wonderful spaaaaaam.\n"229 b"spam, spam and eggs\n", 230 b"eggs, spam, ham and spam\n", 231 b"saussages, spam, spam and eggs\n", 232 b"spam, ham, spam and eggs\n", 233 b"spam, spam, spam, spam, spam, ham, spam\n", 234 b"wonderful spaaaaaam.\n" 300 235 ] 301 236 methods = [("readline", ()), ("read", ()), ("readlines", ()), 302 ("readinto", (array(" c"," "*100),))]237 ("readinto", (array("b", b" "*100),))] 303 238 304 239 try: 305 240 # Prepare the testfile 306 bag = open(TESTFN, "w")241 bag = self.open(TESTFN, "wb") 307 242 bag.write(filler * nchunks) 308 243 bag.writelines(testlines) … … 310 245 # Test for appropriate errors mixing read* and iteration 311 246 for methodname, args in methods: 312 f = open(TESTFN)313 if f.next() != filler:247 f = self.open(TESTFN, 'rb') 248 if next(f) != filler: 314 249 self.fail, "Broken testfile" 315 250 meth = getattr(f, methodname) 316 try: 317 meth(*args) 318 except ValueError: 319 pass 320 else: 321 self.fail("%s%r after next() didn't raise ValueError" % 322 (methodname, args)) 251 meth(*args) # This simply shouldn't fail 323 252 f.close() 324 253 … … 330 259 # exactly on the buffer boundary for any power-of-2 buffersize 331 260 # between 4 and 16384 (inclusive). 332 f = open(TESTFN)261 f = self.open(TESTFN, 'rb') 333 262 for i in range(nchunks): 334 f.next()263 next(f) 335 264 testline = testlines.pop(0) 336 265 try: … … 343 272 "failed. Got %r, expected %r" % (line, testline)) 344 273 testline = testlines.pop(0) 345 buf = array(" c","\x00" * len(testline))274 buf = array("b", b"\x00" * len(testline)) 346 275 try: 347 276 f.readinto(buf) … … 372 301 "failed. Got %r, expected %r" % (line, testline)) 373 302 # Reading after iteration hit EOF shouldn't hurt either 374 f = open(TESTFN)303 f = self.open(TESTFN, 'rb') 375 304 try: 376 305 for line in f: … … 388 317 os.unlink(TESTFN) 389 318 390 class FileSubclassTests(unittest.TestCase): 391 392 def testExit(self): 393 # test that exiting with context calls subclass' close 394 class C(file): 395 def __init__(self, *args): 396 self.subclass_closed = False 397 file.__init__(self, *args) 398 def close(self): 399 self.subclass_closed = True 400 file.close(self) 401 402 with C(TESTFN, 'w') as f: 403 pass 404 self.failUnless(f.subclass_closed) 405 406 407 class FileThreadingTests(unittest.TestCase): 408 # These tests check the ability to call various methods of file objects 409 # (including close()) concurrently without crashing the Python interpreter. 410 # See #815646, #595601 411 412 def setUp(self): 413 self.f = None 414 self.filename = TESTFN 415 with open(self.filename, "w") as f: 416 f.write("\n".join("0123456789")) 417 self._count_lock = threading.Lock() 418 self.close_count = 0 419 self.close_success_count = 0 420 421 def tearDown(self): 422 if self.f: 423 try: 424 self.f.close() 425 except (EnvironmentError, ValueError): 426 pass 427 try: 428 os.remove(self.filename) 429 except EnvironmentError: 430 pass 431 432 def _create_file(self): 433 self.f = open(self.filename, "w+") 434 435 def _close_file(self): 436 with self._count_lock: 437 self.close_count += 1 438 self.f.close() 439 with self._count_lock: 440 self.close_success_count += 1 441 442 def _close_and_reopen_file(self): 443 self._close_file() 444 # if close raises an exception thats fine, self.f remains valid so 445 # we don't need to reopen. 446 self._create_file() 447 448 def _run_workers(self, func, nb_workers, duration=0.2): 449 with self._count_lock: 450 self.close_count = 0 451 self.close_success_count = 0 452 self.do_continue = True 453 threads = [] 454 try: 455 for i in range(nb_workers): 456 t = threading.Thread(target=func) 457 t.start() 458 threads.append(t) 459 for _ in xrange(100): 460 time.sleep(duration/100) 461 with self._count_lock: 462 if self.close_count-self.close_success_count > nb_workers+1: 463 if test_support.verbose: 464 print 'Q', 465 break 466 time.sleep(duration) 467 finally: 468 self.do_continue = False 469 for t in threads: 470 t.join() 471 472 def _test_close_open_io(self, io_func, nb_workers=5): 473 def worker(): 474 self._create_file() 475 funcs = itertools.cycle(( 476 lambda: io_func(), 477 lambda: self._close_and_reopen_file(), 478 )) 479 for f in funcs: 480 if not self.do_continue: 481 break 482 try: 483 f() 484 except (IOError, ValueError): 485 pass 486 self._run_workers(worker, nb_workers) 487 if test_support.verbose: 488 # Useful verbose statistics when tuning this test to take 489 # less time to run but still ensuring that its still useful. 490 # 491 # the percent of close calls that raised an error 492 percent = 100. - 100.*self.close_success_count/self.close_count 493 print self.close_count, ('%.4f ' % percent), 494 495 def test_close_open(self): 496 def io_func(): 497 pass 498 self._test_close_open_io(io_func) 499 500 def test_close_open_flush(self): 501 def io_func(): 502 self.f.flush() 503 self._test_close_open_io(io_func) 504 505 def test_close_open_iter(self): 506 def io_func(): 507 list(iter(self.f)) 508 self._test_close_open_io(io_func) 509 510 def test_close_open_isatty(self): 511 def io_func(): 512 self.f.isatty() 513 self._test_close_open_io(io_func) 514 515 def test_close_open_print(self): 516 def io_func(): 517 print >> self.f, '' 518 self._test_close_open_io(io_func) 519 520 def test_close_open_read(self): 521 def io_func(): 522 self.f.read(0) 523 self._test_close_open_io(io_func) 524 525 def test_close_open_readinto(self): 526 def io_func(): 527 a = array('c', 'xxxxx') 528 self.f.readinto(a) 529 self._test_close_open_io(io_func) 530 531 def test_close_open_readline(self): 532 def io_func(): 533 self.f.readline() 534 self._test_close_open_io(io_func) 535 536 def test_close_open_readlines(self): 537 def io_func(): 538 self.f.readlines() 539 self._test_close_open_io(io_func) 540 541 def test_close_open_seek(self): 542 def io_func(): 543 self.f.seek(0, 0) 544 self._test_close_open_io(io_func) 545 546 def test_close_open_tell(self): 547 def io_func(): 548 self.f.tell() 549 self._test_close_open_io(io_func) 550 551 def test_close_open_truncate(self): 552 def io_func(): 553 self.f.truncate() 554 self._test_close_open_io(io_func) 555 556 def test_close_open_write(self): 557 def io_func(): 558 self.f.write('') 559 self._test_close_open_io(io_func) 560 561 def test_close_open_writelines(self): 562 def io_func(): 563 self.f.writelines('') 564 self._test_close_open_io(io_func) 565 566 567 class StdoutTests(unittest.TestCase): 568 569 def test_move_stdout_on_write(self): 570 # Issue 3242: sys.stdout can be replaced (and freed) during a 571 # print statement; prevent a segfault in this case 572 save_stdout = sys.stdout 573 574 class File: 575 def write(self, data): 576 if '\n' in data: 577 sys.stdout = save_stdout 578 579 try: 580 sys.stdout = File() 581 print "some text" 582 finally: 583 sys.stdout = save_stdout 584 585 def test_del_stdout_before_print(self): 586 # Issue 4597: 'print' with no argument wasn't reporting when 587 # sys.stdout was deleted. 588 save_stdout = sys.stdout 589 del sys.stdout 590 try: 591 print 592 except RuntimeError as e: 593 self.assertEquals(str(e), "lost sys.stdout") 594 else: 595 self.fail("Expected RuntimeError") 596 finally: 597 sys.stdout = save_stdout 319 class COtherFileTests(OtherFileTests): 320 open = io.open 321 322 class PyOtherFileTests(OtherFileTests): 323 open = staticmethod(pyio.open) 598 324 599 325 … … 602 328 # So get rid of it no matter what. 603 329 try: 604 run_unittest( AutoFileTests, OtherFileTests, FileSubclassTests,605 FileThreadingTests, StdoutTests)330 run_unittest(CAutoFileTests, PyAutoFileTests, 331 COtherFileTests, PyOtherFileTests) 606 332 finally: 607 333 if os.path.exists(TESTFN):
Note:
See TracChangeset
for help on using the changeset viewer.