source: branches/samba-3.3.x/source/lib/dbwrap_ctdb.c

Last change on this file was 223, checked in by Herwig Bauernfeind, 16 years ago

Update Samba 3.3 branch to 3.3.3

File size: 32.2 KB
Line 
1/*
2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007
5
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
18*/
19
20#include "includes.h"
21#ifdef CLUSTER_SUPPORT
22#include "ctdb.h"
23#include "ctdb_private.h"
24#include "ctdbd_conn.h"
25
/*
 * State of one open transaction on a ctdb-backed database.
 */
struct db_ctdb_transaction_handle {
	/* database context the transaction was started on */
	struct db_ctdb_ctx *ctx;
	/* set by ctdb_replay_transaction(); suppresses logging into m_all */
	bool in_replay;
	/*
	 * The operations done under the transaction are recorded in two
	 * marshall buffers: m_all receives both the reads and the writes,
	 * m_write receives only the writes.
	 */
	struct ctdb_marshall_buffer *m_all;
	struct ctdb_marshall_buffer *m_write;
	/* number of nested transaction_start calls still outstanding */
	uint32_t nesting;
	/* set when a nested transaction is cancelled */
	bool nested_cancel;
};
37
38struct db_ctdb_ctx {
39 struct db_context *db;
40 struct tdb_wrap *wtdb;
41 uint32 db_id;
42 struct db_ctdb_transaction_handle *transaction;
43};
44
45struct db_ctdb_rec {
46 struct db_ctdb_ctx *ctdb_ctx;
47 struct ctdb_ltdb_header header;
48};
49
50static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
51 TALLOC_CTX *mem_ctx,
52 TDB_DATA key,
53 bool persistent);
54
55static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
56{
57 NTSTATUS status;
58 enum TDB_ERROR tret = tdb_error(tdb);
59
60 switch (tret) {
61 case TDB_ERR_EXISTS:
62 status = NT_STATUS_OBJECT_NAME_COLLISION;
63 break;
64 case TDB_ERR_NOEXIST:
65 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
66 break;
67 default:
68 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
69 break;
70 }
71
72 return status;
73}
74
75
76
77/*
78 form a ctdb_rec_data record from a key/data pair
79
80 note that header may be NULL. If not NULL then it is included in the data portion
81 of the record
82 */
83static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
84 TDB_DATA key,
85 struct ctdb_ltdb_header *header,
86 TDB_DATA data)
87{
88 size_t length;
89 struct ctdb_rec_data *d;
90
91 length = offsetof(struct ctdb_rec_data, data) + key.dsize +
92 data.dsize + (header?sizeof(*header):0);
93 d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
94 if (d == NULL) {
95 return NULL;
96 }
97 d->length = length;
98 d->reqid = reqid;
99 d->keylen = key.dsize;
100 memcpy(&d->data[0], key.dptr, key.dsize);
101 if (header) {
102 d->datalen = data.dsize + sizeof(*header);
103 memcpy(&d->data[key.dsize], header, sizeof(*header));
104 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
105 } else {
106 d->datalen = data.dsize;
107 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
108 }
109 return d;
110}
111
112
113/* helper function for marshalling multiple records */
114static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
115 struct ctdb_marshall_buffer *m,
116 uint64_t db_id,
117 uint32_t reqid,
118 TDB_DATA key,
119 struct ctdb_ltdb_header *header,
120 TDB_DATA data)
121{
122 struct ctdb_rec_data *r;
123 size_t m_size, r_size;
124 struct ctdb_marshall_buffer *m2 = NULL;
125
126 r = db_ctdb_marshall_record(talloc_tos(), reqid, key, header, data);
127 if (r == NULL) {
128 talloc_free(m);
129 return NULL;
130 }
131
132 if (m == NULL) {
133 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
134 mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
135 if (m == NULL) {
136 goto done;
137 }
138 m->db_id = db_id;
139 }
140
141 m_size = talloc_get_size(m);
142 r_size = talloc_get_size(r);
143
144 m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
145 mem_ctx, m, m_size + r_size);
146 if (m2 == NULL) {
147 talloc_free(m);
148 goto done;
149 }
150
151 memcpy(m_size + (uint8_t *)m2, r, r_size);
152
153 m2->count++;
154
155done:
156 talloc_free(r);
157 return m2;
158}
159
160/* we've finished marshalling, return a data blob with the marshalled records */
161static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
162{
163 TDB_DATA data;
164 data.dptr = (uint8_t *)m;
165 data.dsize = talloc_get_size(m);
166 return data;
167}
168
169/*
170 loop over a marshalling buffer
171
172 - pass r==NULL to start
173 - loop the number of times indicated by m->count
174*/
175static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
176 uint32_t *reqid,
177 struct ctdb_ltdb_header *header,
178 TDB_DATA *key, TDB_DATA *data)
179{
180 if (r == NULL) {
181 r = (struct ctdb_rec_data *)&m->data[0];
182 } else {
183 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
184 }
185
186 if (reqid != NULL) {
187 *reqid = r->reqid;
188 }
189
190 if (key != NULL) {
191 key->dptr = &r->data[0];
192 key->dsize = r->keylen;
193 }
194 if (data != NULL) {
195 data->dptr = &r->data[r->keylen];
196 data->dsize = r->datalen;
197 if (header != NULL) {
198 data->dptr += sizeof(*header);
199 data->dsize -= sizeof(*header);
200 }
201 }
202
203 if (header != NULL) {
204 if (r->datalen < sizeof(*header)) {
205 return NULL;
206 }
207 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
208 }
209
210 return r;
211}
212
213
214
215/* start a transaction on a database */
216static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
217{
218 tdb_transaction_cancel(h->ctx->wtdb->tdb);
219 return 0;
220}
221
222/* start a transaction on a database */
223static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
224{
225 struct db_record *rh;
226 TDB_DATA key;
227 TALLOC_CTX *tmp_ctx;
228 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
229 int ret;
230 struct db_ctdb_ctx *ctx = h->ctx;
231 TDB_DATA data;
232
233 key.dptr = (uint8_t *)discard_const(keyname);
234 key.dsize = strlen(keyname);
235
236again:
237 tmp_ctx = talloc_new(h);
238
239 rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
240 if (rh == NULL) {
241 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));
242 talloc_free(tmp_ctx);
243 return -1;
244 }
245 talloc_free(rh);
246
247 ret = tdb_transaction_start(ctx->wtdb->tdb);
248 if (ret != 0) {
249 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
250 talloc_free(tmp_ctx);
251 return -1;
252 }
253
254 data = tdb_fetch(ctx->wtdb->tdb, key);
255 if ((data.dptr == NULL) ||
256 (data.dsize < sizeof(struct ctdb_ltdb_header)) ||
257 ((struct ctdb_ltdb_header *)data.dptr)->dmaster != get_my_vnn()) {
258 SAFE_FREE(data.dptr);
259 tdb_transaction_cancel(ctx->wtdb->tdb);
260 talloc_free(tmp_ctx);
261 goto again;
262 }
263
264 SAFE_FREE(data.dptr);
265 talloc_free(tmp_ctx);
266
267 return 0;
268}
269
270
271/* start a transaction on a database */
272static int db_ctdb_transaction_start(struct db_context *db)
273{
274 struct db_ctdb_transaction_handle *h;
275 int ret;
276 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
277 struct db_ctdb_ctx);
278
279 if (!db->persistent) {
280 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
281 ctx->db_id));
282 return -1;
283 }
284
285 if (ctx->transaction) {
286 ctx->transaction->nesting++;
287 return 0;
288 }
289
290 h = talloc_zero(db, struct db_ctdb_transaction_handle);
291 if (h == NULL) {
292 DEBUG(0,(__location__ " oom for transaction handle\n"));
293 return -1;
294 }
295
296 h->ctx = ctx;
297
298 ret = db_ctdb_transaction_fetch_start(h);
299 if (ret != 0) {
300 talloc_free(h);
301 return -1;
302 }
303
304 talloc_set_destructor(h, db_ctdb_transaction_destructor);
305
306 ctx->transaction = h;
307
308 DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
309
310 return 0;
311}
312
313
314
315/*
316 fetch a record inside a transaction
317 */
318static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db,
319 TALLOC_CTX *mem_ctx,
320 TDB_DATA key, TDB_DATA *data)
321{
322 struct db_ctdb_transaction_handle *h = db->transaction;
323
324 *data = tdb_fetch(h->ctx->wtdb->tdb, key);
325
326 if (data->dptr != NULL) {
327 uint8_t *oldptr = (uint8_t *)data->dptr;
328 data->dsize -= sizeof(struct ctdb_ltdb_header);
329 if (data->dsize == 0) {
330 data->dptr = NULL;
331 } else {
332 data->dptr = (uint8 *)
333 talloc_memdup(
334 mem_ctx, data->dptr+sizeof(struct ctdb_ltdb_header),
335 data->dsize);
336 }
337 SAFE_FREE(oldptr);
338 if (data->dptr == NULL && data->dsize != 0) {
339 return -1;
340 }
341 }
342
343 if (!h->in_replay) {
344 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
345 if (h->m_all == NULL) {
346 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
347 data->dsize = 0;
348 talloc_free(data->dptr);
349 return -1;
350 }
351 }
352
353 return 0;
354}
355
356
357static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
358static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
359
360static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
361 TALLOC_CTX *mem_ctx,
362 TDB_DATA key)
363{
364 struct db_record *result;
365 TDB_DATA ctdb_data;
366
367 if (!(result = talloc(mem_ctx, struct db_record))) {
368 DEBUG(0, ("talloc failed\n"));
369 return NULL;
370 }
371
372 result->private_data = ctx->transaction;
373
374 result->key.dsize = key.dsize;
375 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
376 if (result->key.dptr == NULL) {
377 DEBUG(0, ("talloc failed\n"));
378 TALLOC_FREE(result);
379 return NULL;
380 }
381
382 result->store = db_ctdb_store_transaction;
383 result->delete_rec = db_ctdb_delete_transaction;
384
385 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
386 if (ctdb_data.dptr == NULL) {
387 /* create the record */
388 result->value = tdb_null;
389 return result;
390 }
391
392 result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
393 result->value.dptr = NULL;
394
395 if ((result->value.dsize != 0)
396 && !(result->value.dptr = (uint8 *)talloc_memdup(
397 result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
398 result->value.dsize))) {
399 DEBUG(0, ("talloc failed\n"));
400 TALLOC_FREE(result);
401 }
402
403 SAFE_FREE(ctdb_data.dptr);
404
405 return result;
406}
407
408static int db_ctdb_record_destructor(struct db_record **recp)
409{
410 struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
411 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
412 rec->private_data, struct db_ctdb_transaction_handle);
413 int ret = h->ctx->db->transaction_commit(h->ctx->db);
414 if (ret != 0) {
415 DEBUG(0,(__location__ " transaction_commit failed\n"));
416 }
417 return 0;
418}
419
420/*
421 auto-create a transaction for persistent databases
422 */
423static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
424 TALLOC_CTX *mem_ctx,
425 TDB_DATA key)
426{
427 int res;
428 struct db_record *rec, **recp;
429
430 res = db_ctdb_transaction_start(ctx->db);
431 if (res == -1) {
432 return NULL;
433 }
434
435 rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
436 if (rec == NULL) {
437 ctx->db->transaction_cancel(ctx->db);
438 return NULL;
439 }
440
441 /* destroy this transaction when we release the lock */
442 recp = talloc(rec, struct db_record *);
443 if (recp == NULL) {
444 ctx->db->transaction_cancel(ctx->db);
445 talloc_free(rec);
446 return NULL;
447 }
448 *recp = rec;
449 talloc_set_destructor(recp, db_ctdb_record_destructor);
450 return rec;
451}
452
453
454/*
455 stores a record inside a transaction
456 */
457static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
458 TDB_DATA key, TDB_DATA data)
459{
460 TALLOC_CTX *tmp_ctx = talloc_new(h);
461 int ret;
462 TDB_DATA rec;
463 struct ctdb_ltdb_header header;
464
465 /* we need the header so we can update the RSN */
466 rec = tdb_fetch(h->ctx->wtdb->tdb, key);
467 if (rec.dptr == NULL) {
468 /* the record doesn't exist - create one with us as dmaster.
469 This is only safe because we are in a transaction and this
470 is a persistent database */
471 ZERO_STRUCT(header);
472 header.dmaster = get_my_vnn();
473 } else {
474 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
475 rec.dsize -= sizeof(struct ctdb_ltdb_header);
476 /* a special case, we are writing the same data that is there now */
477 if (data.dsize == rec.dsize &&
478 memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
479 SAFE_FREE(rec.dptr);
480 talloc_free(tmp_ctx);
481 return 0;
482 }
483 SAFE_FREE(rec.dptr);
484 }
485
486 header.rsn++;
487
488 if (!h->in_replay) {
489 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
490 if (h->m_all == NULL) {
491 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
492 talloc_free(tmp_ctx);
493 return -1;
494 }
495 }
496
497 h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
498 if (h->m_write == NULL) {
499 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
500 talloc_free(tmp_ctx);
501 return -1;
502 }
503
504 rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
505 rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
506 if (rec.dptr == NULL) {
507 DEBUG(0,(__location__ " Failed to alloc record\n"));
508 talloc_free(tmp_ctx);
509 return -1;
510 }
511 memcpy(rec.dptr, &header, sizeof(struct ctdb_ltdb_header));
512 memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
513
514 ret = tdb_store(h->ctx->wtdb->tdb, key, rec, TDB_REPLACE);
515
516 talloc_free(tmp_ctx);
517
518 return ret;
519}
520
521
522/*
523 a record store inside a transaction
524 */
525static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
526{
527 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
528 rec->private_data, struct db_ctdb_transaction_handle);
529 int ret;
530
531 ret = db_ctdb_transaction_store(h, rec->key, data);
532 if (ret != 0) {
533 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
534 }
535 return NT_STATUS_OK;
536}
537
538/*
539 a record delete inside a transaction
540 */
541static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
542{
543 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
544 rec->private_data, struct db_ctdb_transaction_handle);
545 int ret;
546
547 ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
548 if (ret != 0) {
549 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
550 }
551 return NT_STATUS_OK;
552}
553
554
555/*
556 replay a transaction
557 */
558static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
559{
560 int ret, i;
561 struct ctdb_rec_data *rec = NULL;
562
563 h->in_replay = true;
564 talloc_free(h->m_write);
565 h->m_write = NULL;
566
567 ret = db_ctdb_transaction_fetch_start(h);
568 if (ret != 0) {
569 return ret;
570 }
571
572 for (i=0;i<h->m_all->count;i++) {
573 TDB_DATA key, data;
574
575 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
576 if (rec == NULL) {
577 DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
578 goto failed;
579 }
580
581 if (rec->reqid == 0) {
582 /* its a store */
583 if (db_ctdb_transaction_store(h, key, data) != 0) {
584 goto failed;
585 }
586 } else {
587 TDB_DATA data2;
588 TALLOC_CTX *tmp_ctx = talloc_new(h);
589
590 if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
591 talloc_free(tmp_ctx);
592 goto failed;
593 }
594 if (data2.dsize != data.dsize ||
595 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
596 /* the record has changed on us - we have to give up */
597 talloc_free(tmp_ctx);
598 goto failed;
599 }
600 talloc_free(tmp_ctx);
601 }
602 }
603
604 return 0;
605
606failed:
607 tdb_transaction_cancel(h->ctx->wtdb->tdb);
608 return -1;
609}
610
611
612/*
613 commit a transaction
614 */
615static int db_ctdb_transaction_commit(struct db_context *db)
616{
617 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
618 struct db_ctdb_ctx);
619 NTSTATUS rets;
620 int ret;
621 int status;
622 int retries = 0;
623 struct db_ctdb_transaction_handle *h = ctx->transaction;
624 enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
625
626 if (h == NULL) {
627 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
628 return -1;
629 }
630
631 if (h->nested_cancel) {
632 db->transaction_cancel(db);
633 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
634 return -1;
635 }
636
637 if (h->nesting != 0) {
638 h->nesting--;
639 return 0;
640 }
641
642 DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
643
644 talloc_set_destructor(h, NULL);
645
646 /* our commit strategy is quite complex.
647
648 - we first try to commit the changes to all other nodes
649
650 - if that works, then we commit locally and we are done
651
652 - if a commit on another node fails, then we need to cancel
653 the transaction, then restart the transaction (thus
654 opening a window of time for a pending recovery to
655 complete), then replay the transaction, checking all the
656 reads and writes (checking that reads give the same data,
657 and writes succeed). Then we retry the transaction to the
658 other nodes
659 */
660
661again:
662 if (h->m_write == NULL) {
663 /* no changes were made, potentially after a retry */
664 tdb_transaction_cancel(h->ctx->wtdb->tdb);
665 talloc_free(h);
666 ctx->transaction = NULL;
667 return 0;
668 }
669
670 /* tell ctdbd to commit to the other nodes */
671 rets = ctdbd_control_local(messaging_ctdbd_connection(),
672 retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY,
673 h->ctx->db_id, 0,
674 db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
675 if (!NT_STATUS_IS_OK(rets) || status != 0) {
676 tdb_transaction_cancel(h->ctx->wtdb->tdb);
677 sleep(1);
678
679 if (!NT_STATUS_IS_OK(rets)) {
680 failure_control = CTDB_CONTROL_TRANS2_ERROR;
681 } else {
682 /* work out what error code we will give if we
683 have to fail the operation */
684 switch ((enum ctdb_trans2_commit_error)status) {
685 case CTDB_TRANS2_COMMIT_SUCCESS:
686 case CTDB_TRANS2_COMMIT_SOMEFAIL:
687 case CTDB_TRANS2_COMMIT_TIMEOUT:
688 failure_control = CTDB_CONTROL_TRANS2_ERROR;
689 break;
690 case CTDB_TRANS2_COMMIT_ALLFAIL:
691 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
692 break;
693 }
694 }
695
696 if (++retries == 5) {
697 DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
698 h->ctx->db_id, retries, (unsigned)failure_control));
699 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
700 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
701 tdb_null, NULL, NULL, NULL);
702 h->ctx->transaction = NULL;
703 talloc_free(h);
704 ctx->transaction = NULL;
705 return -1;
706 }
707
708 if (ctdb_replay_transaction(h) != 0) {
709 DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
710 (unsigned)failure_control));
711 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
712 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
713 tdb_null, NULL, NULL, NULL);
714 h->ctx->transaction = NULL;
715 talloc_free(h);
716 ctx->transaction = NULL;
717 return -1;
718 }
719 goto again;
720 } else {
721 failure_control = CTDB_CONTROL_TRANS2_ERROR;
722 }
723
724 /* do the real commit locally */
725 ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
726 if (ret != 0) {
727 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
728 (unsigned)failure_control));
729 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id,
730 CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
731 h->ctx->transaction = NULL;
732 talloc_free(h);
733 return ret;
734 }
735
736 /* tell ctdbd that we are finished with our local commit */
737 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED,
738 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
739 tdb_null, NULL, NULL, NULL);
740 h->ctx->transaction = NULL;
741 talloc_free(h);
742 return 0;
743}
744
745
746/*
747 cancel a transaction
748 */
749static int db_ctdb_transaction_cancel(struct db_context *db)
750{
751 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
752 struct db_ctdb_ctx);
753 struct db_ctdb_transaction_handle *h = ctx->transaction;
754
755 if (h == NULL) {
756 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
757 return -1;
758 }
759
760 if (h->nesting != 0) {
761 h->nesting--;
762 h->nested_cancel = true;
763 return 0;
764 }
765
766 DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
767
768 ctx->transaction = NULL;
769 talloc_free(h);
770 return 0;
771}
772
773
774static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
775{
776 struct db_ctdb_rec *crec = talloc_get_type_abort(
777 rec->private_data, struct db_ctdb_rec);
778 TDB_DATA cdata;
779 int ret;
780
781 cdata.dsize = sizeof(crec->header) + data.dsize;
782
783 if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
784 return NT_STATUS_NO_MEMORY;
785 }
786
787 memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
788 memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
789
790 ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
791
792 SAFE_FREE(cdata.dptr);
793
794 return (ret == 0) ? NT_STATUS_OK
795 : tdb_error_to_ntstatus(crec->ctdb_ctx->wtdb->tdb);
796}
797
798
799
800static NTSTATUS db_ctdb_delete(struct db_record *rec)
801{
802 TDB_DATA data;
803
804 /*
805 * We have to store the header with empty data. TODO: Fix the
806 * tdb-level cleanup
807 */
808
809 ZERO_STRUCT(data);
810
811 return db_ctdb_store(rec, data, 0);
812
813}
814
815static int db_ctdb_record_destr(struct db_record* data)
816{
817 struct db_ctdb_rec *crec = talloc_get_type_abort(
818 data->private_data, struct db_ctdb_rec);
819
820 DEBUG(10, (DEBUGLEVEL > 10
821 ? "Unlocking db %u key %s\n"
822 : "Unlocking db %u key %.20s\n",
823 (int)crec->ctdb_ctx->db_id,
824 hex_encode(data, (unsigned char *)data->key.dptr,
825 data->key.dsize)));
826
827 if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
828 DEBUG(0, ("tdb_chainunlock failed\n"));
829 return -1;
830 }
831
832 return 0;
833}
834
835static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
836 TALLOC_CTX *mem_ctx,
837 TDB_DATA key,
838 bool persistent)
839{
840 struct db_record *result;
841 struct db_ctdb_rec *crec;
842 NTSTATUS status;
843 TDB_DATA ctdb_data;
844 int migrate_attempts = 0;
845
846 if (!(result = talloc(mem_ctx, struct db_record))) {
847 DEBUG(0, ("talloc failed\n"));
848 return NULL;
849 }
850
851 if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
852 DEBUG(0, ("talloc failed\n"));
853 TALLOC_FREE(result);
854 return NULL;
855 }
856
857 result->private_data = (void *)crec;
858 crec->ctdb_ctx = ctx;
859
860 result->key.dsize = key.dsize;
861 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
862 if (result->key.dptr == NULL) {
863 DEBUG(0, ("talloc failed\n"));
864 TALLOC_FREE(result);
865 return NULL;
866 }
867
868 /*
869 * Do a blocking lock on the record
870 */
871again:
872
873 if (DEBUGLEVEL >= 10) {
874 char *keystr = hex_encode(result, key.dptr, key.dsize);
875 DEBUG(10, (DEBUGLEVEL > 10
876 ? "Locking db %u key %s\n"
877 : "Locking db %u key %.20s\n",
878 (int)crec->ctdb_ctx->db_id, keystr));
879 TALLOC_FREE(keystr);
880 }
881
882 if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
883 DEBUG(3, ("tdb_chainlock failed\n"));
884 TALLOC_FREE(result);
885 return NULL;
886 }
887
888 result->store = db_ctdb_store;
889 result->delete_rec = db_ctdb_delete;
890 talloc_set_destructor(result, db_ctdb_record_destr);
891
892 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
893
894 /*
895 * See if we have a valid record and we are the dmaster. If so, we can
896 * take the shortcut and just return it.
897 */
898
899 if ((ctdb_data.dptr == NULL) ||
900 (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
901 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
902#if 0
903 || (random() % 2 != 0)
904#endif
905) {
906 SAFE_FREE(ctdb_data.dptr);
907 tdb_chainunlock(ctx->wtdb->tdb, key);
908 talloc_set_destructor(result, NULL);
909
910 migrate_attempts += 1;
911
912 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
913 ctdb_data.dptr, ctdb_data.dptr ?
914 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
915 get_my_vnn()));
916
917 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
918 if (!NT_STATUS_IS_OK(status)) {
919 DEBUG(5, ("ctdb_migrate failed: %s\n",
920 nt_errstr(status)));
921 TALLOC_FREE(result);
922 return NULL;
923 }
924 /* now its migrated, try again */
925 goto again;
926 }
927
928 if (migrate_attempts > 10) {
929 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
930 migrate_attempts));
931 }
932
933 memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
934
935 result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
936 result->value.dptr = NULL;
937
938 if ((result->value.dsize != 0)
939 && !(result->value.dptr = (uint8 *)talloc_memdup(
940 result, ctdb_data.dptr + sizeof(crec->header),
941 result->value.dsize))) {
942 DEBUG(0, ("talloc failed\n"));
943 TALLOC_FREE(result);
944 }
945
946 SAFE_FREE(ctdb_data.dptr);
947
948 return result;
949}
950
951static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
952 TALLOC_CTX *mem_ctx,
953 TDB_DATA key)
954{
955 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
956 struct db_ctdb_ctx);
957
958 if (ctx->transaction != NULL) {
959 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
960 }
961
962 if (db->persistent) {
963 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
964 }
965
966 return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
967}
968
969/*
970 fetch (unlocked, no migration) operation on ctdb
971 */
972static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
973 TDB_DATA key, TDB_DATA *data)
974{
975 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
976 struct db_ctdb_ctx);
977 NTSTATUS status;
978 TDB_DATA ctdb_data;
979
980 if (ctx->transaction) {
981 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
982 }
983
984 /* try a direct fetch */
985 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
986
987 /*
988 * See if we have a valid record and we are the dmaster. If so, we can
989 * take the shortcut and just return it.
990 * we bypass the dmaster check for persistent databases
991 */
992 if ((ctdb_data.dptr != NULL) &&
993 (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
994 (db->persistent ||
995 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
996 /* we are the dmaster - avoid the ctdb protocol op */
997
998 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
999 if (data->dsize == 0) {
1000 SAFE_FREE(ctdb_data.dptr);
1001 data->dptr = NULL;
1002 return 0;
1003 }
1004
1005 data->dptr = (uint8 *)talloc_memdup(
1006 mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1007 data->dsize);
1008
1009 SAFE_FREE(ctdb_data.dptr);
1010
1011 if (data->dptr == NULL) {
1012 return -1;
1013 }
1014 return 0;
1015 }
1016
1017 SAFE_FREE(ctdb_data.dptr);
1018
1019 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1020 status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1021 if (!NT_STATUS_IS_OK(status)) {
1022 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1023 return -1;
1024 }
1025
1026 return 0;
1027}
1028
/*
 * Context handed to the traverse callbacks in this file.
 */
struct traverse_state {
	/* database being traversed */
	struct db_context *db;
	/* caller's per-record function */
	int (*fn)(struct db_record *rec, void *private_data);
	/* opaque pointer passed through to fn */
	void *private_data;
};
1034
1035static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1036{
1037 struct traverse_state *state = (struct traverse_state *)private_data;
1038 struct db_record *rec;
1039 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1040 /* we have to give them a locked record to prevent races */
1041 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1042 if (rec && rec->value.dsize > 0) {
1043 state->fn(rec, state->private_data);
1044 }
1045 talloc_free(tmp_ctx);
1046}
1047
1048static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1049 void *private_data)
1050{
1051 struct traverse_state *state = (struct traverse_state *)private_data;
1052 struct db_record *rec;
1053 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1054 int ret = 0;
1055 /* we have to give them a locked record to prevent races */
1056 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1057 if (rec && rec->value.dsize > 0) {
1058 ret = state->fn(rec, state->private_data);
1059 }
1060 talloc_free(tmp_ctx);
1061 return ret;
1062}
1063
1064static int db_ctdb_traverse(struct db_context *db,
1065 int (*fn)(struct db_record *rec,
1066 void *private_data),
1067 void *private_data)
1068{
1069 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1070 struct db_ctdb_ctx);
1071 struct traverse_state state;
1072
1073 state.db = db;
1074 state.fn = fn;
1075 state.private_data = private_data;
1076
1077 if (db->persistent) {
1078 /* for persistent databases we don't need to do a ctdb traverse,
1079 we can do a faster local traverse */
1080 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1081 }
1082
1083
1084 ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1085 return 0;
1086}
1087
1088static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1089{
1090 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1091}
1092
1093static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1094{
1095 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1096}
1097
1098static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1099{
1100 struct traverse_state *state = (struct traverse_state *)private_data;
1101 struct db_record rec;
1102 rec.key = key;
1103 rec.value = data;
1104 rec.store = db_ctdb_store_deny;
1105 rec.delete_rec = db_ctdb_delete_deny;
1106 rec.private_data = state->db;
1107 state->fn(&rec, state->private_data);
1108}
1109
1110static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1111 void *private_data)
1112{
1113 struct traverse_state *state = (struct traverse_state *)private_data;
1114 struct db_record rec;
1115 rec.key = kbuf;
1116 rec.value = dbuf;
1117 rec.store = db_ctdb_store_deny;
1118 rec.delete_rec = db_ctdb_delete_deny;
1119 rec.private_data = state->db;
1120
1121 if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1122 /* a deleted record */
1123 return 0;
1124 }
1125 rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1126 rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1127
1128 return state->fn(&rec, state->private_data);
1129}
1130
1131static int db_ctdb_traverse_read(struct db_context *db,
1132 int (*fn)(struct db_record *rec,
1133 void *private_data),
1134 void *private_data)
1135{
1136 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1137 struct db_ctdb_ctx);
1138 struct traverse_state state;
1139
1140 state.db = db;
1141 state.fn = fn;
1142 state.private_data = private_data;
1143
1144 if (db->persistent) {
1145 /* for persistent databases we don't need to do a ctdb traverse,
1146 we can do a faster local traverse */
1147 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1148 }
1149
1150 ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1151 return 0;
1152}
1153
1154static int db_ctdb_get_seqnum(struct db_context *db)
1155{
1156 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1157 struct db_ctdb_ctx);
1158 return tdb_get_seqnum(ctx->wtdb->tdb);
1159}
1160
1161struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1162 const char *name,
1163 int hash_size, int tdb_flags,
1164 int open_flags, mode_t mode)
1165{
1166 struct db_context *result;
1167 struct db_ctdb_ctx *db_ctdb;
1168 char *db_path;
1169
1170 if (!lp_clustering()) {
1171 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1172 return NULL;
1173 }
1174
1175 if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1176 DEBUG(0, ("talloc failed\n"));
1177 TALLOC_FREE(result);
1178 return NULL;
1179 }
1180
1181 if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1182 DEBUG(0, ("talloc failed\n"));
1183 TALLOC_FREE(result);
1184 return NULL;
1185 }
1186
1187 db_ctdb->transaction = NULL;
1188 db_ctdb->db = result;
1189
1190 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1191 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1192 TALLOC_FREE(result);
1193 return NULL;
1194 }
1195
1196 db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1197
1198 result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1199
1200 /* only pass through specific flags */
1201 tdb_flags &= TDB_SEQNUM;
1202
1203 /* honor permissions if user has specified O_CREAT */
1204 if (open_flags & O_CREAT) {
1205 chmod(db_path, mode);
1206 }
1207
1208 db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1209 if (db_ctdb->wtdb == NULL) {
1210 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1211 TALLOC_FREE(result);
1212 return NULL;
1213 }
1214 talloc_free(db_path);
1215
1216 result->private_data = (void *)db_ctdb;
1217 result->fetch_locked = db_ctdb_fetch_locked;
1218 result->fetch = db_ctdb_fetch;
1219 result->traverse = db_ctdb_traverse;
1220 result->traverse_read = db_ctdb_traverse_read;
1221 result->get_seqnum = db_ctdb_get_seqnum;
1222 result->transaction_start = db_ctdb_transaction_start;
1223 result->transaction_commit = db_ctdb_transaction_commit;
1224 result->transaction_cancel = db_ctdb_transaction_cancel;
1225
1226 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1227 name, db_ctdb->db_id));
1228
1229 return result;
1230}
1231#endif
Note: See TracBrowser for help on using the repository browser.