1 | # Very rudimentary test of threading module
|
---|
2 |
|
---|
3 | import test.test_support
|
---|
4 | from test.test_support import verbose
|
---|
5 | from test.script_helper import assert_python_ok
|
---|
6 |
|
---|
7 | import random
|
---|
8 | import re
|
---|
9 | import sys
|
---|
10 | thread = test.test_support.import_module('thread')
|
---|
11 | threading = test.test_support.import_module('threading')
|
---|
12 | import time
|
---|
13 | import unittest
|
---|
14 | import weakref
|
---|
15 | import os
|
---|
16 | import subprocess
|
---|
17 |
|
---|
18 | from test import lock_tests
|
---|
19 |
|
---|
20 | # A trivial mutable counter.
|
---|
class Counter(object):
    """Minimal mutable integer cell.

    The class does no locking of its own; it only stores and adjusts
    ``value``.
    """

    def __init__(self):
        # Always starts from zero.
        self.value = 0

    def inc(self):
        """Raise the stored value by one."""
        self.value = self.value + 1

    def dec(self):
        """Lower the stored value by one."""
        self.value = self.value - 1

    def get(self):
        """Return the stored value."""
        return self.value
30 |
|
---|
31 | class TestThread(threading.Thread):
|
---|
32 | def __init__(self, name, testcase, sema, mutex, nrunning):
|
---|
33 | threading.Thread.__init__(self, name=name)
|
---|
34 | self.testcase = testcase
|
---|
35 | self.sema = sema
|
---|
36 | self.mutex = mutex
|
---|
37 | self.nrunning = nrunning
|
---|
38 |
|
---|
39 | def run(self):
|
---|
40 | delay = random.random() / 10000.0
|
---|
41 | if verbose:
|
---|
42 | print 'task %s will run for %.1f usec' % (
|
---|
43 | self.name, delay * 1e6)
|
---|
44 |
|
---|
45 | with self.sema:
|
---|
46 | with self.mutex:
|
---|
47 | self.nrunning.inc()
|
---|
48 | if verbose:
|
---|
49 | print self.nrunning.get(), 'tasks are running'
|
---|
50 | self.testcase.assertTrue(self.nrunning.get() <= 3)
|
---|
51 |
|
---|
52 | time.sleep(delay)
|
---|
53 | if verbose:
|
---|
54 | print 'task', self.name, 'done'
|
---|
55 |
|
---|
56 | with self.mutex:
|
---|
57 | self.nrunning.dec()
|
---|
58 | self.testcase.assertTrue(self.nrunning.get() >= 0)
|
---|
59 | if verbose:
|
---|
60 | print '%s is finished. %d tasks are running' % (
|
---|
61 | self.name, self.nrunning.get())
|
---|
62 |
|
---|
class BaseTestCase(unittest.TestCase):
    """Common fixture: record thread state before a test, clean up after."""

    def setUp(self):
        # Keep whatever threading_setup() returns; tearDown hands it back
        # unpacked to threading_cleanup().
        support = test.test_support
        self._threads = support.threading_setup()

    def tearDown(self):
        support = test.test_support
        support.threading_cleanup(*self._threads)
        support.reap_children()
