source: python/trunk/Lib/test/test_multiprocessing.py

Last change on this file was 391, checked in by dmik, 11 years ago

python: Merge vendor 2.7.6 to trunk.

  • Property svn:eol-style set to native
File size: 76.8 KB
Line 
1#!/usr/bin/env python
2
3#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import Queue
9import time
10import sys
11import os
12import gc
13import signal
14import array
15import socket
16import random
17import logging
18import errno
19import test.script_helper
20from test import test_support
21from StringIO import StringIO
22_multiprocessing = test_support.import_module('_multiprocessing')
23# import threading after _multiprocessing to raise a more relevant error
24# message: "No module named _multiprocessing". _multiprocessing is not compiled
25# without thread support.
26import threading
27
28# Work around broken sem_open implementations
29test_support.import_module('multiprocessing.synchronize')
30
31import multiprocessing.dummy
32import multiprocessing.connection
33import multiprocessing.managers
34import multiprocessing.heap
35import multiprocessing.pool
36
37from multiprocessing import util
38
39try:
40 from multiprocessing import reduction
41 HAS_REDUCTION = True
42except ImportError:
43 HAS_REDUCTION = False
44
45try:
46 from multiprocessing.sharedctypes import Value, copy
47 HAS_SHAREDCTYPES = True
48except ImportError:
49 HAS_SHAREDCTYPES = False
50
51try:
52 import msvcrt
53except ImportError:
54 msvcrt = None
55
56#
57#
58#
59
60latin = str
61
62#
63# Constants
64#
65
66LOG_LEVEL = util.SUBWARNING
67#LOG_LEVEL = logging.DEBUG
68
69DELTA = 0.1
70CHECK_TIMINGS = False # making true makes tests take a lot longer
71 # and can sometimes cause some non-serious
72 # failures because some calls block a bit
73 # longer than expected
74if CHECK_TIMINGS:
75 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
76else:
77 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
78
79HAVE_GETVALUE = not getattr(_multiprocessing,
80 'HAVE_BROKEN_SEM_GETVALUE', False)
81
82WIN32 = (sys.platform == "win32")
83
84try:
85 MAXFD = os.sysconf("SC_OPEN_MAX")
86except:
87 MAXFD = 256
88
89#
90# Some tests require ctypes
91#
92
93try:
94 from ctypes import Structure, c_int, c_double
95except ImportError:
96 Structure = object
97 c_int = c_double = None
98
99
100def check_enough_semaphores():
101 """Check that the system supports enough semaphores to run the test."""
102 # minimum number of semaphores available according to POSIX
103 nsems_min = 256
104 try:
105 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
106 except (AttributeError, ValueError):
107 # sysconf not available or setting not available
108 return
109 if nsems == -1 or nsems >= nsems_min:
110 return
111 raise unittest.SkipTest("The OS doesn't support enough semaphores "
112 "to run the test (required: %d)." % nsems_min)
113
114
115#
116# Creates a wrapper for a function which records the time it takes to finish
117#
118
119class TimingWrapper(object):
120
121 def __init__(self, func):
122 self.func = func
123 self.elapsed = None
124
125 def __call__(self, *args, **kwds):
126 t = time.time()
127 try:
128 return self.func(*args, **kwds)
129 finally:
130 self.elapsed = time.time() - t
131
132#
133# Base class for test cases
134#
135
136class BaseTestCase(object):
137
138 ALLOWED_TYPES = ('processes', 'manager', 'threads')
139
140 def assertTimingAlmostEqual(self, a, b):
141 if CHECK_TIMINGS:
142 self.assertAlmostEqual(a, b, 1)
143
144 def assertReturnsIfImplemented(self, value, func, *args):
145 try:
146 res = func(*args)
147 except NotImplementedError:
148 pass
149 else:
150 return self.assertEqual(value, res)
151
152 # For the sanity of Windows users, rather than crashing or freezing in
153 # multiple ways.
154 def __reduce__(self, *args):
155 raise NotImplementedError("shouldn't try to pickle a test case")
156
157 __reduce_ex__ = __reduce__
158
159#
160# Return the value of a semaphore
161#
162
163def get_value(self):
164 try:
165 return self.get_value()
166 except AttributeError:
167 try:
168 return self._Semaphore__value
169 except AttributeError:
170 try:
171 return self._value
172 except AttributeError:
173 raise NotImplementedError
174
175#
176# Testcases
177#
178
179class _TestProcess(BaseTestCase):
180
181 ALLOWED_TYPES = ('processes', 'threads')
182
183 def test_current(self):
184 if self.TYPE == 'threads':
185 return
186
187 current = self.current_process()
188 authkey = current.authkey
189
190 self.assertTrue(current.is_alive())
191 self.assertTrue(not current.daemon)
192 self.assertIsInstance(authkey, bytes)
193 self.assertTrue(len(authkey) > 0)
194 self.assertEqual(current.ident, os.getpid())
195 self.assertEqual(current.exitcode, None)
196
197 @classmethod
198 def _test(cls, q, *args, **kwds):
199 current = cls.current_process()
200 q.put(args)
201 q.put(kwds)
202 q.put(current.name)
203 if cls.TYPE != 'threads':
204 q.put(bytes(current.authkey))
205 q.put(current.pid)
206
207 def test_process(self):
208 q = self.Queue(1)
209 e = self.Event()
210 args = (q, 1, 2)
211 kwargs = {'hello':23, 'bye':2.54}
212 name = 'SomeProcess'
213 p = self.Process(
214 target=self._test, args=args, kwargs=kwargs, name=name
215 )
216 p.daemon = True
217 current = self.current_process()
218
219 if self.TYPE != 'threads':
220 self.assertEqual(p.authkey, current.authkey)
221 self.assertEqual(p.is_alive(), False)
222 self.assertEqual(p.daemon, True)
223 self.assertNotIn(p, self.active_children())
224 self.assertTrue(type(self.active_children()) is list)
225 self.assertEqual(p.exitcode, None)
226
227 p.start()
228
229 self.assertEqual(p.exitcode, None)
230 self.assertEqual(p.is_alive(), True)
231 self.assertIn(p, self.active_children())
232
233 self.assertEqual(q.get(), args[1:])
234 self.assertEqual(q.get(), kwargs)
235 self.assertEqual(q.get(), p.name)
236 if self.TYPE != 'threads':
237 self.assertEqual(q.get(), current.authkey)
238 self.assertEqual(q.get(), p.pid)
239
240 p.join()
241
242 self.assertEqual(p.exitcode, 0)
243 self.assertEqual(p.is_alive(), False)
244 self.assertNotIn(p, self.active_children())
245
246 @classmethod
247 def _test_terminate(cls):
248 time.sleep(1000)
249
250 def test_terminate(self):
251 if self.TYPE == 'threads':
252 return
253
254 p = self.Process(target=self._test_terminate)
255 p.daemon = True
256 p.start()
257
258 self.assertEqual(p.is_alive(), True)
259 self.assertIn(p, self.active_children())
260 self.assertEqual(p.exitcode, None)
261
262 p.terminate()
263
264 join = TimingWrapper(p.join)
265 self.assertEqual(join(), None)
266 self.assertTimingAlmostEqual(join.elapsed, 0.0)
267
268 self.assertEqual(p.is_alive(), False)
269 self.assertNotIn(p, self.active_children())
270
271 p.join()
272
273 # XXX sometimes get p.exitcode == 0 on Windows ...
274 #self.assertEqual(p.exitcode, -signal.SIGTERM)
275
276 def test_cpu_count(self):
277 try:
278 cpus = multiprocessing.cpu_count()
279 except NotImplementedError:
280 cpus = 1
281 self.assertTrue(type(cpus) is int)
282 self.assertTrue(cpus >= 1)
283
284 def test_active_children(self):
285 self.assertEqual(type(self.active_children()), list)
286
287 p = self.Process(target=time.sleep, args=(DELTA,))
288 self.assertNotIn(p, self.active_children())
289
290 p.daemon = True
291 p.start()
292 self.assertIn(p, self.active_children())
293
294 p.join()
295 self.assertNotIn(p, self.active_children())
296
297 @classmethod
298 def _test_recursion(cls, wconn, id):
299 from multiprocessing import forking
300 wconn.send(id)
301 if len(id) < 2:
302 for i in range(2):
303 p = cls.Process(
304 target=cls._test_recursion, args=(wconn, id+[i])
305 )
306 p.start()
307 p.join()
308
309 def test_recursion(self):
310 rconn, wconn = self.Pipe(duplex=False)
311 self._test_recursion(wconn, [])
312
313 time.sleep(DELTA)
314 result = []
315 while rconn.poll():
316 result.append(rconn.recv())
317
318 expected = [
319 [],
320 [0],
321 [0, 0],
322 [0, 1],
323 [1],
324 [1, 0],
325 [1, 1]
326 ]
327 self.assertEqual(result, expected)
328
329 @classmethod
330 def _test_sys_exit(cls, reason, testfn):
331 sys.stderr = open(testfn, 'w')
332 sys.exit(reason)
333
334 def test_sys_exit(self):
335 # See Issue 13854
336 if self.TYPE == 'threads':
337 return
338
339 testfn = test_support.TESTFN
340 self.addCleanup(test_support.unlink, testfn)
341
342 for reason, code in (([1, 2, 3], 1), ('ignore this', 0)):
343 p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
344 p.daemon = True
345 p.start()
346 p.join(5)
347 self.assertEqual(p.exitcode, code)
348
349 with open(testfn, 'r') as f:
350 self.assertEqual(f.read().rstrip(), str(reason))
351
352 for reason in (True, False, 8):
353 p = self.Process(target=sys.exit, args=(reason,))
354 p.daemon = True
355 p.start()
356 p.join(5)
357 self.assertEqual(p.exitcode, reason)
358
359#
360#
361#
362
363class _UpperCaser(multiprocessing.Process):
364
365 def __init__(self):
366 multiprocessing.Process.__init__(self)
367 self.child_conn, self.parent_conn = multiprocessing.Pipe()
368
369 def run(self):
370 self.parent_conn.close()
371 for s in iter(self.child_conn.recv, None):
372 self.child_conn.send(s.upper())
373 self.child_conn.close()
374
375 def submit(self, s):
376 assert type(s) is str
377 self.parent_conn.send(s)
378 return self.parent_conn.recv()
379
380 def stop(self):
381 self.parent_conn.send(None)
382 self.parent_conn.close()
383 self.child_conn.close()
384
385class _TestSubclassingProcess(BaseTestCase):
386
387 ALLOWED_TYPES = ('processes',)
388
389 def test_subclassing(self):
390 uppercaser = _UpperCaser()
391 uppercaser.daemon = True
392 uppercaser.start()
393 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
394 self.assertEqual(uppercaser.submit('world'), 'WORLD')
395 uppercaser.stop()
396 uppercaser.join()
397
398#
399#
400#
401
402def queue_empty(q):
403 if hasattr(q, 'empty'):
404 return q.empty()
405 else:
406 return q.qsize() == 0
407
408def queue_full(q, maxsize):
409 if hasattr(q, 'full'):
410 return q.full()
411 else:
412 return q.qsize() == maxsize
413
414
415class _TestQueue(BaseTestCase):
416
417
418 @classmethod
419 def _test_put(cls, queue, child_can_start, parent_can_continue):
420 child_can_start.wait()
421 for i in range(6):
422 queue.get()
423 parent_can_continue.set()
424
425 def test_put(self):
426 MAXSIZE = 6
427 queue = self.Queue(maxsize=MAXSIZE)
428 child_can_start = self.Event()
429 parent_can_continue = self.Event()
430
431 proc = self.Process(
432 target=self._test_put,
433 args=(queue, child_can_start, parent_can_continue)
434 )
435 proc.daemon = True
436 proc.start()
437
438 self.assertEqual(queue_empty(queue), True)
439 self.assertEqual(queue_full(queue, MAXSIZE), False)
440
441 queue.put(1)
442 queue.put(2, True)
443 queue.put(3, True, None)
444 queue.put(4, False)
445 queue.put(5, False, None)
446 queue.put_nowait(6)
447
448 # the values may be in buffer but not yet in pipe so sleep a bit
449 time.sleep(DELTA)
450
451 self.assertEqual(queue_empty(queue), False)
452 self.assertEqual(queue_full(queue, MAXSIZE), True)
453
454 put = TimingWrapper(queue.put)
455 put_nowait = TimingWrapper(queue.put_nowait)
456
457 self.assertRaises(Queue.Full, put, 7, False)
458 self.assertTimingAlmostEqual(put.elapsed, 0)
459
460 self.assertRaises(Queue.Full, put, 7, False, None)
461 self.assertTimingAlmostEqual(put.elapsed, 0)
462
463 self.assertRaises(Queue.Full, put_nowait, 7)
464 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
465
466 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
467 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
468
469 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
470 self.assertTimingAlmostEqual(put.elapsed, 0)
471
472 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
473 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
474
475 child_can_start.set()
476 parent_can_continue.wait()
477
478 self.assertEqual(queue_empty(queue), True)
479 self.assertEqual(queue_full(queue, MAXSIZE), False)
480
481 proc.join()
482
483 @classmethod
484 def _test_get(cls, queue, child_can_start, parent_can_continue):
485 child_can_start.wait()
486 #queue.put(1)
487 queue.put(2)
488 queue.put(3)
489 queue.put(4)
490 queue.put(5)
491 parent_can_continue.set()
492
493 def test_get(self):
494 queue = self.Queue()
495 child_can_start = self.Event()
496 parent_can_continue = self.Event()
497
498 proc = self.Process(
499 target=self._test_get,
500 args=(queue, child_can_start, parent_can_continue)
501 )
502 proc.daemon = True
503 proc.start()
504
505 self.assertEqual(queue_empty(queue), True)
506
507 child_can_start.set()
508 parent_can_continue.wait()
509
510 time.sleep(DELTA)
511 self.assertEqual(queue_empty(queue), False)
512
513 # Hangs unexpectedly, remove for now
514 #self.assertEqual(queue.get(), 1)
515 self.assertEqual(queue.get(True, None), 2)
516 self.assertEqual(queue.get(True), 3)
517 self.assertEqual(queue.get(timeout=1), 4)
518 self.assertEqual(queue.get_nowait(), 5)
519
520 self.assertEqual(queue_empty(queue), True)
521
522 get = TimingWrapper(queue.get)
523 get_nowait = TimingWrapper(queue.get_nowait)
524
525 self.assertRaises(Queue.Empty, get, False)
526 self.assertTimingAlmostEqual(get.elapsed, 0)
527
528 self.assertRaises(Queue.Empty, get, False, None)
529 self.assertTimingAlmostEqual(get.elapsed, 0)
530
531 self.assertRaises(Queue.Empty, get_nowait)
532 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
533
534 self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
535 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
536
537 self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
538 self.assertTimingAlmostEqual(get.elapsed, 0)
539
540 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
541 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
542
543 proc.join()
544
545 @classmethod
546 def _test_fork(cls, queue):
547 for i in range(10, 20):
548 queue.put(i)
549 # note that at this point the items may only be buffered, so the
550 # process cannot shutdown until the feeder thread has finished
551 # pushing items onto the pipe.
552
553 def test_fork(self):
554 # Old versions of Queue would fail to create a new feeder
555 # thread for a forked process if the original process had its
556 # own feeder thread. This test checks that this no longer
557 # happens.
558
559 queue = self.Queue()
560
561 # put items on queue so that main process starts a feeder thread
562 for i in range(10):
563 queue.put(i)
564
565 # wait to make sure thread starts before we fork a new process
566 time.sleep(DELTA)
567
568 # fork process
569 p = self.Process(target=self._test_fork, args=(queue,))
570 p.daemon = True
571 p.start()
572
573 # check that all expected items are in the queue
574 for i in range(20):
575 self.assertEqual(queue.get(), i)
576 self.assertRaises(Queue.Empty, queue.get, False)
577
578 p.join()
579
580 def test_qsize(self):
581 q = self.Queue()
582 try:
583 self.assertEqual(q.qsize(), 0)
584 except NotImplementedError:
585 return
586 q.put(1)
587 self.assertEqual(q.qsize(), 1)
588 q.put(5)
589 self.assertEqual(q.qsize(), 2)
590 q.get()
591 self.assertEqual(q.qsize(), 1)
592 q.get()
593 self.assertEqual(q.qsize(), 0)
594
595 @classmethod
596 def _test_task_done(cls, q):
597 for obj in iter(q.get, None):
598 time.sleep(DELTA)
599 q.task_done()
600
601 def test_task_done(self):
602 queue = self.JoinableQueue()
603
604 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
605 self.skipTest("requires 'queue.task_done()' method")
606
607 workers = [self.Process(target=self._test_task_done, args=(queue,))
608 for i in xrange(4)]
609
610 for p in workers:
611 p.daemon = True
612 p.start()
613
614 for i in xrange(10):
615 queue.put(i)
616
617 queue.join()
618
619 for p in workers:
620 queue.put(None)
621
622 for p in workers:
623 p.join()
624
625#
626#
627#
628
629class _TestLock(BaseTestCase):
630
631 def test_lock(self):
632 lock = self.Lock()
633 self.assertEqual(lock.acquire(), True)
634 self.assertEqual(lock.acquire(False), False)
635 self.assertEqual(lock.release(), None)
636 self.assertRaises((ValueError, threading.ThreadError), lock.release)
637
638 def test_rlock(self):
639 lock = self.RLock()
640 self.assertEqual(lock.acquire(), True)
641 self.assertEqual(lock.acquire(), True)
642 self.assertEqual(lock.acquire(), True)
643 self.assertEqual(lock.release(), None)
644 self.assertEqual(lock.release(), None)
645 self.assertEqual(lock.release(), None)
646 self.assertRaises((AssertionError, RuntimeError), lock.release)
647
648 def test_lock_context(self):
649 with self.Lock():
650 pass
651
652
653class _TestSemaphore(BaseTestCase):
654
655 def _test_semaphore(self, sem):
656 self.assertReturnsIfImplemented(2, get_value, sem)
657 self.assertEqual(sem.acquire(), True)
658 self.assertReturnsIfImplemented(1, get_value, sem)
659 self.assertEqual(sem.acquire(), True)
660 self.assertReturnsIfImplemented(0, get_value, sem)
661 self.assertEqual(sem.acquire(False), False)
662 self.assertReturnsIfImplemented(0, get_value, sem)
663 self.assertEqual(sem.release(), None)
664 self.assertReturnsIfImplemented(1, get_value, sem)
665 self.assertEqual(sem.release(), None)
666 self.assertReturnsIfImplemented(2, get_value, sem)
667
668 def test_semaphore(self):
669 sem = self.Semaphore(2)
670 self._test_semaphore(sem)
671 self.assertEqual(sem.release(), None)
672 self.assertReturnsIfImplemented(3, get_value, sem)
673 self.assertEqual(sem.release(), None)
674 self.assertReturnsIfImplemented(4, get_value, sem)
675
676 def test_bounded_semaphore(self):
677 sem = self.BoundedSemaphore(2)
678 self._test_semaphore(sem)
679 # Currently fails on OS/X
680 #if HAVE_GETVALUE:
681 # self.assertRaises(ValueError, sem.release)
682 # self.assertReturnsIfImplemented(2, get_value, sem)
683
684 def test_timeout(self):
685 if self.TYPE != 'processes':
686 return
687
688 sem = self.Semaphore(0)
689 acquire = TimingWrapper(sem.acquire)
690
691 self.assertEqual(acquire(False), False)
692 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
693
694 self.assertEqual(acquire(False, None), False)
695 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
696
697 self.assertEqual(acquire(False, TIMEOUT1), False)
698 self.assertTimingAlmostEqual(acquire.elapsed, 0)
699
700 self.assertEqual(acquire(True, TIMEOUT2), False)
701 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
702
703 self.assertEqual(acquire(timeout=TIMEOUT3), False)
704 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
705
706
707class _TestCondition(BaseTestCase):
708
709 @classmethod
710 def f(cls, cond, sleeping, woken, timeout=None):
711 cond.acquire()
712 sleeping.release()
713 cond.wait(timeout)
714 woken.release()
715 cond.release()
716
717 def check_invariant(self, cond):
718 # this is only supposed to succeed when there are no sleepers
719 if self.TYPE == 'processes':
720 try:
721 sleepers = (cond._sleeping_count.get_value() -
722 cond._woken_count.get_value())
723 self.assertEqual(sleepers, 0)
724 self.assertEqual(cond._wait_semaphore.get_value(), 0)
725 except NotImplementedError:
726 pass
727
728 def test_notify(self):
729 cond = self.Condition()
730 sleeping = self.Semaphore(0)
731 woken = self.Semaphore(0)
732
733 p = self.Process(target=self.f, args=(cond, sleeping, woken))
734 p.daemon = True
735 p.start()
736
737 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
738 p.daemon = True
739 p.start()
740
741 # wait for both children to start sleeping
742 sleeping.acquire()
743 sleeping.acquire()
744
745 # check no process/thread has woken up
746 time.sleep(DELTA)
747 self.assertReturnsIfImplemented(0, get_value, woken)
748
749 # wake up one process/thread
750 cond.acquire()
751 cond.notify()
752 cond.release()
753
754 # check one process/thread has woken up
755 time.sleep(DELTA)
756 self.assertReturnsIfImplemented(1, get_value, woken)
757
758 # wake up another
759 cond.acquire()
760 cond.notify()
761 cond.release()
762
763 # check other has woken up
764 time.sleep(DELTA)
765 self.assertReturnsIfImplemented(2, get_value, woken)
766
767 # check state is not mucked up
768 self.check_invariant(cond)
769 p.join()
770
771 def test_notify_all(self):
772 cond = self.Condition()
773 sleeping = self.Semaphore(0)
774 woken = self.Semaphore(0)
775
776 # start some threads/processes which will timeout
777 for i in range(3):
778 p = self.Process(target=self.f,
779 args=(cond, sleeping, woken, TIMEOUT1))
780 p.daemon = True
781 p.start()
782
783 t = threading.Thread(target=self.f,
784 args=(cond, sleeping, woken, TIMEOUT1))
785 t.daemon = True
786 t.start()
787
788 # wait for them all to sleep
789 for i in xrange(6):
790 sleeping.acquire()
791
792 # check they have all timed out
793 for i in xrange(6):
794 woken.acquire()
795 self.assertReturnsIfImplemented(0, get_value, woken)
796
797 # check state is not mucked up
798 self.check_invariant(cond)
799
800 # start some more threads/processes
801 for i in range(3):
802 p = self.Process(target=self.f, args=(cond, sleeping, woken))
803 p.daemon = True
804 p.start()
805
806 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
807 t.daemon = True
808 t.start()
809
810 # wait for them to all sleep
811 for i in xrange(6):
812 sleeping.acquire()
813
814 # check no process/thread has woken up
815 time.sleep(DELTA)
816 self.assertReturnsIfImplemented(0, get_value, woken)
817
818 # wake them all up
819 cond.acquire()
820 cond.notify_all()
821 cond.release()
822
823 # check they have all woken
824 time.sleep(DELTA)
825 self.assertReturnsIfImplemented(6, get_value, woken)
826
827 # check state is not mucked up
828 self.check_invariant(cond)
829
830 def test_timeout(self):
831 cond = self.Condition()
832 wait = TimingWrapper(cond.wait)
833 cond.acquire()
834 res = wait(TIMEOUT1)
835 cond.release()
836 self.assertEqual(res, None)
837 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
838
839
840class _TestEvent(BaseTestCase):
841
842 @classmethod
843 def _test_event(cls, event):
844 time.sleep(TIMEOUT2)
845 event.set()
846
847 def test_event(self):
848 event = self.Event()
849 wait = TimingWrapper(event.wait)
850
851 # Removed temporarily, due to API shear, this does not
852 # work with threading._Event objects. is_set == isSet
853 self.assertEqual(event.is_set(), False)
854
855 # Removed, threading.Event.wait() will return the value of the __flag
856 # instead of None. API Shear with the semaphore backed mp.Event
857 self.assertEqual(wait(0.0), False)
858 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
859 self.assertEqual(wait(TIMEOUT1), False)
860 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
861
862 event.set()
863
864 # See note above on the API differences
865 self.assertEqual(event.is_set(), True)
866 self.assertEqual(wait(), True)
867 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
868 self.assertEqual(wait(TIMEOUT1), True)
869 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
870 # self.assertEqual(event.is_set(), True)
871
872 event.clear()
873
874 #self.assertEqual(event.is_set(), False)
875
876 p = self.Process(target=self._test_event, args=(event,))
877 p.daemon = True
878 p.start()
879 self.assertEqual(wait(), True)
880
881#
882#
883#
884
885class _TestValue(BaseTestCase):
886
887 ALLOWED_TYPES = ('processes',)
888
889 codes_values = [
890 ('i', 4343, 24234),
891 ('d', 3.625, -4.25),
892 ('h', -232, 234),
893 ('c', latin('x'), latin('y'))
894 ]
895
896 def setUp(self):
897 if not HAS_SHAREDCTYPES:
898 self.skipTest("requires multiprocessing.sharedctypes")
899
900 @classmethod
901 def _test(cls, values):
902 for sv, cv in zip(values, cls.codes_values):
903 sv.value = cv[2]
904
905
906 def test_value(self, raw=False):
907 if raw:
908 values = [self.RawValue(code, value)
909 for code, value, _ in self.codes_values]
910 else:
911 values = [self.Value(code, value)
912 for code, value, _ in self.codes_values]
913
914 for sv, cv in zip(values, self.codes_values):
915 self.assertEqual(sv.value, cv[1])
916
917 proc = self.Process(target=self._test, args=(values,))
918 proc.daemon = True
919 proc.start()
920 proc.join()
921
922 for sv, cv in zip(values, self.codes_values):
923 self.assertEqual(sv.value, cv[2])
924
925 def test_rawvalue(self):
926 self.test_value(raw=True)
927
928 def test_getobj_getlock(self):
929 val1 = self.Value('i', 5)
930 lock1 = val1.get_lock()
931 obj1 = val1.get_obj()
932
933 val2 = self.Value('i', 5, lock=None)
934 lock2 = val2.get_lock()
935 obj2 = val2.get_obj()
936
937 lock = self.Lock()
938 val3 = self.Value('i', 5, lock=lock)
939 lock3 = val3.get_lock()
940 obj3 = val3.get_obj()
941 self.assertEqual(lock, lock3)
942
943 arr4 = self.Value('i', 5, lock=False)
944 self.assertFalse(hasattr(arr4, 'get_lock'))
945 self.assertFalse(hasattr(arr4, 'get_obj'))
946
947 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
948
949 arr5 = self.RawValue('i', 5)
950 self.assertFalse(hasattr(arr5, 'get_lock'))
951 self.assertFalse(hasattr(arr5, 'get_obj'))
952
953
954class _TestArray(BaseTestCase):
955
956 ALLOWED_TYPES = ('processes',)
957
958 @classmethod
959 def f(cls, seq):
960 for i in range(1, len(seq)):
961 seq[i] += seq[i-1]
962
963 @unittest.skipIf(c_int is None, "requires _ctypes")
964 def test_array(self, raw=False):
965 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
966 if raw:
967 arr = self.RawArray('i', seq)
968 else:
969 arr = self.Array('i', seq)
970
971 self.assertEqual(len(arr), len(seq))
972 self.assertEqual(arr[3], seq[3])
973 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
974
975 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
976
977 self.assertEqual(list(arr[:]), seq)
978
979 self.f(seq)
980
981 p = self.Process(target=self.f, args=(arr,))
982 p.daemon = True
983 p.start()
984 p.join()
985
986 self.assertEqual(list(arr[:]), seq)
987
988 @unittest.skipIf(c_int is None, "requires _ctypes")
989 def test_array_from_size(self):
990 size = 10
991 # Test for zeroing (see issue #11675).
992 # The repetition below strengthens the test by increasing the chances
993 # of previously allocated non-zero memory being used for the new array
994 # on the 2nd and 3rd loops.
995 for _ in range(3):
996 arr = self.Array('i', size)
997 self.assertEqual(len(arr), size)
998 self.assertEqual(list(arr), [0] * size)
999 arr[:] = range(10)
1000 self.assertEqual(list(arr), range(10))
1001 del arr
1002
1003 @unittest.skipIf(c_int is None, "requires _ctypes")
1004 def test_rawarray(self):
1005 self.test_array(raw=True)
1006
1007 @unittest.skipIf(c_int is None, "requires _ctypes")
1008 def test_array_accepts_long(self):
1009 arr = self.Array('i', 10L)
1010 self.assertEqual(len(arr), 10)
1011 raw_arr = self.RawArray('i', 10L)
1012 self.assertEqual(len(raw_arr), 10)
1013
1014 @unittest.skipIf(c_int is None, "requires _ctypes")
1015 def test_getobj_getlock_obj(self):
1016 arr1 = self.Array('i', range(10))
1017 lock1 = arr1.get_lock()
1018 obj1 = arr1.get_obj()
1019
1020 arr2 = self.Array('i', range(10), lock=None)
1021 lock2 = arr2.get_lock()
1022 obj2 = arr2.get_obj()
1023
1024 lock = self.Lock()
1025 arr3 = self.Array('i', range(10), lock=lock)
1026 lock3 = arr3.get_lock()
1027 obj3 = arr3.get_obj()
1028 self.assertEqual(lock, lock3)
1029
1030 arr4 = self.Array('i', range(10), lock=False)
1031 self.assertFalse(hasattr(arr4, 'get_lock'))
1032 self.assertFalse(hasattr(arr4, 'get_obj'))
1033 self.assertRaises(AttributeError,
1034 self.Array, 'i', range(10), lock='notalock')
1035
1036 arr5 = self.RawArray('i', range(10))
1037 self.assertFalse(hasattr(arr5, 'get_lock'))
1038 self.assertFalse(hasattr(arr5, 'get_obj'))
1039
1040#
1041#
1042#
1043
1044class _TestContainers(BaseTestCase):
1045
1046 ALLOWED_TYPES = ('manager',)
1047
1048 def test_list(self):
1049 a = self.list(range(10))
1050 self.assertEqual(a[:], range(10))
1051
1052 b = self.list()
1053 self.assertEqual(b[:], [])
1054
1055 b.extend(range(5))
1056 self.assertEqual(b[:], range(5))
1057
1058 self.assertEqual(b[2], 2)
1059 self.assertEqual(b[2:10], [2,3,4])
1060
1061 b *= 2
1062 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1063
1064 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1065
1066 self.assertEqual(a[:], range(10))
1067
1068 d = [a, b]
1069 e = self.list(d)
1070 self.assertEqual(
1071 e[:],
1072 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1073 )
1074
1075 f = self.list([a])
1076 a.append('hello')
1077 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1078
1079 def test_dict(self):
1080 d = self.dict()
1081 indices = range(65, 70)
1082 for i in indices:
1083 d[i] = chr(i)
1084 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1085 self.assertEqual(sorted(d.keys()), indices)
1086 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1087 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1088
1089 def test_namespace(self):
1090 n = self.Namespace()
1091 n.name = 'Bob'
1092 n.job = 'Builder'
1093 n._hidden = 'hidden'
1094 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1095 del n.job
1096 self.assertEqual(str(n), "Namespace(name='Bob')")
1097 self.assertTrue(hasattr(n, 'name'))
1098 self.assertTrue(not hasattr(n, 'job'))
1099
1100#
1101#
1102#
1103
1104def sqr(x, wait=0.0):
1105 time.sleep(wait)
1106 return x*x
1107class _TestPool(BaseTestCase):
1108
1109 def test_apply(self):
1110 papply = self.pool.apply
1111 self.assertEqual(papply(sqr, (5,)), sqr(5))
1112 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1113
1114 def test_map(self):
1115 pmap = self.pool.map
1116 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
1117 self.assertEqual(pmap(sqr, range(100), chunksize=20),
1118 map(sqr, range(100)))
1119
1120 def test_map_chunksize(self):
1121 try:
1122 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1123 except multiprocessing.TimeoutError:
1124 self.fail("pool.map_async with chunksize stalled on null list")
1125
1126 def test_async(self):
1127 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1128 get = TimingWrapper(res.get)
1129 self.assertEqual(get(), 49)
1130 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1131
1132 def test_async_timeout(self):
1133 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1134 get = TimingWrapper(res.get)
1135 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1136 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1137
1138 def test_imap(self):
1139 it = self.pool.imap(sqr, range(10))
1140 self.assertEqual(list(it), map(sqr, range(10)))
1141
1142 it = self.pool.imap(sqr, range(10))
1143 for i in range(10):
1144 self.assertEqual(it.next(), i*i)
1145 self.assertRaises(StopIteration, it.next)
1146
1147 it = self.pool.imap(sqr, range(1000), chunksize=100)
1148 for i in range(1000):
1149 self.assertEqual(it.next(), i*i)
1150 self.assertRaises(StopIteration, it.next)
1151
1152 def test_imap_unordered(self):
1153 it = self.pool.imap_unordered(sqr, range(1000))
1154 self.assertEqual(sorted(it), map(sqr, range(1000)))
1155
1156 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
1157 self.assertEqual(sorted(it), map(sqr, range(1000)))
1158
1159 def test_make_pool(self):
1160 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1161 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1162
1163 p = multiprocessing.Pool(3)
1164 self.assertEqual(3, len(p._pool))
1165 p.close()
1166 p.join()
1167
1168 def test_terminate(self):
1169 if self.TYPE == 'manager':
1170 # On Unix a forked process increfs each shared object to
1171 # which its parent process held a reference. If the
1172 # forked process gets terminated then there is likely to
1173 # be a reference leak. So to prevent
1174 # _TestZZZNumberOfObjects from failing we skip this test
1175 # when using a manager.
1176 return
1177
1178 result = self.pool.map_async(
1179 time.sleep, [0.1 for i in range(10000)], chunksize=1
1180 )
1181 self.pool.terminate()
1182 join = TimingWrapper(self.pool.join)
1183 join()
1184 self.assertTrue(join.elapsed < 0.2)
1185
1186 def test_empty_iterable(self):
1187 # See Issue 12157
1188 p = self.Pool(1)
1189
1190 self.assertEqual(p.map(sqr, []), [])
1191 self.assertEqual(list(p.imap(sqr, [])), [])
1192 self.assertEqual(list(p.imap_unordered(sqr, [])), [])
1193 self.assertEqual(p.map_async(sqr, []).get(), [])
1194
1195 p.close()
1196 p.join()
1197
1198def unpickleable_result():
1199 return lambda: 42
1200
1201class _TestPoolWorkerErrors(BaseTestCase):
1202 ALLOWED_TYPES = ('processes', )
1203
1204 def test_unpickleable_result(self):
1205 from multiprocessing.pool import MaybeEncodingError
1206 p = multiprocessing.Pool(2)
1207
1208 # Make sure we don't lose pool processes because of encoding errors.
1209 for iteration in range(20):
1210 res = p.apply_async(unpickleable_result)
1211 self.assertRaises(MaybeEncodingError, res.get)
1212
1213 p.close()
1214 p.join()
1215
1216class _TestPoolWorkerLifetime(BaseTestCase):
1217
1218 ALLOWED_TYPES = ('processes', )
1219 def test_pool_worker_lifetime(self):
1220 p = multiprocessing.Pool(3, maxtasksperchild=10)
1221 self.assertEqual(3, len(p._pool))
1222 origworkerpids = [w.pid for w in p._pool]
1223 # Run many tasks so each worker gets replaced (hopefully)
1224 results = []
1225 for i in range(100):
1226 results.append(p.apply_async(sqr, (i, )))
1227 # Fetch the results and verify we got the right answers,
1228 # also ensuring all the tasks have completed.
1229 for (j, res) in enumerate(results):
1230 self.assertEqual(res.get(), sqr(j))
1231 # Refill the pool
1232 p._repopulate_pool()
1233 # Wait until all workers are alive
1234 # (countdown * DELTA = 5 seconds max startup process time)
1235 countdown = 50
1236 while countdown and not all(w.is_alive() for w in p._pool):
1237 countdown -= 1
1238 time.sleep(DELTA)
1239 finalworkerpids = [w.pid for w in p._pool]
1240 # All pids should be assigned. See issue #7805.
1241 self.assertNotIn(None, origworkerpids)
1242 self.assertNotIn(None, finalworkerpids)
1243 # Finally, check that the worker pids have changed
1244 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1245 p.close()
1246 p.join()
1247
1248 def test_pool_worker_lifetime_early_close(self):
1249 # Issue #10332: closing a pool whose workers have limited lifetimes
1250 # before all the tasks completed would make join() hang.
1251 p = multiprocessing.Pool(3, maxtasksperchild=1)
1252 results = []
1253 for i in range(6):
1254 results.append(p.apply_async(sqr, (i, 0.3)))
1255 p.close()
1256 p.join()
1257 # check the results
1258 for (j, res) in enumerate(results):
1259 self.assertEqual(res.get(), sqr(j))
1260
1261
1262#
1263# Test that manager has expected number of shared objects left
1264#
1265
1266class _TestZZZNumberOfObjects(BaseTestCase):
1267 # Because test cases are sorted alphabetically, this one will get
1268 # run after all the other tests for the manager. It tests that
1269 # there have been no "reference leaks" for the manager's shared
1270 # objects. Note the comment in _TestPool.test_terminate().
1271 ALLOWED_TYPES = ('manager',)
1272
1273 def test_number_of_objects(self):
1274 EXPECTED_NUMBER = 1 # the pool object is still alive
1275 multiprocessing.active_children() # discard dead process objs
1276 gc.collect() # do garbage collection
1277 refs = self.manager._number_of_objects()
1278 debug_info = self.manager._debug_info()
1279 if refs != EXPECTED_NUMBER:
1280 print self.manager._debug_info()
1281 print debug_info
1282
1283 self.assertEqual(refs, EXPECTED_NUMBER)
1284
1285#
1286# Test of creating a customized manager class
1287#
1288
1289from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1290
1291class FooBar(object):
1292 def f(self):
1293 return 'f()'
1294 def g(self):
1295 raise ValueError
1296 def _h(self):
1297 return '_h()'
1298
1299def baz():
1300 for i in xrange(10):
1301 yield i*i
1302
1303class IteratorProxy(BaseProxy):
1304 _exposed_ = ('next', '__next__')
1305 def __iter__(self):
1306 return self
1307 def next(self):
1308 return self._callmethod('next')
1309 def __next__(self):
1310 return self._callmethod('__next__')
1311
1312class MyManager(BaseManager):
1313 pass
1314
1315MyManager.register('Foo', callable=FooBar)
1316MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1317MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1318
1319
1320class _TestMyManager(BaseTestCase):
1321
1322 ALLOWED_TYPES = ('manager',)
1323
1324 def test_mymanager(self):
1325 manager = MyManager()
1326 manager.start()
1327
1328 foo = manager.Foo()
1329 bar = manager.Bar()
1330 baz = manager.baz()
1331
1332 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1333 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1334
1335 self.assertEqual(foo_methods, ['f', 'g'])
1336 self.assertEqual(bar_methods, ['f', '_h'])
1337
1338 self.assertEqual(foo.f(), 'f()')
1339 self.assertRaises(ValueError, foo.g)
1340 self.assertEqual(foo._callmethod('f'), 'f()')
1341 self.assertRaises(RemoteError, foo._callmethod, '_h')
1342
1343 self.assertEqual(bar.f(), 'f()')
1344 self.assertEqual(bar._h(), '_h()')
1345 self.assertEqual(bar._callmethod('f'), 'f()')
1346 self.assertEqual(bar._callmethod('_h'), '_h()')
1347
1348 self.assertEqual(list(baz), [i*i for i in range(10)])
1349
1350 manager.shutdown()
1351
1352#
1353# Test of connecting to a remote server and using xmlrpclib for serialization
1354#
1355
1356_queue = Queue.Queue()
1357def get_queue():
1358 return _queue
1359
1360class QueueManager(BaseManager):
1361 '''manager class used by server process'''
1362QueueManager.register('get_queue', callable=get_queue)
1363
1364class QueueManager2(BaseManager):
1365 '''manager class which specifies the same interface as QueueManager'''
1366QueueManager2.register('get_queue')
1367
1368
1369SERIALIZER = 'xmlrpclib'
1370
1371class _TestRemoteManager(BaseTestCase):
1372
1373 ALLOWED_TYPES = ('manager',)
1374
1375 @classmethod
1376 def _putter(cls, address, authkey):
1377 manager = QueueManager2(
1378 address=address, authkey=authkey, serializer=SERIALIZER
1379 )
1380 manager.connect()
1381 queue = manager.get_queue()
1382 queue.put(('hello world', None, True, 2.25))
1383
1384 def test_remote(self):
1385 authkey = os.urandom(32)
1386
1387 manager = QueueManager(
1388 address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER
1389 )
1390 manager.start()
1391
1392 p = self.Process(target=self._putter, args=(manager.address, authkey))
1393 p.daemon = True
1394 p.start()
1395
1396 manager2 = QueueManager2(
1397 address=manager.address, authkey=authkey, serializer=SERIALIZER
1398 )
1399 manager2.connect()
1400 queue = manager2.get_queue()
1401
1402 # Note that xmlrpclib will deserialize object as a list not a tuple
1403 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1404
1405 # Because we are using xmlrpclib for serialization instead of
1406 # pickle this will cause a serialization error.
1407 self.assertRaises(Exception, queue.put, time.sleep)
1408
1409 # Make queue finalizer run before the server is stopped
1410 del queue
1411 manager.shutdown()
1412
1413class _TestManagerRestart(BaseTestCase):
1414
1415 @classmethod
1416 def _putter(cls, address, authkey):
1417 manager = QueueManager(
1418 address=address, authkey=authkey, serializer=SERIALIZER)
1419 manager.connect()
1420 queue = manager.get_queue()
1421 queue.put('hello world')
1422
1423 def test_rapid_restart(self):
1424 authkey = os.urandom(32)
1425 manager = QueueManager(
1426 address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
1427 srvr = manager.get_server()
1428 addr = srvr.address
1429 # Close the connection.Listener socket which gets opened as a part
1430 # of manager.get_server(). It's not needed for the test.
1431 srvr.listener.close()
1432 manager.start()
1433
1434 p = self.Process(target=self._putter, args=(manager.address, authkey))
1435 p.daemon = True
1436 p.start()
1437 queue = manager.get_queue()
1438 self.assertEqual(queue.get(), 'hello world')
1439 del queue
1440 manager.shutdown()
1441 manager = QueueManager(
1442 address=addr, authkey=authkey, serializer=SERIALIZER)
1443 manager.start()
1444 manager.shutdown()
1445
1446#
1447#
1448#
1449
1450SENTINEL = latin('')
1451
1452class _TestConnection(BaseTestCase):
1453
1454 ALLOWED_TYPES = ('processes', 'threads')
1455
1456 @classmethod
1457 def _echo(cls, conn):
1458 for msg in iter(conn.recv_bytes, SENTINEL):
1459 conn.send_bytes(msg)
1460 conn.close()
1461
1462 def test_connection(self):
1463 conn, child_conn = self.Pipe()
1464
1465 p = self.Process(target=self._echo, args=(child_conn,))
1466 p.daemon = True
1467 p.start()
1468
1469 seq = [1, 2.25, None]
1470 msg = latin('hello world')
1471 longmsg = msg * 10
1472 arr = array.array('i', range(4))
1473
1474 if self.TYPE == 'processes':
1475 self.assertEqual(type(conn.fileno()), int)
1476
1477 self.assertEqual(conn.send(seq), None)
1478 self.assertEqual(conn.recv(), seq)
1479
1480 self.assertEqual(conn.send_bytes(msg), None)
1481 self.assertEqual(conn.recv_bytes(), msg)
1482
1483 if self.TYPE == 'processes':
1484 buffer = array.array('i', [0]*10)
1485 expected = list(arr) + [0] * (10 - len(arr))
1486 self.assertEqual(conn.send_bytes(arr), None)
1487 self.assertEqual(conn.recv_bytes_into(buffer),
1488 len(arr) * buffer.itemsize)
1489 self.assertEqual(list(buffer), expected)
1490
1491 buffer = array.array('i', [0]*10)
1492 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1493 self.assertEqual(conn.send_bytes(arr), None)
1494 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1495 len(arr) * buffer.itemsize)
1496 self.assertEqual(list(buffer), expected)
1497
1498 buffer = bytearray(latin(' ' * 40))
1499 self.assertEqual(conn.send_bytes(longmsg), None)
1500 try:
1501 res = conn.recv_bytes_into(buffer)
1502 except multiprocessing.BufferTooShort, e:
1503 self.assertEqual(e.args, (longmsg,))
1504 else:
1505 self.fail('expected BufferTooShort, got %s' % res)
1506
1507 poll = TimingWrapper(conn.poll)
1508
1509 self.assertEqual(poll(), False)
1510 self.assertTimingAlmostEqual(poll.elapsed, 0)
1511
1512 self.assertEqual(poll(TIMEOUT1), False)
1513 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1514
1515 conn.send(None)
1516 time.sleep(.1)
1517
1518 self.assertEqual(poll(TIMEOUT1), True)
1519 self.assertTimingAlmostEqual(poll.elapsed, 0)
1520
1521 self.assertEqual(conn.recv(), None)
1522
1523 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1524 conn.send_bytes(really_big_msg)
1525 self.assertEqual(conn.recv_bytes(), really_big_msg)
1526
1527 conn.send_bytes(SENTINEL) # tell child to quit
1528 child_conn.close()
1529
1530 if self.TYPE == 'processes':
1531 self.assertEqual(conn.readable, True)
1532 self.assertEqual(conn.writable, True)
1533 self.assertRaises(EOFError, conn.recv)
1534 self.assertRaises(EOFError, conn.recv_bytes)
1535
1536 p.join()
1537
1538 def test_duplex_false(self):
1539 reader, writer = self.Pipe(duplex=False)
1540 self.assertEqual(writer.send(1), None)
1541 self.assertEqual(reader.recv(), 1)
1542 if self.TYPE == 'processes':
1543 self.assertEqual(reader.readable, True)
1544 self.assertEqual(reader.writable, False)
1545 self.assertEqual(writer.readable, False)
1546 self.assertEqual(writer.writable, True)
1547 self.assertRaises(IOError, reader.send, 2)
1548 self.assertRaises(IOError, writer.recv)
1549 self.assertRaises(IOError, writer.poll)
1550
1551 def test_spawn_close(self):
1552 # We test that a pipe connection can be closed by parent
1553 # process immediately after child is spawned. On Windows this
1554 # would have sometimes failed on old versions because
1555 # child_conn would be closed before the child got a chance to
1556 # duplicate it.
1557 conn, child_conn = self.Pipe()
1558
1559 p = self.Process(target=self._echo, args=(child_conn,))
1560 p.daemon = True
1561 p.start()
1562 child_conn.close() # this might complete before child initializes
1563
1564 msg = latin('hello')
1565 conn.send_bytes(msg)
1566 self.assertEqual(conn.recv_bytes(), msg)
1567
1568 conn.send_bytes(SENTINEL)
1569 conn.close()
1570 p.join()
1571
1572 def test_sendbytes(self):
1573 if self.TYPE != 'processes':
1574 return
1575
1576 msg = latin('abcdefghijklmnopqrstuvwxyz')
1577 a, b = self.Pipe()
1578
1579 a.send_bytes(msg)
1580 self.assertEqual(b.recv_bytes(), msg)
1581
1582 a.send_bytes(msg, 5)
1583 self.assertEqual(b.recv_bytes(), msg[5:])
1584
1585 a.send_bytes(msg, 7, 8)
1586 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1587
1588 a.send_bytes(msg, 26)
1589 self.assertEqual(b.recv_bytes(), latin(''))
1590
1591 a.send_bytes(msg, 26, 0)
1592 self.assertEqual(b.recv_bytes(), latin(''))
1593
1594 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1595
1596 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1597
1598 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1599
1600 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1601
1602 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1603
1604 @classmethod
1605 def _is_fd_assigned(cls, fd):
1606 try:
1607 os.fstat(fd)
1608 except OSError as e:
1609 if e.errno == errno.EBADF:
1610 return False
1611 raise
1612 else:
1613 return True
1614
1615 @classmethod
1616 def _writefd(cls, conn, data, create_dummy_fds=False):
1617 if create_dummy_fds:
1618 for i in range(0, 256):
1619 if not cls._is_fd_assigned(i):
1620 os.dup2(conn.fileno(), i)
1621 fd = reduction.recv_handle(conn)
1622 if msvcrt:
1623 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1624 os.write(fd, data)
1625 os.close(fd)
1626
1627 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
1628 def test_fd_transfer(self):
1629 if self.TYPE != 'processes':
1630 self.skipTest("only makes sense with processes")
1631 conn, child_conn = self.Pipe(duplex=True)
1632
1633 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
1634 p.daemon = True
1635 p.start()
1636 with open(test_support.TESTFN, "wb") as f:
1637 fd = f.fileno()
1638 if msvcrt:
1639 fd = msvcrt.get_osfhandle(fd)
1640 reduction.send_handle(conn, fd, p.pid)
1641 p.join()
1642 with open(test_support.TESTFN, "rb") as f:
1643 self.assertEqual(f.read(), b"foo")
1644
1645 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
1646 @unittest.skipIf(sys.platform == "win32",
1647 "test semantics don't make sense on Windows")
1648 @unittest.skipIf(MAXFD <= 256,
1649 "largest assignable fd number is too small")
1650 @unittest.skipUnless(hasattr(os, "dup2"),
1651 "test needs os.dup2()")
1652 def test_large_fd_transfer(self):
1653 # With fd > 256 (issue #11657)
1654 if self.TYPE != 'processes':
1655 self.skipTest("only makes sense with processes")
1656 conn, child_conn = self.Pipe(duplex=True)
1657
1658 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
1659 p.daemon = True
1660 p.start()
1661 with open(test_support.TESTFN, "wb") as f:
1662 fd = f.fileno()
1663 for newfd in range(256, MAXFD):
1664 if not self._is_fd_assigned(newfd):
1665 break
1666 else:
1667 self.fail("could not find an unassigned large file descriptor")
1668 os.dup2(fd, newfd)
1669 try:
1670 reduction.send_handle(conn, newfd, p.pid)
1671 finally:
1672 os.close(newfd)
1673 p.join()
1674 with open(test_support.TESTFN, "rb") as f:
1675 self.assertEqual(f.read(), b"bar")
1676
1677 @classmethod
1678 def _send_data_without_fd(self, conn):
1679 os.write(conn.fileno(), b"\0")
1680
1681 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
1682 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
1683 def test_missing_fd_transfer(self):
1684 # Check that exception is raised when received data is not
1685 # accompanied by a file descriptor in ancillary data.
1686 if self.TYPE != 'processes':
1687 self.skipTest("only makes sense with processes")
1688 conn, child_conn = self.Pipe(duplex=True)
1689
1690 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
1691 p.daemon = True
1692 p.start()
1693 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
1694 p.join()
1695
1696class _TestListenerClient(BaseTestCase):
1697
1698 ALLOWED_TYPES = ('processes', 'threads')
1699
1700 @classmethod
1701 def _test(cls, address):
1702 conn = cls.connection.Client(address)
1703 conn.send('hello')
1704 conn.close()
1705
1706 def test_listener_client(self):
1707 for family in self.connection.families:
1708 l = self.connection.Listener(family=family)
1709 p = self.Process(target=self._test, args=(l.address,))
1710 p.daemon = True
1711 p.start()
1712 conn = l.accept()
1713 self.assertEqual(conn.recv(), 'hello')
1714 p.join()
1715 l.close()
1716
1717 def test_issue14725(self):
1718 l = self.connection.Listener()
1719 p = self.Process(target=self._test, args=(l.address,))
1720 p.daemon = True
1721 p.start()
1722 time.sleep(1)
1723 # On Windows the client process should by now have connected,
1724 # written data and closed the pipe handle by now. This causes
1725 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue
1726 # 14725.
1727 conn = l.accept()
1728 self.assertEqual(conn.recv(), 'hello')
1729 conn.close()
1730 p.join()
1731 l.close()
1732
1733#
1734# Test of sending connection and socket objects between processes
1735#
1736"""
1737class _TestPicklingConnections(BaseTestCase):
1738
1739 ALLOWED_TYPES = ('processes',)
1740
1741 def _listener(self, conn, families):
1742 for fam in families:
1743 l = self.connection.Listener(family=fam)
1744 conn.send(l.address)
1745 new_conn = l.accept()
1746 conn.send(new_conn)
1747
1748 if self.TYPE == 'processes':
1749 l = socket.socket()
1750 l.bind(('localhost', 0))
1751 conn.send(l.getsockname())
1752 l.listen(1)
1753 new_conn, addr = l.accept()
1754 conn.send(new_conn)
1755
1756 conn.recv()
1757
1758 def _remote(self, conn):
1759 for (address, msg) in iter(conn.recv, None):
1760 client = self.connection.Client(address)
1761 client.send(msg.upper())
1762 client.close()
1763
1764 if self.TYPE == 'processes':
1765 address, msg = conn.recv()
1766 client = socket.socket()
1767 client.connect(address)
1768 client.sendall(msg.upper())
1769 client.close()
1770
1771 conn.close()
1772
1773 def test_pickling(self):
1774 try:
1775 multiprocessing.allow_connection_pickling()
1776 except ImportError:
1777 return
1778
1779 families = self.connection.families
1780
1781 lconn, lconn0 = self.Pipe()
1782 lp = self.Process(target=self._listener, args=(lconn0, families))
1783 lp.daemon = True
1784 lp.start()
1785 lconn0.close()
1786
1787 rconn, rconn0 = self.Pipe()
1788 rp = self.Process(target=self._remote, args=(rconn0,))
1789 rp.daemon = True
1790 rp.start()
1791 rconn0.close()
1792
1793 for fam in families:
1794 msg = ('This connection uses family %s' % fam).encode('ascii')
1795 address = lconn.recv()
1796 rconn.send((address, msg))
1797 new_conn = lconn.recv()
1798 self.assertEqual(new_conn.recv(), msg.upper())
1799
1800 rconn.send(None)
1801
1802 if self.TYPE == 'processes':
1803 msg = latin('This connection uses a normal socket')
1804 address = lconn.recv()
1805 rconn.send((address, msg))
1806 if hasattr(socket, 'fromfd'):
1807 new_conn = lconn.recv()
1808 self.assertEqual(new_conn.recv(100), msg.upper())
1809 else:
1810 # XXX On Windows with Py2.6 need to backport fromfd()
1811 discard = lconn.recv_bytes()
1812
1813 lconn.send(None)
1814
1815 rconn.close()
1816 lconn.close()
1817
1818 lp.join()
1819 rp.join()
1820"""
1821#
1822#
1823#
1824
1825class _TestHeap(BaseTestCase):
1826
1827 ALLOWED_TYPES = ('processes',)
1828
1829 def test_heap(self):
1830 iterations = 5000
1831 maxblocks = 50
1832 blocks = []
1833
1834 # create and destroy lots of blocks of different sizes
1835 for i in xrange(iterations):
1836 size = int(random.lognormvariate(0, 1) * 1000)
1837 b = multiprocessing.heap.BufferWrapper(size)
1838 blocks.append(b)
1839 if len(blocks) > maxblocks:
1840 i = random.randrange(maxblocks)
1841 del blocks[i]
1842
1843 # get the heap object
1844 heap = multiprocessing.heap.BufferWrapper._heap
1845
1846 # verify the state of the heap
1847 all = []
1848 occupied = 0
1849 heap._lock.acquire()
1850 self.addCleanup(heap._lock.release)
1851 for L in heap._len_to_seq.values():
1852 for arena, start, stop in L:
1853 all.append((heap._arenas.index(arena), start, stop,
1854 stop-start, 'free'))
1855 for arena, start, stop in heap._allocated_blocks:
1856 all.append((heap._arenas.index(arena), start, stop,
1857 stop-start, 'occupied'))
1858 occupied += (stop-start)
1859
1860 all.sort()
1861
1862 for i in range(len(all)-1):
1863 (arena, start, stop) = all[i][:3]
1864 (narena, nstart, nstop) = all[i+1][:3]
1865 self.assertTrue((arena != narena and nstart == 0) or
1866 (stop == nstart))
1867
1868 def test_free_from_gc(self):
1869 # Check that freeing of blocks by the garbage collector doesn't deadlock
1870 # (issue #12352).
1871 # Make sure the GC is enabled, and set lower collection thresholds to
1872 # make collections more frequent (and increase the probability of
1873 # deadlock).
1874 if not gc.isenabled():
1875 gc.enable()
1876 self.addCleanup(gc.disable)
1877 thresholds = gc.get_threshold()
1878 self.addCleanup(gc.set_threshold, *thresholds)
1879 gc.set_threshold(10)
1880
1881 # perform numerous block allocations, with cyclic references to make
1882 # sure objects are collected asynchronously by the gc
1883 for i in range(5000):
1884 a = multiprocessing.heap.BufferWrapper(1)
1885 b = multiprocessing.heap.BufferWrapper(1)
1886 # circular references
1887 a.buddy = b
1888 b.buddy = a
1889
1890#
1891#
1892#
1893
1894class _Foo(Structure):
1895 _fields_ = [
1896 ('x', c_int),
1897 ('y', c_double)
1898 ]
1899
1900class _TestSharedCTypes(BaseTestCase):
1901
1902 ALLOWED_TYPES = ('processes',)
1903
1904 def setUp(self):
1905 if not HAS_SHAREDCTYPES:
1906 self.skipTest("requires multiprocessing.sharedctypes")
1907
1908 @classmethod
1909 def _double(cls, x, y, foo, arr, string):
1910 x.value *= 2
1911 y.value *= 2
1912 foo.x *= 2
1913 foo.y *= 2
1914 string.value *= 2
1915 for i in range(len(arr)):
1916 arr[i] *= 2
1917
1918 def test_sharedctypes(self, lock=False):
1919 x = Value('i', 7, lock=lock)
1920 y = Value(c_double, 1.0/3.0, lock=lock)
1921 foo = Value(_Foo, 3, 2, lock=lock)
1922 arr = self.Array('d', range(10), lock=lock)
1923 string = self.Array('c', 20, lock=lock)
1924 string.value = latin('hello')
1925
1926 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1927 p.daemon = True
1928 p.start()
1929 p.join()
1930
1931 self.assertEqual(x.value, 14)
1932 self.assertAlmostEqual(y.value, 2.0/3.0)
1933 self.assertEqual(foo.x, 6)
1934 self.assertAlmostEqual(foo.y, 4.0)
1935 for i in range(10):
1936 self.assertAlmostEqual(arr[i], i*2)
1937 self.assertEqual(string.value, latin('hellohello'))
1938
1939 def test_synchronize(self):
1940 self.test_sharedctypes(lock=True)
1941
1942 def test_copy(self):
1943 foo = _Foo(2, 5.0)
1944 bar = copy(foo)
1945 foo.x = 0
1946 foo.y = 0
1947 self.assertEqual(bar.x, 2)
1948 self.assertAlmostEqual(bar.y, 5.0)
1949
1950#
1951#
1952#
1953
1954class _TestFinalize(BaseTestCase):
1955
1956 ALLOWED_TYPES = ('processes',)
1957
1958 @classmethod
1959 def _test_finalize(cls, conn):
1960 class Foo(object):
1961 pass
1962
1963 a = Foo()
1964 util.Finalize(a, conn.send, args=('a',))
1965 del a # triggers callback for a
1966
1967 b = Foo()
1968 close_b = util.Finalize(b, conn.send, args=('b',))
1969 close_b() # triggers callback for b
1970 close_b() # does nothing because callback has already been called
1971 del b # does nothing because callback has already been called
1972
1973 c = Foo()
1974 util.Finalize(c, conn.send, args=('c',))
1975
1976 d10 = Foo()
1977 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1978
1979 d01 = Foo()
1980 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1981 d02 = Foo()
1982 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1983 d03 = Foo()
1984 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1985
1986 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1987
1988 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1989
1990 # call multiprocessing's cleanup function then exit process without
1991 # garbage collecting locals
1992 util._exit_function()
1993 conn.close()
1994 os._exit(0)
1995
1996 def test_finalize(self):
1997 conn, child_conn = self.Pipe()
1998
1999 p = self.Process(target=self._test_finalize, args=(child_conn,))
2000 p.daemon = True
2001 p.start()
2002 p.join()
2003
2004 result = [obj for obj in iter(conn.recv, 'STOP')]
2005 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2006
2007#
2008# Test that from ... import * works for each module
2009#
2010
2011class _TestImportStar(BaseTestCase):
2012
2013 ALLOWED_TYPES = ('processes',)
2014
2015 def test_import(self):
2016 modules = [
2017 'multiprocessing', 'multiprocessing.connection',
2018 'multiprocessing.heap', 'multiprocessing.managers',
2019 'multiprocessing.pool', 'multiprocessing.process',
2020 'multiprocessing.synchronize', 'multiprocessing.util'
2021 ]
2022
2023 if HAS_REDUCTION:
2024 modules.append('multiprocessing.reduction')
2025
2026 if c_int is not None:
2027 # This module requires _ctypes
2028 modules.append('multiprocessing.sharedctypes')
2029
2030 for name in modules:
2031 __import__(name)
2032 mod = sys.modules[name]
2033
2034 for attr in getattr(mod, '__all__', ()):
2035 self.assertTrue(
2036 hasattr(mod, attr),
2037 '%r does not have attribute %r' % (mod, attr)
2038 )
2039
2040#
2041# Quick test that logging works -- does not test logging output
2042#
2043
2044class _TestLogging(BaseTestCase):
2045
2046 ALLOWED_TYPES = ('processes',)
2047
2048 def test_enable_logging(self):
2049 logger = multiprocessing.get_logger()
2050 logger.setLevel(util.SUBWARNING)
2051 self.assertTrue(logger is not None)
2052 logger.debug('this will not be printed')
2053 logger.info('nor will this')
2054 logger.setLevel(LOG_LEVEL)
2055
2056 @classmethod
2057 def _test_level(cls, conn):
2058 logger = multiprocessing.get_logger()
2059 conn.send(logger.getEffectiveLevel())
2060
2061 def test_level(self):
2062 LEVEL1 = 32
2063 LEVEL2 = 37
2064
2065 logger = multiprocessing.get_logger()
2066 root_logger = logging.getLogger()
2067 root_level = root_logger.level
2068
2069 reader, writer = multiprocessing.Pipe(duplex=False)
2070
2071 logger.setLevel(LEVEL1)
2072 p = self.Process(target=self._test_level, args=(writer,))
2073 p.daemon = True
2074 p.start()
2075 self.assertEqual(LEVEL1, reader.recv())
2076
2077 logger.setLevel(logging.NOTSET)
2078 root_logger.setLevel(LEVEL2)
2079 p = self.Process(target=self._test_level, args=(writer,))
2080 p.daemon = True
2081 p.start()
2082 self.assertEqual(LEVEL2, reader.recv())
2083
2084 root_logger.setLevel(root_level)
2085 logger.setLevel(level=LOG_LEVEL)
2086
2087
2088# class _TestLoggingProcessName(BaseTestCase):
2089#
2090# def handle(self, record):
2091# assert record.processName == multiprocessing.current_process().name
2092# self.__handled = True
2093#
2094# def test_logging(self):
2095# handler = logging.Handler()
2096# handler.handle = self.handle
2097# self.__handled = False
2098# # Bypass getLogger() and side-effects
2099# logger = logging.getLoggerClass()(
2100# 'multiprocessing.test.TestLoggingProcessName')
2101# logger.addHandler(handler)
2102# logger.propagate = False
2103#
2104# logger.warn('foo')
2105# assert self.__handled
2106
2107#
2108# Check that Process.join() retries if os.waitpid() fails with EINTR
2109#
2110
2111class _TestPollEintr(BaseTestCase):
2112
2113 ALLOWED_TYPES = ('processes',)
2114
2115 @classmethod
2116 def _killer(cls, pid):
2117 time.sleep(0.5)
2118 os.kill(pid, signal.SIGUSR1)
2119
2120 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2121 def test_poll_eintr(self):
2122 got_signal = [False]
2123 def record(*args):
2124 got_signal[0] = True
2125 pid = os.getpid()
2126 oldhandler = signal.signal(signal.SIGUSR1, record)
2127 try:
2128 killer = self.Process(target=self._killer, args=(pid,))
2129 killer.start()
2130 p = self.Process(target=time.sleep, args=(1,))
2131 p.start()
2132 p.join()
2133 self.assertTrue(got_signal[0])
2134 self.assertEqual(p.exitcode, 0)
2135 killer.join()
2136 finally:
2137 signal.signal(signal.SIGUSR1, oldhandler)
2138
2139#
2140# Test to verify handle verification, see issue 3321
2141#
2142
2143class TestInvalidHandle(unittest.TestCase):
2144
2145 @unittest.skipIf(WIN32, "skipped on Windows")
2146 def test_invalid_handles(self):
2147 conn = _multiprocessing.Connection(44977608)
2148 self.assertRaises(IOError, conn.poll)
2149 self.assertRaises(IOError, _multiprocessing.Connection, -1)
2150
2151#
2152# Functions used to create test cases from the base ones in this module
2153#
2154
2155def get_attributes(Source, names):
2156 d = {}
2157 for name in names:
2158 obj = getattr(Source, name)
2159 if type(obj) == type(get_attributes):
2160 obj = staticmethod(obj)
2161 d[name] = obj
2162 return d
2163
2164def create_test_cases(Mixin, type):
2165 result = {}
2166 glob = globals()
2167 Type = type.capitalize()
2168
2169 for name in glob.keys():
2170 if name.startswith('_Test'):
2171 base = glob[name]
2172 if type in base.ALLOWED_TYPES:
2173 newname = 'With' + Type + name[1:]
2174 class Temp(base, unittest.TestCase, Mixin):
2175 pass
2176 result[newname] = Temp
2177 Temp.__name__ = newname
2178 Temp.__module__ = Mixin.__module__
2179 return result
2180
2181#
2182# Create test cases
2183#
2184
2185class ProcessesMixin(object):
2186 TYPE = 'processes'
2187 Process = multiprocessing.Process
2188 locals().update(get_attributes(multiprocessing, (
2189 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2190 'Condition', 'Event', 'Value', 'Array', 'RawValue',
2191 'RawArray', 'current_process', 'active_children', 'Pipe',
2192 'connection', 'JoinableQueue', 'Pool'
2193 )))
2194
2195testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2196globals().update(testcases_processes)
2197
2198
2199class ManagerMixin(object):
2200 TYPE = 'manager'
2201 Process = multiprocessing.Process
2202 manager = object.__new__(multiprocessing.managers.SyncManager)
2203 locals().update(get_attributes(manager, (
2204 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2205 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
2206 'Namespace', 'JoinableQueue', 'Pool'
2207 )))
2208
2209testcases_manager = create_test_cases(ManagerMixin, type='manager')
2210globals().update(testcases_manager)
2211
2212
2213class ThreadsMixin(object):
2214 TYPE = 'threads'
2215 Process = multiprocessing.dummy.Process
2216 locals().update(get_attributes(multiprocessing.dummy, (
2217 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2218 'Condition', 'Event', 'Value', 'Array', 'current_process',
2219 'active_children', 'Pipe', 'connection', 'dict', 'list',
2220 'Namespace', 'JoinableQueue', 'Pool'
2221 )))
2222
2223testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2224globals().update(testcases_threads)
2225
2226class OtherTest(unittest.TestCase):
2227 # TODO: add more tests for deliver/answer challenge.
2228 def test_deliver_challenge_auth_failure(self):
2229 class _FakeConnection(object):
2230 def recv_bytes(self, size):
2231 return b'something bogus'
2232 def send_bytes(self, data):
2233 pass
2234 self.assertRaises(multiprocessing.AuthenticationError,
2235 multiprocessing.connection.deliver_challenge,
2236 _FakeConnection(), b'abc')
2237
2238 def test_answer_challenge_auth_failure(self):
2239 class _FakeConnection(object):
2240 def __init__(self):
2241 self.count = 0
2242 def recv_bytes(self, size):
2243 self.count += 1
2244 if self.count == 1:
2245 return multiprocessing.connection.CHALLENGE
2246 elif self.count == 2:
2247 return b'something bogus'
2248 return b''
2249 def send_bytes(self, data):
2250 pass
2251 self.assertRaises(multiprocessing.AuthenticationError,
2252 multiprocessing.connection.answer_challenge,
2253 _FakeConnection(), b'abc')
2254
2255#
2256# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2257#
2258
2259def initializer(ns):
2260 ns.test += 1
2261
2262class TestInitializers(unittest.TestCase):
2263 def setUp(self):
2264 self.mgr = multiprocessing.Manager()
2265 self.ns = self.mgr.Namespace()
2266 self.ns.test = 0
2267
2268 def tearDown(self):
2269 self.mgr.shutdown()
2270
2271 def test_manager_initializer(self):
2272 m = multiprocessing.managers.SyncManager()
2273 self.assertRaises(TypeError, m.start, 1)
2274 m.start(initializer, (self.ns,))
2275 self.assertEqual(self.ns.test, 1)
2276 m.shutdown()
2277
2278 def test_pool_initializer(self):
2279 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2280 p = multiprocessing.Pool(1, initializer, (self.ns,))
2281 p.close()
2282 p.join()
2283 self.assertEqual(self.ns.test, 1)
2284
2285#
2286# Issue 5155, 5313, 5331: Test process in processes
2287# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2288#
2289
2290def _this_sub_process(q):
2291 try:
2292 item = q.get(block=False)
2293 except Queue.Empty:
2294 pass
2295
2296def _test_process(q):
2297 queue = multiprocessing.Queue()
2298 subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
2299 subProc.daemon = True
2300 subProc.start()
2301 subProc.join()
2302
2303def _afunc(x):
2304 return x*x
2305
2306def pool_in_process():
2307 pool = multiprocessing.Pool(processes=4)
2308 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2309
2310class _file_like(object):
2311 def __init__(self, delegate):
2312 self._delegate = delegate
2313 self._pid = None
2314
2315 @property
2316 def cache(self):
2317 pid = os.getpid()
2318 # There are no race conditions since fork keeps only the running thread
2319 if pid != self._pid:
2320 self._pid = pid
2321 self._cache = []
2322 return self._cache
2323
2324 def write(self, data):
2325 self.cache.append(data)
2326
2327 def flush(self):
2328 self._delegate.write(''.join(self.cache))
2329 self._cache = []
2330
2331class TestStdinBadfiledescriptor(unittest.TestCase):
2332
2333 def test_queue_in_process(self):
2334 queue = multiprocessing.Queue()
2335 proc = multiprocessing.Process(target=_test_process, args=(queue,))
2336 proc.start()
2337 proc.join()
2338
2339 def test_pool_in_process(self):
2340 p = multiprocessing.Process(target=pool_in_process)
2341 p.start()
2342 p.join()
2343
2344 def test_flushing(self):
2345 sio = StringIO()
2346 flike = _file_like(sio)
2347 flike.write('foo')
2348 proc = multiprocessing.Process(target=lambda: flike.flush())
2349 flike.flush()
2350 assert sio.getvalue() == 'foo'
2351
2352#
2353# Test interaction with socket timeouts - see Issue #6056
2354#
2355
2356class TestTimeouts(unittest.TestCase):
2357 @classmethod
2358 def _test_timeout(cls, child, address):
2359 time.sleep(1)
2360 child.send(123)
2361 child.close()
2362 conn = multiprocessing.connection.Client(address)
2363 conn.send(456)
2364 conn.close()
2365
2366 def test_timeout(self):
2367 old_timeout = socket.getdefaulttimeout()
2368 try:
2369 socket.setdefaulttimeout(0.1)
2370 parent, child = multiprocessing.Pipe(duplex=True)
2371 l = multiprocessing.connection.Listener(family='AF_INET')
2372 p = multiprocessing.Process(target=self._test_timeout,
2373 args=(child, l.address))
2374 p.start()
2375 child.close()
2376 self.assertEqual(parent.recv(), 123)
2377 parent.close()
2378 conn = l.accept()
2379 self.assertEqual(conn.recv(), 456)
2380 conn.close()
2381 l.close()
2382 p.join(10)
2383 finally:
2384 socket.setdefaulttimeout(old_timeout)
2385
2386#
2387# Test what happens with no "if __name__ == '__main__'"
2388#
2389
2390class TestNoForkBomb(unittest.TestCase):
2391 def test_noforkbomb(self):
2392 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
2393 if WIN32:
2394 rc, out, err = test.script_helper.assert_python_failure(name)
2395 self.assertEqual('', out.decode('ascii'))
2396 self.assertIn('RuntimeError', err.decode('ascii'))
2397 else:
2398 rc, out, err = test.script_helper.assert_python_ok(name)
2399 self.assertEqual('123', out.decode('ascii').rstrip())
2400 self.assertEqual('', err.decode('ascii'))
2401
2402#
2403# Issue 12098: check sys.flags of child matches that for parent
2404#
2405
2406class TestFlags(unittest.TestCase):
2407 @classmethod
2408 def run_in_grandchild(cls, conn):
2409 conn.send(tuple(sys.flags))
2410
2411 @classmethod
2412 def run_in_child(cls):
2413 import json
2414 r, w = multiprocessing.Pipe(duplex=False)
2415 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
2416 p.start()
2417 grandchild_flags = r.recv()
2418 p.join()
2419 r.close()
2420 w.close()
2421 flags = (tuple(sys.flags), grandchild_flags)
2422 print(json.dumps(flags))
2423
2424 def test_flags(self):
2425 import json, subprocess
2426 # start child process using unusual flags
2427 prog = ('from test.test_multiprocessing import TestFlags; ' +
2428 'TestFlags.run_in_child()')
2429 data = subprocess.check_output(
2430 [sys.executable, '-E', '-B', '-O', '-c', prog])
2431 child_flags, grandchild_flags = json.loads(data.decode('ascii'))
2432 self.assertEqual(child_flags, grandchild_flags)
2433
2434#
2435# Issue #17555: ForkAwareThreadLock
2436#
2437
2438class TestForkAwareThreadLock(unittest.TestCase):
2439 # We recurisvely start processes. Issue #17555 meant that the
2440 # after fork registry would get duplicate entries for the same
2441 # lock. The size of the registry at generation n was ~2**n.
2442
2443 @classmethod
2444 def child(cls, n, conn):
2445 if n > 1:
2446 p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
2447 p.start()
2448 p.join()
2449 else:
2450 conn.send(len(util._afterfork_registry))
2451 conn.close()
2452
2453 def test_lock(self):
2454 r, w = multiprocessing.Pipe(False)
2455 l = util.ForkAwareThreadLock()
2456 old_size = len(util._afterfork_registry)
2457 p = multiprocessing.Process(target=self.child, args=(5, w))
2458 p.start()
2459 new_size = r.recv()
2460 p.join()
2461 self.assertLessEqual(new_size, old_size)
2462
2463#
2464# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
2465#
2466
2467class TestIgnoreEINTR(unittest.TestCase):
2468
2469 @classmethod
2470 def _test_ignore(cls, conn):
2471 def handler(signum, frame):
2472 pass
2473 signal.signal(signal.SIGUSR1, handler)
2474 conn.send('ready')
2475 x = conn.recv()
2476 conn.send(x)
2477 conn.send_bytes(b'x'*(1024*1024)) # sending 1 MB should block
2478
2479 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2480 def test_ignore(self):
2481 conn, child_conn = multiprocessing.Pipe()
2482 try:
2483 p = multiprocessing.Process(target=self._test_ignore,
2484 args=(child_conn,))
2485 p.daemon = True
2486 p.start()
2487 child_conn.close()
2488 self.assertEqual(conn.recv(), 'ready')
2489 time.sleep(0.1)
2490 os.kill(p.pid, signal.SIGUSR1)
2491 time.sleep(0.1)
2492 conn.send(1234)
2493 self.assertEqual(conn.recv(), 1234)
2494 time.sleep(0.1)
2495 os.kill(p.pid, signal.SIGUSR1)
2496 self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
2497 time.sleep(0.1)
2498 p.join()
2499 finally:
2500 conn.close()
2501
2502 @classmethod
2503 def _test_ignore_listener(cls, conn):
2504 def handler(signum, frame):
2505 pass
2506 signal.signal(signal.SIGUSR1, handler)
2507 l = multiprocessing.connection.Listener()
2508 conn.send(l.address)
2509 a = l.accept()
2510 a.send('welcome')
2511
2512 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2513 def test_ignore_listener(self):
2514 conn, child_conn = multiprocessing.Pipe()
2515 try:
2516 p = multiprocessing.Process(target=self._test_ignore_listener,
2517 args=(child_conn,))
2518 p.daemon = True
2519 p.start()
2520 child_conn.close()
2521 address = conn.recv()
2522 time.sleep(0.1)
2523 os.kill(p.pid, signal.SIGUSR1)
2524 time.sleep(0.1)
2525 client = multiprocessing.connection.Client(address)
2526 self.assertEqual(client.recv(), 'welcome')
2527 p.join()
2528 finally:
2529 conn.close()
2530
2531#
2532#
2533#
2534
2535testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
2536 TestStdinBadfiledescriptor, TestTimeouts, TestNoForkBomb,
2537 TestFlags, TestForkAwareThreadLock, TestIgnoreEINTR]
2538
2539#
2540#
2541#
2542
2543def test_main(run=None):
2544 if sys.platform.startswith("linux"):
2545 try:
2546 lock = multiprocessing.RLock()
2547 except OSError:
2548 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
2549
2550 check_enough_semaphores()
2551
2552 if run is None:
2553 from test.test_support import run_unittest as run
2554
2555 util.get_temp_dir() # creates temp directory for use by all processes
2556
2557 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2558
2559 ProcessesMixin.pool = multiprocessing.Pool(4)
2560 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2561 ManagerMixin.manager.__init__()
2562 ManagerMixin.manager.start()
2563 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
2564
2565 testcases = (
2566 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2567 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
2568 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2569 testcases_other
2570 )
2571
2572 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2573 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
2574 # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
2575 # module during these tests is at least platform dependent and possibly
2576 # non-deterministic on any given platform. So we don't mind if the listed
2577 # warnings aren't actually raised.
2578 with test_support.check_py3k_warnings(
2579 (".+__(get|set)slice__ has been removed", DeprecationWarning),
2580 (r"sys.exc_clear\(\) not supported", DeprecationWarning),
2581 quiet=True):
2582 run(suite)
2583
2584 ThreadsMixin.pool.terminate()
2585 ProcessesMixin.pool.terminate()
2586 ManagerMixin.pool.terminate()
2587 ManagerMixin.manager.shutdown()
2588
2589 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
2590
2591def main():
2592 test_main(unittest.TextTestRunner(verbosity=2).run)
2593
2594if __name__ == '__main__':
2595 main()
Note: See TracBrowser for help on using the repository browser.