1 | #!/usr/bin/env python
|
---|
2 |
|
---|
3 | #
|
---|
4 | # Unit tests for the multiprocessing package
|
---|
5 | #
|
---|
6 |
|
---|
7 | import unittest
|
---|
8 | import Queue
|
---|
9 | import time
|
---|
10 | import sys
|
---|
11 | import os
|
---|
12 | import gc
|
---|
13 | import signal
|
---|
14 | import array
|
---|
15 | import socket
|
---|
16 | import random
|
---|
17 | import logging
|
---|
18 | import errno
|
---|
19 | import test.script_helper
|
---|
20 | from test import test_support
|
---|
21 | from StringIO import StringIO
|
---|
22 | _multiprocessing = test_support.import_module('_multiprocessing')
|
---|
23 | # import threading after _multiprocessing to raise a more relevant error
|
---|
24 | # message: "No module named _multiprocessing". _multiprocessing is not compiled
|
---|
25 | # without thread support.
|
---|
26 | import threading
|
---|
27 |
|
---|
28 | # Work around broken sem_open implementations
|
---|
29 | test_support.import_module('multiprocessing.synchronize')
|
---|
30 |
|
---|
31 | import multiprocessing.dummy
|
---|
32 | import multiprocessing.connection
|
---|
33 | import multiprocessing.managers
|
---|
34 | import multiprocessing.heap
|
---|
35 | import multiprocessing.pool
|
---|
36 |
|
---|
37 | from multiprocessing import util
|
---|
38 |
|
---|
39 | try:
|
---|
40 | from multiprocessing import reduction
|
---|
41 | HAS_REDUCTION = True
|
---|
42 | except ImportError:
|
---|
43 | HAS_REDUCTION = False
|
---|
44 |
|
---|
45 | try:
|
---|
46 | from multiprocessing.sharedctypes import Value, copy
|
---|
47 | HAS_SHAREDCTYPES = True
|
---|
48 | except ImportError:
|
---|
49 | HAS_SHAREDCTYPES = False
|
---|
50 |
|
---|
51 | try:
|
---|
52 | import msvcrt
|
---|
53 | except ImportError:
|
---|
54 | msvcrt = None
|
---|
55 |
|
---|
56 | #
|
---|
57 | #
|
---|
58 | #
|
---|
59 |
|
---|
60 | latin = str
|
---|
61 |
|
---|
62 | #
|
---|
63 | # Constants
|
---|
64 | #
|
---|
65 |
|
---|
66 | LOG_LEVEL = util.SUBWARNING
|
---|
67 | #LOG_LEVEL = logging.DEBUG
|
---|
68 |
|
---|
69 | DELTA = 0.1
|
---|
70 | CHECK_TIMINGS = False # making true makes tests take a lot longer
|
---|
71 | # and can sometimes cause some non-serious
|
---|
72 | # failures because some calls block a bit
|
---|
73 | # longer than expected
|
---|
74 | if CHECK_TIMINGS:
|
---|
75 | TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
|
---|
76 | else:
|
---|
77 | TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
|
---|
78 |
|
---|
79 | HAVE_GETVALUE = not getattr(_multiprocessing,
|
---|
80 | 'HAVE_BROKEN_SEM_GETVALUE', False)
|
---|
81 |
|
---|
82 | WIN32 = (sys.platform == "win32")
|
---|
83 |
|
---|
84 | try:
|
---|
85 | MAXFD = os.sysconf("SC_OPEN_MAX")
|
---|
86 | except:
|
---|
87 | MAXFD = 256
|
---|
88 |
|
---|
89 | #
|
---|
90 | # Some tests require ctypes
|
---|
91 | #
|
---|
92 |
|
---|
93 | try:
|
---|
94 | from ctypes import Structure, c_int, c_double
|
---|
95 | except ImportError:
|
---|
96 | Structure = object
|
---|
97 | c_int = c_double = None
|
---|
98 |
|
---|
99 |
|
---|
100 | def check_enough_semaphores():
|
---|
101 | """Check that the system supports enough semaphores to run the test."""
|
---|
102 | # minimum number of semaphores available according to POSIX
|
---|
103 | nsems_min = 256
|
---|
104 | try:
|
---|
105 | nsems = os.sysconf("SC_SEM_NSEMS_MAX")
|
---|
106 | except (AttributeError, ValueError):
|
---|
107 | # sysconf not available or setting not available
|
---|
108 | return
|
---|
109 | if nsems == -1 or nsems >= nsems_min:
|
---|
110 | return
|
---|
111 | raise unittest.SkipTest("The OS doesn't support enough semaphores "
|
---|
112 | "to run the test (required: %d)." % nsems_min)
|
---|
113 |
|
---|
114 |
|
---|
115 | #
|
---|
116 | # Creates a wrapper for a function which records the time it takes to finish
|
---|
117 | #
|
---|
118 |
|
---|
119 | class TimingWrapper(object):
|
---|
120 |
|
---|
121 | def __init__(self, func):
|
---|
122 | self.func = func
|
---|
123 | self.elapsed = None
|
---|
124 |
|
---|
125 | def __call__(self, *args, **kwds):
|
---|
126 | t = time.time()
|
---|
127 | try:
|
---|
128 | return self.func(*args, **kwds)
|
---|
129 | finally:
|
---|
130 | self.elapsed = time.time() - t
|
---|
131 |
|
---|
132 | #
|
---|
133 | # Base class for test cases
|
---|
134 | #
|
---|
135 |
|
---|
136 | class BaseTestCase(object):
|
---|
137 |
|
---|
138 | ALLOWED_TYPES = ('processes', 'manager', 'threads')
|
---|
139 |
|
---|
140 | def assertTimingAlmostEqual(self, a, b):
|
---|
141 | if CHECK_TIMINGS:
|
---|
142 | self.assertAlmostEqual(a, b, 1)
|
---|
143 |
|
---|
144 | def assertReturnsIfImplemented(self, value, func, *args):
|
---|
145 | try:
|
---|
146 | res = func(*args)
|
---|
147 | except NotImplementedError:
|
---|
148 | pass
|
---|
149 | else:
|
---|
150 | return self.assertEqual(value, res)
|
---|
151 |
|
---|
152 | # For the sanity of Windows users, rather than crashing or freezing in
|
---|
153 | # multiple ways.
|
---|
154 | def __reduce__(self, *args):
|
---|
155 | raise NotImplementedError("shouldn't try to pickle a test case")
|
---|
156 |
|
---|
157 | __reduce_ex__ = __reduce__
|
---|
158 |
|
---|
159 | #
|
---|
160 | # Return the value of a semaphore
|
---|
161 | #
|
---|
162 |
|
---|
163 | def get_value(self):
|
---|
164 | try:
|
---|
165 | return self.get_value()
|
---|
166 | except AttributeError:
|
---|
167 | try:
|
---|
168 | return self._Semaphore__value
|
---|
169 | except AttributeError:
|
---|
170 | try:
|
---|
171 | return self._value
|
---|
172 | except AttributeError:
|
---|
173 | raise NotImplementedError
|
---|
174 |
|
---|
175 | #
|
---|
176 | # Testcases
|
---|
177 | #
|
---|
178 |
|
---|
179 | class _TestProcess(BaseTestCase):
|
---|
180 |
|
---|
181 | ALLOWED_TYPES = ('processes', 'threads')
|
---|
182 |
|
---|
183 | def test_current(self):
|
---|
184 | if self.TYPE == 'threads':
|
---|
185 | return
|
---|
186 |
|
---|
187 | current = self.current_process()
|
---|
188 | authkey = current.authkey
|
---|
189 |
|
---|
190 | self.assertTrue(current.is_alive())
|
---|
191 | self.assertTrue(not current.daemon)
|
---|
192 | self.assertIsInstance(authkey, bytes)
|
---|
193 | self.assertTrue(len(authkey) > 0)
|
---|
194 | self.assertEqual(current.ident, os.getpid())
|
---|
195 | self.assertEqual(current.exitcode, None)
|
---|
196 |
|
---|
197 | @classmethod
|
---|
198 | def _test(cls, q, *args, **kwds):
|
---|
199 | current = cls.current_process()
|
---|
200 | q.put(args)
|
---|
201 | q.put(kwds)
|
---|
202 | q.put(current.name)
|
---|
203 | if cls.TYPE != 'threads':
|
---|
204 | q.put(bytes(current.authkey))
|
---|
205 | q.put(current.pid)
|
---|
206 |
|
---|
207 | def test_process(self):
|
---|
208 | q = self.Queue(1)
|
---|
209 | e = self.Event()
|
---|
210 | args = (q, 1, 2)
|
---|
211 | kwargs = {'hello':23, 'bye':2.54}
|
---|
212 | name = 'SomeProcess'
|
---|
213 | p = self.Process(
|
---|
214 | target=self._test, args=args, kwargs=kwargs, name=name
|
---|
215 | )
|
---|
216 | p.daemon = True
|
---|
217 | current = self.current_process()
|
---|
218 |
|
---|
219 | if self.TYPE != 'threads':
|
---|
220 | self.assertEqual(p.authkey, current.authkey)
|
---|
221 | self.assertEqual(p.is_alive(), False)
|
---|
222 | self.assertEqual(p.daemon, True)
|
---|
223 | self.assertNotIn(p, self.active_children())
|
---|
224 | self.assertTrue(type(self.active_children()) is list)
|
---|
225 | self.assertEqual(p.exitcode, None)
|
---|
226 |
|
---|
227 | p.start()
|
---|
228 |
|
---|
229 | self.assertEqual(p.exitcode, None)
|
---|
230 | self.assertEqual(p.is_alive(), True)
|
---|
231 | self.assertIn(p, self.active_children())
|
---|
232 |
|
---|
233 | self.assertEqual(q.get(), args[1:])
|
---|
234 | self.assertEqual(q.get(), kwargs)
|
---|
235 | self.assertEqual(q.get(), p.name)
|
---|
236 | if self.TYPE != 'threads':
|
---|
237 | self.assertEqual(q.get(), current.authkey)
|
---|
238 | self.assertEqual(q.get(), p.pid)
|
---|
239 |
|
---|
240 | p.join()
|
---|
241 |
|
---|
242 | self.assertEqual(p.exitcode, 0)
|
---|
243 | self.assertEqual(p.is_alive(), False)
|
---|
244 | self.assertNotIn(p, self.active_children())
|
---|
245 |
|
---|
246 | @classmethod
|
---|
247 | def _test_terminate(cls):
|
---|
248 | time.sleep(1000)
|
---|
249 |
|
---|
250 | def test_terminate(self):
|
---|
251 | if self.TYPE == 'threads':
|
---|
252 | return
|
---|
253 |
|
---|
254 | p = self.Process(target=self._test_terminate)
|
---|
255 | p.daemon = True
|
---|
256 | p.start()
|
---|
257 |
|
---|
258 | self.assertEqual(p.is_alive(), True)
|
---|
259 | self.assertIn(p, self.active_children())
|
---|
260 | self.assertEqual(p.exitcode, None)
|
---|
261 |
|
---|
262 | p.terminate()
|
---|
263 |
|
---|
264 | join = TimingWrapper(p.join)
|
---|
265 | self.assertEqual(join(), None)
|
---|
266 | self.assertTimingAlmostEqual(join.elapsed, 0.0)
|
---|
267 |
|
---|
268 | self.assertEqual(p.is_alive(), False)
|
---|
269 | self.assertNotIn(p, self.active_children())
|
---|
270 |
|
---|
271 | p.join()
|
---|
272 |
|
---|
273 | # XXX sometimes get p.exitcode == 0 on Windows ...
|
---|
274 | #self.assertEqual(p.exitcode, -signal.SIGTERM)
|
---|
275 |
|
---|
276 | def test_cpu_count(self):
|
---|
277 | try:
|
---|
278 | cpus = multiprocessing.cpu_count()
|
---|
279 | except NotImplementedError:
|
---|
280 | cpus = 1
|
---|
281 | self.assertTrue(type(cpus) is int)
|
---|
282 | self.assertTrue(cpus >= 1)
|
---|
283 |
|
---|
284 | def test_active_children(self):
|
---|
285 | self.assertEqual(type(self.active_children()), list)
|
---|
286 |
|
---|
287 | p = self.Process(target=time.sleep, args=(DELTA,))
|
---|
288 | self.assertNotIn(p, self.active_children())
|
---|
289 |
|
---|
290 | p.daemon = True
|
---|
291 | p.start()
|
---|
292 | self.assertIn(p, self.active_children())
|
---|
293 |
|
---|
294 | p.join()
|
---|
295 | self.assertNotIn(p, self.active_children())
|
---|
296 |
|
---|
297 | @classmethod
|
---|
298 | def _test_recursion(cls, wconn, id):
|
---|
299 | from multiprocessing import forking
|
---|
300 | wconn.send(id)
|
---|
301 | if len(id) < 2:
|
---|
302 | for i in range(2):
|
---|
303 | p = cls.Process(
|
---|
304 | target=cls._test_recursion, args=(wconn, id+[i])
|
---|
305 | )
|
---|
306 | p.start()
|
---|
307 | p.join()
|
---|
308 |
|
---|
309 | def test_recursion(self):
|
---|
310 | rconn, wconn = self.Pipe(duplex=False)
|
---|
311 | self._test_recursion(wconn, [])
|
---|
312 |
|
---|
313 | time.sleep(DELTA)
|
---|
314 | result = []
|
---|
315 | while rconn.poll():
|
---|
316 | result.append(rconn.recv())
|
---|
317 |
|
---|
318 | expected = [
|
---|
319 | [],
|
---|
320 | [0],
|
---|
321 | [0, 0],
|
---|
322 | [0, 1],
|
---|
323 | [1],
|
---|
324 | [1, 0],
|
---|
325 | [1, 1]
|
---|
326 | ]
|
---|
327 | self.assertEqual(result, expected)
|
---|
328 |
|
---|
329 | @classmethod
|
---|
330 | def _test_sys_exit(cls, reason, testfn):
|
---|
331 | sys.stderr = open(testfn, 'w')
|
---|
332 | sys.exit(reason)
|
---|
333 |
|
---|
334 | def test_sys_exit(self):
|
---|
335 | # See Issue 13854
|
---|
336 | if self.TYPE == 'threads':
|
---|
337 | return
|
---|
338 |
|
---|
339 | testfn = test_support.TESTFN
|
---|
340 | self.addCleanup(test_support.unlink, testfn)
|
---|
341 |
|
---|
342 | for reason, code in (([1, 2, 3], 1), ('ignore this', 0)):
|
---|
343 | p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
|
---|
344 | p.daemon = True
|
---|
345 | p.start()
|
---|
346 | p.join(5)
|
---|
347 | self.assertEqual(p.exitcode, code)
|
---|
348 |
|
---|
349 | with open(testfn, 'r') as f:
|
---|
350 | self.assertEqual(f.read().rstrip(), str(reason))
|
---|
351 |
|
---|
352 | for reason in (True, False, 8):
|
---|
353 | p = self.Process(target=sys.exit, args=(reason,))
|
---|
354 | p.daemon = True
|
---|
355 | p.start()
|
---|
356 | p.join(5)
|
---|
357 | self.assertEqual(p.exitcode, reason)
|
---|
358 |
|
---|
359 | #
|
---|
360 | #
|
---|
361 | #
|
---|
362 |
|
---|
363 | class _UpperCaser(multiprocessing.Process):
|
---|
364 |
|
---|
365 | def __init__(self):
|
---|
366 | multiprocessing.Process.__init__(self)
|
---|
367 | self.child_conn, self.parent_conn = multiprocessing.Pipe()
|
---|
368 |
|
---|
369 | def run(self):
|
---|
370 | self.parent_conn.close()
|
---|
371 | for s in iter(self.child_conn.recv, None):
|
---|
372 | self.child_conn.send(s.upper())
|
---|
373 | self.child_conn.close()
|
---|
374 |
|
---|
375 | def submit(self, s):
|
---|
376 | assert type(s) is str
|
---|
377 | self.parent_conn.send(s)
|
---|
378 | return self.parent_conn.recv()
|
---|
379 |
|
---|
380 | def stop(self):
|
---|
381 | self.parent_conn.send(None)
|
---|
382 | self.parent_conn.close()
|
---|
383 | self.child_conn.close()
|
---|
384 |
|
---|
385 | class _TestSubclassingProcess(BaseTestCase):
|
---|
386 |
|
---|
387 | ALLOWED_TYPES = ('processes',)
|
---|
388 |
|
---|
389 | def test_subclassing(self):
|
---|
390 | uppercaser = _UpperCaser()
|
---|
391 | uppercaser.daemon = True
|
---|
392 | uppercaser.start()
|
---|
393 | self.assertEqual(uppercaser.submit('hello'), 'HELLO')
|
---|
394 | self.assertEqual(uppercaser.submit('world'), 'WORLD')
|
---|
395 | uppercaser.stop()
|
---|
396 | uppercaser.join()
|
---|
397 |
|
---|
398 | #
|
---|
399 | #
|
---|
400 | #
|
---|
401 |
|
---|
402 | def queue_empty(q):
|
---|
403 | if hasattr(q, 'empty'):
|
---|
404 | return q.empty()
|
---|
405 | else:
|
---|
406 | return q.qsize() == 0
|
---|
407 |
|
---|
408 | def queue_full(q, maxsize):
|
---|
409 | if hasattr(q, 'full'):
|
---|
410 | return q.full()
|
---|
411 | else:
|
---|
412 | return q.qsize() == maxsize
|
---|
413 |
|
---|
414 |
|
---|
415 | class _TestQueue(BaseTestCase):
|
---|
416 |
|
---|
417 |
|
---|
418 | @classmethod
|
---|
419 | def _test_put(cls, queue, child_can_start, parent_can_continue):
|
---|
420 | child_can_start.wait()
|
---|
421 | for i in range(6):
|
---|
422 | queue.get()
|
---|
423 | parent_can_continue.set()
|
---|
424 |
|
---|
425 | def test_put(self):
|
---|
426 | MAXSIZE = 6
|
---|
427 | queue = self.Queue(maxsize=MAXSIZE)
|
---|
428 | child_can_start = self.Event()
|
---|
429 | parent_can_continue = self.Event()
|
---|
430 |
|
---|
431 | proc = self.Process(
|
---|
432 | target=self._test_put,
|
---|
433 | args=(queue, child_can_start, parent_can_continue)
|
---|
434 | )
|
---|
435 | proc.daemon = True
|
---|
436 | proc.start()
|
---|
437 |
|
---|
438 | self.assertEqual(queue_empty(queue), True)
|
---|
439 | self.assertEqual(queue_full(queue, MAXSIZE), False)
|
---|
440 |
|
---|
441 | queue.put(1)
|
---|
442 | queue.put(2, True)
|
---|
443 | queue.put(3, True, None)
|
---|
444 | queue.put(4, False)
|
---|
445 | queue.put(5, False, None)
|
---|
446 | queue.put_nowait(6)
|
---|
447 |
|
---|
448 | # the values may be in buffer but not yet in pipe so sleep a bit
|
---|
449 | time.sleep(DELTA)
|
---|
450 |
|
---|
451 | self.assertEqual(queue_empty(queue), False)
|
---|
452 | self.assertEqual(queue_full(queue, MAXSIZE), True)
|
---|
453 |
|
---|
454 | put = TimingWrapper(queue.put)
|
---|
455 | put_nowait = TimingWrapper(queue.put_nowait)
|
---|
456 |
|
---|
457 | self.assertRaises(Queue.Full, put, 7, False)
|
---|
458 | self.assertTimingAlmostEqual(put.elapsed, 0)
|
---|
459 |
|
---|
460 | self.assertRaises(Queue.Full, put, 7, False, None)
|
---|
461 | self.assertTimingAlmostEqual(put.elapsed, 0)
|
---|
462 |
|
---|
463 | self.assertRaises(Queue.Full, put_nowait, 7)
|
---|
464 | self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
|
---|
465 |
|
---|
466 | self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
|
---|
467 | self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
|
---|
468 |
|
---|
469 | self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
|
---|
470 | self.assertTimingAlmostEqual(put.elapsed, 0)
|
---|
471 |
|
---|
472 | self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
|
---|
473 | self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
|
---|
474 |
|
---|
475 | child_can_start.set()
|
---|
476 | parent_can_continue.wait()
|
---|
477 |
|
---|
478 | self.assertEqual(queue_empty(queue), True)
|
---|
479 | self.assertEqual(queue_full(queue, MAXSIZE), False)
|
---|
480 |
|
---|
481 | proc.join()
|
---|
482 |
|
---|
483 | @classmethod
|
---|
484 | def _test_get(cls, queue, child_can_start, parent_can_continue):
|
---|
485 | child_can_start.wait()
|
---|
486 | #queue.put(1)
|
---|
487 | queue.put(2)
|
---|
488 | queue.put(3)
|
---|
489 | queue.put(4)
|
---|
490 | queue.put(5)
|
---|
491 | parent_can_continue.set()
|
---|
492 |
|
---|
493 | def test_get(self):
|
---|
494 | queue = self.Queue()
|
---|
495 | child_can_start = self.Event()
|
---|
496 | parent_can_continue = self.Event()
|
---|
497 |
|
---|
498 | proc = self.Process(
|
---|
499 | target=self._test_get,
|
---|
500 | args=(queue, child_can_start, parent_can_continue)
|
---|
501 | )
|
---|
502 | proc.daemon = True
|
---|
503 | proc.start()
|
---|
504 |
|
---|
505 | self.assertEqual(queue_empty(queue), True)
|
---|
506 |
|
---|
507 | child_can_start.set()
|
---|
508 | parent_can_continue.wait()
|
---|
509 |
|
---|
510 | time.sleep(DELTA)
|
---|
511 | self.assertEqual(queue_empty(queue), False)
|
---|
512 |
|
---|
513 | # Hangs unexpectedly, remove for now
|
---|
514 | #self.assertEqual(queue.get(), 1)
|
---|
515 | self.assertEqual(queue.get(True, None), 2)
|
---|
516 | self.assertEqual(queue.get(True), 3)
|
---|
517 | self.assertEqual(queue.get(timeout=1), 4)
|
---|
518 | self.assertEqual(queue.get_nowait(), 5)
|
---|
519 |
|
---|
520 | self.assertEqual(queue_empty(queue), True)
|
---|
521 |
|
---|
522 | get = TimingWrapper(queue.get)
|
---|
523 | get_nowait = TimingWrapper(queue.get_nowait)
|
---|
524 |
|
---|
525 | self.assertRaises(Queue.Empty, get, False)
|
---|
526 | self.assertTimingAlmostEqual(get.elapsed, 0)
|
---|
527 |
|
---|
528 | self.assertRaises(Queue.Empty, get, False, None)
|
---|
529 | self.assertTimingAlmostEqual(get.elapsed, 0)
|
---|
530 |
|
---|
531 | self.assertRaises(Queue.Empty, get_nowait)
|
---|
532 | self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
|
---|
533 |
|
---|
534 | self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
|
---|
535 | self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
|
---|
536 |
|
---|
537 | self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
|
---|
538 | self.assertTimingAlmostEqual(get.elapsed, 0)
|
---|
539 |
|
---|
540 | self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
|
---|
541 | self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
|
---|
542 |
|
---|
543 | proc.join()
|
---|
544 |
|
---|
545 | @classmethod
|
---|
546 | def _test_fork(cls, queue):
|
---|
547 | for i in range(10, 20):
|
---|
548 | queue.put(i)
|
---|
549 | # note that at this point the items may only be buffered, so the
|
---|
550 | # process cannot shutdown until the feeder thread has finished
|
---|
551 | # pushing items onto the pipe.
|
---|
552 |
|
---|
553 | def test_fork(self):
|
---|
554 | # Old versions of Queue would fail to create a new feeder
|
---|
555 | # thread for a forked process if the original process had its
|
---|
556 | # own feeder thread. This test checks that this no longer
|
---|
557 | # happens.
|
---|
558 |
|
---|
559 | queue = self.Queue()
|
---|
560 |
|
---|
561 | # put items on queue so that main process starts a feeder thread
|
---|
562 | for i in range(10):
|
---|
563 | queue.put(i)
|
---|
564 |
|
---|
565 | # wait to make sure thread starts before we fork a new process
|
---|
566 | time.sleep(DELTA)
|
---|
567 |
|
---|
568 | # fork process
|
---|
569 | p = self.Process(target=self._test_fork, args=(queue,))
|
---|
570 | p.daemon = True
|
---|
571 | p.start()
|
---|
572 |
|
---|
573 | # check that all expected items are in the queue
|
---|
574 | for i in range(20):
|
---|
575 | self.assertEqual(queue.get(), i)
|
---|
576 | self.assertRaises(Queue.Empty, queue.get, False)
|
---|
577 |
|
---|
578 | p.join()
|
---|
579 |
|
---|
580 | def test_qsize(self):
|
---|
581 | q = self.Queue()
|
---|
582 | try:
|
---|
583 | self.assertEqual(q.qsize(), 0)
|
---|
584 | except NotImplementedError:
|
---|
585 | return
|
---|
586 | q.put(1)
|
---|
587 | self.assertEqual(q.qsize(), 1)
|
---|
588 | q.put(5)
|
---|
589 | self.assertEqual(q.qsize(), 2)
|
---|
590 | q.get()
|
---|
591 | self.assertEqual(q.qsize(), 1)
|
---|
592 | q.get()
|
---|
593 | self.assertEqual(q.qsize(), 0)
|
---|
594 |
|
---|
595 | @classmethod
|
---|
596 | def _test_task_done(cls, q):
|
---|
597 | for obj in iter(q.get, None):
|
---|
598 | time.sleep(DELTA)
|
---|
599 | q.task_done()
|
---|
600 |
|
---|
601 | def test_task_done(self):
|
---|
602 | queue = self.JoinableQueue()
|
---|
603 |
|
---|
604 | if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
|
---|
605 | self.skipTest("requires 'queue.task_done()' method")
|
---|
606 |
|
---|
607 | workers = [self.Process(target=self._test_task_done, args=(queue,))
|
---|
608 | for i in xrange(4)]
|
---|
609 |
|
---|
610 | for p in workers:
|
---|
611 | p.daemon = True
|
---|
612 | p.start()
|
---|
613 |
|
---|
614 | for i in xrange(10):
|
---|
615 | queue.put(i)
|
---|
616 |
|
---|
617 | queue.join()
|
---|
618 |
|
---|
619 | for p in workers:
|
---|
620 | queue.put(None)
|
---|
621 |
|
---|
622 | for p in workers:
|
---|
623 | p.join()
|
---|
624 |
|
---|
625 | #
|
---|
626 | #
|
---|
627 | #
|
---|
628 |
|
---|
629 | class _TestLock(BaseTestCase):
|
---|
630 |
|
---|
631 | def test_lock(self):
|
---|
632 | lock = self.Lock()
|
---|
633 | self.assertEqual(lock.acquire(), True)
|
---|
634 | self.assertEqual(lock.acquire(False), False)
|
---|
635 | self.assertEqual(lock.release(), None)
|
---|
636 | self.assertRaises((ValueError, threading.ThreadError), lock.release)
|
---|
637 |
|
---|
638 | def test_rlock(self):
|
---|
639 | lock = self.RLock()
|
---|
640 | self.assertEqual(lock.acquire(), True)
|
---|
641 | self.assertEqual(lock.acquire(), True)
|
---|
642 | self.assertEqual(lock.acquire(), True)
|
---|
643 | self.assertEqual(lock.release(), None)
|
---|
644 | self.assertEqual(lock.release(), None)
|
---|
645 | self.assertEqual(lock.release(), None)
|
---|
646 | self.assertRaises((AssertionError, RuntimeError), lock.release)
|
---|
647 |
|
---|
648 | def test_lock_context(self):
|
---|
649 | with self.Lock():
|
---|
650 | pass
|
---|
651 |
|
---|
652 |
|
---|
653 | class _TestSemaphore(BaseTestCase):
|
---|
654 |
|
---|
655 | def _test_semaphore(self, sem):
|
---|
656 | self.assertReturnsIfImplemented(2, get_value, sem)
|
---|
657 | self.assertEqual(sem.acquire(), True)
|
---|
658 | self.assertReturnsIfImplemented(1, get_value, sem)
|
---|
659 | self.assertEqual(sem.acquire(), True)
|
---|
660 | self.assertReturnsIfImplemented(0, get_value, sem)
|
---|
661 | self.assertEqual(sem.acquire(False), False)
|
---|
662 | self.assertReturnsIfImplemented(0, get_value, sem)
|
---|
663 | self.assertEqual(sem.release(), None)
|
---|
664 | self.assertReturnsIfImplemented(1, get_value, sem)
|
---|
665 | self.assertEqual(sem.release(), None)
|
---|
666 | self.assertReturnsIfImplemented(2, get_value, sem)
|
---|
667 |
|
---|
668 | def test_semaphore(self):
|
---|
669 | sem = self.Semaphore(2)
|
---|
670 | self._test_semaphore(sem)
|
---|
671 | self.assertEqual(sem.release(), None)
|
---|
672 | self.assertReturnsIfImplemented(3, get_value, sem)
|
---|
673 | self.assertEqual(sem.release(), None)
|
---|
674 | self.assertReturnsIfImplemented(4, get_value, sem)
|
---|
675 |
|
---|
676 | def test_bounded_semaphore(self):
|
---|
677 | sem = self.BoundedSemaphore(2)
|
---|
678 | self._test_semaphore(sem)
|
---|
679 | # Currently fails on OS/X
|
---|
680 | #if HAVE_GETVALUE:
|
---|
681 | # self.assertRaises(ValueError, sem.release)
|
---|
682 | # self.assertReturnsIfImplemented(2, get_value, sem)
|
---|
683 |
|
---|
684 | def test_timeout(self):
|
---|
685 | if self.TYPE != 'processes':
|
---|
686 | return
|
---|
687 |
|
---|
688 | sem = self.Semaphore(0)
|
---|
689 | acquire = TimingWrapper(sem.acquire)
|
---|
690 |
|
---|
691 | self.assertEqual(acquire(False), False)
|
---|
692 | self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
|
---|
693 |
|
---|
694 | self.assertEqual(acquire(False, None), False)
|
---|
695 | self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
|
---|
696 |
|
---|
697 | self.assertEqual(acquire(False, TIMEOUT1), False)
|
---|
698 | self.assertTimingAlmostEqual(acquire.elapsed, 0)
|
---|
699 |
|
---|
700 | self.assertEqual(acquire(True, TIMEOUT2), False)
|
---|
701 | self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
|
---|
702 |
|
---|
703 | self.assertEqual(acquire(timeout=TIMEOUT3), False)
|
---|
704 | self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
|
---|
705 |
|
---|
706 |
|
---|
707 | class _TestCondition(BaseTestCase):
|
---|
708 |
|
---|
709 | @classmethod
|
---|
710 | def f(cls, cond, sleeping, woken, timeout=None):
|
---|
711 | cond.acquire()
|
---|
712 | sleeping.release()
|
---|
713 | cond.wait(timeout)
|
---|
714 | woken.release()
|
---|
715 | cond.release()
|
---|
716 |
|
---|
717 | def check_invariant(self, cond):
|
---|
718 | # this is only supposed to succeed when there are no sleepers
|
---|
719 | if self.TYPE == 'processes':
|
---|
720 | try:
|
---|
721 | sleepers = (cond._sleeping_count.get_value() -
|
---|
722 | cond._woken_count.get_value())
|
---|
723 | self.assertEqual(sleepers, 0)
|
---|
724 | self.assertEqual(cond._wait_semaphore.get_value(), 0)
|
---|
725 | except NotImplementedError:
|
---|
726 | pass
|
---|
727 |
|
---|
728 | def test_notify(self):
|
---|
729 | cond = self.Condition()
|
---|
730 | sleeping = self.Semaphore(0)
|
---|
731 | woken = self.Semaphore(0)
|
---|
732 |
|
---|
733 | p = self.Process(target=self.f, args=(cond, sleeping, woken))
|
---|
734 | p.daemon = True
|
---|
735 | p.start()
|
---|
736 |
|
---|
737 | p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
|
---|
738 | p.daemon = True
|
---|
739 | p.start()
|
---|
740 |
|
---|
741 | # wait for both children to start sleeping
|
---|
742 | sleeping.acquire()
|
---|
743 | sleeping.acquire()
|
---|
744 |
|
---|
745 | # check no process/thread has woken up
|
---|
746 | time.sleep(DELTA)
|
---|
747 | self.assertReturnsIfImplemented(0, get_value, woken)
|
---|
748 |
|
---|
749 | # wake up one process/thread
|
---|
750 | cond.acquire()
|
---|
751 | cond.notify()
|
---|
752 | cond.release()
|
---|
753 |
|
---|
754 | # check one process/thread has woken up
|
---|
755 | time.sleep(DELTA)
|
---|
756 | self.assertReturnsIfImplemented(1, get_value, woken)
|
---|
757 |
|
---|
758 | # wake up another
|
---|
759 | cond.acquire()
|
---|
760 | cond.notify()
|
---|
761 | cond.release()
|
---|
762 |
|
---|
763 | # check other has woken up
|
---|
764 | time.sleep(DELTA)
|
---|
765 | self.assertReturnsIfImplemented(2, get_value, woken)
|
---|
766 |
|
---|
767 | # check state is not mucked up
|
---|
768 | self.check_invariant(cond)
|
---|
769 | p.join()
|
---|
770 |
|
---|
771 | def test_notify_all(self):
|
---|
772 | cond = self.Condition()
|
---|
773 | sleeping = self.Semaphore(0)
|
---|
774 | woken = self.Semaphore(0)
|
---|
775 |
|
---|
776 | # start some threads/processes which will timeout
|
---|
777 | for i in range(3):
|
---|
778 | p = self.Process(target=self.f,
|
---|
779 | args=(cond, sleeping, woken, TIMEOUT1))
|
---|
780 | p.daemon = True
|
---|
781 | p.start()
|
---|
782 |
|
---|
783 | t = threading.Thread(target=self.f,
|
---|
784 | args=(cond, sleeping, woken, TIMEOUT1))
|
---|
785 | t.daemon = True
|
---|
786 | t.start()
|
---|
787 |
|
---|
788 | # wait for them all to sleep
|
---|
789 | for i in xrange(6):
|
---|
790 | sleeping.acquire()
|
---|
791 |
|
---|
792 | # check they have all timed out
|
---|
793 | for i in xrange(6):
|
---|
794 | woken.acquire()
|
---|
795 | self.assertReturnsIfImplemented(0, get_value, woken)
|
---|
796 |
|
---|
797 | # check state is not mucked up
|
---|
798 | self.check_invariant(cond)
|
---|
799 |
|
---|
800 | # start some more threads/processes
|
---|
801 | for i in range(3):
|
---|
802 | p = self.Process(target=self.f, args=(cond, sleeping, woken))
|
---|
803 | p.daemon = True
|
---|
804 | p.start()
|
---|
805 |
|
---|
806 | t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
|
---|
807 | t.daemon = True
|
---|
808 | t.start()
|
---|
809 |
|
---|
810 | # wait for them to all sleep
|
---|
811 | for i in xrange(6):
|
---|
812 | sleeping.acquire()
|
---|
813 |
|
---|
814 | # check no process/thread has woken up
|
---|
815 | time.sleep(DELTA)
|
---|
816 | self.assertReturnsIfImplemented(0, get_value, woken)
|
---|
817 |
|
---|
818 | # wake them all up
|
---|
819 | cond.acquire()
|
---|
820 | cond.notify_all()
|
---|
821 | cond.release()
|
---|
822 |
|
---|
823 | # check they have all woken
|
---|
824 | time.sleep(DELTA)
|
---|
825 | self.assertReturnsIfImplemented(6, get_value, woken)
|
---|
826 |
|
---|
827 | # check state is not mucked up
|
---|
828 | self.check_invariant(cond)
|
---|
829 |
|
---|
830 | def test_timeout(self):
|
---|
831 | cond = self.Condition()
|
---|
832 | wait = TimingWrapper(cond.wait)
|
---|
833 | cond.acquire()
|
---|
834 | res = wait(TIMEOUT1)
|
---|
835 | cond.release()
|
---|
836 | self.assertEqual(res, None)
|
---|
837 | self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
|
---|
838 |
|
---|
839 |
|
---|
840 | class _TestEvent(BaseTestCase):
|
---|
841 |
|
---|
842 | @classmethod
|
---|
843 | def _test_event(cls, event):
|
---|
844 | time.sleep(TIMEOUT2)
|
---|
845 | event.set()
|
---|
846 |
|
---|
847 | def test_event(self):
|
---|
848 | event = self.Event()
|
---|
849 | wait = TimingWrapper(event.wait)
|
---|
850 |
|
---|
851 | # Removed temporarily, due to API shear, this does not
|
---|
852 | # work with threading._Event objects. is_set == isSet
|
---|
853 | self.assertEqual(event.is_set(), False)
|
---|
854 |
|
---|
855 | # Removed, threading.Event.wait() will return the value of the __flag
|
---|
856 | # instead of None. API Shear with the semaphore backed mp.Event
|
---|
857 | self.assertEqual(wait(0.0), False)
|
---|
858 | self.assertTimingAlmostEqual(wait.elapsed, 0.0)
|
---|
859 | self.assertEqual(wait(TIMEOUT1), False)
|
---|
860 | self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
|
---|
861 |
|
---|
862 | event.set()
|
---|
863 |
|
---|
864 | # See note above on the API differences
|
---|
865 | self.assertEqual(event.is_set(), True)
|
---|
866 | self.assertEqual(wait(), True)
|
---|
867 | self.assertTimingAlmostEqual(wait.elapsed, 0.0)
|
---|
868 | self.assertEqual(wait(TIMEOUT1), True)
|
---|
869 | self.assertTimingAlmostEqual(wait.elapsed, 0.0)
|
---|
870 | # self.assertEqual(event.is_set(), True)
|
---|
871 |
|
---|
872 | event.clear()
|
---|
873 |
|
---|
874 | #self.assertEqual(event.is_set(), False)
|
---|
875 |
|
---|
876 | p = self.Process(target=self._test_event, args=(event,))
|
---|
877 | p.daemon = True
|
---|
878 | p.start()
|
---|
879 | self.assertEqual(wait(), True)
|
---|
880 |
|
---|
881 | #
|
---|
882 | #
|
---|
883 | #
|
---|
884 |
|
---|
885 | class _TestValue(BaseTestCase):
|
---|
886 |
|
---|
887 | ALLOWED_TYPES = ('processes',)
|
---|
888 |
|
---|
889 | codes_values = [
|
---|
890 | ('i', 4343, 24234),
|
---|
891 | ('d', 3.625, -4.25),
|
---|
892 | ('h', -232, 234),
|
---|
893 | ('c', latin('x'), latin('y'))
|
---|
894 | ]
|
---|
895 |
|
---|
896 | def setUp(self):
|
---|
897 | if not HAS_SHAREDCTYPES:
|
---|
898 | self.skipTest("requires multiprocessing.sharedctypes")
|
---|
899 |
|
---|
900 | @classmethod
|
---|
901 | def _test(cls, values):
|
---|
902 | for sv, cv in zip(values, cls.codes_values):
|
---|
903 | sv.value = cv[2]
|
---|
904 |
|
---|
905 |
|
---|
906 | def test_value(self, raw=False):
|
---|
907 | if raw:
|
---|
908 | values = [self.RawValue(code, value)
|
---|
909 | for code, value, _ in self.codes_values]
|
---|
910 | else:
|
---|
911 | values = [self.Value(code, value)
|
---|
912 | for code, value, _ in self.codes_values]
|
---|
913 |
|
---|
914 | for sv, cv in zip(values, self.codes_values):
|
---|
915 | self.assertEqual(sv.value, cv[1])
|
---|
916 |
|
---|
917 | proc = self.Process(target=self._test, args=(values,))
|
---|
918 | proc.daemon = True
|
---|
919 | proc.start()
|
---|
920 | proc.join()
|
---|
921 |
|
---|
922 | for sv, cv in zip(values, self.codes_values):
|
---|
923 | self.assertEqual(sv.value, cv[2])
|
---|
924 |
|
---|
925 | def test_rawvalue(self):
|
---|
926 | self.test_value(raw=True)
|
---|
927 |
|
---|
928 | def test_getobj_getlock(self):
|
---|
929 | val1 = self.Value('i', 5)
|
---|
930 | lock1 = val1.get_lock()
|
---|
931 | obj1 = val1.get_obj()
|
---|
932 |
|
---|
933 | val2 = self.Value('i', 5, lock=None)
|
---|
934 | lock2 = val2.get_lock()
|
---|
935 | obj2 = val2.get_obj()
|
---|
936 |
|
---|
937 | lock = self.Lock()
|
---|
938 | val3 = self.Value('i', 5, lock=lock)
|
---|
939 | lock3 = val3.get_lock()
|
---|
940 | obj3 = val3.get_obj()
|
---|
941 | self.assertEqual(lock, lock3)
|
---|
942 |
|
---|
943 | arr4 = self.Value('i', 5, lock=False)
|
---|
944 | self.assertFalse(hasattr(arr4, 'get_lock'))
|
---|
945 | self.assertFalse(hasattr(arr4, 'get_obj'))
|
---|
946 |
|
---|
947 | self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
|
---|
948 |
|
---|
949 | arr5 = self.RawValue('i', 5)
|
---|
950 | self.assertFalse(hasattr(arr5, 'get_lock'))
|
---|
951 | self.assertFalse(hasattr(arr5, 'get_obj'))
|
---|
952 |
|
---|
953 |
|
---|
954 | class _TestArray(BaseTestCase):
|
---|
955 |
|
---|
956 | ALLOWED_TYPES = ('processes',)
|
---|
957 |
|
---|
958 | @classmethod
|
---|
959 | def f(cls, seq):
|
---|
960 | for i in range(1, len(seq)):
|
---|
961 | seq[i] += seq[i-1]
|
---|
962 |
|
---|
963 | @unittest.skipIf(c_int is None, "requires _ctypes")
|
---|
964 | def test_array(self, raw=False):
|
---|
965 | seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
|
---|
966 | if raw:
|
---|
967 | arr = self.RawArray('i', seq)
|
---|
968 | else:
|
---|
969 | arr = self.Array('i', seq)
|
---|
970 |
|
---|
971 | self.assertEqual(len(arr), len(seq))
|
---|
972 | self.assertEqual(arr[3], seq[3])
|
---|
973 | self.assertEqual(list(arr[2:7]), list(seq[2:7]))
|
---|
974 |
|
---|
975 | arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
|
---|
976 |
|
---|
977 | self.assertEqual(list(arr[:]), seq)
|
---|
978 |
|
---|
979 | self.f(seq)
|
---|
980 |
|
---|
981 | p = self.Process(target=self.f, args=(arr,))
|
---|
982 | p.daemon = True
|
---|
983 | p.start()
|
---|
984 | p.join()
|
---|
985 |
|
---|
986 | self.assertEqual(list(arr[:]), seq)
|
---|
987 |
|
---|
988 | @unittest.skipIf(c_int is None, "requires _ctypes")
|
---|
989 | def test_array_from_size(self):
|
---|
990 | size = 10
|
---|
991 | # Test for zeroing (see issue #11675).
|
---|
992 | # The repetition below strengthens the test by increasing the chances
|
---|
993 | # of previously allocated non-zero memory being used for the new array
|
---|
994 | # on the 2nd and 3rd loops.
|
---|
995 | for _ in range(3):
|
---|
996 | arr = self.Array('i', size)
|
---|
997 | self.assertEqual(len(arr), size)
|
---|
998 | self.assertEqual(list(arr), [0] * size)
|
---|
999 | arr[:] = range(10)
|
---|
1000 | self.assertEqual(list(arr), range(10))
|
---|
1001 | del arr
|
---|
1002 |
|
---|
1003 | @unittest.skipIf(c_int is None, "requires _ctypes")
|
---|
1004 | def test_rawarray(self):
|
---|
1005 | self.test_array(raw=True)
|
---|
1006 |
|
---|
1007 | @unittest.skipIf(c_int is None, "requires _ctypes")
|
---|
1008 | def test_array_accepts_long(self):
|
---|
1009 | arr = self.Array('i', 10L)
|
---|
1010 | self.assertEqual(len(arr), 10)
|
---|
1011 | raw_arr = self.RawArray('i', 10L)
|
---|
1012 | self.assertEqual(len(raw_arr), 10)
|
---|
1013 |
|
---|
1014 | @unittest.skipIf(c_int is None, "requires _ctypes")
|
---|
1015 | def test_getobj_getlock_obj(self):
|
---|
1016 | arr1 = self.Array('i', range(10))
|
---|
1017 | lock1 = arr1.get_lock()
|
---|
1018 | obj1 = arr1.get_obj()
|
---|
1019 |
|
---|
1020 | arr2 = self.Array('i', range(10), lock=None)
|
---|
1021 | lock2 = arr2.get_lock()
|
---|
1022 | obj2 = arr2.get_obj()
|
---|
1023 |
|
---|
1024 | lock = self.Lock()
|
---|
1025 | arr3 = self.Array('i', range(10), lock=lock)
|
---|
1026 | lock3 = arr3.get_lock()
|
---|
1027 | obj3 = arr3.get_obj()
|
---|
1028 | self.assertEqual(lock, lock3)
|
---|
1029 |
|
---|
1030 | arr4 = self.Array('i', range(10), lock=False)
|
---|
1031 | self.assertFalse(hasattr(arr4, 'get_lock'))
|
---|
1032 | self.assertFalse(hasattr(arr4, 'get_obj'))
|
---|
1033 | self.assertRaises(AttributeError,
|
---|
1034 | self.Array, 'i', range(10), lock='notalock')
|
---|
1035 |
|
---|
1036 | arr5 = self.RawArray('i', range(10))
|
---|
1037 | self.assertFalse(hasattr(arr5, 'get_lock'))
|
---|
1038 | self.assertFalse(hasattr(arr5, 'get_obj'))
|
---|
1039 |
|
---|
1040 | #
|
---|
1041 | #
|
---|
1042 | #
|
---|
1043 |
|
---|
1044 | class _TestContainers(BaseTestCase):
|
---|
1045 |
|
---|
1046 | ALLOWED_TYPES = ('manager',)
|
---|
1047 |
|
---|
1048 | def test_list(self):
|
---|
1049 | a = self.list(range(10))
|
---|
1050 | self.assertEqual(a[:], range(10))
|
---|
1051 |
|
---|
1052 | b = self.list()
|
---|
1053 | self.assertEqual(b[:], [])
|
---|
1054 |
|
---|
1055 | b.extend(range(5))
|
---|
1056 | self.assertEqual(b[:], range(5))
|
---|
1057 |
|
---|
1058 | self.assertEqual(b[2], 2)
|
---|
1059 | self.assertEqual(b[2:10], [2,3,4])
|
---|
1060 |
|
---|
1061 | b *= 2
|
---|
1062 | self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
|
---|
1063 |
|
---|
1064 | self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
|
---|
1065 |
|
---|
1066 | self.assertEqual(a[:], range(10))
|
---|
1067 |
|
---|
1068 | d = [a, b]
|
---|
1069 | e = self.list(d)
|
---|
1070 | self.assertEqual(
|
---|
1071 | e[:],
|
---|
1072 | [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
|
---|
1073 | )
|
---|
1074 |
|
---|
1075 | f = self.list([a])
|
---|
1076 | a.append('hello')
|
---|
1077 | self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
|
---|
1078 |
|
---|
1079 | def test_dict(self):
|
---|
1080 | d = self.dict()
|
---|
1081 | indices = range(65, 70)
|
---|
1082 | for i in indices:
|
---|
1083 | d[i] = chr(i)
|
---|
1084 | self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
|
---|
1085 | self.assertEqual(sorted(d.keys()), indices)
|
---|
1086 | self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
|
---|
1087 | self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
|
---|
1088 |
|
---|
1089 | def test_namespace(self):
|
---|
1090 | n = self.Namespace()
|
---|
1091 | n.name = 'Bob'
|
---|
1092 | n.job = 'Builder'
|
---|
1093 | n._hidden = 'hidden'
|
---|
1094 | self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
|
---|
1095 | del n.job
|
---|
1096 | self.assertEqual(str(n), "Namespace(name='Bob')")
|
---|
1097 | self.assertTrue(hasattr(n, 'name'))
|
---|
1098 | self.assertTrue(not hasattr(n, 'job'))
|
---|
1099 |
|
---|
1100 | #
|
---|
1101 | #
|
---|
1102 | #
|
---|
1103 |
|
---|
1104 | def sqr(x, wait=0.0):
|
---|
1105 | time.sleep(wait)
|
---|
1106 | return x*x
|
---|
1107 | class _TestPool(BaseTestCase):
|
---|
1108 |
|
---|
1109 | def test_apply(self):
|
---|
1110 | papply = self.pool.apply
|
---|
1111 | self.assertEqual(papply(sqr, (5,)), sqr(5))
|
---|
1112 | self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
|
---|
1113 |
|
---|
1114 | def test_map(self):
|
---|
1115 | pmap = self.pool.map
|
---|
1116 | self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
|
---|
1117 | self.assertEqual(pmap(sqr, range(100), chunksize=20),
|
---|
1118 | map(sqr, range(100)))
|
---|
1119 |
|
---|
1120 | def test_map_chunksize(self):
|
---|
1121 | try:
|
---|
1122 | self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
|
---|
1123 | except multiprocessing.TimeoutError:
|
---|
1124 | self.fail("pool.map_async with chunksize stalled on null list")
|
---|
1125 |
|
---|
1126 | def test_async(self):
|
---|
1127 | res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
|
---|
1128 | get = TimingWrapper(res.get)
|
---|
1129 | self.assertEqual(get(), 49)
|
---|
1130 | self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
|
---|
1131 |
|
---|
1132 | def test_async_timeout(self):
|
---|
1133 | res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
|
---|
1134 | get = TimingWrapper(res.get)
|
---|
1135 | self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
|
---|
1136 | self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
|
---|
1137 |
|
---|
1138 | def test_imap(self):
|
---|
1139 | it = self.pool.imap(sqr, range(10))
|
---|
1140 | self.assertEqual(list(it), map(sqr, range(10)))
|
---|
1141 |
|
---|
1142 | it = self.pool.imap(sqr, range(10))
|
---|
1143 | for i in range(10):
|
---|
1144 | self.assertEqual(it.next(), i*i)
|
---|
1145 | self.assertRaises(StopIteration, it.next)
|
---|
1146 |
|
---|
1147 | it = self.pool.imap(sqr, range(1000), chunksize=100)
|
---|
1148 | for i in range(1000):
|
---|
1149 | self.assertEqual(it.next(), i*i)
|
---|
1150 | self.assertRaises(StopIteration, it.next)
|
---|
1151 |
|
---|
1152 | def test_imap_unordered(self):
|
---|
1153 | it = self.pool.imap_unordered(sqr, range(1000))
|
---|
1154 | self.assertEqual(sorted(it), map(sqr, range(1000)))
|
---|
1155 |
|
---|
1156 | it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
|
---|
1157 | self.assertEqual(sorted(it), map(sqr, range(1000)))
|
---|
1158 |
|
---|
1159 | def test_make_pool(self):
|
---|
1160 | self.assertRaises(ValueError, multiprocessing.Pool, -1)
|
---|
1161 | self.assertRaises(ValueError, multiprocessing.Pool, 0)
|
---|
1162 |
|
---|
1163 | p = multiprocessing.Pool(3)
|
---|
1164 | self.assertEqual(3, len(p._pool))
|
---|
1165 | p.close()
|
---|
1166 | p.join()
|
---|
1167 |
|
---|
1168 | def test_terminate(self):
|
---|
1169 | if self.TYPE == 'manager':
|
---|
1170 | # On Unix a forked process increfs each shared object to
|
---|
1171 | # which its parent process held a reference. If the
|
---|
1172 | # forked process gets terminated then there is likely to
|
---|
1173 | # be a reference leak. So to prevent
|
---|
1174 | # _TestZZZNumberOfObjects from failing we skip this test
|
---|
1175 | # when using a manager.
|
---|
1176 | return
|
---|
1177 |
|
---|
1178 | result = self.pool.map_async(
|
---|
1179 | time.sleep, [0.1 for i in range(10000)], chunksize=1
|
---|
1180 | )
|
---|
1181 | self.pool.terminate()
|
---|
1182 | join = TimingWrapper(self.pool.join)
|
---|
1183 | join()
|
---|
1184 | self.assertTrue(join.elapsed < 0.2)
|
---|
1185 |
|
---|
1186 | def test_empty_iterable(self):
|
---|
1187 | # See Issue 12157
|
---|
1188 | p = self.Pool(1)
|
---|
1189 |
|
---|
1190 | self.assertEqual(p.map(sqr, []), [])
|
---|
1191 | self.assertEqual(list(p.imap(sqr, [])), [])
|
---|
1192 | self.assertEqual(list(p.imap_unordered(sqr, [])), [])
|
---|
1193 | self.assertEqual(p.map_async(sqr, []).get(), [])
|
---|
1194 |
|
---|
1195 | p.close()
|
---|
1196 | p.join()
|
---|
1197 |
|
---|
1198 | def unpickleable_result():
|
---|
1199 | return lambda: 42
|
---|
1200 |
|
---|
1201 | class _TestPoolWorkerErrors(BaseTestCase):
|
---|
1202 | ALLOWED_TYPES = ('processes', )
|
---|
1203 |
|
---|
1204 | def test_unpickleable_result(self):
|
---|
1205 | from multiprocessing.pool import MaybeEncodingError
|
---|
1206 | p = multiprocessing.Pool(2)
|
---|
1207 |
|
---|
1208 | # Make sure we don't lose pool processes because of encoding errors.
|
---|
1209 | for iteration in range(20):
|
---|
1210 | res = p.apply_async(unpickleable_result)
|
---|
1211 | self.assertRaises(MaybeEncodingError, res.get)
|
---|
1212 |
|
---|
1213 | p.close()
|
---|
1214 | p.join()
|
---|
1215 |
|
---|
1216 | class _TestPoolWorkerLifetime(BaseTestCase):
|
---|
1217 |
|
---|
1218 | ALLOWED_TYPES = ('processes', )
|
---|
1219 | def test_pool_worker_lifetime(self):
|
---|
1220 | p = multiprocessing.Pool(3, maxtasksperchild=10)
|
---|
1221 | self.assertEqual(3, len(p._pool))
|
---|
1222 | origworkerpids = [w.pid for w in p._pool]
|
---|
1223 | # Run many tasks so each worker gets replaced (hopefully)
|
---|
1224 | results = []
|
---|
1225 | for i in range(100):
|
---|
1226 | results.append(p.apply_async(sqr, (i, )))
|
---|
1227 | # Fetch the results and verify we got the right answers,
|
---|
1228 | # also ensuring all the tasks have completed.
|
---|
1229 | for (j, res) in enumerate(results):
|
---|
1230 | self.assertEqual(res.get(), sqr(j))
|
---|
1231 | # Refill the pool
|
---|
1232 | p._repopulate_pool()
|
---|
1233 | # Wait until all workers are alive
|
---|
1234 | # (countdown * DELTA = 5 seconds max startup process time)
|
---|
1235 | countdown = 50
|
---|
1236 | while countdown and not all(w.is_alive() for w in p._pool):
|
---|
1237 | countdown -= 1
|
---|
1238 | time.sleep(DELTA)
|
---|
1239 | finalworkerpids = [w.pid for w in p._pool]
|
---|
1240 | # All pids should be assigned. See issue #7805.
|
---|
1241 | self.assertNotIn(None, origworkerpids)
|
---|
1242 | self.assertNotIn(None, finalworkerpids)
|
---|
1243 | # Finally, check that the worker pids have changed
|
---|
1244 | self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
|
---|
1245 | p.close()
|
---|
1246 | p.join()
|
---|
1247 |
|
---|
1248 | def test_pool_worker_lifetime_early_close(self):
|
---|
1249 | # Issue #10332: closing a pool whose workers have limited lifetimes
|
---|
1250 | # before all the tasks completed would make join() hang.
|
---|
1251 | p = multiprocessing.Pool(3, maxtasksperchild=1)
|
---|
1252 | results = []
|
---|
1253 | for i in range(6):
|
---|
1254 | results.append(p.apply_async(sqr, (i, 0.3)))
|
---|
1255 | p.close()
|
---|
1256 | p.join()
|
---|
1257 | # check the results
|
---|
1258 | for (j, res) in enumerate(results):
|
---|
1259 | self.assertEqual(res.get(), sqr(j))
|
---|
1260 |
|
---|
1261 |
|
---|
1262 | #
|
---|
1263 | # Test that manager has expected number of shared objects left
|
---|
1264 | #
|
---|
1265 |
|
---|
1266 | class _TestZZZNumberOfObjects(BaseTestCase):
|
---|
1267 | # Because test cases are sorted alphabetically, this one will get
|
---|
1268 | # run after all the other tests for the manager. It tests that
|
---|
1269 | # there have been no "reference leaks" for the manager's shared
|
---|
1270 | # objects. Note the comment in _TestPool.test_terminate().
|
---|
1271 | ALLOWED_TYPES = ('manager',)
|
---|
1272 |
|
---|
1273 | def test_number_of_objects(self):
|
---|
1274 | EXPECTED_NUMBER = 1 # the pool object is still alive
|
---|
1275 | multiprocessing.active_children() # discard dead process objs
|
---|
1276 | gc.collect() # do garbage collection
|
---|
1277 | refs = self.manager._number_of_objects()
|
---|
1278 | debug_info = self.manager._debug_info()
|
---|
1279 | if refs != EXPECTED_NUMBER:
|
---|
1280 | print self.manager._debug_info()
|
---|
1281 | print debug_info
|
---|
1282 |
|
---|
1283 | self.assertEqual(refs, EXPECTED_NUMBER)
|
---|
1284 |
|
---|
1285 | #
|
---|
1286 | # Test of creating a customized manager class
|
---|
1287 | #
|
---|
1288 |
|
---|
1289 | from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
|
---|
1290 |
|
---|
1291 | class FooBar(object):
|
---|
1292 | def f(self):
|
---|
1293 | return 'f()'
|
---|
1294 | def g(self):
|
---|
1295 | raise ValueError
|
---|
1296 | def _h(self):
|
---|
1297 | return '_h()'
|
---|
1298 |
|
---|
1299 | def baz():
|
---|
1300 | for i in xrange(10):
|
---|
1301 | yield i*i
|
---|
1302 |
|
---|
1303 | class IteratorProxy(BaseProxy):
|
---|
1304 | _exposed_ = ('next', '__next__')
|
---|
1305 | def __iter__(self):
|
---|
1306 | return self
|
---|
1307 | def next(self):
|
---|
1308 | return self._callmethod('next')
|
---|
1309 | def __next__(self):
|
---|
1310 | return self._callmethod('__next__')
|
---|
1311 |
|
---|
1312 | class MyManager(BaseManager):
|
---|
1313 | pass
|
---|
1314 |
|
---|
1315 | MyManager.register('Foo', callable=FooBar)
|
---|
1316 | MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
|
---|
1317 | MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
|
---|
1318 |
|
---|
1319 |
|
---|
1320 | class _TestMyManager(BaseTestCase):
|
---|
1321 |
|
---|
1322 | ALLOWED_TYPES = ('manager',)
|
---|
1323 |
|
---|
1324 | def test_mymanager(self):
|
---|
1325 | manager = MyManager()
|
---|
1326 | manager.start()
|
---|
1327 |
|
---|
1328 | foo = manager.Foo()
|
---|
1329 | bar = manager.Bar()
|
---|
1330 | baz = manager.baz()
|
---|
1331 |
|
---|
1332 | foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
|
---|
1333 | bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
|
---|
1334 |
|
---|
1335 | self.assertEqual(foo_methods, ['f', 'g'])
|
---|
1336 | self.assertEqual(bar_methods, ['f', '_h'])
|
---|
1337 |
|
---|
1338 | self.assertEqual(foo.f(), 'f()')
|
---|
1339 | self.assertRaises(ValueError, foo.g)
|
---|
1340 | self.assertEqual(foo._callmethod('f'), 'f()')
|
---|
1341 | self.assertRaises(RemoteError, foo._callmethod, '_h')
|
---|
1342 |
|
---|
1343 | self.assertEqual(bar.f(), 'f()')
|
---|
1344 | self.assertEqual(bar._h(), '_h()')
|
---|
1345 | self.assertEqual(bar._callmethod('f'), 'f()')
|
---|
1346 | self.assertEqual(bar._callmethod('_h'), '_h()')
|
---|
1347 |
|
---|
1348 | self.assertEqual(list(baz), [i*i for i in range(10)])
|
---|
1349 |
|
---|
1350 | manager.shutdown()
|
---|
1351 |
|
---|
1352 | #
|
---|
1353 | # Test of connecting to a remote server and using xmlrpclib for serialization
|
---|
1354 | #
|
---|
1355 |
|
---|
1356 | _queue = Queue.Queue()
|
---|
1357 | def get_queue():
|
---|
1358 | return _queue
|
---|
1359 |
|
---|
1360 | class QueueManager(BaseManager):
|
---|
1361 | '''manager class used by server process'''
|
---|
1362 | QueueManager.register('get_queue', callable=get_queue)
|
---|
1363 |
|
---|
1364 | class QueueManager2(BaseManager):
|
---|
1365 | '''manager class which specifies the same interface as QueueManager'''
|
---|
1366 | QueueManager2.register('get_queue')
|
---|
1367 |
|
---|
1368 |
|
---|
1369 | SERIALIZER = 'xmlrpclib'
|
---|
1370 |
|
---|
1371 | class _TestRemoteManager(BaseTestCase):
|
---|
1372 |
|
---|
1373 | ALLOWED_TYPES = ('manager',)
|
---|
1374 |
|
---|
1375 | @classmethod
|
---|
1376 | def _putter(cls, address, authkey):
|
---|
1377 | manager = QueueManager2(
|
---|
1378 | address=address, authkey=authkey, serializer=SERIALIZER
|
---|
1379 | )
|
---|
1380 | manager.connect()
|
---|
1381 | queue = manager.get_queue()
|
---|
1382 | queue.put(('hello world', None, True, 2.25))
|
---|
1383 |
|
---|
1384 | def test_remote(self):
|
---|
1385 | authkey = os.urandom(32)
|
---|
1386 |
|
---|
1387 | manager = QueueManager(
|
---|
1388 | address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER
|
---|
1389 | )
|
---|
1390 | manager.start()
|
---|
1391 |
|
---|
1392 | p = self.Process(target=self._putter, args=(manager.address, authkey))
|
---|
1393 | p.daemon = True
|
---|
1394 | p.start()
|
---|
1395 |
|
---|
1396 | manager2 = QueueManager2(
|
---|
1397 | address=manager.address, authkey=authkey, serializer=SERIALIZER
|
---|
1398 | )
|
---|
1399 | manager2.connect()
|
---|
1400 | queue = manager2.get_queue()
|
---|
1401 |
|
---|
1402 | # Note that xmlrpclib will deserialize object as a list not a tuple
|
---|
1403 | self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
|
---|
1404 |
|
---|
1405 | # Because we are using xmlrpclib for serialization instead of
|
---|
1406 | # pickle this will cause a serialization error.
|
---|
1407 | self.assertRaises(Exception, queue.put, time.sleep)
|
---|
1408 |
|
---|
1409 | # Make queue finalizer run before the server is stopped
|
---|
1410 | del queue
|
---|
1411 | manager.shutdown()
|
---|
1412 |
|
---|
1413 | class _TestManagerRestart(BaseTestCase):
|
---|
1414 |
|
---|
1415 | @classmethod
|
---|
1416 | def _putter(cls, address, authkey):
|
---|
1417 | manager = QueueManager(
|
---|
1418 | address=address, authkey=authkey, serializer=SERIALIZER)
|
---|
1419 | manager.connect()
|
---|
1420 | queue = manager.get_queue()
|
---|
1421 | queue.put('hello world')
|
---|
1422 |
|
---|
1423 | def test_rapid_restart(self):
|
---|
1424 | authkey = os.urandom(32)
|
---|
1425 | manager = QueueManager(
|
---|
1426 | address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
|
---|
1427 | srvr = manager.get_server()
|
---|
1428 | addr = srvr.address
|
---|
1429 | # Close the connection.Listener socket which gets opened as a part
|
---|
1430 | # of manager.get_server(). It's not needed for the test.
|
---|
1431 | srvr.listener.close()
|
---|
1432 | manager.start()
|
---|
1433 |
|
---|
1434 | p = self.Process(target=self._putter, args=(manager.address, authkey))
|
---|
1435 | p.daemon = True
|
---|
1436 | p.start()
|
---|
1437 | queue = manager.get_queue()
|
---|
1438 | self.assertEqual(queue.get(), 'hello world')
|
---|
1439 | del queue
|
---|
1440 | manager.shutdown()
|
---|
1441 | manager = QueueManager(
|
---|
1442 | address=addr, authkey=authkey, serializer=SERIALIZER)
|
---|
1443 | manager.start()
|
---|
1444 | manager.shutdown()
|
---|
1445 |
|
---|
1446 | #
|
---|
1447 | #
|
---|
1448 | #
|
---|
1449 |
|
---|
1450 | SENTINEL = latin('')
|
---|
1451 |
|
---|
1452 | class _TestConnection(BaseTestCase):
|
---|
1453 |
|
---|
1454 | ALLOWED_TYPES = ('processes', 'threads')
|
---|
1455 |
|
---|
1456 | @classmethod
|
---|
1457 | def _echo(cls, conn):
|
---|
1458 | for msg in iter(conn.recv_bytes, SENTINEL):
|
---|
1459 | conn.send_bytes(msg)
|
---|
1460 | conn.close()
|
---|
1461 |
|
---|
1462 | def test_connection(self):
|
---|
1463 | conn, child_conn = self.Pipe()
|
---|
1464 |
|
---|
1465 | p = self.Process(target=self._echo, args=(child_conn,))
|
---|
1466 | p.daemon = True
|
---|
1467 | p.start()
|
---|
1468 |
|
---|
1469 | seq = [1, 2.25, None]
|
---|
1470 | msg = latin('hello world')
|
---|
1471 | longmsg = msg * 10
|
---|
1472 | arr = array.array('i', range(4))
|
---|
1473 |
|
---|
1474 | if self.TYPE == 'processes':
|
---|
1475 | self.assertEqual(type(conn.fileno()), int)
|
---|
1476 |
|
---|
1477 | self.assertEqual(conn.send(seq), None)
|
---|
1478 | self.assertEqual(conn.recv(), seq)
|
---|
1479 |
|
---|
1480 | self.assertEqual(conn.send_bytes(msg), None)
|
---|
1481 | self.assertEqual(conn.recv_bytes(), msg)
|
---|
1482 |
|
---|
1483 | if self.TYPE == 'processes':
|
---|
1484 | buffer = array.array('i', [0]*10)
|
---|
1485 | expected = list(arr) + [0] * (10 - len(arr))
|
---|
1486 | self.assertEqual(conn.send_bytes(arr), None)
|
---|
1487 | self.assertEqual(conn.recv_bytes_into(buffer),
|
---|
1488 | len(arr) * buffer.itemsize)
|
---|
1489 | self.assertEqual(list(buffer), expected)
|
---|
1490 |
|
---|
1491 | buffer = array.array('i', [0]*10)
|
---|
1492 | expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
|
---|
1493 | self.assertEqual(conn.send_bytes(arr), None)
|
---|
1494 | self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
|
---|
1495 | len(arr) * buffer.itemsize)
|
---|
1496 | self.assertEqual(list(buffer), expected)
|
---|
1497 |
|
---|
1498 | buffer = bytearray(latin(' ' * 40))
|
---|
1499 | self.assertEqual(conn.send_bytes(longmsg), None)
|
---|
1500 | try:
|
---|
1501 | res = conn.recv_bytes_into(buffer)
|
---|
1502 | except multiprocessing.BufferTooShort, e:
|
---|
1503 | self.assertEqual(e.args, (longmsg,))
|
---|
1504 | else:
|
---|
1505 | self.fail('expected BufferTooShort, got %s' % res)
|
---|
1506 |
|
---|
1507 | poll = TimingWrapper(conn.poll)
|
---|
1508 |
|
---|
1509 | self.assertEqual(poll(), False)
|
---|
1510 | self.assertTimingAlmostEqual(poll.elapsed, 0)
|
---|
1511 |
|
---|
1512 | self.assertEqual(poll(TIMEOUT1), False)
|
---|
1513 | self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
|
---|
1514 |
|
---|
1515 | conn.send(None)
|
---|
1516 | time.sleep(.1)
|
---|
1517 |
|
---|
1518 | self.assertEqual(poll(TIMEOUT1), True)
|
---|
1519 | self.assertTimingAlmostEqual(poll.elapsed, 0)
|
---|
1520 |
|
---|
1521 | self.assertEqual(conn.recv(), None)
|
---|
1522 |
|
---|
1523 | really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
|
---|
1524 | conn.send_bytes(really_big_msg)
|
---|
1525 | self.assertEqual(conn.recv_bytes(), really_big_msg)
|
---|
1526 |
|
---|
1527 | conn.send_bytes(SENTINEL) # tell child to quit
|
---|
1528 | child_conn.close()
|
---|
1529 |
|
---|
1530 | if self.TYPE == 'processes':
|
---|
1531 | self.assertEqual(conn.readable, True)
|
---|
1532 | self.assertEqual(conn.writable, True)
|
---|
1533 | self.assertRaises(EOFError, conn.recv)
|
---|
1534 | self.assertRaises(EOFError, conn.recv_bytes)
|
---|
1535 |
|
---|
1536 | p.join()
|
---|
1537 |
|
---|
1538 | def test_duplex_false(self):
|
---|
1539 | reader, writer = self.Pipe(duplex=False)
|
---|
1540 | self.assertEqual(writer.send(1), None)
|
---|
1541 | self.assertEqual(reader.recv(), 1)
|
---|
1542 | if self.TYPE == 'processes':
|
---|
1543 | self.assertEqual(reader.readable, True)
|
---|
1544 | self.assertEqual(reader.writable, False)
|
---|
1545 | self.assertEqual(writer.readable, False)
|
---|
1546 | self.assertEqual(writer.writable, True)
|
---|
1547 | self.assertRaises(IOError, reader.send, 2)
|
---|
1548 | self.assertRaises(IOError, writer.recv)
|
---|
1549 | self.assertRaises(IOError, writer.poll)
|
---|
1550 |
|
---|
1551 | def test_spawn_close(self):
|
---|
1552 | # We test that a pipe connection can be closed by parent
|
---|
1553 | # process immediately after child is spawned. On Windows this
|
---|
1554 | # would have sometimes failed on old versions because
|
---|
1555 | # child_conn would be closed before the child got a chance to
|
---|
1556 | # duplicate it.
|
---|
1557 | conn, child_conn = self.Pipe()
|
---|
1558 |
|
---|
1559 | p = self.Process(target=self._echo, args=(child_conn,))
|
---|
1560 | p.daemon = True
|
---|
1561 | p.start()
|
---|
1562 | child_conn.close() # this might complete before child initializes
|
---|
1563 |
|
---|
1564 | msg = latin('hello')
|
---|
1565 | conn.send_bytes(msg)
|
---|
1566 | self.assertEqual(conn.recv_bytes(), msg)
|
---|
1567 |
|
---|
1568 | conn.send_bytes(SENTINEL)
|
---|
1569 | conn.close()
|
---|
1570 | p.join()
|
---|
1571 |
|
---|
1572 | def test_sendbytes(self):
|
---|
1573 | if self.TYPE != 'processes':
|
---|
1574 | return
|
---|
1575 |
|
---|
1576 | msg = latin('abcdefghijklmnopqrstuvwxyz')
|
---|
1577 | a, b = self.Pipe()
|
---|
1578 |
|
---|
1579 | a.send_bytes(msg)
|
---|
1580 | self.assertEqual(b.recv_bytes(), msg)
|
---|
1581 |
|
---|
1582 | a.send_bytes(msg, 5)
|
---|
1583 | self.assertEqual(b.recv_bytes(), msg[5:])
|
---|
1584 |
|
---|
1585 | a.send_bytes(msg, 7, 8)
|
---|
1586 | self.assertEqual(b.recv_bytes(), msg[7:7+8])
|
---|
1587 |
|
---|
1588 | a.send_bytes(msg, 26)
|
---|
1589 | self.assertEqual(b.recv_bytes(), latin(''))
|
---|
1590 |
|
---|
1591 | a.send_bytes(msg, 26, 0)
|
---|
1592 | self.assertEqual(b.recv_bytes(), latin(''))
|
---|
1593 |
|
---|
1594 | self.assertRaises(ValueError, a.send_bytes, msg, 27)
|
---|
1595 |
|
---|
1596 | self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
|
---|
1597 |
|
---|
1598 | self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
|
---|
1599 |
|
---|
1600 | self.assertRaises(ValueError, a.send_bytes, msg, -1)
|
---|
1601 |
|
---|
1602 | self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
|
---|
1603 |
|
---|
1604 | @classmethod
|
---|
1605 | def _is_fd_assigned(cls, fd):
|
---|
1606 | try:
|
---|
1607 | os.fstat(fd)
|
---|
1608 | except OSError as e:
|
---|
1609 | if e.errno == errno.EBADF:
|
---|
1610 | return False
|
---|
1611 | raise
|
---|
1612 | else:
|
---|
1613 | return True
|
---|
1614 |
|
---|
1615 | @classmethod
|
---|
1616 | def _writefd(cls, conn, data, create_dummy_fds=False):
|
---|
1617 | if create_dummy_fds:
|
---|
1618 | for i in range(0, 256):
|
---|
1619 | if not cls._is_fd_assigned(i):
|
---|
1620 | os.dup2(conn.fileno(), i)
|
---|
1621 | fd = reduction.recv_handle(conn)
|
---|
1622 | if msvcrt:
|
---|
1623 | fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
|
---|
1624 | os.write(fd, data)
|
---|
1625 | os.close(fd)
|
---|
1626 |
|
---|
1627 | @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
|
---|
1628 | def test_fd_transfer(self):
|
---|
1629 | if self.TYPE != 'processes':
|
---|
1630 | self.skipTest("only makes sense with processes")
|
---|
1631 | conn, child_conn = self.Pipe(duplex=True)
|
---|
1632 |
|
---|
1633 | p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
|
---|
1634 | p.daemon = True
|
---|
1635 | p.start()
|
---|
1636 | with open(test_support.TESTFN, "wb") as f:
|
---|
1637 | fd = f.fileno()
|
---|
1638 | if msvcrt:
|
---|
1639 | fd = msvcrt.get_osfhandle(fd)
|
---|
1640 | reduction.send_handle(conn, fd, p.pid)
|
---|
1641 | p.join()
|
---|
1642 | with open(test_support.TESTFN, "rb") as f:
|
---|
1643 | self.assertEqual(f.read(), b"foo")
|
---|
1644 |
|
---|
1645 | @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
|
---|
1646 | @unittest.skipIf(sys.platform == "win32",
|
---|
1647 | "test semantics don't make sense on Windows")
|
---|
1648 | @unittest.skipIf(MAXFD <= 256,
|
---|
1649 | "largest assignable fd number is too small")
|
---|
1650 | @unittest.skipUnless(hasattr(os, "dup2"),
|
---|
1651 | "test needs os.dup2()")
|
---|
1652 | def test_large_fd_transfer(self):
|
---|
1653 | # With fd > 256 (issue #11657)
|
---|
1654 | if self.TYPE != 'processes':
|
---|
1655 | self.skipTest("only makes sense with processes")
|
---|
1656 | conn, child_conn = self.Pipe(duplex=True)
|
---|
1657 |
|
---|
1658 | p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
|
---|
1659 | p.daemon = True
|
---|
1660 | p.start()
|
---|
1661 | with open(test_support.TESTFN, "wb") as f:
|
---|
1662 | fd = f.fileno()
|
---|
1663 | for newfd in range(256, MAXFD):
|
---|
1664 | if not self._is_fd_assigned(newfd):
|
---|
1665 | break
|
---|
1666 | else:
|
---|
1667 | self.fail("could not find an unassigned large file descriptor")
|
---|
1668 | os.dup2(fd, newfd)
|
---|
1669 | try:
|
---|
1670 | reduction.send_handle(conn, newfd, p.pid)
|
---|
1671 | finally:
|
---|
1672 | os.close(newfd)
|
---|
1673 | p.join()
|
---|
1674 | with open(test_support.TESTFN, "rb") as f:
|
---|
1675 | self.assertEqual(f.read(), b"bar")
|
---|
1676 |
|
---|
1677 | @classmethod
|
---|
1678 | def _send_data_without_fd(self, conn):
|
---|
1679 | os.write(conn.fileno(), b"\0")
|
---|
1680 |
|
---|
1681 | @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
|
---|
1682 | @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
|
---|
1683 | def test_missing_fd_transfer(self):
|
---|
1684 | # Check that exception is raised when received data is not
|
---|
1685 | # accompanied by a file descriptor in ancillary data.
|
---|
1686 | if self.TYPE != 'processes':
|
---|
1687 | self.skipTest("only makes sense with processes")
|
---|
1688 | conn, child_conn = self.Pipe(duplex=True)
|
---|
1689 |
|
---|
1690 | p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
|
---|
1691 | p.daemon = True
|
---|
1692 | p.start()
|
---|
1693 | self.assertRaises(RuntimeError, reduction.recv_handle, conn)
|
---|
1694 | p.join()
|
---|
1695 |
|
---|
1696 | class _TestListenerClient(BaseTestCase):
|
---|
1697 |
|
---|
1698 | ALLOWED_TYPES = ('processes', 'threads')
|
---|
1699 |
|
---|
1700 | @classmethod
|
---|
1701 | def _test(cls, address):
|
---|
1702 | conn = cls.connection.Client(address)
|
---|
1703 | conn.send('hello')
|
---|
1704 | conn.close()
|
---|
1705 |
|
---|
1706 | def test_listener_client(self):
|
---|
1707 | for family in self.connection.families:
|
---|
1708 | l = self.connection.Listener(family=family)
|
---|
1709 | p = self.Process(target=self._test, args=(l.address,))
|
---|
1710 | p.daemon = True
|
---|
1711 | p.start()
|
---|
1712 | conn = l.accept()
|
---|
1713 | self.assertEqual(conn.recv(), 'hello')
|
---|
1714 | p.join()
|
---|
1715 | l.close()
|
---|
1716 |
|
---|
1717 | def test_issue14725(self):
|
---|
1718 | l = self.connection.Listener()
|
---|
1719 | p = self.Process(target=self._test, args=(l.address,))
|
---|
1720 | p.daemon = True
|
---|
1721 | p.start()
|
---|
1722 | time.sleep(1)
|
---|
1723 | # On Windows the client process should by now have connected,
|
---|
1724 | # written data and closed the pipe handle by now. This causes
|
---|
1725 | # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue
|
---|
1726 | # 14725.
|
---|
1727 | conn = l.accept()
|
---|
1728 | self.assertEqual(conn.recv(), 'hello')
|
---|
1729 | conn.close()
|
---|
1730 | p.join()
|
---|
1731 | l.close()
|
---|
1732 |
|
---|
1733 | #
|
---|
1734 | # Test of sending connection and socket objects between processes
|
---|
1735 | #
|
---|
1736 | """
|
---|
1737 | class _TestPicklingConnections(BaseTestCase):
|
---|
1738 |
|
---|
1739 | ALLOWED_TYPES = ('processes',)
|
---|
1740 |
|
---|
1741 | def _listener(self, conn, families):
|
---|
1742 | for fam in families:
|
---|
1743 | l = self.connection.Listener(family=fam)
|
---|
1744 | conn.send(l.address)
|
---|
1745 | new_conn = l.accept()
|
---|
1746 | conn.send(new_conn)
|
---|
1747 |
|
---|
1748 | if self.TYPE == 'processes':
|
---|
1749 | l = socket.socket()
|
---|
1750 | l.bind(('localhost', 0))
|
---|
1751 | conn.send(l.getsockname())
|
---|
1752 | l.listen(1)
|
---|
1753 | new_conn, addr = l.accept()
|
---|
1754 | conn.send(new_conn)
|
---|
1755 |
|
---|
1756 | conn.recv()
|
---|
1757 |
|
---|
1758 | def _remote(self, conn):
|
---|
1759 | for (address, msg) in iter(conn.recv, None):
|
---|
1760 | client = self.connection.Client(address)
|
---|
1761 | client.send(msg.upper())
|
---|
1762 | client.close()
|
---|
1763 |
|
---|
1764 | if self.TYPE == 'processes':
|
---|
1765 | address, msg = conn.recv()
|
---|
1766 | client = socket.socket()
|
---|
1767 | client.connect(address)
|
---|
1768 | client.sendall(msg.upper())
|
---|
1769 | client.close()
|
---|
1770 |
|
---|
1771 | conn.close()
|
---|
1772 |
|
---|
1773 | def test_pickling(self):
|
---|
1774 | try:
|
---|
1775 | multiprocessing.allow_connection_pickling()
|
---|
1776 | except ImportError:
|
---|
1777 | return
|
---|
1778 |
|
---|
1779 | families = self.connection.families
|
---|
1780 |
|
---|
1781 | lconn, lconn0 = self.Pipe()
|
---|
1782 | lp = self.Process(target=self._listener, args=(lconn0, families))
|
---|
1783 | lp.daemon = True
|
---|
1784 | lp.start()
|
---|
1785 | lconn0.close()
|
---|
1786 |
|
---|
1787 | rconn, rconn0 = self.Pipe()
|
---|
1788 | rp = self.Process(target=self._remote, args=(rconn0,))
|
---|
1789 | rp.daemon = True
|
---|
1790 | rp.start()
|
---|
1791 | rconn0.close()
|
---|
1792 |
|
---|
1793 | for fam in families:
|
---|
1794 | msg = ('This connection uses family %s' % fam).encode('ascii')
|
---|
1795 | address = lconn.recv()
|
---|
1796 | rconn.send((address, msg))
|
---|
1797 | new_conn = lconn.recv()
|
---|
1798 | self.assertEqual(new_conn.recv(), msg.upper())
|
---|
1799 |
|
---|
1800 | rconn.send(None)
|
---|
1801 |
|
---|
1802 | if self.TYPE == 'processes':
|
---|
1803 | msg = latin('This connection uses a normal socket')
|
---|
1804 | address = lconn.recv()
|
---|
1805 | rconn.send((address, msg))
|
---|
1806 | if hasattr(socket, 'fromfd'):
|
---|
1807 | new_conn = lconn.recv()
|
---|
1808 | self.assertEqual(new_conn.recv(100), msg.upper())
|
---|
1809 | else:
|
---|
1810 | # XXX On Windows with Py2.6 need to backport fromfd()
|
---|
1811 | discard = lconn.recv_bytes()
|
---|
1812 |
|
---|
1813 | lconn.send(None)
|
---|
1814 |
|
---|
1815 | rconn.close()
|
---|
1816 | lconn.close()
|
---|
1817 |
|
---|
1818 | lp.join()
|
---|
1819 | rp.join()
|
---|
1820 | """
|
---|
1821 | #
|
---|
1822 | #
|
---|
1823 | #
|
---|
1824 |
|
---|
1825 | class _TestHeap(BaseTestCase):
|
---|
1826 |
|
---|
1827 | ALLOWED_TYPES = ('processes',)
|
---|
1828 |
|
---|
1829 | def test_heap(self):
|
---|
1830 | iterations = 5000
|
---|
1831 | maxblocks = 50
|
---|
1832 | blocks = []
|
---|
1833 |
|
---|
1834 | # create and destroy lots of blocks of different sizes
|
---|
1835 | for i in xrange(iterations):
|
---|
1836 | size = int(random.lognormvariate(0, 1) * 1000)
|
---|
1837 | b = multiprocessing.heap.BufferWrapper(size)
|
---|
1838 | blocks.append(b)
|
---|
1839 | if len(blocks) > maxblocks:
|
---|
1840 | i = random.randrange(maxblocks)
|
---|
1841 | del blocks[i]
|
---|
1842 |
|
---|
1843 | # get the heap object
|
---|
1844 | heap = multiprocessing.heap.BufferWrapper._heap
|
---|
1845 |
|
---|
1846 | # verify the state of the heap
|
---|
1847 | all = []
|
---|
1848 | occupied = 0
|
---|
1849 | heap._lock.acquire()
|
---|
1850 | self.addCleanup(heap._lock.release)
|
---|
1851 | for L in heap._len_to_seq.values():
|
---|
1852 | for arena, start, stop in L:
|
---|
1853 | all.append((heap._arenas.index(arena), start, stop,
|
---|
1854 | stop-start, 'free'))
|
---|
1855 | for arena, start, stop in heap._allocated_blocks:
|
---|
1856 | all.append((heap._arenas.index(arena), start, stop,
|
---|
1857 | stop-start, 'occupied'))
|
---|
1858 | occupied += (stop-start)
|
---|
1859 |
|
---|
1860 | all.sort()
|
---|
1861 |
|
---|
1862 | for i in range(len(all)-1):
|
---|
1863 | (arena, start, stop) = all[i][:3]
|
---|
1864 | (narena, nstart, nstop) = all[i+1][:3]
|
---|
1865 | self.assertTrue((arena != narena and nstart == 0) or
|
---|
1866 | (stop == nstart))
|
---|
1867 |
|
---|
1868 | def test_free_from_gc(self):
|
---|
1869 | # Check that freeing of blocks by the garbage collector doesn't deadlock
|
---|
1870 | # (issue #12352).
|
---|
1871 | # Make sure the GC is enabled, and set lower collection thresholds to
|
---|
1872 | # make collections more frequent (and increase the probability of
|
---|
1873 | # deadlock).
|
---|
1874 | if not gc.isenabled():
|
---|
1875 | gc.enable()
|
---|
1876 | self.addCleanup(gc.disable)
|
---|
1877 | thresholds = gc.get_threshold()
|
---|
1878 | self.addCleanup(gc.set_threshold, *thresholds)
|
---|
1879 | gc.set_threshold(10)
|
---|
1880 |
|
---|
1881 | # perform numerous block allocations, with cyclic references to make
|
---|
1882 | # sure objects are collected asynchronously by the gc
|
---|
1883 | for i in range(5000):
|
---|
1884 | a = multiprocessing.heap.BufferWrapper(1)
|
---|
1885 | b = multiprocessing.heap.BufferWrapper(1)
|
---|
1886 | # circular references
|
---|
1887 | a.buddy = b
|
---|
1888 | b.buddy = a
|
---|
1889 |
|
---|
1890 | #
|
---|
1891 | #
|
---|
1892 | #
|
---|
1893 |
|
---|
1894 | class _Foo(Structure):
|
---|
1895 | _fields_ = [
|
---|
1896 | ('x', c_int),
|
---|
1897 | ('y', c_double)
|
---|
1898 | ]
|
---|
1899 |
|
---|
1900 | class _TestSharedCTypes(BaseTestCase):
|
---|
1901 |
|
---|
1902 | ALLOWED_TYPES = ('processes',)
|
---|
1903 |
|
---|
1904 | def setUp(self):
|
---|
1905 | if not HAS_SHAREDCTYPES:
|
---|
1906 | self.skipTest("requires multiprocessing.sharedctypes")
|
---|
1907 |
|
---|
1908 | @classmethod
|
---|
1909 | def _double(cls, x, y, foo, arr, string):
|
---|
1910 | x.value *= 2
|
---|
1911 | y.value *= 2
|
---|
1912 | foo.x *= 2
|
---|
1913 | foo.y *= 2
|
---|
1914 | string.value *= 2
|
---|
1915 | for i in range(len(arr)):
|
---|
1916 | arr[i] *= 2
|
---|
1917 |
|
---|
1918 | def test_sharedctypes(self, lock=False):
|
---|
1919 | x = Value('i', 7, lock=lock)
|
---|
1920 | y = Value(c_double, 1.0/3.0, lock=lock)
|
---|
1921 | foo = Value(_Foo, 3, 2, lock=lock)
|
---|
1922 | arr = self.Array('d', range(10), lock=lock)
|
---|
1923 | string = self.Array('c', 20, lock=lock)
|
---|
1924 | string.value = latin('hello')
|
---|
1925 |
|
---|
1926 | p = self.Process(target=self._double, args=(x, y, foo, arr, string))
|
---|
1927 | p.daemon = True
|
---|
1928 | p.start()
|
---|
1929 | p.join()
|
---|
1930 |
|
---|
1931 | self.assertEqual(x.value, 14)
|
---|
1932 | self.assertAlmostEqual(y.value, 2.0/3.0)
|
---|
1933 | self.assertEqual(foo.x, 6)
|
---|
1934 | self.assertAlmostEqual(foo.y, 4.0)
|
---|
1935 | for i in range(10):
|
---|
1936 | self.assertAlmostEqual(arr[i], i*2)
|
---|
1937 | self.assertEqual(string.value, latin('hellohello'))
|
---|
1938 |
|
---|
1939 | def test_synchronize(self):
|
---|
1940 | self.test_sharedctypes(lock=True)
|
---|
1941 |
|
---|
1942 | def test_copy(self):
|
---|
1943 | foo = _Foo(2, 5.0)
|
---|
1944 | bar = copy(foo)
|
---|
1945 | foo.x = 0
|
---|
1946 | foo.y = 0
|
---|
1947 | self.assertEqual(bar.x, 2)
|
---|
1948 | self.assertAlmostEqual(bar.y, 5.0)
|
---|
1949 |
|
---|
1950 | #
|
---|
1951 | #
|
---|
1952 | #
|
---|
1953 |
|
---|
1954 | class _TestFinalize(BaseTestCase):
|
---|
1955 |
|
---|
1956 | ALLOWED_TYPES = ('processes',)
|
---|
1957 |
|
---|
1958 | @classmethod
|
---|
1959 | def _test_finalize(cls, conn):
|
---|
1960 | class Foo(object):
|
---|
1961 | pass
|
---|
1962 |
|
---|
1963 | a = Foo()
|
---|
1964 | util.Finalize(a, conn.send, args=('a',))
|
---|
1965 | del a # triggers callback for a
|
---|
1966 |
|
---|
1967 | b = Foo()
|
---|
1968 | close_b = util.Finalize(b, conn.send, args=('b',))
|
---|
1969 | close_b() # triggers callback for b
|
---|
1970 | close_b() # does nothing because callback has already been called
|
---|
1971 | del b # does nothing because callback has already been called
|
---|
1972 |
|
---|
1973 | c = Foo()
|
---|
1974 | util.Finalize(c, conn.send, args=('c',))
|
---|
1975 |
|
---|
1976 | d10 = Foo()
|
---|
1977 | util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
|
---|
1978 |
|
---|
1979 | d01 = Foo()
|
---|
1980 | util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
|
---|
1981 | d02 = Foo()
|
---|
1982 | util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
|
---|
1983 | d03 = Foo()
|
---|
1984 | util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
|
---|
1985 |
|
---|
1986 | util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
|
---|
1987 |
|
---|
1988 | util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
|
---|
1989 |
|
---|
1990 | # call multiprocessing's cleanup function then exit process without
|
---|
1991 | # garbage collecting locals
|
---|
1992 | util._exit_function()
|
---|
1993 | conn.close()
|
---|
1994 | os._exit(0)
|
---|
1995 |
|
---|
1996 | def test_finalize(self):
|
---|
1997 | conn, child_conn = self.Pipe()
|
---|
1998 |
|
---|
1999 | p = self.Process(target=self._test_finalize, args=(child_conn,))
|
---|
2000 | p.daemon = True
|
---|
2001 | p.start()
|
---|
2002 | p.join()
|
---|
2003 |
|
---|
2004 | result = [obj for obj in iter(conn.recv, 'STOP')]
|
---|
2005 | self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
|
---|
2006 |
|
---|
2007 | #
|
---|
2008 | # Test that from ... import * works for each module
|
---|
2009 | #
|
---|
2010 |
|
---|
2011 | class _TestImportStar(BaseTestCase):
|
---|
2012 |
|
---|
2013 | ALLOWED_TYPES = ('processes',)
|
---|
2014 |
|
---|
2015 | def test_import(self):
|
---|
2016 | modules = [
|
---|
2017 | 'multiprocessing', 'multiprocessing.connection',
|
---|
2018 | 'multiprocessing.heap', 'multiprocessing.managers',
|
---|
2019 | 'multiprocessing.pool', 'multiprocessing.process',
|
---|
2020 | 'multiprocessing.synchronize', 'multiprocessing.util'
|
---|
2021 | ]
|
---|
2022 |
|
---|
2023 | if HAS_REDUCTION:
|
---|
2024 | modules.append('multiprocessing.reduction')
|
---|
2025 |
|
---|
2026 | if c_int is not None:
|
---|
2027 | # This module requires _ctypes
|
---|
2028 | modules.append('multiprocessing.sharedctypes')
|
---|
2029 |
|
---|
2030 | for name in modules:
|
---|
2031 | __import__(name)
|
---|
2032 | mod = sys.modules[name]
|
---|
2033 |
|
---|
2034 | for attr in getattr(mod, '__all__', ()):
|
---|
2035 | self.assertTrue(
|
---|
2036 | hasattr(mod, attr),
|
---|
2037 | '%r does not have attribute %r' % (mod, attr)
|
---|
2038 | )
|
---|
2039 |
|
---|
2040 | #
|
---|
2041 | # Quick test that logging works -- does not test logging output
|
---|
2042 | #
|
---|
2043 |
|
---|
2044 | class _TestLogging(BaseTestCase):
|
---|
2045 |
|
---|
2046 | ALLOWED_TYPES = ('processes',)
|
---|
2047 |
|
---|
2048 | def test_enable_logging(self):
|
---|
2049 | logger = multiprocessing.get_logger()
|
---|
2050 | logger.setLevel(util.SUBWARNING)
|
---|
2051 | self.assertTrue(logger is not None)
|
---|
2052 | logger.debug('this will not be printed')
|
---|
2053 | logger.info('nor will this')
|
---|
2054 | logger.setLevel(LOG_LEVEL)
|
---|
2055 |
|
---|
2056 | @classmethod
|
---|
2057 | def _test_level(cls, conn):
|
---|
2058 | logger = multiprocessing.get_logger()
|
---|
2059 | conn.send(logger.getEffectiveLevel())
|
---|
2060 |
|
---|
2061 | def test_level(self):
|
---|
2062 | LEVEL1 = 32
|
---|
2063 | LEVEL2 = 37
|
---|
2064 |
|
---|
2065 | logger = multiprocessing.get_logger()
|
---|
2066 | root_logger = logging.getLogger()
|
---|
2067 | root_level = root_logger.level
|
---|
2068 |
|
---|
2069 | reader, writer = multiprocessing.Pipe(duplex=False)
|
---|
2070 |
|
---|
2071 | logger.setLevel(LEVEL1)
|
---|
2072 | p = self.Process(target=self._test_level, args=(writer,))
|
---|
2073 | p.daemon = True
|
---|
2074 | p.start()
|
---|
2075 | self.assertEqual(LEVEL1, reader.recv())
|
---|
2076 |
|
---|
2077 | logger.setLevel(logging.NOTSET)
|
---|
2078 | root_logger.setLevel(LEVEL2)
|
---|
2079 | p = self.Process(target=self._test_level, args=(writer,))
|
---|
2080 | p.daemon = True
|
---|
2081 | p.start()
|
---|
2082 | self.assertEqual(LEVEL2, reader.recv())
|
---|
2083 |
|
---|
2084 | root_logger.setLevel(root_level)
|
---|
2085 | logger.setLevel(level=LOG_LEVEL)
|
---|
2086 |
|
---|
2087 |
|
---|
2088 | # class _TestLoggingProcessName(BaseTestCase):
|
---|
2089 | #
|
---|
2090 | # def handle(self, record):
|
---|
2091 | # assert record.processName == multiprocessing.current_process().name
|
---|
2092 | # self.__handled = True
|
---|
2093 | #
|
---|
2094 | # def test_logging(self):
|
---|
2095 | # handler = logging.Handler()
|
---|
2096 | # handler.handle = self.handle
|
---|
2097 | # self.__handled = False
|
---|
2098 | # # Bypass getLogger() and side-effects
|
---|
2099 | # logger = logging.getLoggerClass()(
|
---|
2100 | # 'multiprocessing.test.TestLoggingProcessName')
|
---|
2101 | # logger.addHandler(handler)
|
---|
2102 | # logger.propagate = False
|
---|
2103 | #
|
---|
2104 | # logger.warn('foo')
|
---|
2105 | # assert self.__handled
|
---|
2106 |
|
---|
2107 | #
|
---|
2108 | # Check that Process.join() retries if os.waitpid() fails with EINTR
|
---|
2109 | #
|
---|
2110 |
|
---|
2111 | class _TestPollEintr(BaseTestCase):
|
---|
2112 |
|
---|
2113 | ALLOWED_TYPES = ('processes',)
|
---|
2114 |
|
---|
2115 | @classmethod
|
---|
2116 | def _killer(cls, pid):
|
---|
2117 | time.sleep(0.5)
|
---|
2118 | os.kill(pid, signal.SIGUSR1)
|
---|
2119 |
|
---|
2120 | @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
|
---|
2121 | def test_poll_eintr(self):
|
---|
2122 | got_signal = [False]
|
---|
2123 | def record(*args):
|
---|
2124 | got_signal[0] = True
|
---|
2125 | pid = os.getpid()
|
---|
2126 | oldhandler = signal.signal(signal.SIGUSR1, record)
|
---|
2127 | try:
|
---|
2128 | killer = self.Process(target=self._killer, args=(pid,))
|
---|
2129 | killer.start()
|
---|
2130 | p = self.Process(target=time.sleep, args=(1,))
|
---|
2131 | p.start()
|
---|
2132 | p.join()
|
---|
2133 | self.assertTrue(got_signal[0])
|
---|
2134 | self.assertEqual(p.exitcode, 0)
|
---|
2135 | killer.join()
|
---|
2136 | finally:
|
---|
2137 | signal.signal(signal.SIGUSR1, oldhandler)
|
---|
2138 |
|
---|
2139 | #
|
---|
2140 | # Test to verify handle verification, see issue 3321
|
---|
2141 | #
|
---|
2142 |
|
---|
2143 | class TestInvalidHandle(unittest.TestCase):
|
---|
2144 |
|
---|
2145 | @unittest.skipIf(WIN32, "skipped on Windows")
|
---|
2146 | def test_invalid_handles(self):
|
---|
2147 | conn = _multiprocessing.Connection(44977608)
|
---|
2148 | self.assertRaises(IOError, conn.poll)
|
---|
2149 | self.assertRaises(IOError, _multiprocessing.Connection, -1)
|
---|
2150 |
|
---|
2151 | #
|
---|
2152 | # Functions used to create test cases from the base ones in this module
|
---|
2153 | #
|
---|
2154 |
|
---|
2155 | def get_attributes(Source, names):
|
---|
2156 | d = {}
|
---|
2157 | for name in names:
|
---|
2158 | obj = getattr(Source, name)
|
---|
2159 | if type(obj) == type(get_attributes):
|
---|
2160 | obj = staticmethod(obj)
|
---|
2161 | d[name] = obj
|
---|
2162 | return d
|
---|
2163 |
|
---|
2164 | def create_test_cases(Mixin, type):
|
---|
2165 | result = {}
|
---|
2166 | glob = globals()
|
---|
2167 | Type = type.capitalize()
|
---|
2168 |
|
---|
2169 | for name in glob.keys():
|
---|
2170 | if name.startswith('_Test'):
|
---|
2171 | base = glob[name]
|
---|
2172 | if type in base.ALLOWED_TYPES:
|
---|
2173 | newname = 'With' + Type + name[1:]
|
---|
2174 | class Temp(base, unittest.TestCase, Mixin):
|
---|
2175 | pass
|
---|
2176 | result[newname] = Temp
|
---|
2177 | Temp.__name__ = newname
|
---|
2178 | Temp.__module__ = Mixin.__module__
|
---|
2179 | return result
|
---|
2180 |
|
---|
2181 | #
|
---|
2182 | # Create test cases
|
---|
2183 | #
|
---|
2184 |
|
---|
2185 | class ProcessesMixin(object):
|
---|
2186 | TYPE = 'processes'
|
---|
2187 | Process = multiprocessing.Process
|
---|
2188 | locals().update(get_attributes(multiprocessing, (
|
---|
2189 | 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
|
---|
2190 | 'Condition', 'Event', 'Value', 'Array', 'RawValue',
|
---|
2191 | 'RawArray', 'current_process', 'active_children', 'Pipe',
|
---|
2192 | 'connection', 'JoinableQueue', 'Pool'
|
---|
2193 | )))
|
---|
2194 |
|
---|
2195 | testcases_processes = create_test_cases(ProcessesMixin, type='processes')
|
---|
2196 | globals().update(testcases_processes)
|
---|
2197 |
|
---|
2198 |
|
---|
2199 | class ManagerMixin(object):
|
---|
2200 | TYPE = 'manager'
|
---|
2201 | Process = multiprocessing.Process
|
---|
2202 | manager = object.__new__(multiprocessing.managers.SyncManager)
|
---|
2203 | locals().update(get_attributes(manager, (
|
---|
2204 | 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
|
---|
2205 | 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
|
---|
2206 | 'Namespace', 'JoinableQueue', 'Pool'
|
---|
2207 | )))
|
---|
2208 |
|
---|
2209 | testcases_manager = create_test_cases(ManagerMixin, type='manager')
|
---|
2210 | globals().update(testcases_manager)
|
---|
2211 |
|
---|
2212 |
|
---|
2213 | class ThreadsMixin(object):
|
---|
2214 | TYPE = 'threads'
|
---|
2215 | Process = multiprocessing.dummy.Process
|
---|
2216 | locals().update(get_attributes(multiprocessing.dummy, (
|
---|
2217 | 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
|
---|
2218 | 'Condition', 'Event', 'Value', 'Array', 'current_process',
|
---|
2219 | 'active_children', 'Pipe', 'connection', 'dict', 'list',
|
---|
2220 | 'Namespace', 'JoinableQueue', 'Pool'
|
---|
2221 | )))
|
---|
2222 |
|
---|
2223 | testcases_threads = create_test_cases(ThreadsMixin, type='threads')
|
---|
2224 | globals().update(testcases_threads)
|
---|
2225 |
|
---|
2226 | class OtherTest(unittest.TestCase):
|
---|
2227 | # TODO: add more tests for deliver/answer challenge.
|
---|
2228 | def test_deliver_challenge_auth_failure(self):
|
---|
2229 | class _FakeConnection(object):
|
---|
2230 | def recv_bytes(self, size):
|
---|
2231 | return b'something bogus'
|
---|
2232 | def send_bytes(self, data):
|
---|
2233 | pass
|
---|
2234 | self.assertRaises(multiprocessing.AuthenticationError,
|
---|
2235 | multiprocessing.connection.deliver_challenge,
|
---|
2236 | _FakeConnection(), b'abc')
|
---|
2237 |
|
---|
2238 | def test_answer_challenge_auth_failure(self):
|
---|
2239 | class _FakeConnection(object):
|
---|
2240 | def __init__(self):
|
---|
2241 | self.count = 0
|
---|
2242 | def recv_bytes(self, size):
|
---|
2243 | self.count += 1
|
---|
2244 | if self.count == 1:
|
---|
2245 | return multiprocessing.connection.CHALLENGE
|
---|
2246 | elif self.count == 2:
|
---|
2247 | return b'something bogus'
|
---|
2248 | return b''
|
---|
2249 | def send_bytes(self, data):
|
---|
2250 | pass
|
---|
2251 | self.assertRaises(multiprocessing.AuthenticationError,
|
---|
2252 | multiprocessing.connection.answer_challenge,
|
---|
2253 | _FakeConnection(), b'abc')
|
---|
2254 |
|
---|
2255 | #
|
---|
2256 | # Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
|
---|
2257 | #
|
---|
2258 |
|
---|
2259 | def initializer(ns):
|
---|
2260 | ns.test += 1
|
---|
2261 |
|
---|
2262 | class TestInitializers(unittest.TestCase):
|
---|
2263 | def setUp(self):
|
---|
2264 | self.mgr = multiprocessing.Manager()
|
---|
2265 | self.ns = self.mgr.Namespace()
|
---|
2266 | self.ns.test = 0
|
---|
2267 |
|
---|
2268 | def tearDown(self):
|
---|
2269 | self.mgr.shutdown()
|
---|
2270 |
|
---|
2271 | def test_manager_initializer(self):
|
---|
2272 | m = multiprocessing.managers.SyncManager()
|
---|
2273 | self.assertRaises(TypeError, m.start, 1)
|
---|
2274 | m.start(initializer, (self.ns,))
|
---|
2275 | self.assertEqual(self.ns.test, 1)
|
---|
2276 | m.shutdown()
|
---|
2277 |
|
---|
2278 | def test_pool_initializer(self):
|
---|
2279 | self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
|
---|
2280 | p = multiprocessing.Pool(1, initializer, (self.ns,))
|
---|
2281 | p.close()
|
---|
2282 | p.join()
|
---|
2283 | self.assertEqual(self.ns.test, 1)
|
---|
2284 |
|
---|
2285 | #
|
---|
2286 | # Issue 5155, 5313, 5331: Test process in processes
|
---|
2287 | # Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
|
---|
2288 | #
|
---|
2289 |
|
---|
2290 | def _this_sub_process(q):
|
---|
2291 | try:
|
---|
2292 | item = q.get(block=False)
|
---|
2293 | except Queue.Empty:
|
---|
2294 | pass
|
---|
2295 |
|
---|
2296 | def _test_process(q):
|
---|
2297 | queue = multiprocessing.Queue()
|
---|
2298 | subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
|
---|
2299 | subProc.daemon = True
|
---|
2300 | subProc.start()
|
---|
2301 | subProc.join()
|
---|
2302 |
|
---|
2303 | def _afunc(x):
|
---|
2304 | return x*x
|
---|
2305 |
|
---|
2306 | def pool_in_process():
|
---|
2307 | pool = multiprocessing.Pool(processes=4)
|
---|
2308 | x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
|
---|
2309 |
|
---|
2310 | class _file_like(object):
|
---|
2311 | def __init__(self, delegate):
|
---|
2312 | self._delegate = delegate
|
---|
2313 | self._pid = None
|
---|
2314 |
|
---|
2315 | @property
|
---|
2316 | def cache(self):
|
---|
2317 | pid = os.getpid()
|
---|
2318 | # There are no race conditions since fork keeps only the running thread
|
---|
2319 | if pid != self._pid:
|
---|
2320 | self._pid = pid
|
---|
2321 | self._cache = []
|
---|
2322 | return self._cache
|
---|
2323 |
|
---|
2324 | def write(self, data):
|
---|
2325 | self.cache.append(data)
|
---|
2326 |
|
---|
2327 | def flush(self):
|
---|
2328 | self._delegate.write(''.join(self.cache))
|
---|
2329 | self._cache = []
|
---|
2330 |
|
---|
2331 | class TestStdinBadfiledescriptor(unittest.TestCase):
|
---|
2332 |
|
---|
2333 | def test_queue_in_process(self):
|
---|
2334 | queue = multiprocessing.Queue()
|
---|
2335 | proc = multiprocessing.Process(target=_test_process, args=(queue,))
|
---|
2336 | proc.start()
|
---|
2337 | proc.join()
|
---|
2338 |
|
---|
2339 | def test_pool_in_process(self):
|
---|
2340 | p = multiprocessing.Process(target=pool_in_process)
|
---|
2341 | p.start()
|
---|
2342 | p.join()
|
---|
2343 |
|
---|
2344 | def test_flushing(self):
|
---|
2345 | sio = StringIO()
|
---|
2346 | flike = _file_like(sio)
|
---|
2347 | flike.write('foo')
|
---|
2348 | proc = multiprocessing.Process(target=lambda: flike.flush())
|
---|
2349 | flike.flush()
|
---|
2350 | assert sio.getvalue() == 'foo'
|
---|
2351 |
|
---|
2352 | #
|
---|
2353 | # Test interaction with socket timeouts - see Issue #6056
|
---|
2354 | #
|
---|
2355 |
|
---|
2356 | class TestTimeouts(unittest.TestCase):
|
---|
2357 | @classmethod
|
---|
2358 | def _test_timeout(cls, child, address):
|
---|
2359 | time.sleep(1)
|
---|
2360 | child.send(123)
|
---|
2361 | child.close()
|
---|
2362 | conn = multiprocessing.connection.Client(address)
|
---|
2363 | conn.send(456)
|
---|
2364 | conn.close()
|
---|
2365 |
|
---|
2366 | def test_timeout(self):
|
---|
2367 | old_timeout = socket.getdefaulttimeout()
|
---|
2368 | try:
|
---|
2369 | socket.setdefaulttimeout(0.1)
|
---|
2370 | parent, child = multiprocessing.Pipe(duplex=True)
|
---|
2371 | l = multiprocessing.connection.Listener(family='AF_INET')
|
---|
2372 | p = multiprocessing.Process(target=self._test_timeout,
|
---|
2373 | args=(child, l.address))
|
---|
2374 | p.start()
|
---|
2375 | child.close()
|
---|
2376 | self.assertEqual(parent.recv(), 123)
|
---|
2377 | parent.close()
|
---|
2378 | conn = l.accept()
|
---|
2379 | self.assertEqual(conn.recv(), 456)
|
---|
2380 | conn.close()
|
---|
2381 | l.close()
|
---|
2382 | p.join(10)
|
---|
2383 | finally:
|
---|
2384 | socket.setdefaulttimeout(old_timeout)
|
---|
2385 |
|
---|
2386 | #
|
---|
2387 | # Test what happens with no "if __name__ == '__main__'"
|
---|
2388 | #
|
---|
2389 |
|
---|
2390 | class TestNoForkBomb(unittest.TestCase):
|
---|
2391 | def test_noforkbomb(self):
|
---|
2392 | name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
|
---|
2393 | if WIN32:
|
---|
2394 | rc, out, err = test.script_helper.assert_python_failure(name)
|
---|
2395 | self.assertEqual('', out.decode('ascii'))
|
---|
2396 | self.assertIn('RuntimeError', err.decode('ascii'))
|
---|
2397 | else:
|
---|
2398 | rc, out, err = test.script_helper.assert_python_ok(name)
|
---|
2399 | self.assertEqual('123', out.decode('ascii').rstrip())
|
---|
2400 | self.assertEqual('', err.decode('ascii'))
|
---|
2401 |
|
---|
2402 | #
|
---|
2403 | # Issue 12098: check sys.flags of child matches that for parent
|
---|
2404 | #
|
---|
2405 |
|
---|
2406 | class TestFlags(unittest.TestCase):
|
---|
2407 | @classmethod
|
---|
2408 | def run_in_grandchild(cls, conn):
|
---|
2409 | conn.send(tuple(sys.flags))
|
---|
2410 |
|
---|
2411 | @classmethod
|
---|
2412 | def run_in_child(cls):
|
---|
2413 | import json
|
---|
2414 | r, w = multiprocessing.Pipe(duplex=False)
|
---|
2415 | p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
|
---|
2416 | p.start()
|
---|
2417 | grandchild_flags = r.recv()
|
---|
2418 | p.join()
|
---|
2419 | r.close()
|
---|
2420 | w.close()
|
---|
2421 | flags = (tuple(sys.flags), grandchild_flags)
|
---|
2422 | print(json.dumps(flags))
|
---|
2423 |
|
---|
2424 | def test_flags(self):
|
---|
2425 | import json, subprocess
|
---|
2426 | # start child process using unusual flags
|
---|
2427 | prog = ('from test.test_multiprocessing import TestFlags; ' +
|
---|
2428 | 'TestFlags.run_in_child()')
|
---|
2429 | data = subprocess.check_output(
|
---|
2430 | [sys.executable, '-E', '-B', '-O', '-c', prog])
|
---|
2431 | child_flags, grandchild_flags = json.loads(data.decode('ascii'))
|
---|
2432 | self.assertEqual(child_flags, grandchild_flags)
|
---|
2433 |
|
---|
2434 | #
|
---|
2435 | # Issue #17555: ForkAwareThreadLock
|
---|
2436 | #
|
---|
2437 |
|
---|
2438 | class TestForkAwareThreadLock(unittest.TestCase):
|
---|
2439 | # We recurisvely start processes. Issue #17555 meant that the
|
---|
2440 | # after fork registry would get duplicate entries for the same
|
---|
2441 | # lock. The size of the registry at generation n was ~2**n.
|
---|
2442 |
|
---|
2443 | @classmethod
|
---|
2444 | def child(cls, n, conn):
|
---|
2445 | if n > 1:
|
---|
2446 | p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
|
---|
2447 | p.start()
|
---|
2448 | p.join()
|
---|
2449 | else:
|
---|
2450 | conn.send(len(util._afterfork_registry))
|
---|
2451 | conn.close()
|
---|
2452 |
|
---|
2453 | def test_lock(self):
|
---|
2454 | r, w = multiprocessing.Pipe(False)
|
---|
2455 | l = util.ForkAwareThreadLock()
|
---|
2456 | old_size = len(util._afterfork_registry)
|
---|
2457 | p = multiprocessing.Process(target=self.child, args=(5, w))
|
---|
2458 | p.start()
|
---|
2459 | new_size = r.recv()
|
---|
2460 | p.join()
|
---|
2461 | self.assertLessEqual(new_size, old_size)
|
---|
2462 |
|
---|
2463 | #
|
---|
2464 | # Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
|
---|
2465 | #
|
---|
2466 |
|
---|
2467 | class TestIgnoreEINTR(unittest.TestCase):
|
---|
2468 |
|
---|
2469 | @classmethod
|
---|
2470 | def _test_ignore(cls, conn):
|
---|
2471 | def handler(signum, frame):
|
---|
2472 | pass
|
---|
2473 | signal.signal(signal.SIGUSR1, handler)
|
---|
2474 | conn.send('ready')
|
---|
2475 | x = conn.recv()
|
---|
2476 | conn.send(x)
|
---|
2477 | conn.send_bytes(b'x'*(1024*1024)) # sending 1 MB should block
|
---|
2478 |
|
---|
2479 | @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
|
---|
2480 | def test_ignore(self):
|
---|
2481 | conn, child_conn = multiprocessing.Pipe()
|
---|
2482 | try:
|
---|
2483 | p = multiprocessing.Process(target=self._test_ignore,
|
---|
2484 | args=(child_conn,))
|
---|
2485 | p.daemon = True
|
---|
2486 | p.start()
|
---|
2487 | child_conn.close()
|
---|
2488 | self.assertEqual(conn.recv(), 'ready')
|
---|
2489 | time.sleep(0.1)
|
---|
2490 | os.kill(p.pid, signal.SIGUSR1)
|
---|
2491 | time.sleep(0.1)
|
---|
2492 | conn.send(1234)
|
---|
2493 | self.assertEqual(conn.recv(), 1234)
|
---|
2494 | time.sleep(0.1)
|
---|
2495 | os.kill(p.pid, signal.SIGUSR1)
|
---|
2496 | self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
|
---|
2497 | time.sleep(0.1)
|
---|
2498 | p.join()
|
---|
2499 | finally:
|
---|
2500 | conn.close()
|
---|
2501 |
|
---|
2502 | @classmethod
|
---|
2503 | def _test_ignore_listener(cls, conn):
|
---|
2504 | def handler(signum, frame):
|
---|
2505 | pass
|
---|
2506 | signal.signal(signal.SIGUSR1, handler)
|
---|
2507 | l = multiprocessing.connection.Listener()
|
---|
2508 | conn.send(l.address)
|
---|
2509 | a = l.accept()
|
---|
2510 | a.send('welcome')
|
---|
2511 |
|
---|
2512 | @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
|
---|
2513 | def test_ignore_listener(self):
|
---|
2514 | conn, child_conn = multiprocessing.Pipe()
|
---|
2515 | try:
|
---|
2516 | p = multiprocessing.Process(target=self._test_ignore_listener,
|
---|
2517 | args=(child_conn,))
|
---|
2518 | p.daemon = True
|
---|
2519 | p.start()
|
---|
2520 | child_conn.close()
|
---|
2521 | address = conn.recv()
|
---|
2522 | time.sleep(0.1)
|
---|
2523 | os.kill(p.pid, signal.SIGUSR1)
|
---|
2524 | time.sleep(0.1)
|
---|
2525 | client = multiprocessing.connection.Client(address)
|
---|
2526 | self.assertEqual(client.recv(), 'welcome')
|
---|
2527 | p.join()
|
---|
2528 | finally:
|
---|
2529 | conn.close()
|
---|
2530 |
|
---|
2531 | #
|
---|
2532 | #
|
---|
2533 | #
|
---|
2534 |
|
---|
2535 | testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
|
---|
2536 | TestStdinBadfiledescriptor, TestTimeouts, TestNoForkBomb,
|
---|
2537 | TestFlags, TestForkAwareThreadLock, TestIgnoreEINTR]
|
---|
2538 |
|
---|
2539 | #
|
---|
2540 | #
|
---|
2541 | #
|
---|
2542 |
|
---|
2543 | def test_main(run=None):
|
---|
2544 | if sys.platform.startswith("linux"):
|
---|
2545 | try:
|
---|
2546 | lock = multiprocessing.RLock()
|
---|
2547 | except OSError:
|
---|
2548 | raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
|
---|
2549 |
|
---|
2550 | check_enough_semaphores()
|
---|
2551 |
|
---|
2552 | if run is None:
|
---|
2553 | from test.test_support import run_unittest as run
|
---|
2554 |
|
---|
2555 | util.get_temp_dir() # creates temp directory for use by all processes
|
---|
2556 |
|
---|
2557 | multiprocessing.get_logger().setLevel(LOG_LEVEL)
|
---|
2558 |
|
---|
2559 | ProcessesMixin.pool = multiprocessing.Pool(4)
|
---|
2560 | ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
|
---|
2561 | ManagerMixin.manager.__init__()
|
---|
2562 | ManagerMixin.manager.start()
|
---|
2563 | ManagerMixin.pool = ManagerMixin.manager.Pool(4)
|
---|
2564 |
|
---|
2565 | testcases = (
|
---|
2566 | sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
|
---|
2567 | sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
|
---|
2568 | sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
|
---|
2569 | testcases_other
|
---|
2570 | )
|
---|
2571 |
|
---|
2572 | loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
|
---|
2573 | suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
|
---|
2574 | # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
|
---|
2575 | # module during these tests is at least platform dependent and possibly
|
---|
2576 | # non-deterministic on any given platform. So we don't mind if the listed
|
---|
2577 | # warnings aren't actually raised.
|
---|
2578 | with test_support.check_py3k_warnings(
|
---|
2579 | (".+__(get|set)slice__ has been removed", DeprecationWarning),
|
---|
2580 | (r"sys.exc_clear\(\) not supported", DeprecationWarning),
|
---|
2581 | quiet=True):
|
---|
2582 | run(suite)
|
---|
2583 |
|
---|
2584 | ThreadsMixin.pool.terminate()
|
---|
2585 | ProcessesMixin.pool.terminate()
|
---|
2586 | ManagerMixin.pool.terminate()
|
---|
2587 | ManagerMixin.manager.shutdown()
|
---|
2588 |
|
---|
2589 | del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
|
---|
2590 |
|
---|
2591 | def main():
|
---|
2592 | test_main(unittest.TextTestRunner(verbosity=2).run)
|
---|
2593 |
|
---|
2594 | if __name__ == '__main__':
|
---|
2595 | main()
|
---|