1 | """Synchronization metaclass.
|
---|
2 |
|
---|
3 | This metaclass makes it possible to declare synchronized methods.
|
---|
4 |
|
---|
5 | """
|
---|
6 |
|
---|
7 | import thread
|
---|
8 |
|
---|
9 | # First we need to define a reentrant lock.
|
---|
10 | # This is generally useful and should probably be in a standard Python
|
---|
11 | # library module. For now, we in-line it.
|
---|
12 |
|
---|
13 | class Lock:
|
---|
14 |
|
---|
15 | """Reentrant lock.
|
---|
16 |
|
---|
17 | This is a mutex-like object which can be acquired by the same
|
---|
18 | thread more than once. It keeps a reference count of the number
|
---|
19 | of times it has been acquired by the same thread. Each acquire()
|
---|
20 | call must be matched by a release() call and only the last
|
---|
21 | release() call actually releases the lock for acquisition by
|
---|
22 | another thread.
|
---|
23 |
|
---|
24 | The implementation uses two locks internally:
|
---|
25 |
|
---|
26 | __mutex is a short term lock used to protect the instance variables
|
---|
27 | __wait is the lock for which other threads wait
|
---|
28 |
|
---|
29 | A thread intending to acquire both locks should acquire __wait
|
---|
30 | first.
|
---|
31 |
|
---|
32 | The implementation uses two other instance variables, protected by
|
---|
33 | locking __mutex:
|
---|
34 |
|
---|
35 | __tid is the thread ID of the thread that currently has the lock
|
---|
36 | __count is the number of times the current thread has acquired it
|
---|
37 |
|
---|
38 | When the lock is released, __tid is None and __count is zero.
|
---|
39 |
|
---|
40 | """
|
---|
41 |
|
---|
42 | def __init__(self):
|
---|
43 | """Constructor. Initialize all instance variables."""
|
---|
44 | self.__mutex = thread.allocate_lock()
|
---|
45 | self.__wait = thread.allocate_lock()
|
---|
46 | self.__tid = None
|
---|
47 | self.__count = 0
|
---|
48 |
|
---|
49 | def acquire(self, flag=1):
|
---|
50 | """Acquire the lock.
|
---|
51 |
|
---|
52 | If the optional flag argument is false, returns immediately
|
---|
53 | when it cannot acquire the __wait lock without blocking (it
|
---|
54 | may still block for a little while in order to acquire the
|
---|
55 | __mutex lock).
|
---|
56 |
|
---|
57 | The return value is only relevant when the flag argument is
|
---|
58 | false; it is 1 if the lock is acquired, 0 if not.
|
---|
59 |
|
---|
60 | """
|
---|
61 | self.__mutex.acquire()
|
---|
62 | try:
|
---|
63 | if self.__tid == thread.get_ident():
|
---|
64 | self.__count = self.__count + 1
|
---|
65 | return 1
|
---|
66 | finally:
|
---|
67 | self.__mutex.release()
|
---|
68 | locked = self.__wait.acquire(flag)
|
---|
69 | if not flag and not locked:
|
---|
70 | return 0
|
---|
71 | try:
|
---|
72 | self.__mutex.acquire()
|
---|
73 | assert self.__tid == None
|
---|
74 | assert self.__count == 0
|
---|
75 | self.__tid = thread.get_ident()
|
---|
76 | self.__count = 1
|
---|
77 | return 1
|
---|
78 | finally:
|
---|
79 | self.__mutex.release()
|
---|
80 |
|
---|
81 | def release(self):
|
---|
82 | """Release the lock.
|
---|
83 |
|
---|
84 | If this thread doesn't currently have the lock, an assertion
|
---|
85 | error is raised.
|
---|
86 |
|
---|
87 | Only allow another thread to acquire the lock when the count
|
---|
88 | reaches zero after decrementing it.
|
---|
89 |
|
---|
90 | """
|
---|
91 | self.__mutex.acquire()
|
---|
92 | try:
|
---|
93 | assert self.__tid == thread.get_ident()
|
---|
94 | assert self.__count > 0
|
---|
95 | self.__count = self.__count - 1
|
---|
96 | if self.__count == 0:
|
---|
97 | self.__tid = None
|
---|
98 | self.__wait.release()
|
---|
99 | finally:
|
---|
100 | self.__mutex.release()
|
---|
101 |
|
---|
102 |
|
---|
103 | def _testLock():
|
---|
104 |
|
---|
105 | done = []
|
---|
106 |
|
---|
107 | def f2(lock, done=done):
|
---|
108 | lock.acquire()
|
---|
109 | print "f2 running in thread %d\n" % thread.get_ident(),
|
---|
110 | lock.release()
|
---|
111 | done.append(1)
|
---|
112 |
|
---|
113 | def f1(lock, f2=f2, done=done):
|
---|
114 | lock.acquire()
|
---|
115 | print "f1 running in thread %d\n" % thread.get_ident(),
|
---|
116 | try:
|
---|
117 | f2(lock)
|
---|
118 | finally:
|
---|
119 | lock.release()
|
---|
120 | done.append(1)
|
---|
121 |
|
---|
122 | lock = Lock()
|
---|
123 | lock.acquire()
|
---|
124 | f1(lock) # Adds 2 to done
|
---|
125 | lock.release()
|
---|
126 |
|
---|
127 | lock.acquire()
|
---|
128 |
|
---|
129 | thread.start_new_thread(f1, (lock,)) # Adds 2
|
---|
130 | thread.start_new_thread(f1, (lock, f1)) # Adds 3
|
---|
131 | thread.start_new_thread(f2, (lock,)) # Adds 1
|
---|
132 | thread.start_new_thread(f2, (lock,)) # Adds 1
|
---|
133 |
|
---|
134 | lock.release()
|
---|
135 | import time
|
---|
136 | while len(done) < 9:
|
---|
137 | print len(done)
|
---|
138 | time.sleep(0.001)
|
---|
139 | print len(done)
|
---|
140 |
|
---|
141 |
|
---|
142 | # Now, the Locking metaclass is a piece of cake.
|
---|
143 | # As an example feature, methods whose name begins with exactly one
|
---|
144 | # underscore are not synchronized.
|
---|
145 |
|
---|
146 | from Meta import MetaClass, MetaHelper, MetaMethodWrapper
|
---|
147 |
|
---|
148 | class LockingMethodWrapper(MetaMethodWrapper):
|
---|
149 | def __call__(self, *args, **kw):
|
---|
150 | if self.__name__[:1] == '_' and self.__name__[1:] != '_':
|
---|
151 | return apply(self.func, (self.inst,) + args, kw)
|
---|
152 | self.inst.__lock__.acquire()
|
---|
153 | try:
|
---|
154 | return apply(self.func, (self.inst,) + args, kw)
|
---|
155 | finally:
|
---|
156 | self.inst.__lock__.release()
|
---|
157 |
|
---|
158 | class LockingHelper(MetaHelper):
|
---|
159 | __methodwrapper__ = LockingMethodWrapper
|
---|
160 | def __helperinit__(self, formalclass):
|
---|
161 | MetaHelper.__helperinit__(self, formalclass)
|
---|
162 | self.__lock__ = Lock()
|
---|
163 |
|
---|
164 | class LockingMetaClass(MetaClass):
|
---|
165 | __helper__ = LockingHelper
|
---|
166 |
|
---|
167 | Locking = LockingMetaClass('Locking', (), {})
|
---|
168 |
|
---|
169 | def _test():
|
---|
170 | # For kicks, take away the Locking base class and see it die
|
---|
171 | class Buffer(Locking):
|
---|
172 | def __init__(self, initialsize):
|
---|
173 | assert initialsize > 0
|
---|
174 | self.size = initialsize
|
---|
175 | self.buffer = [None]*self.size
|
---|
176 | self.first = self.last = 0
|
---|
177 | def put(self, item):
|
---|
178 | # Do we need to grow the buffer?
|
---|
179 | if (self.last+1) % self.size != self.first:
|
---|
180 | # Insert the new item
|
---|
181 | self.buffer[self.last] = item
|
---|
182 | self.last = (self.last+1) % self.size
|
---|
183 | return
|
---|
184 | # Double the buffer size
|
---|
185 | # First normalize it so that first==0 and last==size-1
|
---|
186 | print "buffer =", self.buffer
|
---|
187 | print "first = %d, last = %d, size = %d" % (
|
---|
188 | self.first, self.last, self.size)
|
---|
189 | if self.first <= self.last:
|
---|
190 | temp = self.buffer[self.first:self.last]
|
---|
191 | else:
|
---|
192 | temp = self.buffer[self.first:] + self.buffer[:self.last]
|
---|
193 | print "temp =", temp
|
---|
194 | self.buffer = temp + [None]*(self.size+1)
|
---|
195 | self.first = 0
|
---|
196 | self.last = self.size-1
|
---|
197 | self.size = self.size*2
|
---|
198 | print "Buffer size doubled to", self.size
|
---|
199 | print "new buffer =", self.buffer
|
---|
200 | print "first = %d, last = %d, size = %d" % (
|
---|
201 | self.first, self.last, self.size)
|
---|
202 | self.put(item) # Recursive call to test the locking
|
---|
203 | def get(self):
|
---|
204 | # Is the buffer empty?
|
---|
205 | if self.first == self.last:
|
---|
206 | raise EOFError # Avoid defining a new exception
|
---|
207 | item = self.buffer[self.first]
|
---|
208 | self.first = (self.first+1) % self.size
|
---|
209 | return item
|
---|
210 |
|
---|
211 | def producer(buffer, wait, n=1000):
|
---|
212 | import time
|
---|
213 | i = 0
|
---|
214 | while i < n:
|
---|
215 | print "put", i
|
---|
216 | buffer.put(i)
|
---|
217 | i = i+1
|
---|
218 | print "Producer: done producing", n, "items"
|
---|
219 | wait.release()
|
---|
220 |
|
---|
221 | def consumer(buffer, wait, n=1000):
|
---|
222 | import time
|
---|
223 | i = 0
|
---|
224 | tout = 0.001
|
---|
225 | while i < n:
|
---|
226 | try:
|
---|
227 | x = buffer.get()
|
---|
228 | if x != i:
|
---|
229 | raise AssertionError, \
|
---|
230 | "get() returned %s, expected %s" % (x, i)
|
---|
231 | print "got", i
|
---|
232 | i = i+1
|
---|
233 | tout = 0.001
|
---|
234 | except EOFError:
|
---|
235 | time.sleep(tout)
|
---|
236 | tout = tout*2
|
---|
237 | print "Consumer: done consuming", n, "items"
|
---|
238 | wait.release()
|
---|
239 |
|
---|
240 | pwait = thread.allocate_lock()
|
---|
241 | pwait.acquire()
|
---|
242 | cwait = thread.allocate_lock()
|
---|
243 | cwait.acquire()
|
---|
244 | buffer = Buffer(1)
|
---|
245 | n = 1000
|
---|
246 | thread.start_new_thread(consumer, (buffer, cwait, n))
|
---|
247 | thread.start_new_thread(producer, (buffer, pwait, n))
|
---|
248 | pwait.acquire()
|
---|
249 | print "Producer done"
|
---|
250 | cwait.acquire()
|
---|
251 | print "All done"
|
---|
252 | print "buffer size ==", len(buffer.buffer)
|
---|
253 |
|
---|
254 | if __name__ == '__main__':
|
---|
255 | _testLock()
|
---|
256 | _test()
|
---|