source: vendor/current/ctdb/server/ctdb_recover.c

Last change on this file was 989, checked in by Silvan Scherrer, 9 years ago

Samba Server: update vendor to version 4.4.7

File size: 45.1 KB
Line 
1/*
2 ctdb recovery code
3
4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
6
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
11
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
19*/
20#include "replace.h"
21#include "system/time.h"
22#include "system/network.h"
23#include "system/filesys.h"
24#include "system/wait.h"
25
26#include <talloc.h>
27#include <tevent.h>
28#include <tdb.h>
29
30#include "lib/tdb_wrap/tdb_wrap.h"
31#include "lib/util/dlinklist.h"
32#include "lib/util/debug.h"
33#include "lib/util/samba_util.h"
34#include "lib/util/util_process.h"
35
36#include "ctdb_private.h"
37#include "ctdb_client.h"
38
39#include "common/system.h"
40#include "common/common.h"
41#include "common/logging.h"
42
43int
44ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
45{
46 struct ctdb_vnn_map_wire *map;
47 size_t len;
48
49 CHECK_CONTROL_DATA_SIZE(0);
50
51 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
52 map = talloc_size(outdata, len);
53 CTDB_NO_MEMORY(ctdb, map);
54
55 map->generation = ctdb->vnn_map->generation;
56 map->size = ctdb->vnn_map->size;
57 memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
58
59 outdata->dsize = len;
60 outdata->dptr = (uint8_t *)map;
61
62 return 0;
63}
64
65int
66ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
67{
68 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
69
70 if (ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
71 DEBUG(DEBUG_ERR, ("Attempt to set vnnmap when not in recovery\n"));
72 return -1;
73 }
74
75 talloc_free(ctdb->vnn_map);
76
77 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
78 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
79
80 ctdb->vnn_map->generation = map->generation;
81 ctdb->vnn_map->size = map->size;
82 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
83 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
84
85 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
86
87 return 0;
88}
89
90int
91ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
92{
93 uint32_t i, len;
94 struct ctdb_db_context *ctdb_db;
95 struct ctdb_dbid_map_old *dbid_map;
96
97 CHECK_CONTROL_DATA_SIZE(0);
98
99 len = 0;
100 for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
101 len++;
102 }
103
104
105 outdata->dsize = offsetof(struct ctdb_dbid_map_old, dbs) + sizeof(dbid_map->dbs[0])*len;
106 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
107 if (!outdata->dptr) {
108 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
109 exit(1);
110 }
111
112 dbid_map = (struct ctdb_dbid_map_old *)outdata->dptr;
113 dbid_map->num = len;
114 for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
115 dbid_map->dbs[i].db_id = ctdb_db->db_id;
116 if (ctdb_db->persistent != 0) {
117 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_PERSISTENT;
118 }
119 if (ctdb_db->readonly != 0) {
120 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_READONLY;
121 }
122 if (ctdb_db->sticky != 0) {
123 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_STICKY;
124 }
125 }
126
127 return 0;
128}
129
130int
131ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
132{
133 CHECK_CONTROL_DATA_SIZE(0);
134
135 outdata->dptr = (unsigned char *)ctdb_node_list_to_map(ctdb->nodes,
136 ctdb->num_nodes,
137 outdata);
138 if (outdata->dptr == NULL) {
139 return -1;
140 }
141
142 outdata->dsize = talloc_get_size(outdata->dptr);
143
144 return 0;
145}
146
147/*
148 reload the nodes file
149*/
150int
151ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
152{
153 int i, num_nodes;
154 TALLOC_CTX *tmp_ctx;
155 struct ctdb_node **nodes;
156
157 tmp_ctx = talloc_new(ctdb);
158
159 /* steal the old nodes file for a while */
160 talloc_steal(tmp_ctx, ctdb->nodes);
161 nodes = ctdb->nodes;
162 ctdb->nodes = NULL;
163 num_nodes = ctdb->num_nodes;
164 ctdb->num_nodes = 0;
165
166 /* load the new nodes file */
167 ctdb_load_nodes_file(ctdb);
168
169 for (i=0; i<ctdb->num_nodes; i++) {
170 /* keep any identical pre-existing nodes and connections */
171 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
172 talloc_free(ctdb->nodes[i]);
173 ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
174 continue;
175 }
176
177 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
178 continue;
179 }
180
181 /* any new or different nodes must be added */
182 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
183 DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
184 ctdb_fatal(ctdb, "failed to add node. shutting down\n");
185 }
186 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
187 DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
188 ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
189 }
190 }
191
192 /* tell the recovery daemon to reaload the nodes file too */
193 ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);
194
195 talloc_free(tmp_ctx);
196
197 return 0;
198}
199
/*
  state shared with the traverse function that pulls all relevant
  records from a database for ctdb_control_pull_db
 */