70 |
|
---|
71 |
|
---|
72 | class ThreadTests(BaseTestCase):
|
---|
73 |
|
---|
74 | # Create a bunch of threads, let each do some work, wait until all are
|
---|
75 | # done.
|
---|
76 | def test_various_ops(self):
|
---|
77 | # This takes about n/3 seconds to run (about n/3 clumps of tasks,
|
---|
78 | # times about 1 second per clump).
|
---|
79 | NUMTASKS = 10
|
---|
80 |
|
---|
81 | # no more than 3 of the 10 can run at once
|
---|
82 | sema = threading.BoundedSemaphore(value=3)
|
---|
83 | mutex = threading.RLock()
|
---|
84 | numrunning = Counter()
|
---|
85 |
|
---|
86 | threads = []
|
---|
87 |
|
---|
88 | for i in range(NUMTASKS):
|
---|
89 | t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
|
---|
90 | threads.append(t)
|
---|
91 | self.assertEqual(t.ident, None)
|
---|
92 | self.assertTrue(re.match('<TestThread\(.*, initial\)>', repr(t)))
|
---|
93 | t.start()
|
---|
94 |
|
---|
95 | if verbose:
|
---|
96 | print 'waiting for all tasks to complete'
|
---|
97 | for t in threads:
|
---|
98 | t.join(NUMTASKS)
|
---|
99 | self.assertTrue(not t.is_alive())
|
---|
100 | self.assertNotEqual(t.ident, 0)
|
---|
101 | self.assertFalse(t.ident is None)
|
---|
102 | self.assertTrue(re.match('<TestThread\(.*, \w+ -?\d+\)>', repr(t)))
|
---|
103 | if verbose:
|
---|
104 | print 'all tasks done'
|
---|
105 | self.assertEqual(numrunning.get(), 0)
|
---|
106 |
|
---|
107 | def test_ident_of_no_threading_threads(self):
|
---|
108 | # The ident still must work for the main thread and dummy threads.
|
---|
109 | self.assertFalse(threading.currentThread().ident is None)
|
---|
110 | def f():
|
---|
111 | ident.append(threading.currentThread().ident)
|
---|
112 | done.set()
|
---|
113 | done = threading.Event()
|
---|
114 | ident = []
|
---|
115 | thread.start_new_thread(f, ())
|
---|
116 | done.wait()
|
---|
117 | self.assertFalse(ident[0] is None)
|
---|
118 | # Kill the "immortal" _DummyThread
|
---|
119 | del threading._active[ident[0]]
|
---|
120 |
|
---|
121 | # run with a small(ish) thread stack size (256kB)
|
---|
122 | def test_various_ops_small_stack(self):
|
---|
123 | if verbose:
|
---|
124 | print 'with 256kB thread stack size...'
|
---|
125 | try:
|
---|
126 | threading.stack_size(262144)
|
---|
127 | except thread.error:
|
---|
128 | if verbose:
|
---|
129 | print 'platform does not support changing thread stack size'
|
---|
130 | return
|
---|
131 | self.test_various_ops()
|
---|
132 | threading.stack_size(0)
|
---|
133 |
|
---|
134 | # run with a large thread stack size (1MB)
|
---|
135 | def test_various_ops_large_stack(self):
|
---|
136 | if verbose:
|
---|
137 | print 'with 1MB thread stack size...'
|
---|
138 | try:
|
---|
139 | threading.stack_size(0x100000)
|
---|
140 | except thread.error:
|
---|
141 | if verbose:
|
---|
142 | print 'platform does not support changing thread stack size'
|
---|
143 | return
|
---|
144 | self.test_various_ops()
|
---|
145 | threading.stack_size(0)
|
---|
146 |
|
---|
147 | def test_foreign_thread(self):
|
---|
148 | # Check that a "foreign" thread can use the threading module.
|
---|
149 | def f(mutex):
|
---|
150 | # Calling current_thread() forces an entry for the foreign
|
---|
151 | # thread to get made in the threading._active map.
|
---|
152 | threading.current_thread()
|
---|
153 | mutex.release()
|
---|
154 |
|
---|
155 | mutex = threading.Lock()
|
---|
156 | mutex.acquire()
|
---|
157 | tid = thread.start_new_thread(f, (mutex,))
|
---|
158 | # Wait for the thread to finish.
|
---|
159 | mutex.acquire()
|
---|
160 | self.assertIn(tid, threading._active)
|
---|
161 | self.assertIsInstance(threading._active[tid], threading._DummyThread)
|
---|
162 | del threading._active[tid]
|
---|
163 |
|
---|
164 | # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
|
---|
165 | # exposed at the Python level. This test relies on ctypes to get at it.
|
---|
166 | def test_PyThreadState_SetAsyncExc(self):
|
---|
167 | try:
|
---|
168 | import ctypes
|
---|
169 | except ImportError:
|
---|
170 | if verbose:
|
---|
171 | print "test_PyThreadState_SetAsyncExc can't import ctypes"
|
---|
172 | return # can't do anything
|
---|
173 |
|
---|
174 | set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
|
---|
175 |
|
---|
176 | class AsyncExc(Exception):
|
---|
177 | pass
|
---|
178 |
|
---|
179 | exception = ctypes.py_object(AsyncExc)
|
---|
180 |
|
---|
181 | # First check it works when setting the exception from the same thread.
|
---|
182 | tid = thread.get_ident()
|
---|
183 |
|
---|
184 | try:
|
---|
185 | result = set_async_exc(ctypes.c_long(tid), exception)
|
---|
186 | # The exception is async, so we might have to keep the VM busy until
|
---|
187 | # it notices.
|
---|
188 | while True:
|
---|
189 | pass
|
---|
190 | except AsyncExc:
|
---|
191 | pass
|
---|
192 | else:
|
---|
193 | # This code is unreachable but it reflects the intent. If we wanted
|
---|
194 | # to be smarter the above loop wouldn't be infinite.
|
---|
195 | self.fail("AsyncExc not raised")
|
---|
196 | try:
|
---|
197 | self.assertEqual(result, 1) # one thread state modified
|
---|
198 | except UnboundLocalError:
|
---|
199 | # The exception was raised too quickly for us to get the result.
|
---|
200 | pass
|
---|
201 |
|
---|
202 | # `worker_started` is set by the thread when it's inside a try/except
|
---|
203 | # block waiting to catch the asynchronously set AsyncExc exception.
|
---|
204 | # `worker_saw_exception` is set by the thread upon catching that
|
---|
205 | # exception.
|
---|
206 | worker_started = threading.Event()
|
---|
207 | worker_saw_exception = threading.Event()
|
---|
208 |
|
---|
209 | class Worker(threading.Thread):
|
---|
210 | def run(self):
|
---|
211 | self.id = thread.get_ident()
|
---|
212 | self.finished = False
|
---|
213 |
|
---|
214 | try:
|
---|
215 | while True:
|
---|
216 | worker_started.set()
|
---|
217 | time.sleep(0.1)
|
---|
218 | except AsyncExc:
|
---|
219 | self.finished = True
|
---|
220 | worker_saw_exception.set()
|
---|
221 |
|
---|
222 | t = Worker()
|
---|
223 | t.daemon = True # so if this fails, we don't hang Python at shutdown
|
---|
224 | t.start()
|
---|
225 | if verbose:
|
---|
226 | print " started worker thread"
|
---|
227 |
|
---|
228 | # Try a thread id that doesn't make sense.
|
---|
229 | if verbose:
|
---|
230 | print " trying nonsensical thread id"
|
---|
231 | result = set_async_exc(ctypes.c_long(-1), exception)
|
---|
232 | self.assertEqual(result, 0) # no thread states modified
|
---|
233 |
|
---|
234 | # Now raise an exception in the worker thread.
|
---|
235 | if verbose:
|
---|
236 | print " waiting for worker thread to get started"
|
---|
237 | ret = worker_started.wait()
|
---|
238 | self.assertTrue(ret)
|
---|
239 | if verbose:
|
---|
240 | print " verifying worker hasn't exited"
|
---|
241 | self.assertTrue(not t.finished)
|
---|
242 | if verbose:
|
---|
243 | print " attempting to raise asynch exception in worker"
|
---|
244 | result = set_async_exc(ctypes.c_long(t.id), exception)
|
---|
245 | self.assertEqual(result, 1) # one thread state modified
|
---|
246 | if verbose:
|
---|
247 | print " waiting for worker to say it caught the exception"
|
---|
248 | worker_saw_exception.wait(timeout=10)
|
---|
249 | self.assertTrue(t.finished)
|
---|
250 | if verbose:
|
---|
251 | print " all OK -- joining worker"
|
---|
252 | if t.finished:
|
---|
253 | t.join()
|
---|
254 | # else the thread is still running, and we have no way to kill it
|
---|
255 |
|
---|
256 | def test_limbo_cleanup(self):
|
---|
257 | # Issue 7481: Failure to start thread should cleanup the limbo map.
|
---|
258 | def fail_new_thread(*args):
|
---|
259 | raise thread.error()
|
---|
260 | _start_new_thread = threading._start_new_thread
|
---|
261 | threading._start_new_thread = fail_new_thread
|
---|
262 | try:
|
---|
263 | t = threading.Thread(target=lambda: None)
|
---|
264 | self.assertRaises(thread.error, t.start)
|
---|
265 | self.assertFalse(
|
---|
266 | t in threading._limbo,
|
---|
267 | "Failed to cleanup _limbo map on failure of Thread.start().")
|
---|
268 | finally:
|
---|
269 | threading._start_new_thread = _start_new_thread
|
---|
270 |
|
---|
271 | def test_finalize_runnning_thread(self):
|
---|
272 | # Issue 1402: the PyGILState_Ensure / _Release functions may be called
|
---|
273 | # very late on python exit: on deallocation of a running thread for
|
---|
274 | # example.
|
---|
275 | try:
|
---|
276 | import ctypes
|
---|
277 | except ImportError:
|
---|
278 | if verbose:
|
---|
279 | print("test_finalize_with_runnning_thread can't import ctypes")
|
---|
280 | return # can't do anything
|
---|
281 |
|
---|
282 | rc = subprocess.call([sys.executable, "-c", """if 1:
|
---|
283 | import ctypes, sys, time, thread
|
---|
284 |
|
---|
285 | # This lock is used as a simple event variable.
|
---|
286 | ready = thread.allocate_lock()
|
---|
287 | ready.acquire()
|
---|
288 |
|
---|
289 | # Module globals are cleared before __del__ is run
|
---|
290 | # So we save the functions in class dict
|
---|
291 | class C:
|
---|
292 | ensure = ctypes.pythonapi.PyGILState_Ensure
|
---|
293 | release = ctypes.pythonapi.PyGILState_Release
|
---|
294 | def __del__(self):
|
---|
295 | state = self.ensure()
|
---|
296 | self.release(state)
|
---|
297 |
|
---|
298 | def waitingThread():
|
---|
299 | x = C()
|
---|
300 | ready.release()
|
---|
301 | time.sleep(100)
|
---|
302 |
|
---|
303 | thread.start_new_thread(waitingThread, ())
|
---|
304 | ready.acquire() # Be sure the other thread is waiting.
|
---|
305 | sys.exit(42)
|
---|
306 | """])
|
---|
307 | self.assertEqual(rc, 42)
|
---|
308 |
|
---|
309 | def test_finalize_with_trace(self):
|
---|
310 | # Issue1733757
|
---|
311 | # Avoid a deadlock when sys.settrace steps into threading._shutdown
|
---|
312 | p = subprocess.Popen([sys.executable, "-c", """if 1:
|
---|
313 | import sys, threading
|
---|
314 |
|
---|
315 | # A deadlock-killer, to prevent the
|
---|
316 | # testsuite to hang forever
|
---|
317 | def killer():
|
---|
318 | import os, time
|
---|
319 | time.sleep(2)
|
---|
320 | print 'program blocked; aborting'
|
---|
321 | os._exit(2)
|
---|
322 | t = threading.Thread(target=killer)
|
---|
323 | t.daemon = True
|
---|
324 | t.start()
|
---|
325 |
|
---|
326 | # This is the trace function
|
---|
327 | def func(frame, event, arg):
|
---|
328 | threading.current_thread()
|
---|
329 | return func
|
---|
330 |
|
---|
331 | sys.settrace(func)
|
---|
332 | """],
|
---|
333 | stdout=subprocess.PIPE,
|
---|
334 | stderr=subprocess.PIPE)
|
---|
335 | self.addCleanup(p.stdout.close)
|
---|
336 | self.addCleanup(p.stderr.close)
|
---|
337 | stdout, stderr = p.communicate()
|
---|
338 | rc = p.returncode
|
---|
339 | self.assertFalse(rc == 2, "interpreted was blocked")
|
---|
340 | self.assertTrue(rc == 0,
|
---|
341 | "Unexpected error: " + repr(stderr))
|
---|
342 |
|
---|
343 | def test_join_nondaemon_on_shutdown(self):
|
---|
344 | # Issue 1722344
|
---|
345 | # Raising SystemExit skipped threading._shutdown
|
---|
346 | p = subprocess.Popen([sys.executable, "-c", """if 1:
|
---|
347 | import threading
|
---|
348 | from time import sleep
|
---|
349 |
|
---|
350 | def child():
|
---|
351 | sleep(1)
|
---|
352 | # As a non-daemon thread we SHOULD wake up and nothing
|
---|
353 | # should be torn down yet
|
---|
354 | print "Woke up, sleep function is:", sleep
|
---|
355 |
|
---|
356 | threading.Thread(target=child).start()
|
---|
357 | raise SystemExit
|
---|
358 | """],
|
---|
359 | stdout=subprocess.PIPE,
|
---|
360 | stderr=subprocess.PIPE)
|
---|
361 | self.addCleanup(p.stdout.close)
|
---|
362 | self.addCleanup(p.stderr.close)
|
---|
363 | stdout, stderr = p.communicate()
|
---|
364 | self.assertEqual(stdout.strip(),
|
---|
365 | "Woke up, sleep function is: <built-in function sleep>")
|
---|
366 | stderr = re.sub(r"^\[\d+ refs\]", "", stderr, re.MULTILINE).strip()
|
---|
367 | self.assertEqual(stderr, "")
|
---|
368 |
|
---|
369 | def test_enumerate_after_join(self):
|
---|
370 | # Try hard to trigger #1703448: a thread is still returned in
|
---|
371 | # threading.enumerate() after it has been join()ed.
|
---|
372 | enum = threading.enumerate
|
---|
373 | old_interval = sys.getcheckinterval()
|
---|
374 | try:
|
---|
375 | for i in xrange(1, 100):
|
---|
376 | # Try a couple times at each thread-switching interval
|
---|
377 | # to get more interleavings.
|
---|
378 | sys.setcheckinterval(i // 5)
|
---|
379 | t = threading.Thread(target=lambda: None)
|
---|
380 | t.start()
|
---|
381 | t.join()
|
---|
382 | l = enum()
|
---|
383 | self.assertNotIn(t, l,
|
---|
384 | "#1703448 triggered after %d trials: %s" % (i, l))
|
---|
385 | finally:
|
---|
386 | sys.setcheckinterval(old_interval)
|
---|
387 |
|
---|
388 | def test_no_refcycle_through_target(self):
|
---|
389 | class RunSelfFunction(object):
|
---|
390 | def __init__(self, should_raise):
|
---|
391 | # The links in this refcycle from Thread back to self
|
---|
392 | # should be cleaned up when the thread completes.
|
---|
393 | self.should_raise = should_raise
|
---|
394 | self.thread = threading.Thread(target=self._run,
|
---|
395 | args=(self,),
|
---|
396 | kwargs={'yet_another':self})
|
---|
397 | self.thread.start()
|
---|
398 |
|
---|
399 | def _run(self, other_ref, yet_another):
|
---|
400 | if self.should_raise:
|
---|
401 | raise SystemExit
|
---|
402 |
|
---|
403 | cyclic_object = RunSelfFunction(should_raise=False)
|
---|
404 | weak_cyclic_object = weakref.ref(cyclic_object)
|
---|
405 | cyclic_object.thread.join()
|
---|
406 | del cyclic_object
|
---|
407 | self.assertEqual(None, weak_cyclic_object(),
|
---|
408 | msg=('%d references still around' %
|
---|
409 | sys.getrefcount(weak_cyclic_object())))
|
---|
410 |
|
---|
411 | raising_cyclic_object = RunSelfFunction(should_raise=True)
|
---|
412 | weak_raising_cyclic_object = weakref.ref(raising_cyclic_object)
|
---|
413 | raising_cyclic_object.thread.join()
|
---|
414 | del raising_cyclic_object
|
---|
415 | self.assertEqual(None, weak_raising_cyclic_object(),
|
---|
416 | msg=('%d references still around' %
|
---|
417 | sys.getrefcount(weak_raising_cyclic_object())))
|
---|
418 |
|
---|
419 | @unittest.skipUnless(hasattr(os, 'fork'), 'test needs fork()')
|
---|
420 | def test_dummy_thread_after_fork(self):
|
---|
421 | # Issue #14308: a dummy thread in the active list doesn't mess up
|
---|
422 | # the after-fork mechanism.
|
---|
423 | code = """if 1:
|
---|
424 | import thread, threading, os, time
|
---|
425 |
|
---|
426 | def background_thread(evt):
|
---|
427 | # Creates and registers the _DummyThread instance
|
---|
428 | threading.current_thread()
|
---|
429 | evt.set()
|
---|
430 | time.sleep(10)
|
---|
431 |
|
---|
432 | evt = threading.Event()
|
---|
433 | thread.start_new_thread(background_thread, (evt,))
|
---|
434 | evt.wait()
|
---|
435 | assert threading.active_count() == 2, threading.active_count()
|
---|
436 | if os.fork() == 0:
|
---|
437 | assert threading.active_count() == 1, threading.active_count()
|
---|
438 | os._exit(0)
|
---|
439 | else:
|
---|
440 | os.wait()
|
---|
441 | """
|
---|
442 | _, out, err = assert_python_ok("-c", code)
|
---|
443 | self.assertEqual(out, '')
|
---|
444 | self.assertEqual(err, '')
|
---|
445 |
|
---|
446 | @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
|
---|
447 | def test_is_alive_after_fork(self):
|
---|
448 | # Try hard to trigger #18418: is_alive() could sometimes be True on
|
---|
449 | # threads that vanished after a fork.
|
---|
450 | old_interval = sys.getcheckinterval()
|
---|
451 |
|
---|
452 | # Make the bug more likely to manifest.
|
---|
453 | sys.setcheckinterval(10)
|
---|
454 |
|
---|
455 | try:
|
---|
456 | for i in range(20):
|
---|
457 | t = threading.Thread(target=lambda: None)
|
---|
458 | t.start()
|
---|
459 | pid = os.fork()
|
---|
460 | if pid == 0:
|
---|
461 | os._exit(1 if t.is_alive() else 0)
|
---|
462 | else:
|
---|
463 | t.join()
|
---|
464 | pid, status = os.waitpid(pid, 0)
|
---|
465 | self.assertEqual(0, status)
|
---|
466 | finally:
|
---|
467 | sys.setcheckinterval(old_interval)
|
---|
468 |
|
---|
469 | def test_BoundedSemaphore_limit(self):
|
---|
470 | # BoundedSemaphore should raise ValueError if released too often.
|
---|
471 | for limit in range(1, 10):
|
---|
472 | bs = threading.BoundedSemaphore(limit)
|
---|
473 | threads = [threading.Thread(target=bs.acquire)
|
---|
474 | for _ in range(limit)]
|
---|
475 | for t in threads:
|
---|
476 | t.start()
|
---|
477 | for t in threads:
|
---|
478 | t.join()
|
---|
479 | threads = [threading.Thread(target=bs.release)
|
---|
480 | for _ in range(limit)]
|
---|
481 | for t in threads:
|
---|
482 | t.start()
|
---|
483 | for t in threads:
|
---|
484 | t.join()
|
---|
485 | self.assertRaises(ValueError, bs.release)
|
---|
486 |
|
---|
class ThreadJoinOnShutdown(BaseTestCase):
    """Shutdown and fork tests, mostly run in a child interpreter."""

    # Between fork() and exec(), only async-safe functions are allowed (issues
    # #12316 and #11870), and fork() from a worker thread is known to trigger
    # problems with some operating systems (issue #3863): skip problematic tests
    # on platforms known to behave badly.
    platforms_to_skip = ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5',
                         'os2emx')

    def _run_and_join(self, script):
        # Run `script` in a child interpreter after a prelude that defines
        # joiningfunc(), then check the child printed "end of main" followed
        # by "end of thread" and exited with status 0.
        script = """if 1:
            import sys, os, time, threading

            # a thread, which waits for the main program to terminate
            def joiningfunc(mainthread):
                mainthread.join()
                print 'end of thread'
        \n""" + script

        p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE)
        # communicate() reads the pipe while waiting for the child and closes
        # it afterwards; wait() followed by read() can deadlock once the
        # child fills the pipe buffer.
        stdout, _ = p.communicate()
        rc = p.returncode
        data = stdout.replace('\r', '')
        self.assertEqual(data, "end of main\nend of thread\n")
        self.assertFalse(rc == 2, "interpreter was blocked")
        self.assertTrue(rc == 0, "Unexpected error")

    def test_1_join_on_shutdown(self):
        # The usual case: on exit, wait for a non-daemon thread
        script = """if 1:
            import os
            t = threading.Thread(target=joiningfunc,
                                 args=(threading.current_thread(),))
            t.start()
            time.sleep(0.1)
            print 'end of main'
            """
        self._run_and_join(script)


    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_2_join_in_forked_process(self):
        # Like the test above, but from a forked interpreter
        script = """if 1:
            childpid = os.fork()
            if childpid != 0:
                os.waitpid(childpid, 0)
                sys.exit(0)

            t = threading.Thread(target=joiningfunc,
                                 args=(threading.current_thread(),))
            t.start()
            print 'end of main'
            """
        self._run_and_join(script)

    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_3_join_in_forked_from_thread(self):
        # Like the test above, but fork() was called from a worker thread
        # In the forked process, the main Thread object must be marked as stopped.
        script = """if 1:
            main_thread = threading.current_thread()
            def worker():
                childpid = os.fork()
                if childpid != 0:
                    os.waitpid(childpid, 0)
                    sys.exit(0)

                t = threading.Thread(target=joiningfunc,
                                     args=(main_thread,))
                print 'end of main'
                t.start()
                t.join() # Should not block: main_thread is already stopped

            w = threading.Thread(target=worker)
            w.start()
            """
        self._run_and_join(script)

    def assertScriptHasOutput(self, script, expected_output):
        # Run `script` in a child interpreter and require exit status 0 and
        # exactly `expected_output` on stdout (carriage returns removed).
        p = subprocess.Popen([sys.executable, "-c", script],
                             stdout=subprocess.PIPE)
        # communicate() avoids the wait()-before-read() deadlock and closes
        # the pipe, which was previously left open.
        stdout, _ = p.communicate()
        rc = p.returncode
        data = stdout.decode().replace('\r', '')
        self.assertEqual(rc, 0, "Unexpected error")
        self.assertEqual(data, expected_output)

    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_4_joining_across_fork_in_worker_thread(self):
        # There used to be a possible deadlock when forking from a child
        # thread.  See http://bugs.python.org/issue6643.

        # The script takes the following steps:
        # - The main thread in the parent process starts a new thread and then
        #   tries to join it.
        # - The join operation acquires the Lock inside the thread's _block
        #   Condition.  (See threading.py:Thread.join().)
        # - We stub out the acquire method on the condition to force it to wait
        #   until the child thread forks.  (See LOCK ACQUIRED HERE)
        # - The child thread forks.  (See LOCK HELD and WORKER THREAD FORKS
        #   HERE)
        # - The main thread of the parent process enters Condition.wait(),
        #   which releases the lock on the child thread.
        # - The child process returns.  Without the necessary fix, when the
        #   main thread of the child process (which used to be the child thread
        #   in the parent process) attempts to exit, it will try to acquire the
        #   lock in the Thread._block Condition object and hang, because the
        #   lock was held across the fork.

        script = """if 1:
            import os, time, threading

            finish_join = False
            start_fork = False

            def worker():
                # Wait until this thread's lock is acquired before forking to
                # create the deadlock.
                global finish_join
                while not start_fork:
                    time.sleep(0.01)
                # LOCK HELD: Main thread holds lock across this call.
                childpid = os.fork()
                finish_join = True
                if childpid != 0:
                    # Parent process just waits for child.
                    os.waitpid(childpid, 0)
                # Child process should just return.

            w = threading.Thread(target=worker)

            # Stub out the private condition variable's lock acquire method.
            # This acquires the lock and then waits until the child has forked
            # before returning, which will release the lock soon after.  If
            # someone else tries to fix this test case by acquiring this lock
            # before forking instead of resetting it, the test case will
            # deadlock when it shouldn't.
            condition = w._block
            orig_acquire = condition.acquire
            call_count_lock = threading.Lock()
            call_count = 0
            def my_acquire():
                global call_count
                global start_fork
                orig_acquire() # LOCK ACQUIRED HERE
                start_fork = True
                if call_count == 0:
                    while not finish_join:
                        time.sleep(0.01) # WORKER THREAD FORKS HERE
                with call_count_lock:
                    call_count += 1
            condition.acquire = my_acquire

            w.start()
            w.join()
            print('end of main')
            """
        self.assertScriptHasOutput(script, "end of main\n")

    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_5_clear_waiter_locks_to_avoid_crash(self):
        # Check that a spawned thread that forks doesn't segfault on certain
        # platforms, namely OS X.  This used to happen if there was a waiter
        # lock in the thread's condition variable's waiters list.  Even though
        # we know the lock will be held across the fork, it is not safe to
        # release locks held across forks on all platforms, so releasing the
        # waiter lock caused a segfault on OS X.  Furthermore, since locks on
        # OS X are (as of this writing) implemented with a mutex + condition
        # variable instead of a semaphore, while we know that the Python-level
        # lock will be acquired, we can't know if the internal mutex will be
        # acquired at the time of the fork.

        script = """if True:
            import os, time, threading

            start_fork = False

            def worker():
                # Wait until the main thread has attempted to join this thread
                # before continuing.
                while not start_fork:
                    time.sleep(0.01)
                childpid = os.fork()
                if childpid != 0:
                    # Parent process just waits for child.
                    (cpid, rc) = os.waitpid(childpid, 0)
                    assert cpid == childpid
                    assert rc == 0
                    print('end of worker thread')
                else:
                    # Child process should just return.
                    pass

            w = threading.Thread(target=worker)

            # Stub out the private condition variable's _release_save method.
            # This releases the condition's lock and flips the global that
            # causes the worker to fork.  At this point, the problematic waiter
            # lock has been acquired once by the waiter and has been put onto
            # the waiters list.
            condition = w._block
            orig_release_save = condition._release_save
            def my_release_save():
                global start_fork
                orig_release_save()
                # Waiter lock held here, condition lock released.
                start_fork = True
            condition._release_save = my_release_save

            w.start()
            w.join()
            print('end of main thread')
            """
        output = "end of worker thread\nend of main thread\n"
        self.assertScriptHasOutput(script, output)

    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_reinit_tls_after_fork(self):
        # Issue #13817: fork() would deadlock in a multithreaded program with
        # the ad-hoc TLS implementation.

        def do_fork_and_wait():
            # just fork a child process and wait it
            pid = os.fork()
            if pid > 0:
                os.waitpid(pid, 0)
            else:
                os._exit(0)

        # start a bunch of threads that will fork() child processes
        threads = []
        for i in range(16):
            t = threading.Thread(target=do_fork_and_wait)
            threads.append(t)
            t.start()

        for t in threads:
            t.join()
