source: vendor/current/ctdb/server/ctdb_daemon.c

Last change on this file was 989, checked in by Silvan Scherrer, 9 years ago

Samba Server: update vendor to version 4.4.7

File size: 50.9 KB
Line 
1/*
2 ctdb daemon code
3
4 Copyright (C) Andrew Tridgell 2006
5
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
18*/
19
20#include "replace.h"
21#include "system/network.h"
22#include "system/filesys.h"
23#include "system/wait.h"
24#include "system/time.h"
25
26#include <talloc.h>
27/* Allow use of deprecated function tevent_loop_allow_nesting() */
28#define TEVENT_DEPRECATED
29#include <tevent.h>
30#include <tdb.h>
31
32#include "lib/tdb_wrap/tdb_wrap.h"
33#include "lib/util/dlinklist.h"
34#include "lib/util/debug.h"
35#include "lib/util/samba_util.h"
36
37#include "ctdb_version.h"
38#include "ctdb_private.h"
39#include "ctdb_client.h"
40
41#include "common/rb_tree.h"
42#include "common/reqid.h"
43#include "common/system.h"
44#include "common/common.h"
45#include "common/logging.h"
46#include "common/pidfile.h"
47
/*
  one entry of the ctdb->client_pids list: records the peer pid of a
  connected client together with its ctdb_client structure
 */
struct ctdb_client_pid_list {
	/* list linkage, maintained through the DLIST_* macros */
	struct ctdb_client_pid_list *next;
	struct ctdb_client_pid_list *prev;
	/* daemon context whose client_pids list holds this entry */
	struct ctdb_context *ctdb;
	/* peer pid as reported for the client socket (0 if unknown) */
	pid_t pid;
	/* the client connection this entry belongs to */
	struct ctdb_client *client;
};
54
/* path of the daemon pidfile; starts out unset (NULL) */
const char *ctdbd_pidfile = NULL;

/* pidfile handle; starts out unset (NULL) */
static struct pidfile_context *ctdbd_pidfile_ctx = NULL;

/* forward declaration: dispatches one packet received from a client */
static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr);
59
60static void print_exit_message(void)
61{
62 if (debug_extra != NULL && debug_extra[0] != '\0') {
63 DEBUG(DEBUG_NOTICE,("CTDB %s shutting down\n", debug_extra));
64 } else {
65 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
66
67 /* Wait a second to allow pending log messages to be flushed */
68 sleep(1);
69 }
70}
71
72
73
74static void ctdb_time_tick(struct tevent_context *ev, struct tevent_timer *te,
75 struct timeval t, void *private_data)
76{
77 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
78
79 if (getpid() != ctdb->ctdbd_pid) {
80 return;
81 }
82
83 tevent_add_timer(ctdb->ev, ctdb,
84 timeval_current_ofs(1, 0),
85 ctdb_time_tick, ctdb);
86}
87
88/* Used to trigger a dummy event once per second, to make
89 * detection of hangs more reliable.
90 */
91static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
92{
93 tevent_add_timer(ctdb->ev, ctdb,
94 timeval_current_ofs(1, 0),
95 ctdb_time_tick, ctdb);
96}
97
/*
  kick off the daemon's recurring background activities, in this order:
  keepalives, tcp tickle list updates, recovery daemon ping handling
  and the once-per-second time tick
 */
static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
{
	/* monitoring for connected/disconnected nodes */
	ctdb_start_keepalive(ctdb);

	/* periodic update of tcp tickle lists */
	ctdb_start_tcp_tickle_update(ctdb);

	/* listening for recovery daemon pings */
	ctdb_control_recd_ping(ctdb);

	/* timer ticks */
	ctdb_start_time_tickd(ctdb);
}
112
/*
  set the disposition of signal 'signum' to SIG_IGN.
  The result of sigaction() is not checked.
 */
static void ignore_signal(int signum)
{
	struct sigaction ignore;

	memset(&ignore, 0, sizeof ignore);
	ignore.sa_handler = SIG_IGN;

	/* the handler mask contains just the signal itself */
	sigemptyset(&ignore.sa_mask);
	sigaddset(&ignore.sa_mask, signum);

	sigaction(signum, &ignore, NULL);
}
124
125
126/*
127 send a packet to a client
128 */
129static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
130{
131 CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
132 if (hdr->operation == CTDB_REQ_MESSAGE) {
133 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
134 DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
135 talloc_free(client);
136 return -1;
137 }
138 }
139 return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
140}
141
142/*
143 message handler for when we are in daemon mode. This redirects the message
144 to the right client
145 */
146static void daemon_message_handler(uint64_t srvid, TDB_DATA data,
147 void *private_data)
148{
149 struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
150 struct ctdb_req_message_old *r;
151 int len;
152
153 /* construct a message to send to the client containing the data */
154 len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
155 r = ctdbd_allocate_pkt(client->ctdb, client->ctdb, CTDB_REQ_MESSAGE,
156 len, struct ctdb_req_message_old);
157 CTDB_NO_MEMORY_VOID(client->ctdb, r);
158
159 talloc_set_name_const(r, "req_message packet");
160
161 r->srvid = srvid;
162 r->datalen = data.dsize;
163 memcpy(&r->data[0], data.dptr, data.dsize);
164
165 daemon_queue_send(client, &r->hdr);
166
167 talloc_free(r);
168}
169
170/*
171 this is called when the ctdb daemon received a ctdb request to
172 set the srvid from the client
173 */
174int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
175{
176 struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
177 int res;
178 if (client == NULL) {
179 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
180 return -1;
181 }
182 res = srvid_register(ctdb->srv, client, srvid, daemon_message_handler,
183 client);
184 if (res != 0) {
185 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n",
186 (unsigned long long)srvid));
187 } else {
188 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n",
189 (unsigned long long)srvid));
190 }
191
192 return res;
193}
194
195/*
196 this is called when the ctdb daemon received a ctdb request to
197 remove a srvid from the client
198 */
199int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
200{
201 struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
202 if (client == NULL) {
203 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
204 return -1;
205 }
206 return srvid_deregister(ctdb->srv, srvid, client);
207}
208
209int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
210 TDB_DATA *outdata)
211{
212 uint64_t *ids;
213 int i, num_ids;
214 uint8_t *results;
215
216 if ((indata.dsize % sizeof(uint64_t)) != 0) {
217 DEBUG(DEBUG_ERR, ("Bad indata in daemon_check_srvids, "
218 "size=%d\n", (int)indata.dsize));
219 return -1;
220 }
221
222 ids = (uint64_t *)indata.dptr;
223 num_ids = indata.dsize / 8;
224
225 results = talloc_zero_array(outdata, uint8_t, (num_ids+7)/8);
226 if (results == NULL) {
227 DEBUG(DEBUG_ERR, ("talloc failed in daemon_check_srvids\n"));
228 return -1;
229 }
230 for (i=0; i<num_ids; i++) {
231 if (srvid_exists(ctdb->srv, ids[i]) == 0) {
232 results[i/8] |= (1 << (i%8));
233 }
234 }
235 outdata->dptr = (uint8_t *)results;
236 outdata->dsize = talloc_get_size(results);
237 return 0;
238}
239
240/*
241 destroy a ctdb_client
242*/
243static int ctdb_client_destructor(struct ctdb_client *client)
244{
245 struct ctdb_db_context *ctdb_db;
246
247 ctdb_takeover_client_destructor_hook(client);
248 reqid_remove(client->ctdb->idr, client->client_id);
249 client->ctdb->num_clients--;
250
251 if (client->num_persistent_updates != 0) {
252 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
253 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
254 }
255 ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
256 if (ctdb_db) {
257 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
258 "commit active. Forcing recovery.\n"));
259 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
260
261 /*
262 * trans3 transaction state:
263 *
264 * The destructor sets the pointer to NULL.
265 */
266 talloc_free(ctdb_db->persistent_state);
267 }
268
269 return 0;
270}
271
272
273/*
274 this is called when the ctdb daemon received a ctdb request message
275 from a local client over the unix domain socket
276 */
277static void daemon_request_message_from_client(struct ctdb_client *client,
278 struct ctdb_req_message_old *c)
279{
280 TDB_DATA data;
281 int res;
282
283 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
284 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
285 }
286
287 /* maybe the message is for another client on this node */
288 if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
289 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
290 return;
291 }
292
293 /* its for a remote node */
294 data.dptr = &c->data[0];
295 data.dsize = c->datalen;
296 res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
297 c->srvid, data);
298 if (res != 0) {
299 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
300 c->hdr.destnode));
301 }
302}
303
304
/*
  per-request state the daemon keeps while a call issued by a local
  client is being processed
 */
