source: vendor/current/ctdb/server/ctdb_call.c

Last change on this file was 988, checked in by Silvan Scherrer, 9 years ago

Samba Server: update vendor to version 4.4.3

File size: 53.8 KB
Line 
1/*
2 ctdb_call protocol code
3
4 Copyright (C) Andrew Tridgell 2006
5
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
18*/
19/*
20 see http://wiki.samba.org/index.php/Samba_%26_Clustering for
21 protocol design and packet details
22*/
23#include "replace.h"
24#include "system/network.h"
25#include "system/filesys.h"
26
27#include <talloc.h>
28#include <tevent.h>
29
30#include "lib/util/dlinklist.h"
31#include "lib/util/debug.h"
32#include "lib/util/samba_util.h"
33#include "lib/util/util_process.h"
34
35#include "ctdb_private.h"
36#include "ctdb_client.h"
37
38#include "common/rb_tree.h"
39#include "common/reqid.h"
40#include "common/system.h"
41#include "common/common.h"
42#include "common/logging.h"
43
/* State kept for a "sticky" record: while pindown is non-NULL the
 * record is pinned down and migration requests for it are deferred. */
struct ctdb_sticky_record {
	struct ctdb_context *ctdb;	/* global daemon context */
	struct ctdb_db_context *ctdb_db;	/* database the record belongs to */
	/* NOTE(review): despite the TDB_CONTEXT type, this is only ever
	 * used as a talloc context (talloc_new/talloc_free) that owns the
	 * pindown timer and deferred requests */
	TDB_CONTEXT *pindown;
};
49
50/*
51 find the ctdb_db from a db index
52 */
53 struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
54{
55 struct ctdb_db_context *ctdb_db;
56
57 for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
58 if (ctdb_db->db_id == id) {
59 break;
60 }
61 }
62 return ctdb_db;
63}
64
65/*
a variant of input packet that can be used in lock requeue
67*/
68static void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
69{
70 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
71 ctdb_input_pkt(ctdb, hdr);
72}
73
74
75/*
76 send an error reply
77*/
static void ctdb_send_error(struct ctdb_context *ctdb,
			    struct ctdb_req_header *hdr, uint32_t status,
			    const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
static void ctdb_send_error(struct ctdb_context *ctdb,
			    struct ctdb_req_header *hdr, uint32_t status,
			    const char *fmt, ...)
{
	va_list ap;
	struct ctdb_reply_error_old *r;
	char *msg;
	int msglen, len;

	/* transport already shut down - nothing to send the reply on */
	if (ctdb->methods == NULL) {
		DEBUG(DEBUG_INFO,(__location__ " Failed to send error. Transport is DOWN\n"));
		return;
	}

	va_start(ap, fmt);
	msg = talloc_vasprintf(ctdb, fmt, ap);
	if (msg == NULL) {
		ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
	}
	va_end(ap);

	/* the message (including terminating NUL) is sent inline after
	   the fixed-size part of the reply */
	msglen = strlen(msg)+1;
	len = offsetof(struct ctdb_reply_error_old, msg);
	/* r is allocated as a talloc child of msg, so the single
	   talloc_free(msg) below releases both */
	r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen,
				    struct ctdb_reply_error_old);
	CTDB_NO_MEMORY_FATAL(ctdb, r);

	/* route the reply back to the requesting node, preserving reqid
	   so the sender can match it to its outstanding request */
	r->hdr.destnode = hdr->srcnode;
	r->hdr.reqid = hdr->reqid;
	r->status = status;
	r->msglen = msglen;
	memcpy(&r->msg[0], msg, msglen);

	ctdb_queue_packet(ctdb, &r->hdr);

	talloc_free(msg);
}
118
119
120/**
121 * send a redirect reply
122 *
123 * The logic behind this function is this:
124 *
125 * A client wants to grab a record and sends a CTDB_REQ_CALL packet
126 * to its local ctdb (ctdb_request_call). If the node is not itself
127 * the record's DMASTER, it first redirects the packet to the
128 * record's LMASTER. The LMASTER then redirects the call packet to
129 * the current DMASTER. Note that this works because of this: When
130 * a record is migrated off a node, then the new DMASTER is stored
131 * in the record's copy on the former DMASTER.
132 */
133static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
134 struct ctdb_db_context *ctdb_db,
135 TDB_DATA key,
136 struct ctdb_req_call_old *c,
137 struct ctdb_ltdb_header *header)
138{
139 uint32_t lmaster = ctdb_lmaster(ctdb, &key);
140
141 c->hdr.destnode = lmaster;
142 if (ctdb->pnn == lmaster) {
143 c->hdr.destnode = header->dmaster;
144 }
145 c->hopcount++;
146
147 if (c->hopcount%100 > 95) {
148 DEBUG(DEBUG_WARNING,("High hopcount %d dbid:%s "
149 "key:0x%08x reqid=%08x pnn:%d src:%d lmaster:%d "
150 "header->dmaster:%d dst:%d\n",
151 c->hopcount, ctdb_db->db_name, ctdb_hash(&key),
152 c->hdr.reqid, ctdb->pnn, c->hdr.srcnode, lmaster,
153 header->dmaster, c->hdr.destnode));
154 }
155
156 ctdb_queue_packet(ctdb, &c->hdr);
157}
158
159
160/*
161 send a dmaster reply
162
163 caller must have the chainlock before calling this routine. Caller must be
164 the lmaster
165*/
static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
				    struct ctdb_ltdb_header *header,
				    TDB_DATA key, TDB_DATA data,
				    uint32_t new_dmaster,
				    uint32_t reqid)
{
	struct ctdb_context *ctdb = ctdb_db->ctdb;
	struct ctdb_reply_dmaster_old *r;
	int ret, len;
	TALLOC_CTX *tmp_ctx;

	/* only the lmaster for the key may hand out the dmaster role */
	if (ctdb->pnn != ctdb_lmaster(ctdb, &key)) {
		DEBUG(DEBUG_ALERT,(__location__ " Caller is not lmaster!\n"));
		return;
	}

	/* record the new dmaster in our local copy before telling anyone */
	header->dmaster = new_dmaster;
	ret = ctdb_ltdb_store(ctdb_db, key, header, data);
	if (ret != 0) {
		ctdb_fatal(ctdb, "ctdb_send_dmaster_reply unable to update dmaster");
		return;
	}

	if (ctdb->methods == NULL) {
		ctdb_fatal(ctdb, "ctdb_send_dmaster_reply cant update dmaster since transport is down");
		return;
	}

	/* put the packet on a temporary context, allowing us to safely free
	   it below even if ctdb_reply_dmaster() has freed it already */
	tmp_ctx = talloc_new(ctdb);

	/* send the CTDB_REPLY_DMASTER */
	/* wire layout: key bytes, then data bytes, then 32-bit record flags */
	len = offsetof(struct ctdb_reply_dmaster_old, data) + key.dsize + data.dsize + sizeof(uint32_t);
	r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
				    struct ctdb_reply_dmaster_old);
	CTDB_NO_MEMORY_FATAL(ctdb, r);

	r->hdr.destnode = new_dmaster;
	r->hdr.reqid = reqid;
	r->hdr.generation = ctdb_db->generation;
	r->rsn = header->rsn;
	r->keylen = key.dsize;
	r->datalen = data.dsize;
	r->db_id = ctdb_db->db_id;
	memcpy(&r->data[0], key.dptr, key.dsize);
	memcpy(&r->data[key.dsize], data.dptr, data.dsize);
	memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));

	ctdb_queue_packet(ctdb, &r->hdr);

	talloc_free(tmp_ctx);
}
219
220/*
221 send a dmaster request (give another node the dmaster for a record)
222
223 This is always sent to the lmaster, which ensures that the lmaster
224 always knows who the dmaster is. The lmaster will then send a
225 CTDB_REPLY_DMASTER to the new dmaster
226*/
static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
				   struct ctdb_req_call_old *c,
				   struct ctdb_ltdb_header *header,
				   TDB_DATA *key, TDB_DATA *data)
{
	struct ctdb_req_dmaster_old *r;
	struct ctdb_context *ctdb = ctdb_db->ctdb;
	int len;
	uint32_t lmaster = ctdb_lmaster(ctdb, key);

	if (ctdb->methods == NULL) {
		ctdb_fatal(ctdb, "Failed ctdb_call_send_dmaster since transport is down");
		return;
	}

	/* mark that the record migrates together with its data */
	if (data->dsize != 0) {
		header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
	}

	/* if we are the lmaster ourselves, skip the network round-trip
	   and send the CTDB_REPLY_DMASTER directly */
	if (lmaster == ctdb->pnn) {
		ctdb_send_dmaster_reply(ctdb_db, header, *key, *data,
					c->hdr.srcnode, c->hdr.reqid);
		return;
	}

	/* wire layout: key bytes, then data bytes, then 32-bit record flags */
	len = offsetof(struct ctdb_req_dmaster_old, data) + key->dsize + data->dsize
			+ sizeof(uint32_t);
	r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len,
				    struct ctdb_req_dmaster_old);
	CTDB_NO_MEMORY_FATAL(ctdb, r);
	r->hdr.destnode = lmaster;
	r->hdr.reqid = c->hdr.reqid;
	r->hdr.generation = ctdb_db->generation;
	r->db_id = c->db_id;
	r->rsn = header->rsn;
	r->dmaster = c->hdr.srcnode;	/* the requesting node becomes the new dmaster */
	r->keylen = key->dsize;
	r->datalen = data->dsize;
	memcpy(&r->data[0], key->dptr, key->dsize);
	memcpy(&r->data[key->dsize], data->dptr, data->dsize);
	memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));

	/* update our local copy to point at the new dmaster before the
	   request goes out */
	header->dmaster = c->hdr.srcnode;
	if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
		ctdb_fatal(ctdb, "Failed to store record in ctdb_call_send_dmaster");
	}

	ctdb_queue_packet(ctdb, &r->hdr);

	talloc_free(r);
}
278
279static void ctdb_sticky_pindown_timeout(struct tevent_context *ev,
280 struct tevent_timer *te,
281 struct timeval t, void *private_data)
282{
283 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
284 struct ctdb_sticky_record);
285
286 DEBUG(DEBUG_ERR,("Pindown timeout db:%s unstick record\n", sr->ctdb_db->db_name));
287 if (sr->pindown != NULL) {
288 talloc_free(sr->pindown);
289 sr->pindown = NULL;
290 }
291}
292
static int
ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
{
	TALLOC_CTX *tmp_ctx = talloc_new(NULL);
	uint32_t *k;
	struct ctdb_sticky_record *sr;

	k = ctdb_key_to_idkey(tmp_ctx, key);
	if (k == NULL) {
		DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
		talloc_free(tmp_ctx);
		return -1;
	}

	/* key is not registered as sticky - nothing to pin down */
	sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
	if (sr == NULL) {
		talloc_free(tmp_ctx);
		return 0;
	}

	talloc_free(tmp_ctx);

	/* create the pindown context; the timer parented to it releases
	   the pindown after tunable.sticky_pindown milliseconds */
	if (sr->pindown == NULL) {
		DEBUG(DEBUG_ERR,("Pinning down record in %s for %d ms\n", ctdb_db->db_name, ctdb->tunable.sticky_pindown));
		sr->pindown = talloc_new(sr);
		if (sr->pindown == NULL) {
			DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
			return -1;
		}
		tevent_add_timer(ctdb->ev, sr->pindown,
				 timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000,
						     (ctdb->tunable.sticky_pindown * 1000) % 1000000),
				 ctdb_sticky_pindown_timeout, sr);
	}

	return 0;
}
330
331/*
332 called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
333 gets a CTDB_REQUEST_DMASTER for itself. We become the dmaster.
334
335 must be called with the chainlock held. This function releases the chainlock
336*/
static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
				struct ctdb_req_header *hdr,
				TDB_DATA key, TDB_DATA data,
				uint64_t rsn, uint32_t record_flags)
{
	struct ctdb_call_state *state;
	struct ctdb_context *ctdb = ctdb_db->ctdb;
	struct ctdb_ltdb_header header;
	int ret;

	DEBUG(DEBUG_DEBUG,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));

	/* build the new local header: this node is the dmaster now */
	ZERO_STRUCT(header);
	header.rsn = rsn;
	header.dmaster = ctdb->pnn;
	header.flags = record_flags;

	/* look up the call state that triggered the migration; may be
	   NULL if the reqid no longer matches an outstanding call */
	state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);

	if (state) {
		if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
			/*
			 * We temporarily add the VACUUM_MIGRATED flag to
			 * the record flags, so that ctdb_ltdb_store can
			 * decide whether the record should be stored or
			 * deleted.
			 */
			header.flags |= CTDB_REC_FLAG_VACUUM_MIGRATED;
		}
	}

	if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
		ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");

		/* this function owns the chainlock and must release it
		   on every exit path */
		ret = ctdb_ltdb_unlock(ctdb_db, key);
		if (ret != 0) {
			DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
		}
		return;
	}

	/* we just became DMASTER and this database is "sticky",
	   see if the record is flagged as "hot" and set up a pin-down
	   context to stop migrations for a little while if so
	*/
	if (ctdb_db->sticky) {
		ctdb_set_sticky_pindown(ctdb, ctdb_db, key);
	}

	if (state == NULL) {
		DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
			 ctdb->pnn, hdr->reqid, hdr->srcnode));

		ret = ctdb_ltdb_unlock(ctdb_db, key);
		if (ret != 0) {
			DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
		}
		return;
	}

	/* the reqid matched but must also refer to the same key */
	if (key.dsize != state->call->key.dsize || memcmp(key.dptr, state->call->key.dptr, key.dsize)) {
		DEBUG(DEBUG_ERR, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr->reqid, hdr->srcnode));

		ret = ctdb_ltdb_unlock(ctdb_db, key);
		if (ret != 0) {
			DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
		}
		return;
	}

	if (hdr->reqid != state->reqid) {
		/* we found a record but it was the wrong one */
		DEBUG(DEBUG_ERR, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n from node %u", hdr->reqid, hdr->srcnode));

		ret = ctdb_ltdb_unlock(ctdb_db, key);
		if (ret != 0) {
			DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
		}
		return;
	}

	/* we are the dmaster - answer the original call locally */
	ctdb_call_local(ctdb_db, state->call, &header, state, &data, true);

	ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
	if (ret != 0) {
		DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
	}

	/* complete the call and notify anyone waiting on it */
	state->state = CTDB_CALL_DONE;
	if (state->async.fn) {
		state->async.fn(state);
	}
}
430
/* a single deferred packet, linked into a dmaster_defer_queue */
struct dmaster_defer_call {
	struct dmaster_defer_call *next, *prev;	/* DLIST linkage */
	struct ctdb_context *ctdb;
	struct ctdb_req_header *hdr;	/* the deferred request packet */
};
436
/* per-key queue of calls deferred while a dmaster change for that key
 * is in flight; its talloc destructor requeues the deferred calls */
