1 | #! /usr/bin/env python
|
---|
2 | """Test script for the bsddb C module by Roger E. Masse
|
---|
3 | Adapted to unittest format and expanded scope by Raymond Hettinger
|
---|
4 | """
|
---|
5 | import os, sys
|
---|
6 | import unittest
|
---|
7 | from test import test_support
|
---|
8 |
|
---|
# Skip test if _bsddb wasn't built.
test_support.import_module('_bsddb')

# Import bsddb through test_support with deprecated=True (presumably so the
# module's deprecation warning is suppressed while importing -- confirm
# against the test_support documentation).
bsddb = test_support.import_module('bsddb', deprecated=True)
# Just so we know it's imported:
test_support.import_module('dbhash', deprecated=True)
15 |
|
---|
16 |
|
---|
class TestBSDDB(unittest.TestCase):
    """Shared dict-compatibility test suite for bsddb database objects.

    Concrete subclasses must provide:

    * ``fname`` -- the path handed to the opener, or None.  These tests
      treat None as an in-memory database: nothing to reopen, nothing to
      remove from disk.
    * ``openmethod`` -- a one-element list holding the opener callable.
      (Presumably wrapped in a list so that the function stored on the
      class is not turned into a bound method -- confirm before
      simplifying.)

    ``openflag`` may be overridden to change the flag passed to the opener.

    Test methods carry comments rather than docstrings so that unittest's
    verbose output keeps showing the method names.
    """

    openflag = 'c'

    def setUp(self):
        """Open a database and load it with the reference contents."""
        self.f = self.openmethod[0](self.fname, self.openflag, cachesize=32768)
        # self.d mirrors what the database is expected to contain.
        self.d = dict(q='Guido', w='van', e='Rossum', r='invented',
                      t='Python', y='')
        for k, v in self.d.iteritems():
            self.f[k] = v

    def tearDown(self):
        """Flush and close the database, then remove its file (if any)."""
        # The nested try/finally blocks guarantee that a failing sync()
        # still closes the handle and that a failing close() still removes
        # the file, so one broken test cannot leak state into the next.
        try:
            try:
                self.f.sync()
            finally:
                self.f.close()
        finally:
            if self.fname is not None:
                try:
                    os.remove(self.fname)
                except os.error:
                    # Best effort: the file may never have been created.
                    pass

    def test_getitem(self):
        for k, v in self.d.iteritems():
            self.assertEqual(self.f[k], v)

    def test_len(self):
        self.assertEqual(len(self.f), len(self.d))

    def test_change(self):
        self.f['r'] = 'discovered'
        self.assertEqual(self.f['r'], 'discovered')
        self.assertIn('r', self.f.keys())
        self.assertIn('discovered', self.f.values())

    def test_close_and_reopen(self):
        if self.fname is None:
            # An in-memory only db cannot be reopened; report a skip
            # instead of a silent pass.
            self.skipTest("in-memory database cannot be reopened")
        self.f.close()
        self.f = self.openmethod[0](self.fname, 'w')
        for k, v in self.d.iteritems():
            self.assertEqual(self.f[k], v)

    def assertSetEquals(self, seqn1, seqn2):
        """Assert that both iterables hold the same elements, ignoring order."""
        self.assertEqual(set(seqn1), set(seqn2))

    def test_mapping_iteration_methods(self):
        f = self.f
        d = self.d
        self.assertSetEquals(d, f)
        self.assertSetEquals(d.keys(), f.keys())
        self.assertSetEquals(d.values(), f.values())
        self.assertSetEquals(d.items(), f.items())
        self.assertSetEquals(d.iterkeys(), f.iterkeys())
        self.assertSetEquals(d.itervalues(), f.itervalues())
        self.assertSetEquals(d.iteritems(), f.iteritems())

    def test_iter_while_modifying_values(self):
        for key in self.d:
            self.d[key] = 'modified ' + key

        # The db should behave the same as a dict: modifying values of
        # existing keys should not break iteration (adding or removing
        # keys should).
        loops_left = len(self.f)
        for key in self.f:
            self.f[key] = 'modified ' + key
            loops_left -= 1
        self.assertEqual(loops_left, 0)

        self.test_mapping_iteration_methods()

    def test_iter_abort_on_changed_size(self):
        def DictIterAbort():
            # Adding a key mid-iteration must make the next step fail.
            for _ in self.d:
                self.d['newkey'] = 'SPAM'
        self.assertRaises(RuntimeError, DictIterAbort)

        def DbIterAbort():
            for _ in self.f:
                self.f['newkey'] = 'SPAM'
        self.assertRaises(RuntimeError, DbIterAbort)

    def test_iteritems_abort_on_changed_size(self):
        def DictIteritemsAbort():
            for _ in self.d.iteritems():
                self.d['newkey'] = 'SPAM'
        self.assertRaises(RuntimeError, DictIteritemsAbort)

        def DbIteritemsAbort():
            # Removing a key mid-iteration must make the next step fail.
            for key, _ in self.f.iteritems():
                del self.f[key]
        self.assertRaises(RuntimeError, DbIteritemsAbort)

    def test_iteritems_while_modifying_values(self):
        for k, v in self.d.iteritems():
            self.d[k] = 'modified ' + v

        # The db should behave the same as a dict: modifying values of
        # existing keys should not break iteration (adding or removing
        # keys should).
        loops_left = len(self.f)
        for k, v in self.f.iteritems():
            self.f[k] = 'modified ' + v
            loops_left -= 1
        self.assertEqual(loops_left, 0)

        self.test_mapping_iteration_methods()

    def test_first_next_looping(self):
        items = [self.f.first()]
        for _ in xrange(1, len(self.f)):
            items.append(self.f.next())
        self.assertSetEquals(items, self.d.items())

    def test_previous_last_looping(self):
        items = [self.f.last()]
        for _ in xrange(1, len(self.f)):
            items.append(self.f.previous())
        self.assertSetEquals(items, self.d.items())

    def test_first_while_deleting(self):
        # Test for bug 1725856
        self.assertTrue(len(self.d) >= 2, "test requires >=2 items")
        for _ in self.d:
            key = self.f.first()[0]
            del self.f[key]
        self.assertEqual([], self.f.items(), "expected empty db after test")

    def test_last_while_deleting(self):
        # Test for bug 1725856's evil twin
        self.assertTrue(len(self.d) >= 2, "test requires >=2 items")
        for _ in self.d:
            key = self.f.last()[0]
            del self.f[key]
        self.assertEqual([], self.f.items(), "expected empty db after test")

    def test_set_location(self):
        self.assertEqual(self.f.set_location('e'), ('e', self.d['e']))

    def test_contains(self):
        for k in self.d:
            self.assertIn(k, self.f)
        self.assertNotIn('not here', self.f)

    def test_has_key(self):
        for k in self.d:
            self.assertTrue(self.f.has_key(k))
        self.assertFalse(self.f.has_key('not here'))

    def test_clear(self):
        self.f.clear()
        self.assertEqual(len(self.f), 0)

    def test__no_deadlock_first(self, debug=0):
        def trace(label):
            # Progress marker; with debug set, the last label printed shows
            # where a deadlock happened.
            if debug:
                print(label)

        # do this so that testers can see what function we're in in
        # verbose mode when we deadlock.
        sys.stdout.flush()

        # in pybsddb's _DBWithCursor this causes an internal DBCursor
        # object to be created.  Other test_ methods in this class could
        # inadvertently cause the deadlock but an explicit test is needed.
        trace("A")
        k, v = self.f.first()
        trace("B %s" % (k,))
        self.f[k] = "deadlock. do not pass go. do not collect $200."
        trace("C")
        # if the bsddb implementation leaves the DBCursor open during
        # the database write and locking+threading support is enabled
        # the cursor's read lock will deadlock the write lock request..

        # test the iterator interface
        trace("D")
        items = self.f.iteritems()
        k, v = next(items)
        trace("E")
        self.f[k] = "please don't deadlock"
        trace("F")
        # Drain the rest of the iterator without writing.
        for _ in items:
            pass
        trace("F2")

        keys = iter(self.f)
        trace("G")
        while True:
            try:
                trace("H")
                k = next(keys)
                trace("I")
                self.f[k] = "deadlocks-r-us"
                trace("J")
            except StopIteration:
                break
        trace("K")

        # test the legacy cursor interface mixed with writes
        self.assertIn(self.f.first()[0], self.d)
        k = self.f.next()[0]
        self.assertIn(k, self.d)
        self.f[k] = "be gone with ye deadlocks"
        # assertEqual, not assertTrue: assertTrue would treat the expected
        # string as the failure message and never compare the stored value.
        self.assertEqual(self.f[k], "be gone with ye deadlocks")

    def test_for_cursor_memleak(self):
        # do the bsddb._DBWithCursor iterator internals leak cursors?
        nc1 = len(self.f._cursor_refs)
        # create iterator
        i = self.f.iteritems()
        nc2 = len(self.f._cursor_refs)
        # use the iterator (should run to the first yield, creating the cursor)
        next(i)
        nc3 = len(self.f._cursor_refs)
        # destroy the iterator; this should cause the weakref callback
        # to remove the cursor object from self.f._cursor_refs
        del i
        nc4 = len(self.f._cursor_refs)

        self.assertEqual(nc1, nc2)
        self.assertEqual(nc1, nc4)
        self.assertEqual(nc3, nc1 + 1)

    def test_popitem(self):
        k, v = self.f.popitem()
        self.assertIn(k, self.d)
        self.assertIn(v, self.d.values())
        self.assertNotIn(k, self.f)
        self.assertEqual(len(self.d) - 1, len(self.f))

    def test_pop(self):
        k = 'w'
        v = self.f.pop(k)
        self.assertEqual(v, self.d[k])
        self.assertNotIn(k, self.f)
        self.assertNotIn(v, self.f.values())
        self.assertEqual(len(self.d) - 1, len(self.f))

    def test_get(self):
        self.assertIsNone(self.f.get('NotHere'))
        self.assertEqual(self.f.get('NotHere', 'Default'), 'Default')
        self.assertEqual(self.f.get('q', 'Default'), self.d['q'])

    def test_setdefault(self):
        self.assertEqual(self.f.setdefault('new', 'dog'), 'dog')
        self.assertEqual(self.f.setdefault('r', 'cat'), self.d['r'])

    def test_update(self):
        new = dict(y='life', u='of', i='brian')
        self.f.update(new)
        self.d.update(new)
        for k, v in self.d.iteritems():
            self.assertEqual(self.f[k], v)

    def test_keyordering(self):
        if self.openmethod[0] is not bsddb.btopen:
            # Ordering is only checked for btopen databases; report a skip
            # instead of a silent pass for the other variants.
            self.skipTest("key ordering is only checked for btopen databases")
        keys = sorted(self.d)
        self.assertEqual(self.f.first()[0], keys[0])
        self.assertEqual(self.f.next()[0], keys[1])
        self.assertEqual(self.f.last()[0], keys[-1])
        self.assertEqual(self.f.previous()[0], keys[-2])
        self.assertEqual(list(self.f), keys)