struct daemon_call_state {
	/* client that issued the call */
	struct ctdb_client *client;
	/* reqid from the client's request header, echoed in the reply */
	uint32_t reqid;
	/* the call being executed */
	struct ctdb_call *call;
	/* when processing started; used for the latency statistics */
	struct timeval start_time;

	/* readonly request ? */
	uint32_t readonly_fetch;
	/* call id the client asked for; only set for readonly requests */
	uint32_t client_callid;
};
315
316/*
317 complete a call from a client
318*/
319static void daemon_call_from_client_callback(struct ctdb_call_state *state)
320{
321 struct daemon_call_state *dstate = talloc_get_type(state->async.private_data,
322 struct daemon_call_state);
323 struct ctdb_reply_call_old *r;
324 int res;
325 uint32_t length;
326 struct ctdb_client *client = dstate->client;
327 struct ctdb_db_context *ctdb_db = state->ctdb_db;
328
329 talloc_steal(client, dstate);
330 talloc_steal(dstate, dstate->call);
331
332 res = ctdb_daemon_call_recv(state, dstate->call);
333 if (res != 0) {
334 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
335 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
336
337 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
338 return;
339 }
340
341 length = offsetof(struct ctdb_reply_call_old, data) + dstate->call->reply_data.dsize;
342 /* If the client asked for readonly FETCH, we remapped this to
343 FETCH_WITH_HEADER when calling the daemon. So we must
344 strip the extra header off the reply data before passing
345 it back to the client.
346 */
347 if (dstate->readonly_fetch
348 && dstate->client_callid == CTDB_FETCH_FUNC) {
349 length -= sizeof(struct ctdb_ltdb_header);
350 }
351
352 r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL,
353 length, struct ctdb_reply_call_old);
354 if (r == NULL) {
355 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
356 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
357 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
358 return;
359 }
360 r->hdr.reqid = dstate->reqid;
361 r->status = dstate->call->status;
362
363 if (dstate->readonly_fetch
364 && dstate->client_callid == CTDB_FETCH_FUNC) {
365 /* client only asked for a FETCH so we must strip off
366 the extra ctdb_ltdb header
367 */
368 r->datalen = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
369 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
370 } else {
371 r->datalen = dstate->call->reply_data.dsize;
372 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
373 }
374
375 res = daemon_queue_send(client, &r->hdr);
376 if (res == -1) {
377 /* client is dead - return immediately */
378 return;
379 }
380 if (res != 0) {
381 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
382 }
383 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
384 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
385 talloc_free(dstate);
386}
387
/*
  identifies the client a requeued packet belongs to by id rather than
  by pointer, so that a client which went away in the meantime can be
  detected through a failed reqid lookup
 */
struct ctdb_daemon_packet_wrap {
	/* daemon context holding the reqid table */
	struct ctdb_context *ctdb;
	/* reqid of the client */
	uint32_t client_id;
};
392
393/*
394 a wrapper to catch disconnected clients
395 */
396static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
397{
398 struct ctdb_client *client;
399 struct ctdb_daemon_packet_wrap *w = talloc_get_type(p,
400 struct ctdb_daemon_packet_wrap);
401 if (w == NULL) {
402 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
403 return;
404 }
405
406 client = reqid_find(w->ctdb->idr, w->client_id, struct ctdb_client);
407 if (client == NULL) {
408 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
409 w->client_id));
410 talloc_free(w);
411 return;
412 }
413 talloc_free(w);
414
415 /* process it */
416 daemon_incoming_packet(client, hdr);
417}
418
/* one client call that is parked until an in-flight fetch completes */
struct ctdb_deferred_fetch_call {
	/* list linkage, maintained through the DLIST_* macros */
	struct ctdb_deferred_fetch_call *next;
	struct ctdb_deferred_fetch_call *prev;
	/* the parked request packet */
	struct ctdb_req_call_old *c;
	/* identifies the client that sent it */
	struct ctdb_daemon_packet_wrap *w;
};
424
/* the calls deferred behind one in-flight fetch, in arrival order */
struct ctdb_deferred_fetch_queue {
	struct ctdb_deferred_fetch_call *deferred_calls;
};
428
/* timer payload used to push one deferred call back through the event loop */
struct ctdb_deferred_requeue {
	/* the deferred call to reprocess */
	struct ctdb_deferred_fetch_call *dfc;
	/* the client it is reprocessed for */
	struct ctdb_client *client;
};
433
434/* called from a timer event and starts reprocessing the deferred call.*/
435static void reprocess_deferred_call(struct tevent_context *ev,
436 struct tevent_timer *te,
437 struct timeval t, void *private_data)
438{
439 struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
440 struct ctdb_client *client = dfr->client;
441
442 talloc_steal(client, dfr->dfc->c);
443 daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
444 talloc_free(dfr);
445}
446
447/* the referral context is destroyed either after a timeout or when the initial
448 fetch-lock has finished.
449 at this stage, immediately start reprocessing the queued up deferred
450 calls so they get reprocessed immediately (and since we are dmaster at
451 this stage, trigger the waiting smbd processes to pick up and aquire the
452 record right away.
453*/
454static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
455{
456
457 /* need to reprocess the packets from the queue explicitely instead of
458 just using a normal destructor since we want, need, to
459 call the clients in the same oder as the requests queued up
460 */
461 while (dfq->deferred_calls != NULL) {
462 struct ctdb_client *client;
463 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
464 struct ctdb_deferred_requeue *dfr;
465
466 DLIST_REMOVE(dfq->deferred_calls, dfc);
467
468 client = reqid_find(dfc->w->ctdb->idr, dfc->w->client_id, struct ctdb_client);
469 if (client == NULL) {
470 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
471 dfc->w->client_id));
472 continue;
473 }
474
475 /* process it by pushing it back onto the eventloop */
476 dfr = talloc(client, struct ctdb_deferred_requeue);
477 if (dfr == NULL) {
478 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
479 continue;
480 }
481
482 dfr->dfc = talloc_steal(dfr, dfc);
483 dfr->client = client;
484
485 tevent_add_timer(dfc->w->ctdb->ev, client, timeval_zero(),
486 reprocess_deferred_call, dfr);
487 }
488
489 return 0;
490}
491
492/* insert the new deferral context into the rb tree.
493 there should never be a pre-existing context here, but check for it
494 warn and destroy the previous context if there is already a deferral context
495 for this key.
496*/
497static void *insert_dfq_callback(void *parm, void *data)
498{
499 if (data) {
500 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
501 talloc_free(data);
502 }
503 return parm;
504}
505
/* timer handler for a fetch-lock that did not complete within a
   reasonable time: frees the deferral queue passed as private_data,
   whose destructor re-inserts all deferred requests into the event
   system.
*/
static void dfq_timeout(struct tevent_context *ev, struct tevent_timer *te,
			struct timeval t, void *private_data)
{
	void *deferral_queue = private_data;

