From 5e604f05968e7ab3d3676dc232ade65902eea7c1 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Thu, 25 Feb 2016 16:17:25 -0800 Subject: [PATCH] In/out reference counting. --- adsbus/incoming.c | 7 ++++++- adsbus/incoming.h | 4 +++- adsbus/opts.c | 16 ++++++++-------- adsbus/outgoing.c | 7 ++++++- adsbus/outgoing.h | 2 +- adsbus/peer.c | 2 ++ adsbus/peer.h | 3 +++ adsbus/receive.c | 3 +++ adsbus/send.c | 3 +++ 9 files changed, 35 insertions(+), 12 deletions(-) diff --git a/adsbus/incoming.c b/adsbus/incoming.c index 7906e29..a6d68c5 100644 --- a/adsbus/incoming.c +++ b/adsbus/incoming.c @@ -27,6 +27,7 @@ struct incoming { uint32_t attempt; incoming_connection_handler handler; void *passthrough; + uint32_t *count; struct incoming *next; }; @@ -68,6 +69,7 @@ static void incoming_handler(struct peer *peer) { } static void incoming_del(struct incoming *incoming) { + (*incoming->count)--; if (incoming->peer.fd >= 0) { assert(!close(incoming->peer.fd)); } @@ -142,7 +144,9 @@ void incoming_cleanup() { } } -void incoming_new(char *node, char *service, incoming_connection_handler handler, void *passthrough) { +void incoming_new(char *node, char *service, incoming_connection_handler handler, void *passthrough, uint32_t *count) { + (*count)++; + struct incoming *incoming = malloc(sizeof(*incoming)); incoming->peer.event_handler = incoming_handler; uuid_gen(incoming->id); @@ -151,6 +155,7 @@ void incoming_new(char *node, char *service, incoming_connection_handler handler incoming->attempt = 0; incoming->handler = handler; incoming->passthrough = passthrough; + incoming->count = count; incoming->next = incoming_head; incoming_head = incoming; diff --git a/adsbus/incoming.h b/adsbus/incoming.h index cd5c32a..3ef2879 100644 --- a/adsbus/incoming.h +++ b/adsbus/incoming.h @@ -1,7 +1,9 @@ #pragma once +#include + struct peer; void incoming_cleanup(); typedef void (*incoming_connection_handler)(int fd, void *, struct peer *); -void incoming_new(char *, char *, incoming_connection_handler, void *); +void incoming_new(char *, char *, incoming_connection_handler, void *, uint32_t *); diff --git a/adsbus/opts.c b/adsbus/opts.c index 5d2a627..389ca73 100644 --- a/adsbus/opts.c +++ b/adsbus/opts.c @@ -4,7 +4,7 @@ #include "incoming.h" #include "outgoing.h" - +#include "peer.h" #include "receive.h" #include "send.h" @@ -20,13 +20,13 @@ static char *opts_split(char **arg, char delim) { return ret; } -static void opts_add_listen(char *host_port, incoming_connection_handler handler, void *passthrough) { +static void opts_add_listen(char *host_port, incoming_connection_handler handler, void *passthrough, uint32_t *count) { char *host = opts_split(&host_port, '/'); if (host) { - incoming_new(host, host_port, handler, passthrough); + incoming_new(host, host_port, handler, passthrough, count); free(host); } else { - incoming_new(NULL, host_port, handler, passthrough); + incoming_new(NULL, host_port, handler, passthrough, count); } } @@ -51,7 +51,7 @@ bool opts_add_connect_receive(char *arg) { return false; } - outgoing_new(host, arg, receive_new, NULL); + outgoing_new(host, arg, receive_new, NULL, &peer_count_in); free(host); return true; } @@ -67,13 +67,13 @@ bool opts_add_connect_send(char *arg) { return false; } - outgoing_new(host, arg, send_new_wrapper, serializer); + outgoing_new(host, arg, send_new_wrapper, serializer, &peer_count_out); free(host); return true; } bool opts_add_listen_receive(char *arg) { - opts_add_listen(arg, receive_new, NULL); + opts_add_listen(arg, receive_new, NULL, &peer_count_in); return true; } @@ -83,7 +83,7 @@ bool opts_add_listen_send(char *arg) { return false; } - opts_add_listen(arg, send_new_wrapper, serializer); + opts_add_listen(arg, send_new_wrapper, serializer, &peer_count_out); return true; } diff --git a/adsbus/outgoing.c b/adsbus/outgoing.c index 32a899d..2495ff1 100644 --- a/adsbus/outgoing.c +++ b/adsbus/outgoing.c @@ -26,6 +26,7 @@ struct outgoing { uint32_t attempt; outgoing_connection_handler handler; void *passthrough; + uint32_t *count; struct outgoing *next; }; @@ -134,6 +135,7 @@ static void outgoing_resolve_wrapper(struct peer *peer) { } static void outgoing_del(struct outgoing *outgoing) { + (*outgoing->count)--; if (outgoing->peer.fd >= 0) { assert(!close(outgoing->peer.fd)); } @@ -151,7 +153,9 @@ void outgoing_cleanup() { } } -void outgoing_new(char *node, char *service, outgoing_connection_handler handler, void *passthrough) { +void outgoing_new(char *node, char *service, outgoing_connection_handler handler, void *passthrough, uint32_t *count) { + (*count)++; + struct outgoing *outgoing = malloc(sizeof(*outgoing)); uuid_gen(outgoing->id); outgoing->node = strdup(node); @@ -159,6 +163,7 @@ void outgoing_new(char *node, char *service, outgoing_connection_handler handler outgoing->attempt = 0; outgoing->handler = handler; outgoing->passthrough = passthrough; + outgoing->count = count; outgoing->next = outgoing_head; outgoing_head = outgoing; diff --git a/adsbus/outgoing.h b/adsbus/outgoing.h index f234b95..777354e 100644 --- a/adsbus/outgoing.h +++ b/adsbus/outgoing.h @@ -4,4 +4,4 @@ struct peer; void outgoing_cleanup(); typedef void (*outgoing_connection_handler)(int fd, void *, struct peer *); -void outgoing_new(char *, char *, outgoing_connection_handler, void *); +void outgoing_new(char *, char *, outgoing_connection_handler, void *, uint32_t *); diff --git a/adsbus/peer.c b/adsbus/peer.c index 0a66975..d79e0eb 100644 --- a/adsbus/peer.c +++ b/adsbus/peer.c @@ -16,6 +16,8 @@ #include "peer.h" +uint32_t peer_count_in = 0, peer_count_out = 0; + static int peer_epoll_fd; static int peer_shutdown_fd; static bool peer_shutdown_flag = false; diff --git a/adsbus/peer.h b/adsbus/peer.h index 1c8ac04..72f2fee 100644 --- a/adsbus/peer.h +++ b/adsbus/peer.h @@ -9,6 +9,9 @@ struct peer { int fd; peer_event_handler event_handler; }; + +extern uint32_t peer_count_in, peer_count_out; + void peer_init(); void peer_cleanup(); void peer_shutdown(); diff --git a/adsbus/receive.c b/adsbus/receive.c index b39212f..3e7e558 100644 --- a/adsbus/receive.c +++ b/adsbus/receive.c @@ -83,6 +83,7 @@ static bool receive_autodetect_parse(struct receive *receive, struct packet *pac static void receive_del(struct receive *receive) { fprintf(stderr, "R %s: Connection closed\n", receive->id); + peer_count_out--; peer_epoll_del((struct peer *) receive); assert(!close(receive->peer.fd)); if (receive->prev) { @@ -129,6 +130,8 @@ void receive_cleanup() { } void receive_new(int fd, void *unused, struct peer *on_close) { + peer_count_in++; + int res = shutdown(fd, SHUT_WR); assert(res == 0 || (res == -1 && errno == ENOTSOCK)); diff --git a/adsbus/send.c b/adsbus/send.c index c9e98e3..7034d09 100644 --- a/adsbus/send.c +++ b/adsbus/send.c @@ -64,6 +64,7 @@ struct serializer { static void send_del(struct send *send) { fprintf(stderr, "S %s (%s): Connection closed\n", send->id, send->serializer->name); + peer_count_out--; peer_epoll_del((struct peer *) send); assert(!close(send->peer.fd)); if (send->prev) { @@ -116,6 +117,8 @@ struct serializer *send_get_serializer(char *name) { } void send_new(int fd, struct serializer *serializer, struct peer *on_close) { + peer_count_out++; + struct send *send = malloc(sizeof(*send)); assert(send);