| 1 | # A parallelized "find(1)" using the thread module.
|
|---|
| 2 |
|
|---|
| 3 | # This demonstrates the use of a work queue and worker threads.
|
|---|
| 4 | # It really does do more stats/sec when using multiple threads,
|
|---|
| 5 | # although the improvement is only about 20-30 percent.
|
|---|
| 6 | # (That was 8 years ago. In 2002, on Linux, I can't measure
|
|---|
| 7 | # a speedup. :-( )
|
|---|
| 8 |
|
|---|
| 9 | # I'm too lazy to write a command line parser for the full find(1)
|
|---|
| 10 | # command line syntax, so the predicate it searches for is wired-in,
|
|---|
| 11 | # see function selector() below. (It currently searches for files with
|
|---|
| 12 | # world write permission.)
|
|---|
| 13 |
|
|---|
| 14 | # Usage: parfind.py [-w nworkers] [directory] ...
|
|---|
| 15 | # Default nworkers is 4
|
|---|
| 16 |
|
|---|
| 17 |
|
|---|
| 18 | import sys
|
|---|
| 19 | import getopt
|
|---|
| 20 | import string
|
|---|
| 21 | import time
|
|---|
| 22 | import os
|
|---|
| 23 | from stat import *
|
|---|
| 24 | import thread
|
|---|
| 25 |
|
|---|
| 26 |
|
|---|
class WorkQ:
    """A queue of (func, args) jobs drained by a pool of worker threads.

    Usage:
        wq = WorkQ()
        wq.addwork(func, (arg1, arg2, ...))   # one or more calls
        wq.run(nworkers)

    The work is done when wq.run() completes.  The function calls executed
    by the workers may add more work with wq.addwork().  Don't use keyboard
    interrupts: run() does not handle KeyboardInterrupt.
    """

    # Invariants:
    #
    # - busy and work are only modified, and todo is only released, while
    #   mutex is held.  (_getwork() acquires todo *before* mutex; that is
    #   how idle workers wait for work.)
    # - len(work) is the number of jobs ready to be taken
    # - busy is the number of jobs being done
    # - todo is available (released) only when there is work to take, or
    #   when all work is finished and the workers are shutting down; it is
    #   locked while the queue is idle before and after run().

    def __init__(self):
        self.mutex = thread.allocate_lock()  # guards work and busy
        self.todo = thread.allocate_lock()   # idle workers block on this
        self.todo.acquire()                  # no work yet
        self.work = []                       # pending (func, args) jobs, FIFO
        self.busy = 0                        # number of jobs being executed

    def addwork(self, func, args):
        """Queue the call func(*args); safe to call from inside a running job."""
        job = (func, args)
        self.mutex.acquire()
        try:
            self.work.append(job)
            # The queue just went from empty to non-empty: wake one worker.
            # This test must be made while holding mutex.  Checking
            # len(work) after releasing it races with other threads adding
            # or taking work.  It can then either miss the wake-up (all
            # workers hang) or release todo twice (lock error).
            if len(self.work) == 1:
                self.todo.release()
        finally:
            self.mutex.release()

    def _getwork(self):
        """Block until a job is available; return it, or None when all work is done."""
        self.todo.acquire()
        self.mutex.acquire()
        try:
            if self.busy == 0 and not self.work:
                # Everything is finished.  Pass the wake-up on so the next
                # waiting worker sees the same condition and exits too.
                self.todo.release()
                return None
            job = self.work.pop(0)
            self.busy += 1
            # Leave todo available for the next worker if work remains;
            # otherwise keep it locked until addwork() or _donework().
            if self.work:
                self.todo.release()
            return job
        finally:
            self.mutex.release()

    def _donework(self):
        """Mark one job finished; wake the workers if that was the last one."""
        self.mutex.acquire()
        try:
            self.busy -= 1
            if self.busy == 0 and not self.work:
                self.todo.release()
        finally:
            self.mutex.release()

    def _worker(self):
        """Take and run jobs until _getwork() reports that all work is done."""
        time.sleep(0.00001)  # Let other threads run
        while 1:
            job = self._getwork()
            if not job:
                break
            func, args = job
            try:
                func(*args)
            finally:
                # Always account for the job, even if it raised.  Otherwise
                # busy never drops to 0 and every other worker waits forever.
                self._donework()

    def run(self, nworkers):
        """Process all queued work with nworkers threads (this one included).

        Returns once no work is queued and no job is running; returns
        immediately if no work was ever added.  An exception raised by a
        job executed in the calling thread propagates out of run().
        """
        if not self.work:
            return  # Nothing to do
        for _ in range(nworkers - 1):
            thread.start_new_thread(self._worker, ())
        self._worker()
        # Re-lock todo so the queue is back in its idle state.
        # NOTE: helper threads that have not yet woken up to observe the
        # "all done" condition stay blocked on todo after this.  They only
        # run again if more work is added.
        self.todo.acquire()