	talloc_free(deferral_queue);
}
515
516/* This function is used in the local daemon to register a KEY in a database
517 for being "fetched"
518 While the remote fetch is in-flight, any futher attempts to re-fetch the
519 same record will be deferred until the fetch completes.
520*/
521static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
522{
523 uint32_t *k;
524 struct ctdb_deferred_fetch_queue *dfq;
525
526 k = ctdb_key_to_idkey(call, call->key);
527 if (k == NULL) {
528 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
529 return -1;
530 }
531
532 dfq = talloc(call, struct ctdb_deferred_fetch_queue);
533 if (dfq == NULL) {
534 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
535 talloc_free(k);
536 return -1;
537 }
538 dfq->deferred_calls = NULL;
539
540 trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
541
542 talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
543
544 /* if the fetch havent completed in 30 seconds, just tear it all down
545 and let it try again as the events are reissued */
546 tevent_add_timer(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0),
547 dfq_timeout, dfq);
548
549 talloc_free(k);
550 return 0;
551}
552
553/* check if this is a duplicate request to a fetch already in-flight
554 if it is, make this call deferred to be reprocessed later when
555 the in-flight fetch completes.
556*/
557static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call_old *c)
558{
559 uint32_t *k;
560 struct ctdb_deferred_fetch_queue *dfq;
561 struct ctdb_deferred_fetch_call *dfc;
562
563 k = ctdb_key_to_idkey(c, key);
564 if (k == NULL) {
565 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
566 return -1;
567 }
568
569 dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
570 if (dfq == NULL) {
571 talloc_free(k);
572 return -1;
573 }
574
575
576 talloc_free(k);
577
578 dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
579 if (dfc == NULL) {
580 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
581 return -1;
582 }
583
584 dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
585 if (dfc->w == NULL) {
586 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
587 talloc_free(dfc);
588 return -1;
589 }
590
591 dfc->c = talloc_steal(dfc, c);
592 dfc->w->ctdb = ctdb_db->ctdb;
593 dfc->w->client_id = client->client_id;
594
595 DLIST_ADD_END(dfq->deferred_calls, dfc);
596
597 return 0;
598}
599
600
601/*
602 this is called when the ctdb daemon received a ctdb request call
603 from a local client over the unix domain socket
604 */
605static void daemon_request_call_from_client(struct ctdb_client *client,
606 struct ctdb_req_call_old *c)
607{
608 struct ctdb_call_state *state;
609 struct ctdb_db_context *ctdb_db;
610 struct daemon_call_state *dstate;
611 struct ctdb_call *call;
612 struct ctdb_ltdb_header header;
613 TDB_DATA key, data;
614 int ret;
615 struct ctdb_context *ctdb = client->ctdb;
616 struct ctdb_daemon_packet_wrap *w;
617
618 CTDB_INCREMENT_STAT(ctdb, total_calls);
619 CTDB_INCREMENT_STAT(ctdb, pending_calls);
620
621 ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
622 if (!ctdb_db) {
623 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
624 c->db_id));
625 CTDB_DECREMENT_STAT(ctdb, pending_calls);
626 return;
627 }
628
629 if (ctdb_db->unhealthy_reason) {
630 /*
631 * this is just a warning, as the tdb should be empty anyway,
632 * and only persistent databases can be unhealthy, which doesn't
633 * use this code patch
634 */
635 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
636 ctdb_db->db_name, ctdb_db->unhealthy_reason));
637 }
638
639 key.dptr = c->data;
640 key.dsize = c->keylen;
641
642 w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
643 CTDB_NO_MEMORY_VOID(ctdb, w);
644
645 w->ctdb = ctdb;
646 w->client_id = client->client_id;
647
648 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header,
649 (struct ctdb_req_header *)c, &data,
650 daemon_incoming_packet_wrap, w, true);
651 if (ret == -2) {
652 /* will retry later */
653 CTDB_DECREMENT_STAT(ctdb, pending_calls);
654 return;
655 }
656
657 talloc_free(w);
658
659 if (ret != 0) {
660 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
661 CTDB_DECREMENT_STAT(ctdb, pending_calls);
662 return;
663 }
664
665
666 /* check if this fetch request is a duplicate for a
667 request we already have in flight. If so defer it until
668 the first request completes.
669 */
670 if (ctdb->tunable.fetch_collapse == 1) {
671 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
672 ret = ctdb_ltdb_unlock(ctdb_db, key);
673 if (ret != 0) {
674 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
675 }
676 CTDB_DECREMENT_STAT(ctdb, pending_calls);
677 return;
678 }
679 }
680
681 /* Dont do READONLY if we don't have a tracking database */
682 if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
683 c->flags &= ~CTDB_WANT_READONLY;
684 }
685
686 if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
687 header.flags &= ~CTDB_REC_RO_FLAGS;
688 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
689 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
690 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
691 ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
692 }
693 /* and clear out the tracking data */
694 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
695 DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
696 }
697 }
698
699 /* if we are revoking, we must defer all other calls until the revoke
700 * had completed.
701 */
702 if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
703 talloc_free(data.dptr);
704 ret = ctdb_ltdb_unlock(ctdb_db, key);
705
706 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
707 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
708 }
709 CTDB_DECREMENT_STAT(ctdb, pending_calls);
710 return;
711 }
712
713 if ((header.dmaster == ctdb->pnn)
714 && (!(c->flags & CTDB_WANT_READONLY))
715 && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
716 header.flags |= CTDB_REC_RO_REVOKING_READONLY;
717 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
718 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
719 }
720 ret = ctdb_ltdb_unlock(ctdb_db, key);
721
722 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
723 ctdb_fatal(ctdb, "Failed to start record revoke");
724 }
725 talloc_free(data.dptr);
726
727 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
728 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
729 }
730
731 CTDB_DECREMENT_STAT(ctdb, pending_calls);
732 return;
733 }
734
735 dstate = talloc(client, struct daemon_call_state);
736 if (dstate == NULL) {
737 ret = ctdb_ltdb_unlock(ctdb_db, key);
738 if (ret != 0) {
739 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
740 }
741
742 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
743 CTDB_DECREMENT_STAT(ctdb, pending_calls);
744 return;
745 }
746 dstate->start_time = timeval_current();
747 dstate->client = client;
748 dstate->reqid = c->hdr.reqid;
749 talloc_steal(dstate, data.dptr);
750
751 call = dstate->call = talloc_zero(dstate, struct ctdb_call);
752 if (call == NULL) {
753 ret = ctdb_ltdb_unlock(ctdb_db, key);
754 if (ret != 0) {
755 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
756 }
757
758 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
759 CTDB_DECREMENT_STAT(ctdb, pending_calls);
760 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
761 return;
762 }
763
764 dstate->readonly_fetch = 0;
765 call->call_id = c->callid;
766 call->key = key;
767 call->call_data.dptr = c->data + c->keylen;
768 call->call_data.dsize = c->calldatalen;
769 call->flags = c->flags;
770
771 if (c->flags & CTDB_WANT_READONLY) {
772 /* client wants readonly record, so translate this into a
773 fetch with header. remember what the client asked for
774 so we can remap the reply back to the proper format for
775 the client in the reply
776 */
777 dstate->client_callid = call->call_id;
778 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
779 dstate->readonly_fetch = 1;
780 }
781
782 if (header.dmaster == ctdb->pnn) {
783 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
784 } else {
785 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
786 if (ctdb->tunable.fetch_collapse == 1) {
787 /* This request triggered a remote fetch-lock.
788 set up a deferral for this key so any additional
789 fetch-locks are deferred until the current one
790 finishes.
791 */
792 setup_deferred_fetch_locks(ctdb_db, call);
793 }
794 }
795
796 ret = ctdb_ltdb_unlock(ctdb_db, key);
797 if (ret != 0) {
798 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
799 }
800
801 if (state == NULL) {
802 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
803 CTDB_DECREMENT_STAT(ctdb, pending_calls);
804 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
805 return;
806 }
807 talloc_steal(state, dstate);
808 talloc_steal(client, state);
809
810 state->async.fn = daemon_call_from_client_callback;
811 state->async.private_data = dstate;
812}
813
814
/* forward declaration: handles a CTDB_REQ_CONTROL packet sent by a client */
static void
daemon_request_control_from_client(struct ctdb_client *client,
				   struct ctdb_req_control_old *c);