struct dmaster_defer_queue {
	struct ctdb_db_context *ctdb_db;
	uint32_t generation;	/* database generation when the queue was created */
	struct dmaster_defer_call *deferred_calls;
};
442
443static void dmaster_defer_reprocess(struct tevent_context *ev,
444 struct tevent_timer *te,
445 struct timeval t,
446 void *private_data)
447{
448 struct dmaster_defer_call *call = talloc_get_type(
449 private_data, struct dmaster_defer_call);
450
451 ctdb_input_pkt(call->ctdb, call->hdr);
452 talloc_free(call);
453}
454
static int dmaster_defer_queue_destructor(struct dmaster_defer_queue *ddq)
{
	/* Ignore requests, if database recovery happens in-between. */
	if (ddq->generation != ddq->ctdb_db->generation) {
		return 0;
	}

	/* requeue every deferred call via a zero-timeout event so it is
	   re-processed from the event loop rather than recursively */
	while (ddq->deferred_calls != NULL) {
		struct dmaster_defer_call *call = ddq->deferred_calls;

		DLIST_REMOVE(ddq->deferred_calls, call);

		/* reparent so the call survives this queue being freed */
		talloc_steal(call->ctdb, call);
		tevent_add_timer(call->ctdb->ev, call, timeval_zero(),
				 dmaster_defer_reprocess, call);
	}
	return 0;
}
473
474static void *insert_ddq_callback(void *parm, void *data)
475{
476 if (data) {
477 talloc_free(data);
478 }
479 return parm;
480}
481
/**
 * This function is used to register a key in a database that needs to be updated.
 * Any requests for that key should get deferred till this is completed.
 */
