diff --git a/adsbus/Makefile b/adsbus/Makefile index 2c299ac..65b33cf 100644 --- a/adsbus/Makefile +++ b/adsbus/Makefile @@ -1,18 +1,19 @@ COMP ?= clang DISABLED_WARNINGS ?= -Wno-padded -Wno-disabled-macro-expansion -CFLAGS ?= -Weverything -Werror -O3 -g --std=gnu11 --pedantic-errors -fPIE -fstack-protector-strong -pthread -D_GNU_SOURCE $(DISABLED_WARNINGS) +CFLAGS ?= -Weverything -Werror -O3 -g --std=gnu11 --pedantic-errors -fPIE -fstack-protector-strong -pthread -D_GNU_SOURCE -D_FILE_OFFSET_BITS=64 $(DISABLED_WARNINGS) LDFLAGS ?= $(CFLAGS) -Wl,-z,relro -Wl,-z,now -pie LIBS ?= -ljansson -lprotobuf-c TESTCASE_DIR ?= testcase TESTOUT_DIR ?= testout VALGRIND ?= valgrind -VALGRIND_FLAGS ?= --error-exitcode=1 --track-fds=yes --show-leak-kinds=all --leak-check=full +VALGRIND_FLAGS ?= --error-exitcode=1 --trace-children=yes --track-fds=yes --show-leak-kinds=all --leak-check=full ADSBUS_TEST_FLAGS ?= --stdin --stdout=airspy_adsb --stdout=beast --stdout=json --stdout=proto --stdout=raw --stdout=stats -OBJ_NETWORK = incoming.o outgoing.o receive.o send.o +OBJ_TRANSPORT = exec.o file.o incoming.o outgoing.o +OBJ_FLOW = flow.o receive.o send.o OBJ_PROTOCOL = airspy_adsb.o beast.o json.o proto.o raw.o stats.o -OBJ_UTIL = buf.o hex.o list.o opts.o packet.o peer.o rand.o resolve.o server.o socket.o uuid.o wakeup.o +OBJ_UTIL = asyncaddrinfo.o buf.o hex.o list.o opts.o packet.o peer.o rand.o resolve.o server.o socket.o uuid.o wakeup.o OBJ_PROTO = adsb.pb-c.o all: adsbus @@ -26,8 +27,8 @@ clean: adsb.pb-c.c: ../proto/adsb.proto protoc-c --c_out=./ --proto_path=$(dir $<) $< -adsbus: adsbus.o $(OBJ_NETWORK) $(OBJ_PROTOCOL) $(OBJ_UTIL) $(OBJ_PROTO) - $(COMP) $(LDFLAGS) -o adsbus adsbus.o $(OBJ_NETWORK) $(OBJ_PROTOCOL) $(OBJ_UTIL) $(OBJ_PROTO) $(LIBS) +adsbus: adsbus.o $(OBJ_TRANSPORT) $(OBJ_FLOW) $(OBJ_PROTOCOL) $(OBJ_UTIL) $(OBJ_PROTO) + $(COMP) $(LDFLAGS) -o adsbus adsbus.o $(OBJ_TRANSPORT) $(OBJ_FLOW) $(OBJ_PROTOCOL) $(OBJ_UTIL) $(OBJ_PROTO) $(LIBS) fuzz: rm -rf findings diff --git a/adsbus/README.md b/adsbus/README.md index f821924..7877616 100644 --- a/adsbus/README.md +++ b/adsbus/README.md @@ -2,12 +2,58 @@ adsbus is a hub and protocol translator for [ADS-B](https://en.wikipedia.org/wiki/Automatic_dependent_surveillance_%E2%80%93_broadcast) messages. +It is conceptually similar to `dump1090 --net-only`, but supports more protocols and configurations. It doesn't talk to your radio itself; it +hooks programs that do, then handles the network distribution and format translation. It doesn't output to a web interface or send data to +services like FlightAware; it provides hooks for programs that do. + ## Building ```bash -sudo apt-get -y install uuid-dev libjansson-dev +sudo apt-get -y install build-essential git clang libjansson-dev libprotobuf-c-dev protobuf-c-compiler git clone https://github.com/flamingcowtv/adsb-tools.git -cd adsb-tools +cd adsb-tools/adsbus make ``` + + +## Features + +* Separates the concepts of transport, data flow, and format +* Transports: + * Outgoing TCP connection + * Incoming TCP connection + * Local files or [named pipes](https://en.wikipedia.org/wiki/Named_pipe) + * [stdin/stdout](https://en.wikipedia.org/wiki/Standard_streams) + * Execute a command and talk to its stdin/stdout +* Data flows: + * Send (data flows out of adsbus) + * Receive (data flows in to adsbus) +* Formats: + * [airspy_adsb](../protocols/airspy_adsb.md) (a.k.a. ASAVR) + * [beast](../protocols/beast.md) + * [json](../protocols/json.md) + * [proto](../protocols/proto.md) (a.k.a. ProtoBuf, Protocol Buffers) + * [raw](../protocols/raw.md) (a.k.a. AVR) + * stats (send only, summary aggregated data) +* Transport features: + * [IPv4](https://en.wikipedia.org/wiki/IPv4) and [IPv6](https://en.wikipedia.org/wiki/IPv6) support + * Reresolution and reconnection on disconnect, with backoff and jitter + * [TCP keepalives](https://en.wikipedia.org/wiki/Keepalive#TCP_keepalive) for dead connection detection + * [TCP fast open](https://en.wikipedia.org/wiki/TCP_Fast_Open) for faster startup of high-latency connections + * [SO_REUSEPORT](https://lwn.net/Articles/542629/) for zero-downtime updates +* Data flow features: + * Rapid detection and disconnection of receive <-> receive connections + * Less rapid detection and disconnection of send <-> send connections +* Format features: + * Autodetection of received data format + * [MLAT](https://en.wikipedia.org/wiki/Multilateration) scaling for different clock rates and counter bit widths + * [RSSI](https://en.wikipedia.org/wiki/Received_signal_strength_indication) scaling for different bit widths + * Introduces json format for balanced human and machine readability with forward compatibility + * Introduces proto format for fast serialization and deserialization with forward compatibility +* Federation: + * Federation allows linking multiple instances of adsbus for: + * Scalability (cores, number of input or output clients, etc.) + * Efficient long-haul links (hub and spoke models on both ends) + * json and proto formats carry information about original source across multiple hops + * SO_REUSEPORT allows multiple adsbus instances to accept connections on the same IP and port without a load balancer diff --git a/adsbus/adsbus.c b/adsbus/adsbus.c index 7357a60..0a96bbd 100644 --- a/adsbus/adsbus.c +++ b/adsbus/adsbus.c @@ -6,6 +6,7 @@ #include #include "beast.h" +#include "exec.h" #include "hex.h" #include "incoming.h" #include "json.h" @@ -32,6 +33,11 @@ static void print_usage(const char *name) { "\t--connect-send=FORMAT=HOST/PORT\n" "\t--listen-receive=[HOST/]PORT\n" "\t--listen-send=FORMAT=[HOST/]PORT\n" + "\t--file-read=PATH\n" + "\t--file-write=FORMAT=PATH\n" + "\t--file-append=FORMAT=PATH\n" + "\t--exec-receive=COMMAND\n" + "\t--exec-send=FORMAT=COMMAND\n" "\t--stdin\n" "\t--stdout=FORMAT\n" , name); @@ -45,6 +51,11 @@ static bool parse_opts(int argc, char *argv[]) { {"connect-send", required_argument, 0, 's'}, {"listen-receive", required_argument, 0, 'l'}, {"listen-send", required_argument, 0, 'm'}, + {"file-read", required_argument, 0, 'r'}, + {"file-write", required_argument, 0, 'w'}, + {"file-append", required_argument, 0, 'a'}, + {"exec-receive", required_argument, 0, 'e'}, + {"exec-send", required_argument, 0, 'f'}, {"stdin", no_argument, 0, 'i'}, {"stdout", required_argument, 0, 'o'}, {"help", no_argument, 0, 'h'}, @@ -71,6 +82,26 @@ static bool parse_opts(int argc, char *argv[]) { handler = opts_add_listen_send; break; + case 'r': + handler = opts_add_file_read; + break; + + case 'w': + handler = opts_add_file_write; + break; + + case 'a': + handler = opts_add_file_append; + break; + + case 'e': + handler = opts_add_exec_receive; + break; + + case 'f': + handler = opts_add_exec_send; + break; + case 'i': handler = opts_add_stdin; break; @@ -133,6 +164,7 @@ int main(int argc, char *argv[]) { send_cleanup(); incoming_cleanup(); outgoing_cleanup(); + exec_cleanup(); json_cleanup(); proto_cleanup(); diff --git a/adsbus/asyncaddrinfo.c b/adsbus/asyncaddrinfo.c new file mode 100644 index 0000000..c2578c4 --- /dev/null +++ b/adsbus/asyncaddrinfo.c @@ -0,0 +1,126 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "asyncaddrinfo.h" + +struct asyncaddrinfo_resolution { + int return_fd; + + char *node; + char *service; + struct addrinfo _hints, *hints; + + int err; + struct addrinfo *addrs; +}; + +static size_t asyncaddrinfo_num_threads; +static pthread_t *asyncaddrinfo_threads = NULL; +static int asyncaddrinfo_write_fd; + +static void *asyncaddrinfo_main(void *arg) { + int fd = (int) (intptr_t) arg; + struct asyncaddrinfo_resolution *res; + ssize_t len; + while ((len = recv(fd, &res, sizeof(res), 0)) == sizeof(res)) { + res->err = getaddrinfo(res->node, res->service, res->hints, &res->addrs); + int return_fd = res->return_fd; + res->return_fd = -1; + assert(send(return_fd, &res, sizeof(res), MSG_EOR) == sizeof(res)); + // Main thread now owns res + assert(!close(return_fd)); + } + assert(!len); + assert(!close(fd)); + return NULL; +} + +static void asyncaddrinfo_del(struct asyncaddrinfo_resolution *res) { + if (res->node) { + free(res->node); + res->node = NULL; + } + if (res->service) { + free(res->service); + res->service = NULL; + } + free(res); +} + +void asyncaddrinfo_init(size_t threads) { + assert(!asyncaddrinfo_threads); + + int fds[2]; + assert(!socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_CLOEXEC, 0, fds)); + asyncaddrinfo_write_fd = fds[1]; + + asyncaddrinfo_num_threads = threads; + asyncaddrinfo_threads = malloc(asyncaddrinfo_num_threads * sizeof(*asyncaddrinfo_threads)); + assert(asyncaddrinfo_threads); + + for (size_t i = 0; i < asyncaddrinfo_num_threads; i++) { + int subfd = fcntl(fds[0], F_DUPFD_CLOEXEC, 0); + assert(subfd >= 0); + assert(!pthread_create(&asyncaddrinfo_threads[i], NULL, asyncaddrinfo_main, (void *) (intptr_t) subfd)); + } + assert(!close(fds[0])); +} + +void asyncaddrinfo_cleanup() { + assert(asyncaddrinfo_threads); + assert(!close(asyncaddrinfo_write_fd)); + asyncaddrinfo_write_fd = -1; + for (size_t i = 0; i < asyncaddrinfo_num_threads; i++) { + assert(!pthread_join(asyncaddrinfo_threads[i], NULL)); + } + free(asyncaddrinfo_threads); + asyncaddrinfo_threads = NULL; +} + +int asyncaddrinfo_resolve(const char *node, const char *service, const struct addrinfo *hints) { + int fds[2]; + assert(!socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_CLOEXEC, 0, fds)); + + struct asyncaddrinfo_resolution *res = malloc(sizeof(*res)); + assert(res); + res->return_fd = fds[1]; + if (node) { + res->node = strdup(node); + assert(res->node); + } else { + res->node = NULL; + } + if (service) { + res->service = strdup(service); + assert(res->service); + } else { + res->service = NULL; + } + if (hints) { + memcpy(&res->_hints, hints, sizeof(res->_hints)); + res->hints = &res->_hints; + } else { + res->hints = NULL; + } + assert(send(asyncaddrinfo_write_fd, &res, sizeof(res), MSG_EOR) == sizeof(res)); + // Resolve thread now owns res + + return fds[0]; +} + +int asyncaddrinfo_result(int fd, struct addrinfo **addrs) { + struct asyncaddrinfo_resolution *res; + assert(recv(fd, &res, sizeof(res), 0) == sizeof(res)); + assert(!close(fd)); + *addrs = res->addrs; + int err = res->err; + asyncaddrinfo_del(res); + return err; +} diff --git a/adsbus/asyncaddrinfo.h b/adsbus/asyncaddrinfo.h new file mode 100644 index 0000000..bd42d09 --- /dev/null +++ b/adsbus/asyncaddrinfo.h @@ -0,0 +1,8 @@ +#pragma once + +struct addrinfo; + +void asyncaddrinfo_init(size_t threads); +void asyncaddrinfo_cleanup(void); +int asyncaddrinfo_resolve(const char *node, const char *service, const struct addrinfo *hints); +int asyncaddrinfo_result(int fd, struct addrinfo **addrs); diff --git a/adsbus/exec.c b/adsbus/exec.c new file mode 100644 index 0000000..426fa0e --- /dev/null +++ b/adsbus/exec.c @@ -0,0 +1,135 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "buf.h" +#include "flow.h" +#include "list.h" +#include "peer.h" +#include "uuid.h" +#include "wakeup.h" + +#include "exec.h" + +struct exec { + struct peer peer; + uint8_t id[UUID_LEN]; + char *command; + struct flow *flow; + void *passthrough; + pid_t child; + struct list_head exec_list; +}; + +static struct list_head exec_head = LIST_HEAD_INIT(exec_head); + +static void exec_spawn_wrapper(struct peer *); + +static void exec_del(struct exec *exec) { + (*exec->flow->ref_count)--; + if (exec->child > 0) { + fprintf(stderr, "E %s: Sending SIGTERM to child process %d\n", exec->id, exec->child); + // Racy with the process terminating, so don't assert on it + kill(exec->child, SIGTERM); + assert(waitpid(exec->child, NULL, 0) == exec->child); + } + free(exec->command); + free(exec); +} + +static void exec_close_handler(struct peer *peer) { + struct exec *exec = (struct exec *) peer; + int status; + assert(waitpid(exec->child, &status, WNOHANG) == exec->child); + exec->child = -1; + if (WIFEXITED(status)) { + fprintf(stderr, "E %s: Client exited with status %d\n", exec->id, WEXITSTATUS(status)); + } else { + assert(WIFSIGNALED(status)); + fprintf(stderr, "E %s: Client exited with signal %d\n", exec->id, WTERMSIG(status)); + } + uint32_t delay = wakeup_get_retry_delay_ms(1); + fprintf(stderr, "E %s: Will retry in %ds\n", exec->id, delay / 1000); + exec->peer.event_handler = exec_spawn_wrapper; + wakeup_add((struct peer *) exec, delay); +} + +static void exec_parent(struct exec *exec, pid_t child, int fd) { + exec->child = child; + fprintf(stderr, "E %s: Child started as process %d\n", exec->id, exec->child); + + if (!flow_hello(fd, exec->flow, exec->passthrough)) { + assert(!close(fd)); + exec_close_handler((struct peer *) exec); + return; + } + + exec->peer.event_handler = exec_close_handler; + exec->flow->new(fd, exec->passthrough, (struct peer *) exec); +} + +static void __attribute__ ((noreturn)) exec_child(const struct exec *exec, int fd) { + assert(setsid() != -1); + // We leave stderr open from child to parent + // Other than that, fds should have CLOEXEC set + if (fd != 0) { + assert(dup2(fd, 0) == 0); + } + if (fd != 1) { + assert(dup2(fd, 1) == 1); + } + if (fd != 0 && fd != 1) { + assert(!close(fd)); + } + assert(!execl("/bin/sh", "sh", "-c", exec->command, NULL)); + abort(); +} + +static void exec_spawn(struct exec *exec) { + fprintf(stderr, "E %s: Executing: %s\n", exec->id, exec->command); + int fds[2]; + assert(!socketpair(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0, fds)); + + int res = fork(); + assert(res >= 0); + if (res) { + assert(!close(fds[1])); + exec_parent(exec, res, fds[0]); + } else { + assert(!close(fds[0])); + exec_child(exec, fds[1]); + } +} + +static void exec_spawn_wrapper(struct peer *peer) { + struct exec *exec = (struct exec *) peer; + exec_spawn(exec); +} + +void exec_cleanup() { + struct exec *iter, *next; + list_for_each_entry_safe(iter, next, &exec_head, exec_list) { + exec_del(iter); + } +} + +void exec_new(char *command, struct flow *flow, void *passthrough) { + (*flow->ref_count)++; + + struct exec *exec = malloc(sizeof(*exec)); + exec->peer.fd = -1; + uuid_gen(exec->id); + exec->command = strdup(command); + exec->flow = flow; + exec->passthrough = passthrough; + + list_add(&exec->exec_list, &exec_head); + + exec_spawn(exec); +} diff --git a/adsbus/exec.h b/adsbus/exec.h new file mode 100644 index 0000000..0c8c525 --- /dev/null +++ b/adsbus/exec.h @@ -0,0 +1,6 @@ +#pragma once + +struct flow; + +void exec_cleanup(void); +void exec_new(char *, struct flow *, void *); diff --git a/adsbus/file.c b/adsbus/file.c new file mode 100644 index 0000000..7844fac --- /dev/null +++ b/adsbus/file.c @@ -0,0 +1,43 @@ +#include +#include +#include +#include + +#include "flow.h" +#include "receive.h" +#include "send.h" + +#include "file.h" + +void file_fd_new(int fd, struct flow *flow, void *passthrough) { + flow->new(fd, passthrough, NULL); + // TODO: log error; retry? + flow_hello(fd, flow, passthrough); +} + +void file_read_new(char *path, struct flow *flow, void *passthrough) { + int fd = open(path, O_RDONLY | O_CLOEXEC); + if (fd == -1) { + // TODO: log error; retry? + return; + } + file_fd_new(fd, flow, passthrough); +} + +void file_write_new(char *path, struct flow *flow, void *passthrough) { + int fd = open(path, O_WRONLY | O_CREAT | O_NOFOLLOW | O_TRUNC | O_CLOEXEC, S_IRUSR | S_IWUSR); + if (fd == -1) { + // TODO: log error; retry? + return; + } + file_fd_new(fd, flow, passthrough); +} + +void file_append_new(char *path, struct flow *flow, void *passthrough) { + int fd = open(path, O_WRONLY | O_CREAT | O_NOFOLLOW | O_CLOEXEC, S_IRWXU); + if (fd == -1) { + // TODO: log error; retry? + return; + } + flow->new(fd, flow, passthrough); +} diff --git a/adsbus/file.h b/adsbus/file.h new file mode 100644 index 0000000..7053e91 --- /dev/null +++ b/adsbus/file.h @@ -0,0 +1,8 @@ +#pragma once + +struct flow; + +void file_fd_new(int, struct flow *, void *); +void file_read_new(char *, struct flow *, void *); +void file_write_new(char *, struct flow *, void *); +void file_append_new(char *, struct flow *, void *); diff --git a/adsbus/flow.c b/adsbus/flow.c new file mode 100644 index 0000000..b7db29c --- /dev/null +++ b/adsbus/flow.c @@ -0,0 +1,25 @@ +#include + +#include "buf.h" +#include "socket.h" + +#include "flow.h" + +void flow_socket_connected(int fd, struct flow *flow) { + socket_connected(fd); + if (flow->socket_connected) { + flow->socket_connected(fd); + } +} + +bool flow_hello(int fd, struct flow *flow, void *passthrough) { + if (!flow->get_hello) { + return true; + } + struct buf buf = BUF_INIT, *buf_ptr = &buf; + flow->get_hello(&buf_ptr, passthrough); + if (!buf_ptr->length) { + return true; + } + return (write(fd, buf_at(buf_ptr, 0), buf_ptr->length) == (ssize_t) buf_ptr->length); +} diff --git a/adsbus/flow.h b/adsbus/flow.h new file mode 100644 index 0000000..ddacf0a --- /dev/null +++ b/adsbus/flow.h @@ -0,0 +1,18 @@ +#pragma once + +#include +#include + +struct buf; +struct peer; + +struct flow { + const char *name; + void (*socket_connected)(int); + void (*new)(int, void *, struct peer *); + void (*get_hello)(struct buf **, void *); + uint32_t *ref_count; +}; + +void flow_socket_connected(int, struct flow *); +bool flow_hello(int, struct flow *, void *); diff --git a/adsbus/incoming.c b/adsbus/incoming.c index 70e4a89..6ee0c54 100644 --- a/adsbus/incoming.c +++ b/adsbus/incoming.c @@ -9,6 +9,7 @@ #include #include "buf.h" +#include "flow.h" #include "list.h" #include "peer.h" #include "resolve.h" @@ -23,13 +24,9 @@ struct incoming { uint8_t id[UUID_LEN]; char *node; char *service; - struct addrinfo *addrs; - const char *error; uint32_t attempt; - incoming_connection_handler handler; - incoming_get_hello hello; + struct flow *flow; void *passthrough; - uint32_t *count; struct list_head incoming_list; }; @@ -44,18 +41,6 @@ static void incoming_retry(struct incoming *incoming) { wakeup_add((struct peer *) incoming, delay); } -static bool incoming_hello(int fd, struct incoming *incoming) { - if (!incoming->hello) { - return true; - } - struct buf buf = BUF_INIT, *buf_ptr = &buf; - incoming->hello(&buf_ptr, incoming->passthrough); - if (!buf_ptr->length) { - return true; - } - return (write(fd, buf_at(buf_ptr, 0), buf_ptr->length) == (ssize_t) buf_ptr->length); -} - static void incoming_handler(struct peer *peer) { struct incoming *incoming = (struct incoming *) peer; @@ -79,19 +64,17 @@ static void incoming_handler(struct peer *peer) { local_hbuf, local_sbuf, peer_hbuf, peer_sbuf); - socket_connected_init(fd); - - if (!incoming_hello(fd, incoming)) { + if (!flow_hello(fd, incoming->flow, incoming->passthrough)) { fprintf(stderr, "I %s: Error writing greeting\n", incoming->id); assert(!close(fd)); return; } - incoming->handler(fd, incoming->passthrough, NULL); + incoming->flow->new(fd, incoming->passthrough, NULL); } static void incoming_del(struct incoming *incoming) { - (*incoming->count)--; + (*incoming->flow->ref_count)--; if (incoming->peer.fd >= 0) { assert(!close(incoming->peer.fd)); } @@ -100,9 +83,19 @@ static void incoming_del(struct incoming *incoming) { free(incoming); } -static void incoming_listen(struct incoming *incoming) { +static void incoming_listen(struct peer *peer) { + struct incoming *incoming = (struct incoming *) peer; + + struct addrinfo *addrs; + int err = resolve_result(peer, &addrs); + if (err) { + fprintf(stderr, "I %s: Failed to resolve %s/%s: %s\n", incoming->id, incoming->node, incoming->service, gai_strerror(err)); + incoming_retry(incoming); + return; + } + struct addrinfo *addr; - for (addr = incoming->addrs; addr; addr = addr->ai_next) { + for (addr = addrs; addr; addr = addr->ai_next) { char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; assert(getnameinfo(addr->ai_addr, addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0); fprintf(stderr, "I %s: Listening on %s/%s...\n", incoming->id, hbuf, sbuf); @@ -110,8 +103,7 @@ static void incoming_listen(struct incoming *incoming) { incoming->peer.fd = socket(addr->ai_family, addr->ai_socktype | SOCK_CLOEXEC, addr->ai_protocol); assert(incoming->peer.fd >= 0); - int optval = 1; - setsockopt(incoming->peer.fd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval)); + socket_pre_bind(incoming->peer.fd); if (bind(incoming->peer.fd, addr->ai_addr, addr->ai_addrlen) != 0) { fprintf(stderr, "I %s: Failed to bind to %s/%s: %s\n", incoming->id, hbuf, sbuf, strerror(errno)); @@ -119,13 +111,15 @@ static void incoming_listen(struct incoming *incoming) { continue; } - socket_bound_init(incoming->peer.fd); + socket_pre_listen(incoming->peer.fd); + // Options are inherited through accept() + flow_socket_connected(incoming->peer.fd, incoming->flow); assert(listen(incoming->peer.fd, 255) == 0); break; } - freeaddrinfo(incoming->addrs); + freeaddrinfo(addrs); if (addr == NULL) { fprintf(stderr, "I %s: Failed to bind any addresses for %s/%s...\n", incoming->id, incoming->node, incoming->service); @@ -138,21 +132,10 @@ static void incoming_listen(struct incoming *incoming) { peer_epoll_add((struct peer *) incoming, EPOLLIN); } -static void incoming_resolve_handler(struct peer *peer) { - struct incoming *incoming = (struct incoming *) peer; - if (incoming->addrs) { - incoming_listen(incoming); - } else { - fprintf(stderr, "I %s: Failed to resolve %s/%s: %s\n", incoming->id, incoming->node, incoming->service, incoming->error); - incoming_retry(incoming); - } -} - static void incoming_resolve(struct incoming *incoming) { fprintf(stderr, "I %s: Resolving %s/%s...\n", incoming->id, incoming->node, incoming->service); - incoming->peer.fd = -1; - incoming->peer.event_handler = incoming_resolve_handler; - resolve((struct peer *) incoming, incoming->node, incoming->service, AI_PASSIVE, &incoming->addrs, &incoming->error); + incoming->peer.event_handler = incoming_listen; + resolve((struct peer *) incoming, incoming->node, incoming->service, AI_PASSIVE); } static void incoming_resolve_wrapper(struct peer *peer) { @@ -166,19 +149,17 @@ void incoming_cleanup() { } } -void incoming_new(char *node, char *service, incoming_connection_handler handler, incoming_get_hello hello, void *passthrough, uint32_t *count) { - (*count)++; +void incoming_new(char *node, char *service, struct flow *flow, void *passthrough) { + (*flow->ref_count)++; struct incoming *incoming = malloc(sizeof(*incoming)); incoming->peer.event_handler = incoming_handler; uuid_gen(incoming->id); - incoming->node = strdup(node); + incoming->node = node ? strdup(node) : NULL; incoming->service = strdup(service); incoming->attempt = 0; - incoming->handler = handler; - incoming->hello = hello; + incoming->flow = flow; incoming->passthrough = passthrough; - incoming->count = count; list_add(&incoming->incoming_list, &incoming_head); diff --git a/adsbus/incoming.h b/adsbus/incoming.h index a6978ca..4691c1d 100644 --- a/adsbus/incoming.h +++ b/adsbus/incoming.h @@ -1,11 +1,6 @@ #pragma once -#include - -struct buf; -struct peer; +struct flow; void incoming_cleanup(void); -typedef void (*incoming_connection_handler)(int fd, void *, struct peer *); -typedef void (*incoming_get_hello)(struct buf **, void *); -void incoming_new(char *, char *, incoming_connection_handler, incoming_get_hello, void *, uint32_t *); +void incoming_new(char *, char *, struct flow *, void *); diff --git a/adsbus/opts.c b/adsbus/opts.c index 3a833e9..a4ba5c6 100644 --- a/adsbus/opts.c +++ b/adsbus/opts.c @@ -1,11 +1,15 @@ +#include +#include #include #include +#include +#include #include -#include "buf.h" +#include "exec.h" +#include "file.h" #include "incoming.h" #include "outgoing.h" -#include "peer.h" #include "receive.h" #include "send.h" @@ -21,13 +25,13 @@ static char *opts_split(char **arg, char delim) { return ret; } -static void opts_add_listen(char *host_port, incoming_connection_handler handler, incoming_get_hello hello, void *passthrough, uint32_t *count) { +static void opts_add_listen(char *host_port, struct flow *flow, void *passthrough) { char *host = opts_split(&host_port, '/'); if (host) { - incoming_new(host, host_port, handler, hello, passthrough, count); + incoming_new(host, host_port, flow, passthrough); free(host); } else { - incoming_new(NULL, host_port, handler, hello, passthrough, count); + incoming_new(NULL, host_port, flow, passthrough); } } @@ -52,7 +56,7 @@ bool opts_add_connect_receive(char *arg) { return false; } - outgoing_new(host, arg, receive_new, NULL, NULL, &peer_count_in); + outgoing_new(host, arg, receive_flow, NULL); free(host); return true; } @@ -68,13 +72,13 @@ bool opts_add_connect_send(char *arg) { return false; } - outgoing_new(host, arg, send_new_wrapper, send_hello, serializer, &peer_count_out); + outgoing_new(host, arg, send_flow, serializer); free(host); return true; } bool opts_add_listen_receive(char *arg) { - opts_add_listen(arg, receive_new, NULL, NULL, &peer_count_in); + opts_add_listen(arg, receive_flow, NULL); return true; } @@ -84,12 +88,54 @@ bool opts_add_listen_send(char *arg) { return false; } - opts_add_listen(arg, send_new_wrapper, send_hello, serializer, &peer_count_out); + opts_add_listen(arg, send_flow, serializer); + return true; +} + +bool opts_add_file_read(char *arg) { + file_read_new(arg, receive_flow, NULL); + return true; +} + +bool opts_add_file_write(char *arg) { + struct serializer *serializer = opts_get_serializer(&arg); + if (!serializer) { + return false; + } + + file_write_new(arg, send_flow, serializer); + return true; +} + +bool opts_add_file_append(char *arg) { + struct serializer *serializer = opts_get_serializer(&arg); + if (!serializer) { + return false; + } + + file_append_new(arg, send_flow, serializer); + return true; +} + +bool opts_add_exec_receive(char *arg) { + exec_new(arg, receive_flow, NULL); + return true; +} + +bool opts_add_exec_send(char *arg) { + struct serializer *serializer = opts_get_serializer(&arg); + if (!serializer) { + return false; + } + + exec_new(arg, send_flow, serializer); return true; } bool opts_add_stdin(char __attribute__((unused)) *arg) { - receive_new(dup(0), NULL, NULL); + int fd = fcntl(0, F_DUPFD_CLOEXEC, 0); + assert(fd >= 0); + file_fd_new(fd, receive_flow, NULL); return true; } @@ -98,17 +144,8 @@ bool opts_add_stdout(char *arg) { if (!serializer) { return false; } - int fd = dup(1); - { - // TODO: move into standard location for non-socket fd handling - struct buf buf = BUF_INIT, *buf_ptr = &buf; - send_hello(&buf_ptr, serializer); - if (buf_ptr->length) { - if (write(fd, buf_at(buf_ptr, 0), buf_ptr->length) != (ssize_t) buf_ptr->length) { - return false; - } - } - } - send_new(fd, serializer, NULL); + int fd = fcntl(1, F_DUPFD_CLOEXEC, 0); + assert(fd >= 0); + file_fd_new(fd, send_flow, serializer); return true; } diff --git a/adsbus/opts.h b/adsbus/opts.h index b1f4c42..e6a9013 100644 --- a/adsbus/opts.h +++ b/adsbus/opts.h @@ -6,5 +6,10 @@ bool opts_add_connect_receive(char *); bool opts_add_connect_send(char *); bool opts_add_listen_receive(char *); bool opts_add_listen_send(char *); +bool opts_add_file_read(char *); +bool opts_add_file_write(char *); +bool opts_add_file_append(char *); +bool opts_add_exec_receive(char *); +bool opts_add_exec_send(char *); bool opts_add_stdout(char *); bool opts_add_stdin(char *); diff --git a/adsbus/outgoing.c b/adsbus/outgoing.c index 37feac2..9f30c40 100644 --- a/adsbus/outgoing.c +++ b/adsbus/outgoing.c @@ -9,6 +9,7 @@ #include #include "buf.h" +#include "flow.h" #include "list.h" #include "peer.h" #include "resolve.h" @@ -25,12 +26,9 @@ struct outgoing { char *service; struct addrinfo *addrs; struct addrinfo *addr; - const char *error; uint32_t attempt; - outgoing_connection_handler handler; - outgoing_get_hello hello; + struct flow *flow; void *passthrough; - uint32_t *count; struct list_head outgoing_list; }; @@ -41,7 +39,7 @@ static void outgoing_resolve(struct outgoing *); static void outgoing_resolve_wrapper(struct peer *); static void outgoing_retry(struct outgoing *outgoing) { - uint32_t delay = wakeup_get_retry_delay_ms(outgoing->attempt++); + uint32_t delay = wakeup_get_retry_delay_ms(++outgoing->attempt); fprintf(stderr, "O %s: Will retry in %ds\n", outgoing->id, delay / 1000); outgoing->peer.event_handler = outgoing_resolve_wrapper; wakeup_add((struct peer *) outgoing, delay); @@ -63,11 +61,11 @@ static void outgoing_connect_next(struct outgoing *outgoing) { assert(outgoing->peer.fd >= 0); struct buf buf = BUF_INIT, *buf_ptr = &buf; - if (outgoing->hello) { - outgoing->hello(&buf_ptr, outgoing->passthrough); + if (outgoing->flow->get_hello) { + outgoing->flow->get_hello(&buf_ptr, outgoing->passthrough); } - int result = (int) sendto(outgoing->peer.fd, buf_at(buf_ptr, 0), buf_ptr->length, MSG_FASTOPEN, outgoing->addr->ai_addr, outgoing->addr->ai_addrlen); - outgoing_connect_result(outgoing, result == 0 ? result : errno); + ssize_t result = sendto(outgoing->peer.fd, buf_at(buf_ptr, 0), buf_ptr->length, MSG_FASTOPEN, outgoing->addr->ai_addr, outgoing->addr->ai_addrlen); + outgoing_connect_result(outgoing, result == (ssize_t) buf_ptr->length ? EINPROGRESS : errno); } static void outgoing_connect_handler(struct peer *peer) { @@ -97,12 +95,12 @@ static void outgoing_connect_result(struct outgoing *outgoing, int result) { case 0: fprintf(stderr, "O %s: Connected to %s/%s\n", outgoing->id, hbuf, sbuf); freeaddrinfo(outgoing->addrs); - socket_connected_init(outgoing->peer.fd); + flow_socket_connected(outgoing->peer.fd, outgoing->flow); outgoing->attempt = 0; int fd = outgoing->peer.fd; outgoing->peer.fd = -1; outgoing->peer.event_handler = outgoing_disconnect_handler; - outgoing->handler(fd, outgoing->passthrough, (struct peer *) outgoing); + outgoing->flow->new(fd, outgoing->passthrough, (struct peer *) outgoing); break; case EINPROGRESS: @@ -123,20 +121,20 @@ static void outgoing_connect_result(struct outgoing *outgoing, int result) { static void outgoing_resolve_handler(struct peer *peer) { struct outgoing *outgoing = (struct outgoing *) peer; - if (outgoing->addrs) { + int err = resolve_result(peer, &outgoing->addrs); + if (err) { + fprintf(stderr, "O %s: Failed to resolve %s/%s: %s\n", outgoing->id, outgoing->node, outgoing->service, gai_strerror(err)); + outgoing_retry(outgoing); + } else { outgoing->addr = outgoing->addrs; outgoing_connect_next(outgoing); - } else { - fprintf(stderr, "O %s: Failed to resolve %s/%s: %s\n", outgoing->id, outgoing->node, outgoing->service, outgoing->error); - outgoing_retry(outgoing); } } static void outgoing_resolve(struct outgoing *outgoing) { fprintf(stderr, "O %s: Resolving %s/%s...\n", outgoing->id, outgoing->node, outgoing->service); - outgoing->peer.fd = -1; outgoing->peer.event_handler = outgoing_resolve_handler; - resolve((struct peer *) outgoing, outgoing->node, outgoing->service, 0, &outgoing->addrs, &outgoing->error); + resolve((struct peer *) outgoing, outgoing->node, outgoing->service, 0); } static void outgoing_resolve_wrapper(struct peer *peer) { @@ -144,7 +142,7 @@ static void outgoing_resolve_wrapper(struct peer *peer) { } static void outgoing_del(struct outgoing *outgoing) { - (*outgoing->count)--; + (*outgoing->flow->ref_count)--; if (outgoing->peer.fd >= 0) { assert(!close(outgoing->peer.fd)); } @@ -160,18 +158,16 @@ void outgoing_cleanup() { } } -void outgoing_new(char *node, char *service, outgoing_connection_handler handler, outgoing_get_hello hello, void *passthrough, uint32_t *count) { - (*count)++; +void outgoing_new(char *node, char *service, struct flow *flow, void *passthrough) { + (*flow->ref_count)++; struct outgoing *outgoing = malloc(sizeof(*outgoing)); uuid_gen(outgoing->id); outgoing->node = strdup(node); outgoing->service = strdup(service); outgoing->attempt = 0; - outgoing->handler = handler; - outgoing->hello = hello; + outgoing->flow = flow; outgoing->passthrough = passthrough; - outgoing->count = count; list_add(&outgoing->outgoing_list, &outgoing_head); diff --git a/adsbus/outgoing.h b/adsbus/outgoing.h index a669b94..c39fdc5 100644 --- a/adsbus/outgoing.h +++ b/adsbus/outgoing.h @@ -1,9 +1,6 @@ #pragma once -struct buf; -struct peer; +struct flow; void outgoing_cleanup(void); -typedef void (*outgoing_connection_handler)(int fd, void *, struct peer *); -typedef void (*outgoing_get_hello)(struct buf **, void *); -void outgoing_new(char *, char *, outgoing_connection_handler, outgoing_get_hello, void *, uint32_t *); +void outgoing_new(char *, char *, struct flow *, void *); diff --git a/adsbus/rand.c b/adsbus/rand.c index c7027ec..82d6563 100644 --- a/adsbus/rand.c +++ b/adsbus/rand.c @@ -16,7 +16,7 @@ static struct buf rand_buf = BUF_INIT; static int rand_fd; void rand_init() { - rand_fd = open("/dev/urandom", O_RDONLY); + rand_fd = open("/dev/urandom", O_RDONLY | O_CLOEXEC); assert(rand_fd >= 0); assert(read(rand_fd, buf_at(&rand_buf, 0), BUF_LEN_MAX) == BUF_LEN_MAX); rand_buf.length = BUF_LEN_MAX; diff --git a/adsbus/receive.c b/adsbus/receive.c index 8076d3b..3537139 100644 --- a/adsbus/receive.c +++ b/adsbus/receive.c @@ -9,11 +9,13 @@ #include "airspy_adsb.h" #include "beast.h" #include "buf.h" +#include "flow.h" #include "json.h" #include "packet.h" #include "peer.h" #include "proto.h" #include "raw.h" +#include "socket.h" #include "send.h" #include "uuid.h" @@ -35,6 +37,15 @@ struct receive { }; static struct receive *receive_head = NULL; +static void receive_new(int, void *, struct peer *); + +static struct flow _receive_flow = { + .name = "receive", + .new = receive_new, + .ref_count = &peer_count_in, +}; +struct flow *receive_flow = &_receive_flow; + static struct parser { char *name; parser parse; @@ -123,17 +134,10 @@ static void receive_read(struct peer *peer) { } } -void receive_cleanup() { - while (receive_head) { - receive_del(receive_head); - } -} - -void receive_new(int fd, void __attribute__((unused)) *passthrough, struct peer *on_close) { +static void receive_new(int fd, void __attribute__((unused)) *passthrough, struct peer *on_close) { peer_count_in++; - int res = shutdown(fd, SHUT_WR); - assert(res == 0 || (res == -1 && errno == ENOTSOCK)); + socket_receive(fd); struct receive *receive = malloc(sizeof(*receive)); assert(receive); @@ -156,6 +160,12 @@ void receive_new(int fd, void __attribute__((unused)) *passthrough, struct peer fprintf(stderr, "R %s: New receive connection\n", receive->id); } +void receive_cleanup() { + while (receive_head) { + receive_del(receive_head); + } +} + void receive_print_usage() { fprintf(stderr, "\nSupported receive formats (auto-detected):\n"); for (size_t i = 0; i < NUM_PARSERS; i++) { diff --git a/adsbus/receive.h b/adsbus/receive.h index 1c7bd92..9652f2c 100644 --- a/adsbus/receive.h +++ b/adsbus/receive.h @@ -1,11 +1,9 @@ #pragma once -#include - #define PARSER_STATE_LEN 256 -struct peer; +struct flow; void receive_cleanup(void); -void receive_new(int, void *, struct peer *); void receive_print_usage(void); +extern struct flow *receive_flow; diff --git a/adsbus/resolve.c b/adsbus/resolve.c index 1b20a9c..35d4864 100644 --- a/adsbus/resolve.c +++ b/adsbus/resolve.c @@ -1,93 +1,33 @@ -#include -#include +#include +#include #include -#include -#include -#include +#include "asyncaddrinfo.h" #include "peer.h" #include "resolve.h" -struct resolve_request { - int fd; - const char *node; - const char *service; - int flags; - struct addrinfo **addrs; - const char **error; -}; - -struct resolve_peer { - struct peer peer; - struct peer *inner_peer; -}; - -static pthread_t resolve_thread; -static int resolve_write_fd; - -static void resolve_handler(struct peer *peer) { - struct resolve_peer *resolve_peer = (struct resolve_peer *) peer; - - assert(!close(resolve_peer->peer.fd)); - - peer_call(resolve_peer->inner_peer); - free(resolve_peer); -} - -static void *resolve_main(void *arg) { - int fd = (int) (intptr_t) arg; - struct resolve_request *request; - ssize_t ret; - while ((ret = read(fd, &request, sizeof(request))) == sizeof(request)) { - struct addrinfo hints = { - .ai_family = AF_UNSPEC, - .ai_socktype = SOCK_STREAM, - .ai_flags = AI_V4MAPPED | AI_ADDRCONFIG | request->flags, - }; - int err = getaddrinfo(request->node, request->service, &hints, request->addrs); - if (err) { - *request->addrs = NULL; - *request->error = gai_strerror(err); - } else { - *request->error = NULL; - } - close(request->fd); - free(request); - } - assert(!ret); - assert(!close(fd)); - return NULL; -} - void resolve_init() { - int fds[2]; - assert(!pipe2(fds, O_CLOEXEC)); - resolve_write_fd = fds[1]; - assert(!pthread_create(&resolve_thread, NULL, resolve_main, (void *) (intptr_t) fds[0])); + asyncaddrinfo_init(2); } void resolve_cleanup() { - assert(!close(resolve_write_fd)); - assert(!pthread_join(resolve_thread, NULL)); + asyncaddrinfo_cleanup(); } -void resolve(struct peer *peer, const char *node, const char *service, int flags, struct addrinfo **addrs, const char **error) { - int fds[2]; - assert(!pipe2(fds, O_CLOEXEC)); +void resolve(struct peer *peer, const char *node, const char *service, int flags) { + struct addrinfo hints = { + .ai_flags = AI_V4MAPPED | AI_ADDRCONFIG | flags, + .ai_family = AF_UNSPEC, + .ai_socktype = SOCK_STREAM, + }; - struct resolve_request *request = malloc(sizeof(*request)); - request->fd = fds[1]; - request->node = node; - request->service = service; - request->flags = flags; - request->addrs = addrs; - request->error = error; - assert(write(resolve_write_fd, &request, sizeof(request)) == sizeof(request)); - - struct resolve_peer *resolve_peer = malloc(sizeof(*resolve_peer)); - resolve_peer->peer.fd = fds[0]; - resolve_peer->peer.event_handler = resolve_handler; - resolve_peer->inner_peer = peer; - peer_epoll_add((struct peer *) resolve_peer, EPOLLRDHUP); + peer->fd = asyncaddrinfo_resolve(node, service, &hints); + peer_epoll_add(peer, EPOLLIN); +} + +int resolve_result(struct peer *peer, struct addrinfo **addrs) { + int err = asyncaddrinfo_result(peer->fd, addrs); + peer->fd = -1; + return err; } diff --git a/adsbus/resolve.h b/adsbus/resolve.h index 92afb8c..8e91df5 100644 --- a/adsbus/resolve.h +++ b/adsbus/resolve.h @@ -5,4 +5,5 @@ struct addrinfo; void resolve_init(void); void resolve_cleanup(void); -void resolve(struct peer *, const char *, const char *, int, struct addrinfo **, const char **); +void resolve(struct peer *, const char *, const char *, int); +int resolve_result(struct peer *, struct addrinfo **addrs); diff --git a/adsbus/send.c b/adsbus/send.c index 551cc0d..326c365 100644 --- a/adsbus/send.c +++ b/adsbus/send.c @@ -12,12 +12,14 @@ #include "airspy_adsb.h" #include "beast.h" #include "buf.h" +#include "flow.h" #include "json.h" #include "list.h" #include "packet.h" #include "peer.h" #include "proto.h" #include "raw.h" +#include "socket.h" #include "stats.h" #include "uuid.h" @@ -31,6 +33,18 @@ struct send { struct list_head send_list; }; +static void send_new(int, void *, struct peer *); +static void send_get_hello(struct buf **, void *); + +static struct flow _send_flow = { + .name = "send", + .socket_connected = socket_connected_send, + .new = send_new, + .get_hello = send_get_hello, + .ref_count = &peer_count_out, +}; +struct flow *send_flow = &_send_flow; + typedef void (*serialize)(struct packet *, struct buf *); typedef void (*hello)(struct buf **); static struct serializer { @@ -86,36 +100,12 @@ static void send_del_wrapper(struct peer *peer) { send_del((struct send *) peer); } -void send_init() { - assert(signal(SIGPIPE, SIG_IGN) != SIG_ERR); - for (size_t i = 0; i < NUM_SERIALIZERS; i++) { - list_head_init(&serializers[i].send_head); - } -} +static void send_new(int fd, void *passthrough, struct peer *on_close) { + struct serializer *serializer = (struct serializer *) passthrough; -void send_cleanup() { - for (size_t i = 0; i < NUM_SERIALIZERS; i++) { - struct send *iter, *next; - list_for_each_entry_safe(iter, next, &serializers[i].send_head, send_list) { - send_del(iter); - } - } -} - -struct serializer *send_get_serializer(char *name) { - for (size_t i = 0; i < NUM_SERIALIZERS; i++) { - if (strcasecmp(serializers[i].name, name) == 0) { - return &serializers[i]; - } - } - return NULL; -} - -void send_new(int fd, struct serializer *serializer, struct peer *on_close) { peer_count_out++; - int res = shutdown(fd, SHUT_RD); - assert(res == 0 || (res == -1 && errno == ENOTSOCK)); + socket_send(fd); struct send *send = malloc(sizeof(*send)); assert(send); @@ -132,17 +122,38 @@ void send_new(int fd, struct serializer *serializer, struct peer *on_close) { fprintf(stderr, "S %s (%s): New send connection\n", send->id, serializer->name); } -void send_new_wrapper(int fd, void *passthrough, struct peer *on_close) { - send_new(fd, (struct serializer *) passthrough, on_close); -} - -void send_hello(struct buf **buf_pp, void *passthrough) { +static void send_get_hello(struct buf **buf_pp, void *passthrough) { struct serializer *serializer = (struct serializer *) passthrough; if (serializer->hello) { serializer->hello(buf_pp); } } +void send_init() { + assert(signal(SIGPIPE, SIG_IGN) != SIG_ERR); + for (size_t i = 0; i < NUM_SERIALIZERS; i++) { + list_head_init(&serializers[i].send_head); + } +} + +void send_cleanup() { + for (size_t i = 0; i < NUM_SERIALIZERS; i++) { + struct send *iter, *next; + list_for_each_entry_safe(iter, next, &serializers[i].send_head, send_list) { + send_del(iter); + } + } +} + +void *send_get_serializer(char *name) { + for (size_t i = 0; i < NUM_SERIALIZERS; i++) { + if (strcasecmp(serializers[i].name, name) == 0) { + return &serializers[i]; + } + } + return NULL; +} + void send_write(struct packet *packet) { packet_sanity_check(packet); for (size_t i = 0; i < NUM_SERIALIZERS; i++) { diff --git a/adsbus/send.h b/adsbus/send.h index fdc3d67..eab1c4e 100644 --- a/adsbus/send.h +++ b/adsbus/send.h @@ -1,14 +1,14 @@ #pragma once -struct buf; +#include + +struct flow; struct packet; struct peer; void send_init(void); void send_cleanup(void); -struct serializer *send_get_serializer(char *); -void send_new(int, struct serializer *, struct peer *); -void send_new_wrapper(int, void *, struct peer *); -void send_hello(struct buf **, void *); +void *send_get_serializer(char *); void send_write(struct packet *); void send_print_usage(void); +extern struct flow *send_flow; diff --git a/adsbus/socket.c b/adsbus/socket.c index 19e3ce1..f90bdf0 100644 --- a/adsbus/socket.c +++ b/adsbus/socket.c @@ -7,12 +7,20 @@ #include "socket.h" -void socket_bound_init(int fd) { +void socket_pre_bind(int fd) { + // Called by transport code; safe to assume that fd is a socket + int optval = 1; + assert(!setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval))); +} + +void socket_pre_listen(int fd) { + // Called by transport code; safe to assume that fd is a socket int qlen = 5; assert(!setsockopt(fd, SOL_TCP, TCP_FASTOPEN, &qlen, sizeof(qlen))); } -void socket_connected_init(int fd) { +void socket_connected(int fd) { + // Called by transport code; safe to assume that fd is a socket int optval = 1; assert(!setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval))); optval = 30; @@ -22,3 +30,32 @@ void socket_connected_init(int fd) { optval = 3; assert(!setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &optval, sizeof(optval))); } + +void socket_connected_send(int fd) { + int optval = 128; // Lowest value that the kernel will accept + int res = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &optval, sizeof(optval)); + if (res == -1 && errno == ENOTSOCK) { + return; + } + assert(res == 0); + + optval = 128; // Lowest value that the kernel will accept + res = setsockopt(fd, IPPROTO_TCP, TCP_WINDOW_CLAMP, &optval, sizeof(optval)); + assert(res == 0); + + optval = 60000; // 60s + res = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &optval, sizeof(optval)); + assert(res == 0); +} + +void socket_send(int fd) { + // Called by data flow code; NOT safe to assume that fd is a socket + int res = shutdown(fd, SHUT_RD); + assert(res == 0 || (res == -1 && errno == ENOTSOCK)); +} + +void socket_receive(int fd) { + // Called by data flow code; NOT safe to assume that fd is a socket + int res = shutdown(fd, SHUT_WR); + assert(res == 0 || (res == -1 && errno == ENOTSOCK)); +} diff --git a/adsbus/socket.h b/adsbus/socket.h index a7e6e27..b1989a6 100644 --- a/adsbus/socket.h +++ b/adsbus/socket.h @@ -1,4 +1,8 @@ #pragma once -void socket_bound_init(int); -void socket_connected_init(int); +void socket_pre_bind(int); +void socket_pre_listen(int); +void socket_connected(int); +void socket_connected_send(int); +void socket_send(int); +void socket_receive(int); diff --git a/proto/adsb.proto b/proto/adsb.proto index 870d215..aa0d6a6 100644 --- a/proto/adsb.proto +++ b/proto/adsb.proto @@ -1,20 +1,54 @@ message AdsbHeader { + // Always "aDsB" required string magic = 1; + + // Unique identifier for this server implementation + // Recommended: "https://url/of/source#version" required string server_version = 2; + + // Unique identifier for this server instance + // UUID recommended + // 36 character limit required string server_id = 3; + + // MHz of the clock used in subsequent mlat_timestamp fields required fixed32 mlat_timestamp_mhz = 4; + + // Maximum value of subsequent mlat_timestamp fields, at which point values are expected to wrap required fixed64 mlat_timestamp_max = 5; + + // Maximum value of subsequent rssi fields required fixed32 rssi_max = 6; } message AdsbPacket { + // Unique value for the source that recorded this packet + // UUID recommended + // 36 character limit required string source_id = 1; + + // Value of the MLAT counter when this packet arrived at the recorder + // Range [0, mlat_timestamp_max] + // Units of 1 / (mlat_timestamp_mhz * 10^6) Hz optional fixed64 mlat_timestamp = 2; + + // RSSI of the received packet at the recorder + // Range [0, rssi_max] + // Units unspecified optional fixed32 rssi = 3; + + // Binary packet payload. + // Length: + // mode_ac: 2 bytes + // mode_s_short: 7 bytes + // mode_s_long: 14 bytes required bytes payload = 4; } message Adsb { + // Each message must contain exactly one; zero is invalid. + // The first record of a stream must be a header. + // Subsequent records may be in any order, including additional headers. oneof record { AdsbHeader header = 1; AdsbPacket mode_ac = 2; @@ -23,6 +57,9 @@ message Adsb { } } +// adsbus proto serialization takes advantage of the fact that an AdsbStream +// with many messages and many AdsbStreams each with a single message encode +// identically. message AdsbStream { repeated Adsb msg = 1; } diff --git a/protocols/airspy_adsb.md b/protocols/airspy_adsb.md index 21df429..1e9ae32 100644 --- a/protocols/airspy_adsb.md +++ b/protocols/airspy_adsb.md @@ -12,7 +12,7 @@ level data. ## Frame structure * `*` (`0x2a`) -* Uppercase hex-encoded 2, 7, or 14 byte frame (4, 14, or 28 bytes after encoding) +* Uppercase hex-encoded 4, 14, or 28 byte payload (2, 7, or 14 bytes when decoded) * `;` (`0x3b`) * 4 byte MLAT timestamp (see below) * `;` (`0x3b`) diff --git a/protocols/json.md b/protocols/json.md new file mode 100644 index 0000000..5cf2b9e --- /dev/null +++ b/protocols/json.md @@ -0,0 +1,78 @@ +# JSON protocol + +This protocol was created by adsb-tools. This specification is official. + +## Format + +[JSON streaming](https://en.wikipedia.org/wiki/JSON_Streaming) encoding, line delimited. +Each line must contain a single outer JSON object; no other outer types are permitted. + +First frame must always be a header; frames can otherwise appear in any order, including +additional headers. + +## Common fields +* `type` (string): one of: + * `"header"` (see [Header](#header)) + * `"Mode-AC"` (see [Packet](#packet)) + * `"Mode-S short"` (see [Packet](#packet)) + * `"Mode-S long"` (see [Packet](#packet)) + + +## Header +* `type`: + * String + * `"header"` +* `magic`: + * String + * `"aDsB"` +* `server_version`: + * String + * Unique identifier for this server implementation + * Recommended: `"https://url/of/source#version"` +* `server_id`: + * String + * Unique identifier for this server instance + * [UUID](https://en.wikipedia.org/wiki/Universally_unique_identifier) recommended + * 36 character limit +* `mlat_timestamp_mhz`: + * Integer + * MHz of the clock used in subsequent `mlat_timestamp` fields +* `mlat_timestamp_max`: + * Integer + * Maximum value of subsequent `mlat_timestamp` fields, at which point values are expected to wrap +* `rssi_max`: + * Integer + * Maximum value of subsequent `rssi` fields + + +## Packet +* `type`: + * String + * One of: + * `"Mode-AC"` (4 byte payload, 2 bytes when decoded) + * `"Mode-S short"` (14 byte payload, 7 bytes when decoded) + * `"Mode-S long"` (28 byte payload, 14 bytes when decoded) +* `source_id`: + * String + * Unique value for the source that recorded this packet. + * [UUID](https://en.wikipedia.org/wiki/Universally_unique_identifier) recommended + * 36 character limit +* `mlat_timestamp`: + * Integer + * Value of the [MLAT](https://en.wikipedia.org/wiki/Multilateration) counter when this packet arrived at the recorder + * Range [0, `mlat_timestamp_max`] + * Units of 1 / (`mlat_timestamp_mhz` * 10^6) Hz +* `rssi`: + * Integer + * [RSSI](https://en.wikipedia.org/wiki/Received_signal_strength_indication) of the received packet at the recorder + * Range [0, `rssi_max`] + * Units unspecified +* `payload`: + * Upper-case, hex-encoded + * See `type` for length + + +## Examples +* `{"mlat_timestamp_mhz": 120, "type": "header", "magic": "aDsB", "server_version": "https://github.com/flamingcowtv/adsb-tools#1", "server_id": "fba76102-c39a-4c4e-af7c-ddd4ec0d45e2", "mlat_timestamp_max": 9223372036854775807, "rssi_max": 4294967295}\n` +* `{"payload": "02C58939D0B3C5", "type": "Mode-S short", "rssi": 269488144, "source_id": "f432c867-4108-4927-ba1f-1cfa71709bc4", "mlat_timestamp": 247651683709560}\n` +* `{"payload": "A8000B0B10010680A600003E4A72", "type": "Mode-S long", "rssi": 2206434179, "source_id": "f432c867-4108-4927-ba1f-1cfa71709bc4", "mlat_timestamp": 247651683777900}\n` diff --git a/protocols/proto.md b/protocols/proto.md new file mode 100644 index 0000000..062db97 --- /dev/null +++ b/protocols/proto.md @@ -0,0 +1,28 @@ +# Protobuf protocol + +This protocol was created by adsb-tools. This specification is official. + +## Format + +[Protocol buffer](https://developers.google.com/protocol-buffers/docs/overview) encoding. +Proto definition is [here](https://github.com/flamingcowtv/adsb-tools/blob/master/proto/adsb.proto). +Stream is a series of Adsb records with length encoded in a prefix (protobuf isn't self +delimiting). The prefix is structured such that an entire stream is a valid AdsbStream +record. + +First frame must always be an AdsbHeader; frames can otherwise appear in any order, including +additional headers. + +## Prefix +* `0x0a`: in protobuf encoding, field #1, type 2 (length-encoded bytes); (1 << 3) | 2 +* Length of packet not including prefix, encoded as a [base 128 varint](https://developers.google.com/protocol-buffers/docs/encoding#varints) + +## Packet +See [definition file](https://github.com/flamingcowtv/adsb-tools/blob/master/proto/adsb.proto) +for details. + +## Tips + +To decode a stream file: + +`$ protoc-c --proto_path=adsb-tools/proto --decode=AdsbStream adsb-tools/proto/adsb.proto < streamfile` diff --git a/protocols/raw.md b/protocols/raw.md index 8dc6ecd..427ede0 100644 --- a/protocols/raw.md +++ b/protocols/raw.md @@ -14,7 +14,7 @@ Simple textual format. ## Frame structure * `*` (`0x2a`) -* Uppercase hex-encoded 2, 7, or 14 byte frame (4, 14, or 28 bytes after encoding) +* Uppercase hex-encoded 4, 14, or 28 byte payload (2, 7, or 14 bytes when decoded) * `;` (`0x3b`) * `\n` (`0x0a`) **OR** `\r\n` (`0x0d 0x0a`)