source: vendor/current/buildtools/wafsamba/nothreads.py

Last change on this file was 988, checked in by Silvan Scherrer, 9 years ago

Samba Server: update vendor to version 4.4.3

File size: 6.3 KB
Line 
# encoding: utf-8
# Thomas Nagy, 2005-2008 (ita)

# This replaces the core of Runner.py in waf with a variant that works
# on systems with completely broken threading (such as Python 2.5.x on
# AIX). For simplicity we enable this when JOBS=1, which is triggered
# by the compatibility makefile used for the waf build. That also ensures
# this code is tested, as it means it is used in the build farm, and by
# anyone using 'make' to build Samba with waf.
10
11"Execute the tasks"
12
13import sys, random, threading
14try: from Queue import Queue
15except ImportError: from queue import Queue
16import Utils, Options
17from Constants import EXCEPTION, CRASHED, MAXJOBS, ASK_LATER, SKIPPED, SKIP_ME, SUCCESS
18
# Slack added on top of ``numjobs``: Parallel.refill_task_list() starts
# collecting finished tasks once more than ``numjobs + GAP`` are in flight
# (or once ``maxjobs`` is reached).
GAP = 15

# Keep a handle on the stock implementation so the wrapper can delegate to it.
run_old = threading.Thread.run


def run(*args, **kwargs):
    """Wrapper around the original ``threading.Thread.run``.

    Delegates to ``run_old`` with the same arguments. KeyboardInterrupt and
    SystemExit are re-raised unchanged; any other exception is passed to
    ``sys.excepthook`` instead of propagating.
    """
    try:
        run_old(*args, **kwargs)
    except (KeyboardInterrupt, SystemExit):
        raise
    except:
        # Deliberately catches everything else and reports it via the hook.
        exc_type, exc_value, exc_tb = sys.exc_info()
        sys.excepthook(exc_type, exc_value, exc_tb)


# Install the wrapper for every Thread created from here on.
threading.Thread.run = run
30
31
class TaskConsumer(object):
    """Stub carrying only a ``consumers`` count.

    NOTE(review): nothing in this file reads this class; presumably it is
    kept so code expecting a ``TaskConsumer.consumers`` attribute still
    finds one -- confirm against callers.
    """

    # Fixed consumer count exposed as a class attribute.
    consumers = 1
34
def process(tsk):
    """Run one task in the calling thread and hand it back to its master.

    ``tsk.master`` is the scheduler that queued the task. Whatever happens,
    the task is pushed onto ``tsk.master.out`` exactly once before returning.

    The outcome is recorded on the task itself:

    * ``hasrun = SUCCESS`` when the run returned a false value and
      ``post_run()`` completed without raising;
    * ``hasrun = CRASHED`` plus ``err_code`` when the run returned a true
      value;
    * ``hasrun = EXCEPTION`` plus ``err_msg`` when the run (or ``post_run()``)
      raised an exception.

    ``tsk.master.error_handler(tsk)`` is called for every outcome other than
    SUCCESS. If the master is already stopping, the task is returned
    untouched without being run.
    """
    m = tsk.master
    if m.stop:
        # build is being aborted: do not run, just give the task back
        m.out.put(tsk)
        return

    try:
        tsk.generator.bld.printout(tsk.display())
        if tsk.__class__.stat:
            ret = tsk.__class__.stat(tsk)
        else:
            # actual call to task's run() function
            ret = tsk.call_run()
    except Exception:
        # "except Exception:" (no binding) parses on Python 2.5 and Python 3
        tsk.err_msg = Utils.ex_stack()
        tsk.hasrun = EXCEPTION

        # TODO cleanup
        m.error_handler(tsk)
        m.out.put(tsk)
        return

    if ret:
        tsk.err_code = ret
        tsk.hasrun = CRASHED
    else:
        try:
            tsk.post_run()
        except Utils.WafError:
            # hasrun is left as it was, so the check below still reports
            # the task to the error handler
            pass
        except Exception:
            tsk.err_msg = Utils.ex_stack()
            tsk.hasrun = EXCEPTION
        else:
            tsk.hasrun = SUCCESS
    if tsk.hasrun != SUCCESS:
        m.error_handler(tsk)

    m.out.put(tsk)
