From e14b44446650e24927e8b475ab36bf8ff7c16ea5 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Wed, 17 Feb 2016 15:41:27 -0800 Subject: [PATCH] Make epoll_fd global as a concession to clarity. Handle client disconnection even when silent. --- adsbus.c | 42 +++++++++-------------------------------- backend.c | 41 ++++++++++++++++++++-------------------- backend.h | 4 ++-- client.c | 55 +++++++++++++++++++++++++++++++++++++----------------- client.h | 2 +- common.c | 27 ++++++++++++++++++++++++--- common.h | 10 +++++++--- incoming.c | 9 ++++----- incoming.h | 4 ++-- 9 files changed, 107 insertions(+), 87 deletions(-) diff --git a/adsbus.c b/adsbus.c index f26964c..67d5fd2 100644 --- a/adsbus.c +++ b/adsbus.c @@ -2,7 +2,6 @@ #include #include #include -#include #include #include @@ -40,7 +39,7 @@ static void print_usage(char *argv[]) { , argv[0]); } -static bool parse_opts(int argc, char *argv[], int epoll_fd) { +static bool parse_opts(int argc, char *argv[]) { static struct option long_options[] = { {"backend", required_argument, 0, 'b'}, {"dump", required_argument, 0, 'd'}, @@ -63,7 +62,7 @@ static bool parse_opts(int argc, char *argv[], int epoll_fd) { *delim1 = '\0'; delim1++; - backend_new(optarg, delim1, epoll_fd); + backend_new(optarg, delim1); break; case 'd': @@ -79,11 +78,11 @@ static bool parse_opts(int argc, char *argv[], int epoll_fd) { case 'i': delim1 = strrchr(optarg, '/'); if (delim1 == NULL) { - incoming_new(NULL, optarg, epoll_fd, backend_new_fd, NULL); + incoming_new(NULL, optarg, backend_new_fd, NULL); } else { *delim1 = '\0'; delim1++; - incoming_new(optarg, delim1, epoll_fd, backend_new_fd, NULL); + incoming_new(optarg, delim1, backend_new_fd, NULL); } break; @@ -103,11 +102,11 @@ static bool parse_opts(int argc, char *argv[], int epoll_fd) { delim2 = strrchr(delim1, '/'); if (delim2 == NULL) { - incoming_new(NULL, delim1, epoll_fd, client_new_fd, serializer); + incoming_new(NULL, delim1, client_add_wrapper, serializer); } else { *delim2 = '\0'; delim2++; - incoming_new(delim1, delim2, epoll_fd, client_new_fd, serializer); + incoming_new(delim1, delim2, client_add_wrapper, serializer); } break; @@ -126,44 +125,21 @@ static bool parse_opts(int argc, char *argv[], int epoll_fd) { return true; } -static int loop(int epoll_fd) { - while (1) { -#define MAX_EVENTS 10 - struct epoll_event events[MAX_EVENTS]; - int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1); - if (nfds == -1) { - perror("epoll_wait"); - return -1; - } - - for (int n = 0; n < nfds; n++) { - struct peer *peer = events[n].data.ptr; - peer->event_handler(peer, epoll_fd); - } - } -} - int main(int argc, char *argv[]) { signal(SIGPIPE, SIG_IGN); server_init(); + peer_init(); hex_init(); airspy_adsb_init(); beast_init(); json_init(); stats_init(); - int epoll_fd = epoll_create1(0); - if (epoll_fd == -1) { - perror("epoll_create1"); - return EXIT_FAILURE; - } - - if (!parse_opts(argc, argv, epoll_fd)) { + if (!parse_opts(argc, argv)) { return EXIT_FAILURE; } - loop(epoll_fd); - close(epoll_fd); + peer_loop(); return EXIT_SUCCESS; } diff --git a/backend.c b/backend.c index 009b4cb..2d001cd 100644 --- a/backend.c +++ b/backend.c @@ -3,7 +3,6 @@ #include #include #include -#include #include #include @@ -16,11 +15,11 @@ static bool backend_autodetect_parse(struct backend *, struct packet *); -static void backend_connect(struct backend *, int); -static void backend_connect_next(struct backend *, int); +static void backend_connect(struct backend *); +static void backend_connect_next(struct backend *); -static void backend_connect_handler(struct peer *, int); -static void backend_read(struct peer *, int); +static void backend_connect_handler(struct peer *); +static void backend_read(struct peer *); struct parser { @@ -56,23 +55,23 @@ static struct backend *backend_create() { return backend; } -void backend_new(char *node, char *service, int epoll_fd) { +void backend_new(char *node, char *service) { struct backend *backend = backend_create(); backend->node = node; backend->service = service; - backend_connect(backend, epoll_fd); + backend_connect(backend); } -void backend_new_fd(int fd, int epoll_fd, void *unused) { +void backend_new_fd(int fd, void *unused) { struct backend *backend = backend_create(); backend->peer.fd = fd; backend->peer.event_handler = backend_read; - peer_epoll_add((struct peer *) backend, epoll_fd, EPOLLIN); + peer_epoll_add((struct peer *) backend, EPOLLIN); fprintf(stderr, "B %s: New backend from incoming connection\n", backend->id); } -static void backend_connect(struct backend *backend, int epoll_fd) { +static void backend_connect(struct backend *backend) { fprintf(stderr, "B %s: Resolving %s/%s...\n", backend->id, backend->node, backend->service); struct addrinfo hints = { @@ -86,10 +85,10 @@ static void backend_connect(struct backend *backend, int epoll_fd) { return; } backend->addr = backend->addrs; - backend_connect_next(backend, epoll_fd); + backend_connect_next(backend); } -static void backend_connect_result(struct backend *backend, int epoll_fd, int result) { +static void backend_connect_result(struct backend *backend, int result) { char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; assert(getnameinfo(backend->addr->ai_addr, backend->addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0); switch (result) { @@ -97,12 +96,12 @@ static void backend_connect_result(struct backend *backend, int epoll_fd, int re fprintf(stderr, "B %s: Connected to %s/%s\n", backend->id, hbuf, sbuf); freeaddrinfo(backend->addrs); backend->peer.event_handler = backend_read; - peer_epoll_add((struct peer *) backend, epoll_fd, EPOLLIN); + peer_epoll_add((struct peer *) backend, EPOLLIN); break; case EINPROGRESS: backend->peer.event_handler = backend_connect_handler; - peer_epoll_add((struct peer *) backend, epoll_fd, EPOLLOUT); + peer_epoll_add((struct peer *) backend, EPOLLOUT); break; default: @@ -110,12 +109,12 @@ static void backend_connect_result(struct backend *backend, int epoll_fd, int re close(backend->peer.fd); backend->addr = backend->addr->ai_next; // Tail recursion :/ - backend_connect_next(backend, epoll_fd); + backend_connect_next(backend); break; } } -static void backend_connect_next(struct backend *backend, int epoll_fd) { +static void backend_connect_next(struct backend *backend) { if (backend->addr == NULL) { freeaddrinfo(backend->addrs); fprintf(stderr, "B %s: Can't connect to any addresses of %s/%s\n", backend->id, backend->node, backend->service); @@ -130,21 +129,21 @@ static void backend_connect_next(struct backend *backend, int epoll_fd) { assert(backend->peer.fd >= 0); int result = connect(backend->peer.fd, backend->addr->ai_addr, backend->addr->ai_addrlen); - backend_connect_result(backend, epoll_fd, result == 0 ? result : errno); + backend_connect_result(backend, result == 0 ? result : errno); } -static void backend_connect_handler(struct peer *peer, int epoll_fd) { +static void backend_connect_handler(struct peer *peer) { struct backend *backend = (struct backend *) peer; - peer_epoll_del(peer, epoll_fd); + peer_epoll_del(peer); int error; socklen_t len = sizeof(error); assert(getsockopt(backend->peer.fd, SOL_SOCKET, SO_ERROR, &error, &len) == 0); - backend_connect_result(backend, epoll_fd, error); + backend_connect_result(backend, error); } -static void backend_read(struct peer *peer, int epoll_fd) { +static void backend_read(struct peer *peer) { struct backend *backend = (struct backend *) peer; if (buf_fill(&backend->buf, backend->peer.fd) <= 0) { diff --git a/backend.h b/backend.h index 3cfeadf..39758ec 100644 --- a/backend.h +++ b/backend.h @@ -21,5 +21,5 @@ struct backend { parser parser; }; -void backend_new(char *, char *, int); -void backend_new_fd(int, int, void *); +void backend_new(char *, char *); +void backend_new_fd(int, void *); diff --git a/client.c b/client.c index fa269b0..c6fc41e 100644 --- a/client.c +++ b/client.c @@ -6,13 +6,17 @@ #include #include +#include "common.h" #include "json.h" #include "stats.h" #include "client.h" + struct client { + struct peer peer; char id[UUID_LEN]; - int fd; + struct serializer *serializer; + struct client *prev; struct client *next; }; @@ -34,6 +38,24 @@ struct serializer { #define NUM_SERIALIZERS (sizeof(serializers) / sizeof(*serializers)) +static void client_hangup(struct client *client) { + fprintf(stderr, "C %s (%s): Client disconnected\n", client->id, client->serializer->name); + if (client->prev) { + client->prev->next = client->next; + } else { + client->serializer->client_head = client->next; + } + if (client->next) { + client->next->prev = client->prev; + } + close(client->peer.fd); + free(client); +} + +static void client_hangup_wrapper(struct peer *peer) { + client_hangup((struct client *) peer); +} + struct serializer *client_get_serializer(char *name) { for (int i = 0; i < NUM_SERIALIZERS; i++) { if (strcasecmp(serializers[i].name, name) == 0) { @@ -69,17 +91,23 @@ void client_add(int fd, struct serializer *serializer) { struct client *client = malloc(sizeof(*client)); assert(client); + client->peer.fd = fd; + client->peer.event_handler = client_hangup_wrapper; + uuid_gen(client->id); - client->fd = fd; + client->serializer = serializer; + client->prev = NULL; client->next = serializer->client_head; serializer->client_head = client; + // Only listen for hangup + peer_epoll_add((struct peer *) client, EPOLLIN); + fprintf(stderr, "C %s (%s): New client\n", client->id, serializer->name); } -void client_new_fd(int fd, int epoll_fd, void *passthrough) { - struct serializer *serializer = (struct serializer *) passthrough; - client_add(fd, serializer); +void client_add_wrapper(int fd, void *passthrough) { + client_add(fd, (struct serializer *) passthrough); } void client_write(struct packet *packet) { @@ -93,21 +121,14 @@ void client_write(struct packet *packet) { if (buf.length == 0) { continue; } - struct client *client = serializer->client_head, *prev_client = NULL; + struct client *client = serializer->client_head; while (client) { - if (write(client->fd, buf_at(&buf, 0), buf.length) == buf.length) { - prev_client = client; + if (write(client->peer.fd, buf_at(&buf, 0), buf.length) == buf.length) { client = client->next; } else { - fprintf(stderr, "C %s (%s): Client disconnected\n", client->id, serializer->name); - if (prev_client) { - prev_client->next = client->next; - } else { - serializer->client_head = client->next; - } - struct client *del = client; - client = client->next; - free(del); + struct client *next = client->next; + client_hangup(client); + client = next; } } } diff --git a/client.h b/client.h index dca6b2a..86eb521 100644 --- a/client.h +++ b/client.h @@ -5,5 +5,5 @@ struct serializer *client_get_serializer(char *); void client_add(int, struct serializer *); -void client_new_fd(int, int, void *); +void client_add_wrapper(int, void *); void client_write(struct packet *); diff --git a/common.c b/common.c index 5ffd0f7..e7a10c5 100644 --- a/common.c +++ b/common.c @@ -3,13 +3,20 @@ #include #include #include -#include #include #include "common.h" -void peer_epoll_add(struct peer *peer, int epoll_fd, uint32_t events) { +int epoll_fd; + + +void peer_init() { + epoll_fd = epoll_create1(0); + assert(epoll_fd >= 0); +} + +void peer_epoll_add(struct peer *peer, uint32_t events) { struct epoll_event ev = { .events = events, .data = { @@ -19,10 +26,24 @@ void peer_epoll_add(struct peer *peer, int epoll_fd, uint32_t events) { assert(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peer->fd, &ev) == 0); } -void peer_epoll_del(struct peer *peer, int epoll_fd) { +void peer_epoll_del(struct peer *peer) { assert(epoll_ctl(epoll_fd, EPOLL_CTL_DEL, peer->fd, NULL) == 0); } +void peer_loop() { + while (1) { +#define MAX_EVENTS 10 + struct epoll_event events[MAX_EVENTS]; + int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1); + assert(nfds >= 0); + + for (int n = 0; n < nfds; n++) { + struct peer *peer = events[n].data.ptr; + peer->event_handler(peer); + } + } +} + void buf_init(struct buf *buf) { buf->start = 0; diff --git a/common.h b/common.h index dd19bd7..642a786 100644 --- a/common.h +++ b/common.h @@ -2,19 +2,23 @@ #include #include +#include //////// peer // All specific peer structs must be castable to this. struct peer; -typedef void (*peer_event_handler)(struct peer *, int epoll_fd); +typedef void (*peer_event_handler)(struct peer *); struct peer { int fd; peer_event_handler event_handler; }; -void peer_epoll_add(struct peer *, int, uint32_t); -void peer_epoll_del(struct peer *, int); +extern int epoll_fd; +void peer_init(); +void peer_epoll_add(struct peer *, uint32_t); +void peer_epoll_del(struct peer *); +void peer_loop(); //////// buf diff --git a/incoming.c b/incoming.c index 8a73d97..f601df4 100644 --- a/incoming.c +++ b/incoming.c @@ -6,7 +6,6 @@ #include #include #include -#include #include "common.h" #include "incoming.h" @@ -22,7 +21,7 @@ struct incoming { }; -static void incoming_handler(struct peer *peer, int epoll_fd) { +static void incoming_handler(struct peer *peer) { struct incoming *incoming = (struct incoming *) peer; struct sockaddr peer_addr, local_addr; @@ -45,10 +44,10 @@ static void incoming_handler(struct peer *peer, int epoll_fd) { local_hbuf, local_sbuf, peer_hbuf, peer_sbuf); - incoming->handler(fd, epoll_fd, incoming->passthrough); + incoming->handler(fd, incoming->passthrough); } -void incoming_new(char *node, char *service, int epoll_fd, incoming_connection_handler handler, void *passthrough) { +void incoming_new(char *node, char *service, incoming_connection_handler handler, void *passthrough) { struct incoming *incoming = malloc(sizeof(*incoming)); incoming->peer.event_handler = incoming_handler; uuid_gen(incoming->id); @@ -103,5 +102,5 @@ void incoming_new(char *node, char *service, int epoll_fd, incoming_connection_h return; } - peer_epoll_add((struct peer *) incoming, epoll_fd, EPOLLIN); + peer_epoll_add((struct peer *) incoming, EPOLLIN); } diff --git a/incoming.h b/incoming.h index eea923f..e82786a 100644 --- a/incoming.h +++ b/incoming.h @@ -1,4 +1,4 @@ #pragma once -typedef void (*incoming_connection_handler)(int fd, int epoll_fd, void *); -void incoming_new(char *, char *, int, incoming_connection_handler, void *); +typedef void (*incoming_connection_handler)(int fd, void *); +void incoming_new(char *, char *, incoming_connection_handler, void *);