/* status */
struct knet_link_status status;
/* internals */
+ pthread_mutex_t link_stats_mutex; /* used to update link stats */
uint8_t link_id;
uint8_t transport; /* #defined constant from API */
knet_transport_link_t transport_link; /* link_info_t from transport */
#include "links_acl.h"
int _link_updown(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
- unsigned int enabled, unsigned int connected)
+ unsigned int enabled, unsigned int connected, unsigned int lock_stats)
{
struct knet_host *host = knet_h->host_index[host_id];
struct knet_link *link = &host->link[link_id];
int notify_status = link->status.connected;
+ int savederrno = 0;
if ((link->status.enabled == enabled) &&
(link->status.connected == connected))
(!link->status.connected))
link->status.dynconnected = 0;
+ if (lock_stats) {
+ savederrno = pthread_mutex_lock(&link->link_stats_mutex);
+ if (savederrno) {
+ log_err(knet_h, KNET_SUB_LINK, "Unable to get stats mutex lock for host %u link %u: %s",
+ host_id, link_id, strerror(savederrno));
+ errno = savederrno;
+ return -1;
+ }
+ }
+
if (connected) {
time(&link->status.stats.last_up_times[link->status.stats.last_up_time_index]);
link->status.stats.up_count++;
link->status.stats.last_down_time_index = 0;
}
}
+
+ if (lock_stats) {
+ pthread_mutex_unlock(&link->link_stats_mutex);
+ }
return 0;
}
link->latency_cur_samples = 0;
link->flags = flags;
+ savederrno = pthread_mutex_init(&link->link_stats_mutex, NULL);
+ if (savederrno) {
+ log_err(knet_h, KNET_SUB_LINK, "Unable to initialize link stats mutex: %s", strerror(savederrno));
+ err = -1;
+ goto exit_unlock;
+ }
+
if (transport_link_set_config(knet_h, link, transport) < 0) {
savederrno = errno;
err = -1;
check_rmall(knet_h, sock, transport);
}
+ pthread_mutex_destroy(&link->link_stats_mutex);
+
memset(link, 0, sizeof(struct knet_link));
link->link_id = link_id;
goto exit_unlock;
}
- err = _link_updown(knet_h, host_id, link_id, enabled, link->status.connected);
+ err = _link_updown(knet_h, host_id, link_id, enabled, link->status.connected, 0);
savederrno = errno;
if (enabled) {
return -1;
}
- savederrno = get_global_wrlock(knet_h);
+ savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
goto exit_unlock;
}
+ savederrno = pthread_mutex_lock(&link->link_stats_mutex);
+ if (savederrno) {
+ log_err(knet_h, KNET_SUB_LINK, "Unable to get stats mutex lock for host %u link %u: %s",
+ host_id, link_id, strerror(savederrno));
+ err = -1;
+ goto exit_unlock;
+ }
+
memmove(status, &link->status, struct_size);
+ pthread_mutex_unlock(&link->link_stats_mutex);
+
/* Calculate totals - no point in doing this on-the-fly */
status->stats.rx_total_packets =
status->stats.rx_data_packets +
#define KNET_LINK_PMTUD_CRYPTO_TIMEOUT_MULTIPLIER_MAX 128
int _link_updown(knet_handle_t knet_h, knet_node_id_t node_id, uint8_t link_id,
- unsigned int enabled, unsigned int connected);
+ unsigned int enabled, unsigned int connected, unsigned int lock_stats);
void _link_clear_stats(knet_handle_t knet_h);
if (dst_link->status.connected == 1) {
log_info(knet_h, KNET_SUB_LINK, "host: %u link: %u is down",
dst_host->host_id, dst_link->link_id);
- _link_updown(knet_h, dst_host->host_id, dst_link->link_id, dst_link->status.enabled, 0);
+ _link_updown(knet_h, dst_host->host_id, dst_link->link_id, dst_link->status.enabled, 0, 1);
}
}
static void _handle_check_each(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link, int timed)
{
- int err = 0, savederrno = 0;
+ int err = 0, savederrno = 0, stats_err = 0;
int len;
ssize_t outlen = KNET_HEADER_PING_SIZE;
struct timespec clock_now, pong_last;
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
}
+ stats_err = pthread_mutex_lock(&dst_link->link_stats_mutex);
+ if (stats_err) {
+ log_err(knet_h, KNET_SUB_HEARTBEAT, "Unable to get stats mutex lock for host %u link %u: %s",
+ dst_host->host_id, dst_link->link_id, strerror(stats_err));
+ return;
+ }
+
retry:
if (transport_get_connection_oriented(knet_h, dst_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) {
len = sendto(dst_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
} else {
dst_link->last_ping_size = outlen;
}
+ pthread_mutex_unlock(&dst_link->link_stats_mutex);
}
timespec_diff(pong_last, clock_now, &diff_ping);
savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
if (savederrno) {
+ pthread_mutex_unlock(&knet_h->pmtud_mutex);
log_err(knet_h, KNET_SUB_PMTUD, "Unable to get TX mutex lock: %s", strerror(savederrno));
return -1;
}
+
+ savederrno = pthread_mutex_lock(&dst_link->link_stats_mutex);
+ if (savederrno) {
+ pthread_mutex_unlock(&knet_h->pmtud_mutex);
+ pthread_mutex_unlock(&knet_h->tx_mutex);
+ log_err(knet_h, KNET_SUB_PMTUD, "Unable to get stats mutex lock for host %u link %u: %s",
+ dst_host->host_id, dst_link->link_id, strerror(savederrno));
+ return -1;
+ }
+
retry:
if (transport_get_connection_oriented(knet_h, dst_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) {
len = sendto(dst_link->outsock, outbuf, data_len, MSG_DONTWAIT | MSG_NOSIGNAL,
pthread_mutex_unlock(&knet_h->tx_mutex);
pthread_mutex_unlock(&knet_h->pmtud_mutex);
dst_link->status.stats.tx_pmtu_errors++;
+ pthread_mutex_unlock(&dst_link->link_stats_mutex);
return -1;
case 0: /* ignore error and continue */
break;
pthread_mutex_unlock(&knet_h->tx_mutex);
if (len != (ssize_t )data_len) {
+ pthread_mutex_unlock(&dst_link->link_stats_mutex);
if (savederrno == EMSGSIZE) {
/*
* we cannot hold a lock on kmtu_mutex between resetting
dst_link->last_recv_mtu = 0;
dst_link->status.stats.tx_pmtu_packets++;
dst_link->status.stats.tx_pmtu_bytes += data_len;
+ pthread_mutex_unlock(&dst_link->link_stats_mutex);
if (clock_gettime(CLOCK_REALTIME, &ts) < 0) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get current time: %s", strerror(errno));
return;
}
- src_link = NULL;
-
src_link = src_host->link +
(inbuf->khp_ping_link % KNET_MAX_LINK);
if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) {
}
}
+ stats_err = pthread_mutex_lock(&src_link->link_stats_mutex);
+ if (stats_err) {
+ log_err(knet_h, KNET_SUB_RX, "Unable to get stats mutex lock for host %u link %u: %s",
+ src_host->host_id, src_link->link_id, strerror(savederrno));
+ return;
+ }
+
switch (inbuf->kh_type) {
case KNET_HEADER_TYPE_HOST_INFO:
case KNET_HEADER_TYPE_DATA:
if (!src_host->status.reachable) {
+ pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet. Discarding packet.", src_host->host_id);
return;
}
channel = inbuf->khp_data_channel;
src_host->got_data = 1;
- if (src_link) {
- src_link->status.stats.rx_data_packets++;
- src_link->status.stats.rx_data_bytes += len;
- }
+ src_link->status.stats.rx_data_packets++;
+ src_link->status.stats.rx_data_bytes += len;
if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
+ pthread_mutex_unlock(&src_link->link_stats_mutex);
if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) {
log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered");
}
*/
len = len - KNET_HEADER_DATA_SIZE;
if (pckt_defrag(knet_h, inbuf, &len)) {
+ pthread_mutex_unlock(&src_link->link_stats_mutex);
return;
}
len = len + KNET_HEADER_DATA_SIZE;
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err < 0) {
+ pthread_mutex_unlock(&src_link->link_stats_mutex);
log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
return;
}
} else {
knet_h->stats.rx_failed_to_decompress++;
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
+ pthread_mutex_unlock(&src_link->link_stats_mutex);
log_warn(knet_h, KNET_SUB_COMPRESS, "Unable to decompress packet (%d): %s",
err, strerror(errno));
return;
if (knet_h->crypto_instance) {
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err < 0) {
+ pthread_mutex_unlock(&src_link->link_stats_mutex);
log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
return;
}
dst_host_ids,
&dst_host_ids_entries);
if (bcast < 0) {
+ pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast);
return;
}
if ((!bcast) && (!dst_host_ids_entries)) {
+ pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries");
return;
}
/* check if we are dst for this packet */
if (!bcast) {
if (dst_host_ids_entries > KNET_MAX_HOST) {
+ pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "dst_host_filter_fn returned too many destinations");
return;
}
}
}
if (!found) {
+ pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "Packet is not for us");
return;
}
if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
if (!knet_h->sockfd[channel].in_use) {
+ pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX,
"received packet for channel %d but there is no local sock connected",
channel);
KNET_NOTIFY_RX,
outlen,
errno);
+ pthread_mutex_unlock(&src_link->link_stats_mutex);
return;
}
if ((size_t)outlen == iov_out[0].iov_len) {
knet_hostinfo->khi_dst_node_id = ntohs(knet_hostinfo->khi_dst_node_id);
}
if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
+ pthread_mutex_unlock(&src_link->link_stats_mutex);
return;
}
_seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
if (src_link->received_pong >= src_link->pong_count) {
log_info(knet_h, KNET_SUB_RX, "host: %u link: %u is up",
src_host->host_id, src_link->link_id);
- _link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1);
+ _link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1, 0);
} else {
src_link->received_pong++;
log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u received pong: %u",
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
}
+ /* Unlock so we don't deadlock with tx_mutex */
+ pthread_mutex_unlock(&src_link->link_stats_mutex);
+
savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
if (savederrno) {
log_err(knet_h, KNET_SUB_RX, "Unable to get TX mutex lock: %s", strerror(savederrno));
savederrno = errno;
if (len != outlen) {
err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, len, savederrno);
+ stats_err = pthread_mutex_lock(&src_link->link_stats_mutex);
+ if (stats_err < 0) {
+ log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
+ break;
+ }
switch(err) {
case -1: /* unrecoverable error */
log_debug(knet_h, KNET_SUB_RX,
break;
case 1: /* retry to send those same data */
src_link->status.stats.tx_pmtu_retries++;
+ pthread_mutex_unlock(&src_link->link_stats_mutex);
goto retry_pmtud;
break;
}
+ pthread_mutex_unlock(&src_link->link_stats_mutex);
}
}
pthread_mutex_unlock(&knet_h->tx_mutex);
out_pmtud:
- break;
+ return; /* Don't need to unlock link_stats_mutex */
case KNET_HEADER_TYPE_PMTUD_REPLY:
src_link->status.stats.rx_pmtu_packets++;
src_link->status.stats.rx_pmtu_bytes += len;
+
+ /* pmtud_mutex can't be acquired while we hold a link_stats_mutex (ordering) */
+ pthread_mutex_unlock(&src_link->link_stats_mutex);
+
if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock");
break;
src_link->last_recv_mtu = inbuf->khp_pmtud_size;
pthread_cond_signal(&knet_h->pmtud_cond);
pthread_mutex_unlock(&knet_h->pmtud_mutex);
- break;
+ return;
default:
+ pthread_mutex_unlock(&src_link->link_stats_mutex);
return;
}
+ pthread_mutex_unlock(&src_link->link_stats_mutex);
}
static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg)
static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_mmsghdr *msg, int msgs_to_send)
{
int link_idx, msg_idx, sent_msgs, prev_sent, progress;
- int err = 0, savederrno = 0;
+ int err = 0, savederrno = 0, locked = 0;
unsigned int i;
struct knet_mmsghdr *cur;
struct knet_link *cur_link;
for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) {
prev_sent = 0;
progress = 1;
+ locked = 0;
cur_link = &dst_host->link[dst_host->active_links[link_idx]];
continue;
}
+ savederrno = pthread_mutex_lock(&cur_link->link_stats_mutex);
+ if (savederrno) {
+ log_err(knet_h, KNET_SUB_TX, "Unable to get stats mutex lock for host %u link %u: %s",
+ dst_host->host_id, cur_link->link_id, strerror(savederrno));
+ continue;
+ }
+ locked = 1;
+
msg_idx = 0;
while (msg_idx < msgs_to_send) {
msg[msg_idx].msg_hdr.msg_name = &cur_link->dst_addr;
break;
}
+ pthread_mutex_unlock(&cur_link->link_stats_mutex);
+ locked = 0;
}
out_unlock:
+ if (locked) {
+ pthread_mutex_unlock(&cur_link->link_stats_mutex);
+ }
errno = savederrno;
return err;
}