1 | """TestCases for distributed transactions.
|
---|
2 | """
|
---|
3 |
|
---|
4 | import os
|
---|
5 | import time
|
---|
6 | import unittest
|
---|
7 |
|
---|
8 | from test_all import db, test_support, have_threads, verbose, \
|
---|
9 | get_new_environment_path, get_new_database_path
|
---|
10 |
|
---|
11 |
|
---|
12 | #----------------------------------------------------------------------
|
---|
13 |
|
---|
14 | class DBReplication(unittest.TestCase) :
|
---|
15 | def setUp(self) :
|
---|
16 | self.homeDirMaster = get_new_environment_path()
|
---|
17 | self.homeDirClient = get_new_environment_path()
|
---|
18 |
|
---|
19 | self.dbenvMaster = db.DBEnv()
|
---|
20 | self.dbenvClient = db.DBEnv()
|
---|
21 |
|
---|
22 | # Must use "DB_THREAD" because the Replication Manager will
|
---|
23 | # be executed in other threads but will use the same environment.
|
---|
24 | # http://forums.oracle.com/forums/thread.jspa?threadID=645788&tstart=0
|
---|
25 | self.dbenvMaster.open(self.homeDirMaster, db.DB_CREATE | db.DB_INIT_TXN
|
---|
26 | | db.DB_INIT_LOG | db.DB_INIT_MPOOL | db.DB_INIT_LOCK |
|
---|
27 | db.DB_INIT_REP | db.DB_RECOVER | db.DB_THREAD, 0666)
|
---|
28 | self.dbenvClient.open(self.homeDirClient, db.DB_CREATE | db.DB_INIT_TXN
|
---|
29 | | db.DB_INIT_LOG | db.DB_INIT_MPOOL | db.DB_INIT_LOCK |
|
---|
30 | db.DB_INIT_REP | db.DB_RECOVER | db.DB_THREAD, 0666)
|
---|
31 |
|
---|
32 | self.confirmed_master=self.client_startupdone=False
|
---|
33 | def confirmed_master(a,b,c) :
|
---|
34 | if b==db.DB_EVENT_REP_MASTER :
|
---|
35 | self.confirmed_master=True
|
---|
36 |
|
---|
37 | def client_startupdone(a,b,c) :
|
---|
38 | if b==db.DB_EVENT_REP_STARTUPDONE :
|
---|
39 | self.client_startupdone=True
|
---|
40 |
|
---|
41 | self.dbenvMaster.set_event_notify(confirmed_master)
|
---|
42 | self.dbenvClient.set_event_notify(client_startupdone)
|
---|
43 |
|
---|
44 | #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
|
---|
45 | #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
|
---|
46 | #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
|
---|
47 | #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
|
---|
48 |
|
---|
49 | self.dbMaster = self.dbClient = None
|
---|
50 |
|
---|
51 |
|
---|
52 | def tearDown(self):
|
---|
53 | if self.dbClient :
|
---|
54 | self.dbClient.close()
|
---|
55 | if self.dbMaster :
|
---|
56 | self.dbMaster.close()
|
---|
57 |
|
---|
58 | # Here we assign dummy event handlers to allow GC of the test object.
|
---|
59 | # Since the dummy handler doesn't use any outer scope variable, it
|
---|
60 | # doesn't keep any reference to the test object.
|
---|
61 | def dummy(*args) :
|
---|
62 | pass
|
---|
63 | self.dbenvMaster.set_event_notify(dummy)
|
---|
64 | self.dbenvClient.set_event_notify(dummy)
|
---|
65 |
|
---|
66 | self.dbenvClient.close()
|
---|
67 | self.dbenvMaster.close()
|
---|
68 | test_support.rmtree(self.homeDirClient)
|
---|
69 | test_support.rmtree(self.homeDirMaster)
|
---|
70 |
|
---|
71 | class DBReplicationManager(DBReplication) :
|
---|
72 | def test01_basic_replication(self) :
|
---|
73 | master_port = test_support.find_unused_port()
|
---|
74 | client_port = test_support.find_unused_port()
|
---|
75 | if db.version() >= (5, 2) :
|
---|
76 | self.site = self.dbenvMaster.repmgr_site("127.0.0.1", master_port)
|
---|
77 | self.site.set_config(db.DB_GROUP_CREATOR, True)
|
---|
78 | self.site.set_config(db.DB_LOCAL_SITE, True)
|
---|
79 | self.site2 = self.dbenvMaster.repmgr_site("127.0.0.1", client_port)
|
---|
80 |
|
---|
81 | self.site3 = self.dbenvClient.repmgr_site("127.0.0.1", master_port)
|
---|
82 | self.site3.set_config(db.DB_BOOTSTRAP_HELPER, True)
|
---|
83 | self.site4 = self.dbenvClient.repmgr_site("127.0.0.1", client_port)
|
---|
84 | self.site4.set_config(db.DB_LOCAL_SITE, True)
|
---|
85 |
|
---|
86 | d = {
|
---|
87 | db.DB_BOOTSTRAP_HELPER: [False, False, True, False],
|
---|
88 | db.DB_GROUP_CREATOR: [True, False, False, False],
|
---|
89 | db.DB_LEGACY: [False, False, False, False],
|
---|
90 | db.DB_LOCAL_SITE: [True, False, False, True],
|
---|
91 | db.DB_REPMGR_PEER: [False, False, False, False ],
|
---|
92 | }
|
---|
93 |
|
---|
94 | for i, j in d.items() :
|
---|
95 | for k, v in \
|
---|
96 | zip([self.site, self.site2, self.site3, self.site4], j) :
|
---|
97 | if v :
|
---|
98 | self.assertTrue(k.get_config(i))
|
---|
99 | else :
|
---|
100 | self.assertFalse(k.get_config(i))
|
---|
101 |
|
---|
102 | self.assertNotEqual(self.site.get_eid(), self.site2.get_eid())
|
---|
103 | self.assertNotEqual(self.site3.get_eid(), self.site4.get_eid())
|
---|
104 |
|
---|
105 | for i, j in zip([self.site, self.site2, self.site3, self.site4], \
|
---|
106 | [master_port, client_port, master_port, client_port]) :
|
---|
107 | addr = i.get_address()
|
---|
108 | self.assertEqual(addr, ("127.0.0.1", j))
|
---|
109 |
|
---|
110 | for i in [self.site, self.site2] :
|
---|
111 | self.assertEqual(i.get_address(),
|
---|
112 | self.dbenvMaster.repmgr_site_by_eid(i.get_eid()).get_address())
|
---|
113 | for i in [self.site3, self.site4] :
|
---|
114 | self.assertEqual(i.get_address(),
|
---|
115 | self.dbenvClient.repmgr_site_by_eid(i.get_eid()).get_address())
|
---|
116 | else :
|
---|
117 | self.dbenvMaster.repmgr_set_local_site("127.0.0.1", master_port)
|
---|
118 | self.dbenvClient.repmgr_set_local_site("127.0.0.1", client_port)
|
---|
119 | self.dbenvMaster.repmgr_add_remote_site("127.0.0.1", client_port)
|
---|
120 | self.dbenvClient.repmgr_add_remote_site("127.0.0.1", master_port)
|
---|
121 |
|
---|
122 | self.dbenvMaster.rep_set_nsites(2)
|
---|
123 | self.dbenvClient.rep_set_nsites(2)
|
---|
124 |
|
---|
125 | self.dbenvMaster.rep_set_priority(10)
|
---|
126 | self.dbenvClient.rep_set_priority(0)
|
---|
127 |
|
---|
128 | self.dbenvMaster.rep_set_timeout(db.DB_REP_CONNECTION_RETRY,100123)
|
---|
129 | self.dbenvClient.rep_set_timeout(db.DB_REP_CONNECTION_RETRY,100321)
|
---|
130 | self.assertEqual(self.dbenvMaster.rep_get_timeout(
|
---|
131 | db.DB_REP_CONNECTION_RETRY), 100123)
|
---|
132 | self.assertEqual(self.dbenvClient.rep_get_timeout(
|
---|
133 | db.DB_REP_CONNECTION_RETRY), 100321)
|
---|
134 |
|
---|
135 | self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100234)
|
---|
136 | self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100432)
|
---|
137 | self.assertEqual(self.dbenvMaster.rep_get_timeout(
|
---|
138 | db.DB_REP_ELECTION_TIMEOUT), 100234)
|
---|
139 | self.assertEqual(self.dbenvClient.rep_get_timeout(
|
---|
140 | db.DB_REP_ELECTION_TIMEOUT), 100432)
|
---|
141 |
|
---|
142 | self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100345)
|
---|
143 | self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100543)
|
---|
144 | self.assertEqual(self.dbenvMaster.rep_get_timeout(
|
---|
145 | db.DB_REP_ELECTION_RETRY), 100345)
|
---|
146 | self.assertEqual(self.dbenvClient.rep_get_timeout(
|
---|
147 | db.DB_REP_ELECTION_RETRY), 100543)
|
---|
148 |
|
---|
149 | self.dbenvMaster.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
|
---|
150 | self.dbenvClient.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
|
---|
151 |
|
---|
152 | self.dbenvMaster.repmgr_start(1, db.DB_REP_MASTER);
|
---|
153 | self.dbenvClient.repmgr_start(1, db.DB_REP_CLIENT);
|
---|
154 |
|
---|
155 | self.assertEqual(self.dbenvMaster.rep_get_nsites(),2)
|
---|
156 | self.assertEqual(self.dbenvClient.rep_get_nsites(),2)
|
---|
157 | self.assertEqual(self.dbenvMaster.rep_get_priority(),10)
|
---|
158 | self.assertEqual(self.dbenvClient.rep_get_priority(),0)
|
---|
159 | self.assertEqual(self.dbenvMaster.repmgr_get_ack_policy(),
|
---|
160 | db.DB_REPMGR_ACKS_ALL)
|
---|
161 | self.assertEqual(self.dbenvClient.repmgr_get_ack_policy(),
|
---|
162 | db.DB_REPMGR_ACKS_ALL)
|
---|
163 |
|
---|
164 | # The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
|
---|
165 | # is not generated if the master has no new transactions.
|
---|
166 | # This is solved in BDB 4.6 (#15542).
|
---|
167 | import time
|
---|
168 | timeout = time.time()+60
|
---|
169 | while (time.time()<timeout) and not (self.confirmed_master and self.client_startupdone) :
|
---|
170 | time.sleep(0.02)
|
---|
171 | # self.client_startupdone does not always get set to True within
|
---|
172 | # the timeout. On windows this may be a deep issue, on other
|
---|
173 | # platforms it is likely just a timing issue, especially on slow
|
---|
174 | # virthost buildbots (see issue 3892 for more). Even though
|
---|
175 | # the timeout triggers, the rest of this test method usually passes
|
---|
176 | # (but not all of it always, see below). So we just note the
|
---|
177 | # timeout on stderr and keep soldering on.
|
---|
178 | if time.time()>timeout:
|
---|
179 | import sys
|
---|
180 | print >> sys.stderr, ("XXX: timeout happened before"
|
---|
181 | "startup was confirmed - see issue 3892")
|
---|
182 | startup_timeout = True
|
---|
183 |
|
---|
184 | d = self.dbenvMaster.repmgr_site_list()
|
---|
185 | self.assertEqual(len(d), 1)
|
---|
186 | d = d.values()[0] # There is only one
|
---|
187 | self.assertEqual(d[0], "127.0.0.1")
|
---|
188 | self.assertEqual(d[1], client_port)
|
---|
189 | self.assertTrue((d[2]==db.DB_REPMGR_CONNECTED) or \
|
---|
190 | (d[2]==db.DB_REPMGR_DISCONNECTED))
|
---|
191 |
|
---|
192 | d = self.dbenvClient.repmgr_site_list()
|
---|
193 | self.assertEqual(len(d), 1)
|
---|
194 | d = d.values()[0] # There is only one
|
---|
195 | self.assertEqual(d[0], "127.0.0.1")
|
---|
196 | self.assertEqual(d[1], master_port)
|
---|
197 | self.assertTrue((d[2]==db.DB_REPMGR_CONNECTED) or \
|
---|
198 | (d[2]==db.DB_REPMGR_DISCONNECTED))
|
---|
199 |
|
---|
200 | if db.version() >= (4,6) :
|
---|
201 | d = self.dbenvMaster.repmgr_stat(flags=db.DB_STAT_CLEAR);
|
---|
202 | self.assertTrue("msgs_queued" in d)
|
---|
203 |
|
---|
204 | self.dbMaster=db.DB(self.dbenvMaster)
|
---|
205 | txn=self.dbenvMaster.txn_begin()
|
---|
206 | self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0666, txn=txn)
|
---|
207 | txn.commit()
|
---|
208 |
|
---|
209 | import time,os.path
|
---|
210 | timeout=time.time()+10
|
---|
211 | while (time.time()<timeout) and \
|
---|
212 | not (os.path.exists(os.path.join(self.homeDirClient,"test"))) :
|
---|
213 | time.sleep(0.01)
|
---|
214 |
|
---|
215 | self.dbClient=db.DB(self.dbenvClient)
|
---|
216 | while True :
|
---|
217 | txn=self.dbenvClient.txn_begin()
|
---|
218 | try :
|
---|
219 | self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY,
|
---|
220 | mode=0666, txn=txn)
|
---|
221 | except db.DBRepHandleDeadError :
|
---|
222 | txn.abort()
|
---|
223 | self.dbClient.close()
|
---|
224 | self.dbClient=db.DB(self.dbenvClient)
|
---|
225 | continue
|
---|
226 |
|
---|
227 | txn.commit()
|
---|
228 | break
|
---|
229 |
|
---|
230 | txn=self.dbenvMaster.txn_begin()
|
---|
231 | self.dbMaster.put("ABC", "123", txn=txn)
|
---|
232 | txn.commit()
|
---|
233 | import time
|
---|
234 | timeout=time.time()+10
|
---|
235 | v=None
|
---|
236 | while (time.time()<timeout) and (v is None) :
|
---|
237 | txn=self.dbenvClient.txn_begin()
|
---|
238 | v=self.dbClient.get("ABC", txn=txn)
|
---|
239 | txn.commit()
|
---|
240 | if v is None :
|
---|
241 | time.sleep(0.02)
|
---|
242 | # If startup did not happen before the timeout above, then this test
|
---|
243 | # sometimes fails. This happens randomly, which causes buildbot
|
---|
244 | # instability, but all the other bsddb tests pass. Since bsddb3 in the
|
---|
245 | # stdlib is currently not getting active maintenance, and is gone in
|
---|
246 | # py3k, we just skip the end of the test in that case.
|
---|
247 | if time.time()>=timeout and startup_timeout:
|
---|
248 | self.skipTest("replication test skipped due to random failure, "
|
---|
249 | "see issue 3892")
|
---|
250 | self.assertTrue(time.time()<timeout)
|
---|
251 | self.assertEqual("123", v)
|
---|
252 |
|
---|
253 | txn=self.dbenvMaster.txn_begin()
|
---|
254 | self.dbMaster.delete("ABC", txn=txn)
|
---|
255 | txn.commit()
|
---|
256 | timeout=time.time()+10
|
---|
257 | while (time.time()<timeout) and (v is not None) :
|
---|
258 | txn=self.dbenvClient.txn_begin()
|
---|
259 | v=self.dbClient.get("ABC", txn=txn)
|
---|
260 | txn.commit()
|
---|
261 | if v is None :
|
---|
262 | time.sleep(0.02)
|
---|
263 | self.assertTrue(time.time()<timeout)
|
---|
264 | self.assertEqual(None, v)
|
---|
265 |
|
---|
266 | class DBBaseReplication(DBReplication) :
|
---|
267 | def setUp(self) :
|
---|
268 | DBReplication.setUp(self)
|
---|
269 | def confirmed_master(a,b,c) :
|
---|
270 | if (b == db.DB_EVENT_REP_MASTER) or (b == db.DB_EVENT_REP_ELECTED) :
|
---|
271 | self.confirmed_master = True
|
---|
272 |
|
---|
273 | def client_startupdone(a,b,c) :
|
---|
274 | if b == db.DB_EVENT_REP_STARTUPDONE :
|
---|
275 | self.client_startupdone = True
|
---|
276 |
|
---|
277 | self.dbenvMaster.set_event_notify(confirmed_master)
|
---|
278 | self.dbenvClient.set_event_notify(client_startupdone)
|
---|
279 |
|
---|
280 | import Queue
|
---|
281 | self.m2c = Queue.Queue()
|
---|
282 | self.c2m = Queue.Queue()
|
---|
283 |
|
---|
284 | # There are only two nodes, so we don't need to
|
---|
285 | # do any routing decision
|
---|
286 | def m2c(dbenv, control, rec, lsnp, envid, flags) :
|
---|
287 | self.m2c.put((control, rec))
|
---|
288 |
|
---|
289 | def c2m(dbenv, control, rec, lsnp, envid, flags) :
|
---|
290 | self.c2m.put((control, rec))
|
---|
291 |
|
---|
292 | self.dbenvMaster.rep_set_transport(13,m2c)
|
---|
293 | self.dbenvMaster.rep_set_priority(10)
|
---|
294 | self.dbenvClient.rep_set_transport(3,c2m)
|
---|
295 | self.dbenvClient.rep_set_priority(0)
|
---|
296 |
|
---|
297 | self.assertEqual(self.dbenvMaster.rep_get_priority(),10)
|
---|
298 | self.assertEqual(self.dbenvClient.rep_get_priority(),0)
|
---|
299 |
|
---|
300 | #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
|
---|
301 | #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
|
---|
302 | #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
|
---|
303 | #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
|
---|
304 |
|
---|
305 | def thread_master() :
|
---|
306 | return self.thread_do(self.dbenvMaster, self.c2m, 3,
|
---|
307 | self.master_doing_election, True)
|
---|
308 |
|
---|
309 | def thread_client() :
|
---|
310 | return self.thread_do(self.dbenvClient, self.m2c, 13,
|
---|
311 | self.client_doing_election, False)
|
---|
312 |
|
---|
313 | from threading import Thread
|
---|
314 | t_m=Thread(target=thread_master)
|
---|
315 | t_c=Thread(target=thread_client)
|
---|
316 | import sys
|
---|
317 | if sys.version_info[0] < 3 :
|
---|
318 | t_m.setDaemon(True)
|
---|
319 | t_c.setDaemon(True)
|
---|
320 | else :
|
---|
321 | t_m.daemon = True
|
---|
322 | t_c.daemon = True
|
---|
323 |
|
---|
324 | self.t_m = t_m
|
---|
325 | self.t_c = t_c
|
---|
326 |
|
---|
327 | self.dbMaster = self.dbClient = None
|
---|
328 |
|
---|
329 | self.master_doing_election=[False]
|
---|
330 | self.client_doing_election=[False]
|
---|
331 |
|
---|
332 |
|
---|
333 | def tearDown(self):
|
---|
334 | if self.dbClient :
|
---|
335 | self.dbClient.close()
|
---|
336 | if self.dbMaster :
|
---|
337 | self.dbMaster.close()
|
---|
338 | self.m2c.put(None)
|
---|
339 | self.c2m.put(None)
|
---|
340 | self.t_m.join()
|
---|
341 | self.t_c.join()
|
---|
342 |
|
---|
343 | # Here we assign dummy event handlers to allow GC of the test object.
|
---|
344 | # Since the dummy handler doesn't use any outer scope variable, it
|
---|
345 | # doesn't keep any reference to the test object.
|
---|
346 | def dummy(*args) :
|
---|
347 | pass
|
---|
348 | self.dbenvMaster.set_event_notify(dummy)
|
---|
349 | self.dbenvClient.set_event_notify(dummy)
|
---|
350 | self.dbenvMaster.rep_set_transport(13,dummy)
|
---|
351 | self.dbenvClient.rep_set_transport(3,dummy)
|
---|
352 |
|
---|
353 | self.dbenvClient.close()
|
---|
354 | self.dbenvMaster.close()
|
---|
355 | test_support.rmtree(self.homeDirClient)
|
---|
356 | test_support.rmtree(self.homeDirMaster)
|
---|
357 |
|
---|
358 | def basic_rep_threading(self) :
|
---|
359 | self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
|
---|
360 | self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)
|
---|
361 |
|
---|
362 | def thread_do(env, q, envid, election_status, must_be_master) :
|
---|
363 | while True :
|
---|
364 | v=q.get()
|
---|
365 | if v is None : return
|
---|
366 | env.rep_process_message(v[0], v[1], envid)
|
---|
367 |
|
---|
368 | self.thread_do = thread_do
|
---|
369 |
|
---|
370 | self.t_m.start()
|
---|
371 | self.t_c.start()
|
---|
372 |
|
---|
373 | def test01_basic_replication(self) :
|
---|
374 | self.basic_rep_threading()
|
---|
375 |
|
---|
376 | # The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
|
---|
377 | # is not generated if the master has no new transactions.
|
---|
378 | # This is solved in BDB 4.6 (#15542).
|
---|
379 | import time
|
---|
380 | timeout = time.time()+60
|
---|
381 | while (time.time()<timeout) and not (self.confirmed_master and
|
---|
382 | self.client_startupdone) :
|
---|
383 | time.sleep(0.02)
|
---|
384 | self.assertTrue(time.time()<timeout)
|
---|
385 |
|
---|
386 | self.dbMaster=db.DB(self.dbenvMaster)
|
---|
387 | txn=self.dbenvMaster.txn_begin()
|
---|
388 | self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0666, txn=txn)
|
---|
389 | txn.commit()
|
---|
390 |
|
---|
391 | import time,os.path
|
---|
392 | timeout=time.time()+10
|
---|
393 | while (time.time()<timeout) and \
|
---|
394 | not (os.path.exists(os.path.join(self.homeDirClient,"test"))) :
|
---|
395 | time.sleep(0.01)
|
---|
396 |
|
---|
397 | self.dbClient=db.DB(self.dbenvClient)
|
---|
398 | while True :
|
---|
399 | txn=self.dbenvClient.txn_begin()
|
---|
400 | try :
|
---|
401 | self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY,
|
---|
402 | mode=0666, txn=txn)
|
---|
403 | except db.DBRepHandleDeadError :
|
---|
404 | txn.abort()
|
---|
405 | self.dbClient.close()
|
---|
406 | self.dbClient=db.DB(self.dbenvClient)
|
---|
407 | continue
|
---|
408 |
|
---|
409 | txn.commit()
|
---|
410 | break
|
---|
411 |
|
---|
412 | d = self.dbenvMaster.rep_stat(flags=db.DB_STAT_CLEAR);
|
---|
413 | self.assertTrue("master_changes" in d)
|
---|
414 |
|
---|
415 | txn=self.dbenvMaster.txn_begin()
|
---|
416 | self.dbMaster.put("ABC", "123", txn=txn)
|
---|
417 | txn.commit()
|
---|
418 | import time
|
---|
419 | timeout=time.time()+10
|
---|
420 | v=None
|
---|
421 | while (time.time()<timeout) and (v is None) :
|
---|
422 | txn=self.dbenvClient.txn_begin()
|
---|
423 | v=self.dbClient.get("ABC", txn=txn)
|
---|
424 | txn.commit()
|
---|
425 | if v is None :
|
---|
426 | time.sleep(0.02)
|
---|
427 | self.assertTrue(time.time()<timeout)
|
---|
428 | self.assertEqual("123", v)
|
---|
429 |
|
---|
430 | txn=self.dbenvMaster.txn_begin()
|
---|
431 | self.dbMaster.delete("ABC", txn=txn)
|
---|
432 | txn.commit()
|
---|
433 | timeout=time.time()+10
|
---|
434 | while (time.time()<timeout) and (v is not None) :
|
---|
435 | txn=self.dbenvClient.txn_begin()
|
---|
436 | v=self.dbClient.get("ABC", txn=txn)
|
---|
437 | txn.commit()
|
---|
438 | if v is None :
|
---|
439 | time.sleep(0.02)
|
---|
440 | self.assertTrue(time.time()<timeout)
|
---|
441 | self.assertEqual(None, v)
|
---|
442 |
|
---|
443 | if db.version() >= (4,7) :
|
---|
444 | def test02_test_request(self) :
|
---|
445 | self.basic_rep_threading()
|
---|
446 | (minimum, maximum) = self.dbenvClient.rep_get_request()
|
---|
447 | self.dbenvClient.rep_set_request(minimum-1, maximum+1)
|
---|
448 | self.assertEqual(self.dbenvClient.rep_get_request(),
|
---|
449 | (minimum-1, maximum+1))
|
---|
450 |
|
---|
451 | if db.version() >= (4,6) :
|
---|
452 | def test03_master_election(self) :
|
---|
453 | # Get ready to hold an election
|
---|
454 | #self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
|
---|
455 | self.dbenvMaster.rep_start(flags=db.DB_REP_CLIENT)
|
---|
456 | self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)
|
---|
457 |
|
---|
458 | def thread_do(env, q, envid, election_status, must_be_master) :
|
---|
459 | while True :
|
---|
460 | v=q.get()
|
---|
461 | if v is None : return
|
---|
462 | r = env.rep_process_message(v[0],v[1],envid)
|
---|
463 | if must_be_master and self.confirmed_master :
|
---|
464 | self.dbenvMaster.rep_start(flags = db.DB_REP_MASTER)
|
---|
465 | must_be_master = False
|
---|
466 |
|
---|
467 | if r[0] == db.DB_REP_HOLDELECTION :
|
---|
468 | def elect() :
|
---|
469 | while True :
|
---|
470 | try :
|
---|
471 | env.rep_elect(2, 1)
|
---|
472 | election_status[0] = False
|
---|
473 | break
|
---|
474 | except db.DBRepUnavailError :
|
---|
475 | pass
|
---|
476 | if not election_status[0] and not self.confirmed_master :
|
---|
477 | from threading import Thread
|
---|
478 | election_status[0] = True
|
---|
479 | t=Thread(target=elect)
|
---|
480 | import sys
|
---|
481 | if sys.version_info[0] < 3 :
|
---|
482 | t.setDaemon(True)
|
---|
483 | else :
|
---|
484 | t.daemon = True
|
---|
485 | t.start()
|
---|
486 |
|
---|
487 | self.thread_do = thread_do
|
---|
488 |
|
---|
489 | self.t_m.start()
|
---|
490 | self.t_c.start()
|
---|
491 |
|
---|
492 | self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000)
|
---|
493 | self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000)
|
---|
494 | self.client_doing_election[0] = True
|
---|
495 | while True :
|
---|
496 | try :
|
---|
497 | self.dbenvClient.rep_elect(2, 1)
|
---|
498 | self.client_doing_election[0] = False
|
---|
499 | break
|
---|
500 | except db.DBRepUnavailError :
|
---|
501 | pass
|
---|
502 |
|
---|
503 | self.assertTrue(self.confirmed_master)
|
---|
504 |
|
---|
505 | # Race condition showed up after upgrading to Solaris 10 Update 10
|
---|
506 | # https://forums.oracle.com/forums/thread.jspa?messageID=9902860
|
---|
507 | # jcea@jcea.es: See private email from Paula Bingham (Oracle),
|
---|
508 | # in 20110929.
|
---|
509 | while not (self.dbenvClient.rep_stat()["startup_complete"]) :
|
---|
510 | pass
|
---|
511 |
|
---|
512 | if db.version() >= (4,7) :
|
---|
513 | def test04_test_clockskew(self) :
|
---|
514 | fast, slow = 1234, 1230
|
---|
515 | self.dbenvMaster.rep_set_clockskew(fast, slow)
|
---|
516 | self.assertEqual((fast, slow),
|
---|
517 | self.dbenvMaster.rep_get_clockskew())
|
---|
518 | self.basic_rep_threading()
|
---|
519 |
|
---|
520 | #----------------------------------------------------------------------
|
---|
521 |
|
---|
522 | def test_suite():
|
---|
523 | suite = unittest.TestSuite()
|
---|
524 | if db.version() >= (4, 6) :
|
---|
525 | dbenv = db.DBEnv()
|
---|
526 | try :
|
---|
527 | dbenv.repmgr_get_ack_policy()
|
---|
528 | ReplicationManager_available=True
|
---|
529 | except :
|
---|
530 | ReplicationManager_available=False
|
---|
531 | dbenv.close()
|
---|
532 | del dbenv
|
---|
533 | if ReplicationManager_available :
|
---|
534 | suite.addTest(unittest.makeSuite(DBReplicationManager))
|
---|
535 |
|
---|
536 | if have_threads :
|
---|
537 | suite.addTest(unittest.makeSuite(DBBaseReplication))
|
---|
538 |
|
---|
539 | return suite
|
---|
540 |
|
---|
541 |
|
---|
542 | if __name__ == '__main__':
|
---|
543 | unittest.main(defaultTest='test_suite')
|
---|