1 | /*
|
---|
2 | ctdb over TCP
|
---|
3 |
|
---|
4 | Copyright (C) Andrew Tridgell 2006
|
---|
5 | Copyright (C) Ronnie Sahlberg 2008
|
---|
6 |
|
---|
7 | This program is free software; you can redistribute it and/or modify
|
---|
8 | it under the terms of the GNU General Public License as published by
|
---|
9 | the Free Software Foundation; either version 3 of the License, or
|
---|
10 | (at your option) any later version.
|
---|
11 |
|
---|
12 | This program is distributed in the hope that it will be useful,
|
---|
13 | but WITHOUT ANY WARRANTY; without even the implied warranty of
|
---|
14 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
---|
15 | GNU General Public License for more details.
|
---|
16 |
|
---|
17 | You should have received a copy of the GNU General Public License
|
---|
18 | along with this program; if not, see <http://www.gnu.org/licenses/>.
|
---|
19 | */
|
---|
20 |
|
---|
21 | #include "replace.h"
|
---|
22 | #include "system/network.h"
|
---|
23 | #include "system/filesys.h"
|
---|
24 |
|
---|
25 | #include <talloc.h>
|
---|
26 | #include <tevent.h>
|
---|
27 |
|
---|
28 | #include "lib/util/debug.h"
|
---|
29 | #include "lib/util/time.h"
|
---|
30 |
|
---|
31 | #include "ctdb_private.h"
|
---|
32 |
|
---|
33 | #include "common/system.h"
|
---|
34 | #include "common/common.h"
|
---|
35 | #include "common/logging.h"
|
---|
36 |
|
---|
37 | #include "ctdb_tcp.h"
|
---|
38 |
|
---|
39 | /*
|
---|
40 | stop any connecting (established or pending) to a node
|
---|
41 | */
|
---|
42 | void ctdb_tcp_stop_connection(struct ctdb_node *node)
|
---|
43 | {
|
---|
44 | struct ctdb_tcp_node *tnode = talloc_get_type(
|
---|
45 | node->private_data, struct ctdb_tcp_node);
|
---|
46 |
|
---|
47 | ctdb_queue_set_fd(tnode->out_queue, -1);
|
---|
48 | talloc_free(tnode->connect_te);
|
---|
49 | talloc_free(tnode->connect_fde);
|
---|
50 | tnode->connect_fde = NULL;
|
---|
51 | tnode->connect_te = NULL;
|
---|
52 | if (tnode->fd != -1) {
|
---|
53 | close(tnode->fd);
|
---|
54 | tnode->fd = -1;
|
---|
55 | }
|
---|
56 | }
|
---|
57 |
|
---|
58 |
|
---|
59 | /*
|
---|
60 | called when a complete packet has come in - should not happen on this socket
|
---|
61 | unless the other side closes the connection with RST or FIN
|
---|
62 | */
|
---|
63 | void ctdb_tcp_tnode_cb(uint8_t *data, size_t cnt, void *private_data)
|
---|
64 | {
|
---|
65 | struct ctdb_node *node = talloc_get_type(private_data, struct ctdb_node);
|
---|
66 | struct ctdb_tcp_node *tnode = talloc_get_type(
|
---|
67 | node->private_data, struct ctdb_tcp_node);
|
---|
68 |
|
---|
69 | if (data == NULL) {
|
---|
70 | node->ctdb->upcalls->node_dead(node);
|
---|
71 | }
|
---|
72 |
|
---|
73 | ctdb_tcp_stop_connection(node);
|
---|
74 | tnode->connect_te = tevent_add_timer(node->ctdb->ev, tnode,
|
---|
75 | timeval_current_ofs(3, 0),
|
---|
76 | ctdb_tcp_node_connect, node);
|
---|
77 | }
|
---|
78 |
|
---|
79 | /*
|
---|
80 | called when socket becomes writeable on connect
|
---|
81 | */
|
---|
82 | static void ctdb_node_connect_write(struct tevent_context *ev,
|
---|
83 | struct tevent_fd *fde,
|
---|
84 | uint16_t flags, void *private_data)
|
---|
85 | {
|
---|
86 | struct ctdb_node *node = talloc_get_type(private_data,
|
---|
87 | struct ctdb_node);
|
---|
88 | struct ctdb_tcp_node *tnode = talloc_get_type(node->private_data,
|
---|
89 | struct ctdb_tcp_node);
|
---|
90 | struct ctdb_context *ctdb = node->ctdb;
|
---|
91 | int error = 0;
|
---|
92 | socklen_t len = sizeof(error);
|
---|
93 | int one = 1;
|
---|
94 |
|
---|
95 | talloc_free(tnode->connect_te);
|
---|
96 | tnode->connect_te = NULL;
|
---|
97 |
|
---|
98 | if (getsockopt(tnode->fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0 ||
|
---|
99 | error != 0) {
|
---|
100 | ctdb_tcp_stop_connection(node);
|
---|
101 | tnode->connect_te = tevent_add_timer(ctdb->ev, tnode,
|
---|
102 | timeval_current_ofs(1, 0),
|
---|
103 | ctdb_tcp_node_connect, node);
|
---|
104 | return;
|
---|
105 | }
|
---|
106 |
|
---|
107 | talloc_free(tnode->connect_fde);
|
---|
108 | tnode->connect_fde = NULL;
|
---|
109 |
|
---|
110 | if (setsockopt(tnode->fd,IPPROTO_TCP,TCP_NODELAY,(char *)&one,sizeof(one)) == -1) {
|
---|
111 | DEBUG(DEBUG_WARNING, ("Failed to set TCP_NODELAY on fd - %s\n",
|
---|
112 | strerror(errno)));
|
---|
113 | }
|
---|
114 | if (setsockopt(tnode->fd,SOL_SOCKET,SO_KEEPALIVE,(char *)&one,sizeof(one)) == -1) {
|
---|
115 | DEBUG(DEBUG_WARNING, ("Failed to set KEEPALIVE on fd - %s\n",
|
---|
116 | strerror(errno)));
|
---|
117 | }
|
---|
118 |
|
---|
119 | ctdb_queue_set_fd(tnode->out_queue, tnode->fd);
|
---|
120 |
|
---|
121 | /* the queue subsystem now owns this fd */
|
---|
122 | tnode->fd = -1;
|
---|
123 | }
|
---|
124 |
|
---|
125 |
|
---|
126 | /*
|
---|
127 | called when we should try and establish a tcp connection to a node
|
---|
128 | */
|
---|
129 | void ctdb_tcp_node_connect(struct tevent_context *ev, struct tevent_timer *te,
|
---|
130 | struct timeval t, void *private_data)
|
---|
131 | {
|
---|
132 | struct ctdb_node *node = talloc_get_type(private_data,
|
---|
133 | struct ctdb_node);
|
---|
134 | struct ctdb_tcp_node *tnode = talloc_get_type(node->private_data,
|
---|
135 | struct ctdb_tcp_node);
|
---|
136 | struct ctdb_context *ctdb = node->ctdb;
|
---|
137 | ctdb_sock_addr sock_in;
|
---|
138 | int sockin_size;
|
---|
139 | int sockout_size;
|
---|
140 | ctdb_sock_addr sock_out;
|
---|
141 |
|
---|
142 | ctdb_tcp_stop_connection(node);
|
---|
143 |
|
---|
144 | sock_out = node->address;
|
---|
145 |
|
---|
146 | tnode->fd = socket(sock_out.sa.sa_family, SOCK_STREAM, IPPROTO_TCP);
|
---|
147 | if (tnode->fd == -1) {
|
---|
148 | DEBUG(DEBUG_ERR, (__location__ "Failed to create socket\n"));
|
---|
149 | return;
|
---|
150 | }
|
---|
151 | set_nonblocking(tnode->fd);
|
---|
152 | set_close_on_exec(tnode->fd);
|
---|
153 |
|
---|
154 | DEBUG(DEBUG_DEBUG, (__location__ " Created TCP SOCKET FD:%d\n", tnode->fd));
|
---|
155 |
|
---|
156 | /* Bind our side of the socketpair to the same address we use to listen
|
---|
157 | * on incoming CTDB traffic.
|
---|
158 | * We must specify this address to make sure that the address we expose to
|
---|
159 | * the remote side is actually routable in case CTDB traffic will run on
|
---|
160 | * a dedicated non-routeable network.
|
---|
161 | */
|
---|
162 | sock_in = *ctdb->address;
|
---|
163 |
|
---|
164 | /* AIX libs check to see if the socket address and length
|
---|
165 | arguments are consistent with each other on calls like
|
---|
166 | connect(). Can not get by with just sizeof(sock_in),
|
---|
167 | need sizeof(sock_in.ip).
|
---|
168 | */
|
---|
169 | switch (sock_in.sa.sa_family) {
|
---|
170 | case AF_INET:
|
---|
171 | sock_in.ip.sin_port = 0 /* Any port */;
|
---|
172 | sockin_size = sizeof(sock_in.ip);
|
---|
173 | sockout_size = sizeof(sock_out.ip);
|
---|
174 | break;
|
---|
175 | case AF_INET6:
|
---|
176 | sock_in.ip6.sin6_port = 0 /* Any port */;
|
---|
177 | sockin_size = sizeof(sock_in.ip6);
|
---|
178 | sockout_size = sizeof(sock_out.ip6);
|
---|
179 | break;
|
---|
180 | default:
|
---|
181 | DEBUG(DEBUG_ERR, (__location__ " unknown family %u\n",
|
---|
182 | sock_in.sa.sa_family));
|
---|
183 | close(tnode->fd);
|
---|
184 | return;
|
---|
185 | }
|
---|
186 |
|
---|
187 | if (bind(tnode->fd, (struct sockaddr *)&sock_in, sockin_size) == -1) {
|
---|
188 | DEBUG(DEBUG_ERR, (__location__ "Failed to bind socket %s(%d)\n",
|
---|
189 | strerror(errno), errno));
|
---|
190 | close(tnode->fd);
|
---|
191 | return;
|
---|
192 | }
|
---|
193 |
|
---|
194 | if (connect(tnode->fd, (struct sockaddr *)&sock_out, sockout_size) != 0 &&
|
---|
195 | errno != EINPROGRESS) {
|
---|
196 | ctdb_tcp_stop_connection(node);
|
---|
197 | tnode->connect_te = tevent_add_timer(ctdb->ev, tnode,
|
---|
198 | timeval_current_ofs(1, 0),
|
---|
199 | ctdb_tcp_node_connect, node);
|
---|
200 | return;
|
---|
201 | }
|
---|
202 |
|
---|
203 | /* non-blocking connect - wait for write event */
|
---|
204 | tnode->connect_fde = tevent_add_fd(node->ctdb->ev, tnode, tnode->fd,
|
---|
205 | TEVENT_FD_WRITE|TEVENT_FD_READ,
|
---|
206 | ctdb_node_connect_write, node);
|
---|
207 |
|
---|
208 | /* don't give it long to connect - retry in one second. This ensures
|
---|
209 | that we find a node is up quickly (tcp normally backs off a syn reply
|
---|
210 | delay by quite a lot) */
|
---|
211 | tnode->connect_te = tevent_add_timer(ctdb->ev, tnode,
|
---|
212 | timeval_current_ofs(1, 0),
|
---|
213 | ctdb_tcp_node_connect, node);
|
---|
214 | }
|
---|
215 |
|
---|
216 | /*
|
---|
217 | called when we get contacted by another node
|
---|
218 | currently makes no attempt to check if the connection is really from a ctdb
|
---|
219 | node in our cluster
|
---|
220 | */
|
---|
221 | static void ctdb_listen_event(struct tevent_context *ev, struct tevent_fd *fde,
|
---|
222 | uint16_t flags, void *private_data)
|
---|
223 | {
|
---|
224 | struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
|
---|
225 | struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private_data, struct ctdb_tcp);
|
---|
226 | ctdb_sock_addr addr;
|
---|
227 | socklen_t len;
|
---|
228 | int fd, nodeid;
|
---|
229 | struct ctdb_incoming *in;
|
---|
230 | int one = 1;
|
---|
231 |
|
---|
232 | memset(&addr, 0, sizeof(addr));
|
---|
233 | len = sizeof(addr);
|
---|
234 | fd = accept(ctcp->listen_fd, (struct sockaddr *)&addr, &len);
|
---|
235 | if (fd == -1) return;
|
---|
236 |
|
---|
237 | nodeid = ctdb_ip_to_nodeid(ctdb, &addr);
|
---|
238 |
|
---|
239 | if (nodeid == -1) {
|
---|
240 | DEBUG(DEBUG_ERR, ("Refused connection from unknown node %s\n", ctdb_addr_to_str(&addr)));
|
---|
241 | close(fd);
|
---|
242 | return;
|
---|
243 | }
|
---|
244 |
|
---|
245 | in = talloc_zero(ctcp, struct ctdb_incoming);
|
---|
246 | in->fd = fd;
|
---|
247 | in->ctdb = ctdb;
|
---|
248 |
|
---|
249 | set_nonblocking(in->fd);
|
---|
250 | set_close_on_exec(in->fd);
|
---|
251 |
|
---|
252 | DEBUG(DEBUG_DEBUG, (__location__ " Created SOCKET FD:%d to incoming ctdb connection\n", fd));
|
---|
253 |
|
---|
254 | if (setsockopt(in->fd,SOL_SOCKET,SO_KEEPALIVE,(char *)&one,sizeof(one)) == -1) {
|
---|
255 | DEBUG(DEBUG_WARNING, ("Failed to set KEEPALIVE on fd - %s\n",
|
---|
256 | strerror(errno)));
|
---|
257 | }
|
---|
258 |
|
---|
259 | in->queue = ctdb_queue_setup(ctdb, in, in->fd, CTDB_TCP_ALIGNMENT,
|
---|
260 | ctdb_tcp_read_cb, in, "ctdbd-%s", ctdb_addr_to_str(&addr));
|
---|
261 | }
|
---|
262 |
|
---|
263 |
|
---|
264 | /*
|
---|
265 | automatically find which address to listen on
|
---|
266 | */
|
---|
267 | static int ctdb_tcp_listen_automatic(struct ctdb_context *ctdb)
|
---|
268 | {
|
---|
269 | struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private_data,
|
---|
270 | struct ctdb_tcp);
|
---|
271 | ctdb_sock_addr sock;
|
---|
272 | int lock_fd, i;
|
---|
273 | const char *lock_path = CTDB_RUNDIR "/.socket_lock";
|
---|
274 | struct flock lock;
|
---|
275 | int one = 1;
|
---|
276 | int sock_size;
|
---|
277 | struct tevent_fd *fde;
|
---|
278 |
|
---|
279 | /* If there are no nodes, then it won't be possible to find
|
---|
280 | * the first one. Log a failure and short circuit the whole
|
---|
281 | * process.
|
---|
282 | */
|
---|
283 | if (ctdb->num_nodes == 0) {
|
---|
284 | DEBUG(DEBUG_CRIT,("No nodes available to attempt bind to - is the nodes file empty?\n"));
|
---|
285 | return -1;
|
---|
286 | }
|
---|
287 |
|
---|
288 | /* in order to ensure that we don't get two nodes with the
|
---|
289 | same adddress, we must make the bind() and listen() calls
|
---|
290 | atomic. The SO_REUSEADDR setsockopt only prevents double
|
---|
291 | binds if the first socket is in LISTEN state */
|
---|
292 | lock_fd = open(lock_path, O_RDWR|O_CREAT, 0666);
|
---|
293 | if (lock_fd == -1) {
|
---|
294 | DEBUG(DEBUG_CRIT,("Unable to open %s\n", lock_path));
|
---|
295 | return -1;
|
---|
296 | }
|
---|
297 |
|
---|
298 | lock.l_type = F_WRLCK;
|
---|
299 | lock.l_whence = SEEK_SET;
|
---|
300 | lock.l_start = 0;
|
---|
301 | lock.l_len = 1;
|
---|
302 | lock.l_pid = 0;
|
---|
303 |
|
---|
304 | if (fcntl(lock_fd, F_SETLKW, &lock) != 0) {
|
---|
305 | DEBUG(DEBUG_CRIT,("Unable to lock %s\n", lock_path));
|
---|
306 | close(lock_fd);
|
---|
307 | return -1;
|
---|
308 | }
|
---|
309 |
|
---|
310 | for (i=0; i < ctdb->num_nodes; i++) {
|
---|
311 | if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
|
---|
312 | continue;
|
---|
313 | }
|
---|
314 | sock = ctdb->nodes[i]->address;
|
---|
315 |
|
---|
316 | switch (sock.sa.sa_family) {
|
---|
317 | case AF_INET:
|
---|
318 | sock_size = sizeof(sock.ip);
|
---|
319 | break;
|
---|
320 | case AF_INET6:
|
---|
321 | sock_size = sizeof(sock.ip6);
|
---|
322 | break;
|
---|
323 | default:
|
---|
324 | DEBUG(DEBUG_ERR, (__location__ " unknown family %u\n",
|
---|
325 | sock.sa.sa_family));
|
---|
326 | continue;
|
---|
327 | }
|
---|
328 |
|
---|
329 | ctcp->listen_fd = socket(sock.sa.sa_family, SOCK_STREAM, IPPROTO_TCP);
|
---|
330 | if (ctcp->listen_fd == -1) {
|
---|
331 | ctdb_set_error(ctdb, "socket failed\n");
|
---|
332 | continue;
|
---|
333 | }
|
---|
334 |
|
---|
335 | set_close_on_exec(ctcp->listen_fd);
|
---|
336 |
|
---|
337 | if (setsockopt(ctcp->listen_fd,SOL_SOCKET,SO_REUSEADDR,
|
---|
338 | (char *)&one,sizeof(one)) == -1) {
|
---|
339 | DEBUG(DEBUG_WARNING, ("Failed to set REUSEADDR on fd - %s\n",
|
---|
340 | strerror(errno)));
|
---|
341 | }
|
---|
342 |
|
---|
343 | if (bind(ctcp->listen_fd, (struct sockaddr * )&sock, sock_size) == 0) {
|
---|
344 | break;
|
---|
345 | }
|
---|
346 |
|
---|
347 | if (errno == EADDRNOTAVAIL) {
|
---|
348 | DEBUG(DEBUG_DEBUG,(__location__ " Failed to bind() to socket. %s(%d)\n",
|
---|
349 | strerror(errno), errno));
|
---|
350 | } else {
|
---|
351 | DEBUG(DEBUG_ERR,(__location__ " Failed to bind() to socket. %s(%d)\n",
|
---|
352 | strerror(errno), errno));
|
---|
353 | }
|
---|
354 |
|
---|
355 | close(ctcp->listen_fd);
|
---|
356 | ctcp->listen_fd = -1;
|
---|
357 | }
|
---|
358 |
|
---|
359 | if (i == ctdb->num_nodes) {
|
---|
360 | DEBUG(DEBUG_CRIT,("Unable to bind to any of the node addresses - giving up\n"));
|
---|
361 | goto failed;
|
---|
362 | }
|
---|
363 | ctdb->address = talloc_memdup(ctdb,
|
---|
364 | &ctdb->nodes[i]->address,
|
---|
365 | sizeof(ctdb_sock_addr));
|
---|
366 | if (ctdb->address == NULL) {
|
---|
367 | ctdb_set_error(ctdb, "Out of memory at %s:%d",
|
---|
368 | __FILE__, __LINE__);
|
---|
369 | goto failed;
|
---|
370 | }
|
---|
371 |
|
---|
372 | ctdb->name = talloc_asprintf(ctdb, "%s:%u",
|
---|
373 | ctdb_addr_to_str(ctdb->address),
|
---|
374 | ctdb_addr_to_port(ctdb->address));
|
---|
375 | if (ctdb->name == NULL) {
|
---|
376 | ctdb_set_error(ctdb, "Out of memory at %s:%d",
|
---|
377 | __FILE__, __LINE__);
|
---|
378 | goto failed;
|
---|
379 | }
|
---|
380 | DEBUG(DEBUG_INFO,("ctdb chose network address %s\n", ctdb->name));
|
---|
381 |
|
---|
382 | if (listen(ctcp->listen_fd, 10) == -1) {
|
---|
383 | goto failed;
|
---|
384 | }
|
---|
385 |
|
---|
386 | fde = tevent_add_fd(ctdb->ev, ctcp, ctcp->listen_fd, TEVENT_FD_READ,
|
---|
387 | ctdb_listen_event, ctdb);
|
---|
388 | tevent_fd_set_auto_close(fde);
|
---|
389 |
|
---|
390 | close(lock_fd);
|
---|
391 |
|
---|
392 | return 0;
|
---|
393 |
|
---|
394 | failed:
|
---|
395 | close(lock_fd);
|
---|
396 | if (ctcp->listen_fd != -1) {
|
---|
397 | close(ctcp->listen_fd);
|
---|
398 | ctcp->listen_fd = -1;
|
---|
399 | }
|
---|
400 | return -1;
|
---|
401 | }
|
---|
402 |
|
---|
403 |
|
---|
404 | /*
|
---|
405 | listen on our own address
|
---|
406 | */
|
---|
407 | int ctdb_tcp_listen(struct ctdb_context *ctdb)
|
---|
408 | {
|
---|
409 | struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private_data,
|
---|
410 | struct ctdb_tcp);
|
---|
411 | ctdb_sock_addr sock;
|
---|
412 | int sock_size;
|
---|
413 | int one = 1;
|
---|
414 | struct tevent_fd *fde;
|
---|
415 |
|
---|
416 | /* we can either auto-bind to the first available address, or we can
|
---|
417 | use a specified address */
|
---|
418 | if (!ctdb->address) {
|
---|
419 | return ctdb_tcp_listen_automatic(ctdb);
|
---|
420 | }
|
---|
421 |
|
---|
422 | sock = *ctdb->address;
|
---|
423 |
|
---|
424 | switch (sock.sa.sa_family) {
|
---|
425 | case AF_INET:
|
---|
426 | sock_size = sizeof(sock.ip);
|
---|
427 | break;
|
---|
428 | case AF_INET6:
|
---|
429 | sock_size = sizeof(sock.ip6);
|
---|
430 | break;
|
---|
431 | default:
|
---|
432 | DEBUG(DEBUG_ERR, (__location__ " unknown family %u\n",
|
---|
433 | sock.sa.sa_family));
|
---|
434 | goto failed;
|
---|
435 | }
|
---|
436 |
|
---|
437 | ctcp->listen_fd = socket(sock.sa.sa_family, SOCK_STREAM, IPPROTO_TCP);
|
---|
438 | if (ctcp->listen_fd == -1) {
|
---|
439 | ctdb_set_error(ctdb, "socket failed\n");
|
---|
440 | return -1;
|
---|
441 | }
|
---|
442 |
|
---|
443 | set_close_on_exec(ctcp->listen_fd);
|
---|
444 |
|
---|
445 | if (setsockopt(ctcp->listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&one,sizeof(one)) == -1) {
|
---|
446 | DEBUG(DEBUG_WARNING, ("Failed to set REUSEADDR on fd - %s\n",
|
---|
447 | strerror(errno)));
|
---|
448 | }
|
---|
449 |
|
---|
450 | if (bind(ctcp->listen_fd, (struct sockaddr * )&sock, sock_size) != 0) {
|
---|
451 | DEBUG(DEBUG_ERR,(__location__ " Failed to bind() to socket. %s(%d)\n", strerror(errno), errno));
|
---|
452 | goto failed;
|
---|
453 | }
|
---|
454 |
|
---|
455 | if (listen(ctcp->listen_fd, 10) == -1) {
|
---|
456 | goto failed;
|
---|
457 | }
|
---|
458 |
|
---|
459 | fde = tevent_add_fd(ctdb->ev, ctcp, ctcp->listen_fd, TEVENT_FD_READ,
|
---|
460 | ctdb_listen_event, ctdb);
|
---|
461 | tevent_fd_set_auto_close(fde);
|
---|
462 |
|
---|
463 | return 0;
|
---|
464 |
|
---|
465 | failed:
|
---|
466 | if (ctcp->listen_fd != -1) {
|
---|
467 | close(ctcp->listen_fd);
|
---|
468 | }
|
---|
469 | ctcp->listen_fd = -1;
|
---|
470 | return -1;
|
---|
471 | }
|
---|
472 |
|
---|