817
818/* data contains a packet from the client */
819static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
820{
821 struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
822 TALLOC_CTX *tmp_ctx;
823 struct ctdb_context *ctdb = client->ctdb;
824
825 /* place the packet as a child of a tmp_ctx. We then use
826 talloc_free() below to free it. If any of the calls want
827 to keep it, then they will steal it somewhere else, and the
828 talloc_free() will be a no-op */
829 tmp_ctx = talloc_new(client);
830 talloc_steal(tmp_ctx, hdr);
831
832 if (hdr->ctdb_magic != CTDB_MAGIC) {
833 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
834 goto done;
835 }
836
837 if (hdr->ctdb_version != CTDB_PROTOCOL) {
838 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
839 goto done;
840 }
841
842 switch (hdr->operation) {
843 case CTDB_REQ_CALL:
844 CTDB_INCREMENT_STAT(ctdb, client.req_call);
845 daemon_request_call_from_client(client, (struct ctdb_req_call_old *)hdr);
846 break;
847
848 case CTDB_REQ_MESSAGE:
849 CTDB_INCREMENT_STAT(ctdb, client.req_message);
850 daemon_request_message_from_client(client, (struct ctdb_req_message_old *)hdr);
851 break;
852
853 case CTDB_REQ_CONTROL:
854 CTDB_INCREMENT_STAT(ctdb, client.req_control);
855 daemon_request_control_from_client(client, (struct ctdb_req_control_old *)hdr);
856 break;
857
858 default:
859 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
860 hdr->operation));
861 }
862
863done:
864 talloc_free(tmp_ctx);
865}
866
867/*
868 called when the daemon gets a incoming packet
869 */
870static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
871{
872 struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
873 struct ctdb_req_header *hdr;
874
875 if (cnt == 0) {
876 talloc_free(client);
877 return;
878 }
879
880 CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
881
882 if (cnt < sizeof(*hdr)) {
883 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n",
884 (unsigned)cnt);
885 return;
886 }
887 hdr = (struct ctdb_req_header *)data;
888 if (cnt != hdr->length) {
889 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon",
890 (unsigned)hdr->length, (unsigned)cnt);
891 return;
892 }
893
894 if (hdr->ctdb_magic != CTDB_MAGIC) {
895 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
896 return;
897 }
898
899 if (hdr->ctdb_version != CTDB_PROTOCOL) {
900 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
901 return;
902 }
903
904 DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
905 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
906 hdr->srcnode, hdr->destnode));
907
908 /* it is the responsibility of the incoming packet function to free 'data' */
909 daemon_incoming_packet(client, hdr);
910}
911
912
913static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
914{
915 if (client_pid->ctdb->client_pids != NULL) {
916 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
917 }
918
919 return 0;
920}
921
922
923static void ctdb_accept_client(struct tevent_context *ev,
924 struct tevent_fd *fde, uint16_t flags,
925 void *private_data)
926{
927 struct sockaddr_un addr;
928 socklen_t len;
929 int fd;
930 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
931 struct ctdb_client *client;
932 struct ctdb_client_pid_list *client_pid;
933 pid_t peer_pid = 0;
934
935 memset(&addr, 0, sizeof(addr));
936 len = sizeof(addr);
937 fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
938 if (fd == -1) {
939 return;
940 }
941
942 set_nonblocking(fd);
943 set_close_on_exec(fd);
944
945 DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
946
947 client = talloc_zero(ctdb, struct ctdb_client);
948 if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
949 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
950 }
951
952 client->ctdb = ctdb;
953 client->fd = fd;
954 client->client_id = reqid_new(ctdb->idr, client);
955 client->pid = peer_pid;
956
957 client_pid = talloc(client, struct ctdb_client_pid_list);
958 if (client_pid == NULL) {
959 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
960 close(fd);
961 talloc_free(client);
962 return;
963 }
964 client_pid->ctdb = ctdb;
965 client_pid->pid = peer_pid;
966 client_pid->client = client;
967
968 DLIST_ADD(ctdb->client_pids, client_pid);
969
970 client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
971 ctdb_daemon_read_cb, client,
972 "client-%u", client->pid);
973
974 talloc_set_destructor(client, ctdb_client_destructor);
975 talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
976 ctdb->num_clients++;
977}
978
979
980
981/*
982 create a unix domain socket and bind it
983 return a file descriptor open on the socket
984*/
985static int ux_socket_bind(struct ctdb_context *ctdb)
986{
987 struct sockaddr_un addr;
988 int ret;
989
990 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
991 if (ctdb->daemon.sd == -1) {
992 return -1;
993 }
994
995 memset(&addr, 0, sizeof(addr));
996 addr.sun_family = AF_UNIX;
997 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
998
999 /* Remove any old socket */
1000 ret = unlink(ctdb->daemon.name);
1001 if (ret == 0) {
1002 DEBUG(DEBUG_WARNING,
1003 ("Removed stale socket %s\n", ctdb->daemon.name));
1004 } else if (errno != ENOENT) {
1005 DEBUG(DEBUG_ERR,
1006 ("Failed to remove stale socket %s\n", ctdb->daemon.name));
1007 return -1;
1008 }
1009
1010 set_close_on_exec(ctdb->daemon.sd);
1011 set_nonblocking(ctdb->daemon.sd);
1012
1013 if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
1014 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
1015 goto failed;
1016 }
1017
1018 if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
1019 chmod(ctdb->daemon.name, 0700) != 0) {
1020 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
1021 goto failed;
1022 }
1023
1024
1025 if (listen(ctdb->daemon.sd, 100) != 0) {
1026 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1027 goto failed;
1028 }
1029
1030 return 0;
1031
1032failed:
1033 close(ctdb->daemon.sd);
1034 ctdb->daemon.sd = -1;
1035 return -1;
1036}
1037
1038static void initialise_node_flags (struct ctdb_context *ctdb)
1039{
1040 if (ctdb->pnn == -1) {
1041 ctdb_fatal(ctdb, "PNN is set to -1 (unknown value)");
1042 }
1043
1044 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_DISCONNECTED;
1045
1046 /* do we start out in DISABLED mode? */
1047 if (ctdb->start_as_disabled != 0) {
1048 DEBUG(DEBUG_NOTICE, ("This node is configured to start in DISABLED state\n"));
1049 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_DISABLED;
1050 }
1051 /* do we start out in STOPPED mode? */
1052 if (ctdb->start_as_stopped != 0) {
1053 DEBUG(DEBUG_NOTICE, ("This node is configured to start in STOPPED state\n"));
1054 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1055 }
1056}
1057
1058static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1059 void *private_data)
1060{
1061 if (status != 0) {
1062 ctdb_die(ctdb, "Failed to run setup event");
1063 }
1064 ctdb_run_notification_script(ctdb, "setup");
1065
1066 /* tell all other nodes we've just started up */
1067 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
1068 0, CTDB_CONTROL_STARTUP, 0,
1069 CTDB_CTRL_FLAG_NOREPLY,
1070 tdb_null, NULL, NULL);
1071
1072 /* Start the recovery daemon */
1073 if (ctdb_start_recoverd(ctdb) != 0) {
1074 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
1075 exit(11);
1076 }
1077
1078 ctdb_start_periodic_events(ctdb);
1079
1080 ctdb_wait_for_first_recovery(ctdb);
1081}
1082
1083static struct timeval tevent_before_wait_ts;
1084static struct timeval tevent_after_wait_ts;
1085
1086static void ctdb_tevent_trace(enum tevent_trace_point tp,
1087 void *private_data)
1088{
1089 struct timeval diff;
1090 struct timeval now;
1091 struct ctdb_context *ctdb =
1092 talloc_get_type(private_data, struct ctdb_context);
1093
1094 if (getpid() != ctdb->ctdbd_pid) {
1095 return;
1096 }
1097
1098 now = timeval_current();
1099
1100 switch (tp) {
1101 case TEVENT_TRACE_BEFORE_WAIT:
1102 if (!timeval_is_zero(&tevent_after_wait_ts)) {
1103 diff = timeval_until(&tevent_after_wait_ts, &now);
1104 if (diff.tv_sec > 3) {
1105 DEBUG(DEBUG_ERR,
1106 ("Handling event took %ld seconds!\n",
1107 (long)diff.tv_sec));
1108 }
1109 }
1110 tevent_before_wait_ts = now;
1111 break;
1112
1113 case TEVENT_TRACE_AFTER_WAIT:
1114 if (!timeval_is_zero(&tevent_before_wait_ts)) {
1115 diff = timeval_until(&tevent_before_wait_ts, &now);
1116 if (diff.tv_sec > 3) {
1117 DEBUG(DEBUG_CRIT,
1118 ("No event for %ld seconds!\n",
1119 (long)diff.tv_sec));
1120 }
1121 }
1122 tevent_after_wait_ts = now;
1123 break;
1124
1125 default:
1126 /* Do nothing for future tevent trace points */ ;
1127 }
1128}
1129
1130static void ctdb_remove_pidfile(void)
1131{
1132 TALLOC_FREE(ctdbd_pidfile_ctx);
1133}
1134
1135static void ctdb_create_pidfile(TALLOC_CTX *mem_ctx)
1136{
1137 if (ctdbd_pidfile != NULL) {
1138 int ret = pidfile_create(mem_ctx, ctdbd_pidfile,
1139 &ctdbd_pidfile_ctx);
1140 if (ret != 0) {
1141 DEBUG(DEBUG_ERR,
1142 ("Failed to create PID file %s\n",
1143 ctdbd_pidfile));
1144 exit(11);
1145 }
1146
1147 DEBUG(DEBUG_NOTICE, ("Created PID file %s\n", ctdbd_pidfile));
1148 atexit(ctdb_remove_pidfile);
1149 }
1150}
1151
1152static void ctdb_initialise_vnn_map(struct ctdb_context *ctdb)
1153{
1154 int i, j, count;
1155
1156 /* initialize the vnn mapping table, skipping any deleted nodes */
1157 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
1158 CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map);
1159
1160 count = 0;
1161 for (i = 0; i < ctdb->num_nodes; i++) {
1162 if ((ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) == 0) {
1163 count++;
1164 }
1165 }
1166
1167 ctdb->vnn_map->generation = INVALID_GENERATION;
1168 ctdb->vnn_map->size = count;
1169 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
1170 CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map->map);
1171
1172 for(i=0, j=0; i < ctdb->vnn_map->size; i++) {
1173 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
1174 continue;
1175 }
1176 ctdb->vnn_map->map[j] = i;
1177 j++;
1178 }
1179}
1180
1181static void ctdb_set_my_pnn(struct ctdb_context *ctdb)
1182{
1183 int nodeid;
1184
1185 if (ctdb->address == NULL) {
1186 ctdb_fatal(ctdb,
1187 "Can not determine PNN - node address is not set\n");
1188 }
1189
1190 nodeid = ctdb_ip_to_nodeid(ctdb, ctdb->address);
1191 if (nodeid == -1) {
1192 ctdb_fatal(ctdb,
1193 "Can not determine PNN - node address not found in node list\n");
1194 }
1195
1196 ctdb->pnn = ctdb->nodes[nodeid]->pnn;
1197 DEBUG(DEBUG_NOTICE, ("PNN is %u\n", ctdb->pnn));
1198}
1199
1200/*
1201 start the protocol going as a daemon
1202*/
1203int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
1204{
1205 int res, ret = -1;
1206 struct tevent_fd *fde;
1207
1208 if (do_fork && fork()) {
1209 return 0;
1210 }
1211
1212 if (do_fork) {
1213 if (setsid() == -1) {
1214 ctdb_die(ctdb, "Failed to setsid()\n");
1215 }
1216 close(0);
1217 if (open("/dev/null", O_RDONLY) != 0) {
1218 DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
1219 exit(11);
1220 }
1221 }
1222 ignore_signal(SIGPIPE);
1223 ignore_signal(SIGUSR1);
1224
1225 ctdb->ctdbd_pid = getpid();
1226 DEBUG(DEBUG_ERR, ("Starting CTDBD (Version %s) as PID: %u\n",
1227 CTDB_VERSION_STRING, ctdb->ctdbd_pid));
1228 ctdb_create_pidfile(ctdb);
1229
1230 /* create a unix domain stream socket to listen to */
1231 res = ux_socket_bind(ctdb);
1232 if (res!=0) {
1233 DEBUG(DEBUG_ALERT,("Cannot continue. Exiting!\n"));
1234 exit(10);
1235 }
1236
1237 /* Make sure we log something when the daemon terminates.
1238 * This must be the first exit handler to run (so the last to
1239 * be registered.
1240 */
1241 atexit(print_exit_message);
1242
1243 if (ctdb->do_setsched) {
1244 /* try to set us up as realtime */
1245 if (!set_scheduler()) {
1246 exit(1);
1247 }
1248 DEBUG(DEBUG_NOTICE, ("Set real-time scheduler priority\n"));
1249 }
1250
1251 ctdb->ev = tevent_context_init(NULL);
1252 if (ctdb->ev == NULL) {
1253 DEBUG(DEBUG_ALERT,("tevent_context_init() failed\n"));
1254 exit(1);
1255 }
1256 tevent_loop_allow_nesting(ctdb->ev);
1257 tevent_set_trace_callback(ctdb->ev, ctdb_tevent_trace, ctdb);
1258 ret = ctdb_init_tevent_logging(ctdb);
1259 if (ret != 0) {
1260 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
1261 exit(1);
1262 }
1263
1264 /* set up a handler to pick up sigchld */
1265 if (ctdb_init_sigchld(ctdb) == NULL) {
1266 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1267 exit(1);
1268 }
1269
1270 ctdb_set_child_logging(ctdb);
1271
1272 if (srvid_init(ctdb, &ctdb->srv) != 0) {
1273 DEBUG(DEBUG_CRIT,("Failed to setup message srvid context\n"));
1274 exit(1);
1275 }
1276
1277 /* initialize statistics collection */
1278 ctdb_statistics_init(ctdb);
1279
1280 /* force initial recovery for election */
1281 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1282
1283 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_INIT);
1284 ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1285 if (ret != 0) {
1286 ctdb_die(ctdb, "Failed to run init event\n");
1287 }
1288 ctdb_run_notification_script(ctdb, "init");
1289
1290 if (strcmp(ctdb->transport, "tcp") == 0) {
1291 ret = ctdb_tcp_init(ctdb);
1292 }
1293#ifdef USE_INFINIBAND
1294 if (strcmp(ctdb->transport, "ib") == 0) {
1295 ret = ctdb_ibw_init(ctdb);
1296 }
1297#endif
1298 if (ret != 0) {
1299 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
1300 return -1;
1301 }
1302
1303 if (ctdb->methods == NULL) {
1304 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1305 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1306 }
1307
1308 /* Initialise the transport. This sets the node address if it
1309 * was not set via the command-line. */
1310 if (ctdb->methods->initialise(ctdb) != 0) {
1311 ctdb_fatal(ctdb, "transport failed to initialise");
1312 }
1313
1314 ctdb_set_my_pnn(ctdb);
1315
1316 initialise_node_flags(ctdb);
1317
1318 if (ctdb->public_addresses_file) {
1319 ret = ctdb_set_public_addresses(ctdb, true);
1320 if (ret == -1) {
1321 DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
1322 exit(1);
1323 }
1324 }
1325
1326 ctdb_initialise_vnn_map(ctdb);
1327
1328 /* attach to existing databases */
1329 if (ctdb_attach_databases(ctdb) != 0) {
1330 ctdb_fatal(ctdb, "Failed to attach to databases\n");
1331 }
1332
1333 /* start frozen, then let the first election sort things out */
1334 if (!ctdb_blocking_freeze(ctdb)) {
1335 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1336 }
1337
1338 /* now start accepting clients, only can do this once frozen */
1339 fde = tevent_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, TEVENT_FD_READ,
1340 ctdb_accept_client, ctdb);
1341 if (fde == NULL) {
1342 ctdb_fatal(ctdb, "Failed to add daemon socket to event loop");
1343 }
1344 tevent_fd_set_auto_close(fde);
1345
1346 /* Start the transport */
1347 if (ctdb->methods->start(ctdb) != 0) {
1348 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
1349 ctdb_fatal(ctdb, "transport failed to start");
1350 }
1351
1352 /* Recovery daemon and timed events are started from the
1353 * callback, only after the setup event completes
1354 * successfully.
1355 */
1356 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SETUP);
1357 ret = ctdb_event_script_callback(ctdb,
1358 ctdb,
1359 ctdb_setup_event_callback,
1360 ctdb,
1361 CTDB_EVENT_SETUP,
1362 "%s",
1363 "");
1364 if (ret != 0) {
1365 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
1366 exit(1);
1367 }
1368
1369 lockdown_memory(ctdb->valgrinding);
1370
1371 /* go into a wait loop to allow other nodes to complete */
1372 tevent_loop_wait(ctdb->ev);
1373
1374 DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1375 exit(1);
1376}
1377
1378/*
1379 allocate a packet for use in daemon<->daemon communication
1380 */
1381struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1382 TALLOC_CTX *mem_ctx,
1383 enum ctdb_operation operation,
1384 size_t length, size_t slength,
1385 const char *type)
1386{
1387 int size;
1388 struct ctdb_req_header *hdr;
1389
1390 length = MAX(length, slength);
1391 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
1392
1393 if (ctdb->methods == NULL) {
1394 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1395 operation, (unsigned)length));
1396 return NULL;
1397 }
1398
1399 hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1400 if (hdr == NULL) {
1401 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1402 operation, (unsigned)length));
1403 return NULL;
1404 }
1405 talloc_set_name_const(hdr, type);
1406 memset(hdr, 0, slength);
1407 hdr->length = length;
1408 hdr->operation = operation;
1409 hdr->ctdb_magic = CTDB_MAGIC;
1410 hdr->ctdb_version = CTDB_PROTOCOL;
1411 hdr->generation = ctdb->vnn_map->generation;
1412 hdr->srcnode = ctdb->pnn;
1413
1414 return hdr;
1415}
1416
/*
  state kept for a control that a local client asked the daemon to send
 */