321 |
|
---|
class TestBTree(TestBSDDB):
    # Shared suite against a btopen database stored in the TESTFN file.
    openmethod = [bsddb.btopen]
    fname = test_support.TESTFN
325 |
|
---|
class TestBTree_InMemory(TestBSDDB):
    # Shared suite against a btopen database with no backing file
    # (fname is None).
    openmethod = [bsddb.btopen]
    fname = None
329 |
|
---|
class TestBTree_InMemory_Truncate(TestBSDDB):
    # Same as the in-memory btopen variant, but opened with flag 'n'
    # instead of the default 'c' (per the class name, the truncating open
    # mode -- confirm against the bsddb documentation).
    openmethod = [bsddb.btopen]
    openflag = 'n'
    fname = None
334 |
|
---|
class TestHashTable(TestBSDDB):
    # Shared suite against a hashopen database stored in the TESTFN file.
    openmethod = [bsddb.hashopen]
    fname = test_support.TESTFN
338 |
|
---|
class TestHashTable_InMemory(TestBSDDB):
    # Shared suite against a hashopen database with no backing file
    # (fname is None).
    openmethod = [bsddb.hashopen]
    fname = None
342 |
|
---|
# NOTE: a (bsddb.rnopen, 'Record Numbers') variant is intentionally left out:
# 'put' for RECNO appears broken for bsddb 1.85, at least on Solaris Intel
# (rmasse, 1/97).
346 |
|
---|
def test_main(verbose=None):
    """Run every concrete bsddb test case through test_support.run_unittest.

    ``verbose`` is accepted but is not consulted by this function.
    """
    test_cases = (
        TestBTree,
        TestHashTable,
        TestBTree_InMemory,
        TestHashTable_InMemory,
        TestBTree_InMemory_Truncate,
    )
    test_support.run_unittest(*test_cases)
355 |
|
---|
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main(verbose=True)