struct pulldb_data {
	struct ctdb_context *ctdb;
	struct ctdb_db_context *ctdb_db;
	/* marshall buffer the records are appended to */
	struct ctdb_marshall_buffer *pulldata;
	/* number of bytes of pulldata filled so far */
	uint32_t len;
	/* number of bytes currently allocated for pulldata */
	uint32_t allocated_len;
	/* set when a record could not be marshalled */
	bool failed;
};
211
212static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
213{
214 struct pulldb_data *params = (struct pulldb_data *)p;
215 struct ctdb_rec_data_old *rec;
216 struct ctdb_context *ctdb = params->ctdb;
217 struct ctdb_db_context *ctdb_db = params->ctdb_db;
218
219 /* add the record to the blob */
220 rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
221 if (rec == NULL) {
222 params->failed = true;
223 return -1;
224 }
225 if (params->len + rec->length >= params->allocated_len) {
226 params->allocated_len = rec->length + params->len + ctdb->tunable.pulldb_preallocation_size;
227 params->pulldata = talloc_realloc_size(NULL, params->pulldata, params->allocated_len);
228 }
229 if (params->pulldata == NULL) {
230 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand pulldb_data to %u\n", rec->length + params->len));
231 ctdb_fatal(params->ctdb, "failed to allocate memory for recovery. shutting down\n");
232 }
233 params->pulldata->count++;
234 memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
235 params->len += rec->length;
236
237 if (ctdb->tunable.db_record_size_warn != 0 && rec->length > ctdb->tunable.db_record_size_warn) {
238 DEBUG(DEBUG_ERR,("Data record in %s is big. Record size is %d bytes\n", ctdb_db->db_name, (int)rec->length));
239 }
240
241 talloc_free(rec);
242
243 return 0;
244}
245
246/*
247 pull a bunch of records from a ltdb, filtering by lmaster
248 */
249int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
250{
251 struct ctdb_pulldb *pull;
252 struct ctdb_db_context *ctdb_db;
253 struct pulldb_data params;
254 struct ctdb_marshall_buffer *reply;
255
256 pull = (struct ctdb_pulldb *)indata.dptr;
257
258 ctdb_db = find_ctdb_db(ctdb, pull->db_id);
259 if (!ctdb_db) {
260 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
261 return -1;
262 }
263
264 if (!ctdb_db_frozen(ctdb_db)) {
265 DEBUG(DEBUG_ERR,
266 ("rejecting ctdb_control_pull_db when not frozen\n"));
267 return -1;
268 }
269
270 reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
271 CTDB_NO_MEMORY(ctdb, reply);
272
273 reply->db_id = pull->db_id;
274
275 params.ctdb = ctdb;
276 params.ctdb_db = ctdb_db;
277 params.pulldata = reply;
278 params.len = offsetof(struct ctdb_marshall_buffer, data);
279 params.allocated_len = params.len;
280 params.failed = false;
281
282 if (ctdb_db->unhealthy_reason) {
283 /* this is just a warning, as the tdb should be empty anyway */
284 DEBUG(DEBUG_WARNING,("db(%s) unhealty in ctdb_control_pull_db: %s\n",
285 ctdb_db->db_name, ctdb_db->unhealthy_reason));
286 }
287
288 if (ctdb_lockdb_mark(ctdb_db) != 0) {
289 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
290 return -1;
291 }
292
293 if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, &params) == -1) {
294 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
295 ctdb_lockdb_unmark(ctdb_db);
296 talloc_free(params.pulldata);
297 return -1;
298 }
299
300 ctdb_lockdb_unmark(ctdb_db);
301
302 outdata->dptr = (uint8_t *)params.pulldata;
303 outdata->dsize = params.len;
304
305 if (ctdb->tunable.db_record_count_warn != 0 && params.pulldata->count > ctdb->tunable.db_record_count_warn) {
306 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d records\n", ctdb_db->db_name, params.pulldata->count));
307 }
308 if (ctdb->tunable.db_size_warn != 0 && outdata->dsize > ctdb->tunable.db_size_warn) {
309 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d bytes\n", ctdb_db->db_name, (int)outdata->dsize));
310 }
311
312
313 return 0;
314}
315
316struct db_pull_state {
317 struct ctdb_context *ctdb;
318 struct ctdb_db_context *ctdb_db;
319 struct ctdb_marshall_buffer *recs;
320 uint32_t pnn;
321 uint64_t srvid;
322 uint32_t num_records;
323};
324
325static int traverse_db_pull(struct tdb_context *tdb, TDB_DATA key,
326 TDB_DATA data, void *private_data)
327{
328 struct db_pull_state *state = (struct db_pull_state *)private_data;
329 struct ctdb_marshall_buffer *recs;
330
331 recs = ctdb_marshall_add(state->ctdb, state->recs,
332 state->ctdb_db->db_id, 0, key, NULL, data);
333 if (recs == NULL) {
334 TALLOC_FREE(state->recs);
335 return -1;
336 }
337 state->recs = recs;
338
339 if (talloc_get_size(state->recs) >=
340 state->ctdb->tunable.rec_buffer_size_limit) {
341 TDB_DATA buffer;
342 int ret;
343
344 buffer = ctdb_marshall_finish(state->recs);
345 ret = ctdb_daemon_send_message(state->ctdb, state->pnn,
346 state->srvid, buffer);
347 if (ret != 0) {
348 TALLOC_FREE(state->recs);
349 return -1;
350 }
351
352 state->num_records += state->recs->count;
353 TALLOC_FREE(state->recs);
354 }
355
356 return 0;
357}
358
359int32_t ctdb_control_db_pull(struct ctdb_context *ctdb,
360 struct ctdb_req_control_old *c,
361 TDB_DATA indata, TDB_DATA *outdata)
362{
363 struct ctdb_pulldb_ext *pulldb_ext;
364 struct ctdb_db_context *ctdb_db;
365 struct db_pull_state state;
366 int ret;
367
368 pulldb_ext = (struct ctdb_pulldb_ext *)indata.dptr;
369
370 ctdb_db = find_ctdb_db(ctdb, pulldb_ext->db_id);
371 if (ctdb_db == NULL) {
372 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n",
373 pulldb_ext->db_id));
374 return -1;
375 }
376
377 if (!ctdb_db_frozen(ctdb_db)) {
378 DEBUG(DEBUG_ERR,
379 ("rejecting ctdb_control_pull_db when not frozen\n"));
380 return -1;
381 }
382
383 if (ctdb_db->unhealthy_reason) {
384 /* this is just a warning, as the tdb should be empty anyway */
385 DEBUG(DEBUG_WARNING,
386 ("db(%s) unhealty in ctdb_control_db_pull: %s\n",
387 ctdb_db->db_name, ctdb_db->unhealthy_reason));
388 }
389
390 state.ctdb = ctdb;
391 state.ctdb_db = ctdb_db;
392 state.recs = NULL;
393 state.pnn = c->hdr.srcnode;
394 state.srvid = pulldb_ext->srvid;
395 state.num_records = 0;
396
397 if (ctdb_lockdb_mark(ctdb_db) != 0) {
398 DEBUG(DEBUG_ERR,
399 (__location__ " Failed to get lock on entire db - failing\n"));
400 return -1;
401 }
402
403 ret = tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_db_pull, &state);
404 if (ret == -1) {
405 DEBUG(DEBUG_ERR,
406 (__location__ " Failed to get traverse db '%s'\n",
407 ctdb_db->db_name));
408 ctdb_lockdb_unmark(ctdb_db);
409 return -1;
410 }
411
412 /* Last few records */
413 if (state.recs != NULL) {
414 TDB_DATA buffer;
415
416 buffer = ctdb_marshall_finish(state.recs);
417 ret = ctdb_daemon_send_message(state.ctdb, state.pnn,
418 state.srvid, buffer);
419 if (ret != 0) {
420 TALLOC_FREE(state.recs);
421 ctdb_lockdb_unmark(ctdb_db);
422 return -1;
423 }
424
425 state.num_records += state.recs->count;
426 TALLOC_FREE(state.recs);
427 }
428
429 ctdb_lockdb_unmark(ctdb_db);
430
431 outdata->dptr = talloc_size(outdata, sizeof(uint32_t));
432 if (outdata->dptr == NULL) {
433 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
434 return -1;
435 }
436
437 memcpy(outdata->dptr, (uint8_t *)&state.num_records, sizeof(uint32_t));
438 outdata->dsize = sizeof(uint32_t);
439
440 return 0;
441}
442
443/*
444 push a bunch of records into a ltdb, filtering by rsn
445 */
446int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
447{
448 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
449 struct ctdb_db_context *ctdb_db;
450 int i, ret;
451 struct ctdb_rec_data_old *rec;
452
453 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
454 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
455 return -1;
456 }
457
458 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
459 if (!ctdb_db) {
460 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
461 return -1;
462 }
463
464 if (!ctdb_db_frozen(ctdb_db)) {
465 DEBUG(DEBUG_ERR,
466 ("rejecting ctdb_control_push_db when not frozen\n"));
467 return -1;
468 }
469
470 if (ctdb_lockdb_mark(ctdb_db) != 0) {
471 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
472 return -1;
473 }
474
475 rec = (struct ctdb_rec_data_old *)&reply->data[0];
476
477 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
478 reply->count, reply->db_id));
479
480 for (i=0;i<reply->count;i++) {
481 TDB_DATA key, data;
482 struct ctdb_ltdb_header *hdr;
483
484 key.dptr = &rec->data[0];
485 key.dsize = rec->keylen;
486 data.dptr = &rec->data[key.dsize];
487 data.dsize = rec->datalen;
488
489 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
490 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
491 goto failed;
492 }
493 hdr = (struct ctdb_ltdb_header *)data.dptr;
494 /* strip off any read only record flags. All readonly records
495 are revoked implicitely by a recovery
496 */
497 hdr->flags &= ~CTDB_REC_RO_FLAGS;
498
499 data.dptr += sizeof(*hdr);
500 data.dsize -= sizeof(*hdr);
501
502 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
503 if (ret != 0) {
504 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
505 goto failed;
506 }
507
508 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
509 }
510
511 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
512 reply->count, reply->db_id));
513
514 if (ctdb_db->readonly) {
515 DEBUG(DEBUG_CRIT,("Clearing the tracking database for dbid 0x%x\n",
516 ctdb_db->db_id));
517 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
518 DEBUG(DEBUG_ERR,("Failed to wipe tracking database for 0x%x. Dropping read-only delegation support\n", ctdb_db->db_id));
519 ctdb_db->readonly = false;
520 tdb_close(ctdb_db->rottdb);
521 ctdb_db->rottdb = NULL;
522 ctdb_db->readonly = false;
523 }
524 while (ctdb_db->revokechild_active != NULL) {
525 talloc_free(ctdb_db->revokechild_active);
526 }
527 }
528
529 ctdb_lockdb_unmark(ctdb_db);
530 return 0;
531
532failed:
533 ctdb_lockdb_unmark(ctdb_db);
534 return -1;
535}
536
/*
  state of a database push that receives its records through messages
 */