72
class Parallel(object):
    """
    Scheduler that pulls tasks from the build's task manager and runs them.

    In this module each runnable task is executed synchronously through
    process(); the finished task comes back on the ``out`` queue and is
    collected with get_out().
    """
    def __init__(self, bld, j=2):
        """Set up the scheduler state.

        bld -- build context; only ``bld.task_manager`` is used here
        j   -- number of jobs, stored as ``numjobs``
        """

        # number of consumers
        self.numjobs = j

        self.manager = bld.task_manager
        self.manager.current_group = 0

        self.total = self.manager.total()

        # tasks waiting to be processed - IMPORTANT
        self.outstanding = []
        self.maxjobs = MAXJOBS

        # tasks that are awaiting for another task to complete
        self.frozen = []

        # tasks returned by the consumers
        self.out = Queue(0)

        self.count = 0 # tasks not in the producer area

        self.processed = 1 # progress indicator

        self.stop = False # error condition to stop the build
        self.error = False # error flag

    def get_next(self):
        "override this method to schedule the tasks in a particular order"
        # returns the oldest outstanding task, or None when there is none
        if not self.outstanding:
            return None
        return self.outstanding.pop(0)

    def postpone(self, tsk):
        "override this method to schedule the tasks in a particular order"
        # the task goes to a randomly chosen end of the frozen list
        # TODO consider using a deque instead
        if random.randint(0, 1):
            self.frozen.insert(0, tsk)
        else:
            self.frozen.append(tsk)

    def refill_task_list(self):
        "called to set the next group of tasks"

        # too many tasks in flight: collect results before adding more
        while self.count > self.numjobs + GAP or self.count >= self.maxjobs:
            self.get_out()

        while not self.outstanding:
            if self.count:
                self.get_out()

            if self.frozen:
                # give the postponed tasks another chance
                self.outstanding += self.frozen
                self.frozen = []
            elif not self.count:
                # nothing pending and nothing in flight: ask for the next set
                (jobs, tmp) = self.manager.get_next_set()
                if jobs is not None:
                    self.maxjobs = jobs
                if tmp:
                    self.outstanding += tmp
                # leave even if the set was empty; start() decides what next
                break

    def get_out(self):
        "the tasks that are put to execute are all collected using get_out"
        ret = self.out.get()
        self.manager.add_finished(ret)
        # a finished task may contribute follow-up tasks via 'more_tasks'
        if not self.stop and getattr(ret, 'more_tasks', None):
            self.outstanding += ret.more_tasks
            self.total += len(ret.more_tasks)
        self.count -= 1

    def error_handler(self, tsk):
        "by default, errors make the build stop (not thread safe so be careful)"
        # with Options.options.keep set, the error is recorded but the
        # build keeps going
        if not Options.options.keep:
            self.stop = True
        self.error = True

    def start(self):
        "execute the tasks"

        while not self.stop:

            self.refill_task_list()

            # consider the next task
            tsk = self.get_next()
            if not tsk:
                if self.count:
                    # tasks may add new ones after they are run
                    continue
                else:
                    # no tasks to run, no tasks running, time to exit
                    break

            if tsk.hasrun:
                # if the task is marked as "run", just skip it
                self.processed += 1
                self.manager.add_finished(tsk)
                continue

            try:
                st = tsk.runnable_status()
            except Exception:
                # "except Exception:" (no binding) parses on Python 2.5
                # and Python 3; the bound name was never used
                self.processed += 1
                if self.stop and not Options.options.keep:
                    tsk.hasrun = SKIPPED
                    self.manager.add_finished(tsk)
                    continue
                self.error_handler(tsk)
                self.manager.add_finished(tsk)
                tsk.hasrun = EXCEPTION
                tsk.err_msg = Utils.ex_stack()
                continue

            if st == ASK_LATER:
                self.postpone(tsk)
            elif st == SKIP_ME:
                self.processed += 1
                tsk.hasrun = SKIPPED
                self.manager.add_finished(tsk)
            else:
                # run me: put the task in ready queue
                tsk.position = (self.processed, self.total)
                self.count += 1
                self.processed += 1
                tsk.master = self

                # executed right here, in the calling thread
                process(tsk)

        # self.count represents the tasks that have been made available to the consumer threads
        # collect all the tasks after an error else the message may be incomplete
        while self.error and self.count:
            self.get_out()

        #print loop
        assert (self.count == 0 or self.stop)
214
215
# enable nothreads: swap waf's Runner entry points for the single-threaded
# implementations defined above
import Runner
Runner.Parallel = Parallel
Runner.process = process
Note: See TracBrowser for help on using the repository browser.