diff --git a/adsbus/adsbus.c b/adsbus/adsbus.c index 0ee3dbe..e15f853 100644 --- a/adsbus/adsbus.c +++ b/adsbus/adsbus.c @@ -114,7 +114,7 @@ int main(int argc, char *argv[]) { stats_init(); if (!parse_opts(argc, argv)) { - return EXIT_FAILURE; + peer_shutdown(); } assert(!close(1)); diff --git a/adsbus/incoming.c b/adsbus/incoming.c index a69b384..7906e29 100644 --- a/adsbus/incoming.c +++ b/adsbus/incoming.c @@ -64,7 +64,7 @@ static void incoming_handler(struct peer *peer) { local_hbuf, local_sbuf, peer_hbuf, peer_sbuf); - incoming->handler(fd, incoming->passthrough); + incoming->handler(fd, incoming->passthrough, NULL); } static void incoming_del(struct incoming *incoming) { diff --git a/adsbus/incoming.h b/adsbus/incoming.h index 1b3286e..cd5c32a 100644 --- a/adsbus/incoming.h +++ b/adsbus/incoming.h @@ -1,5 +1,7 @@ #pragma once +struct peer; + void incoming_cleanup(); -typedef void (*incoming_connection_handler)(int fd, void *); +typedef void (*incoming_connection_handler)(int fd, void *, struct peer *); void incoming_new(char *, char *, incoming_connection_handler, void *); diff --git a/adsbus/opts.c b/adsbus/opts.c index 86a678d..86ba9af 100644 --- a/adsbus/opts.c +++ b/adsbus/opts.c @@ -50,7 +50,7 @@ bool opts_add_dump(char *arg) { if (!serializer) { return false; } - send_add(dup(1), serializer); + send_new(dup(1), serializer, NULL); return true; } @@ -76,7 +76,7 @@ bool opts_add_connect_send(char *arg) { return false; } - incoming_new(host, arg, send_add_wrapper, serializer); + incoming_new(host, arg, send_new_wrapper, serializer); free(host); return true; } @@ -92,6 +92,6 @@ bool opts_add_listen_send(char *arg) { return false; } - opts_add_listen(arg, send_add_wrapper, serializer); + opts_add_listen(arg, send_new_wrapper, serializer); return true; } diff --git a/adsbus/outgoing.c b/adsbus/outgoing.c index fdca8d3..6ea7669 100644 --- a/adsbus/outgoing.c +++ b/adsbus/outgoing.c @@ -74,7 +74,9 @@ static void outgoing_connect_handler(struct peer *peer) { static void outgoing_disconnect_handler(struct peer *peer) { struct outgoing *outgoing = (struct outgoing *) peer; - assert(!close(outgoing->peer.fd)); + if (outgoing->peer.fd != -1) { + assert(!close(outgoing->peer.fd)); + } fprintf(stderr, "O %s: Peer disconnected; reconnecting...\n", outgoing->id); outgoing_resolve(outgoing); @@ -88,12 +90,10 @@ static void outgoing_connect_result(struct outgoing *outgoing, int result) { fprintf(stderr, "O %s: Connected to %s/%s\n", outgoing->id, hbuf, sbuf); freeaddrinfo(outgoing->addrs); outgoing->attempt = 0; - - // We listen for just hangup on this fd. We pass a duplicate to the handler, so our epoll setings are independent. + int fd = outgoing->peer.fd; + outgoing->peer.fd = -1; outgoing->peer.event_handler = outgoing_disconnect_handler; - peer_epoll_add((struct peer *) outgoing, EPOLLRDHUP); - - outgoing->handler(dup(outgoing->peer.fd), outgoing->passthrough); + outgoing->handler(fd, outgoing->passthrough, (struct peer *) outgoing); break; case EINPROGRESS: diff --git a/adsbus/outgoing.h b/adsbus/outgoing.h index 7924bcc..f234b95 100644 --- a/adsbus/outgoing.h +++ b/adsbus/outgoing.h @@ -1,5 +1,7 @@ #pragma once +struct peer; + void outgoing_cleanup(); -typedef void (*outgoing_connection_handler)(int fd, void *); +typedef void (*outgoing_connection_handler)(int fd, void *, struct peer *); void outgoing_new(char *, char *, outgoing_connection_handler, void *); diff --git a/adsbus/peer.c b/adsbus/peer.c index 2e9e597..0a66975 100644 --- a/adsbus/peer.c +++ b/adsbus/peer.c @@ -17,41 +17,41 @@ #include "peer.h" static int peer_epoll_fd; -static int peer_cancel_fd; -static bool peer_canceled = false; +static int peer_shutdown_fd; +static bool peer_shutdown_flag = false; -static void peer_cancel(int signal) { - assert(!close(peer_cancel_fd)); -} - -static void peer_cancel_handler(struct peer *peer) { +static void peer_shutdown_handler(struct peer *peer) { fprintf(stderr, "X %s: Shutting down\n", server_id); assert(!close(peer->fd)); free(peer); - peer_canceled = true; + peer_shutdown_flag = true; } void peer_init() { peer_epoll_fd = epoll_create1(EPOLL_CLOEXEC); assert(peer_epoll_fd >= 0); - int cancel_fds[2]; - assert(!pipe2(cancel_fds, O_CLOEXEC)); + int shutdown_fds[2]; + assert(!pipe2(shutdown_fds, O_CLOEXEC)); - struct peer *cancel_peer = malloc(sizeof(*cancel_peer)); - assert(cancel_peer); - cancel_peer->fd = cancel_fds[0]; - cancel_peer->event_handler = peer_cancel_handler; - peer_epoll_add(cancel_peer, EPOLLRDHUP); + struct peer *shutdown_peer = malloc(sizeof(*shutdown_peer)); + assert(shutdown_peer); + shutdown_peer->fd = shutdown_fds[0]; + shutdown_peer->event_handler = peer_shutdown_handler; + peer_epoll_add(shutdown_peer, EPOLLRDHUP); - peer_cancel_fd = cancel_fds[1]; - signal(SIGINT, peer_cancel); + peer_shutdown_fd = shutdown_fds[1]; + signal(SIGINT, peer_shutdown); } void peer_cleanup() { assert(!close(peer_epoll_fd)); } +void peer_shutdown(int signal) { + assert(!close(peer_shutdown_fd)); +} + void peer_epoll_add(struct peer *peer, uint32_t events) { struct epoll_event ev = { .events = events, @@ -66,9 +66,16 @@ void peer_epoll_del(struct peer *peer) { assert(!epoll_ctl(peer_epoll_fd, EPOLL_CTL_DEL, peer->fd, NULL)); } +void peer_call(struct peer *peer) { + if (peer_shutdown_flag || !peer) { + return; + } + peer->event_handler(peer); +} + void peer_loop() { fprintf(stderr, "X %s: Starting event loop\n", server_id); - while (!peer_canceled) { + while (!peer_shutdown_flag) { #define MAX_EVENTS 10 struct epoll_event events[MAX_EVENTS]; int nfds = epoll_wait(peer_epoll_fd, events, MAX_EVENTS, wakeup_get_delay()); diff --git a/adsbus/peer.h b/adsbus/peer.h index 976084e..1c8ac04 100644 --- a/adsbus/peer.h +++ b/adsbus/peer.h @@ -11,6 +11,8 @@ struct peer { }; void peer_init(); void peer_cleanup(); +void peer_shutdown(); void peer_epoll_add(struct peer *, uint32_t); void peer_epoll_del(struct peer *); +void peer_call(struct peer *); void peer_loop(); diff --git a/adsbus/receive.c b/adsbus/receive.c index 6e33115..391c276 100644 --- a/adsbus/receive.c +++ b/adsbus/receive.c @@ -22,6 +22,7 @@ typedef bool (*parser_wrapper)(struct receive *, struct packet *); typedef bool (*parser)(struct buf *, struct packet *, void *state); struct receive { struct peer peer; + struct peer *on_close; char id[UUID_LEN]; struct buf buf; char parser_state[PARSER_STATE_LEN]; @@ -79,6 +80,7 @@ static bool receive_autodetect_parse(struct receive *receive, struct packet *pac } static void receive_del(struct receive *receive) { + fprintf(stderr, "R %s: Connection closed\n", receive->id); peer_epoll_del((struct peer *) receive); assert(!close(receive->peer.fd)); if (receive->prev) { @@ -89,6 +91,7 @@ static void receive_del(struct receive *receive) { if (receive->next) { receive->next->prev = receive->prev; } + peer_call(receive->on_close); free(receive); } @@ -96,7 +99,6 @@ static void receive_read(struct peer *peer) { struct receive *receive = (struct receive *) peer; if (buf_fill(&receive->buf, receive->peer.fd) <= 0) { - fprintf(stderr, "R %s: Connection closed by peer\n", receive->id); receive_del(receive); return; } @@ -124,18 +126,20 @@ void receive_cleanup() { } } -void receive_new(int fd, void *unused) { +void receive_new(int fd, void *unused, struct peer *on_close) { struct receive *receive = malloc(sizeof(*receive)); assert(receive); - uuid_gen(receive->id); + receive->peer.fd = fd; + receive->peer.event_handler = receive_read; + receive->on_close = on_close; + uuid_gen(receive->id); buf_init(&receive->buf); memset(receive->parser_state, 0, PARSER_STATE_LEN); receive->parser_wrapper = receive_autodetect_parse; receive->prev = NULL; receive->next = receive_head; receive_head = receive; - receive->peer.event_handler = receive_read; peer_epoll_add((struct peer *) receive, EPOLLIN); fprintf(stderr, "R %s: New receive connection\n", receive->id); diff --git a/adsbus/receive.h b/adsbus/receive.h index 236a340..ce83604 100644 --- a/adsbus/receive.h +++ b/adsbus/receive.h @@ -4,6 +4,8 @@ #define PARSER_STATE_LEN 256 +struct peer; + void receive_cleanup(); -void receive_new(int, void *); +void receive_new(int, void *, struct peer *); void receive_print_usage(); diff --git a/adsbus/resolve.c b/adsbus/resolve.c index fa69c34..7f8bbf1 100644 --- a/adsbus/resolve.c +++ b/adsbus/resolve.c @@ -33,9 +33,8 @@ static void resolve_handler(struct peer *peer) { assert(!close(resolve_peer->peer.fd)); - struct peer *inner_peer = resolve_peer->inner_peer; + peer_call(resolve_peer->inner_peer); free(resolve_peer); - inner_peer->event_handler(inner_peer); } static void *resolve_main(void *arg) { diff --git a/adsbus/send.c b/adsbus/send.c index 97e8853..71c1415 100644 --- a/adsbus/send.c +++ b/adsbus/send.c @@ -20,6 +20,7 @@ struct send { struct peer peer; + struct peer *on_close; char id[UUID_LEN]; struct serializer *serializer; struct send *prev; @@ -59,8 +60,10 @@ struct serializer { }; #define NUM_SERIALIZERS (sizeof(serializers) / sizeof(*serializers)) -static void send_hangup(struct send *send) { - fprintf(stderr, "S %s (%s): Peer disconnected\n", send->id, send->serializer->name); +static void send_del(struct send *send) { + fprintf(stderr, "S %s (%s): Connection closed\n", send->id, send->serializer->name); + peer_epoll_del((struct peer *) send); + assert(!close(send->peer.fd)); if (send->prev) { send->prev->next = send->next; } else { @@ -69,13 +72,12 @@ static void send_hangup(struct send *send) { if (send->next) { send->next->prev = send->prev; } - peer_epoll_del((struct peer *) send); - assert(!close(send->peer.fd)); + peer_call(send->on_close); free(send); } -static void send_hangup_wrapper(struct peer *peer) { - send_hangup((struct send *) peer); +static void send_del_wrapper(struct peer *peer) { + send_del((struct send *) peer); } static bool send_hello(int fd, struct serializer *serializer) { @@ -97,13 +99,8 @@ void send_init() { void send_cleanup() { for (int i = 0; i < NUM_SERIALIZERS; i++) { struct serializer *serializer = &serializers[i]; - struct send *send = serializer->send_head; - while (send) { - struct send *next = send->next; - peer_epoll_del((struct peer *) send); - assert(!close(send->peer.fd)); - free(send); - send = next; + while (serializer->send_head) { + send_del(serializer->send_head); } } } @@ -117,32 +114,32 @@ struct serializer *send_get_serializer(char *name) { return NULL; } -void send_add(int fd, struct serializer *serializer) { - if (!send_hello(fd, serializer)) { - fprintf(stderr, "S xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx: Failed to write hello\n"); - assert(!close(fd)); - return; - } - +void send_new(int fd, struct serializer *serializer, struct peer *on_close) { struct send *send = malloc(sizeof(*send)); assert(send); send->peer.fd = fd; - send->peer.event_handler = send_hangup_wrapper; - + send->peer.event_handler = send_del_wrapper; + send->on_close = on_close; uuid_gen(send->id); send->serializer = serializer; send->prev = NULL; send->next = serializer->send_head; serializer->send_head = send; - peer_epoll_add((struct peer *) send, EPOLLRDHUP); + peer_epoll_add((struct peer *) send, 0); fprintf(stderr, "S %s (%s): New send connection\n", send->id, serializer->name); + + if (!send_hello(fd, serializer)) { + fprintf(stderr, "S %s: Failed to write hello\n", send->id); + send_del(send); + return; + } } -void send_add_wrapper(int fd, void *passthrough) { - send_add(fd, (struct serializer *) passthrough); +void send_new_wrapper(int fd, void *passthrough, struct peer *on_close) { + send_new(fd, (struct serializer *) passthrough, on_close); } void send_write(struct packet *packet) { @@ -162,7 +159,7 @@ void send_write(struct packet *packet) { send = send->next; } else { struct send *next = send->next; - send_hangup(send); + send_del(send); send = next; } } diff --git a/adsbus/send.h b/adsbus/send.h index 3058ac9..69c0612 100644 --- a/adsbus/send.h +++ b/adsbus/send.h @@ -1,11 +1,12 @@ #pragma once struct packet; +struct peer; void send_init(); void send_cleanup(); struct serializer *send_get_serializer(char *); -void send_add(int, struct serializer *); -void send_add_wrapper(int, void *); +void send_new(int, struct serializer *, struct peer *); +void send_new_wrapper(int, void *, struct peer *); void send_write(struct packet *); void send_print_usage(); diff --git a/adsbus/wakeup.c b/adsbus/wakeup.c index 9fc9b7e..50c6f51 100644 --- a/adsbus/wakeup.c +++ b/adsbus/wakeup.c @@ -54,11 +54,10 @@ int wakeup_get_delay() { void wakeup_dispatch() { uint64_t now = wakeup_get_time_ms(); while (head && head->absolute_time_ms <= now) { - struct peer *peer = head->peer; + peer_call(head->peer); struct wakeup_entry *next = head->next; free(head); head = next; - peer->event_handler(peer); } }