source: python/trunk/Lib/test/test_signal.py

Last change on this file was 391, checked in by dmik, 11 years ago

python: Merge vendor 2.7.6 to trunk.

  • Property svn:eol-style set to native
File size: 18.5 KB
Line 
1import unittest
2from test import test_support
3from contextlib import closing
4import gc
5import pickle
6import select
7import signal
8import subprocess
9import traceback
10import sys, os, time, errno
11
# Signals cannot be tested on these platforms, so the whole module is
# skipped here, before any of the test classes below are defined.
if sys.platform == 'os2' or sys.platform == 'riscos':
    raise unittest.SkipTest("Can't test signal on %s" % sys.platform)
14
15
class HandlerBCalled(Exception):
    """Raised by InterProcessSignalTests.handlerB when its signal arrives."""
18
19
def exit_subprocess():
    """Terminate the current subprocess immediately via os._exit(0).

    A regular exit raises SystemExit, which the test would catch; the
    subprocess would then keep executing in parallel with the original
    test, and an exponential number of tests would end up running
    concurrently.
    """
    exit_status = 0
    os._exit(exit_status)
28
29
def ignoring_eintr(__func, *args, **kwargs):
    """Call ``__func(*args, **kwargs)`` and return its result.

    If the call fails with an EnvironmentError whose errno is EINTR, the
    error is swallowed and None is returned (the call is not retried).
    Any other EnvironmentError propagates unchanged.
    """
    try:
        result = __func(*args, **kwargs)
    except EnvironmentError as exc:
        if exc.errno == errno.EINTR:
            return None
        raise
    return result
