source: python/trunk/Lib/bsddb/test/test_replication.py

Last change on this file was 391, checked in by dmik, 11 years ago

python: Merge vendor 2.7.6 to trunk.

  • Property svn:eol-style set to native
File size: 21.0 KB
Line 
1"""TestCases for distributed transactions.
2"""
3
4import os
5import time
6import unittest
7
8from test_all import db, test_support, have_threads, verbose, \
9 get_new_environment_path, get_new_database_path
10
11
12#----------------------------------------------------------------------
13
14class DBReplication(unittest.TestCase) :
15 def setUp(self) :
16 self.homeDirMaster = get_new_environment_path()
17 self.homeDirClient = get_new_environment_path()
18
19 self.dbenvMaster = db.DBEnv()
20 self.dbenvClient = db.DBEnv()
21
22 # Must use "DB_THREAD" because the Replication Manager will
23 # be executed in other threads but will use the same environment.
24 # http://forums.oracle.com/forums/thread.jspa?threadID=645788&tstart=0
25 self.dbenvMaster.open(self.homeDirMaster, db.DB_CREATE | db.DB_INIT_TXN
26 | db.DB_INIT_LOG | db.DB_INIT_MPOOL | db.DB_INIT_LOCK |
27 db.DB_INIT_REP | db.DB_RECOVER | db.DB_THREAD, 0666)
28 self.dbenvClient.open(self.homeDirClient, db.DB_CREATE | db.DB_INIT_TXN
29 | db.DB_INIT_LOG | db.DB_INIT_MPOOL | db.DB_INIT_LOCK |
30 db.DB_INIT_REP | db.DB_RECOVER | db.DB_THREAD, 0666)
31
32 self.confirmed_master=self.client_startupdone=False
33 def confirmed_master(a,b,c) :
34 if b==db.DB_EVENT_REP_MASTER :
35 self.confirmed_master=True
36
37 def client_startupdone(a,b,c) :
38 if b==db.DB_EVENT_REP_STARTUPDONE :
39 self.client_startupdone=True
40
41 self.dbenvMaster.set_event_notify(confirmed_master)
42 self.dbenvClient.set_event_notify(client_startupdone)
43
44 #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
45 #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
46 #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
47 #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
48
49 self.dbMaster = self.dbClient = None
50
51
52 def tearDown(self):
53 if self.dbClient :
54 self.dbClient.close()
55 if self.dbMaster :
56 self.dbMaster.close()
57
58 # Here we assign dummy event handlers to allow GC of the test object.
59 # Since the dummy handler doesn't use any outer scope variable, it
60 # doesn't keep any reference to the test object.
61 def dummy(*args) :
62 pass
63 self.dbenvMaster.set_event_notify(dummy)
64 self.dbenvClient.set_event_notify(dummy)
65
66 self.dbenvClient.close()
67 self.dbenvMaster.close()
68 test_support.rmtree(self.homeDirClient)
69 test_support.rmtree(self.homeDirMaster)
70
71class DBReplicationManager(DBReplication) :
72 def test01_basic_replication(self) :
73 master_port = test_support.find_unused_port()
74 client_port = test_support.find_unused_port()
75 if db.version() >= (5, 2) :
76 self.site = self.dbenvMaster.repmgr_site("127.0.0.1", master_port)
77 self.site.set_config(db.DB_GROUP_CREATOR, True)
78 self.site.set_config(db.DB_LOCAL_SITE, True)
79 self.site2 = self.dbenvMaster.repmgr_site("127.0.0.1", client_port)
80
81 self.site3 = self.dbenvClient.repmgr_site("127.0.0.1", master_port)
82 self.site3.set_config(db.DB_BOOTSTRAP_HELPER, True)
83 self.site4 = self.dbenvClient.repmgr_site("127.0.0.1", client_port)
84 self.site4.set_config(db.DB_LOCAL_SITE, True)
85
86 d = {
87 db.DB_BOOTSTRAP_HELPER: [False, False, True, False],
88 db.DB_GROUP_CREATOR: [True, False, False, False],
89 db.DB_LEGACY: [False, False, False, False],
90 db.DB_LOCAL_SITE: [True, False, False, True],
91 db.DB_REPMGR_PEER: [False, False, False, False ],
92 }
93
94 for i, j in d.items() :
95 for k, v in \
96 zip([self.site, self.site2, self.site3, self.site4], j) :
97 if v :
98 self.assertTrue(k.get_config(i))
99 else :
100 self.assertFalse(k.get_config(i))
101
102 self.assertNotEqual(self.site.get_eid(), self.site2.get_eid())
103 self.assertNotEqual(self.site3.get_eid(), self.site4.get_eid())
104
105 for i, j in zip([self.site, self.site2, self.site3, self.site4], \
106 [master_port, client_port, master_port, client_port]) :
107 addr = i.get_address()
108 self.assertEqual(addr, ("127.0.0.1", j))
109
110 for i in [self.site, self.site2] :
111 self.assertEqual(i.get_address(),
112 self.dbenvMaster.repmgr_site_by_eid(i.get_eid()).get_address())
113 for i in [self.site3, self.site4] :
114 self.assertEqual(i.get_address(),
115 self.dbenvClient.repmgr_site_by_eid(i.get_eid()).get_address())
116 else :
117 self.dbenvMaster.repmgr_set_local_site("127.0.0.1", master_port)
118 self.dbenvClient.repmgr_set_local_site("127.0.0.1", client_port)
119 self.dbenvMaster.repmgr_add_remote_site("127.0.0.1", client_port)
120 self.dbenvClient.repmgr_add_remote_site("127.0.0.1", master_port)
121
122 self.dbenvMaster.rep_set_nsites(2)
123 self.dbenvClient.rep_set_nsites(2)
124
125 self.dbenvMaster.rep_set_priority(10)
126 self.dbenvClient.rep_set_priority(0)
127
128 self.dbenvMaster.rep_set_timeout(db.DB_REP_CONNECTION_RETRY,100123)
129 self.dbenvClient.rep_set_timeout(db.DB_REP_CONNECTION_RETRY,100321)
130 self.assertEqual(self.dbenvMaster.rep_get_timeout(
131 db.DB_REP_CONNECTION_RETRY), 100123)
132 self.assertEqual(self.dbenvClient.rep_get_timeout(
133 db.DB_REP_CONNECTION_RETRY), 100321)
134
135 self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100234)
136 self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100432)
137 self.assertEqual(self.dbenvMaster.rep_get_timeout(
138 db.DB_REP_ELECTION_TIMEOUT), 100234)
139 self.assertEqual(self.dbenvClient.rep_get_timeout(
140 db.DB_REP_ELECTION_TIMEOUT), 100432)
141
142 self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100345)
143 self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100543)
144 self.assertEqual(self.dbenvMaster.rep_get_timeout(
145 db.DB_REP_ELECTION_RETRY), 100345)
146 self.assertEqual(self.dbenvClient.rep_get_timeout(
147 db.DB_REP_ELECTION_RETRY), 100543)
148
149 self.dbenvMaster.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
150 self.dbenvClient.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
151
152 self.dbenvMaster.repmgr_start(1, db.DB_REP_MASTER);
153 self.dbenvClient.repmgr_start(1, db.DB_REP_CLIENT);
154
155 self.assertEqual(self.dbenvMaster.rep_get_nsites(),2)
156 self.assertEqual(self.dbenvClient.rep_get_nsites(),2)
157 self.assertEqual(self.dbenvMaster.rep_get_priority(),10)
158 self.assertEqual(self.dbenvClient.rep_get_priority(),0)
159 self.assertEqual(self.dbenvMaster.repmgr_get_ack_policy(),
160 db.DB_REPMGR_ACKS_ALL)
161 self.assertEqual(self.dbenvClient.repmgr_get_ack_policy(),
162 db.DB_REPMGR_ACKS_ALL)
163
164 # The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
165 # is not generated if the master has no new transactions.
166 # This is solved in BDB 4.6 (#15542).
167 import time
168 timeout = time.time()+60
169 while (time.time()<timeout) and not (self.confirmed_master and self.client_startupdone) :
170 time.sleep(0.02)
171 # self.client_startupdone does not always get set to True within
172 # the timeout. On windows this may be a deep issue, on other
173 # platforms it is likely just a timing issue, especially on slow
174 # virthost buildbots (see issue 3892 for more). Even though
175 # the timeout triggers, the rest of this test method usually passes
176 # (but not all of it always, see below). So we just note the
177 # timeout on stderr and keep soldering on.
178 if time.time()>timeout:
179 import sys
180 print >> sys.stderr, ("XXX: timeout happened before"
181 "startup was confirmed - see issue 3892")
182 startup_timeout = True
183
184 d = self.dbenvMaster.repmgr_site_list()
185 self.assertEqual(len(d), 1)
186 d = d.values()[0] # There is only one
187 self.assertEqual(d[0], "127.0.0.1")
188 self.assertEqual(d[1], client_port)
189 self.assertTrue((d[2]==db.DB_REPMGR_CONNECTED) or \
190 (d[2]==db.DB_REPMGR_DISCONNECTED))
191
192 d = self.dbenvClient.repmgr_site_list()
193 self.assertEqual(len(d), 1)
194 d = d.values()[0] # There is only one
195 self.assertEqual(d[0], "127.0.0.1")
196 self.assertEqual(d[1], master_port)
197 self.assertTrue((d[2]==db.DB_REPMGR_CONNECTED) or \
198 (d[2]==db.DB_REPMGR_DISCONNECTED))
199
200 if db.version() >= (4,6) :
201 d = self.dbenvMaster.repmgr_stat(flags=db.DB_STAT_CLEAR);
202 self.assertTrue("msgs_queued" in d)
203
204 self.dbMaster=db.DB(self.dbenvMaster)
205 txn=self.dbenvMaster.txn_begin()
206 self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0666, txn=txn)
207 txn.commit()
208
209 import time,os.path
210 timeout=time.time()+10
211 while (time.time()<timeout) and \
212 not (os.path.exists(os.path.join(self.homeDirClient,"test"))) :
213 time.sleep(0.01)
214
215 self.dbClient=db.DB(self.dbenvClient)
216 while True :
217 txn=self.dbenvClient.txn_begin()
218 try :
219 self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY,
220 mode=0666, txn=txn)
221 except db.DBRepHandleDeadError :
222 txn.abort()
223 self.dbClient.close()
224 self.dbClient=db.DB(self.dbenvClient)
225 continue
226
227 txn.commit()
228 break
229
230 txn=self.dbenvMaster.txn_begin()
231 self.dbMaster.put("ABC", "123", txn=txn)
232 txn.commit()
233 import time
234 timeout=time.time()+10
235 v=None
236 while (time.time()<timeout) and (v is None) :
237 txn=self.dbenvClient.txn_begin()
238 v=self.dbClient.get("ABC", txn=txn)
239 txn.commit()
240 if v is None :
241 time.sleep(0.02)
242 # If startup did not happen before the timeout above, then this test
243 # sometimes fails. This happens randomly, which causes buildbot
244 # instability, but all the other bsddb tests pass. Since bsddb3 in the
245 # stdlib is currently not getting active maintenance, and is gone in
246 # py3k, we just skip the end of the test in that case.
247 if time.time()>=timeout and startup_timeout:
248 self.skipTest("replication test skipped due to random failure, "
249 "see issue 3892")
250 self.assertTrue(time.time()<timeout)
251 self.assertEqual("123", v)
252
253 txn=self.dbenvMaster.txn_begin()
254 self.dbMaster.delete("ABC", txn=txn)
255 txn.commit()
256 timeout=time.time()+10
257 while (time.time()<timeout) and (v is not None) :
258 txn=self.dbenvClient.txn_begin()
259 v=self.dbClient.get("ABC", txn=txn)
260 txn.commit()
261 if v is None :
262 time.sleep(0.02)
263 self.assertTrue(time.time()<timeout)
264 self.assertEqual(None, v)
265
266class DBBaseReplication(DBReplication) :
267 def setUp(self) :
268 DBReplication.setUp(self)
269 def confirmed_master(a,b,c) :
270 if (b == db.DB_EVENT_REP_MASTER) or (b == db.DB_EVENT_REP_ELECTED) :
271 self.confirmed_master = True
272
273 def client_startupdone(a,b,c) :
274 if b == db.DB_EVENT_REP_STARTUPDONE :
275 self.client_startupdone = True
276
277 self.dbenvMaster.set_event_notify(confirmed_master)
278 self.dbenvClient.set_event_notify(client_startupdone)
279
280 import Queue
281 self.m2c = Queue.Queue()
282 self.c2m = Queue.Queue()
283
284 # There are only two nodes, so we don't need to
285 # do any routing decision
286 def m2c(dbenv, control, rec, lsnp, envid, flags) :
287 self.m2c.put((control, rec))
288
289 def c2m(dbenv, control, rec, lsnp, envid, flags) :
290 self.c2m.put((control, rec))
291
292 self.dbenvMaster.rep_set_transport(13,m2c)
293 self.dbenvMaster.rep_set_priority(10)
294 self.dbenvClient.rep_set_transport(3,c2m)
295 self.dbenvClient.rep_set_priority(0)
296
297 self.assertEqual(self.dbenvMaster.rep_get_priority(),10)
298 self.assertEqual(self.dbenvClient.rep_get_priority(),0)
299
300 #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
301 #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
302 #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
303 #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
304
305 def thread_master() :
306 return self.thread_do(self.dbenvMaster, self.c2m, 3,
307 self.master_doing_election, True)
308
309 def thread_client() :
310 return self.thread_do(self.dbenvClient, self.m2c, 13,
311 self.client_doing_election, False)
312
313 from threading import Thread
314 t_m=Thread(target=thread_master)
315 t_c=Thread(target=thread_client)
316 import sys
317 if sys.version_info[0] < 3 :
318 t_m.setDaemon(True)
319 t_c.setDaemon(True)
320 else :
321 t_m.daemon = True
322 t_c.daemon = True
323
324 self.t_m = t_m
325 self.t_c = t_c
326
327 self.dbMaster = self.dbClient = None
328
329 self.master_doing_election=[False]
330 self.client_doing_election=[False]
331
332
333 def tearDown(self):
334 if self.dbClient :
335 self.dbClient.close()
336 if self.dbMaster :
337 self.dbMaster.close()
338 self.m2c.put(None)
339 self.c2m.put(None)
340 self.t_m.join()
341 self.t_c.join()
342
343 # Here we assign dummy event handlers to allow GC of the test object.
344 # Since the dummy handler doesn't use any outer scope variable, it
345 # doesn't keep any reference to the test object.
346 def dummy(*args) :
347 pass
348 self.dbenvMaster.set_event_notify(dummy)
349 self.dbenvClient.set_event_notify(dummy)
350 self.dbenvMaster.rep_set_transport(13,dummy)
351 self.dbenvClient.rep_set_transport(3,dummy)
352
353 self.dbenvClient.close()
354 self.dbenvMaster.close()
355 test_support.rmtree(self.homeDirClient)
356 test_support.rmtree(self.homeDirMaster)
357
358 def basic_rep_threading(self) :
359 self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
360 self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)
361
362 def thread_do(env, q, envid, election_status, must_be_master) :
363 while True :
364 v=q.get()
365 if v is None : return
366 env.rep_process_message(v[0], v[1], envid)
367
368 self.thread_do = thread_do
369
370 self.t_m.start()
371 self.t_c.start()
372
373 def test01_basic_replication(self) :
374 self.basic_rep_threading()
375
376 # The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
377 # is not generated if the master has no new transactions.
378 # This is solved in BDB 4.6 (#15542).
379 import time
380 timeout = time.time()+60
381 while (time.time()<timeout) and not (self.confirmed_master and
382 self.client_startupdone) :
383 time.sleep(0.02)
384 self.assertTrue(time.time()<timeout)
385
386 self.dbMaster=db.DB(self.dbenvMaster)
387 txn=self.dbenvMaster.txn_begin()
388 self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0666, txn=txn)
389 txn.commit()
390
391 import time,os.path
392 timeout=time.time()+10
393 while (time.time()<timeout) and \
394 not (os.path.exists(os.path.join(self.homeDirClient,"test"))) :
395 time.sleep(0.01)
396
397 self.dbClient=db.DB(self.dbenvClient)
398 while True :
399 txn=self.dbenvClient.txn_begin()
400 try :
401 self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY,
402 mode=0666, txn=txn)
403 except db.DBRepHandleDeadError :
404 txn.abort()
405 self.dbClient.close()
406 self.dbClient=db.DB(self.dbenvClient)
407 continue
408
409 txn.commit()
410 break
411
412 d = self.dbenvMaster.rep_stat(flags=db.DB_STAT_CLEAR);
413 self.assertTrue("master_changes" in d)
414
415 txn=self.dbenvMaster.txn_begin()
416 self.dbMaster.put("ABC", "123", txn=txn)
417 txn.commit()
418 import time
419 timeout=time.time()+10
420 v=None
421 while (time.time()<timeout) and (v is None) :
422 txn=self.dbenvClient.txn_begin()
423 v=self.dbClient.get("ABC", txn=txn)
424 txn.commit()
425 if v is None :
426 time.sleep(0.02)
427 self.assertTrue(time.time()<timeout)
428 self.assertEqual("123", v)
429
430 txn=self.dbenvMaster.txn_begin()
431 self.dbMaster.delete("ABC", txn=txn)
432 txn.commit()
433 timeout=time.time()+10
434 while (time.time()<timeout) and (v is not None) :
435 txn=self.dbenvClient.txn_begin()
436 v=self.dbClient.get("ABC", txn=txn)
437 txn.commit()
438 if v is None :
439 time.sleep(0.02)
440 self.assertTrue(time.time()<timeout)
441 self.assertEqual(None, v)
442
443 if db.version() >= (4,7) :
444 def test02_test_request(self) :
445 self.basic_rep_threading()
446 (minimum, maximum) = self.dbenvClient.rep_get_request()
447 self.dbenvClient.rep_set_request(minimum-1, maximum+1)
448 self.assertEqual(self.dbenvClient.rep_get_request(),
449 (minimum-1, maximum+1))
450
451 if db.version() >= (4,6) :
452 def test03_master_election(self) :
453 # Get ready to hold an election
454 #self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
455 self.dbenvMaster.rep_start(flags=db.DB_REP_CLIENT)
456 self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)
457
458 def thread_do(env, q, envid, election_status, must_be_master) :
459 while True :
460 v=q.get()
461 if v is None : return
462 r = env.rep_process_message(v[0],v[1],envid)
463 if must_be_master and self.confirmed_master :
464 self.dbenvMaster.rep_start(flags = db.DB_REP_MASTER)
465 must_be_master = False
466
467 if r[0] == db.DB_REP_HOLDELECTION :
468 def elect() :
469 while True :
470 try :
471 env.rep_elect(2, 1)
472 election_status[0] = False
473 break
474 except db.DBRepUnavailError :
475 pass
476 if not election_status[0] and not self.confirmed_master :
477 from threading import Thread
478 election_status[0] = True
479 t=Thread(target=elect)
480 import sys
481 if sys.version_info[0] < 3 :
482 t.setDaemon(True)
483 else :
484 t.daemon = True
485 t.start()
486
487 self.thread_do = thread_do
488
489 self.t_m.start()
490 self.t_c.start()
491
492 self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000)
493 self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000)
494 self.client_doing_election[0] = True
495 while True :
496 try :
497 self.dbenvClient.rep_elect(2, 1)
498 self.client_doing_election[0] = False
499 break
500 except db.DBRepUnavailError :
501 pass
502
503 self.assertTrue(self.confirmed_master)
504
505 # Race condition showed up after upgrading to Solaris 10 Update 10
506 # https://forums.oracle.com/forums/thread.jspa?messageID=9902860
507 # jcea@jcea.es: See private email from Paula Bingham (Oracle),
508 # in 20110929.
509 while not (self.dbenvClient.rep_stat()["startup_complete"]) :
510 pass
511
512 if db.version() >= (4,7) :
513 def test04_test_clockskew(self) :
514 fast, slow = 1234, 1230
515 self.dbenvMaster.rep_set_clockskew(fast, slow)
516 self.assertEqual((fast, slow),
517 self.dbenvMaster.rep_get_clockskew())
518 self.basic_rep_threading()
519
520#----------------------------------------------------------------------
521
522def test_suite():
523 suite = unittest.TestSuite()
524 if db.version() >= (4, 6) :
525 dbenv = db.DBEnv()
526 try :
527 dbenv.repmgr_get_ack_policy()
528 ReplicationManager_available=True
529 except :
530 ReplicationManager_available=False
531 dbenv.close()
532 del dbenv
533 if ReplicationManager_available :
534 suite.addTest(unittest.makeSuite(DBReplicationManager))
535
536 if have_threads :
537 suite.addTest(unittest.makeSuite(DBBaseReplication))
538
539 return suite
540
541
542if __name__ == '__main__':
543 unittest.main(defaultTest='test_suite')
Note: See TracBrowser for help on using the repository browser.