diff --git a/adsbus/adsbus.c b/adsbus/adsbus.c index 581aa97..78b21aa 100644 --- a/adsbus/adsbus.c +++ b/adsbus/adsbus.c @@ -2,10 +2,15 @@ #include #include #include +#include +#include #include "common.h" #include "wakeup.h" +#include "incoming.h" +#include "outgoing.h" + #include "receive.h" #include "send.h" @@ -92,6 +97,8 @@ static bool parse_opts(int argc, char *argv[]) { } int main(int argc, char *argv[]) { + assert(!close(0)); + hex_init(); peer_init(); @@ -112,5 +119,8 @@ int main(int argc, char *argv[]) { wakeup_cleanup(); send_cleanup(); + incoming_cleanup(); + outgoing_cleanup(); + return EXIT_SUCCESS; } diff --git a/adsbus/common.c b/adsbus/common.c index a3f893c..702fbf6 100644 --- a/adsbus/common.c +++ b/adsbus/common.c @@ -1,26 +1,52 @@ +#define _GNU_SOURCE + #include #include #include #include #include +#include #include #include +#include #include #include "common.h" -static int epoll_fd; +static char server_id[UUID_LEN]; +static int peer_epoll_fd; +static int peer_cancel_fd; static bool peer_canceled = false; static void peer_cancel(int signal) { + assert(!close(peer_cancel_fd)); +} + +static void peer_cancel_handler(struct peer *peer) { + fprintf(stderr, "X %s: Shutting down\n", server_id); + assert(!close(peer->fd)); + free(peer); peer_canceled = true; } void peer_init() { + uuid_gen(server_id); + + peer_epoll_fd = epoll_create1(0); + assert(peer_epoll_fd >= 0); + + int cancel_fds[2]; + assert(!pipe2(cancel_fds, O_NONBLOCK)); + + struct peer *cancel_peer = malloc(sizeof(*cancel_peer)); + assert(cancel_peer); + cancel_peer->fd = cancel_fds[0]; + cancel_peer->event_handler = peer_cancel_handler; + peer_epoll_add(cancel_peer, EPOLLRDHUP); + + peer_cancel_fd = cancel_fds[1]; signal(SIGINT, peer_cancel); - epoll_fd = epoll_create1(0); - assert(epoll_fd >= 0); } void peer_epoll_add(struct peer *peer, uint32_t events) { @@ -30,28 +56,30 @@ void peer_epoll_add(struct peer *peer, uint32_t events) { .ptr = peer, }, }; - assert(!epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peer->fd, &ev)); + assert(!epoll_ctl(peer_epoll_fd, EPOLL_CTL_ADD, peer->fd, &ev)); } void peer_epoll_del(struct peer *peer) { - assert(!epoll_ctl(epoll_fd, EPOLL_CTL_DEL, peer->fd, NULL)); + assert(!epoll_ctl(peer_epoll_fd, EPOLL_CTL_DEL, peer->fd, NULL)); } void peer_loop() { + fprintf(stderr, "X %s: Starting event loop\n", server_id); while (!peer_canceled) { #define MAX_EVENTS 10 struct epoll_event events[MAX_EVENTS]; - int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1); - if (nfds < 0) { - perror("epoll_wait"); - break; + int nfds = epoll_wait(peer_epoll_fd, events, MAX_EVENTS, -1); + if (nfds == -1 && errno == EINTR) { + continue; } + assert(nfds > 0); for (int n = 0; n < nfds; n++) { struct peer *peer = events[n].data.ptr; peer->event_handler(peer); } } + assert(!close(peer_epoll_fd)); } diff --git a/adsbus/incoming.c b/adsbus/incoming.c index 0925a72..4c6114b 100644 --- a/adsbus/incoming.c +++ b/adsbus/incoming.c @@ -13,15 +13,19 @@ #include "common.h" #include "incoming.h" +struct incoming; struct incoming { struct peer peer; char id[UUID_LEN]; - const char *node; - const char *service; + char *node; + char *service; incoming_connection_handler handler; void *passthrough; + struct incoming *next; }; +static struct incoming *incoming_head = NULL; + static void incoming_handler(struct peer *peer) { struct incoming *incoming = (struct incoming *) peer; @@ -48,7 +52,23 @@ static void incoming_handler(struct peer *peer) { incoming->handler(fd, incoming->passthrough); } -void incoming_new(const char *node, const char *service, incoming_connection_handler handler, void *passthrough) { +static void incoming_del(struct incoming *incoming) { + assert(!close(incoming->peer.fd)); + free(incoming->node); + free(incoming->service); + free(incoming); +} + +void incoming_cleanup() { + struct incoming *iter = incoming_head; + while (iter) { + struct incoming *next = iter->next; + incoming_del(iter); + iter = next; + } +} + +void incoming_new(char *node, char *service, incoming_connection_handler handler, void *passthrough) { struct incoming *incoming = malloc(sizeof(*incoming)); incoming->peer.event_handler = incoming_handler; uuid_gen(incoming->id); @@ -69,7 +89,7 @@ void incoming_new(const char *node, const char *service, incoming_connection_han int gai_err = getaddrinfo(incoming->node, incoming->service, &hints, &addrs); if (gai_err) { fprintf(stderr, "I %s: Failed to resolve %s/%s: %s\n", incoming->id, incoming->node, incoming->service, gai_strerror(gai_err)); - // TODO: add incoming_del, free strdup values + // TODO: retry free(incoming); return; } @@ -88,7 +108,7 @@ void incoming_new(const char *node, const char *service, incoming_connection_han if (bind(incoming->peer.fd, addr->ai_addr, addr->ai_addrlen) != 0) { fprintf(stderr, "I %s: Failed to bind to %s/%s: %s\n", incoming->id, hbuf, sbuf, strerror(errno)); - close(incoming->peer.fd); + assert(!close(incoming->peer.fd)); continue; } @@ -100,10 +120,13 @@ void incoming_new(const char *node, const char *service, incoming_connection_han if (addr == NULL) { fprintf(stderr, "I %s: Failed to bind any addresses for %s/%s...\n", incoming->id, incoming->node, incoming->service); - // TODO: use incoming_del + // TODO: retry free(incoming); return; } peer_epoll_add((struct peer *) incoming, EPOLLIN); + + incoming->next = incoming_head; + incoming_head = incoming; } diff --git a/adsbus/incoming.h b/adsbus/incoming.h index 37e9ad3..1b3286e 100644 --- a/adsbus/incoming.h +++ b/adsbus/incoming.h @@ -1,4 +1,5 @@ #pragma once +void incoming_cleanup(); typedef void (*incoming_connection_handler)(int fd, void *); -void incoming_new(const char *, const char *, incoming_connection_handler, void *); +void incoming_new(char *, char *, incoming_connection_handler, void *); diff --git a/adsbus/outgoing.c b/adsbus/outgoing.c index 7da278b..d728dbb 100644 --- a/adsbus/outgoing.c +++ b/adsbus/outgoing.c @@ -1,3 +1,4 @@ +#include #include #include #include @@ -10,17 +11,21 @@ #include "common.h" #include "outgoing.h" +struct outgoing; struct outgoing { struct peer peer; char id[UUID_LEN]; - const char *node; - const char *service; + char *node; + char *service; struct addrinfo *addrs; struct addrinfo *addr; outgoing_connection_handler handler; void *passthrough; + struct outgoing *next; }; +static struct outgoing *outgoing_head = NULL; + static void outgoing_connect_result(struct outgoing *, int); static void outgoing_resolve(struct outgoing *); @@ -56,7 +61,7 @@ static void outgoing_connect_handler(struct peer *peer) { static void outgoing_disconnect_handler(struct peer *peer) { struct outgoing *outgoing = (struct outgoing *) peer; - close(outgoing->peer.fd); + assert(!close(outgoing->peer.fd)); fprintf(stderr, "O %s: Peer disconnected; reconnecting...\n", outgoing->id); outgoing_resolve(outgoing); @@ -84,7 +89,7 @@ static void outgoing_connect_result(struct outgoing *outgoing, int result) { default: fprintf(stderr, "O %s: Can't connect to %s/%s: %s\n", outgoing->id, hbuf, sbuf, strerror(result)); - close(outgoing->peer.fd); + assert(!close(outgoing->peer.fd)); outgoing->addr = outgoing->addr->ai_next; // Tail recursion :/ outgoing_connect_next(outgoing); @@ -110,12 +115,32 @@ static void outgoing_resolve(struct outgoing *outgoing) { outgoing_connect_next(outgoing); } -void outgoing_new(const char *node, const char *service, outgoing_connection_handler handler, void *passthrough) { +static void outgoing_del(struct outgoing *outgoing) { + assert(!close(outgoing->peer.fd)); + free(outgoing->node); + free(outgoing->service); + free(outgoing); +} + +void outgoing_cleanup() { + struct outgoing *iter = outgoing_head; + while (iter) { + struct outgoing *next = iter->next; + outgoing_del(iter); + iter = next; + } +} + +void outgoing_new(char *node, char *service, outgoing_connection_handler handler, void *passthrough) { struct outgoing *outgoing = malloc(sizeof(*outgoing)); uuid_gen(outgoing->id); outgoing->node = strdup(node); outgoing->service = strdup(service); outgoing->handler = handler; outgoing->passthrough = passthrough; + + outgoing->next = outgoing_head; + outgoing_head = outgoing; + outgoing_resolve(outgoing); } diff --git a/adsbus/outgoing.h b/adsbus/outgoing.h index 847be8c..7924bcc 100644 --- a/adsbus/outgoing.h +++ b/adsbus/outgoing.h @@ -1,4 +1,5 @@ #pragma once +void outgoing_cleanup(); typedef void (*outgoing_connection_handler)(int fd, void *); -void outgoing_new(const char *, const char *, outgoing_connection_handler, void *); +void outgoing_new(char *, char *, outgoing_connection_handler, void *); diff --git a/adsbus/receive.c b/adsbus/receive.c index d2152c3..9a5c041 100644 --- a/adsbus/receive.c +++ b/adsbus/receive.c @@ -1,3 +1,4 @@ +#include #include #include #include @@ -56,12 +57,17 @@ static bool receive_autodetect_parse(struct receive *receive, struct packet *pac return false; } +static void receive_del(struct receive *receive) { + assert(!close(receive->peer.fd)); + free(receive); +} + static void receive_read(struct peer *peer) { struct receive *receive = (struct receive *) peer; if (buf_fill(&receive->buf, receive->peer.fd) <= 0) { fprintf(stderr, "R %s: Connection closed by peer\n", receive->id); - close(receive->peer.fd); + receive_del(receive); return; } @@ -72,7 +78,7 @@ static void receive_read(struct peer *peer) { if (receive->buf.length == BUF_LEN_MAX) { fprintf(stderr, "R %s: Input buffer overrun. This probably means that adsbus doesn't understand the protocol that this source is speaking.\n", receive->id); - close(receive->peer.fd); + receive_del(receive); return; } } diff --git a/adsbus/wakeup.c b/adsbus/wakeup.c index 8cf21e7..e40909e 100644 --- a/adsbus/wakeup.c +++ b/adsbus/wakeup.c @@ -1,6 +1,7 @@ #include #include #include +#include #include #include @@ -22,15 +23,19 @@ static void *wakeup_main(void *arg) { }; assert(!epoll_ctl(epoll_fd, EPOLL_CTL_ADD, read_fd, &ev)); + while (1) { #define MAX_EVENTS 10 - struct epoll_event events[MAX_EVENTS]; - int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1); - if (nfds < 0) { - perror("epoll_wait"); + struct epoll_event events[MAX_EVENTS]; + int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1); + if (nfds == -1 && errno == EINTR) { + continue; + } + assert(nfds >= 0); + break; // XXX } - close(read_fd); - close(epoll_fd); + assert(!close(read_fd)); + assert(!close(epoll_fd)); return NULL; } @@ -42,7 +47,7 @@ void wakeup_init() { } void wakeup_cleanup() { - close(wakeup_write_fd); + assert(!close(wakeup_write_fd)); assert(!pthread_join(wakeup_thread, NULL)); }