diff --git a/adsbus.c b/adsbus.c index 2862097..38d23ad 100644 --- a/adsbus.c +++ b/adsbus.c @@ -56,12 +56,8 @@ static bool parse_opts(int argc, char *argv[], int epoll_fd) { delim++; // TODO: Fix orphan malloc. - struct backend *backend = malloc(sizeof(*backend)); + struct backend *backend = backend_new(optarg, delim, epoll_fd); assert(backend); - backend_init(backend); - if (!backend_connect(optarg, delim, backend, epoll_fd)) { - return false; - } break; case 'd': @@ -101,17 +97,7 @@ static int loop(int epoll_fd) { for (int n = 0; n < nfds; n++) { struct peer *peer = events[n].data.ptr; - switch (peer->type) { - case PEER_BACKEND: - if (!backend_read((struct backend *) peer)) { - return -1; - } - break; - - default: - fprintf(stderr, "Unpossible: unknown peer type.\n"); - return -1; - } + peer->event_handler(peer, epoll_fd); } } } diff --git a/backend.c b/backend.c index 72e69d3..1774ac7 100644 --- a/backend.c +++ b/backend.c @@ -5,6 +5,7 @@ #include #include #include +#include #include "airspy_adsb.h" #include "beast.h" @@ -12,7 +13,12 @@ #include "backend.h" -bool backend_autodetect_parse(struct backend *, struct packet *); +static bool backend_autodetect_parse(struct backend *, struct packet *); +static void backend_connect(struct backend *, int); +static void backend_connect_next(struct backend *, int); + +static void backend_connect_handler(struct peer *, int); +static void backend_read(struct peer *, int); static parser parsers[] = { @@ -22,84 +28,101 @@ static parser parsers[] = { #define NUM_PARSERS (sizeof(parsers) / sizeof(*parsers)) -void backend_init(struct backend *backend) { - backend->type = PEER_BACKEND; - backend->fd = -1; +struct backend *backend_new(char *node, char *service, int epoll_fd) { + struct backend *backend = malloc(sizeof(*backend)); + assert(backend); + backend->peer.fd = -1; + uuid_gen(backend->id); + backend->node = node; + backend->service = service; buf_init(&backend->buf); memset(backend->parser_state, 0, PARSER_STATE_LEN); backend->parser = backend_autodetect_parse; + backend_connect(backend, epoll_fd); + return backend; } -bool backend_connect(char *node, char *service, struct backend *backend, int epoll_fd) { - assert(backend->type == PEER_BACKEND); +static void backend_connect(struct backend *backend, int epoll_fd) { + fprintf(stderr, "B %s: Connecting to %s %s...\n", backend->id, backend->node, backend->service); - uuid_gen(backend->id); + struct addrinfo hints = { + .ai_family = AF_UNSPEC, + .ai_socktype = SOCK_STREAM, + }; - struct addrinfo *addrs; - - { - struct addrinfo hints = { - .ai_family = AF_UNSPEC, - .ai_socktype = SOCK_STREAM, - }; - - int gai_err = getaddrinfo(node, service, &hints, &addrs); - if (gai_err) { - fprintf(stderr, "B %s: getaddrinfo(%s %s): %s\n", backend->id, node, service, gai_strerror(gai_err)); - return false; - } + int gai_err = getaddrinfo(backend->node, backend->service, &hints, &backend->addrs); + if (gai_err) { + fprintf(stderr, "B %s: getaddrinfo(%s %s): %s\n", backend->id, backend->node, backend->service, gai_strerror(gai_err)); + return; } + backend->addr = backend->addrs; + backend_connect_next(backend, epoll_fd); +} - { - struct addrinfo *addr; - for (addr = addrs; addr != NULL; addr = addr->ai_next) { - backend->fd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); - if (backend->fd == -1) { - perror("socket"); - continue; - } - - if (connect(backend->fd, addr->ai_addr, addr->ai_addrlen) != -1) { - break; - } - - close(backend->fd); - } - - if (addr == NULL) { - freeaddrinfo(addrs); - fprintf(stderr, "B %s: Can't connect to %s %s\n", backend->id, node, service); - return false; - } - - char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; - if (getnameinfo(addr->ai_addr, addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0) { +static void backend_connect_result(struct backend *backend, int epoll_fd, int result) { + char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; + assert(getnameinfo(backend->addr->ai_addr, backend->addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0); + switch (result) { + case 0: fprintf(stderr, "B %s: Connected to %s %s\n", backend->id, hbuf, sbuf); - } + freeaddrinfo(backend->addrs); + backend->peer.event_handler = backend_read; + peer_epoll_add((struct peer *) backend, epoll_fd, EPOLLIN); + break; + + case EINPROGRESS: + backend->peer.event_handler = backend_connect_handler; + peer_epoll_add((struct peer *) backend, epoll_fd, EPOLLOUT); + break; + + default: + fprintf(stderr, "B %s: Can't connect to %s %s: %s\n", backend->id, hbuf, sbuf, strerror(result)); + close(backend->peer.fd); + backend->addr = backend->addr->ai_next; + // Tail recursion :/ + backend_connect_next(backend, epoll_fd); + break; } - - freeaddrinfo(addrs); - - { - struct epoll_event ev = { - .events = EPOLLIN, - .data = { - .ptr = backend, - }, - }; - if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, backend->fd, &ev) == -1) { - perror("epoll_ctl"); - return false; - } - } - - return true; } -bool backend_read(struct backend *backend) { - if (buf_fill(&backend->buf, backend->fd) < 0) { +static void backend_connect_next(struct backend *backend, int epoll_fd) { + if (backend->addr == NULL) { + freeaddrinfo(backend->addrs); + fprintf(stderr, "B %s: Can't connect to %s %s\n", backend->id, backend->node, backend->service); + return; + } + + char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; + assert(getnameinfo(backend->addr->ai_addr, backend->addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0); + fprintf(stderr, "B %s: Connecting to %s %s...\n", backend->id, hbuf, sbuf); + + backend->peer.fd = socket(backend->addr->ai_family, backend->addr->ai_socktype | SOCK_NONBLOCK, backend->addr->ai_protocol); + assert(backend->peer.fd >= 0); + + int result = connect(backend->peer.fd, backend->addr->ai_addr, backend->addr->ai_addrlen); + backend_connect_result(backend, epoll_fd, result == 0 ? result : errno); +} + +static void backend_connect_handler(struct peer *peer, int epoll_fd) { + struct backend *backend = (struct backend *) peer; + + peer_epoll_del(peer, epoll_fd); + + int error; + socklen_t len = sizeof(error); + assert(getsockopt(backend->peer.fd, SOL_SOCKET, SO_ERROR, &error, &len) == 0); + backend_connect_result(backend, epoll_fd, error); +} + +static void backend_read(struct peer *peer, int epoll_fd) { + struct backend *backend = (struct backend *) peer; + + if (buf_fill(&backend->buf, backend->peer.fd) < 0) { fprintf(stderr, "B %s: Connection closed by backend\n", backend->id); - return false; + close(backend->peer.fd); + peer_epoll_del(peer, epoll_fd); + // TODO: reconnect + return; } struct packet packet = { @@ -111,14 +134,14 @@ bool backend_read(struct backend *backend) { if (backend->buf.length == BUF_LEN_MAX) { fprintf(stderr, "B %s: Input buffer overrun. This probably means that adsbus doesn't understand the protocol that this source is speaking.\n", backend->id); - return false; + close(backend->peer.fd); + peer_epoll_del(peer, epoll_fd); + // TODO: reconnect + return; } - return true; } -bool backend_autodetect_parse(struct backend *backend, struct packet *packet) { - assert(backend->type == PEER_BACKEND); - +static bool backend_autodetect_parse(struct backend *backend, struct packet *packet) { for (int i = 0; i < NUM_PARSERS; i++) { if (parsers[i](backend, packet)) { backend->parser = parsers[i]; diff --git a/backend.h b/backend.h index 68e3ac3..9821cea 100644 --- a/backend.h +++ b/backend.h @@ -7,17 +7,18 @@ #define PARSER_STATE_LEN 256 struct backend; +struct addrinfo; typedef bool (*parser)(struct backend *, struct packet *); struct backend { - enum peer_type type; + struct peer peer; char id[UUID_LEN]; - int fd; + char *node; + char *service; + struct addrinfo *addrs; + struct addrinfo *addr; struct buf buf; char parser_state[PARSER_STATE_LEN]; parser parser; }; - -void backend_init(struct backend *); -bool backend_connect(char *, char *, struct backend *, int); -bool backend_read(struct backend *); +struct backend *backend_new(char *node, char *service, int epoll_fd); diff --git a/common.c b/common.c index c810710..7453d8a 100644 --- a/common.c +++ b/common.c @@ -3,11 +3,27 @@ #include #include #include +#include #include #include "common.h" +void peer_epoll_add(struct peer *peer, int epoll_fd, uint32_t events) { + struct epoll_event ev = { + .events = events, + .data = { + .ptr = peer, + }, + }; + assert(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peer->fd, &ev) == 0); +} + +void peer_epoll_del(struct peer *peer, int epoll_fd) { + assert(epoll_ctl(epoll_fd, EPOLL_CTL_DEL, peer->fd, NULL) == 0); +} + + void buf_init(struct buf *buf) { buf->start = 0; buf->length = 0; diff --git a/common.h b/common.h index 0af786e..606cb60 100644 --- a/common.h +++ b/common.h @@ -7,12 +7,14 @@ //////// peer // All specific peer structs must be castable to this. +struct peer; +typedef void (*peer_event_handler)(struct peer *, int epoll_fd); struct peer { - enum peer_type { - PEER_BACKEND, - PEER_CLIENT, - } type; + int fd; + peer_event_handler event_handler; }; +void peer_epoll_add(struct peer *, int, uint32_t); +void peer_epoll_del(struct peer *, int); //////// buf