1 | /*
|
---|
2 | efficient async ctdb traverse
|
---|
3 |
|
---|
4 | Copyright (C) Andrew Tridgell 2007
|
---|
5 |
|
---|
6 | This program is free software; you can redistribute it and/or modify
|
---|
7 | it under the terms of the GNU General Public License as published by
|
---|
8 | the Free Software Foundation; either version 3 of the License, or
|
---|
9 | (at your option) any later version.
|
---|
10 |
|
---|
11 | This program is distributed in the hope that it will be useful,
|
---|
12 | but WITHOUT ANY WARRANTY; without even the implied warranty of
|
---|
13 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
---|
14 | GNU General Public License for more details.
|
---|
15 |
|
---|
16 | You should have received a copy of the GNU General Public License
|
---|
17 | along with this program; if not, see <http://www.gnu.org/licenses/>.
|
---|
18 | */
|
---|
19 |
|
---|
20 | #include "replace.h"
|
---|
21 | #include "system/filesys.h"
|
---|
22 | #include "system/network.h"
|
---|
23 | #include "system/wait.h"
|
---|
24 | #include "system/time.h"
|
---|
25 |
|
---|
26 | #include <talloc.h>
|
---|
27 | #include <tevent.h>
|
---|
28 |
|
---|
29 | #include "lib/tdb_wrap/tdb_wrap.h"
|
---|
30 | #include "lib/util/dlinklist.h"
|
---|
31 | #include "lib/util/debug.h"
|
---|
32 | #include "lib/util/samba_util.h"
|
---|
33 | #include "lib/util/util_process.h"
|
---|
34 |
|
---|
35 | #include "ctdb_private.h"
|
---|
36 | #include "ctdb_client.h"
|
---|
37 |
|
---|
38 | #include "common/reqid.h"
|
---|
39 | #include "common/system.h"
|
---|
40 | #include "common/common.h"
|
---|
41 | #include "common/logging.h"
|
---|
42 |
|
---|
43 | typedef void (*ctdb_traverse_fn_t)(void *private_data, TDB_DATA key, TDB_DATA data);
|
---|
44 |
|
---|
45 | /*
|
---|
46 | handle returned to caller - freeing this handler will kill the child and
|
---|
47 | terminate the traverse
|
---|
48 | */
|
---|
49 | struct ctdb_traverse_local_handle {
|
---|
50 | struct ctdb_traverse_local_handle *next, *prev;
|
---|
51 | struct ctdb_db_context *ctdb_db;
|
---|
52 | int fd[2];
|
---|
53 | pid_t child;
|
---|
54 | uint64_t srvid;
|
---|
55 | uint32_t client_reqid;
|
---|
56 | uint32_t reqid;
|
---|
57 | int srcnode;
|
---|
58 | void *private_data;
|
---|
59 | ctdb_traverse_fn_t callback;
|
---|
60 | bool withemptyrecords;
|
---|
61 | struct tevent_fd *fde;
|
---|
62 | int records_failed;
|
---|
63 | int records_sent;
|
---|
64 | };
|
---|
65 |
|
---|
66 | /*
|
---|
67 | * called when traverse is completed by child or on error
|
---|
68 | */
|
---|
69 | static void ctdb_traverse_child_handler(struct tevent_context *ev, struct tevent_fd *fde,
|
---|
70 | uint16_t flags, void *private_data)
|
---|
71 | {
|
---|
72 | struct ctdb_traverse_local_handle *h = talloc_get_type(private_data,
|
---|
73 | struct ctdb_traverse_local_handle);
|
---|
74 | ctdb_traverse_fn_t callback = h->callback;
|
---|
75 | void *p = h->private_data;
|
---|
76 | int res;
|
---|
77 | ssize_t n;
|
---|
78 |
|
---|
79 | /* Read the number of records sent by traverse child */
|
---|
80 | n = sys_read(h->fd[0], &res, sizeof(res));
|
---|
81 | if (n < 0 || n != sizeof(res)) {
|
---|
82 | /* Traverse child failed */
|
---|
83 | DEBUG(DEBUG_ERR, ("Local traverse failed db:%s reqid:%d\n",
|
---|
84 | h->ctdb_db->db_name, h->reqid));
|
---|
85 | } else if (res < 0) {
|
---|
86 | /* Traverse failed */
|
---|
87 | res = -res;
|
---|
88 | DEBUG(DEBUG_ERR, ("Local traverse failed db:%s reqid:%d records:%d\n",
|
---|
89 | h->ctdb_db->db_name, h->reqid, res));
|
---|
90 | } else {
|
---|
91 | DEBUG(DEBUG_INFO, ("Local traverse end db:%s reqid:%d records:%d\n",
|
---|
92 | h->ctdb_db->db_name, h->reqid, res));
|
---|
93 | }
|
---|
94 |
|
---|
95 | callback(p, tdb_null, tdb_null);
|
---|
96 | }
|
---|
97 |
|
---|
98 | /*
|
---|
99 | destroy a in-flight traverse operation
|
---|
100 | */
|
---|
101 | static int traverse_local_destructor(struct ctdb_traverse_local_handle *h)
|
---|
102 | {
|
---|
103 | DLIST_REMOVE(h->ctdb_db->traverse, h);
|
---|
104 | ctdb_kill(h->ctdb_db->ctdb, h->child, SIGKILL);
|
---|
105 | return 0;
|
---|
106 | }
|
---|
107 |
|
---|
108 | /*
|
---|
109 | callback from tdb_traverse_read()
|
---|
110 | */
|
---|
111 | static int ctdb_traverse_local_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
|
---|
112 | {
|
---|
113 | struct ctdb_traverse_local_handle *h = talloc_get_type(p,
|
---|
114 | struct ctdb_traverse_local_handle);
|
---|
115 | struct ctdb_rec_data_old *d;
|
---|
116 | struct ctdb_ltdb_header *hdr;
|
---|
117 | int res, status;
|
---|
118 | TDB_DATA outdata;
|
---|
119 |
|
---|
120 | hdr = (struct ctdb_ltdb_header *)data.dptr;
|
---|
121 |
|
---|
122 | if (h->ctdb_db->persistent == 0) {
|
---|
123 | /* filter out zero-length records */
|
---|
124 | if (!h->withemptyrecords &&
|
---|
125 | data.dsize <= sizeof(struct ctdb_ltdb_header))
|
---|
126 | {
|
---|
127 | return 0;
|
---|
128 | }
|
---|
129 |
|
---|
130 | /* filter out non-authoritative records */
|
---|
131 | if (hdr->dmaster != h->ctdb_db->ctdb->pnn) {
|
---|
132 | return 0;
|
---|
133 | }
|
---|
134 | }
|
---|
135 |
|
---|
136 | d = ctdb_marshall_record(h, h->reqid, key, NULL, data);
|
---|
137 | if (d == NULL) {
|
---|
138 | /* error handling is tricky in this child code .... */
|
---|
139 | h->records_failed++;
|
---|
140 | return -1;
|
---|
141 | }
|
---|
142 |
|
---|
143 | outdata.dptr = (uint8_t *)d;
|
---|
144 | outdata.dsize = d->length;
|
---|
145 |
|
---|
146 | res = ctdb_control(h->ctdb_db->ctdb, h->srcnode, 0, CTDB_CONTROL_TRAVERSE_DATA,
|
---|
147 | CTDB_CTRL_FLAG_NOREPLY, outdata, NULL, NULL, &status, NULL, NULL);
|
---|
148 | if (res != 0 || status != 0) {
|
---|
149 | h->records_failed++;
|
---|
150 | return -1;
|
---|
151 | }
|
---|
152 |
|
---|
153 | h->records_sent++;
|
---|
154 | return 0;
|
---|
155 | }
|
---|
156 |
|
---|
/*
 * Per-request state on a node that received TRAVERSE_ALL(_EXT): who
 * asked for the traverse and the local traverse handle serving it.
 */
