diff --git a/Makefile b/Makefile index 7f68b7d..e723fab 100644 --- a/Makefile +++ b/Makefile @@ -11,5 +11,5 @@ clean: %.o: %.c *.h $(CC) -c $(CFLAGS) $< -o $@ -adsbus: adsbus.o backend.o client.o incoming.o airspy_adsb.o beast.o json.o raw.o stats.o common.o - $(CC) $(LDFLAGS) -o adsbus adsbus.o backend.o client.o incoming.o airspy_adsb.o beast.o json.o raw.o stats.o common.o $(LIBS) +adsbus: adsbus.o receive.o send.o incoming.o outgoing.o airspy_adsb.o beast.o json.o raw.o stats.o common.o + $(CC) $(LDFLAGS) -o adsbus adsbus.o receive.o send.o incoming.o outgoing.o airspy_adsb.o beast.o json.o raw.o stats.o common.o $(LIBS) diff --git a/adsbus.c b/adsbus.c index 0868053..8ed6dd9 100644 --- a/adsbus.c +++ b/adsbus.c @@ -4,9 +4,12 @@ #include #include "common.h" -#include "backend.h" -#include "client.h" + +#include "receive.h" +#include "send.h" + #include "incoming.h" +#include "outgoing.h" #include "airspy_adsb.h" #include "beast.h" @@ -20,106 +23,134 @@ static void print_usage(const char *name) { "\n" "Options:\n" "\t--help\n" - "\t--backend=HOST/PORT\n" "\t--dump=FORMAT\n" - "\t--incoming=[HOST/]PORT\n" - "\t--listen=FORMAT=[HOST/]PORT\n" + "\t--connect-receive=HOST/PORT\n" + "\t--connect-send=FORMAT=HOST/PORT\n" + "\t--listen-receive=[HOST/]PORT\n" + "\t--listen-send=FORMAT=[HOST/]PORT\n" , name); - backend_print_usage(); - client_print_usage(); + receive_print_usage(); + send_print_usage(); } static bool add_dump(char *arg) { - struct serializer *serializer = client_get_serializer(arg); + struct serializer *serializer = send_get_serializer(arg); if (!serializer) { fprintf(stderr, "Unknown --dump=FORMAT: %s\n", arg); return false; } - client_add(1, serializer); + send_add(1, serializer); return true; } -static bool add_backend(char *arg) { +static bool add_connect_receive(char *arg) { char *port = strrchr(arg, '/'); if (!port) { - fprintf(stderr, "Invalid --backend=HOST/PORT (missing \"/\"): %s\n", arg); + fprintf(stderr, "Invalid --connect-receive=HOST/PORT (missing \"/\"): %s\n", arg); return false; } *(port++) = '\0'; - backend_new(arg, port); + outgoing_new(arg, port, receive_new, NULL); return true; } -static bool add_incoming(char *arg){ - char *port = strrchr(arg, '/'); - if (port) { - *(port++) = '\0'; - incoming_new(arg, port, backend_new_fd_wrapper, NULL); - } else { - incoming_new(NULL, arg, backend_new_fd_wrapper, NULL); - } - return true; -} - -static bool add_listener(char *arg) { +static bool add_connect_send(char *arg) { char *host_port = strchr(arg, '='); if (!host_port) { - fprintf(stderr, "Invalid --listener=FORMAT=[HOST/]PORT (missing \"=\"): %s\n", arg); + fprintf(stderr, "Invalid --connect-send=FORMAT=HOST/PORT (missing \"=\"): %s\n", arg); return false; } *(host_port++) = '\0'; - struct serializer *serializer = client_get_serializer(arg); + struct serializer *serializer = send_get_serializer(arg); if (!serializer) { - fprintf(stderr, "Unknown --listener=FORMAT=[HOST/]PORT format: %s\n", arg); + fprintf(stderr, "Unknown --connect-send=FORMAT=HOST/PORT format: %s\n", arg); + return false; + } + + char *port = strrchr(host_port, '/'); + if (!port) { + fprintf(stderr, "Invalid --connect-send=FORMAT=HOST/PORT (missing \"/\"): %s\n", host_port); + return false; + } + *(port++) = '\0'; + + incoming_new(host_port, port, send_add_wrapper, serializer); + return true; +} + +static bool add_listen_receive(char *arg){ + char *port = strrchr(arg, '/'); + if (port) { + *(port++) = '\0'; + incoming_new(arg, port, receive_new, NULL); + } else { + incoming_new(NULL, arg, receive_new, NULL); + } + return true; +} + +static bool add_listen_send(char *arg) { + char *host_port = strchr(arg, '='); + if (!host_port) { + fprintf(stderr, "Invalid --listen-send=FORMAT=[HOST/]PORT (missing \"=\"): %s\n", arg); + return false; + } + *(host_port++) = '\0'; + + struct serializer *serializer = send_get_serializer(arg); + if (!serializer) { + fprintf(stderr, "Unknown --listen-send=FORMAT=[HOST/]PORT format: %s\n", arg); return false; } char *port = strrchr(host_port, '/'); if (port) { *(port++) = '\0'; - incoming_new(host_port, port, client_add_wrapper, serializer); + incoming_new(host_port, port, send_add_wrapper, serializer); } else { - incoming_new(NULL, host_port, client_add_wrapper, serializer); + incoming_new(NULL, host_port, send_add_wrapper, serializer); } return true; } static bool parse_opts(int argc, char *argv[]) { static struct option long_options[] = { - {"backend", required_argument, 0, 'b'}, - {"dump", required_argument, 0, 'd'}, - {"incoming", required_argument, 0, 'i'}, - {"listen", required_argument, 0, 'l'}, - {"help", no_argument, 0, 'h'}, - {0, 0, 0, 0 }, + {"dump", required_argument, 0, 'd'}, + {"connect-receive", required_argument, 0, 'c'}, + {"connect-send", required_argument, 0, 's'}, + {"listen-receive", required_argument, 0, 'l'}, + {"listen-send", required_argument, 0, 'm'}, + {"help", no_argument, 0, 'h'}, + {0, 0, 0, 0 }, }; int opt; while ((opt = getopt_long_only(argc, argv, "", long_options, NULL)) != -1) { bool (*handler)(char *) = NULL; switch (opt) { - case 'b': - handler = add_backend; - break; - case 'd': handler = add_dump; break; - case 'h': - print_usage(argv[0]); - return false; + case 'c': + handler = add_connect_receive; + break; - case 'i': - handler = add_incoming; + case 's': + handler = add_connect_send; break; case 'l': - handler = add_listener; + handler = add_listen_receive; break; + case 'm': + handler = add_listen_send; + break; + + case 'h': default: print_usage(argv[0]); return false; @@ -145,7 +176,7 @@ static bool parse_opts(int argc, char *argv[]) { int main(int argc, char *argv[]) { peer_init(); hex_init(); - client_init(); + send_init(); airspy_adsb_init(); beast_init(); json_init(); diff --git a/airspy_adsb.c b/airspy_adsb.c index 919f952..36f09c4 100644 --- a/airspy_adsb.c +++ b/airspy_adsb.c @@ -3,7 +3,7 @@ #include #include "common.h" -#include "backend.h" +#include "receive.h" #include "airspy_adsb.h" struct __attribute__((packed)) airspy_adsb_common_overlay { diff --git a/backend.c b/backend.c deleted file mode 100644 index 2ebaffa..0000000 --- a/backend.c +++ /dev/null @@ -1,207 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include - -#include "airspy_adsb.h" -#include "beast.h" -#include "raw.h" - -#include "client.h" -#include "backend.h" - -typedef bool (*parser_wrapper)(struct backend *, struct packet *); -typedef bool (*parser)(struct buf *, struct packet *, void *state); -struct backend { - struct peer peer; - char id[UUID_LEN]; - const char *node; - const char *service; - struct addrinfo *addrs; - struct addrinfo *addr; - struct buf buf; - char parser_state[PARSER_STATE_LEN]; - parser_wrapper parser_wrapper; - parser parser; -}; - -static void backend_connect_result(struct backend *, int); - -struct parser { - char *name; - parser parse; -} parsers[] = { - { - .name = "airspy_adsb", - .parse = airspy_adsb_parse, - }, - { - .name = "beast", - .parse = beast_parse, - }, - { - .name = "raw", - .parse = raw_parse, - }, -}; -#define NUM_PARSERS (sizeof(parsers) / sizeof(*parsers)) - -static bool backend_parse_wrapper(struct backend *backend, struct packet *packet) { - return backend->parser(&backend->buf, packet, backend->parser_state); -} - -static bool backend_autodetect_parse(struct backend *backend, struct packet *packet) { - struct buf *buf = &backend->buf; - void *state = backend->parser_state; - - for (int i = 0; i < NUM_PARSERS; i++) { - if (parsers[i].parse(buf, packet, state)) { - fprintf(stderr, "B %s: Detected input format %s\n", backend->id, parsers[i].name); - backend->parser_wrapper = backend_parse_wrapper; - backend->parser = parsers[i].parse; - return true; - } - } - return false; -} - -static struct backend *backend_create() { - struct backend *backend = malloc(sizeof(*backend)); - assert(backend); - uuid_gen(backend->id); - backend->peer.fd = -1; - backend->node = NULL; - backend->service = NULL; - buf_init(&backend->buf); - memset(backend->parser_state, 0, PARSER_STATE_LEN); - backend->parser_wrapper = backend_autodetect_parse; - return backend; -} - -static void backend_connect_handler(struct peer *peer) { - struct backend *backend = (struct backend *) peer; - - peer_epoll_del(peer); - - int error; - socklen_t len = sizeof(error); - assert(getsockopt(backend->peer.fd, SOL_SOCKET, SO_ERROR, &error, &len) == 0); - backend_connect_result(backend, error); -} - -static void backend_connect_next(struct backend *backend) { - if (backend->addr == NULL) { - freeaddrinfo(backend->addrs); - fprintf(stderr, "B %s: Can't connect to any addresses of %s/%s\n", backend->id, backend->node, backend->service); - return; - } - - char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; - assert(getnameinfo(backend->addr->ai_addr, backend->addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0); - fprintf(stderr, "B %s: Connecting to %s/%s...\n", backend->id, hbuf, sbuf); - - backend->peer.fd = socket(backend->addr->ai_family, backend->addr->ai_socktype | SOCK_NONBLOCK, backend->addr->ai_protocol); - assert(backend->peer.fd >= 0); - - int result = connect(backend->peer.fd, backend->addr->ai_addr, backend->addr->ai_addrlen); - backend_connect_result(backend, result == 0 ? result : errno); -} - -static void backend_read(struct peer *peer) { - struct backend *backend = (struct backend *) peer; - - if (buf_fill(&backend->buf, backend->peer.fd) <= 0) { - fprintf(stderr, "B %s: Connection closed by backend\n", backend->id); - close(backend->peer.fd); - // TODO: reconnect - return; - } - - struct packet packet = { - .backend = backend, - }; - while (backend->parser_wrapper(backend, &packet)) { - client_write(&packet); - } - - if (backend->buf.length == BUF_LEN_MAX) { - fprintf(stderr, "B %s: Input buffer overrun. This probably means that adsbus doesn't understand the protocol that this source is speaking.\n", backend->id); - close(backend->peer.fd); - // TODO: reconnect - return; - } -} - -static void backend_connect_result(struct backend *backend, int result) { - char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; - assert(getnameinfo(backend->addr->ai_addr, backend->addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0); - switch (result) { - case 0: - fprintf(stderr, "B %s: Connected to %s/%s\n", backend->id, hbuf, sbuf); - freeaddrinfo(backend->addrs); - backend->peer.event_handler = backend_read; - peer_epoll_add((struct peer *) backend, EPOLLIN); - break; - - case EINPROGRESS: - backend->peer.event_handler = backend_connect_handler; - peer_epoll_add((struct peer *) backend, EPOLLOUT); - break; - - default: - fprintf(stderr, "B %s: Can't connect to %s/%s: %s\n", backend->id, hbuf, sbuf, strerror(result)); - close(backend->peer.fd); - backend->addr = backend->addr->ai_next; - // Tail recursion :/ - backend_connect_next(backend); - break; - } -} - -static void backend_connect(struct backend *backend) { - fprintf(stderr, "B %s: Resolving %s/%s...\n", backend->id, backend->node, backend->service); - - struct addrinfo hints = { - .ai_family = AF_UNSPEC, - .ai_socktype = SOCK_STREAM, - }; - - int gai_err = getaddrinfo(backend->node, backend->service, &hints, &backend->addrs); - if (gai_err) { - fprintf(stderr, "B %s: Failed to resolve %s/%s: %s\n", backend->id, backend->node, backend->service, gai_strerror(gai_err)); - return; - } - backend->addr = backend->addrs; - backend_connect_next(backend); -} - -void backend_new(const char *node, const char *service) { - struct backend *backend = backend_create(); - backend->node = node; - backend->service = service; - backend_connect(backend); -} - -void backend_new_fd(int fd) { - struct backend *backend = backend_create(); - backend->peer.fd = fd; - backend->peer.event_handler = backend_read; - peer_epoll_add((struct peer *) backend, EPOLLIN); - - fprintf(stderr, "B %s: New backend from incoming connection\n", backend->id); -} - -void backend_new_fd_wrapper(int fd, void *unused) { - backend_new_fd(fd); -} - -void backend_print_usage() { - fprintf(stderr, "\nSupported input formats (autodetected):\n"); - for (int i = 0; i < NUM_PARSERS; i++) { - fprintf(stderr, "\t%s\n", parsers[i].name); - } -} diff --git a/backend.h b/backend.h deleted file mode 100644 index 7fc4559..0000000 --- a/backend.h +++ /dev/null @@ -1,12 +0,0 @@ -#pragma once - -#include - -#include "common.h" - -#define PARSER_STATE_LEN 256 - -void backend_new(const char *, const char *); -void backend_new_fd(int); -void backend_new_fd_wrapper(int, void *); -void backend_print_usage(); diff --git a/beast.c b/beast.c index 617c89b..57f43be 100644 --- a/beast.c +++ b/beast.c @@ -4,7 +4,7 @@ #include #include "common.h" -#include "backend.h" +#include "receive.h" #include "beast.h" struct __attribute__((packed)) beast_common_overlay { diff --git a/client.c b/client.c deleted file mode 100644 index d4b0a61..0000000 --- a/client.c +++ /dev/null @@ -1,145 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include - -#include "common.h" -#include "json.h" -#include "stats.h" -#include "client.h" - -struct client { - struct peer peer; - char id[UUID_LEN]; - struct serializer *serializer; - struct client *prev; - struct client *next; -}; - -typedef void (*serializer)(struct packet *, struct buf *); -struct serializer { - char *name; - serializer serialize; - struct client *client_head; -} serializers[] = { - { - .name = "json", - .serialize = json_serialize, - }, - { - .name = "stats", - .serialize = stats_serialize, - }, -}; -#define NUM_SERIALIZERS (sizeof(serializers) / sizeof(*serializers)) - -static void client_hangup(struct client *client) { - fprintf(stderr, "C %s (%s): Client disconnected\n", client->id, client->serializer->name); - if (client->prev) { - client->prev->next = client->next; - } else { - client->serializer->client_head = client->next; - } - if (client->next) { - client->next->prev = client->prev; - } - close(client->peer.fd); - free(client); -} - -static void client_hangup_wrapper(struct peer *peer) { - client_hangup((struct client *) peer); -} - -static bool client_hello(int fd, struct serializer *serializer) { - struct buf buf = BUF_INIT; - serializer->serialize(NULL, &buf); - if (buf.length == 0) { - return true; - } - if (write(fd, buf_at(&buf, 0), buf.length) != buf.length) { - return false; - } - return true; -} - -void client_init() { - signal(SIGPIPE, SIG_IGN); -} - -struct serializer *client_get_serializer(char *name) { - for (int i = 0; i < NUM_SERIALIZERS; i++) { - if (strcasecmp(serializers[i].name, name) == 0) { - return &serializers[i]; - } - } - return NULL; -} - -void client_add(int fd, struct serializer *serializer) { - int flags = fcntl(fd, F_GETFL, 0); - assert(flags >= 0); - flags |= O_NONBLOCK; - assert(fcntl(fd, F_SETFL, flags) == 0); - - if (!client_hello(fd, serializer)) { - fprintf(stderr, "C xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx: Failed to write hello to client\n"); - return; - } - - struct client *client = malloc(sizeof(*client)); - assert(client); - - client->peer.fd = fd; - client->peer.event_handler = client_hangup_wrapper; - - uuid_gen(client->id); - client->serializer = serializer; - client->prev = NULL; - client->next = serializer->client_head; - serializer->client_head = client; - - // Only listen for hangup - peer_epoll_add((struct peer *) client, EPOLLIN); - - fprintf(stderr, "C %s (%s): New client\n", client->id, serializer->name); -} - -void client_add_wrapper(int fd, void *passthrough) { - client_add(fd, (struct serializer *) passthrough); -} - -void client_write(struct packet *packet) { - for (int i = 0; i < NUM_SERIALIZERS; i++) { - struct serializer *serializer = &serializers[i]; - if (serializer->client_head == NULL) { - continue; - } - struct buf buf = BUF_INIT; - serializer->serialize(packet, &buf); - if (buf.length == 0) { - continue; - } - struct client *client = serializer->client_head; - while (client) { - if (write(client->peer.fd, buf_at(&buf, 0), buf.length) == buf.length) { - client = client->next; - } else { - struct client *next = client->next; - client_hangup(client); - client = next; - } - } - } -} - -void client_print_usage() { - fprintf(stderr, "\nSupported output formats:\n"); - for (int i = 0; i < NUM_SERIALIZERS; i++) { - fprintf(stderr, "\t%s\n", serializers[i].name); - } -} diff --git a/client.h b/client.h deleted file mode 100644 index e65d6c1..0000000 --- a/client.h +++ /dev/null @@ -1,10 +0,0 @@ -#pragma once - -#include "common.h" - -void client_init(); -struct serializer *client_get_serializer(char *); -void client_add(int, struct serializer *); -void client_add_wrapper(int, void *); -void client_write(struct packet *); -void client_print_usage(); diff --git a/common.h b/common.h index 20389de..c70085e 100644 --- a/common.h +++ b/common.h @@ -44,7 +44,6 @@ void buf_consume(struct buf *, size_t); //////// packet #define DATA_LEN_MAX 14 -struct backend; struct packet { enum { MODE_S_SHORT, @@ -54,7 +53,6 @@ struct packet { uint8_t payload[DATA_LEN_MAX]; uint64_t mlat_timestamp; uint32_t rssi; - struct backend *backend; }; extern char *packet_type_names[]; diff --git a/json.c b/json.c index bc3dafc..4918634 100644 --- a/json.c +++ b/json.c @@ -3,8 +3,8 @@ #include #include -#include "backend.h" -#include "client.h" +#include "receive.h" +#include "send.h" #include "json.h" static void json_serialize_to_buf(json_t *obj, struct buf *buf) { @@ -14,7 +14,7 @@ static void json_serialize_to_buf(json_t *obj, struct buf *buf) { } static void json_hello(struct buf *buf) { - json_t *hello = json_pack("{sssIsIsI}", + json_t *hello = json_pack("{sIsIsI}", "mlat_timestamp_mhz", (json_int_t) MLAT_MHZ, "mlat_timestamp_max", (json_int_t) MLAT_MAX, "rssi_max", (json_int_t) RSSI_MAX); diff --git a/outgoing.c b/outgoing.c new file mode 100644 index 0000000..b2e3755 --- /dev/null +++ b/outgoing.c @@ -0,0 +1,105 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common.h" +#include "outgoing.h" + +struct outgoing { + struct peer peer; + char id[UUID_LEN]; + const char *node; + const char *service; + struct addrinfo *addrs; + struct addrinfo *addr; + outgoing_connection_handler handler; + void *passthrough; +}; + +static void outgoing_connect_result(struct outgoing *, int); + +static void outgoing_connect_next(struct outgoing *outgoing) { + if (outgoing->addr == NULL) { + freeaddrinfo(outgoing->addrs); + fprintf(stderr, "O %s: Can't connect to any addresses of %s/%s\n", outgoing->id, outgoing->node, outgoing->service); + return; + } + + char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; + assert(getnameinfo(outgoing->addr->ai_addr, outgoing->addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0); + fprintf(stderr, "O %s: Connecting to %s/%s...\n", outgoing->id, hbuf, sbuf); + + outgoing->peer.fd = socket(outgoing->addr->ai_family, outgoing->addr->ai_socktype | SOCK_NONBLOCK, outgoing->addr->ai_protocol); + assert(outgoing->peer.fd >= 0); + + int result = connect(outgoing->peer.fd, outgoing->addr->ai_addr, outgoing->addr->ai_addrlen); + outgoing_connect_result(outgoing, result == 0 ? result : errno); +} + +static void outgoing_connect_handler(struct peer *peer) { + struct outgoing *outgoing = (struct outgoing *) peer; + + peer_epoll_del(peer); + + int error; + socklen_t len = sizeof(error); + assert(getsockopt(outgoing->peer.fd, SOL_SOCKET, SO_ERROR, &error, &len) == 0); + outgoing_connect_result(outgoing, error); +} + +static void outgoing_connect_result(struct outgoing *outgoing, int result) { + char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; + assert(getnameinfo(outgoing->addr->ai_addr, outgoing->addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0); + switch (result) { + case 0: + fprintf(stderr, "O %s: Connected to %s/%s\n", outgoing->id, hbuf, sbuf); + freeaddrinfo(outgoing->addrs); + outgoing->handler(outgoing->peer.fd, outgoing->passthrough); + break; + + case EINPROGRESS: + outgoing->peer.event_handler = outgoing_connect_handler; + peer_epoll_add((struct peer *) outgoing, EPOLLOUT); + break; + + default: + fprintf(stderr, "O %s: Can't connect to %s/%s: %s\n", outgoing->id, hbuf, sbuf, strerror(result)); + close(outgoing->peer.fd); + outgoing->addr = outgoing->addr->ai_next; + // Tail recursion :/ + outgoing_connect_next(outgoing); + break; + } +} + +static void outgoing_resolve(struct outgoing *outgoing) { + fprintf(stderr, "O %s: Resolving %s/%s...\n", outgoing->id, outgoing->node, outgoing->service); + + struct addrinfo hints = { + .ai_family = AF_UNSPEC, + .ai_socktype = SOCK_STREAM, + }; + + int gai_err = getaddrinfo(outgoing->node, outgoing->service, &hints, &outgoing->addrs); + if (gai_err) { + fprintf(stderr, "O %s: Failed to resolve %s/%s: %s\n", outgoing->id, outgoing->node, outgoing->service, gai_strerror(gai_err)); + return; + } + outgoing->addr = outgoing->addrs; + outgoing_connect_next(outgoing); +} + +void outgoing_new(const char *node, const char *service, outgoing_connection_handler handler, void *passthrough) { + struct outgoing *outgoing = malloc(sizeof(*outgoing)); + uuid_gen(outgoing->id); + outgoing->node = node; + outgoing->service = service; + outgoing->handler = handler; + outgoing->passthrough = passthrough; + outgoing_resolve(outgoing); +} diff --git a/outgoing.h b/outgoing.h new file mode 100644 index 0000000..847be8c --- /dev/null +++ b/outgoing.h @@ -0,0 +1,4 @@ +#pragma once + +typedef void (*outgoing_connection_handler)(int fd, void *); +void outgoing_new(const char *, const char *, outgoing_connection_handler, void *); diff --git a/receive.c b/receive.c new file mode 100644 index 0000000..a06c5ae --- /dev/null +++ b/receive.c @@ -0,0 +1,106 @@ +#include +#include +#include +#include + +#include "airspy_adsb.h" +#include "beast.h" +#include "raw.h" + +#include "send.h" + +#include "receive.h" + +struct receive; +typedef bool (*parser_wrapper)(struct receive *, struct packet *); +typedef bool (*parser)(struct buf *, struct packet *, void *state); +struct receive { + struct peer peer; + char id[UUID_LEN]; + struct buf buf; + char parser_state[PARSER_STATE_LEN]; + parser_wrapper parser_wrapper; + parser parser; +}; + +struct parser { + char *name; + parser parse; +} parsers[] = { + { + .name = "airspy_adsb", + .parse = airspy_adsb_parse, + }, + { + .name = "beast", + .parse = beast_parse, + }, + { + .name = "raw", + .parse = raw_parse, + }, +}; +#define NUM_PARSERS (sizeof(parsers) / sizeof(*parsers)) + +static bool receive_parse_wrapper(struct receive *receive, struct packet *packet) { + return receive->parser(&receive->buf, packet, receive->parser_state); +} + +static bool receive_autodetect_parse(struct receive *receive, struct packet *packet) { + struct buf *buf = &receive->buf; + void *state = receive->parser_state; + + for (int i = 0; i < NUM_PARSERS; i++) { + if (parsers[i].parse(buf, packet, state)) { + fprintf(stderr, "R %s: Detected input format %s\n", receive->id, parsers[i].name); + receive->parser_wrapper = receive_parse_wrapper; + receive->parser = parsers[i].parse; + return true; + } + } + return false; +} + +static void receive_read(struct peer *peer) { + struct receive *receive = (struct receive *) peer; + + if (buf_fill(&receive->buf, receive->peer.fd) <= 0) { + fprintf(stderr, "R %s: Connection closed by peer\n", receive->id); + close(receive->peer.fd); + // TODO: reconnect + return; + } + + struct packet packet = { 0 }; + while (receive->parser_wrapper(receive, &packet)) { + send_write(&packet); + } + + if (receive->buf.length == BUF_LEN_MAX) { + fprintf(stderr, "R %s: Input buffer overrun. This probably means that adsbus doesn't understand the protocol that this source is speaking.\n", receive->id); + close(receive->peer.fd); + // TODO: reconnect + return; + } +} + +void receive_new(int fd, void *unused) { + struct receive *receive = malloc(sizeof(*receive)); + assert(receive); + uuid_gen(receive->id); + receive->peer.fd = fd; + buf_init(&receive->buf); + memset(receive->parser_state, 0, PARSER_STATE_LEN); + receive->parser_wrapper = receive_autodetect_parse; + receive->peer.event_handler = receive_read; + peer_epoll_add((struct peer *) receive, EPOLLIN); + + fprintf(stderr, "R %s: New connection\n", receive->id); +} + +void receive_print_usage() { + fprintf(stderr, "\nSupported receive formats (autodetected):\n"); + for (int i = 0; i < NUM_PARSERS; i++) { + fprintf(stderr, "\t%s\n", parsers[i].name); + } +} diff --git a/receive.h b/receive.h new file mode 100644 index 0000000..4b6d264 --- /dev/null +++ b/receive.h @@ -0,0 +1,10 @@ +#pragma once + +#include + +#include "common.h" + +#define PARSER_STATE_LEN 256 + +void receive_new(int, void *); +void receive_print_usage(); diff --git a/send.c b/send.c new file mode 100644 index 0000000..52bf204 --- /dev/null +++ b/send.c @@ -0,0 +1,145 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common.h" +#include "json.h" +#include "stats.h" +#include "send.h" + +struct send { + struct peer peer; + char id[UUID_LEN]; + struct serializer *serializer; + struct send *prev; + struct send *next; +}; + +typedef void (*serializer)(struct packet *, struct buf *); +struct serializer { + char *name; + serializer serialize; + struct send *send_head; +} serializers[] = { + { + .name = "json", + .serialize = json_serialize, + }, + { + .name = "stats", + .serialize = stats_serialize, + }, +}; +#define NUM_SERIALIZERS (sizeof(serializers) / sizeof(*serializers)) + +static void send_hangup(struct send *send) { + fprintf(stderr, "S %s (%s): Peer disconnected\n", send->id, send->serializer->name); + if (send->prev) { + send->prev->next = send->next; + } else { + send->serializer->send_head = send->next; + } + if (send->next) { + send->next->prev = send->prev; + } + close(send->peer.fd); + free(send); +} + +static void send_hangup_wrapper(struct peer *peer) { + send_hangup((struct send *) peer); +} + +static bool send_hello(int fd, struct serializer *serializer) { + struct buf buf = BUF_INIT; + serializer->serialize(NULL, &buf); + if (buf.length == 0) { + return true; + } + if (write(fd, buf_at(&buf, 0), buf.length) != buf.length) { + return false; + } + return true; +} + +void send_init() { + signal(SIGPIPE, SIG_IGN); +} + +struct serializer *send_get_serializer(char *name) { + for (int i = 0; i < NUM_SERIALIZERS; i++) { + if (strcasecmp(serializers[i].name, name) == 0) { + return &serializers[i]; + } + } + return NULL; +} + +void send_add(int fd, struct serializer *serializer) { + int flags = fcntl(fd, F_GETFL, 0); + assert(flags >= 0); + flags |= O_NONBLOCK; + assert(fcntl(fd, F_SETFL, flags) == 0); + + if (!send_hello(fd, serializer)) { + fprintf(stderr, "S xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx: Failed to write hello\n"); + return; + } + + struct send *send = malloc(sizeof(*send)); + assert(send); + + send->peer.fd = fd; + send->peer.event_handler = send_hangup_wrapper; + + uuid_gen(send->id); + send->serializer = serializer; + send->prev = NULL; + send->next = serializer->send_head; + serializer->send_head = send; + + // Only listen for hangup + peer_epoll_add((struct peer *) send, EPOLLIN); + + fprintf(stderr, "S %s (%s): New connection\n", send->id, serializer->name); +} + +void send_add_wrapper(int fd, void *passthrough) { + send_add(fd, (struct serializer *) passthrough); +} + +void send_write(struct packet *packet) { + for (int i = 0; i < NUM_SERIALIZERS; i++) { + struct serializer *serializer = &serializers[i]; + if (serializer->send_head == NULL) { + continue; + } + struct buf buf = BUF_INIT; + serializer->serialize(packet, &buf); + if (buf.length == 0) { + continue; + } + struct send *send = serializer->send_head; + while (send) { + if (write(send->peer.fd, buf_at(&buf, 0), buf.length) == buf.length) { + send = send->next; + } else { + struct send *next = send->next; + send_hangup(send); + send = next; + } + } + } +} + +void send_print_usage() { + fprintf(stderr, "\nSupported send formats:\n"); + for (int i = 0; i < NUM_SERIALIZERS; i++) { + fprintf(stderr, "\t%s\n", serializers[i].name); + } +} diff --git a/send.h b/send.h new file mode 100644 index 0000000..b3d5fca --- /dev/null +++ b/send.h @@ -0,0 +1,10 @@ +#pragma once + +#include "common.h" + +void send_init(); +struct serializer *send_get_serializer(char *); +void send_add(int, struct serializer *); +void send_add_wrapper(int, void *); +void send_write(struct packet *); +void send_print_usage();