730 |
|
---|
731 |
|
---|
class ThreadingExceptionTests(BaseTestCase):
    """Misuse of the Thread API is expected to raise RuntimeError."""

    def test_start_thread_again(self):
        # A RuntimeError should be raised if Thread.start() is called
        # multiple times.
        t = threading.Thread()
        t.start()
        with self.assertRaises(RuntimeError):
            t.start()

    def test_joining_current_thread(self):
        # Joining the calling thread itself is expected to be refused.
        me = threading.current_thread()
        with self.assertRaises(RuntimeError):
            me.join()

    def test_joining_inactive_thread(self):
        # A thread that was never started is expected to refuse join().
        t = threading.Thread()
        with self.assertRaises(RuntimeError):
            t.join()

    def test_daemonize_active_thread(self):
        # Setting the daemon flag after start() is expected to be refused.
        t = threading.Thread()
        t.start()
        with self.assertRaises(RuntimeError):
            t.daemon = True
752 |
|
---|
753 |
|
---|
# Runs the shared lock_tests.LockTests suite against threading.Lock.
class LockTests(lock_tests.LockTests):
    # staticmethod keeps the factory from being bound to the test instance
    # when it is looked up as self.locktype.
    locktype = staticmethod(threading.Lock)
756 |
|
---|
# Runs the shared lock_tests.RLockTests suite against threading.RLock.
class RLockTests(lock_tests.RLockTests):
    # staticmethod keeps the factory from being bound to the test instance
    # when it is looked up as self.locktype.
    locktype = staticmethod(threading.RLock)
