Changeset 745 for trunk/server/lib/tdb/common/transaction.c
- Timestamp:
- Nov 27, 2012, 4:43:17 PM (13 years ago)
- Location:
- trunk/server
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/server
- Property svn:mergeinfo changed
/vendor/current merged: 581,587,591,594,597,600,615,618,740
- Property svn:mergeinfo changed
-
trunk/server/lib/tdb/common/transaction.c
r657 r745 9 9 ** library. This does NOT imply that all of Samba is released 10 10 ** under the LGPL 11 11 12 12 This library is free software; you can redistribute it and/or 13 13 modify it under the terms of the GNU Lesser General Public … … 60 60 existing transaction record. If the inner transaction is cancelled 61 61 then a subsequent commit will fail 62 62 63 63 - keep a mirrored copy of the tdb hash chain heads to allow for the 64 64 fast hash heads scan on traverse, updating the mirrored copy in … … 77 77 78 78 - check for a valid recovery record on open of the tdb, while the 79 globallock is held. Automatically recover from the transaction79 open lock is held. Automatically recover from the transaction 80 80 recovery area if needed, then continue with the open as 81 81 usual. This allows for smooth crash recovery with no administrator … … 136 136 tdb_off_t magic_offset; 137 137 138 /* set when the GLOBAL_LOCK has been taken */139 bool global_lock_taken;140 141 138 /* old file size before transaction */ 142 139 tdb_len_t old_map_size; 143 140 144 /* we should re-pack on commit*/145 bool need_repack;141 /* did we expand in this transaction */ 142 bool expanded; 146 143 }; 147 144 … … 189 186 } 190 187 } 191 188 192 189 /* now copy it out of this block */ 193 190 memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len); … … 296 293 } 297 294 } 298 295 299 296 /* overwrite part of an existing block */ 300 297 if (buf == NULL) { … … 407 404 } 408 405 409 tdb->transaction->need_repack = true; 410 411 return 0; 412 } 413 414 /* 415 brlock during a transaction - ignore them 416 */ 417 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 418 int rw_type, int lck_type, int probe, size_t len) 419 { 406 tdb->transaction->expanded = true; 407 420 408 return 0; 421 409 } … … 427 415 transaction_oob, 428 416 transaction_expand_file, 429 transaction_brlock430 417 }; 431 418 … … 435 422 transaction is allowed to be pending per tdb_context 436 423 */ 437 int tdb_transaction_start(struct tdb_context *tdb) 424 static int _tdb_transaction_start(struct tdb_context *tdb, 425 enum tdb_lock_flags lockflags) 438 426 { 439 427 /* some sanity checks */ … … 456 444 } 457 445 458 if (tdb ->num_locks != 0 || tdb->global_lock.count) {446 if (tdb_have_extra_locks(tdb)) { 459 447 /* the caller must not have any locks when starting a 460 448 transaction as otherwise we'll be screwed by lack … … 487 475 discussed with Volker, there are a number of ways we could 488 476 make this async, which we will probably do in the future */ 489 if (tdb_transaction_lock(tdb, F_WRLCK ) == -1) {477 if (tdb_transaction_lock(tdb, F_WRLCK, lockflags) == -1) { 490 478 SAFE_FREE(tdb->transaction->blocks); 491 479 SAFE_FREE(tdb->transaction); 492 return -1; 493 } 494 480 if ((lockflags & TDB_LOCK_WAIT) == 0) { 481 tdb->ecode = TDB_ERR_NOLOCK; 482 } 483 return -1; 484 } 485 495 486 /* get a read lock from the freelist to the end of file. This 496 487 is upgraded to a write lock during the commit */ 497 488 #ifndef __OS2__ // YD the transation lock is an exclusive lock for us, it is enough. 498 if (tdb_ brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {489 if (tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true) == -1) { 499 490 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n")); 500 tdb->ecode = TDB_ERR_LOCK; 501 goto fail; 491 goto fail_allrecord_lock; 502 492 } 503 493 #endif … … 531 521 tdb_trace(tdb, "tdb_transaction_start"); 532 522 return 0; 533 523 534 524 fail: 535 525 #ifndef __OS2__ // YD the transation lock is an exclusive lock for us, it is enough. 536 tdb_ brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);526 tdb_allrecord_unlock(tdb, F_RDLCK, false); 537 527 #endif 538 tdb_transaction_unlock(tdb); 528 fail_allrecord_lock: 529 tdb_transaction_unlock(tdb, F_WRLCK); 539 530 SAFE_FREE(tdb->transaction->blocks); 540 531 SAFE_FREE(tdb->transaction->hash_heads); … … 543 534 } 544 535 536 _PUBLIC_ int tdb_transaction_start(struct tdb_context *tdb) 537 { 538 return _tdb_transaction_start(tdb, TDB_LOCK_WAIT); 539 } 540 541 _PUBLIC_ int tdb_transaction_start_nonblock(struct tdb_context *tdb) 542 { 543 return _tdb_transaction_start(tdb, TDB_LOCK_NOWAIT|TDB_LOCK_PROBE); 544 } 545 545 546 546 /* … … 553 553 } 554 554 555 #ifdef HAVE_FDATASYNC 556 if (fdatasync(tdb->fd) != 0) { 557 #else 555 558 if (fsync(tdb->fd) != 0) { 559 #endif 556 560 tdb->ecode = TDB_ERR_IO; 557 561 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n")); … … 574 578 575 579 576 int _tdb_transaction_cancel(struct tdb_context *tdb)580 static int _tdb_transaction_cancel(struct tdb_context *tdb) 577 581 { 578 582 int i, ret = 0; … … 601 605 if (tdb->transaction->magic_offset) { 602 606 const struct tdb_methods *methods = tdb->transaction->io_methods; 603 uint32_t zero = 0;607 const uint32_t invalid = TDB_RECOVERY_INVALID_MAGIC; 604 608 605 609 /* remove the recovery marker */ 606 if (methods->tdb_write(tdb, tdb->transaction->magic_offset, & zero, 4) == -1 ||610 if (methods->tdb_write(tdb, tdb->transaction->magic_offset, &invalid, 4) == -1 || 607 611 transaction_sync(tdb, tdb->transaction->magic_offset, 4) == -1) { 608 612 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_cancel: failed to remove recovery magic\n")); … … 611 615 } 612 616 613 if (tdb->transaction->global_lock_taken) { 614 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1); 615 tdb->transaction->global_lock_taken = false; 616 } 617 618 /* remove any global lock created during the transaction */ 619 if (tdb->global_lock.count != 0) { 620 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size); 621 tdb->global_lock.count = 0; 622 } 623 624 /* remove any locks created during the transaction */ 625 if (tdb->num_locks != 0) { 626 for (i=0;i<tdb->num_lockrecs;i++) { 627 tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list, 628 F_UNLCK,F_SETLKW, 0, 1); 629 } 630 tdb->num_locks = 0; 631 tdb->num_lockrecs = 0; 632 SAFE_FREE(tdb->lockrecs); 633 } 617 /* This also removes the OPEN_LOCK, if we have it. */ 618 tdb_release_transaction_locks(tdb); 634 619 635 620 /* restore the normal io methods */ 636 621 tdb->methods = tdb->transaction->io_methods; 637 622 638 #ifndef __OS2__ // YD the transation lock is an exclusive lock for us, it is enough639 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);640 #endif641 tdb_transaction_unlock(tdb);642 623 SAFE_FREE(tdb->transaction->hash_heads); 643 624 SAFE_FREE(tdb->transaction); 644 625 645 626 return ret; 646 627 } … … 649 630 cancel the current transaction 650 631 */ 651 int tdb_transaction_cancel(struct tdb_context *tdb)632 _PUBLIC_ int tdb_transaction_cancel(struct tdb_context *tdb) 652 633 { 653 634 tdb_trace(tdb, "tdb_transaction_cancel"); … … 682 663 } 683 664 665 int tdb_recovery_area(struct tdb_context *tdb, 666 const struct tdb_methods *methods, 667 tdb_off_t *recovery_offset, 668 struct tdb_record *rec) 669 { 670 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, recovery_offset) == -1) { 671 return -1; 672 } 673 674 if (*recovery_offset == 0) { 675 rec->rec_len = 0; 676 return 0; 677 } 678 679 if (methods->tdb_read(tdb, *recovery_offset, rec, sizeof(*rec), 680 DOCONV()) == -1) { 681 return -1; 682 } 683 684 /* ignore invalid recovery regions: can happen in crash */ 685 if (rec->magic != TDB_RECOVERY_MAGIC && 686 rec->magic != TDB_RECOVERY_INVALID_MAGIC) { 687 *recovery_offset = 0; 688 rec->rec_len = 0; 689 } 690 return 0; 691 } 692 684 693 /* 685 694 allocate the recovery area, or use an existing recovery area if it is … … 695 704 tdb_off_t recovery_head; 696 705 697 if (tdb_ ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {706 if (tdb_recovery_area(tdb, methods, &recovery_head, &rec) == -1) { 698 707 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n")); 699 return -1;700 }701 702 rec.rec_len = 0;703 704 if (recovery_head != 0 &&705 methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {706 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));707 708 return -1; 708 709 } … … 800 801 memset(rec, 0, sizeof(*rec)); 801 802 802 rec->magic = 0;803 rec->magic = TDB_RECOVERY_INVALID_MAGIC; 803 804 rec->data_len = recovery_size; 804 805 rec->rec_len = recovery_max_size; 805 806 rec->key_len = old_map_size; 806 CONVERT( rec);807 CONVERT(*rec); 807 808 808 809 /* build the recovery data into a single blob to allow us to do a single … … 822 823 length = tdb->transaction->last_block_size; 823 824 } 824 825 825 826 if (offset >= old_map_size) { 826 827 continue; … … 851 852 tailer = sizeof(*rec) + recovery_max_size; 852 853 memcpy(p, &tailer, 4); 853 CONVERT(p); 854 if (DOCONV()) { 855 tdb_convert(p, 4); 856 } 854 857 855 858 /* write the recovery data to the recovery area */ … … 935 938 936 939 methods = tdb->transaction->io_methods; 937 940 938 941 /* if there are any locks pending then the caller has not 939 942 nested their locks properly, so fail the transaction */ 940 if (tdb ->num_locks || tdb->global_lock.count) {943 if (tdb_have_extra_locks(tdb)) { 941 944 tdb->ecode = TDB_ERR_LOCK; 942 945 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: locks pending on commit\n")); … … 947 950 /* upgrade the main transaction lock region to a write lock */ 948 951 #ifndef __OS2__ // YD the global lock is an exclusive lock for us, it is enough 949 if (tdb_ brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {952 if (tdb_allrecord_upgrade(tdb) == -1) { 950 953 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to upgrade hash locks\n")); 951 tdb->ecode = TDB_ERR_LOCK;952 954 _tdb_transaction_cancel(tdb); 953 955 return -1; … … 955 957 #endif 956 958 957 /* get the globallock - this prevents new users attaching to the database959 /* get the open lock - this prevents new users attaching to the database 958 960 during the commit */ 959 if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) { 960 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get global lock\n")); 961 tdb->ecode = TDB_ERR_LOCK; 961 if (tdb_nest_lock(tdb, OPEN_LOCK, F_WRLCK, TDB_LOCK_WAIT) == -1) { 962 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get open lock\n")); 962 963 _tdb_transaction_cancel(tdb); 963 964 return -1; 964 965 } 965 966 tdb->transaction->global_lock_taken = true;967 966 968 967 if (!(tdb->flags & TDB_NOSYNC)) { … … 991 990 } 992 991 993 /* Keep the globallock until the actual commit */992 /* Keep the open lock until the actual commit */ 994 993 995 994 return 0; … … 999 998 prepare to commit the current transaction 1000 999 */ 1001 int tdb_transaction_prepare_commit(struct tdb_context *tdb)1002 { 1000 _PUBLIC_ int tdb_transaction_prepare_commit(struct tdb_context *tdb) 1001 { 1003 1002 tdb_trace(tdb, "tdb_transaction_prepare_commit"); 1004 1003 return _tdb_transaction_prepare_commit(tdb); 1005 1004 } 1006 1005 1006 /* A repack is worthwhile if the largest is less than half total free. */ 1007 static bool repack_worthwhile(struct tdb_context *tdb) 1008 { 1009 tdb_off_t ptr; 1010 struct tdb_record rec; 1011 tdb_len_t total = 0, largest = 0; 1012 1013 if (tdb_ofs_read(tdb, FREELIST_TOP, &ptr) == -1) { 1014 return false; 1015 } 1016 1017 while (ptr != 0 && tdb_rec_free_read(tdb, ptr, &rec) == 0) { 1018 total += rec.rec_len; 1019 if (rec.rec_len > largest) { 1020 largest = rec.rec_len; 1021 } 1022 ptr = rec.next; 1023 } 1024 1025 return total > largest * 2; 1026 } 1027 1007 1028 /* 1008 1029 commit the current transaction 1009 1030 */ 1010 int tdb_transaction_commit(struct tdb_context *tdb)1011 { 1031 _PUBLIC_ int tdb_transaction_commit(struct tdb_context *tdb) 1032 { 1012 1033 const struct tdb_methods *methods; 1013 1034 int i; 1014 bool need_repack ;1035 bool need_repack = false; 1015 1036 1016 1037 if (tdb->transaction == NULL) { … … 1020 1041 1021 1042 tdb_trace(tdb, "tdb_transaction_commit"); 1043 1022 1044 if (tdb->transaction->transaction_error) { 1023 1045 tdb->ecode = TDB_ERR_IO; … … 1026 1048 return -1; 1027 1049 } 1050 1028 1051 1029 1052 if (tdb->transaction->nesting != 0) { … … 1060 1083 length = tdb->transaction->last_block_size; 1061 1084 } 1085 1062 1086 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) { 1063 1087 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n")); 1064 1088 1065 1089 /* we've overwritten part of the data and 1066 1090 possibly expanded the file, so we need to … … 1076 1100 SAFE_FREE(tdb->transaction->blocks[i]); 1077 1101 } 1102 1103 /* Do this before we drop lock or blocks. */ 1104 if (tdb->transaction->expanded) { 1105 need_repack = repack_worthwhile(tdb); 1106 } 1078 1107 1079 1108 SAFE_FREE(tdb->transaction->blocks); … … 1100 1129 #endif 1101 1130 1102 need_repack = tdb->transaction->need_repack;1103 1104 1131 /* use a transaction cancel to free memory and remove the 1105 1132 transaction locks */ … … 1116 1143 /* 1117 1144 recover from an aborted transaction. Must be called with exclusive 1118 database write access already established (including the global1145 database write access already established (including the open 1119 1146 lock to prevent new processes attaching) 1120 1147 */ … … 1217 1244 return -1; 1218 1245 } 1219 1220 /* reduce the file size to the old size */1221 tdb_munmap(tdb);1222 if (ftruncate(tdb->fd, recovery_eof) != 0) {1223 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));1224 tdb->ecode = TDB_ERR_IO;1225 return -1;1226 }1227 tdb->map_size = recovery_eof;1228 tdb_mmap(tdb);1229 1246 1230 1247 if (transaction_sync(tdb, 0, recovery_eof) == -1) { … … 1240 1257 return 0; 1241 1258 } 1259 1260 /* Any I/O failures we say "needs recovery". */ 1261 bool tdb_needs_recovery(struct tdb_context *tdb) 1262 { 1263 tdb_off_t recovery_head; 1264 struct tdb_record rec; 1265 1266 /* find the recovery area */ 1267 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) { 1268 return true; 1269 } 1270 1271 if (recovery_head == 0) { 1272 /* we have never allocated a recovery record */ 1273 return false; 1274 } 1275 1276 /* read the recovery record */ 1277 if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 1278 sizeof(rec), DOCONV()) == -1) { 1279 return true; 1280 } 1281 1282 return (rec.magic == TDB_RECOVERY_MAGIC); 1283 }
Note:
See TracChangeset
for help on using the changeset viewer.