1 | import unittest
|
---|
2 | import os
|
---|
3 |
|
---|
4 | from test_all import db, test_support, get_new_environment_path, get_new_database_path
|
---|
5 |
|
---|
6 |
|
---|
class DBSequenceTest(unittest.TestCase):
    """Exercise the DBSequence API against a freshly created BTree database.

    Every test stores the sequence it creates on ``self.seq`` so that
    ``tearDown`` can close it; tests that invalidate the handle themselves
    delete the attribute before returning.
    """

    def setUp(self):
        """Create a new environment directory, open a DBEnv and a BTree DB."""
        # 2**32: despite the name, this is one past the largest unsigned
        # 32-bit value.  Tests multiply it to obtain values that cannot fit
        # in 32 bits.
        self.int_32_max = 0x100000000
        self.homeDir = get_new_environment_path()
        self.filename = "test"

        self.dbenv = db.DBEnv()
        # ``0o666`` is the octal spelling accepted by Python 2.6+ and 3.x.
        self.dbenv.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL, 0o666)
        self.d = db.DB(self.dbenv)
        self.d.open(self.filename, db.DB_BTREE, db.DB_CREATE, 0o666)

    def tearDown(self):
        """Close whichever handles still exist, then remove the directory."""
        # Handles are closed in reverse order of creation: sequence,
        # database, environment.
        if hasattr(self, 'seq'):
            self.seq.close()
            del self.seq
        if hasattr(self, 'd'):
            self.d.close()
            del self.d
        if hasattr(self, 'dbenv'):
            self.dbenv.close()
            del self.dbenv

        test_support.rmtree(self.homeDir)

    def test_get(self):
        """get(delta) returns the current value and advances it by delta."""
        self.seq = db.DBSequence(self.d, flags=0)
        start_value = 10 * self.int_32_max
        self.assertEqual(0xA00000000, start_value)
        self.assertEqual(None, self.seq.initial_value(start_value))
        self.assertEqual(None, self.seq.open(key='id', txn=None, flags=db.DB_CREATE))
        self.assertEqual(start_value, self.seq.get(5))
        self.assertEqual(start_value + 5, self.seq.get())

    def test_remove(self):
        """remove() on an open sequence returns None."""
        self.seq = db.DBSequence(self.d, flags=0)
        self.assertEqual(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
        self.assertEqual(None, self.seq.remove(txn=None, flags=0))
        # Drop the attribute so tearDown does not call close() on the removed
        # sequence (presumably the handle is unusable after remove()).
        del self.seq

    def test_get_key(self):
        """get_key() returns the key the sequence was opened with."""
        self.seq = db.DBSequence(self.d, flags=0)
        key = 'foo'
        self.assertEqual(None, self.seq.open(key=key, txn=None, flags=db.DB_CREATE))
        self.assertEqual(key, self.seq.get_key())

    def test_get_dbp(self):
        """get_dbp() returns the DB object the sequence was created on."""
        self.seq = db.DBSequence(self.d, flags=0)
        self.assertEqual(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
        self.assertEqual(self.d, self.seq.get_dbp())

    def test_cachesize(self):
        """A cache size set before open() is reported by get_cachesize()."""
        self.seq = db.DBSequence(self.d, flags=0)
        cache_size = 10
        self.assertEqual(None, self.seq.set_cachesize(cache_size))
        self.assertEqual(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
        self.assertEqual(cache_size, self.seq.get_cachesize())

    def test_flags(self):
        """A flag set before open() is present in get_flags()."""
        self.seq = db.DBSequence(self.d, flags=0)
        flag = db.DB_SEQ_WRAP
        self.assertEqual(None, self.seq.set_flags(flag))
        self.assertEqual(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
        # Mask with the flag: only the requested bit is checked, other bits
        # returned by get_flags() are ignored.
        self.assertEqual(flag, self.seq.get_flags() & flag)

    def test_range(self):
        """A (min, max) range beyond 32 bits round-trips through get_range()."""
        self.seq = db.DBSequence(self.d, flags=0)
        seq_range = (10 * self.int_32_max, 11 * self.int_32_max - 1)
        self.assertEqual(None, self.seq.set_range(seq_range))
        # The initial value is set to the lower bound of the range before
        # the sequence is opened.
        self.seq.initial_value(seq_range[0])
        self.assertEqual(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
        self.assertEqual(seq_range, self.seq.get_range())

    def test_stat(self):
        """stat() contains every expected statistics key."""
        self.seq = db.DBSequence(self.d, flags=0)
        self.assertEqual(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
        stat = self.seq.stat()
        for param in ('nowait', 'min', 'max', 'value', 'current',
                      'flags', 'cache_size', 'last_value', 'wait'):
            self.assertTrue(param in stat, "parameter %s isn't in stat info" % param)

    if db.version() >= (4, 7):
        # This code checks a crash solved in Berkeley DB 4.7
        def test_stat_crash(self):
            """stat() on a sequence whose open() failed raises, not crashes."""
            d = db.DB()
            d.open(None, dbtype=db.DB_HASH, flags=db.DB_CREATE)  # In RAM
            try:
                seq = db.DBSequence(d, flags=0)

                # Opened without DB_CREATE, so the key is reported missing.
                self.assertRaises(db.DBNotFoundError, seq.open,
                                  key='id', txn=None, flags=0)

                self.assertRaises(db.DBInvalidArgError, seq.stat)
            finally:
                # Close the in-memory database even when an assertion fails,
                # so a failing run does not leak the handle.
                d.close()

    def test_64bits(self):
        """Values near both ends of the signed 64-bit range are preserved."""
        # We don't use both extremes because they are problematic
        value_plus = (1 << 63) - 2
        self.assertEqual(9223372036854775806, value_plus)
        value_minus = (-1 << 63) + 1  # Two's complement
        self.assertEqual(-9223372036854775807, value_minus)
        self.seq = db.DBSequence(self.d, flags=0)
        self.assertEqual(None, self.seq.initial_value(value_plus - 1))
        self.assertEqual(None, self.seq.open(key='id', txn=None,
                                             flags=db.DB_CREATE))
        self.assertEqual(value_plus - 1, self.seq.get(1))
        self.assertEqual(value_plus, self.seq.get(1))

        # Remove the first sequence so the same key can be created again
        # with a different initial value.
        self.seq.remove(txn=None, flags=0)

        self.seq = db.DBSequence(self.d, flags=0)
        self.assertEqual(None, self.seq.initial_value(value_minus))
        self.assertEqual(None, self.seq.open(key='id', txn=None,
                                             flags=db.DB_CREATE))
        self.assertEqual(value_minus, self.seq.get(1))
        self.assertEqual(value_minus + 1, self.seq.get(1))

    def test_multiple_close(self):
        """close() may be called repeatedly (tearDown calls it once more)."""
        self.seq = db.DBSequence(self.d)
        self.seq.close()  # You can close a Sequence multiple times
        self.seq.close()
        self.seq.close()
|
---|
128 |
|
---|
def test_suite():
    """Return a TestSuite holding every test method of DBSequenceTest."""
    suite = unittest.TestSuite()
    # unittest.makeSuite() is deprecated and was removed in Python 3.13;
    # TestLoader.loadTestsFromTestCase() collects the same ``test*`` methods
    # and is available on every supported Python version.
    suite.addTest(unittest.TestLoader().loadTestsFromTestCase(DBSequenceTest))
    return suite
|
---|
133 |
|
---|
134 |
|
---|
if __name__ == "__main__":
    # Executed as a script: hand the suite built by test_suite() to the
    # unittest command-line runner.
    unittest.main(defaultTest="test_suite")
|
---|