Changeset 391 for python/trunk/Lib/multiprocessing
- Timestamp:
- Mar 19, 2014, 11:31:01 PM (11 years ago)
- Location:
- python/trunk
- Files:
-
- 15 edited
Legend:
- Unmodified
- Added
- Removed
-
python/trunk
-
Property svn:mergeinfo
set to
/python/vendor/Python-2.7.6 merged eligible /python/vendor/current merged eligible
-
Property svn:mergeinfo
set to
-
python/trunk/Lib/multiprocessing/__init__.py
r2 r391 10 10 # 11 11 # Try calling `multiprocessing.doc.main()` to read the html 12 # documentation in ina webbrowser.12 # documentation in a webbrowser. 13 13 # 14 14 # … … 39 39 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 40 40 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 41 # SUCH DAMAGE. 41 42 # 42 43 … … 116 117 num = 0 117 118 elif 'bsd' in sys.platform or sys.platform == 'darwin': 119 comm = '/sbin/sysctl -n hw.ncpu' 120 if sys.platform == 'darwin': 121 comm = '/usr' + comm 118 122 try: 119 num = int(os.popen('sysctl -n hw.ncpu').read()) 123 with os.popen(comm) as p: 124 num = int(p.read()) 120 125 except ValueError: 121 126 num = 0 … … 220 225 return JoinableQueue(maxsize) 221 226 222 def Pool(processes=None, initializer=None, initargs=() ):227 def Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None): 223 228 ''' 224 229 Returns a process pool object 225 230 ''' 226 231 from multiprocessing.pool import Pool 227 return Pool(processes, initializer, initargs )232 return Pool(processes, initializer, initargs, maxtasksperchild) 228 233 229 234 def RawValue(typecode_or_type, *args): -
python/trunk/Lib/multiprocessing/connection.py
r2 r391 4 4 # multiprocessing/connection.py 5 5 # 6 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt 6 # Copyright (c) 2006-2008, R Oudkerk 7 # All rights reserved. 8 # 9 # Redistribution and use in source and binary forms, with or without 10 # modification, are permitted provided that the following conditions 11 # are met: 12 # 13 # 1. Redistributions of source code must retain the above copyright 14 # notice, this list of conditions and the following disclaimer. 15 # 2. Redistributions in binary form must reproduce the above copyright 16 # notice, this list of conditions and the following disclaimer in the 17 # documentation and/or other materials provided with the distribution. 18 # 3. Neither the name of author nor the names of any contributors may be 19 # used to endorse or promote products derived from this software 20 # without specific prior written permission. 21 # 22 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND 23 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 24 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 25 # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 26 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 27 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 28 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 29 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 31 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 # SUCH DAMAGE. 7 33 # 8 34 … … 161 187 if duplex: 162 188 s1, s2 = socket.socketpair() 189 s1.setblocking(True) 190 s2.setblocking(True) 163 191 c1 = _multiprocessing.Connection(os.dup(s1.fileno())) 164 192 c2 = _multiprocessing.Connection(os.dup(s2.fileno())) … … 173 201 174 202 else: 175 176 from ._multiprocessing import win32 203 from _multiprocessing import win32 177 204 178 205 def Pipe(duplex=True): … … 224 251 def __init__(self, address, family, backlog=1): 225 252 self._socket = socket.socket(getattr(socket, family)) 226 self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 227 self._socket.bind(address) 228 self._socket.listen(backlog) 229 self._address = self._socket.getsockname() 253 try: 254 self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 255 self._socket.setblocking(True) 256 self._socket.bind(address) 257 self._socket.listen(backlog) 258 self._address = self._socket.getsockname() 259 except socket.error: 260 self._socket.close() 261 raise 230 262 self._family = family 231 263 self._last_accepted = None … … 239 271 240 272 def accept(self): 241 s, self._last_accepted = self._socket.accept() 273 while True: 274 try: 275 s, self._last_accepted = self._socket.accept() 276 except socket.error as e: 277 if e.args[0] != errno.EINTR: 278 raise 279 else: 280 break 281 s.setblocking(True) 242 282 fd = duplicate(s.fileno()) 243 283 conn = _multiprocessing.Connection(fd) … … 255 295 Return a connection object connected to the socket given by `address` 256 296 ''' 257 family = address_type(address) 258 s = socket.socket( getattr(socket, family) ) 297 family = getattr(socket, address_type(address)) 259 298 t = _init_timeout() 260 299 261 300 while 1: 301 s = socket.socket(family) 302 s.setblocking(True) 262 303 try: 263 304 s.connect(address) 264 305 except socket.error, e: 306 s.close() 265 307 if e.args[0] != errno.ECONNREFUSED or _check_timeout(t): 266 308 debug('failed to connect to address %s', address) … … 319 361 win32.ConnectNamedPipe(handle, win32.NULL) 320 362 except WindowsError, e: 321 if e.args[0] != win32.ERROR_PIPE_CONNECTED: 363 # ERROR_NO_DATA can occur if a client has already connected, 364 # written data and then disconnected -- see Issue 14725. 365 if e.args[0] not in (win32.ERROR_PIPE_CONNECTED, 366 win32.ERROR_NO_DATA): 322 367 raise 323 368 return _multiprocessing.PipeConnection(handle) -
python/trunk/Lib/multiprocessing/dummy/__init__.py
r2 r391 4 4 # multiprocessing/dummy/__init__.py 5 5 # 6 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt 6 # Copyright (c) 2006-2008, R Oudkerk 7 # All rights reserved. 8 # 9 # Redistribution and use in source and binary forms, with or without 10 # modification, are permitted provided that the following conditions 11 # are met: 12 # 13 # 1. Redistributions of source code must retain the above copyright 14 # notice, this list of conditions and the following disclaimer. 15 # 2. Redistributions in binary form must reproduce the above copyright 16 # notice, this list of conditions and the following disclaimer in the 17 # documentation and/or other materials provided with the distribution. 18 # 3. Neither the name of author nor the names of any contributors may be 19 # used to endorse or promote products derived from this software 20 # without specific prior written permission. 21 # 22 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND 23 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 24 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 25 # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 26 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 27 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 28 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 29 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 31 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 # SUCH DAMAGE. 7 33 # 8 34 … … 45 71 assert self._parent is current_process() 46 72 self._start_called = True 47 self._parent._children[self] = None 73 if hasattr(self._parent, '_children'): 74 self._parent._children[self] = None 48 75 threading.Thread.start(self) 49 76 -
python/trunk/Lib/multiprocessing/dummy/connection.py
r2 r391 4 4 # multiprocessing/dummy/connection.py 5 5 # 6 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt 6 # Copyright (c) 2006-2008, R Oudkerk 7 # All rights reserved. 8 # 9 # Redistribution and use in source and binary forms, with or without 10 # modification, are permitted provided that the following conditions 11 # are met: 12 # 13 # 1. Redistributions of source code must retain the above copyright 14 # notice, this list of conditions and the following disclaimer. 15 # 2. Redistributions in binary form must reproduce the above copyright 16 # notice, this list of conditions and the following disclaimer in the 17 # documentation and/or other materials provided with the distribution. 18 # 3. Neither the name of author nor the names of any contributors may be 19 # used to endorse or promote products derived from this software 20 # without specific prior written permission. 21 # 22 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND 23 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 24 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 25 # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 26 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 27 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 28 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 29 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 31 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 # SUCH DAMAGE. 7 33 # 8 34 -
python/trunk/Lib/multiprocessing/forking.py
r2 r391 4 4 # multiprocessing/forking.py 5 5 # 6 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt 6 # Copyright (c) 2006-2008, R Oudkerk 7 # All rights reserved. 8 # 9 # Redistribution and use in source and binary forms, with or without 10 # modification, are permitted provided that the following conditions 11 # are met: 12 # 13 # 1. Redistributions of source code must retain the above copyright 14 # notice, this list of conditions and the following disclaimer. 15 # 2. Redistributions in binary form must reproduce the above copyright 16 # notice, this list of conditions and the following disclaimer in the 17 # documentation and/or other materials provided with the distribution. 18 # 3. Neither the name of author nor the names of any contributors may be 19 # used to endorse or promote products derived from this software 20 # without specific prior written permission. 21 # 22 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND 23 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 24 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 25 # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 26 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 27 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 28 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 29 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 31 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 # SUCH DAMAGE. 7 33 # 8 34 … … 10 36 import sys 11 37 import signal 38 import errno 12 39 13 40 from multiprocessing import util, process … … 104 131 def poll(self, flag=os.WNOHANG): 105 132 if self.returncode is None: 106 pid, sts = os.waitpid(self.pid, flag) 133 while True: 134 try: 135 pid, sts = os.waitpid(self.pid, flag) 136 except os.error as e: 137 if e.errno == errno.EINTR: 138 continue 139 # Child process not yet created. See #1731717 140 # e.errno == errno.ECHILD == 10 141 return None 142 else: 143 break 107 144 if pid == self.pid: 108 145 if os.WIFSIGNALED(sts): … … 151 188 import time 152 189 153 from ._multiprocessing import win32, Connection, PipeConnection190 from _multiprocessing import win32, Connection, PipeConnection 154 191 from .util import Finalize 155 192 … … 168 205 TERMINATE = 0x10000 169 206 WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False)) 207 WINSERVICE = sys.executable.lower().endswith("pythonservice.exe") 170 208 171 209 exit = win32.ExitProcess … … 177 215 # 178 216 179 if sys.executable.lower().endswith('pythonservice.exe'):217 if WINSERVICE: 180 218 _python_exe = os.path.join(sys.exec_prefix, 'python.exe') 181 219 else: … … 305 343 Returns prefix of command line used for spawning a child process 306 344 ''' 307 if process.current_process()._identity==() and is_forking(sys.argv):345 if getattr(process.current_process(), '_inheriting', False): 308 346 raise RuntimeError(''' 309 347 Attempt to start a new process before the current process … … 324 362 else: 325 363 prog = 'from multiprocessing.forking import main; main()' 326 return [_python_exe, '-c', prog, '--multiprocessing-fork'] 364 opts = util._args_from_interpreter_flags() 365 return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork'] 327 366 328 367 329 368 def main(): 330 369 ''' 331 Run code specif ed by data received over pipe370 Run code specified by data received over pipe 332 371 ''' 333 372 assert is_forking(sys.argv) … … 367 406 d['log_level'] = _logger.getEffectiveLevel() 368 407 369 if not WINEXE :408 if not WINEXE and not WINSERVICE: 370 409 main_path = getattr(sys.modules['__main__'], '__file__', None) 371 410 if not main_path and sys.argv[0] not in ('', '-c'): -
python/trunk/Lib/multiprocessing/heap.py
r2 r391 4 4 # multiprocessing/heap.py 5 5 # 6 # Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt 6 # Copyright (c) 2006-2008, R Oudkerk 7 # All rights reserved. 8 # 9 # Redistribution and use in source and binary forms, with or without 10 # modification, are permitted provided that the following conditions 11 # are met: 12 # 13 # 1. Redistributions of source code must retain the above copyright 14 # notice, this list of conditions and the following disclaimer. 15 # 2. Redistributions in binary form must reproduce the above copyright 16 # notice, this list of conditions and the following disclaimer in the 17 # documentation and/or other materials provided with the distribution. 18 # 3. Neither the name of author nor the names of any contributors may be 19 # used to endorse or promote products derived from this software 20 # without specific prior written permission. 21 # 22 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND 23 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 24 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 25 # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 26 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 27 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 28 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 29 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 31 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 # SUCH DAMAGE. 7 33 # 8 34 … … 27 53 if sys.platform == 'win32': 28 54 29 from ._multiprocessing import win3255 from _multiprocessing import win32 30 56 31 57 class Arena(object): … … 76 102 self._allocated_blocks = set() 77 103 self._arenas = [] 104 # list of pending blocks to free - see free() comment below 105 self._pending_free_blocks = [] 78 106 79 107 @staticmethod … … 150 178 return start, stop 151 179 180 def _free_pending_blocks(self): 181 # Free all the blocks in the pending list - called with the lock held. 182 while True: 183 try: 184 block = self._pending_free_blocks.pop() 185 except IndexError: 186 break 187 self._allocated_blocks.remove(block) 188 self._free(block) 189 152 190 def free(self, block): 153 191 # free a block returned by malloc() 192 # Since free() can be called asynchronously by the GC, it could happen 193 # that it's called while self._lock is held: in that case, 194 # self._lock.acquire() would deadlock (issue #12352). To avoid that, a 195 # trylock is used instead, and if the lock can't be acquired 196 # immediately, the block is added to a list of blocks to be freed 197 # synchronously sometimes later from malloc() or free(), by calling 198 # _free_pending_blocks() (appending and retrieving from a list is not 199 # strictly thread-safe but under cPython it's atomic thanks to the GIL). 154 200 assert os.getpid() == self._lastpid 155 self._lock.acquire() 156 try: 157 self._allocated_blocks.remove(block) 158 self._free(block) 159 finally: 160 self._lock.release() 201 if not self._lock.acquire(False): 202 # can't acquire the lock right now, add the block to the list of 203 # pending blocks to free 204 self._pending_free_blocks.append(block) 205 else: 206 # we hold the lock 207 try: 208 self._free_pending_blocks() 209 self._allocated_blocks.remove(block) 210 self._free(block) 211 finally: 212 self._lock.release() 161 213 162 214 def malloc(self, size): … … 166 218 self.__init__() # reinitialize after fork 167 219 self._lock.acquire() 220 self._free_pending_blocks() 168 221 try: 169 222 size = self._roundup(max(size,1), self._alignment) -
python/trunk/Lib/multiprocessing/managers.py
r2 r391 5 5 # multiprocessing/managers.py 6 6 # 7 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt 7 # Copyright (c) 2006-2008, R Oudkerk 8 # All rights reserved. 9 # 10 # Redistribution and use in source and binary forms, with or without 11 # modification, are permitted provided that the following conditions 12 # are met: 13 # 14 # 1. Redistributions of source code must retain the above copyright 15 # notice, this list of conditions and the following disclaimer. 16 # 2. Redistributions in binary form must reproduce the above copyright 17 # notice, this list of conditions and the following disclaimer in the 18 # documentation and/or other materials provided with the distribution. 19 # 3. Neither the name of author nor the names of any contributors may be 20 # used to endorse or promote products derived from this software 21 # without specific prior written permission. 22 # 23 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND 24 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 25 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 26 # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 27 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 28 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 29 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 30 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 31 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 32 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 33 # SUCH DAMAGE. 8 34 # 9 35 … … 134 160 135 161 # do authentication later 136 self.listener = Listener(address=address, backlog= 5)162 self.listener = Listener(address=address, backlog=16) 137 163 self.address = self.listener.address 138 164 139 self.id_to_obj = { 0: (None, ())}165 self.id_to_obj = {'0': (None, ())} 140 166 self.id_to_refcount = {} 141 167 self.mutex = threading.RLock() … … 299 325 keys.sort() 300 326 for ident in keys: 301 if ident != 0:327 if ident != '0': 302 328 result.append(' %s: refcount=%s\n %s' % 303 329 (ident, self.id_to_refcount[ident], … … 311 337 Number of shared objects 312 338 ''' 313 return len(self.id_to_obj) - 1 # don't count ident= 0339 return len(self.id_to_obj) - 1 # don't count ident='0' 314 340 315 341 def shutdown(self, c): … … 476 502 self._state.value = State.STARTED 477 503 478 def start(self ):504 def start(self, initializer=None, initargs=()): 479 505 ''' 480 506 Spawn a server process for this manager object 481 507 ''' 482 508 assert self._state.value == State.INITIAL 509 510 if initializer is not None and not hasattr(initializer, '__call__'): 511 raise TypeError('initializer must be a callable') 483 512 484 513 # pipe over which we will retrieve address of server … … 489 518 target=type(self)._run_server, 490 519 args=(self._registry, self._address, self._authkey, 491 self._serializer, writer ),520 self._serializer, writer, initializer, initargs), 492 521 ) 493 522 ident = ':'.join(str(i) for i in self._process._identity) … … 510 539 511 540 @classmethod 512 def _run_server(cls, registry, address, authkey, serializer, writer): 541 def _run_server(cls, registry, address, authkey, serializer, writer, 542 initializer=None, initargs=()): 513 543 ''' 514 544 Create a server, report its address and run it 515 545 ''' 546 if initializer is not None: 547 initializer(*initargs) 548 516 549 # create server 517 550 server = cls._Server(registry, address, authkey, serializer) … … 731 764 exposed, token = result 732 765 proxytype = self._manager._registry[token.typeid][-1] 766 token.address = self._token.address 733 767 proxy = proxytype( 734 768 token, self._serializer, manager=self._manager, -
python/trunk/Lib/multiprocessing/pool.py
r2 r391 4 4 # multiprocessing/pool.py 5 5 # 6 # Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt 6 # Copyright (c) 2006-2008, R Oudkerk 7 # All rights reserved. 8 # 9 # Redistribution and use in source and binary forms, with or without 10 # modification, are permitted provided that the following conditions 11 # are met: 12 # 13 # 1. Redistributions of source code must retain the above copyright 14 # notice, this list of conditions and the following disclaimer. 15 # 2. Redistributions in binary form must reproduce the above copyright 16 # notice, this list of conditions and the following disclaimer in the 17 # documentation and/or other materials provided with the distribution. 18 # 3. Neither the name of author nor the names of any contributors may be 19 # used to endorse or promote products derived from this software 20 # without specific prior written permission. 21 # 22 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND 23 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 24 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 25 # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 26 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 27 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 28 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 29 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 31 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 # SUCH DAMAGE. 7 33 # 8 34 … … 43 69 # 44 70 45 def worker(inqueue, outqueue, initializer=None, initargs=()): 71 class MaybeEncodingError(Exception): 72 """Wraps possible unpickleable errors, so they can be 73 safely sent through the socket.""" 74 75 def __init__(self, exc, value): 76 self.exc = repr(exc) 77 self.value = repr(value) 78 super(MaybeEncodingError, self).__init__(self.exc, self.value) 79 80 def __str__(self): 81 return "Error sending result: '%s'. Reason: '%s'" % (self.value, 82 self.exc) 83 84 def __repr__(self): 85 return "<MaybeEncodingError: %s>" % str(self) 86 87 88 def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None): 89 assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0) 46 90 put = outqueue.put 47 91 get = inqueue.get … … 53 97 initializer(*initargs) 54 98 55 while 1: 99 completed = 0 100 while maxtasks is None or (maxtasks and completed < maxtasks): 56 101 try: 57 102 task = get() … … 69 114 except Exception, e: 70 115 result = (False, e) 71 put((job, i, result)) 116 try: 117 put((job, i, result)) 118 except Exception as e: 119 wrapped = MaybeEncodingError(e, result[1]) 120 debug("Possible encoding error while sending result: %s" % ( 121 wrapped)) 122 put((job, i, (False, wrapped))) 123 completed += 1 124 debug('worker exiting after %d tasks' % completed) 72 125 73 126 # … … 81 134 Process = Process 82 135 83 def __init__(self, processes=None, initializer=None, initargs=()): 136 def __init__(self, processes=None, initializer=None, initargs=(), 137 maxtasksperchild=None): 84 138 self._setup_queues() 85 139 self._taskqueue = Queue.Queue() 86 140 self._cache = {} 87 141 self._state = RUN 142 self._maxtasksperchild = maxtasksperchild 143 self._initializer = initializer 144 self._initargs = initargs 88 145 89 146 if processes is None: … … 92 149 except NotImplementedError: 93 150 processes = 1 94 151 if processes < 1: 152 raise ValueError("Number of processes must be at least 1") 153 154 if initializer is not None and not hasattr(initializer, '__call__'): 155 raise TypeError('initializer must be a callable') 156 157 self._processes = processes 95 158 self._pool = [] 96 for i in range(processes): 97 w = self.Process( 98 target=worker, 99 args=(self._inqueue, self._outqueue, initializer, initargs) 100 ) 101 self._pool.append(w) 102 w.name = w.name.replace('Process', 'PoolWorker') 103 w.daemon = True 104 w.start() 159 self._repopulate_pool() 160 161 self._worker_handler = threading.Thread( 162 target=Pool._handle_workers, 163 args=(self, ) 164 ) 165 self._worker_handler.daemon = True 166 self._worker_handler._state = RUN 167 self._worker_handler.start() 168 105 169 106 170 self._task_handler = threading.Thread( … … 123 187 self, self._terminate_pool, 124 188 args=(self._taskqueue, self._inqueue, self._outqueue, self._pool, 125 self._task_handler, self._result_handler, self._cache), 189 self._worker_handler, self._task_handler, 190 self._result_handler, self._cache), 126 191 exitpriority=15 127 192 ) 193 194 def _join_exited_workers(self): 195 """Cleanup after any worker processes which have exited due to reaching 196 their specified lifetime. Returns True if any workers were cleaned up. 197 """ 198 cleaned = False 199 for i in reversed(range(len(self._pool))): 200 worker = self._pool[i] 201 if worker.exitcode is not None: 202 # worker exited 203 debug('cleaning up worker %d' % i) 204 worker.join() 205 cleaned = True 206 del self._pool[i] 207 return cleaned 208 209 def _repopulate_pool(self): 210 """Bring the number of pool processes up to the specified number, 211 for use after reaping workers which have exited. 212 """ 213 for i in range(self._processes - len(self._pool)): 214 w = self.Process(target=worker, 215 args=(self._inqueue, self._outqueue, 216 self._initializer, 217 self._initargs, self._maxtasksperchild) 218 ) 219 self._pool.append(w) 220 w.name = w.name.replace('Process', 'PoolWorker') 221 w.daemon = True 222 w.start() 223 debug('added worker') 224 225 def _maintain_pool(self): 226 """Clean up any exited workers and start replacements for them. 227 """ 228 if self._join_exited_workers(): 229 self._repopulate_pool() 128 230 129 231 def _setup_queues(self): … … 205 307 if extra: 206 308 chunksize += 1 309 if len(iterable) == 0: 310 chunksize = 0 207 311 208 312 task_batches = Pool._get_tasks(func, iterable, chunksize) … … 211 315 for i, x in enumerate(task_batches)), None)) 212 316 return result 317 318 @staticmethod 319 def _handle_workers(pool): 320 thread = threading.current_thread() 321 322 # Keep maintaining workers until the cache gets drained, unless the pool 323 # is terminated. 324 while thread._state == RUN or (pool._cache and thread._state != TERMINATE): 325 pool._maintain_pool() 326 time.sleep(0.1) 327 # send sentinel to stop workers 328 pool._taskqueue.put(None) 329 debug('worker handler exiting') 213 330 214 331 @staticmethod … … 327 444 if self._state == RUN: 328 445 self._state = CLOSE 329 self._ taskqueue.put(None)446 self._worker_handler._state = CLOSE 330 447 331 448 def terminate(self): 332 449 debug('terminating pool') 333 450 self._state = TERMINATE 451 self._worker_handler._state = TERMINATE 334 452 self._terminate() 335 453 … … 337 455 debug('joining pool') 338 456 assert self._state in (CLOSE, TERMINATE) 457 self._worker_handler.join() 339 458 self._task_handler.join() 340 459 self._result_handler.join() … … 353 472 @classmethod 354 473 def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, 355 task_handler, result_handler, cache):474 worker_handler, task_handler, result_handler, cache): 356 475 # this is guaranteed to only be called once 357 476 debug('finalizing pool') 358 477 478 worker_handler._state = TERMINATE 359 479 task_handler._state = TERMINATE 360 taskqueue.put(None) # sentinel361 480 362 481 debug('helping task handler/workers to finish') … … 368 487 outqueue.put(None) # sentinel 369 488 489 # We must wait for the worker handler to exit before terminating 490 # workers because we don't want workers to be restarted behind our back. 491 debug('joining worker handler') 492 if threading.current_thread() is not worker_handler: 493 worker_handler.join(1e100) 494 495 # Terminate workers which haven't already finished. 370 496 if pool and hasattr(pool[0], 'terminate'): 371 497 debug('terminating workers') 372 498 for p in pool: 373 p.terminate() 499 if p.exitcode is None: 500 p.terminate() 374 501 375 502 debug('joining task handler') 376 task_handler.join(1e100) 503 if threading.current_thread() is not task_handler: 504 task_handler.join(1e100) 377 505 378 506 debug('joining result handler') 379 result_handler.join(1e100) 507 if threading.current_thread() is not result_handler: 508 result_handler.join(1e100) 380 509 381 510 if pool and hasattr(pool[0], 'terminate'): 382 511 debug('joining pool workers') 383 512 for p in pool: 384 p.join() 513 if p.is_alive(): 514 # worker has not yet exited 515 debug('cleaning up worker %d' % p.pid) 516 p.join() 385 517 386 518 # … … 434 566 del self._cache[self._job] 435 567 568 AsyncResult = ApplyResult # create alias -- see #17805 569 436 570 # 437 571 # Class whose instances are returned by `Pool.map_async()` … … 448 582 self._number_left = 0 449 583 self._ready = True 584 del cache[self._job] 450 585 else: 451 586 self._number_left = length//chunksize + bool(length % chunksize) -
python/trunk/Lib/multiprocessing/process.py
r2 r391 4 4 # multiprocessing/process.py 5 5 # 6 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt 6 # Copyright (c) 2006-2008, R Oudkerk 7 # All rights reserved. 8 # 9 # Redistribution and use in source and binary forms, with or without 10 # modification, are permitted provided that the following conditions 11 # are met: 12 # 13 # 1. Redistributions of source code must retain the above copyright 14 # notice, this list of conditions and the following disclaimer. 15 # 2. Redistributions in binary form must reproduce the above copyright 16 # notice, this list of conditions and the following disclaimer in the 17 # documentation and/or other materials provided with the distribution. 18 # 3. Neither the name of author nor the names of any contributors may be 19 # used to endorse or promote products derived from this software 20 # without specific prior written permission. 21 # 22 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND 23 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 24 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 25 # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 26 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 27 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 28 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 29 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 31 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 # SUCH DAMAGE. 7 33 # 8 34 … … 180 206 def ident(self): 181 207 ''' 182 Return i ndentifier (PID) of process or `None` if it has yet to start208 Return identifier (PID) of process or `None` if it has yet to start 183 209 ''' 184 210 if self is _current_process: … … 237 263 if not e.args: 238 264 exitcode = 1 239 elif type(e.args[0]) is int:265 elif isinstance(e.args[0], int): 240 266 exitcode = e.args[0] 241 267 else: 242 sys.stderr.write( e.args[0]+ '\n')268 sys.stderr.write(str(e.args[0]) + '\n') 243 269 sys.stderr.flush() 244 exitcode = 1270 exitcode = 0 if isinstance(e.args[0], str) else 1 245 271 except: 246 272 exitcode = 1 -
python/trunk/Lib/multiprocessing/queues.py
r2 r391 4 4 # multiprocessing/queues.py 5 5 # 6 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt 6 # Copyright (c) 2006-2008, R Oudkerk 7 # All rights reserved. 8 # 9 # Redistribution and use in source and binary forms, with or without 10 # modification, are permitted provided that the following conditions 11 # are met: 12 # 13 # 1. Redistributions of source code must retain the above copyright 14 # notice, this list of conditions and the following disclaimer. 15 # 2. Redistributions in binary form must reproduce the above copyright 16 # notice, this list of conditions and the following disclaimer in the 17 # documentation and/or other materials provided with the distribution. 18 # 3. Neither the name of author nor the names of any contributors may be 19 # used to endorse or promote products derived from this software 20 # without specific prior written permission. 21 # 22 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND 23 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 24 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 25 # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 26 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 27 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 28 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 29 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 31 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 # SUCH DAMAGE. 7 33 # 8 34 … … 101 127 raise Empty 102 128 try: 103 if not self._poll(block and (deadline-time.time()) or 0.0): 129 if block: 130 timeout = deadline - time.time() 131 if timeout < 0 or not self._poll(timeout): 132 raise Empty 133 elif not self._poll(): 104 134 raise Empty 105 135 res = self._recv() … … 163 193 164 194 # On process exit we will wait for data to be flushed to pipe. 165 # 166 # However, if this process created the queue then all 167 # processes which use the queue will be descendants of this 168 # process. Therefore waiting for the queue to be flushed 169 # is pointless once all the child processes have been joined. 170 created_by_this_process = (self._opid == os.getpid()) 171 if not self._joincancelled and not created_by_this_process: 195 if not self._joincancelled: 172 196 self._jointhread = Finalize( 173 197 self._thread, Queue._finalize_join, -
python/trunk/Lib/multiprocessing/reduction.py
r2 r391 5 5 # multiprocessing/reduction.py 6 6 # 7 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt 7 # Copyright (c) 2006-2008, R Oudkerk 8 # All rights reserved. 9 # 10 # Redistribution and use in source and binary forms, with or without 11 # modification, are permitted provided that the following conditions 12 # are met: 13 # 14 # 1. Redistributions of source code must retain the above copyright 15 # notice, this list of conditions and the following disclaimer. 16 # 2. Redistributions in binary form must reproduce the above copyright 17 # notice, this list of conditions and the following disclaimer in the 18 # documentation and/or other materials provided with the distribution. 19 # 3. Neither the name of author nor the names of any contributors may be 20 # used to endorse or promote products derived from this software 21 # without specific prior written permission. 22 # 23 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND 24 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 25 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 26 # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 27 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 28 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 29 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 30 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 31 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 32 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 33 # SUCH DAMAGE. 8 34 # 9 35 … … 35 61 if sys.platform == 'win32': 36 62 import _subprocess 37 from ._multiprocessing import win3263 from _multiprocessing import win32 38 64 39 65 def send_handle(conn, handle, destination_pid): -
python/trunk/Lib/multiprocessing/sharedctypes.py
r2 r391 4 4 # multiprocessing/sharedctypes.py 5 5 # 6 # Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt 6 # Copyright (c) 2006-2008, R Oudkerk 7 # All rights reserved. 8 # 9 # Redistribution and use in source and binary forms, with or without 10 # modification, are permitted provided that the following conditions 11 # are met: 12 # 13 # 1. Redistributions of source code must retain the above copyright 14 # notice, this list of conditions and the following disclaimer. 15 # 2. Redistributions in binary form must reproduce the above copyright 16 # notice, this list of conditions and the following disclaimer in the 17 # documentation and/or other materials provided with the distribution. 18 # 3. Neither the name of author nor the names of any contributors may be 19 # used to endorse or promote products derived from this software 20 # without specific prior written permission. 21 # 22 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND 23 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 24 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 25 # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 26 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 27 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 28 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 29 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 31 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 # SUCH DAMAGE. 7 33 # 8 34 … … 53 79 ''' 54 80 type_ = typecode_to_type.get(typecode_or_type, typecode_or_type) 55 if isinstance(size_or_initializer, int):81 if isinstance(size_or_initializer, (int, long)): 56 82 type_ = type_ * size_or_initializer 57 return _new_value(type_) 83 obj = _new_value(type_) 84 ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj)) 85 return obj 58 86 else: 59 87 type_ = type_ * len(size_or_initializer) -
python/trunk/Lib/multiprocessing/synchronize.py
r2 r391 4 4 # multiprocessing/synchronize.py 5 5 # 6 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt 6 # Copyright (c) 2006-2008, R Oudkerk 7 # All rights reserved. 8 # 9 # Redistribution and use in source and binary forms, with or without 10 # modification, are permitted provided that the following conditions 11 # are met: 12 # 13 # 1. Redistributions of source code must retain the above copyright 14 # notice, this list of conditions and the following disclaimer. 15 # 2. Redistributions in binary form must reproduce the above copyright 16 # notice, this list of conditions and the following disclaimer in the 17 # documentation and/or other materials provided with the distribution. 18 # 3. Neither the name of author nor the names of any contributors may be 19 # used to endorse or promote products derived from this software 20 # without specific prior written permission. 21 # 22 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND 23 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 24 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 25 # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 26 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 27 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 28 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 29 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 31 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 # SUCH DAMAGE. 7 33 # 8 34 … … 59 85 self.acquire = self._semlock.acquire 60 86 self.release = self._semlock.release 61 self.__enter__ = self._semlock.__enter__ 62 self.__exit__ = self._semlock.__exit__ 87 88 def __enter__(self): 89 return self._semlock.__enter__() 90 91 def __exit__(self, *args): 92 return self._semlock.__exit__(*args) 63 93 64 94 def __getstate__(self): … … 182 212 self._make_methods() 183 213 214 def __enter__(self): 215 return self._lock.__enter__() 216 217 def __exit__(self, *args): 218 return self._lock.__exit__(*args) 219 184 220 def _make_methods(self): 185 221 self.acquire = self._lock.acquire 186 222 self.release = self._lock.release 187 self.__enter__ = self._lock.__enter__188 self.__exit__ = self._lock.__exit__189 223 190 224 def __repr__(self): … … 193 227 self._woken_count._semlock._get_value()) 194 228 except Exception: 195 num_waiters = 'unk own'229 num_waiters = 'unknown' 196 230 return '<Condition(%s, %s)>' % (self._lock, num_waiters) 197 231 … … 302 336 else: 303 337 self._cond.wait(timeout) 338 339 if self._flag.acquire(False): 340 self._flag.release() 341 return True 342 return False 304 343 finally: 305 344 self._cond.release() -
python/trunk/Lib/multiprocessing/util.py
r2 r391 4 4 # multiprocessing/util.py 5 5 # 6 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt 6 # Copyright (c) 2006-2008, R Oudkerk 7 # All rights reserved. 8 # 9 # Redistribution and use in source and binary forms, with or without 10 # modification, are permitted provided that the following conditions 11 # are met: 12 # 13 # 1. Redistributions of source code must retain the above copyright 14 # notice, this list of conditions and the following disclaimer. 15 # 2. Redistributions in binary form must reproduce the above copyright 16 # notice, this list of conditions and the following disclaimer in the 17 # documentation and/or other materials provided with the distribution. 18 # 3. Neither the name of author nor the names of any contributors may be 19 # used to endorse or promote products derived from this software 20 # without specific prior written permission. 21 # 22 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND 23 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 24 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 25 # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 26 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 27 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 28 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 29 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 31 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 # SUCH DAMAGE. 7 33 # 8 34 … … 12 38 import threading # we want threading to install it's 13 39 # cleanup function before multiprocessing does 40 from subprocess import _args_from_interpreter_flags 14 41 15 42 from multiprocessing.process import current_process, active_children … … 222 249 the same priority will be called in reverse order of creation. 223 250 ''' 251 if _finalizer_registry is None: 252 # This function may be called after this module's globals are 253 # destroyed. See the _exit_function function in this module for more 254 # notes. 255 return 256 224 257 if minpriority is None: 225 258 f = lambda p : p[0][0] is not None … … 253 286 _exiting = False 254 287 255 def _exit_function(): 288 def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers, 289 active_children=active_children, 290 current_process=current_process): 291 # NB: we hold on to references to functions in the arglist due to the 292 # situation described below, where this function is called after this 293 # module's globals are destroyed. 294 256 295 global _exiting 257 296 … … 260 299 _run_finalizers(0) 261 300 262 for p in active_children(): 263 if p._daemonic: 264 info('calling terminate() for daemon %s', p.name) 265 p._popen.terminate() 266 267 for p in active_children(): 268 info('calling join() for process %s', p.name) 269 p.join() 301 if current_process() is not None: 302 # NB: we check if the current process is None here because if 303 # it's None, any call to ``active_children()`` will throw an 304 # AttributeError (active_children winds up trying to get 305 # attributes from util._current_process). This happens in a 306 # variety of shutdown circumstances that are not well-understood 307 # because module-scope variables are not apparently supposed to 308 # be destroyed until after this function is called. However, 309 # they are indeed destroyed before this function is called. See 310 # issues 9775 and 15881. Also related: 4106, 9205, and 9207. 311 312 for p in active_children(): 313 if p._daemonic: 314 info('calling terminate() for daemon %s', p.name) 315 p._popen.terminate() 316 317 for p in active_children(): 318 info('calling join() for process %s', p.name) 319 p.join() 270 320 271 321 debug('running the remaining "atexit" finalizers') … … 280 330 class ForkAwareThreadLock(object): 281 331 def __init__(self): 332 self._reset() 333 register_after_fork(self, ForkAwareThreadLock._reset) 334 335 def _reset(self): 282 336 self._lock = threading.Lock() 283 337 self.acquire = self._lock.acquire 284 338 self.release = self._lock.release 285 register_after_fork(self, ForkAwareThreadLock.__init__)286 339 287 340 class ForkAwareLocal(threading.local):
Note:
See TracChangeset
for help on using the changeset viewer.