1 | import gc
|
---|
2 | import os
|
---|
3 | import sys
|
---|
4 | import signal
|
---|
5 | import weakref
|
---|
6 |
|
---|
7 | from cStringIO import StringIO
|
---|
8 |
|
---|
9 |
|
---|
10 | import unittest
|
---|
11 |
|
---|
12 |
|
---|
13 | @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
|
---|
14 | @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
|
---|
15 | @unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 "
|
---|
16 | "if threads have been used")
|
---|
17 | class TestBreak(unittest.TestCase):
|
---|
18 | int_handler = None
|
---|
19 |
|
---|
20 | def setUp(self):
|
---|
21 | self._default_handler = signal.getsignal(signal.SIGINT)
|
---|
22 | if self.int_handler is not None:
|
---|
23 | signal.signal(signal.SIGINT, self.int_handler)
|
---|
24 |
|
---|
25 | def tearDown(self):
|
---|
26 | signal.signal(signal.SIGINT, self._default_handler)
|
---|
27 | unittest.signals._results = weakref.WeakKeyDictionary()
|
---|
28 | unittest.signals._interrupt_handler = None
|
---|
29 |
|
---|
30 |
|
---|
31 | def testInstallHandler(self):
|
---|
32 | default_handler = signal.getsignal(signal.SIGINT)
|
---|
33 | unittest.installHandler()
|
---|
34 | self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
|
---|
35 |
|
---|
36 | try:
|
---|
37 | pid = os.getpid()
|
---|
38 | os.kill(pid, signal.SIGINT)
|
---|
39 | except KeyboardInterrupt:
|
---|
40 | self.fail("KeyboardInterrupt not handled")
|
---|
41 |
|
---|
42 | self.assertTrue(unittest.signals._interrupt_handler.called)
|
---|
43 |
|
---|
44 | def testRegisterResult(self):
|
---|
45 | result = unittest.TestResult()
|
---|
46 | unittest.registerResult(result)
|
---|
47 |
|
---|
48 | for ref in unittest.signals._results:
|
---|
49 | if ref is result:
|
---|
50 | break
|
---|
51 | elif ref is not result:
|
---|
52 | self.fail("odd object in result set")
|
---|
53 | else:
|
---|
54 | self.fail("result not found")
|
---|
55 |
|
---|
56 |
|
---|
57 | def testInterruptCaught(self):
|
---|
58 | default_handler = signal.getsignal(signal.SIGINT)
|
---|
59 |
|
---|
60 | result = unittest.TestResult()
|
---|
61 | unittest.installHandler()
|
---|
62 | unittest.registerResult(result)
|
---|
63 |
|
---|
64 | self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
|
---|
65 |
|
---|
66 | def test(result):
|
---|
67 | pid = os.getpid()
|
---|
68 | os.kill(pid, signal.SIGINT)
|
---|
69 | result.breakCaught = True
|
---|
70 | self.assertTrue(result.shouldStop)
|
---|
71 |
|
---|
72 | try:
|
---|
73 | test(result)
|
---|
74 | except KeyboardInterrupt:
|
---|
75 | self.fail("KeyboardInterrupt not handled")
|
---|
76 | self.assertTrue(result.breakCaught)
|
---|
77 |
|
---|
78 |
|
---|
79 | def testSecondInterrupt(self):
|
---|
80 | # Can't use skipIf decorator because the signal handler may have
|
---|
81 | # been changed after defining this method.
|
---|
82 | if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
|
---|
83 | self.skipTest("test requires SIGINT to not be ignored")
|
---|
84 | result = unittest.TestResult()
|
---|
85 | unittest.installHandler()
|
---|
86 | unittest.registerResult(result)
|
---|
87 |
|
---|
88 | def test(result):
|
---|
89 | pid = os.getpid()
|
---|
90 | os.kill(pid, signal.SIGINT)
|
---|
91 | result.breakCaught = True
|
---|
92 | self.assertTrue(result.shouldStop)
|
---|
93 | os.kill(pid, signal.SIGINT)
|
---|
94 | self.fail("Second KeyboardInterrupt not raised")
|
---|
95 |
|
---|
96 | try:
|
---|
97 | test(result)
|
---|
98 | except KeyboardInterrupt:
|
---|
99 | pass
|
---|
100 | else:
|
---|
101 | self.fail("Second KeyboardInterrupt not raised")
|
---|
102 | self.assertTrue(result.breakCaught)
|
---|
103 |
|
---|
104 |
|
---|
105 | def testTwoResults(self):
|
---|
106 | unittest.installHandler()
|
---|
107 |
|
---|
108 | result = unittest.TestResult()
|
---|
109 | unittest.registerResult(result)
|
---|
110 | new_handler = signal.getsignal(signal.SIGINT)
|
---|
111 |
|
---|
112 | result2 = unittest.TestResult()
|
---|
113 | unittest.registerResult(result2)
|
---|
114 | self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)
|
---|
115 |
|
---|
116 | result3 = unittest.TestResult()
|
---|
117 |
|
---|
118 | def test(result):
|
---|
119 | pid = os.getpid()
|
---|
120 | os.kill(pid, signal.SIGINT)
|
---|
121 |
|
---|
122 | try:
|
---|
123 | test(result)
|
---|
124 | except KeyboardInterrupt:
|
---|
125 | self.fail("KeyboardInterrupt not handled")
|
---|
126 |
|
---|
127 | self.assertTrue(result.shouldStop)
|
---|
128 | self.assertTrue(result2.shouldStop)
|
---|
129 | self.assertFalse(result3.shouldStop)
|
---|
130 |
|
---|
131 |
|
---|
132 | def testHandlerReplacedButCalled(self):
|
---|
133 | # Can't use skipIf decorator because the signal handler may have
|
---|
134 | # been changed after defining this method.
|
---|
135 | if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
|
---|
136 | self.skipTest("test requires SIGINT to not be ignored")
|
---|
137 | # If our handler has been replaced (is no longer installed) but is
|
---|
138 | # called by the *new* handler, then it isn't safe to delay the
|
---|
139 | # SIGINT and we should immediately delegate to the default handler
|
---|
140 | unittest.installHandler()
|
---|
141 |
|
---|
142 | handler = signal.getsignal(signal.SIGINT)
|
---|
143 | def new_handler(frame, signum):
|
---|
144 | handler(frame, signum)
|
---|
145 | signal.signal(signal.SIGINT, new_handler)
|
---|
146 |
|
---|
147 | try:
|
---|
148 | pid = os.getpid()
|
---|
149 | os.kill(pid, signal.SIGINT)
|
---|
150 | except KeyboardInterrupt:
|
---|
151 | pass
|
---|
152 | else:
|
---|
153 | self.fail("replaced but delegated handler doesn't raise interrupt")
|
---|
154 |
|
---|
155 | def testRunner(self):
|
---|
156 | # Creating a TextTestRunner with the appropriate argument should
|
---|
157 | # register the TextTestResult it creates
|
---|
158 | runner = unittest.TextTestRunner(stream=StringIO())
|
---|
159 |
|
---|
160 | result = runner.run(unittest.TestSuite())
|
---|
161 | self.assertIn(result, unittest.signals._results)
|
---|
162 |
|
---|
163 | def testWeakReferences(self):
|
---|
164 | # Calling registerResult on a result should not keep it alive
|
---|
165 | result = unittest.TestResult()
|
---|
166 | unittest.registerResult(result)
|
---|
167 |
|
---|
168 | ref = weakref.ref(result)
|
---|
169 | del result
|
---|
170 |
|
---|
171 | # For non-reference counting implementations
|
---|
172 | gc.collect();gc.collect()
|
---|
173 | self.assertIsNone(ref())
|
---|
174 |
|
---|
175 |
|
---|
176 | def testRemoveResult(self):
|
---|
177 | result = unittest.TestResult()
|
---|
178 | unittest.registerResult(result)
|
---|
179 |
|
---|
180 | unittest.installHandler()
|
---|
181 | self.assertTrue(unittest.removeResult(result))
|
---|
182 |
|
---|
183 | # Should this raise an error instead?
|
---|
184 | self.assertFalse(unittest.removeResult(unittest.TestResult()))
|
---|
185 |
|
---|
186 | try:
|
---|
187 | pid = os.getpid()
|
---|
188 | os.kill(pid, signal.SIGINT)
|
---|
189 | except KeyboardInterrupt:
|
---|
190 | pass
|
---|
191 |
|
---|
192 | self.assertFalse(result.shouldStop)
|
---|
193 |
|
---|
194 | def testMainInstallsHandler(self):
|
---|
195 | failfast = object()
|
---|
196 | test = object()
|
---|
197 | verbosity = object()
|
---|
198 | result = object()
|
---|
199 | default_handler = signal.getsignal(signal.SIGINT)
|
---|
200 |
|
---|
201 | class FakeRunner(object):
|
---|
202 | initArgs = []
|
---|
203 | runArgs = []
|
---|
204 | def __init__(self, *args, **kwargs):
|
---|
205 | self.initArgs.append((args, kwargs))
|
---|
206 | def run(self, test):
|
---|
207 | self.runArgs.append(test)
|
---|
208 | return result
|
---|
209 |
|
---|
210 | class Program(unittest.TestProgram):
|
---|
211 | def __init__(self, catchbreak):
|
---|
212 | self.exit = False
|
---|
213 | self.verbosity = verbosity
|
---|
214 | self.failfast = failfast
|
---|
215 | self.catchbreak = catchbreak
|
---|
216 | self.testRunner = FakeRunner
|
---|
217 | self.test = test
|
---|
218 | self.result = None
|
---|
219 |
|
---|
220 | p = Program(False)
|
---|
221 | p.runTests()
|
---|
222 |
|
---|
223 | self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
|
---|
224 | 'verbosity': verbosity,
|
---|
225 | 'failfast': failfast})])
|
---|
226 | self.assertEqual(FakeRunner.runArgs, [test])
|
---|
227 | self.assertEqual(p.result, result)
|
---|
228 |
|
---|
229 | self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
|
---|
230 |
|
---|
231 | FakeRunner.initArgs = []
|
---|
232 | FakeRunner.runArgs = []
|
---|
233 | p = Program(True)
|
---|
234 | p.runTests()
|
---|
235 |
|
---|
236 | self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
|
---|
237 | 'verbosity': verbosity,
|
---|
238 | 'failfast': failfast})])
|
---|
239 | self.assertEqual(FakeRunner.runArgs, [test])
|
---|
240 | self.assertEqual(p.result, result)
|
---|
241 |
|
---|
242 | self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
|
---|
243 |
|
---|
244 | def testRemoveHandler(self):
|
---|
245 | default_handler = signal.getsignal(signal.SIGINT)
|
---|
246 | unittest.installHandler()
|
---|
247 | unittest.removeHandler()
|
---|
248 | self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
|
---|
249 |
|
---|
250 | # check that calling removeHandler multiple times has no ill-effect
|
---|
251 | unittest.removeHandler()
|
---|
252 | self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
|
---|
253 |
|
---|
254 | def testRemoveHandlerAsDecorator(self):
|
---|
255 | default_handler = signal.getsignal(signal.SIGINT)
|
---|
256 | unittest.installHandler()
|
---|
257 |
|
---|
258 | @unittest.removeHandler
|
---|
259 | def test():
|
---|
260 | self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
|
---|
261 |
|
---|
262 | test()
|
---|
263 | self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
|
---|
264 |
|
---|
265 | @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
|
---|
266 | @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
|
---|
267 | @unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 "
|
---|
268 | "if threads have been used")
|
---|
269 | class TestBreakDefaultIntHandler(TestBreak):
|
---|
270 | int_handler = signal.default_int_handler
|
---|
271 |
|
---|
272 | @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
|
---|
273 | @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
|
---|
274 | @unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 "
|
---|
275 | "if threads have been used")
|
---|
276 | class TestBreakSignalIgnored(TestBreak):
|
---|
277 | int_handler = signal.SIG_IGN
|
---|
278 |
|
---|
279 | @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
|
---|
280 | @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
|
---|
281 | @unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 "
|
---|
282 | "if threads have been used")
|
---|
283 | class TestBreakSignalDefault(TestBreak):
|
---|
284 | int_handler = signal.SIG_DFL
|
---|