Changeset 429 for trunk/server/source3/lib/dbwrap_ctdb.c
- Timestamp:
- Apr 9, 2010, 3:51:41 PM (15 years ago)
- Location:
- trunk/server
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/server
-
Property svn:mergeinfo
set to
/vendor/3.5.2 merged eligible /vendor/current merged eligible
-
Property svn:mergeinfo
set to
-
trunk/server/source3/lib/dbwrap_ctdb.c
r414 r429 2 2 Unix SMB/CIFS implementation. 3 3 Database interface wrapper around ctdbd 4 Copyright (C) Volker Lendecke 2007 4 Copyright (C) Volker Lendecke 2007-2009 5 Copyright (C) Michael Adam 2009 5 6 6 7 This program is free software; you can redistribute it and/or modify … … 23 24 #include "ctdb_private.h" 24 25 #include "ctdbd_conn.h" 26 #include "g_lock.h" 25 27 26 28 struct db_ctdb_transaction_handle { 27 29 struct db_ctdb_ctx *ctx; 28 bool in_replay;29 30 /* 30 31 * we store the reads and writes done under a transaction: … … 36 37 uint32_t nesting; 37 38 bool nested_cancel; 39 char *lock_name; 38 40 }; 39 41 … … 43 45 uint32 db_id; 44 46 struct db_ctdb_transaction_handle *transaction; 47 struct g_lock_ctx *lock_ctx; 45 48 }; 46 49 … … 49 52 struct ctdb_ltdb_header header; 50 53 }; 51 52 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,53 TALLOC_CTX *mem_ctx,54 TDB_DATA key,55 bool persistent);56 54 57 55 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb) … … 298 296 } 299 297 300 301 static int32_t db_ctdb_transaction_active(uint32_t db_id)302 {303 int32_t status;304 NTSTATUS ret;305 TDB_DATA indata;306 307 indata.dptr = (uint8_t *)&db_id;308 indata.dsize = sizeof(db_id);309 310 ret = ctdbd_control_local(messaging_ctdbd_connection(),311 CTDB_CONTROL_TRANS2_ACTIVE, 0, 0,312 indata, NULL, NULL, &status);313 314 if (!NT_STATUS_IS_OK(ret)) {315 DEBUG(2, ("ctdb control TRANS2_ACTIVE failed\n"));316 return -1;317 }318 319 return status;320 }321 322 323 298 /** 324 299 * CTDB transaction destructor … … 326 301 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h) 327 302 { 328 tdb_transaction_cancel(h->ctx->wtdb->tdb); 303 NTSTATUS status; 304 305 status = g_lock_unlock(h->ctx->lock_ctx, h->lock_name); 306 if (!NT_STATUS_IS_OK(status)) { 307 DEBUG(0, ("g_lock_unlock failed: %s\n", nt_errstr(status))); 308 return -1; 309 } 329 310 return 0; 330 311 } 331 332 /**333 * start a transaction on a ctdb database:334 * - lock the transaction lock key335 * - start the tdb transaction336 */337 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)338 {339 struct db_record *rh;340 struct db_ctdb_rec *crec;341 TDB_DATA key;342 TALLOC_CTX *tmp_ctx;343 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;344 int ret;345 struct db_ctdb_ctx *ctx = h->ctx;346 TDB_DATA data;347 pid_t pid;348 NTSTATUS status;349 struct ctdb_ltdb_header header;350 int32_t transaction_status;351 352 key.dptr = (uint8_t *)discard_const(keyname);353 key.dsize = strlen(keyname);354 355 again:356 tmp_ctx = talloc_new(h);357 358 rh = fetch_locked_internal(ctx, tmp_ctx, key, true);359 if (rh == NULL) {360 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));361 talloc_free(tmp_ctx);362 return -1;363 }364 crec = talloc_get_type_abort(rh->private_data, struct db_ctdb_rec);365 366 transaction_status = db_ctdb_transaction_active(ctx->db_id);367 if (transaction_status == 1) {368 unsigned long int usec = (1000 + random()) % 100000;369 DEBUG(3, ("Transaction already active on db_id[0x%08x]."370 "Re-trying after %lu microseconds...",371 ctx->db_id, usec));372 talloc_free(tmp_ctx);373 usleep(usec);374 goto again;375 }376 377 /*378 * store the pid in the database:379 * it is not enought that the node is dmaster...380 */381 pid = getpid();382 data.dptr = (unsigned char *)&pid;383 data.dsize = sizeof(pid_t);384 crec->header.rsn++;385 crec->header.dmaster = get_my_vnn();386 status = db_ctdb_ltdb_store(ctx, key, &(crec->header), data);387 if (!NT_STATUS_IS_OK(status)) {388 DEBUG(0, (__location__ " Failed to store pid in transaction "389 "record: %s\n", nt_errstr(status)));390 talloc_free(tmp_ctx);391 return -1;392 }393 394 talloc_free(rh);395 396 ret = tdb_transaction_start(ctx->wtdb->tdb);397 if (ret != 0) {398 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));399 talloc_free(tmp_ctx);400 return -1;401 }402 403 status = db_ctdb_ltdb_fetch(ctx, key, &header, tmp_ctx, &data);404 if (!NT_STATUS_IS_OK(status)) {405 DEBUG(0, (__location__ " failed to refetch transaction lock "406 "record inside transaction: %s - retrying\n",407 nt_errstr(status)));408 tdb_transaction_cancel(ctx->wtdb->tdb);409 talloc_free(tmp_ctx);410 goto again;411 }412 413 if (header.dmaster != get_my_vnn()) {414 DEBUG(3, (__location__ " refetch transaction lock record : "415 "we are not dmaster any more "416 "(dmaster[%u] != my_vnn[%u]) - retrying\n",417 header.dmaster, get_my_vnn()));418 tdb_transaction_cancel(ctx->wtdb->tdb);419 talloc_free(tmp_ctx);420 goto again;421 }422 423 if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {424 DEBUG(3, (__location__ " refetch transaction lock record: "425 "another local process has started a transaction "426 "(stored pid [%u] != my pid [%u]) - retrying\n",427 *(pid_t *)(data.dptr), pid));428 tdb_transaction_cancel(ctx->wtdb->tdb);429 talloc_free(tmp_ctx);430 goto again;431 }432 433 talloc_free(tmp_ctx);434 435 return 0;436 }437 438 312 439 313 /** … … 444 318 { 445 319 struct db_ctdb_transaction_handle *h; 446 int ret;320 NTSTATUS status; 447 321 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data, 448 322 struct db_ctdb_ctx); … … 467 341 h->ctx = ctx; 468 342 469 ret = db_ctdb_transaction_fetch_start(h); 470 if (ret != 0) { 471 talloc_free(h); 343 h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x", 344 (unsigned int)ctx->db_id); 345 if (h->lock_name == NULL) { 346 DEBUG(0, ("talloc_asprintf failed\n")); 347 TALLOC_FREE(h); 472 348 return -1; 473 349 } 474 350 351 /* 352 * Wait a day, i.e. forever... 353 */ 354 status = g_lock_lock(ctx->lock_ctx, h->lock_name, G_LOCK_WRITE, 355 timeval_set(86400, 0)); 356 if (!NT_STATUS_IS_OK(status)) { 357 DEBUG(0, ("g_lock_lock failed: %s\n", nt_errstr(status))); 358 TALLOC_FREE(h); 359 return -1; 360 } 361 475 362 talloc_set_destructor(h, db_ctdb_transaction_destructor); 476 363 … … 482 369 } 483 370 484 371 static bool pull_newest_from_marshall_buffer(struct ctdb_marshall_buffer *buf, 372 TDB_DATA key, 373 struct ctdb_ltdb_header *pheader, 374 TALLOC_CTX *mem_ctx, 375 TDB_DATA *pdata) 376 { 377 struct ctdb_rec_data *rec = NULL; 378 struct ctdb_ltdb_header h; 379 bool found = false; 380 TDB_DATA data; 381 int i; 382 383 if (buf == NULL) { 384 return false; 385 } 386 387 ZERO_STRUCT(h); 388 ZERO_STRUCT(data); 389 390 /* 391 * Walk the list of records written during this 392 * transaction. If we want to read one we have already 393 * written, return the last written sample. Thus we do not do 394 * a "break;" for the first hit, this record might have been 395 * overwritten later. 396 */ 397 398 for (i=0; i<buf->count; i++) { 399 TDB_DATA tkey, tdata; 400 uint32_t reqid; 401 struct ctdb_ltdb_header hdr; 402 403 ZERO_STRUCT(hdr); 404 405 rec = db_ctdb_marshall_loop_next(buf, rec, &reqid, &hdr, &tkey, 406 &tdata); 407 if (rec == NULL) { 408 return false; 409 } 410 411 if (tdb_data_equal(key, tkey)) { 412 found = true; 413 data = tdata; 414 h = hdr; 415 } 416 } 417 418 if (!found) { 419 return false; 420 } 421 422 if (pdata != NULL) { 423 data.dptr = (uint8_t *)talloc_memdup(mem_ctx, data.dptr, 424 data.dsize); 425 if ((data.dsize != 0) && (data.dptr == NULL)) { 426 return false; 427 } 428 *pdata = data; 429 } 430 431 if (pheader != NULL) { 432 *pheader = h; 433 } 434 435 return true; 436 } 485 437 486 438 /* … … 493 445 struct db_ctdb_transaction_handle *h = db->transaction; 494 446 NTSTATUS status; 447 bool found; 448 449 found = pull_newest_from_marshall_buffer(h->m_write, key, NULL, 450 mem_ctx, data); 451 if (found) { 452 return 0; 453 } 495 454 496 455 status = db_ctdb_ltdb_fetch(h->ctx, key, NULL, mem_ctx, data); … … 502 461 } 503 462 504 if (!h->in_replay) {505 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key,NULL, *data);506 507 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));508 data->dsize = 0;509 talloc_free(data->dptr);510 return -1;511 }463 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, 464 NULL, *data); 465 if (h->m_all == NULL) { 466 DEBUG(0,(__location__ " Failed to add to marshalling " 467 "record\n")); 468 data->dsize = 0; 469 talloc_free(data->dptr); 470 return -1; 512 471 } 513 472 … … 543 502 result->store = db_ctdb_store_transaction; 544 503 result->delete_rec = db_ctdb_delete_transaction; 504 505 if (pull_newest_from_marshall_buffer(ctx->transaction->m_write, key, 506 NULL, result, &result->value)) { 507 return result; 508 } 545 509 546 510 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key); … … 616 580 stores a record inside a transaction 617 581 */ 618 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,619 582 static NTSTATUS db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h, 583 TDB_DATA key, TDB_DATA data) 620 584 { 621 585 TALLOC_CTX *tmp_ctx = talloc_new(h); 622 int ret;623 586 TDB_DATA rec; 624 587 struct ctdb_ltdb_header header; 625 NTSTATUS status; 588 589 ZERO_STRUCT(header); 626 590 627 591 /* we need the header so we can update the RSN */ 628 rec = tdb_fetch(h->ctx->wtdb->tdb, key); 629 if (rec.dptr == NULL) { 630 /* the record doesn't exist - create one with us as dmaster. 631 This is only safe because we are in a transaction and this 632 is a persistent database */ 633 ZERO_STRUCT(header); 634 } else { 635 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header)); 636 rec.dsize -= sizeof(struct ctdb_ltdb_header); 637 /* a special case, we are writing the same data that is there now */ 638 if (data.dsize == rec.dsize && 639 memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) { 640 SAFE_FREE(rec.dptr); 641 talloc_free(tmp_ctx); 642 return 0; 592 593 if (!pull_newest_from_marshall_buffer(h->m_write, key, &header, 594 NULL, NULL)) { 595 596 rec = tdb_fetch(h->ctx->wtdb->tdb, key); 597 598 if (rec.dptr != NULL) { 599 memcpy(&header, rec.dptr, 600 sizeof(struct ctdb_ltdb_header)); 601 rec.dsize -= sizeof(struct ctdb_ltdb_header); 602 603 /* 604 * a special case, we are writing the same 605 * data that is there now 606 */ 607 if (data.dsize == rec.dsize && 608 memcmp(data.dptr, 609 rec.dptr + sizeof(struct ctdb_ltdb_header), 610 data.dsize) == 0) { 611 SAFE_FREE(rec.dptr); 612 talloc_free(tmp_ctx); 613 return NT_STATUS_OK; 614 } 643 615 } 644 616 SAFE_FREE(rec.dptr); … … 648 620 header.rsn++; 649 621 650 if (!h->in_replay) {651 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key,NULL, data);652 653 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));654 talloc_free(tmp_ctx);655 return -1;656 }622 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, 623 NULL, data); 624 if (h->m_all == NULL) { 625 DEBUG(0,(__location__ " Failed to add to marshalling " 626 "record\n")); 627 talloc_free(tmp_ctx); 628 return NT_STATUS_NO_MEMORY; 657 629 } 658 630 … … 661 633 DEBUG(0,(__location__ " Failed to add to marshalling record\n")); 662 634 talloc_free(tmp_ctx); 663 return -1; 664 } 665 666 status = db_ctdb_ltdb_store(h->ctx, key, &header, data); 667 if (NT_STATUS_IS_OK(status)) { 668 ret = 0; 669 } else { 670 ret = -1; 635 return NT_STATUS_NO_MEMORY; 671 636 } 672 637 673 638 talloc_free(tmp_ctx); 674 675 return ret; 639 return NT_STATUS_OK; 676 640 } 677 641 … … 684 648 struct db_ctdb_transaction_handle *h = talloc_get_type_abort( 685 649 rec->private_data, struct db_ctdb_transaction_handle); 686 int ret; 687 688 ret = db_ctdb_transaction_store(h, rec->key, data); 689 if (ret != 0) { 690 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb); 691 } 692 return NT_STATUS_OK; 650 NTSTATUS status; 651 652 status = db_ctdb_transaction_store(h, rec->key, data); 653 return status; 693 654 } 694 655 … … 700 661 struct db_ctdb_transaction_handle *h = talloc_get_type_abort( 701 662 rec->private_data, struct db_ctdb_transaction_handle); 702 int ret;703 704 ret =db_ctdb_transaction_store(h, rec->key, tdb_null);705 if (ret != 0) {706 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb); 707 } 708 return NT_STATUS_OK; 709 } 710 711 712 /* 713 replay a transaction 714 */ 715 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h) 716 { 717 int ret, i;718 struct ctdb_ rec_data *rec = NULL;719 720 h->in_replay = true; 721 talloc_free(h->m_write);722 h->m_write = NULL;723 724 ret = db_ctdb_transaction_fetch_start(h); 725 if (ret != 0) {726 return ret; 727 }728 729 for (i=0;i<h->m_all->count;i++) {730 TDB_DATA key, data;731 732 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);733 if (rec == NULL) { 734 DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));735 goto failed; 736 }737 738 if (rec->reqid == 0) {739 /* its a store */740 if (db_ctdb_transaction_store(h, key, data) != 0) { 741 goto failed;742 } 743 } else { 744 TDB_DATA data2;745 TALLOC_CTX *tmp_ctx = talloc_new(h);746 747 if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) { 748 talloc_free(tmp_ctx); 749 goto failed; 750 } 751 if (data2.dsize != data.dsize || 752 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {753 /* the record has changed on us - we have to give up */ 754 talloc_free(tmp_ctx);755 goto failed;756 }757 talloc_free(tmp_ctx);758 } 759 }760 761 return 0;762 763 failed: 764 tdb_transaction_cancel(h->ctx->wtdb->tdb);765 return -1; 766 } 767 663 NTSTATUS status; 664 665 status = db_ctdb_transaction_store(h, rec->key, tdb_null); 666 return status; 667 } 668 669 /** 670 * Fetch the db sequence number of a persistent db directly from the db. 671 */ 672 static NTSTATUS db_ctdb_fetch_db_seqnum_from_db(struct db_ctdb_ctx *db, 673 uint64_t *seqnum) 674 { 675 NTSTATUS status; 676 const char *keyname = CTDB_DB_SEQNUM_KEY; 677 TDB_DATA key; 678 TDB_DATA data; 679 struct ctdb_ltdb_header header; 680 TALLOC_CTX *mem_ctx = talloc_stackframe(); 681 682 if (seqnum == NULL) { 683 return NT_STATUS_INVALID_PARAMETER; 684 } 685 686 key = string_term_tdb_data(keyname); 687 688 status = db_ctdb_ltdb_fetch(db, key, &header, mem_ctx, &data); 689 if (!NT_STATUS_IS_OK(status) && 690 !NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) 691 { 692 goto done; 693 } 694 695 status = NT_STATUS_OK; 696 697 if (data.dsize != sizeof(uint64_t)) { 698 *seqnum = 0; 699 goto done; 700 } 701 702 *seqnum = *(uint64_t *)data.dptr; 703 704 done: 705 TALLOC_FREE(mem_ctx); 706 return status; 707 } 708 709 /** 710 * Store the database sequence number inside a transaction. 711 */ 712 static NTSTATUS db_ctdb_store_db_seqnum(struct db_ctdb_transaction_handle *h, 713 uint64_t seqnum) 714 { 715 NTSTATUS status; 716 const char *keyname = CTDB_DB_SEQNUM_KEY; 717 TDB_DATA key; 718 TDB_DATA data; 719 720 key = string_term_tdb_data(keyname); 721 722 data.dptr = (uint8_t *)&seqnum; 723 data.dsize = sizeof(uint64_t); 724 725 status = db_ctdb_transaction_store(h, key, data); 726 727 return status; 728 } 768 729 769 730 /* … … 775 736 struct db_ctdb_ctx); 776 737 NTSTATUS rets; 738 int status; 739 struct db_ctdb_transaction_handle *h = ctx->transaction; 740 uint64_t old_seqnum, new_seqnum; 777 741 int ret; 778 int status;779 int retries = 0;780 struct db_ctdb_transaction_handle *h = ctx->transaction;781 enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;782 742 783 743 if (h == NULL) { … … 797 757 } 798 758 759 if (h->m_write == NULL) { 760 /* 761 * No changes were made, so don't change the seqnum, 762 * don't push to other node, just exit with success. 763 */ 764 ret = 0; 765 goto done; 766 } 767 799 768 DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id)); 800 769 801 talloc_set_destructor(h, NULL); 802 803 /* our commit strategy is quite complex. 804 805 - we first try to commit the changes to all other nodes 806 807 - if that works, then we commit locally and we are done 808 809 - if a commit on another node fails, then we need to cancel 810 the transaction, then restart the transaction (thus 811 opening a window of time for a pending recovery to 812 complete), then replay the transaction, checking all the 813 reads and writes (checking that reads give the same data, 814 and writes succeed). Then we retry the transaction to the 815 other nodes 816 */ 770 /* 771 * As the last db action before committing, bump the database sequence 772 * number. Note that this undoes all changes to the seqnum records 773 * performed under the transaction. This record is not meant to be 774 * modified by user interaction. It is for internal use only... 775 */ 776 rets = db_ctdb_fetch_db_seqnum_from_db(ctx, &old_seqnum); 777 if (!NT_STATUS_IS_OK(rets)) { 778 DEBUG(1, (__location__ " failed to fetch the db sequence number " 779 "in transaction commit on db 0x%08x\n", ctx->db_id)); 780 ret = -1; 781 goto done; 782 } 783 784 new_seqnum = old_seqnum + 1; 785 786 rets = db_ctdb_store_db_seqnum(h, new_seqnum); 787 if (!NT_STATUS_IS_OK(rets)) { 788 DEBUG(1, (__location__ "failed to store the db sequence number " 789 " in transaction commit on db 0x%08x\n", ctx->db_id)); 790 ret = -1; 791 goto done; 792 } 817 793 818 794 again: 819 if (h->m_write == NULL) {820 /* no changes were made, potentially after a retry */821 tdb_transaction_cancel(h->ctx->wtdb->tdb);822 talloc_free(h);823 ctx->transaction = NULL;824 return 0;825 }826 827 795 /* tell ctdbd to commit to the other nodes */ 828 rets = ctdbd_control_local(messaging_ctdbd_connection(), 829 retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY,796 rets = ctdbd_control_local(messaging_ctdbd_connection(), 797 CTDB_CONTROL_TRANS3_COMMIT, 830 798 h->ctx->db_id, 0, 831 db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status); 799 db_ctdb_marshall_finish(h->m_write), 800 NULL, NULL, &status); 832 801 if (!NT_STATUS_IS_OK(rets) || status != 0) { 833 tdb_transaction_cancel(h->ctx->wtdb->tdb); 834 sleep(1); 835 802 /* 803 * The TRANS3_COMMIT control should only possibly fail when a 804 * recovery has been running concurrently. In any case, the db 805 * will be the same on all nodes, either the new copy or the 806 * old copy. This can be detected by comparing the old and new 807 * local sequence numbers. 808 */ 809 rets = db_ctdb_fetch_db_seqnum_from_db(ctx, &new_seqnum); 836 810 if (!NT_STATUS_IS_OK(rets)) { 837 failure_control = CTDB_CONTROL_TRANS2_ERROR; 838 } else { 839 /* work out what error code we will give if we 840 have to fail the operation */ 841 switch ((enum ctdb_trans2_commit_error)status) { 842 case CTDB_TRANS2_COMMIT_SUCCESS: 843 case CTDB_TRANS2_COMMIT_SOMEFAIL: 844 case CTDB_TRANS2_COMMIT_TIMEOUT: 845 failure_control = CTDB_CONTROL_TRANS2_ERROR; 846 break; 847 case CTDB_TRANS2_COMMIT_ALLFAIL: 848 failure_control = CTDB_CONTROL_TRANS2_FINISHED; 849 break; 850 } 851 } 852 853 if (++retries == 100) { 854 DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 855 h->ctx->db_id, retries, (unsigned)failure_control)); 856 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, 857 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 858 tdb_null, NULL, NULL, NULL); 859 h->ctx->transaction = NULL; 860 talloc_free(h); 861 ctx->transaction = NULL; 862 return -1; 863 } 864 865 if (ctdb_replay_transaction(h) != 0) { 866 DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n", 867 (unsigned)failure_control)); 868 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, 869 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 870 tdb_null, NULL, NULL, NULL); 871 h->ctx->transaction = NULL; 872 talloc_free(h); 873 ctx->transaction = NULL; 874 return -1; 875 } 876 goto again; 877 } else { 878 failure_control = CTDB_CONTROL_TRANS2_ERROR; 879 } 880 881 /* do the real commit locally */ 882 ret = tdb_transaction_commit(h->ctx->wtdb->tdb); 883 if (ret != 0) { 884 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n", 885 (unsigned)failure_control)); 886 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id, 887 CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL); 888 h->ctx->transaction = NULL; 889 talloc_free(h); 890 return ret; 891 } 892 893 /* tell ctdbd that we are finished with our local commit */ 894 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED, 895 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 896 tdb_null, NULL, NULL, NULL); 811 DEBUG(1, (__location__ " failed to refetch db sequence " 812 "number after failed TRANS3_COMMIT\n")); 813 ret = -1; 814 goto done; 815 } 816 817 if (new_seqnum == old_seqnum) { 818 /* Recovery prevented all our changes: retry. */ 819 goto again; 820 } else if (new_seqnum != (old_seqnum + 1)) { 821 DEBUG(0, (__location__ " ERROR: new_seqnum[%lu] != " 822 "old_seqnum[%lu] + (0 or 1) after failed " 823 "TRANS3_COMMIT - this should not happen!\n", 824 (unsigned long)new_seqnum, 825 (unsigned long)old_seqnum)); 826 ret = -1; 827 goto done; 828 } 829 /* 830 * Recovery propagated our changes to all nodes, completing 831 * our commit for us - succeed. 832 */ 833 } 834 835 ret = 0; 836 837 done: 897 838 h->ctx->transaction = NULL; 898 839 talloc_free(h); 899 return 0;840 return ret; 900 841 } 901 842 … … 976 917 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx, 977 918 TALLOC_CTX *mem_ctx, 978 TDB_DATA key, 979 bool persistent) 919 TDB_DATA key) 980 920 { 981 921 struct db_record *result; … … 1105 1045 } 1106 1046 1107 return fetch_locked_internal(ctx, mem_ctx, key , db->persistent);1047 return fetch_locked_internal(ctx, mem_ctx, key); 1108 1048 } 1109 1049 … … 1315 1255 struct db_ctdb_ctx *db_ctdb; 1316 1256 char *db_path; 1257 struct ctdbd_connection *conn; 1317 1258 1318 1259 if (!lp_clustering()) { … … 1336 1277 db_ctdb->db = result; 1337 1278 1338 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) { 1279 conn = messaging_ctdbd_connection(); 1280 1281 if (!NT_STATUS_IS_OK(ctdbd_db_attach(conn, name, &db_ctdb->db_id, tdb_flags))) { 1339 1282 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name)); 1340 1283 TALLOC_FREE(result); … … 1342 1285 } 1343 1286 1344 db_path = ctdbd_dbpath( messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);1287 db_path = ctdbd_dbpath(conn, db_ctdb, db_ctdb->db_id); 1345 1288 1346 1289 result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0); … … 1361 1304 } 1362 1305 talloc_free(db_path); 1306 1307 if (result->persistent) { 1308 db_ctdb->lock_ctx = g_lock_ctx_init(db_ctdb, 1309 ctdb_conn_msg_ctx(conn)); 1310 if (db_ctdb->lock_ctx == NULL) { 1311 DEBUG(0, ("g_lock_ctx_init failed\n")); 1312 TALLOC_FREE(result); 1313 return NULL; 1314 } 1315 } 1363 1316 1364 1317 result->private_data = (void *)db_ctdb;
Note:
See TracChangeset
for help on using the changeset viewer.