Timestamp: Mar 19, 2014, 11:31:01 PM (11 years ago)
Author: dmik
Message: python: Merge vendor 2.7.6 to trunk.

Location: python/trunk
Files: 2 edited

  • python/trunk

  • python/trunk/Lib/test/test_multiprocessing.py

    r2 → r391 (each diff line below is prefixed with its old/new line numbers)
    66
    77import unittest
    8 import threading
    98import Queue
    109import time
     
    1413import signal
    1514import array
    16 import copy
    1715import socket
    1816import random
    1917import logging
     18import errno
     19import test.script_helper
     20from test import test_support
    2021from StringIO import StringIO
    21 from test import test_support
    22 
     22_multiprocessing = test_support.import_module('_multiprocessing')
     23# import threading after _multiprocessing to raise a more relevant error
     24# message: "No module named _multiprocessing". _multiprocessing is not compiled
     25# without thread support.
     26import threading
    2327
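
A minimal sketch of the skip-on-missing-module pattern used above: test_support.import_module() raises unittest.SkipTest instead of ImportError, so a missing optional module marks the test run as skipped rather than failed. The module name bz2 below is only illustrative.

    import unittest
    from test import test_support

    class OptionalModuleTest(unittest.TestCase):
        def test_optional_module(self):
            # Skips (rather than errors out) when the module is unavailable.
            bz2 = test_support.import_module('bz2')
            self.assertTrue(hasattr(bz2, 'BZ2File'))

    if __name__ == '__main__':
        unittest.main()
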
    2428# Work around broken sem_open implementations
    25 try:
    26     import multiprocessing.synchronize
    27 except ImportError, e:
    28     from test.test_support import TestSkipped
    29     raise TestSkipped(e)
     29test_support.import_module('multiprocessing.synchronize')
    3030
    3131import multiprocessing.dummy
     
    3434import multiprocessing.heap
    3535import multiprocessing.pool
    36 import _multiprocessing
    3736
    3837from multiprocessing import util
    3938
     39try:
     40    from multiprocessing import reduction
     41    HAS_REDUCTION = True
     42except ImportError:
     43    HAS_REDUCTION = False
     44
     45try:
     46    from multiprocessing.sharedctypes import Value, copy
     47    HAS_SHAREDCTYPES = True
     48except ImportError:
     49    HAS_SHAREDCTYPES = False
     50
     51try:
     52    import msvcrt
     53except ImportError:
     54    msvcrt = None
     55
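
These HAS_* flags feed unittest's skip decorators later in the file; a hedged sketch of the general pattern follows (the test class below is invented for illustration, not part of the changeset).

    import unittest

    try:
        from multiprocessing import reduction
        HAS_REDUCTION = True
    except ImportError:
        HAS_REDUCTION = False

    class FeatureFlagExample(unittest.TestCase):       # hypothetical class
        @unittest.skipUnless(HAS_REDUCTION,
                             "test needs multiprocessing.reduction")
        def test_reduction_available(self):
            # Only runs on platforms where the optional module imported.
            self.assertTrue(hasattr(reduction, 'recv_handle'))
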
    4056#
    4157#
     
    4965
    5066LOG_LEVEL = util.SUBWARNING
    51 #LOG_LEVEL = logging.WARNING
     67#LOG_LEVEL = logging.DEBUG
    5268
    5369DELTA = 0.1
     
    6682WIN32 = (sys.platform == "win32")
    6783
     84try:
     85    MAXFD = os.sysconf("SC_OPEN_MAX")
     86except:
     87    MAXFD = 256
     88
     89#
     90# Some tests require ctypes
     91#
     92
     93try:
     94    from ctypes import Structure, c_int, c_double
     95except ImportError:
     96    Structure = object
     97    c_int = c_double = None
     98
     99
     100def check_enough_semaphores():
     101    """Check that the system supports enough semaphores to run the test."""
     102    # minimum number of semaphores available according to POSIX
     103    nsems_min = 256
     104    try:
     105        nsems = os.sysconf("SC_SEM_NSEMS_MAX")
     106    except (AttributeError, ValueError):
     107        # sysconf not available or setting not available
     108        return
     109    if nsems == -1 or nsems >= nsems_min:
     110        return
     111    raise unittest.SkipTest("The OS doesn't support enough semaphores "
     112                            "to run the test (required: %d)." % nsems_min)
     113
     114
    68115#
    69116# Creates a wrapper for a function which records the time it takes to finish
     
    102149        else:
    103150            return self.assertEqual(value, res)
     151
      152    # For the sanity of Windows users: fail with a clear error rather than
      153    # crashing or freezing in multiple ways.
     154    def __reduce__(self, *args):
     155        raise NotImplementedError("shouldn't try to pickle a test case")
     156
     157    __reduce_ex__ = __reduce__
    104158
    105159#
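
Many of the hunks below convert child-process helpers from instance methods to classmethods. A hedged sketch of the motivation, with invented class names: a bound instance method passed as a Process target would drag the whole test case through pickling on platforms that spawn rather than fork, which the __reduce__ override above now forbids, while a classmethod only references the class.

    import multiprocessing

    class WorkerHolder(object):                  # stands in for a test case
        @classmethod
        def _echo_name(cls, q):
            # Only 'cls' travels to the child, so no instance is pickled.
            q.put(cls.__name__)

        def run_in_child(self):
            q = multiprocessing.Queue()
            p = multiprocessing.Process(target=self._echo_name, args=(q,))
            p.daemon = True
            p.start()
            p.join()
            return q.get()

    if __name__ == '__main__':
        print(WorkerHolder().run_in_child())     # prints 'WorkerHolder'
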
     
    136190        self.assertTrue(current.is_alive())
    137191        self.assertTrue(not current.daemon)
    138         self.assertTrue(isinstance(authkey, bytes))
     192        self.assertIsInstance(authkey, bytes)
    139193        self.assertTrue(len(authkey) > 0)
    140194        self.assertEqual(current.ident, os.getpid())
    141195        self.assertEqual(current.exitcode, None)
    142196
    143     def _test(self, q, *args, **kwds):
    144         current = self.current_process()
     197    @classmethod
     198    def _test(cls, q, *args, **kwds):
     199        current = cls.current_process()
    145200        q.put(args)
    146201        q.put(kwds)
    147202        q.put(current.name)
    148         if self.TYPE != 'threads':
     203        if cls.TYPE != 'threads':
    149204            q.put(bytes(current.authkey))
    150205            q.put(current.pid)
     
    163218
    164219        if self.TYPE != 'threads':
    165             self.assertEquals(p.authkey, current.authkey)
    166         self.assertEquals(p.is_alive(), False)
    167         self.assertEquals(p.daemon, True)
    168         self.assertTrue(p not in self.active_children())
     220            self.assertEqual(p.authkey, current.authkey)
     221        self.assertEqual(p.is_alive(), False)
     222        self.assertEqual(p.daemon, True)
     223        self.assertNotIn(p, self.active_children())
    169224        self.assertTrue(type(self.active_children()) is list)
    170225        self.assertEqual(p.exitcode, None)
     
    172227        p.start()
    173228
    174         self.assertEquals(p.exitcode, None)
    175         self.assertEquals(p.is_alive(), True)
    176         self.assertTrue(p in self.active_children())
    177 
    178         self.assertEquals(q.get(), args[1:])
    179         self.assertEquals(q.get(), kwargs)
    180         self.assertEquals(q.get(), p.name)
     229        self.assertEqual(p.exitcode, None)
     230        self.assertEqual(p.is_alive(), True)
     231        self.assertIn(p, self.active_children())
     232
     233        self.assertEqual(q.get(), args[1:])
     234        self.assertEqual(q.get(), kwargs)
     235        self.assertEqual(q.get(), p.name)
    181236        if self.TYPE != 'threads':
    182             self.assertEquals(q.get(), current.authkey)
    183             self.assertEquals(q.get(), p.pid)
     237            self.assertEqual(q.get(), current.authkey)
     238            self.assertEqual(q.get(), p.pid)
    184239
    185240        p.join()
    186241
    187         self.assertEquals(p.exitcode, 0)
    188         self.assertEquals(p.is_alive(), False)
    189         self.assertTrue(p not in self.active_children())
    190 
    191     def _test_terminate(self):
     242        self.assertEqual(p.exitcode, 0)
     243        self.assertEqual(p.is_alive(), False)
     244        self.assertNotIn(p, self.active_children())
     245
     246    @classmethod
     247    def _test_terminate(cls):
    192248        time.sleep(1000)
    193249
     
    201257
    202258        self.assertEqual(p.is_alive(), True)
    203         self.assertTrue(p in self.active_children())
     259        self.assertIn(p, self.active_children())
    204260        self.assertEqual(p.exitcode, None)
    205261
     
    211267
    212268        self.assertEqual(p.is_alive(), False)
    213         self.assertTrue(p not in self.active_children())
     269        self.assertNotIn(p, self.active_children())
    214270
    215271        p.join()
     
    230286
    231287        p = self.Process(target=time.sleep, args=(DELTA,))
    232         self.assertTrue(p not in self.active_children())
    233 
     288        self.assertNotIn(p, self.active_children())
     289
     290        p.daemon = True
    234291        p.start()
    235         self.assertTrue(p in self.active_children())
     292        self.assertIn(p, self.active_children())
    236293
    237294        p.join()
    238         self.assertTrue(p not in self.active_children())
    239 
    240     def _test_recursion(self, wconn, id):
     295        self.assertNotIn(p, self.active_children())
     296
     297    @classmethod
     298    def _test_recursion(cls, wconn, id):
    241299        from multiprocessing import forking
    242300        wconn.send(id)
    243301        if len(id) < 2:
    244302            for i in range(2):
    245                 p = self.Process(
    246                     target=self._test_recursion, args=(wconn, id+[i])
     303                p = cls.Process(
     304                    target=cls._test_recursion, args=(wconn, id+[i])
    247305                    )
    248306                p.start()
     
    269327        self.assertEqual(result, expected)
    270328
     329    @classmethod
     330    def _test_sys_exit(cls, reason, testfn):
     331        sys.stderr = open(testfn, 'w')
     332        sys.exit(reason)
     333
     334    def test_sys_exit(self):
     335        # See Issue 13854
     336        if self.TYPE == 'threads':
     337            return
     338
     339        testfn = test_support.TESTFN
     340        self.addCleanup(test_support.unlink, testfn)
     341
     342        for reason, code in (([1, 2, 3], 1), ('ignore this', 0)):
     343            p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
     344            p.daemon = True
     345            p.start()
     346            p.join(5)
     347            self.assertEqual(p.exitcode, code)
     348
     349            with open(testfn, 'r') as f:
     350                self.assertEqual(f.read().rstrip(), str(reason))
     351
     352        for reason in (True, False, 8):
     353            p = self.Process(target=sys.exit, args=(reason,))
     354            p.daemon = True
     355            p.start()
     356            p.join(5)
     357            self.assertEqual(p.exitcode, reason)
     358
    271359#
    272360#
     
    301389    def test_subclassing(self):
    302390        uppercaser = _UpperCaser()
     391        uppercaser.daemon = True
    303392        uppercaser.start()
    304393        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
     
    327416
    328417
    329     def _test_put(self, queue, child_can_start, parent_can_continue):
     418    @classmethod
     419    def _test_put(cls, queue, child_can_start, parent_can_continue):
    330420        child_can_start.wait()
    331421        for i in range(6):
     
    391481        proc.join()
    392482
    393     def _test_get(self, queue, child_can_start, parent_can_continue):
     483    @classmethod
     484    def _test_get(cls, queue, child_can_start, parent_can_continue):
    394485        child_can_start.wait()
    395486        #queue.put(1)
     
    452543        proc.join()
    453544
    454     def _test_fork(self, queue):
     545    @classmethod
     546    def _test_fork(cls, queue):
    455547        for i in range(10, 20):
    456548            queue.put(i)
     
    476568        # fork process
    477569        p = self.Process(target=self._test_fork, args=(queue,))
     570        p.daemon = True
    478571        p.start()
    479572
     
    500593        self.assertEqual(q.qsize(), 0)
    501594
    502     def _test_task_done(self, q):
     595    @classmethod
     596    def _test_task_done(cls, q):
    503597        for obj in iter(q.get, None):
    504598            time.sleep(DELTA)
     
    509603
    510604        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
    511             return
     605            self.skipTest("requires 'queue.task_done()' method")
    512606
    513607        workers = [self.Process(target=self._test_task_done, args=(queue,))
     
    515609
    516610        for p in workers:
     611            p.daemon = True
    517612            p.start()
    518613
     
    612707class _TestCondition(BaseTestCase):
    613708
    614     def f(self, cond, sleeping, woken, timeout=None):
     709    @classmethod
     710    def f(cls, cond, sleeping, woken, timeout=None):
    615711        cond.acquire()
    616712        sleeping.release()
     
    744840class _TestEvent(BaseTestCase):
    745841
    746     def _test_event(self, event):
     842    @classmethod
     843    def _test_event(cls, event):
    747844        time.sleep(TIMEOUT2)
    748845        event.set()
     
    752849        wait = TimingWrapper(event.wait)
    753850
    754         # Removed temporaily, due to API shear, this does not
     851        # Removed temporarily, due to API shear, this does not
    755852        # work with threading._Event objects. is_set == isSet
    756         #self.assertEqual(event.is_set(), False)
    757 
    758         self.assertEqual(wait(0.0), None)
     853        self.assertEqual(event.is_set(), False)
     854
      855        # Removed: threading.Event.wait() returns the value of the __flag
      856        # instead of None.  API shear with the semaphore-backed mp.Event
     857        self.assertEqual(wait(0.0), False)
    759858        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
    760         self.assertEqual(wait(TIMEOUT1), None)
     859        self.assertEqual(wait(TIMEOUT1), False)
    761860        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
    762861
     
    764863
    765864        # See note above on the API differences
    766         # self.assertEqual(event.is_set(), True)
    767         self.assertEqual(wait(), None)
     865        self.assertEqual(event.is_set(), True)
     866        self.assertEqual(wait(), True)
    768867        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
    769         self.assertEqual(wait(TIMEOUT1), None)
     868        self.assertEqual(wait(TIMEOUT1), True)
    770869        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
    771870        # self.assertEqual(event.is_set(), True)
     
    775874        #self.assertEqual(event.is_set(), False)
    776875
    777         self.Process(target=self._test_event, args=(event,)).start()
    778         self.assertEqual(wait(), None)
     876        p = self.Process(target=self._test_event, args=(event,))
     877        p.daemon = True
     878        p.start()
     879        self.assertEqual(wait(), True)
    779880
    780881#
     
    783884
    784885class _TestValue(BaseTestCase):
     886
     887    ALLOWED_TYPES = ('processes',)
    785888
    786889    codes_values = [
     
    791894        ]
    792895
    793     def _test(self, values):
    794         for sv, cv in zip(values, self.codes_values):
     896    def setUp(self):
     897        if not HAS_SHAREDCTYPES:
     898            self.skipTest("requires multiprocessing.sharedctypes")
     899
     900    @classmethod
     901    def _test(cls, values):
     902        for sv, cv in zip(values, cls.codes_values):
    795903            sv.value = cv[2]
    796904
    797905
    798906    def test_value(self, raw=False):
    799         if self.TYPE != 'processes':
    800             return
    801 
    802907        if raw:
    803908            values = [self.RawValue(code, value)
     
    811916
    812917        proc = self.Process(target=self._test, args=(values,))
     918        proc.daemon = True
    813919        proc.start()
    814920        proc.join()
     
    821927
    822928    def test_getobj_getlock(self):
    823         if self.TYPE != 'processes':
    824             return
    825 
    826929        val1 = self.Value('i', 5)
    827930        lock1 = val1.get_lock()
     
    851954class _TestArray(BaseTestCase):
    852955
    853     def f(self, seq):
     956    ALLOWED_TYPES = ('processes',)
     957
     958    @classmethod
     959    def f(cls, seq):
    854960        for i in range(1, len(seq)):
    855961            seq[i] += seq[i-1]
    856962
     963    @unittest.skipIf(c_int is None, "requires _ctypes")
    857964    def test_array(self, raw=False):
    858         if self.TYPE != 'processes':
    859             return
    860 
    861965        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
    862966        if raw:
     
    876980
    877981        p = self.Process(target=self.f, args=(arr,))
     982        p.daemon = True
    878983        p.start()
    879984        p.join()
     
    881986        self.assertEqual(list(arr[:]), seq)
    882987
     988    @unittest.skipIf(c_int is None, "requires _ctypes")
     989    def test_array_from_size(self):
     990        size = 10
     991        # Test for zeroing (see issue #11675).
     992        # The repetition below strengthens the test by increasing the chances
     993        # of previously allocated non-zero memory being used for the new array
     994        # on the 2nd and 3rd loops.
     995        for _ in range(3):
     996            arr = self.Array('i', size)
     997            self.assertEqual(len(arr), size)
     998            self.assertEqual(list(arr), [0] * size)
     999            arr[:] = range(10)
     1000            self.assertEqual(list(arr), range(10))
     1001            del arr
     1002
     1003    @unittest.skipIf(c_int is None, "requires _ctypes")
    8831004    def test_rawarray(self):
    8841005        self.test_array(raw=True)
    8851006
     1007    @unittest.skipIf(c_int is None, "requires _ctypes")
     1008    def test_array_accepts_long(self):
     1009        arr = self.Array('i', 10L)
     1010        self.assertEqual(len(arr), 10)
     1011        raw_arr = self.RawArray('i', 10L)
     1012        self.assertEqual(len(raw_arr), 10)
     1013
     1014    @unittest.skipIf(c_int is None, "requires _ctypes")
    8861015    def test_getobj_getlock_obj(self):
    887         if self.TYPE != 'processes':
    888             return
    889 
    8901016        arr1 = self.Array('i', range(10))
    8911017        lock1 = arr1.get_lock()
     
    9921118                         map(sqr, range(100)))
    9931119
     1120    def test_map_chunksize(self):
     1121        try:
     1122            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
     1123        except multiprocessing.TimeoutError:
     1124            self.fail("pool.map_async with chunksize stalled on null list")
     1125
    9941126    def test_async(self):
    9951127        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
     
    10261158
    10271159    def test_make_pool(self):
     1160        self.assertRaises(ValueError, multiprocessing.Pool, -1)
     1161        self.assertRaises(ValueError, multiprocessing.Pool, 0)
     1162
    10281163        p = multiprocessing.Pool(3)
    10291164        self.assertEqual(3, len(p._pool))
     
    10481183        join()
    10491184        self.assertTrue(join.elapsed < 0.2)
     1185
     1186    def test_empty_iterable(self):
     1187        # See Issue 12157
     1188        p = self.Pool(1)
     1189
     1190        self.assertEqual(p.map(sqr, []), [])
     1191        self.assertEqual(list(p.imap(sqr, [])), [])
     1192        self.assertEqual(list(p.imap_unordered(sqr, [])), [])
     1193        self.assertEqual(p.map_async(sqr, []).get(), [])
     1194
     1195        p.close()
     1196        p.join()
     1197
     1198def unpickleable_result():
     1199    return lambda: 42
     1200
     1201class _TestPoolWorkerErrors(BaseTestCase):
     1202    ALLOWED_TYPES = ('processes', )
     1203
     1204    def test_unpickleable_result(self):
     1205        from multiprocessing.pool import MaybeEncodingError
     1206        p = multiprocessing.Pool(2)
     1207
     1208        # Make sure we don't lose pool processes because of encoding errors.
     1209        for iteration in range(20):
     1210            res = p.apply_async(unpickleable_result)
     1211            self.assertRaises(MaybeEncodingError, res.get)
     1212
     1213        p.close()
     1214        p.join()
     1215
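
A condensed usage sketch of the failure mode exercised above: a worker returns a lambda, the result handler cannot pickle it back to the parent, and get() raises MaybeEncodingError. Names below are illustrative.

    import multiprocessing
    from multiprocessing.pool import MaybeEncodingError

    def make_unpicklable():
        return lambda: 42          # lambdas cannot be pickled

    if __name__ == '__main__':
        pool = multiprocessing.Pool(1)
        result = pool.apply_async(make_unpicklable)
        try:
            result.get(timeout=10)
        except MaybeEncodingError as exc:
            print('result could not be sent back: %s' % exc)
        pool.close()
        pool.join()
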
     1216class _TestPoolWorkerLifetime(BaseTestCase):
     1217
     1218    ALLOWED_TYPES = ('processes', )
     1219    def test_pool_worker_lifetime(self):
     1220        p = multiprocessing.Pool(3, maxtasksperchild=10)
     1221        self.assertEqual(3, len(p._pool))
     1222        origworkerpids = [w.pid for w in p._pool]
     1223        # Run many tasks so each worker gets replaced (hopefully)
     1224        results = []
     1225        for i in range(100):
     1226            results.append(p.apply_async(sqr, (i, )))
     1227        # Fetch the results and verify we got the right answers,
     1228        # also ensuring all the tasks have completed.
     1229        for (j, res) in enumerate(results):
     1230            self.assertEqual(res.get(), sqr(j))
     1231        # Refill the pool
     1232        p._repopulate_pool()
     1233        # Wait until all workers are alive
     1234        # (countdown * DELTA = 5 seconds max startup process time)
     1235        countdown = 50
     1236        while countdown and not all(w.is_alive() for w in p._pool):
     1237            countdown -= 1
     1238            time.sleep(DELTA)
     1239        finalworkerpids = [w.pid for w in p._pool]
     1240        # All pids should be assigned.  See issue #7805.
     1241        self.assertNotIn(None, origworkerpids)
     1242        self.assertNotIn(None, finalworkerpids)
     1243        # Finally, check that the worker pids have changed
     1244        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
     1245        p.close()
     1246        p.join()
     1247
     1248    def test_pool_worker_lifetime_early_close(self):
     1249        # Issue #10332: closing a pool whose workers have limited lifetimes
     1250        # before all the tasks completed would make join() hang.
     1251        p = multiprocessing.Pool(3, maxtasksperchild=1)
     1252        results = []
     1253        for i in range(6):
     1254            results.append(p.apply_async(sqr, (i, 0.3)))
     1255        p.close()
     1256        p.join()
     1257        # check the results
     1258        for (j, res) in enumerate(results):
     1259            self.assertEqual(res.get(), sqr(j))
     1260
     1261
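
A minimal usage sketch of the maxtasksperchild knob exercised by _TestPoolWorkerLifetime (the numbers are illustrative):

    import multiprocessing

    def square(x):
        return x * x

    if __name__ == '__main__':
        # Each worker process is retired and replaced after 10 tasks, which
        # bounds per-worker lifetime (useful against leaky task code).
        pool = multiprocessing.Pool(processes=3, maxtasksperchild=10)
        print(pool.map(square, range(100))[:5])    # [0, 1, 4, 9, 16]
        pool.close()
        pool.join()
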
    10501262#
    10511263# Test that manager has expected number of shared objects left
     
    10641276        gc.collect()                       # do garbage collection
    10651277        refs = self.manager._number_of_objects()
     1278        debug_info = self.manager._debug_info()
    10661279        if refs != EXPECTED_NUMBER:
    10671280            print self.manager._debug_info()
     1281            print debug_info
    10681282
    10691283        self.assertEqual(refs, EXPECTED_NUMBER)
     
    11591373    ALLOWED_TYPES = ('manager',)
    11601374
    1161     def _putter(self, address, authkey):
     1375    @classmethod
     1376    def _putter(cls, address, authkey):
    11621377        manager = QueueManager2(
    11631378            address=address, authkey=authkey, serializer=SERIALIZER
     
    11711386
    11721387        manager = QueueManager(
    1173             address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
     1388            address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER
    11741389            )
    11751390        manager.start()
    11761391
    11771392        p = self.Process(target=self._putter, args=(manager.address, authkey))
     1393        p.daemon = True
    11781394        p.start()
    11791395
     
    11971413class _TestManagerRestart(BaseTestCase):
    11981414
    1199     def _putter(self, address, authkey):
     1415    @classmethod
     1416    def _putter(cls, address, authkey):
    12001417        manager = QueueManager(
    12011418            address=address, authkey=authkey, serializer=SERIALIZER)
     
    12061423    def test_rapid_restart(self):
    12071424        authkey = os.urandom(32)
    1208         port = test_support.find_unused_port()
    12091425        manager = QueueManager(
    1210             address=('localhost', port), authkey=authkey, serializer=SERIALIZER)
     1426            address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
     1427        srvr = manager.get_server()
     1428        addr = srvr.address
     1429        # Close the connection.Listener socket which gets opened as a part
     1430        # of manager.get_server(). It's not needed for the test.
     1431        srvr.listener.close()
    12111432        manager.start()
    12121433
    12131434        p = self.Process(target=self._putter, args=(manager.address, authkey))
     1435        p.daemon = True
    12141436        p.start()
    12151437        queue = manager.get_queue()
     
    12181440        manager.shutdown()
    12191441        manager = QueueManager(
    1220             address=('localhost', port), authkey=authkey, serializer=SERIALIZER)
     1442            address=addr, authkey=authkey, serializer=SERIALIZER)
    12211443        manager.start()
    12221444        manager.shutdown()
     
    12321454    ALLOWED_TYPES = ('processes', 'threads')
    12331455
    1234     def _echo(self, conn):
     1456    @classmethod
     1457    def _echo(cls, conn):
    12351458        for msg in iter(conn.recv_bytes, SENTINEL):
    12361459            conn.send_bytes(msg)
     
    12911514
    12921515        conn.send(None)
     1516        time.sleep(.1)
    12931517
    12941518        self.assertEqual(poll(TIMEOUT1), True)
     
    13341558
    13351559        p = self.Process(target=self._echo, args=(child_conn,))
     1560        p.daemon = True
    13361561        p.start()
    13371562        child_conn.close()    # this might complete before child initializes
     
    13771602        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
    13781603
     1604    @classmethod
     1605    def _is_fd_assigned(cls, fd):
     1606        try:
     1607            os.fstat(fd)
     1608        except OSError as e:
     1609            if e.errno == errno.EBADF:
     1610                return False
     1611            raise
     1612        else:
     1613            return True
     1614
     1615    @classmethod
     1616    def _writefd(cls, conn, data, create_dummy_fds=False):
     1617        if create_dummy_fds:
     1618            for i in range(0, 256):
     1619                if not cls._is_fd_assigned(i):
     1620                    os.dup2(conn.fileno(), i)
     1621        fd = reduction.recv_handle(conn)
     1622        if msvcrt:
     1623            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
     1624        os.write(fd, data)
     1625        os.close(fd)
     1626
     1627    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
     1628    def test_fd_transfer(self):
     1629        if self.TYPE != 'processes':
     1630            self.skipTest("only makes sense with processes")
     1631        conn, child_conn = self.Pipe(duplex=True)
     1632
     1633        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
     1634        p.daemon = True
     1635        p.start()
     1636        with open(test_support.TESTFN, "wb") as f:
     1637            fd = f.fileno()
     1638            if msvcrt:
     1639                fd = msvcrt.get_osfhandle(fd)
     1640            reduction.send_handle(conn, fd, p.pid)
     1641        p.join()
     1642        with open(test_support.TESTFN, "rb") as f:
     1643            self.assertEqual(f.read(), b"foo")
     1644
     1645    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
     1646    @unittest.skipIf(sys.platform == "win32",
     1647                     "test semantics don't make sense on Windows")
     1648    @unittest.skipIf(MAXFD <= 256,
     1649                     "largest assignable fd number is too small")
     1650    @unittest.skipUnless(hasattr(os, "dup2"),
     1651                         "test needs os.dup2()")
     1652    def test_large_fd_transfer(self):
     1653        # With fd > 256 (issue #11657)
     1654        if self.TYPE != 'processes':
     1655            self.skipTest("only makes sense with processes")
     1656        conn, child_conn = self.Pipe(duplex=True)
     1657
     1658        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
     1659        p.daemon = True
     1660        p.start()
     1661        with open(test_support.TESTFN, "wb") as f:
     1662            fd = f.fileno()
     1663            for newfd in range(256, MAXFD):
     1664                if not self._is_fd_assigned(newfd):
     1665                    break
     1666            else:
     1667                self.fail("could not find an unassigned large file descriptor")
     1668            os.dup2(fd, newfd)
     1669            try:
     1670                reduction.send_handle(conn, newfd, p.pid)
     1671            finally:
     1672                os.close(newfd)
     1673        p.join()
     1674        with open(test_support.TESTFN, "rb") as f:
     1675            self.assertEqual(f.read(), b"bar")
     1676
     1677    @classmethod
     1678    def _send_data_without_fd(self, conn):
     1679        os.write(conn.fileno(), b"\0")
     1680
     1681    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
     1682    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
     1683    def test_missing_fd_transfer(self):
     1684        # Check that exception is raised when received data is not
     1685        # accompanied by a file descriptor in ancillary data.
     1686        if self.TYPE != 'processes':
     1687            self.skipTest("only makes sense with processes")
     1688        conn, child_conn = self.Pipe(duplex=True)
     1689
     1690        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
     1691        p.daemon = True
     1692        p.start()
     1693        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
     1694        p.join()
     1695
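
A hedged, POSIX-only sketch of the descriptor-passing helpers these tests rely on. It assumes multiprocessing.reduction imported successfully; the file path is illustrative, and Windows would additionally need the msvcrt handle conversion shown in the test.

    import multiprocessing
    import os
    from multiprocessing import reduction

    def child(conn):
        fd = reduction.recv_handle(conn)      # receives a duplicated fd
        os.write(fd, b'written by the child')
        os.close(fd)

    if __name__ == '__main__':
        parent_conn, child_conn = multiprocessing.Pipe(duplex=True)
        p = multiprocessing.Process(target=child, args=(child_conn,))
        p.daemon = True
        p.start()
        with open('fd_demo.out', 'wb') as f:              # illustrative path
            reduction.send_handle(parent_conn, f.fileno(), p.pid)
        p.join()
        with open('fd_demo.out', 'rb') as f:
            print(f.read())                               # written by the child
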
    13791696class _TestListenerClient(BaseTestCase):
    13801697
    13811698    ALLOWED_TYPES = ('processes', 'threads')
    13821699
    1383     def _test(self, address):
    1384         conn = self.connection.Client(address)
     1700    @classmethod
     1701    def _test(cls, address):
     1702        conn = cls.connection.Client(address)
    13851703        conn.send('hello')
    13861704        conn.close()
     
    13961714            p.join()
    13971715            l.close()
     1716
     1717    def test_issue14725(self):
     1718        l = self.connection.Listener()
     1719        p = self.Process(target=self._test, args=(l.address,))
     1720        p.daemon = True
     1721        p.start()
     1722        time.sleep(1)
      1723        # On Windows the client process should by now have connected,
      1724        # written data and closed the pipe handle.  This causes
      1725        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
      1726        # 14725.
     1727        conn = l.accept()
     1728        self.assertEqual(conn.recv(), 'hello')
     1729        conn.close()
     1730        p.join()
     1731        l.close()
     1732
    13981733#
    13991734# Test of sending connection and socket objects between processes
     
    14461781        lconn, lconn0 = self.Pipe()
    14471782        lp = self.Process(target=self._listener, args=(lconn0, families))
     1783        lp.daemon = True
    14481784        lp.start()
    14491785        lconn0.close()
     
    14511787        rconn, rconn0 = self.Pipe()
    14521788        rp = self.Process(target=self._remote, args=(rconn0,))
     1789        rp.daemon = True
    14531790        rp.start()
    14541791        rconn0.close()
     
    15101847        all = []
    15111848        occupied = 0
     1849        heap._lock.acquire()
     1850        self.addCleanup(heap._lock.release)
    15121851        for L in heap._len_to_seq.values():
    15131852            for arena, start, stop in L:
     
    15271866                            (stop == nstart))
    15281867
    1529 #
    1530 #
    1531 #
    1532 
    1533 try:
    1534     from ctypes import Structure, Value, copy, c_int, c_double
    1535 except ImportError:
    1536     Structure = object
    1537     c_int = c_double = None
     1868    def test_free_from_gc(self):
     1869        # Check that freeing of blocks by the garbage collector doesn't deadlock
     1870        # (issue #12352).
     1871        # Make sure the GC is enabled, and set lower collection thresholds to
     1872        # make collections more frequent (and increase the probability of
     1873        # deadlock).
     1874        if not gc.isenabled():
     1875            gc.enable()
     1876            self.addCleanup(gc.disable)
     1877        thresholds = gc.get_threshold()
     1878        self.addCleanup(gc.set_threshold, *thresholds)
     1879        gc.set_threshold(10)
     1880
     1881        # perform numerous block allocations, with cyclic references to make
     1882        # sure objects are collected asynchronously by the gc
     1883        for i in range(5000):
     1884            a = multiprocessing.heap.BufferWrapper(1)
     1885            b = multiprocessing.heap.BufferWrapper(1)
     1886            # circular references
     1887            a.buddy = b
     1888            b.buddy = a
     1889
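
For context on the gc knobs used above, a small reminder sketch of the stdlib API (the threshold value 10 mirrors the test and is otherwise arbitrary):

    import gc

    old_thresholds = gc.get_threshold()   # (threshold0, threshold1, threshold2)
    gc.set_threshold(10)                  # collect generation 0 far more often
    try:
        # Allocation-heavy, cycle-creating work goes here; frequent collections
        # raise the odds of hitting any free-from-GC deadlock.
        pass
    finally:
        gc.set_threshold(*old_thresholds)
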
     1890#
     1891#
     1892#
    15381893
    15391894class _Foo(Structure):
     
    15471902    ALLOWED_TYPES = ('processes',)
    15481903
    1549     def _double(self, x, y, foo, arr, string):
     1904    def setUp(self):
     1905        if not HAS_SHAREDCTYPES:
     1906            self.skipTest("requires multiprocessing.sharedctypes")
     1907
     1908    @classmethod
     1909    def _double(cls, x, y, foo, arr, string):
    15501910        x.value *= 2
    15511911        y.value *= 2
     
    15571917
    15581918    def test_sharedctypes(self, lock=False):
    1559         if c_int is None:
    1560             return
    1561 
    15621919        x = Value('i', 7, lock=lock)
    15631920        y = Value(c_double, 1.0/3.0, lock=lock)
     
    15651922        arr = self.Array('d', range(10), lock=lock)
    15661923        string = self.Array('c', 20, lock=lock)
    1567         string.value = 'hello'
     1924        string.value = latin('hello')
    15681925
    15691926        p = self.Process(target=self._double, args=(x, y, foo, arr, string))
     1927        p.daemon = True
    15701928        p.start()
    15711929        p.join()
     
    15831941
    15841942    def test_copy(self):
    1585         if c_int is None:
    1586             return
    1587 
    15881943        foo = _Foo(2, 5.0)
    15891944        bar = copy(foo)
     
    16011956    ALLOWED_TYPES = ('processes',)
    16021957
    1603     def _test_finalize(self, conn):
     1958    @classmethod
     1959    def _test_finalize(cls, conn):
    16041960        class Foo(object):
    16051961            pass
     
    16321988        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
    16331989
    1634         # call mutliprocessing's cleanup function then exit process without
     1990        # call multiprocessing's cleanup function then exit process without
    16351991        # garbage collecting locals
    16361992        util._exit_function()
     
    16421998
    16431999        p = self.Process(target=self._test_finalize, args=(child_conn,))
     2000        p.daemon = True
    16442001        p.start()
    16452002        p.join()
     
    16572014
    16582015    def test_import(self):
    1659         modules = (
     2016        modules = [
    16602017            'multiprocessing', 'multiprocessing.connection',
    16612018            'multiprocessing.heap', 'multiprocessing.managers',
    16622019            'multiprocessing.pool', 'multiprocessing.process',
    1663             'multiprocessing.reduction', 'multiprocessing.sharedctypes',
    16642020            'multiprocessing.synchronize', 'multiprocessing.util'
    1665             )
     2021            ]
     2022
     2023        if HAS_REDUCTION:
     2024            modules.append('multiprocessing.reduction')
     2025
     2026        if c_int is not None:
     2027            # This module requires _ctypes
     2028            modules.append('multiprocessing.sharedctypes')
    16662029
    16672030        for name in modules:
     
    16912054        logger.setLevel(LOG_LEVEL)
    16922055
    1693     def _test_level(self, conn):
     2056    @classmethod
     2057    def _test_level(cls, conn):
    16942058        logger = multiprocessing.get_logger()
    16952059        conn.send(logger.getEffectiveLevel())
     
    17062070
    17072071        logger.setLevel(LEVEL1)
    1708         self.Process(target=self._test_level, args=(writer,)).start()
     2072        p = self.Process(target=self._test_level, args=(writer,))
     2073        p.daemon = True
     2074        p.start()
    17092075        self.assertEqual(LEVEL1, reader.recv())
    17102076
    17112077        logger.setLevel(logging.NOTSET)
    17122078        root_logger.setLevel(LEVEL2)
    1713         self.Process(target=self._test_level, args=(writer,)).start()
     2079        p = self.Process(target=self._test_level, args=(writer,))
     2080        p.daemon = True
     2081        p.start()
    17142082        self.assertEqual(LEVEL2, reader.recv())
    17152083
     
    17172085        logger.setLevel(level=LOG_LEVEL)
    17182086
     2087
     2088# class _TestLoggingProcessName(BaseTestCase):
     2089#
     2090#     def handle(self, record):
     2091#         assert record.processName == multiprocessing.current_process().name
     2092#         self.__handled = True
     2093#
     2094#     def test_logging(self):
     2095#         handler = logging.Handler()
     2096#         handler.handle = self.handle
     2097#         self.__handled = False
     2098#         # Bypass getLogger() and side-effects
     2099#         logger = logging.getLoggerClass()(
     2100#                 'multiprocessing.test.TestLoggingProcessName')
     2101#         logger.addHandler(handler)
     2102#         logger.propagate = False
     2103#
     2104#         logger.warn('foo')
     2105#         assert self.__handled
     2106
     2107#
     2108# Check that Process.join() retries if os.waitpid() fails with EINTR
     2109#
     2110
     2111class _TestPollEintr(BaseTestCase):
     2112
     2113    ALLOWED_TYPES = ('processes',)
     2114
     2115    @classmethod
     2116    def _killer(cls, pid):
     2117        time.sleep(0.5)
     2118        os.kill(pid, signal.SIGUSR1)
     2119
     2120    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
     2121    def test_poll_eintr(self):
     2122        got_signal = [False]
     2123        def record(*args):
     2124            got_signal[0] = True
     2125        pid = os.getpid()
     2126        oldhandler = signal.signal(signal.SIGUSR1, record)
     2127        try:
     2128            killer = self.Process(target=self._killer, args=(pid,))
     2129            killer.start()
     2130            p = self.Process(target=time.sleep, args=(1,))
     2131            p.start()
     2132            p.join()
     2133            self.assertTrue(got_signal[0])
     2134            self.assertEqual(p.exitcode, 0)
     2135            killer.join()
     2136        finally:
     2137            signal.signal(signal.SIGUSR1, oldhandler)
     2138
    17192139#
    17202140# Test to verify handle verification, see issue 3321
     
    17232143class TestInvalidHandle(unittest.TestCase):
    17242144
     2145    @unittest.skipIf(WIN32, "skipped on Windows")
    17252146    def test_invalid_handles(self):
    1726         if WIN32:
    1727             return
    17282147        conn = _multiprocessing.Connection(44977608)
    17292148        self.assertRaises(IOError, conn.poll)
    17302149        self.assertRaises(IOError, _multiprocessing.Connection, -1)
     2150
    17312151#
    17322152# Functions used to create test cases from the base ones in this module
     
    17452165    result = {}
    17462166    glob = globals()
    1747     Type = type[0].upper() + type[1:]
     2167    Type = type.capitalize()
    17482168
    17492169    for name in glob.keys():
     
    17702190        'Condition', 'Event', 'Value', 'Array', 'RawValue',
    17712191        'RawArray', 'current_process', 'active_children', 'Pipe',
    1772         'connection', 'JoinableQueue'
     2192        'connection', 'JoinableQueue', 'Pool'
    17732193        )))
    17742194
     
    17842204        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
    17852205       'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
    1786         'Namespace', 'JoinableQueue'
     2206        'Namespace', 'JoinableQueue', 'Pool'
    17872207        )))
    17882208
     
    17982218        'Condition', 'Event', 'Value', 'Array', 'current_process',
    17992219        'active_children', 'Pipe', 'connection', 'dict', 'list',
    1800         'Namespace', 'JoinableQueue'
     2220        'Namespace', 'JoinableQueue', 'Pool'
    18012221        )))
    18022222
     
    18342254
    18352255#
     2256# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
     2257#
     2258
     2259def initializer(ns):
     2260    ns.test += 1
     2261
     2262class TestInitializers(unittest.TestCase):
     2263    def setUp(self):
     2264        self.mgr = multiprocessing.Manager()
     2265        self.ns = self.mgr.Namespace()
     2266        self.ns.test = 0
     2267
     2268    def tearDown(self):
     2269        self.mgr.shutdown()
     2270
     2271    def test_manager_initializer(self):
     2272        m = multiprocessing.managers.SyncManager()
     2273        self.assertRaises(TypeError, m.start, 1)
     2274        m.start(initializer, (self.ns,))
     2275        self.assertEqual(self.ns.test, 1)
     2276        m.shutdown()
     2277
     2278    def test_pool_initializer(self):
     2279        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
     2280        p = multiprocessing.Pool(1, initializer, (self.ns,))
     2281        p.close()
     2282        p.join()
     2283        self.assertEqual(self.ns.test, 1)
     2284
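
A short usage sketch of the initializer/initargs hook that TestInitializers covers; the worker and setup functions below are illustrative.

    import logging
    import multiprocessing

    def init_worker(level):
        # Runs once in every worker process before it accepts tasks.
        logging.getLogger().setLevel(level)

    def work(x):
        return x + 1

    if __name__ == '__main__':
        pool = multiprocessing.Pool(2, initializer=init_worker,
                                    initargs=(logging.WARNING,))
        print(pool.map(work, [1, 2, 3]))        # [2, 3, 4]
        pool.close()
        pool.join()
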
     2285#
    18362286# Issue 5155, 5313, 5331: Test process in processes
    18372287# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
    18382288#
    18392289
    1840 def _ThisSubProcess(q):
     2290def _this_sub_process(q):
    18412291    try:
    18422292        item = q.get(block=False)
     
    18442294        pass
    18452295
    1846 def _TestProcess(q):
     2296def _test_process(q):
    18472297    queue = multiprocessing.Queue()
    1848     subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
     2298    subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
     2299    subProc.daemon = True
    18492300    subProc.start()
    18502301    subProc.join()
     
    18822333    def test_queue_in_process(self):
    18832334        queue = multiprocessing.Queue()
    1884         proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
     2335        proc = multiprocessing.Process(target=_test_process, args=(queue,))
    18852336        proc.start()
    18862337        proc.join()
     
    18992350        assert sio.getvalue() == 'foo'
    19002351
    1901 testcases_other = [OtherTest, TestInvalidHandle, TestStdinBadfiledescriptor]
     2352#
     2353# Test interaction with socket timeouts - see Issue #6056
     2354#
     2355
     2356class TestTimeouts(unittest.TestCase):
     2357    @classmethod
     2358    def _test_timeout(cls, child, address):
     2359        time.sleep(1)
     2360        child.send(123)
     2361        child.close()
     2362        conn = multiprocessing.connection.Client(address)
     2363        conn.send(456)
     2364        conn.close()
     2365
     2366    def test_timeout(self):
     2367        old_timeout = socket.getdefaulttimeout()
     2368        try:
     2369            socket.setdefaulttimeout(0.1)
     2370            parent, child = multiprocessing.Pipe(duplex=True)
     2371            l = multiprocessing.connection.Listener(family='AF_INET')
     2372            p = multiprocessing.Process(target=self._test_timeout,
     2373                                        args=(child, l.address))
     2374            p.start()
     2375            child.close()
     2376            self.assertEqual(parent.recv(), 123)
     2377            parent.close()
     2378            conn = l.accept()
     2379            self.assertEqual(conn.recv(), 456)
     2380            conn.close()
     2381            l.close()
     2382            p.join(10)
     2383        finally:
     2384            socket.setdefaulttimeout(old_timeout)
     2385
     2386#
     2387# Test what happens with no "if __name__ == '__main__'"
     2388#
     2389
     2390class TestNoForkBomb(unittest.TestCase):
     2391    def test_noforkbomb(self):
     2392        name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
     2393        if WIN32:
     2394            rc, out, err = test.script_helper.assert_python_failure(name)
     2395            self.assertEqual('', out.decode('ascii'))
     2396            self.assertIn('RuntimeError', err.decode('ascii'))
     2397        else:
     2398            rc, out, err = test.script_helper.assert_python_ok(name)
     2399            self.assertEqual('123', out.decode('ascii').rstrip())
     2400            self.assertEqual('', err.decode('ascii'))
     2401
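
For reference, the guard that mp_fork_bomb.py intentionally omits; a well-formed script looks roughly like this (the worker name is illustrative):

    import multiprocessing

    def worker():
        print(123)

    if __name__ == '__main__':
        # Without this guard, spawn-based platforms (notably Windows) re-import
        # the main module in every child and would fork-bomb; multiprocessing
        # detects that and raises RuntimeError instead.
        p = multiprocessing.Process(target=worker)
        p.start()
        p.join()
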
     2402#
     2403# Issue 12098: check sys.flags of child matches that for parent
     2404#
     2405
     2406class TestFlags(unittest.TestCase):
     2407    @classmethod
     2408    def run_in_grandchild(cls, conn):
     2409        conn.send(tuple(sys.flags))
     2410
     2411    @classmethod
     2412    def run_in_child(cls):
     2413        import json
     2414        r, w = multiprocessing.Pipe(duplex=False)
     2415        p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
     2416        p.start()
     2417        grandchild_flags = r.recv()
     2418        p.join()
     2419        r.close()
     2420        w.close()
     2421        flags = (tuple(sys.flags), grandchild_flags)
     2422        print(json.dumps(flags))
     2423
     2424    def test_flags(self):
     2425        import json, subprocess
     2426        # start child process using unusual flags
     2427        prog = ('from test.test_multiprocessing import TestFlags; ' +
     2428                'TestFlags.run_in_child()')
     2429        data = subprocess.check_output(
     2430            [sys.executable, '-E', '-B', '-O', '-c', prog])
     2431        child_flags, grandchild_flags = json.loads(data.decode('ascii'))
     2432        self.assertEqual(child_flags, grandchild_flags)
     2433
     2434#
     2435# Issue #17555: ForkAwareThreadLock
     2436#
     2437
     2438class TestForkAwareThreadLock(unittest.TestCase):
      2439    # We recursively start processes.  Issue #17555 meant that the
     2440    # after fork registry would get duplicate entries for the same
     2441    # lock.  The size of the registry at generation n was ~2**n.
     2442
     2443    @classmethod
     2444    def child(cls, n, conn):
     2445        if n > 1:
     2446            p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
     2447            p.start()
     2448            p.join()
     2449        else:
     2450            conn.send(len(util._afterfork_registry))
     2451        conn.close()
     2452
     2453    def test_lock(self):
     2454        r, w = multiprocessing.Pipe(False)
     2455        l = util.ForkAwareThreadLock()
     2456        old_size = len(util._afterfork_registry)
     2457        p = multiprocessing.Process(target=self.child, args=(5, w))
     2458        p.start()
     2459        new_size = r.recv()
     2460        p.join()
     2461        self.assertLessEqual(new_size, old_size)
     2462
     2463#
     2464# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
     2465#
     2466
     2467class TestIgnoreEINTR(unittest.TestCase):
     2468
     2469    @classmethod
     2470    def _test_ignore(cls, conn):
     2471        def handler(signum, frame):
     2472            pass
     2473        signal.signal(signal.SIGUSR1, handler)
     2474        conn.send('ready')
     2475        x = conn.recv()
     2476        conn.send(x)
     2477        conn.send_bytes(b'x'*(1024*1024))   # sending 1 MB should block
     2478
     2479    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
     2480    def test_ignore(self):
     2481        conn, child_conn = multiprocessing.Pipe()
     2482        try:
     2483            p = multiprocessing.Process(target=self._test_ignore,
     2484                                        args=(child_conn,))
     2485            p.daemon = True
     2486            p.start()
     2487            child_conn.close()
     2488            self.assertEqual(conn.recv(), 'ready')
     2489            time.sleep(0.1)
     2490            os.kill(p.pid, signal.SIGUSR1)
     2491            time.sleep(0.1)
     2492            conn.send(1234)
     2493            self.assertEqual(conn.recv(), 1234)
     2494            time.sleep(0.1)
     2495            os.kill(p.pid, signal.SIGUSR1)
     2496            self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
     2497            time.sleep(0.1)
     2498            p.join()
     2499        finally:
     2500            conn.close()
     2501
     2502    @classmethod
     2503    def _test_ignore_listener(cls, conn):
     2504        def handler(signum, frame):
     2505            pass
     2506        signal.signal(signal.SIGUSR1, handler)
     2507        l = multiprocessing.connection.Listener()
     2508        conn.send(l.address)
     2509        a = l.accept()
     2510        a.send('welcome')
     2511
     2512    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
     2513    def test_ignore_listener(self):
     2514        conn, child_conn = multiprocessing.Pipe()
     2515        try:
     2516            p = multiprocessing.Process(target=self._test_ignore_listener,
     2517                                        args=(child_conn,))
     2518            p.daemon = True
     2519            p.start()
     2520            child_conn.close()
     2521            address = conn.recv()
     2522            time.sleep(0.1)
     2523            os.kill(p.pid, signal.SIGUSR1)
     2524            time.sleep(0.1)
     2525            client = multiprocessing.connection.Client(address)
     2526            self.assertEqual(client.recv(), 'welcome')
     2527            p.join()
     2528        finally:
     2529            conn.close()
     2530
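
The behaviour verified here is the classic retry-on-EINTR loop; a generic sketch of the pattern follows (this is not the library's internal code, just the idea being tested):

    import errno
    import os

    def read_retrying(fd, size):
        # Restart the blocking call if a signal handler interrupted it, which
        # is what issue #17097 expects recv(), send() and accept() to do.
        while True:
            try:
                return os.read(fd, size)
            except OSError as e:
                if e.errno != errno.EINTR:
                    raise
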
     2531#
     2532#
     2533#
     2534
     2535testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
     2536                   TestStdinBadfiledescriptor, TestTimeouts, TestNoForkBomb,
     2537                   TestFlags, TestForkAwareThreadLock, TestIgnoreEINTR]
    19022538
    19032539#
     
    19102546            lock = multiprocessing.RLock()
    19112547        except OSError:
    1912             from test.test_support import TestSkipped
    1913             raise TestSkipped("OSError raises on RLock creation, see issue 3111!")
     2548            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
     2549
     2550    check_enough_semaphores()
    19142551
    19152552    if run is None:
     
    19352572    loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
    19362573    suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
    1937     run(suite)
     2574    # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
     2575    # module during these tests is at least platform dependent and possibly
     2576    # non-deterministic on any given platform. So we don't mind if the listed
     2577    # warnings aren't actually raised.
     2578    with test_support.check_py3k_warnings(
     2579            (".+__(get|set)slice__ has been removed", DeprecationWarning),
     2580            (r"sys.exc_clear\(\) not supported", DeprecationWarning),
     2581            quiet=True):
     2582        run(suite)
    19382583
    19392584    ThreadsMixin.pool.terminate()