struct db_push_state {
	struct ctdb_context *ctdb;
	struct ctdb_db_context *ctdb_db;
	/* srvid the record messages arrive on */
	uint64_t srvid;
	/* number of records stored so far */
	uint32_t num_records;
	/* set once a message could not be processed; later ones are ignored */
	bool failed;
};
544
545static void db_push_msg_handler(uint64_t srvid, TDB_DATA indata,
546 void *private_data)
547{
548 struct db_push_state *state = talloc_get_type(
549 private_data, struct db_push_state);
550 struct ctdb_marshall_buffer *recs;
551 struct ctdb_rec_data_old *rec;
552 int i, ret;
553
554 if (state->failed) {
555 return;
556 }
557
558 recs = (struct ctdb_marshall_buffer *)indata.dptr;
559 rec = (struct ctdb_rec_data_old *)&recs->data[0];
560
561 DEBUG(DEBUG_INFO, ("starting push of %u records for dbid 0x%x\n",
562 recs->count, recs->db_id));
563
564 for (i=0; i<recs->count; i++) {
565 TDB_DATA key, data;
566 struct ctdb_ltdb_header *hdr;
567
568 key.dptr = &rec->data[0];
569 key.dsize = rec->keylen;
570 data.dptr = &rec->data[key.dsize];
571 data.dsize = rec->datalen;
572
573 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
574 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
575 goto failed;
576 }
577
578 hdr = (struct ctdb_ltdb_header *)data.dptr;
579 /* Strip off any read only record flags.
580 * All readonly records are revoked implicitely by a recovery.
581 */
582 hdr->flags &= ~CTDB_REC_RO_FLAGS;
583
584 data.dptr += sizeof(*hdr);
585 data.dsize -= sizeof(*hdr);
586
587 ret = ctdb_ltdb_store(state->ctdb_db, key, hdr, data);
588 if (ret != 0) {
589 DEBUG(DEBUG_ERR,
590 (__location__ " Unable to store record\n"));
591 goto failed;
592 }
593
594 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
595 }
596
597 DEBUG(DEBUG_DEBUG, ("finished push of %u records for dbid 0x%x\n",
598 recs->count, recs->db_id));
599
600 state->num_records += recs->count;
601 return;
602
603failed:
604 state->failed = true;
605}
606
607int32_t ctdb_control_db_push_start(struct ctdb_context *ctdb, TDB_DATA indata)
608{
609 struct ctdb_pulldb_ext *pulldb_ext;
610 struct ctdb_db_context *ctdb_db;
611 struct db_push_state *state;
612 int ret;
613
614 pulldb_ext = (struct ctdb_pulldb_ext *)indata.dptr;
615
616 ctdb_db = find_ctdb_db(ctdb, pulldb_ext->db_id);
617 if (ctdb_db == NULL) {
618 DEBUG(DEBUG_ERR,
619 (__location__ " Unknown db 0x%08x\n", pulldb_ext->db_id));
620 return -1;
621 }
622
623 if (!ctdb_db_frozen(ctdb_db)) {
624 DEBUG(DEBUG_ERR,
625 ("rejecting ctdb_control_db_push_start when not frozen\n"));
626 return -1;
627 }
628
629 if (ctdb_db->push_started) {
630 DEBUG(DEBUG_WARNING,
631 (__location__ " DB push already started for %s\n",
632 ctdb_db->db_name));
633
634 /* De-register old state */
635 state = (struct db_push_state *)ctdb_db->push_state;
636 if (state != NULL) {
637 srvid_deregister(ctdb->srv, state->srvid, state);
638 talloc_free(state);
639 ctdb_db->push_state = NULL;
640 }
641 }
642
643 state = talloc_zero(ctdb_db, struct db_push_state);
644 if (state == NULL) {
645 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
646 return -1;
647 }
648
649 state->ctdb = ctdb;
650 state->ctdb_db = ctdb_db;
651 state->srvid = pulldb_ext->srvid;
652 state->failed = false;
653
654 ret = srvid_register(ctdb->srv, state, state->srvid,
655 db_push_msg_handler, state);
656 if (ret != 0) {
657 DEBUG(DEBUG_ERR,
658 (__location__ " Failed to register srvid for db push\n"));
659 talloc_free(state);
660 return -1;
661 }
662
663 if (ctdb_lockdb_mark(ctdb_db) != 0) {
664 DEBUG(DEBUG_ERR,
665 (__location__ " Failed to get lock on entire db - failing\n"));
666 srvid_deregister(ctdb->srv, state->srvid, state);
667 talloc_free(state);
668 return -1;
669 }
670
671 ctdb_db->push_started = true;
672 ctdb_db->push_state = state;
673
674 return 0;
675}
676
677int32_t ctdb_control_db_push_confirm(struct ctdb_context *ctdb,
678 TDB_DATA indata, TDB_DATA *outdata)
679{
680 uint32_t db_id;
681 struct ctdb_db_context *ctdb_db;
682 struct db_push_state *state;
683
684 db_id = *(uint32_t *)indata.dptr;
685
686 ctdb_db = find_ctdb_db(ctdb, db_id);
687 if (ctdb_db == NULL) {
688 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
689 return -1;
690 }
691
692 if (!ctdb_db_frozen(ctdb_db)) {
693 DEBUG(DEBUG_ERR,
694 ("rejecting ctdb_control_db_push_confirm when not frozen\n"));
695 return -1;
696 }
697
698 if (!ctdb_db->push_started) {
699 DEBUG(DEBUG_ERR, (__location__ " DB push not started\n"));
700 return -1;
701 }
702
703 if (ctdb_db->readonly) {
704 DEBUG(DEBUG_ERR,
705 ("Clearing the tracking database for dbid 0x%x\n",
706 ctdb_db->db_id));
707 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
708 DEBUG(DEBUG_ERR,
709 ("Failed to wipe tracking database for 0x%x."
710 " Dropping read-only delegation support\n",
711 ctdb_db->db_id));
712 ctdb_db->readonly = false;
713 tdb_close(ctdb_db->rottdb);
714 ctdb_db->rottdb = NULL;
715 ctdb_db->readonly = false;
716 }
717
718 while (ctdb_db->revokechild_active != NULL) {
719 talloc_free(ctdb_db->revokechild_active);
720 }
721 }
722
723 ctdb_lockdb_unmark(ctdb_db);
724
725 state = (struct db_push_state *)ctdb_db->push_state;
726 if (state == NULL) {
727 DEBUG(DEBUG_ERR, (__location__ " Missing push db state\n"));
728 return -1;
729 }
730
731 srvid_deregister(ctdb->srv, state->srvid, state);
732
733 outdata->dptr = talloc_size(outdata, sizeof(uint32_t));
734 if (outdata->dptr == NULL) {
735 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
736 talloc_free(state);
737 ctdb_db->push_state = NULL;
738 return -1;
739 }
740
741 memcpy(outdata->dptr, (uint8_t *)&state->num_records, sizeof(uint32_t));
742 outdata->dsize = sizeof(uint32_t);
743
744 talloc_free(state);
745 ctdb_db->push_started = false;
746 ctdb_db->push_state = NULL;
747
748 return 0;
749}
750
/*
  state of an asynchronous set_recmode control that is waiting for the
  child process probing the recovery lock
 */
