Timestamp: Mar 19, 2014, 11:31:01 PM (11 years ago)
Author: dmik
Message: python: Merge vendor 2.7.6 to trunk.
Location: python/trunk
Files: 2 edited
  • python/trunk

  • python/trunk/Lib/multiprocessing/pool.py

--- python/trunk/Lib/multiprocessing/pool.py (r2)
+++ python/trunk/Lib/multiprocessing/pool.py (r391)

 # multiprocessing/pool.py
 #
-# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
+# Copyright (c) 2006-2008, R Oudkerk
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# 1. Redistributions of source code must retain the above copyright
+#    notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+#    notice, this list of conditions and the following disclaimer in the
+#    documentation and/or other materials provided with the distribution.
+# 3. Neither the name of author nor the names of any contributors may be
+#    used to endorse or promote products derived from this software
+#    without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+# SUCH DAMAGE.
 #
 
…
     
 #
 
-def worker(inqueue, outqueue, initializer=None, initargs=()):
+class MaybeEncodingError(Exception):
+    """Wraps possible unpickleable errors, so they can be
+    safely sent through the socket."""
+
+    def __init__(self, exc, value):
+        self.exc = repr(exc)
+        self.value = repr(value)
+        super(MaybeEncodingError, self).__init__(self.exc, self.value)
+
+    def __str__(self):
+        return "Error sending result: '%s'. Reason: '%s'" % (self.value,
+                                                             self.exc)
+
+    def __repr__(self):
+        return "<MaybeEncodingError: %s>" % str(self)
+
+
+def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
+    assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
     put = outqueue.put
     get = inqueue.get
…
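A side note on the new wrapper class, not part of the changeset: it stores repr() strings rather than the live exception and value, so the wrapper itself always pickles cleanly no matter how unpicklable the originals were. A quick illustration (output abbreviated):

    from multiprocessing.pool import MaybeEncodingError

    err = MaybeEncodingError(TypeError('no pickle'), lambda: None)
    print(str(err))    # Error sending result: '<function <lambda> ...>'. Reason: "TypeError('no pickle',)"
    print(repr(err))   # <MaybeEncodingError: Error sending result: ...>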
     
         initializer(*initargs)
 
-    while 1:
+    completed = 0
+    while maxtasks is None or (maxtasks and completed < maxtasks):
         try:
             task = get()
…
     
         except Exception, e:
             result = (False, e)
-        put((job, i, result))
+        try:
+            put((job, i, result))
+        except Exception as e:
+            wrapped = MaybeEncodingError(e, result[1])
+            debug("Possible encoding error while sending result: %s" % (
+                wrapped))
+            put((job, i, (False, wrapped)))
+        completed += 1
+    debug('worker exiting after %d tasks' % completed)
 
 #
…
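The hunk above is what a caller observes when a worker computes a result that cannot be pickled: the worker survives, and the wrapped error comes back through the result queue and is raised by get(). A minimal sketch of that round trip, not part of the changeset (the helper make_closure is hypothetical; a lambda is used only because lambdas cannot be pickled):

    from multiprocessing import Pool

    def make_closure(_):
        return lambda x: x      # unpicklable return value

    if __name__ == '__main__':
        pool = Pool(processes=2)
        try:
            pool.apply(make_closure, (None,))
        except Exception as e:
            # With this change the caller sees a MaybeEncodingError
            # describing the result, instead of the pool hanging.
            print("%s: %s" % (type(e).__name__, e))
        pool.close()
        pool.join()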
     
     Process = Process
 
-    def __init__(self, processes=None, initializer=None, initargs=()):
+    def __init__(self, processes=None, initializer=None, initargs=(),
+                 maxtasksperchild=None):
         self._setup_queues()
         self._taskqueue = Queue.Queue()
         self._cache = {}
         self._state = RUN
+        self._maxtasksperchild = maxtasksperchild
+        self._initializer = initializer
+        self._initargs = initargs
 
         if processes is None:
…
     
             except NotImplementedError:
                 processes = 1
-
+        if processes < 1:
+            raise ValueError("Number of processes must be at least 1")
+
+        if initializer is not None and not hasattr(initializer, '__call__'):
+            raise TypeError('initializer must be a callable')
+
+        self._processes = processes
         self._pool = []
-        for i in range(processes):
-            w = self.Process(
-                target=worker,
-                args=(self._inqueue, self._outqueue, initializer, initargs)
-                )
-            self._pool.append(w)
-            w.name = w.name.replace('Process', 'PoolWorker')
-            w.daemon = True
-            w.start()
+        self._repopulate_pool()
+
+        self._worker_handler = threading.Thread(
+            target=Pool._handle_workers,
+            args=(self, )
+            )
+        self._worker_handler.daemon = True
+        self._worker_handler._state = RUN
+        self._worker_handler.start()
+
 
         self._task_handler = threading.Thread(
…
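This is where the pool gains the maxtasksperchild keyword from upstream 2.7: each worker retires after a fixed number of tasks, and the new worker-handler thread replaces it with a fresh process. A small usage sketch, illustrative only (the helper job is hypothetical; the printed pids will vary):

    from multiprocessing import Pool
    import os

    def job(i):
        return os.getpid()

    if __name__ == '__main__':
        # Each worker exits after two tasks and is replaced, so more
        # than two distinct pids can show up across eight tasks.
        pool = Pool(processes=2, maxtasksperchild=2)
        print(sorted(set(pool.map(job, range(8)))))
        pool.close()
        pool.join()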
     
             self, self._terminate_pool,
             args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
-                  self._task_handler, self._result_handler, self._cache),
+                  self._worker_handler, self._task_handler,
+                  self._result_handler, self._cache),
             exitpriority=15
             )
+
+    def _join_exited_workers(self):
+        """Cleanup after any worker processes which have exited due to reaching
+        their specified lifetime.  Returns True if any workers were cleaned up.
+        """
+        cleaned = False
+        for i in reversed(range(len(self._pool))):
+            worker = self._pool[i]
+            if worker.exitcode is not None:
+                # worker exited
+                debug('cleaning up worker %d' % i)
+                worker.join()
+                cleaned = True
+                del self._pool[i]
+        return cleaned
+
+    def _repopulate_pool(self):
+        """Bring the number of pool processes up to the specified number,
+        for use after reaping workers which have exited.
+        """
+        for i in range(self._processes - len(self._pool)):
+            w = self.Process(target=worker,
+                             args=(self._inqueue, self._outqueue,
+                                   self._initializer,
+                                   self._initargs, self._maxtasksperchild)
+                            )
+            self._pool.append(w)
+            w.name = w.name.replace('Process', 'PoolWorker')
+            w.daemon = True
+            w.start()
+            debug('added worker')
+
+    def _maintain_pool(self):
+        """Clean up any exited workers and start replacements for them.
+        """
+        if self._join_exited_workers():
+            self._repopulate_pool()
 
     def _setup_queues(self):
…
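The reap-and-replace cycle these three methods implement is easy to see in isolation. A stripped-down sketch of the same pattern using bare multiprocessing.Process objects, illustrative only (the real pool runs this cycle from the _handle_workers thread shown further down; the helper short_lived is hypothetical):

    from multiprocessing import Process
    import time

    def short_lived():
        time.sleep(0.1)     # stand-in for a worker that exits after maxtasks

    if __name__ == '__main__':
        pool, size = [], 2
        for _ in range(3):
            # _join_exited_workers: reap anything whose exitcode is set
            for i in reversed(range(len(pool))):
                if pool[i].exitcode is not None:
                    pool[i].join()
                    del pool[i]
            # _repopulate_pool: top the pool back up to the target size
            while len(pool) < size:
                p = Process(target=short_lived)
                p.daemon = True
                p.start()
                pool.append(p)
            time.sleep(0.2)
        for p in pool:
            p.join()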
     
             if extra:
                 chunksize += 1
+        if len(iterable) == 0:
+            chunksize = 0
 
         task_batches = Pool._get_tasks(func, iterable, chunksize)
…
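This small guard appears to pair with the MapResult hunk at the end of the diff (ready immediately, and now dropped from the cache, when chunksize is 0): together they stop a map over an empty iterable from waiting on results that will never arrive. A quick sketch of the edge case, illustrative only:

    from multiprocessing import Pool

    if __name__ == '__main__':
        pool = Pool(processes=2)
        # With an explicit chunksize, an empty iterable could previously
        # leave the MapResult unready forever; the guard forces
        # chunksize to 0 so the result is ready at once.
        print(pool.map(str, [], chunksize=1))   # -> []
        pool.close()
        pool.join()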
     
                               for i, x in enumerate(task_batches)), None))
         return result
+
+    @staticmethod
+    def _handle_workers(pool):
+        thread = threading.current_thread()
+
+        # Keep maintaining workers until the cache gets drained, unless the pool
+        # is terminated.
+        while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
+            pool._maintain_pool()
+            time.sleep(0.1)
+        # send sentinel to stop workers
+        pool._taskqueue.put(None)
+        debug('worker handler exiting')
 
     @staticmethod
…
     
         if self._state == RUN:
             self._state = CLOSE
-            self._taskqueue.put(None)
+            self._worker_handler._state = CLOSE
 
     def terminate(self):
         debug('terminating pool')
         self._state = TERMINATE
+        self._worker_handler._state = TERMINATE
         self._terminate()
 
…
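Note that close() no longer pushes the shutdown sentinel itself; it only flips the handler thread's state, and _handle_workers sends the sentinel once the result cache has drained. Outstanding tasks therefore still complete after close(). A small sketch of that contract (the helper slow_square is hypothetical):

    from multiprocessing import Pool
    import time

    def slow_square(i):
        time.sleep(0.2)
        return i * i

    if __name__ == '__main__':
        pool = Pool(processes=2)
        results = [pool.apply_async(slow_square, (i,)) for i in range(4)]
        pool.close()   # no new tasks, but queued ones keep running
        pool.join()    # waits for the worker handler, then the workers
        print([r.get() for r in results])   # -> [0, 1, 4, 9]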
     
         debug('joining pool')
         assert self._state in (CLOSE, TERMINATE)
+        self._worker_handler.join()
         self._task_handler.join()
         self._result_handler.join()
…
     
     @classmethod
     def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
-                        task_handler, result_handler, cache):
+                        worker_handler, task_handler, result_handler, cache):
         # this is guaranteed to only be called once
         debug('finalizing pool')
 
+        worker_handler._state = TERMINATE
         task_handler._state = TERMINATE
-        taskqueue.put(None)                 # sentinel
 
         debug('helping task handler/workers to finish')
…
     
         outqueue.put(None)                  # sentinel
 
+        # We must wait for the worker handler to exit before terminating
+        # workers because we don't want workers to be restarted behind our back.
+        debug('joining worker handler')
+        if threading.current_thread() is not worker_handler:
+            worker_handler.join(1e100)
+
+        # Terminate workers which haven't already finished.
         if pool and hasattr(pool[0], 'terminate'):
             debug('terminating workers')
             for p in pool:
-                p.terminate()
+                if p.exitcode is None:
+                    p.terminate()
 
         debug('joining task handler')
-        task_handler.join(1e100)
+        if threading.current_thread() is not task_handler:
+            task_handler.join(1e100)
 
         debug('joining result handler')
-        result_handler.join(1e100)
+        if threading.current_thread() is not result_handler:
+            result_handler.join(1e100)
 
         if pool and hasattr(pool[0], 'terminate'):
             debug('joining pool workers')
             for p in pool:
-                p.join()
+                if p.is_alive():
+                    # worker has not yet exited
+                    debug('cleaning up worker %d' % p.pid)
+                    p.join()
 
 #
…
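With the worker handler joined first, shutdown can no longer race against pool repopulation: no fresh worker is started behind the shutdown's back, and only workers still alive get terminated. A hedged sketch of terminate() from the caller's side (the helper sleep_forever is hypothetical):

    from multiprocessing import Pool
    import time

    def sleep_forever(_):
        time.sleep(3600)

    if __name__ == '__main__':
        pool = Pool(processes=2)
        pool.map_async(sleep_forever, range(4))
        time.sleep(0.5)    # let the workers pick the tasks up
        pool.terminate()   # handler thread stops first, live workers are killed
        pool.join()
        print('pool shut down')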
     
         del self._cache[self._job]
 
+AsyncResult = ApplyResult       # create alias -- see #17805
+
 #
 # Class whose instances are returned by `Pool.map_async()`
…
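The alias simply gives the class the name the documentation uses for the object apply_async() hands back (CPython issue #17805). For instance:

    from multiprocessing import Pool
    from multiprocessing.pool import AsyncResult

    if __name__ == '__main__':
        pool = Pool(processes=1)
        res = pool.apply_async(abs, (-42,))
        print(isinstance(res, AsyncResult))   # -> True
        print(res.get())                      # -> 42
        pool.close()
        pool.join()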
     
             self._number_left = 0
             self._ready = True
+            del cache[self._job]
         else:
             self._number_left = length//chunksize + bool(length % chunksize)