Changeset 391 for python/trunk/Lib/test/test_signal.py
- Timestamp:
- Mar 19, 2014, 11:31:01 PM (11 years ago)
- Location:
- python/trunk
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
python/trunk
-
Property svn:mergeinfo
set to
/python/vendor/Python-2.7.6 merged eligible /python/vendor/current merged eligible
-
Property svn:mergeinfo
set to
-
python/trunk/Lib/test/test_signal.py
r2 r391 1 1 import unittest 2 2 from test import test_support 3 from contextlib import closing , nested3 from contextlib import closing 4 4 import gc 5 5 import pickle … … 10 10 import sys, os, time, errno 11 11 12 if sys.platform[:3] in ('win', 'os2') or sys.platform == 'riscos': 13 raise test_support.TestSkipped("Can't test signal on %s" % \ 14 sys.platform) 12 if sys.platform in ('os2', 'riscos'): 13 raise unittest.SkipTest("Can't test signal on %s" % sys.platform) 15 14 16 15 … … 38 37 39 38 39 @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 40 40 class InterProcessSignalTests(unittest.TestCase): 41 41 MAX_DURATION = 20 # Entire test should last at most 20 sec. … … 110 110 self.wait(child) 111 111 time.sleep(1) # Give the signal time to be delivered. 112 self.fail('HandlerBCalled exception not thrown')112 self.fail('HandlerBCalled exception not raised') 113 113 except HandlerBCalled: 114 114 self.assertTrue(self.b_called) … … 140 140 " didn't arrive after another second.") 141 141 142 # Issue 3864. Unknown if this affects earlier versions of freebsd also. 143 @unittest.skipIf(sys.platform=='freebsd6', 144 'inter process signals not reliable (do not mix well with threading) ' 145 'on freebsd6') 142 146 def test_main(self): 143 147 # This function spawns a child process to insulate the main … … 145 149 # communicates with that child process over a pipe and 146 150 # re-raises information about any exceptions the child 147 # throws. The real work happens in self.run_test().151 # raises. The real work happens in self.run_test(). 148 152 os_done_r, os_done_w = os.pipe() 149 with nested(closing(os.fdopen(os_done_r)),150 closing(os.fdopen(os_done_w, 'w'))) as (done_r, done_w):153 with closing(os.fdopen(os_done_r)) as done_r, \ 154 closing(os.fdopen(os_done_w, 'w')) as done_w: 151 155 child = os.fork() 152 156 if child == 0: … … 183 187 184 188 189 @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 185 190 class BasicSignalTests(unittest.TestCase): 186 191 def trivial_signal_handler(self, *args): … … 199 204 def test_getsignal(self): 200 205 hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler) 201 self.assertEqual s(signal.getsignal(signal.SIGHUP),202 206 self.assertEqual(signal.getsignal(signal.SIGHUP), 207 self.trivial_signal_handler) 203 208 signal.signal(signal.SIGHUP, hup) 204 self.assertEquals(signal.getsignal(signal.SIGHUP), hup) 205 206 209 self.assertEqual(signal.getsignal(signal.SIGHUP), hup) 210 211 212 @unittest.skipUnless(sys.platform == "win32", "Windows specific") 213 class WindowsSignalTests(unittest.TestCase): 214 def test_issue9324(self): 215 # Updated for issue #10003, adding SIGBREAK 216 handler = lambda x, y: None 217 for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE, 218 signal.SIGILL, signal.SIGINT, signal.SIGSEGV, 219 signal.SIGTERM): 220 # Set and then reset a handler for signals that work on windows 221 signal.signal(sig, signal.signal(sig, handler)) 222 223 with self.assertRaises(ValueError): 224 signal.signal(-1, handler) 225 226 with self.assertRaises(ValueError): 227 signal.signal(7, handler) 228 229 230 class WakeupFDTests(unittest.TestCase): 231 232 def test_invalid_fd(self): 233 fd = test_support.make_bad_fd() 234 self.assertRaises(ValueError, signal.set_wakeup_fd, fd) 235 236 237 @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 207 238 class WakeupSignalTests(unittest.TestCase): 208 239 TIMEOUT_FULL = 10 … … 218 249 time.sleep(self.TIMEOUT_FULL) 219 250 mid_time = time.time() 220 self.assert _(mid_time - before_time < self.TIMEOUT_HALF)251 self.assertTrue(mid_time - before_time < self.TIMEOUT_HALF) 221 252 select.select([self.read], [], [], self.TIMEOUT_FULL) 222 253 after_time = time.time() 223 self.assert _(after_time - mid_time < self.TIMEOUT_HALF)254 self.assertTrue(after_time - mid_time < self.TIMEOUT_HALF) 224 255 225 256 def test_wakeup_fd_during(self): … … 232 263 [self.read], [], [], self.TIMEOUT_FULL) 233 264 after_time = time.time() 234 self.assert _(after_time - before_time < self.TIMEOUT_HALF)265 self.assertTrue(after_time - before_time < self.TIMEOUT_HALF) 235 266 236 267 def setUp(self): … … 250 281 signal.signal(signal.SIGALRM, self.alrm) 251 282 283 @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 252 284 class SiginterruptTest(unittest.TestCase): 253 signum = signal.SIGUSR1 254 def readpipe_interrupted(self, cb): 285 286 def setUp(self): 287 """Install a no-op signal handler that can be set to allow 288 interrupts or not, and arrange for the original signal handler to be 289 re-installed when the test is finished. 290 """ 291 self.signum = signal.SIGUSR1 292 oldhandler = signal.signal(self.signum, lambda x,y: None) 293 self.addCleanup(signal.signal, self.signum, oldhandler) 294 295 def readpipe_interrupted(self): 296 """Perform a read during which a signal will arrive. Return True if the 297 read is interrupted by the signal and raises an exception. Return False 298 if it returns normally. 299 """ 300 # Create a pipe that can be used for the read. Also clean it up 301 # when the test is over, since nothing else will (but see below for 302 # the write end). 255 303 r, w = os.pipe() 304 self.addCleanup(os.close, r) 305 306 # Create another process which can send a signal to this one to try 307 # to interrupt the read. 256 308 ppid = os.getpid() 257 309 pid = os.fork() 258 310 259 oldhandler = signal.signal(self.signum, lambda x,y: None) 260 cb() 261 if pid==0: 262 # child code: sleep, kill, sleep. and then exit, 263 # which closes the pipe from which the parent process reads 311 if pid == 0: 312 # Child code: sleep to give the parent enough time to enter the 313 # read() call (there's a race here, but it's really tricky to 314 # eliminate it); then signal the parent process. Also, sleep 315 # again to make it likely that the signal is delivered to the 316 # parent process before the child exits. If the child exits 317 # first, the write end of the pipe will be closed and the test 318 # is invalid. 264 319 try: 265 320 time.sleep(0.2) … … 267 322 time.sleep(0.2) 268 323 finally: 324 # No matter what, just exit as fast as possible now. 269 325 exit_subprocess() 270 271 try: 326 else: 327 # Parent code. 328 # Make sure the child is eventually reaped, else it'll be a 329 # zombie for the rest of the test suite run. 330 self.addCleanup(os.waitpid, pid, 0) 331 332 # Close the write end of the pipe. The child has a copy, so 333 # it's not really closed until the child exits. We need it to 334 # close when the child exits so that in the non-interrupt case 335 # the read eventually completes, otherwise we could just close 336 # it *after* the test. 272 337 os.close(w) 273 338 339 # Try the read and report whether it is interrupted or not to 340 # the caller. 274 341 try: 275 d =os.read(r, 1)342 d = os.read(r, 1) 276 343 return False 277 344 except OSError, err: … … 279 346 raise 280 347 return True 281 finally:282 signal.signal(self.signum, oldhandler)283 os.waitpid(pid, 0)284 348 285 349 def test_without_siginterrupt(self): 286 i=self.readpipe_interrupted(lambda: None) 287 self.assertEquals(i, True) 350 """If a signal handler is installed and siginterrupt is not called 351 at all, when that signal arrives, it interrupts a syscall that's in 352 progress. 353 """ 354 i = self.readpipe_interrupted() 355 self.assertTrue(i) 356 # Arrival of the signal shouldn't have changed anything. 357 i = self.readpipe_interrupted() 358 self.assertTrue(i) 288 359 289 360 def test_siginterrupt_on(self): 290 i=self.readpipe_interrupted(lambda: signal.siginterrupt(self.signum, 1)) 291 self.assertEquals(i, True) 361 """If a signal handler is installed and siginterrupt is called with 362 a true value for the second argument, when that signal arrives, it 363 interrupts a syscall that's in progress. 364 """ 365 signal.siginterrupt(self.signum, 1) 366 i = self.readpipe_interrupted() 367 self.assertTrue(i) 368 # Arrival of the signal shouldn't have changed anything. 369 i = self.readpipe_interrupted() 370 self.assertTrue(i) 292 371 293 372 def test_siginterrupt_off(self): 294 i=self.readpipe_interrupted(lambda: signal.siginterrupt(self.signum, 0)) 295 self.assertEquals(i, False) 296 373 """If a signal handler is installed and siginterrupt is called with 374 a false value for the second argument, when that signal arrives, it 375 does not interrupt a syscall that's in progress. 376 """ 377 signal.siginterrupt(self.signum, 0) 378 i = self.readpipe_interrupted() 379 self.assertFalse(i) 380 # Arrival of the signal shouldn't have changed anything. 381 i = self.readpipe_interrupted() 382 self.assertFalse(i) 383 384 385 @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 297 386 class ItimerTest(unittest.TestCase): 298 387 def setUp(self): … … 356 445 self.assertEqual(self.hndl_called, True) 357 446 447 # Issue 3864. Unknown if this affects earlier versions of freebsd also. 448 @unittest.skipIf(sys.platform in ('freebsd6', 'netbsd5'), 449 'itimer not reliable (does not mix well with threading) on some BSDs.') 358 450 def test_itimer_virtual(self): 359 451 self.itimer = signal.ITIMER_VIRTUAL … … 362 454 363 455 start_time = time.time() 364 while time.time() - start_time < 5.0:456 while time.time() - start_time < 60.0: 365 457 # use up some virtual time by doing real work 366 458 _ = pow(12345, 67890, 10000019) 367 459 if signal.getitimer(self.itimer) == (0.0, 0.0): 368 460 break # sig_vtalrm handler stopped this itimer 369 else: 370 self.fail('timeout waiting for sig_vtalrm signal; ' 371 'signal.getitimer(self.itimer) gives: %s' % 372 (signal.getitimer(self.itimer),)) 461 else: # Issue 8424 462 self.skipTest("timeout: likely cause: machine too slow or load too " 463 "high") 373 464 374 465 # virtual itimer should be (0.0, 0.0) now 375 self.assertEqual s(signal.getitimer(self.itimer), (0.0, 0.0))466 self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0)) 376 467 # and the handler should have been called 377 self.assertEquals(self.hndl_called, True) 378 468 self.assertEqual(self.hndl_called, True) 469 470 # Issue 3864. Unknown if this affects earlier versions of freebsd also. 471 @unittest.skipIf(sys.platform=='freebsd6', 472 'itimer not reliable (does not mix well with threading) on freebsd6') 379 473 def test_itimer_prof(self): 380 474 self.itimer = signal.ITIMER_PROF … … 383 477 384 478 start_time = time.time() 385 while time.time() - start_time < 5.0:479 while time.time() - start_time < 60.0: 386 480 # do some work 387 481 _ = pow(12345, 67890, 10000019) 388 482 if signal.getitimer(self.itimer) == (0.0, 0.0): 389 483 break # sig_prof handler stopped this itimer 390 else: 391 self.fail('timeout waiting for sig_prof signal') 484 else: # Issue 8424 485 self.skipTest("timeout: likely cause: machine too slow or load too " 486 "high") 392 487 393 488 # profiling itimer should be (0.0, 0.0) now 394 self.assertEqual s(signal.getitimer(self.itimer), (0.0, 0.0))489 self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0)) 395 490 # and the handler should have been called 396 491 self.assertEqual(self.hndl_called, True) … … 398 493 def test_main(): 399 494 test_support.run_unittest(BasicSignalTests, InterProcessSignalTests, 400 WakeupSignalTests, SiginterruptTest, ItimerTest) 495 WakeupFDTests, WakeupSignalTests, 496 SiginterruptTest, ItimerTest, 497 WindowsSignalTests) 401 498 402 499
Note:
See TracChangeset
for help on using the changeset viewer.