source: vendor/current/ctdb/client/client_db.c

Last change on this file was 988, checked in by Silvan Scherrer, 9 years ago

Samba Server: update vendor to version 4.4.3

File size: 49.0 KB
Line 
1/*
2 CTDB client code
3
4 Copyright (C) Amitay Isaacs 2015
5
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
18*/
19
20#include "replace.h"
21#include "system/network.h"
22#include "system/filesys.h"
23
24#include <talloc.h>
25#include <tevent.h>
26#include <tdb.h>
27
28#include "common/logging.h"
29
30#include "lib/tdb_wrap/tdb_wrap.h"
31#include "lib/util/tevent_unix.h"
32#include "lib/util/dlinklist.h"
33#include "lib/util/debug.h"
34
35#include "protocol/protocol.h"
36#include "protocol/protocol_api.h"
37#include "client/client_private.h"
38#include "client/client.h"
39
40static struct ctdb_db_context *client_db_handle(
41 struct ctdb_client_context *client,
42 const char *db_name)
43{
44 struct ctdb_db_context *db;
45
46 for (db = client->db; db != NULL; db = db->next) {
47 if (strcmp(db_name, db->db_name) == 0) {
48 return db;
49 }
50 }
51
52 return NULL;
53}
54
/*
 * State for the asynchronous "set database flags" computation.
 */
struct ctdb_set_db_flags_state {
	/* Event context, client and timeout reused for follow-up controls */
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct timeval timeout;
	/* Database the flags are applied to */
	uint32_t db_id;
	/* Requested CTDB_DB_FLAGS_* bits */
	uint8_t db_flags;
	/* Set once the readonly control completed or was not requested */
	bool readonly_done;
	/* Set once the sticky control completed or was not requested */
	bool sticky_done;
	/* Connected nodes, as returned by list_of_connected_nodes() */
	uint32_t *pnn_list;
	int count;
};

static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq);
static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq);
static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq);
69
70static struct tevent_req *ctdb_set_db_flags_send(
71 TALLOC_CTX *mem_ctx,
72 struct tevent_context *ev,
73 struct ctdb_client_context *client,
74 uint32_t destnode, struct timeval timeout,
75 uint32_t db_id, uint8_t db_flags)
76{
77 struct tevent_req *req, *subreq;
78 struct ctdb_set_db_flags_state *state;
79 struct ctdb_req_control request;
80
81 req = tevent_req_create(mem_ctx, &state,
82 struct ctdb_set_db_flags_state);
83 if (req == NULL) {
84 return NULL;
85 }
86
87 if (! (db_flags & (CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY))) {
88 tevent_req_done(req);
89 return tevent_req_post(req, ev);
90 }
91
92 state->ev = ev;
93 state->client = client;
94 state->timeout = timeout;
95 state->db_id = db_id;
96 state->db_flags = db_flags;
97
98 ctdb_req_control_get_nodemap(&request);
99 subreq = ctdb_client_control_send(state, ev, client, destnode, timeout,
100 &request);
101 if (tevent_req_nomem(subreq, req)) {
102 return tevent_req_post(req, ev);
103 }
104 tevent_req_set_callback(subreq, ctdb_set_db_flags_nodemap_done, req);
105
106 return req;
107}
108
109static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq)
110{
111 struct tevent_req *req = tevent_req_callback_data(
112 subreq, struct tevent_req);
113 struct ctdb_set_db_flags_state *state = tevent_req_data(
114 req, struct ctdb_set_db_flags_state);
115 struct ctdb_req_control request;
116 struct ctdb_reply_control *reply;
117 struct ctdb_node_map *nodemap;
118 int ret;
119 bool status;
120
121 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
122 TALLOC_FREE(subreq);
123 if (! status) {
124 tevent_req_error(req, ret);
125 return;
126 }
127
128 ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
129 talloc_free(reply);
130 if (ret != 0) {
131 tevent_req_error(req, ret);
132 return;
133 }
134
135 state->count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
136 state, &state->pnn_list);
137 talloc_free(nodemap);
138 if (state->count <= 0) {
139 tevent_req_error(req, ENOMEM);
140 return;
141 }
142
143 if (state->db_flags & CTDB_DB_FLAGS_READONLY) {
144 ctdb_req_control_set_db_readonly(&request, state->db_id);
145 subreq = ctdb_client_control_multi_send(
146 state, state->ev, state->client,
147 state->pnn_list, state->count,
148 state->timeout, &request);
149 if (tevent_req_nomem(subreq, req)) {
150 return;
151 }
152 tevent_req_set_callback(subreq,
153 ctdb_set_db_flags_readonly_done, req);
154 } else {
155 state->readonly_done = true;
156 }
157
158 if (state->db_flags & CTDB_DB_FLAGS_STICKY) {
159 ctdb_req_control_set_db_sticky(&request, state->db_id);
160 subreq = ctdb_client_control_multi_send(
161 state, state->ev, state->client,
162 state->pnn_list, state->count,
163 state->timeout, &request);
164 if (tevent_req_nomem(subreq, req)) {
165 return;
166 }
167 tevent_req_set_callback(subreq, ctdb_set_db_flags_sticky_done,
168 req);
169 } else {
170 state->sticky_done = true;
171 }
172}
173
174static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq)
175{
176 struct tevent_req *req = tevent_req_callback_data(
177 subreq, struct tevent_req);
178 struct ctdb_set_db_flags_state *state = tevent_req_data(
179 req, struct ctdb_set_db_flags_state);
180 int ret;
181 bool status;
182
183 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
184 NULL);
185 TALLOC_FREE(subreq);
186 if (! status) {
187 tevent_req_error(req, ret);
188 return;
189 }
190
191 state->readonly_done = true;
192
193 if (state->readonly_done && state->sticky_done) {
194 tevent_req_done(req);
195 }
196}
197
198static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq)
199{
200 struct tevent_req *req = tevent_req_callback_data(
201 subreq, struct tevent_req);
202 struct ctdb_set_db_flags_state *state = tevent_req_data(
203 req, struct ctdb_set_db_flags_state);
204 int ret;
205 bool status;
206
207 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
208 NULL);
209 TALLOC_FREE(subreq);
210 if (! status) {
211 tevent_req_error(req, ret);
212 return;
213 }
214
215 state->sticky_done = true;
216
217 if (state->readonly_done && state->sticky_done) {
218 tevent_req_done(req);
219 }
220}
221
222static bool ctdb_set_db_flags_recv(struct tevent_req *req, int *perr)
223{
224 int err;
225
226 if (tevent_req_is_unix_error(req, &err)) {
227 if (perr != NULL) {
228 *perr = err;
229 }
230 return false;
231 }
232 return true;
233}
234
/*
 * State for the asynchronous database attach computation.
 */
struct ctdb_attach_state {
	/* Event context, client and timeout reused for every control */
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct timeval timeout;
	/* Node all controls are sent to (the client's own pnn) */
	uint32_t destnode;
	/* CTDB_DB_FLAGS_* requested by the caller */
	uint8_t db_flags;
	/* TDB flags computed in ctdb_attach_mutex_done() */
	uint32_t tdb_flags;
	/* Database context being set up, or the one already attached */
	struct ctdb_db_context *db;
};

