source: python/trunk/Lib/bsddb/test/test_queue.py

Last change on this file was 391, checked in by dmik, 11 years ago

python: Merge vendor 2.7.6 to trunk.

  • Property svn:eol-style set to native
File size: 3.9 KB
Line 
1"""
2TestCases for exercising a Queue DB.
3"""
4
5import os, string
6from pprint import pprint
7import unittest
8
9from test_all import db, verbose, get_new_database_path
10
11#----------------------------------------------------------------------
12
13class SimpleQueueTestCase(unittest.TestCase):
14 def setUp(self):
15 self.filename = get_new_database_path()
16
17 def tearDown(self):
18 try:
19 os.remove(self.filename)
20 except os.error:
21 pass
22
23
24 def test01_basic(self):
25 # Basic Queue tests using the deprecated DBCursor.consume method.
26
27 if verbose:
28 print '\n', '-=' * 30
29 print "Running %s.test01_basic..." % self.__class__.__name__
30
31 d = db.DB()
32 d.set_re_len(40) # Queues must be fixed length
33 d.open(self.filename, db.DB_QUEUE, db.DB_CREATE)
34
35 if verbose:
36 print "before appends" + '-' * 30
37 pprint(d.stat())
38
39 for x in string.letters:
40 d.append(x * 40)
41
42 self.assertEqual(len(d), len(string.letters))
43
44 d.put(100, "some more data")
45 d.put(101, "and some more ")
46 d.put(75, "out of order")
47 d.put(1, "replacement data")
48
49 self.assertEqual(len(d), len(string.letters)+3)
50
51 if verbose:
52 print "before close" + '-' * 30
53 pprint(d.stat())
54
55 d.close()
56 del d
57 d = db.DB()
58 d.open(self.filename)
59
60 if verbose:
61 print "after open" + '-' * 30
62 pprint(d.stat())
63
64 # Test "txn" as a positional parameter
65 d.append("one more", None)
66 # Test "txn" as a keyword parameter
67 d.append("another one", txn=None)
68
69 c = d.cursor()
70
71 if verbose:
72 print "after append" + '-' * 30
73 pprint(d.stat())
74
75 rec = c.consume()
76 while rec:
77 if verbose:
78 print rec
79 rec = c.consume()
80 c.close()
81
82 if verbose:
83 print "after consume loop" + '-' * 30
84 pprint(d.stat())
85
86 self.assertEqual(len(d), 0, \
87 "if you see this message then you need to rebuild " \
88 "Berkeley DB 3.1.17 with the patch in patches/qam_stat.diff")
89
90 d.close()
91
92
93
94 def test02_basicPost32(self):
95 # Basic Queue tests using the new DB.consume method in DB 3.2+
96 # (No cursor needed)
97
98 if verbose:
99 print '\n', '-=' * 30
100 print "Running %s.test02_basicPost32..." % self.__class__.__name__
101
102 d = db.DB()
103 d.set_re_len(40) # Queues must be fixed length
104 d.open(self.filename, db.DB_QUEUE, db.DB_CREATE)
105
106 if verbose:
107 print "before appends" + '-' * 30
108 pprint(d.stat())
109
110 for x in string.letters:
111 d.append(x * 40)
112
113 self.assertEqual(len(d), len(string.letters))
114
115 d.put(100, "some more data")
116 d.put(101, "and some more ")
117 d.put(75, "out of order")
118 d.put(1, "replacement data")
119
120 self.assertEqual(len(d), len(string.letters)+3)
121
122 if verbose:
123 print "before close" + '-' * 30
124 pprint(d.stat())
125
126 d.close()
127 del d
128 d = db.DB()
129 d.open(self.filename)
130 #d.set_get_returns_none(true)
131
132 if verbose:
133 print "after open" + '-' * 30
134 pprint(d.stat())
135
136 d.append("one more")
137
138 if verbose:
139 print "after append" + '-' * 30
140 pprint(d.stat())
141
142 rec = d.consume()
143 while rec:
144 if verbose:
145 print rec
146 rec = d.consume()
147
148 if verbose:
149 print "after consume loop" + '-' * 30
150 pprint(d.stat())
151
152 d.close()
153
154
155
156#----------------------------------------------------------------------
157
158def test_suite():
159 return unittest.makeSuite(SimpleQueueTestCase)
160
161
162if __name__ == '__main__':
163 unittest.main(defaultTest='test_suite')
Note: See TracBrowser for help on using the repository browser.