static int dmaster_defer_setup(struct ctdb_db_context *ctdb_db,
			       struct ctdb_req_header *hdr,
			       TDB_DATA key)
{
	uint32_t *k;
	struct dmaster_defer_queue *ddq;

	k = ctdb_key_to_idkey(hdr, key);
	if (k == NULL) {
		DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer setup\n"));
		return -1;
	}

	/* Already exists */
	ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
	if (ddq != NULL) {
		if (ddq->generation == ctdb_db->generation) {
			talloc_free(k);
			return 0;
		}

		/* Recovery occurred - get rid of old queue. All the deferred
		 * requests will be resent anyway from ctdb_call_resend_db.
		 */
		talloc_free(ddq);
	}

	/* parent the queue to the request packet: when the request has
	   been processed and the packet is freed, the queue destructor
	   requeues everything that was deferred behind it */
	ddq = talloc(hdr, struct dmaster_defer_queue);
	if (ddq == NULL) {
		DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer queue\n"));
		talloc_free(k);
		return -1;
	}
	ddq->ctdb_db = ctdb_db;
	ddq->generation = hdr->generation;
	ddq->deferred_calls = NULL;

	trbt_insertarray32_callback(ctdb_db->defer_dmaster, k[0], k,
				    insert_ddq_callback, ddq);
	talloc_set_destructor(ddq, dmaster_defer_queue_destructor);

	talloc_free(k);
	return 0;
}
530
static int dmaster_defer_add(struct ctdb_db_context *ctdb_db,
			     struct ctdb_req_header *hdr,
			     TDB_DATA key)
{
	struct dmaster_defer_queue *ddq;
	struct dmaster_defer_call *call;
	uint32_t *k;

	k = ctdb_key_to_idkey(hdr, key);
	if (k == NULL) {
		DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer add\n"));
		return -1;
	}

	/* no defer queue registered for this key: nothing to defer
	   behind, caller should process the request now */
	ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
	if (ddq == NULL) {
		talloc_free(k);
		return -1;
	}

	talloc_free(k);

	/* queue predates a recovery - discard it without replaying,
	   deferred requests will be resent anyway */
	if (ddq->generation != hdr->generation) {
		talloc_set_destructor(ddq, NULL);
		talloc_free(ddq);
		return -1;
	}

	call = talloc(ddq, struct dmaster_defer_call);
	if (call == NULL) {
		DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer call\n"));
		return -1;
	}

	call->ctdb = ctdb_db->ctdb;
	call->hdr = talloc_steal(call, hdr);	/* take ownership of the packet */

	DLIST_ADD_END(ddq->deferred_calls, call);

	return 0;
}
572
573/*
574 called when a CTDB_REQ_DMASTER packet comes in
575
576 this comes into the lmaster for a record when the current dmaster
577 wants to give up the dmaster role and give it to someone else
578*/
void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
{
	struct ctdb_req_dmaster_old *c = (struct ctdb_req_dmaster_old *)hdr;
	TDB_DATA key, data, data2;
	struct ctdb_ltdb_header header;
	struct ctdb_db_context *ctdb_db;
	uint32_t record_flags = 0;
	size_t len;
	int ret;

	/* unpack key and data, sent back-to-back after the fixed header,
	   optionally followed by the 32-bit record flags */
	key.dptr = c->data;
	key.dsize = c->keylen;
	data.dptr = c->data + c->keylen;
	data.dsize = c->datalen;
	len = offsetof(struct ctdb_req_dmaster_old, data) + key.dsize + data.dsize
			+ sizeof(uint32_t);
	if (len <= c->hdr.length) {
		memcpy(&record_flags, &c->data[c->keylen + c->datalen],
		       sizeof(record_flags));
	}

	ctdb_db = find_ctdb_db(ctdb, c->db_id);
	if (!ctdb_db) {
		ctdb_send_error(ctdb, hdr, -1,
				"Unknown database in request. db_id==0x%08x",
				c->db_id);
		return;
	}

	/* defer other calls for this key until the dmaster change is done */
	dmaster_defer_setup(ctdb_db, hdr, key);

	/* fetch the current record */
	ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
					   ctdb_call_input_pkt, ctdb, false);
	if (ret == -1) {
		ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
		return;
	}
	if (ret == -2) {
		DEBUG(DEBUG_INFO,(__location__ " deferring ctdb_request_dmaster\n"));
		return;
	}

	/* CTDB_REQ_DMASTER must only ever arrive at the key's lmaster */
	if (ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
		DEBUG(DEBUG_ALERT,("pnn %u dmaster request to non-lmaster lmaster=%u gen=%u curgen=%u\n",
			 ctdb->pnn, ctdb_lmaster(ctdb, &key),
			 hdr->generation, ctdb->vnn_map->generation));
		ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
	}

	DEBUG(DEBUG_DEBUG,("pnn %u dmaster request on %08x for %u from %u\n",
		 ctdb->pnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));

	/* its a protocol error if the sending node is not the current dmaster */
	if (header.dmaster != hdr->srcnode) {
		DEBUG(DEBUG_ALERT,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
			 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
			 ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
			 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid,
			 (key.dsize >= 4)?(*(uint32_t *)key.dptr):0));
		if (header.rsn != 0 || header.dmaster != ctdb->pnn) {
			DEBUG(DEBUG_ERR,("ctdb_req_dmaster from non-master. Force a recovery.\n"));

			ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
			ctdb_ltdb_unlock(ctdb_db, key);
			return;
		}
	}

	/* an older RSN from the sender is suspicious - log it, but still
	   proceed with the sender's RSN below */
	if (header.rsn > c->rsn) {
		DEBUG(DEBUG_ALERT,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
			 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
			 ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
			 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid));
	}

	/* use the rsn from the sending node */
	header.rsn = c->rsn;

	/* store the record flags from the sending node */
	header.flags = record_flags;

	/* check if the new dmaster is the lmaster, in which case we
	   skip the dmaster reply */
	if (c->dmaster == ctdb->pnn) {
		/* ctdb_become_dmaster releases the chainlock itself */
		ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
	} else {
		ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);

		ret = ctdb_ltdb_unlock(ctdb_db, key);
		if (ret != 0) {
			DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
		}
	}
}
674
675static void ctdb_sticky_record_timeout(struct tevent_context *ev,
676 struct tevent_timer *te,
677 struct timeval t, void *private_data)
678{
679 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
680 struct ctdb_sticky_record);
681 talloc_free(sr);
682}
683
684static void *ctdb_make_sticky_record_callback(void *parm, void *data)
685{
686 if (data) {
687 DEBUG(DEBUG_ERR,("Already have sticky record registered. Free old %p and create new %p\n", data, parm));
688 talloc_free(data);
689 }
690 return parm;
691}
692
static int
ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
{
	TALLOC_CTX *tmp_ctx = talloc_new(NULL);
	uint32_t *k;
	struct ctdb_sticky_record *sr;

	k = ctdb_key_to_idkey(tmp_ctx, key);
	if (k == NULL) {
		DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
		talloc_free(tmp_ctx);
		return -1;
	}

	/* already registered as sticky - nothing to do */
	sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
	if (sr != NULL) {
		talloc_free(tmp_ctx);
		return 0;
	}

	sr = talloc(ctdb_db->sticky_records, struct ctdb_sticky_record);
	if (sr == NULL) {
		talloc_free(tmp_ctx);
		DEBUG(DEBUG_ERR,("Failed to allocate sticky record structure\n"));
		return -1;
	}

	sr->ctdb = ctdb;
	sr->ctdb_db = ctdb_db;
	sr->pindown = NULL;	/* no pindown active yet */

	DEBUG(DEBUG_ERR,("Make record sticky for %d seconds in db %s key:0x%08x.\n",
			 ctdb->tunable.sticky_duration,
			 ctdb_db->db_name, ctdb_hash(&key)));

	trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);

	/* stickiness expires after tunable.sticky_duration seconds */
	tevent_add_timer(ctdb->ev, sr,
			 timeval_current_ofs(ctdb->tunable.sticky_duration, 0),
			 ctdb_sticky_record_timeout, sr);

	talloc_free(tmp_ctx);
	return 0;
}
737
/* event payload used to requeue a packet after a pindown is released */
struct pinned_down_requeue_handle {
	struct ctdb_context *ctdb;
	struct ctdb_req_header *hdr;	/* packet to feed back into the input path */
};
742
/* a call deferred because its record is pinned down; its talloc
 * destructor requeues the packet when the pindown context is freed */
