]> git.proxmox.com Git - mirror_kronosnet.git/commitdiff
[stats] allow knet_link_get_status to operate in readlock context
authorFabio M. Di Nitto <fdinitto@redhat.com>
Thu, 20 Feb 2020 08:04:59 +0000 (09:04 +0100)
committerFabio M. Di Nitto <fdinitto@redhat.com>
Fri, 28 Feb 2020 05:48:34 +0000 (06:48 +0100)
- add per link stats mutex
- use per link stats mutex across the board

note: some threads need to lock for a slightly longer period of time than
strictly necessary to avoid reverse-order locking with other mutexes.

Signed-off-by: Christine Caulfield <ccaulfie@redhat.com>
Signed-off-by: Fabio M. Di Nitto <fdinitto@redhat.com>
libknet/internals.h
libknet/links.c
libknet/links.h
libknet/threads_heartbeat.c
libknet/threads_pmtud.c
libknet/threads_rx.c
libknet/threads_tx.c

index 0b10574198537680e44be172cd47d2327b1fa3b1..0a27008eefea2a3a6d413d9c0f6384bf152f30a7 100644 (file)
@@ -72,6 +72,7 @@ struct knet_link {
        /* status */
        struct knet_link_status status;
        /* internals */
+       pthread_mutex_t link_stats_mutex;       /* used to update link stats */
        uint8_t link_id;
        uint8_t transport;                      /* #defined constant from API */
        knet_transport_link_t transport_link;   /* link_info_t from transport */
index 3161586199bb00ff79f03989e37f6ebc43f3b898..b3e99e49f6fc35f9cabb021b48ad513f05d7890a 100644 (file)
 #include "links_acl.h"
 
 int _link_updown(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
-                unsigned int enabled, unsigned int connected)
+                unsigned int enabled, unsigned int connected, unsigned int lock_stats)
 {
        struct knet_host *host = knet_h->host_index[host_id];
        struct knet_link *link = &host->link[link_id];
        int notify_status = link->status.connected;
+       int savederrno = 0;
 
        if ((link->status.enabled == enabled) &&
            (link->status.connected == connected))
@@ -59,6 +60,16 @@ int _link_updown(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
            (!link->status.connected))
                link->status.dynconnected = 0;
 
+       if (lock_stats) {
+               savederrno = pthread_mutex_lock(&link->link_stats_mutex);
+               if (savederrno) {
+                       log_err(knet_h, KNET_SUB_LINK, "Unable to get stats mutex lock for host %u link %u: %s",
+                               host_id, link_id, strerror(savederrno));
+                       errno = savederrno;
+                       return -1;
+               }
+       }
+
        if (connected) {
                time(&link->status.stats.last_up_times[link->status.stats.last_up_time_index]);
                link->status.stats.up_count++;
@@ -72,6 +83,10 @@ int _link_updown(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
                        link->status.stats.last_down_time_index = 0;
                }
        }
+
+       if (lock_stats) {
+               pthread_mutex_unlock(&link->link_stats_mutex);
+       }
        return 0;
 }
 
@@ -249,6 +264,13 @@ int knet_link_set_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t l
        link->latency_cur_samples = 0;
        link->flags = flags;
 
+       savederrno = pthread_mutex_init(&link->link_stats_mutex, NULL);
+       if (savederrno) {
+               log_err(knet_h, KNET_SUB_LINK, "Unable to initialize link stats mutex: %s", strerror(savederrno));
+               err = -1;
+               goto exit_unlock;
+       }
+
        if (transport_link_set_config(knet_h, link, transport) < 0) {
                savederrno = errno;
                err = -1;
@@ -506,6 +528,8 @@ int knet_link_clear_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t
                check_rmall(knet_h, sock, transport);
        }
 
+       pthread_mutex_destroy(&link->link_stats_mutex);
+
        memset(link, 0, sizeof(struct knet_link));
        link->link_id = link_id;
 
