Changeset 391 for python/trunk/Lib/test/test_threading.py
- Timestamp: Mar 19, 2014, 11:31:01 PM
- Location: python/trunk
- Files: 2 edited
python/trunk
- Property svn:mergeinfo set to:
  - /python/vendor/Python-2.7.6: merged, eligible
  - /python/vendor/current: merged, eligible
python/trunk/Lib/test/test_threading.py
--- python/trunk/Lib/test/test_threading.py (r2)
+++ python/trunk/Lib/test/test_threading.py (r391)
@@ -3 +3 @@
 import test.test_support
 from test.test_support import verbose
+from test.script_helper import assert_python_ok
+
 import random
 import re
 import sys
-import threading
-import thread
+thread = test.test_support.import_module('thread')
+threading = test.test_support.import_module('threading')
 import time
 import unittest
 import weakref
+import os
+import subprocess

 from test import lock_tests
@@ -44 +48 @@
                 if verbose:
                     print self.nrunning.get(), 'tasks are running'
-                self.testcase.assert_(self.nrunning.get() <= 3)
+                self.testcase.assertTrue(self.nrunning.get() <= 3)

             time.sleep(delay)
@@ -52 +56 @@
             with self.mutex:
                 self.nrunning.dec()
-                self.testcase.assert_(self.nrunning.get() >= 0)
+                self.testcase.assertTrue(self.nrunning.get() >= 0)
                 if verbose:
                     print '%s is finished. %d tasks are running' % (
                         self.name, self.nrunning.get())

-class ThreadTests(unittest.TestCase):
+class BaseTestCase(unittest.TestCase):
+    def setUp(self):
+        self._threads = test.test_support.threading_setup()
+
+    def tearDown(self):
+        test.test_support.threading_cleanup(*self._threads)
+        test.test_support.reap_children()
+
+
+class ThreadTests(BaseTestCase):

     # Create a bunch of threads, let each do some work, wait until all are
@@ -76 +89 @@
             t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
             threads.append(t)
-            self.failUnlessEqual(t.ident, None)
-            self.assert_(re.match('<TestThread\(.*, initial\)>', repr(t)))
+            self.assertEqual(t.ident, None)
+            self.assertTrue(re.match('<TestThread\(.*, initial\)>', repr(t)))
             t.start()

@@ -84 +97 @@
         for t in threads:
             t.join(NUMTASKS)
-            self.assert_(not t.is_alive())
-            self.failIfEqual(t.ident, 0)
+            self.assertTrue(not t.is_alive())
+            self.assertNotEqual(t.ident, 0)
             self.assertFalse(t.ident is None)
-            self.assert_(re.match('<TestThread\(.*, \w+ -?\d+\)>', repr(t)))
+            self.assertTrue(re.match('<TestThread\(.*, \w+ -?\d+\)>', repr(t)))
         if verbose:
             print 'all tasks done'
@@ -145 +158 @@
         # Wait for the thread to finish.
         mutex.acquire()
-        self.assert_(tid in threading._active)
-        self.assert_(isinstance(threading._active[tid],
-                                threading._DummyThread))
+        self.assertIn(tid, threading._active)
+        self.assertIsInstance(threading._active[tid], threading._DummyThread)
         del threading._active[tid]

@@ -167 +179 @@
         exception = ctypes.py_object(AsyncExc)

+        # First check it works when setting the exception from the same thread.
+        tid = thread.get_ident()
+
+        try:
+            result = set_async_exc(ctypes.c_long(tid), exception)
+            # The exception is async, so we might have to keep the VM busy until
+            # it notices.
+            while True:
+                pass
+        except AsyncExc:
+            pass
+        else:
+            # This code is unreachable but it reflects the intent. If we wanted
+            # to be smarter the above loop wouldn't be infinite.
+            self.fail("AsyncExc not raised")
+        try:
+            self.assertEqual(result, 1) # one thread state modified
+        except UnboundLocalError:
+            # The exception was raised too quickly for us to get the result.
+            pass
+
         # `worker_started` is set by the thread when it's inside a try/except
         # block waiting to catch the asynchronously set AsyncExc exception.
@@ -202 +235 @@
         if verbose:
             print "    waiting for worker thread to get started"
-        worker_started.wait()
+        ret = worker_started.wait()
+        self.assertTrue(ret)
         if verbose:
             print "    verifying worker hasn't exited"
-        self.assert_(not t.finished)
+        self.assertTrue(not t.finished)
         if verbose:
             print "    attempting to raise asynch exception in worker"
@@ -213 +247 @@
             print "    waiting for worker to say it caught the exception"
         worker_saw_exception.wait(timeout=10)
-        self.assert_(t.finished)
+        self.assertTrue(t.finished)
         if verbose:
             print "    all OK -- joining worker"
@@ -246 +280 @@
             return # can't do anything

-        import subprocess
         rc = subprocess.call([sys.executable, "-c", """if 1:
             import ctypes, sys, time, thread
@@ -277 +310 @@
         # Issue1733757
         # Avoid a deadlock when sys.settrace steps into threading._shutdown
-        import subprocess
-        rc = subprocess.call([sys.executable, "-c", """if 1:
+        p = subprocess.Popen([sys.executable, "-c", """if 1:
             import sys, threading

@@ -298 +330 @@

             sys.settrace(func)
-            """])
-        self.failIf(rc == 2, "interpreted was blocked")
-        self.failUnless(rc == 0, "Unexpected error")
+            """],
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE)
+        self.addCleanup(p.stdout.close)
+        self.addCleanup(p.stderr.close)
+        stdout, stderr = p.communicate()
+        rc = p.returncode
+        self.assertFalse(rc == 2, "interpreted was blocked")
+        self.assertTrue(rc == 0,
+                        "Unexpected error: " + repr(stderr))

     def test_join_nondaemon_on_shutdown(self):
         # Issue 1722344
         # Raising SystemExit skipped threading._shutdown
-        import subprocess
         p = subprocess.Popen([sys.executable, "-c", """if 1:
             import threading
@@ -321 +359 @@
             stdout=subprocess.PIPE,
             stderr=subprocess.PIPE)
+        self.addCleanup(p.stdout.close)
+        self.addCleanup(p.stderr.close)
         stdout, stderr = p.communicate()
         self.assertEqual(stdout.strip(),
@@ -341 +381 @@
                 t.join()
                 l = enum()
-                self.assertFalse(t in l,
+                self.assertNotIn(t, l,
                     "#1703448 triggered after %d trials: %s" % (i, l))
         finally:
@@ -365 +405 @@
         cyclic_object.thread.join()
         del cyclic_object
-        self.assertEquals(None, weak_cyclic_object(),
-                          msg=('%d references still around' %
-                               sys.getrefcount(weak_cyclic_object())))
+        self.assertEqual(None, weak_cyclic_object(),
+                         msg=('%d references still around' %
+                              sys.getrefcount(weak_cyclic_object())))

         raising_cyclic_object = RunSelfFunction(should_raise=True)
@@ -373 +413 @@
         raising_cyclic_object.thread.join()
         del raising_cyclic_object
-        self.assertEquals(None, weak_raising_cyclic_object(),
-                          msg=('%d references still around' %
-                               sys.getrefcount(weak_raising_cyclic_object())))
-
-
-class ThreadJoinOnShutdown(unittest.TestCase):
+        self.assertEqual(None, weak_raising_cyclic_object(),
+                         msg=('%d references still around' %
+                              sys.getrefcount(weak_raising_cyclic_object())))
+
+    @unittest.skipUnless(hasattr(os, 'fork'), 'test needs fork()')
+    def test_dummy_thread_after_fork(self):
+        # Issue #14308: a dummy thread in the active list doesn't mess up
+        # the after-fork mechanism.
+        code = """if 1:
+            import thread, threading, os, time
+
+            def background_thread(evt):
+                # Creates and registers the _DummyThread instance
+                threading.current_thread()
+                evt.set()
+                time.sleep(10)
+
+            evt = threading.Event()
+            thread.start_new_thread(background_thread, (evt,))
+            evt.wait()
+            assert threading.active_count() == 2, threading.active_count()
+            if os.fork() == 0:
+                assert threading.active_count() == 1, threading.active_count()
+                os._exit(0)
+            else:
+                os.wait()
+        """
+        _, out, err = assert_python_ok("-c", code)
+        self.assertEqual(out, '')
+        self.assertEqual(err, '')
+
+    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
+    def test_is_alive_after_fork(self):
+        # Try hard to trigger #18418: is_alive() could sometimes be True on
+        # threads that vanished after a fork.
+        old_interval = sys.getcheckinterval()
+
+        # Make the bug more likely to manifest.
+        sys.setcheckinterval(10)
+
+        try:
+            for i in range(20):
+                t = threading.Thread(target=lambda: None)
+                t.start()
+                pid = os.fork()
+                if pid == 0:
+                    os._exit(1 if t.is_alive() else 0)
+                else:
+                    t.join()
+                    pid, status = os.waitpid(pid, 0)
+                    self.assertEqual(0, status)
+        finally:
+            sys.setcheckinterval(old_interval)
+
+    def test_BoundedSemaphore_limit(self):
+        # BoundedSemaphore should raise ValueError if released too often.
+        for limit in range(1, 10):
+            bs = threading.BoundedSemaphore(limit)
+            threads = [threading.Thread(target=bs.acquire)
+                       for _ in range(limit)]
+            for t in threads:
+                t.start()
+            for t in threads:
+                t.join()
+            threads = [threading.Thread(target=bs.release)
+                       for _ in range(limit)]
+            for t in threads:
+                t.start()
+            for t in threads:
+                t.join()
+            self.assertRaises(ValueError, bs.release)
+
+class ThreadJoinOnShutdown(BaseTestCase):
+
+    # Between fork() and exec(), only async-safe functions are allowed (issues
+    # #12316 and #11870), and fork() from a worker thread is known to trigger
+    # problems with some operating systems (issue #3863): skip problematic tests
+    # on platforms known to behave badly.
+    platforms_to_skip = ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5',
+                         'os2emx')

     def _run_and_join(self, script):
@@ -390 +504 @@
         \n""" + script

-        import subprocess
         p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE)
         rc = p.wait()
         data = p.stdout.read().replace('\r', '')
+        p.stdout.close()
         self.assertEqual(data, "end of main\nend of thread\n")
-        self.failIf(rc == 2, "interpreter was blocked")
-        self.failUnless(rc == 0, "Unexpected error")
+        self.assertFalse(rc == 2, "interpreter was blocked")
+        self.assertTrue(rc == 0, "Unexpected error")

     def test_1_join_on_shutdown(self):
@@ -411 +525 @@


+    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
+    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
     def test_2_join_in_forked_process(self):
         # Like the test above, but from a forked interpreter
-        import os
-        if not hasattr(os, 'fork'):
-            return
         script = """if 1:
             childpid = os.fork()
@@ -429 +542 @@
         self._run_and_join(script)

+    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
+    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
     def test_3_join_in_forked_from_thread(self):
         # Like the test above, but fork() was called from a worker thread
         # In the forked process, the main Thread object must be marked as stopped.
-        import os
-        if not hasattr(os, 'fork'):
-            return
-        # Skip platforms with known problems forking from a worker thread.
-        # See http://bugs.python.org/issue3863.
-        if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'):
-            print >>sys.stderr, ('Skipping test_3_join_in_forked_from_thread'
-                                 ' due to known OS bugs on'), sys.platform
-            return
         script = """if 1:
             main_thread = threading.current_thread()
@@ -460 +566 @@
         self._run_and_join(script)

-
-class ThreadingExceptionTests(unittest.TestCase):
+    def assertScriptHasOutput(self, script, expected_output):
+        p = subprocess.Popen([sys.executable, "-c", script],
+                             stdout=subprocess.PIPE)
+        rc = p.wait()
+        data = p.stdout.read().decode().replace('\r', '')
+        self.assertEqual(rc, 0, "Unexpected error")
+        self.assertEqual(data, expected_output)
+
+    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
+    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
+    def test_4_joining_across_fork_in_worker_thread(self):
+        # There used to be a possible deadlock when forking from a child
+        # thread. See http://bugs.python.org/issue6643.
+
+        # The script takes the following steps:
+        # - The main thread in the parent process starts a new thread and then
+        #   tries to join it.
+        # - The join operation acquires the Lock inside the thread's _block
+        #   Condition. (See threading.py:Thread.join().)
+        # - We stub out the acquire method on the condition to force it to wait
+        #   until the child thread forks. (See LOCK ACQUIRED HERE)
+        # - The child thread forks. (See LOCK HELD and WORKER THREAD FORKS
+        #   HERE)
+        # - The main thread of the parent process enters Condition.wait(),
+        #   which releases the lock on the child thread.
+        # - The child process returns. Without the necessary fix, when the
+        #   main thread of the child process (which used to be the child thread
+        #   in the parent process) attempts to exit, it will try to acquire the
+        #   lock in the Thread._block Condition object and hang, because the
+        #   lock was held across the fork.
+
+        script = """if 1:
+            import os, time, threading
+
+            finish_join = False
+            start_fork = False
+
+            def worker():
+                # Wait until this thread's lock is acquired before forking to
+                # create the deadlock.
+                global finish_join
+                while not start_fork:
+                    time.sleep(0.01)
+                # LOCK HELD: Main thread holds lock across this call.
+                childpid = os.fork()
+                finish_join = True
+                if childpid != 0:
+                    # Parent process just waits for child.
+                    os.waitpid(childpid, 0)
+                # Child process should just return.
+
+            w = threading.Thread(target=worker)
+
+            # Stub out the private condition variable's lock acquire method.
+            # This acquires the lock and then waits until the child has forked
+            # before returning, which will release the lock soon after. If
+            # someone else tries to fix this test case by acquiring this lock
+            # before forking instead of resetting it, the test case will
+            # deadlock when it shouldn't.
+            condition = w._block
+            orig_acquire = condition.acquire
+            call_count_lock = threading.Lock()
+            call_count = 0
+            def my_acquire():
+                global call_count
+                global start_fork
+                orig_acquire() # LOCK ACQUIRED HERE
+                start_fork = True
+                if call_count == 0:
+                    while not finish_join:
+                        time.sleep(0.01) # WORKER THREAD FORKS HERE
+                with call_count_lock:
+                    call_count += 1
+            condition.acquire = my_acquire
+
+            w.start()
+            w.join()
+            print('end of main')
+            """
+        self.assertScriptHasOutput(script, "end of main\n")
+
+    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
+    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
+    def test_5_clear_waiter_locks_to_avoid_crash(self):
+        # Check that a spawned thread that forks doesn't segfault on certain
+        # platforms, namely OS X. This used to happen if there was a waiter
+        # lock in the thread's condition variable's waiters list. Even though
+        # we know the lock will be held across the fork, it is not safe to
+        # release locks held across forks on all platforms, so releasing the
+        # waiter lock caused a segfault on OS X. Furthermore, since locks on
+        # OS X are (as of this writing) implemented with a mutex + condition
+        # variable instead of a semaphore, while we know that the Python-level
+        # lock will be acquired, we can't know if the internal mutex will be
+        # acquired at the time of the fork.
+
+        script = """if True:
+            import os, time, threading
+
+            start_fork = False
+
+            def worker():
+                # Wait until the main thread has attempted to join this thread
+                # before continuing.
+                while not start_fork:
+                    time.sleep(0.01)
+                childpid = os.fork()
+                if childpid != 0:
+                    # Parent process just waits for child.
+                    (cpid, rc) = os.waitpid(childpid, 0)
+                    assert cpid == childpid
+                    assert rc == 0
+                    print('end of worker thread')
+                else:
+                    # Child process should just return.
+                    pass
+
+            w = threading.Thread(target=worker)
+
+            # Stub out the private condition variable's _release_save method.
+            # This releases the condition's lock and flips the global that
+            # causes the worker to fork. At this point, the problematic waiter
+            # lock has been acquired once by the waiter and has been put onto
+            # the waiters list.
+            condition = w._block
+            orig_release_save = condition._release_save
+            def my_release_save():
+                global start_fork
+                orig_release_save()
+                # Waiter lock held here, condition lock released.
+                start_fork = True
+            condition._release_save = my_release_save
+
+            w.start()
+            w.join()
+            print('end of main thread')
+            """
+        output = "end of worker thread\nend of main thread\n"
+        self.assertScriptHasOutput(script, output)
+
+    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
+    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
+    def test_reinit_tls_after_fork(self):
+        # Issue #13817: fork() would deadlock in a multithreaded program with
+        # the ad-hoc TLS implementation.
+
+        def do_fork_and_wait():
+            # just fork a child process and wait it
+            pid = os.fork()
+            if pid > 0:
+                os.waitpid(pid, 0)
+            else:
+                os._exit(0)
+
+        # start a bunch of threads that will fork() child processes
+        threads = []
+        for i in range(16):
+            t = threading.Thread(target=do_fork_and_wait)
+            threads.append(t)
+            t.start()
+
+        for t in threads:
+            t.join()
+
+
+class ThreadingExceptionTests(BaseTestCase):
     # A RuntimeError should be raised if Thread.start() is called
     # multiple times.
@@ -505 +774 @@
     semtype = staticmethod(threading.BoundedSemaphore)

+    @unittest.skipUnless(sys.platform == 'darwin', 'test macosx problem')
+    def test_recursion_limit(self):
+        # Issue 9670
+        # test that excessive recursion within a non-main thread causes
+        # an exception rather than crashing the interpreter on platforms
+        # like Mac OS X or FreeBSD which have small default stack sizes
+        # for threads
+        script = """if True:
+            import threading
+
+            def recurse():
+                return recurse()
+
+            def outer():
+                try:
+                    recurse()
+                except RuntimeError:
+                    pass
+
+            w = threading.Thread(target=outer)
+            w.start()
+            w.join()
+            print('end of main thread')
+            """
+        expected_output = "end of main thread\n"
+        p = subprocess.Popen([sys.executable, "-c", script],
+                             stdout=subprocess.PIPE)
+        stdout, stderr = p.communicate()
+        data = stdout.decode().replace('\r', '')
+        self.assertEqual(p.returncode, 0, "Unexpected error")
+        self.assertEqual(data, expected_output)

 def test_main():
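Many of the tests added or reworked in this changeset share one pattern: embed a small Python program in a string, run it in a fresh interpreter with subprocess (or with the test.script_helper.assert_python_ok helper that the new test_dummy_thread_after_fork uses), and assert on the child's output and exit status, so fork and interpreter-shutdown behaviour is exercised in an isolated process. The snippet below is not part of the changeset; it is a minimal, self-contained sketch of that pattern for Python 2.7, with a made-up child script and expected output.

    import subprocess
    import sys

    # Child script analogous to the ones embedded in the tests; the "if 1:"
    # wrapper lets the whole block share one extra indentation level.
    script = """if 1:
        import threading

        def worker():
            print('end of thread')

        t = threading.Thread(target=worker)
        t.start()
        t.join()
        print('end of main')
        """

    # Run the script in a fresh interpreter and check its output and return
    # code, mirroring _run_and_join / assertScriptHasOutput in the diff above.
    p = subprocess.Popen([sys.executable, "-c", script],
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    stdout, stderr = p.communicate()
    assert p.returncode == 0, "unexpected error: %r" % stderr
    assert stdout.decode().replace('\r', '') == 'end of thread\nend of main\n'

assert_python_ok("-c", code) wraps the same idea: it runs the interpreter with the given arguments, asserts a zero exit status, and returns the child's return code, stdout, and stderr.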