struct pinned_down_deferred_call {
	struct ctdb_context *ctdb;
	struct ctdb_req_header *hdr;	/* the deferred request packet */
};
747
748static void pinned_down_requeue(struct tevent_context *ev,
749 struct tevent_timer *te,
750 struct timeval t, void *private_data)
751{
752 struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
753 struct ctdb_context *ctdb = handle->ctdb;
754
755 talloc_steal(ctdb, handle->hdr);
756 ctdb_call_input_pkt(ctdb, handle->hdr);
757
758 talloc_free(handle);
759}
760
761static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
762{
763 struct ctdb_context *ctdb = pinned_down->ctdb;
764 struct pinned_down_requeue_handle *handle = talloc(ctdb, struct pinned_down_requeue_handle);
765
766 handle->ctdb = pinned_down->ctdb;
767 handle->hdr = pinned_down->hdr;
768 talloc_steal(handle, handle->hdr);
769
770 tevent_add_timer(ctdb->ev, handle, timeval_zero(),
771 pinned_down_requeue, handle);
772
773 return 0;
774}
775
static int
ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr)
{
	TALLOC_CTX *tmp_ctx = talloc_new(NULL);
	uint32_t *k;
	struct ctdb_sticky_record *sr;
	struct pinned_down_deferred_call *pinned_down;

	k = ctdb_key_to_idkey(tmp_ctx, key);
	if (k == NULL) {
		DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
		talloc_free(tmp_ctx);
		return -1;
	}

	/* not a sticky record: caller should process the request now */
	sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
	if (sr == NULL) {
		talloc_free(tmp_ctx);
		return -1;
	}

	talloc_free(tmp_ctx);

	/* sticky but not currently pinned down: process now */
	if (sr->pindown == NULL) {
		return -1;
	}

	/* parent the deferral to the pindown context: when the pindown
	   is released, the destructor requeues this packet */
	pinned_down = talloc(sr->pindown, struct pinned_down_deferred_call);
	if (pinned_down == NULL) {
		DEBUG(DEBUG_ERR,("Failed to allocate structure for deferred pinned down request\n"));
		return -1;
	}

	pinned_down->ctdb = ctdb;
	pinned_down->hdr = hdr;

	talloc_set_destructor(pinned_down, pinned_down_destructor);
	talloc_steal(pinned_down, hdr);

	return 0;
}
817
static void
ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key, int hopcount)
{
	int i, id;

	/* smallest value is always at index 0 */
	if (hopcount <= ctdb_db->statistics.hot_keys[0].count) {
		return;
	}

	/* see if we already know this key */
	for (i = 0; i < MAX_HOT_KEYS; i++) {
		if (key.dsize != ctdb_db->statistics.hot_keys[i].key.dsize) {
			continue;
		}
		if (memcmp(key.dptr, ctdb_db->statistics.hot_keys[i].key.dptr, key.dsize)) {
			continue;
		}
		/* found an entry for this key */
		if (hopcount <= ctdb_db->statistics.hot_keys[i].count) {
			return;
		}
		ctdb_db->statistics.hot_keys[i].count = hopcount;
		goto sort_keys;
	}

	/* new key: use a free slot, or evict the entry with the smallest
	   count, which the invariant keeps at index 0 */
	if (ctdb_db->statistics.num_hot_keys < MAX_HOT_KEYS) {
		id = ctdb_db->statistics.num_hot_keys;
		ctdb_db->statistics.num_hot_keys++;
	} else {
		id = 0;
	}

	if (ctdb_db->statistics.hot_keys[id].key.dptr != NULL) {
		talloc_free(ctdb_db->statistics.hot_keys[id].key.dptr);
	}
	ctdb_db->statistics.hot_keys[id].key.dsize = key.dsize;
	ctdb_db->statistics.hot_keys[id].key.dptr = talloc_memdup(ctdb_db, key.dptr, key.dsize);
	ctdb_db->statistics.hot_keys[id].count = hopcount;
	DEBUG(DEBUG_NOTICE,("Updated hot key database=%s key=0x%08x id=%d hop_count=%d\n",
			    ctdb_db->db_name, ctdb_hash(&key), id, hopcount));

sort_keys:
	/* re-establish the invariant that index 0 holds the smallest
	   count: one pass swapping slot 0 with any smaller entry (a
	   min-selection pass, not a full sort) */
	for (i = 1; i < MAX_HOT_KEYS; i++) {
		if (ctdb_db->statistics.hot_keys[i].count == 0) {
			continue;
		}
		if (ctdb_db->statistics.hot_keys[i].count < ctdb_db->statistics.hot_keys[0].count) {
			hopcount = ctdb_db->statistics.hot_keys[i].count;
			ctdb_db->statistics.hot_keys[i].count = ctdb_db->statistics.hot_keys[0].count;
			ctdb_db->statistics.hot_keys[0].count = hopcount;

			key = ctdb_db->statistics.hot_keys[i].key;
			ctdb_db->statistics.hot_keys[i].key = ctdb_db->statistics.hot_keys[0].key;
			ctdb_db->statistics.hot_keys[0].key = key;
		}
	}
}
876
877/*
878 called when a CTDB_REQ_CALL packet comes in
879*/
880void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
881{
882 struct ctdb_req_call_old *c = (struct ctdb_req_call_old *)hdr;
883 TDB_DATA data;
884 struct ctdb_reply_call_old *r;
885 int ret, len;
886 struct ctdb_ltdb_header header;
887 struct ctdb_call *call;
888 struct ctdb_db_context *ctdb_db;
889 int tmp_count, bucket;
890
891 if (ctdb->methods == NULL) {
892 DEBUG(DEBUG_INFO,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
893 return;
894 }
895
896
897 ctdb_db = find_ctdb_db(ctdb, c->db_id);
898 if (!ctdb_db) {
899 ctdb_send_error(ctdb, hdr, -1,
900 "Unknown database in request. db_id==0x%08x",
901 c->db_id);
902 return;
903 }
904
905 call = talloc(hdr, struct ctdb_call);
906 CTDB_NO_MEMORY_FATAL(ctdb, call);
907
908 call->call_id = c->callid;
909 call->key.dptr = c->data;
910 call->key.dsize = c->keylen;
911 call->call_data.dptr = c->data + c->keylen;
912 call->call_data.dsize = c->calldatalen;
913 call->reply_data.dptr = NULL;
914 call->reply_data.dsize = 0;
915
916
917 /* If this record is pinned down we should defer the
918 request until the pindown times out
919 */
920 if (ctdb_db->sticky) {
921 if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) {
922 DEBUG(DEBUG_WARNING,
923 ("Defer request for pinned down record in %s\n", ctdb_db->db_name));
924 talloc_free(call);
925 return;
926 }
927 }
928
929 if (dmaster_defer_add(ctdb_db, hdr, call->key) == 0) {
930 talloc_free(call);
931 return;
932 }
933
934 /* determine if we are the dmaster for this key. This also
935 fetches the record data (if any), thus avoiding a 2nd fetch of the data
936 if the call will be answered locally */
937
938 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call->key, &header, hdr, &data,
939 ctdb_call_input_pkt, ctdb, false);
940 if (ret == -1) {
941 ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
942 talloc_free(call);
943 return;
944 }
945 if (ret == -2) {
946 DEBUG(DEBUG_INFO,(__location__ " deferred ctdb_request_call\n"));
947 talloc_free(call);
948 return;
949 }
950
951 /* Dont do READONLY if we don't have a tracking database */
952 if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
953 c->flags &= ~CTDB_WANT_READONLY;
954 }
955
956 if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
957 header.flags &= ~CTDB_REC_RO_FLAGS;
958 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
959 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
960 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
961 ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
962 }
963 /* and clear out the tracking data */
964 if (tdb_delete(ctdb_db->rottdb, call->key) != 0) {
965 DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
966 }
967 }
968
969 /* if we are revoking, we must defer all other calls until the revoke
970 * had completed.
971 */
972 if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
973 talloc_free(data.dptr);
974 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
975
976 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
977 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
978 }
979 talloc_free(call);
980 return;
981 }
982
983 /*
984 * If we are not the dmaster and are not hosting any delegations,
985 * then we redirect the request to the node than can answer it
986 * (the lmaster or the dmaster).
987 */
988 if ((header.dmaster != ctdb->pnn)
989 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) {
990 talloc_free(data.dptr);
991 ctdb_call_send_redirect(ctdb, ctdb_db, call->key, c, &header);
992
993 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
994 if (ret != 0) {
995 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
996 }
997 talloc_free(call);
998 return;
999 }
1000
1001 if ( (!(c->flags & CTDB_WANT_READONLY))
1002 && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
1003 header.flags |= CTDB_REC_RO_REVOKING_READONLY;
1004 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1005 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
1006 }
1007 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1008
1009 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, call->key, &header, data) != 0) {
1010 ctdb_fatal(ctdb, "Failed to start record revoke");
1011 }
1012 talloc_free(data.dptr);
1013
1014 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
1015 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
1016 }
1017 talloc_free(call);
1018
1019 return;
1020 }
1021
1022 /* If this is the first request for delegation. bump rsn and set
1023 * the delegations flag
1024 */
1025 if ((c->flags & CTDB_WANT_READONLY)
1026 && (c->callid == CTDB_FETCH_WITH_HEADER_FUNC)
1027 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS))) {
1028 header.rsn += 3;
1029 header.flags |= CTDB_REC_RO_HAVE_DELEGATIONS;
1030 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1031 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
1032 }
1033 }
1034 if ((c->flags & CTDB_WANT_READONLY)
1035 && (call->call_id == CTDB_FETCH_WITH_HEADER_FUNC)) {
1036 TDB_DATA tdata;
1037
1038 tdata = tdb_fetch(ctdb_db->rottdb, call->key);
1039 if (ctdb_trackingdb_add_pnn(ctdb, &tdata, c->hdr.srcnode) != 0) {
1040 ctdb_fatal(ctdb, "Failed to add node to trackingdb");
1041 }
1042 if (tdb_store(ctdb_db->rottdb, call->key, tdata, TDB_REPLACE) != 0) {
1043 ctdb_fatal(ctdb, "Failed to store trackingdb data");
1044 }
1045 free(tdata.dptr);
1046
1047 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1048 if (ret != 0) {
1049 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1050 }
1051
1052 len = offsetof(struct ctdb_reply_call_old, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
1053 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
1054 struct ctdb_reply_call_old);
1055 CTDB_NO_MEMORY_FATAL(ctdb, r);
1056 r->hdr.destnode = c->hdr.srcnode;
1057 r->hdr.reqid = c->hdr.reqid;
1058 r->hdr.generation = ctdb_db->generation;
1059 r->status = 0;
1060 r->datalen = data.dsize + sizeof(struct ctdb_ltdb_header);
1061 header.rsn -= 2;
1062 header.flags |= CTDB_REC_RO_HAVE_READONLY;
1063 header.flags &= ~CTDB_REC_RO_HAVE_DELEGATIONS;
1064 memcpy(&r->data[0], &header, sizeof(struct ctdb_ltdb_header));
1065
1066 if (data.dsize) {
1067 memcpy(&r->data[sizeof(struct ctdb_ltdb_header)], data.dptr, data.dsize);
1068 }
1069
1070 ctdb_queue_packet(ctdb, &r->hdr);
1071 CTDB_INCREMENT_STAT(ctdb, total_ro_delegations);
1072 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_delegations);
1073
1074 talloc_free(r);
1075 talloc_free(call);
1076 return;
1077 }
1078
1079 CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
1080 tmp_count = c->hopcount;
1081 bucket = 0;
1082 while (tmp_count) {
1083 tmp_count >>= 2;
1084 bucket++;
1085 }
1086 if (bucket >= MAX_COUNT_BUCKETS) {
1087 bucket = MAX_COUNT_BUCKETS - 1;
1088 }
1089 CTDB_INCREMENT_STAT(ctdb, hop_count_bucket[bucket]);
1090 CTDB_INCREMENT_DB_STAT(ctdb_db, hop_count_bucket[bucket]);
1091 ctdb_update_db_stat_hot_keys(ctdb_db, call->key, c->hopcount);
1092
1093 /* If this database supports sticky records, then check if the
1094 hopcount is big. If it is it means the record is hot and we
1095 should make it sticky.
1096 */
1097 if (ctdb_db->sticky && c->hopcount >= ctdb->tunable.hopcount_make_sticky) {
1098 ctdb_make_record_sticky(ctdb, ctdb_db, call->key);
1099 }
1100
1101
1102 /* Try if possible to migrate the record off to the caller node.
1103 * From the clients perspective a fetch of the data is just as
1104 * expensive as a migration.
1105 */
1106 if (c->hdr.srcnode != ctdb->pnn) {
1107 if (ctdb_db->persistent_state) {
1108 DEBUG(DEBUG_INFO, (__location__ " refusing migration"
1109 " of key %s while transaction is active\n",
1110 (char *)call->key.dptr));
1111 } else {
1112 DEBUG(DEBUG_DEBUG,("pnn %u starting migration of %08x to %u\n",
1113 ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
1114 ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
1115 talloc_free(data.dptr);
1116
1117 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1118 if (ret != 0) {
1119 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1120 }
1121 }
1122 talloc_free(call);
1123 return;
1124 }
1125
1126 ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true);
1127 if (ret != 0) {
1128 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_local failed\n"));
1129 call->status = -1;
1130 }
1131
1132 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1133 if (ret != 0) {
1134 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1135 }
1136
1137 len = offsetof(struct ctdb_reply_call_old, data) + call->reply_data.dsize;
1138 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
1139 struct ctdb_reply_call_old);
1140 CTDB_NO_MEMORY_FATAL(ctdb, r);
1141 r->hdr.destnode = hdr->srcnode;
1142 r->hdr.reqid = hdr->reqid;
1143 r->hdr.generation = ctdb_db->generation;
1144 r->status = call->status;
1145 r->datalen = call->reply_data.dsize;
1146 if (call->reply_data.dsize) {
1147 memcpy(&r->data[0], call->reply_data.dptr, call->reply_data.dsize);
1148 }
1149
1150 ctdb_queue_packet(ctdb, &r->hdr);
1151
1152 talloc_free(r);
1153 talloc_free(call);
1154}
1155
1156/**
1157 * called when a CTDB_REPLY_CALL packet comes in
1158 *
1159 * This packet comes in response to a CTDB_REQ_CALL request packet. It
1160 * contains any reply data from the call
1161 */
1162void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1163{
1164 struct ctdb_reply_call_old *c = (struct ctdb_reply_call_old *)hdr;
1165 struct ctdb_call_state *state;
1166
1167 state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
1168 if (state == NULL) {
1169 DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid));
1170 return;
1171 }
1172
1173 if (hdr->reqid != state->reqid) {
1174 /* we found a record but it was the wrong one */
1175 DEBUG(DEBUG_ERR, ("Dropped orphaned call reply with reqid:%u\n",hdr->reqid));
1176 return;
1177 }
1178
1179
1180 /* read only delegation processing */
1181 /* If we got a FETCH_WITH_HEADER we should check if this is a ro
1182 * delegation since we may need to update the record header
1183 */
1184 if (state->c->callid == CTDB_FETCH_WITH_HEADER_FUNC) {
1185 struct ctdb_db_context *ctdb_db = state->ctdb_db;
1186 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)&c->data[0];
1187 struct ctdb_ltdb_header oldheader;
1188 TDB_DATA key, data, olddata;
1189 int ret;
1190
1191 if (!(header->flags & CTDB_REC_RO_HAVE_READONLY)) {
1192 goto finished_ro;
1193 return;
1194 }
1195
1196 key.dsize = state->c->keylen;
1197 key.dptr = state->c->data;
1198 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1199 ctdb_call_input_pkt, ctdb, false);
1200 if (ret == -2) {
1201 return;
1202 }
1203 if (ret != 0) {
1204 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_call\n"));
1205 return;
1206 }
1207
1208 ret = ctdb_ltdb_fetch(ctdb_db, key, &oldheader, state, &olddata);
1209 if (ret != 0) {
1210 DEBUG(DEBUG_ERR, ("Failed to fetch old record in ctdb_reply_call\n"));
1211 ctdb_ltdb_unlock(ctdb_db, key);
1212 goto finished_ro;
1213 }
1214
1215 if (header->rsn <= oldheader.rsn) {
1216 ctdb_ltdb_unlock(ctdb_db, key);
1217 goto finished_ro;
1218 }
1219
1220 if (c->datalen < sizeof(struct ctdb_ltdb_header)) {
1221 DEBUG(DEBUG_ERR,(__location__ " Got FETCH_WITH_HEADER reply with too little data: %d bytes\n", c->datalen));
1222 ctdb_ltdb_unlock(ctdb_db, key);
1223 goto finished_ro;
1224 }
1225
1226 data.dsize = c->datalen - sizeof(struct ctdb_ltdb_header);
1227 data.dptr = &c->data[sizeof(struct ctdb_ltdb_header)];
1228 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
1229 if (ret != 0) {
1230 DEBUG(DEBUG_ERR, ("Failed to store new record in ctdb_reply_call\n"));
1231 ctdb_ltdb_unlock(ctdb_db, key);
1232 goto finished_ro;
1233 }
1234
1235 ctdb_ltdb_unlock(ctdb_db, key);
1236 }
1237finished_ro:
1238
1239 state->call->reply_data.dptr = c->data;
1240 state->call->reply_data.dsize = c->datalen;
1241 state->call->status = c->status;
1242
1243 talloc_steal(state, c);
1244
1245 state->state = CTDB_CALL_DONE;
1246 if (state->async.fn) {
1247 state->async.fn(state);
1248 }
1249}
1250
1251
1252/**
1253 * called when a CTDB_REPLY_DMASTER packet comes in
1254 *
1255 * This packet comes in from the lmaster in response to a CTDB_REQ_CALL
1256 * request packet. It means that the current dmaster wants to give us
1257 * the dmaster role.
1258 */
1259void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1260{
1261 struct ctdb_reply_dmaster_old *c = (struct ctdb_reply_dmaster_old *)hdr;
1262 struct ctdb_db_context *ctdb_db;
1263 TDB_DATA key, data;
1264 uint32_t record_flags = 0;
1265 size_t len;
1266 int ret;
1267
1268 ctdb_db = find_ctdb_db(ctdb, c->db_id);
1269 if (ctdb_db == NULL) {
1270 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
1271 return;
1272 }
1273
1274 key.dptr = c->data;
1275 key.dsize = c->keylen;
1276 data.dptr = &c->data[key.dsize];
1277 data.dsize = c->datalen;
1278 len = offsetof(struct ctdb_reply_dmaster_old, data) + key.dsize + data.dsize
1279 + sizeof(uint32_t);
1280 if (len <= c->hdr.length) {
1281 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
1282 sizeof(record_flags));
1283 }
1284
1285 dmaster_defer_setup(ctdb_db, hdr, key);
1286
1287 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1288 ctdb_call_input_pkt, ctdb, false);
1289 if (ret == -2) {
1290 return;
1291 }
1292 if (ret != 0) {
1293 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
1294 return;
1295 }
1296
1297 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
1298}
1299
1300
1301/*
1302 called when a CTDB_REPLY_ERROR packet comes in
1303*/
1304void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1305{
1306 struct ctdb_reply_error_old *c = (struct ctdb_reply_error_old *)hdr;
1307 struct ctdb_call_state *state;
1308
1309 state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
1310 if (state == NULL) {
1311 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
1312 ctdb->pnn, hdr->reqid));
1313 return;
1314 }
1315
1316 if (hdr->reqid != state->reqid) {
1317 /* we found a record but it was the wrong one */
1318 DEBUG(DEBUG_ERR, ("Dropped orphaned error reply with reqid:%u\n",hdr->reqid));
1319 return;
1320 }
1321
1322 talloc_steal(state, c);
1323
1324 state->state = CTDB_CALL_ERROR;
1325 state->errmsg = (char *)c->msg;
1326 if (state->async.fn) {
1327 state->async.fn(state);
1328 }
1329}
1330
1331
1332/*
1333 destroy a ctdb_call
1334*/
1335static int ctdb_call_destructor(struct ctdb_call_state *state)
1336{
1337 DLIST_REMOVE(state->ctdb_db->pending_calls, state);
1338 reqid_remove(state->ctdb_db->ctdb->idr, state->reqid);
1339 return 0;
1340}
1341
1342
1343/*
1344 called when a ctdb_call needs to be resent after a reconfigure event
1345*/
1346static void ctdb_call_resend(struct ctdb_call_state *state)
1347{
1348 struct ctdb_context *ctdb = state->ctdb_db->ctdb;
1349
1350 state->generation = state->ctdb_db->generation;
1351
1352 /* use a new reqid, in case the old reply does eventually come in */
1353 reqid_remove(ctdb->idr, state->reqid);
1354 state->reqid = reqid_new(ctdb->idr, state);
1355 state->c->hdr.reqid = state->reqid;
1356
1357 /* update the generation count for this request, so its valid with the new vnn_map */
1358 state->c->hdr.generation = state->generation;
1359
1360 /* send the packet to ourselves, it will be redirected appropriately */
1361 state->c->hdr.destnode = ctdb->pnn;
1362
1363 ctdb_queue_packet(ctdb, &state->c->hdr);
1364 DEBUG(DEBUG_NOTICE,("resent ctdb_call for db %s reqid %u generation %u\n",
1365 state->ctdb_db->db_name, state->reqid, state->generation));
1366}
1367
1368/*
1369 resend all pending calls on recovery
1370 */
1371void ctdb_call_resend_db(struct ctdb_db_context *ctdb_db)
1372{
1373 struct ctdb_call_state *state, *next;
1374
1375 for (state = ctdb_db->pending_calls; state; state = next) {
1376 next = state->next;
1377 ctdb_call_resend(state);
1378 }
1379}
1380
1381void ctdb_call_resend_all(struct ctdb_context *ctdb)
1382{
1383 struct ctdb_db_context *ctdb_db;
1384
1385 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
1386 ctdb_call_resend_db(ctdb_db);
1387 }
1388}
1389
1390/*
1391 this allows the caller to setup a async.fn
1392*/
1393static void call_local_trigger(struct tevent_context *ev,
1394 struct tevent_timer *te,
1395 struct timeval t, void *private_data)
1396{
1397 struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
1398 if (state->async.fn) {
1399 state->async.fn(state);
1400 }
1401}
1402
1403
/*
  construct an event driven local ctdb_call

  this is used so that locally processed ctdb_call requests are processed
  in an event driven manner

  The call is executed immediately via ctdb_call_local(), then a
  zero-timeout timer is scheduled so the caller's async.fn runs from the
  event loop rather than synchronously.  Returns the new call state, or
  NULL on allocation failure (via CTDB_NO_MEMORY_NULL).
*/
struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,
					     struct ctdb_call *call,
					     struct ctdb_ltdb_header *header,
					     TDB_DATA *data)
{
	struct ctdb_call_state *state;
	struct ctdb_context *ctdb = ctdb_db->ctdb;
	int ret;

	state = talloc_zero(ctdb_db, struct ctdb_call_state);
	CTDB_NO_MEMORY_NULL(ctdb, state);

	/* take ownership of the record data so it lives as long as state */
	talloc_steal(state, data->dptr);

	state->state = CTDB_CALL_DONE;
	state->call = talloc(state, struct ctdb_call);
	CTDB_NO_MEMORY_NULL(ctdb, state->call);
	*(state->call) = *call;
	state->ctdb_db = ctdb_db;

	ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
	if (ret != 0) {
		/* best-effort: the failure is logged but state is still
		 * returned to the caller */
		DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
	}

	/* zero timeout: fires on the next event-loop iteration */
	tevent_add_timer(ctdb->ev, state, timeval_zero(),
			 call_local_trigger, state);

	return state;
}
1440
1441
/*
  make a remote ctdb call - async send. Called in daemon context.

  This constructs a ctdb_call request and queues it for processing.
  This call never blocks.

  Returns the new call state (parented to ctdb_db), or NULL if the
  transport is down or on allocation failure.
*/
struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db,
						     struct ctdb_call *call,
						     struct ctdb_ltdb_header *header)
{
	uint32_t len;
	struct ctdb_call_state *state;
	struct ctdb_context *ctdb = ctdb_db->ctdb;

