source: python/trunk/Lib/bsddb/test/test_lock.py

Last change on this file was 391, checked in by dmik, 11 years ago

python: Merge vendor 2.7.6 to trunk.

  • Property svn:eol-style set to native
File size: 6.1 KB
Line 
1"""
2TestCases for testing the locking sub-system.
3"""
4
5import time
6
7import unittest
8from test_all import db, test_support, verbose, have_threads, \
9 get_new_environment_path, get_new_database_path
10
11if have_threads :
12 from threading import Thread
13 import sys
14 if sys.version_info[0] < 3 :
15 from threading import currentThread
16 else :
17 from threading import current_thread as currentThread
18
19#----------------------------------------------------------------------
20
21class LockingTestCase(unittest.TestCase):
22 def setUp(self):
23 self.homeDir = get_new_environment_path()
24 self.env = db.DBEnv()
25 self.env.open(self.homeDir, db.DB_THREAD | db.DB_INIT_MPOOL |
26 db.DB_INIT_LOCK | db.DB_CREATE)
27
28
29 def tearDown(self):
30 self.env.close()
31 test_support.rmtree(self.homeDir)
32
33
34 def test01_simple(self):
35 if verbose:
36 print '\n', '-=' * 30
37 print "Running %s.test01_simple..." % self.__class__.__name__
38
39 anID = self.env.lock_id()
40 if verbose:
41 print "locker ID: %s" % anID
42 lock = self.env.lock_get(anID, "some locked thing", db.DB_LOCK_WRITE)
43 if verbose:
44 print "Aquired lock: %s" % lock
45 self.env.lock_put(lock)
46 if verbose:
47 print "Released lock: %s" % lock
48 self.env.lock_id_free(anID)
49
50
51 def test02_threaded(self):
52 if verbose:
53 print '\n', '-=' * 30
54 print "Running %s.test02_threaded..." % self.__class__.__name__
55
56 threads = []
57 threads.append(Thread(target = self.theThread,
58 args=(db.DB_LOCK_WRITE,)))
59 threads.append(Thread(target = self.theThread,
60 args=(db.DB_LOCK_READ,)))
61 threads.append(Thread(target = self.theThread,
62 args=(db.DB_LOCK_READ,)))
63 threads.append(Thread(target = self.theThread,
64 args=(db.DB_LOCK_WRITE,)))
65 threads.append(Thread(target = self.theThread,
66 args=(db.DB_LOCK_READ,)))
67 threads.append(Thread(target = self.theThread,
68 args=(db.DB_LOCK_READ,)))
69 threads.append(Thread(target = self.theThread,
70 args=(db.DB_LOCK_WRITE,)))
71 threads.append(Thread(target = self.theThread,
72 args=(db.DB_LOCK_WRITE,)))
73 threads.append(Thread(target = self.theThread,
74 args=(db.DB_LOCK_WRITE,)))
75
76 for t in threads:
77 import sys
78 if sys.version_info[0] < 3 :
79 t.setDaemon(True)
80 else :
81 t.daemon = True
82 t.start()
83 for t in threads:
84 t.join()
85
86 def test03_lock_timeout(self):
87 self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)
88 self.assertEqual(self.env.get_timeout(db.DB_SET_LOCK_TIMEOUT), 0)
89 self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)
90 self.assertEqual(self.env.get_timeout(db.DB_SET_TXN_TIMEOUT), 0)
91 self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)
92 self.assertEqual(self.env.get_timeout(db.DB_SET_LOCK_TIMEOUT), 123456)
93 self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)
94 self.assertEqual(self.env.get_timeout(db.DB_SET_TXN_TIMEOUT), 7890123)
95
96 def test04_lock_timeout2(self):
97 self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)
98 self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)
99 self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)
100 self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)
101
102 def deadlock_detection() :
103 while not deadlock_detection.end :
104 deadlock_detection.count = \
105 self.env.lock_detect(db.DB_LOCK_EXPIRE)
106 if deadlock_detection.count :
107 while not deadlock_detection.end :
108 pass
109 break
110 time.sleep(0.01)
111
112 deadlock_detection.end=False
113 deadlock_detection.count=0
114 t=Thread(target=deadlock_detection)
115 import sys
116 if sys.version_info[0] < 3 :
117 t.setDaemon(True)
118 else :
119 t.daemon = True
120 t.start()
121 self.env.set_timeout(100000, db.DB_SET_LOCK_TIMEOUT)
122 anID = self.env.lock_id()
123 anID2 = self.env.lock_id()
124 self.assertNotEqual(anID, anID2)
125 lock = self.env.lock_get(anID, "shared lock", db.DB_LOCK_WRITE)
126 start_time=time.time()
127 self.assertRaises(db.DBLockNotGrantedError,
128 self.env.lock_get,anID2, "shared lock", db.DB_LOCK_READ)
129 end_time=time.time()
130 deadlock_detection.end=True
131 # Floating point rounding
132 self.assertTrue((end_time-start_time) >= 0.0999)
133 self.env.lock_put(lock)
134 t.join()
135
136 self.env.lock_id_free(anID)
137 self.env.lock_id_free(anID2)
138
139 if db.version() >= (4,6):
140 self.assertTrue(deadlock_detection.count>0)
141
142 def theThread(self, lockType):
143 import sys
144 if sys.version_info[0] < 3 :
145 name = currentThread().getName()
146 else :
147 name = currentThread().name
148
149 if lockType == db.DB_LOCK_WRITE:
150 lt = "write"
151 else:
152 lt = "read"
153
154 anID = self.env.lock_id()
155 if verbose:
156 print "%s: locker ID: %s" % (name, anID)
157
158 for i in xrange(1000) :
159 lock = self.env.lock_get(anID, "some locked thing", lockType)
160 if verbose:
161 print "%s: Aquired %s lock: %s" % (name, lt, lock)
162
163 self.env.lock_put(lock)
164 if verbose:
165 print "%s: Released %s lock: %s" % (name, lt, lock)
166
167 self.env.lock_id_free(anID)
168
169
170#----------------------------------------------------------------------
171
172def test_suite():
173 suite = unittest.TestSuite()
174
175 if have_threads:
176 suite.addTest(unittest.makeSuite(LockingTestCase))
177 else:
178 suite.addTest(unittest.makeSuite(LockingTestCase, 'test01'))
179
180 return suite
181
182
183if __name__ == '__main__':
184 unittest.main(defaultTest='test_suite')
Note: See TracBrowser for help on using the repository browser.