Changeset 391 for python/trunk/Lib/test/test_socketserver.py
- Timestamp:
- Mar 19, 2014, 11:31:01 PM (11 years ago)
- Location:
- python/trunk
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
python/trunk
-
Property svn:mergeinfo
set to
/python/vendor/Python-2.7.6 merged eligible /python/vendor/current merged eligible
-
Property svn:mergeinfo
set to
-
python/trunk/Lib/test/test_socketserver.py
r2 r391 4 4 5 5 import contextlib 6 import errno7 6 import imp 8 7 import os … … 10 9 import signal 11 10 import socket 11 import select 12 import errno 12 13 import tempfile 13 import threading14 import time15 14 import unittest 16 15 import SocketServer 17 16 18 17 import test.test_support 19 from test.test_support import reap_children, verbose, TestSkipped 20 from test.test_support import TESTFN as TEST_FILE 18 from test.test_support import reap_children, reap_threads, verbose 19 try: 20 import threading 21 except ImportError: 22 threading = None 21 23 22 24 test.test_support.requires("network") … … 33 35 signal.alarm(n) 34 36 37 # Remember real select() to avoid interferences with mocking 38 _real_select = select.select 39 35 40 def receive(sock, n, timeout=20): 36 r, w, x = select.select([sock], [], [], timeout)41 r, w, x = _real_select([sock], [], [], timeout) 37 42 if sock in r: 38 43 return sock.recv(n) … … 54 59 pid = os.fork() 55 60 if pid == 0: 56 # Don't throwan exception; it would be caught by the test harness.61 # Don't raise an exception; it would be caught by the test harness. 57 62 os._exit(72) 58 63 yield None 59 64 pid2, status = os.waitpid(pid, 0) 60 testcase.assertEquals(pid2, pid) 61 testcase.assertEquals(72 << 8, status) 62 63 65 testcase.assertEqual(pid2, pid) 66 testcase.assertEqual(72 << 8, status) 67 68 69 @unittest.skipUnless(threading, 'Threading required for this test.') 64 70 class SocketServerTest(unittest.TestCase): 65 71 """Test all socket servers.""" 66 72 67 73 def setUp(self): 68 signal_alarm( 20) # Kill deadlocks after 20 seconds.74 signal_alarm(60) # Kill deadlocks after 60 seconds. 69 75 self.port_seed = 0 70 76 self.test_files = [] … … 120 126 if verbose: print "creating server" 121 127 server = MyServer(addr, MyHandler) 122 self.assertEqual s(server.server_address, server.socket.getsockname())128 self.assertEqual(server.server_address, server.socket.getsockname()) 123 129 return server 124 130 131 @reap_threads 125 132 def run_server(self, svrcls, hdlrbase, testfunc): 126 133 server = self.make_server(self.pickaddr(svrcls.address_family), … … 159 166 data = receive(s, 100) 160 167 buf += data 161 self.assertEqual s(buf, TEST_STR)168 self.assertEqual(buf, TEST_STR) 162 169 s.close() 163 170 … … 169 176 data = receive(s, 100) 170 177 buf += data 171 self.assertEqual s(buf, TEST_STR)178 self.assertEqual(buf, TEST_STR) 172 179 s.close() 173 180 … … 223 230 SocketServer.DatagramRequestHandler, 224 231 self.dgram_examine) 232 233 @contextlib.contextmanager 234 def mocked_select_module(self): 235 """Mocks the select.select() call to raise EINTR for first call""" 236 old_select = select.select 237 238 class MockSelect: 239 def __init__(self): 240 self.called = 0 241 242 def __call__(self, *args): 243 self.called += 1 244 if self.called == 1: 245 # raise the exception on first call 246 raise select.error(errno.EINTR, os.strerror(errno.EINTR)) 247 else: 248 # Return real select value for consecutive calls 249 return old_select(*args) 250 251 select.select = MockSelect() 252 try: 253 yield select.select 254 finally: 255 select.select = old_select 256 257 def test_InterruptServerSelectCall(self): 258 with self.mocked_select_module() as mock_select: 259 pid = self.run_server(SocketServer.TCPServer, 260 SocketServer.StreamRequestHandler, 261 self.stream_examine) 262 # Make sure select was called again: 263 self.assertGreater(mock_select.called, 1) 225 264 226 265 # Alas, on Linux (at least) recvfrom() doesn't return a meaningful … … 244 283 # self.dgram_examine) 245 284 285 @reap_threads 286 def test_shutdown(self): 287 # Issue #2302: shutdown() should always succeed in making an 288 # other thread leave serve_forever(). 289 class MyServer(SocketServer.TCPServer): 290 pass 291 292 class MyHandler(SocketServer.StreamRequestHandler): 293 pass 294 295 threads = [] 296 for i in range(20): 297 s = MyServer((HOST, 0), MyHandler) 298 t = threading.Thread( 299 name='MyServer serving', 300 target=s.serve_forever, 301 kwargs={'poll_interval':0.01}) 302 t.daemon = True # In case this function raises. 303 threads.append((t, s)) 304 for t, s in threads: 305 t.start() 306 s.shutdown() 307 for t, s in threads: 308 t.join() 309 246 310 247 311 def test_main(): 248 312 if imp.lock_held(): 249 313 # If the import lock is held, the threads will hang 250 raise TestSkipped("can't run when import lock is held")314 raise unittest.SkipTest("can't run when import lock is held") 251 315 252 316 test.test_support.run_unittest(SocketServerTest) … … 254 318 if __name__ == "__main__": 255 319 test_main() 256 signal_alarm(3) # Shutdown shouldn't take more than 3 seconds.
Note:
See TracChangeset
for help on using the changeset viewer.