static void ctdb_attach_mutex_done(struct tevent_req *subreq);
static void ctdb_attach_dbid_done(struct tevent_req *subreq);
static void ctdb_attach_dbpath_done(struct tevent_req *subreq);
static void ctdb_attach_health_done(struct tevent_req *subreq);
static void ctdb_attach_flags_done(struct tevent_req *subreq);
250
251struct tevent_req *ctdb_attach_send(TALLOC_CTX *mem_ctx,
252 struct tevent_context *ev,
253 struct ctdb_client_context *client,
254 struct timeval timeout,
255 const char *db_name, uint8_t db_flags)
256{
257 struct tevent_req *req, *subreq;
258 struct ctdb_attach_state *state;
259 struct ctdb_req_control request;
260
261 req = tevent_req_create(mem_ctx, &state, struct ctdb_attach_state);
262 if (req == NULL) {
263 return NULL;
264 }
265
266 state->db = client_db_handle(client, db_name);
267 if (state->db != NULL) {
268 tevent_req_done(req);
269 return tevent_req_post(req, ev);
270 }
271
272 state->ev = ev;
273 state->client = client;
274 state->timeout = timeout;
275 state->destnode = ctdb_client_pnn(client);
276 state->db_flags = db_flags;
277
278 state->db = talloc_zero(client, struct ctdb_db_context);
279 if (tevent_req_nomem(state->db, req)) {
280 return tevent_req_post(req, ev);
281 }
282
283 state->db->db_name = talloc_strdup(state->db, db_name);
284 if (tevent_req_nomem(state->db, req)) {
285 return tevent_req_post(req, ev);
286 }
287
288 if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
289 state->db->persistent = true;
290 }
291
292 ctdb_req_control_get_tunable(&request, "TDBMutexEnabled");
293 subreq = ctdb_client_control_send(state, ev, client,
294 ctdb_client_pnn(client), timeout,
295 &request);
296 if (tevent_req_nomem(subreq, req)) {
297 return tevent_req_post(req, ev);
298 }
299 tevent_req_set_callback(subreq, ctdb_attach_mutex_done, req);
300
301 return req;
302}
303
304static void ctdb_attach_mutex_done(struct tevent_req *subreq)
305{
306 struct tevent_req *req = tevent_req_callback_data(
307 subreq, struct tevent_req);
308 struct ctdb_attach_state *state = tevent_req_data(
309 req, struct ctdb_attach_state);
310 struct ctdb_reply_control *reply;
311 struct ctdb_req_control request;
312 uint32_t mutex_enabled;
313 int ret;
314 bool status;
315
316 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
317 TALLOC_FREE(subreq);
318 if (! status) {
319 tevent_req_error(req, ret);
320 return;
321 }
322
323 ret = ctdb_reply_control_get_tunable(reply, &mutex_enabled);
324 if (ret != 0) {
325 /* Treat error as mutex support not available */
326 mutex_enabled = 0;
327 }
328
329 state->tdb_flags = TDB_DEFAULT;
330 if (! state->db->persistent) {
331 state->tdb_flags |= (TDB_INCOMPATIBLE_HASH |
332 TDB_CLEAR_IF_FIRST);
333 }
334 if (mutex_enabled == 1) {
335 state->tdb_flags |= TDB_MUTEX_LOCKING;
336 }
337
338 if (state->db->persistent) {
339 ctdb_req_control_db_attach_persistent(&request,
340 state->db->db_name,
341 state->tdb_flags);
342 } else {
343 ctdb_req_control_db_attach(&request, state->db->db_name,
344 state->tdb_flags);
345 }
346
347 subreq = ctdb_client_control_send(state, state->ev, state->client,
348 state->destnode, state->timeout,
349 &request);
350 if (tevent_req_nomem(subreq, req)) {
351 return;
352 }
353 tevent_req_set_callback(subreq, ctdb_attach_dbid_done, req);
354}
355
356static void ctdb_attach_dbid_done(struct tevent_req *subreq)
357{
358 struct tevent_req *req = tevent_req_callback_data(
359 subreq, struct tevent_req);
360 struct ctdb_attach_state *state = tevent_req_data(
361 req, struct ctdb_attach_state);
362 struct ctdb_req_control request;
363 struct ctdb_reply_control *reply;
364 bool status;
365 int ret;
366
367 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
368 TALLOC_FREE(subreq);
369 if (! status) {
370 tevent_req_error(req, ret);
371 return;
372 }
373
374 if (state->db->persistent) {
375 ret = ctdb_reply_control_db_attach_persistent(
376 reply, &state->db->db_id);
377 } else {
378 ret = ctdb_reply_control_db_attach(reply, &state->db->db_id);
379 }
380 talloc_free(reply);
381 if (ret != 0) {
382 tevent_req_error(req, ret);
383 return;
384 }
385
386 ctdb_req_control_getdbpath(&request, state->db->db_id);
387 subreq = ctdb_client_control_send(state, state->ev, state->client,
388 state->destnode, state->timeout,
389 &request);
390 if (tevent_req_nomem(subreq, req)) {
391 return;
392 }
393 tevent_req_set_callback(subreq, ctdb_attach_dbpath_done, req);
394}
395
396static void ctdb_attach_dbpath_done(struct tevent_req *subreq)
397{
398 struct tevent_req *req = tevent_req_callback_data(
399 subreq, struct tevent_req);
400 struct ctdb_attach_state *state = tevent_req_data(
401 req, struct ctdb_attach_state);
402 struct ctdb_reply_control *reply;
403 struct ctdb_req_control request;
404 bool status;
405 int ret;
406
407 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
408 TALLOC_FREE(subreq);
409 if (! status) {
410 tevent_req_error(req, ret);
411 return;
412 }
413
414 ret = ctdb_reply_control_getdbpath(reply, state->db,
415 &state->db->db_path);
416 talloc_free(reply);
417 if (ret != 0) {
418 tevent_req_error(req, ret);
419 return;
420 }
421
422 ctdb_req_control_db_get_health(&request, state->db->db_id);
423 subreq = ctdb_client_control_send(state, state->ev, state->client,
424 state->destnode, state->timeout,
425 &request);
426 if (tevent_req_nomem(subreq, req)) {
427 return;
428 }
429 tevent_req_set_callback(subreq, ctdb_attach_health_done, req);
430}
431
432static void ctdb_attach_health_done(struct tevent_req *subreq)
433{
434 struct tevent_req *req = tevent_req_callback_data(
435 subreq, struct tevent_req);
436 struct ctdb_attach_state *state = tevent_req_data(
437 req, struct ctdb_attach_state);
438 struct ctdb_reply_control *reply;
439 const char *reason;
440 bool status;
441 int ret;
442
443 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
444 TALLOC_FREE(subreq);
445 if (! status) {
446 tevent_req_error(req, ret);
447 return;
448 }
449
450 ret = ctdb_reply_control_db_get_health(reply, state, &reason);
451 if (ret != 0) {
452 tevent_req_error(req, ret);
453 return;
454 }
455
456 if (reason != NULL) {
457 /* Database unhealthy, avoid attach */
458 /* FIXME: Log here */
459 tevent_req_error(req, EIO);
460 return;
461 }
462
463 subreq = ctdb_set_db_flags_send(state, state->ev, state->client,
464 state->destnode, state->timeout,
465 state->db->db_id, state->db_flags);
466 if (tevent_req_nomem(subreq, req)) {
467 return;
468 }
469 tevent_req_set_callback(subreq, ctdb_attach_flags_done, req);
470}
471
472static void ctdb_attach_flags_done(struct tevent_req *subreq)
473{
474 struct tevent_req *req = tevent_req_callback_data(
475 subreq, struct tevent_req);
476 struct ctdb_attach_state *state = tevent_req_data(
477 req, struct ctdb_attach_state);
478 bool status;
479 int ret;
480
481 status = ctdb_set_db_flags_recv(subreq, &ret);
482 TALLOC_FREE(subreq);
483 if (! status) {
484 tevent_req_error(req, ret);
485 return;
486 }
487
488 state->db->ltdb = tdb_wrap_open(state->db, state->db->db_path, 0,
489 state->tdb_flags, O_RDWR, 0);
490 if (tevent_req_nomem(state->db->ltdb, req)) {
491 return;
492 }
493 DLIST_ADD(state->client->db, state->db);
494
495 tevent_req_done(req);
496}
497
498bool ctdb_attach_recv(struct tevent_req *req, int *perr,
499 struct ctdb_db_context **out)
500{
501 struct ctdb_attach_state *state = tevent_req_data(
502 req, struct ctdb_attach_state);
503 int err;
504
505 if (tevent_req_is_unix_error(req, &err)) {
506 if (perr != NULL) {
507 *perr = err;
508 }
509 return false;
510 }
511
512 if (out != NULL) {
513 *out = state->db;
514 }
515 return true;
516}
517
518int ctdb_attach(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
519 struct ctdb_client_context *client,
520 struct timeval timeout,
521 const char *db_name, uint8_t db_flags,
522 struct ctdb_db_context **out)
523{
524 struct tevent_req *req;
525 bool status;
526 int ret;
527
528 req = ctdb_attach_send(mem_ctx, ev, client, timeout,
529 db_name, db_flags);
530 if (req == NULL) {
531 return ENOMEM;
532 }
533
534 tevent_req_poll(req, ev);
535
536 status = ctdb_attach_recv(req, &ret, out);
537 if (! status) {
538 return ret;
539 }
540
541 /*
542 ctdb_set_call(db, CTDB_NULL_FUNC, ctdb_null_func);
543 ctdb_set_call(db, CTDB_FETCH_FUNC, ctdb_fetch_func);
544 ctdb_set_call(db, CTDB_FETCH_WITH_HEADER_FUNC, ctdb_fetch_with_header_func);
545 */
546
547 return 0;
548}
549
550int ctdb_detach(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
551 struct ctdb_client_context *client,
552 struct timeval timeout, uint32_t db_id)
553{
554 struct ctdb_db_context *db;
555 int ret;
556
557 ret = ctdb_ctrl_db_detach(mem_ctx, ev, client, client->pnn, timeout,
558 db_id);
559 if (ret != 0) {
560 return ret;
561 }
562
563 for (db = client->db; db != NULL; db = db->next) {
564 if (db->db_id == db_id) {
565 DLIST_REMOVE(client->db, db);
566 break;
567 }
568 }
569
570 return 0;
571}
572
573uint32_t ctdb_db_id(struct ctdb_db_context *db)
574{
575 return db->db_id;
576}
577
578struct ctdb_db_traverse_state {
579 ctdb_rec_parser_func_t parser;
580 void *private_data;
581 bool extract_header;
582 int error;
583};
584
585static int ctdb_db_traverse_handler(struct tdb_context *tdb, TDB_DATA key,
586 TDB_DATA data, void *private_data)
587{
588 struct ctdb_db_traverse_state *state =
589 (struct ctdb_db_traverse_state *)private_data;
590 int ret;
591
592 if (state->extract_header) {
593 struct ctdb_ltdb_header header;
594
595 ret = ctdb_ltdb_header_extract(&data, &header);
596 if (ret != 0) {
597 state->error = ret;
598 return 1;
599 }
600
601 ret = state->parser(0, &header, key, data, state->private_data);
602 } else {
603 ret = state->parser(0, NULL, key, data, state->private_data);
604 }
605
606 if (ret != 0) {
607 state->error = ret;
608 return 1;
609 }
610
611 return 0;
612}
613
614int ctdb_db_traverse(struct ctdb_db_context *db, bool readonly,
615 bool extract_header,
616 ctdb_rec_parser_func_t parser, void *private_data)
617{
618 struct ctdb_db_traverse_state state;
619 int ret;
620
621 state.parser = parser;
622 state.private_data = private_data;
623 state.extract_header = extract_header;
624 state.error = 0;
625
626 if (readonly) {
627 ret = tdb_traverse_read(db->ltdb->tdb,
628 ctdb_db_traverse_handler, &state);
629 } else {
630 ret = tdb_traverse(db->ltdb->tdb,
631 ctdb_db_traverse_handler, &state);
632 }
633
634 if (ret == -1) {
635 return EIO;
636 }
637
638 return state.error;
639}
640
641static int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key,
642 struct ctdb_ltdb_header *header,
643 TALLOC_CTX *mem_ctx, TDB_DATA *data)
644{
645 TDB_DATA rec;
646 int ret;
647
648 rec = tdb_fetch(db->ltdb->tdb, key);
649 if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
650 /* No record present */
651 if (rec.dptr != NULL) {
652 free(rec.dptr);
653 }
654
655 if (tdb_error(db->ltdb->tdb) != TDB_ERR_NOEXIST) {
656 return EIO;
657 }
658
659 header->rsn = 0;
660 header->dmaster = CTDB_UNKNOWN_PNN;
661 header->flags = 0;
662
663 if (data != NULL) {
664 *data = tdb_null;
665 }
666 return 0;
667 }
668
669 ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header);
670 if (ret != 0) {
671 return ret;
672 }
673
674 ret = 0;
675 if (data != NULL) {
676 size_t offset = ctdb_ltdb_header_len(header);
677
678 data->dsize = rec.dsize - offset;
679 data->dptr = talloc_memdup(mem_ctx, rec.dptr + offset,
680 data->dsize);
681 if (data->dptr == NULL) {
682 ret = ENOMEM;
683 }
684 }
685
686 free(rec.dptr);
687 return ret;
688}
689
690/*
691 * Fetch a record from volatile database
692 *
693 * Steps:
694 * 1. Get a lock on the hash chain
695 * 2. If the record does not exist, migrate the record
696 * 3. If readonly=true and delegations do not exist, migrate the record.
697 * 4. If readonly=false and delegations exist, migrate the record.
698 * 5. If the local node is not dmaster, migrate the record.
699 * 6. Return record
700 */
701
/*
 * State for the asynchronous fetch-lock computation.
 */
