diff --git a/Makefile b/Makefile index c38911f..7f68b7d 100644 --- a/Makefile +++ b/Makefile @@ -11,5 +11,5 @@ clean: %.o: %.c *.h $(CC) -c $(CFLAGS) $< -o $@ -adsbus: adsbus.o backend.o client.o airspy_adsb.o beast.o json.o raw.o stats.o common.o - $(CC) $(LDFLAGS) -o adsbus adsbus.o backend.o client.o airspy_adsb.o beast.o json.o raw.o stats.o common.o $(LIBS) +adsbus: adsbus.o backend.o client.o incoming.o airspy_adsb.o beast.o json.o raw.o stats.o common.o + $(CC) $(LDFLAGS) -o adsbus adsbus.o backend.o client.o incoming.o airspy_adsb.o beast.o json.o raw.o stats.o common.o $(LIBS) diff --git a/adsbus.c b/adsbus.c index f574fb9..e4271be 100644 --- a/adsbus.c +++ b/adsbus.c @@ -8,6 +8,8 @@ #include "common.h" #include "backend.h" #include "client.h" +#include "incoming.h" + #include "airspy_adsb.h" #include "beast.h" #include "json.h" @@ -32,14 +34,16 @@ static void print_usage(char *argv[]) { "\t--help\n" "\t--backend=HOST/PORT\n" "\t--dump=FORMAT\n" + "\t--incoming=[HOST/]PORT\n" , argv[0]); } static bool parse_opts(int argc, char *argv[], int epoll_fd) { static struct option long_options[] = { - {"backend", required_argument, 0, 'b'}, - {"dump", required_argument, 0, 'd'}, - {"help", no_argument, 0, 'h'}, + {"backend", required_argument, 0, 'b'}, + {"dump", required_argument, 0, 'd'}, + {"incoming", required_argument, 0, 'i'}, + {"help", no_argument, 0, 'h'}, }; int opt; @@ -69,6 +73,17 @@ static bool parse_opts(int argc, char *argv[], int epoll_fd) { print_usage(argv); return false; + case 'i': + delim = strrchr(optarg, '/'); + if (delim == NULL) { + incoming_new(NULL, optarg, epoll_fd, backend_new_fd); + } else { + *delim = '\0'; + delim++; + incoming_new(optarg, delim, epoll_fd, backend_new_fd); + } + break; + default: print_usage(argv); return false; diff --git a/backend.c b/backend.c index 8484da1..04c3fba 100644 --- a/backend.c +++ b/backend.c @@ -43,19 +43,35 @@ struct parser { #define NUM_PARSERS (sizeof(parsers) / sizeof(*parsers)) -void backend_new(char *node, char *service, int epoll_fd) { +static struct backend *backend_create() { struct backend *backend = malloc(sizeof(*backend)); assert(backend); - backend->peer.fd = -1; uuid_gen(backend->id); - backend->node = node; - backend->service = service; + backend->peer.fd = -1; + backend->node = NULL; + backend->service = NULL; buf_init(&backend->buf); memset(backend->parser_state, 0, PARSER_STATE_LEN); backend->parser = backend_autodetect_parse; + return backend; +} + +void backend_new(char *node, char *service, int epoll_fd) { + struct backend *backend = backend_create(); + backend->node = node; + backend->service = service; backend_connect(backend, epoll_fd); } +void backend_new_fd(int fd, int epoll_fd) { + struct backend *backend = backend_create(); + backend->peer.fd = fd; + backend->peer.event_handler = backend_read; + peer_epoll_add((struct peer *) backend, epoll_fd, EPOLLIN); + + fprintf(stderr, "B %s: New backend from incoming connection\n", backend->id); +} + static void backend_connect(struct backend *backend, int epoll_fd) { fprintf(stderr, "B %s: Resolving %s/%s...\n", backend->id, backend->node, backend->service); @@ -66,7 +82,7 @@ static void backend_connect(struct backend *backend, int epoll_fd) { int gai_err = getaddrinfo(backend->node, backend->service, &hints, &backend->addrs); if (gai_err) { - fprintf(stderr, "B %s: getaddrinfo(%s/%s): %s\n", backend->id, backend->node, backend->service, gai_strerror(gai_err)); + fprintf(stderr, "B %s: Failed to resolve %s/%s: %s\n", backend->id, backend->node, backend->service, gai_strerror(gai_err)); return; } backend->addr = backend->addrs; @@ -131,10 +147,9 @@ static void backend_connect_handler(struct peer *peer, int epoll_fd) { static void backend_read(struct peer *peer, int epoll_fd) { struct backend *backend = (struct backend *) peer; - if (buf_fill(&backend->buf, backend->peer.fd) < 0) { + if (buf_fill(&backend->buf, backend->peer.fd) <= 0) { fprintf(stderr, "B %s: Connection closed by backend\n", backend->id); close(backend->peer.fd); - peer_epoll_del(peer, epoll_fd); // TODO: reconnect return; } @@ -149,7 +164,6 @@ static void backend_read(struct peer *peer, int epoll_fd) { if (backend->buf.length == BUF_LEN_MAX) { fprintf(stderr, "B %s: Input buffer overrun. This probably means that adsbus doesn't understand the protocol that this source is speaking.\n", backend->id); close(backend->peer.fd); - peer_epoll_del(peer, epoll_fd); // TODO: reconnect return; } diff --git a/backend.h b/backend.h index dbab29c..d6c662f 100644 --- a/backend.h +++ b/backend.h @@ -21,4 +21,5 @@ struct backend { parser parser; }; -void backend_new(char *node, char *service, int epoll_fd); +void backend_new(char *, char *, int); +void backend_new_fd(int, int); diff --git a/common.c b/common.c index e014fbe..5ffd0f7 100644 --- a/common.c +++ b/common.c @@ -38,7 +38,7 @@ ssize_t buf_fill(struct buf *buf, int fd) { size_t space = BUF_LEN_MAX - buf->length - buf->start; ssize_t in = read(fd, buf_at(buf, buf->length), space); - if (in < 0) { + if (in <= 0) { return in; } buf->length += in; diff --git a/incoming.c b/incoming.c new file mode 100644 index 0000000..8aa788e --- /dev/null +++ b/incoming.c @@ -0,0 +1,105 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common.h" +#include "incoming.h" + + +struct incoming { + struct peer peer; + char id[UUID_LEN]; + char *node; + char *service; + incoming_connection_handler handler; +}; + + +static void incoming_handler(struct peer *peer, int epoll_fd) { + struct incoming *incoming = (struct incoming *) peer; + + struct sockaddr peer_addr, local_addr; + socklen_t peer_addrlen = sizeof(peer_addr), local_addrlen = sizeof(local_addr); + + int fd = accept(incoming->peer.fd, &peer_addr, &peer_addrlen); + if (fd == -1) { + fprintf(stderr, "I %s: Failed to accept new connection on %s/%s: %s\n", incoming->id, incoming->node, incoming->service, strerror(errno)); + return; + } + + char peer_hbuf[NI_MAXHOST], local_hbuf[NI_MAXHOST], peer_sbuf[NI_MAXSERV], local_sbuf[NI_MAXSERV]; + assert(getsockname(fd, &local_addr, &local_addrlen) == 0); + assert(getnameinfo(&peer_addr, peer_addrlen, peer_hbuf, sizeof(peer_hbuf), peer_sbuf, sizeof(peer_sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0); + assert(getnameinfo(&local_addr, local_addrlen, local_hbuf, sizeof(local_hbuf), local_sbuf, sizeof(local_sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0); + + fprintf(stderr, "I %s: New connection on %s/%s (%s/%s) from %s/%s\n", + incoming->id, + incoming->node, incoming->service, + local_hbuf, local_sbuf, + peer_hbuf, peer_sbuf); + + incoming->handler(fd, epoll_fd); +} + +void incoming_new(char *node, char *service, int epoll_fd, incoming_connection_handler handler) { + struct incoming *incoming = malloc(sizeof(*incoming)); + incoming->peer.event_handler = incoming_handler; + uuid_gen(incoming->id); + incoming->node = node; + incoming->service = service; + incoming->handler = handler; + + fprintf(stderr, "I %s: Resolving %s/%s...\n", incoming->id, incoming->node, incoming->service); + + struct addrinfo hints = { + .ai_family = AF_UNSPEC, + .ai_socktype = SOCK_STREAM, + .ai_flags = AI_PASSIVE | AI_V4MAPPED | AI_ADDRCONFIG, + }; + + struct addrinfo *addrs; + int gai_err = getaddrinfo(incoming->node, incoming->service, &hints, &addrs); + if (gai_err) { + fprintf(stderr, "I %s: Failed to resolve %s/%s: %s\n", incoming->id, incoming->node, incoming->service, gai_strerror(gai_err)); + free(incoming); + return; + } + + struct addrinfo *addr; + for (addr = addrs; addr; addr = addr->ai_next) { + char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; + assert(getnameinfo(addr->ai_addr, addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0); + fprintf(stderr, "I %s: Listening on %s/%s...\n", incoming->id, hbuf, sbuf); + + incoming->peer.fd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); + assert(incoming->peer.fd >= 0); + + int optval = 1; + setsockopt(incoming->peer.fd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval)); + + if (bind(incoming->peer.fd, addr->ai_addr, addr->ai_addrlen) != 0) { + fprintf(stderr, "I %s: Failed to bind to %s/%s: %s\n", incoming->id, hbuf, sbuf, strerror(errno)); + close(incoming->peer.fd); + continue; + } + + assert(listen(incoming->peer.fd, 255) == 0); + break; + } + + freeaddrinfo(addrs); + + if (addr == NULL) { + fprintf(stderr, "I %s: Failed to bind any addresses for %s/%s...\n", incoming->id, incoming->node, incoming->service); + free(incoming); + return; + } + + peer_epoll_add((struct peer *) incoming, epoll_fd, EPOLLIN); +} diff --git a/incoming.h b/incoming.h new file mode 100644 index 0000000..6c7c9e1 --- /dev/null +++ b/incoming.h @@ -0,0 +1,4 @@ +#pragma once + +typedef void (*incoming_connection_handler)(int fd, int epoll_fd); +void incoming_new(char *, char *, int, incoming_connection_handler);