struct ctdb_set_recmode_state {
	struct ctdb_context *ctdb;
	/* the control request that is answered once the child reports */
	struct ctdb_req_control_old *c;
	/* recovery mode to switch to */
	uint32_t recmode;
	/* pipe between the child (write end) and the daemon (read end) */
	int fd[2];
	/* timeout for the child */
	struct tevent_timer *te;
	/* fd event on the read end of the pipe */
	struct tevent_fd *fde;
	/* pid of the child process */
	pid_t child;
	/* when the request was started, used for latency accounting */
	struct timeval start_time;
};
761
762/*
763 called if our set_recmode child times out. this would happen if
764 ctdb_recovery_lock() would block.
765 */
766static void ctdb_set_recmode_timeout(struct tevent_context *ev,
767 struct tevent_timer *te,
768 struct timeval t, void *private_data)
769{
770 struct ctdb_set_recmode_state *state = talloc_get_type(private_data,
771 struct ctdb_set_recmode_state);
772
773 /* we consider this a success, not a failure, as we failed to
774 set the recovery lock which is what we wanted. This can be
775 caused by the cluster filesystem being very slow to
776 arbitrate locks immediately after a node failure.
777 */
778 DEBUG(DEBUG_ERR,(__location__ " set_recmode child process hung/timedout CFS slow to grant locks? (allowing recmode set anyway)\n"));
779 state->ctdb->recovery_mode = state->recmode;
780 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
781 talloc_free(state);
782}
783
784
785/* when we free the recmode state we must kill any child process.
786*/
787static int set_recmode_destructor(struct ctdb_set_recmode_state *state)
788{
789 double l = timeval_elapsed(&state->start_time);
790
791 CTDB_UPDATE_RECLOCK_LATENCY(state->ctdb, "daemon reclock", reclock.ctdbd, l);
792
793 if (state->fd[0] != -1) {
794 state->fd[0] = -1;
795 }
796 if (state->fd[1] != -1) {
797 state->fd[1] = -1;
798 }
799 ctdb_kill(state->ctdb, state->child, SIGKILL);
800 return 0;
801}
802
803/* this is called when the client process has completed ctdb_recovery_lock()
804 and has written data back to us through the pipe.
805*/
806static void set_recmode_handler(struct tevent_context *ev,
807 struct tevent_fd *fde,
808 uint16_t flags, void *private_data)
809{
810 struct ctdb_set_recmode_state *state= talloc_get_type(private_data,
811 struct ctdb_set_recmode_state);
812 char c = 0;
813 int ret;
814
815 /* we got a response from our child process so we can abort the
816 timeout.
817 */
818 talloc_free(state->te);
819 state->te = NULL;
820
821
822 /* If, as expected, the child was unable to take the recovery
823 * lock then it will have written 0 into the pipe, so
824 * continue. However, any other value (e.g. 1) indicates that
825 * it was able to take the recovery lock when it should have
826 * been held by the recovery daemon on the recovery master.
827 */
828 ret = sys_read(state->fd[0], &c, 1);
829 if (ret != 1 || c != 0) {
830 ctdb_request_control_reply(
831 state->ctdb, state->c, NULL, -1,
832 "Took recovery lock from daemon during recovery - probably a cluster filesystem lock coherence problem");
833 talloc_free(state);
834 return;
835 }
836
837 state->ctdb->recovery_mode = state->recmode;
838
839 /* release any deferred attach calls from clients */
840 if (state->recmode == CTDB_RECOVERY_NORMAL) {
841 ctdb_process_deferred_attach(state->ctdb);
842 }
843
844 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
845 talloc_free(state);
846 return;
847}
848
849static void
850ctdb_drop_all_ips_event(struct tevent_context *ev, struct tevent_timer *te,
851 struct timeval t, void *private_data)
852{
853 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
854
855 DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
856 talloc_free(ctdb->release_ips_ctx);
857 ctdb->release_ips_ctx = NULL;
858
859 ctdb_release_all_ips(ctdb);
860}
861
862/*
863 * Set up an event to drop all public ips if we remain in recovery for too
864 * long
865 */
866int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb)
867{
868 if (ctdb->release_ips_ctx != NULL) {
869 talloc_free(ctdb->release_ips_ctx);
870 }
871 ctdb->release_ips_ctx = talloc_new(ctdb);
872 CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
873
874 tevent_add_timer(ctdb->ev, ctdb->release_ips_ctx,
875 timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0),
876 ctdb_drop_all_ips_event, ctdb);
877 return 0;
878}
879
880/*
881 set the recovery mode
882 */
883int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
884 struct ctdb_req_control_old *c,
885 TDB_DATA indata, bool *async_reply,
886 const char **errormsg)
887{
888 uint32_t recmode = *(uint32_t *)indata.dptr;
889 int i, ret;
890 struct ctdb_set_recmode_state *state;
891 pid_t parent = getpid();
892 struct ctdb_db_context *ctdb_db;
893
894 /* if we enter recovery but stay in recovery for too long
895 we will eventually drop all our ip addresses
896 */
897 if (recmode == CTDB_RECOVERY_NORMAL) {
898 talloc_free(ctdb->release_ips_ctx);
899 ctdb->release_ips_ctx = NULL;
900 } else {
901 if (ctdb_deferred_drop_all_ips(ctdb) != 0) {
902 DEBUG(DEBUG_ERR,("Failed to set up deferred drop all ips\n"));
903 }
904 }
905
906 if (recmode != ctdb->recovery_mode) {
907 DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n",
908 recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
909 }
910
911 if (recmode != CTDB_RECOVERY_NORMAL ||
912 ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
913 ctdb->recovery_mode = recmode;
914 return 0;
915 }
916
917 /* some special handling when ending recovery mode */
918
919 for (ctdb_db = ctdb->db_list; ctdb_db != NULL; ctdb_db = ctdb_db->next) {
920 if (ctdb_db->generation != ctdb->vnn_map->generation) {
921 DEBUG(DEBUG_ERR,
922 ("Inconsistent DB generation %u for %s\n",
923 ctdb_db->generation, ctdb_db->db_name));
924 DEBUG(DEBUG_ERR, ("Recovery mode set to ACTIVE\n"));
925 return -1;
926 }
927 }
928
929 /* force the databases to thaw */
930 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
931 if (ctdb_db_prio_frozen(ctdb, i)) {
932 ctdb_control_thaw(ctdb, i, false);
933 }
934 }
935
936 /* release any deferred attach calls from clients */
937 if (recmode == CTDB_RECOVERY_NORMAL) {
938 ctdb_process_deferred_attach(ctdb);
939 }
940
941 if (ctdb->recovery_lock_file == NULL) {
942 /* Not using recovery lock file */
943 ctdb->recovery_mode = recmode;
944 return 0;
945 }
946
947 state = talloc(ctdb, struct ctdb_set_recmode_state);
948 CTDB_NO_MEMORY(ctdb, state);
949
950 state->start_time = timeval_current();
951 state->fd[0] = -1;
952 state->fd[1] = -1;
953
954 /* For the rest of what needs to be done, we need to do this in
955 a child process since
956 1, the call to ctdb_recovery_lock() can block if the cluster
957 filesystem is in the process of recovery.
958 */
959 ret = pipe(state->fd);
960 if (ret != 0) {
961 talloc_free(state);
962 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for set_recmode child\n"));
963 return -1;
964 }
965
966 state->child = ctdb_fork(ctdb);
967 if (state->child == (pid_t)-1) {
968 close(state->fd[0]);
969 close(state->fd[1]);
970 talloc_free(state);
971 return -1;
972 }
973
974 if (state->child == 0) {
975 char cc = 0;
976 close(state->fd[0]);
977
978 prctl_set_comment("ctdb_recmode");
979 debug_extra = talloc_asprintf(NULL, "set_recmode:");
980 /* Daemon should not be able to get the recover lock,
981 * as it should be held by the recovery master */
982 if (ctdb_recovery_lock(ctdb)) {
983 DEBUG(DEBUG_ERR,
984 ("ERROR: Daemon able to take recovery lock on \"%s\" during recovery\n",
985 ctdb->recovery_lock_file));
986 ctdb_recovery_unlock(ctdb);
987 cc = 1;
988 }
989
990 sys_write(state->fd[1], &cc, 1);
991 /* make sure we die when our parent dies */
992 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
993 sleep(5);
994 sys_write(state->fd[1], &cc, 1);
995 }
996 _exit(0);
997 }
998 close(state->fd[1]);
999 set_close_on_exec(state->fd[0]);
1000
1001 state->fd[1] = -1;
1002
1003 talloc_set_destructor(state, set_recmode_destructor);
1004
1005 DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for setrecmode\n", state->fd[0]));
1006
1007 state->te = tevent_add_timer(ctdb->ev, state, timeval_current_ofs(5, 0),
1008 ctdb_set_recmode_timeout, state);
1009
1010 state->fde = tevent_add_fd(ctdb->ev, state, state->fd[0], TEVENT_FD_READ,
1011 set_recmode_handler, (void *)state);
1012
1013 if (state->fde == NULL) {
1014 talloc_free(state);
1015 return -1;
1016 }
1017 tevent_fd_set_auto_close(state->fde);
1018
1019 state->ctdb = ctdb;
1020 state->recmode = recmode;
1021 state->c = talloc_steal(state, c);
1022
1023 *async_reply = true;
1024
1025 return 0;
1026}
1027
1028
1029bool ctdb_recovery_have_lock(struct ctdb_context *ctdb)
1030{
1031 return ctdb->recovery_lock_fd != -1;
1032}
1033
1034/*
1035 try and get the recovery lock in shared storage - should only work
1036 on the recovery master recovery daemon. Anywhere else is a bug
1037 */
1038bool ctdb_recovery_lock(struct ctdb_context *ctdb)
1039{
1040 struct flock lock;
1041
1042 ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file,
1043 O_RDWR|O_CREAT, 0600);
1044 if (ctdb->recovery_lock_fd == -1) {
1045 DEBUG(DEBUG_ERR,
1046 ("ctdb_recovery_lock: Unable to open %s - (%s)\n",
1047 ctdb->recovery_lock_file, strerror(errno)));
1048 return false;
1049 }
1050
1051 set_close_on_exec(ctdb->recovery_lock_fd);
1052
1053 lock.l_type = F_WRLCK;
1054 lock.l_whence = SEEK_SET;
1055 lock.l_start = 0;
1056 lock.l_len = 1;
1057 lock.l_pid = 0;
1058
1059 if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
1060 int saved_errno = errno;
1061 close(ctdb->recovery_lock_fd);
1062 ctdb->recovery_lock_fd = -1;
1063 /* Fail silently on these errors, since they indicate
1064 * lock contention, but log an error for any other
1065 * failure. */
1066 if (saved_errno != EACCES &&
1067 saved_errno != EAGAIN) {
1068 DEBUG(DEBUG_ERR,("ctdb_recovery_lock: Failed to get "
1069 "recovery lock on '%s' - (%s)\n",
1070 ctdb->recovery_lock_file,
1071 strerror(saved_errno)));
1072 }
1073 return false;
1074 }
1075
1076 return true;
1077}
1078
1079void ctdb_recovery_unlock(struct ctdb_context *ctdb)
1080{
1081 if (ctdb->recovery_lock_fd != -1) {
1082 DEBUG(DEBUG_NOTICE, ("Releasing recovery lock\n"));
1083 close(ctdb->recovery_lock_fd);
1084 ctdb->recovery_lock_fd = -1;
1085 }
1086}
1087
1088/*
1089 delete a record as part of the vacuum process
1090 only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
1091 use non-blocking locks
1092
1093 return 0 if the record was successfully deleted (i.e. it does not exist
1094 when the function returns)
1095 or !0 is the record still exists in the tdb after returning.
1096 */
1097static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data_old *rec)
1098{
1099 TDB_DATA key, data, data2;
1100 struct ctdb_ltdb_header *hdr, *hdr2;
1101
1102 /* these are really internal tdb functions - but we need them here for
1103 non-blocking lock of the freelist */
1104 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
1105 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
1106
1107
1108 key.dsize = rec->keylen;
1109 key.dptr = &rec->data[0];
1110 data.dsize = rec->datalen;
1111 data.dptr = &rec->data[rec->keylen];
1112
1113 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
1114 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
1115 return -1;
1116 }
1117
1118 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
1119 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
1120 return -1;
1121 }
1122
1123 hdr = (struct ctdb_ltdb_header *)data.dptr;
1124
1125 /* use a non-blocking lock */
1126 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
1127 return -1;
1128 }
1129
1130 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
1131 if (data2.dptr == NULL) {
1132 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1133 return 0;
1134 }
1135
1136 if (data2.dsize < sizeof(struct ctdb_ltdb_header)) {
1137 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
1138 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
1139 DEBUG(DEBUG_CRIT,(__location__ " Failed to delete corrupt record\n"));
1140 }
1141 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
1142 DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
1143 }
1144 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1145 free(data2.dptr);
1146 return 0;
1147 }
1148
1149 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
1150
1151 if (hdr2->rsn > hdr->rsn) {
1152 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1153 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
1154 (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
1155 free(data2.dptr);
1156 return -1;
1157 }
1158
1159 /* do not allow deleting record that have readonly flags set. */
1160 if (hdr->flags & CTDB_REC_RO_FLAGS) {
1161 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1162 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
1163 free(data2.dptr);
1164 return -1;
1165 }
1166 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
1167 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1168 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
1169 free(data2.dptr);
1170 return -1;
1171 }
1172
1173 if (hdr2->dmaster == ctdb->pnn) {
1174 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1175 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
1176 free(data2.dptr);
1177 return -1;
1178 }
1179
1180 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
1181 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1182 free(data2.dptr);
1183 return -1;
1184 }
1185
1186 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
1187 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
1188 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1189 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
1190 free(data2.dptr);
1191 return -1;
1192 }
1193
1194 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
1195 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1196 free(data2.dptr);
1197 return 0;
1198}
1199
1200
1201
/*
  state handed from a recovery control to the completion callback of the
  event script it started
 */