37
38
39@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
40class InterProcessSignalTests(unittest.TestCase):
41 MAX_DURATION = 20 # Entire test should last at most 20 sec.
42
43 def setUp(self):
44 self.using_gc = gc.isenabled()
45 gc.disable()
46
47 def tearDown(self):
48 if self.using_gc:
49 gc.enable()
50
51 def format_frame(self, frame, limit=None):
52 return ''.join(traceback.format_stack(frame, limit=limit))
53
54 def handlerA(self, signum, frame):
55 self.a_called = True
56 if test_support.verbose:
57 print "handlerA invoked from signal %s at:\n%s" % (
58 signum, self.format_frame(frame, limit=1))
59
60 def handlerB(self, signum, frame):
61 self.b_called = True
62 if test_support.verbose:
63 print "handlerB invoked from signal %s at:\n%s" % (
64 signum, self.format_frame(frame, limit=1))
65 raise HandlerBCalled(signum, self.format_frame(frame))
66
67 def wait(self, child):
68 """Wait for child to finish, ignoring EINTR."""
69 while True:
70 try:
71 child.wait()
72 return
73 except OSError as e:
74 if e.errno != errno.EINTR:
75 raise
76
77 def run_test(self):
78 # Install handlers. This function runs in a sub-process, so we
79 # don't worry about re-setting the default handlers.
80 signal.signal(signal.SIGHUP, self.handlerA)
81 signal.signal(signal.SIGUSR1, self.handlerB)
82 signal.signal(signal.SIGUSR2, signal.SIG_IGN)
83 signal.signal(signal.SIGALRM, signal.default_int_handler)
84
85 # Variables the signals will modify:
86 self.a_called = False
87 self.b_called = False
88
89 # Let the sub-processes know who to send signals to.
90 pid = os.getpid()
91 if test_support.verbose:
92 print "test runner's pid is", pid
93
94 child = ignoring_eintr(subprocess.Popen, ['kill', '-HUP', str(pid)])
95 if child:
96 self.wait(child)
97 if not self.a_called:
98 time.sleep(1) # Give the signal time to be delivered.
99 self.assertTrue(self.a_called)
100 self.assertFalse(self.b_called)
101 self.a_called = False
102
103 # Make sure the signal isn't delivered while the previous
104 # Popen object is being destroyed, because __del__ swallows
105 # exceptions.
106 del child
107 try:
108 child = subprocess.Popen(['kill', '-USR1', str(pid)])
109 # This wait should be interrupted by the signal's exception.
110 self.wait(child)
111 time.sleep(1) # Give the signal time to be delivered.
112 self.fail('HandlerBCalled exception not raised')
113 except HandlerBCalled:
114 self.assertTrue(self.b_called)
115 self.assertFalse(self.a_called)
116 if test_support.verbose:
117 print "HandlerBCalled exception caught"
118
119 child = ignoring_eintr(subprocess.Popen, ['kill', '-USR2', str(pid)])
120 if child:
121 self.wait(child) # Nothing should happen.
122
123 try:
124 signal.alarm(1)
125 # The race condition in pause doesn't matter in this case,
126 # since alarm is going to raise a KeyboardException, which
127 # will skip the call.
128 signal.pause()
129 # But if another signal arrives before the alarm, pause
130 # may return early.
131 time.sleep(1)
132 except KeyboardInterrupt:
133 if test_support.verbose:
134 print "KeyboardInterrupt (the alarm() went off)"
135 except:
136 self.fail("Some other exception woke us from pause: %s" %
137 traceback.format_exc())
138 else:
139 self.fail("pause returned of its own accord, and the signal"
140 " didn't arrive after another second.")
141
142 # Issue 3864. Unknown if this affects earlier versions of freebsd also.
143 @unittest.skipIf(sys.platform=='freebsd6',
144 'inter process signals not reliable (do not mix well with threading) '
145 'on freebsd6')
146 def test_main(self):
147 # This function spawns a child process to insulate the main
148 # test-running process from all the signals. It then
149 # communicates with that child process over a pipe and
150 # re-raises information about any exceptions the child
151 # raises. The real work happens in self.run_test().
152 os_done_r, os_done_w = os.pipe()
153 with closing(os.fdopen(os_done_r)) as done_r, \
154 closing(os.fdopen(os_done_w, 'w')) as done_w:
155 child = os.fork()
156 if child == 0:
157 # In the child process; run the test and report results
158 # through the pipe.
159 try:
160 done_r.close()
161 # Have to close done_w again here because
162 # exit_subprocess() will skip the enclosing with block.
163 with closing(done_w):
164 try:
165 self.run_test()
166 except:
167 pickle.dump(traceback.format_exc(), done_w)
168 else:
169 pickle.dump(None, done_w)
170 except:
171 print 'Uh oh, raised from pickle.'
172 traceback.print_exc()
173 finally:
174 exit_subprocess()
175
176 done_w.close()
177 # Block for up to MAX_DURATION seconds for the test to finish.
178 r, w, x = select.select([done_r], [], [], self.MAX_DURATION)
179 if done_r in r:
180 tb = pickle.load(done_r)
181 if tb:
182 self.fail(tb)
183 else:
184 os.kill(child, signal.SIGKILL)
185 self.fail('Test deadlocked after %d seconds.' %
186 self.MAX_DURATION)
187
188
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class BasicSignalTests(unittest.TestCase):
    """Argument validation and round-tripping for signal()/getsignal()."""

    def trivial_signal_handler(self, *args):
        """Do nothing; exists only to have a callable to install."""
        return None

    def test_out_of_range_signal_number_raises_error(self):
        bogus_signum = 4242
        with self.assertRaises(ValueError):
            signal.getsignal(bogus_signum)

        with self.assertRaises(ValueError):
            signal.signal(bogus_signum, self.trivial_signal_handler)

    def test_setting_signal_handler_to_none_raises_error(self):
        with self.assertRaises(TypeError):
            signal.signal(signal.SIGUSR1, None)

    def test_getsignal(self):
        # Install our handler, check it is reported, then put the previous
        # handler back and check that it is reported again.
        previous = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
        installed = signal.getsignal(signal.SIGHUP)
        self.assertEqual(installed, self.trivial_signal_handler)
        signal.signal(signal.SIGHUP, previous)
        restored = signal.getsignal(signal.SIGHUP)
        self.assertEqual(restored, previous)
210
211
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
class WindowsSignalTests(unittest.TestCase):
    """Checks which signal numbers signal.signal() accepts on Windows."""

    def test_issue9324(self):
        # Updated for issue #10003, adding SIGBREAK
        def handler(signum, frame):
            return None

        accepted = (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
                    signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
                    signal.SIGTERM)
        for signum in accepted:
            # Set and then reset a handler for signals that work on windows:
            # signal.signal() returns the previous handler, which is put
            # straight back.
            previous = signal.signal(signum, handler)
            signal.signal(signum, previous)

        # These numbers must be rejected.
        for rejected in (-1, 7):
            self.assertRaises(ValueError, signal.signal, rejected, handler)
