1 | #
|
---|
2 | # Module providing various facilities to other parts of the package
|
---|
3 | #
|
---|
4 | # multiprocessing/util.py
|
---|
5 | #
|
---|
6 | # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
|
---|
7 | #
|
---|
8 |
|
---|
9 | import itertools
|
---|
10 | import weakref
|
---|
11 | import atexit
|
---|
12 | import threading # we want threading to install it's
|
---|
13 | # cleanup function before multiprocessing does
|
---|
14 |
|
---|
15 | from multiprocessing.process import current_process, active_children
|
---|
16 |
|
---|
17 | __all__ = [
|
---|
18 | 'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
|
---|
19 | 'log_to_stderr', 'get_temp_dir', 'register_after_fork',
|
---|
20 | 'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
|
---|
21 | 'SUBDEBUG', 'SUBWARNING',
|
---|
22 | ]
|
---|
23 |
|
---|
24 | #
|
---|
25 | # Logging
|
---|
26 | #
|
---|
27 |
|
---|
28 | NOTSET = 0
|
---|
29 | SUBDEBUG = 5
|
---|
30 | DEBUG = 10
|
---|
31 | INFO = 20
|
---|
32 | SUBWARNING = 25
|
---|
33 |
|
---|
34 | LOGGER_NAME = 'multiprocessing'
|
---|
35 | DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'
|
---|
36 |
|
---|
37 | _logger = None
|
---|
38 | _log_to_stderr = False
|
---|
39 |
|
---|
40 | def sub_debug(msg, *args):
|
---|
41 | if _logger:
|
---|
42 | _logger.log(SUBDEBUG, msg, *args)
|
---|
43 |
|
---|
44 | def debug(msg, *args):
|
---|
45 | if _logger:
|
---|
46 | _logger.log(DEBUG, msg, *args)
|
---|
47 |
|
---|
48 | def info(msg, *args):
|
---|
49 | if _logger:
|
---|
50 | _logger.log(INFO, msg, *args)
|
---|
51 |
|
---|
52 | def sub_warning(msg, *args):
|
---|
53 | if _logger:
|
---|
54 | _logger.log(SUBWARNING, msg, *args)
|
---|
55 |
|
---|
56 | def get_logger():
|
---|
57 | '''
|
---|
58 | Returns logger used by multiprocessing
|
---|
59 | '''
|
---|
60 | global _logger
|
---|
61 | import logging, atexit
|
---|
62 |
|
---|
63 | logging._acquireLock()
|
---|
64 | try:
|
---|
65 | if not _logger:
|
---|
66 |
|
---|
67 | _logger = logging.getLogger(LOGGER_NAME)
|
---|
68 | _logger.propagate = 0
|
---|
69 | logging.addLevelName(SUBDEBUG, 'SUBDEBUG')
|
---|
70 | logging.addLevelName(SUBWARNING, 'SUBWARNING')
|
---|
71 |
|
---|
72 | # XXX multiprocessing should cleanup before logging
|
---|
73 | if hasattr(atexit, 'unregister'):
|
---|
74 | atexit.unregister(_exit_function)
|
---|
75 | atexit.register(_exit_function)
|
---|
76 | else:
|
---|
77 | atexit._exithandlers.remove((_exit_function, (), {}))
|
---|
78 | atexit._exithandlers.append((_exit_function, (), {}))
|
---|
79 |
|
---|
80 | finally:
|
---|
81 | logging._releaseLock()
|
---|
82 |
|
---|
83 | return _logger
|
---|
84 |
|
---|
85 | def log_to_stderr(level=None):
|
---|
86 | '''
|
---|
87 | Turn on logging and add a handler which prints to stderr
|
---|
88 | '''
|
---|
89 | global _log_to_stderr
|
---|
90 | import logging
|
---|
91 |
|
---|
92 | logger = get_logger()
|
---|
93 | formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
|
---|
94 | handler = logging.StreamHandler()
|
---|
95 | handler.setFormatter(formatter)
|
---|
96 | logger.addHandler(handler)
|
---|
97 |
|
---|
98 | if level:
|
---|
99 | logger.setLevel(level)
|
---|
100 | _log_to_stderr = True
|
---|
101 | return _logger
|
---|
102 |
|
---|
103 | #
|
---|
104 | # Function returning a temp directory which will be removed on exit
|
---|
105 | #
|
---|
106 |
|
---|
107 | def get_temp_dir():
|
---|
108 | # get name of a temp directory which will be automatically cleaned up
|
---|
109 | if current_process()._tempdir is None:
|
---|
110 | import shutil, tempfile
|
---|
111 | tempdir = tempfile.mkdtemp(prefix='pymp-')
|
---|
112 | info('created temp directory %s', tempdir)
|
---|
113 | Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
|
---|
114 | current_process()._tempdir = tempdir
|
---|
115 | return current_process()._tempdir
|
---|
116 |
|
---|
117 | #
|
---|
118 | # Support for reinitialization of objects when bootstrapping a child process
|
---|
119 | #
|
---|
120 |
|
---|
121 | _afterfork_registry = weakref.WeakValueDictionary()
|
---|
122 | _afterfork_counter = itertools.count()
|
---|
123 |
|
---|
124 | def _run_after_forkers():
|
---|
125 | items = list(_afterfork_registry.items())
|
---|
126 | items.sort()
|
---|
127 | for (index, ident, func), obj in items:
|
---|
128 | try:
|
---|
129 | func(obj)
|
---|
130 | except Exception, e:
|
---|
131 | info('after forker raised exception %s', e)
|
---|
132 |
|
---|
133 | def register_after_fork(obj, func):
|
---|
134 | _afterfork_registry[(_afterfork_counter.next(), id(obj), func)] = obj
|
---|
135 |
|
---|
136 | #
|
---|
137 | # Finalization using weakrefs
|
---|
138 | #
|
---|
139 |
|
---|
140 | _finalizer_registry = {}
|
---|
141 | _finalizer_counter = itertools.count()
|
---|
142 |
|
---|
143 |
|
---|
144 | class Finalize(object):
|
---|
145 | '''
|
---|
146 | Class which supports object finalization using weakrefs
|
---|
147 | '''
|
---|
148 | def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
|
---|
149 | assert exitpriority is None or type(exitpriority) is int
|
---|
150 |
|
---|
151 | if obj is not None:
|
---|
152 | self._weakref = weakref.ref(obj, self)
|
---|
153 | else:
|
---|
154 | assert exitpriority is not None
|
---|
155 |
|
---|
156 | self._callback = callback
|
---|
157 | self._args = args
|
---|
158 | self._kwargs = kwargs or {}
|
---|
159 | self._key = (exitpriority, _finalizer_counter.next())
|
---|
160 |
|
---|
161 | _finalizer_registry[self._key] = self
|
---|
162 |
|
---|
163 | def __call__(self, wr=None):
|
---|
164 | '''
|
---|
165 | Run the callback unless it has already been called or cancelled
|
---|
166 | '''
|
---|
167 | try:
|
---|
168 | del _finalizer_registry[self._key]
|
---|
169 | except KeyError:
|
---|
170 | sub_debug('finalizer no longer registered')
|
---|
171 | else:
|
---|
172 | sub_debug('finalizer calling %s with args %s and kwargs %s',
|
---|
173 | self._callback, self._args, self._kwargs)
|
---|
174 | res = self._callback(*self._args, **self._kwargs)
|
---|
175 | self._weakref = self._callback = self._args = \
|
---|
176 | self._kwargs = self._key = None
|
---|
177 | return res
|
---|
178 |
|
---|
179 | def cancel(self):
|
---|
180 | '''
|
---|
181 | Cancel finalization of the object
|
---|
182 | '''
|
---|
183 | try:
|
---|
184 | del _finalizer_registry[self._key]
|
---|
185 | except KeyError:
|
---|
186 | pass
|
---|
187 | else:
|
---|
188 | self._weakref = self._callback = self._args = \
|
---|
189 | self._kwargs = self._key = None
|
---|
190 |
|
---|
191 | def still_active(self):
|
---|
192 | '''
|
---|
193 | Return whether this finalizer is still waiting to invoke callback
|
---|
194 | '''
|
---|
195 | return self._key in _finalizer_registry
|
---|
196 |
|
---|
197 | def __repr__(self):
|
---|
198 | try:
|
---|
199 | obj = self._weakref()
|
---|
200 | except (AttributeError, TypeError):
|
---|
201 | obj = None
|
---|
202 |
|
---|
203 | if obj is None:
|
---|
204 | return '<Finalize object, dead>'
|
---|
205 |
|
---|
206 | x = '<Finalize object, callback=%s' % \
|
---|
207 | getattr(self._callback, '__name__', self._callback)
|
---|
208 | if self._args:
|
---|
209 | x += ', args=' + str(self._args)
|
---|
210 | if self._kwargs:
|
---|
211 | x += ', kwargs=' + str(self._kwargs)
|
---|
212 | if self._key[0] is not None:
|
---|
213 | x += ', exitprority=' + str(self._key[0])
|
---|
214 | return x + '>'
|
---|
215 |
|
---|
216 |
|
---|
217 | def _run_finalizers(minpriority=None):
|
---|
218 | '''
|
---|
219 | Run all finalizers whose exit priority is not None and at least minpriority
|
---|
220 |
|
---|
221 | Finalizers with highest priority are called first; finalizers with
|
---|
222 | the same priority will be called in reverse order of creation.
|
---|
223 | '''
|
---|
224 | if minpriority is None:
|
---|
225 | f = lambda p : p[0][0] is not None
|
---|
226 | else:
|
---|
227 | f = lambda p : p[0][0] is not None and p[0][0] >= minpriority
|
---|
228 |
|
---|
229 | items = [x for x in _finalizer_registry.items() if f(x)]
|
---|
230 | items.sort(reverse=True)
|
---|
231 |
|
---|
232 | for key, finalizer in items:
|
---|
233 | sub_debug('calling %s', finalizer)
|
---|
234 | try:
|
---|
235 | finalizer()
|
---|
236 | except Exception:
|
---|
237 | import traceback
|
---|
238 | traceback.print_exc()
|
---|
239 |
|
---|
240 | if minpriority is None:
|
---|
241 | _finalizer_registry.clear()
|
---|
242 |
|
---|
243 | #
|
---|
244 | # Clean up on exit
|
---|
245 | #
|
---|
246 |
|
---|
247 | def is_exiting():
|
---|
248 | '''
|
---|
249 | Returns true if the process is shutting down
|
---|
250 | '''
|
---|
251 | return _exiting or _exiting is None
|
---|
252 |
|
---|
253 | _exiting = False
|
---|
254 |
|
---|
255 | def _exit_function():
|
---|
256 | global _exiting
|
---|
257 |
|
---|
258 | info('process shutting down')
|
---|
259 | debug('running all "atexit" finalizers with priority >= 0')
|
---|
260 | _run_finalizers(0)
|
---|
261 |
|
---|
262 | for p in active_children():
|
---|
263 | if p._daemonic:
|
---|
264 | info('calling terminate() for daemon %s', p.name)
|
---|
265 | p._popen.terminate()
|
---|
266 |
|
---|
267 | for p in active_children():
|
---|
268 | info('calling join() for process %s', p.name)
|
---|
269 | p.join()
|
---|
270 |
|
---|
271 | debug('running the remaining "atexit" finalizers')
|
---|
272 | _run_finalizers()
|
---|
273 |
|
---|
274 | atexit.register(_exit_function)
|
---|
275 |
|
---|
276 | #
|
---|
277 | # Some fork aware types
|
---|
278 | #
|
---|
279 |
|
---|
280 | class ForkAwareThreadLock(object):
|
---|
281 | def __init__(self):
|
---|
282 | self._lock = threading.Lock()
|
---|
283 | self.acquire = self._lock.acquire
|
---|
284 | self.release = self._lock.release
|
---|
285 | register_after_fork(self, ForkAwareThreadLock.__init__)
|
---|
286 |
|
---|
287 | class ForkAwareLocal(threading.local):
|
---|
288 | def __init__(self):
|
---|
289 | register_after_fork(self, lambda obj : obj.__dict__.clear())
|
---|
290 | def __reduce__(self):
|
---|
291 | return type(self), ()
|
---|