source: vendor/current/lib/tdb/common/transaction.c

Last change on this file was 988, checked in by Silvan Scherrer, 9 years ago

Samba Server: update vendor to version 4.4.3

File size: 37.4 KB
Line 
1 /*
2 Unix SMB/CIFS implementation.
3
4 trivial database library
5
6 Copyright (C) Andrew Tridgell 2005
7
8 ** NOTE! The following LGPL license applies to the tdb
9 ** library. This does NOT imply that all of Samba is released
10 ** under the LGPL
11
12 This library is free software; you can redistribute it and/or
13 modify it under the terms of the GNU Lesser General Public
14 License as published by the Free Software Foundation; either
15 version 3 of the License, or (at your option) any later version.
16
17 This library is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20 Lesser General Public License for more details.
21
22 You should have received a copy of the GNU Lesser General Public
23 License along with this library; if not, see <http://www.gnu.org/licenses/>.
24*/
25
26#include "tdb_private.h"
27
28/*
29 transaction design:
30
31 - only allow a single transaction at a time per database. This makes
32 using the transaction API simpler, as otherwise the caller would
33 have to cope with temporary failures in transactions that conflict
34 with other current transactions
35
36 - keep the transaction recovery information in the same file as the
37 database, using a special 'transaction recovery' record pointed at
38 by the header. This removes the need for extra journal files as
39 used by some other databases
40
41 - dynamically allocate the transaction recovery record, re-using it
42 for subsequent transactions. If a larger record is needed then
43 tdb_free() the old record to place it on the normal tdb freelist
44 before allocating the new record
45
46 - during transactions, keep a linked list of all writes that have
47 been performed by intercepting all tdb_write() calls. The hooked
48 transaction versions of tdb_read() and tdb_write() check this
49 linked list and try to use the elements of the list in preference
50 to the real database.
51
52 - don't allow any locks to be held when a transaction starts,
53 otherwise we can end up with deadlock (plus lack of lock nesting
54 in posix locks would mean the lock is lost)
55
56 - if the caller gains a lock during the transaction but doesn't
57 release it then fail the commit
58
59 - allow for nested calls to tdb_transaction_start(), re-using the
60 existing transaction record. If the inner transaction is cancelled
61 then a subsequent commit will fail
62
63 - keep a mirrored copy of the tdb hash chain heads to allow for the
64 fast hash heads scan on traverse, updating the mirrored copy in
65 the transaction version of tdb_write
66
67 - allow callers to mix transaction and non-transaction use of tdb,
68 although once a transaction is started then an exclusive lock is
69 gained until the transaction is committed or cancelled
70
71 - the commit strategy involves first saving away all modified data
72 into a linearised buffer in the transaction recovery area, then
73 marking the transaction recovery area with a magic value to
74 indicate a valid recovery record. In total 4 fsync/msync calls are
75 needed per commit to prevent race conditions. It might be possible
76 to reduce this to 3 or even 2 with some more work.
77
78 - check for a valid recovery record on open of the tdb, while the
79 open lock is held. Automatically recover from the transaction
80 recovery area if needed, then continue with the open as
81 usual. This allows for smooth crash recovery with no administrator
82 intervention.
83
84 - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85 still available, but no fsync/msync calls are made. This means we
86 are still proof against a process dying during transaction commit,
87 but not against machine reboot.
88
89 - if TDB_ALLOW_NESTING is passed to flags in tdb open, or added using
90 tdb_add_flags() transaction nesting is enabled.
91 It resets the TDB_DISALLOW_NESTING flag, as both cannot be used together.
92 The default is that transaction nesting is allowed.
93 Note: this default may change in future versions of tdb.
94
95 Beware: when transactions are nested a transaction successfully
96 completed with tdb_transaction_commit() can be silently unrolled later.
97
98 - if TDB_DISALLOW_NESTING is passed to flags in tdb open, or added using
99 tdb_add_flags() transaction nesting is disabled.
100 It resets the TDB_ALLOW_NESTING flag, as both cannot be used together.
101 An attempt to create a nested transaction will fail with TDB_ERR_NESTING.
102 The default is that transaction nesting is allowed.
103 Note: this default may change in future versions of tdb.
104*/
105
106
107/*
108 hold the context of any current transaction
109*/
110struct tdb_transaction {
111 /* we keep a mirrored copy of the tdb hash heads here so
112 tdb_next_hash_chain() can operate efficiently */
113 uint32_t *hash_heads;
114
115 /* the original io methods - used to do IOs to the real db */
116 const struct tdb_methods *io_methods;
117
118 /* the list of transaction blocks. When a block is first
119 written to, it gets created in this list */
120 uint8_t **blocks;
121 uint32_t num_blocks;
122 uint32_t block_size; /* bytes in each block */
123 uint32_t last_block_size; /* number of valid bytes in the last block */
124
125 /* non-zero when an internal transaction error has
126 occurred. All write operations will then fail until the
127 transaction is ended */
128 int transaction_error;
129
130 /* when inside a transaction we need to keep track of any
131 nested tdb_transaction_start() calls, as these are allowed,
132 but don't create a new transaction */
133 int nesting;
134
135 /* set when a prepare has already occurred */
136 bool prepared;
137 tdb_off_t magic_offset;
138
139 /* old file size before transaction */
140 tdb_len_t old_map_size;
141
142 /* did we expand in this transaction */
143 bool expanded;
144};
145
146
147/*
148 read while in a transaction. We need to check first if the data is in our list
149 of transaction elements, then if not do a real read
150*/
151static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
152 tdb_len_t len, int cv)
153{
154 uint32_t blk;
155
156 /* break it down into block sized ops */
157 while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
158 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
159 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
160 return -1;
161 }
162 len -= len2;
163 off += len2;
164 buf = (void *)(len2 + (char *)buf);
165 }
166
167 if (len == 0) {
168 return 0;
169 }
170
171 blk = off / tdb->transaction->block_size;
172
173 /* see if we have it in the block list */
174 if (tdb->transaction->num_blocks <= blk ||
175 tdb->transaction->blocks[blk] == NULL) {
176 /* nope, do a real read */
177 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
178 goto fail;
179 }
180 return 0;
181 }
182
183 /* it is in the block list. Now check for the last block */
184 if (blk == tdb->transaction->num_blocks-1) {
185 if (len > tdb->transaction->last_block_size) {
186 goto fail;
187 }
188 }
189
190 /* now copy it out of this block */
191 memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
192 if (cv) {
193 tdb_convert(buf, len);
194 }
195 return 0;
196
197fail:
198 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%u len=%u\n", off, len));
199 tdb->ecode = TDB_ERR_IO;
200 tdb->transaction->transaction_error = 1;
201 return -1;
202}
203
204
205/*
206 write while in a transaction
207*/
208static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
209 const void *buf, tdb_len_t len)
210{
211 uint32_t blk;
212
213 /* Only a commit is allowed on a prepared transaction */
214 if (tdb->transaction->prepared) {
215 tdb->ecode = TDB_ERR_EINVAL;
216 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: transaction already prepared, write not allowed\n"));
217 tdb->transaction->transaction_error = 1;
218 return -1;
219 }
220
221 /* if the write is to a hash head, then update the transaction
222 hash heads */
223 if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
224 off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
225 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
226 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
227 }
228
229 /* break it up into block sized chunks */
230 while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
231 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
232 if (transaction_write(tdb, off, buf, len2) != 0) {
233 return -1;
234 }
235 len -= len2;
236 off += len2;
237 if (buf != NULL) {
238 buf = (const void *)(len2 + (const char *)buf);
239 }
240 }
241
242 if (len == 0) {
243 return 0;
244 }
245
246 blk = off / tdb->transaction->block_size;
247 off = off % tdb->transaction->block_size;
248
249 if (tdb->transaction->num_blocks <= blk) {
250 uint8_t **new_blocks;
251 /* expand the blocks array */
252 new_blocks = (uint8_t **)realloc(tdb->transaction->blocks,
253 (blk+1)*sizeof(uint8_t *));
254 if (new_blocks == NULL) {
255 tdb->ecode = TDB_ERR_OOM;
256 goto fail;
257 }
258 memset(&new_blocks[tdb->transaction->num_blocks], 0,
259 (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
260 tdb->transaction->blocks = new_blocks;
261 tdb->transaction->num_blocks = blk+1;
262 tdb->transaction->last_block_size = 0;
263 }
264
265 /* allocate and fill a block? */
266 if (tdb->transaction->blocks[blk] == NULL) {
267 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
268 if (tdb->transaction->blocks[blk] == NULL) {
269 tdb->ecode = TDB_ERR_OOM;
270 tdb->transaction->transaction_error = 1;
271 return -1;
272 }
273 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
274 tdb_len_t len2 = tdb->transaction->block_size;
275 if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
276 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
277 }
278 if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size,
279 tdb->transaction->blocks[blk],
280 len2, 0) != 0) {
281 SAFE_FREE(tdb->transaction->blocks[blk]);
282 tdb->ecode = TDB_ERR_IO;
283 goto fail;
284 }
285 if (blk == tdb->transaction->num_blocks-1) {
286 tdb->transaction->last_block_size = len2;
287 }
288 }
289 }
290
291 /* overwrite part of an existing block */
292 if (buf == NULL) {
293 memset(tdb->transaction->blocks[blk] + off, 0, len);
294 } else {
295 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
296 }
297 if (blk == tdb->transaction->num_blocks-1) {
298 if (len + off > tdb->transaction->last_block_size) {
299 tdb->transaction->last_block_size = len + off;
300 }
301 }
302
303 return 0;
304
305fail:
306 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%u len=%u\n",
307 (blk*tdb->transaction->block_size) + off, len));
308 tdb->transaction->transaction_error = 1;
309 return -1;
310}
311
312
313/*
314 write while in a transaction - this variant never expands the transaction blocks, it only
315 updates existing blocks. This means it cannot change the recovery size
316*/
317static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
318 const void *buf, tdb_len_t len)
319{
320 uint32_t blk;
321
322 /* break it up into block sized chunks */
323 while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
324 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
325 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
326 return -1;
327 }
328 len -= len2;
329 off += len2;
330 if (buf != NULL) {
331 buf = (const void *)(len2 + (const char *)buf);
332 }
333 }
334
335 if (len == 0) {
336 return 0;
337 }
338
339 blk = off / tdb->transaction->block_size;
340 off = off % tdb->transaction->block_size;
341
342 if (tdb->transaction->num_blocks <= blk ||
343 tdb->transaction->blocks[blk] == NULL) {
344 return 0;
345 }
346
347 if (blk == tdb->transaction->num_blocks-1 &&
348 off + len > tdb->transaction->last_block_size) {
349 if (off >= tdb->transaction->last_block_size) {
350 return 0;
351 }
352 len = tdb->transaction->last_block_size - off;
353 }
354
355 /* overwrite part of an existing block */
356 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
357
358 return 0;
359}
360
361
362/*
363 accelerated hash chain head search, using the cached hash heads
364*/
365static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
366{
367 uint32_t h = *chain;
368 for (;h < tdb->hash_size;h++) {
369 /* the +1 takes account of the freelist */
370 if (0 != tdb->transaction->hash_heads[h+1]) {
371 break;
372 }
373 }
374 (*chain) = h;
375}
376
377/*
378 out of bounds check during a transaction
379*/
380static int transaction_oob(struct tdb_context *tdb, tdb_off_t off,
381 tdb_len_t len, int probe)
382{
383 if (off + len >= off && off + len <= tdb->map_size) {
384 return 0;
385 }
386 tdb->ecode = TDB_ERR_IO;
387 return -1;
388}
389
390/*
391 transaction version of tdb_expand().
392*/
393static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
394 tdb_off_t addition)
395{
396 /* add a write to the transaction elements, so subsequent
397 reads see the zero data */
398 if (transaction_write(tdb, size, NULL, addition) != 0) {
399 return -1;
400 }
401
402 tdb->transaction->expanded = true;
403
404 return 0;
405}
406
407static const struct tdb_methods transaction_methods = {
408 transaction_read,
409 transaction_write,
410 transaction_next_hash_chain,
411 transaction_oob,
412 transaction_expand_file,
413};
414
415
416/*
417 start a tdb transaction. No token is returned, as only a single
418 transaction is allowed to be pending per tdb_context
419*/
420static int _tdb_transaction_start(struct tdb_context *tdb,
421 enum tdb_lock_flags lockflags)
422{
423 /* some sanity checks */
424 if (tdb->read_only || (tdb->flags & TDB_INTERNAL)
425 || tdb->traverse_read) {
426 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
427 tdb->ecode = TDB_ERR_EINVAL;
428 return -1;
429 }
430
431 /* cope with nested tdb_transaction_start() calls */
432 if (tdb->transaction != NULL) {
433 if (!(tdb->flags & TDB_ALLOW_NESTING)) {
434 tdb->ecode = TDB_ERR_NESTING;
435 return -1;
436 }
437 tdb->transaction->nesting++;
438 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
439 tdb->transaction->nesting));
440 return 0;
441 }
442
443 if (tdb_have_extra_locks(tdb)) {
444 /* the caller must not have any locks when starting a
445 transaction as otherwise we'll be screwed by lack
446 of nested locks in posix */
447 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
448 tdb->ecode = TDB_ERR_LOCK;
449 return -1;
450 }
451
452 if (tdb->travlocks.next != NULL) {
453 /* you cannot use transactions inside a traverse (although you can use
454 traverse inside a transaction) as otherwise you can end up with
455 deadlock */
456 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
457 tdb->ecode = TDB_ERR_LOCK;
458 return -1;
459 }
460
461 tdb->transaction = (struct tdb_transaction *)
462 calloc(sizeof(struct tdb_transaction), 1);
463 if (tdb->transaction == NULL) {
464 tdb->ecode = TDB_ERR_OOM;
465 return -1;
466 }
467
468 /* a page at a time seems like a reasonable compromise between compactness and efficiency */
469 tdb->transaction->block_size = tdb->page_size;
470
471 /* get the transaction write lock. This is a blocking lock. As
472 discussed with Volker, there are a number of ways we could
473 make this async, which we will probably do in the future */
474 if (tdb_transaction_lock(tdb, F_WRLCK, lockflags) == -1) {
475 SAFE_FREE(tdb->transaction->blocks);
476 SAFE_FREE(tdb->transaction);
477 if ((lockflags & TDB_LOCK_WAIT) == 0) {
478 tdb->ecode = TDB_ERR_NOLOCK;
479 }
480 return -1;
481 }
482
483 /* get a read lock from the freelist to the end of file. This
484 is upgraded to a write lock during the commit */
485 if (tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true) == -1) {
486 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
487 goto fail_allrecord_lock;
488 }
489
490 /* setup a copy of the hash table heads so the hash scan in
491 traverse can be fast */
492 tdb->transaction->hash_heads = (uint32_t *)
493 calloc(tdb->hash_size+1, sizeof(uint32_t));
494 if (tdb->transaction->hash_heads == NULL) {
495 tdb->ecode = TDB_ERR_OOM;
496 goto fail;
497 }
498 if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
499 TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
500 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
501 tdb->ecode = TDB_ERR_IO;
502 goto fail;
503 }
504
505 /* make sure we know about any file expansions already done by
506 anyone else */
507 tdb->methods->tdb_oob(tdb, tdb->map_size, 1, 1);
508 tdb->transaction->old_map_size = tdb->map_size;
509
510 /* finally hook the io methods, replacing them with
511 transaction specific methods */
512 tdb->transaction->io_methods = tdb->methods;
513 tdb->methods = &transaction_methods;
514
515 /* Trace at the end, so we get sequence number correct. */
516 tdb_trace(tdb, "tdb_transaction_start");
517 return 0;
518
519fail:
520 tdb_allrecord_unlock(tdb, F_RDLCK, false);
521fail_allrecord_lock:
522 tdb_transaction_unlock(tdb, F_WRLCK);
523 SAFE_FREE(tdb->transaction->blocks);
524 SAFE_FREE(tdb->transaction->hash_heads);
525 SAFE_FREE(tdb->transaction);
526 return -1;
527}
528
529_PUBLIC_ int tdb_transaction_start(struct tdb_context *tdb)
530{
531 return _tdb_transaction_start(tdb, TDB_LOCK_WAIT);
532}
533
534_PUBLIC_ int tdb_transaction_start_nonblock(struct tdb_context *tdb)
535{
536 return _tdb_transaction_start(tdb, TDB_LOCK_NOWAIT|TDB_LOCK_PROBE);
537}
538
539/*
540 sync to disk
541*/
542static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
543{
544 if (tdb->flags & TDB_NOSYNC) {
545 return 0;
546 }
547
548#ifdef HAVE_FDATASYNC
549 if (fdatasync(tdb->fd) != 0) {
550#else
551 if (fsync(tdb->fd) != 0) {
552#endif
553 tdb->ecode = TDB_ERR_IO;
554 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
555 return -1;
556 }
557#ifdef HAVE_MMAP
558 if (tdb->map_ptr) {
559 tdb_off_t moffset = offset & ~(tdb->page_size-1);
560 if (msync(moffset + (char *)tdb->map_ptr,
561 length + (offset - moffset), MS_SYNC) != 0) {
562 tdb->ecode = TDB_ERR_IO;
563 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
564 strerror(errno)));
565 return -1;
566 }
567 }
568#endif
569 return 0;
570}
571
572
573static int _tdb_transaction_cancel(struct tdb_context *tdb)
574{
575 int i, ret = 0;
576
577 if (tdb->transaction == NULL) {
578 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
579 return -1;
580 }
581
582 if (tdb->transaction->nesting != 0) {
583 tdb->transaction->transaction_error = 1;
584 tdb->transaction->nesting--;
585 return 0;
586 }
587
588 tdb->map_size = tdb->transaction->old_map_size;
589
590 /* free all the transaction blocks */
591 for (i=0;i<tdb->transaction->num_blocks;i++) {
592 if (tdb->transaction->blocks[i] != NULL) {
593 free(tdb->transaction->blocks[i]);
594 }
595 }
596 SAFE_FREE(tdb->transaction->blocks);
597
598 if (tdb->transaction->magic_offset) {
599 const struct tdb_methods *methods = tdb->transaction->io_methods;
600 const uint32_t invalid = TDB_RECOVERY_INVALID_MAGIC;
601
602 /* remove the recovery marker */
603 if (methods->tdb_write(tdb, tdb->transaction->magic_offset, &invalid, 4) == -1 ||
604 transaction_sync(tdb, tdb->transaction->magic_offset, 4) == -1) {
605 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_cancel: failed to remove recovery magic\n"));
606 ret = -1;
607 }
608 }
609
610 /* This also removes the OPEN_LOCK, if we have it. */
611 tdb_release_transaction_locks(tdb);
612
613 /* restore the normal io methods */
614 tdb->methods = tdb->transaction->io_methods;
615
616 SAFE_FREE(tdb->transaction->hash_heads);
617 SAFE_FREE(tdb->transaction);
618
619 return ret;
620}
621
622/*
623 cancel the current transaction
624*/
625_PUBLIC_ int tdb_transaction_cancel(struct tdb_context *tdb)
626{
627 tdb_trace(tdb, "tdb_transaction_cancel");
628 return _tdb_transaction_cancel(tdb);
629}
630
631/*
632 work out how much space the linearised recovery data will consume
633*/
634static bool tdb_recovery_size(struct tdb_context *tdb, tdb_len_t *result)
635{
636 tdb_len_t recovery_size = 0;
637 int i;
638
639 recovery_size = sizeof(uint32_t);
640 for (i=0;i<tdb->transaction->num_blocks;i++) {
641 tdb_len_t block_size;
642 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
643 break;
644 }
645 if (tdb->transaction->blocks[i] == NULL) {
646 continue;
647 }
648 if (!tdb_add_len_t(recovery_size, 2*sizeof(tdb_off_t),
649 &recovery_size)) {
650 return false;
651 }
652 if (i == tdb->transaction->num_blocks-1) {
653 block_size = tdb->transaction->last_block_size;
654 } else {
655 block_size = tdb->transaction->block_size;
656 }
657 if (!tdb_add_len_t(recovery_size, block_size,
658 &recovery_size)) {
659 return false;
660 }
661 }
662
663 *result = recovery_size;
664 return true;
665}
666
667int tdb_recovery_area(struct tdb_context *tdb,
668 const struct tdb_methods *methods,
669 tdb_off_t *recovery_offset,
670 struct tdb_record *rec)
671{
672 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, recovery_offset) == -1) {
673 return -1;
674 }
675
676 if (*recovery_offset == 0) {
677 rec->rec_len = 0;
678 return 0;
679 }
680
681 if (methods->tdb_read(tdb, *recovery_offset, rec, sizeof(*rec),
682 DOCONV()) == -1) {
683 return -1;
684 }
685
686 /* ignore invalid recovery regions: can happen in crash */
687 if (rec->magic != TDB_RECOVERY_MAGIC &&
688 rec->magic != TDB_RECOVERY_INVALID_MAGIC) {
689 *recovery_offset = 0;
690 rec->rec_len = 0;
691 }
692 return 0;
693}
694
695/*
696 allocate the recovery area, or use an existing recovery area if it is
697 large enough
698*/
699static int tdb_recovery_allocate(struct tdb_context *tdb,
700 tdb_len_t *recovery_size,
701 tdb_off_t *recovery_offset,
702 tdb_len_t *recovery_max_size)
703{
704 struct tdb_record rec;
705 const struct tdb_methods *methods = tdb->transaction->io_methods;
706 tdb_off_t recovery_head, new_end;
707
708 if (tdb_recovery_area(tdb, methods, &recovery_head, &rec) == -1) {
709 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
710 return -1;
711 }
712
713 if (!tdb_recovery_size(tdb, recovery_size)) {
714 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: "
715 "overflow recovery size\n"));
716 return -1;
717 }
718
719 /* Existing recovery area? */
720 if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
721 /* it fits in the existing area */
722 *recovery_max_size = rec.rec_len;
723 *recovery_offset = recovery_head;
724 return 0;
725 }
726
727 /* If recovery area in middle of file, we need a new one. */
728 if (recovery_head == 0
729 || recovery_head + sizeof(rec) + rec.rec_len != tdb->map_size) {
730 /* we need to free up the old recovery area, then allocate a
731 new one at the end of the file. Note that we cannot use
732 tdb_allocate() to allocate the new one as that might return
733 us an area that is being currently used (as of the start of
734 the transaction) */
735 if (recovery_head) {
736 if (tdb_free(tdb, recovery_head, &rec) == -1) {
737 TDB_LOG((tdb, TDB_DEBUG_FATAL,
738 "tdb_recovery_allocate: failed to"
739 " free previous recovery area\n"));
740 return -1;
741 }
742
743 /* the tdb_free() call might have increased
744 * the recovery size */
745 if (!tdb_recovery_size(tdb, recovery_size)) {
746 TDB_LOG((tdb, TDB_DEBUG_FATAL,
747 "tdb_recovery_allocate: "
748 "overflow recovery size\n"));
749 return -1;
750 }
751 }
752
753 /* New head will be at end of file. */
754 recovery_head = tdb->map_size;
755 }
756
757 /* Now we know where it will be. */
758 *recovery_offset = recovery_head;
759
760 /* Expand by more than we need, so we don't do it often. */
761 *recovery_max_size = tdb_expand_adjust(tdb->map_size,
762 *recovery_size,
763 tdb->page_size)
764 - sizeof(rec);
765
766 if (!tdb_add_off_t(recovery_head, sizeof(rec), &new_end) ||
767 !tdb_add_off_t(new_end, *recovery_max_size, &new_end)) {
768 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: "
769 "overflow recovery area\n"));
770 return -1;
771 }
772
773 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
774 new_end - tdb->transaction->old_map_size)
775 == -1) {
776 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
777 return -1;
778 }
779
780 /* remap the file (if using mmap) */
781 methods->tdb_oob(tdb, tdb->map_size, 1, 1);
782
783 /* we have to reset the old map size so that we don't try to expand the file
784 again in the transaction commit, which would destroy the recovery area */
785 tdb->transaction->old_map_size = tdb->map_size;
786
787 /* write the recovery header offset and sync - we can sync without a race here
788 as the magic ptr in the recovery record has not been set */
789 CONVERT(recovery_head);
790 if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
791 &recovery_head, sizeof(tdb_off_t)) == -1) {
792 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
793 return -1;
794 }
795 if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
796 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
797 return -1;
798 }
799
800 return 0;
801}
802
803
804/*
805 setup the recovery data that will be used on a crash during commit
806*/
807static int transaction_setup_recovery(struct tdb_context *tdb,
808 tdb_off_t *magic_offset)
809{
810 tdb_len_t recovery_size;
811 unsigned char *data, *p;
812 const struct tdb_methods *methods = tdb->transaction->io_methods;
813 struct tdb_record *rec;
814 tdb_off_t recovery_offset, recovery_max_size;
815 tdb_off_t old_map_size = tdb->transaction->old_map_size;
816 uint32_t magic, tailer;
817 int i;
818
819 /*
820 check that the recovery area has enough space
821 */
822 if (tdb_recovery_allocate(tdb, &recovery_size,
823 &recovery_offset, &recovery_max_size) == -1) {
824 return -1;
825 }
826
827 data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
828 if (data == NULL) {
829 tdb->ecode = TDB_ERR_OOM;
830 return -1;
831 }
832
833 rec = (struct tdb_record *)data;
834 memset(rec, 0, sizeof(*rec));
835
836 rec->magic = TDB_RECOVERY_INVALID_MAGIC;
837 rec->data_len = recovery_size;
838 rec->rec_len = recovery_max_size;
839 rec->key_len = old_map_size;
840 CONVERT(*rec);
841
842 /* build the recovery data into a single blob to allow us to do a single
843 large write, which should be more efficient */
844 p = data + sizeof(*rec);
845 for (i=0;i<tdb->transaction->num_blocks;i++) {
846 tdb_off_t offset;
847 tdb_len_t length;
848
849 if (tdb->transaction->blocks[i] == NULL) {
850 continue;
851 }
852
853 offset = i * tdb->transaction->block_size;
854 length = tdb->transaction->block_size;
855 if (i == tdb->transaction->num_blocks-1) {
856 length = tdb->transaction->last_block_size;
857 }
858
859 if (offset >= old_map_size) {
860 continue;
861 }
862 if (offset + length > tdb->transaction->old_map_size) {
863 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
864 free(data);
865 tdb->ecode = TDB_ERR_CORRUPT;
866 return -1;
867 }
868 memcpy(p, &offset, 4);
869 memcpy(p+4, &length, 4);
870 if (DOCONV()) {
871 tdb_convert(p, 8);
872 }
873 /* the recovery area contains the old data, not the
874 new data, so we have to call the original tdb_read
875 method to get it */
876 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
877 free(data);
878 tdb->ecode = TDB_ERR_IO;
879 return -1;
880 }
881 p += 8 + length;
882 }
883
884 /* and the tailer */
885 tailer = sizeof(*rec) + recovery_max_size;
886 memcpy(p, &tailer, 4);
887 if (DOCONV()) {
888 tdb_convert(p, 4);
889 }
890
891 /* write the recovery data to the recovery area */
892 if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
893 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
894 free(data);
895 tdb->ecode = TDB_ERR_IO;
896 return -1;
897 }
898 if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
899 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
900 free(data);
901 tdb->ecode = TDB_ERR_IO;
902 return -1;
903 }
904
905 /* as we don't have ordered writes, we have to sync the recovery
906 data before we update the magic to indicate that the recovery
907 data is present */
908 if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
909 free(data);
910 return -1;
911 }
912
913 free(data);
914
915 magic = TDB_RECOVERY_MAGIC;
916 CONVERT(magic);
917
918 *magic_offset = recovery_offset + offsetof(struct tdb_record, magic);
919
920 if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
921 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
922 tdb->ecode = TDB_ERR_IO;
923 return -1;
924 }
925 if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
926 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
927 tdb->ecode = TDB_ERR_IO;
928 return -1;
929 }
930
931 /* ensure the recovery magic marker is on disk */
932 if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
933 return -1;
934 }
935
936 return 0;
937}
938
939static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
940{
941 const struct tdb_methods *methods;
942
943 if (tdb->transaction == NULL) {
944 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: no transaction\n"));
945 return -1;
946 }
947
948 if (tdb->transaction->prepared) {
949 tdb->ecode = TDB_ERR_EINVAL;
950 _tdb_transaction_cancel(tdb);
951 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction already prepared\n"));
952 return -1;
953 }
954
955 if (tdb->transaction->transaction_error) {
956 tdb->ecode = TDB_ERR_IO;
957 _tdb_transaction_cancel(tdb);
958 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction error pending\n"));
959 return -1;
960 }
961
962
963 if (tdb->transaction->nesting != 0) {
964 return 0;
965 }
966
967 /* check for a null transaction */
968 if (tdb->transaction->blocks == NULL) {
969 return 0;
970 }
971
972 methods = tdb->transaction->io_methods;
973
974 /* if there are any locks pending then the caller has not
975 nested their locks properly, so fail the transaction */
976 if (tdb_have_extra_locks(tdb)) {
977 tdb->ecode = TDB_ERR_LOCK;
978 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: locks pending on commit\n"));
979 _tdb_transaction_cancel(tdb);
980 return -1;
981 }
982
983 /* upgrade the main transaction lock region to a write lock */
984 if (tdb_allrecord_upgrade(tdb) == -1) {
985 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to upgrade hash locks\n"));
986 _tdb_transaction_cancel(tdb);
987 return -1;
988 }
989
990 /* get the open lock - this prevents new users attaching to the database
991 during the commit */
992 if (tdb_nest_lock(tdb, OPEN_LOCK, F_WRLCK, TDB_LOCK_WAIT) == -1) {
993 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get open lock\n"));
994 _tdb_transaction_cancel(tdb);
995 return -1;
996 }
997
998 /* write the recovery data to the end of the file */
999 if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
1000 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
1001 _tdb_transaction_cancel(tdb);
1002 return -1;
1003 }
1004
1005 tdb->transaction->prepared = true;
1006
1007 /* expand the file to the new size if needed */
1008 if (tdb->map_size != tdb->transaction->old_map_size) {
1009 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
1010 tdb->map_size -
1011 tdb->transaction->old_map_size) == -1) {
1012 tdb->ecode = TDB_ERR_IO;
1013 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: expansion failed\n"));
1014 _tdb_transaction_cancel(tdb);
1015 return -1;
1016 }
1017 tdb->map_size = tdb->transaction->old_map_size;
1018 methods->tdb_oob(tdb, tdb->map_size, 1, 1);
1019 }
1020
1021 /* Keep the open lock until the actual commit */
1022
1023 return 0;
1024}
1025
1026/*
1027 prepare to commit the current transaction
1028*/
1029_PUBLIC_ int tdb_transaction_prepare_commit(struct tdb_context *tdb)
1030{
1031 tdb_trace(tdb, "tdb_transaction_prepare_commit");
1032 return _tdb_transaction_prepare_commit(tdb);
1033}
1034
1035/* A repack is worthwhile if the largest is less than half total free. */
1036static bool repack_worthwhile(struct tdb_context *tdb)
1037{
1038 tdb_off_t ptr;
1039 struct tdb_record rec;
1040 tdb_len_t total = 0, largest = 0;
1041
1042 if (tdb_ofs_read(tdb, FREELIST_TOP, &ptr) == -1) {
1043 return false;
1044 }
1045
1046 while (ptr != 0 && tdb_rec_free_read(tdb, ptr, &rec) == 0) {
1047 total += rec.rec_len;
1048 if (rec.rec_len > largest) {
1049 largest = rec.rec_len;
1050 }
1051 ptr = rec.next;
1052 }
1053
1054 return total > largest * 2;
1055}
1056
1057/*
1058 commit the current transaction
1059*/
1060_PUBLIC_ int tdb_transaction_commit(struct tdb_context *tdb)
1061{
1062 const struct tdb_methods *methods;
1063 int i;
1064 bool need_repack = false;
1065
1066 if (tdb->transaction == NULL) {
1067 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
1068 return -1;
1069 }
1070
1071 tdb_trace(tdb, "tdb_transaction_commit");
1072
1073 if (tdb->transaction->transaction_error) {
1074 tdb->ecode = TDB_ERR_IO;
1075 _tdb_transaction_cancel(tdb);
1076 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
1077 return -1;
1078 }
1079
1080
1081 if (tdb->transaction->nesting != 0) {
1082 tdb->transaction->nesting--;
1083 return 0;
1084 }
1085
1086 /* check for a null transaction */
1087 if (tdb->transaction->blocks == NULL) {
1088 _tdb_transaction_cancel(tdb);
1089 return 0;
1090 }
1091
1092 if (!tdb->transaction->prepared) {
1093 int ret = _tdb_transaction_prepare_commit(tdb);
1094 if (ret)
1095 return ret;
1096 }
1097
1098 methods = tdb->transaction->io_methods;
1099
1100 /* perform all the writes */
1101 for (i=0;i<tdb->transaction->num_blocks;i++) {
1102 tdb_off_t offset;
1103 tdb_len_t length;
1104
1105 if (tdb->transaction->blocks[i] == NULL) {
1106 continue;
1107 }
1108
1109 offset = i * tdb->transaction->block_size;
1110 length = tdb->transaction->block_size;
1111 if (i == tdb->transaction->num_blocks-1) {
1112 length = tdb->transaction->last_block_size;
1113 }
1114
1115 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
1116 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
1117
1118 /* we've overwritten part of the data and
1119 possibly expanded the file, so we need to
1120 run the crash recovery code */
1121 tdb->methods = methods;
1122 tdb_transaction_recover(tdb);
1123
1124 _tdb_transaction_cancel(tdb);
1125
1126 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
1127 return -1;
1128 }
1129 SAFE_FREE(tdb->transaction->blocks[i]);
1130 }
1131
1132 /* Do this before we drop lock or blocks. */
1133 if (tdb->transaction->expanded) {
1134 need_repack = repack_worthwhile(tdb);
1135 }
1136
1137 SAFE_FREE(tdb->transaction->blocks);
1138 tdb->transaction->num_blocks = 0;
1139
1140 /* ensure the new data is on disk */
1141 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1142 return -1;
1143 }
1144
1145 /*
1146 TODO: maybe write to some dummy hdr field, or write to magic
1147 offset without mmap, before the last sync, instead of the
1148 utime() call
1149 */
1150
1151 /* on some systems (like Linux 2.6.x) changes via mmap/msync
1152 don't change the mtime of the file, this means the file may
1153 not be backed up (as tdb rounding to block sizes means that
1154 file size changes are quite rare too). The following forces
1155 mtime changes when a transaction completes */
1156#ifdef HAVE_UTIME
1157 utime(tdb->name, NULL);
1158#endif
1159
1160 /* use a transaction cancel to free memory and remove the
1161 transaction locks */
1162 _tdb_transaction_cancel(tdb);
1163
1164 if (need_repack) {
1165 return tdb_repack(tdb);
1166 }
1167
1168 return 0;
1169}
1170
1171
1172/*
1173 recover from an aborted transaction. Must be called with exclusive
1174 database write access already established (including the open
1175 lock to prevent new processes attaching)
1176*/
1177int tdb_transaction_recover(struct tdb_context *tdb)
1178{
1179 tdb_off_t recovery_head, recovery_eof;
1180 unsigned char *data, *p;
1181 uint32_t zero = 0;
1182 struct tdb_record rec;
1183
1184 /* find the recovery area */
1185 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1186 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1187 tdb->ecode = TDB_ERR_IO;
1188 return -1;
1189 }
1190
1191 if (recovery_head == 0) {
1192 /* we have never allocated a recovery record */
1193 return 0;
1194 }
1195
1196 /* read the recovery record */
1197 if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
1198 sizeof(rec), DOCONV()) == -1) {
1199 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
1200 tdb->ecode = TDB_ERR_IO;
1201 return -1;
1202 }
1203
1204 if (rec.magic != TDB_RECOVERY_MAGIC) {
1205 /* there is no valid recovery data */
1206 return 0;
1207 }
1208
1209 if (tdb->read_only) {
1210 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1211 tdb->ecode = TDB_ERR_CORRUPT;
1212 return -1;
1213 }
1214
1215 recovery_eof = rec.key_len;
1216
1217 data = (unsigned char *)malloc(rec.data_len);
1218 if (data == NULL) {
1219 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
1220 tdb->ecode = TDB_ERR_OOM;
1221 return -1;
1222 }
1223
1224 /* read the full recovery data */
1225 if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1226 rec.data_len, 0) == -1) {
1227 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
1228 tdb->ecode = TDB_ERR_IO;
1229 return -1;
1230 }
1231
1232 /* recover the file data */
1233 p = data;
1234 while (p+8 < data + rec.data_len) {
1235 uint32_t ofs, len;
1236 if (DOCONV()) {
1237 tdb_convert(p, 8);
1238 }
1239 memcpy(&ofs, p, 4);
1240 memcpy(&len, p+4, 4);
1241
1242 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1243 free(data);
1244 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %u bytes at offset %u\n", len, ofs));
1245 tdb->ecode = TDB_ERR_IO;
1246 return -1;
1247 }
1248 p += 8 + len;
1249 }
1250
1251 free(data);
1252
1253 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1254 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1255 tdb->ecode = TDB_ERR_IO;
1256 return -1;
1257 }
1258
1259 /* if the recovery area is after the recovered eof then remove it */
1260 if (recovery_eof <= recovery_head) {
1261 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1262 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1263 tdb->ecode = TDB_ERR_IO;
1264 return -1;
1265 }
1266 }
1267
1268 /* remove the recovery magic */
1269 if (tdb_ofs_write(tdb, recovery_head + offsetof(struct tdb_record, magic),
1270 &zero) == -1) {
1271 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1272 tdb->ecode = TDB_ERR_IO;
1273 return -1;
1274 }
1275
1276 if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1277 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1278 tdb->ecode = TDB_ERR_IO;
1279 return -1;
1280 }
1281
1282 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %u byte database\n",
1283 recovery_eof));
1284
1285 /* all done */
1286 return 0;
1287}
1288
1289/* Any I/O failures we say "needs recovery". */
1290bool tdb_needs_recovery(struct tdb_context *tdb)
1291{
1292 tdb_off_t recovery_head;
1293 struct tdb_record rec;
1294
1295 /* find the recovery area */
1296 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1297 return true;
1298 }
1299
1300 if (recovery_head == 0) {
1301 /* we have never allocated a recovery record */
1302 return false;
1303 }
1304
1305 /* read the recovery record */
1306 if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
1307 sizeof(rec), DOCONV()) == -1) {
1308 return true;
1309 }
1310
1311 return (rec.magic == TDB_RECOVERY_MAGIC);
1312}
Note: See TracBrowser for help on using the repository browser.