228
229
230class WakeupFDTests(unittest.TestCase):
231
232 def test_invalid_fd(self):
233 fd = test_support.make_bad_fd()
234 self.assertRaises(ValueError, signal.set_wakeup_fd, fd)
235
236
237@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
238class WakeupSignalTests(unittest.TestCase):
239 TIMEOUT_FULL = 10
240 TIMEOUT_HALF = 5
241
242 def test_wakeup_fd_early(self):
243 import select
244
245 signal.alarm(1)
246 before_time = time.time()
247 # We attempt to get a signal during the sleep,
248 # before select is called
249 time.sleep(self.TIMEOUT_FULL)
250 mid_time = time.time()
251 self.assertTrue(mid_time - before_time < self.TIMEOUT_HALF)
252 select.select([self.read], [], [], self.TIMEOUT_FULL)
253 after_time = time.time()
254 self.assertTrue(after_time - mid_time < self.TIMEOUT_HALF)
255
256 def test_wakeup_fd_during(self):
257 import select
258
259 signal.alarm(1)
260 before_time = time.time()
261 # We attempt to get a signal during the select call
262 self.assertRaises(select.error, select.select,
263 [self.read], [], [], self.TIMEOUT_FULL)
264 after_time = time.time()
265 self.assertTrue(after_time - before_time < self.TIMEOUT_HALF)
266
267 def setUp(self):
268 import fcntl
269
270 self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None)
271 self.read, self.write = os.pipe()
272 flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
273 flags = flags | os.O_NONBLOCK
274 fcntl.fcntl(self.write, fcntl.F_SETFL, flags)
275 self.old_wakeup = signal.set_wakeup_fd(self.write)
276
277 def tearDown(self):
278 signal.set_wakeup_fd(self.old_wakeup)
279 os.close(self.read)
280 os.close(self.write)
281 signal.signal(signal.SIGALRM, self.alrm)
282
283@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
284class SiginterruptTest(unittest.TestCase):
285
286 def setUp(self):
287 """Install a no-op signal handler that can be set to allow
288 interrupts or not, and arrange for the original signal handler to be
289 re-installed when the test is finished.
290 """
291 self.signum = signal.SIGUSR1
292 oldhandler = signal.signal(self.signum, lambda x,y: None)
293 self.addCleanup(signal.signal, self.signum, oldhandler)
294
295 def readpipe_interrupted(self):
296 """Perform a read during which a signal will arrive. Return True if the
297 read is interrupted by the signal and raises an exception. Return False
298 if it returns normally.
299 """
300 # Create a pipe that can be used for the read. Also clean it up
301 # when the test is over, since nothing else will (but see below for
302 # the write end).
303 r, w = os.pipe()
304 self.addCleanup(os.close, r)
305
306 # Create another process which can send a signal to this one to try
307 # to interrupt the read.
308 ppid = os.getpid()
309 pid = os.fork()
310
311 if pid == 0:
312 # Child code: sleep to give the parent enough time to enter the
313 # read() call (there's a race here, but it's really tricky to
314 # eliminate it); then signal the parent process. Also, sleep
315 # again to make it likely that the signal is delivered to the
316 # parent process before the child exits. If the child exits
317 # first, the write end of the pipe will be closed and the test
318 # is invalid.
319 try:
320 time.sleep(0.2)
321 os.kill(ppid, self.signum)
322 time.sleep(0.2)
323 finally:
324 # No matter what, just exit as fast as possible now.
325 exit_subprocess()
326 else:
327 # Parent code.
328 # Make sure the child is eventually reaped, else it'll be a
329 # zombie for the rest of the test suite run.
330 self.addCleanup(os.waitpid, pid, 0)
331
332 # Close the write end of the pipe. The child has a copy, so
333 # it's not really closed until the child exits. We need it to
334 # close when the child exits so that in the non-interrupt case
335 # the read eventually completes, otherwise we could just close
336 # it *after* the test.
337 os.close(w)
338
339 # Try the read and report whether it is interrupted or not to
340 # the caller.
341 try:
342 d = os.read(r, 1)
343 return False
344 except OSError, err:
345 if err.errno != errno.EINTR:
346 raise
347 return True
348
349 def test_without_siginterrupt(self):
350 """If a signal handler is installed and siginterrupt is not called
351 at all, when that signal arrives, it interrupts a syscall that's in
352 progress.
353 """
354 i = self.readpipe_interrupted()
355 self.assertTrue(i)
356 # Arrival of the signal shouldn't have changed anything.
357 i = self.readpipe_interrupted()
358 self.assertTrue(i)
359
360 def test_siginterrupt_on(self):
361 """If a signal handler is installed and siginterrupt is called with
362 a true value for the second argument, when that signal arrives, it
363 interrupts a syscall that's in progress.
364 """
365 signal.siginterrupt(self.signum, 1)
366 i = self.readpipe_interrupted()
367 self.assertTrue(i)
368 # Arrival of the signal shouldn't have changed anything.
369 i = self.readpipe_interrupted()
370 self.assertTrue(i)
371
372 def test_siginterrupt_off(self):
373 """If a signal handler is installed and siginterrupt is called with
374 a false value for the second argument, when that signal arrives, it
375 does not interrupt a syscall that's in progress.
376 """
377 signal.siginterrupt(self.signum, 0)
378 i = self.readpipe_interrupted()
379 self.assertFalse(i)
380 # Arrival of the signal shouldn't have changed anything.
381 i = self.readpipe_interrupted()
382 self.assertFalse(i)
383
384
385@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
386class ItimerTest(unittest.TestCase):
387 def setUp(self):
388 self.hndl_called = False
389 self.hndl_count = 0
390 self.itimer = None
391 self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)
392
393 def tearDown(self):
394 signal.signal(signal.SIGALRM, self.old_alarm)
395 if self.itimer is not None: # test_itimer_exc doesn't change this attr
396 # just ensure that itimer is stopped
397 signal.setitimer(self.itimer, 0)
398
399 def sig_alrm(self, *args):
400 self.hndl_called = True
401 if test_support.verbose:
402 print("SIGALRM handler invoked", args)
403
404 def sig_vtalrm(self, *args):
405 self.hndl_called = True
406
407 if self.hndl_count > 3:
408 # it shouldn't be here, because it should have been disabled.
409 raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
410 "timer.")
411 elif self.hndl_count == 3:
412 # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
413 signal.setitimer(signal.ITIMER_VIRTUAL, 0)
414 if test_support.verbose:
415 print("last SIGVTALRM handler call")
416
417 self.hndl_count += 1
418
419 if test_support.verbose:
420 print("SIGVTALRM handler invoked", args)
421
422 def sig_prof(self, *args):
423 self.hndl_called = True
424 signal.setitimer(signal.ITIMER_PROF, 0)
425
426 if test_support.verbose:
427 print("SIGPROF handler invoked", args)
428
429 def test_itimer_exc(self):
430 # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
431 # defines it ?
432 self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
433 # Negative times are treated as zero on some platforms.
434 if 0:
435 self.assertRaises(signal.ItimerError,
436 signal.setitimer, signal.ITIMER_REAL, -1)
437
438 def test_itimer_real(self):
439 self.itimer = signal.ITIMER_REAL
440 signal.setitimer(self.itimer, 1.0)
441 if test_support.verbose:
442 print("\ncall pause()...")
443 signal.pause()
444
445 self.assertEqual(self.hndl_called, True)
446
447 # Issue 3864. Unknown if this affects earlier versions of freebsd also.
448 @unittest.skipIf(sys.platform in ('freebsd6', 'netbsd5'),
449 'itimer not reliable (does not mix well with threading) on some BSDs.')
450 def test_itimer_virtual(self):
451 self.itimer = signal.ITIMER_VIRTUAL
452 signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
453 signal.setitimer(self.itimer, 0.3, 0.2)
454
455 start_time = time.time()
456 while time.time() - start_time < 60.0:
457 # use up some virtual time by doing real work
458 _ = pow(12345, 67890, 10000019)
459 if signal.getitimer(self.itimer) == (0.0, 0.0):
460 break # sig_vtalrm handler stopped this itimer
461 else: # Issue 8424
462 self.skipTest("timeout: likely cause: machine too slow or load too "
463 "high")
464
465 # virtual itimer should be (0.0, 0.0) now
466 self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
467 # and the handler should have been called
468 self.assertEqual(self.hndl_called, True)
469
470 # Issue 3864. Unknown if this affects earlier versions of freebsd also.
471 @unittest.skipIf(sys.platform=='freebsd6',
472 'itimer not reliable (does not mix well with threading) on freebsd6')
473 def test_itimer_prof(self):
474 self.itimer = signal.ITIMER_PROF
475 signal.signal(signal.SIGPROF, self.sig_prof)
476 signal.setitimer(self.itimer, 0.2, 0.2)
477
478 start_time = time.time()
479 while time.time() - start_time < 60.0:
480 # do some work
481 _ = pow(12345, 67890, 10000019)
482 if signal.getitimer(self.itimer) == (0.0, 0.0):
483 break # sig_prof handler stopped this itimer
484 else: # Issue 8424
485 self.skipTest("timeout: likely cause: machine too slow or load too "
486 "high")
487
488 # profiling itimer should be (0.0, 0.0) now
489 self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
490 # and the handler should have been called
491 self.assertEqual(self.hndl_called, True)
492
def test_main():
    """Run every test case class of this module via test_support."""
    test_classes = (
        BasicSignalTests, InterProcessSignalTests,
        WakeupFDTests, WakeupSignalTests,
        SiginterruptTest, ItimerTest,
        WindowsSignalTests,
    )
    test_support.run_unittest(*test_classes)
498
499
# Run the whole suite when this file is executed as a script.
if __name__ == '__main__':
    test_main()
Note: See TracBrowser for help on using the repository browser.