| 1 | """Synchronization metaclass.
|
|---|
| 2 |
|
|---|
| 3 | This metaclass makes it possible to declare synchronized methods.
|
|---|
| 4 |
|
|---|
| 5 | """
|
|---|
| 6 |
|
|---|
| 7 | import thread
|
|---|
| 8 |
|
|---|
| 9 | # First we need to define a reentrant lock.
|
|---|
| 10 | # This is generally useful and should probably be in a standard Python
|
|---|
| 11 | # library module. For now, we in-line it.
|
|---|
| 12 |
|
|---|
| 13 | class Lock:
|
|---|
| 14 |
|
|---|
| 15 | """Reentrant lock.
|
|---|
| 16 |
|
|---|
| 17 | This is a mutex-like object which can be acquired by the same
|
|---|
| 18 | thread more than once. It keeps a reference count of the number
|
|---|
| 19 | of times it has been acquired by the same thread. Each acquire()
|
|---|
| 20 | call must be matched by a release() call and only the last
|
|---|
| 21 | release() call actually releases the lock for acquisition by
|
|---|
| 22 | another thread.
|
|---|
| 23 |
|
|---|
| 24 | The implementation uses two locks internally:
|
|---|
| 25 |
|
|---|
| 26 | __mutex is a short term lock used to protect the instance variables
|
|---|
| 27 | __wait is the lock for which other threads wait
|
|---|
| 28 |
|
|---|
| 29 | A thread intending to acquire both locks should acquire __wait
|
|---|
| 30 | first.
|
|---|
| 31 |
|
|---|
| 32 | The implementation uses two other instance variables, protected by
|
|---|
| 33 | locking __mutex:
|
|---|
| 34 |
|
|---|
| 35 | __tid is the thread ID of the thread that currently has the lock
|
|---|
| 36 | __count is the number of times the current thread has acquired it
|
|---|
| 37 |
|
|---|
| 38 | When the lock is released, __tid is None and __count is zero.
|
|---|
| 39 |
|
|---|
| 40 | """
|
|---|
| 41 |
|
|---|
| 42 | def __init__(self):
|
|---|
| 43 | """Constructor. Initialize all instance variables."""
|
|---|
| 44 | self.__mutex = thread.allocate_lock()
|
|---|
| 45 | self.__wait = thread.allocate_lock()
|
|---|
| 46 | self.__tid = None
|
|---|
| 47 | self.__count = 0
|
|---|
| 48 |
|
|---|
| 49 | def acquire(self, flag=1):
|
|---|
| 50 | """Acquire the lock.
|
|---|
| 51 |
|
|---|
| 52 | If the optional flag argument is false, returns immediately
|
|---|
| 53 | when it cannot acquire the __wait lock without blocking (it
|
|---|
| 54 | may still block for a little while in order to acquire the
|
|---|
| 55 | __mutex lock).
|
|---|
| 56 |
|
|---|
| 57 | The return value is only relevant when the flag argument is
|
|---|
| 58 | false; it is 1 if the lock is acquired, 0 if not.
|
|---|
| 59 |
|
|---|
| 60 | """
|
|---|
| 61 | self.__mutex.acquire()
|
|---|
| 62 | try:
|
|---|
| 63 | if self.__tid == thread.get_ident():
|
|---|
| 64 | self.__count = self.__count + 1
|
|---|
| 65 | return 1
|
|---|
| 66 | finally:
|
|---|
| 67 | self.__mutex.release()
|
|---|
| 68 | locked = self.__wait.acquire(flag)
|
|---|
| 69 | if not flag and not locked:
|
|---|
| 70 | return 0
|
|---|
| 71 | try:
|
|---|
| 72 | self.__mutex.acquire()
|
|---|
| 73 | assert self.__tid == None
|
|---|
| 74 | assert self.__count == 0
|
|---|
| 75 | self.__tid = thread.get_ident()
|
|---|
| 76 | self.__count = 1
|
|---|
| 77 | return 1
|
|---|
| 78 | finally:
|
|---|
| 79 | self.__mutex.release()
|
|---|
| 80 |
|
|---|
| 81 | def release(self):
|
|---|
| 82 | """Release the lock.
|
|---|
| 83 |
|
|---|
| 84 | If this thread doesn't currently have the lock, an assertion
|
|---|
| 85 | error is raised.
|
|---|
| 86 |
|
|---|
| 87 | Only allow another thread to acquire the lock when the count
|
|---|
| 88 | reaches zero after decrementing it.
|
|---|
| 89 |
|
|---|
| 90 | """
|
|---|
| 91 | self.__mutex.acquire()
|
|---|
| 92 | try:
|
|---|
| 93 | assert self.__tid == thread.get_ident()
|
|---|
| 94 | assert self.__count > 0
|
|---|
| 95 | self.__count = self.__count - 1
|
|---|
| 96 | if self.__count == 0:
|
|---|
| 97 | self.__tid = None
|
|---|
| 98 | self.__wait.release()
|
|---|
| 99 | finally:
|
|---|
| 100 | self.__mutex.release()
|
|---|
| 101 |
|
|---|
| 102 |
|
|---|
| 103 | def _testLock():
|
|---|
| 104 |
|
|---|
| 105 | done = []
|
|---|
| 106 |
|
|---|
| 107 | def f2(lock, done=done):
|
|---|
| 108 | lock.acquire()
|
|---|
| 109 | print "f2 running in thread %d\n" % thread.get_ident(),
|
|---|
| 110 | lock.release()
|
|---|
| 111 | done.append(1)
|
|---|
| 112 |
|
|---|
| 113 | def f1(lock, f2=f2, done=done):
|
|---|
| 114 | lock.acquire()
|
|---|
| 115 | print "f1 running in thread %d\n" % thread.get_ident(),
|
|---|
| 116 | try:
|
|---|
| 117 | f2(lock)
|
|---|
| 118 | finally:
|
|---|
| 119 | lock.release()
|
|---|
| 120 | done.append(1)
|
|---|
| 121 |
|
|---|
| 122 | lock = Lock()
|
|---|
| 123 | lock.acquire()
|
|---|
| 124 | f1(lock) # Adds 2 to done
|
|---|
| 125 | lock.release()
|
|---|
| 126 |
|
|---|
| 127 | lock.acquire()
|
|---|
| 128 |
|
|---|
| 129 | thread.start_new_thread(f1, (lock,)) # Adds 2
|
|---|
| 130 | thread.start_new_thread(f1, (lock, f1)) # Adds 3
|
|---|
| 131 | thread.start_new_thread(f2, (lock,)) # Adds 1
|
|---|
| 132 | thread.start_new_thread(f2, (lock,)) # Adds 1
|
|---|
| 133 |
|
|---|
| 134 | lock.release()
|
|---|
| 135 | import time
|
|---|
| 136 | while len(done) < 9:
|
|---|
| 137 | print len(done)
|
|---|
| 138 | time.sleep(0.001)
|
|---|
| 139 | print len(done)
|
|---|
| 140 |
|
|---|
| 141 |
|
|---|
| 142 | # Now, the Locking metaclass is a piece of cake.
|
|---|
| 143 | # As an example feature, methods whose name begins with exactly one
|
|---|
| 144 | # underscore are not synchronized.
|
|---|
| 145 |
|
|---|
| 146 | from Meta import MetaClass, MetaHelper, MetaMethodWrapper
|
|---|
| 147 |
|
|---|
| 148 | class LockingMethodWrapper(MetaMethodWrapper):
|
|---|
| 149 | def __call__(self, *args, **kw):
|
|---|
| 150 | if self.__name__[:1] == '_' and self.__name__[1:] != '_':
|
|---|
| 151 | return apply(self.func, (self.inst,) + args, kw)
|
|---|
| 152 | self.inst.__lock__.acquire()
|
|---|
| 153 | try:
|
|---|
| 154 | return apply(self.func, (self.inst,) + args, kw)
|
|---|
| 155 | finally:
|
|---|
| 156 | self.inst.__lock__.release()
|
|---|
| 157 |
|
|---|
| 158 | class LockingHelper(MetaHelper):
|
|---|
| 159 | __methodwrapper__ = LockingMethodWrapper
|
|---|
| 160 | def __helperinit__(self, formalclass):
|
|---|
| 161 | MetaHelper.__helperinit__(self, formalclass)
|
|---|
| 162 | self.__lock__ = Lock()
|
|---|
| 163 |
|
|---|
| 164 | class LockingMetaClass(MetaClass):
|
|---|
| 165 | __helper__ = LockingHelper
|
|---|
| 166 |
|
|---|
| 167 | Locking = LockingMetaClass('Locking', (), {})
|
|---|
| 168 |
|
|---|
| 169 | def _test():
|
|---|
| 170 | # For kicks, take away the Locking base class and see it die
|
|---|
| 171 | class Buffer(Locking):
|
|---|
| 172 | def __init__(self, initialsize):
|
|---|
| 173 | assert initialsize > 0
|
|---|
| 174 | self.size = initialsize
|
|---|
| 175 | self.buffer = [None]*self.size
|
|---|
| 176 | self.first = self.last = 0
|
|---|
| 177 | def put(self, item):
|
|---|
| 178 | # Do we need to grow the buffer?
|
|---|
| 179 | if (self.last+1) % self.size != self.first:
|
|---|
| 180 | # Insert the new item
|
|---|
| 181 | self.buffer[self.last] = item
|
|---|
| 182 | self.last = (self.last+1) % self.size
|
|---|
| 183 | return
|
|---|
| 184 | # Double the buffer size
|
|---|
| 185 | # First normalize it so that first==0 and last==size-1
|
|---|
| 186 | print "buffer =", self.buffer
|
|---|
| 187 | print "first = %d, last = %d, size = %d" % (
|
|---|
| 188 | self.first, self.last, self.size)
|
|---|
| 189 | if self.first <= self.last:
|
|---|
| 190 | temp = self.buffer[self.first:self.last]
|
|---|
| 191 | else:
|
|---|
| 192 | temp = self.buffer[self.first:] + self.buffer[:self.last]
|
|---|
| 193 | print "temp =", temp
|
|---|
| 194 | self.buffer = temp + [None]*(self.size+1)
|
|---|
| 195 | self.first = 0
|
|---|
| 196 | self.last = self.size-1
|
|---|
| 197 | self.size = self.size*2
|
|---|
| 198 | print "Buffer size doubled to", self.size
|
|---|
| 199 | print "new buffer =", self.buffer
|
|---|
| 200 | print "first = %d, last = %d, size = %d" % (
|
|---|
| 201 | self.first, self.last, self.size)
|
|---|
| 202 | self.put(item) # Recursive call to test the locking
|
|---|
| 203 | def get(self):
|
|---|
| 204 | # Is the buffer empty?
|
|---|
| 205 | if self.first == self.last:
|
|---|
| 206 | raise EOFError # Avoid defining a new exception
|
|---|
| 207 | item = self.buffer[self.first]
|
|---|
| 208 | self.first = (self.first+1) % self.size
|
|---|
| 209 | return item
|
|---|
| 210 |
|
|---|
| 211 | def producer(buffer, wait, n=1000):
|
|---|
| 212 | import time
|
|---|
| 213 | i = 0
|
|---|
| 214 | while i < n:
|
|---|
| 215 | print "put", i
|
|---|
| 216 | buffer.put(i)
|
|---|
| 217 | i = i+1
|
|---|
| 218 | print "Producer: done producing", n, "items"
|
|---|
| 219 | wait.release()
|
|---|
| 220 |
|
|---|
| 221 | def consumer(buffer, wait, n=1000):
|
|---|
| 222 | import time
|
|---|
| 223 | i = 0
|
|---|
| 224 | tout = 0.001
|
|---|
| 225 | while i < n:
|
|---|
| 226 | try:
|
|---|
| 227 | x = buffer.get()
|
|---|
| 228 | if x != i:
|
|---|
| 229 | raise AssertionError, \
|
|---|
| 230 | "get() returned %s, expected %s" % (x, i)
|
|---|
| 231 | print "got", i
|
|---|
| 232 | i = i+1
|
|---|
| 233 | tout = 0.001
|
|---|
| 234 | except EOFError:
|
|---|
| 235 | time.sleep(tout)
|
|---|
| 236 | tout = tout*2
|
|---|
| 237 | print "Consumer: done consuming", n, "items"
|
|---|
| 238 | wait.release()
|
|---|
| 239 |
|
|---|
| 240 | pwait = thread.allocate_lock()
|
|---|
| 241 | pwait.acquire()
|
|---|
| 242 | cwait = thread.allocate_lock()
|
|---|
| 243 | cwait.acquire()
|
|---|
| 244 | buffer = Buffer(1)
|
|---|
| 245 | n = 1000
|
|---|
| 246 | thread.start_new_thread(consumer, (buffer, cwait, n))
|
|---|
| 247 | thread.start_new_thread(producer, (buffer, pwait, n))
|
|---|
| 248 | pwait.acquire()
|
|---|
| 249 | print "Producer done"
|
|---|
| 250 | cwait.acquire()
|
|---|
| 251 | print "All done"
|
|---|
| 252 | print "buffer size ==", len(buffer.buffer)
|
|---|
| 253 |
|
|---|
| 254 | if __name__ == '__main__':
|
|---|
| 255 | _testLock()
|
|---|
| 256 | _test()
|
|---|