@@ -579,7 +603,7 @@ int knet_link_set_enable(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t l
                goto exit_unlock;
        }
 
-       err = _link_updown(knet_h, host_id, link_id, enabled, link->status.connected);
+       err = _link_updown(knet_h, host_id, link_id, enabled, link->status.connected, 0);
        savederrno = errno;
 
        if (enabled) {
@@ -1121,7 +1145,7 @@ int knet_link_get_status(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t l
                return -1;
        }
 
-       savederrno = get_global_wrlock(knet_h);
+       savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
        if (savederrno) {
                log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
                        strerror(savederrno));
@@ -1148,8 +1172,18 @@ int knet_link_get_status(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t l
                goto exit_unlock;
        }
 
+       savederrno = pthread_mutex_lock(&link->link_stats_mutex);
+       if (savederrno) {
+               log_err(knet_h, KNET_SUB_LINK, "Unable to get stats mutex lock for host %u link %u: %s",
+                       host_id, link_id, strerror(savederrno));
+               err = -1;
+               goto exit_unlock;
+       }
+
        memmove(status, &link->status, struct_size);
 
+       pthread_mutex_unlock(&link->link_stats_mutex);
+
        /* Calculate totals - no point in doing this on-the-fly */
        status->stats.rx_total_packets =
                status->stats.rx_data_packets +
index 66890a84c7c1500d41e6a13d1c6f560f38f6acca..1ba2b70adea679dec5c2993b83fb72569519dc51 100644 (file)
@@ -41,7 +41,7 @@
 #define KNET_LINK_PMTUD_CRYPTO_TIMEOUT_MULTIPLIER_MAX  128
 
 int _link_updown(knet_handle_t knet_h, knet_node_id_t node_id, uint8_t link_id,
-                unsigned int enabled, unsigned int connected);
+                unsigned int enabled, unsigned int connected, unsigned int lock_stats);
 
 void _link_clear_stats(knet_handle_t knet_h);
 
