Changeset 391 for python/trunk/Lib/test/test_capi.py
- Timestamp:
- Mar 19, 2014, 11:31:01 PM (11 years ago)
- Location:
- python/trunk
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
python/trunk
-
Property svn:mergeinfo
set to
/python/vendor/Python-2.7.6 merged eligible /python/vendor/current merged eligible
-
Property svn:mergeinfo
set to
-
python/trunk/Lib/test/test_capi.py
r2 r391 2 2 # these are all functions _testcapi exports whose name begins with 'test_'. 3 3 4 from __future__ import with_statement 4 5 import sys 6 import time 7 import random 8 import unittest 5 9 from test import test_support 10 try: 11 import thread 12 import threading 13 except ImportError: 14 thread = None 15 threading = None 6 16 import _testcapi 7 17 18 @unittest.skipUnless(threading, 'Threading required for this test.') 19 class TestPendingCalls(unittest.TestCase): 20 21 def pendingcalls_submit(self, l, n): 22 def callback(): 23 #this function can be interrupted by thread switching so let's 24 #use an atomic operation 25 l.append(None) 26 27 for i in range(n): 28 time.sleep(random.random()*0.02) #0.01 secs on average 29 #try submitting callback until successful. 30 #rely on regular interrupt to flush queue if we are 31 #unsuccessful. 32 while True: 33 if _testcapi._pending_threadfunc(callback): 34 break; 35 36 def pendingcalls_wait(self, l, n, context = None): 37 #now, stick around until l[0] has grown to 10 38 count = 0; 39 while len(l) != n: 40 #this busy loop is where we expect to be interrupted to 41 #run our callbacks. Note that callbacks are only run on the 42 #main thread 43 if False and test_support.verbose: 44 print "(%i)"%(len(l),), 45 for i in xrange(1000): 46 a = i*i 47 if context and not context.event.is_set(): 48 continue 49 count += 1 50 self.assertTrue(count < 10000, 51 "timeout waiting for %i callbacks, got %i"%(n, len(l))) 52 if False and test_support.verbose: 53 print "(%i)"%(len(l),) 54 55 def test_pendingcalls_threaded(self): 56 #do every callback on a separate thread 57 n = 32 #total callbacks 58 threads = [] 59 class foo(object):pass 60 context = foo() 61 context.l = [] 62 context.n = 2 #submits per thread 63 context.nThreads = n // context.n 64 context.nFinished = 0 65 context.lock = threading.Lock() 66 context.event = threading.Event() 67 68 for i in range(context.nThreads): 69 t = threading.Thread(target=self.pendingcalls_thread, args = (context,)) 70 t.start() 71 threads.append(t) 72 73 self.pendingcalls_wait(context.l, n, context) 74 75 for t in threads: 76 t.join() 77 78 def pendingcalls_thread(self, context): 79 try: 80 self.pendingcalls_submit(context.l, context.n) 81 finally: 82 with context.lock: 83 context.nFinished += 1 84 nFinished = context.nFinished 85 if False and test_support.verbose: 86 print "finished threads: ", nFinished 87 if nFinished == context.nThreads: 88 context.event.set() 89 90 def test_pendingcalls_non_threaded(self): 91 #again, just using the main thread, likely they will all be dispatched at 92 #once. It is ok to ask for too many, because we loop until we find a slot. 93 #the loop can be interrupted to dispatch. 94 #there are only 32 dispatch slots, so we go for twice that! 95 l = [] 96 n = 64 97 self.pendingcalls_submit(l, n) 98 self.pendingcalls_wait(l, n) 99 100 101 @unittest.skipUnless(threading and thread, 'Threading required for this test.') 102 class TestThreadState(unittest.TestCase): 103 104 @test_support.reap_threads 105 def test_thread_state(self): 106 # some extra thread-state tests driven via _testcapi 107 def target(): 108 idents = [] 109 110 def callback(): 111 idents.append(thread.get_ident()) 112 113 _testcapi._test_thread_state(callback) 114 a = b = callback 115 time.sleep(1) 116 # Check our main thread is in the list exactly 3 times. 117 self.assertEqual(idents.count(thread.get_ident()), 3, 118 "Couldn't find main thread correctly in the list") 119 120 target() 121 t = threading.Thread(target=target) 122 t.start() 123 t.join() 124 125 8 126 def test_main(): 9 10 127 for name in dir(_testcapi): 11 128 if name.startswith('test_'): … … 18 135 raise test_support.TestFailed, sys.exc_info()[1] 19 136 20 # some extra thread-state tests driven via _testcapi 21 def TestThreadState(): 22 if test_support.verbose: 23 print "auto-thread-state" 24 25 idents = [] 26 27 def callback(): 28 idents.append(thread.get_ident()) 29 30 _testcapi._test_thread_state(callback) 31 a = b = callback 32 time.sleep(1) 33 # Check our main thread is in the list exactly 3 times. 34 if idents.count(thread.get_ident()) != 3: 35 raise test_support.TestFailed, \ 36 "Couldn't find main thread correctly in the list" 37 38 try: 39 _testcapi._test_thread_state 40 have_thread_state = True 41 except AttributeError: 42 have_thread_state = False 43 44 if have_thread_state: 45 import thread 46 import time 47 TestThreadState() 48 import threading 49 t=threading.Thread(target=TestThreadState) 50 t.start() 51 t.join() 137 test_support.run_unittest(TestPendingCalls, TestThreadState) 52 138 53 139 if __name__ == "__main__":
Note:
See TracChangeset
for help on using the changeset viewer.