source: branches/samba-3.2.x/source/lib/tdb/common/transaction.c

Last change on this file was 204, checked in by Herwig Bauernfeind, 16 years ago

Update 3.2 branch to 3.2.4

File size: 34.1 KB
Line 
1 /*
2 Unix SMB/CIFS implementation.
3
4 trivial database library
5
6 Copyright (C) Andrew Tridgell 2005
7
8 ** NOTE! The following LGPL license applies to the tdb
9 ** library. This does NOT imply that all of Samba is released
10 ** under the LGPL
11
12 This library is free software; you can redistribute it and/or
13 modify it under the terms of the GNU Lesser General Public
14 License as published by the Free Software Foundation; either
15 version 3 of the License, or (at your option) any later version.
16
17 This library is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20 Lesser General Public License for more details.
21
22 You should have received a copy of the GNU Lesser General Public
23 License along with this library; if not, see <http://www.gnu.org/licenses/>.
24*/
25
26#include "tdb_private.h"
27
28/*
29 transaction design:
30
31 - only allow a single transaction at a time per database. This makes
32 using the transaction API simpler, as otherwise the caller would
33 have to cope with temporary failures in transactions that conflict
34 with other current transactions
35
36 - keep the transaction recovery information in the same file as the
37 database, using a special 'transaction recovery' record pointed at
38 by the header. This removes the need for extra journal files as
39 used by some other databases
40
41 - dynamically allocate the transaction recovery record, re-using it
42 for subsequent transactions. If a larger record is needed then
43 tdb_free() the old record to place it on the normal tdb freelist
44 before allocating the new record
45
46 - during transactions, keep a linked list of all writes that have
47 been performed by intercepting all tdb_write() calls. The hooked
48 transaction versions of tdb_read() and tdb_write() check this
49 linked list and try to use the elements of the list in preference
50 to the real database.
51
52 - don't allow any locks to be held when a transaction starts,
53 otherwise we can end up with deadlock (plus lack of lock nesting
54 in posix locks would mean the lock is lost)
55
56 - if the caller gains a lock during the transaction but doesn't
57 release it then fail the commit
58
59 - allow for nested calls to tdb_transaction_start(), re-using the
60 existing transaction record. If the inner transaction is cancelled
61 then a subsequent commit will fail
62
63 - keep a mirrored copy of the tdb hash chain heads to allow for the
64 fast hash heads scan on traverse, updating the mirrored copy in
65 the transaction version of tdb_write
66
67 - allow callers to mix transaction and non-transaction use of tdb,
68 although once a transaction is started then an exclusive lock is
69 gained until the transaction is committed or cancelled
70
71 - the commit strategy involves first saving away all modified data
72 into a linearised buffer in the transaction recovery area, then
73 marking the transaction recovery area with a magic value to
74 indicate a valid recovery record. In total 4 fsync/msync calls are
75 needed per commit to prevent race conditions. It might be possible
76 to reduce this to 3 or even 2 with some more work.
77
78 - check for a valid recovery record on open of the tdb, while the
79 global lock is held. Automatically recover from the transaction
80 recovery area if needed, then continue with the open as
81 usual. This allows for smooth crash recovery with no administrator
82 intervention.
83
84 - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85 still available, but no transaction recovery area is used and no
86 fsync/msync calls are made.
87
88*/
89
90
91/*
92 hold the context of any current transaction
93*/
94struct tdb_transaction {
95 /* we keep a mirrored copy of the tdb hash heads here so
96 tdb_next_hash_chain() can operate efficiently */
97 uint32_t *hash_heads;
98
99 /* the original io methods - used to do IOs to the real db */
100 const struct tdb_methods *io_methods;
101
102 /* the list of transaction blocks. When a block is first
103 written to, it gets created in this list */
104 uint8_t **blocks;
105 uint32_t num_blocks;
106 uint32_t block_size; /* bytes in each block */
107 uint32_t last_block_size; /* number of valid bytes in the last block */
108
109 /* non-zero when an internal transaction error has
110 occurred. All write operations will then fail until the
111 transaction is ended */
112 int transaction_error;
113
114 /* when inside a transaction we need to keep track of any
115 nested tdb_transaction_start() calls, as these are allowed,
116 but don't create a new transaction */
117 int nesting;
118
119 /* old file size before transaction */
120 tdb_len_t old_map_size;
121};
122
123
124/*
125 read while in a transaction. We need to check first if the data is in our list
126 of transaction elements, then if not do a real read
127*/
128static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
129 tdb_len_t len, int cv)
130{
131 uint32_t blk;
132
133 /* break it down into block sized ops */
134 while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
135 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
136 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
137 return -1;
138 }
139 len -= len2;
140 off += len2;
141 buf = (void *)(len2 + (char *)buf);
142 }
143
144 if (len == 0) {
145 return 0;
146 }
147
148 blk = off / tdb->transaction->block_size;
149
150 /* see if we have it in the block list */
151 if (tdb->transaction->num_blocks <= blk ||
152 tdb->transaction->blocks[blk] == NULL) {
153 /* nope, do a real read */
154 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
155 goto fail;
156 }
157 return 0;
158 }
159
160 /* it is in the block list. Now check for the last block */
161 if (blk == tdb->transaction->num_blocks-1) {
162 if (len > tdb->transaction->last_block_size) {
163 goto fail;
164 }
165 }
166
167 /* now copy it out of this block */
168 memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
169 if (cv) {
170 tdb_convert(buf, len);
171 }
172 return 0;
173
174fail:
175 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
176 tdb->ecode = TDB_ERR_IO;
177 tdb->transaction->transaction_error = 1;
178 return -1;
179}
180
181
182/*
183 write while in a transaction
184*/
185static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
186 const void *buf, tdb_len_t len)
187{
188 uint32_t blk;
189
190 /* if the write is to a hash head, then update the transaction
191 hash heads */
192 if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
193 off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
194 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
195 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
196 }
197
198 /* break it up into block sized chunks */
199 while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
200 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
201 if (transaction_write(tdb, off, buf, len2) != 0) {
202 return -1;
203 }
204 len -= len2;
205 off += len2;
206 if (buf != NULL) {
207 buf = (const void *)(len2 + (const char *)buf);
208 }
209 }
210
211 if (len == 0) {
212 return 0;
213 }
214
215 blk = off / tdb->transaction->block_size;
216 off = off % tdb->transaction->block_size;
217
218 if (tdb->transaction->num_blocks <= blk) {
219 uint8_t **new_blocks;
220 /* expand the blocks array */
221 if (tdb->transaction->blocks == NULL) {
222 new_blocks = (uint8_t **)malloc(
223 (blk+1)*sizeof(uint8_t *));
224 } else {
225 new_blocks = (uint8_t **)realloc(
226 tdb->transaction->blocks,
227 (blk+1)*sizeof(uint8_t *));
228 }
229 if (new_blocks == NULL) {
230 tdb->ecode = TDB_ERR_OOM;
231 goto fail;
232 }
233 memset(&new_blocks[tdb->transaction->num_blocks], 0,
234 (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
235 tdb->transaction->blocks = new_blocks;
236 tdb->transaction->num_blocks = blk+1;
237 tdb->transaction->last_block_size = 0;
238 }
239
240 /* allocate and fill a block? */
241 if (tdb->transaction->blocks[blk] == NULL) {
242 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
243 if (tdb->transaction->blocks[blk] == NULL) {
244 tdb->ecode = TDB_ERR_OOM;
245 tdb->transaction->transaction_error = 1;
246 return -1;
247 }
248 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
249 tdb_len_t len2 = tdb->transaction->block_size;
250 if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
251 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
252 }
253 if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size,
254 tdb->transaction->blocks[blk],
255 len2, 0) != 0) {
256 SAFE_FREE(tdb->transaction->blocks[blk]);
257 tdb->ecode = TDB_ERR_IO;
258 goto fail;
259 }
260 if (blk == tdb->transaction->num_blocks-1) {
261 tdb->transaction->last_block_size = len2;
262 }
263 }
264 }
265
266 /* overwrite part of an existing block */
267 if (buf == NULL) {
268 memset(tdb->transaction->blocks[blk] + off, 0, len);
269 } else {
270 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
271 }
272 if (blk == tdb->transaction->num_blocks-1) {
273 if (len + off > tdb->transaction->last_block_size) {
274 tdb->transaction->last_block_size = len + off;
275 }
276 }
277
278 return 0;
279
280fail:
281 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n",
282 (blk*tdb->transaction->block_size) + off, len));
283 tdb->transaction->transaction_error = 1;
284 return -1;
285}
286
287
288/*
289 write while in a transaction - this varient never expands the transaction blocks, it only
290 updates existing blocks. This means it cannot change the recovery size
291*/
292static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
293 const void *buf, tdb_len_t len)
294{
295 uint32_t blk;
296
297 /* break it up into block sized chunks */
298 while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
299 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
300 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
301 return -1;
302 }
303 len -= len2;
304 off += len2;
305 if (buf != NULL) {
306 buf = (const void *)(len2 + (const char *)buf);
307 }
308 }
309
310 if (len == 0) {
311 return 0;
312 }
313
314 blk = off / tdb->transaction->block_size;
315 off = off % tdb->transaction->block_size;
316
317 if (tdb->transaction->num_blocks <= blk ||
318 tdb->transaction->blocks[blk] == NULL) {
319 return 0;
320 }
321
322 if (blk == tdb->transaction->num_blocks-1 &&
323 off + len > tdb->transaction->last_block_size) {
324 if (off >= tdb->transaction->last_block_size) {
325 return 0;
326 }
327 len = tdb->transaction->last_block_size - off;
328 }
329
330 /* overwrite part of an existing block */
331 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
332
333 return 0;
334}
335
336
337/*
338 accelerated hash chain head search, using the cached hash heads
339*/
340static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
341{
342 uint32_t h = *chain;
343 for (;h < tdb->header.hash_size;h++) {
344 /* the +1 takes account of the freelist */
345 if (0 != tdb->transaction->hash_heads[h+1]) {
346 break;
347 }
348 }
349 (*chain) = h;
350}
351
352/*
353 out of bounds check during a transaction
354*/
355static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
356{
357 if (len <= tdb->map_size) {
358 return 0;
359 }
360 return TDB_ERRCODE(TDB_ERR_IO, -1);
361}
362
363/*
364 transaction version of tdb_expand().
365*/
366static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
367 tdb_off_t addition)
368{
369 /* add a write to the transaction elements, so subsequent
370 reads see the zero data */
371 if (transaction_write(tdb, size, NULL, addition) != 0) {
372 return -1;
373 }
374
375 return 0;
376}
377
378/*
379 brlock during a transaction - ignore them
380*/
381static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
382 int rw_type, int lck_type, int probe, size_t len)
383{
384 return 0;
385}
386
387static const struct tdb_methods transaction_methods = {
388 transaction_read,
389 transaction_write,
390 transaction_next_hash_chain,
391 transaction_oob,
392 transaction_expand_file,
393 transaction_brlock
394};
395
396
397/*
398 start a tdb transaction. No token is returned, as only a single
399 transaction is allowed to be pending per tdb_context
400*/
401int tdb_transaction_start(struct tdb_context *tdb)
402{
403 /* some sanity checks */
404 if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
405 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
406 tdb->ecode = TDB_ERR_EINVAL;
407 return -1;
408 }
409
410 /* cope with nested tdb_transaction_start() calls */
411 if (tdb->transaction != NULL) {
412 tdb->transaction->nesting++;
413 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
414 tdb->transaction->nesting));
415 return 0;
416 }
417
418 if (tdb->num_locks != 0 || tdb->global_lock.count) {
419 /* the caller must not have any locks when starting a
420 transaction as otherwise we'll be screwed by lack
421 of nested locks in posix */
422 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
423 tdb->ecode = TDB_ERR_LOCK;
424 return -1;
425 }
426
427 if (tdb->travlocks.next != NULL) {
428 /* you cannot use transactions inside a traverse (although you can use
429 traverse inside a transaction) as otherwise you can end up with
430 deadlock */
431 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
432 tdb->ecode = TDB_ERR_LOCK;
433 return -1;
434 }
435
436 tdb->transaction = (struct tdb_transaction *)
437 calloc(sizeof(struct tdb_transaction), 1);
438 if (tdb->transaction == NULL) {
439 tdb->ecode = TDB_ERR_OOM;
440 return -1;
441 }
442
443 /* a page at a time seems like a reasonable compromise between compactness and efficiency */
444 tdb->transaction->block_size = tdb->page_size;
445
446 /* get the transaction write lock. This is a blocking lock. As
447 discussed with Volker, there are a number of ways we could
448 make this async, which we will probably do in the future */
449 if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
450 SAFE_FREE(tdb->transaction->blocks);
451 SAFE_FREE(tdb->transaction);
452 return -1;
453 }
454
455 /* get a read lock from the freelist to the end of file. This
456 is upgraded to a write lock during the commit */
457#ifndef __OS2__ // YD the transation lock is an exclusive lock for us, it is enough.
458 if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
459 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
460 tdb->ecode = TDB_ERR_LOCK;
461 goto fail;
462 }
463#endif
464 /* setup a copy of the hash table heads so the hash scan in
465 traverse can be fast */
466 tdb->transaction->hash_heads = (uint32_t *)
467 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
468 if (tdb->transaction->hash_heads == NULL) {
469 tdb->ecode = TDB_ERR_OOM;
470 goto fail;
471 }
472 if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
473 TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
474 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
475 tdb->ecode = TDB_ERR_IO;
476 goto fail;
477 }
478
479 /* make sure we know about any file expansions already done by
480 anyone else */
481 tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
482 tdb->transaction->old_map_size = tdb->map_size;
483
484 /* finally hook the io methods, replacing them with
485 transaction specific methods */
486 tdb->transaction->io_methods = tdb->methods;
487 tdb->methods = &transaction_methods;
488
489 return 0;
490
491fail:
492#ifndef __OS2__ // YD the transation lock is an exclusive lock for us, it is enough.
493 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
494#endif
495 tdb_transaction_unlock(tdb);
496 SAFE_FREE(tdb->transaction->blocks);
497 SAFE_FREE(tdb->transaction->hash_heads);
498 SAFE_FREE(tdb->transaction);
499 return -1;
500}
501
502
503/*
504 cancel the current transaction
505*/
506int tdb_transaction_cancel(struct tdb_context *tdb)
507{
508 int i;
509
510 if (tdb->transaction == NULL) {
511 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
512 return -1;
513 }
514
515 if (tdb->transaction->nesting != 0) {
516 tdb->transaction->transaction_error = 1;
517 tdb->transaction->nesting--;
518 return 0;
519 }
520
521 tdb->map_size = tdb->transaction->old_map_size;
522
523 /* free all the transaction blocks */
524 for (i=0;i<tdb->transaction->num_blocks;i++) {
525 if (tdb->transaction->blocks[i] != NULL) {
526 free(tdb->transaction->blocks[i]);
527 }
528 }
529 SAFE_FREE(tdb->transaction->blocks);
530
531 /* remove any global lock created during the transaction */
532 if (tdb->global_lock.count != 0) {
533 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
534 tdb->global_lock.count = 0;
535 }
536
537 /* remove any locks created during the transaction */
538 if (tdb->num_locks != 0) {
539 for (i=0;i<tdb->num_lockrecs;i++) {
540 tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
541 F_UNLCK,F_SETLKW, 0, 1);
542 }
543 tdb->num_locks = 0;
544 tdb->num_lockrecs = 0;
545 SAFE_FREE(tdb->lockrecs);
546 }
547
548 /* restore the normal io methods */
549 tdb->methods = tdb->transaction->io_methods;
550
551#ifndef __OS2__ // YD the transation lock is an exclusive lock for us, it is enough.
552 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
553#endif
554 tdb_transaction_unlock(tdb);
555 SAFE_FREE(tdb->transaction->hash_heads);
556 SAFE_FREE(tdb->transaction);
557
558 return 0;
559}
560
561/*
562 sync to disk
563*/
564static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
565{
566 if (fsync(tdb->fd) != 0) {
567 tdb->ecode = TDB_ERR_IO;
568 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
569 return -1;
570 }
571#ifdef HAVE_MMAP
572 if (tdb->map_ptr) {
573 tdb_off_t moffset = offset & ~(tdb->page_size-1);
574 if (msync(moffset + (char *)tdb->map_ptr,
575 length + (offset - moffset), MS_SYNC) != 0) {
576 tdb->ecode = TDB_ERR_IO;
577 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
578 strerror(errno)));
579 return -1;
580 }
581 }
582#endif
583 return 0;
584}
585
586
587/*
588 work out how much space the linearised recovery data will consume
589*/
590static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
591{
592 tdb_len_t recovery_size = 0;
593 int i;
594
595 recovery_size = sizeof(uint32_t);
596 for (i=0;i<tdb->transaction->num_blocks;i++) {
597 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
598 break;
599 }
600 if (tdb->transaction->blocks[i] == NULL) {
601 continue;
602 }
603 recovery_size += 2*sizeof(tdb_off_t);
604 if (i == tdb->transaction->num_blocks-1) {
605 recovery_size += tdb->transaction->last_block_size;
606 } else {
607 recovery_size += tdb->transaction->block_size;
608 }
609 }
610
611 return recovery_size;
612}
613
614/*
615 allocate the recovery area, or use an existing recovery area if it is
616 large enough
617*/
618static int tdb_recovery_allocate(struct tdb_context *tdb,
619 tdb_len_t *recovery_size,
620 tdb_off_t *recovery_offset,
621 tdb_len_t *recovery_max_size)
622{
623 struct list_struct rec;
624 const struct tdb_methods *methods = tdb->transaction->io_methods;
625 tdb_off_t recovery_head;
626
627 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
628 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
629 return -1;
630 }
631
632 rec.rec_len = 0;
633
634 if (recovery_head != 0 &&
635 methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
636 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
637 return -1;
638 }
639
640 *recovery_size = tdb_recovery_size(tdb);
641
642 if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
643 /* it fits in the existing area */
644 *recovery_max_size = rec.rec_len;
645 *recovery_offset = recovery_head;
646 return 0;
647 }
648
649 /* we need to free up the old recovery area, then allocate a
650 new one at the end of the file. Note that we cannot use
651 tdb_allocate() to allocate the new one as that might return
652 us an area that is being currently used (as of the start of
653 the transaction) */
654 if (recovery_head != 0) {
655 if (tdb_free(tdb, recovery_head, &rec) == -1) {
656 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
657 return -1;
658 }
659 }
660
661 /* the tdb_free() call might have increased the recovery size */
662 *recovery_size = tdb_recovery_size(tdb);
663
664 /* round up to a multiple of page size */
665 *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
666 *recovery_offset = tdb->map_size;
667 recovery_head = *recovery_offset;
668
669 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
670 (tdb->map_size - tdb->transaction->old_map_size) +
671 sizeof(rec) + *recovery_max_size) == -1) {
672 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
673 return -1;
674 }
675
676 /* remap the file (if using mmap) */
677 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
678
679 /* we have to reset the old map size so that we don't try to expand the file
680 again in the transaction commit, which would destroy the recovery area */
681 tdb->transaction->old_map_size = tdb->map_size;
682
683 /* write the recovery header offset and sync - we can sync without a race here
684 as the magic ptr in the recovery record has not been set */
685 CONVERT(recovery_head);
686 if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
687 &recovery_head, sizeof(tdb_off_t)) == -1) {
688 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
689 return -1;
690 }
691 if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
692 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
693 return -1;
694 }
695
696 return 0;
697}
698
699
700/*
701 setup the recovery data that will be used on a crash during commit
702*/
703static int transaction_setup_recovery(struct tdb_context *tdb,
704 tdb_off_t *magic_offset)
705{
706 tdb_len_t recovery_size;
707 unsigned char *data, *p;
708 const struct tdb_methods *methods = tdb->transaction->io_methods;
709 struct list_struct *rec;
710 tdb_off_t recovery_offset, recovery_max_size;
711 tdb_off_t old_map_size = tdb->transaction->old_map_size;
712 uint32_t magic, tailer;
713 int i;
714
715 /*
716 check that the recovery area has enough space
717 */
718 if (tdb_recovery_allocate(tdb, &recovery_size,
719 &recovery_offset, &recovery_max_size) == -1) {
720 return -1;
721 }
722
723 data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
724 if (data == NULL) {
725 tdb->ecode = TDB_ERR_OOM;
726 return -1;
727 }
728
729 rec = (struct list_struct *)data;
730 memset(rec, 0, sizeof(*rec));
731
732 rec->magic = 0;
733 rec->data_len = recovery_size;
734 rec->rec_len = recovery_max_size;
735 rec->key_len = old_map_size;
736 CONVERT(rec);
737
738 /* build the recovery data into a single blob to allow us to do a single
739 large write, which should be more efficient */
740 p = data + sizeof(*rec);
741 for (i=0;i<tdb->transaction->num_blocks;i++) {
742 tdb_off_t offset;
743 tdb_len_t length;
744
745 if (tdb->transaction->blocks[i] == NULL) {
746 continue;
747 }
748
749 offset = i * tdb->transaction->block_size;
750 length = tdb->transaction->block_size;
751 if (i == tdb->transaction->num_blocks-1) {
752 length = tdb->transaction->last_block_size;
753 }
754
755 if (offset >= old_map_size) {
756 continue;
757 }
758 if (offset + length > tdb->transaction->old_map_size) {
759 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
760 free(data);
761 tdb->ecode = TDB_ERR_CORRUPT;
762 return -1;
763 }
764 memcpy(p, &offset, 4);
765 memcpy(p+4, &length, 4);
766 if (DOCONV()) {
767 tdb_convert(p, 8);
768 }
769 /* the recovery area contains the old data, not the
770 new data, so we have to call the original tdb_read
771 method to get it */
772 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
773 free(data);
774 tdb->ecode = TDB_ERR_IO;
775 return -1;
776 }
777 p += 8 + length;
778 }
779
780 /* and the tailer */
781 tailer = sizeof(*rec) + recovery_max_size;
782 memcpy(p, &tailer, 4);
783 CONVERT(p);
784
785 /* write the recovery data to the recovery area */
786 if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
787 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
788 free(data);
789 tdb->ecode = TDB_ERR_IO;
790 return -1;
791 }
792 if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
793 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
794 free(data);
795 tdb->ecode = TDB_ERR_IO;
796 return -1;
797 }
798
799 /* as we don't have ordered writes, we have to sync the recovery
800 data before we update the magic to indicate that the recovery
801 data is present */
802 if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
803 free(data);
804 return -1;
805 }
806
807 free(data);
808
809 magic = TDB_RECOVERY_MAGIC;
810 CONVERT(magic);
811
812 *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
813
814 if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
815 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
816 tdb->ecode = TDB_ERR_IO;
817 return -1;
818 }
819 if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
820 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
821 tdb->ecode = TDB_ERR_IO;
822 return -1;
823 }
824
825 /* ensure the recovery magic marker is on disk */
826 if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
827 return -1;
828 }
829
830 return 0;
831}
832
833/*
834 commit the current transaction
835*/
836int tdb_transaction_commit(struct tdb_context *tdb)
837{
838 const struct tdb_methods *methods;
839 tdb_off_t magic_offset = 0;
840 uint32_t zero = 0;
841 int i;
842
843 if (tdb->transaction == NULL) {
844 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
845 return -1;
846 }
847
848 if (tdb->transaction->transaction_error) {
849 tdb->ecode = TDB_ERR_IO;
850 tdb_transaction_cancel(tdb);
851 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
852 return -1;
853 }
854
855
856 if (tdb->transaction->nesting != 0) {
857 tdb->transaction->nesting--;
858 return 0;
859 }
860
861 /* check for a null transaction */
862 if (tdb->transaction->blocks == NULL) {
863 tdb_transaction_cancel(tdb);
864 return 0;
865 }
866
867 methods = tdb->transaction->io_methods;
868
869 /* if there are any locks pending then the caller has not
870 nested their locks properly, so fail the transaction */
871 if (tdb->num_locks || tdb->global_lock.count) {
872 tdb->ecode = TDB_ERR_LOCK;
873 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
874 tdb_transaction_cancel(tdb);
875 return -1;
876 }
877
878 /* upgrade the main transaction lock region to a write lock */
879#ifndef __OS2__ // YD the global lock is an exclusive lock for us, it is enough.
880 if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
881 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
882 tdb->ecode = TDB_ERR_LOCK;
883 tdb_transaction_cancel(tdb);
884 return -1;
885 }
886#endif
887 /* get the global lock - this prevents new users attaching to the database
888 during the commit */
889 if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
890 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
891 tdb->ecode = TDB_ERR_LOCK;
892 tdb_transaction_cancel(tdb);
893 return -1;
894 }
895
896 if (!(tdb->flags & TDB_NOSYNC)) {
897 /* write the recovery data to the end of the file */
898 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
899 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
900 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
901 tdb_transaction_cancel(tdb);
902 return -1;
903 }
904 }
905
906 /* expand the file to the new size if needed */
907 if (tdb->map_size != tdb->transaction->old_map_size) {
908 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
909 tdb->map_size -
910 tdb->transaction->old_map_size) == -1) {
911 tdb->ecode = TDB_ERR_IO;
912 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
913 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
914 tdb_transaction_cancel(tdb);
915 return -1;
916 }
917 tdb->map_size = tdb->transaction->old_map_size;
918 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
919 }
920
921 /* perform all the writes */
922 for (i=0;i<tdb->transaction->num_blocks;i++) {
923 tdb_off_t offset;
924 tdb_len_t length;
925
926 if (tdb->transaction->blocks[i] == NULL) {
927 continue;
928 }
929
930 offset = i * tdb->transaction->block_size;
931 length = tdb->transaction->block_size;
932 if (i == tdb->transaction->num_blocks-1) {
933 length = tdb->transaction->last_block_size;
934 }
935
936 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
937 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
938
939 /* we've overwritten part of the data and
940 possibly expanded the file, so we need to
941 run the crash recovery code */
942 tdb->methods = methods;
943 tdb_transaction_recover(tdb);
944
945 tdb_transaction_cancel(tdb);
946 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
947
948 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
949 return -1;
950 }
951 SAFE_FREE(tdb->transaction->blocks[i]);
952 }
953
954 SAFE_FREE(tdb->transaction->blocks);
955 tdb->transaction->num_blocks = 0;
956
957 if (!(tdb->flags & TDB_NOSYNC)) {
958 /* ensure the new data is on disk */
959 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
960 return -1;
961 }
962
963 /* remove the recovery marker */
964 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
965 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
966 return -1;
967 }
968
969 /* ensure the recovery marker has been removed on disk */
970 if (transaction_sync(tdb, magic_offset, 4) == -1) {
971 return -1;
972 }
973 }
974
975 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
976
977 /*
978 TODO: maybe write to some dummy hdr field, or write to magic
979 offset without mmap, before the last sync, instead of the
980 utime() call
981 */
982
983 /* on some systems (like Linux 2.6.x) changes via mmap/msync
984 don't change the mtime of the file, this means the file may
985 not be backed up (as tdb rounding to block sizes means that
986 file size changes are quite rare too). The following forces
987 mtime changes when a transaction completes */
988#ifdef HAVE_UTIME
989 utime(tdb->name, NULL);
990#endif
991
992 /* use a transaction cancel to free memory and remove the
993 transaction locks */
994 tdb_transaction_cancel(tdb);
995
996 return 0;
997}
998
999
1000/*
1001 recover from an aborted transaction. Must be called with exclusive
1002 database write access already established (including the global
1003 lock to prevent new processes attaching)
1004*/
1005int tdb_transaction_recover(struct tdb_context *tdb)
1006{
1007 tdb_off_t recovery_head, recovery_eof;
1008 unsigned char *data, *p;
1009 uint32_t zero = 0;
1010 struct list_struct rec;
1011
1012 /* find the recovery area */
1013 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1014 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1015 tdb->ecode = TDB_ERR_IO;
1016 return -1;
1017 }
1018
1019 if (recovery_head == 0) {
1020 /* we have never allocated a recovery record */
1021 return 0;
1022 }
1023
1024 /* read the recovery record */
1025 if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
1026 sizeof(rec), DOCONV()) == -1) {
1027 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
1028 tdb->ecode = TDB_ERR_IO;
1029 return -1;
1030 }
1031
1032 if (rec.magic != TDB_RECOVERY_MAGIC) {
1033 /* there is no valid recovery data */
1034 return 0;
1035 }
1036
1037 if (tdb->read_only) {
1038 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1039 tdb->ecode = TDB_ERR_CORRUPT;
1040 return -1;
1041 }
1042
1043 recovery_eof = rec.key_len;
1044
1045 data = (unsigned char *)malloc(rec.data_len);
1046 if (data == NULL) {
1047 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
1048 tdb->ecode = TDB_ERR_OOM;
1049 return -1;
1050 }
1051
1052 /* read the full recovery data */
1053 if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1054 rec.data_len, 0) == -1) {
1055 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
1056 tdb->ecode = TDB_ERR_IO;
1057 return -1;
1058 }
1059
1060 /* recover the file data */
1061 p = data;
1062 while (p+8 < data + rec.data_len) {
1063 uint32_t ofs, len;
1064 if (DOCONV()) {
1065 tdb_convert(p, 8);
1066 }
1067 memcpy(&ofs, p, 4);
1068 memcpy(&len, p+4, 4);
1069
1070 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1071 free(data);
1072 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1073 tdb->ecode = TDB_ERR_IO;
1074 return -1;
1075 }
1076 p += 8 + len;
1077 }
1078
1079 free(data);
1080
1081 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1082 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1083 tdb->ecode = TDB_ERR_IO;
1084 return -1;
1085 }
1086
1087 /* if the recovery area is after the recovered eof then remove it */
1088 if (recovery_eof <= recovery_head) {
1089 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1090 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1091 tdb->ecode = TDB_ERR_IO;
1092 return -1;
1093 }
1094 }
1095
1096 /* remove the recovery magic */
1097 if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
1098 &zero) == -1) {
1099 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1100 tdb->ecode = TDB_ERR_IO;
1101 return -1;
1102 }
1103
1104 /* reduce the file size to the old size */
1105 tdb_munmap(tdb);
1106 if (ftruncate(tdb->fd, recovery_eof) != 0) {
1107 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1108 tdb->ecode = TDB_ERR_IO;
1109 return -1;
1110 }
1111 tdb->map_size = recovery_eof;
1112 tdb_mmap(tdb);
1113
1114 if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1115 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1116 tdb->ecode = TDB_ERR_IO;
1117 return -1;
1118 }
1119
1120 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",
1121 recovery_eof));
1122
1123 /* all done */
1124 return 0;
1125}
Note: See TracBrowser for help on using the repository browser.