struct ctdb_fetch_lock_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	/* Record handle that is returned to the caller on success */
	struct ctdb_record_handle *h;
	/* Whether the caller asked for readonly access */
	bool readonly;
	/* pnn of this client, compared against the record's dmaster */
	uint32_t pnn;
};

static int ctdb_fetch_lock_check(struct tevent_req *req);
static void ctdb_fetch_lock_migrate(struct tevent_req *req);
static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq);
713
714struct tevent_req *ctdb_fetch_lock_send(TALLOC_CTX *mem_ctx,
715 struct tevent_context *ev,
716 struct ctdb_client_context *client,
717 struct ctdb_db_context *db,
718 TDB_DATA key, bool readonly)
719{
720 struct ctdb_fetch_lock_state *state;
721 struct tevent_req *req;
722 int ret;
723
724 req = tevent_req_create(mem_ctx, &state, struct ctdb_fetch_lock_state);
725 if (req == NULL) {
726 return NULL;
727 }
728
729 state->ev = ev;
730 state->client = client;
731
732 state->h = talloc_zero(db, struct ctdb_record_handle);
733 if (tevent_req_nomem(state->h, req)) {
734 return tevent_req_post(req, ev);
735 }
736 state->h->client = client;
737 state->h->db = db;
738 state->h->key.dptr = talloc_memdup(state->h, key.dptr, key.dsize);
739 if (tevent_req_nomem(state->h->key.dptr, req)) {
740 return tevent_req_post(req, ev);
741 }
742 state->h->key.dsize = key.dsize;
743 state->h->readonly = false;
744
745 state->readonly = readonly;
746 state->pnn = ctdb_client_pnn(client);
747
748 /* Check that database is not persistent */
749 if (db->persistent) {
750 tevent_req_error(req, EINVAL);
751 return tevent_req_post(req, ev);
752 }
753
754 ret = ctdb_fetch_lock_check(req);
755 if (ret == 0) {
756 tevent_req_done(req);
757 return tevent_req_post(req, ev);
758 }
759 if (ret != EAGAIN) {
760 tevent_req_error(req, ret);
761 return tevent_req_post(req, ev);
762 }
763 return req;
764}
765
766static int ctdb_fetch_lock_check(struct tevent_req *req)
767{
768 struct ctdb_fetch_lock_state *state = tevent_req_data(
769 req, struct ctdb_fetch_lock_state);
770 struct ctdb_record_handle *h = state->h;
771 struct ctdb_ltdb_header header;
772 TDB_DATA data = tdb_null;
773 int ret, err = 0;
774 bool do_migrate = false;
775
776 ret = tdb_chainlock(state->h->db->ltdb->tdb, state->h->key);
777 if (ret != 0) {
778 err = EIO;
779 goto failed;
780 }
781
782 data = tdb_fetch(h->db->ltdb->tdb, h->key);
783 if (data.dptr == NULL) {
784 if (tdb_error(h->db->ltdb->tdb) == TDB_ERR_NOEXIST) {
785 goto migrate;
786 } else {
787 err = EIO;
788 goto failed;
789 }
790 }
791
792 /* Got the record */
793 ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header);
794 if (ret != 0) {
795 err = ret;
796 goto failed;
797 }
798
799 if (! state->readonly) {
800 /* Read/write access */
801 if (header.dmaster == state->pnn &&
802 header.flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
803 goto migrate;
804 }
805
806 if (header.dmaster != state->pnn) {
807 goto migrate;
808 }
809 } else {
810 /* Readonly access */
811 if (header.dmaster != state->pnn &&
812 ! (header.flags & (CTDB_REC_RO_HAVE_READONLY |
813 CTDB_REC_RO_HAVE_DELEGATIONS))) {
814 goto migrate;
815 }
816 }
817
818 /* We are the dmaster or readonly delegation */
819 h->header = header;
820 h->data = data;
821 if (header.flags & (CTDB_REC_RO_HAVE_READONLY |
822 CTDB_REC_RO_HAVE_DELEGATIONS)) {
823 h->readonly = true;
824 }
825 return 0;
826
827migrate:
828 do_migrate = true;
829 err = EAGAIN;
830
831failed:
832 if (data.dptr != NULL) {
833 free(data.dptr);
834 }
835 ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
836 if (ret != 0) {
837 DEBUG(DEBUG_ERR, ("tdb_chainunlock failed on %s\n",
838 h->db->db_name));
839 return EIO;
840 }
841
842 if (do_migrate) {
843 ctdb_fetch_lock_migrate(req);
844 }
845 return err;
846}
847
848static void ctdb_fetch_lock_migrate(struct tevent_req *req)
849{
850 struct ctdb_fetch_lock_state *state = tevent_req_data(
851 req, struct ctdb_fetch_lock_state);
852 struct ctdb_req_call request;
853 struct tevent_req *subreq;
854
855 ZERO_STRUCT(request);
856 request.flags = CTDB_IMMEDIATE_MIGRATION;
857 if (state->readonly) {
858 request.flags |= CTDB_WANT_READONLY;
859 }
860 request.db_id = state->h->db->db_id;
861 request.callid = CTDB_NULL_FUNC;
862 request.key = state->h->key;
863
864 subreq = ctdb_client_call_send(state, state->ev, state->client,
865 &request);
866 if (tevent_req_nomem(subreq, req)) {
867 return;
868 }
869
870 tevent_req_set_callback(subreq, ctdb_fetch_lock_migrate_done, req);
871}
872
873static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq)
874{
875 struct tevent_req *req = tevent_req_callback_data(
876 subreq, struct tevent_req);
877 struct ctdb_fetch_lock_state *state = tevent_req_data(
878 req, struct ctdb_fetch_lock_state);
879 struct ctdb_reply_call *reply;
880 int ret;
881 bool status;
882
883 status = ctdb_client_call_recv(subreq, state, &reply, &ret);
884 TALLOC_FREE(subreq);
885 if (! status) {
886 tevent_req_error(req, ret);
887 return;
888 }
889
890 if (reply->status != 0) {
891 tevent_req_error(req, EIO);
892 return;
893 }
894 talloc_free(reply);
895
896 ret = ctdb_fetch_lock_check(req);
897 if (ret != 0) {
898 tevent_req_error(req, ret);
899 return;
900 }
901
902 tevent_req_done(req);
903}
904
905static int ctdb_record_handle_destructor(struct ctdb_record_handle *h)
906{
907 tdb_chainunlock(h->db->ltdb->tdb, h->key);
908 free(h->data.dptr);
909 return 0;
910}
911
912struct ctdb_record_handle *ctdb_fetch_lock_recv(struct tevent_req *req,
913 struct ctdb_ltdb_header *header,
914 TALLOC_CTX *mem_ctx,
915 TDB_DATA *data, int *perr)
916{
917 struct ctdb_fetch_lock_state *state = tevent_req_data(
918 req, struct ctdb_fetch_lock_state);
919 struct ctdb_record_handle *h = state->h;
920 int err;
921
922 if (tevent_req_is_unix_error(req, &err)) {
923 if (perr != NULL) {
924 *perr = err;
925 }
926 return NULL;
927 }
928
929 if (header != NULL) {
930 *header = h->header;
931 }
932 if (data != NULL) {
933 size_t offset;
934
935 offset = ctdb_ltdb_header_len(&h->header);
936
937 data->dsize = h->data.dsize - offset;
938 data->dptr = talloc_memdup(mem_ctx, h->data.dptr + offset,
939 data->dsize);
940 if (data->dptr == NULL) {
941 TALLOC_FREE(state->h);
942 if (perr != NULL) {
943 *perr = ENOMEM;
944 }
945 return NULL;
946 }
947 }
948
949 talloc_set_destructor(h, ctdb_record_handle_destructor);
950 return h;
951}
952
953int ctdb_fetch_lock(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
954 struct ctdb_client_context *client,
955 struct ctdb_db_context *db, TDB_DATA key, bool readonly,
956 struct ctdb_record_handle **out,
957 struct ctdb_ltdb_header *header, TDB_DATA *data)
958{
959 struct tevent_req *req;
960 struct ctdb_record_handle *h;
961 int ret;
962
963 req = ctdb_fetch_lock_send(mem_ctx, ev, client, db, key, readonly);
964 if (req == NULL) {
965 return ENOMEM;
966 }
967
968 tevent_req_poll(req, ev);
969
970 h = ctdb_fetch_lock_recv(req, header, mem_ctx, data, &ret);
971 if (h == NULL) {
972 return ret;
973 }
974
975 *out = h;
976 return 0;
977}
978
979int ctdb_store_record(struct ctdb_record_handle *h, TDB_DATA data)
980{
981 TDB_DATA rec;
982 size_t offset;
983 int ret;
984
985 /* Cannot modify the record if it was obtained as a readonly copy */
986 if (h->readonly) {
987 return EINVAL;
988 }
989
990 /* Check if the new data is same */
991 if (h->data.dsize == data.dsize &&
992 memcmp(h->data.dptr, data.dptr, data.dsize) == 0) {
993 /* No need to do anything */
994 return 0;
995 }
996
997 offset = ctdb_ltdb_header_len(&h->header);
998 rec.dsize = offset + data.dsize;
999 rec.dptr = talloc_size(h, rec.dsize);
1000 if (rec.dptr == NULL) {
1001 return ENOMEM;
1002 }
1003
1004 ctdb_ltdb_header_push(&h->header, rec.dptr);
1005 memcpy(rec.dptr + offset, data.dptr, data.dsize);
1006
1007 ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1008 if (ret != 0) {
1009 DEBUG(DEBUG_ERR, ("Failed to store record in DB %s\n",
1010 h->db->db_name));
1011 return EIO;
1012 }
1013
1014 talloc_free(rec.dptr);
1015 return 0;
1016}
1017
1018int ctdb_delete_record(struct ctdb_record_handle *h)
1019{
1020 TDB_DATA rec;
1021 struct ctdb_key_data key;
1022 int ret;
1023
1024 /* Cannot delete the record if it was obtained as a readonly copy */
1025 if (h->readonly) {
1026 return EINVAL;
1027 }
1028
1029 rec.dsize = ctdb_ltdb_header_len(&h->header);
1030 rec.dptr = talloc_size(h, rec.dsize);
1031 if (rec.dptr == NULL) {
1032 return ENOMEM;
1033 }
1034
1035 ctdb_ltdb_header_push(&h->header, rec.dptr);
1036
1037 ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1038 talloc_free(rec.dptr);
1039 if (ret != 0) {
1040 DEBUG(DEBUG_ERR, ("Failed to delete record in DB %s\n",
1041 h->db->db_name));
1042 return EIO;
1043 }
1044
1045 key.db_id = h->db->db_id;
1046 key.header = h->header;
1047 key.key = h->key;
1048
1049 ret = ctdb_ctrl_schedule_for_deletion(h, h->ev, h->client,
1050 h->client->pnn,
1051 tevent_timeval_zero(), &key);
1052 if (ret != 0) {
1053 DEBUG(DEBUG_WARNING,
1054 ("Failed to mark record to be deleted in DB %s\n",
1055 h->db->db_name));
1056 return ret;
1057 }
1058
1059 return 0;
1060}
1061
1062/*
1063 * Global lock functions
1064 */
1065
1066struct ctdb_g_lock_lock_state {
1067 struct tevent_context *ev;
1068 struct ctdb_client_context *client;
1069 struct ctdb_db_context *db;
1070 TDB_DATA key;
1071 struct ctdb_server_id my_sid;
1072 enum ctdb_g_lock_type lock_type;
1073 struct ctdb_record_handle *h;
1074 /* state for verification of active locks */
1075 struct ctdb_g_lock_list *lock_list;
1076 unsigned int current;
1077};
1078
1079static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq);
1080static void ctdb_g_lock_lock_process_locks(struct tevent_req *req);
1081static void ctdb_g_lock_lock_checked(struct tevent_req *subreq);
1082static int ctdb_g_lock_lock_update(struct tevent_req *req);
1083static void ctdb_g_lock_lock_retry(struct tevent_req *subreq);
1084
1085static bool ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1,
1086 enum ctdb_g_lock_type l2)
1087{
1088 if ((l1 == CTDB_G_LOCK_READ) && (l2 == CTDB_G_LOCK_READ)) {
1089 return false;
1090 }
1091 return true;
1092}
1093
1094struct tevent_req *ctdb_g_lock_lock_send(TALLOC_CTX *mem_ctx,
1095 struct tevent_context *ev,
1096 struct ctdb_client_context *client,
1097 struct ctdb_db_context *db,
1098 const char *keyname,
1099 struct ctdb_server_id *sid,
1100 bool readonly)
1101{
1102 struct tevent_req *req, *subreq;
1103 struct ctdb_g_lock_lock_state *state;
1104
1105 req = tevent_req_create(mem_ctx, &state,
1106 struct ctdb_g_lock_lock_state);
1107 if (req == NULL) {
1108 return NULL;
1109 }
1110
1111 state->ev = ev;
1112 state->client = client;
1113 state->db = db;
1114 state->key.dptr = discard_const(keyname);
1115 state->key.dsize = strlen(keyname) + 1;
1116 state->my_sid = *sid;
1117 state->lock_type = (readonly ? CTDB_G_LOCK_READ : CTDB_G_LOCK_WRITE);
1118
1119 subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1120 false);
1121 if (tevent_req_nomem(subreq, req)) {
1122 return tevent_req_post(req, ev);
1123 }
1124 tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1125
1126 return req;
1127}
1128
1129static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq)
1130{
1131 struct tevent_req *req = tevent_req_callback_data(
1132 subreq, struct tevent_req);
1133 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1134 req, struct ctdb_g_lock_lock_state);
1135 TDB_DATA data;
1136 int ret = 0;
1137
1138 state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1139 TALLOC_FREE(subreq);
1140 if (state->h == NULL) {
1141 tevent_req_error(req, ret);
1142 return;
1143 }
1144
1145 if (state->lock_list != NULL) {
1146 TALLOC_FREE(state->lock_list);
1147 state->current = 0;
1148 }
1149
1150 ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1151 &state->lock_list);
1152 talloc_free(data.dptr);
1153 if (ret != 0) {
1154 tevent_req_error(req, ret);
1155 return;
1156 }
1157
1158 ctdb_g_lock_lock_process_locks(req);
1159}
1160
1161static void ctdb_g_lock_lock_process_locks(struct tevent_req *req)
1162{
1163 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1164 req, struct ctdb_g_lock_lock_state);
1165 struct tevent_req *subreq;
1166 struct ctdb_g_lock *lock;
1167 bool check_server = false;
1168 int ret;
1169
1170 while (state->current < state->lock_list->num) {
1171 lock = &state->lock_list->lock[state->current];
1172
1173 /* We should not ask for the same lock more than once */
1174 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1175 tevent_req_error(req, EDEADLK);
1176 return;
1177 }
1178
1179 if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
1180 check_server = true;
1181 break;
1182 }
1183
1184 state->current += 1;
1185 }
1186
1187 if (check_server) {
1188 struct ctdb_req_control request;
1189 struct ctdb_uint64_array u64_array;
1190
1191 u64_array.num = 1;
1192 u64_array.val = &lock->sid.unique_id;
1193
1194 ctdb_req_control_check_srvids(&request, &u64_array);
1195 subreq = ctdb_client_control_send(state, state->ev,
1196 state->client,
1197 state->client->pnn,
1198 tevent_timeval_zero(),
1199 &request);
1200 if (tevent_req_nomem(subreq, req)) {
1201 return;
1202 }
1203 tevent_req_set_callback(subreq, ctdb_g_lock_lock_checked, req);
1204 return;
1205 }
1206
1207 /* There is no conflict, add ourself to the lock_list */
1208 state->lock_list->lock = talloc_realloc(state->lock_list,
1209 state->lock_list->lock,
1210 struct ctdb_g_lock,
1211 state->lock_list->num + 1);
1212 if (state->lock_list->lock == NULL) {
1213 tevent_req_error(req, ENOMEM);
1214 return;
1215 }
1216
1217 lock = &state->lock_list->lock[state->lock_list->num];
1218 lock->type = state->lock_type;
1219 lock->sid = state->my_sid;
1220 state->lock_list->num += 1;
1221
1222 ret = ctdb_g_lock_lock_update(req);
1223 if (ret != 0) {
1224 tevent_req_error(req, ret);
1225 return;
1226 }
1227
1228 tevent_req_done(req);
1229}
1230
1231static void ctdb_g_lock_lock_checked(struct tevent_req *subreq)
1232{
1233 struct tevent_req *req = tevent_req_callback_data(
1234 subreq, struct tevent_req);
1235 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1236 req, struct ctdb_g_lock_lock_state);
1237 struct ctdb_reply_control *reply;
1238 struct ctdb_uint8_array *u8_array;
1239 int ret;
1240 bool status;
1241 int8_t val;
1242
1243 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1244 TALLOC_FREE(subreq);
1245 if (! status) {
1246 tevent_req_error(req, ret);
1247 return;
1248 }
1249
1250 ret = ctdb_reply_control_check_srvids(reply, state, &u8_array);
1251 if (ret != 0) {
1252 tevent_req_error(req, ENOMEM);
1253 return;
1254 }
1255
1256 if (u8_array->num != 1) {
1257 talloc_free(u8_array);
1258 tevent_req_error(req, EIO);
1259 return;
1260 }
1261
1262 val = u8_array->val[0];
1263 talloc_free(u8_array);
1264
1265 if (val == 1) {
1266 /* server process exists, need to retry */
1267 subreq = tevent_wakeup_send(state, state->ev,
1268 tevent_timeval_current_ofs(1,0));
1269 if (tevent_req_nomem(subreq, req)) {
1270 return;
1271 }
1272 tevent_req_set_callback(subreq, ctdb_g_lock_lock_retry, req);
1273 return;
1274 }
1275
1276 /* server process does not exist, remove conflicting entry */
1277 state->lock_list->lock[state->current] =
1278 state->lock_list->lock[state->lock_list->num-1];
1279 state->lock_list->num -= 1;
1280
1281 ret = ctdb_g_lock_lock_update(req);
1282 if (ret != 0) {
1283 tevent_req_error(req, ret);
1284 return;
1285 }
1286
1287 ctdb_g_lock_lock_process_locks(req);
1288}
1289
1290static int ctdb_g_lock_lock_update(struct tevent_req *req)
1291{
1292 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1293 req, struct ctdb_g_lock_lock_state);
1294 TDB_DATA data;
1295 int ret;
1296
1297 data.dsize = ctdb_g_lock_list_len(state->lock_list);
1298 data.dptr = talloc_size(state, data.dsize);
1299 if (data.dptr == NULL) {
1300 return ENOMEM;
1301 }
1302
1303 ctdb_g_lock_list_push(state->lock_list, data.dptr);
1304 ret = ctdb_store_record(state->h, data);
1305 talloc_free(data.dptr);
1306 return ret;
1307}
1308
#if 0
/*
 * Compiled out: single-pass variant that walks the whole lock list, checks
 * conflicting holders with ctdb_server_id_exists(), drops entries whose
 * server no longer exists, appends our own entry when nothing conflicted
 * and stores the list if it changed.  Returns EAGAIN when a conflict was
 * seen, EDEADLK when our own server id is already in the list.
 */