struct recovery_callback_state {
	/* the control request that the callback sends the reply for */
	struct ctdb_req_control_old *c;
};
1205
1206
1207/*
1208 called when the 'recovered' event script has finished
1209 */
1210static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
1211{
1212 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
1213
1214 ctdb_enable_monitoring(ctdb);
1215 CTDB_INCREMENT_STAT(ctdb, num_recoveries);
1216
1217 if (status != 0) {
1218 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
1219 if (status == -ETIME) {
1220 ctdb_ban_self(ctdb);
1221 }
1222 }
1223
1224 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1225 talloc_free(state);
1226
1227 gettimeofday(&ctdb->last_recovery_finished, NULL);
1228
1229 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
1230 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_STARTUP);
1231 }
1232}
1233
1234/*
1235 recovery has finished
1236 */
1237int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
1238 struct ctdb_req_control_old *c,
1239 bool *async_reply)
1240{
1241 int ret;
1242 struct recovery_callback_state *state;
1243
1244 DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));
1245
1246 ctdb_persistent_finish_trans3_commits(ctdb);
1247
1248 state = talloc(ctdb, struct recovery_callback_state);
1249 CTDB_NO_MEMORY(ctdb, state);
1250
1251 state->c = c;
1252
1253 ctdb_disable_monitoring(ctdb);
1254
1255 ret = ctdb_event_script_callback(ctdb, state,
1256 ctdb_end_recovery_callback,
1257 state,
1258 CTDB_EVENT_RECOVERED, "%s", "");
1259
1260 if (ret != 0) {
1261 ctdb_enable_monitoring(ctdb);
1262
1263 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
1264 talloc_free(state);
1265 return -1;
1266 }
1267
1268 /* tell the control that we will be reply asynchronously */
1269 state->c = talloc_steal(state, c);
1270 *async_reply = true;
1271 return 0;
1272}
1273
1274/*
1275 called when the 'startrecovery' event script has finished
1276 */
1277static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
1278{
1279 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
1280
1281 if (status != 0) {
1282 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
1283 }
1284
1285 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1286 talloc_free(state);
1287}
1288
1289/*
1290 run the startrecovery eventscript
1291 */
1292int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
1293 struct ctdb_req_control_old *c,
1294 bool *async_reply)
1295{
1296 int ret;
1297 struct recovery_callback_state *state;
1298
1299 DEBUG(DEBUG_NOTICE,(__location__ " startrecovery eventscript has been invoked\n"));
1300 gettimeofday(&ctdb->last_recovery_started, NULL);
1301
1302 state = talloc(ctdb, struct recovery_callback_state);
1303 CTDB_NO_MEMORY(ctdb, state);
1304
1305 state->c = talloc_steal(state, c);
1306
1307 ctdb_disable_monitoring(ctdb);
1308
1309 ret = ctdb_event_script_callback(ctdb, state,
1310 ctdb_start_recovery_callback,
1311 state,
1312 CTDB_EVENT_START_RECOVERY,
1313 "%s", "");
1314
1315 if (ret != 0) {
1316 DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
1317 talloc_free(state);
1318 return -1;
1319 }
1320
1321 /* tell the control that we will be reply asynchronously */
1322 *async_reply = true;
1323 return 0;
1324}
1325
1326/*
1327 try to delete all these records as part of the vacuuming process
1328 and return the records we failed to delete
1329*/
1330int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
1331{
1332 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1333 struct ctdb_db_context *ctdb_db;
1334 int i;
1335 struct ctdb_rec_data_old *rec;
1336 struct ctdb_marshall_buffer *records;
1337
1338 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1339 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
1340 return -1;
1341 }
1342
1343 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1344 if (!ctdb_db) {
1345 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
1346 return -1;
1347 }
1348
1349
1350 DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
1351 reply->count, reply->db_id));
1352
1353
1354 /* create a blob to send back the records we couldnt delete */
1355 records = (struct ctdb_marshall_buffer *)
1356 talloc_zero_size(outdata,
1357 offsetof(struct ctdb_marshall_buffer, data));
1358 if (records == NULL) {
1359 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1360 return -1;
1361 }
1362 records->db_id = ctdb_db->db_id;
1363
1364
1365 rec = (struct ctdb_rec_data_old *)&reply->data[0];
1366 for (i=0;i<reply->count;i++) {
1367 TDB_DATA key, data;
1368
1369 key.dptr = &rec->data[0];
1370 key.dsize = rec->keylen;
1371 data.dptr = &rec->data[key.dsize];
1372 data.dsize = rec->datalen;
1373
1374 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1375 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
1376 talloc_free(records);
1377 return -1;
1378 }
1379
1380 /* If we cant delete the record we must add it to the reply
1381 so the lmaster knows it may not purge this record
1382 */
1383 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1384 size_t old_size;
1385 struct ctdb_ltdb_header *hdr;
1386
1387 hdr = (struct ctdb_ltdb_header *)data.dptr;
1388 data.dptr += sizeof(*hdr);
1389 data.dsize -= sizeof(*hdr);
1390
1391 DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
1392
1393 old_size = talloc_get_size(records);
1394 records = talloc_realloc_size(outdata, records, old_size + rec->length);
1395 if (records == NULL) {
1396 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1397 return -1;
1398 }
1399 records->count++;
1400 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1401 }
1402
1403 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
1404 }
1405
1406
1407 *outdata = ctdb_marshall_finish(records);
1408
1409 return 0;
1410}
1411
1412/**
1413 * Store a record as part of the vacuum process:
1414 * This is called from the RECEIVE_RECORD control which
1415 * the lmaster uses to send the current empty copy
1416 * to all nodes for storing, before it lets the other
1417 * nodes delete the records in the second phase with
1418 * the TRY_DELETE_RECORDS control.
1419 *
1420 * Only store if we are not lmaster or dmaster, and our
1421 * rsn is <= the provided rsn. Use non-blocking locks.
1422 *
1423 * return 0 if the record was successfully stored.
1424 * return !0 if the record still exists in the tdb after returning.
1425 */
1426static int store_tdb_record(struct ctdb_context *ctdb,
1427 struct ctdb_db_context *ctdb_db,
1428 struct ctdb_rec_data_old *rec)
1429{
1430 TDB_DATA key, data, data2;
1431 struct ctdb_ltdb_header *hdr, *hdr2;
1432 int ret;
1433
1434 key.dsize = rec->keylen;
1435 key.dptr = &rec->data[0];
1436 data.dsize = rec->datalen;
1437 data.dptr = &rec->data[rec->keylen];
1438
1439 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
1440 DEBUG(DEBUG_INFO, (__location__ " Called store_tdb_record "
1441 "where we are lmaster\n"));
1442 return -1;
1443 }
1444
1445 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
1446 DEBUG(DEBUG_ERR, (__location__ " Bad record size\n"));
1447 return -1;
1448 }
1449
1450 hdr = (struct ctdb_ltdb_header *)data.dptr;
1451
1452 /* use a non-blocking lock */
1453 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
1454 DEBUG(DEBUG_INFO, (__location__ " Failed to lock chain in non-blocking mode\n"));
1455 return -1;
1456 }
1457
1458 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
1459 if (data2.dptr == NULL || data2.dsize < sizeof(struct ctdb_ltdb_header)) {
1460 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) == -1) {
1461 DEBUG(DEBUG_ERR, (__location__ "Failed to store record\n"));
1462 ret = -1;
1463 goto done;
1464 }
1465 DEBUG(DEBUG_INFO, (__location__ " Stored record\n"));
1466 ret = 0;
1467 goto done;
1468 }
1469
1470 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
1471
1472 if (hdr2->rsn > hdr->rsn) {
1473 DEBUG(DEBUG_INFO, (__location__ " Skipping record with "
1474 "rsn=%llu - called with rsn=%llu\n",
1475 (unsigned long long)hdr2->rsn,
1476 (unsigned long long)hdr->rsn));
1477 ret = -1;
1478 goto done;
1479 }
1480
1481 /* do not allow vacuuming of records that have readonly flags set. */
1482 if (hdr->flags & CTDB_REC_RO_FLAGS) {
1483 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
1484 "flags set\n"));
1485 ret = -1;
1486 goto done;
1487 }
1488 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
1489 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
1490 "flags set\n"));
1491 ret = -1;
1492 goto done;
1493 }
1494
1495 if (hdr2->dmaster == ctdb->pnn) {
1496 DEBUG(DEBUG_INFO, (__location__ " Attempted to store record "
1497 "where we are the dmaster\n"));
1498 ret = -1;
1499 goto done;
1500 }
1501
1502 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) != 0) {
1503 DEBUG(DEBUG_INFO,(__location__ " Failed to store record\n"));
1504 ret = -1;
1505 goto done;
1506 }
1507
1508 ret = 0;
1509
1510done:
1511 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1512 free(data2.dptr);
1513 return ret;
1514}
1515
1516
1517
1518/**
1519 * Try to store all these records as part of the vacuuming process
1520 * and return the records we failed to store.
1521 */
1522int32_t ctdb_control_receive_records(struct ctdb_context *ctdb,
1523 TDB_DATA indata, TDB_DATA *outdata)
1524{
1525 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1526 struct ctdb_db_context *ctdb_db;
1527 int i;
1528 struct ctdb_rec_data_old *rec;
1529 struct ctdb_marshall_buffer *records;
1530
1531 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1532 DEBUG(DEBUG_ERR,
1533 (__location__ " invalid data in receive_records\n"));
1534 return -1;
1535 }
1536
1537 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1538 if (!ctdb_db) {
1539 DEBUG(DEBUG_ERR, (__location__ " Unknown db 0x%08x\n",
1540 reply->db_id));
1541 return -1;
1542 }
1543
1544 DEBUG(DEBUG_DEBUG, ("starting receive_records of %u records for "
1545 "dbid 0x%x\n", reply->count, reply->db_id));
1546
1547 /* create a blob to send back the records we could not store */
1548 records = (struct ctdb_marshall_buffer *)
1549 talloc_zero_size(outdata,
1550 offsetof(struct ctdb_marshall_buffer, data));
1551 if (records == NULL) {
1552 DEBUG(DEBUG_ERR, (__location__ " Out of memory\n"));
1553 return -1;
1554 }
1555 records->db_id = ctdb_db->db_id;
1556
1557 rec = (struct ctdb_rec_data_old *)&reply->data[0];
1558 for (i=0; i<reply->count; i++) {
1559 TDB_DATA key, data;
1560
1561 key.dptr = &rec->data[0];
1562 key.dsize = rec->keylen;
1563 data.dptr = &rec->data[key.dsize];
1564 data.dsize = rec->datalen;
1565
1566 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1567 DEBUG(DEBUG_CRIT, (__location__ " bad ltdb record "
1568 "in indata\n"));
1569 talloc_free(records);
1570 return -1;
1571 }
1572
1573 /*
1574 * If we can not store the record we must add it to the reply
1575 * so the lmaster knows it may not purge this record.
1576 */
1577 if (store_tdb_record(ctdb, ctdb_db, rec) != 0) {
1578 size_t old_size;
1579 struct ctdb_ltdb_header *hdr;
1580
1581 hdr = (struct ctdb_ltdb_header *)data.dptr;
1582 data.dptr += sizeof(*hdr);
1583 data.dsize -= sizeof(*hdr);
1584
1585 DEBUG(DEBUG_INFO, (__location__ " Failed to store "
1586 "record with hash 0x%08x in vacuum "
1587 "via RECEIVE_RECORDS\n",
1588 ctdb_hash(&key)));
1589
1590 old_size = talloc_get_size(records);
1591 records = talloc_realloc_size(outdata, records,
1592 old_size + rec->length);
1593 if (records == NULL) {
1594 DEBUG(DEBUG_ERR, (__location__ " Failed to "
1595 "expand\n"));
1596 return -1;
1597 }
1598 records->count++;
1599 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1600 }
1601
1602 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
1603 }
1604
1605 *outdata = ctdb_marshall_finish(records);
1606
1607 return 0;
1608}
1609
1610
1611/*
1612 report capabilities
1613 */
1614int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1615{
1616 uint32_t *capabilities = NULL;
1617
1618 capabilities = talloc(outdata, uint32_t);
1619 CTDB_NO_MEMORY(ctdb, capabilities);
1620 *capabilities = ctdb->capabilities;
1621
1622 outdata->dsize = sizeof(uint32_t);
1623 outdata->dptr = (uint8_t *)capabilities;
1624
1625 return 0;
1626}
1627
1628/* The recovery daemon will ping us at regular intervals.
1629 If we havent been pinged for a while we assume the recovery
1630 daemon is inoperable and we restart.
1631*/
1632static void ctdb_recd_ping_timeout(struct tevent_context *ev,
1633 struct tevent_timer *te,
1634 struct timeval t, void *p)
1635{
1636 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1637 uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1638
1639 DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));
1640
1641 if (*count < ctdb->tunable.recd_ping_failcount) {
1642 (*count)++;
1643 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1644 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1645 ctdb_recd_ping_timeout, ctdb);
1646 return;
1647 }
1648
1649 DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));
1650
1651 ctdb_stop_recoverd(ctdb);
1652 ctdb_start_recoverd(ctdb);
1653}
1654
1655int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
1656{
1657 talloc_free(ctdb->recd_ping_count);
1658
1659 ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1660 CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
1661
1662 if (ctdb->tunable.recd_ping_timeout != 0) {
1663 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1664 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1665 ctdb_recd_ping_timeout, ctdb);
1666 }
1667
1668 return 0;
1669}
1670
1671
1672
1673int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1674{
1675 uint32_t new_recmaster;
1676
1677 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1678 new_recmaster = ((uint32_t *)(&indata.dptr[0]))[0];
1679
1680 if (ctdb->pnn != new_recmaster && ctdb->recovery_master == ctdb->pnn) {
1681 DEBUG(DEBUG_NOTICE,
1682 ("This node (%u) is no longer the recovery master\n", ctdb->pnn));
1683 }
1684
1685 if (ctdb->pnn == new_recmaster && ctdb->recovery_master != new_recmaster) {
1686 DEBUG(DEBUG_NOTICE,
1687 ("This node (%u) is now the recovery master\n", ctdb->pnn));
1688 }
1689
1690 ctdb->recovery_master = new_recmaster;
1691 return 0;
1692}
1693
1694
1695int32_t ctdb_control_stop_node(struct ctdb_context *ctdb)
1696{
1697 DEBUG(DEBUG_NOTICE, ("Stopping node\n"));
1698 ctdb_disable_monitoring(ctdb);
1699 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1700
1701 return 0;
1702}
1703
1704int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
1705{
1706 DEBUG(DEBUG_NOTICE, ("Continue node\n"));
1707 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;
1708
1709 return 0;
1710}
1711
Note: See TracBrowser for help on using the repository browser.