	if (ctdb->methods == NULL) {
		DEBUG(DEBUG_INFO,(__location__ " Failed send packet. Transport is down\n"));
		return NULL;
	}

	state = talloc_zero(ctdb_db, struct ctdb_call_state);
	CTDB_NO_MEMORY_NULL(ctdb, state);
	state->call = talloc(state, struct ctdb_call);
	CTDB_NO_MEMORY_NULL(ctdb, state->call);

	state->reqid = reqid_new(ctdb->idr, state);
	state->ctdb_db = ctdb_db;
	talloc_set_destructor(state, ctdb_call_destructor);

	/* packet payload layout: key bytes immediately followed by the
	 * call data bytes */
	len = offsetof(struct ctdb_req_call_old, data) + call->key.dsize + call->call_data.dsize;
	state->c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CALL, len,
					   struct ctdb_req_call_old);
	CTDB_NO_MEMORY_NULL(ctdb, state->c);
	/* send directly to the record's current dmaster */
	state->c->hdr.destnode = header->dmaster;

	/* this limits us to 16k outstanding messages - not unreasonable */
	state->c->hdr.reqid = state->reqid;
	state->c->hdr.generation = ctdb_db->generation;
	state->c->flags = call->flags;
	state->c->db_id = ctdb_db->db_id;
	state->c->callid = call->call_id;
	state->c->hopcount = 0;
	state->c->keylen = call->key.dsize;
	state->c->calldatalen = call->call_data.dsize;
	memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
	memcpy(&state->c->data[call->key.dsize],
	       call->call_data.dptr, call->call_data.dsize);
	*(state->call) = *call;
	/* repoint the copied call at the buffers inside the packet, so
	 * they remain valid for the lifetime of state */
	state->call->call_data.dptr = &state->c->data[call->key.dsize];
	state->call->key.dptr = &state->c->data[0];

	state->state = CTDB_CALL_WAIT;
	state->generation = ctdb_db->generation;

	/* track the call so it can be resent after a recovery */
	DLIST_ADD(ctdb_db->pending_calls, state);

	ctdb_queue_packet(ctdb, &state->c->hdr);

	return state;
}
1501
1502/*
1503 make a remote ctdb call - async recv - called in daemon context
1504
1505 This is called when the program wants to wait for a ctdb_call to complete and get the
1506 results. This call will block unless the call has already completed.
1507*/
1508int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
1509{
1510 while (state->state < CTDB_CALL_DONE) {
1511 tevent_loop_once(state->ctdb_db->ctdb->ev);
1512 }
1513 if (state->state != CTDB_CALL_DONE) {
1514 ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
1515 talloc_free(state);
1516 return -1;
1517 }
1518
1519 if (state->call->reply_data.dsize) {
1520 call->reply_data.dptr = talloc_memdup(call,
1521 state->call->reply_data.dptr,
1522 state->call->reply_data.dsize);
1523 call->reply_data.dsize = state->call->reply_data.dsize;
1524 } else {
1525 call->reply_data.dptr = NULL;
1526 call->reply_data.dsize = 0;
1527 }
1528 call->status = state->call->status;
1529 talloc_free(state);
1530 return 0;
1531}
1532
1533
1534/*
1535 send a keepalive packet to the other node
1536*/
1537void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
1538{
1539 struct ctdb_req_keepalive_old *r;
1540
1541 if (ctdb->methods == NULL) {
1542 DEBUG(DEBUG_INFO,(__location__ " Failed to send keepalive. Transport is DOWN\n"));
1543 return;
1544 }
1545
1546 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_KEEPALIVE,
1547 sizeof(struct ctdb_req_keepalive_old),
1548 struct ctdb_req_keepalive_old);
1549 CTDB_NO_MEMORY_FATAL(ctdb, r);
1550 r->hdr.destnode = destnode;
1551 r->hdr.reqid = 0;
1552
1553 CTDB_INCREMENT_STAT(ctdb, keepalive_packets_sent);
1554
1555 ctdb_queue_packet(ctdb, &r->hdr);
1556
1557 talloc_free(r);
1558}
1559
1560
1561
/* A request that was deferred while a revoke of read-only delegations
 * is in flight; requeued via fn(ctx, hdr) when the revoke finishes. */