static int ctdb_g_lock_lock_update(struct ctdb_g_lock_lock_state *state,
				   struct ctdb_g_lock_list *lock_list,
				   struct ctdb_record_handle *h)
{
	bool saw_conflict = false;
	bool dirty = false;
	int ret, idx;

	for (idx = 0; idx < lock_list->num; idx++) {
		struct ctdb_g_lock *entry = &lock_list->lock[idx];
		bool alive;

		/* We should not ask for lock more than once */
		if (ctdb_server_id_equal(&entry->sid, &state->my_sid)) {
			return EDEADLK;
		}

		if (! ctdb_g_lock_conflicts(entry->type, state->lock_type)) {
			continue;
		}

		saw_conflict = true;
		ret = ctdb_server_id_exists(state->client, &entry->sid,
					    &alive);
		if (ret != 0) {
			return ret;
		}

		if (alive) {
			break;
		}

		/* Server does not exist, delete conflicting entry */
		*entry = lock_list->lock[lock_list->num-1];
		lock_list->num -= 1;
		dirty = true;
	}

	if (! saw_conflict) {
		struct ctdb_g_lock *grown;

		grown = talloc_realloc(lock_list, lock_list->lock,
				       struct ctdb_g_lock, lock_list->num+1);
		if (grown == NULL) {
			return ENOMEM;
		}

		grown[lock_list->num].type = state->lock_type;
		grown[lock_list->num].sid = state->my_sid;
		lock_list->lock = grown;
		lock_list->num += 1;
		dirty = true;
	}

