From 5671f37f8ae70294f239e88de4b8c1e5a65cfdf7 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Wed, 17 Feb 2016 17:19:57 -0800 Subject: [PATCH] Split backend and outgoing. Rename backend -> receive, client -> send. --- Makefile | 4 +- adsbus.c | 123 +++++++++++++++++++----------- airspy_adsb.c | 2 +- backend.c | 207 -------------------------------------------------- backend.h | 12 --- beast.c | 2 +- client.c | 145 ----------------------------------- client.h | 10 --- common.h | 2 - json.c | 6 +- outgoing.c | 105 +++++++++++++++++++++++++ outgoing.h | 4 + receive.c | 106 ++++++++++++++++++++++++++ receive.h | 10 +++ send.c | 145 +++++++++++++++++++++++++++++++++++ send.h | 10 +++ 16 files changed, 464 insertions(+), 429 deletions(-) delete mode 100644 backend.c delete mode 100644 backend.h delete mode 100644 client.c delete mode 100644 client.h create mode 100644 outgoing.c create mode 100644 outgoing.h create mode 100644 receive.c create mode 100644 receive.h create mode 100644 send.c create mode 100644 send.h diff --git a/Makefile b/Makefile index 7f68b7d..e723fab 100644 --- a/Makefile +++ b/Makefile @@ -11,5 +11,5 @@ clean: %.o: %.c *.h $(CC) -c $(CFLAGS) $< -o $@ -adsbus: adsbus.o backend.o client.o incoming.o airspy_adsb.o beast.o json.o raw.o stats.o common.o - $(CC) $(LDFLAGS) -o adsbus adsbus.o backend.o client.o incoming.o airspy_adsb.o beast.o json.o raw.o stats.o common.o $(LIBS) +adsbus: adsbus.o receive.o send.o incoming.o outgoing.o airspy_adsb.o beast.o json.o raw.o stats.o common.o + $(CC) $(LDFLAGS) -o adsbus adsbus.o receive.o send.o incoming.o outgoing.o airspy_adsb.o beast.o json.o raw.o stats.o common.o $(LIBS) diff --git a/adsbus.c b/adsbus.c index 0868053..8ed6dd9 100644 --- a/adsbus.c +++ b/adsbus.c @@ -4,9 +4,12 @@ #include #include "common.h" -#include "backend.h" -#include "client.h" + +#include "receive.h" +#include "send.h" + #include "incoming.h" +#include "outgoing.h" #include "airspy_adsb.h" #include "beast.h" @@ -20,106 +23,134 @@ static void print_usage(const char *name) { "\n" "Options:\n" "\t--help\n" - "\t--backend=HOST/PORT\n" "\t--dump=FORMAT\n" - "\t--incoming=[HOST/]PORT\n" - "\t--listen=FORMAT=[HOST/]PORT\n" + "\t--connect-receive=HOST/PORT\n" + "\t--connect-send=FORMAT=HOST/PORT\n" + "\t--listen-receive=[HOST/]PORT\n" + "\t--listen-send=FORMAT=[HOST/]PORT\n" , name); - backend_print_usage(); - client_print_usage(); + receive_print_usage(); + send_print_usage(); } static bool add_dump(char *arg) { - struct serializer *serializer = client_get_serializer(arg); + struct serializer *serializer = send_get_serializer(arg); if (!serializer) { fprintf(stderr, "Unknown --dump=FORMAT: %s\n", arg); return false; } - client_add(1, serializer); + send_add(1, serializer); return true; } -static bool add_backend(char *arg) { +static bool add_connect_receive(char *arg) { char *port = strrchr(arg, '/'); if (!port) { - fprintf(stderr, "Invalid --backend=HOST/PORT (missing \"/\"): %s\n", arg); + fprintf(stderr, "Invalid --connect-receive=HOST/PORT (missing \"/\"): %s\n", arg); return false; } *(port++) = '\0'; - backend_new(arg, port); + outgoing_new(arg, port, receive_new, NULL); return true; } -static bool add_incoming(char *arg){ - char *port = strrchr(arg, '/'); - if (port) { - *(port++) = '\0'; - incoming_new(arg, port, backend_new_fd_wrapper, NULL); - } else { - incoming_new(NULL, arg, backend_new_fd_wrapper, NULL); - } - return true; -} - -static bool add_listener(char *arg) { +static bool add_connect_send(char *arg) { char *host_port = strchr(arg, '='); if (!host_port) { - fprintf(stderr, "Invalid --listener=FORMAT=[HOST/]PORT (missing \"=\"): %s\n", arg); + fprintf(stderr, "Invalid --connect-send=FORMAT=HOST/PORT (missing \"=\"): %s\n", arg); return false; } *(host_port++) = '\0'; - struct serializer *serializer = client_get_serializer(arg); + struct serializer *serializer = send_get_serializer(arg); if (!serializer) { - fprintf(stderr, "Unknown --listener=FORMAT=[HOST/]PORT format: %s\n", arg); + fprintf(stderr, "Unknown --connect-send=FORMAT=HOST/PORT format: %s\n", arg); + return false; + } + + char *port = strrchr(host_port, '/'); + if (!port) { + fprintf(stderr, "Invalid --connect-send=FORMAT=HOST/PORT (missing \"/\"): %s\n", host_port); + return false; + } + *(port++) = '\0'; + + incoming_new(host_port, port, send_add_wrapper, serializer); + return true; +} + +static bool add_listen_receive(char *arg){ + char *port = strrchr(arg, '/'); + if (port) { + *(port++) = '\0'; + incoming_new(arg, port, receive_new, NULL); + } else { + incoming_new(NULL, arg, receive_new, NULL); + } + return true; +} + +static bool add_listen_send(char *arg) { + char *host_port = strchr(arg, '='); + if (!host_port) { + fprintf(stderr, "Invalid --listen-send=FORMAT=[HOST/]PORT (missing \"=\"): %s\n", arg); + return false; + } + *(host_port++) = '\0'; + + struct serializer *serializer = send_get_serializer(arg); + if (!serializer) { + fprintf(stderr, "Unknown --listen-send=FORMAT=[HOST/]PORT format: %s\n", arg); return false; } char *port = strrchr(host_port, '/'); if (port) { *(port++) = '\0'; - incoming_new(host_port, port, client_add_wrapper, serializer); + incoming_new(host_port, port, send_add_wrapper, serializer); } else { - incoming_new(NULL, host_port, client_add_wrapper, serializer); + incoming_new(NULL, host_port, send_add_wrapper, serializer); } return true; } static bool parse_opts(int argc, char *argv[]) { static struct option long_options[] = { - {"backend", required_argument, 0, 'b'}, - {"dump", required_argument, 0, 'd'}, - {"incoming", required_argument, 0, 'i'}, - {"listen", required_argument, 0, 'l'}, - {"help", no_argument, 0, 'h'}, - {0, 0, 0, 0 }, + {"dump", required_argument, 0, 'd'}, + {"connect-receive", required_argument, 0, 'c'}, + {"connect-send", required_argument, 0, 's'}, + {"listen-receive", required_argument, 0, 'l'}, + {"listen-send", required_argument, 0, 'm'}, + {"help", no_argument, 0, 'h'}, + {0, 0, 0, 0 }, }; int opt; while ((opt = getopt_long_only(argc, argv, "", long_options, NULL)) != -1) { bool (*handler)(char *) = NULL; switch (opt) { - case 'b': - handler = add_backend; - break; - case 'd': handler = add_dump; break; - case 'h': - print_usage(argv[0]); - return false; + case 'c': + handler = add_connect_receive; + break; - case 'i': - handler = add_incoming; + case 's': + handler = add_connect_send; break; case 'l': - handler = add_listener; + handler = add_listen_receive; break; + case 'm': + handler = add_listen_send; + break; + + case 'h': default: print_usage(argv[0]); return false; @@ -145,7 +176,7 @@ static bool parse_opts(int argc, char *argv[]) { int main(int argc, char *argv[]) { peer_init(); hex_init(); - client_init(); + send_init(); airspy_adsb_init(); beast_init(); json_init(); diff --git a/airspy_adsb.c b/airspy_adsb.c index 919f952..36f09c4 100644 --- a/airspy_adsb.c +++ b/airspy_adsb.c @@ -3,7 +3,7 @@ #include #include "common.h" -#include "backend.h" +#include "receive.h" #include "airspy_adsb.h" struct __attribute__((packed)) airspy_adsb_common_overlay { diff --git a/backend.c b/backend.c deleted file mode 100644 index 2ebaffa..0000000 --- a/backend.c +++ /dev/null @@ -1,207 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include - -#include "airspy_adsb.h" -#include "beast.h" -#include "raw.h" - -#include "client.h" -#include "backend.h" - -typedef bool (*parser_wrapper)(struct backend *, struct packet *); -typedef bool (*parser)(struct buf *, struct packet *, void *state); -struct backend { - struct peer peer; - char id[UUID_LEN]; - const char *node; - const char *service; - struct addrinfo *addrs; - struct addrinfo *addr; - struct buf buf; - char parser_state[PARSER_STATE_LEN]; - parser_wrapper parser_wrapper; - parser parser; -}; - -static void backend_connect_result(struct backend *, int); - -struct parser { - char *name; - parser parse; -} parsers[] = { - { - .name = "airspy_adsb", - .parse = airspy_adsb_parse, - }, - { - .name = "beast", - .parse = beast_parse, - }, - { - .name = "raw", - .parse = raw_parse, - }, -}; -#define NUM_PARSERS (sizeof(parsers) / sizeof(*parsers)) - -static bool backend_parse_wrapper(struct backend *backend, struct packet *packet) { - return backend->parser(&backend->buf, packet, backend->parser_state); -} - -static bool backend_autodetect_parse(struct backend *backend, struct packet *packet) { - struct buf *buf = &backend->buf; - void *state = backend->parser_state; - - for (int i = 0; i < NUM_PARSERS; i++) { - if (parsers[i].parse(buf, packet, state)) { - fprintf(stderr, "B %s: Detected input format %s\n", backend->id, parsers[i].name); - backend->parser_wrapper = backend_parse_wrapper; - backend->parser = parsers[i].parse; - return true; - } - } - return false; -} - -static struct backend *backend_create() { - struct backend *backend = malloc(sizeof(*backend)); - assert(backend); - uuid_gen(backend->id); - backend->peer.fd = -1; - backend->node = NULL; - backend->service = NULL; - buf_init(&backend->buf); - memset(backend->parser_state, 0, PARSER_STATE_LEN); - backend->parser_wrapper = backend_autodetect_parse; - return backend; -} - -static void backend_connect_handler(struct peer *peer) { - struct backend *backend = (struct backend *) peer; - - peer_epoll_del(peer); - - int error; - socklen_t len = sizeof(error); - assert(getsockopt(backend->peer.fd, SOL_SOCKET, SO_ERROR, &error, &len) == 0); - backend_connect_result(backend, error); -} - -static void backend_connect_next(struct backend *backend) { - if (backend->addr == NULL) { - freeaddrinfo(backend->addrs); - fprintf(stderr, "B %s: Can't connect to any addresses of %s/%s\n", backend->id, backend->node, backend->service); - return; - } - - char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; - assert(getnameinfo(backend->addr->ai_addr, backend->addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0); - fprintf(stderr, "B %s: Connecting to %s/%s...\n", backend->id, hbuf, sbuf); - - backend->peer.fd = socket(backend->addr->ai_family, backend->addr->ai_socktype | SOCK_NONBLOCK, backend->addr->ai_protocol); - assert(backend->peer.fd >= 0); - - int result = connect(backend->peer.fd, backend->addr->ai_addr, backend->addr->ai_addrlen); - backend_connect_result(backend, result == 0 ? result : errno); -} - -static void backend_read(struct peer *peer) { - struct backend *backend = (struct backend *) peer; - - if (buf_fill(&backend->buf, backend->peer.fd) <= 0) { - fprintf(stderr, "B %s: Connection closed by backend\n", backend->id); - close(backend->peer.fd); - // TODO: reconnect - return; - } - - struct packet packet = { - .backend = backend, - }; - while (backend->parser_wrapper(backend, &packet)) { - client_write(&packet); - } - - if (backend->buf.length == BUF_LEN_MAX) { - fprintf(stderr, "B %s: Input buffer overrun. This probably means that adsbus doesn't understand the protocol that this source is speaking.\n", backend->id); - close(backend->peer.fd); - // TODO: reconnect - return; - } -} - -static void backend_connect_result(struct backend *backend, int result) { - char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; - assert(getnameinfo(backend->addr->ai_addr, backend->addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0); - switch (result) { - case 0: - fprintf(stderr, "B %s: Connected to %s/%s\n", backend->id, hbuf, sbuf); - freeaddrinfo(backend->addrs); - backend->peer.event_handler = backend_read; - peer_epoll_add((struct peer *) backend, EPOLLIN); - break; - - case EINPROGRESS: - backend->peer.event_handler = backend_connect_handler; - peer_epoll_add((struct peer *) backend, EPOLLOUT); - break; - - default: - fprintf(stderr, "B %s: Can't connect to %s/%s: %s\n", backend->id, hbuf, sbuf, strerror(result)); - close(backend->peer.fd); - backend->addr = backend->addr->ai_next; - // Tail recursion :/ - backend_connect_next(backend); - break; - } -} - -static void backend_connect(struct backend *backend) { - fprintf(stderr, "B %s: Resolving %s/%s...\n", backend->id, backend->node, backend->service); - - struct addrinfo hints = { - .ai_family = AF_UNSPEC, - .ai_socktype = SOCK_STREAM, - }; - - int gai_err = getaddrinfo(backend->node, backend->service, &hints, &backend->addrs); - if (gai_err) { - fprintf(stderr, "B %s: Failed to resolve %s/%s: %s\n", backend->id, backend->node, backend->service, gai_strerror(gai_err)); - return; - } - backend->addr = backend->addrs; - backend_connect_next(backend); -} - -void backend_new(const char *node, const char *service) { - struct backend *backend = backend_create(); - backend->node = node; - backend->service = service; - backend_connect(backend); -} - -void backend_new_fd(int fd) { - struct backend *backend = backend_create(); - backend->peer.fd = fd; - backend->peer.event_handler = backend_read; - peer_epoll_add((struct peer *) backend, EPOLLIN); - - fprintf(stderr, "B %s: New backend from incoming connection\n", backend->id); -} - -void backend_new_fd_wrapper(int fd, void *unused) { - backend_new_fd(fd); -} - -void backend_print_usage() { - fprintf(stderr, "\nSupported input formats (autodetected):\n"); - for (int i = 0; i < NUM_PARSERS; i++) { - fprintf(stderr, "\t%s\n", parsers[i].name); - } -} diff --git a/backend.h b/backend.h deleted file mode 100644 index 7fc4559..0000000 --- a/backend.h +++ /dev/null @@ -1,12 +0,0 @@ -#pragma once - -#include - -#include "common.h" - -#define PARSER_STATE_LEN 256 - -void backend_new(const char *, const char *); -void backend_new_fd(int); -void backend_new_fd_wrapper(int, void *); -void backend_print_usage(); diff --git a/beast.c b/beast.c index 617c89b..57f43be 100644 --- a/beast.c +++ b/beast.c @@ -4,7 +4,7 @@ #include #include "common.h" -#include "backend.h" +#include "receive.h" #include "beast.h" struct __attribute__((packed)) beast_common_overlay { diff --git a/client.c b/client.c deleted file mode 100644 index d4b0a61..0000000 --- a/client.c +++ /dev/null @@ -1,145 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include - -#include "common.h" -#include "json.h" -#include "stats.h" -#include "client.h" - -struct client { - struct peer peer; - char id[UUID_LEN]; - struct serializer *serializer; - struct client *prev; - struct client *next; -}; - -typedef void (*serializer)(struct packet *, struct buf *); -struct serializer { - char *name; - serializer serialize; - struct client *client_head; -} serializers[] = { - { - .name = "json", - .serialize = json_serialize, - }, - { - .name = "stats", - .serialize = stats_serialize, - }, -}; -#define NUM_SERIALIZERS (sizeof(serializers) / sizeof(*serializers)) - -static void client_hangup(struct client *client) { - fprintf(stderr, "C %s (%s): Client disconnected\n", client->id, client->serializer->name); - if (client->prev) { - client->prev->next = client->next; - } else { - client->serializer->client_head = client->next; - } - if (client->next) { - client->next->prev = client->prev; - } - close(client->peer.fd); - free(client); -} - -static void client_hangup_wrapper(struct peer *peer) { - client_hangup((struct client *) peer); -} - -static bool client_hello(int fd, struct serializer *serializer) { - struct buf buf = BUF_INIT; - serializer->serialize(NULL, &buf); - if (buf.length == 0) { - return true; - } - if (write(fd, buf_at(&buf, 0), buf.length) != buf.length) { - return false; - } - return true; -} - -void client_init() { - signal(SIGPIPE, SIG_IGN); -} - -struct serializer *client_get_serializer(char *name) { - for (int i = 0; i < NUM_SERIALIZERS; i++) { - if (strcasecmp(serializers[i].name, name) == 0) { - return &serializers[i]; - } - } - return NULL; -} - -void client_add(int fd, struct serializer *serializer) { - int flags = fcntl(fd, F_GETFL, 0); - assert(flags >= 0); - flags |= O_NONBLOCK; - assert(fcntl(fd, F_SETFL, flags) == 0); - - if (!client_hello(fd, serializer)) { - fprintf(stderr, "C xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx: Failed to write hello to client\n"); - return; - } - - struct client *client = malloc(sizeof(*client)); - assert(client); - - client->peer.fd = fd; - client->peer.event_handler = client_hangup_wrapper; - - uuid_gen(client->id); - client->serializer = serializer; - client->prev = NULL; - client->next = serializer->client_head; - serializer->client_head = client; - - // Only listen for hangup - peer_epoll_add((struct peer *) client, EPOLLIN); - - fprintf(stderr, "C %s (%s): New client\n", client->id, serializer->name); -} - -void client_add_wrapper(int fd, void *passthrough) { - client_add(fd, (struct serializer *) passthrough); -} - -void client_write(struct packet *packet) { - for (int i = 0; i < NUM_SERIALIZERS; i++) { - struct serializer *serializer = &serializers[i]; - if (serializer->client_head == NULL) { - continue; - } - struct buf buf = BUF_INIT; - serializer->serialize(packet, &buf); - if (buf.length == 0) { - continue; - } - struct client *client = serializer->client_head; - while (client) { - if (write(client->peer.fd, buf_at(&buf, 0), buf.length) == buf.length) { - client = client->next; - } else { - struct client *next = client->next; - client_hangup(client); - client = next; - } - } - } -} - -void client_print_usage() { - fprintf(stderr, "\nSupported output formats:\n"); - for (int i = 0; i < NUM_SERIALIZERS; i++) { - fprintf(stderr, "\t%s\n", serializers[i].name); - } -} diff --git a/client.h b/client.h deleted file mode 100644 index e65d6c1..0000000 --- a/client.h +++ /dev/null @@ -1,10 +0,0 @@ -#pragma once - -#include "common.h" - -void client_init(); -struct serializer *client_get_serializer(char *); -void client_add(int, struct serializer *); -void client_add_wrapper(int, void *); -void client_write(struct packet *); -void client_print_usage(); diff --git a/common.h b/common.h index 20389de..c70085e 100644 --- a/common.h +++ b/common.h @@ -44,7 +44,6 @@ void buf_consume(struct buf *, size_t); //////// packet #define DATA_LEN_MAX 14 -struct backend; struct packet { enum { MODE_S_SHORT, @@ -54,7 +53,6 @@ struct packet { uint8_t payload[DATA_LEN_MAX]; uint64_t mlat_timestamp; uint32_t rssi; - struct backend *backend; }; extern char *packet_type_names[]; diff --git a/json.c b/json.c index bc3dafc..4918634 100644 --- a/json.c +++ b/json.c @@ -3,8 +3,8 @@ #include #include -#include "backend.h" -#include "client.h" +#include "receive.h" +#include "send.h" #include "json.h" static void json_serialize_to_buf(json_t *obj, struct buf *buf) { @@ -14,7 +14,7 @@ static void json_serialize_to_buf(json_t *obj, struct buf *buf) { } static void json_hello(struct buf *buf) { - json_t *hello = json_pack("{sssIsIsI}", + json_t *hello = json_pack("{sIsIsI}", "mlat_timestamp_mhz", (json_int_t) MLAT_MHZ, "mlat_timestamp_max", (json_int_t) MLAT_MAX, "rssi_max", (json_int_t) RSSI_MAX); diff --git a/outgoing.c b/outgoing.c new file mode 100644 index 0000000..b2e3755 --- /dev/null +++ b/outgoing.c @@ -0,0 +1,105 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common.h" +#include "outgoing.h" + +struct outgoing { + struct peer peer; + char id[UUID_LEN]; + const char *node; + const char *service; + struct addrinfo *addrs; + struct addrinfo *addr; + outgoing_connection_handler handler; + void *passthrough; +}; + +static void outgoing_connect_result(struct outgoing *, int); + +static void outgoing_connect_next(struct outgoing *outgoing) { + if (outgoing->addr == NULL) { + freeaddrinfo(outgoing->addrs); + fprintf(stderr, "O %s: Can't connect to any addresses of %s/%s\n", outgoing->id, outgoing->node, outgoing->service); + return; + } + + char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; + assert(getnameinfo(outgoing->addr->ai_addr, outgoing->addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0); + fprintf(stderr, "O %s: Connecting to %s/%s...\n", outgoing->id, hbuf, sbuf); + + outgoing->peer.fd = socket(outgoing->addr->ai_family, outgoing->addr->ai_socktype | SOCK_NONBLOCK, outgoing->addr->ai_protocol); + assert(outgoing->peer.fd >= 0); + + int result = connect(outgoing->peer.fd, outgoing->addr->ai_addr, outgoing->addr->ai_addrlen); + outgoing_connect_result(outgoing, result == 0 ? result : errno); +} + +static void outgoing_connect_handler(struct peer *peer) { + struct outgoing *outgoing = (struct outgoing *) peer; + + peer_epoll_del(peer); + + int error; + socklen_t len = sizeof(error); + assert(getsockopt(outgoing->peer.fd, SOL_SOCKET, SO_ERROR, &error, &len) == 0); + outgoing_connect_result(outgoing, error); +} + +static void outgoing_connect_result(struct outgoing *outgoing, int result) { + char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; + assert(getnameinfo(outgoing->addr->ai_addr, outgoing->addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0); + switch (result) { + case 0: + fprintf(stderr, "O %s: Connected to %s/%s\n", outgoing->id, hbuf, sbuf); + freeaddrinfo(outgoing->addrs); + outgoing->handler(outgoing->peer.fd, outgoing->passthrough); + break; + + case EINPROGRESS: + outgoing->peer.event_handler = outgoing_connect_handler; + peer_epoll_add((struct peer *) outgoing, EPOLLOUT); + break; + + default: + fprintf(stderr, "O %s: Can't connect to %s/%s: %s\n", outgoing->id, hbuf, sbuf, strerror(result)); + close(outgoing->peer.fd); + outgoing->addr = outgoing->addr->ai_next; + // Tail recursion :/ + outgoing_connect_next(outgoing); + break; + } +} + +static void outgoing_resolve(struct outgoing *outgoing) { + fprintf(stderr, "O %s: Resolving %s/%s...\n", outgoing->id, outgoing->node, outgoing->service); + + struct addrinfo hints = { + .ai_family = AF_UNSPEC, + .ai_socktype = SOCK_STREAM, + }; + + int gai_err = getaddrinfo(outgoing->node, outgoing->service, &hints, &outgoing->addrs); + if (gai_err) { + fprintf(stderr, "O %s: Failed to resolve %s/%s: %s\n", outgoing->id, outgoing->node, outgoing->service, gai_strerror(gai_err)); + return; + } + outgoing->addr = outgoing->addrs; + outgoing_connect_next(outgoing); +} + +void outgoing_new(const char *node, const char *service, outgoing_connection_handler handler, void *passthrough) { + struct outgoing *outgoing = malloc(sizeof(*outgoing)); + uuid_gen(outgoing->id); + outgoing->node = node; + outgoing->service = service; + outgoing->handler = handler; + outgoing->passthrough = passthrough; + outgoing_resolve(outgoing); +} diff --git a/outgoing.h b/outgoing.h new file mode 100644 index 0000000..847be8c --- /dev/null +++ b/outgoing.h @@ -0,0 +1,4 @@ +#pragma once + +typedef void (*outgoing_connection_handler)(int fd, void *); +void outgoing_new(const char *, const char *, outgoing_connection_handler, void *); diff --git a/receive.c b/receive.c new file mode 100644 index 0000000..a06c5ae --- /dev/null +++ b/receive.c @@ -0,0 +1,106 @@ +#include +#include +#include +#include + +#include "airspy_adsb.h" +#include "beast.h" +#include "raw.h" + +#include "send.h" + +#include "receive.h" + +struct receive; +typedef bool (*parser_wrapper)(struct receive *, struct packet *); +typedef bool (*parser)(struct buf *, struct packet *, void *state); +struct receive { + struct peer peer; + char id[UUID_LEN]; + struct buf buf; + char parser_state[PARSER_STATE_LEN]; + parser_wrapper parser_wrapper; + parser parser; +}; + +struct parser { + char *name; + parser parse; +} parsers[] = { + { + .name = "airspy_adsb", + .parse = airspy_adsb_parse, + }, + { + .name = "beast", + .parse = beast_parse, + }, + { + .name = "raw", + .parse = raw_parse, + }, +}; +#define NUM_PARSERS (sizeof(parsers) / sizeof(*parsers)) + +static bool receive_parse_wrapper(struct receive *receive, struct packet *packet) { + return receive->parser(&receive->buf, packet, receive->parser_state); +} + +static bool receive_autodetect_parse(struct receive *receive, struct packet *packet) { + struct buf *buf = &receive->buf; + void *state = receive->parser_state; + + for (int i = 0; i < NUM_PARSERS; i++) { + if (parsers[i].parse(buf, packet, state)) { + fprintf(stderr, "R %s: Detected input format %s\n", receive->id, parsers[i].name); + receive->parser_wrapper = receive_parse_wrapper; + receive->parser = parsers[i].parse; + return true; + } + } + return false; +} + +static void receive_read(struct peer *peer) { + struct receive *receive = (struct receive *) peer; + + if (buf_fill(&receive->buf, receive->peer.fd) <= 0) { + fprintf(stderr, "R %s: Connection closed by peer\n", receive->id); + close(receive->peer.fd); + // TODO: reconnect + return; + } + + struct packet packet = { 0 }; + while (receive->parser_wrapper(receive, &packet)) { + send_write(&packet); + } + + if (receive->buf.length == BUF_LEN_MAX) { + fprintf(stderr, "R %s: Input buffer overrun. This probably means that adsbus doesn't understand the protocol that this source is speaking.\n", receive->id); + close(receive->peer.fd); + // TODO: reconnect + return; + } +} + +void receive_new(int fd, void *unused) { + struct receive *receive = malloc(sizeof(*receive)); + assert(receive); + uuid_gen(receive->id); + receive->peer.fd = fd; + buf_init(&receive->buf); + memset(receive->parser_state, 0, PARSER_STATE_LEN); + receive->parser_wrapper = receive_autodetect_parse; + receive->peer.event_handler = receive_read; + peer_epoll_add((struct peer *) receive, EPOLLIN); + + fprintf(stderr, "R %s: New connection\n", receive->id); +} + +void receive_print_usage() { + fprintf(stderr, "\nSupported receive formats (autodetected):\n"); + for (int i = 0; i < NUM_PARSERS; i++) { + fprintf(stderr, "\t%s\n", parsers[i].name); + } +} diff --git a/receive.h b/receive.h new file mode 100644 index 0000000..4b6d264 --- /dev/null +++ b/receive.h @@ -0,0 +1,10 @@ +#pragma once + +#include + +#include "common.h" + +#define PARSER_STATE_LEN 256 + +void receive_new(int, void *); +void receive_print_usage(); diff --git a/send.c b/send.c new file mode 100644 index 0000000..52bf204 --- /dev/null +++ b/send.c @@ -0,0 +1,145 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common.h" +#include "json.h" +#include "stats.h" +#include "send.h" + +struct send { + struct peer peer; + char id[UUID_LEN]; + struct serializer *serializer; + struct send *prev; + struct send *next; +}; + +typedef void (*serializer)(struct packet *, struct buf *); +struct serializer { + char *name; + serializer serialize; + struct send *send_head; +} serializers[] = { + { + .name = "json", + .serialize = json_serialize, + }, + { + .name = "stats", + .serialize = stats_serialize, + }, +}; +#define NUM_SERIALIZERS (sizeof(serializers) / sizeof(*serializers)) + +static void send_hangup(struct send *send) { + fprintf(stderr, "S %s (%s): Peer disconnected\n", send->id, send->serializer->name); + if (send->prev) { + send->prev->next = send->next; + } else { + send->serializer->send_head = send->next; + } + if (send->next) { + send->next->prev = send->prev; + } + close(send->peer.fd); + free(send); +} + +static void send_hangup_wrapper(struct peer *peer) { + send_hangup((struct send *) peer); +} + +static bool send_hello(int fd, struct serializer *serializer) { + struct buf buf = BUF_INIT; + serializer->serialize(NULL, &buf); + if (buf.length == 0) { + return true; + } + if (write(fd, buf_at(&buf, 0), buf.length) != buf.length) { + return false; + } + return true; +} + +void send_init() { + signal(SIGPIPE, SIG_IGN); +} + +struct serializer *send_get_serializer(char *name) { + for (int i = 0; i < NUM_SERIALIZERS; i++) { + if (strcasecmp(serializers[i].name, name) == 0) { + return &serializers[i]; + } + } + return NULL; +} + +void send_add(int fd, struct serializer *serializer) { + int flags = fcntl(fd, F_GETFL, 0); + assert(flags >= 0); + flags |= O_NONBLOCK; + assert(fcntl(fd, F_SETFL, flags) == 0); + + if (!send_hello(fd, serializer)) { + fprintf(stderr, "S xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx: Failed to write hello\n"); + return; + } + + struct send *send = malloc(sizeof(*send)); + assert(send); + + send->peer.fd = fd; + send->peer.event_handler = send_hangup_wrapper; + + uuid_gen(send->id); + send->serializer = serializer; + send->prev = NULL; + send->next = serializer->send_head; + serializer->send_head = send; + + // Only listen for hangup + peer_epoll_add((struct peer *) send, EPOLLIN); + + fprintf(stderr, "S %s (%s): New connection\n", send->id, serializer->name); +} + +void send_add_wrapper(int fd, void *passthrough) { + send_add(fd, (struct serializer *) passthrough); +} + +void send_write(struct packet *packet) { + for (int i = 0; i < NUM_SERIALIZERS; i++) { + struct serializer *serializer = &serializers[i]; + if (serializer->send_head == NULL) { + continue; + } + struct buf buf = BUF_INIT; + serializer->serialize(packet, &buf); + if (buf.length == 0) { + continue; + } + struct send *send = serializer->send_head; + while (send) { + if (write(send->peer.fd, buf_at(&buf, 0), buf.length) == buf.length) { + send = send->next; + } else { + struct send *next = send->next; + send_hangup(send); + send = next; + } + } + } +} + +void send_print_usage() { + fprintf(stderr, "\nSupported send formats:\n"); + for (int i = 0; i < NUM_SERIALIZERS; i++) { + fprintf(stderr, "\t%s\n", serializers[i].name); + } +} diff --git a/send.h b/send.h new file mode 100644 index 0000000..b3d5fca --- /dev/null +++ b/send.h @@ -0,0 +1,10 @@ +#pragma once + +#include "common.h" + +void send_init(); +struct serializer *send_get_serializer(char *); +void send_add(int, struct serializer *); +void send_add_wrapper(int, void *); +void send_write(struct packet *); +void send_print_usage();