index 0c1f52add0982fe8381b840d17eb08d077e03946..c21ffac0d6d6aa9175941a65a456e099c48bb2a9 100644 (file)
@@ -31,13 +31,13 @@ static void _link_down(knet_handle_t knet_h, struct knet_host *dst_host, struct
        if (dst_link->status.connected == 1) {
                log_info(knet_h, KNET_SUB_LINK, "host: %u link: %u is down",
                         dst_host->host_id, dst_link->link_id);
-               _link_updown(knet_h, dst_host->host_id, dst_link->link_id, dst_link->status.enabled, 0);
+               _link_updown(knet_h, dst_host->host_id, dst_link->link_id, dst_link->status.enabled, 0, 1);
        }
 }
 
 static void _handle_check_each(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link, int timed)
 {
-       int err = 0, savederrno = 0;
+       int err = 0, savederrno = 0, stats_err = 0;
        int len;
        ssize_t outlen = KNET_HEADER_PING_SIZE;
        struct timespec clock_now, pong_last;
@@ -89,6 +89,13 @@ static void _handle_check_each(knet_handle_t knet_h, struct knet_host *dst_host,
                        pthread_mutex_unlock(&knet_h->handle_stats_mutex);
                }
 
+               stats_err = pthread_mutex_lock(&dst_link->link_stats_mutex);
+               if (stats_err) {
+                       log_err(knet_h, KNET_SUB_HEARTBEAT, "Unable to get stats mutex lock for host %u link %u: %s",
+                               dst_host->host_id, dst_link->link_id, strerror(stats_err));
+                       return;
+               }
+
 retry:
                if (transport_get_connection_oriented(knet_h, dst_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) {
                        len = sendto(dst_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
@@ -123,6 +130,7 @@ retry:
                } else {
                        dst_link->last_ping_size = outlen;
                }
+               pthread_mutex_unlock(&dst_link->link_stats_mutex);
        }
 
        timespec_diff(pong_last, clock_now, &diff_ping);
index d04c77c43e5baffa5983f477702466f08f5ccb0b..38528a83ebb4d6f5902013c4faa03479c63ed391 100644 (file)
@@ -188,9 +188,20 @@ restart:
 
        savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
        if (savederrno) {
+               pthread_mutex_unlock(&knet_h->pmtud_mutex);
                log_err(knet_h, KNET_SUB_PMTUD, "Unable to get TX mutex lock: %s", strerror(savederrno));
                return -1;
        }
+
+       savederrno = pthread_mutex_lock(&dst_link->link_stats_mutex);
+       if (savederrno) {
+               pthread_mutex_unlock(&knet_h->pmtud_mutex);
+               pthread_mutex_unlock(&knet_h->tx_mutex);
+               log_err(knet_h, KNET_SUB_PMTUD, "Unable to get stats mutex lock for host %u link %u: %s",
+                       dst_host->host_id, dst_link->link_id, strerror(savederrno));
+               return -1;
+       }
+
 retry:
        if (transport_get_connection_oriented(knet_h, dst_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) {
                len = sendto(dst_link->outsock, outbuf, data_len, MSG_DONTWAIT | MSG_NOSIGNAL,
@@ -223,6 +234,7 @@ retry:
                        pthread_mutex_unlock(&knet_h->tx_mutex);
                        pthread_mutex_unlock(&knet_h->pmtud_mutex);
                        dst_link->status.stats.tx_pmtu_errors++;
+                       pthread_mutex_unlock(&dst_link->link_stats_mutex);
                        return -1;
                case 0: /* ignore error and continue */
                        break;
@@ -235,6 +247,7 @@ retry:
        pthread_mutex_unlock(&knet_h->tx_mutex);
 
        if (len != (ssize_t )data_len) {
+               pthread_mutex_unlock(&dst_link->link_stats_mutex);
                if (savederrno == EMSGSIZE) {
                        /*
                         * we cannot hold a lock on kmtu_mutex between resetting
@@ -263,6 +276,7 @@ retry:
                dst_link->last_recv_mtu = 0;
                dst_link->status.stats.tx_pmtu_packets++;
                dst_link->status.stats.tx_pmtu_bytes += data_len;
+               pthread_mutex_unlock(&dst_link->link_stats_mutex);
 
                if (clock_gettime(CLOCK_REALTIME, &ts) < 0) {
                        log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get current time: %s", strerror(errno));
index ca2a8e9e31bb45668efb3126207cae33f466249e..ccef73b98816770d7a69f74a5892a56e77f70581 100644 (file)
@@ -286,8 +286,6 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc
                return;
        }
 
-       src_link = NULL;
-
        src_link = src_host->link +
                (inbuf->khp_ping_link % KNET_MAX_LINK);
        if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) {
@@ -324,10 +322,18 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc
                }
        }
 
+       stats_err = pthread_mutex_lock(&src_link->link_stats_mutex);
+       if (stats_err) {
+               log_err(knet_h, KNET_SUB_RX, "Unable to get stats mutex lock for host %u link %u: %s",
+                       src_host->host_id, src_link->link_id, strerror(savederrno));
+               return;
+       }
+
        switch (inbuf->kh_type) {
        case KNET_HEADER_TYPE_HOST_INFO:
        case KNET_HEADER_TYPE_DATA:
                if (!src_host->status.reachable) {
+                       pthread_mutex_unlock(&src_link->link_stats_mutex);
                        log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet. Discarding packet.", src_host->host_id);
                        return;
                }
@@ -335,12 +341,11 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc
                channel = inbuf->khp_data_channel;
                src_host->got_data = 1;
 
-               if (src_link) {
-                       src_link->status.stats.rx_data_packets++;
-                       src_link->status.stats.rx_data_bytes += len;
-               }
+               src_link->status.stats.rx_data_packets++;
+               src_link->status.stats.rx_data_bytes += len;
 
                if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
+                       pthread_mutex_unlock(&src_link->link_stats_mutex);
                        if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) {
                                log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered");
                        }
@@ -356,6 +361,7 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc
                         */
                        len = len - KNET_HEADER_DATA_SIZE;
                        if (pckt_defrag(knet_h, inbuf, &len)) {
+                               pthread_mutex_unlock(&src_link->link_stats_mutex);
                                return;
                        }
                        len = len + KNET_HEADER_DATA_SIZE;
@@ -376,6 +382,7 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc
 
                        stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
                        if (stats_err < 0) {
+                               pthread_mutex_unlock(&src_link->link_stats_mutex);
                                log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
                                return;
                        }
@@ -404,6 +411,7 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc
                        } else {
                                knet_h->stats.rx_failed_to_decompress++;
                                pthread_mutex_unlock(&knet_h->handle_stats_mutex);
+                               pthread_mutex_unlock(&src_link->link_stats_mutex);
                                log_warn(knet_h, KNET_SUB_COMPRESS, "Unable to decompress packet (%d): %s",
                                         err, strerror(errno));
                                return;
@@ -415,6 +423,7 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc
                        if (knet_h->crypto_instance) {
                                stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
                                if (stats_err < 0) {
+                                       pthread_mutex_unlock(&src_link->link_stats_mutex);
                                        log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
                                        return;
                                }
@@ -451,11 +460,13 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc
                                                dst_host_ids,
                                                &dst_host_ids_entries);
                                if (bcast < 0) {
+                                       pthread_mutex_unlock(&src_link->link_stats_mutex);
                                        log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast);
                                        return;
                                }
 
                                if ((!bcast) && (!dst_host_ids_entries)) {
+                                       pthread_mutex_unlock(&src_link->link_stats_mutex);
                                        log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries");
                                        return;
                                }
@@ -463,6 +474,7 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc
                                /* check if we are dst for this packet */
                                if (!bcast) {
                                        if (dst_host_ids_entries > KNET_MAX_HOST) {
+                                               pthread_mutex_unlock(&src_link->link_stats_mutex);
                                                log_debug(knet_h, KNET_SUB_RX, "dst_host_filter_fn returned too many destinations");
                                                return;
                                        }
@@ -473,6 +485,7 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc
                                                }
                                        }
                                        if (!found) {
+                                               pthread_mutex_unlock(&src_link->link_stats_mutex);
                                                log_debug(knet_h, KNET_SUB_RX, "Packet is not for us");
                                                return;
                                        }
@@ -482,6 +495,7 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc
 
                if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
                        if (!knet_h->sockfd[channel].in_use) {
+                               pthread_mutex_unlock(&src_link->link_stats_mutex);
                                log_debug(knet_h, KNET_SUB_RX,
                                          "received packet for channel %d but there is no local sock connected",
                                          channel);
@@ -510,6 +524,7 @@ retry:
                                                       KNET_NOTIFY_RX,
                                                       outlen,
                                                       errno);
+                               pthread_mutex_unlock(&src_link->link_stats_mutex);
                                return;
                        }
                        if ((size_t)outlen == iov_out[0].iov_len) {
@@ -521,6 +536,7 @@ retry:
                                knet_hostinfo->khi_dst_node_id = ntohs(knet_hostinfo->khi_dst_node_id);
                        }
                        if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
+                               pthread_mutex_unlock(&src_link->link_stats_mutex);
                                return;
                        }
                        _seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
@@ -668,7 +684,7 @@ retry_pong:
                                        if (src_link->received_pong >= src_link->pong_count) {
                                                log_info(knet_h, KNET_SUB_RX, "host: %u link: %u is up",
                                                         src_host->host_id, src_link->link_id);
-                                               _link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1);
+                                               _link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1, 0);
                                        } else {
                                                src_link->received_pong++;
                                                log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u received pong: %u",
@@ -720,6 +736,9 @@ retry_pong:
                        pthread_mutex_unlock(&knet_h->handle_stats_mutex);
                }
 
+               /* Unlock so we don't deadlock with tx_mutex */
+               pthread_mutex_unlock(&src_link->link_stats_mutex);
+
                savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
                if (savederrno) {
                        log_err(knet_h, KNET_SUB_RX, "Unable to get TX mutex lock: %s", strerror(savederrno));
@@ -736,6 +755,11 @@ retry_pmtud:
                        savederrno = errno;
                        if (len != outlen) {
                                err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, len, savederrno);
+                               stats_err = pthread_mutex_lock(&src_link->link_stats_mutex);
+                               if (stats_err < 0) {
+                                       log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
+                                       break;
+                               }
                                switch(err) {
                                        case -1: /* unrecoverable error */
                                                log_debug(knet_h, KNET_SUB_RX,
@@ -751,17 +775,23 @@ retry_pmtud:
                                                break;
                                        case 1: /* retry to send those same data */
                                                src_link->status.stats.tx_pmtu_retries++;
+                                               pthread_mutex_unlock(&src_link->link_stats_mutex);
                                                goto retry_pmtud;
                                                break;
                                }
+                               pthread_mutex_unlock(&src_link->link_stats_mutex);
                        }
                }
                pthread_mutex_unlock(&knet_h->tx_mutex);
 out_pmtud:
-               break;
+               return; /* Don't need to unlock link_stats_mutex */
        case KNET_HEADER_TYPE_PMTUD_REPLY:
                src_link->status.stats.rx_pmtu_packets++;
                src_link->status.stats.rx_pmtu_bytes += len;
+
+               /* pmtud_mutex can't be acquired while we hold a link_stats_mutex (ordering) */
+               pthread_mutex_unlock(&src_link->link_stats_mutex);
+
                if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
                        log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock");
                        break;
@@ -769,10 +799,12 @@ out_pmtud:
                src_link->last_recv_mtu = inbuf->khp_pmtud_size;
                pthread_cond_signal(&knet_h->pmtud_cond);
                pthread_mutex_unlock(&knet_h->pmtud_mutex);
-               break;
+               return;
        default:
+               pthread_mutex_unlock(&src_link->link_stats_mutex);
                return;
        }
