1 | # encoding: utf-8
|
---|
2 | # Thomas Nagy, 2005-2008 (ita)
|
---|
3 |
|
---|
4 | # this replaces the core of Runner.py in waf with a variant that works
|
---|
5 | # on systems with completely broken threading (such as Python 2.5.x on
|
---|
6 | # AIX). For simplicity we enable this when JOBS=1, which is triggered
|
---|
7 | # by the compatibility makefile used for the waf build. That also ensures
|
---|
8 | # this code is tested, as it means it is used in the build farm, and by
|
---|
9 | # anyone using 'make' to build Samba with waf
|
---|
10 |
|
---|
11 | "Execute the tasks"
|
---|
12 |
|
---|
13 | import sys, random, threading
|
---|
14 | try: from Queue import Queue
|
---|
15 | except ImportError: from queue import Queue
|
---|
16 | import Utils, Options
|
---|
17 | from Constants import EXCEPTION, CRASHED, MAXJOBS, ASK_LATER, SKIPPED, SKIP_ME, SUCCESS
|
---|
18 |
|
---|
# Slack added to numjobs in Parallel.refill_task_list(): finished tasks are
# collected while more than numjobs + GAP tasks are still counted as pending.
GAP = 15
|
---|
20 |
|
---|
# Keep a handle on the stock implementation so the wrapper can delegate to it.
run_old = threading.Thread.run


def run(*args, **kwargs):
    """Delegate to the original ``Thread.run`` and report unexpected errors.

    ``KeyboardInterrupt`` and ``SystemExit`` are re-raised unchanged; any
    other exception raised by the original ``run`` is passed to
    ``sys.excepthook`` instead of propagating.
    """
    try:
        run_old(*args, **kwargs)
    except (KeyboardInterrupt, SystemExit):
        raise
    except:
        # deliberate catch-all: forward whatever was raised to the hook
        exc_details = sys.exc_info()
        sys.excepthook(*exc_details)


# Replace Thread.run with the wrapper defined above.
threading.Thread.run = run
|
---|
30 |
|
---|
31 |
|
---|
class TaskConsumer(object):
    """Minimal class that only carries the ``consumers`` attribute."""
    # NOTE(review): nothing in this file reads ``consumers``; presumably it is
    # kept for code elsewhere that expects the attribute - confirm.
    consumers = 1
|
---|
34 |
|
---|
def process(tsk):
    """Execute one task in the calling thread and put it on its master's queue.

    ``tsk.master`` supplies the ``stop`` flag, the ``out`` queue and the
    ``error_handler`` callback. In every path the task ends up on
    ``tsk.master.out`` exactly once.

    - If the master is already stopping, the task is queued without running.
    - Otherwise the task is displayed and run through ``tsk.__class__.stat``
      when that attribute is truthy, or ``tsk.call_run()`` when it is not.
    - An exception while running marks the task ``EXCEPTION``; a truthy
      return value marks it ``CRASHED`` and is stored in ``tsk.err_code``.
    - On a falsy return value ``tsk.post_run()`` is called and the task is
      marked ``SUCCESS`` unless ``post_run`` raises.

    ``master.error_handler(tsk)`` is called whenever the task does not end
    up in the ``SUCCESS`` state.
    """
    m = tsk.master
    if m.stop:
        m.out.put(tsk)
        return

    try:
        tsk.generator.bld.printout(tsk.display())
        if tsk.__class__.stat:
            ret = tsk.__class__.stat(tsk)
        else:
            # actual call to task's run() function
            ret = tsk.call_run()
    except Exception:
        # no name is bound to the exception: the value was never used, and
        # this spelling is valid on both Python 2 and Python 3
        tsk.err_msg = Utils.ex_stack()
        tsk.hasrun = EXCEPTION

        # TODO cleanup
        m.error_handler(tsk)
        m.out.put(tsk)
        return

    if ret:
        tsk.err_code = ret
        tsk.hasrun = CRASHED
    else:
        try:
            tsk.post_run()
        except Utils.WafError:
            # hasrun is left as it was; the check below still reports the
            # task to the error handler unless it is already SUCCESS
            pass
        except Exception:
            tsk.err_msg = Utils.ex_stack()
            tsk.hasrun = EXCEPTION
        else:
            tsk.hasrun = SUCCESS
    if tsk.hasrun != SUCCESS:
        m.error_handler(tsk)

    m.out.put(tsk)
