Changeset 391 for python/trunk/Lib/test/test_multiprocessing.py
- Timestamp:
- Mar 19, 2014, 11:31:01 PM (11 years ago)
- Location:
- python/trunk
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
python/trunk
-
Property svn:mergeinfo
set to
/python/vendor/Python-2.7.6 merged eligible /python/vendor/current merged eligible
-
Property svn:mergeinfo
set to
-
python/trunk/Lib/test/test_multiprocessing.py
r2 r391 6 6 7 7 import unittest 8 import threading9 8 import Queue 10 9 import time … … 14 13 import signal 15 14 import array 16 import copy17 15 import socket 18 16 import random 19 17 import logging 18 import errno 19 import test.script_helper 20 from test import test_support 20 21 from StringIO import StringIO 21 from test import test_support 22 22 _multiprocessing = test_support.import_module('_multiprocessing') 23 # import threading after _multiprocessing to raise a more relevant error 24 # message: "No module named _multiprocessing". _multiprocessing is not compiled 25 # without thread support. 26 import threading 23 27 24 28 # Work around broken sem_open implementations 25 try: 26 import multiprocessing.synchronize 27 except ImportError, e: 28 from test.test_support import TestSkipped 29 raise TestSkipped(e) 29 test_support.import_module('multiprocessing.synchronize') 30 30 31 31 import multiprocessing.dummy … … 34 34 import multiprocessing.heap 35 35 import multiprocessing.pool 36 import _multiprocessing37 36 38 37 from multiprocessing import util 39 38 39 try: 40 from multiprocessing import reduction 41 HAS_REDUCTION = True 42 except ImportError: 43 HAS_REDUCTION = False 44 45 try: 46 from multiprocessing.sharedctypes import Value, copy 47 HAS_SHAREDCTYPES = True 48 except ImportError: 49 HAS_SHAREDCTYPES = False 50 51 try: 52 import msvcrt 53 except ImportError: 54 msvcrt = None 55 40 56 # 41 57 # … … 49 65 50 66 LOG_LEVEL = util.SUBWARNING 51 #LOG_LEVEL = logging. WARNING67 #LOG_LEVEL = logging.DEBUG 52 68 53 69 DELTA = 0.1 … … 66 82 WIN32 = (sys.platform == "win32") 67 83 84 try: 85 MAXFD = os.sysconf("SC_OPEN_MAX") 86 except: 87 MAXFD = 256 88 89 # 90 # Some tests require ctypes 91 # 92 93 try: 94 from ctypes import Structure, c_int, c_double 95 except ImportError: 96 Structure = object 97 c_int = c_double = None 98 99 100 def check_enough_semaphores(): 101 """Check that the system supports enough semaphores to run the test.""" 102 # minimum number of semaphores available according to POSIX 103 nsems_min = 256 104 try: 105 nsems = os.sysconf("SC_SEM_NSEMS_MAX") 106 except (AttributeError, ValueError): 107 # sysconf not available or setting not available 108 return 109 if nsems == -1 or nsems >= nsems_min: 110 return 111 raise unittest.SkipTest("The OS doesn't support enough semaphores " 112 "to run the test (required: %d)." % nsems_min) 113 114 68 115 # 69 116 # Creates a wrapper for a function which records the time it takes to finish … … 102 149 else: 103 150 return self.assertEqual(value, res) 151 152 # For the sanity of Windows users, rather than crashing or freezing in 153 # multiple ways. 154 def __reduce__(self, *args): 155 raise NotImplementedError("shouldn't try to pickle a test case") 156 157 __reduce_ex__ = __reduce__ 104 158 105 159 # … … 136 190 self.assertTrue(current.is_alive()) 137 191 self.assertTrue(not current.daemon) 138 self.assert True(isinstance(authkey, bytes))192 self.assertIsInstance(authkey, bytes) 139 193 self.assertTrue(len(authkey) > 0) 140 194 self.assertEqual(current.ident, os.getpid()) 141 195 self.assertEqual(current.exitcode, None) 142 196 143 def _test(self, q, *args, **kwds): 144 current = self.current_process() 197 @classmethod 198 def _test(cls, q, *args, **kwds): 199 current = cls.current_process() 145 200 q.put(args) 146 201 q.put(kwds) 147 202 q.put(current.name) 148 if self.TYPE != 'threads':203 if cls.TYPE != 'threads': 149 204 q.put(bytes(current.authkey)) 150 205 q.put(current.pid) … … 163 218 164 219 if self.TYPE != 'threads': 165 self.assertEqual s(p.authkey, current.authkey)166 self.assertEqual s(p.is_alive(), False)167 self.assertEqual s(p.daemon, True)168 self.assert True(p not inself.active_children())220 self.assertEqual(p.authkey, current.authkey) 221 self.assertEqual(p.is_alive(), False) 222 self.assertEqual(p.daemon, True) 223 self.assertNotIn(p, self.active_children()) 169 224 self.assertTrue(type(self.active_children()) is list) 170 225 self.assertEqual(p.exitcode, None) … … 172 227 p.start() 173 228 174 self.assertEqual s(p.exitcode, None)175 self.assertEqual s(p.is_alive(), True)176 self.assert True(p inself.active_children())177 178 self.assertEqual s(q.get(), args[1:])179 self.assertEqual s(q.get(), kwargs)180 self.assertEqual s(q.get(), p.name)229 self.assertEqual(p.exitcode, None) 230 self.assertEqual(p.is_alive(), True) 231 self.assertIn(p, self.active_children()) 232 233 self.assertEqual(q.get(), args[1:]) 234 self.assertEqual(q.get(), kwargs) 235 self.assertEqual(q.get(), p.name) 181 236 if self.TYPE != 'threads': 182 self.assertEqual s(q.get(), current.authkey)183 self.assertEqual s(q.get(), p.pid)237 self.assertEqual(q.get(), current.authkey) 238 self.assertEqual(q.get(), p.pid) 184 239 185 240 p.join() 186 241 187 self.assertEquals(p.exitcode, 0) 188 self.assertEquals(p.is_alive(), False) 189 self.assertTrue(p not in self.active_children()) 190 191 def _test_terminate(self): 242 self.assertEqual(p.exitcode, 0) 243 self.assertEqual(p.is_alive(), False) 244 self.assertNotIn(p, self.active_children()) 245 246 @classmethod 247 def _test_terminate(cls): 192 248 time.sleep(1000) 193 249 … … 201 257 202 258 self.assertEqual(p.is_alive(), True) 203 self.assert True(p inself.active_children())259 self.assertIn(p, self.active_children()) 204 260 self.assertEqual(p.exitcode, None) 205 261 … … 211 267 212 268 self.assertEqual(p.is_alive(), False) 213 self.assert True(p not inself.active_children())269 self.assertNotIn(p, self.active_children()) 214 270 215 271 p.join() … … 230 286 231 287 p = self.Process(target=time.sleep, args=(DELTA,)) 232 self.assertTrue(p not in self.active_children()) 233 288 self.assertNotIn(p, self.active_children()) 289 290 p.daemon = True 234 291 p.start() 235 self.assert True(p inself.active_children())292 self.assertIn(p, self.active_children()) 236 293 237 294 p.join() 238 self.assertTrue(p not in self.active_children()) 239 240 def _test_recursion(self, wconn, id): 295 self.assertNotIn(p, self.active_children()) 296 297 @classmethod 298 def _test_recursion(cls, wconn, id): 241 299 from multiprocessing import forking 242 300 wconn.send(id) 243 301 if len(id) < 2: 244 302 for i in range(2): 245 p = self.Process(246 target= self._test_recursion, args=(wconn, id+[i])303 p = cls.Process( 304 target=cls._test_recursion, args=(wconn, id+[i]) 247 305 ) 248 306 p.start() … … 269 327 self.assertEqual(result, expected) 270 328 329 @classmethod 330 def _test_sys_exit(cls, reason, testfn): 331 sys.stderr = open(testfn, 'w') 332 sys.exit(reason) 333 334 def test_sys_exit(self): 335 # See Issue 13854 336 if self.TYPE == 'threads': 337 return 338 339 testfn = test_support.TESTFN 340 self.addCleanup(test_support.unlink, testfn) 341 342 for reason, code in (([1, 2, 3], 1), ('ignore this', 0)): 343 p = self.Process(target=self._test_sys_exit, args=(reason, testfn)) 344 p.daemon = True 345 p.start() 346 p.join(5) 347 self.assertEqual(p.exitcode, code) 348 349 with open(testfn, 'r') as f: 350 self.assertEqual(f.read().rstrip(), str(reason)) 351 352 for reason in (True, False, 8): 353 p = self.Process(target=sys.exit, args=(reason,)) 354 p.daemon = True 355 p.start() 356 p.join(5) 357 self.assertEqual(p.exitcode, reason) 358 271 359 # 272 360 # … … 301 389 def test_subclassing(self): 302 390 uppercaser = _UpperCaser() 391 uppercaser.daemon = True 303 392 uppercaser.start() 304 393 self.assertEqual(uppercaser.submit('hello'), 'HELLO') … … 327 416 328 417 329 def _test_put(self, queue, child_can_start, parent_can_continue): 418 @classmethod 419 def _test_put(cls, queue, child_can_start, parent_can_continue): 330 420 child_can_start.wait() 331 421 for i in range(6): … … 391 481 proc.join() 392 482 393 def _test_get(self, queue, child_can_start, parent_can_continue): 483 @classmethod 484 def _test_get(cls, queue, child_can_start, parent_can_continue): 394 485 child_can_start.wait() 395 486 #queue.put(1) … … 452 543 proc.join() 453 544 454 def _test_fork(self, queue): 545 @classmethod 546 def _test_fork(cls, queue): 455 547 for i in range(10, 20): 456 548 queue.put(i) … … 476 568 # fork process 477 569 p = self.Process(target=self._test_fork, args=(queue,)) 570 p.daemon = True 478 571 p.start() 479 572 … … 500 593 self.assertEqual(q.qsize(), 0) 501 594 502 def _test_task_done(self, q): 595 @classmethod 596 def _test_task_done(cls, q): 503 597 for obj in iter(q.get, None): 504 598 time.sleep(DELTA) … … 509 603 510 604 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'): 511 return605 self.skipTest("requires 'queue.task_done()' method") 512 606 513 607 workers = [self.Process(target=self._test_task_done, args=(queue,)) … … 515 609 516 610 for p in workers: 611 p.daemon = True 517 612 p.start() 518 613 … … 612 707 class _TestCondition(BaseTestCase): 613 708 614 def f(self, cond, sleeping, woken, timeout=None): 709 @classmethod 710 def f(cls, cond, sleeping, woken, timeout=None): 615 711 cond.acquire() 616 712 sleeping.release() … … 744 840 class _TestEvent(BaseTestCase): 745 841 746 def _test_event(self, event): 842 @classmethod 843 def _test_event(cls, event): 747 844 time.sleep(TIMEOUT2) 748 845 event.set() … … 752 849 wait = TimingWrapper(event.wait) 753 850 754 # Removed tempora ily, due to API shear, this does not851 # Removed temporarily, due to API shear, this does not 755 852 # work with threading._Event objects. is_set == isSet 756 #self.assertEqual(event.is_set(), False) 757 758 self.assertEqual(wait(0.0), None) 853 self.assertEqual(event.is_set(), False) 854 855 # Removed, threading.Event.wait() will return the value of the __flag 856 # instead of None. API Shear with the semaphore backed mp.Event 857 self.assertEqual(wait(0.0), False) 759 858 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 760 self.assertEqual(wait(TIMEOUT1), None)859 self.assertEqual(wait(TIMEOUT1), False) 761 860 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) 762 861 … … 764 863 765 864 # See note above on the API differences 766 #self.assertEqual(event.is_set(), True)767 self.assertEqual(wait(), None)865 self.assertEqual(event.is_set(), True) 866 self.assertEqual(wait(), True) 768 867 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 769 self.assertEqual(wait(TIMEOUT1), None)868 self.assertEqual(wait(TIMEOUT1), True) 770 869 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 771 870 # self.assertEqual(event.is_set(), True) … … 775 874 #self.assertEqual(event.is_set(), False) 776 875 777 self.Process(target=self._test_event, args=(event,)).start() 778 self.assertEqual(wait(), None) 876 p = self.Process(target=self._test_event, args=(event,)) 877 p.daemon = True 878 p.start() 879 self.assertEqual(wait(), True) 779 880 780 881 # … … 783 884 784 885 class _TestValue(BaseTestCase): 886 887 ALLOWED_TYPES = ('processes',) 785 888 786 889 codes_values = [ … … 791 894 ] 792 895 793 def _test(self, values): 794 for sv, cv in zip(values, self.codes_values): 896 def setUp(self): 897 if not HAS_SHAREDCTYPES: 898 self.skipTest("requires multiprocessing.sharedctypes") 899 900 @classmethod 901 def _test(cls, values): 902 for sv, cv in zip(values, cls.codes_values): 795 903 sv.value = cv[2] 796 904 797 905 798 906 def test_value(self, raw=False): 799 if self.TYPE != 'processes':800 return801 802 907 if raw: 803 908 values = [self.RawValue(code, value) … … 811 916 812 917 proc = self.Process(target=self._test, args=(values,)) 918 proc.daemon = True 813 919 proc.start() 814 920 proc.join() … … 821 927 822 928 def test_getobj_getlock(self): 823 if self.TYPE != 'processes':824 return825 826 929 val1 = self.Value('i', 5) 827 930 lock1 = val1.get_lock() … … 851 954 class _TestArray(BaseTestCase): 852 955 853 def f(self, seq): 956 ALLOWED_TYPES = ('processes',) 957 958 @classmethod 959 def f(cls, seq): 854 960 for i in range(1, len(seq)): 855 961 seq[i] += seq[i-1] 856 962 963 @unittest.skipIf(c_int is None, "requires _ctypes") 857 964 def test_array(self, raw=False): 858 if self.TYPE != 'processes':859 return860 861 965 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831] 862 966 if raw: … … 876 980 877 981 p = self.Process(target=self.f, args=(arr,)) 982 p.daemon = True 878 983 p.start() 879 984 p.join() … … 881 986 self.assertEqual(list(arr[:]), seq) 882 987 988 @unittest.skipIf(c_int is None, "requires _ctypes") 989 def test_array_from_size(self): 990 size = 10 991 # Test for zeroing (see issue #11675). 992 # The repetition below strengthens the test by increasing the chances 993 # of previously allocated non-zero memory being used for the new array 994 # on the 2nd and 3rd loops. 995 for _ in range(3): 996 arr = self.Array('i', size) 997 self.assertEqual(len(arr), size) 998 self.assertEqual(list(arr), [0] * size) 999 arr[:] = range(10) 1000 self.assertEqual(list(arr), range(10)) 1001 del arr 1002 1003 @unittest.skipIf(c_int is None, "requires _ctypes") 883 1004 def test_rawarray(self): 884 1005 self.test_array(raw=True) 885 1006 1007 @unittest.skipIf(c_int is None, "requires _ctypes") 1008 def test_array_accepts_long(self): 1009 arr = self.Array('i', 10L) 1010 self.assertEqual(len(arr), 10) 1011 raw_arr = self.RawArray('i', 10L) 1012 self.assertEqual(len(raw_arr), 10) 1013 1014 @unittest.skipIf(c_int is None, "requires _ctypes") 886 1015 def test_getobj_getlock_obj(self): 887 if self.TYPE != 'processes':888 return889 890 1016 arr1 = self.Array('i', range(10)) 891 1017 lock1 = arr1.get_lock() … … 992 1118 map(sqr, range(100))) 993 1119 1120 def test_map_chunksize(self): 1121 try: 1122 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1) 1123 except multiprocessing.TimeoutError: 1124 self.fail("pool.map_async with chunksize stalled on null list") 1125 994 1126 def test_async(self): 995 1127 res = self.pool.apply_async(sqr, (7, TIMEOUT1,)) … … 1026 1158 1027 1159 def test_make_pool(self): 1160 self.assertRaises(ValueError, multiprocessing.Pool, -1) 1161 self.assertRaises(ValueError, multiprocessing.Pool, 0) 1162 1028 1163 p = multiprocessing.Pool(3) 1029 1164 self.assertEqual(3, len(p._pool)) … … 1048 1183 join() 1049 1184 self.assertTrue(join.elapsed < 0.2) 1185 1186 def test_empty_iterable(self): 1187 # See Issue 12157 1188 p = self.Pool(1) 1189 1190 self.assertEqual(p.map(sqr, []), []) 1191 self.assertEqual(list(p.imap(sqr, [])), []) 1192 self.assertEqual(list(p.imap_unordered(sqr, [])), []) 1193 self.assertEqual(p.map_async(sqr, []).get(), []) 1194 1195 p.close() 1196 p.join() 1197 1198 def unpickleable_result(): 1199 return lambda: 42 1200 1201 class _TestPoolWorkerErrors(BaseTestCase): 1202 ALLOWED_TYPES = ('processes', ) 1203 1204 def test_unpickleable_result(self): 1205 from multiprocessing.pool import MaybeEncodingError 1206 p = multiprocessing.Pool(2) 1207 1208 # Make sure we don't lose pool processes because of encoding errors. 1209 for iteration in range(20): 1210 res = p.apply_async(unpickleable_result) 1211 self.assertRaises(MaybeEncodingError, res.get) 1212 1213 p.close() 1214 p.join() 1215 1216 class _TestPoolWorkerLifetime(BaseTestCase): 1217 1218 ALLOWED_TYPES = ('processes', ) 1219 def test_pool_worker_lifetime(self): 1220 p = multiprocessing.Pool(3, maxtasksperchild=10) 1221 self.assertEqual(3, len(p._pool)) 1222 origworkerpids = [w.pid for w in p._pool] 1223 # Run many tasks so each worker gets replaced (hopefully) 1224 results = [] 1225 for i in range(100): 1226 results.append(p.apply_async(sqr, (i, ))) 1227 # Fetch the results and verify we got the right answers, 1228 # also ensuring all the tasks have completed. 1229 for (j, res) in enumerate(results): 1230 self.assertEqual(res.get(), sqr(j)) 1231 # Refill the pool 1232 p._repopulate_pool() 1233 # Wait until all workers are alive 1234 # (countdown * DELTA = 5 seconds max startup process time) 1235 countdown = 50 1236 while countdown and not all(w.is_alive() for w in p._pool): 1237 countdown -= 1 1238 time.sleep(DELTA) 1239 finalworkerpids = [w.pid for w in p._pool] 1240 # All pids should be assigned. See issue #7805. 1241 self.assertNotIn(None, origworkerpids) 1242 self.assertNotIn(None, finalworkerpids) 1243 # Finally, check that the worker pids have changed 1244 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids)) 1245 p.close() 1246 p.join() 1247 1248 def test_pool_worker_lifetime_early_close(self): 1249 # Issue #10332: closing a pool whose workers have limited lifetimes 1250 # before all the tasks completed would make join() hang. 1251 p = multiprocessing.Pool(3, maxtasksperchild=1) 1252 results = [] 1253 for i in range(6): 1254 results.append(p.apply_async(sqr, (i, 0.3))) 1255 p.close() 1256 p.join() 1257 # check the results 1258 for (j, res) in enumerate(results): 1259 self.assertEqual(res.get(), sqr(j)) 1260 1261 1050 1262 # 1051 1263 # Test that manager has expected number of shared objects left … … 1064 1276 gc.collect() # do garbage collection 1065 1277 refs = self.manager._number_of_objects() 1278 debug_info = self.manager._debug_info() 1066 1279 if refs != EXPECTED_NUMBER: 1067 1280 print self.manager._debug_info() 1281 print debug_info 1068 1282 1069 1283 self.assertEqual(refs, EXPECTED_NUMBER) … … 1159 1373 ALLOWED_TYPES = ('manager',) 1160 1374 1161 def _putter(self, address, authkey): 1375 @classmethod 1376 def _putter(cls, address, authkey): 1162 1377 manager = QueueManager2( 1163 1378 address=address, authkey=authkey, serializer=SERIALIZER … … 1171 1386 1172 1387 manager = QueueManager( 1173 address=( 'localhost', 0), authkey=authkey, serializer=SERIALIZER1388 address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER 1174 1389 ) 1175 1390 manager.start() 1176 1391 1177 1392 p = self.Process(target=self._putter, args=(manager.address, authkey)) 1393 p.daemon = True 1178 1394 p.start() 1179 1395 … … 1197 1413 class _TestManagerRestart(BaseTestCase): 1198 1414 1199 def _putter(self, address, authkey): 1415 @classmethod 1416 def _putter(cls, address, authkey): 1200 1417 manager = QueueManager( 1201 1418 address=address, authkey=authkey, serializer=SERIALIZER) … … 1206 1423 def test_rapid_restart(self): 1207 1424 authkey = os.urandom(32) 1208 port = test_support.find_unused_port()1209 1425 manager = QueueManager( 1210 address=('localhost', port), authkey=authkey, serializer=SERIALIZER) 1426 address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER) 1427 srvr = manager.get_server() 1428 addr = srvr.address 1429 # Close the connection.Listener socket which gets opened as a part 1430 # of manager.get_server(). It's not needed for the test. 1431 srvr.listener.close() 1211 1432 manager.start() 1212 1433 1213 1434 p = self.Process(target=self._putter, args=(manager.address, authkey)) 1435 p.daemon = True 1214 1436 p.start() 1215 1437 queue = manager.get_queue() … … 1218 1440 manager.shutdown() 1219 1441 manager = QueueManager( 1220 address= ('localhost', port), authkey=authkey, serializer=SERIALIZER)1442 address=addr, authkey=authkey, serializer=SERIALIZER) 1221 1443 manager.start() 1222 1444 manager.shutdown() … … 1232 1454 ALLOWED_TYPES = ('processes', 'threads') 1233 1455 1234 def _echo(self, conn): 1456 @classmethod 1457 def _echo(cls, conn): 1235 1458 for msg in iter(conn.recv_bytes, SENTINEL): 1236 1459 conn.send_bytes(msg) … … 1291 1514 1292 1515 conn.send(None) 1516 time.sleep(.1) 1293 1517 1294 1518 self.assertEqual(poll(TIMEOUT1), True) … … 1334 1558 1335 1559 p = self.Process(target=self._echo, args=(child_conn,)) 1560 p.daemon = True 1336 1561 p.start() 1337 1562 child_conn.close() # this might complete before child initializes … … 1377 1602 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1) 1378 1603 1604 @classmethod 1605 def _is_fd_assigned(cls, fd): 1606 try: 1607 os.fstat(fd) 1608 except OSError as e: 1609 if e.errno == errno.EBADF: 1610 return False 1611 raise 1612 else: 1613 return True 1614 1615 @classmethod 1616 def _writefd(cls, conn, data, create_dummy_fds=False): 1617 if create_dummy_fds: 1618 for i in range(0, 256): 1619 if not cls._is_fd_assigned(i): 1620 os.dup2(conn.fileno(), i) 1621 fd = reduction.recv_handle(conn) 1622 if msvcrt: 1623 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY) 1624 os.write(fd, data) 1625 os.close(fd) 1626 1627 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 1628 def test_fd_transfer(self): 1629 if self.TYPE != 'processes': 1630 self.skipTest("only makes sense with processes") 1631 conn, child_conn = self.Pipe(duplex=True) 1632 1633 p = self.Process(target=self._writefd, args=(child_conn, b"foo")) 1634 p.daemon = True 1635 p.start() 1636 with open(test_support.TESTFN, "wb") as f: 1637 fd = f.fileno() 1638 if msvcrt: 1639 fd = msvcrt.get_osfhandle(fd) 1640 reduction.send_handle(conn, fd, p.pid) 1641 p.join() 1642 with open(test_support.TESTFN, "rb") as f: 1643 self.assertEqual(f.read(), b"foo") 1644 1645 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 1646 @unittest.skipIf(sys.platform == "win32", 1647 "test semantics don't make sense on Windows") 1648 @unittest.skipIf(MAXFD <= 256, 1649 "largest assignable fd number is too small") 1650 @unittest.skipUnless(hasattr(os, "dup2"), 1651 "test needs os.dup2()") 1652 def test_large_fd_transfer(self): 1653 # With fd > 256 (issue #11657) 1654 if self.TYPE != 'processes': 1655 self.skipTest("only makes sense with processes") 1656 conn, child_conn = self.Pipe(duplex=True) 1657 1658 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True)) 1659 p.daemon = True 1660 p.start() 1661 with open(test_support.TESTFN, "wb") as f: 1662 fd = f.fileno() 1663 for newfd in range(256, MAXFD): 1664 if not self._is_fd_assigned(newfd): 1665 break 1666 else: 1667 self.fail("could not find an unassigned large file descriptor") 1668 os.dup2(fd, newfd) 1669 try: 1670 reduction.send_handle(conn, newfd, p.pid) 1671 finally: 1672 os.close(newfd) 1673 p.join() 1674 with open(test_support.TESTFN, "rb") as f: 1675 self.assertEqual(f.read(), b"bar") 1676 1677 @classmethod 1678 def _send_data_without_fd(self, conn): 1679 os.write(conn.fileno(), b"\0") 1680 1681 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 1682 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows") 1683 def test_missing_fd_transfer(self): 1684 # Check that exception is raised when received data is not 1685 # accompanied by a file descriptor in ancillary data. 1686 if self.TYPE != 'processes': 1687 self.skipTest("only makes sense with processes") 1688 conn, child_conn = self.Pipe(duplex=True) 1689 1690 p = self.Process(target=self._send_data_without_fd, args=(child_conn,)) 1691 p.daemon = True 1692 p.start() 1693 self.assertRaises(RuntimeError, reduction.recv_handle, conn) 1694 p.join() 1695 1379 1696 class _TestListenerClient(BaseTestCase): 1380 1697 1381 1698 ALLOWED_TYPES = ('processes', 'threads') 1382 1699 1383 def _test(self, address): 1384 conn = self.connection.Client(address) 1700 @classmethod 1701 def _test(cls, address): 1702 conn = cls.connection.Client(address) 1385 1703 conn.send('hello') 1386 1704 conn.close() … … 1396 1714 p.join() 1397 1715 l.close() 1716 1717 def test_issue14725(self): 1718 l = self.connection.Listener() 1719 p = self.Process(target=self._test, args=(l.address,)) 1720 p.daemon = True 1721 p.start() 1722 time.sleep(1) 1723 # On Windows the client process should by now have connected, 1724 # written data and closed the pipe handle by now. This causes 1725 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue 1726 # 14725. 1727 conn = l.accept() 1728 self.assertEqual(conn.recv(), 'hello') 1729 conn.close() 1730 p.join() 1731 l.close() 1732 1398 1733 # 1399 1734 # Test of sending connection and socket objects between processes … … 1446 1781 lconn, lconn0 = self.Pipe() 1447 1782 lp = self.Process(target=self._listener, args=(lconn0, families)) 1783 lp.daemon = True 1448 1784 lp.start() 1449 1785 lconn0.close() … … 1451 1787 rconn, rconn0 = self.Pipe() 1452 1788 rp = self.Process(target=self._remote, args=(rconn0,)) 1789 rp.daemon = True 1453 1790 rp.start() 1454 1791 rconn0.close() … … 1510 1847 all = [] 1511 1848 occupied = 0 1849 heap._lock.acquire() 1850 self.addCleanup(heap._lock.release) 1512 1851 for L in heap._len_to_seq.values(): 1513 1852 for arena, start, stop in L: … … 1527 1866 (stop == nstart)) 1528 1867 1529 # 1530 # 1531 # 1532 1533 try: 1534 from ctypes import Structure, Value, copy, c_int, c_double 1535 except ImportError: 1536 Structure = object 1537 c_int = c_double = None 1868 def test_free_from_gc(self): 1869 # Check that freeing of blocks by the garbage collector doesn't deadlock 1870 # (issue #12352). 1871 # Make sure the GC is enabled, and set lower collection thresholds to 1872 # make collections more frequent (and increase the probability of 1873 # deadlock). 1874 if not gc.isenabled(): 1875 gc.enable() 1876 self.addCleanup(gc.disable) 1877 thresholds = gc.get_threshold() 1878 self.addCleanup(gc.set_threshold, *thresholds) 1879 gc.set_threshold(10) 1880 1881 # perform numerous block allocations, with cyclic references to make 1882 # sure objects are collected asynchronously by the gc 1883 for i in range(5000): 1884 a = multiprocessing.heap.BufferWrapper(1) 1885 b = multiprocessing.heap.BufferWrapper(1) 1886 # circular references 1887 a.buddy = b 1888 b.buddy = a 1889 1890 # 1891 # 1892 # 1538 1893 1539 1894 class _Foo(Structure): … … 1547 1902 ALLOWED_TYPES = ('processes',) 1548 1903 1549 def _double(self, x, y, foo, arr, string): 1904 def setUp(self): 1905 if not HAS_SHAREDCTYPES: 1906 self.skipTest("requires multiprocessing.sharedctypes") 1907 1908 @classmethod 1909 def _double(cls, x, y, foo, arr, string): 1550 1910 x.value *= 2 1551 1911 y.value *= 2 … … 1557 1917 1558 1918 def test_sharedctypes(self, lock=False): 1559 if c_int is None:1560 return1561 1562 1919 x = Value('i', 7, lock=lock) 1563 1920 y = Value(c_double, 1.0/3.0, lock=lock) … … 1565 1922 arr = self.Array('d', range(10), lock=lock) 1566 1923 string = self.Array('c', 20, lock=lock) 1567 string.value = 'hello'1924 string.value = latin('hello') 1568 1925 1569 1926 p = self.Process(target=self._double, args=(x, y, foo, arr, string)) 1927 p.daemon = True 1570 1928 p.start() 1571 1929 p.join() … … 1583 1941 1584 1942 def test_copy(self): 1585 if c_int is None:1586 return1587 1588 1943 foo = _Foo(2, 5.0) 1589 1944 bar = copy(foo) … … 1601 1956 ALLOWED_TYPES = ('processes',) 1602 1957 1603 def _test_finalize(self, conn): 1958 @classmethod 1959 def _test_finalize(cls, conn): 1604 1960 class Foo(object): 1605 1961 pass … … 1632 1988 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100) 1633 1989 1634 # call mu tliprocessing's cleanup function then exit process without1990 # call multiprocessing's cleanup function then exit process without 1635 1991 # garbage collecting locals 1636 1992 util._exit_function() … … 1642 1998 1643 1999 p = self.Process(target=self._test_finalize, args=(child_conn,)) 2000 p.daemon = True 1644 2001 p.start() 1645 2002 p.join() … … 1657 2014 1658 2015 def test_import(self): 1659 modules = (2016 modules = [ 1660 2017 'multiprocessing', 'multiprocessing.connection', 1661 2018 'multiprocessing.heap', 'multiprocessing.managers', 1662 2019 'multiprocessing.pool', 'multiprocessing.process', 1663 'multiprocessing.reduction', 'multiprocessing.sharedctypes',1664 2020 'multiprocessing.synchronize', 'multiprocessing.util' 1665 ) 2021 ] 2022 2023 if HAS_REDUCTION: 2024 modules.append('multiprocessing.reduction') 2025 2026 if c_int is not None: 2027 # This module requires _ctypes 2028 modules.append('multiprocessing.sharedctypes') 1666 2029 1667 2030 for name in modules: … … 1691 2054 logger.setLevel(LOG_LEVEL) 1692 2055 1693 def _test_level(self, conn): 2056 @classmethod 2057 def _test_level(cls, conn): 1694 2058 logger = multiprocessing.get_logger() 1695 2059 conn.send(logger.getEffectiveLevel()) … … 1706 2070 1707 2071 logger.setLevel(LEVEL1) 1708 self.Process(target=self._test_level, args=(writer,)).start() 2072 p = self.Process(target=self._test_level, args=(writer,)) 2073 p.daemon = True 2074 p.start() 1709 2075 self.assertEqual(LEVEL1, reader.recv()) 1710 2076 1711 2077 logger.setLevel(logging.NOTSET) 1712 2078 root_logger.setLevel(LEVEL2) 1713 self.Process(target=self._test_level, args=(writer,)).start() 2079 p = self.Process(target=self._test_level, args=(writer,)) 2080 p.daemon = True 2081 p.start() 1714 2082 self.assertEqual(LEVEL2, reader.recv()) 1715 2083 … … 1717 2085 logger.setLevel(level=LOG_LEVEL) 1718 2086 2087 2088 # class _TestLoggingProcessName(BaseTestCase): 2089 # 2090 # def handle(self, record): 2091 # assert record.processName == multiprocessing.current_process().name 2092 # self.__handled = True 2093 # 2094 # def test_logging(self): 2095 # handler = logging.Handler() 2096 # handler.handle = self.handle 2097 # self.__handled = False 2098 # # Bypass getLogger() and side-effects 2099 # logger = logging.getLoggerClass()( 2100 # 'multiprocessing.test.TestLoggingProcessName') 2101 # logger.addHandler(handler) 2102 # logger.propagate = False 2103 # 2104 # logger.warn('foo') 2105 # assert self.__handled 2106 2107 # 2108 # Check that Process.join() retries if os.waitpid() fails with EINTR 2109 # 2110 2111 class _TestPollEintr(BaseTestCase): 2112 2113 ALLOWED_TYPES = ('processes',) 2114 2115 @classmethod 2116 def _killer(cls, pid): 2117 time.sleep(0.5) 2118 os.kill(pid, signal.SIGUSR1) 2119 2120 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 2121 def test_poll_eintr(self): 2122 got_signal = [False] 2123 def record(*args): 2124 got_signal[0] = True 2125 pid = os.getpid() 2126 oldhandler = signal.signal(signal.SIGUSR1, record) 2127 try: 2128 killer = self.Process(target=self._killer, args=(pid,)) 2129 killer.start() 2130 p = self.Process(target=time.sleep, args=(1,)) 2131 p.start() 2132 p.join() 2133 self.assertTrue(got_signal[0]) 2134 self.assertEqual(p.exitcode, 0) 2135 killer.join() 2136 finally: 2137 signal.signal(signal.SIGUSR1, oldhandler) 2138 1719 2139 # 1720 2140 # Test to verify handle verification, see issue 3321 … … 1723 2143 class TestInvalidHandle(unittest.TestCase): 1724 2144 2145 @unittest.skipIf(WIN32, "skipped on Windows") 1725 2146 def test_invalid_handles(self): 1726 if WIN32:1727 return1728 2147 conn = _multiprocessing.Connection(44977608) 1729 2148 self.assertRaises(IOError, conn.poll) 1730 2149 self.assertRaises(IOError, _multiprocessing.Connection, -1) 2150 1731 2151 # 1732 2152 # Functions used to create test cases from the base ones in this module … … 1745 2165 result = {} 1746 2166 glob = globals() 1747 Type = type [0].upper() + type[1:]2167 Type = type.capitalize() 1748 2168 1749 2169 for name in glob.keys(): … … 1770 2190 'Condition', 'Event', 'Value', 'Array', 'RawValue', 1771 2191 'RawArray', 'current_process', 'active_children', 'Pipe', 1772 'connection', 'JoinableQueue' 2192 'connection', 'JoinableQueue', 'Pool' 1773 2193 ))) 1774 2194 … … 1784 2204 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 1785 2205 'Condition', 'Event', 'Value', 'Array', 'list', 'dict', 1786 'Namespace', 'JoinableQueue' 2206 'Namespace', 'JoinableQueue', 'Pool' 1787 2207 ))) 1788 2208 … … 1798 2218 'Condition', 'Event', 'Value', 'Array', 'current_process', 1799 2219 'active_children', 'Pipe', 'connection', 'dict', 'list', 1800 'Namespace', 'JoinableQueue' 2220 'Namespace', 'JoinableQueue', 'Pool' 1801 2221 ))) 1802 2222 … … 1834 2254 1835 2255 # 2256 # Test Manager.start()/Pool.__init__() initializer feature - see issue 5585 2257 # 2258 2259 def initializer(ns): 2260 ns.test += 1 2261 2262 class TestInitializers(unittest.TestCase): 2263 def setUp(self): 2264 self.mgr = multiprocessing.Manager() 2265 self.ns = self.mgr.Namespace() 2266 self.ns.test = 0 2267 2268 def tearDown(self): 2269 self.mgr.shutdown() 2270 2271 def test_manager_initializer(self): 2272 m = multiprocessing.managers.SyncManager() 2273 self.assertRaises(TypeError, m.start, 1) 2274 m.start(initializer, (self.ns,)) 2275 self.assertEqual(self.ns.test, 1) 2276 m.shutdown() 2277 2278 def test_pool_initializer(self): 2279 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1) 2280 p = multiprocessing.Pool(1, initializer, (self.ns,)) 2281 p.close() 2282 p.join() 2283 self.assertEqual(self.ns.test, 1) 2284 2285 # 1836 2286 # Issue 5155, 5313, 5331: Test process in processes 1837 2287 # Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior 1838 2288 # 1839 2289 1840 def _ ThisSubProcess(q):2290 def _this_sub_process(q): 1841 2291 try: 1842 2292 item = q.get(block=False) … … 1844 2294 pass 1845 2295 1846 def _ TestProcess(q):2296 def _test_process(q): 1847 2297 queue = multiprocessing.Queue() 1848 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,)) 2298 subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,)) 2299 subProc.daemon = True 1849 2300 subProc.start() 1850 2301 subProc.join() … … 1882 2333 def test_queue_in_process(self): 1883 2334 queue = multiprocessing.Queue() 1884 proc = multiprocessing.Process(target=_ TestProcess, args=(queue,))2335 proc = multiprocessing.Process(target=_test_process, args=(queue,)) 1885 2336 proc.start() 1886 2337 proc.join() … … 1899 2350 assert sio.getvalue() == 'foo' 1900 2351 1901 testcases_other = [OtherTest, TestInvalidHandle, TestStdinBadfiledescriptor] 2352 # 2353 # Test interaction with socket timeouts - see Issue #6056 2354 # 2355 2356 class TestTimeouts(unittest.TestCase): 2357 @classmethod 2358 def _test_timeout(cls, child, address): 2359 time.sleep(1) 2360 child.send(123) 2361 child.close() 2362 conn = multiprocessing.connection.Client(address) 2363 conn.send(456) 2364 conn.close() 2365 2366 def test_timeout(self): 2367 old_timeout = socket.getdefaulttimeout() 2368 try: 2369 socket.setdefaulttimeout(0.1) 2370 parent, child = multiprocessing.Pipe(duplex=True) 2371 l = multiprocessing.connection.Listener(family='AF_INET') 2372 p = multiprocessing.Process(target=self._test_timeout, 2373 args=(child, l.address)) 2374 p.start() 2375 child.close() 2376 self.assertEqual(parent.recv(), 123) 2377 parent.close() 2378 conn = l.accept() 2379 self.assertEqual(conn.recv(), 456) 2380 conn.close() 2381 l.close() 2382 p.join(10) 2383 finally: 2384 socket.setdefaulttimeout(old_timeout) 2385 2386 # 2387 # Test what happens with no "if __name__ == '__main__'" 2388 # 2389 2390 class TestNoForkBomb(unittest.TestCase): 2391 def test_noforkbomb(self): 2392 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py') 2393 if WIN32: 2394 rc, out, err = test.script_helper.assert_python_failure(name) 2395 self.assertEqual('', out.decode('ascii')) 2396 self.assertIn('RuntimeError', err.decode('ascii')) 2397 else: 2398 rc, out, err = test.script_helper.assert_python_ok(name) 2399 self.assertEqual('123', out.decode('ascii').rstrip()) 2400 self.assertEqual('', err.decode('ascii')) 2401 2402 # 2403 # Issue 12098: check sys.flags of child matches that for parent 2404 # 2405 2406 class TestFlags(unittest.TestCase): 2407 @classmethod 2408 def run_in_grandchild(cls, conn): 2409 conn.send(tuple(sys.flags)) 2410 2411 @classmethod 2412 def run_in_child(cls): 2413 import json 2414 r, w = multiprocessing.Pipe(duplex=False) 2415 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,)) 2416 p.start() 2417 grandchild_flags = r.recv() 2418 p.join() 2419 r.close() 2420 w.close() 2421 flags = (tuple(sys.flags), grandchild_flags) 2422 print(json.dumps(flags)) 2423 2424 def test_flags(self): 2425 import json, subprocess 2426 # start child process using unusual flags 2427 prog = ('from test.test_multiprocessing import TestFlags; ' + 2428 'TestFlags.run_in_child()') 2429 data = subprocess.check_output( 2430 [sys.executable, '-E', '-B', '-O', '-c', prog]) 2431 child_flags, grandchild_flags = json.loads(data.decode('ascii')) 2432 self.assertEqual(child_flags, grandchild_flags) 2433 2434 # 2435 # Issue #17555: ForkAwareThreadLock 2436 # 2437 2438 class TestForkAwareThreadLock(unittest.TestCase): 2439 # We recurisvely start processes. Issue #17555 meant that the 2440 # after fork registry would get duplicate entries for the same 2441 # lock. The size of the registry at generation n was ~2**n. 2442 2443 @classmethod 2444 def child(cls, n, conn): 2445 if n > 1: 2446 p = multiprocessing.Process(target=cls.child, args=(n-1, conn)) 2447 p.start() 2448 p.join() 2449 else: 2450 conn.send(len(util._afterfork_registry)) 2451 conn.close() 2452 2453 def test_lock(self): 2454 r, w = multiprocessing.Pipe(False) 2455 l = util.ForkAwareThreadLock() 2456 old_size = len(util._afterfork_registry) 2457 p = multiprocessing.Process(target=self.child, args=(5, w)) 2458 p.start() 2459 new_size = r.recv() 2460 p.join() 2461 self.assertLessEqual(new_size, old_size) 2462 2463 # 2464 # Issue #17097: EINTR should be ignored by recv(), send(), accept() etc 2465 # 2466 2467 class TestIgnoreEINTR(unittest.TestCase): 2468 2469 @classmethod 2470 def _test_ignore(cls, conn): 2471 def handler(signum, frame): 2472 pass 2473 signal.signal(signal.SIGUSR1, handler) 2474 conn.send('ready') 2475 x = conn.recv() 2476 conn.send(x) 2477 conn.send_bytes(b'x'*(1024*1024)) # sending 1 MB should block 2478 2479 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 2480 def test_ignore(self): 2481 conn, child_conn = multiprocessing.Pipe() 2482 try: 2483 p = multiprocessing.Process(target=self._test_ignore, 2484 args=(child_conn,)) 2485 p.daemon = True 2486 p.start() 2487 child_conn.close() 2488 self.assertEqual(conn.recv(), 'ready') 2489 time.sleep(0.1) 2490 os.kill(p.pid, signal.SIGUSR1) 2491 time.sleep(0.1) 2492 conn.send(1234) 2493 self.assertEqual(conn.recv(), 1234) 2494 time.sleep(0.1) 2495 os.kill(p.pid, signal.SIGUSR1) 2496 self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024)) 2497 time.sleep(0.1) 2498 p.join() 2499 finally: 2500 conn.close() 2501 2502 @classmethod 2503 def _test_ignore_listener(cls, conn): 2504 def handler(signum, frame): 2505 pass 2506 signal.signal(signal.SIGUSR1, handler) 2507 l = multiprocessing.connection.Listener() 2508 conn.send(l.address) 2509 a = l.accept() 2510 a.send('welcome') 2511 2512 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 2513 def test_ignore_listener(self): 2514 conn, child_conn = multiprocessing.Pipe() 2515 try: 2516 p = multiprocessing.Process(target=self._test_ignore_listener, 2517 args=(child_conn,)) 2518 p.daemon = True 2519 p.start() 2520 child_conn.close() 2521 address = conn.recv() 2522 time.sleep(0.1) 2523 os.kill(p.pid, signal.SIGUSR1) 2524 time.sleep(0.1) 2525 client = multiprocessing.connection.Client(address) 2526 self.assertEqual(client.recv(), 'welcome') 2527 p.join() 2528 finally: 2529 conn.close() 2530 2531 # 2532 # 2533 # 2534 2535 testcases_other = [OtherTest, TestInvalidHandle, TestInitializers, 2536 TestStdinBadfiledescriptor, TestTimeouts, TestNoForkBomb, 2537 TestFlags, TestForkAwareThreadLock, TestIgnoreEINTR] 1902 2538 1903 2539 # … … 1910 2546 lock = multiprocessing.RLock() 1911 2547 except OSError: 1912 from test.test_support import TestSkipped 1913 raise TestSkipped("OSError raises on RLock creation, see issue 3111!") 2548 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!") 2549 2550 check_enough_semaphores() 1914 2551 1915 2552 if run is None: … … 1935 2572 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase 1936 2573 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases) 1937 run(suite) 2574 # (ncoghlan): Whether or not sys.exc_clear is executed by the threading 2575 # module during these tests is at least platform dependent and possibly 2576 # non-deterministic on any given platform. So we don't mind if the listed 2577 # warnings aren't actually raised. 2578 with test_support.check_py3k_warnings( 2579 (".+__(get|set)slice__ has been removed", DeprecationWarning), 2580 (r"sys.exc_clear\(\) not supported", DeprecationWarning), 2581 quiet=True): 2582 run(suite) 1938 2583 1939 2584 ThreadsMixin.pool.terminate()
Note:
See TracChangeset
for help on using the changeset viewer.