struct revokechild_deferred_call {
	struct ctdb_context *ctdb;
	struct ctdb_req_header *hdr;	/* the original request packet */
	deferred_requeue_fn fn;		/* requeue callback */
	void *ctx;			/* opaque argument passed to fn */
};
1568
/* State for one active revoke child process; linked into
 * ctdb_db->revokechild_active. */
struct revokechild_handle {
	struct revokechild_handle *next, *prev;	/* list linkage */
	struct ctdb_context *ctdb;
	struct ctdb_db_context *ctdb_db;
	struct tevent_fd *fde;	/* watches the read end of the status pipe */
	int status;		/* -1 if the child reported failure */
	int fd[2];		/* pipe; child writes one status byte to fd[1] */
	pid_t child;		/* pid of the forked revoke child */
	TDB_DATA key;		/* key of the record being revoked */
};
1579
/* Handle used to requeue one deferred request from a timer event.
 * NOTE(review): the layout duplicates struct revokechild_deferred_call
 * above - candidate for consolidation. */
struct revokechild_requeue_handle {
	struct ctdb_context *ctdb;
	struct ctdb_req_header *hdr;	/* packet to hand back to fn */
	deferred_requeue_fn fn;
	void *ctx;
};
1586
1587static void deferred_call_requeue(struct tevent_context *ev,
1588 struct tevent_timer *te,
1589 struct timeval t, void *private_data)
1590{
1591 struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle);
1592
1593 requeue_handle->fn(requeue_handle->ctx, requeue_handle->hdr);
1594 talloc_free(requeue_handle);
1595}
1596
1597static int deferred_call_destructor(struct revokechild_deferred_call *deferred_call)
1598{
1599 struct ctdb_context *ctdb = deferred_call->ctdb;
1600 struct revokechild_requeue_handle *requeue_handle = talloc(ctdb, struct revokechild_requeue_handle);
1601 struct ctdb_req_call_old *c = (struct ctdb_req_call_old *)deferred_call->hdr;
1602
1603 requeue_handle->ctdb = ctdb;
1604 requeue_handle->hdr = deferred_call->hdr;
1605 requeue_handle->fn = deferred_call->fn;
1606 requeue_handle->ctx = deferred_call->ctx;
1607 talloc_steal(requeue_handle, requeue_handle->hdr);
1608
1609 /* when revoking, any READONLY requests have 1 second grace to let read/write finish first */
1610 tevent_add_timer(ctdb->ev, requeue_handle,
1611 timeval_current_ofs(c->flags & CTDB_WANT_READONLY ? 1 : 0, 0),
1612 deferred_call_requeue, requeue_handle);
1613
1614 return 0;
1615}
1616
1617
1618static int revokechild_destructor(struct revokechild_handle *rc)
1619{
1620 if (rc->fde != NULL) {
1621 talloc_free(rc->fde);
1622 }
1623
1624 if (rc->fd[0] != -1) {
1625 close(rc->fd[0]);
1626 }
1627 if (rc->fd[1] != -1) {
1628 close(rc->fd[1]);
1629 }
1630 ctdb_kill(rc->ctdb, rc->child, SIGKILL);
1631
1632 DLIST_REMOVE(rc->ctdb_db->revokechild_active, rc);
1633 return 0;
1634}
1635
1636static void revokechild_handler(struct tevent_context *ev,
1637 struct tevent_fd *fde,
1638 uint16_t flags, void *private_data)
1639{
1640 struct revokechild_handle *rc = talloc_get_type(private_data,
1641 struct revokechild_handle);
1642 int ret;
1643 char c;
1644
1645 ret = sys_read(rc->fd[0], &c, 1);
1646 if (ret != 1) {
1647 DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno));
1648 rc->status = -1;
1649 talloc_free(rc);
1650 return;
1651 }
1652 if (c != 0) {
1653 DEBUG(DEBUG_ERR,("revokechild returned failure. status:%d\n", c));
1654 rc->status = -1;
1655 talloc_free(rc);
1656 return;
1657 }
1658
1659 talloc_free(rc);
1660}
1661
/* Shared state for revoking all read-only delegations of one record;
 * used inside the revoke child process. */