struct traverse_all_state {
	struct ctdb_context *ctdb;
	struct ctdb_traverse_local_handle *h;	/* local traverse child */
	uint32_t reqid;				/* originator's traverse reqid */
	uint32_t srcnode;			/* node the records are sent to */
	uint32_t client_reqid;			/* reqid of the originating client */
	uint64_t srvid;				/* srvid of the originating client */
	bool withemptyrecords;
};
|
---|
166 |
|
---|
167 | /*
|
---|
168 | setup a non-blocking traverse of a local ltdb. The callback function
|
---|
169 | will be called on every record in the local ltdb. To stop the
|
---|
170 | traverse, talloc_free() the traverse_handle.
|
---|
171 |
|
---|
172 | The traverse is finished when the callback is called with tdb_null for key and data
|
---|
173 | */
|
---|
174 | static struct ctdb_traverse_local_handle *ctdb_traverse_local(struct ctdb_db_context *ctdb_db,
|
---|
175 | ctdb_traverse_fn_t callback,
|
---|
176 | struct traverse_all_state *all_state)
|
---|
177 | {
|
---|
178 | struct ctdb_traverse_local_handle *h;
|
---|
179 | int ret;
|
---|
180 |
|
---|
181 | h = talloc_zero(all_state, struct ctdb_traverse_local_handle);
|
---|
182 | if (h == NULL) {
|
---|
183 | return NULL;
|
---|
184 | }
|
---|
185 |
|
---|
186 | ret = pipe(h->fd);
|
---|
187 |
|
---|
188 | if (ret != 0) {
|
---|
189 | talloc_free(h);
|
---|
190 | return NULL;
|
---|
191 | }
|
---|
192 |
|
---|
193 | h->child = ctdb_fork(ctdb_db->ctdb);
|
---|
194 |
|
---|
195 | if (h->child == (pid_t)-1) {
|
---|
196 | close(h->fd[0]);
|
---|
197 | close(h->fd[1]);
|
---|
198 | talloc_free(h);
|
---|
199 | return NULL;
|
---|
200 | }
|
---|
201 |
|
---|
202 | h->callback = callback;
|
---|
203 | h->private_data = all_state;
|
---|
204 | h->ctdb_db = ctdb_db;
|
---|
205 | h->client_reqid = all_state->client_reqid;
|
---|
206 | h->reqid = all_state->reqid;
|
---|
207 | h->srvid = all_state->srvid;
|
---|
208 | h->srcnode = all_state->srcnode;
|
---|
209 | h->withemptyrecords = all_state->withemptyrecords;
|
---|
210 |
|
---|
211 | if (h->child == 0) {
|
---|
212 | /* start the traverse in the child */
|
---|
213 | int res, status;
|
---|
214 | pid_t parent = getpid();
|
---|
215 | struct ctdb_context *ctdb = ctdb_db->ctdb;
|
---|
216 | struct ctdb_rec_data_old *d;
|
---|
217 | TDB_DATA outdata;
|
---|
218 |
|
---|
219 | close(h->fd[0]);
|
---|
220 |
|
---|
221 | prctl_set_comment("ctdb_traverse");
|
---|
222 | if (switch_from_server_to_client(ctdb, "traverse_local-%s:",
|
---|
223 | ctdb_db->db_name) != 0) {
|
---|
224 | DEBUG(DEBUG_CRIT, ("Failed to switch traverse child into client mode\n"));
|
---|
225 | _exit(0);
|
---|
226 | }
|
---|
227 |
|
---|
228 | d = ctdb_marshall_record(h, h->reqid, tdb_null, NULL, tdb_null);
|
---|
229 | if (d == NULL) {
|
---|
230 | res = 0;
|
---|
231 | sys_write(h->fd[1], &res, sizeof(int));
|
---|
232 | _exit(0);
|
---|
233 | }
|
---|
234 |
|
---|
235 | res = tdb_traverse_read(ctdb_db->ltdb->tdb, ctdb_traverse_local_fn, h);
|
---|
236 | if (res == -1 || h->records_failed > 0) {
|
---|
237 | /* traverse failed */
|
---|
238 | res = -(h->records_sent);
|
---|
239 | } else {
|
---|
240 | res = h->records_sent;
|
---|
241 | }
|
---|
242 |
|
---|
243 | /* Wait till all the data is flushed from output queue */
|
---|
244 | while (ctdb_queue_length(ctdb->daemon.queue) > 0) {
|
---|
245 | tevent_loop_once(ctdb->ev);
|
---|
246 | }
|
---|
247 |
|
---|
248 | /* End traverse by sending empty record */
|
---|
249 | outdata.dptr = (uint8_t *)d;
|
---|
250 | outdata.dsize = d->length;
|
---|
251 | ret = ctdb_control(ctdb, h->srcnode, 0,
|
---|
252 | CTDB_CONTROL_TRAVERSE_DATA,
|
---|
253 | CTDB_CTRL_FLAG_NOREPLY, outdata,
|
---|
254 | NULL, NULL, &status, NULL, NULL);
|
---|
255 | if (ret == -1 || status == -1) {
|
---|
256 | if (res > 0) {
|
---|
257 | res = -res;
|
---|
258 | }
|
---|
259 | }
|
---|
260 |
|
---|
261 | sys_write(h->fd[1], &res, sizeof(res));
|
---|
262 |
|
---|
263 | while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
|
---|
264 | sleep(5);
|
---|
265 | }
|
---|
266 | _exit(0);
|
---|
267 | }
|
---|
268 |
|
---|
269 | close(h->fd[1]);
|
---|
270 | set_close_on_exec(h->fd[0]);
|
---|
271 |
|
---|
272 | talloc_set_destructor(h, traverse_local_destructor);
|
---|
273 |
|
---|
274 | DLIST_ADD(ctdb_db->traverse, h);
|
---|
275 |
|
---|
276 | h->fde = tevent_add_fd(ctdb_db->ctdb->ev, h, h->fd[0], TEVENT_FD_READ,
|
---|
277 | ctdb_traverse_child_handler, h);
|
---|
278 | if (h->fde == NULL) {
|
---|
279 | close(h->fd[0]);
|
---|
280 | talloc_free(h);
|
---|
281 | return NULL;
|
---|
282 | }
|
---|
283 | tevent_fd_set_auto_close(h->fde);
|
---|
284 |
|
---|
285 | return h;
|
---|
286 | }
|
---|
287 |
|
---|
288 |
|
---|
289 | struct ctdb_traverse_all_handle {
|
---|
290 | struct ctdb_context *ctdb;
|
---|
291 | struct ctdb_db_context *ctdb_db;
|
---|
292 | uint32_t reqid;
|
---|
293 | ctdb_traverse_fn_t callback;
|
---|
294 | void *private_data;
|
---|
295 | uint32_t null_count;
|
---|
296 | bool timedout;
|
---|
297 | };
|
---|
298 |
|
---|
299 | /*
|
---|
300 | destroy a traverse_all op
|
---|
301 | */
|
---|
302 | static int ctdb_traverse_all_destructor(struct ctdb_traverse_all_handle *state)
|
---|
303 | {
|
---|
304 | reqid_remove(state->ctdb->idr, state->reqid);
|
---|
305 | return 0;
|
---|
306 | }
|
---|
307 |
|
---|
308 | /* called when a traverse times out */
|
---|
309 | static void ctdb_traverse_all_timeout(struct tevent_context *ev,
|
---|
310 | struct tevent_timer *te,
|
---|
311 | struct timeval t, void *private_data)
|
---|
312 | {
|
---|
313 | struct ctdb_traverse_all_handle *state = talloc_get_type(private_data, struct ctdb_traverse_all_handle);
|
---|
314 |
|
---|
315 | DEBUG(DEBUG_ERR,(__location__ " Traverse all timeout on database:%s\n", state->ctdb_db->db_name));
|
---|
316 | CTDB_INCREMENT_STAT(state->ctdb, timeouts.traverse);
|
---|
317 |
|
---|
318 | state->timedout = true;
|
---|
319 | state->callback(state->private_data, tdb_null, tdb_null);
|
---|
320 | }
|
---|
321 |
|
---|
322 |
|
---|
/*
 * State of a traverse started by a local client
 * (CTDB_CONTROL_TRAVERSE_START[_EXT]).  It is allocated on the client,
 * and records are forwarded to the client on srvid.
 */
