source: python/vendor/Python-2.7.6/Lib/threading.py

Last change on this file was 388, checked in by dmik, 11 years ago

python: Update vendor to 2.7.6.

  • Property svn:eol-style set to native
File size: 46.1 KB
Line 
1"""Thread module emulating a subset of Java's threading model."""
2
3import sys as _sys
4
5try:
6 import thread
7except ImportError:
8 del _sys.modules[__name__]
9 raise
10
11import warnings
12
13from collections import deque as _deque
14from time import time as _time, sleep as _sleep
15from traceback import format_exc as _format_exc
16
17# Note regarding PEP 8 compliant aliases
18# This threading model was originally inspired by Java, and inherited
19# the convention of camelCase function and method names from that
20# language. While those names are not in any imminent danger of being
21# deprecated, starting with Python 2.6, the module now provides a
22# PEP 8 compliant alias for any such method name.
23# Using the new PEP 8 compliant names also facilitates substitution
24# with the multiprocessing module, which doesn't provide the old
25# Java inspired names.
26
27
28# Rename some stuff so "from threading import *" is safe
29__all__ = ['activeCount', 'active_count', 'Condition', 'currentThread',
30 'current_thread', 'enumerate', 'Event',
31 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
32 'Timer', 'setprofile', 'settrace', 'local', 'stack_size']
33
34_start_new_thread = thread.start_new_thread
35_allocate_lock = thread.allocate_lock
36_get_ident = thread.get_ident
37ThreadError = thread.error
38del thread
39
40
41# sys.exc_clear is used to work around the fact that except blocks
42# don't fully clear the exception until 3.0.
43warnings.filterwarnings('ignore', category=DeprecationWarning,
44 module='threading', message='sys.exc_clear')
45
46# Debug support (adapted from ihooks.py).
47# All the major classes here derive from _Verbose. We force that to
48# be a new-style class so that all the major classes here are new-style.
49# This helps debugging (type(instance) is more revealing for instances
50# of new-style classes).
51
52_VERBOSE = False
53
54if __debug__:
55
56 class _Verbose(object):
57
58 def __init__(self, verbose=None):
59 if verbose is None:
60 verbose = _VERBOSE
61 self.__verbose = verbose
62
63 def _note(self, format, *args):
64 if self.__verbose:
65 format = format % args
66 # Issue #4188: calling current_thread() can incur an infinite
67 # recursion if it has to create a DummyThread on the fly.
68 ident = _get_ident()
69 try:
70 name = _active[ident].name
71 except KeyError:
72 name = "<OS thread %d>" % ident
73 format = "%s: %s\n" % (name, format)
74 _sys.stderr.write(format)
75
76else:
77 # Disable this when using "python -O"
78 class _Verbose(object):
79 def __init__(self, verbose=None):
80 pass
81 def _note(self, *args):
82 pass
83
84# Support for profile and trace hooks
85
86_profile_hook = None
87_trace_hook = None
88
89def setprofile(func):
90 """Set a profile function for all threads started from the threading module.
91
92 The func will be passed to sys.setprofile() for each thread, before its
93 run() method is called.
94
95 """
96 global _profile_hook
97 _profile_hook = func
98
99def settrace(func):
100 """Set a trace function for all threads started from the threading module.
101
102 The func will be passed to sys.settrace() for each thread, before its run()
103 method is called.
104
105 """
106 global _trace_hook
107 _trace_hook = func
108
109# Synchronization classes
110
111Lock = _allocate_lock
112
113def RLock(*args, **kwargs):
114 """Factory function that returns a new reentrant lock.
115
116 A reentrant lock must be released by the thread that acquired it. Once a
117 thread has acquired a reentrant lock, the same thread may acquire it again
118 without blocking; the thread must release it once for each time it has
119 acquired it.
120
121 """
122 return _RLock(*args, **kwargs)
123
124class _RLock(_Verbose):
125 """A reentrant lock must be released by the thread that acquired it. Once a
126 thread has acquired a reentrant lock, the same thread may acquire it
127 again without blocking; the thread must release it once for each time it
128 has acquired it.
129 """
130
131 def __init__(self, verbose=None):
132 _Verbose.__init__(self, verbose)
133 self.__block = _allocate_lock()
134 self.__owner = None
135 self.__count = 0
136
137 def __repr__(self):
138 owner = self.__owner
139 try:
140 owner = _active[owner].name
141 except KeyError:
142 pass
143 return "<%s owner=%r count=%d>" % (
144 self.__class__.__name__, owner, self.__count)
145
146 def acquire(self, blocking=1):
147 """Acquire a lock, blocking or non-blocking.
148
149 When invoked without arguments: if this thread already owns the lock,
150 increment the recursion level by one, and return immediately. Otherwise,
151 if another thread owns the lock, block until the lock is unlocked. Once
152 the lock is unlocked (not owned by any thread), then grab ownership, set
153 the recursion level to one, and return. If more than one thread is
154 blocked waiting until the lock is unlocked, only one at a time will be
155 able to grab ownership of the lock. There is no return value in this
156 case.
157
158 When invoked with the blocking argument set to true, do the same thing
159 as when called without arguments, and return true.
160
161 When invoked with the blocking argument set to false, do not block. If a
162 call without an argument would block, return false immediately;
163 otherwise, do the same thing as when called without arguments, and
164 return true.
165
166 """
167 me = _get_ident()
168 if self.__owner == me:
169 self.__count = self.__count + 1
170 if __debug__:
171 self._note("%s.acquire(%s): recursive success", self, blocking)
172 return 1
173 rc = self.__block.acquire(blocking)
174 if rc:
175 self.__owner = me
176 self.__count = 1
177 if __debug__:
178 self._note("%s.acquire(%s): initial success", self, blocking)
179 else:
180 if __debug__:
181 self._note("%s.acquire(%s): failure", self, blocking)
182 return rc
183
184 __enter__ = acquire
185
186 def release(self):
187 """Release a lock, decrementing the recursion level.
188
189 If after the decrement it is zero, reset the lock to unlocked (not owned
190 by any thread), and if any other threads are blocked waiting for the
191 lock to become unlocked, allow exactly one of them to proceed. If after
192 the decrement the recursion level is still nonzero, the lock remains
193 locked and owned by the calling thread.
194
195 Only call this method when the calling thread owns the lock. A
196 RuntimeError is raised if this method is called when the lock is
197 unlocked.
198
199 There is no return value.
200
201 """
202 if self.__owner != _get_ident():
203 raise RuntimeError("cannot release un-acquired lock")
204 self.__count = count = self.__count - 1
205 if not count:
206 self.__owner = None
207 self.__block.release()
208 if __debug__:
209 self._note("%s.release(): final release", self)
210 else:
211 if __debug__:
212 self._note("%s.release(): non-final release", self)
213
214 def __exit__(self, t, v, tb):
215 self.release()
216
217 # Internal methods used by condition variables
218
219 def _acquire_restore(self, count_owner):
220 count, owner = count_owner
221 self.__block.acquire()
222 self.__count = count
223 self.__owner = owner
224 if __debug__:
225 self._note("%s._acquire_restore()", self)
226
227 def _release_save(self):
228 if __debug__:
229 self._note("%s._release_save()", self)
230 count = self.__count
231 self.__count = 0
232 owner = self.__owner
233 self.__owner = None
234 self.__block.release()
235 return (count, owner)
236
237 def _is_owned(self):
238 return self.__owner == _get_ident()
239
240
241def Condition(*args, **kwargs):
242 """Factory function that returns a new condition variable object.
243
244 A condition variable allows one or more threads to wait until they are
245 notified by another thread.
246
247 If the lock argument is given and not None, it must be a Lock or RLock
248 object, and it is used as the underlying lock. Otherwise, a new RLock object
249 is created and used as the underlying lock.
250
251 """
252 return _Condition(*args, **kwargs)
253
254class _Condition(_Verbose):
255 """Condition variables allow one or more threads to wait until they are
256 notified by another thread.
257 """
258
259 def __init__(self, lock=None, verbose=None):
260 _Verbose.__init__(self, verbose)
261 if lock is None:
262 lock = RLock()
263 self.__lock = lock
264 # Export the lock's acquire() and release() methods
265 self.acquire = lock.acquire
266 self.release = lock.release
267 # If the lock defines _release_save() and/or _acquire_restore(),
268 # these override the default implementations (which just call
269 # release() and acquire() on the lock). Ditto for _is_owned().
270 try:
271 self._release_save = lock._release_save
272 except AttributeError:
273 pass
274 try:
275 self._acquire_restore = lock._acquire_restore
276 except AttributeError:
277 pass
278 try:
279 self._is_owned = lock._is_owned
280 except AttributeError:
281 pass
282 self.__waiters = []
283
284 def __enter__(self):
285 return self.__lock.__enter__()
286
287 def __exit__(self, *args):
288 return self.__lock.__exit__(*args)
289
290 def __repr__(self):
291 return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))
292
293 def _release_save(self):
294 self.__lock.release() # No state to save
295
296 def _acquire_restore(self, x):
297 self.__lock.acquire() # Ignore saved state
298
299 def _is_owned(self):
300 # Return True if lock is owned by current_thread.
301 # This method is called only if __lock doesn't have _is_owned().
302 if self.__lock.acquire(0):
303 self.__lock.release()
304 return False
305 else:
306 return True
307
308 def wait(self, timeout=None):
309 """Wait until notified or until a timeout occurs.
310
311 If the calling thread has not acquired the lock when this method is
312 called, a RuntimeError is raised.
313
314 This method releases the underlying lock, and then blocks until it is
315 awakened by a notify() or notifyAll() call for the same condition
316 variable in another thread, or until the optional timeout occurs. Once
317 awakened or timed out, it re-acquires the lock and returns.
318
319 When the timeout argument is present and not None, it should be a
320 floating point number specifying a timeout for the operation in seconds
321 (or fractions thereof).
322
323 When the underlying lock is an RLock, it is not released using its
324 release() method, since this may not actually unlock the lock when it
325 was acquired multiple times recursively. Instead, an internal interface
326 of the RLock class is used, which really unlocks it even when it has
327 been recursively acquired several times. Another internal interface is
328 then used to restore the recursion level when the lock is reacquired.
329
330 """
331 if not self._is_owned():
332 raise RuntimeError("cannot wait on un-acquired lock")
333 waiter = _allocate_lock()
334 waiter.acquire()
335 self.__waiters.append(waiter)
336 saved_state = self._release_save()
337 try: # restore state no matter what (e.g., KeyboardInterrupt)
338 if timeout is None:
339 waiter.acquire()
340 if __debug__:
341 self._note("%s.wait(): got it", self)
342 else:
343 # Balancing act: We can't afford a pure busy loop, so we
344 # have to sleep; but if we sleep the whole timeout time,
345 # we'll be unresponsive. The scheme here sleeps very
346 # little at first, longer as time goes on, but never longer
347 # than 20 times per second (or the timeout time remaining).
348 endtime = _time() + timeout
349 delay = 0.0005 # 500 us -> initial delay of 1 ms
350 while True:
351 gotit = waiter.acquire(0)
352 if gotit:
353 break
354 remaining = endtime - _time()
355 if remaining <= 0:
356 break
357 delay = min(delay * 2, remaining, .05)
358 _sleep(delay)
359 if not gotit:
360 if __debug__:
361 self._note("%s.wait(%s): timed out", self, timeout)
362 try:
363 self.__waiters.remove(waiter)
364 except ValueError:
365 pass
366 else:
367 if __debug__:
368 self._note("%s.wait(%s): got it", self, timeout)
369 finally:
370 self._acquire_restore(saved_state)
371
372 def notify(self, n=1):
373 """Wake up one or more threads waiting on this condition, if any.
374
375 If the calling thread has not acquired the lock when this method is
376 called, a RuntimeError is raised.
377
378 This method wakes up at most n of the threads waiting for the condition
379 variable; it is a no-op if no threads are waiting.
380
381 """
382 if not self._is_owned():
383 raise RuntimeError("cannot notify on un-acquired lock")
384 __waiters = self.__waiters
385 waiters = __waiters[:n]
386 if not waiters:
387 if __debug__:
388 self._note("%s.notify(): no waiters", self)
389 return
390 self._note("%s.notify(): notifying %d waiter%s", self, n,
391 n!=1 and "s" or "")
392 for waiter in waiters:
393 waiter.release()
394 try:
395 __waiters.remove(waiter)
396 except ValueError:
397 pass
398
399 def notifyAll(self):
400 """Wake up all threads waiting on this condition.
401
402 If the calling thread has not acquired the lock when this method
403 is called, a RuntimeError is raised.
404
405 """
406 self.notify(len(self.__waiters))
407
408 notify_all = notifyAll
409
410
411def Semaphore(*args, **kwargs):
412 """A factory function that returns a new semaphore.
413
414 Semaphores manage a counter representing the number of release() calls minus
415 the number of acquire() calls, plus an initial value. The acquire() method
416 blocks if necessary until it can return without making the counter
417 negative. If not given, value defaults to 1.
418
419 """
420 return _Semaphore(*args, **kwargs)
421
422class _Semaphore(_Verbose):
423 """Semaphores manage a counter representing the number of release() calls
424 minus the number of acquire() calls, plus an initial value. The acquire()
425 method blocks if necessary until it can return without making the counter
426 negative. If not given, value defaults to 1.
427
428 """
429
430 # After Tim Peters' semaphore class, but not quite the same (no maximum)
431
432 def __init__(self, value=1, verbose=None):
433 if value < 0:
434 raise ValueError("semaphore initial value must be >= 0")
435 _Verbose.__init__(self, verbose)
436 self.__cond = Condition(Lock())
437 self.__value = value
438
439 def acquire(self, blocking=1):
440 """Acquire a semaphore, decrementing the internal counter by one.
441
442 When invoked without arguments: if the internal counter is larger than
443 zero on entry, decrement it by one and return immediately. If it is zero
444 on entry, block, waiting until some other thread has called release() to
445 make it larger than zero. This is done with proper interlocking so that
446 if multiple acquire() calls are blocked, release() will wake exactly one
447 of them up. The implementation may pick one at random, so the order in
448 which blocked threads are awakened should not be relied on. There is no
449 return value in this case.
450
451 When invoked with blocking set to true, do the same thing as when called
452 without arguments, and return true.
453
454 When invoked with blocking set to false, do not block. If a call without
455 an argument would block, return false immediately; otherwise, do the
456 same thing as when called without arguments, and return true.
457
458 """
459 rc = False
460 with self.__cond:
461 while self.__value == 0:
462 if not blocking:
463 break
464 if __debug__:
465 self._note("%s.acquire(%s): blocked waiting, value=%s",
466 self, blocking, self.__value)
467 self.__cond.wait()
468 else:
469 self.__value = self.__value - 1
470 if __debug__:
471 self._note("%s.acquire: success, value=%s",
472 self, self.__value)
473 rc = True
474 return rc
475
476 __enter__ = acquire
477
478 def release(self):
479 """Release a semaphore, incrementing the internal counter by one.
480
481 When the counter is zero on entry and another thread is waiting for it
482 to become larger than zero again, wake up that thread.
483
484 """
485 with self.__cond:
486 self.__value = self.__value + 1
487 if __debug__:
488 self._note("%s.release: success, value=%s",
489 self, self.__value)
490 self.__cond.notify()
491
492 def __exit__(self, t, v, tb):
493 self.release()
494
495
496def BoundedSemaphore(*args, **kwargs):
497 """A factory function that returns a new bounded semaphore.
498
499 A bounded semaphore checks to make sure its current value doesn't exceed its
500 initial value. If it does, ValueError is raised. In most situations
501 semaphores are used to guard resources with limited capacity.
502
503 If the semaphore is released too many times it's a sign of a bug. If not
504 given, value defaults to 1.
505
506 Like regular semaphores, bounded semaphores manage a counter representing
507 the number of release() calls minus the number of acquire() calls, plus an
508 initial value. The acquire() method blocks if necessary until it can return
509 without making the counter negative. If not given, value defaults to 1.
510
511 """
512 return _BoundedSemaphore(*args, **kwargs)
513
514class _BoundedSemaphore(_Semaphore):
515 """A bounded semaphore checks to make sure its current value doesn't exceed
516 its initial value. If it does, ValueError is raised. In most situations
517 semaphores are used to guard resources with limited capacity.
518 """
519
520 def __init__(self, value=1, verbose=None):
521 _Semaphore.__init__(self, value, verbose)
522 self._initial_value = value
523
524 def release(self):
525 """Release a semaphore, incrementing the internal counter by one.
526
527 When the counter is zero on entry and another thread is waiting for it
528 to become larger than zero again, wake up that thread.
529
530 If the number of releases exceeds the number of acquires,
531 raise a ValueError.
532
533 """
534 with self._Semaphore__cond:
535 if self._Semaphore__value >= self._initial_value:
536 raise ValueError("Semaphore released too many times")
537 self._Semaphore__value += 1
538 self._Semaphore__cond.notify()
539
540
541def Event(*args, **kwargs):
542 """A factory function that returns a new event.
543
544 Events manage a flag that can be set to true with the set() method and reset
545 to false with the clear() method. The wait() method blocks until the flag is
546 true.
547
548 """
549 return _Event(*args, **kwargs)
550
551class _Event(_Verbose):
552 """A factory function that returns a new event object. An event manages a
553 flag that can be set to true with the set() method and reset to false
554 with the clear() method. The wait() method blocks until the flag is true.
555
556 """
557
558 # After Tim Peters' event class (without is_posted())
559
560 def __init__(self, verbose=None):
561 _Verbose.__init__(self, verbose)
562 self.__cond = Condition(Lock())
563 self.__flag = False
564
565 def _reset_internal_locks(self):
566 # private! called by Thread._reset_internal_locks by _after_fork()
567 self.__cond.__init__()
568
569 def isSet(self):
570 'Return true if and only if the internal flag is true.'
571 return self.__flag
572
573 is_set = isSet
574
575 def set(self):
576 """Set the internal flag to true.
577
578 All threads waiting for the flag to become true are awakened. Threads
579 that call wait() once the flag is true will not block at all.
580
581 """
582 self.__cond.acquire()
583 try:
584 self.__flag = True
585 self.__cond.notify_all()
586 finally:
587 self.__cond.release()
588
589 def clear(self):
590 """Reset the internal flag to false.
591
592 Subsequently, threads calling wait() will block until set() is called to
593 set the internal flag to true again.
594
595 """
596 self.__cond.acquire()
597 try:
598 self.__flag = False
599 finally:
600 self.__cond.release()
601
602 def wait(self, timeout=None):
603 """Block until the internal flag is true.
604
605 If the internal flag is true on entry, return immediately. Otherwise,
606 block until another thread calls set() to set the flag to true, or until
607 the optional timeout occurs.
608
609 When the timeout argument is present and not None, it should be a
610 floating point number specifying a timeout for the operation in seconds
611 (or fractions thereof).
612
613 This method returns the internal flag on exit, so it will always return
614 True except if a timeout is given and the operation times out.
615
616 """
617 self.__cond.acquire()
618 try:
619 if not self.__flag:
620 self.__cond.wait(timeout)
621 return self.__flag
622 finally:
623 self.__cond.release()
624
625# Helper to generate new thread names
626_counter = 0
627def _newname(template="Thread-%d"):
628 global _counter
629 _counter = _counter + 1
630 return template % _counter
631
632# Active thread administration
633_active_limbo_lock = _allocate_lock()
634_active = {} # maps thread id to Thread object
635_limbo = {}
636
637
638# Main class for threads
639
640class Thread(_Verbose):
641 """A class that represents a thread of control.
642
643 This class can be safely subclassed in a limited fashion.
644
645 """
646 __initialized = False
647 # Need to store a reference to sys.exc_info for printing
648 # out exceptions when a thread tries to use a global var. during interp.
649 # shutdown and thus raises an exception about trying to perform some
650 # operation on/with a NoneType
651 __exc_info = _sys.exc_info
652 # Keep sys.exc_clear too to clear the exception just before
653 # allowing .join() to return.
654 __exc_clear = _sys.exc_clear
655
656 def __init__(self, group=None, target=None, name=None,
657 args=(), kwargs=None, verbose=None):
658 """This constructor should always be called with keyword arguments. Arguments are:
659
660 *group* should be None; reserved for future extension when a ThreadGroup
661 class is implemented.
662
663 *target* is the callable object to be invoked by the run()
664 method. Defaults to None, meaning nothing is called.
665
666 *name* is the thread name. By default, a unique name is constructed of
667 the form "Thread-N" where N is a small decimal number.
668
669 *args* is the argument tuple for the target invocation. Defaults to ().
670
671 *kwargs* is a dictionary of keyword arguments for the target
672 invocation. Defaults to {}.
673
674 If a subclass overrides the constructor, it must make sure to invoke
675 the base class constructor (Thread.__init__()) before doing anything
676 else to the thread.
677
678"""
679 assert group is None, "group argument must be None for now"
680 _Verbose.__init__(self, verbose)
681 if kwargs is None:
682 kwargs = {}
683 self.__target = target
684 self.__name = str(name or _newname())
685 self.__args = args
686 self.__kwargs = kwargs
687 self.__daemonic = self._set_daemon()
688 self.__ident = None
689 self.__started = Event()
690 self.__stopped = False
691 self.__block = Condition(Lock())
692 self.__initialized = True
693 # sys.stderr is not stored in the class like
694 # sys.exc_info since it can be changed between instances
695 self.__stderr = _sys.stderr
696
697 def _reset_internal_locks(self):
698 # private! Called by _after_fork() to reset our internal locks as
699 # they may be in an invalid state leading to a deadlock or crash.
700 if hasattr(self, '_Thread__block'): # DummyThread deletes self.__block
701 self.__block.__init__()
702 self.__started._reset_internal_locks()
703
704 @property
705 def _block(self):
706 # used by a unittest
707 return self.__block
708
709 def _set_daemon(self):
710 # Overridden in _MainThread and _DummyThread
711 return current_thread().daemon
712
713 def __repr__(self):
714 assert self.__initialized, "Thread.__init__() was not called"
715 status = "initial"
716 if self.__started.is_set():
717 status = "started"
718 if self.__stopped:
719 status = "stopped"
720 if self.__daemonic:
721 status += " daemon"
722 if self.__ident is not None:
723 status += " %s" % self.__ident
724 return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)
725
726 def start(self):
727 """Start the thread's activity.
728
729 It must be called at most once per thread object. It arranges for the
730 object's run() method to be invoked in a separate thread of control.
731
732 This method will raise a RuntimeError if called more than once on the
733 same thread object.
734
735 """
736 if not self.__initialized:
737 raise RuntimeError("thread.__init__() not called")
738 if self.__started.is_set():
739 raise RuntimeError("threads can only be started once")
740 if __debug__:
741 self._note("%s.start(): starting thread", self)
742 with _active_limbo_lock:
743 _limbo[self] = self
744 try:
745 _start_new_thread(self.__bootstrap, ())
746 except Exception:
747 with _active_limbo_lock:
748 del _limbo[self]
749 raise
750 self.__started.wait()
751
752 def run(self):
753 """Method representing the thread's activity.
754
755 You may override this method in a subclass. The standard run() method
756 invokes the callable object passed to the object's constructor as the
757 target argument, if any, with sequential and keyword arguments taken
758 from the args and kwargs arguments, respectively.
759
760 """
761 try:
762 if self.__target:
763 self.__target(*self.__args, **self.__kwargs)
764 finally:
765 # Avoid a refcycle if the thread is running a function with
766 # an argument that has a member that points to the thread.
767 del self.__target, self.__args, self.__kwargs
768
769 def __bootstrap(self):
770 # Wrapper around the real bootstrap code that ignores
771 # exceptions during interpreter cleanup. Those typically
772 # happen when a daemon thread wakes up at an unfortunate
773 # moment, finds the world around it destroyed, and raises some
774 # random exception *** while trying to report the exception in
775 # __bootstrap_inner() below ***. Those random exceptions
776 # don't help anybody, and they confuse users, so we suppress
777 # them. We suppress them only when it appears that the world
778 # indeed has already been destroyed, so that exceptions in
779 # __bootstrap_inner() during normal business hours are properly
780 # reported. Also, we only suppress them for daemonic threads;
781 # if a non-daemonic encounters this, something else is wrong.
782 try:
783 self.__bootstrap_inner()
784 except:
785 if self.__daemonic and _sys is None:
786 return
787 raise
788
789 def _set_ident(self):
790 self.__ident = _get_ident()
791
792 def __bootstrap_inner(self):
793 try:
794 self._set_ident()
795 self.__started.set()
796 with _active_limbo_lock:
797 _active[self.__ident] = self
798 del _limbo[self]
799 if __debug__:
800 self._note("%s.__bootstrap(): thread started", self)
801
802 if _trace_hook:
803 self._note("%s.__bootstrap(): registering trace hook", self)
804 _sys.settrace(_trace_hook)
805 if _profile_hook:
806 self._note("%s.__bootstrap(): registering profile hook", self)
807 _sys.setprofile(_profile_hook)
808
809 try:
810 self.run()
811 except SystemExit:
812 if __debug__:
813 self._note("%s.__bootstrap(): raised SystemExit", self)
814 except:
815 if __debug__:
816 self._note("%s.__bootstrap(): unhandled exception", self)
817 # If sys.stderr is no more (most likely from interpreter
818 # shutdown) use self.__stderr. Otherwise still use sys (as in
819 # _sys) in case sys.stderr was redefined since the creation of
820 # self.
821 if _sys:
822 _sys.stderr.write("Exception in thread %s:\n%s\n" %
823 (self.name, _format_exc()))
824 else:
825 # Do the best job possible w/o a huge amt. of code to
826 # approximate a traceback (code ideas from
827 # Lib/traceback.py)
828 exc_type, exc_value, exc_tb = self.__exc_info()
829 try:
830 print>>self.__stderr, (
831 "Exception in thread " + self.name +
832 " (most likely raised during interpreter shutdown):")
833 print>>self.__stderr, (
834 "Traceback (most recent call last):")
835 while exc_tb:
836 print>>self.__stderr, (
837 ' File "%s", line %s, in %s' %
838 (exc_tb.tb_frame.f_code.co_filename,
839 exc_tb.tb_lineno,
840 exc_tb.tb_frame.f_code.co_name))
841 exc_tb = exc_tb.tb_next
842 print>>self.__stderr, ("%s: %s" % (exc_type, exc_value))
843 # Make sure that exc_tb gets deleted since it is a memory
844 # hog; deleting everything else is just for thoroughness
845 finally:
846 del exc_type, exc_value, exc_tb
847 else:
848 if __debug__:
849 self._note("%s.__bootstrap(): normal return", self)
850 finally:
851 # Prevent a race in
852 # test_threading.test_no_refcycle_through_target when
853 # the exception keeps the target alive past when we
854 # assert that it's dead.
855 self.__exc_clear()
856 finally:
857 with _active_limbo_lock:
858 self.__stop()
859 try:
860 # We don't call self.__delete() because it also
861 # grabs _active_limbo_lock.
862 del _active[_get_ident()]
863 except:
864 pass
865
866 def __stop(self):
867 # DummyThreads delete self.__block, but they have no waiters to
868 # notify anyway (join() is forbidden on them).
869 if not hasattr(self, '_Thread__block'):
870 return
871 self.__block.acquire()
872 self.__stopped = True
873 self.__block.notify_all()
874 self.__block.release()
875
876 def __delete(self):
877 "Remove current thread from the dict of currently running threads."
878
879 # Notes about running with dummy_thread:
880 #
881 # Must take care to not raise an exception if dummy_thread is being
882 # used (and thus this module is being used as an instance of
883 # dummy_threading). dummy_thread.get_ident() always returns -1 since
884 # there is only one thread if dummy_thread is being used. Thus
885 # len(_active) is always <= 1 here, and any Thread instance created
886 # overwrites the (if any) thread currently registered in _active.
887 #
888 # An instance of _MainThread is always created by 'threading'. This
889 # gets overwritten the instant an instance of Thread is created; both
890 # threads return -1 from dummy_thread.get_ident() and thus have the
891 # same key in the dict. So when the _MainThread instance created by
892 # 'threading' tries to clean itself up when atexit calls this method
893 # it gets a KeyError if another Thread instance was created.
894 #
895 # This all means that KeyError from trying to delete something from
896 # _active if dummy_threading is being used is a red herring. But
897 # since it isn't if dummy_threading is *not* being used then don't
898 # hide the exception.
899
900 try:
901 with _active_limbo_lock:
902 del _active[_get_ident()]
903 # There must not be any python code between the previous line
904 # and after the lock is released. Otherwise a tracing function
905 # could try to acquire the lock again in the same thread, (in
906 # current_thread()), and would block.
907 except KeyError:
908 if 'dummy_threading' not in _sys.modules:
909 raise
910
911 def join(self, timeout=None):
912 """Wait until the thread terminates.
913
914 This blocks the calling thread until the thread whose join() method is
915 called terminates -- either normally or through an unhandled exception
916 or until the optional timeout occurs.
917
918 When the timeout argument is present and not None, it should be a
919 floating point number specifying a timeout for the operation in seconds
920 (or fractions thereof). As join() always returns None, you must call
921 isAlive() after join() to decide whether a timeout happened -- if the
922 thread is still alive, the join() call timed out.
923
924 When the timeout argument is not present or None, the operation will
925 block until the thread terminates.
926
927 A thread can be join()ed many times.
928
929 join() raises a RuntimeError if an attempt is made to join the current
930 thread as that would cause a deadlock. It is also an error to join() a
931 thread before it has been started and attempts to do so raises the same
932 exception.
933
934 """
935 if not self.__initialized:
936 raise RuntimeError("Thread.__init__() not called")
937 if not self.__started.is_set():
938 raise RuntimeError("cannot join thread before it is started")
939 if self is current_thread():
940 raise RuntimeError("cannot join current thread")
941
942 if __debug__:
943 if not self.__stopped:
944 self._note("%s.join(): waiting until thread stops", self)
945 self.__block.acquire()
946 try:
947 if timeout is None:
948 while not self.__stopped:
949 self.__block.wait()
950 if __debug__:
951 self._note("%s.join(): thread stopped", self)
952 else:
953 deadline = _time() + timeout
954 while not self.__stopped:
955 delay = deadline - _time()
956 if delay <= 0:
957 if __debug__:
958 self._note("%s.join(): timed out", self)
959 break
960 self.__block.wait(delay)
961 else:
962 if __debug__:
963 self._note("%s.join(): thread stopped", self)
964 finally:
965 self.__block.release()
966
967 @property
968 def name(self):
969 """A string used for identification purposes only.
970
971 It has no semantics. Multiple threads may be given the same name. The
972 initial name is set by the constructor.
973
974 """
975 assert self.__initialized, "Thread.__init__() not called"
976 return self.__name
977
978 @name.setter
979 def name(self, name):
980 assert self.__initialized, "Thread.__init__() not called"
981 self.__name = str(name)
982
983 @property
984 def ident(self):
985 """Thread identifier of this thread or None if it has not been started.
986
987 This is a nonzero integer. See the thread.get_ident() function. Thread
988 identifiers may be recycled when a thread exits and another thread is
989 created. The identifier is available even after the thread has exited.
990
991 """
992 assert self.__initialized, "Thread.__init__() not called"
993 return self.__ident
994
995 def isAlive(self):
996 """Return whether the thread is alive.
997
998 This method returns True just before the run() method starts until just
999 after the run() method terminates. The module function enumerate()
1000 returns a list of all alive threads.
1001
1002 """
1003 assert self.__initialized, "Thread.__init__() not called"
1004 return self.__started.is_set() and not self.__stopped
1005
1006 is_alive = isAlive
1007
1008 @property
1009 def daemon(self):
1010 """A boolean value indicating whether this thread is a daemon thread (True) or not (False).
1011
1012 This must be set before start() is called, otherwise RuntimeError is
1013 raised. Its initial value is inherited from the creating thread; the
1014 main thread is not a daemon thread and therefore all threads created in
1015 the main thread default to daemon = False.
1016
1017 The entire Python program exits when no alive non-daemon threads are
1018 left.
1019
1020 """
1021 assert self.__initialized, "Thread.__init__() not called"
1022 return self.__daemonic
1023
1024 @daemon.setter
1025 def daemon(self, daemonic):
1026 if not self.__initialized:
1027 raise RuntimeError("Thread.__init__() not called")
1028 if self.__started.is_set():
1029 raise RuntimeError("cannot set daemon status of active thread");
1030 self.__daemonic = daemonic
1031
1032 def isDaemon(self):
1033 return self.daemon
1034
1035 def setDaemon(self, daemonic):
1036 self.daemon = daemonic
1037
1038 def getName(self):
1039 return self.name
1040
1041 def setName(self, name):
1042 self.name = name
1043
1044# The timer class was contributed by Itamar Shtull-Trauring
1045
1046def Timer(*args, **kwargs):
1047 """Factory function to create a Timer object.
1048
1049 Timers call a function after a specified number of seconds:
1050
1051 t = Timer(30.0, f, args=[], kwargs={})
1052 t.start()
1053 t.cancel() # stop the timer's action if it's still waiting
1054
1055 """
1056 return _Timer(*args, **kwargs)
1057
1058class _Timer(Thread):
1059 """Call a function after a specified number of seconds:
1060
1061 t = Timer(30.0, f, args=[], kwargs={})
1062 t.start()
1063 t.cancel() # stop the timer's action if it's still waiting
1064
1065 """
1066
1067 def __init__(self, interval, function, args=[], kwargs={}):
1068 Thread.__init__(self)
1069 self.interval = interval
1070 self.function = function
1071 self.args = args
1072 self.kwargs = kwargs
1073 self.finished = Event()
1074
1075 def cancel(self):
1076 """Stop the timer if it hasn't finished yet"""
1077 self.finished.set()
1078
1079 def run(self):
1080 self.finished.wait(self.interval)
1081 if not self.finished.is_set():
1082 self.function(*self.args, **self.kwargs)
1083 self.finished.set()
1084
1085# Special thread class to represent the main thread
1086# This is garbage collected through an exit handler
1087
1088class _MainThread(Thread):
1089
1090 def __init__(self):
1091 Thread.__init__(self, name="MainThread")
1092 self._Thread__started.set()
1093 self._set_ident()
1094 with _active_limbo_lock:
1095 _active[_get_ident()] = self
1096
1097 def _set_daemon(self):
1098 return False
1099
1100 def _exitfunc(self):
1101 self._Thread__stop()
1102 t = _pickSomeNonDaemonThread()
1103 if t:
1104 if __debug__:
1105 self._note("%s: waiting for other threads", self)
1106 while t:
1107 t.join()
1108 t = _pickSomeNonDaemonThread()
1109 if __debug__:
1110 self._note("%s: exiting", self)
1111 self._Thread__delete()
1112
1113def _pickSomeNonDaemonThread():
1114 for t in enumerate():
1115 if not t.daemon and t.is_alive():
1116 return t
1117 return None
1118
1119
1120# Dummy thread class to represent threads not started here.
1121# These aren't garbage collected when they die, nor can they be waited for.
1122# If they invoke anything in threading.py that calls current_thread(), they
1123# leave an entry in the _active dict forever after.
1124# Their purpose is to return *something* from current_thread().
1125# They are marked as daemon threads so we won't wait for them
1126# when we exit (conform previous semantics).
1127
1128class _DummyThread(Thread):
1129
1130 def __init__(self):
1131 Thread.__init__(self, name=_newname("Dummy-%d"))
1132
1133 # Thread.__block consumes an OS-level locking primitive, which
1134 # can never be used by a _DummyThread. Since a _DummyThread
1135 # instance is immortal, that's bad, so release this resource.
1136 del self._Thread__block
1137
1138 self._Thread__started.set()
1139 self._set_ident()
1140 with _active_limbo_lock:
1141 _active[_get_ident()] = self
1142
1143 def _set_daemon(self):
1144 return True
1145
1146 def join(self, timeout=None):
1147 assert False, "cannot join a dummy thread"
1148
1149
1150# Global API functions
1151
1152def currentThread():
1153 """Return the current Thread object, corresponding to the caller's thread of control.
1154
1155 If the caller's thread of control was not created through the threading
1156 module, a dummy thread object with limited functionality is returned.
1157
1158 """
1159 try:
1160 return _active[_get_ident()]
1161 except KeyError:
1162 ##print "current_thread(): no current thread for", _get_ident()
1163 return _DummyThread()
1164
1165current_thread = currentThread
1166
1167def activeCount():
1168 """Return the number of Thread objects currently alive.
1169
1170 The returned count is equal to the length of the list returned by
1171 enumerate().
1172
1173 """
1174 with _active_limbo_lock:
1175 return len(_active) + len(_limbo)
1176
1177active_count = activeCount
1178
1179def _enumerate():
1180 # Same as enumerate(), but without the lock. Internal use only.
1181 return _active.values() + _limbo.values()
1182
1183def enumerate():
1184 """Return a list of all Thread objects currently alive.
1185
1186 The list includes daemonic threads, dummy thread objects created by
1187 current_thread(), and the main thread. It excludes terminated threads and
1188 threads that have not yet been started.
1189
1190 """
1191 with _active_limbo_lock:
1192 return _active.values() + _limbo.values()
1193
1194from thread import stack_size
1195
1196# Create the main thread object,
1197# and make it available for the interpreter
1198# (Py_Main) as threading._shutdown.
1199
1200_shutdown = _MainThread()._exitfunc
1201
1202# get thread-local implementation, either from the thread
1203# module, or from the python fallback
1204
1205try:
1206 from thread import _local as local
1207except ImportError:
1208 from _threading_local import local
1209
1210
1211def _after_fork():
1212 # This function is called by Python/ceval.c:PyEval_ReInitThreads which
1213 # is called from PyOS_AfterFork. Here we cleanup threading module state
1214 # that should not exist after a fork.
1215
1216 # Reset _active_limbo_lock, in case we forked while the lock was held
1217 # by another (non-forked) thread. http://bugs.python.org/issue874900
1218 global _active_limbo_lock
1219 _active_limbo_lock = _allocate_lock()
1220
1221 # fork() only copied the current thread; clear references to others.
1222 new_active = {}
1223 current = current_thread()
1224 with _active_limbo_lock:
1225 for thread in _enumerate():
1226 # Any lock/condition variable may be currently locked or in an
1227 # invalid state, so we reinitialize them.
1228 if hasattr(thread, '_reset_internal_locks'):
1229 thread._reset_internal_locks()
1230 if thread is current:
1231 # There is only one active thread. We reset the ident to
1232 # its new value since it can have changed.
1233 ident = _get_ident()
1234 thread._Thread__ident = ident
1235 new_active[ident] = thread
1236 else:
1237 # All the others are already stopped.
1238 thread._Thread__stop()
1239
1240 _limbo.clear()
1241 _active.clear()
1242 _active.update(new_active)
1243 assert len(_active) == 1
1244
1245
1246# Self-test code
1247
1248def _test():
1249
1250 class BoundedQueue(_Verbose):
1251
1252 def __init__(self, limit):
1253 _Verbose.__init__(self)
1254 self.mon = RLock()
1255 self.rc = Condition(self.mon)
1256 self.wc = Condition(self.mon)
1257 self.limit = limit
1258 self.queue = _deque()
1259
1260 def put(self, item):
1261 self.mon.acquire()
1262 while len(self.queue) >= self.limit:
1263 self._note("put(%s): queue full", item)
1264 self.wc.wait()
1265 self.queue.append(item)
1266 self._note("put(%s): appended, length now %d",
1267 item, len(self.queue))
1268 self.rc.notify()
1269 self.mon.release()
1270
1271 def get(self):
1272 self.mon.acquire()
1273 while not self.queue:
1274 self._note("get(): queue empty")
1275 self.rc.wait()
1276 item = self.queue.popleft()
1277 self._note("get(): got %s, %d left", item, len(self.queue))
1278 self.wc.notify()
1279 self.mon.release()
1280 return item
1281
1282 class ProducerThread(Thread):
1283
1284 def __init__(self, queue, quota):
1285 Thread.__init__(self, name="Producer")
1286 self.queue = queue
1287 self.quota = quota
1288
1289 def run(self):
1290 from random import random
1291 counter = 0
1292 while counter < self.quota:
1293 counter = counter + 1
1294 self.queue.put("%s.%d" % (self.name, counter))
1295 _sleep(random() * 0.00001)
1296
1297
1298 class ConsumerThread(Thread):
1299
1300 def __init__(self, queue, count):
1301 Thread.__init__(self, name="Consumer")
1302 self.queue = queue
1303 self.count = count
1304
1305 def run(self):
1306 while self.count > 0:
1307 item = self.queue.get()
1308 print item
1309 self.count = self.count - 1
1310
1311 NP = 3
1312 QL = 4
1313 NI = 5
1314
1315 Q = BoundedQueue(QL)
1316 P = []
1317 for i in range(NP):
1318 t = ProducerThread(Q, NI)
1319 t.name = ("Producer-%d" % (i+1))
1320 P.append(t)
1321 C = ConsumerThread(Q, NI*NP)
1322 for t in P:
1323 t.start()
1324 _sleep(0.000001)
1325 C.start()
1326 for t in P:
1327 t.join()
1328 C.join()
1329
1330if __name__ == '__main__':
1331 _test()
Note: See TracBrowser for help on using the repository browser.