	if (dirty) {
		TDB_DATA rec;

		rec.dsize = ctdb_g_lock_list_len(lock_list);
		rec.dptr = talloc_size(state, rec.dsize);
		if (rec.dptr == NULL) {
			return ENOMEM;
		}

		ctdb_g_lock_list_push(lock_list, rec.dptr);
		ret = ctdb_store_record(h, rec);
		talloc_free(rec.dptr);
		if (ret != 0) {
			return ret;
		}
	}

	return saw_conflict ? EAGAIN : 0;
}
#endif
1385
1386static void ctdb_g_lock_lock_retry(struct tevent_req *subreq)
1387{
1388 struct tevent_req *req = tevent_req_callback_data(
1389 subreq, struct tevent_req);
1390 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1391 req, struct ctdb_g_lock_lock_state);
1392 bool success;
1393
1394 success = tevent_wakeup_recv(subreq);
1395 TALLOC_FREE(subreq);
1396 if (! success) {
1397 tevent_req_error(req, ENOMEM);
1398 return;
1399 }
1400
1401 subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
1402 state->db, state->key, false);
1403 if (tevent_req_nomem(subreq, req)) {
1404 return;
1405 }
1406 tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1407}
1408
1409bool ctdb_g_lock_lock_recv(struct tevent_req *req, int *perr)
1410{
1411 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1412 req, struct ctdb_g_lock_lock_state);
1413 int err;
1414
1415 TALLOC_FREE(state->h);
1416
1417 if (tevent_req_is_unix_error(req, &err)) {
1418 if (perr != NULL) {
1419 *perr = err;
1420 }
1421 return false;
1422 }
1423
1424 return true;
1425}
1426
1427struct ctdb_g_lock_unlock_state {
1428 struct tevent_context *ev;
1429 struct ctdb_client_context *client;
1430 struct ctdb_db_context *db;
1431 TDB_DATA key;
1432 struct ctdb_server_id my_sid;
1433 struct ctdb_record_handle *h;
1434 struct ctdb_g_lock_list *lock_list;
1435};
1436
1437static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq);
1438static int ctdb_g_lock_unlock_update(struct tevent_req *req);
1439
1440struct tevent_req *ctdb_g_lock_unlock_send(TALLOC_CTX *mem_ctx,
1441 struct tevent_context *ev,
1442 struct ctdb_client_context *client,
1443 struct ctdb_db_context *db,
1444 const char *keyname,
1445 struct ctdb_server_id sid)
1446{
1447 struct tevent_req *req, *subreq;
1448 struct ctdb_g_lock_unlock_state *state;
1449
1450 req = tevent_req_create(mem_ctx, &state,
1451 struct ctdb_g_lock_unlock_state);
1452 if (req == NULL) {
1453 return NULL;
1454 }
1455
1456 state->ev = ev;
1457 state->client = client;
1458 state->db = db;
1459 state->key.dptr = discard_const(keyname);
1460 state->key.dsize = strlen(keyname) + 1;
1461 state->my_sid = sid;
1462
1463 subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1464 false);
1465 if (tevent_req_nomem(subreq, req)) {
1466 return tevent_req_post(req, ev);
1467 }
1468 tevent_req_set_callback(subreq, ctdb_g_lock_unlock_fetched, req);
1469
1470 return req;
1471}
1472
1473static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq)
1474{
1475 struct tevent_req *req = tevent_req_callback_data(
1476 subreq, struct tevent_req);
1477 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1478 req, struct ctdb_g_lock_unlock_state);
1479 TDB_DATA data;
1480 int ret = 0;
1481
1482 state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1483 TALLOC_FREE(subreq);
1484 if (state->h == NULL) {
1485 tevent_req_error(req, ret);
1486 return;
1487 }
1488
1489 ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1490 &state->lock_list);
1491 if (ret != 0) {
1492 tevent_req_error(req, ret);
1493 return;
1494 }
1495
1496 ret = ctdb_g_lock_unlock_update(req);
1497 if (ret != 0) {
1498 tevent_req_error(req, ret);
1499 return;
1500 }
1501
1502 tevent_req_done(req);
1503}
1504
1505static int ctdb_g_lock_unlock_update(struct tevent_req *req)
1506{
1507 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1508 req, struct ctdb_g_lock_unlock_state);
1509 struct ctdb_g_lock *lock;
1510 int ret, i;
1511
1512 for (i=0; i<state->lock_list->num; i++) {
1513 lock = &state->lock_list->lock[i];
1514
1515 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1516 break;
1517 }
1518 }
1519
1520 if (i < state->lock_list->num) {
1521 state->lock_list->lock[i] =
1522 state->lock_list->lock[state->lock_list->num-1];
1523 state->lock_list->num -= 1;
1524 }
1525
1526 if (state->lock_list->num == 0) {
1527 ctdb_delete_record(state->h);
1528 } else {
1529 TDB_DATA data;
1530
1531 data.dsize = ctdb_g_lock_list_len(state->lock_list);
1532 data.dptr = talloc_size(state, data.dsize);
1533 if (data.dptr == NULL) {
1534 return ENOMEM;
1535 }
1536
1537 ctdb_g_lock_list_push(state->lock_list, data.dptr);
1538 ret = ctdb_store_record(state->h, data);
1539 talloc_free(data.dptr);
1540 if (ret != 0) {
1541 return ret;
1542 }
1543 }
1544
1545 return 0;
1546}
1547
1548bool ctdb_g_lock_unlock_recv(struct tevent_req *req, int *perr)
1549{
1550 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1551 req, struct ctdb_g_lock_unlock_state);
1552 int err;
1553
1554 TALLOC_FREE(state->h);
1555
1556 if (tevent_req_is_unix_error(req, &err)) {
1557 if (perr != NULL) {
1558 *perr = err;
1559 }
1560 return false;
1561 }
1562
1563 return true;
1564}
1565
/*
 * Persistent database functions
 */

