source: vendor/current/ctdb/server/ctdb_traverse.c

Last change on this file was 988, checked in by Silvan Scherrer, 9 years ago

Samba Server: update vendor to version 4.4.3

File size: 20.0 KB
Line 
1/*
2 efficient async ctdb traverse
3
4 Copyright (C) Andrew Tridgell 2007
5
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
18*/
19
20#include "replace.h"
21#include "system/filesys.h"
22#include "system/network.h"
23#include "system/wait.h"
24#include "system/time.h"
25
26#include <talloc.h>
27#include <tevent.h>
28
29#include "lib/tdb_wrap/tdb_wrap.h"
30#include "lib/util/dlinklist.h"
31#include "lib/util/debug.h"
32#include "lib/util/samba_util.h"
33#include "lib/util/util_process.h"
34
35#include "ctdb_private.h"
36#include "ctdb_client.h"
37
38#include "common/reqid.h"
39#include "common/system.h"
40#include "common/common.h"
41#include "common/logging.h"
42
43typedef void (*ctdb_traverse_fn_t)(void *private_data, TDB_DATA key, TDB_DATA data);
44
45/*
46 handle returned to caller - freeing this handler will kill the child and
47 terminate the traverse
48 */
49struct ctdb_traverse_local_handle {
50 struct ctdb_traverse_local_handle *next, *prev;
51 struct ctdb_db_context *ctdb_db;
52 int fd[2];
53 pid_t child;
54 uint64_t srvid;
55 uint32_t client_reqid;
56 uint32_t reqid;
57 int srcnode;
58 void *private_data;
59 ctdb_traverse_fn_t callback;
60 bool withemptyrecords;
61 struct tevent_fd *fde;
62 int records_failed;
63 int records_sent;
64};
65
66/*
67 * called when traverse is completed by child or on error
68 */
69static void ctdb_traverse_child_handler(struct tevent_context *ev, struct tevent_fd *fde,
70 uint16_t flags, void *private_data)
71{
72 struct ctdb_traverse_local_handle *h = talloc_get_type(private_data,
73 struct ctdb_traverse_local_handle);
74 ctdb_traverse_fn_t callback = h->callback;
75 void *p = h->private_data;
76 int res;
77 ssize_t n;
78
79 /* Read the number of records sent by traverse child */
80 n = sys_read(h->fd[0], &res, sizeof(res));
81 if (n < 0 || n != sizeof(res)) {
82 /* Traverse child failed */
83 DEBUG(DEBUG_ERR, ("Local traverse failed db:%s reqid:%d\n",
84 h->ctdb_db->db_name, h->reqid));
85 } else if (res < 0) {
86 /* Traverse failed */
87 res = -res;
88 DEBUG(DEBUG_ERR, ("Local traverse failed db:%s reqid:%d records:%d\n",
89 h->ctdb_db->db_name, h->reqid, res));
90 } else {
91 DEBUG(DEBUG_INFO, ("Local traverse end db:%s reqid:%d records:%d\n",
92 h->ctdb_db->db_name, h->reqid, res));
93 }
94
95 callback(p, tdb_null, tdb_null);
96}
97
98/*
99 destroy a in-flight traverse operation
100 */
101static int traverse_local_destructor(struct ctdb_traverse_local_handle *h)
102{
103 DLIST_REMOVE(h->ctdb_db->traverse, h);
104 ctdb_kill(h->ctdb_db->ctdb, h->child, SIGKILL);
105 return 0;
106}
107
108/*
109 callback from tdb_traverse_read()
110 */
111static int ctdb_traverse_local_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
112{
113 struct ctdb_traverse_local_handle *h = talloc_get_type(p,
114 struct ctdb_traverse_local_handle);
115 struct ctdb_rec_data_old *d;
116 struct ctdb_ltdb_header *hdr;
117 int res, status;
118 TDB_DATA outdata;
119
120 hdr = (struct ctdb_ltdb_header *)data.dptr;
121
122 if (h->ctdb_db->persistent == 0) {
123 /* filter out zero-length records */
124 if (!h->withemptyrecords &&
125 data.dsize <= sizeof(struct ctdb_ltdb_header))
126 {
127 return 0;
128 }
129
130 /* filter out non-authoritative records */
131 if (hdr->dmaster != h->ctdb_db->ctdb->pnn) {
132 return 0;
133 }
134 }
135
136 d = ctdb_marshall_record(h, h->reqid, key, NULL, data);
137 if (d == NULL) {
138 /* error handling is tricky in this child code .... */
139 h->records_failed++;
140 return -1;
141 }
142
143 outdata.dptr = (uint8_t *)d;
144 outdata.dsize = d->length;
145
146 res = ctdb_control(h->ctdb_db->ctdb, h->srcnode, 0, CTDB_CONTROL_TRAVERSE_DATA,
147 CTDB_CTRL_FLAG_NOREPLY, outdata, NULL, NULL, &status, NULL, NULL);
148 if (res != 0 || status != 0) {
149 h->records_failed++;
150 return -1;
151 }
152
153 h->records_sent++;
154 return 0;
155}
156
157struct traverse_all_state {
158 struct ctdb_context *ctdb;
159 struct ctdb_traverse_local_handle *h;
160 uint32_t reqid;
161 uint32_t srcnode;
162 uint32_t client_reqid;
163 uint64_t srvid;
164 bool withemptyrecords;
165};
166
167/*
168 setup a non-blocking traverse of a local ltdb. The callback function
169 will be called on every record in the local ltdb. To stop the
170 traverse, talloc_free() the traverse_handle.
171
172 The traverse is finished when the callback is called with tdb_null for key and data
173 */
174static struct ctdb_traverse_local_handle *ctdb_traverse_local(struct ctdb_db_context *ctdb_db,
175 ctdb_traverse_fn_t callback,
176 struct traverse_all_state *all_state)
177{
178 struct ctdb_traverse_local_handle *h;
179 int ret;
180
181 h = talloc_zero(all_state, struct ctdb_traverse_local_handle);
182 if (h == NULL) {
183 return NULL;
184 }
185
186 ret = pipe(h->fd);
187
188 if (ret != 0) {
189 talloc_free(h);
190 return NULL;
191 }
192
193 h->child = ctdb_fork(ctdb_db->ctdb);
194
195 if (h->child == (pid_t)-1) {
196 close(h->fd[0]);
197 close(h->fd[1]);
198 talloc_free(h);
199 return NULL;
200 }
201
202 h->callback = callback;
203 h->private_data = all_state;
204 h->ctdb_db = ctdb_db;
205 h->client_reqid = all_state->client_reqid;
206 h->reqid = all_state->reqid;
207 h->srvid = all_state->srvid;
208 h->srcnode = all_state->srcnode;
209 h->withemptyrecords = all_state->withemptyrecords;
210
211 if (h->child == 0) {
212 /* start the traverse in the child */
213 int res, status;
214 pid_t parent = getpid();
215 struct ctdb_context *ctdb = ctdb_db->ctdb;
216 struct ctdb_rec_data_old *d;
217 TDB_DATA outdata;
218
219 close(h->fd[0]);
220
221 prctl_set_comment("ctdb_traverse");
222 if (switch_from_server_to_client(ctdb, "traverse_local-%s:",
223 ctdb_db->db_name) != 0) {
224 DEBUG(DEBUG_CRIT, ("Failed to switch traverse child into client mode\n"));
225 _exit(0);
226 }
227
228 d = ctdb_marshall_record(h, h->reqid, tdb_null, NULL, tdb_null);
229 if (d == NULL) {
230 res = 0;
231 sys_write(h->fd[1], &res, sizeof(int));
232 _exit(0);
233 }
234
235 res = tdb_traverse_read(ctdb_db->ltdb->tdb, ctdb_traverse_local_fn, h);
236 if (res == -1 || h->records_failed > 0) {
237 /* traverse failed */
238 res = -(h->records_sent);
239 } else {
240 res = h->records_sent;
241 }
242
243 /* Wait till all the data is flushed from output queue */
244 while (ctdb_queue_length(ctdb->daemon.queue) > 0) {
245 tevent_loop_once(ctdb->ev);
246 }
247
248 /* End traverse by sending empty record */
249 outdata.dptr = (uint8_t *)d;
250 outdata.dsize = d->length;
251 ret = ctdb_control(ctdb, h->srcnode, 0,
252 CTDB_CONTROL_TRAVERSE_DATA,
253 CTDB_CTRL_FLAG_NOREPLY, outdata,
254 NULL, NULL, &status, NULL, NULL);
255 if (ret == -1 || status == -1) {
256 if (res > 0) {
257 res = -res;
258 }
259 }
260
261 sys_write(h->fd[1], &res, sizeof(res));
262
263 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
264 sleep(5);
265 }
266 _exit(0);
267 }
268
269 close(h->fd[1]);
270 set_close_on_exec(h->fd[0]);
271
272 talloc_set_destructor(h, traverse_local_destructor);
273
274 DLIST_ADD(ctdb_db->traverse, h);
275
276 h->fde = tevent_add_fd(ctdb_db->ctdb->ev, h, h->fd[0], TEVENT_FD_READ,
277 ctdb_traverse_child_handler, h);
278 if (h->fde == NULL) {
279 close(h->fd[0]);
280 talloc_free(h);
281 return NULL;
282 }
283 tevent_fd_set_auto_close(h->fde);
284
285 return h;
286}
287
288
289struct ctdb_traverse_all_handle {
290 struct ctdb_context *ctdb;
291 struct ctdb_db_context *ctdb_db;
292 uint32_t reqid;
293 ctdb_traverse_fn_t callback;
294 void *private_data;
295 uint32_t null_count;
296 bool timedout;
297};
298
299/*
300 destroy a traverse_all op
301 */
302static int ctdb_traverse_all_destructor(struct ctdb_traverse_all_handle *state)
303{
304 reqid_remove(state->ctdb->idr, state->reqid);
305 return 0;
306}
307
308/* called when a traverse times out */
309static void ctdb_traverse_all_timeout(struct tevent_context *ev,
310 struct tevent_timer *te,
311 struct timeval t, void *private_data)
312{
313 struct ctdb_traverse_all_handle *state = talloc_get_type(private_data, struct ctdb_traverse_all_handle);
314
315 DEBUG(DEBUG_ERR,(__location__ " Traverse all timeout on database:%s\n", state->ctdb_db->db_name));
316 CTDB_INCREMENT_STAT(state->ctdb, timeouts.traverse);
317
318 state->timedout = true;
319 state->callback(state->private_data, tdb_null, tdb_null);
320}
321
322
323struct traverse_start_state {
324 struct ctdb_context *ctdb;
325 struct ctdb_traverse_all_handle *h;
326 uint32_t srcnode;
327 uint32_t reqid;
328 uint32_t db_id;
329 uint64_t srvid;
330 bool withemptyrecords;
331 int num_records;
332};
333
334
335/*
336 setup a cluster-wide non-blocking traverse of a ctdb. The
337 callback function will be called on every record in the local
338 ltdb. To stop the traverse, talloc_free() the traverse_handle.
339
340 The traverse is finished when the callback is called with tdb_null
341 for key and data
342 */
343static struct ctdb_traverse_all_handle *ctdb_daemon_traverse_all(struct ctdb_db_context *ctdb_db,
344 ctdb_traverse_fn_t callback,
345 struct traverse_start_state *start_state)
346{
347 struct ctdb_traverse_all_handle *state;
348 struct ctdb_context *ctdb = ctdb_db->ctdb;
349 int ret;
350 TDB_DATA data;
351 struct ctdb_traverse_all r;
352 struct ctdb_traverse_all_ext r_ext;
353 uint32_t destination;
354
355 state = talloc(start_state, struct ctdb_traverse_all_handle);
356 if (state == NULL) {
357 return NULL;
358 }
359
360 state->ctdb = ctdb;
361 state->ctdb_db = ctdb_db;
362 state->reqid = reqid_new(ctdb_db->ctdb->idr, state);
363 state->callback = callback;
364 state->private_data = start_state;
365 state->null_count = 0;
366 state->timedout = false;
367
368 talloc_set_destructor(state, ctdb_traverse_all_destructor);
369
370 if (start_state->withemptyrecords) {
371 r_ext.db_id = ctdb_db->db_id;
372 r_ext.reqid = state->reqid;
373 r_ext.pnn = ctdb->pnn;
374 r_ext.client_reqid = start_state->reqid;
375 r_ext.srvid = start_state->srvid;
376 r_ext.withemptyrecords = start_state->withemptyrecords;
377
378 data.dptr = (uint8_t *)&r_ext;
379 data.dsize = sizeof(r_ext);
380 } else {
381 r.db_id = ctdb_db->db_id;
382 r.reqid = state->reqid;
383 r.pnn = ctdb->pnn;
384 r.client_reqid = start_state->reqid;
385 r.srvid = start_state->srvid;
386
387 data.dptr = (uint8_t *)&r;
388 data.dsize = sizeof(r);
389 }
390
391 if (ctdb_db->persistent == 0) {
392 /* normal database, traverse all nodes */
393 destination = CTDB_BROADCAST_VNNMAP;
394 } else {
395 int i;
396 /* persistent database, traverse one node, preferably
397 * the local one
398 */
399 destination = ctdb->pnn;
400 /* check we are in the vnnmap */
401 for (i=0; i < ctdb->vnn_map->size; i++) {
402 if (ctdb->vnn_map->map[i] == ctdb->pnn) {
403 break;
404 }
405 }
406 /* if we are not in the vnn map we just pick the first
407 * node instead
408 */
409 if (i == ctdb->vnn_map->size) {
410 destination = ctdb->vnn_map->map[0];
411 }
412 }
413
414 /* tell all the nodes in the cluster to start sending records to this
415 * node, or if it is a persistent database, just tell the local
416 * node
417 */
418
419 if (start_state->withemptyrecords) {
420 ret = ctdb_daemon_send_control(ctdb, destination, 0,
421 CTDB_CONTROL_TRAVERSE_ALL_EXT,
422 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
423 } else {
424 ret = ctdb_daemon_send_control(ctdb, destination, 0,
425 CTDB_CONTROL_TRAVERSE_ALL,
426 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
427 }
428
429 if (ret != 0) {
430 talloc_free(state);
431 return NULL;
432 }
433
434 DEBUG(DEBUG_NOTICE,("Starting traverse on DB %s (id %d)\n",
435 ctdb_db->db_name, state->reqid));
436
437 /* timeout the traverse */
438 tevent_add_timer(ctdb->ev, state,
439 timeval_current_ofs(ctdb->tunable.traverse_timeout, 0),
440 ctdb_traverse_all_timeout, state);
441
442 return state;
443}
444
445/*
446 called when local traverse ends
447 */
448static void traverse_all_callback(void *p, TDB_DATA key, TDB_DATA data)
449{
450 struct traverse_all_state *state = talloc_get_type(p, struct traverse_all_state);
451
452 /* we're done */
453 talloc_free(state);
454}
455
456/*
457 * extended version to take the "withemptyrecords" parameter"
458 */
459int32_t ctdb_control_traverse_all_ext(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
460{
461 struct ctdb_traverse_all_ext *c = (struct ctdb_traverse_all_ext *)data.dptr;
462 struct traverse_all_state *state;
463 struct ctdb_db_context *ctdb_db;
464
465 if (data.dsize != sizeof(struct ctdb_traverse_all_ext)) {
466 DEBUG(DEBUG_ERR,(__location__ " Invalid size in ctdb_control_traverse_all_ext\n"));
467 return -1;
468 }
469
470 ctdb_db = find_ctdb_db(ctdb, c->db_id);
471 if (ctdb_db == NULL) {
472 return -1;
473 }
474
475 if (ctdb_db->unhealthy_reason) {
476 if (ctdb->tunable.allow_unhealthy_db_read == 0) {
477 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_traverse_all: %s\n",
478 ctdb_db->db_name, ctdb_db->unhealthy_reason));
479 return -1;
480 }
481 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in ctdb_control_traverse_all: %s\n",
482 ctdb_db->db_name, ctdb_db->unhealthy_reason));
483 }
484
485 state = talloc(ctdb_db, struct traverse_all_state);
486 if (state == NULL) {
487 return -1;
488 }
489
490 state->reqid = c->reqid;
491 state->srcnode = c->pnn;
492 state->ctdb = ctdb;
493 state->client_reqid = c->client_reqid;
494 state->srvid = c->srvid;
495 state->withemptyrecords = c->withemptyrecords;
496
497 state->h = ctdb_traverse_local(ctdb_db, traverse_all_callback, state);
498 if (state->h == NULL) {
499 talloc_free(state);
500 return -1;
501 }
502
503 return 0;
504}
505
506/*
507 called when a CTDB_CONTROL_TRAVERSE_ALL control comes in. We then
508 setup a traverse of our local ltdb, sending the records as
509 CTDB_CONTROL_TRAVERSE_DATA records back to the originator
510 */
511int32_t ctdb_control_traverse_all(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
512{
513 struct ctdb_traverse_all *c = (struct ctdb_traverse_all *)data.dptr;
514 struct traverse_all_state *state;
515 struct ctdb_db_context *ctdb_db;
516
517 if (data.dsize != sizeof(struct ctdb_traverse_all)) {
518 DEBUG(DEBUG_ERR,(__location__ " Invalid size in ctdb_control_traverse_all\n"));
519 return -1;
520 }
521
522 ctdb_db = find_ctdb_db(ctdb, c->db_id);
523 if (ctdb_db == NULL) {
524 return -1;
525 }
526
527 if (ctdb_db->unhealthy_reason) {
528 if (ctdb->tunable.allow_unhealthy_db_read == 0) {
529 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_traverse_all: %s\n",
530 ctdb_db->db_name, ctdb_db->unhealthy_reason));
531 return -1;
532 }
533 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in ctdb_control_traverse_all: %s\n",
534 ctdb_db->db_name, ctdb_db->unhealthy_reason));
535 }
536
537 state = talloc(ctdb_db, struct traverse_all_state);
538 if (state == NULL) {
539 return -1;
540 }
541
542 state->reqid = c->reqid;
543 state->srcnode = c->pnn;
544 state->ctdb = ctdb;
545 state->client_reqid = c->client_reqid;
546 state->srvid = c->srvid;
547 state->withemptyrecords = false;
548
549 state->h = ctdb_traverse_local(ctdb_db, traverse_all_callback, state);
550 if (state->h == NULL) {
551 talloc_free(state);
552 return -1;
553 }
554
555 return 0;
556}
557
558
559/*
560 called when a CTDB_CONTROL_TRAVERSE_DATA control comes in. We then
561 call the traverse_all callback with the record
562 */
563int32_t ctdb_control_traverse_data(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
564{
565 struct ctdb_rec_data_old *d = (struct ctdb_rec_data_old *)data.dptr;
566 struct ctdb_traverse_all_handle *state;
567 TDB_DATA key;
568 ctdb_traverse_fn_t callback;
569 void *private_data;
570
571 if (data.dsize < sizeof(uint32_t) || data.dsize != d->length) {
572 DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_data\n"));
573 return -1;
574 }
575
576 state = reqid_find(ctdb->idr, d->reqid, struct ctdb_traverse_all_handle);
577 if (state == NULL || d->reqid != state->reqid) {
578 /* traverse might have been terminated already */
579 return -1;
580 }
581
582 key.dsize = d->keylen;
583 key.dptr = &d->data[0];
584 data.dsize = d->datalen;
585 data.dptr = &d->data[d->keylen];
586
587 if (key.dsize == 0 && data.dsize == 0) {
588 state->null_count++;
589 /* Persistent databases are only scanned on one node (the local
590 * node)
591 */
592 if (state->ctdb_db->persistent == 0) {
593 if (state->null_count != ctdb_get_num_active_nodes(ctdb)) {
594 return 0;
595 }
596 }
597 }
598
599 callback = state->callback;
600 private_data = state->private_data;
601
602 callback(private_data, key, data);
603 return 0;
604}
605
606/*
607 kill a in-progress traverse, used when a client disconnects
608 */
609int32_t ctdb_control_traverse_kill(struct ctdb_context *ctdb, TDB_DATA data,
610 TDB_DATA *outdata, uint32_t srcnode)
611{
612 struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
613 struct ctdb_db_context *ctdb_db;
614 struct ctdb_traverse_local_handle *t;
615
616 ctdb_db = find_ctdb_db(ctdb, d->db_id);
617 if (ctdb_db == NULL) {
618 return -1;
619 }
620
621 for (t=ctdb_db->traverse; t; t=t->next) {
622 if (t->client_reqid == d->reqid &&
623 t->srvid == d->srvid) {
624 talloc_free(t);
625 break;
626 }
627 }
628
629 return 0;
630}
631
632
633/*
634 this is called when a client disconnects during a traverse
635 we need to notify all the nodes taking part in the search that they
636 should kill their traverse children
637 */
638static int ctdb_traverse_start_destructor(struct traverse_start_state *state)
639{
640 struct ctdb_traverse_start r;
641 TDB_DATA data;
642
643 DEBUG(DEBUG_ERR,(__location__ " Traverse cancelled by client disconnect for database:0x%08x\n", state->db_id));
644 r.db_id = state->db_id;
645 r.reqid = state->reqid;
646 r.srvid = state->srvid;
647
648 data.dptr = (uint8_t *)&r;
649 data.dsize = sizeof(r);
650
651 ctdb_daemon_send_control(state->ctdb, CTDB_BROADCAST_CONNECTED, 0,
652 CTDB_CONTROL_TRAVERSE_KILL,
653 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
654 return 0;
655}
656
657/*
658 callback which sends records as messages to the client
659 */
660static void traverse_start_callback(void *p, TDB_DATA key, TDB_DATA data)
661{
662 struct traverse_start_state *state;
663 struct ctdb_rec_data_old *d;
664 TDB_DATA cdata;
665
666 state = talloc_get_type(p, struct traverse_start_state);
667
668 d = ctdb_marshall_record(state, state->reqid, key, NULL, data);
669 if (d == NULL) {
670 return;
671 }
672
673 cdata.dptr = (uint8_t *)d;
674 cdata.dsize = d->length;
675
676 srvid_dispatch(state->ctdb->srv, state->srvid, 0, cdata);
677 if (key.dsize == 0 && data.dsize == 0) {
678 DEBUG(DEBUG_NOTICE, ("Ending traverse on DB %s (id %d), records %d\n",
679 state->h->ctdb_db->db_name, state->h->reqid,
680 state->num_records));
681
682 if (state->h->timedout) {
683 /* timed out, send TRAVERSE_KILL control */
684 talloc_free(state);
685 } else {
686 /* end of traverse */
687 talloc_set_destructor(state, NULL);
688 talloc_free(state);
689 }
690 } else {
691 state->num_records++;
692 }
693}
694
695
696/**
697 * start a traverse_all - called as a control from a client.
698 * extended version to take the "withemptyrecords" parameter.
699 */
700int32_t ctdb_control_traverse_start_ext(struct ctdb_context *ctdb,
701 TDB_DATA data,
702 TDB_DATA *outdata,
703 uint32_t srcnode,
704 uint32_t client_id)
705{
706 struct ctdb_traverse_start_ext *d = (struct ctdb_traverse_start_ext *)data.dptr;
707 struct traverse_start_state *state;
708 struct ctdb_db_context *ctdb_db;
709 struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
710
711 if (client == NULL) {
712 DEBUG(DEBUG_ERR,(__location__ " No client found\n"));
713 return -1;
714 }
715
716 if (data.dsize != sizeof(*d)) {
717 DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_start\n"));
718 return -1;
719 }
720
721 ctdb_db = find_ctdb_db(ctdb, d->db_id);
722 if (ctdb_db == NULL) {
723 return -1;
724 }
725
726 if (ctdb_db->unhealthy_reason) {
727 if (ctdb->tunable.allow_unhealthy_db_read == 0) {
728 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_traverse_start: %s\n",
729 ctdb_db->db_name, ctdb_db->unhealthy_reason));
730 return -1;
731 }
732 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in ctdb_control_traverse_start: %s\n",
733 ctdb_db->db_name, ctdb_db->unhealthy_reason));
734 }
735
736 state = talloc(client, struct traverse_start_state);
737 if (state == NULL) {
738 return -1;
739 }
740
741 state->srcnode = srcnode;
742 state->reqid = d->reqid;
743 state->srvid = d->srvid;
744 state->db_id = d->db_id;
745 state->ctdb = ctdb;
746 state->withemptyrecords = d->withemptyrecords;
747 state->num_records = 0;
748
749 state->h = ctdb_daemon_traverse_all(ctdb_db, traverse_start_callback, state);
750 if (state->h == NULL) {
751 talloc_free(state);
752 return -1;
753 }
754
755 talloc_set_destructor(state, ctdb_traverse_start_destructor);
756
757 return 0;
758}
759
760/**
761 * start a traverse_all - called as a control from a client.
762 */
763int32_t ctdb_control_traverse_start(struct ctdb_context *ctdb,
764 TDB_DATA data,
765 TDB_DATA *outdata,
766 uint32_t srcnode,
767 uint32_t client_id)
768{
769 struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
770 struct ctdb_traverse_start_ext d2;
771 TDB_DATA data2;
772
773 ZERO_STRUCT(d2);
774 d2.db_id = d->db_id;
775 d2.reqid = d->reqid;
776 d2.srvid = d->srvid;
777 d2.withemptyrecords = false;
778
779 data2.dsize = sizeof(d2);
780 data2.dptr = (uint8_t *)&d2;
781
782 return ctdb_control_traverse_start_ext(ctdb, data2, outdata, srcnode, client_id);
783}
Note: See TracBrowser for help on using the repository browser.