| 98 |
|
|---|
| 99 |
|
|---|
# Main program

def main():
    """Parse the command line and search the named directories in parallel.

    Usage: parfind.py [-w nworkers] [directory] ...

    nworkers defaults to 4; with no directory arguments the current
    directory is searched.  One find() job per directory is queued on a
    WorkQ and run; the elapsed wall-clock time is written to stderr.
    Invalid options or a non-integer -w value print a usage message to
    stderr and exit with status 2.
    """
    usage = 'Usage: %s [-w nworkers] [directory] ...\n' % sys.argv[0]
    nworkers = 4
    try:
        opts, args = getopt.getopt(sys.argv[1:], 'w:')
    except getopt.GetoptError as msg:
        sys.stderr.write('%s\n%s' % (msg, usage))
        sys.exit(2)
    for opt, arg in opts:
        if opt == '-w':
            try:
                nworkers = int(arg)
            except ValueError:
                sys.stderr.write('-w expects an integer, got %r\n%s' % (arg, usage))
                sys.exit(2)
    if not args:
        args = [os.curdir]

    wq = WorkQ()
    for top in args:
        wq.addwork(find, (top, selector, wq))

    t1 = time.time()
    wq.run(nworkers)
    t2 = time.time()

    sys.stderr.write('Total time %r sec.\n' % (t2 - t1))
| 120 |
|
|---|
| 121 |
|
|---|
# The predicate -- defines what files we look for.
# Feel free to change this to suit your purpose.

def selector(dir, name, fullname, stat):
    """Return True for world-writable entries that are not symbolic links.

    dir, name and fullname identify the entry (unused here, but available
    to alternative predicates).  stat is the stat tuple for fullname; find()
    passes the result of os.lstat(), so a symlink is seen as a link rather
    than as its target.
    """
    mode = stat[ST_MODE]
    # S_IWOTH is the "others may write" permission bit (octal 0002).
    return (mode & S_IWOTH) != 0 and not S_ISLNK(mode)
| 128 |
|
|---|
| 129 |
|
|---|
# The find procedure -- calls wq.addwork() for subdirectories

def find(dir, pred, wq):
    """Scan one directory, report matches, and queue its subdirectories.

    Each entry of dir is os.lstat()ed and passed to
    pred(dir, name, fullname, stat).  Entries for which pred returns true
    are written to stdout, one path per line.  Subdirectories are handed to
    wq.addwork() so another worker can scan them.  Because lstat() is used,
    symlinks to directories are not followed, and mount points are not
    descended into.  Entries that cannot be listed or stat()ed are reported
    on stderr and skipped.
    """
    try:
        names = os.listdir(dir)
    except os.error as msg:
        # Diagnostics go to stderr so stdout carries only matching paths.
        sys.stderr.write('%r : %s\n' % (dir, msg))
        return
    for name in names:
        if name in (os.curdir, os.pardir):
            continue
        fullname = os.path.join(dir, name)
        try:
            stat = os.lstat(fullname)
        except os.error as msg:
            sys.stderr.write('%r : %s\n' % (fullname, msg))
            continue
        if pred(dir, name, fullname, stat):
            # Several worker threads print concurrently.  One write() per
            # line keeps each path and its newline together; separate
            # writes (as the print statement did) could interleave.
            sys.stdout.write(fullname + '\n')
        if S_ISDIR(stat[ST_MODE]) and not os.path.ismount(fullname):
            wq.addwork(find, (fullname, pred, wq))
| 151 |
|
|---|
| 152 |
|
|---|
# Call the main program when run as a script (not when imported).

if __name__ == '__main__':
    main()