1 | import signal
|
---|
2 | import weakref
|
---|
3 |
|
---|
4 | from functools import wraps
|
---|
5 |
|
---|
6 | __unittest = True
|
---|
7 |
|
---|
8 |
|
---|
9 | class _InterruptHandler(object):
|
---|
10 | def __init__(self, default_handler):
|
---|
11 | self.called = False
|
---|
12 | self.original_handler = default_handler
|
---|
13 | if isinstance(default_handler, int):
|
---|
14 | if default_handler == signal.SIG_DFL:
|
---|
15 | # Pretend it's signal.default_int_handler instead.
|
---|
16 | default_handler = signal.default_int_handler
|
---|
17 | elif default_handler == signal.SIG_IGN:
|
---|
18 | # Not quite the same thing as SIG_IGN, but the closest we
|
---|
19 | # can make it: do nothing.
|
---|
20 | def default_handler(unused_signum, unused_frame):
|
---|
21 | pass
|
---|
22 | else:
|
---|
23 | raise TypeError("expected SIGINT signal handler to be "
|
---|
24 | "signal.SIG_IGN, signal.SIG_DFL, or a "
|
---|
25 | "callable object")
|
---|
26 | self.default_handler = default_handler
|
---|
27 |
|
---|
28 | def __call__(self, signum, frame):
|
---|
29 | installed_handler = signal.getsignal(signal.SIGINT)
|
---|
30 | if installed_handler is not self:
|
---|
31 | # if we aren't the installed handler, then delegate immediately
|
---|
32 | # to the default handler
|
---|
33 | self.default_handler(signum, frame)
|
---|
34 |
|
---|
35 | if self.called:
|
---|
36 | self.default_handler(signum, frame)
|
---|
37 | self.called = True
|
---|
38 | for result in _results.keys():
|
---|
39 | result.stop()
|
---|
40 |
|
---|
41 | _results = weakref.WeakKeyDictionary()
|
---|
42 | def registerResult(result):
|
---|
43 | _results[result] = 1
|
---|
44 |
|
---|
45 | def removeResult(result):
|
---|
46 | return bool(_results.pop(result, None))
|
---|
47 |
|
---|
48 | _interrupt_handler = None
|
---|
49 | def installHandler():
|
---|
50 | global _interrupt_handler
|
---|
51 | if _interrupt_handler is None:
|
---|
52 | default_handler = signal.getsignal(signal.SIGINT)
|
---|
53 | _interrupt_handler = _InterruptHandler(default_handler)
|
---|
54 | signal.signal(signal.SIGINT, _interrupt_handler)
|
---|
55 |
|
---|
56 |
|
---|
57 | def removeHandler(method=None):
|
---|
58 | if method is not None:
|
---|
59 | @wraps(method)
|
---|
60 | def inner(*args, **kwargs):
|
---|
61 | initial = signal.getsignal(signal.SIGINT)
|
---|
62 | removeHandler()
|
---|
63 | try:
|
---|
64 | return method(*args, **kwargs)
|
---|
65 | finally:
|
---|
66 | signal.signal(signal.SIGINT, initial)
|
---|
67 | return inner
|
---|
68 |
|
---|
69 | global _interrupt_handler
|
---|
70 | if _interrupt_handler is not None:
|
---|
71 | signal.signal(signal.SIGINT, _interrupt_handler.original_handler)
|
---|