source: vendor/3.5.0/source3/lib/dbwrap_ctdb.c

Last change on this file was 414, checked in by Herwig Bauernfeind, 16 years ago

Samba 3.5.0: Initial import

File size: 34.5 KB
Line 
1/*
2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007
5
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
18*/
19
20#include "includes.h"
21#ifdef CLUSTER_SUPPORT
22#include "ctdb.h"
23#include "ctdb_private.h"
24#include "ctdbd_conn.h"
25
/*
 * State of one open transaction on a persistent ctdb database.
 */
struct db_ctdb_transaction_handle {
	/* database context this transaction belongs to */
	struct db_ctdb_ctx *ctx;
	/* true while the recorded operations are being replayed */
	bool in_replay;
	/*
	 * Operations done under the transaction are recorded in two
	 * marshall buffers:
	 * - m_all holds both the reads and the writes,
	 * - m_write holds only the writes.
	 */
	struct ctdb_marshall_buffer *m_all;
	struct ctdb_marshall_buffer *m_write;
	/* incremented by each transaction_start on an already open transaction */
	uint32_t nesting;
	/* set when a nested transaction has been cancelled */
	bool nested_cancel;
};
39
40struct db_ctdb_ctx {
41 struct db_context *db;
42 struct tdb_wrap *wtdb;
43 uint32 db_id;
44 struct db_ctdb_transaction_handle *transaction;
45};
46
47struct db_ctdb_rec {
48 struct db_ctdb_ctx *ctdb_ctx;
49 struct ctdb_ltdb_header header;
50};
51
52static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
53 TALLOC_CTX *mem_ctx,
54 TDB_DATA key,
55 bool persistent);
56
57static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
58{
59 NTSTATUS status;
60 enum TDB_ERROR tret = tdb_error(tdb);
61
62 switch (tret) {
63 case TDB_ERR_EXISTS:
64 status = NT_STATUS_OBJECT_NAME_COLLISION;
65 break;
66 case TDB_ERR_NOEXIST:
67 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
68 break;
69 default:
70 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
71 break;
72 }
73
74 return status;
75}
76
77
78/**
79 * fetch a record from the tdb, separating out the header
80 * information and returning the body of the record.
81 */
82static NTSTATUS db_ctdb_ltdb_fetch(struct db_ctdb_ctx *db,
83 TDB_DATA key,
84 struct ctdb_ltdb_header *header,
85 TALLOC_CTX *mem_ctx,
86 TDB_DATA *data)
87{
88 TDB_DATA rec;
89 NTSTATUS status;
90
91 rec = tdb_fetch(db->wtdb->tdb, key);
92 if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
93 status = NT_STATUS_NOT_FOUND;
94 if (data) {
95 ZERO_STRUCTP(data);
96 }
97 if (header) {
98 header->dmaster = (uint32_t)-1;
99 header->rsn = 0;
100 }
101 goto done;
102 }
103
104 if (header) {
105 *header = *(struct ctdb_ltdb_header *)rec.dptr;
106 }
107
108 if (data) {
109 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
110 if (data->dsize == 0) {
111 data->dptr = NULL;
112 } else {
113 data->dptr = (unsigned char *)talloc_memdup(mem_ctx,
114 rec.dptr
115 + sizeof(struct ctdb_ltdb_header),
116 data->dsize);
117 if (data->dptr == NULL) {
118 status = NT_STATUS_NO_MEMORY;
119 goto done;
120 }
121 }
122 }
123
124 status = NT_STATUS_OK;
125
126done:
127 SAFE_FREE(rec.dptr);
128 return status;
129}
130
131/*
132 * Store a record together with the ctdb record header
133 * in the local copy of the database.
134 */
135static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
136 TDB_DATA key,
137 struct ctdb_ltdb_header *header,
138 TDB_DATA data)
139{
140 TALLOC_CTX *tmp_ctx = talloc_stackframe();
141 TDB_DATA rec;
142 int ret;
143
144 rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
145 rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
146
147 if (rec.dptr == NULL) {
148 talloc_free(tmp_ctx);
149 return NT_STATUS_NO_MEMORY;
150 }
151
152 memcpy(rec.dptr, header, sizeof(struct ctdb_ltdb_header));
153 memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
154
155 ret = tdb_store(db->wtdb->tdb, key, rec, TDB_REPLACE);
156
157 talloc_free(tmp_ctx);
158
159 return (ret == 0) ? NT_STATUS_OK
160 : tdb_error_to_ntstatus(db->wtdb->tdb);
161
162}
163
164/*
165 form a ctdb_rec_data record from a key/data pair
166
167 note that header may be NULL. If not NULL then it is included in the data portion
168 of the record
169 */
170static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
171 TDB_DATA key,
172 struct ctdb_ltdb_header *header,
173 TDB_DATA data)
174{
175 size_t length;
176 struct ctdb_rec_data *d;
177
178 length = offsetof(struct ctdb_rec_data, data) + key.dsize +
179 data.dsize + (header?sizeof(*header):0);
180 d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
181 if (d == NULL) {
182 return NULL;
183 }
184 d->length = length;
185 d->reqid = reqid;
186 d->keylen = key.dsize;
187 memcpy(&d->data[0], key.dptr, key.dsize);
188 if (header) {
189 d->datalen = data.dsize + sizeof(*header);
190 memcpy(&d->data[key.dsize], header, sizeof(*header));
191 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
192 } else {
193 d->datalen = data.dsize;
194 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
195 }
196 return d;
197}
198
199
200/* helper function for marshalling multiple records */
201static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
202 struct ctdb_marshall_buffer *m,
203 uint64_t db_id,
204 uint32_t reqid,
205 TDB_DATA key,
206 struct ctdb_ltdb_header *header,
207 TDB_DATA data)
208{
209 struct ctdb_rec_data *r;
210 size_t m_size, r_size;
211 struct ctdb_marshall_buffer *m2 = NULL;
212
213 r = db_ctdb_marshall_record(talloc_tos(), reqid, key, header, data);
214 if (r == NULL) {
215 talloc_free(m);
216 return NULL;
217 }
218
219 if (m == NULL) {
220 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
221 mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
222 if (m == NULL) {
223 goto done;
224 }
225 m->db_id = db_id;
226 }
227
228 m_size = talloc_get_size(m);
229 r_size = talloc_get_size(r);
230
231 m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
232 mem_ctx, m, m_size + r_size);
233 if (m2 == NULL) {
234 talloc_free(m);
235 goto done;
236 }
237
238 memcpy(m_size + (uint8_t *)m2, r, r_size);
239
240 m2->count++;
241
242done:
243 talloc_free(r);
244 return m2;
245}
246
247/* we've finished marshalling, return a data blob with the marshalled records */
248static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
249{
250 TDB_DATA data;
251 data.dptr = (uint8_t *)m;
252 data.dsize = talloc_get_size(m);
253 return data;
254}
255
256/*
257 loop over a marshalling buffer
258
259 - pass r==NULL to start
260 - loop the number of times indicated by m->count
261*/
262static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
263 uint32_t *reqid,
264 struct ctdb_ltdb_header *header,
265 TDB_DATA *key, TDB_DATA *data)
266{
267 if (r == NULL) {
268 r = (struct ctdb_rec_data *)&m->data[0];
269 } else {
270 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
271 }
272
273 if (reqid != NULL) {
274 *reqid = r->reqid;
275 }
276
277 if (key != NULL) {
278 key->dptr = &r->data[0];
279 key->dsize = r->keylen;
280 }
281 if (data != NULL) {
282 data->dptr = &r->data[r->keylen];
283 data->dsize = r->datalen;
284 if (header != NULL) {
285 data->dptr += sizeof(*header);
286 data->dsize -= sizeof(*header);
287 }
288 }
289
290 if (header != NULL) {
291 if (r->datalen < sizeof(*header)) {
292 return NULL;
293 }
294 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
295 }
296
297 return r;
298}
299
300
301static int32_t db_ctdb_transaction_active(uint32_t db_id)
302{
303 int32_t status;
304 NTSTATUS ret;
305 TDB_DATA indata;
306
307 indata.dptr = (uint8_t *)&db_id;
308 indata.dsize = sizeof(db_id);
309
310 ret = ctdbd_control_local(messaging_ctdbd_connection(),
311 CTDB_CONTROL_TRANS2_ACTIVE, 0, 0,
312 indata, NULL, NULL, &status);
313
314 if (!NT_STATUS_IS_OK(ret)) {
315 DEBUG(2, ("ctdb control TRANS2_ACTIVE failed\n"));
316 return -1;
317 }
318
319 return status;
320}
321
322
323/**
324 * CTDB transaction destructor
325 */
326static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
327{
328 tdb_transaction_cancel(h->ctx->wtdb->tdb);
329 return 0;
330}
331
332/**
333 * start a transaction on a ctdb database:
334 * - lock the transaction lock key
335 * - start the tdb transaction
336 */
337static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
338{
339 struct db_record *rh;
340 struct db_ctdb_rec *crec;
341 TDB_DATA key;
342 TALLOC_CTX *tmp_ctx;
343 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
344 int ret;
345 struct db_ctdb_ctx *ctx = h->ctx;
346 TDB_DATA data;
347 pid_t pid;
348 NTSTATUS status;
349 struct ctdb_ltdb_header header;
350 int32_t transaction_status;
351
352 key.dptr = (uint8_t *)discard_const(keyname);
353 key.dsize = strlen(keyname);
354
355again:
356 tmp_ctx = talloc_new(h);
357
358 rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
359 if (rh == NULL) {
360 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));
361 talloc_free(tmp_ctx);
362 return -1;
363 }
364 crec = talloc_get_type_abort(rh->private_data, struct db_ctdb_rec);
365
366 transaction_status = db_ctdb_transaction_active(ctx->db_id);
367 if (transaction_status == 1) {
368 unsigned long int usec = (1000 + random()) % 100000;
369 DEBUG(3, ("Transaction already active on db_id[0x%08x]."
370 "Re-trying after %lu microseconds...",
371 ctx->db_id, usec));
372 talloc_free(tmp_ctx);
373 usleep(usec);
374 goto again;
375 }
376
377 /*
378 * store the pid in the database:
379 * it is not enought that the node is dmaster...
380 */
381 pid = getpid();
382 data.dptr = (unsigned char *)&pid;
383 data.dsize = sizeof(pid_t);
384 crec->header.rsn++;
385 crec->header.dmaster = get_my_vnn();
386 status = db_ctdb_ltdb_store(ctx, key, &(crec->header), data);
387 if (!NT_STATUS_IS_OK(status)) {
388 DEBUG(0, (__location__ " Failed to store pid in transaction "
389 "record: %s\n", nt_errstr(status)));
390 talloc_free(tmp_ctx);
391 return -1;
392 }
393
394 talloc_free(rh);
395
396 ret = tdb_transaction_start(ctx->wtdb->tdb);
397 if (ret != 0) {
398 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
399 talloc_free(tmp_ctx);
400 return -1;
401 }
402
403 status = db_ctdb_ltdb_fetch(ctx, key, &header, tmp_ctx, &data);
404 if (!NT_STATUS_IS_OK(status)) {
405 DEBUG(0, (__location__ " failed to refetch transaction lock "
406 "record inside transaction: %s - retrying\n",
407 nt_errstr(status)));
408 tdb_transaction_cancel(ctx->wtdb->tdb);
409 talloc_free(tmp_ctx);
410 goto again;
411 }
412
413 if (header.dmaster != get_my_vnn()) {
414 DEBUG(3, (__location__ " refetch transaction lock record : "
415 "we are not dmaster any more "
416 "(dmaster[%u] != my_vnn[%u]) - retrying\n",
417 header.dmaster, get_my_vnn()));
418 tdb_transaction_cancel(ctx->wtdb->tdb);
419 talloc_free(tmp_ctx);
420 goto again;
421 }
422
423 if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
424 DEBUG(3, (__location__ " refetch transaction lock record: "
425 "another local process has started a transaction "
426 "(stored pid [%u] != my pid [%u]) - retrying\n",
427 *(pid_t *)(data.dptr), pid));
428 tdb_transaction_cancel(ctx->wtdb->tdb);
429 talloc_free(tmp_ctx);
430 goto again;
431 }
432
433 talloc_free(tmp_ctx);
434
435 return 0;
436}
437
438
439/**
440 * CTDB dbwrap API: transaction_start function
441 * starts a transaction on a persistent database
442 */
443static int db_ctdb_transaction_start(struct db_context *db)
444{
445 struct db_ctdb_transaction_handle *h;
446 int ret;
447 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
448 struct db_ctdb_ctx);
449
450 if (!db->persistent) {
451 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
452 ctx->db_id));
453 return -1;
454 }
455
456 if (ctx->transaction) {
457 ctx->transaction->nesting++;
458 return 0;
459 }
460
461 h = talloc_zero(db, struct db_ctdb_transaction_handle);
462 if (h == NULL) {
463 DEBUG(0,(__location__ " oom for transaction handle\n"));
464 return -1;
465 }
466
467 h->ctx = ctx;
468
469 ret = db_ctdb_transaction_fetch_start(h);
470 if (ret != 0) {
471 talloc_free(h);
472 return -1;
473 }
474
475 talloc_set_destructor(h, db_ctdb_transaction_destructor);
476
477 ctx->transaction = h;
478
479 DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
480
481 return 0;
482}
483
484
485
486/*
487 fetch a record inside a transaction
488 */
489static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db,
490 TALLOC_CTX *mem_ctx,
491 TDB_DATA key, TDB_DATA *data)
492{
493 struct db_ctdb_transaction_handle *h = db->transaction;
494 NTSTATUS status;
495
496 status = db_ctdb_ltdb_fetch(h->ctx, key, NULL, mem_ctx, data);
497
498 if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
499 *data = tdb_null;
500 } else if (!NT_STATUS_IS_OK(status)) {
501 return -1;
502 }
503
504 if (!h->in_replay) {
505 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
506 if (h->m_all == NULL) {
507 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
508 data->dsize = 0;
509 talloc_free(data->dptr);
510 return -1;
511 }
512 }
513
514 return 0;
515}
516
517
518static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
519static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
520
521static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
522 TALLOC_CTX *mem_ctx,
523 TDB_DATA key)
524{
525 struct db_record *result;
526 TDB_DATA ctdb_data;
527
528 if (!(result = talloc(mem_ctx, struct db_record))) {
529 DEBUG(0, ("talloc failed\n"));
530 return NULL;
531 }
532
533 result->private_data = ctx->transaction;
534
535 result->key.dsize = key.dsize;
536 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
537 if (result->key.dptr == NULL) {
538 DEBUG(0, ("talloc failed\n"));
539 TALLOC_FREE(result);
540 return NULL;
541 }
542
543 result->store = db_ctdb_store_transaction;
544 result->delete_rec = db_ctdb_delete_transaction;
545
546 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
547 if (ctdb_data.dptr == NULL) {
548 /* create the record */
549 result->value = tdb_null;
550 return result;
551 }
552
553 result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
554 result->value.dptr = NULL;
555
556 if ((result->value.dsize != 0)
557 && !(result->value.dptr = (uint8 *)talloc_memdup(
558 result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
559 result->value.dsize))) {
560 DEBUG(0, ("talloc failed\n"));
561 TALLOC_FREE(result);
562 }
563
564 SAFE_FREE(ctdb_data.dptr);
565
566 return result;
567}
568
569static int db_ctdb_record_destructor(struct db_record **recp)
570{
571 struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
572 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
573 rec->private_data, struct db_ctdb_transaction_handle);
574 int ret = h->ctx->db->transaction_commit(h->ctx->db);
575 if (ret != 0) {
576 DEBUG(0,(__location__ " transaction_commit failed\n"));
577 }
578 return 0;
579}
580
581/*
582 auto-create a transaction for persistent databases
583 */
584static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
585 TALLOC_CTX *mem_ctx,
586 TDB_DATA key)
587{
588 int res;
589 struct db_record *rec, **recp;
590
591 res = db_ctdb_transaction_start(ctx->db);
592 if (res == -1) {
593 return NULL;
594 }
595
596 rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
597 if (rec == NULL) {
598 ctx->db->transaction_cancel(ctx->db);
599 return NULL;
600 }
601
602 /* destroy this transaction when we release the lock */
603 recp = talloc(rec, struct db_record *);
604 if (recp == NULL) {
605 ctx->db->transaction_cancel(ctx->db);
606 talloc_free(rec);
607 return NULL;
608 }
609 *recp = rec;
610 talloc_set_destructor(recp, db_ctdb_record_destructor);
611 return rec;
612}
613
614
615/*
616 stores a record inside a transaction
617 */
618static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
619 TDB_DATA key, TDB_DATA data)
620{
621 TALLOC_CTX *tmp_ctx = talloc_new(h);
622 int ret;
623 TDB_DATA rec;
624 struct ctdb_ltdb_header header;
625 NTSTATUS status;
626
627 /* we need the header so we can update the RSN */
628 rec = tdb_fetch(h->ctx->wtdb->tdb, key);
629 if (rec.dptr == NULL) {
630 /* the record doesn't exist - create one with us as dmaster.
631 This is only safe because we are in a transaction and this
632 is a persistent database */
633 ZERO_STRUCT(header);
634 } else {
635 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
636 rec.dsize -= sizeof(struct ctdb_ltdb_header);
637 /* a special case, we are writing the same data that is there now */
638 if (data.dsize == rec.dsize &&
639 memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
640 SAFE_FREE(rec.dptr);
641 talloc_free(tmp_ctx);
642 return 0;
643 }
644 SAFE_FREE(rec.dptr);
645 }
646
647 header.dmaster = get_my_vnn();
648 header.rsn++;
649
650 if (!h->in_replay) {
651 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
652 if (h->m_all == NULL) {
653 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
654 talloc_free(tmp_ctx);
655 return -1;
656 }
657 }
658
659 h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
660 if (h->m_write == NULL) {
661 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
662 talloc_free(tmp_ctx);
663 return -1;
664 }
665
666 status = db_ctdb_ltdb_store(h->ctx, key, &header, data);
667 if (NT_STATUS_IS_OK(status)) {
668 ret = 0;
669 } else {
670 ret = -1;
671 }
672
673 talloc_free(tmp_ctx);
674
675 return ret;
676}
677
678
679/*
680 a record store inside a transaction
681 */
682static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
683{
684 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
685 rec->private_data, struct db_ctdb_transaction_handle);
686 int ret;
687
688 ret = db_ctdb_transaction_store(h, rec->key, data);
689 if (ret != 0) {
690 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
691 }
692 return NT_STATUS_OK;
693}
694
695/*
696 a record delete inside a transaction
697 */
698static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
699{
700 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
701 rec->private_data, struct db_ctdb_transaction_handle);
702 int ret;
703
704 ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
705 if (ret != 0) {
706 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
707 }
708 return NT_STATUS_OK;
709}
710
711
712/*
713 replay a transaction
714 */
715static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
716{
717 int ret, i;
718 struct ctdb_rec_data *rec = NULL;
719
720 h->in_replay = true;
721 talloc_free(h->m_write);
722 h->m_write = NULL;
723
724 ret = db_ctdb_transaction_fetch_start(h);
725 if (ret != 0) {
726 return ret;
727 }
728
729 for (i=0;i<h->m_all->count;i++) {
730 TDB_DATA key, data;
731
732 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
733 if (rec == NULL) {
734 DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
735 goto failed;
736 }
737
738 if (rec->reqid == 0) {
739 /* its a store */
740 if (db_ctdb_transaction_store(h, key, data) != 0) {
741 goto failed;
742 }
743 } else {
744 TDB_DATA data2;
745 TALLOC_CTX *tmp_ctx = talloc_new(h);
746
747 if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
748 talloc_free(tmp_ctx);
749 goto failed;
750 }
751 if (data2.dsize != data.dsize ||
752 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
753 /* the record has changed on us - we have to give up */
754 talloc_free(tmp_ctx);
755 goto failed;
756 }
757 talloc_free(tmp_ctx);
758 }
759 }
760
761 return 0;
762
763failed:
764 tdb_transaction_cancel(h->ctx->wtdb->tdb);
765 return -1;
766}
767
768
769/*
770 commit a transaction
771 */
772static int db_ctdb_transaction_commit(struct db_context *db)
773{
774 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
775 struct db_ctdb_ctx);
776 NTSTATUS rets;
777 int ret;
778 int status;
779 int retries = 0;
780 struct db_ctdb_transaction_handle *h = ctx->transaction;
781 enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
782
783 if (h == NULL) {
784 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
785 return -1;
786 }
787
788 if (h->nested_cancel) {
789 db->transaction_cancel(db);
790 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
791 return -1;
792 }
793
794 if (h->nesting != 0) {
795 h->nesting--;
796 return 0;
797 }
798
799 DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
800
801 talloc_set_destructor(h, NULL);
802
803 /* our commit strategy is quite complex.
804
805 - we first try to commit the changes to all other nodes
806
807 - if that works, then we commit locally and we are done
808
809 - if a commit on another node fails, then we need to cancel
810 the transaction, then restart the transaction (thus
811 opening a window of time for a pending recovery to
812 complete), then replay the transaction, checking all the
813 reads and writes (checking that reads give the same data,
814 and writes succeed). Then we retry the transaction to the
815 other nodes
816 */
817
818again:
819 if (h->m_write == NULL) {
820 /* no changes were made, potentially after a retry */
821 tdb_transaction_cancel(h->ctx->wtdb->tdb);
822 talloc_free(h);
823 ctx->transaction = NULL;
824 return 0;
825 }
826
827 /* tell ctdbd to commit to the other nodes */
828 rets = ctdbd_control_local(messaging_ctdbd_connection(),
829 retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY,
830 h->ctx->db_id, 0,
831 db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
832 if (!NT_STATUS_IS_OK(rets) || status != 0) {
833 tdb_transaction_cancel(h->ctx->wtdb->tdb);
834 sleep(1);
835
836 if (!NT_STATUS_IS_OK(rets)) {
837 failure_control = CTDB_CONTROL_TRANS2_ERROR;
838 } else {
839 /* work out what error code we will give if we
840 have to fail the operation */
841 switch ((enum ctdb_trans2_commit_error)status) {
842 case CTDB_TRANS2_COMMIT_SUCCESS:
843 case CTDB_TRANS2_COMMIT_SOMEFAIL:
844 case CTDB_TRANS2_COMMIT_TIMEOUT:
845 failure_control = CTDB_CONTROL_TRANS2_ERROR;
846 break;
847 case CTDB_TRANS2_COMMIT_ALLFAIL:
848 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
849 break;
850 }
851 }
852
853 if (++retries == 100) {
854 DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
855 h->ctx->db_id, retries, (unsigned)failure_control));
856 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
857 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
858 tdb_null, NULL, NULL, NULL);
859 h->ctx->transaction = NULL;
860 talloc_free(h);
861 ctx->transaction = NULL;
862 return -1;
863 }
864
865 if (ctdb_replay_transaction(h) != 0) {
866 DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
867 (unsigned)failure_control));
868 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
869 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
870 tdb_null, NULL, NULL, NULL);
871 h->ctx->transaction = NULL;
872 talloc_free(h);
873 ctx->transaction = NULL;
874 return -1;
875 }
876 goto again;
877 } else {
878 failure_control = CTDB_CONTROL_TRANS2_ERROR;
879 }
880
881 /* do the real commit locally */
882 ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
883 if (ret != 0) {
884 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
885 (unsigned)failure_control));
886 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id,
887 CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
888 h->ctx->transaction = NULL;
889 talloc_free(h);
890 return ret;
891 }
892
893 /* tell ctdbd that we are finished with our local commit */
894 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED,
895 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
896 tdb_null, NULL, NULL, NULL);
897 h->ctx->transaction = NULL;
898 talloc_free(h);
899 return 0;
900}
901
902
903/*
904 cancel a transaction
905 */
906static int db_ctdb_transaction_cancel(struct db_context *db)
907{
908 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
909 struct db_ctdb_ctx);
910 struct db_ctdb_transaction_handle *h = ctx->transaction;
911
912 if (h == NULL) {
913 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
914 return -1;
915 }
916
917 if (h->nesting != 0) {
918 h->nesting--;
919 h->nested_cancel = true;
920 return 0;
921 }
922
923 DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
924
925 ctx->transaction = NULL;
926 talloc_free(h);
927 return 0;
928}
929
930
931static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
932{
933 struct db_ctdb_rec *crec = talloc_get_type_abort(
934 rec->private_data, struct db_ctdb_rec);
935
936 return db_ctdb_ltdb_store(crec->ctdb_ctx, rec->key, &(crec->header), data);
937}
938
939
940
941static NTSTATUS db_ctdb_delete(struct db_record *rec)
942{
943 TDB_DATA data;
944
945 /*
946 * We have to store the header with empty data. TODO: Fix the
947 * tdb-level cleanup
948 */
949
950 ZERO_STRUCT(data);
951
952 return db_ctdb_store(rec, data, 0);
953
954}
955
956static int db_ctdb_record_destr(struct db_record* data)
957{
958 struct db_ctdb_rec *crec = talloc_get_type_abort(
959 data->private_data, struct db_ctdb_rec);
960
961 DEBUG(10, (DEBUGLEVEL > 10
962 ? "Unlocking db %u key %s\n"
963 : "Unlocking db %u key %.20s\n",
964 (int)crec->ctdb_ctx->db_id,
965 hex_encode_talloc(data, (unsigned char *)data->key.dptr,
966 data->key.dsize)));
967
968 if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
969 DEBUG(0, ("tdb_chainunlock failed\n"));
970 return -1;
971 }
972
973 return 0;
974}
975
976static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
977 TALLOC_CTX *mem_ctx,
978 TDB_DATA key,
979 bool persistent)
980{
981 struct db_record *result;
982 struct db_ctdb_rec *crec;
983 NTSTATUS status;
984 TDB_DATA ctdb_data;
985 int migrate_attempts = 0;
986
987 if (!(result = talloc(mem_ctx, struct db_record))) {
988 DEBUG(0, ("talloc failed\n"));
989 return NULL;
990 }
991
992 if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
993 DEBUG(0, ("talloc failed\n"));
994 TALLOC_FREE(result);
995 return NULL;
996 }
997
998 result->private_data = (void *)crec;
999 crec->ctdb_ctx = ctx;
1000
1001 result->key.dsize = key.dsize;
1002 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
1003 if (result->key.dptr == NULL) {
1004 DEBUG(0, ("talloc failed\n"));
1005 TALLOC_FREE(result);
1006 return NULL;
1007 }
1008
1009 /*
1010 * Do a blocking lock on the record
1011 */
1012again:
1013
1014 if (DEBUGLEVEL >= 10) {
1015 char *keystr = hex_encode_talloc(result, key.dptr, key.dsize);
1016 DEBUG(10, (DEBUGLEVEL > 10
1017 ? "Locking db %u key %s\n"
1018 : "Locking db %u key %.20s\n",
1019 (int)crec->ctdb_ctx->db_id, keystr));
1020 TALLOC_FREE(keystr);
1021 }
1022
1023 if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
1024 DEBUG(3, ("tdb_chainlock failed\n"));
1025 TALLOC_FREE(result);
1026 return NULL;
1027 }
1028
1029 result->store = db_ctdb_store;
1030 result->delete_rec = db_ctdb_delete;
1031 talloc_set_destructor(result, db_ctdb_record_destr);
1032
1033 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1034
1035 /*
1036 * See if we have a valid record and we are the dmaster. If so, we can
1037 * take the shortcut and just return it.
1038 */
1039
1040 if ((ctdb_data.dptr == NULL) ||
1041 (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
1042 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
1043#if 0
1044 || (random() % 2 != 0)
1045#endif
1046) {
1047 SAFE_FREE(ctdb_data.dptr);
1048 tdb_chainunlock(ctx->wtdb->tdb, key);
1049 talloc_set_destructor(result, NULL);
1050
1051 migrate_attempts += 1;
1052
1053 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
1054 ctdb_data.dptr, ctdb_data.dptr ?
1055 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
1056 get_my_vnn()));
1057
1058 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
1059 if (!NT_STATUS_IS_OK(status)) {
1060 DEBUG(5, ("ctdb_migrate failed: %s\n",
1061 nt_errstr(status)));
1062 TALLOC_FREE(result);
1063 return NULL;
1064 }
1065 /* now its migrated, try again */
1066 goto again;
1067 }
1068
1069 if (migrate_attempts > 10) {
1070 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
1071 migrate_attempts));
1072 }
1073
1074 memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
1075
1076 result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
1077 result->value.dptr = NULL;
1078
1079 if ((result->value.dsize != 0)
1080 && !(result->value.dptr = (uint8 *)talloc_memdup(
1081 result, ctdb_data.dptr + sizeof(crec->header),
1082 result->value.dsize))) {
1083 DEBUG(0, ("talloc failed\n"));
1084 TALLOC_FREE(result);
1085 }
1086
1087 SAFE_FREE(ctdb_data.dptr);
1088
1089 return result;
1090}
1091
1092static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
1093 TALLOC_CTX *mem_ctx,
1094 TDB_DATA key)
1095{
1096 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1097 struct db_ctdb_ctx);
1098
1099 if (ctx->transaction != NULL) {
1100 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
1101 }
1102
1103 if (db->persistent) {
1104 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
1105 }
1106
1107 return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
1108}
1109
1110/*
1111 fetch (unlocked, no migration) operation on ctdb
1112 */
1113static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
1114 TDB_DATA key, TDB_DATA *data)
1115{
1116 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1117 struct db_ctdb_ctx);
1118 NTSTATUS status;
1119 TDB_DATA ctdb_data;
1120
1121 if (ctx->transaction) {
1122 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
1123 }
1124
1125 /* try a direct fetch */
1126 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1127
1128 /*
1129 * See if we have a valid record and we are the dmaster. If so, we can
1130 * take the shortcut and just return it.
1131 * we bypass the dmaster check for persistent databases
1132 */
1133 if ((ctdb_data.dptr != NULL) &&
1134 (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
1135 (db->persistent ||
1136 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
1137 /* we are the dmaster - avoid the ctdb protocol op */
1138
1139 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
1140 if (data->dsize == 0) {
1141 SAFE_FREE(ctdb_data.dptr);
1142 data->dptr = NULL;
1143 return 0;
1144 }
1145
1146 data->dptr = (uint8 *)talloc_memdup(
1147 mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1148 data->dsize);
1149
1150 SAFE_FREE(ctdb_data.dptr);
1151
1152 if (data->dptr == NULL) {
1153 return -1;
1154 }
1155 return 0;
1156 }
1157
1158 SAFE_FREE(ctdb_data.dptr);
1159
1160 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1161 status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1162 if (!NT_STATUS_IS_OK(status)) {
1163 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1164 return -1;
1165 }
1166
1167 return 0;
1168}
1169
/*
 * State handed to the traverse callbacks through their private_data
 * pointer.
 */
struct traverse_state {
	/* database being traversed */
	struct db_context *db;
	/* caller's function, invoked for the records visited */
	int (*fn)(struct db_record *rec, void *private_data);
	/* caller's pointer, passed through to fn */
	void *private_data;
};
1175
1176static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1177{
1178 struct traverse_state *state = (struct traverse_state *)private_data;
1179 struct db_record *rec;
1180 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1181 /* we have to give them a locked record to prevent races */
1182 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1183 if (rec && rec->value.dsize > 0) {
1184 state->fn(rec, state->private_data);
1185 }
1186 talloc_free(tmp_ctx);
1187}
1188
1189static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1190 void *private_data)
1191{
1192 struct traverse_state *state = (struct traverse_state *)private_data;
1193 struct db_record *rec;
1194 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1195 int ret = 0;
1196 /* we have to give them a locked record to prevent races */
1197 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1198 if (rec && rec->value.dsize > 0) {
1199 ret = state->fn(rec, state->private_data);
1200 }
1201 talloc_free(tmp_ctx);
1202 return ret;
1203}
1204
1205static int db_ctdb_traverse(struct db_context *db,
1206 int (*fn)(struct db_record *rec,
1207 void *private_data),
1208 void *private_data)
1209{
1210 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1211 struct db_ctdb_ctx);
1212 struct traverse_state state;
1213
1214 state.db = db;
1215 state.fn = fn;
1216 state.private_data = private_data;
1217
1218 if (db->persistent) {
1219 /* for persistent databases we don't need to do a ctdb traverse,
1220 we can do a faster local traverse */
1221 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1222 }
1223
1224
1225 ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1226 return 0;
1227}
1228
1229static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1230{
1231 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1232}
1233
1234static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1235{
1236 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1237}
1238
1239static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1240{
1241 struct traverse_state *state = (struct traverse_state *)private_data;
1242 struct db_record rec;
1243 rec.key = key;
1244 rec.value = data;
1245 rec.store = db_ctdb_store_deny;
1246 rec.delete_rec = db_ctdb_delete_deny;
1247 rec.private_data = state->db;
1248 state->fn(&rec, state->private_data);
1249}
1250
1251static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1252 void *private_data)
1253{
1254 struct traverse_state *state = (struct traverse_state *)private_data;
1255 struct db_record rec;
1256 rec.key = kbuf;
1257 rec.value = dbuf;
1258 rec.store = db_ctdb_store_deny;
1259 rec.delete_rec = db_ctdb_delete_deny;
1260 rec.private_data = state->db;
1261
1262 if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1263 /* a deleted record */
1264 return 0;
1265 }
1266 rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1267 rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1268
1269 return state->fn(&rec, state->private_data);
1270}
1271
1272static int db_ctdb_traverse_read(struct db_context *db,
1273 int (*fn)(struct db_record *rec,
1274 void *private_data),
1275 void *private_data)
1276{
1277 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1278 struct db_ctdb_ctx);
1279 struct traverse_state state;
1280
1281 state.db = db;
1282 state.fn = fn;
1283 state.private_data = private_data;
1284
1285 if (db->persistent) {
1286 /* for persistent databases we don't need to do a ctdb traverse,
1287 we can do a faster local traverse */
1288 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1289 }
1290
1291 ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1292 return 0;
1293}
1294
1295static int db_ctdb_get_seqnum(struct db_context *db)
1296{
1297 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1298 struct db_ctdb_ctx);
1299 return tdb_get_seqnum(ctx->wtdb->tdb);
1300}
1301
1302static int db_ctdb_get_flags(struct db_context *db)
1303{
1304 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1305 struct db_ctdb_ctx);
1306 return tdb_get_flags(ctx->wtdb->tdb);
1307}
1308
1309struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1310 const char *name,
1311 int hash_size, int tdb_flags,
1312 int open_flags, mode_t mode)
1313{
1314 struct db_context *result;
1315 struct db_ctdb_ctx *db_ctdb;
1316 char *db_path;
1317
1318 if (!lp_clustering()) {
1319 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1320 return NULL;
1321 }
1322
1323 if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1324 DEBUG(0, ("talloc failed\n"));
1325 TALLOC_FREE(result);
1326 return NULL;
1327 }
1328
1329 if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1330 DEBUG(0, ("talloc failed\n"));
1331 TALLOC_FREE(result);
1332 return NULL;
1333 }
1334
1335 db_ctdb->transaction = NULL;
1336 db_ctdb->db = result;
1337
1338 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1339 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1340 TALLOC_FREE(result);
1341 return NULL;
1342 }
1343
1344 db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1345
1346 result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1347
1348 /* only pass through specific flags */
1349 tdb_flags &= TDB_SEQNUM;
1350
1351 /* honor permissions if user has specified O_CREAT */
1352 if (open_flags & O_CREAT) {
1353 chmod(db_path, mode);
1354 }
1355
1356 db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1357 if (db_ctdb->wtdb == NULL) {
1358 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1359 TALLOC_FREE(result);
1360 return NULL;
1361 }
1362 talloc_free(db_path);
1363
1364 result->private_data = (void *)db_ctdb;
1365 result->fetch_locked = db_ctdb_fetch_locked;
1366 result->fetch = db_ctdb_fetch;
1367 result->traverse = db_ctdb_traverse;
1368 result->traverse_read = db_ctdb_traverse_read;
1369 result->get_seqnum = db_ctdb_get_seqnum;
1370 result->get_flags = db_ctdb_get_flags;
1371 result->transaction_start = db_ctdb_transaction_start;
1372 result->transaction_commit = db_ctdb_transaction_commit;
1373 result->transaction_cancel = db_ctdb_transaction_cancel;
1374
1375 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1376 name, db_ctdb->db_id));
1377
1378 return result;
1379}
1380#endif
Note: See TracBrowser for help on using the repository browser.