struct ctdb_revoke_state {
	struct ctdb_db_context *ctdb_db;
	TDB_DATA key;			/* record being revoked */
	struct ctdb_ltdb_header *header;
	TDB_DATA data;
	int count;	/* outstanding UPDATE_RECORD controls */
	int status;	/* 0 on success, -1 if any node failed */
	int finished;	/* set when all replies arrived or on timeout */
};
1671
1672static void update_record_cb(struct ctdb_client_control_state *state)
1673{
1674 struct ctdb_revoke_state *revoke_state;
1675 int ret;
1676 int32_t res;
1677
1678 if (state == NULL) {
1679 return;
1680 }
1681 revoke_state = state->async.private_data;
1682
1683 state->async.fn = NULL;
1684 ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
1685 if ((ret != 0) || (res != 0)) {
1686 DEBUG(DEBUG_ERR,("Recv for revoke update record failed ret:%d res:%d\n", ret, res));
1687 revoke_state->status = -1;
1688 }
1689
1690 revoke_state->count--;
1691 if (revoke_state->count <= 0) {
1692 revoke_state->finished = 1;
1693 }
1694}
1695
1696static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *private_data)
1697{
1698 struct ctdb_revoke_state *revoke_state = private_data;
1699 struct ctdb_client_control_state *state;
1700
1701 state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(ctdb->tunable.control_timeout,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
1702 if (state == NULL) {
1703 DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n"));
1704 revoke_state->status = -1;
1705 return;
1706 }
1707 state->async.fn = update_record_cb;
1708 state->async.private_data = revoke_state;
1709
1710 revoke_state->count++;
1711
1712}
1713
1714static void ctdb_revoke_timeout_handler(struct tevent_context *ev,
1715 struct tevent_timer *te,
1716 struct timeval yt, void *private_data)
1717{
1718 struct ctdb_revoke_state *state = private_data;
1719
1720 DEBUG(DEBUG_ERR,("Timed out waiting for revoke to finish\n"));
1721 state->finished = 1;
1722 state->status = -1;
1723}
1724
1725static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1726{
1727 struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state);
1728 struct ctdb_ltdb_header new_header;
1729 TDB_DATA new_data;
1730
1731 state->ctdb_db = ctdb_db;
1732 state->key = key;
1733 state->header = header;
1734 state->data = data;
1735
1736 ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
1737
1738 tevent_add_timer(ctdb->ev, state,
1739 timeval_current_ofs(ctdb->tunable.control_timeout, 0),
1740 ctdb_revoke_timeout_handler, state);
1741
1742 while (state->finished == 0) {
1743 tevent_loop_once(ctdb->ev);
1744 }
1745
1746 if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
1747 DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
1748 talloc_free(state);
1749 return -1;
1750 }
1751 if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
1752 ctdb_ltdb_unlock(ctdb_db, key);
1753 DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
1754 talloc_free(state);
1755 return -1;
1756 }
1757 header->rsn++;
1758 if (new_header.rsn > header->rsn) {
1759 ctdb_ltdb_unlock(ctdb_db, key);
1760 DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
1761 talloc_free(state);
1762 return -1;
1763 }
1764 if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
1765 ctdb_ltdb_unlock(ctdb_db, key);
1766 DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
1767 talloc_free(state);
1768 return -1;
1769 }
1770
1771 /*
1772 * If revoke on all nodes succeed, revoke is complete. Otherwise,
1773 * remove CTDB_REC_RO_REVOKING_READONLY flag and retry.
1774 */
1775 if (state->status == 0) {
1776 new_header.rsn++;
1777 new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE;
1778 } else {
1779 DEBUG(DEBUG_NOTICE, ("Revoke all delegations failed, retrying.\n"));
1780 new_header.flags &= ~CTDB_REC_RO_REVOKING_READONLY;
1781 }
1782 if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
1783 ctdb_ltdb_unlock(ctdb_db, key);
1784 DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
1785 talloc_free(state);
1786 return -1;
1787 }
1788 ctdb_ltdb_unlock(ctdb_db, key);
1789
1790 talloc_free(state);
1791 return 0;
1792}
1793
1794
1795int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1796{
1797 TDB_DATA tdata;
1798 struct revokechild_handle *rc;
1799 pid_t parent = getpid();
1800 int ret;
1801
1802 header->flags &= ~(CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY);
1803 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1804 header->rsn -= 1;
1805
1806 if ((rc = talloc_zero(ctdb_db, struct revokechild_handle)) == NULL) {
1807 DEBUG(DEBUG_ERR,("Failed to allocate revokechild_handle\n"));
1808 return -1;
1809 }
1810
1811 tdata = tdb_fetch(ctdb_db->rottdb, key);
1812 if (tdata.dsize > 0) {
1813 uint8_t *tmp;
1814
1815 tmp = tdata.dptr;
1816 tdata.dptr = talloc_memdup(rc, tdata.dptr, tdata.dsize);
1817 free(tmp);
1818 }
1819
1820 rc->status = 0;
1821 rc->ctdb = ctdb;
1822 rc->ctdb_db = ctdb_db;
1823 rc->fd[0] = -1;
1824 rc->fd[1] = -1;
1825
1826 talloc_set_destructor(rc, revokechild_destructor);
1827
1828 rc->key.dsize = key.dsize;
1829 rc->key.dptr = talloc_memdup(rc, key.dptr, key.dsize);
1830 if (rc->key.dptr == NULL) {
1831 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1832 talloc_free(rc);
1833 return -1;
1834 }
1835
1836 ret = pipe(rc->fd);
1837 if (ret != 0) {
1838 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1839 talloc_free(rc);
1840 return -1;
1841 }
1842
1843
1844 rc->child = ctdb_fork(ctdb);
1845 if (rc->child == (pid_t)-1) {
1846 DEBUG(DEBUG_ERR,("Failed to fork child for revokechild\n"));
1847 talloc_free(rc);
1848 return -1;
1849 }
1850
1851 if (rc->child == 0) {
1852 char c = 0;
1853 close(rc->fd[0]);
1854 debug_extra = talloc_asprintf(NULL, "revokechild-%s:", ctdb_db->db_name);
1855
1856 prctl_set_comment("ctdb_revokechild");
1857 if (switch_from_server_to_client(ctdb, "revokechild-%s", ctdb_db->db_name) != 0) {
1858 DEBUG(DEBUG_ERR,("Failed to switch from server to client for revokechild process\n"));
1859 c = 1;
1860 goto child_finished;
1861 }
1862
1863 c = ctdb_revoke_all_delegations(ctdb, ctdb_db, tdata, key, header, data);
1864
1865child_finished:
1866 sys_write(rc->fd[1], &c, 1);
1867 /* make sure we die when our parent dies */
1868 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
1869 sleep(5);
1870 }
1871 _exit(0);
1872 }
1873
1874 close(rc->fd[1]);
1875 rc->fd[1] = -1;
1876 set_close_on_exec(rc->fd[0]);
1877
1878 /* This is an active revokechild child process */
1879 DLIST_ADD_END(ctdb_db->revokechild_active, rc);
1880
1881 rc->fde = tevent_add_fd(ctdb->ev, rc, rc->fd[0], TEVENT_FD_READ,
1882 revokechild_handler, (void *)rc);
1883 if (rc->fde == NULL) {
1884 DEBUG(DEBUG_ERR,("Failed to set up fd event for revokechild process\n"));
1885 talloc_free(rc);
1886 }
1887 tevent_fd_set_auto_close(rc->fde);
1888
1889 return 0;
1890}
1891
1892int ctdb_add_revoke_deferred_call(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr, deferred_requeue_fn fn, void *call_context)
1893{
1894 struct revokechild_handle *rc;
1895 struct revokechild_deferred_call *deferred_call;
1896
1897 for (rc = ctdb_db->revokechild_active; rc; rc = rc->next) {
1898 if (rc->key.dsize == 0) {
1899 continue;
1900 }
1901 if (rc->key.dsize != key.dsize) {
1902 continue;
1903 }
1904 if (!memcmp(rc->key.dptr, key.dptr, key.dsize)) {
1905 break;
1906 }
1907 }
1908
1909 if (rc == NULL) {
1910 DEBUG(DEBUG_ERR,("Failed to add deferred call to revoke list. revoke structure not found\n"));
1911 return -1;
1912 }
1913
1914 deferred_call = talloc(rc, struct revokechild_deferred_call);
1915 if (deferred_call == NULL) {
1916 DEBUG(DEBUG_ERR,("Failed to allocate deferred call structure for revoking record\n"));
1917 return -1;
1918 }
1919
1920 deferred_call->ctdb = ctdb;
1921 deferred_call->hdr = hdr;
1922 deferred_call->fn = fn;
1923 deferred_call->ctx = call_context;
1924
1925 talloc_set_destructor(deferred_call, deferred_call_destructor);
1926 talloc_steal(deferred_call, hdr);
1927
1928 return 0;
1929}
Note: See TracBrowser for help on using the repository browser.