759 |
|
---|
# Runs the shared lock_tests.EventTests suite against threading.Event.
class EventTests(lock_tests.EventTests):
    # staticmethod keeps the factory from being bound to the test instance
    # when it is looked up as self.eventtype.
    eventtype = staticmethod(threading.Event)
762 |
|
---|
# Runs the shared lock_tests.RLockTests suite against threading.Condition.
class ConditionAsRLockTests(lock_tests.RLockTests):
    # A Condition uses an RLock by default and exports its API.
    locktype = staticmethod(threading.Condition)
766 |
|
---|
# Runs the shared lock_tests.ConditionTests suite against threading.Condition.
class ConditionTests(lock_tests.ConditionTests):
    # staticmethod keeps the factory from being bound to the test instance
    # when it is looked up as self.condtype.
    condtype = staticmethod(threading.Condition)
769 |
|
---|
# Runs the shared lock_tests.SemaphoreTests suite against threading.Semaphore.
class SemaphoreTests(lock_tests.SemaphoreTests):
    # staticmethod keeps the factory from being bound to the test instance
    # when it is looked up as self.semtype.
    semtype = staticmethod(threading.Semaphore)
772 |
|
---|
class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
    """Shared BoundedSemaphore suite, plus a Mac OS X recursion check."""

    semtype = staticmethod(threading.BoundedSemaphore)

    @unittest.skipUnless(sys.platform == 'darwin', 'test macosx problem')
    def test_recursion_limit(self):
        # Issue 9670
        # test that excessive recursion within a non-main thread causes
        # an exception rather than crashing the interpreter on platforms
        # like Mac OS X or FreeBSD which have small default stack sizes
        # for threads
        script = """if True:
            import threading

            def recurse():
                return recurse()

            def outer():
                try:
                    recurse()
                except RuntimeError:
                    pass

            w = threading.Thread(target=outer)
            w.start()
            w.join()
            print('end of main thread')
            """
        child = subprocess.Popen([sys.executable, "-c", script],
                                 stdout=subprocess.PIPE)
        # Only stdout is piped, so the second item returned is unused.
        captured = child.communicate()[0]
        text = captured.decode().replace('\r', '')
        self.assertEqual(child.returncode, 0, "Unexpected error")
        self.assertEqual(text, "end of main thread\n")
807 |
|
---|
def test_main():
    """Hand every test class of this module to run_unittest(), in order."""
    suites = (
        LockTests, RLockTests, EventTests,
        ConditionAsRLockTests, ConditionTests,
        SemaphoreTests, BoundedSemaphoreTests,
        ThreadTests,
        ThreadJoinOnShutdown,
        ThreadingExceptionTests,
    )
    test.test_support.run_unittest(*suites)
816 |
|
---|
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()