+       pthread_mutex_unlock(&src_link->link_stats_mutex);
 }
 
 static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg)
index 671ec0cad6b0d9aa7afa5d5f77d9751b52279388..bfce36d8fea003fda145613d7ce6da3a5be310d8 100644 (file)
@@ -36,7 +36,7 @@
 static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_mmsghdr *msg, int msgs_to_send)
 {
        int link_idx, msg_idx, sent_msgs, prev_sent, progress;
-       int err = 0, savederrno = 0;
+       int err = 0, savederrno = 0, locked = 0;
        unsigned int i;
        struct knet_mmsghdr *cur;
        struct knet_link *cur_link;
@@ -44,6 +44,7 @@ static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host,
        for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) {
                prev_sent = 0;
                progress = 1;
+               locked = 0;
 
                cur_link = &dst_host->link[dst_host->active_links[link_idx]];
 
@@ -51,6 +52,14 @@ static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host,
                        continue;
                }
 
+               savederrno = pthread_mutex_lock(&cur_link->link_stats_mutex);
+               if (savederrno) {
+                       log_err(knet_h, KNET_SUB_TX, "Unable to get stats mutex lock for host %u link %u: %s",
+                               dst_host->host_id, cur_link->link_id, strerror(savederrno));
+                       continue;
+               }
+               locked = 1;
+
                msg_idx = 0;
                while (msg_idx < msgs_to_send) {
                        msg[msg_idx].msg_hdr.msg_name = &cur_link->dst_addr;
@@ -120,9 +129,14 @@ retry:
 
                        break;
                }
+               pthread_mutex_unlock(&cur_link->link_stats_mutex);
+               locked = 0;
        }
 
 out_unlock:
+       if (locked) {
+               pthread_mutex_unlock(&cur_link->link_stats_mutex);
+       }
        errno = savederrno;
        return err;
 }