Ignore:
Timestamp:
Mar 19, 2014, 11:31:01 PM (11 years ago)
Author:
dmik
Message:

python: Merge vendor 2.7.6 to trunk.

Location:
python/trunk
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • python/trunk

  • python/trunk/Lib/test/test_threading.py

    r2 r391  
    33import test.test_support
    44from test.test_support import verbose
     5from test.script_helper import assert_python_ok
     6
    57import random
    68import re
    79import sys
    8 import threading
    9 import thread
     10thread = test.test_support.import_module('thread')
     11threading = test.test_support.import_module('threading')
    1012import time
    1113import unittest
    1214import weakref
     15import os
     16import subprocess
    1317
    1418from test import lock_tests
     
    4448                if verbose:
    4549                    print self.nrunning.get(), 'tasks are running'
    46                 self.testcase.assert_(self.nrunning.get() <= 3)
     50                self.testcase.assertTrue(self.nrunning.get() <= 3)
    4751
    4852            time.sleep(delay)
     
    5256            with self.mutex:
    5357                self.nrunning.dec()
    54                 self.testcase.assert_(self.nrunning.get() >= 0)
     58                self.testcase.assertTrue(self.nrunning.get() >= 0)
    5559                if verbose:
    5660                    print '%s is finished. %d tasks are running' % (
    5761                        self.name, self.nrunning.get())
    5862
    59 class ThreadTests(unittest.TestCase):
     63class BaseTestCase(unittest.TestCase):
     64    def setUp(self):
     65        self._threads = test.test_support.threading_setup()
     66
     67    def tearDown(self):
     68        test.test_support.threading_cleanup(*self._threads)
     69        test.test_support.reap_children()
     70
     71
     72class ThreadTests(BaseTestCase):
    6073
    6174    # Create a bunch of threads, let each do some work, wait until all are
     
    7689            t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
    7790            threads.append(t)
    78             self.failUnlessEqual(t.ident, None)
    79             self.assert_(re.match('<TestThread\(.*, initial\)>', repr(t)))
     91            self.assertEqual(t.ident, None)
     92            self.assertTrue(re.match('<TestThread\(.*, initial\)>', repr(t)))
    8093            t.start()
    8194
     
    8497        for t in threads:
    8598            t.join(NUMTASKS)
    86             self.assert_(not t.is_alive())
    87             self.failIfEqual(t.ident, 0)
     99            self.assertTrue(not t.is_alive())
     100            self.assertNotEqual(t.ident, 0)
    88101            self.assertFalse(t.ident is None)
    89             self.assert_(re.match('<TestThread\(.*, \w+ -?\d+\)>', repr(t)))
     102            self.assertTrue(re.match('<TestThread\(.*, \w+ -?\d+\)>', repr(t)))
    90103        if verbose:
    91104            print 'all tasks done'
     
    145158        # Wait for the thread to finish.
    146159        mutex.acquire()
    147         self.assert_(tid in threading._active)
    148         self.assert_(isinstance(threading._active[tid],
    149                                 threading._DummyThread))
     160        self.assertIn(tid, threading._active)
     161        self.assertIsInstance(threading._active[tid], threading._DummyThread)
    150162        del threading._active[tid]
    151163
     
    167179        exception = ctypes.py_object(AsyncExc)
    168180
     181        # First check it works when setting the exception from the same thread.
     182        tid = thread.get_ident()
     183
     184        try:
     185            result = set_async_exc(ctypes.c_long(tid), exception)
     186            # The exception is async, so we might have to keep the VM busy until
     187            # it notices.
     188            while True:
     189                pass
     190        except AsyncExc:
     191            pass
     192        else:
     193            # This code is unreachable but it reflects the intent. If we wanted
     194            # to be smarter the above loop wouldn't be infinite.
     195            self.fail("AsyncExc not raised")
     196        try:
     197            self.assertEqual(result, 1) # one thread state modified
     198        except UnboundLocalError:
     199            # The exception was raised too quickly for us to get the result.
     200            pass
     201
    169202        # `worker_started` is set by the thread when it's inside a try/except
    170203        # block waiting to catch the asynchronously set AsyncExc exception.
     
    202235        if verbose:
    203236            print "    waiting for worker thread to get started"
    204         worker_started.wait()
     237        ret = worker_started.wait()
     238        self.assertTrue(ret)
    205239        if verbose:
    206240            print "    verifying worker hasn't exited"
    207         self.assert_(not t.finished)
     241        self.assertTrue(not t.finished)
    208242        if verbose:
    209243            print "    attempting to raise asynch exception in worker"
     
    213247            print "    waiting for worker to say it caught the exception"
    214248        worker_saw_exception.wait(timeout=10)
    215         self.assert_(t.finished)
     249        self.assertTrue(t.finished)
    216250        if verbose:
    217251            print "    all OK -- joining worker"
     
    246280            return  # can't do anything
    247281
    248         import subprocess
    249282        rc = subprocess.call([sys.executable, "-c", """if 1:
    250283            import ctypes, sys, time, thread
     
    277310        # Issue1733757
    278311        # Avoid a deadlock when sys.settrace steps into threading._shutdown
    279         import subprocess
    280         rc = subprocess.call([sys.executable, "-c", """if 1:
     312        p = subprocess.Popen([sys.executable, "-c", """if 1:
    281313            import sys, threading
    282314
     
    298330
    299331            sys.settrace(func)
    300             """])
    301         self.failIf(rc == 2, "interpreted was blocked")
    302         self.failUnless(rc == 0, "Unexpected error")
     332            """],
     333            stdout=subprocess.PIPE,
     334            stderr=subprocess.PIPE)
     335        self.addCleanup(p.stdout.close)
     336        self.addCleanup(p.stderr.close)
     337        stdout, stderr = p.communicate()
     338        rc = p.returncode
     339        self.assertFalse(rc == 2, "interpreted was blocked")
     340        self.assertTrue(rc == 0,
     341                        "Unexpected error: " + repr(stderr))
    303342
    304343    def test_join_nondaemon_on_shutdown(self):
    305344        # Issue 1722344
    306345        # Raising SystemExit skipped threading._shutdown
    307         import subprocess
    308346        p = subprocess.Popen([sys.executable, "-c", """if 1:
    309347                import threading
     
    321359            stdout=subprocess.PIPE,
    322360            stderr=subprocess.PIPE)
     361        self.addCleanup(p.stdout.close)
     362        self.addCleanup(p.stderr.close)
    323363        stdout, stderr = p.communicate()
    324364        self.assertEqual(stdout.strip(),
     
    341381                t.join()
    342382                l = enum()
    343                 self.assertFalse(t in l,
     383                self.assertNotIn(t, l,
    344384                    "#1703448 triggered after %d trials: %s" % (i, l))
    345385        finally:
     
    365405        cyclic_object.thread.join()
    366406        del cyclic_object
    367         self.assertEquals(None, weak_cyclic_object(),
    368                           msg=('%d references still around' %
    369                                sys.getrefcount(weak_cyclic_object())))
     407        self.assertEqual(None, weak_cyclic_object(),
     408                         msg=('%d references still around' %
     409                              sys.getrefcount(weak_cyclic_object())))
    370410
    371411        raising_cyclic_object = RunSelfFunction(should_raise=True)
     
    373413        raising_cyclic_object.thread.join()
    374414        del raising_cyclic_object
    375         self.assertEquals(None, weak_raising_cyclic_object(),
    376                           msg=('%d references still around' %
    377                                sys.getrefcount(weak_raising_cyclic_object())))
    378 
    379 
    380 class ThreadJoinOnShutdown(unittest.TestCase):
     415        self.assertEqual(None, weak_raising_cyclic_object(),
     416                         msg=('%d references still around' %
     417                              sys.getrefcount(weak_raising_cyclic_object())))
     418
     419    @unittest.skipUnless(hasattr(os, 'fork'), 'test needs fork()')
     420    def test_dummy_thread_after_fork(self):
     421        # Issue #14308: a dummy thread in the active list doesn't mess up
     422        # the after-fork mechanism.
     423        code = """if 1:
     424            import thread, threading, os, time
     425
     426            def background_thread(evt):
     427                # Creates and registers the _DummyThread instance
     428                threading.current_thread()
     429                evt.set()
     430                time.sleep(10)
     431
     432            evt = threading.Event()
     433            thread.start_new_thread(background_thread, (evt,))
     434            evt.wait()
     435            assert threading.active_count() == 2, threading.active_count()
     436            if os.fork() == 0:
     437                assert threading.active_count() == 1, threading.active_count()
     438                os._exit(0)
     439            else:
     440                os.wait()
     441        """
     442        _, out, err = assert_python_ok("-c", code)
     443        self.assertEqual(out, '')
     444        self.assertEqual(err, '')
     445
     446    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
     447    def test_is_alive_after_fork(self):
     448        # Try hard to trigger #18418: is_alive() could sometimes be True on
     449        # threads that vanished after a fork.
     450        old_interval = sys.getcheckinterval()
     451
     452        # Make the bug more likely to manifest.
     453        sys.setcheckinterval(10)
     454
     455        try:
     456            for i in range(20):
     457                t = threading.Thread(target=lambda: None)
     458                t.start()
     459                pid = os.fork()
     460                if pid == 0:
     461                    os._exit(1 if t.is_alive() else 0)
     462                else:
     463                    t.join()
     464                    pid, status = os.waitpid(pid, 0)
     465                    self.assertEqual(0, status)
     466        finally:
     467            sys.setcheckinterval(old_interval)
     468
     469    def test_BoundedSemaphore_limit(self):
     470        # BoundedSemaphore should raise ValueError if released too often.
     471        for limit in range(1, 10):
     472            bs = threading.BoundedSemaphore(limit)
     473            threads = [threading.Thread(target=bs.acquire)
     474                       for _ in range(limit)]
     475            for t in threads:
     476                t.start()
     477            for t in threads:
     478                t.join()
     479            threads = [threading.Thread(target=bs.release)
     480                       for _ in range(limit)]
     481            for t in threads:
     482                t.start()
     483            for t in threads:
     484                t.join()
     485            self.assertRaises(ValueError, bs.release)
     486
     487class ThreadJoinOnShutdown(BaseTestCase):
     488
     489    # Between fork() and exec(), only async-safe functions are allowed (issues
     490    # #12316 and #11870), and fork() from a worker thread is known to trigger
     491    # problems with some operating systems (issue #3863): skip problematic tests
     492    # on platforms known to behave badly.
     493    platforms_to_skip = ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5',
     494                         'os2emx')
    381495
    382496    def _run_and_join(self, script):
     
    390504        \n""" + script
    391505
    392         import subprocess
    393506        p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE)
    394507        rc = p.wait()
    395508        data = p.stdout.read().replace('\r', '')
     509        p.stdout.close()
    396510        self.assertEqual(data, "end of main\nend of thread\n")
    397         self.failIf(rc == 2, "interpreter was blocked")
    398         self.failUnless(rc == 0, "Unexpected error")
     511        self.assertFalse(rc == 2, "interpreter was blocked")
     512        self.assertTrue(rc == 0, "Unexpected error")
    399513
    400514    def test_1_join_on_shutdown(self):
     
    411525
    412526
     527    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
     528    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    413529    def test_2_join_in_forked_process(self):
    414530        # Like the test above, but from a forked interpreter
    415         import os
    416         if not hasattr(os, 'fork'):
    417             return
    418531        script = """if 1:
    419532            childpid = os.fork()
     
    429542        self._run_and_join(script)
    430543
     544    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
     545    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    431546    def test_3_join_in_forked_from_thread(self):
    432547        # Like the test above, but fork() was called from a worker thread
    433548        # In the forked process, the main Thread object must be marked as stopped.
    434         import os
    435         if not hasattr(os, 'fork'):
    436             return
    437         # Skip platforms with known problems forking from a worker thread.
    438         # See http://bugs.python.org/issue3863.
    439         if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'):
    440             print >>sys.stderr, ('Skipping test_3_join_in_forked_from_thread'
    441                                  ' due to known OS bugs on'), sys.platform
    442             return
    443549        script = """if 1:
    444550            main_thread = threading.current_thread()
     
    460566        self._run_and_join(script)
    461567
    462 
    463 class ThreadingExceptionTests(unittest.TestCase):
     568    def assertScriptHasOutput(self, script, expected_output):
     569        p = subprocess.Popen([sys.executable, "-c", script],
     570                             stdout=subprocess.PIPE)
     571        rc = p.wait()
     572        data = p.stdout.read().decode().replace('\r', '')
     573        self.assertEqual(rc, 0, "Unexpected error")
     574        self.assertEqual(data, expected_output)
     575
     576    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
     577    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
     578    def test_4_joining_across_fork_in_worker_thread(self):
     579        # There used to be a possible deadlock when forking from a child
     580        # thread.  See http://bugs.python.org/issue6643.
     581
     582        # The script takes the following steps:
     583        # - The main thread in the parent process starts a new thread and then
     584        #   tries to join it.
     585        # - The join operation acquires the Lock inside the thread's _block
     586        #   Condition.  (See threading.py:Thread.join().)
     587        # - We stub out the acquire method on the condition to force it to wait
     588        #   until the child thread forks.  (See LOCK ACQUIRED HERE)
     589        # - The child thread forks.  (See LOCK HELD and WORKER THREAD FORKS
     590        #   HERE)
     591        # - The main thread of the parent process enters Condition.wait(),
     592        #   which releases the lock on the child thread.
     593        # - The child process returns.  Without the necessary fix, when the
     594        #   main thread of the child process (which used to be the child thread
     595        #   in the parent process) attempts to exit, it will try to acquire the
     596        #   lock in the Thread._block Condition object and hang, because the
     597        #   lock was held across the fork.
     598
     599        script = """if 1:
     600            import os, time, threading
     601
     602            finish_join = False
     603            start_fork = False
     604
     605            def worker():
     606                # Wait until this thread's lock is acquired before forking to
     607                # create the deadlock.
     608                global finish_join
     609                while not start_fork:
     610                    time.sleep(0.01)
     611                # LOCK HELD: Main thread holds lock across this call.
     612                childpid = os.fork()
     613                finish_join = True
     614                if childpid != 0:
     615                    # Parent process just waits for child.
     616                    os.waitpid(childpid, 0)
     617                # Child process should just return.
     618
     619            w = threading.Thread(target=worker)
     620
     621            # Stub out the private condition variable's lock acquire method.
     622            # This acquires the lock and then waits until the child has forked
     623            # before returning, which will release the lock soon after.  If
     624            # someone else tries to fix this test case by acquiring this lock
     625            # before forking instead of resetting it, the test case will
     626            # deadlock when it shouldn't.
     627            condition = w._block
     628            orig_acquire = condition.acquire
     629            call_count_lock = threading.Lock()
     630            call_count = 0
     631            def my_acquire():
     632                global call_count
     633                global start_fork
     634                orig_acquire()  # LOCK ACQUIRED HERE
     635                start_fork = True
     636                if call_count == 0:
     637                    while not finish_join:
     638                        time.sleep(0.01)  # WORKER THREAD FORKS HERE
     639                with call_count_lock:
     640                    call_count += 1
     641            condition.acquire = my_acquire
     642
     643            w.start()
     644            w.join()
     645            print('end of main')
     646            """
     647        self.assertScriptHasOutput(script, "end of main\n")
     648
     649    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
     650    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
     651    def test_5_clear_waiter_locks_to_avoid_crash(self):
     652        # Check that a spawned thread that forks doesn't segfault on certain
     653        # platforms, namely OS X.  This used to happen if there was a waiter
     654        # lock in the thread's condition variable's waiters list.  Even though
     655        # we know the lock will be held across the fork, it is not safe to
     656        # release locks held across forks on all platforms, so releasing the
     657        # waiter lock caused a segfault on OS X.  Furthermore, since locks on
     658        # OS X are (as of this writing) implemented with a mutex + condition
     659        # variable instead of a semaphore, while we know that the Python-level
     660        # lock will be acquired, we can't know if the internal mutex will be
     661        # acquired at the time of the fork.
     662
     663        script = """if True:
     664            import os, time, threading
     665
     666            start_fork = False
     667
     668            def worker():
     669                # Wait until the main thread has attempted to join this thread
     670                # before continuing.
     671                while not start_fork:
     672                    time.sleep(0.01)
     673                childpid = os.fork()
     674                if childpid != 0:
     675                    # Parent process just waits for child.
     676                    (cpid, rc) = os.waitpid(childpid, 0)
     677                    assert cpid == childpid
     678                    assert rc == 0
     679                    print('end of worker thread')
     680                else:
     681                    # Child process should just return.
     682                    pass
     683
     684            w = threading.Thread(target=worker)
     685
     686            # Stub out the private condition variable's _release_save method.
     687            # This releases the condition's lock and flips the global that
     688            # causes the worker to fork.  At this point, the problematic waiter
     689            # lock has been acquired once by the waiter and has been put onto
     690            # the waiters list.
     691            condition = w._block
     692            orig_release_save = condition._release_save
     693            def my_release_save():
     694                global start_fork
     695                orig_release_save()
     696                # Waiter lock held here, condition lock released.
     697                start_fork = True
     698            condition._release_save = my_release_save
     699
     700            w.start()
     701            w.join()
     702            print('end of main thread')
     703            """
     704        output = "end of worker thread\nend of main thread\n"
     705        self.assertScriptHasOutput(script, output)
     706
     707    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
     708    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
     709    def test_reinit_tls_after_fork(self):
     710        # Issue #13817: fork() would deadlock in a multithreaded program with
     711        # the ad-hoc TLS implementation.
     712
     713        def do_fork_and_wait():
     714            # just fork a child process and wait it
     715            pid = os.fork()
     716            if pid > 0:
     717                os.waitpid(pid, 0)
     718            else:
     719                os._exit(0)
     720
     721        # start a bunch of threads that will fork() child processes
     722        threads = []
     723        for i in range(16):
     724            t = threading.Thread(target=do_fork_and_wait)
     725            threads.append(t)
     726            t.start()
     727
     728        for t in threads:
     729            t.join()
     730
     731
     732class ThreadingExceptionTests(BaseTestCase):
    464733    # A RuntimeError should be raised if Thread.start() is called
    465734    # multiple times.
     
    505774    semtype = staticmethod(threading.BoundedSemaphore)
    506775
     776    @unittest.skipUnless(sys.platform == 'darwin', 'test macosx problem')
     777    def test_recursion_limit(self):
     778        # Issue 9670
     779        # test that excessive recursion within a non-main thread causes
     780        # an exception rather than crashing the interpreter on platforms
     781        # like Mac OS X or FreeBSD which have small default stack sizes
     782        # for threads
     783        script = """if True:
     784            import threading
     785
     786            def recurse():
     787                return recurse()
     788
     789            def outer():
     790                try:
     791                    recurse()
     792                except RuntimeError:
     793                    pass
     794
     795            w = threading.Thread(target=outer)
     796            w.start()
     797            w.join()
     798            print('end of main thread')
     799            """
     800        expected_output = "end of main thread\n"
     801        p = subprocess.Popen([sys.executable, "-c", script],
     802                             stdout=subprocess.PIPE)
     803        stdout, stderr = p.communicate()
     804        data = stdout.decode().replace('\r', '')
     805        self.assertEqual(p.returncode, 0, "Unexpected error")
     806        self.assertEqual(data, expected_output)
    507807
    508808def test_main():
Note: See TracChangeset for help on using the changeset viewer.