Ignore:
Timestamp:
Mar 19, 2014, 11:31:01 PM (11 years ago)
Author:
dmik
Message:

python: Merge vendor 2.7.6 to trunk.

Location:
python/trunk
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • python/trunk

  • python/trunk/Lib/test/test_capi.py

    r2 r391  
    22# these are all functions _testcapi exports whose name begins with 'test_'.
    33
     4from __future__ import with_statement
    45import sys
     6import time
     7import random
     8import unittest
    59from test import test_support
     10try:
     11    import thread
     12    import threading
     13except ImportError:
     14    thread = None
     15    threading = None
    616import _testcapi
    717
     18@unittest.skipUnless(threading, 'Threading required for this test.')
     19class TestPendingCalls(unittest.TestCase):
     20
     21    def pendingcalls_submit(self, l, n):
     22        def callback():
     23            #this function can be interrupted by thread switching so let's
     24            #use an atomic operation
     25            l.append(None)
     26
     27        for i in range(n):
     28            time.sleep(random.random()*0.02) #0.01 secs on average
     29            #try submitting callback until successful.
     30            #rely on regular interrupt to flush queue if we are
     31            #unsuccessful.
     32            while True:
     33                if _testcapi._pending_threadfunc(callback):
     34                    break;
     35
     36    def pendingcalls_wait(self, l, n, context = None):
     37        #now, stick around until l[0] has grown to 10
     38        count = 0;
     39        while len(l) != n:
     40            #this busy loop is where we expect to be interrupted to
     41            #run our callbacks.  Note that callbacks are only run on the
     42            #main thread
     43            if False and test_support.verbose:
     44                print "(%i)"%(len(l),),
     45            for i in xrange(1000):
     46                a = i*i
     47            if context and not context.event.is_set():
     48                continue
     49            count += 1
     50            self.assertTrue(count < 10000,
     51                "timeout waiting for %i callbacks, got %i"%(n, len(l)))
     52        if False and test_support.verbose:
     53            print "(%i)"%(len(l),)
     54
     55    def test_pendingcalls_threaded(self):
     56        #do every callback on a separate thread
     57        n = 32 #total callbacks
     58        threads = []
     59        class foo(object):pass
     60        context = foo()
     61        context.l = []
     62        context.n = 2 #submits per thread
     63        context.nThreads = n // context.n
     64        context.nFinished = 0
     65        context.lock = threading.Lock()
     66        context.event = threading.Event()
     67
     68        for i in range(context.nThreads):
     69            t = threading.Thread(target=self.pendingcalls_thread, args = (context,))
     70            t.start()
     71            threads.append(t)
     72
     73        self.pendingcalls_wait(context.l, n, context)
     74
     75        for t in threads:
     76            t.join()
     77
     78    def pendingcalls_thread(self, context):
     79        try:
     80            self.pendingcalls_submit(context.l, context.n)
     81        finally:
     82            with context.lock:
     83                context.nFinished += 1
     84                nFinished = context.nFinished
     85                if False and test_support.verbose:
     86                    print "finished threads: ", nFinished
     87            if nFinished == context.nThreads:
     88                context.event.set()
     89
     90    def test_pendingcalls_non_threaded(self):
     91        #again, just using the main thread, likely they will all be dispatched at
     92        #once.  It is ok to ask for too many, because we loop until we find a slot.
     93        #the loop can be interrupted to dispatch.
     94        #there are only 32 dispatch slots, so we go for twice that!
     95        l = []
     96        n = 64
     97        self.pendingcalls_submit(l, n)
     98        self.pendingcalls_wait(l, n)
     99
     100
     101@unittest.skipUnless(threading and thread, 'Threading required for this test.')
     102class TestThreadState(unittest.TestCase):
     103
     104    @test_support.reap_threads
     105    def test_thread_state(self):
     106        # some extra thread-state tests driven via _testcapi
     107        def target():
     108            idents = []
     109
     110            def callback():
     111                idents.append(thread.get_ident())
     112
     113            _testcapi._test_thread_state(callback)
     114            a = b = callback
     115            time.sleep(1)
     116            # Check our main thread is in the list exactly 3 times.
     117            self.assertEqual(idents.count(thread.get_ident()), 3,
     118                             "Couldn't find main thread correctly in the list")
     119
     120        target()
     121        t = threading.Thread(target=target)
     122        t.start()
     123        t.join()
     124
     125
    8126def test_main():
    9 
    10127    for name in dir(_testcapi):
    11128        if name.startswith('test_'):
     
    18135                raise test_support.TestFailed, sys.exc_info()[1]
    19136
    20     # some extra thread-state tests driven via _testcapi
    21     def TestThreadState():
    22         if test_support.verbose:
    23             print "auto-thread-state"
    24 
    25         idents = []
    26 
    27         def callback():
    28             idents.append(thread.get_ident())
    29 
    30         _testcapi._test_thread_state(callback)
    31         a = b = callback
    32         time.sleep(1)
    33         # Check our main thread is in the list exactly 3 times.
    34         if idents.count(thread.get_ident()) != 3:
    35             raise test_support.TestFailed, \
    36                   "Couldn't find main thread correctly in the list"
    37 
    38     try:
    39         _testcapi._test_thread_state
    40         have_thread_state = True
    41     except AttributeError:
    42         have_thread_state = False
    43 
    44     if have_thread_state:
    45         import thread
    46         import time
    47         TestThreadState()
    48         import threading
    49         t=threading.Thread(target=TestThreadState)
    50         t.start()
    51         t.join()
     137    test_support.run_unittest(TestPendingCalls, TestThreadState)
    52138
    53139if __name__ == "__main__":
Note: See TracChangeset for help on using the changeset viewer.