/*
 * State of an asynchronous transaction start request.
 */
struct ctdb_transaction_start_state {
	/* Event context and client connection the request runs on */
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	/* Timeout used for the srvid registration control */
	struct timeval timeout;
	/* Transaction handle being set up; returned by the _recv function */
	struct ctdb_transaction_handle *h;
	/* Node the srvid registration control is sent to */
	uint32_t destnode;
};

/* Callbacks, in the order the start request runs through them */
static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq);
static void ctdb_transaction_register_done(struct tevent_req *subreq);
static void ctdb_transaction_g_lock_done(struct tevent_req *subreq);

static int ctdb_transaction_handle_destructor(struct ctdb_transaction_handle *h);
1581
1582struct tevent_req *ctdb_transaction_start_send(TALLOC_CTX *mem_ctx,
1583 struct tevent_context *ev,
1584 struct ctdb_client_context *client,
1585 struct timeval timeout,
1586 struct ctdb_db_context *db,
1587 bool readonly)
1588{
1589 struct ctdb_transaction_start_state *state;
1590 struct tevent_req *req, *subreq;
1591 struct ctdb_transaction_handle *h;
1592
1593 req = tevent_req_create(mem_ctx, &state,
1594 struct ctdb_transaction_start_state);
1595 if (req == NULL) {
1596 return NULL;
1597 }
1598
1599 if (! db->persistent) {
1600 tevent_req_error(req, EINVAL);
1601 return tevent_req_post(req, ev);
1602 }
1603
1604 state->ev = ev;
1605 state->client = client;
1606 state->destnode = ctdb_client_pnn(client);
1607
1608 h = talloc_zero(db, struct ctdb_transaction_handle);
1609 if (tevent_req_nomem(h, req)) {
1610 return tevent_req_post(req, ev);
1611 }
1612
1613 h->ev = ev;
1614 h->client = client;
1615 h->db = db;
1616 h->readonly = readonly;
1617 h->updated = false;
1618
1619 /* SRVID is unique for databases, so client can have transactions active
1620 * for multiple databases */
1621 h->sid.pid = getpid();
1622 h->sid.task_id = db->db_id;
1623 h->sid.vnn = state->destnode;
1624 h->sid.unique_id = h->sid.task_id;
1625 h->sid.unique_id = (h->sid.unique_id << 32) | h->sid.pid;
1626
1627 h->recbuf = ctdb_rec_buffer_init(h, db->db_id);
1628 if (tevent_req_nomem(h->recbuf, req)) {
1629 return tevent_req_post(req, ev);
1630 }
1631
1632 h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x", db->db_id);
1633 if (tevent_req_nomem(h->lock_name, req)) {
1634 return tevent_req_post(req, ev);
1635 }
1636
1637 state->h = h;
1638
1639 subreq = ctdb_attach_send(state, ev, client, timeout, "g_lock.tdb", 0);
1640 if (tevent_req_nomem(subreq, req)) {
1641 return tevent_req_post(req, ev);
1642 }
1643 tevent_req_set_callback(subreq, ctdb_transaction_g_lock_attached, req);
1644
1645 return req;
1646}
1647
1648static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq)
1649{
1650 struct tevent_req *req = tevent_req_callback_data(
1651 subreq, struct tevent_req);
1652 struct ctdb_transaction_start_state *state = tevent_req_data(
1653 req, struct ctdb_transaction_start_state);
1654 struct ctdb_req_control request;
1655 bool status;
1656 int ret;
1657
1658 status = ctdb_attach_recv(subreq, &ret, &state->h->db_g_lock);
1659 TALLOC_FREE(subreq);
1660 if (! status) {
1661 tevent_req_error(req, ret);
1662 return;
1663 }
1664
1665 ctdb_req_control_register_srvid(&request, state->h->sid.unique_id);
1666 subreq = ctdb_client_control_send(state, state->ev, state->client,
1667 state->destnode, state->timeout,
1668 &request);
1669 if (tevent_req_nomem(subreq, req)) {
1670 return;
1671 }
1672 tevent_req_set_callback(subreq, ctdb_transaction_register_done, req);
1673}
1674
1675static void ctdb_transaction_register_done(struct tevent_req *subreq)
1676{
1677 struct tevent_req *req = tevent_req_callback_data(
1678 subreq, struct tevent_req);
1679 struct ctdb_transaction_start_state *state = tevent_req_data(
1680 req, struct ctdb_transaction_start_state);
1681 struct ctdb_reply_control *reply;
1682 bool status;
1683 int ret;
1684
1685 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1686 TALLOC_FREE(subreq);
1687 if (! status) {
1688 tevent_req_error(req, ret);
1689 return;
1690 }
1691
1692 ret = ctdb_reply_control_register_srvid(reply);
1693 talloc_free(reply);
1694 if (ret != 0) {
1695 tevent_req_error(req, ret);
1696 return;
1697 }
1698
1699 subreq = ctdb_g_lock_lock_send(state, state->ev, state->client,
1700 state->h->db_g_lock, state->h->lock_name,
1701 &state->h->sid, state->h->readonly);
1702 if (tevent_req_nomem(subreq, req)) {
1703 return;
1704 }
1705 tevent_req_set_callback(subreq, ctdb_transaction_g_lock_done, req);
1706}
1707
1708static void ctdb_transaction_g_lock_done(struct tevent_req *subreq)
1709{
1710 struct tevent_req *req = tevent_req_callback_data(
1711 subreq, struct tevent_req);
1712 int ret;
1713 bool status;
1714
1715 status = ctdb_g_lock_lock_recv(subreq, &ret);
1716 TALLOC_FREE(subreq);
1717 if (! status) {
1718 tevent_req_error(req, ret);
1719 return;
1720 }
1721
1722 tevent_req_done(req);
1723}
1724
1725struct ctdb_transaction_handle *ctdb_transaction_start_recv(
1726 struct tevent_req *req,
1727 int *perr)
1728{
1729 struct ctdb_transaction_start_state *state = tevent_req_data(
1730 req, struct ctdb_transaction_start_state);
1731 struct ctdb_transaction_handle *h = state->h;
1732 int err;
1733
1734 if (tevent_req_is_unix_error(req, &err)) {
1735 if (perr != NULL) {
1736 *perr = err;
1737 }
1738 return NULL;
1739 }
1740
1741 talloc_set_destructor(h, ctdb_transaction_handle_destructor);
1742 return h;
1743}
1744
1745static int ctdb_transaction_handle_destructor(struct ctdb_transaction_handle *h)
1746{
1747 int ret;
1748
1749 ret = ctdb_ctrl_deregister_srvid(h, h->ev, h->client, h->client->pnn,
1750 tevent_timeval_zero(),
1751 h->sid.unique_id);
1752 if (ret != 0) {
1753 DEBUG(DEBUG_WARNING, ("Failed to deregister SRVID\n"));
1754 }
1755
1756 return 0;
1757}
1758
1759int ctdb_transaction_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1760 struct ctdb_client_context *client,
1761 struct timeval timeout,
1762 struct ctdb_db_context *db, bool readonly,
1763 struct ctdb_transaction_handle **out)
1764{
1765 struct tevent_req *req;
1766 struct ctdb_transaction_handle *h;
1767 int ret;
1768
1769 req = ctdb_transaction_start_send(mem_ctx, ev, client, timeout, db,
1770 readonly);
1771 if (req == NULL) {
1772 return ENOMEM;
1773 }
1774
1775 tevent_req_poll(req, ev);
1776
1777 h = ctdb_transaction_start_recv(req, &ret);
1778 if (h == NULL) {
1779 return ret;
1780 }
1781
1782 *out = h;
1783 return 0;
1784}
1785
1786struct ctdb_transaction_record_fetch_state {
1787 TDB_DATA key, data;
1788 struct ctdb_ltdb_header header;
1789 bool found;
1790};
1791
1792static int ctdb_transaction_record_fetch_traverse(uint32_t reqid,
1793 struct ctdb_ltdb_header *header,
1794 TDB_DATA key,
1795 TDB_DATA data,
1796 void *private_data)
1797{
1798 struct ctdb_transaction_record_fetch_state *state =
1799 (struct ctdb_transaction_record_fetch_state *)private_data;
1800
1801 if (state->key.dsize == key.dsize &&
1802 memcmp(state->key.dptr, key.dptr, key.dsize) == 0) {
1803 state->data = data;
1804 state->header = *header;
1805 state->found = true;
1806 }
1807
1808 return 0;
1809}
1810
1811static int ctdb_transaction_record_fetch(struct ctdb_transaction_handle *h,
1812 TDB_DATA key,
1813 struct ctdb_ltdb_header *header,
1814 TDB_DATA *data)
1815{
1816 struct ctdb_transaction_record_fetch_state state;
1817 int ret;
1818
1819 state.key = key;
1820 state.found = false;
1821
1822 ret = ctdb_rec_buffer_traverse(h->recbuf,
1823 ctdb_transaction_record_fetch_traverse,
1824 &state);
1825 if (ret != 0) {
1826 return ret;
1827 }
1828
1829 if (state.found) {
1830 if (header != NULL) {
1831 *header = state.header;
1832 }
1833 if (data != NULL) {
1834 *data = state.data;
1835 }
1836 return 0;
1837 }
1838
1839 return ENOENT;
1840}
1841
1842int ctdb_transaction_fetch_record(struct ctdb_transaction_handle *h,
1843 TDB_DATA key,
1844 TALLOC_CTX *mem_ctx, TDB_DATA *data)
1845{
1846 TDB_DATA tmp_data;
1847 struct ctdb_ltdb_header header;
1848 int ret;
1849
1850 ret = ctdb_transaction_record_fetch(h, key, NULL, &tmp_data);
1851 if (ret == 0) {
1852 data->dptr = talloc_memdup(mem_ctx, tmp_data.dptr,
1853 tmp_data.dsize);
1854 if (data->dptr == NULL) {
1855 return ENOMEM;
1856 }
1857 data->dsize = tmp_data.dsize;
1858 return 0;
1859 }
1860
1861 ret = ctdb_ltdb_fetch(h->db, key, &header, mem_ctx, data);
1862 if (ret != 0) {
1863 return ret;
1864 }
1865
1866 ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, *data);
1867 if (ret != 0) {
1868 return ret;
1869 }
1870
1871 return 0;
1872}
1873
1874int ctdb_transaction_store_record(struct ctdb_transaction_handle *h,
1875 TDB_DATA key, TDB_DATA data)
1876{
1877 TALLOC_CTX *tmp_ctx;
1878 struct ctdb_ltdb_header header;
1879 TDB_DATA old_data;
1880 int ret;
1881
1882 if (h->readonly) {
1883 return EINVAL;
1884 }
1885
1886 tmp_ctx = talloc_new(h);
1887 if (tmp_ctx == NULL) {
1888 return ENOMEM;
1889 }
1890
1891 ret = ctdb_transaction_record_fetch(h, key, &header, &old_data);
1892 if (ret != 0) {
1893 ret = ctdb_ltdb_fetch(h->db, key, &header, tmp_ctx, &old_data);
1894 if (ret != 0) {
1895 return ret;
1896 }
1897 }
1898
1899 if (old_data.dsize == data.dsize &&
1900 memcmp(old_data.dptr, data.dptr, data.dsize) == 0) {
1901 talloc_free(tmp_ctx);
1902 return 0;
1903 }
1904
1905 header.dmaster = ctdb_client_pnn(h->client);
1906 header.rsn += 1;
1907
1908 ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, data);
1909 talloc_free(tmp_ctx);
1910 if (ret != 0) {
1911 return ret;
1912 }
1913 h->updated = true;
1914
1915 return 0;
1916}
1917
1918int ctdb_transaction_delete_record(struct ctdb_transaction_handle *h,
1919 TDB_DATA key)
1920{
1921 return ctdb_transaction_store_record(h, key, tdb_null);
1922}
1923
1924static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle *h,
1925 uint64_t seqnum)
1926{
1927 const char *keyname = CTDB_DB_SEQNUM_KEY;
1928 TDB_DATA key, data;
1929
1930 key.dptr = discard_const(keyname);
1931 key.dsize = strlen(keyname) + 1;
1932
1933 data.dptr = (uint8_t *)&seqnum;
1934 data.dsize = sizeof(seqnum);
1935
1936 return ctdb_transaction_store_record(h, key, data);
1937}
1938
/*
 * State of an asynchronous transaction commit request.
 */
