Changeset 988 for vendor/current/buildtools/wafsamba/nothreads.py
- Timestamp:
- Nov 24, 2016, 1:14:11 PM (9 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
vendor/current/buildtools/wafsamba/nothreads.py
r740 r988 1 #!/usr/bin/env python2 1 # encoding: utf-8 3 2 # Thomas Nagy, 2005-2008 (ita) … … 12 11 "Execute the tasks" 13 12 14 import sys, random, t ime, threading, traceback, os13 import sys, random, threading 15 14 try: from Queue import Queue 16 15 except ImportError: from queue import Queue 17 import Build, Utils, Logs, Options 18 from Logs import debug, error 19 from Constants import * 16 import Utils, Options 17 from Constants import EXCEPTION, CRASHED, MAXJOBS, ASK_LATER, SKIPPED, SKIP_ME, SUCCESS 20 18 21 19 GAP = 15 … … 23 21 run_old = threading.Thread.run 24 22 def run(*args, **kwargs): 25 26 27 28 29 30 23 try: 24 run_old(*args, **kwargs) 25 except (KeyboardInterrupt, SystemExit): 26 raise 27 except: 28 sys.excepthook(*sys.exc_info()) 31 29 threading.Thread.run = run 32 30 33 31 34 32 class TaskConsumer(object): 35 33 consumers = 1 36 34 37 35 def process(tsk): 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 36 m = tsk.master 37 if m.stop: 38 m.out.put(tsk) 39 return 40 41 try: 42 tsk.generator.bld.printout(tsk.display()) 43 if tsk.__class__.stat: ret = tsk.__class__.stat(tsk) 44 # actual call to task's run() function 45 else: ret = tsk.call_run() 46 except Exception, e: 47 tsk.err_msg = Utils.ex_stack() 48 tsk.hasrun = EXCEPTION 49 50 # TODO cleanup 51 m.error_handler(tsk) 52 m.out.put(tsk) 53 return 54 55 if ret: 56 tsk.err_code = ret 57 tsk.hasrun = CRASHED 58 else: 59 try: 60 tsk.post_run() 61 except Utils.WafError: 62 pass 63 except Exception: 64 tsk.err_msg = Utils.ex_stack() 65 tsk.hasrun = EXCEPTION 66 else: 67 tsk.hasrun = SUCCESS 68 if tsk.hasrun != SUCCESS: 69 m.error_handler(tsk) 70 71 m.out.put(tsk) 74 72 75 73 class Parallel(object): 76 """ 77 keep the consumer threads busy, and avoid consuming cpu cycles 78 when no more tasks can be added (end of the build, etc) 79 """ 80 def __init__(self, bld, j=2): 81 82 # number of consumers 83 self.numjobs = j 84 85 self.manager = bld.task_manager 86 self.manager.current_group = 0 87 88 self.total = self.manager.total() 89 90 # tasks waiting to be processed - IMPORTANT 91 self.outstanding = [] 92 self.maxjobs = MAXJOBS 93 94 # tasks that are awaiting for another task to complete 95 self.frozen = [] 96 97 # tasks returned by the consumers 98 self.out = Queue(0) 99 100 self.count = 0 # tasks not in the producer area 101 102 self.processed = 1 # progress indicator 103 104 self.stop = False # error condition to stop the build 105 self.error = False # error flag 106 107 def get_next(self): 108 "override this method to schedule the tasks in a particular order" 109 if not self.outstanding: 110 return None 111 return self.outstanding.pop(0) 112 113 def postpone(self, tsk): 114 "override this method to schedule the tasks in a particular order" 115 # TODO consider using a deque instead 116 if random.randint(0, 1): 117 self.frozen.insert(0, tsk) 118 else: 119 self.frozen.append(tsk) 120 121 def refill_task_list(self): 122 "called to set the next group of tasks" 123 124 while self.count > self.numjobs + GAP or self.count >= self.maxjobs: 125 self.get_out() 126 127 while not self.outstanding: 128 if self.count: 129 self.get_out() 130 131 if self.frozen: 132 self.outstanding += self.frozen 133 self.frozen = [] 134 elif not self.count: 135 (jobs, tmp) = self.manager.get_next_set() 136 if jobs != None: self.maxjobs = jobs 137 if tmp: self.outstanding += tmp 138 break 139 140 def get_out(self): 141 "the tasks that are put to execute are all collected using get_out" 142 ret = self.out.get() 143 self.manager.add_finished(ret) 144 if not self.stop and getattr(ret, 'more_tasks', None): 145 self.outstanding += ret.more_tasks 146 self.total += len(ret.more_tasks) 147 self.count -= 1 148 149 def error_handler(self, tsk): 150 "by default, errors make the build stop (not thread safe so be careful)" 151 if not Options.options.keep: 152 self.stop = True 153 self.error = True 154 155 def start(self): 156 "execute the tasks" 157 158 while not self.stop: 159 160 self.refill_task_list() 161 162 # consider the next task 163 tsk = self.get_next() 164 if not tsk: 165 if self.count: 166 # tasks may add new ones after they are run 167 continue 168 else: 169 # no tasks to run, no tasks running, time to exit 170 break 171 172 if tsk.hasrun: 173 # if the task is marked as "run", just skip it 174 self.processed += 1 175 self.manager.add_finished(tsk) 176 continue 177 178 try: 179 st = tsk.runnable_status() 180 except Exception, e: 181 self.processed += 1 182 if self.stop and not Options.options.keep: 183 tsk.hasrun = SKIPPED 184 self.manager.add_finished(tsk) 185 continue 186 self.error_handler(tsk) 187 self.manager.add_finished(tsk) 188 tsk.hasrun = EXCEPTION 189 tsk.err_msg = Utils.ex_stack() 190 continue 191 192 if st == ASK_LATER: 193 self.postpone(tsk) 194 elif st == SKIP_ME: 195 self.processed += 1 196 tsk.hasrun = SKIPPED 197 self.manager.add_finished(tsk) 198 else: 199 # run me: put the task in ready queue 200 tsk.position = (self.processed, self.total) 201 self.count += 1 202 self.processed += 1 203 tsk.master = self 204 205 process(tsk) 206 207 # self.count represents the tasks that have been made available to the consumer threads 208 # collect all the tasks after an error else the message may be incomplete 209 while self.error and self.count: 210 self.get_out() 211 212 #print loop 213 assert (self.count == 0 or self.stop) 74 """ 75 keep the consumer threads busy, and avoid consuming cpu cycles 76 when no more tasks can be added (end of the build, etc) 77 """ 78 def __init__(self, bld, j=2): 79 80 # number of consumers 81 self.numjobs = j 82 83 self.manager = bld.task_manager 84 self.manager.current_group = 0 85 86 self.total = self.manager.total() 87 88 # tasks waiting to be processed - IMPORTANT 89 self.outstanding = [] 90 self.maxjobs = MAXJOBS 91 92 # tasks that are awaiting for another task to complete 93 self.frozen = [] 94 95 # tasks returned by the consumers 96 self.out = Queue(0) 97 98 self.count = 0 # tasks not in the producer area 99 100 self.processed = 1 # progress indicator 101 102 self.stop = False # error condition to stop the build 103 self.error = False # error flag 104 105 def get_next(self): 106 "override this method to schedule the tasks in a particular order" 107 if not self.outstanding: 108 return None 109 return self.outstanding.pop(0) 110 111 def postpone(self, tsk): 112 "override this method to schedule the tasks in a particular order" 113 # TODO consider using a deque instead 114 if random.randint(0, 1): 115 self.frozen.insert(0, tsk) 116 else: 117 self.frozen.append(tsk) 118 119 def refill_task_list(self): 120 "called to set the next group of tasks" 121 122 while self.count > self.numjobs + GAP or self.count >= self.maxjobs: 123 self.get_out() 124 125 while not self.outstanding: 126 if self.count: 127 self.get_out() 128 129 if self.frozen: 130 self.outstanding += self.frozen 131 self.frozen = [] 132 elif not self.count: 133 (jobs, tmp) = self.manager.get_next_set() 134 if jobs is not None: 135 self.maxjobs = jobs 136 if tmp: 137 self.outstanding += tmp 138 break 139 140 def get_out(self): 141 "the tasks that are put to execute are all collected using get_out" 142 ret = self.out.get() 143 self.manager.add_finished(ret) 144 if not self.stop and getattr(ret, 'more_tasks', None): 145 self.outstanding += ret.more_tasks 146 self.total += len(ret.more_tasks) 147 self.count -= 1 148 149 def error_handler(self, tsk): 150 "by default, errors make the build stop (not thread safe so be careful)" 151 if not Options.options.keep: 152 self.stop = True 153 self.error = True 154 155 def start(self): 156 "execute the tasks" 157 158 while not self.stop: 159 160 self.refill_task_list() 161 162 # consider the next task 163 tsk = self.get_next() 164 if not tsk: 165 if self.count: 166 # tasks may add new ones after they are run 167 continue 168 else: 169 # no tasks to run, no tasks running, time to exit 170 break 171 172 if tsk.hasrun: 173 # if the task is marked as "run", just skip it 174 self.processed += 1 175 self.manager.add_finished(tsk) 176 continue 177 178 try: 179 st = tsk.runnable_status() 180 except Exception, e: 181 self.processed += 1 182 if self.stop and not Options.options.keep: 183 tsk.hasrun = SKIPPED 184 self.manager.add_finished(tsk) 185 continue 186 self.error_handler(tsk) 187 self.manager.add_finished(tsk) 188 tsk.hasrun = EXCEPTION 189 tsk.err_msg = Utils.ex_stack() 190 continue 191 192 if st == ASK_LATER: 193 self.postpone(tsk) 194 elif st == SKIP_ME: 195 self.processed += 1 196 tsk.hasrun = SKIPPED 197 self.manager.add_finished(tsk) 198 else: 199 # run me: put the task in ready queue 200 tsk.position = (self.processed, self.total) 201 self.count += 1 202 self.processed += 1 203 tsk.master = self 204 205 process(tsk) 206 207 # self.count represents the tasks that have been made available to the consumer threads 208 # collect all the tasks after an error else the message may be incomplete 209 while self.error and self.count: 210 self.get_out() 211 212 #print loop 213 assert (self.count == 0 or self.stop) 214 214 215 215
Note:
See TracChangeset
for help on using the changeset viewer.