struct traverse_start_state {
	struct ctdb_context *ctdb;
	struct ctdb_traverse_all_handle *h;	/* cluster-wide traverse */
	uint32_t srcnode;
	uint32_t reqid;				/* client's request id */
	uint32_t db_id;
	uint64_t srvid;				/* client's message srvid */
	bool withemptyrecords;
	int num_records;			/* records forwarded so far */
};
|
---|
333 |
|
---|
334 |
|
---|
335 | /*
|
---|
336 | setup a cluster-wide non-blocking traverse of a ctdb. The
|
---|
337 | callback function will be called on every record in the local
|
---|
338 | ltdb. To stop the traverse, talloc_free() the traverse_handle.
|
---|
339 |
|
---|
340 | The traverse is finished when the callback is called with tdb_null
|
---|
341 | for key and data
|
---|
342 | */
|
---|
343 | static struct ctdb_traverse_all_handle *ctdb_daemon_traverse_all(struct ctdb_db_context *ctdb_db,
|
---|
344 | ctdb_traverse_fn_t callback,
|
---|
345 | struct traverse_start_state *start_state)
|
---|
346 | {
|
---|
347 | struct ctdb_traverse_all_handle *state;
|
---|
348 | struct ctdb_context *ctdb = ctdb_db->ctdb;
|
---|
349 | int ret;
|
---|
350 | TDB_DATA data;
|
---|
351 | struct ctdb_traverse_all r;
|
---|
352 | struct ctdb_traverse_all_ext r_ext;
|
---|
353 | uint32_t destination;
|
---|
354 |
|
---|
355 | state = talloc(start_state, struct ctdb_traverse_all_handle);
|
---|
356 | if (state == NULL) {
|
---|
357 | return NULL;
|
---|
358 | }
|
---|
359 |
|
---|
360 | state->ctdb = ctdb;
|
---|
361 | state->ctdb_db = ctdb_db;
|
---|
362 | state->reqid = reqid_new(ctdb_db->ctdb->idr, state);
|
---|
363 | state->callback = callback;
|
---|
364 | state->private_data = start_state;
|
---|
365 | state->null_count = 0;
|
---|
366 | state->timedout = false;
|
---|
367 |
|
---|
368 | talloc_set_destructor(state, ctdb_traverse_all_destructor);
|
---|
369 |
|
---|
370 | if (start_state->withemptyrecords) {
|
---|
371 | r_ext.db_id = ctdb_db->db_id;
|
---|
372 | r_ext.reqid = state->reqid;
|
---|
373 | r_ext.pnn = ctdb->pnn;
|
---|
374 | r_ext.client_reqid = start_state->reqid;
|
---|
375 | r_ext.srvid = start_state->srvid;
|
---|
376 | r_ext.withemptyrecords = start_state->withemptyrecords;
|
---|
377 |
|
---|
378 | data.dptr = (uint8_t *)&r_ext;
|
---|
379 | data.dsize = sizeof(r_ext);
|
---|
380 | } else {
|
---|
381 | r.db_id = ctdb_db->db_id;
|
---|
382 | r.reqid = state->reqid;
|
---|
383 | r.pnn = ctdb->pnn;
|
---|
384 | r.client_reqid = start_state->reqid;
|
---|
385 | r.srvid = start_state->srvid;
|
---|
386 |
|
---|
387 | data.dptr = (uint8_t *)&r;
|
---|
388 | data.dsize = sizeof(r);
|
---|
389 | }
|
---|
390 |
|
---|
391 | if (ctdb_db->persistent == 0) {
|
---|
392 | /* normal database, traverse all nodes */
|
---|
393 | destination = CTDB_BROADCAST_VNNMAP;
|
---|
394 | } else {
|
---|
395 | int i;
|
---|
396 | /* persistent database, traverse one node, preferably
|
---|
397 | * the local one
|
---|
398 | */
|
---|
399 | destination = ctdb->pnn;
|
---|
400 | /* check we are in the vnnmap */
|
---|
401 | for (i=0; i < ctdb->vnn_map->size; i++) {
|
---|
402 | if (ctdb->vnn_map->map[i] == ctdb->pnn) {
|
---|
403 | break;
|
---|
404 | }
|
---|
405 | }
|
---|
406 | /* if we are not in the vnn map we just pick the first
|
---|
407 | * node instead
|
---|
408 | */
|
---|
409 | if (i == ctdb->vnn_map->size) {
|
---|
410 | destination = ctdb->vnn_map->map[0];
|
---|
411 | }
|
---|
412 | }
|
---|
413 |
|
---|
414 | /* tell all the nodes in the cluster to start sending records to this
|
---|
415 | * node, or if it is a persistent database, just tell the local
|
---|
416 | * node
|
---|
417 | */
|
---|
418 |
|
---|
419 | if (start_state->withemptyrecords) {
|
---|
420 | ret = ctdb_daemon_send_control(ctdb, destination, 0,
|
---|
421 | CTDB_CONTROL_TRAVERSE_ALL_EXT,
|
---|
422 | 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
|
---|
423 | } else {
|
---|
424 | ret = ctdb_daemon_send_control(ctdb, destination, 0,
|
---|
425 | CTDB_CONTROL_TRAVERSE_ALL,
|
---|
426 | 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
|
---|
427 | }
|
---|
428 |
|
---|
429 | if (ret != 0) {
|
---|
430 | talloc_free(state);
|
---|
431 | return NULL;
|
---|
432 | }
|
---|
433 |
|
---|
434 | DEBUG(DEBUG_NOTICE,("Starting traverse on DB %s (id %d)\n",
|
---|
435 | ctdb_db->db_name, state->reqid));
|
---|
436 |
|
---|
437 | /* timeout the traverse */
|
---|
438 | tevent_add_timer(ctdb->ev, state,
|
---|
439 | timeval_current_ofs(ctdb->tunable.traverse_timeout, 0),
|
---|
440 | ctdb_traverse_all_timeout, state);
|
---|
441 |
|
---|
442 | return state;
|
---|
443 | }
|
---|
444 |
|
---|
445 | /*
|
---|
446 | called when local traverse ends
|
---|
447 | */
|
---|
448 | static void traverse_all_callback(void *p, TDB_DATA key, TDB_DATA data)
|
---|
449 | {
|
---|
450 | struct traverse_all_state *state = talloc_get_type(p, struct traverse_all_state);
|
---|
451 |
|
---|
452 | /* we're done */
|
---|
453 | talloc_free(state);
|
---|
454 | }
|
---|
455 |
|
---|
456 | /*
|
---|
457 | * extended version to take the "withemptyrecords" parameter"
|
---|
458 | */
|
---|
459 | int32_t ctdb_control_traverse_all_ext(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
|
---|
460 | {
|
---|
461 | struct ctdb_traverse_all_ext *c = (struct ctdb_traverse_all_ext *)data.dptr;
|
---|
462 | struct traverse_all_state *state;
|
---|
463 | struct ctdb_db_context *ctdb_db;
|
---|
464 |
|
---|
465 | if (data.dsize != sizeof(struct ctdb_traverse_all_ext)) {
|
---|
466 | DEBUG(DEBUG_ERR,(__location__ " Invalid size in ctdb_control_traverse_all_ext\n"));
|
---|
467 | return -1;
|
---|
468 | }
|
---|
469 |
|
---|
470 | ctdb_db = find_ctdb_db(ctdb, c->db_id);
|
---|
471 | if (ctdb_db == NULL) {
|
---|
472 | return -1;
|
---|
473 | }
|
---|
474 |
|
---|
475 | if (ctdb_db->unhealthy_reason) {
|
---|
476 | if (ctdb->tunable.allow_unhealthy_db_read == 0) {
|
---|
477 | DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_traverse_all: %s\n",
|
---|
478 | ctdb_db->db_name, ctdb_db->unhealthy_reason));
|
---|
479 | return -1;
|
---|
480 | }
|
---|
481 | DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in ctdb_control_traverse_all: %s\n",
|
---|
482 | ctdb_db->db_name, ctdb_db->unhealthy_reason));
|
---|
483 | }
|
---|
484 |
|
---|
485 | state = talloc(ctdb_db, struct traverse_all_state);
|
---|
486 | if (state == NULL) {
|
---|
487 | return -1;
|
---|
488 | }
|
---|
489 |
|
---|
490 | state->reqid = c->reqid;
|
---|
491 | state->srcnode = c->pnn;
|
---|
492 | state->ctdb = ctdb;
|
---|
493 | state->client_reqid = c->client_reqid;
|
---|
494 | state->srvid = c->srvid;
|
---|
495 | state->withemptyrecords = c->withemptyrecords;
|
---|
496 |
|
---|
497 | state->h = ctdb_traverse_local(ctdb_db, traverse_all_callback, state);
|
---|
498 | if (state->h == NULL) {
|
---|
499 | talloc_free(state);
|
---|
500 | return -1;
|
---|
501 | }
|
---|
502 |
|
---|
503 | return 0;
|
---|
504 | }
|
---|
505 |
|
---|
506 | /*
|
---|
507 | called when a CTDB_CONTROL_TRAVERSE_ALL control comes in. We then
|
---|
508 | setup a traverse of our local ltdb, sending the records as
|
---|
509 | CTDB_CONTROL_TRAVERSE_DATA records back to the originator
|
---|
510 | */
|
---|
511 | int32_t ctdb_control_traverse_all(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
|
---|
512 | {
|
---|
513 | struct ctdb_traverse_all *c = (struct ctdb_traverse_all *)data.dptr;
|
---|
514 | struct traverse_all_state *state;
|
---|
515 | struct ctdb_db_context *ctdb_db;
|
---|
516 |
|
---|
517 | if (data.dsize != sizeof(struct ctdb_traverse_all)) {
|
---|
518 | DEBUG(DEBUG_ERR,(__location__ " Invalid size in ctdb_control_traverse_all\n"));
|
---|
519 | return -1;
|
---|
520 | }
|
---|
521 |
|
---|
522 | ctdb_db = find_ctdb_db(ctdb, c->db_id);
|
---|
523 | if (ctdb_db == NULL) {
|
---|
524 | return -1;
|
---|
525 | }
|
---|
526 |
|
---|
527 | if (ctdb_db->unhealthy_reason) {
|
---|
528 | if (ctdb->tunable.allow_unhealthy_db_read == 0) {
|
---|
529 | DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_traverse_all: %s\n",
|
---|
530 | ctdb_db->db_name, ctdb_db->unhealthy_reason));
|
---|
531 | return -1;
|
---|
532 | }
|
---|
533 | DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in ctdb_control_traverse_all: %s\n",
|
---|
534 | ctdb_db->db_name, ctdb_db->unhealthy_reason));
|
---|
535 | }
|
---|
536 |
|
---|
537 | state = talloc(ctdb_db, struct traverse_all_state);
|
---|
538 | if (state == NULL) {
|
---|
539 | return -1;
|
---|
540 | }
|
---|
541 |
|
---|
542 | state->reqid = c->reqid;
|
---|
543 | state->srcnode = c->pnn;
|
---|
544 | state->ctdb = ctdb;
|
---|
545 | state->client_reqid = c->client_reqid;
|
---|
546 | state->srvid = c->srvid;
|
---|
547 | state->withemptyrecords = false;
|
---|
548 |
|
---|
549 | state->h = ctdb_traverse_local(ctdb_db, traverse_all_callback, state);
|
---|
550 | if (state->h == NULL) {
|
---|
551 | talloc_free(state);
|
---|
552 | return -1;
|
---|
553 | }
|
---|
554 |
|
---|
555 | return 0;
|
---|
556 | }
|
---|
557 |
|
---|
558 |
|
---|
559 | /*
|
---|
560 | called when a CTDB_CONTROL_TRAVERSE_DATA control comes in. We then
|
---|
561 | call the traverse_all callback with the record
|
---|
562 | */
|
---|
563 | int32_t ctdb_control_traverse_data(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
|
---|
564 | {
|
---|
565 | struct ctdb_rec_data_old *d = (struct ctdb_rec_data_old *)data.dptr;
|
---|
566 | struct ctdb_traverse_all_handle *state;
|
---|
567 | TDB_DATA key;
|
---|
568 | ctdb_traverse_fn_t callback;
|
---|
569 | void *private_data;
|
---|
570 |
|
---|
571 | if (data.dsize < sizeof(uint32_t) || data.dsize != d->length) {
|
---|
572 | DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_data\n"));
|
---|
573 | return -1;
|
---|
574 | }
|
---|
575 |
|
---|
576 | state = reqid_find(ctdb->idr, d->reqid, struct ctdb_traverse_all_handle);
|
---|
577 | if (state == NULL || d->reqid != state->reqid) {
|
---|
578 | /* traverse might have been terminated already */
|
---|
579 | return -1;
|
---|
580 | }
|
---|
581 |
|
---|
582 | key.dsize = d->keylen;
|
---|
583 | key.dptr = &d->data[0];
|
---|
584 | data.dsize = d->datalen;
|
---|
585 | data.dptr = &d->data[d->keylen];
|
---|
586 |
|
---|
587 | if (key.dsize == 0 && data.dsize == 0) {
|
---|
588 | state->null_count++;
|
---|
589 | /* Persistent databases are only scanned on one node (the local
|
---|
590 | * node)
|
---|
591 | */
|
---|
592 | if (state->ctdb_db->persistent == 0) {
|
---|
593 | if (state->null_count != ctdb_get_num_active_nodes(ctdb)) {
|
---|
594 | return 0;
|
---|
595 | }
|
---|
596 | }
|
---|
597 | }
|
---|
598 |
|
---|
599 | callback = state->callback;
|
---|
600 | private_data = state->private_data;
|
---|
601 |
|
---|
602 | callback(private_data, key, data);
|
---|
603 | return 0;
|
---|
604 | }
|
---|
605 |
|
---|
606 | /*
|
---|
607 | kill a in-progress traverse, used when a client disconnects
|
---|
608 | */
|
---|
609 | int32_t ctdb_control_traverse_kill(struct ctdb_context *ctdb, TDB_DATA data,
|
---|
610 | TDB_DATA *outdata, uint32_t srcnode)
|
---|
611 | {
|
---|
612 | struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
|
---|
613 | struct ctdb_db_context *ctdb_db;
|
---|
614 | struct ctdb_traverse_local_handle *t;
|
---|
615 |
|
---|
616 | ctdb_db = find_ctdb_db(ctdb, d->db_id);
|
---|
617 | if (ctdb_db == NULL) {
|
---|
618 | return -1;
|
---|
619 | }
|
---|
620 |
|
---|
621 | for (t=ctdb_db->traverse; t; t=t->next) {
|
---|
622 | if (t->client_reqid == d->reqid &&
|
---|
623 | t->srvid == d->srvid) {
|
---|
624 | talloc_free(t);
|
---|
625 | break;
|
---|
626 | }
|
---|
627 | }
|
---|
628 |
|
---|
629 | return 0;
|
---|
630 | }
|
---|
631 |
|
---|
632 |
|
---|
633 | /*
|
---|
634 | this is called when a client disconnects during a traverse
|
---|
635 | we need to notify all the nodes taking part in the search that they
|
---|
636 | should kill their traverse children
|
---|
637 | */
|
---|
638 | static int ctdb_traverse_start_destructor(struct traverse_start_state *state)
|
---|
639 | {
|
---|
640 | struct ctdb_traverse_start r;
|
---|
641 | TDB_DATA data;
|
---|
642 |
|
---|
643 | DEBUG(DEBUG_ERR,(__location__ " Traverse cancelled by client disconnect for database:0x%08x\n", state->db_id));
|
---|
644 | r.db_id = state->db_id;
|
---|
645 | r.reqid = state->reqid;
|
---|
646 | r.srvid = state->srvid;
|
---|
647 |
|
---|
648 | data.dptr = (uint8_t *)&r;
|
---|
649 | data.dsize = sizeof(r);
|
---|
650 |
|
---|
651 | ctdb_daemon_send_control(state->ctdb, CTDB_BROADCAST_CONNECTED, 0,
|
---|
652 | CTDB_CONTROL_TRAVERSE_KILL,
|
---|
653 | 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
|
---|
654 | return 0;
|
---|
655 | }
|
---|
656 |
|
---|
657 | /*
|
---|
658 | callback which sends records as messages to the client
|
---|
659 | */
|
---|
660 | static void traverse_start_callback(void *p, TDB_DATA key, TDB_DATA data)
|
---|
661 | {
|
---|
662 | struct traverse_start_state *state;
|
---|
663 | struct ctdb_rec_data_old *d;
|
---|
664 | TDB_DATA cdata;
|
---|
665 |
|
---|
666 | state = talloc_get_type(p, struct traverse_start_state);
|
---|
667 |
|
---|
668 | d = ctdb_marshall_record(state, state->reqid, key, NULL, data);
|
---|
669 | if (d == NULL) {
|
---|
670 | return;
|
---|
671 | }
|
---|
672 |
|
---|
673 | cdata.dptr = (uint8_t *)d;
|
---|
674 | cdata.dsize = d->length;
|
---|
675 |
|
---|
676 | srvid_dispatch(state->ctdb->srv, state->srvid, 0, cdata);
|
---|
677 | if (key.dsize == 0 && data.dsize == 0) {
|
---|
678 | DEBUG(DEBUG_NOTICE, ("Ending traverse on DB %s (id %d), records %d\n",
|
---|
679 | state->h->ctdb_db->db_name, state->h->reqid,
|
---|
680 | state->num_records));
|
---|
681 |
|
---|
682 | if (state->h->timedout) {
|
---|
683 | /* timed out, send TRAVERSE_KILL control */
|
---|
684 | talloc_free(state);
|
---|
685 | } else {
|
---|
686 | /* end of traverse */
|
---|
687 | talloc_set_destructor(state, NULL);
|
---|
688 | talloc_free(state);
|
---|
689 | }
|
---|
690 | } else {
|
---|
691 | state->num_records++;
|
---|
692 | }
|
---|
693 | }
|
---|
694 |
|
---|
695 |
|
---|
696 | /**
|
---|
697 | * start a traverse_all - called as a control from a client.
|
---|
698 | * extended version to take the "withemptyrecords" parameter.
|
---|
699 | */
|
---|
700 | int32_t ctdb_control_traverse_start_ext(struct ctdb_context *ctdb,
|
---|
701 | TDB_DATA data,
|
---|
702 | TDB_DATA *outdata,
|
---|
703 | uint32_t srcnode,
|
---|
704 | uint32_t client_id)
|
---|
705 | {
|
---|
706 | struct ctdb_traverse_start_ext *d = (struct ctdb_traverse_start_ext *)data.dptr;
|
---|
707 | struct traverse_start_state *state;
|
---|
708 | struct ctdb_db_context *ctdb_db;
|
---|
709 | struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
|
---|
710 |
|
---|
711 | if (client == NULL) {
|
---|
712 | DEBUG(DEBUG_ERR,(__location__ " No client found\n"));
|
---|
713 | return -1;
|
---|
714 | }
|
---|
715 |
|
---|
716 | if (data.dsize != sizeof(*d)) {
|
---|
717 | DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_start\n"));
|
---|
718 | return -1;
|
---|
719 | }
|
---|
720 |
|
---|
721 | ctdb_db = find_ctdb_db(ctdb, d->db_id);
|
---|
722 | if (ctdb_db == NULL) {
|
---|
723 | return -1;
|
---|
724 | }
|
---|
725 |
|
---|
726 | if (ctdb_db->unhealthy_reason) {
|
---|
727 | if (ctdb->tunable.allow_unhealthy_db_read == 0) {
|
---|
728 | DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_traverse_start: %s\n",
|
---|
729 | ctdb_db->db_name, ctdb_db->unhealthy_reason));
|
---|
730 | return -1;
|
---|
731 | }
|
---|
732 | DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in ctdb_control_traverse_start: %s\n",
|
---|
733 | ctdb_db->db_name, ctdb_db->unhealthy_reason));
|
---|
734 | }
|
---|
735 |
|
---|
736 | state = talloc(client, struct traverse_start_state);
|
---|
737 | if (state == NULL) {
|
---|
738 | return -1;
|
---|
739 | }
|
---|
740 |
|
---|
741 | state->srcnode = srcnode;
|
---|
742 | state->reqid = d->reqid;
|
---|
743 | state->srvid = d->srvid;
|
---|
744 | state->db_id = d->db_id;
|
---|
745 | state->ctdb = ctdb;
|
---|
746 | state->withemptyrecords = d->withemptyrecords;
|
---|
747 | state->num_records = 0;
|
---|
748 |
|
---|
749 | state->h = ctdb_daemon_traverse_all(ctdb_db, traverse_start_callback, state);
|
---|
750 | if (state->h == NULL) {
|
---|
751 | talloc_free(state);
|
---|
752 | return -1;
|
---|
753 | }
|
---|
754 |
|
---|
755 | talloc_set_destructor(state, ctdb_traverse_start_destructor);
|
---|
756 |
|
---|
757 | return 0;
|
---|
758 | }
|
---|
759 |
|
---|
760 | /**
|
---|
761 | * start a traverse_all - called as a control from a client.
|
---|
762 | */
|
---|
763 | int32_t ctdb_control_traverse_start(struct ctdb_context *ctdb,
|
---|
764 | TDB_DATA data,
|
---|
765 | TDB_DATA *outdata,
|
---|
766 | uint32_t srcnode,
|
---|
767 | uint32_t client_id)
|
---|
768 | {
|
---|
769 | struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
|
---|
770 | struct ctdb_traverse_start_ext d2;
|
---|
771 | TDB_DATA data2;
|
---|
772 |
|
---|
773 | ZERO_STRUCT(d2);
|
---|
774 | d2.db_id = d->db_id;
|
---|
775 | d2.reqid = d->reqid;
|
---|
776 | d2.srvid = d->srvid;
|
---|
777 | d2.withemptyrecords = false;
|
---|
778 |
|
---|
779 | data2.dsize = sizeof(d2);
|
---|
780 | data2.dptr = (uint8_t *)&d2;
|
---|
781 |
|
---|
782 | return ctdb_control_traverse_start_ext(ctdb, data2, outdata, srcnode, client_id);
|
---|
783 | }
|
---|