struct ctdb_transaction_commit_state {
	/* Event context the request runs on */
	struct tevent_context *ev;
	/* Transaction being committed; freed by the _recv function */
	struct ctdb_transaction_handle *h;
	/* Database sequence number read before the commit was attempted */
	uint64_t seqnum;
};

static void ctdb_transaction_commit_try(struct tevent_req *subreq);
static void ctdb_transaction_commit_done(struct tevent_req *subreq);
1947
1948struct tevent_req *ctdb_transaction_commit_send(
1949 TALLOC_CTX *mem_ctx,
1950 struct tevent_context *ev,
1951 struct ctdb_transaction_handle *h)
1952{
1953 struct tevent_req *req, *subreq;
1954 struct ctdb_transaction_commit_state *state;
1955 int ret;
1956
1957 req = tevent_req_create(mem_ctx, &state,
1958 struct ctdb_transaction_commit_state);
1959 if (req == NULL) {
1960 return NULL;
1961 }
1962
1963 state->ev = ev;
1964 state->h = h;
1965
1966 ret = ctdb_ctrl_get_db_seqnum(state, ev, h->client,
1967 h->client->pnn, tevent_timeval_zero(),
1968 h->db->db_id, &state->seqnum);
1969 if (ret != 0) {
1970 tevent_req_error(req, ret);
1971 return tevent_req_post(req, ev);
1972 }
1973
1974 ret = ctdb_transaction_store_db_seqnum(h, state->seqnum+1);
1975 if (ret != 0) {
1976 tevent_req_error(req, ret);
1977 return tevent_req_post(req, ev);
1978 }
1979
1980 subreq = ctdb_recovery_wait_send(state, ev, h->client);
1981 if (tevent_req_nomem(subreq, req)) {
1982 return tevent_req_post(req, ev);
1983 }
1984 tevent_req_set_callback(subreq, ctdb_transaction_commit_try, req);
1985
1986 return req;
1987}
1988
1989static void ctdb_transaction_commit_try(struct tevent_req *subreq)
1990{
1991 struct tevent_req *req = tevent_req_callback_data(
1992 subreq, struct tevent_req);
1993 struct ctdb_transaction_commit_state *state = tevent_req_data(
1994 req, struct ctdb_transaction_commit_state);
1995 struct ctdb_req_control request;
1996 int ret;
1997 bool status;
1998
1999 status = ctdb_recovery_wait_recv(subreq, &ret);
2000 TALLOC_FREE(subreq);
2001 if (! status) {
2002 tevent_req_error(req, ret);
2003 return;
2004 }
2005
2006 ctdb_req_control_trans3_commit(&request, state->h->recbuf);
2007 subreq = ctdb_client_control_send(state, state->ev, state->h->client,
2008 state->h->client->pnn,
2009 tevent_timeval_zero(), &request);
2010 if (tevent_req_nomem(subreq, req)) {
2011 return;
2012 }
2013 tevent_req_set_callback(subreq, ctdb_transaction_commit_done, req);
2014}
2015
2016static void ctdb_transaction_commit_done(struct tevent_req *subreq)
2017{
2018 struct tevent_req *req = tevent_req_callback_data(
2019 subreq, struct tevent_req);
2020 struct ctdb_transaction_commit_state *state = tevent_req_data(
2021 req, struct ctdb_transaction_commit_state);
2022 struct ctdb_reply_control *reply;
2023 uint64_t seqnum;
2024 int ret;
2025 bool status;
2026
2027 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2028 TALLOC_FREE(subreq);
2029 if (! status) {
2030 tevent_req_error(req, ret);
2031 return;
2032 }
2033
2034 ret = ctdb_reply_control_trans3_commit(reply);
2035 if (ret < 0) {
2036 /* Control failed due to recovery */
2037 subreq = ctdb_recovery_wait_send(state, state->ev,
2038 state->h->client);
2039 if (tevent_req_nomem(subreq, req)) {
2040 return;
2041 }
2042 tevent_req_set_callback(subreq, ctdb_transaction_commit_try,
2043 req);
2044 return;
2045 }
2046
2047 ret = ctdb_ctrl_get_db_seqnum(state, state->ev, state->h->client,
2048 state->h->client->pnn,
2049 tevent_timeval_zero(),
2050 state->h->db->db_id, &seqnum);
2051 if (ret != 0) {
2052 tevent_req_error(req, ret);
2053 return;
2054 }
2055
2056 if (seqnum == state->seqnum) {
2057 subreq = ctdb_recovery_wait_send(state, state->ev,
2058 state->h->client);
2059 if (tevent_req_nomem(subreq, req)) {
2060 return;
2061 }
2062 tevent_req_set_callback(subreq, ctdb_transaction_commit_try,
2063 req);
2064 return;
2065 }
2066
2067 if (seqnum != state->seqnum + 1) {
2068 tevent_req_error(req, EIO);
2069 return;
2070 }
2071
2072 tevent_req_done(req);
2073}
2074
2075bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr)
2076{
2077 struct ctdb_transaction_commit_state *state = tevent_req_data(
2078 req, struct ctdb_transaction_commit_state);
2079 int err;
2080
2081 if (tevent_req_is_unix_error(req, &err)) {
2082 if (perr != NULL) {
2083 *perr = err;
2084 }
2085 TALLOC_FREE(state->h);
2086 return false;
2087 }
2088
2089 TALLOC_FREE(state->h);
2090 return true;
2091}
2092
2093int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
2094{
2095 struct tevent_req *req;
2096 int ret;
2097 bool status;
2098
2099 if (h->readonly || ! h->updated) {
2100 talloc_free(h);
2101 return 0;
2102 }
2103
2104 req = ctdb_transaction_commit_send(h, h->ev, h);
2105 if (req == NULL) {
2106 talloc_free(h);
2107 return ENOMEM;
2108 }
2109
2110 tevent_req_poll(req, h->ev);
2111
2112 status = ctdb_transaction_commit_recv(req, &ret);
2113 if (! status) {
2114 talloc_free(h);
2115 return ret;
2116 }
2117
2118 talloc_free(h);
2119 return 0;
2120}
2121
/*
 * Abandon a transaction by releasing its handle; nothing is committed.
 * Always returns 0.
 */
int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
{
	TALLOC_FREE(h);

	return 0;
}
2127
/*
 * TODO:
 *
 * In the future, Samba should register SERVER_ID.
 * Make that structure the same as struct srvid {}.
 */
Note: See TracBrowser for help on using the repository browser.