source: branches/samba-3.3.x/source/lib/tdb/common/transaction.c

Last change on this file was 206, checked in by Herwig Bauernfeind, 16 years ago

Import Samba 3.3 branch at 3.0.0 level (psmedley's port)

File size: 34.1 KB
Line 
1 /*
2 Unix SMB/CIFS implementation.
3
4 trivial database library
5
6 Copyright (C) Andrew Tridgell 2005
7
8 ** NOTE! The following LGPL license applies to the tdb
9 ** library. This does NOT imply that all of Samba is released
10 ** under the LGPL
11
12 This library is free software; you can redistribute it and/or
13 modify it under the terms of the GNU Lesser General Public
14 License as published by the Free Software Foundation; either
15 version 3 of the License, or (at your option) any later version.
16
17 This library is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20 Lesser General Public License for more details.
21
22 You should have received a copy of the GNU Lesser General Public
23 License along with this library; if not, see <http://www.gnu.org/licenses/>.
24*/
25
26#include "tdb_private.h"
27
28/*
29 transaction design:
30
31 - only allow a single transaction at a time per database. This makes
32 using the transaction API simpler, as otherwise the caller would
33 have to cope with temporary failures in transactions that conflict
34 with other current transactions
35
36 - keep the transaction recovery information in the same file as the
37 database, using a special 'transaction recovery' record pointed at
38 by the header. This removes the need for extra journal files as
39 used by some other databases
40
41 - dynamically allocate the transaction recovery record, re-using it
42 for subsequent transactions. If a larger record is needed then
43 tdb_free() the old record to place it on the normal tdb freelist
44 before allocating the new record
45
46 - during transactions, keep a linked list of all writes that have
47 been performed by intercepting all tdb_write() calls. The hooked
48 transaction versions of tdb_read() and tdb_write() check this
49 linked list and try to use the elements of the list in preference
50 to the real database.
51
52 - don't allow any locks to be held when a transaction starts,
53 otherwise we can end up with deadlock (plus lack of lock nesting
54 in posix locks would mean the lock is lost)
55
56 - if the caller gains a lock during the transaction but doesn't
57 release it then fail the commit
58
59 - allow for nested calls to tdb_transaction_start(), re-using the
60 existing transaction record. If the inner transaction is cancelled
61 then a subsequent commit will fail
62
63 - keep a mirrored copy of the tdb hash chain heads to allow for the
64 fast hash heads scan on traverse, updating the mirrored copy in
65 the transaction version of tdb_write
66
67 - allow callers to mix transaction and non-transaction use of tdb,
68 although once a transaction is started then an exclusive lock is
69 gained until the transaction is committed or cancelled
70
71 - the commit strategy involves first saving away all modified data
72 into a linearised buffer in the transaction recovery area, then
73 marking the transaction recovery area with a magic value to
74 indicate a valid recovery record. In total 4 fsync/msync calls are
75 needed per commit to prevent race conditions. It might be possible
76 to reduce this to 3 or even 2 with some more work.
77
78 - check for a valid recovery record on open of the tdb, while the
79 global lock is held. Automatically recover from the transaction
80 recovery area if needed, then continue with the open as
81 usual. This allows for smooth crash recovery with no administrator
82 intervention.
83
84 - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85 still available, but no transaction recovery area is used and no
86 fsync/msync calls are made.
87
88*/
89
90
91/*
92 hold the context of any current transaction
93*/
94struct tdb_transaction {
95 /* we keep a mirrored copy of the tdb hash heads here so
96 tdb_next_hash_chain() can operate efficiently */
97 uint32_t *hash_heads;
98
99 /* the original io methods - used to do IOs to the real db */
100 const struct tdb_methods *io_methods;
101
102 /* the list of transaction blocks. When a block is first
103 written to, it gets created in this list */
104 uint8_t **blocks;
105 uint32_t num_blocks;
106 uint32_t block_size; /* bytes in each block */
107 uint32_t last_block_size; /* number of valid bytes in the last block */
108
109 /* non-zero when an internal transaction error has
110 occurred. All write operations will then fail until the
111 transaction is ended */
112 int transaction_error;
113
114 /* when inside a transaction we need to keep track of any
115 nested tdb_transaction_start() calls, as these are allowed,
116 but don't create a new transaction */
117 int nesting;
118
119 /* old file size before transaction */
120 tdb_len_t old_map_size;
121};
122
123
124/*
125 read while in a transaction. We need to check first if the data is in our list
126 of transaction elements, then if not do a real read
127*/
128static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
129 tdb_len_t len, int cv)
130{
131 uint32_t blk;
132
133 /* break it down into block sized ops */
134 while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
135 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
136 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
137 return -1;
138 }
139 len -= len2;
140 off += len2;
141 buf = (void *)(len2 + (char *)buf);
142 }
143
144 if (len == 0) {
145 return 0;
146 }
147
148 blk = off / tdb->transaction->block_size;
149
150 /* see if we have it in the block list */
151 if (tdb->transaction->num_blocks <= blk ||
152 tdb->transaction->blocks[blk] == NULL) {
153 /* nope, do a real read */
154 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
155 goto fail;
156 }
157 return 0;
158 }
159
160 /* it is in the block list. Now check for the last block */
161 if (blk == tdb->transaction->num_blocks-1) {
162 if (len > tdb->transaction->last_block_size) {
163 goto fail;
164 }
165 }
166
167 /* now copy it out of this block */
168 memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
169 if (cv) {
170 tdb_convert(buf, len);
171 }
172 return 0;
173
174fail:
175 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
176 tdb->ecode = TDB_ERR_IO;
177 tdb->transaction->transaction_error = 1;
178 return -1;
179}
180
181
182/*
183 write while in a transaction
184*/
185static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
186 const void *buf, tdb_len_t len)
187{
188 uint32_t blk;
189
190 /* if the write is to a hash head, then update the transaction
191 hash heads */
192 if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
193 off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
194 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
195 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
196 }
197
198 /* break it up into block sized chunks */
199 while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
200 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
201 if (transaction_write(tdb, off, buf, len2) != 0) {
202 return -1;
203 }
204 len -= len2;
205 off += len2;
206 if (buf != NULL) {
207 buf = (const void *)(len2 + (const char *)buf);
208 }
209 }
210
211 if (len == 0) {
212 return 0;
213 }
214
215 blk = off / tdb->transaction->block_size;
216 off = off % tdb->transaction->block_size;
217
218 if (tdb->transaction->num_blocks <= blk) {
219 uint8_t **new_blocks;
220 /* expand the blocks array */
221 if (tdb->transaction->blocks == NULL) {
222 new_blocks = (uint8_t **)malloc(
223 (blk+1)*sizeof(uint8_t *));
224 } else {
225 new_blocks = (uint8_t **)realloc(
226 tdb->transaction->blocks,
227 (blk+1)*sizeof(uint8_t *));
228 }
229 if (new_blocks == NULL) {
230 tdb->ecode = TDB_ERR_OOM;
231 goto fail;
232 }
233 memset(&new_blocks[tdb->transaction->num_blocks], 0,
234 (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
235 tdb->transaction->blocks = new_blocks;
236 tdb->transaction->num_blocks = blk+1;
237 tdb->transaction->last_block_size = 0;
238 }
239
240 /* allocate and fill a block? */
241 if (tdb->transaction->blocks[blk] == NULL) {
242 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
243 if (tdb->transaction->blocks[blk] == NULL) {
244 tdb->ecode = TDB_ERR_OOM;
245 tdb->transaction->transaction_error = 1;
246 return -1;
247 }
248 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
249 tdb_len_t len2 = tdb->transaction->block_size;
250 if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
251 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
252 }
253 if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size,
254 tdb->transaction->blocks[blk],
255 len2, 0) != 0) {
256 SAFE_FREE(tdb->transaction->blocks[blk]);
257 tdb->ecode = TDB_ERR_IO;
258 goto fail;
259 }
260 if (blk == tdb->transaction->num_blocks-1) {
261 tdb->transaction->last_block_size = len2;
262 }
263 }
264 }
265
266 /* overwrite part of an existing block */
267 if (buf == NULL) {
268 memset(tdb->transaction->blocks[blk] + off, 0, len);
269 } else {
270 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
271 }
272 if (blk == tdb->transaction->num_blocks-1) {
273 if (len + off > tdb->transaction->last_block_size) {
274 tdb->transaction->last_block_size = len + off;
275 }
276 }
277
278 return 0;
279
280fail:
281 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n",
282 (blk*tdb->transaction->block_size) + off, len));
283 tdb->transaction->transaction_error = 1;
284 return -1;
285}
286
287
288/*
289 write while in a transaction - this varient never expands the transaction blocks, it only
290 updates existing blocks. This means it cannot change the recovery size
291*/
292static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
293 const void *buf, tdb_len_t len)
294{
295 uint32_t blk;
296
297 /* break it up into block sized chunks */
298 while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
299 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
300 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
301 return -1;
302 }
303 len -= len2;
304 off += len2;
305 if (buf != NULL) {
306 buf = (const void *)(len2 + (const char *)buf);
307 }
308 }
309
310 if (len == 0) {
311 return 0;
312 }
313
314 blk = off / tdb->transaction->block_size;
315 off = off % tdb->transaction->block_size;
316
317 if (tdb->transaction->num_blocks <= blk ||
318 tdb->transaction->blocks[blk] == NULL) {
319 return 0;
320 }
321
322 if (blk == tdb->transaction->num_blocks-1 &&
323 off + len > tdb->transaction->last_block_size) {
324 if (off >= tdb->transaction->last_block_size) {
325 return 0;
326 }
327 len = tdb->transaction->last_block_size - off;
328 }
329
330 /* overwrite part of an existing block */
331 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
332
333 return 0;
334}
335
336
337/*
338 accelerated hash chain head search, using the cached hash heads
339*/
340static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
341{
342 uint32_t h = *chain;
343 for (;h < tdb->header.hash_size;h++) {
344 /* the +1 takes account of the freelist */
345 if (0 != tdb->transaction->hash_heads[h+1]) {
346 break;
347 }
348 }
349 (*chain) = h;
350}
351
352/*
353 out of bounds check during a transaction
354*/
355static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
356{
357 if (len <= tdb->map_size) {
358 return 0;
359 }
360 return TDB_ERRCODE(TDB_ERR_IO, -1);
361}
362
363/*
364 transaction version of tdb_expand().
365*/
366static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
367 tdb_off_t addition)
368{
369 /* add a write to the transaction elements, so subsequent
370 reads see the zero data */
371 if (transaction_write(tdb, size, NULL, addition) != 0) {
372 return -1;
373 }
374
375 return 0;
376}
377
378/*
379 brlock during a transaction - ignore them
380*/
381static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
382 int rw_type, int lck_type, int probe, size_t len)
383{
384 return 0;
385}
386
387static const struct tdb_methods transaction_methods = {
388 transaction_read,
389 transaction_write,
390 transaction_next_hash_chain,
391 transaction_oob,
392 transaction_expand_file,
393 transaction_brlock
394};
395
396
397/*
398 start a tdb transaction. No token is returned, as only a single
399 transaction is allowed to be pending per tdb_context
400*/
401int tdb_transaction_start(struct tdb_context *tdb)
402{
403 /* some sanity checks */
404 if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
405 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
406 tdb->ecode = TDB_ERR_EINVAL;
407 return -1;
408 }
409
410 /* cope with nested tdb_transaction_start() calls */
411 if (tdb->transaction != NULL) {
412 tdb->transaction->nesting++;
413 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
414 tdb->transaction->nesting));
415 return 0;
416 }
417
418 if (tdb->num_locks != 0 || tdb->global_lock.count) {
419 /* the caller must not have any locks when starting a
420 transaction as otherwise we'll be screwed by lack
421 of nested locks in posix */
422 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
423 tdb->ecode = TDB_ERR_LOCK;
424 return -1;
425 }
426
427 if (tdb->travlocks.next != NULL) {
428 /* you cannot use transactions inside a traverse (although you can use
429 traverse inside a transaction) as otherwise you can end up with
430 deadlock */
431 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
432 tdb->ecode = TDB_ERR_LOCK;
433 return -1;
434 }
435
436 tdb->transaction = (struct tdb_transaction *)
437 calloc(sizeof(struct tdb_transaction), 1);
438 if (tdb->transaction == NULL) {
439 tdb->ecode = TDB_ERR_OOM;
440 return -1;
441 }
442
443 /* a page at a time seems like a reasonable compromise between compactness and efficiency */
444 tdb->transaction->block_size = tdb->page_size;
445
446 /* get the transaction write lock. This is a blocking lock. As
447 discussed with Volker, there are a number of ways we could
448 make this async, which we will probably do in the future */
449 if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
450 SAFE_FREE(tdb->transaction->blocks);
451 SAFE_FREE(tdb->transaction);
452 return -1;
453 }
454
455 /* get a read lock from the freelist to the end of file. This
456 is upgraded to a write lock during the commit */
457#ifndef __OS2__ // YD the transation lock is an exclusive lock for us, it is enough.
458 if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
459 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
460 tdb->ecode = TDB_ERR_LOCK;
461 goto fail;
462 }
463#endif
464
465 /* setup a copy of the hash table heads so the hash scan in
466 traverse can be fast */
467 tdb->transaction->hash_heads = (uint32_t *)
468 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
469 if (tdb->transaction->hash_heads == NULL) {
470 tdb->ecode = TDB_ERR_OOM;
471 goto fail;
472 }
473 if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
474 TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
475 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
476 tdb->ecode = TDB_ERR_IO;
477 goto fail;
478 }
479
480 /* make sure we know about any file expansions already done by
481 anyone else */
482 tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
483 tdb->transaction->old_map_size = tdb->map_size;
484
485 /* finally hook the io methods, replacing them with
486 transaction specific methods */
487 tdb->transaction->io_methods = tdb->methods;
488 tdb->methods = &transaction_methods;
489
490 return 0;
491
492fail:
493#ifndef __OS2__ // YD the transation lock is an exclusive lock for us, it is enough.
494 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
495#endif
496 tdb_transaction_unlock(tdb);
497 SAFE_FREE(tdb->transaction->blocks);
498 SAFE_FREE(tdb->transaction->hash_heads);
499 SAFE_FREE(tdb->transaction);
500 return -1;
501}
502
503
504/*
505 cancel the current transaction
506*/
507int tdb_transaction_cancel(struct tdb_context *tdb)
508{
509 int i;
510
511 if (tdb->transaction == NULL) {
512 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
513 return -1;
514 }
515
516 if (tdb->transaction->nesting != 0) {
517 tdb->transaction->transaction_error = 1;
518 tdb->transaction->nesting--;
519 return 0;
520 }
521
522 tdb->map_size = tdb->transaction->old_map_size;
523
524 /* free all the transaction blocks */
525 for (i=0;i<tdb->transaction->num_blocks;i++) {
526 if (tdb->transaction->blocks[i] != NULL) {
527 free(tdb->transaction->blocks[i]);
528 }
529 }
530 SAFE_FREE(tdb->transaction->blocks);
531
532 /* remove any global lock created during the transaction */
533 if (tdb->global_lock.count != 0) {
534 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
535 tdb->global_lock.count = 0;
536 }
537
538 /* remove any locks created during the transaction */
539 if (tdb->num_locks != 0) {
540 for (i=0;i<tdb->num_lockrecs;i++) {
541 tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
542 F_UNLCK,F_SETLKW, 0, 1);
543 }
544 tdb->num_locks = 0;
545 tdb->num_lockrecs = 0;
546 SAFE_FREE(tdb->lockrecs);
547 }
548
549 /* restore the normal io methods */
550 tdb->methods = tdb->transaction->io_methods;
551
552#ifndef __OS2__ // YD the transation lock is an exclusive lock for us, it is enough.
553 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
554#endif
555 tdb_transaction_unlock(tdb);
556 SAFE_FREE(tdb->transaction->hash_heads);
557 SAFE_FREE(tdb->transaction);
558
559 return 0;
560}
561
562/*
563 sync to disk
564*/
565static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
566{
567 if (fsync(tdb->fd) != 0) {
568 tdb->ecode = TDB_ERR_IO;
569 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
570 return -1;
571 }
572#ifdef HAVE_MMAP
573 if (tdb->map_ptr) {
574 tdb_off_t moffset = offset & ~(tdb->page_size-1);
575 if (msync(moffset + (char *)tdb->map_ptr,
576 length + (offset - moffset), MS_SYNC) != 0) {
577 tdb->ecode = TDB_ERR_IO;
578 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
579 strerror(errno)));
580 return -1;
581 }
582 }
583#endif
584 return 0;
585}
586
587
588/*
589 work out how much space the linearised recovery data will consume
590*/
591static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
592{
593 tdb_len_t recovery_size = 0;
594 int i;
595
596 recovery_size = sizeof(uint32_t);
597 for (i=0;i<tdb->transaction->num_blocks;i++) {
598 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
599 break;
600 }
601 if (tdb->transaction->blocks[i] == NULL) {
602 continue;
603 }
604 recovery_size += 2*sizeof(tdb_off_t);
605 if (i == tdb->transaction->num_blocks-1) {
606 recovery_size += tdb->transaction->last_block_size;
607 } else {
608 recovery_size += tdb->transaction->block_size;
609 }
610 }
611
612 return recovery_size;
613}
614
615/*
616 allocate the recovery area, or use an existing recovery area if it is
617 large enough
618*/
619static int tdb_recovery_allocate(struct tdb_context *tdb,
620 tdb_len_t *recovery_size,
621 tdb_off_t *recovery_offset,
622 tdb_len_t *recovery_max_size)
623{
624 struct list_struct rec;
625 const struct tdb_methods *methods = tdb->transaction->io_methods;
626 tdb_off_t recovery_head;
627
628 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
629 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
630 return -1;
631 }
632
633 rec.rec_len = 0;
634
635 if (recovery_head != 0 &&
636 methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
637 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
638 return -1;
639 }
640
641 *recovery_size = tdb_recovery_size(tdb);
642
643 if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
644 /* it fits in the existing area */
645 *recovery_max_size = rec.rec_len;
646 *recovery_offset = recovery_head;
647 return 0;
648 }
649
650 /* we need to free up the old recovery area, then allocate a
651 new one at the end of the file. Note that we cannot use
652 tdb_allocate() to allocate the new one as that might return
653 us an area that is being currently used (as of the start of
654 the transaction) */
655 if (recovery_head != 0) {
656 if (tdb_free(tdb, recovery_head, &rec) == -1) {
657 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
658 return -1;
659 }
660 }
661
662 /* the tdb_free() call might have increased the recovery size */
663 *recovery_size = tdb_recovery_size(tdb);
664
665 /* round up to a multiple of page size */
666 *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
667 *recovery_offset = tdb->map_size;
668 recovery_head = *recovery_offset;
669
670 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
671 (tdb->map_size - tdb->transaction->old_map_size) +
672 sizeof(rec) + *recovery_max_size) == -1) {
673 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
674 return -1;
675 }
676
677 /* remap the file (if using mmap) */
678 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
679
680 /* we have to reset the old map size so that we don't try to expand the file
681 again in the transaction commit, which would destroy the recovery area */
682 tdb->transaction->old_map_size = tdb->map_size;
683
684 /* write the recovery header offset and sync - we can sync without a race here
685 as the magic ptr in the recovery record has not been set */
686 CONVERT(recovery_head);
687 if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
688 &recovery_head, sizeof(tdb_off_t)) == -1) {
689 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
690 return -1;
691 }
692 if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
693 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
694 return -1;
695 }
696
697 return 0;
698}
699
700
701/*
702 setup the recovery data that will be used on a crash during commit
703*/
704static int transaction_setup_recovery(struct tdb_context *tdb,
705 tdb_off_t *magic_offset)
706{
707 tdb_len_t recovery_size;
708 unsigned char *data, *p;
709 const struct tdb_methods *methods = tdb->transaction->io_methods;
710 struct list_struct *rec;
711 tdb_off_t recovery_offset, recovery_max_size;
712 tdb_off_t old_map_size = tdb->transaction->old_map_size;
713 uint32_t magic, tailer;
714 int i;
715
716 /*
717 check that the recovery area has enough space
718 */
719 if (tdb_recovery_allocate(tdb, &recovery_size,
720 &recovery_offset, &recovery_max_size) == -1) {
721 return -1;
722 }
723
724 data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
725 if (data == NULL) {
726 tdb->ecode = TDB_ERR_OOM;
727 return -1;
728 }
729
730 rec = (struct list_struct *)data;
731 memset(rec, 0, sizeof(*rec));
732
733 rec->magic = 0;
734 rec->data_len = recovery_size;
735 rec->rec_len = recovery_max_size;
736 rec->key_len = old_map_size;
737 CONVERT(rec);
738
739 /* build the recovery data into a single blob to allow us to do a single
740 large write, which should be more efficient */
741 p = data + sizeof(*rec);
742 for (i=0;i<tdb->transaction->num_blocks;i++) {
743 tdb_off_t offset;
744 tdb_len_t length;
745
746 if (tdb->transaction->blocks[i] == NULL) {
747 continue;
748 }
749
750 offset = i * tdb->transaction->block_size;
751 length = tdb->transaction->block_size;
752 if (i == tdb->transaction->num_blocks-1) {
753 length = tdb->transaction->last_block_size;
754 }
755
756 if (offset >= old_map_size) {
757 continue;
758 }
759 if (offset + length > tdb->transaction->old_map_size) {
760 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
761 free(data);
762 tdb->ecode = TDB_ERR_CORRUPT;
763 return -1;
764 }
765 memcpy(p, &offset, 4);
766 memcpy(p+4, &length, 4);
767 if (DOCONV()) {
768 tdb_convert(p, 8);
769 }
770 /* the recovery area contains the old data, not the
771 new data, so we have to call the original tdb_read
772 method to get it */
773 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
774 free(data);
775 tdb->ecode = TDB_ERR_IO;
776 return -1;
777 }
778 p += 8 + length;
779 }
780
781 /* and the tailer */
782 tailer = sizeof(*rec) + recovery_max_size;
783 memcpy(p, &tailer, 4);
784 CONVERT(p);
785
786 /* write the recovery data to the recovery area */
787 if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
788 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
789 free(data);
790 tdb->ecode = TDB_ERR_IO;
791 return -1;
792 }
793 if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
794 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
795 free(data);
796 tdb->ecode = TDB_ERR_IO;
797 return -1;
798 }
799
800 /* as we don't have ordered writes, we have to sync the recovery
801 data before we update the magic to indicate that the recovery
802 data is present */
803 if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
804 free(data);
805 return -1;
806 }
807
808 free(data);
809
810 magic = TDB_RECOVERY_MAGIC;
811 CONVERT(magic);
812
813 *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
814
815 if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
816 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
817 tdb->ecode = TDB_ERR_IO;
818 return -1;
819 }
820 if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
821 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
822 tdb->ecode = TDB_ERR_IO;
823 return -1;
824 }
825
826 /* ensure the recovery magic marker is on disk */
827 if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
828 return -1;
829 }
830
831 return 0;
832}
833
834/*
835 commit the current transaction
836*/
837int tdb_transaction_commit(struct tdb_context *tdb)
838{
839 const struct tdb_methods *methods;
840 tdb_off_t magic_offset = 0;
841 uint32_t zero = 0;
842 int i;
843
844 if (tdb->transaction == NULL) {
845 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
846 return -1;
847 }
848
849 if (tdb->transaction->transaction_error) {
850 tdb->ecode = TDB_ERR_IO;
851 tdb_transaction_cancel(tdb);
852 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
853 return -1;
854 }
855
856
857 if (tdb->transaction->nesting != 0) {
858 tdb->transaction->nesting--;
859 return 0;
860 }
861
862 /* check for a null transaction */
863 if (tdb->transaction->blocks == NULL) {
864 tdb_transaction_cancel(tdb);
865 return 0;
866 }
867
868 methods = tdb->transaction->io_methods;
869
870 /* if there are any locks pending then the caller has not
871 nested their locks properly, so fail the transaction */
872 if (tdb->num_locks || tdb->global_lock.count) {
873 tdb->ecode = TDB_ERR_LOCK;
874 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
875 tdb_transaction_cancel(tdb);
876 return -1;
877 }
878
879 /* upgrade the main transaction lock region to a write lock */
880#ifndef __OS2__ // YD the global lock is an exclusive lock for us, it is enough.
881 if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
882 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
883 tdb->ecode = TDB_ERR_LOCK;
884 tdb_transaction_cancel(tdb);
885 return -1;
886 }
887#endif
888
889 /* get the global lock - this prevents new users attaching to the database
890 during the commit */
891 if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
892 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
893 tdb->ecode = TDB_ERR_LOCK;
894 tdb_transaction_cancel(tdb);
895 return -1;
896 }
897
898 if (!(tdb->flags & TDB_NOSYNC)) {
899 /* write the recovery data to the end of the file */
900 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
901 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
902 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
903 tdb_transaction_cancel(tdb);
904 return -1;
905 }
906 }
907
908 /* expand the file to the new size if needed */
909 if (tdb->map_size != tdb->transaction->old_map_size) {
910 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
911 tdb->map_size -
912 tdb->transaction->old_map_size) == -1) {
913 tdb->ecode = TDB_ERR_IO;
914 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
915 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
916 tdb_transaction_cancel(tdb);
917 return -1;
918 }
919 tdb->map_size = tdb->transaction->old_map_size;
920 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
921 }
922
923 /* perform all the writes */
924 for (i=0;i<tdb->transaction->num_blocks;i++) {
925 tdb_off_t offset;
926 tdb_len_t length;
927
928 if (tdb->transaction->blocks[i] == NULL) {
929 continue;
930 }
931
932 offset = i * tdb->transaction->block_size;
933 length = tdb->transaction->block_size;
934 if (i == tdb->transaction->num_blocks-1) {
935 length = tdb->transaction->last_block_size;
936 }
937
938 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
939 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
940
941 /* we've overwritten part of the data and
942 possibly expanded the file, so we need to
943 run the crash recovery code */
944 tdb->methods = methods;
945 tdb_transaction_recover(tdb);
946
947 tdb_transaction_cancel(tdb);
948 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
949
950 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
951 return -1;
952 }
953 SAFE_FREE(tdb->transaction->blocks[i]);
954 }
955
956 SAFE_FREE(tdb->transaction->blocks);
957 tdb->transaction->num_blocks = 0;
958
959 if (!(tdb->flags & TDB_NOSYNC)) {
960 /* ensure the new data is on disk */
961 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
962 return -1;
963 }
964
965 /* remove the recovery marker */
966 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
967 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
968 return -1;
969 }
970
971 /* ensure the recovery marker has been removed on disk */
972 if (transaction_sync(tdb, magic_offset, 4) == -1) {
973 return -1;
974 }
975 }
976
977 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
978
979 /*
980 TODO: maybe write to some dummy hdr field, or write to magic
981 offset without mmap, before the last sync, instead of the
982 utime() call
983 */
984
985 /* on some systems (like Linux 2.6.x) changes via mmap/msync
986 don't change the mtime of the file, this means the file may
987 not be backed up (as tdb rounding to block sizes means that
988 file size changes are quite rare too). The following forces
989 mtime changes when a transaction completes */
990#ifdef HAVE_UTIME
991 utime(tdb->name, NULL);
992#endif
993
994 /* use a transaction cancel to free memory and remove the
995 transaction locks */
996 tdb_transaction_cancel(tdb);
997
998 return 0;
999}
1000
1001
1002/*
1003 recover from an aborted transaction. Must be called with exclusive
1004 database write access already established (including the global
1005 lock to prevent new processes attaching)
1006*/
1007int tdb_transaction_recover(struct tdb_context *tdb)
1008{
1009 tdb_off_t recovery_head, recovery_eof;
1010 unsigned char *data, *p;
1011 uint32_t zero = 0;
1012 struct list_struct rec;
1013
1014 /* find the recovery area */
1015 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1016 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1017 tdb->ecode = TDB_ERR_IO;
1018 return -1;
1019 }
1020
1021 if (recovery_head == 0) {
1022 /* we have never allocated a recovery record */
1023 return 0;
1024 }
1025
1026 /* read the recovery record */
1027 if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
1028 sizeof(rec), DOCONV()) == -1) {
1029 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
1030 tdb->ecode = TDB_ERR_IO;
1031 return -1;
1032 }
1033
1034 if (rec.magic != TDB_RECOVERY_MAGIC) {
1035 /* there is no valid recovery data */
1036 return 0;
1037 }
1038
1039 if (tdb->read_only) {
1040 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1041 tdb->ecode = TDB_ERR_CORRUPT;
1042 return -1;
1043 }
1044
1045 recovery_eof = rec.key_len;
1046
1047 data = (unsigned char *)malloc(rec.data_len);
1048 if (data == NULL) {
1049 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
1050 tdb->ecode = TDB_ERR_OOM;
1051 return -1;
1052 }
1053
1054 /* read the full recovery data */
1055 if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1056 rec.data_len, 0) == -1) {
1057 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
1058 tdb->ecode = TDB_ERR_IO;
1059 return -1;
1060 }
1061
1062 /* recover the file data */
1063 p = data;
1064 while (p+8 < data + rec.data_len) {
1065 uint32_t ofs, len;
1066 if (DOCONV()) {
1067 tdb_convert(p, 8);
1068 }
1069 memcpy(&ofs, p, 4);
1070 memcpy(&len, p+4, 4);
1071
1072 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1073 free(data);
1074 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1075 tdb->ecode = TDB_ERR_IO;
1076 return -1;
1077 }
1078 p += 8 + len;
1079 }
1080
1081 free(data);
1082
1083 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1084 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1085 tdb->ecode = TDB_ERR_IO;
1086 return -1;
1087 }
1088
1089 /* if the recovery area is after the recovered eof then remove it */
1090 if (recovery_eof <= recovery_head) {
1091 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1092 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1093 tdb->ecode = TDB_ERR_IO;
1094 return -1;
1095 }
1096 }
1097
1098 /* remove the recovery magic */
1099 if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
1100 &zero) == -1) {
1101 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1102 tdb->ecode = TDB_ERR_IO;
1103 return -1;
1104 }
1105
1106 /* reduce the file size to the old size */
1107 tdb_munmap(tdb);
1108 if (ftruncate(tdb->fd, recovery_eof) != 0) {
1109 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1110 tdb->ecode = TDB_ERR_IO;
1111 return -1;
1112 }
1113 tdb->map_size = recovery_eof;
1114 tdb_mmap(tdb);
1115
1116 if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1117 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1118 tdb->ecode = TDB_ERR_IO;
1119 return -1;
1120 }
1121
1122 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",
1123 recovery_eof));
1124
1125 /* all done */
1126 return 0;
1127}
Note: See TracBrowser for help on using the repository browser.