Changeset 391 for python/trunk/Lib/multiprocessing/pool.py
Timestamp:
    Mar 19, 2014, 11:31:01 PM
Location:
    python/trunk
Files:
    2 edited
Legend:
    (unprefixed)  unmodified
    +             added
    -             removed
python/trunk

    Property svn:mergeinfo set to:
        /python/vendor/Python-2.7.6 (merged, eligible)
        /python/vendor/current (merged, eligible)
python/trunk/Lib/multiprocessing/pool.py
(diff: r2 → r391)

  # multiprocessing/pool.py
  #
- # Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
+ # Copyright (c) 2006-2008, R Oudkerk
+ # All rights reserved.
+ #
+ # Redistribution and use in source and binary forms, with or without
+ # modification, are permitted provided that the following conditions
+ # are met:
+ #
+ # 1. Redistributions of source code must retain the above copyright
+ #    notice, this list of conditions and the following disclaimer.
+ # 2. Redistributions in binary form must reproduce the above copyright
+ #    notice, this list of conditions and the following disclaimer in the
+ #    documentation and/or other materials provided with the distribution.
+ # 3. Neither the name of author nor the names of any contributors may be
+ #    used to endorse or promote products derived from this software
+ #    without specific prior written permission.
+ #
+ # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
+ # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+ # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ # SUCH DAMAGE.
  #

…

  #

- def worker(inqueue, outqueue, initializer=None, initargs=()):
+ class MaybeEncodingError(Exception):
+     """Wraps possible unpickleable errors, so they can be
+     safely sent through the socket."""
+
+     def __init__(self, exc, value):
+         self.exc = repr(exc)
+         self.value = repr(value)
+         super(MaybeEncodingError, self).__init__(self.exc, self.value)
+
+     def __str__(self):
+         return "Error sending result: '%s'. Reason: '%s'" % (self.value,
+                                                              self.exc)
+
+     def __repr__(self):
+         return "<MaybeEncodingError: %s>" % str(self)
+
+
+ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
+     assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
      put = outqueue.put
      get = inqueue.get

…

          initializer(*initargs)

-     while 1:
+     completed = 0
+     while maxtasks is None or (maxtasks and completed < maxtasks):
          try:
              task = get()

…

          except Exception, e:
              result = (False, e)
-         put((job, i, result))
+         try:
+             put((job, i, result))
+         except Exception as e:
+             wrapped = MaybeEncodingError(e, result[1])
+             debug("Possible encoding error while sending result: %s" % (
+                 wrapped))
+             put((job, i, (False, wrapped)))
+         completed += 1
+     debug('worker exiting after %d tasks' % completed)

  #
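With the new MaybeEncodingError, a task whose return value cannot be pickled no longer leaves the caller waiting forever: the worker catches the failed put(), wraps the error, and sends the wrapper back instead. A minimal sketch of the observable behaviour (assuming Python 2.7 with this changeset applied; the function name is illustrative):

    from multiprocessing import Pool

    def make_closure(x):
        # Lambdas cannot be pickled, so the worker's put() of this result
        # fails and a MaybeEncodingError is sent back in its place.
        return lambda: x

    if __name__ == '__main__':
        pool = Pool(processes=1)
        res = pool.apply_async(make_closure, (1,))
        try:
            res.get(timeout=10)
        except Exception as exc:
            print('worker reported: %r' % exc)   # a MaybeEncodingError
        pool.close()
        pool.join()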
Reason: '%s'" % (self.value, 82 self.exc) 83 84 def __repr__(self): 85 return "<MaybeEncodingError: %s>" % str(self) 86 87 88 def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None): 89 assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0) 46 90 put = outqueue.put 47 91 get = inqueue.get … … 53 97 initializer(*initargs) 54 98 55 while 1: 99 completed = 0 100 while maxtasks is None or (maxtasks and completed < maxtasks): 56 101 try: 57 102 task = get() … … 69 114 except Exception, e: 70 115 result = (False, e) 71 put((job, i, result)) 116 try: 117 put((job, i, result)) 118 except Exception as e: 119 wrapped = MaybeEncodingError(e, result[1]) 120 debug("Possible encoding error while sending result: %s" % ( 121 wrapped)) 122 put((job, i, (False, wrapped))) 123 completed += 1 124 debug('worker exiting after %d tasks' % completed) 72 125 73 126 # … … 81 134 Process = Process 82 135 83 def __init__(self, processes=None, initializer=None, initargs=()): 136 def __init__(self, processes=None, initializer=None, initargs=(), 137 maxtasksperchild=None): 84 138 self._setup_queues() 85 139 self._taskqueue = Queue.Queue() 86 140 self._cache = {} 87 141 self._state = RUN 142 self._maxtasksperchild = maxtasksperchild 143 self._initializer = initializer 144 self._initargs = initargs 88 145 89 146 if processes is None: … … 92 149 except NotImplementedError: 93 150 processes = 1 94 151 if processes < 1: 152 raise ValueError("Number of processes must be at least 1") 153 154 if initializer is not None and not hasattr(initializer, '__call__'): 155 raise TypeError('initializer must be a callable') 156 157 self._processes = processes 95 158 self._pool = [] 96 for i in range(processes): 97 w = self.Process( 98 target=worker, 99 args=(self._inqueue, self._outqueue, initializer, initargs) 100 ) 101 self._pool.append(w) 102 w.name = w.name.replace('Process', 'PoolWorker') 103 w.daemon = True 104 w.start() 159 self._repopulate_pool() 160 161 self._worker_handler = threading.Thread( 162 target=Pool._handle_workers, 163 args=(self, ) 164 ) 165 self._worker_handler.daemon = True 166 self._worker_handler._state = RUN 167 self._worker_handler.start() 168 105 169 106 170 self._task_handler = threading.Thread( … … 123 187 self, self._terminate_pool, 124 188 args=(self._taskqueue, self._inqueue, self._outqueue, self._pool, 125 self._task_handler, self._result_handler, self._cache), 189 self._worker_handler, self._task_handler, 190 self._result_handler, self._cache), 126 191 exitpriority=15 127 192 ) 193 194 def _join_exited_workers(self): 195 """Cleanup after any worker processes which have exited due to reaching 196 their specified lifetime. Returns True if any workers were cleaned up. 197 """ 198 cleaned = False 199 for i in reversed(range(len(self._pool))): 200 worker = self._pool[i] 201 if worker.exitcode is not None: 202 # worker exited 203 debug('cleaning up worker %d' % i) 204 worker.join() 205 cleaned = True 206 del self._pool[i] 207 return cleaned 208 209 def _repopulate_pool(self): 210 """Bring the number of pool processes up to the specified number, 211 for use after reaping workers which have exited. 
212 """ 213 for i in range(self._processes - len(self._pool)): 214 w = self.Process(target=worker, 215 args=(self._inqueue, self._outqueue, 216 self._initializer, 217 self._initargs, self._maxtasksperchild) 218 ) 219 self._pool.append(w) 220 w.name = w.name.replace('Process', 'PoolWorker') 221 w.daemon = True 222 w.start() 223 debug('added worker') 224 225 def _maintain_pool(self): 226 """Clean up any exited workers and start replacements for them. 227 """ 228 if self._join_exited_workers(): 229 self._repopulate_pool() 128 230 129 231 def _setup_queues(self): … … 205 307 if extra: 206 308 chunksize += 1 309 if len(iterable) == 0: 310 chunksize = 0 207 311 208 312 task_batches = Pool._get_tasks(func, iterable, chunksize) … … 211 315 for i, x in enumerate(task_batches)), None)) 212 316 return result 317 318 @staticmethod 319 def _handle_workers(pool): 320 thread = threading.current_thread() 321 322 # Keep maintaining workers until the cache gets drained, unless the pool 323 # is terminated. 324 while thread._state == RUN or (pool._cache and thread._state != TERMINATE): 325 pool._maintain_pool() 326 time.sleep(0.1) 327 # send sentinel to stop workers 328 pool._taskqueue.put(None) 329 debug('worker handler exiting') 213 330 214 331 @staticmethod … … 327 444 if self._state == RUN: 328 445 self._state = CLOSE 329 self._ taskqueue.put(None)446 self._worker_handler._state = CLOSE 330 447 331 448 def terminate(self): 332 449 debug('terminating pool') 333 450 self._state = TERMINATE 451 self._worker_handler._state = TERMINATE 334 452 self._terminate() 335 453 … … 337 455 debug('joining pool') 338 456 assert self._state in (CLOSE, TERMINATE) 457 self._worker_handler.join() 339 458 self._task_handler.join() 340 459 self._result_handler.join() … … 353 472 @classmethod 354 473 def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, 355 task_handler, result_handler, cache):474 worker_handler, task_handler, result_handler, cache): 356 475 # this is guaranteed to only be called once 357 476 debug('finalizing pool') 358 477 478 worker_handler._state = TERMINATE 359 479 task_handler._state = TERMINATE 360 taskqueue.put(None) # sentinel361 480 362 481 debug('helping task handler/workers to finish') … … 368 487 outqueue.put(None) # sentinel 369 488 489 # We must wait for the worker handler to exit before terminating 490 # workers because we don't want workers to be restarted behind our back. 491 debug('joining worker handler') 492 if threading.current_thread() is not worker_handler: 493 worker_handler.join(1e100) 494 495 # Terminate workers which haven't already finished. 
…

      @classmethod
      def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
-                         task_handler, result_handler, cache):
+                         worker_handler, task_handler, result_handler, cache):
          # this is guaranteed to only be called once
          debug('finalizing pool')

+         worker_handler._state = TERMINATE
          task_handler._state = TERMINATE
-         taskqueue.put(None)                # sentinel

          debug('helping task handler/workers to finish')

…

          outqueue.put(None)                # sentinel

+         # We must wait for the worker handler to exit before terminating
+         # workers because we don't want workers to be restarted behind our back.
+         debug('joining worker handler')
+         if threading.current_thread() is not worker_handler:
+             worker_handler.join(1e100)
+
+         # Terminate workers which haven't already finished.
          if pool and hasattr(pool[0], 'terminate'):
              debug('terminating workers')
              for p in pool:
-                 p.terminate()
+                 if p.exitcode is None:
+                     p.terminate()

          debug('joining task handler')
-         task_handler.join(1e100)
+         if threading.current_thread() is not task_handler:
+             task_handler.join(1e100)

          debug('joining result handler')
-         result_handler.join(1e100)
+         if threading.current_thread() is not result_handler:
+             result_handler.join(1e100)

          if pool and hasattr(pool[0], 'terminate'):
              debug('joining pool workers')
              for p in pool:
-                 p.join()
+                 if p.is_alive():
+                     # worker has not yet exited
+                     debug('cleaning up worker %d' % p.pid)
+                     p.join()

  #

…

              del self._cache[self._job]

+ AsyncResult = ApplyResult       # create alias -- see #17805
+
  #
  # Class whose instances are returned by `Pool.map_async()`

…

              self._number_left = 0
              self._ready = True
+             del cache[self._job]
          else:
              self._number_left = length//chunksize + bool(length % chunksize)
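The `AsyncResult = ApplyResult` alias gives the internal result class its documented name, so user code can type-check results from apply_async() or map_async() against multiprocessing.pool.AsyncResult. A short usage sketch (assuming this changeset):

    import multiprocessing.pool
    from multiprocessing import Pool

    if __name__ == '__main__':
        pool = Pool(processes=1)
        res = pool.apply_async(abs, (-1,))
        print(isinstance(res, multiprocessing.pool.AsyncResult))   # True
        print(res.get())                                           # 1
        pool.close()
        pool.join()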