source: trunk-3.0/source/tdb/common/transaction.c@ 101

Last change on this file since 101 was 61, checked in by Yuri Dario, 18 years ago

More changes for locking problems:

  • reopen semaphore handles for new processes;
  • remove locks done after using exclusive locks with semaphores;
  • more logging;
  • upgrade read locks to write (exclusive) locks before writing to file (this makes real unlocks fail, fixme);

File size: 32.0 KB
Line 
1 /*
2 Unix SMB/CIFS implementation.
3
4 trivial database library
5
6 Copyright (C) Andrew Tridgell 2005
7
8 ** NOTE! The following LGPL license applies to the tdb
9 ** library. This does NOT imply that all of Samba is released
10 ** under the LGPL
11
12 This library is free software; you can redistribute it and/or
13 modify it under the terms of the GNU Lesser General Public
14 License as published by the Free Software Foundation; either
15 version 2 of the License, or (at your option) any later version.
16
17 This library is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20 Lesser General Public License for more details.
21
22 You should have received a copy of the GNU Lesser General Public
23 License along with this library; if not, write to the Free Software
24 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
25*/
26
27#include "tdb_private.h"
28
29/*
30 transaction design:
31
32 - only allow a single transaction at a time per database. This makes
33 using the transaction API simpler, as otherwise the caller would
34 have to cope with temporary failures in transactions that conflict
35 with other current transactions
36
37 - keep the transaction recovery information in the same file as the
38 database, using a special 'transaction recovery' record pointed at
39 by the header. This removes the need for extra journal files as
40 used by some other databases
41
42 - dynamically allocate the transaction recovery record, re-using it
43 for subsequent transactions. If a larger record is needed then
44 tdb_free() the old record to place it on the normal tdb freelist
45 before allocating the new record
46
47 - during transactions, keep a linked list of all writes that have
48 been performed by intercepting all tdb_write() calls. The hooked
49 transaction versions of tdb_read() and tdb_write() check this
50 linked list and try to use the elements of the list in preference
51 to the real database.
52
53 - don't allow any locks to be held when a transaction starts,
54 otherwise we can end up with deadlock (plus lack of lock nesting
55 in posix locks would mean the lock is lost)
56
57 - if the caller gains a lock during the transaction but doesn't
58 release it then fail the commit
59
60 - allow for nested calls to tdb_transaction_start(), re-using the
61 existing transaction record. If the inner transaction is cancelled
62 then a subsequent commit will fail
63
64 - keep a mirrored copy of the tdb hash chain heads to allow for the
65 fast hash heads scan on traverse, updating the mirrored copy in
66 the transaction version of tdb_write
67
68 - allow callers to mix transaction and non-transaction use of tdb,
69 although once a transaction is started then an exclusive lock is
70 gained until the transaction is committed or cancelled
71
72 - the commit strategy involves first saving away all modified data
73 into a linearised buffer in the transaction recovery area, then
74 marking the transaction recovery area with a magic value to
75 indicate a valid recovery record. In total 4 fsync/msync calls are
76 needed per commit to prevent race conditions. It might be possible
77 to reduce this to 3 or even 2 with some more work.
78
79 - check for a valid recovery record on open of the tdb, while the
80 global lock is held. Automatically recover from the transaction
81 recovery area if needed, then continue with the open as
82 usual. This allows for smooth crash recovery with no administrator
83 intervention.
84
85 - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
86 still available, but no transaction recovery area is used and no
87 fsync/msync calls are made.
88
89*/
90
91int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
92 int rw_type, int lck_type, int probe, size_t len);
93
94struct tdb_transaction_el {
95 struct tdb_transaction_el *next, *prev;
96 tdb_off_t offset;
97 tdb_len_t length;
98 unsigned char *data;
99};
100
101/*
102 hold the context of any current transaction
103*/
104struct tdb_transaction {
105 /* we keep a mirrored copy of the tdb hash heads here so
106 tdb_next_hash_chain() can operate efficiently */
107 u32 *hash_heads;
108
109 /* the original io methods - used to do IOs to the real db */
110 const struct tdb_methods *io_methods;
111
112 /* the list of transaction elements. We use a doubly linked
113 list with a last pointer to allow us to keep the list
114 ordered, with first element at the front of the list. It
115 needs to be doubly linked as the read/write traversals need
116 to be backwards, while the commit needs to be forwards */
117 struct tdb_transaction_el *elements, *elements_last;
118
119 /* non-zero when an internal transaction error has
120 occurred. All write operations will then fail until the
121 transaction is ended */
122 int transaction_error;
123
124 /* when inside a transaction we need to keep track of any
125 nested tdb_transaction_start() calls, as these are allowed,
126 but don't create a new transaction */
127 int nesting;
128
129 /* old file size before transaction */
130 tdb_len_t old_map_size;
131};
132
133
134/*
135 read while in a transaction. We need to check first if the data is in our list
136 of transaction elements, then if not do a real read
137*/
138static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
139 tdb_len_t len, int cv)
140{
141 struct tdb_transaction_el *el;
142
143 /* we need to walk the list backwards to get the most recent data */
144 for (el=tdb->transaction->elements_last;el;el=el->prev) {
145 tdb_len_t partial;
146
147 if (off+len <= el->offset) {
148 continue;
149 }
150 if (off >= el->offset + el->length) {
151 continue;
152 }
153
154 /* an overlapping read - needs to be split into up to
155 2 reads and a memcpy */
156 if (off < el->offset) {
157 partial = el->offset - off;
158 if (transaction_read(tdb, off, buf, partial, cv) != 0) {
159 goto fail;
160 }
161 len -= partial;
162 off += partial;
163 buf = (void *)(partial + (char *)buf);
164 }
165 if (off + len <= el->offset + el->length) {
166 partial = len;
167 } else {
168 partial = el->offset + el->length - off;
169 }
170 memcpy(buf, el->data + (off - el->offset), partial);
171 if (cv) {
172 tdb_convert(buf, len);
173 }
174 len -= partial;
175 off += partial;
176 buf = (void *)(partial + (char *)buf);
177
178 if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
179 goto fail;
180 }
181
182 return 0;
183 }
184
185 /* its not in the transaction elements - do a real read */
186 return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
187
188fail:
189 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
190 tdb->ecode = TDB_ERR_IO;
191 tdb->transaction->transaction_error = 1;
192 return -1;
193}
194
195
196/*
197 write while in a transaction
198*/
199static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
200 const void *buf, tdb_len_t len)
201{
202 struct tdb_transaction_el *el, *best_el=NULL;
203
204 if (len == 0) {
205 return 0;
206 }
207
208 /* if the write is to a hash head, then update the transaction
209 hash heads */
210 if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
211 off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
212 u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
213 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
214 }
215
216 /* first see if we can replace an existing entry */
217 for (el=tdb->transaction->elements_last;el;el=el->prev) {
218 tdb_len_t partial;
219
220 if (best_el == NULL && off == el->offset+el->length) {
221 best_el = el;
222 }
223
224 if (off+len <= el->offset) {
225 continue;
226 }
227 if (off >= el->offset + el->length) {
228 continue;
229 }
230
231 /* an overlapping write - needs to be split into up to
232 2 writes and a memcpy */
233 if (off < el->offset) {
234 partial = el->offset - off;
235 if (transaction_write(tdb, off, buf, partial) != 0) {
236 goto fail;
237 }
238 len -= partial;
239 off += partial;
240 buf = (const void *)(partial + (const char *)buf);
241 }
242 if (off + len <= el->offset + el->length) {
243 partial = len;
244 } else {
245 partial = el->offset + el->length - off;
246 }
247 memcpy(el->data + (off - el->offset), buf, partial);
248 len -= partial;
249 off += partial;
250 buf = (const void *)(partial + (const char *)buf);
251
252 if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
253 goto fail;
254 }
255
256 return 0;
257 }
258
259 /* see if we can append the new entry to an existing entry */
260 if (best_el && best_el->offset + best_el->length == off &&
261 (off+len < tdb->transaction->old_map_size ||
262 off > tdb->transaction->old_map_size)) {
263 unsigned char *data = best_el->data;
264 el = best_el;
265 el->data = (unsigned char *)realloc(el->data,
266 el->length + len);
267 if (el->data == NULL) {
268 tdb->ecode = TDB_ERR_OOM;
269 tdb->transaction->transaction_error = 1;
270 el->data = data;
271 return -1;
272 }
273 if (buf) {
274 memcpy(el->data + el->length, buf, len);
275 } else {
276 memset(el->data + el->length, TDB_PAD_BYTE, len);
277 }
278 el->length += len;
279 return 0;
280 }
281
282 /* add a new entry at the end of the list */
283 el = (struct tdb_transaction_el *)malloc(sizeof(*el));
284 if (el == NULL) {
285 tdb->ecode = TDB_ERR_OOM;
286 tdb->transaction->transaction_error = 1;
287 return -1;
288 }
289 el->next = NULL;
290 el->prev = tdb->transaction->elements_last;
291 el->offset = off;
292 el->length = len;
293 el->data = (unsigned char *)malloc(len);
294 if (el->data == NULL) {
295 free(el);
296 tdb->ecode = TDB_ERR_OOM;
297 tdb->transaction->transaction_error = 1;
298 return -1;
299 }
300 if (buf) {
301 memcpy(el->data, buf, len);
302 } else {
303 memset(el->data, TDB_PAD_BYTE, len);
304 }
305 if (el->prev) {
306 el->prev->next = el;
307 } else {
308 tdb->transaction->elements = el;
309 }
310 tdb->transaction->elements_last = el;
311 return 0;
312
313fail:
314 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
315 tdb->ecode = TDB_ERR_IO;
316 tdb->transaction->transaction_error = 1;
317 return -1;
318}
319
320/*
321 accelerated hash chain head search, using the cached hash heads
322*/
323static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
324{
325 u32 h = *chain;
326 for (;h < tdb->header.hash_size;h++) {
327 /* the +1 takes account of the freelist */
328 if (0 != tdb->transaction->hash_heads[h+1]) {
329 break;
330 }
331 }
332 (*chain) = h;
333}
334
335/*
336 out of bounds check during a transaction
337*/
338static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
339{
340 if (len <= tdb->map_size) {
341 return 0;
342 }
343 return TDB_ERRCODE(TDB_ERR_IO, -1);
344}
345
346/*
347 transaction version of tdb_expand().
348*/
349static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
350 tdb_off_t addition)
351{
352 /* add a write to the transaction elements, so subsequent
353 reads see the zero data */
354 if (transaction_write(tdb, size, NULL, addition) != 0) {
355 return -1;
356 }
357
358 return 0;
359}
360
361/*
362 brlock during a transaction - ignore them
363*/
364int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
365 int rw_type, int lck_type, int probe, size_t len)
366{
367 return 0;
368}
369
370static const struct tdb_methods transaction_methods = {
371 transaction_read,
372 transaction_write,
373 transaction_next_hash_chain,
374 transaction_oob,
375 transaction_expand_file,
376 transaction_brlock
377};
378
379
380/*
381 start a tdb transaction. No token is returned, as only a single
382 transaction is allowed to be pending per tdb_context
383*/
384int tdb_transaction_start(struct tdb_context *tdb)
385{
386 /* some sanity checks */
387 if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
388 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
389 tdb->ecode = TDB_ERR_EINVAL;
390 return -1;
391 }
392
393 /* cope with nested tdb_transaction_start() calls */
394 if (tdb->transaction != NULL) {
395 tdb->transaction->nesting++;
396 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
397 tdb->transaction->nesting));
398 return 0;
399 }
400
401 if (tdb->num_locks != 0 || tdb->global_lock.count) {
402 /* the caller must not have any locks when starting a
403 transaction as otherwise we'll be screwed by lack
404 of nested locks in posix */
405 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
406 tdb->ecode = TDB_ERR_LOCK;
407 return -1;
408 }
409
410 if (tdb->travlocks.next != NULL) {
411 /* you cannot use transactions inside a traverse (although you can use
412 traverse inside a transaction) as otherwise you can end up with
413 deadlock */
414 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
415 tdb->ecode = TDB_ERR_LOCK;
416 return -1;
417 }
418
419 tdb->transaction = (struct tdb_transaction *)
420 calloc(sizeof(struct tdb_transaction), 1);
421 if (tdb->transaction == NULL) {
422 tdb->ecode = TDB_ERR_OOM;
423 return -1;
424 }
425
426 /* get the transaction write lock. This is a blocking lock. As
427 discussed with Volker, there are a number of ways we could
428 make this async, which we will probably do in the future */
429 if (tdb_brlock(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
430 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get transaction lock\n"));
431 tdb->ecode = TDB_ERR_LOCK;
432 SAFE_FREE(tdb->transaction);
433 return -1;
434 }
435
436 /* get a read lock from the freelist to the end of file. This
437 is upgraded to a write lock during the commit */
438#ifndef __OS2__ // YD the transation lock is an exclusive lock for us, it is enough.
439 if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
440 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
441 tdb->ecode = TDB_ERR_LOCK;
442 goto fail;
443 }
444#endif
445
446 /* setup a copy of the hash table heads so the hash scan in
447 traverse can be fast */
448 tdb->transaction->hash_heads = (u32 *)
449 calloc(tdb->header.hash_size+1, sizeof(u32));
450 if (tdb->transaction->hash_heads == NULL) {
451 tdb->ecode = TDB_ERR_OOM;
452 goto fail;
453 }
454 if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
455 TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
456 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
457 tdb->ecode = TDB_ERR_IO;
458 goto fail;
459 }
460
461 /* make sure we know about any file expansions already done by
462 anyone else */
463 tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
464 tdb->transaction->old_map_size = tdb->map_size;
465
466 /* finally hook the io methods, replacing them with
467 transaction specific methods */
468 tdb->transaction->io_methods = tdb->methods;
469 tdb->methods = &transaction_methods;
470
471 /* by calling this transaction write here, we ensure that we don't grow the
472 transaction linked list due to hash table updates */
473 if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
474 TDB_HASHTABLE_SIZE(tdb)) != 0) {
475 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
476 tdb->ecode = TDB_ERR_IO;
477 goto fail;
478 }
479
480 return 0;
481
482fail:
483#ifndef __OS2__ // YD the transation lock is an exclusive lock for us, it is enough.
484 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
485#endif
486 tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
487 SAFE_FREE(tdb->transaction->hash_heads);
488 SAFE_FREE(tdb->transaction);
489 return -1;
490}
491
492
493/*
494 cancel the current transaction
495*/
496int tdb_transaction_cancel(struct tdb_context *tdb)
497{
498 if (tdb->transaction == NULL) {
499 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
500 return -1;
501 }
502
503 if (tdb->transaction->nesting != 0) {
504 tdb->transaction->transaction_error = 1;
505 tdb->transaction->nesting--;
506 return 0;
507 }
508
509 tdb->map_size = tdb->transaction->old_map_size;
510
511 /* free all the transaction elements */
512 while (tdb->transaction->elements) {
513 struct tdb_transaction_el *el = tdb->transaction->elements;
514 tdb->transaction->elements = el->next;
515 free(el->data);
516 free(el);
517 }
518
519 /* remove any global lock created during the transaction */
520 if (tdb->global_lock.count != 0) {
521 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
522 tdb->global_lock.count = 0;
523 }
524
525 /* remove any locks created during the transaction */
526 if (tdb->num_locks != 0) {
527 int i;
528 for (i=0;i<tdb->num_lockrecs;i++) {
529 tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
530 F_UNLCK,F_SETLKW, 0, 1);
531 }
532 tdb->num_locks = 0;
533 tdb->num_lockrecs = 0;
534 SAFE_FREE(tdb->lockrecs);
535 }
536
537 /* restore the normal io methods */
538 tdb->methods = tdb->transaction->io_methods;
539
540#ifndef __OS2__ // YD the transation lock is an exclusive lock for us, it is enough.
541 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
542#endif
543 tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
544 SAFE_FREE(tdb->transaction->hash_heads);
545 SAFE_FREE(tdb->transaction);
546
547 return 0;
548}
549
550/*
551 sync to disk
552*/
553static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
554{
555 if (fsync(tdb->fd) != 0) {
556 tdb->ecode = TDB_ERR_IO;
557 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
558 return -1;
559 }
560#ifdef MS_SYNC
561 if (tdb->map_ptr) {
562 tdb_off_t moffset = offset & ~(tdb->page_size-1);
563 if (msync(moffset + (char *)tdb->map_ptr,
564 length + (offset - moffset), MS_SYNC) != 0) {
565 tdb->ecode = TDB_ERR_IO;
566 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
567 strerror(errno)));
568 return -1;
569 }
570 }
571#endif
572 return 0;
573}
574
575
576/*
577 work out how much space the linearised recovery data will consume
578*/
579static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
580{
581 struct tdb_transaction_el *el;
582 tdb_len_t recovery_size = 0;
583
584 recovery_size = sizeof(u32);
585 for (el=tdb->transaction->elements;el;el=el->next) {
586 if (el->offset >= tdb->transaction->old_map_size) {
587 continue;
588 }
589 recovery_size += 2*sizeof(tdb_off_t) + el->length;
590 }
591
592 return recovery_size;
593}
594
595/*
596 allocate the recovery area, or use an existing recovery area if it is
597 large enough
598*/
599static int tdb_recovery_allocate(struct tdb_context *tdb,
600 tdb_len_t *recovery_size,
601 tdb_off_t *recovery_offset,
602 tdb_len_t *recovery_max_size)
603{
604 struct list_struct rec;
605 const struct tdb_methods *methods = tdb->transaction->io_methods;
606 tdb_off_t recovery_head;
607
608 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
609 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
610 return -1;
611 }
612
613 rec.rec_len = 0;
614
615 if (recovery_head != 0 &&
616 methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
617 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
618 return -1;
619 }
620
621 *recovery_size = tdb_recovery_size(tdb);
622
623 if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
624 /* it fits in the existing area */
625 *recovery_max_size = rec.rec_len;
626 *recovery_offset = recovery_head;
627 return 0;
628 }
629
630 /* we need to free up the old recovery area, then allocate a
631 new one at the end of the file. Note that we cannot use
632 tdb_allocate() to allocate the new one as that might return
633 us an area that is being currently used (as of the start of
634 the transaction) */
635 if (recovery_head != 0) {
636 if (tdb_free(tdb, recovery_head, &rec) == -1) {
637 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
638 return -1;
639 }
640 }
641
642 /* the tdb_free() call might have increased the recovery size */
643 *recovery_size = tdb_recovery_size(tdb);
644
645 /* round up to a multiple of page size */
646 *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
647 *recovery_offset = tdb->map_size;
648 recovery_head = *recovery_offset;
649
650 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
651 (tdb->map_size - tdb->transaction->old_map_size) +
652 sizeof(rec) + *recovery_max_size) == -1) {
653 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
654 return -1;
655 }
656
657 /* remap the file (if using mmap) */
658 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
659
660 /* we have to reset the old map size so that we don't try to expand the file
661 again in the transaction commit, which would destroy the recovery area */
662 tdb->transaction->old_map_size = tdb->map_size;
663
664 /* write the recovery header offset and sync - we can sync without a race here
665 as the magic ptr in the recovery record has not been set */
666 CONVERT(recovery_head);
667 if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
668 &recovery_head, sizeof(tdb_off_t)) == -1) {
669 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
670 return -1;
671 }
672
673 return 0;
674}
675
676
677/*
678 setup the recovery data that will be used on a crash during commit
679*/
680static int transaction_setup_recovery(struct tdb_context *tdb,
681 tdb_off_t *magic_offset)
682{
683 struct tdb_transaction_el *el;
684 tdb_len_t recovery_size;
685 unsigned char *data, *p;
686 const struct tdb_methods *methods = tdb->transaction->io_methods;
687 struct list_struct *rec;
688 tdb_off_t recovery_offset, recovery_max_size;
689 tdb_off_t old_map_size = tdb->transaction->old_map_size;
690 u32 magic, tailer;
691
692 /*
693 check that the recovery area has enough space
694 */
695 if (tdb_recovery_allocate(tdb, &recovery_size,
696 &recovery_offset, &recovery_max_size) == -1) {
697 return -1;
698 }
699
700 data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
701 if (data == NULL) {
702 tdb->ecode = TDB_ERR_OOM;
703 return -1;
704 }
705
706 rec = (struct list_struct *)data;
707 memset(rec, 0, sizeof(*rec));
708
709 rec->magic = 0;
710 rec->data_len = recovery_size;
711 rec->rec_len = recovery_max_size;
712 rec->key_len = old_map_size;
713 CONVERT(rec);
714
715 /* build the recovery data into a single blob to allow us to do a single
716 large write, which should be more efficient */
717 p = data + sizeof(*rec);
718 for (el=tdb->transaction->elements;el;el=el->next) {
719 if (el->offset >= old_map_size) {
720 continue;
721 }
722 if (el->offset + el->length > tdb->transaction->old_map_size) {
723 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
724 free(data);
725 tdb->ecode = TDB_ERR_CORRUPT;
726 return -1;
727 }
728 memcpy(p, &el->offset, 4);
729 memcpy(p+4, &el->length, 4);
730 if (DOCONV()) {
731 tdb_convert(p, 8);
732 }
733 /* the recovery area contains the old data, not the
734 new data, so we have to call the original tdb_read
735 method to get it */
736 if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
737 free(data);
738 tdb->ecode = TDB_ERR_IO;
739 return -1;
740 }
741 p += 8 + el->length;
742 }
743
744 /* and the tailer */
745 tailer = sizeof(*rec) + recovery_max_size;
746 memcpy(p, &tailer, 4);
747 CONVERT(p);
748
749 /* write the recovery data to the recovery area */
750 if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
751 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
752 free(data);
753 tdb->ecode = TDB_ERR_IO;
754 return -1;
755 }
756
757 /* as we don't have ordered writes, we have to sync the recovery
758 data before we update the magic to indicate that the recovery
759 data is present */
760 if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
761 free(data);
762 return -1;
763 }
764
765 free(data);
766
767 magic = TDB_RECOVERY_MAGIC;
768 CONVERT(magic);
769
770 *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
771
772 if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
773 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
774 tdb->ecode = TDB_ERR_IO;
775 return -1;
776 }
777
778 /* ensure the recovery magic marker is on disk */
779 if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
780 return -1;
781 }
782
783 return 0;
784}
785
786/*
787 commit the current transaction
788*/
789int tdb_transaction_commit(struct tdb_context *tdb)
790{
791 const struct tdb_methods *methods;
792 tdb_off_t magic_offset = 0;
793 u32 zero = 0;
794
795 if (tdb->transaction == NULL) {
796 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
797 return -1;
798 }
799
800 if (tdb->transaction->transaction_error) {
801 tdb->ecode = TDB_ERR_IO;
802 tdb_transaction_cancel(tdb);
803 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
804 return -1;
805 }
806
807 if (tdb->transaction->nesting != 0) {
808 tdb->transaction->nesting--;
809 return 0;
810 }
811
812 /* check for a null transaction */
813 if (tdb->transaction->elements == NULL) {
814 tdb_transaction_cancel(tdb);
815 return 0;
816 }
817
818 methods = tdb->transaction->io_methods;
819
820 /* if there are any locks pending then the caller has not
821 nested their locks properly, so fail the transaction */
822 if (tdb->num_locks || tdb->global_lock.count) {
823 tdb->ecode = TDB_ERR_LOCK;
824 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
825 tdb_transaction_cancel(tdb);
826 return -1;
827 }
828
829 /* upgrade the main transaction lock region to a write lock */
830#ifndef __OS2__ // YD the global lock is an exclusive lock for us, it is enough.
831 if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
832 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
833 tdb->ecode = TDB_ERR_LOCK;
834 tdb_transaction_cancel(tdb);
835 return -1;
836 }
837#endif
838
839 /* get the global lock - this prevents new users attaching to the database
840 during the commit */
841 if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
842 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
843 tdb->ecode = TDB_ERR_LOCK;
844 tdb_transaction_cancel(tdb);
845 return -1;
846 }
847
848 if (!(tdb->flags & TDB_NOSYNC)) {
849 /* write the recovery data to the end of the file */
850 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
851 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
852 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
853 tdb_transaction_cancel(tdb);
854 return -1;
855 }
856 }
857
858 /* expand the file to the new size if needed */
859 if (tdb->map_size != tdb->transaction->old_map_size) {
860 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
861 tdb->map_size -
862 tdb->transaction->old_map_size) == -1) {
863 tdb->ecode = TDB_ERR_IO;
864 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
865 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
866 tdb_transaction_cancel(tdb);
867 return -1;
868 }
869 tdb->map_size = tdb->transaction->old_map_size;
870 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
871 }
872
873 /* perform all the writes */
874 while (tdb->transaction->elements) {
875 struct tdb_transaction_el *el = tdb->transaction->elements;
876
877 if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
878 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
879
880 /* we've overwritten part of the data and
881 possibly expanded the file, so we need to
882 run the crash recovery code */
883 tdb->methods = methods;
884 tdb_transaction_recover(tdb);
885
886 tdb_transaction_cancel(tdb);
887 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
888
889 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
890 return -1;
891 }
892 tdb->transaction->elements = el->next;
893 free(el->data);
894 free(el);
895 }
896
897 if (!(tdb->flags & TDB_NOSYNC)) {
898 /* ensure the new data is on disk */
899 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
900 return -1;
901 }
902
903 /* remove the recovery marker */
904 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
905 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
906 return -1;
907 }
908
909 /* ensure the recovery marker has been removed on disk */
910 if (transaction_sync(tdb, magic_offset, 4) == -1) {
911 return -1;
912 }
913 }
914
915 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
916
917 /*
918 TODO: maybe write to some dummy hdr field, or write to magic
919 offset without mmap, before the last sync, instead of the
920 utime() call
921 */
922
923 /* on some systems (like Linux 2.6.x) changes via mmap/msync
924 don't change the mtime of the file, this means the file may
925 not be backed up (as tdb rounding to block sizes means that
926 file size changes are quite rare too). The following forces
927 mtime changes when a transaction completes */
928#ifdef HAVE_UTIME
929 utime(tdb->name, NULL);
930#endif
931
932 /* use a transaction cancel to free memory and remove the
933 transaction locks */
934 tdb_transaction_cancel(tdb);
935 return 0;
936}
937
938
939/*
940 recover from an aborted transaction. Must be called with exclusive
941 database write access already established (including the global
942 lock to prevent new processes attaching)
943*/
944int tdb_transaction_recover(struct tdb_context *tdb)
945{
946 tdb_off_t recovery_head, recovery_eof;
947 unsigned char *data, *p;
948 u32 zero = 0;
949 struct list_struct rec;
950
951 /* find the recovery area */
952 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
953 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
954 tdb->ecode = TDB_ERR_IO;
955 return -1;
956 }
957
958 if (recovery_head == 0) {
959 /* we have never allocated a recovery record */
960 return 0;
961 }
962
963 /* read the recovery record */
964 if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
965 sizeof(rec), DOCONV()) == -1) {
966 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
967 tdb->ecode = TDB_ERR_IO;
968 return -1;
969 }
970
971 if (rec.magic != TDB_RECOVERY_MAGIC) {
972 /* there is no valid recovery data */
973 return 0;
974 }
975
976 if (tdb->read_only) {
977 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
978 tdb->ecode = TDB_ERR_CORRUPT;
979 return -1;
980 }
981
982 recovery_eof = rec.key_len;
983
984 data = (unsigned char *)malloc(rec.data_len);
985 if (data == NULL) {
986 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
987 tdb->ecode = TDB_ERR_OOM;
988 return -1;
989 }
990
991 /* read the full recovery data */
992 if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
993 rec.data_len, 0) == -1) {
994 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
995 tdb->ecode = TDB_ERR_IO;
996 return -1;
997 }
998
999 /* recover the file data */
1000 p = data;
1001 while (p+8 < data + rec.data_len) {
1002 u32 ofs, len;
1003 if (DOCONV()) {
1004 tdb_convert(p, 8);
1005 }
1006 memcpy(&ofs, p, 4);
1007 memcpy(&len, p+4, 4);
1008
1009 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1010 free(data);
1011 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1012 tdb->ecode = TDB_ERR_IO;
1013 return -1;
1014 }
1015 p += 8 + len;
1016 }
1017
1018 free(data);
1019
1020 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1021 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1022 tdb->ecode = TDB_ERR_IO;
1023 return -1;
1024 }
1025
1026 /* if the recovery area is after the recovered eof then remove it */
1027 if (recovery_eof <= recovery_head) {
1028 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1029 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1030 tdb->ecode = TDB_ERR_IO;
1031 return -1;
1032 }
1033 }
1034
1035 /* remove the recovery magic */
1036 if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
1037 &zero) == -1) {
1038 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1039 tdb->ecode = TDB_ERR_IO;
1040 return -1;
1041 }
1042
1043 /* reduce the file size to the old size */
1044 tdb_munmap(tdb);
1045 if (ftruncate(tdb->fd, recovery_eof) != 0) {
1046 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1047 tdb->ecode = TDB_ERR_IO;
1048 return -1;
1049 }
1050 tdb->map_size = recovery_eof;
1051 tdb_mmap(tdb);
1052
1053 if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1054 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1055 tdb->ecode = TDB_ERR_IO;
1056 return -1;
1057 }
1058
1059 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",
1060 recovery_eof));
1061
1062 /* all done */
1063 return 0;
1064}
Note: See TracBrowser for help on using the repository browser.