|
---|
72 |
|
---|
class Parallel(object):
    """
    Scheduler that pulls tasks from the task manager and executes them.

    In this variant start() calls process() directly for each runnable task;
    finished tasks are read back from the ``out`` queue by get_out().
    """
    def __init__(self, bld, j=2):
        """Set up the scheduler state.

        bld: object whose ``task_manager`` attribute provides the tasks
        j: stored as ``numjobs`` (number of consumers)
        """

        # number of consumers
        self.numjobs = j

        self.manager = bld.task_manager
        self.manager.current_group = 0

        self.total = self.manager.total()

        # tasks waiting to be processed - IMPORTANT
        self.outstanding = []
        self.maxjobs = MAXJOBS

        # tasks that are waiting for another task to complete
        self.frozen = []

        # tasks returned by the consumers
        self.out = Queue(0)

        self.count = 0 # tasks not in the producer area

        self.processed = 1 # progress indicator

        self.stop = False # error condition to stop the build
        self.error = False # error flag

    def get_next(self):
        "override this method to schedule the tasks in a particular order"
        if not self.outstanding:
            return None
        return self.outstanding.pop(0)

    def postpone(self, tsk):
        "override this method to schedule the tasks in a particular order"
        # the postponed task goes to a randomly chosen end of the frozen list
        # TODO consider using a deque instead
        if random.randint(0, 1):
            self.frozen.insert(0, tsk)
        else:
            self.frozen.append(tsk)

    def refill_task_list(self):
        "called to set the next group of tasks"

        # collect results while too many tasks are still counted as pending
        while self.count > self.numjobs + GAP or self.count >= self.maxjobs:
            self.get_out()

        while not self.outstanding:
            if self.count:
                self.get_out()

            if self.frozen:
                # postponed tasks get another chance
                self.outstanding += self.frozen
                self.frozen = []
            elif not self.count:
                # nothing pending and nothing postponed: ask for the next set
                (jobs, tmp) = self.manager.get_next_set()
                if jobs is not None:
                    self.maxjobs = jobs
                if tmp:
                    self.outstanding += tmp
                # leave the loop even when no tasks were returned
                break

    def get_out(self):
        "the tasks that are put to execute are all collected using get_out"
        ret = self.out.get()
        self.manager.add_finished(ret)
        # a finished task may carry follow-up tasks in 'more_tasks'
        if not self.stop and getattr(ret, 'more_tasks', None):
            self.outstanding += ret.more_tasks
            self.total += len(ret.more_tasks)
        self.count -= 1

    def error_handler(self, tsk):
        "by default, errors make the build stop (not thread safe so be careful)"
        if not Options.options.keep:
            self.stop = True
        self.error = True

    def start(self):
        "execute the tasks"

        while not self.stop:

            self.refill_task_list()

            # consider the next task
            tsk = self.get_next()
            if not tsk:
                if self.count:
                    # tasks may add new ones after they are run
                    continue
                else:
                    # no tasks to run, no tasks running, time to exit
                    break

            if tsk.hasrun:
                # if the task is marked as "run", just skip it
                self.processed += 1
                self.manager.add_finished(tsk)
                continue

            try:
                st = tsk.runnable_status()
            except Exception:
                # no name is bound to the exception: the value was never
                # used, and this spelling is valid on Python 2 and Python 3
                self.processed += 1
                if self.stop and not Options.options.keep:
                    tsk.hasrun = SKIPPED
                    self.manager.add_finished(tsk)
                    continue
                self.error_handler(tsk)
                self.manager.add_finished(tsk)
                tsk.hasrun = EXCEPTION
                tsk.err_msg = Utils.ex_stack()
                continue

            if st == ASK_LATER:
                self.postpone(tsk)
            elif st == SKIP_ME:
                self.processed += 1
                tsk.hasrun = SKIPPED
                self.manager.add_finished(tsk)
            else:
                # run me: the task is executed right here by process()
                tsk.position = (self.processed, self.total)
                self.count += 1
                self.processed += 1
                tsk.master = self

                process(tsk)

        # self.count represents the tasks that have been handed to process()
        # and not yet collected with get_out()
        # collect all the tasks after an error else the message may be incomplete
        while self.error and self.count:
            self.get_out()

        #print loop
        assert (self.count == 0 or self.stop)
|
---|
214 |
|
---|
215 |
|
---|
# enable nothreads: install this module's process() and Parallel into the
# Runner module, replacing the attributes of the same name there
import Runner
Runner.process = process
Runner.Parallel = Parallel
|
---|