struct daemon_control_state {
	/* links in the destination node's pending_controls list */
	struct daemon_control_state *next;
	struct daemon_control_state *prev;
	/* client that issued the control */
	struct ctdb_client *client;
	/* the request packet, owned by this state */
	struct ctdb_req_control_old *c;
	/* request id copied from the request header, echoed in the reply */
	uint32_t reqid;
	/* destination node, or NULL when the destination pnn is not valid */
	struct ctdb_node *node;
};
1424
1425/*
1426 callback when a control reply comes in
1427 */
1428static void daemon_control_callback(struct ctdb_context *ctdb,
1429 int32_t status, TDB_DATA data,
1430 const char *errormsg,
1431 void *private_data)
1432{
1433 struct daemon_control_state *state = talloc_get_type(private_data,
1434 struct daemon_control_state);
1435 struct ctdb_client *client = state->client;
1436 struct ctdb_reply_control_old *r;
1437 size_t len;
1438 int ret;
1439
1440 /* construct a message to send to the client containing the data */
1441 len = offsetof(struct ctdb_reply_control_old, data) + data.dsize;
1442 if (errormsg) {
1443 len += strlen(errormsg);
1444 }
1445 r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len,
1446 struct ctdb_reply_control_old);
1447 CTDB_NO_MEMORY_VOID(ctdb, r);
1448
1449 r->hdr.reqid = state->reqid;
1450 r->status = status;
1451 r->datalen = data.dsize;
1452 r->errorlen = 0;
1453 memcpy(&r->data[0], data.dptr, data.dsize);
1454 if (errormsg) {
1455 r->errorlen = strlen(errormsg);
1456 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1457 }
1458
1459 ret = daemon_queue_send(client, &r->hdr);
1460 if (ret != -1) {
1461 talloc_free(state);
1462 }
1463}
1464
1465/*
1466 fail all pending controls to a disconnected node
1467 */
1468void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1469{
1470 struct daemon_control_state *state;
1471 while ((state = node->pending_controls)) {
1472 DLIST_REMOVE(node->pending_controls, state);
1473 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null,
1474 "node is disconnected", state);
1475 }
1476}
1477
1478/*
1479 destroy a daemon_control_state
1480 */
1481static int daemon_control_destructor(struct daemon_control_state *state)
1482{
1483 if (state->node) {
1484 DLIST_REMOVE(state->node->pending_controls, state);
1485 }
1486 return 0;
1487}
1488
1489/*
1490 this is called when the ctdb daemon received a ctdb request control
1491 from a local client over the unix domain socket
1492 */
1493static void daemon_request_control_from_client(struct ctdb_client *client,
1494 struct ctdb_req_control_old *c)
1495{
1496 TDB_DATA data;
1497 int res;
1498 struct daemon_control_state *state;
1499 TALLOC_CTX *tmp_ctx = talloc_new(client);
1500
1501 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1502 c->hdr.destnode = client->ctdb->pnn;
1503 }
1504
1505 state = talloc(client, struct daemon_control_state);
1506 CTDB_NO_MEMORY_VOID(client->ctdb, state);
1507
1508 state->client = client;
1509 state->c = talloc_steal(state, c);
1510 state->reqid = c->hdr.reqid;
1511 if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1512 state->node = client->ctdb->nodes[c->hdr.destnode];
1513 DLIST_ADD(state->node->pending_controls, state);
1514 } else {
1515 state->node = NULL;
1516 }
1517
1518 talloc_set_destructor(state, daemon_control_destructor);
1519
1520 if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1521 talloc_steal(tmp_ctx, state);
1522 }
1523
1524 data.dptr = &c->data[0];
1525 data.dsize = c->datalen;
1526 res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1527 c->srvid, c->opcode, client->client_id,
1528 c->flags,
1529 data, daemon_control_callback,
1530 state);
1531 if (res != 0) {
1532 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1533 c->hdr.destnode));
1534 }
1535
1536 talloc_free(tmp_ctx);
1537}
1538
1539/*
1540 register a call function
1541*/
1542int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1543 ctdb_fn_t fn, int id)
1544{
1545 struct ctdb_registered_call *call;
1546 struct ctdb_db_context *ctdb_db;
1547
1548 ctdb_db = find_ctdb_db(ctdb, db_id);
1549 if (ctdb_db == NULL) {
1550 return -1;
1551 }
1552
1553 call = talloc(ctdb_db, struct ctdb_registered_call);
1554 call->fn = fn;
1555 call->id = id;
1556
1557 DLIST_ADD(ctdb_db->calls, call);
1558 return 0;
1559}
1560
1561
1562
1563/*
1564 this local messaging handler is ugly, but is needed to prevent
1565 recursion in ctdb_send_message() when the destination node is the
1566 same as the source node
1567 */
1568struct ctdb_local_message {
1569 struct ctdb_context *ctdb;
1570 uint64_t srvid;
1571 TDB_DATA data;
1572};
1573
1574static void ctdb_local_message_trigger(struct tevent_context *ev,
1575 struct tevent_timer *te,
1576 struct timeval t, void *private_data)
1577{
1578 struct ctdb_local_message *m = talloc_get_type(
1579 private_data, struct ctdb_local_message);
1580
1581 srvid_dispatch(m->ctdb->srv, m->srvid, CTDB_SRVID_ALL, m->data);
1582 talloc_free(m);
1583}
1584
1585static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1586{
1587 struct ctdb_local_message *m;
1588 m = talloc(ctdb, struct ctdb_local_message);
1589 CTDB_NO_MEMORY(ctdb, m);
1590
1591 m->ctdb = ctdb;
1592 m->srvid = srvid;
1593 m->data = data;
1594 m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1595 if (m->data.dptr == NULL) {
1596 talloc_free(m);
1597 return -1;
1598 }
1599
1600 /* this needs to be done as an event to prevent recursion */
1601 tevent_add_timer(ctdb->ev, m, timeval_zero(),
1602 ctdb_local_message_trigger, m);
1603 return 0;
1604}
1605
1606/*
1607 send a ctdb message
1608*/
1609int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1610 uint64_t srvid, TDB_DATA data)
1611{
1612 struct ctdb_req_message_old *r;
1613 int len;
1614
1615 if (ctdb->methods == NULL) {
1616 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1617 return -1;
1618 }
1619
1620 /* see if this is a message to ourselves */
1621 if (pnn == ctdb->pnn) {
1622 return ctdb_local_message(ctdb, srvid, data);
1623 }
1624
1625 len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
1626 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1627 struct ctdb_req_message_old);
1628 CTDB_NO_MEMORY(ctdb, r);
1629
1630 r->hdr.destnode = pnn;
1631 r->srvid = srvid;
1632 r->datalen = data.dsize;
1633 memcpy(&r->data[0], data.dptr, data.dsize);
1634
1635 ctdb_queue_packet(ctdb, &r->hdr);
1636
1637 talloc_free(r);
1638 return 0;
1639}
1640
1641
1642
1643struct ctdb_client_notify_list {
1644 struct ctdb_client_notify_list *next, *prev;
1645 struct ctdb_context *ctdb;
1646 uint64_t srvid;
1647 TDB_DATA data;
1648};
1649
1650
1651static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1652{
1653 int ret;
1654
1655 DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1656
1657 ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1658 if (ret != 0) {
1659 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1660 }
1661
1662 return 0;
1663}
1664
1665int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1666{
1667 struct ctdb_notify_data_old *notify = (struct ctdb_notify_data_old *)indata.dptr;
1668 struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1669 struct ctdb_client_notify_list *nl;
1670
1671 DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1672
1673 if (indata.dsize < offsetof(struct ctdb_notify_data_old, notify_data)) {
1674 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1675 return -1;
1676 }
1677
1678 if (indata.dsize != (notify->len + offsetof(struct ctdb_notify_data_old, notify_data))) {
1679 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_notify_data_old, notify_data))));
1680 return -1;
1681 }
1682
1683
1684 if (client == NULL) {
1685 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1686 return -1;
1687 }
1688
1689 for(nl=client->notify; nl; nl=nl->next) {
1690 if (nl->srvid == notify->srvid) {
1691 break;
1692 }
1693 }
1694 if (nl != NULL) {
1695 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1696 return -1;
1697 }
1698
1699 nl = talloc(client, struct ctdb_client_notify_list);
1700 CTDB_NO_MEMORY(ctdb, nl);
1701 nl->ctdb = ctdb;
1702 nl->srvid = notify->srvid;
1703 nl->data.dsize = notify->len;
1704 nl->data.dptr = talloc_size(nl, nl->data.dsize);
1705 CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1706 memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1707
1708 DLIST_ADD(client->notify, nl);
1709 talloc_set_destructor(nl, ctdb_client_notify_destructor);
1710
1711 return 0;
1712}
1713
1714int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1715{
1716 uint64_t srvid = *(uint64_t *)indata.dptr;
1717 struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1718 struct ctdb_client_notify_list *nl;
1719
1720 DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)srvid, client_id));
1721
1722 if (client == NULL) {
1723 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1724 return -1;
1725 }
1726
1727 for(nl=client->notify; nl; nl=nl->next) {
1728 if (nl->srvid == srvid) {
1729 break;
1730 }
1731 }
1732 if (nl == NULL) {
1733 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)srvid));
1734 return -1;
1735 }
1736
1737 DLIST_REMOVE(client->notify, nl);
1738 talloc_set_destructor(nl, NULL);
1739 talloc_free(nl);
1740
1741 return 0;
1742}
1743
1744struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1745{
1746 struct ctdb_client_pid_list *client_pid;
1747
1748 for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1749 if (client_pid->pid == pid) {
1750 return client_pid->client;
1751 }
1752 }
1753 return NULL;
1754}
1755
1756
1757/* This control is used by samba when probing if a process (of a samba daemon)
1758 exists on the node.
1759 Samba does this when it needs/wants to check if a subrecord in one of the
1760 databases is still valied, or if it is stale and can be removed.
1761 If the node is in unhealthy or stopped state we just kill of the samba
1762 process holding htis sub-record and return to the calling samba that
1763 the process does not exist.
1764 This allows us to forcefully recall subrecords registered by samba processes
1765 on banned and stopped nodes.
1766*/
1767int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1768{
1769 struct ctdb_client *client;
1770
1771 if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1772 client = ctdb_find_client_by_pid(ctdb, pid);
1773 if (client != NULL) {
1774 DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1775 talloc_free(client);
1776 }
1777 return -1;
1778 }
1779
1780 return kill(pid, 0);
1781}
1782
1783int ctdb_control_getnodesfile(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
1784{
1785 struct ctdb_node_map_old *node_map = NULL;
1786
1787 CHECK_CONTROL_DATA_SIZE(0);
1788
1789 node_map = ctdb_read_nodes_file(ctdb, ctdb->nodes_file);
1790 if (node_map == NULL) {
1791 DEBUG(DEBUG_ERR, ("Failed to read nodes file\n"));
1792 return -1;
1793 }
1794
1795 outdata->dptr = (unsigned char *)node_map;
1796 outdata->dsize = talloc_get_size(outdata->dptr);
1797
1798 return 0;
1799}
1800
1801void ctdb_shutdown_sequence(struct ctdb_context *ctdb, int exit_code)
1802{
1803 if (ctdb->runstate == CTDB_RUNSTATE_SHUTDOWN) {
1804 DEBUG(DEBUG_NOTICE,("Already shutting down so will not proceed.\n"));
1805 return;
1806 }
1807
1808 DEBUG(DEBUG_NOTICE,("Shutdown sequence commencing.\n"));
1809 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SHUTDOWN);
1810 ctdb_stop_recoverd(ctdb);
1811 ctdb_stop_keepalive(ctdb);
1812 ctdb_stop_monitoring(ctdb);
1813 ctdb_release_all_ips(ctdb);
1814 ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
1815 if (ctdb->methods != NULL) {
1816 ctdb->methods->shutdown(ctdb);
1817 }
1818
1819 DEBUG(DEBUG_NOTICE,("Shutdown sequence complete, exiting.\n"));
1820 exit(exit_code);
1821}
1822
1823/* When forking the main daemon and the child process needs to connect
1824 * back to the daemon as a client process, this function can be used
1825 * to change the ctdb context from daemon into client mode. The child
1826 * process must be created using ctdb_fork() and not fork() -
1827 * ctdb_fork() does some necessary housekeeping.
1828 */
1829int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
1830{
1831 int ret;
1832 va_list ap;
1833
1834 /* Add extra information so we can identify this in the logs */
1835 va_start(ap, fmt);
1836 debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
1837 va_end(ap);
1838
1839 /* get a new event context */
1840 ctdb->ev = tevent_context_init(ctdb);
1841 if (ctdb->ev == NULL) {
1842 DEBUG(DEBUG_ALERT,("tevent_context_init() failed\n"));
1843 exit(1);
1844 }
1845 tevent_loop_allow_nesting(ctdb->ev);
1846
1847 /* Connect to main CTDB daemon */
1848 ret = ctdb_socket_connect(ctdb);
1849 if (ret != 0) {
1850 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
1851 return -1;
1852 }
1853
1854 ctdb->can_send_controls = true;
1855
1856 return 0;
1857}
Note: See TracBrowser for help on using the repository browser.