Merge branch 'master' of github.com:flamingcowtv/adsb-tools

This commit is contained in:
Ian Gulliver
2016-03-02 11:22:09 -08:00
31 changed files with 852 additions and 249 deletions

View File

@@ -1,18 +1,19 @@
COMP ?= clang
DISABLED_WARNINGS ?= -Wno-padded -Wno-disabled-macro-expansion
CFLAGS ?= -Weverything -Werror -O3 -g --std=gnu11 --pedantic-errors -fPIE -fstack-protector-strong -pthread -D_GNU_SOURCE $(DISABLED_WARNINGS)
CFLAGS ?= -Weverything -Werror -O3 -g --std=gnu11 --pedantic-errors -fPIE -fstack-protector-strong -pthread -D_GNU_SOURCE -D_FILE_OFFSET_BITS=64 $(DISABLED_WARNINGS)
LDFLAGS ?= $(CFLAGS) -Wl,-z,relro -Wl,-z,now -pie
LIBS ?= -ljansson -lprotobuf-c
TESTCASE_DIR ?= testcase
TESTOUT_DIR ?= testout
VALGRIND ?= valgrind
VALGRIND_FLAGS ?= --error-exitcode=1 --track-fds=yes --show-leak-kinds=all --leak-check=full
VALGRIND_FLAGS ?= --error-exitcode=1 --trace-children=yes --track-fds=yes --show-leak-kinds=all --leak-check=full
ADSBUS_TEST_FLAGS ?= --stdin --stdout=airspy_adsb --stdout=beast --stdout=json --stdout=proto --stdout=raw --stdout=stats
OBJ_NETWORK = incoming.o outgoing.o receive.o send.o
OBJ_TRANSPORT = exec.o file.o incoming.o outgoing.o
OBJ_FLOW = flow.o receive.o send.o
OBJ_PROTOCOL = airspy_adsb.o beast.o json.o proto.o raw.o stats.o
OBJ_UTIL = buf.o hex.o list.o opts.o packet.o peer.o rand.o resolve.o server.o socket.o uuid.o wakeup.o
OBJ_UTIL = asyncaddrinfo.o buf.o hex.o list.o opts.o packet.o peer.o rand.o resolve.o server.o socket.o uuid.o wakeup.o
OBJ_PROTO = adsb.pb-c.o
all: adsbus
@@ -26,8 +27,8 @@ clean:
adsb.pb-c.c: ../proto/adsb.proto
protoc-c --c_out=./ --proto_path=$(dir $<) $<
adsbus: adsbus.o $(OBJ_NETWORK) $(OBJ_PROTOCOL) $(OBJ_UTIL) $(OBJ_PROTO)
$(COMP) $(LDFLAGS) -o adsbus adsbus.o $(OBJ_NETWORK) $(OBJ_PROTOCOL) $(OBJ_UTIL) $(OBJ_PROTO) $(LIBS)
adsbus: adsbus.o $(OBJ_TRANSPORT) $(OBJ_FLOW) $(OBJ_PROTOCOL) $(OBJ_UTIL) $(OBJ_PROTO)
$(COMP) $(LDFLAGS) -o adsbus adsbus.o $(OBJ_TRANSPORT) $(OBJ_FLOW) $(OBJ_PROTOCOL) $(OBJ_UTIL) $(OBJ_PROTO) $(LIBS)
fuzz:
rm -rf findings

View File

@@ -2,12 +2,58 @@
adsbus is a hub and protocol translator for [ADS-B](https://en.wikipedia.org/wiki/Automatic_dependent_surveillance_%E2%80%93_broadcast) messages.
It is conceptually similar to `dump1090 --net-only`, but supports more protocols and configurations. It doesn't talk to your radio itself; it
hooks programs that do, then handles the network distribution and format translation. It doesn't output to a web interface or send data to
services like FlightAware; it provides hooks for programs that do.
## Building
```bash
sudo apt-get -y install uuid-dev libjansson-dev
sudo apt-get -y install build-essential git clang libjansson-dev libprotobuf-c-dev protobuf-c-compiler
git clone https://github.com/flamingcowtv/adsb-tools.git
cd adsb-tools
cd adsb-tools/adsbus
make
```
## Features
* Separates the concepts of transport, data flow, and format
* Transports:
* Outgoing TCP connection
* Incoming TCP connection
* Local files or [named pipes](https://en.wikipedia.org/wiki/Named_pipe)
* [stdin/stdout](https://en.wikipedia.org/wiki/Standard_streams)
* Execute a command and talk to its stdin/stdout
* Data flows:
* Send (data flows out of adsbus)
* Receive (data flows in to adsbus)
* Formats:
* [airspy_adsb](../protocols/airspy_adsb.md) (a.k.a. ASAVR)
* [beast](../protocols/beast.md)
* [json](../protocols/json.md)
* [proto](../protocols/proto.md) (a.k.a. ProtoBuf, Protocol Buffers)
* [raw](../protocols/raw.md) (a.k.a. AVR)
* stats (send only, summary aggregated data)
* Transport features:
* [IPv4](https://en.wikipedia.org/wiki/IPv4) and [IPv6](https://en.wikipedia.org/wiki/IPv6) support
* Reresolution and reconnection on disconnect, with backoff and jitter
* [TCP keepalives](https://en.wikipedia.org/wiki/Keepalive#TCP_keepalive) for dead connection detection
* [TCP fast open](https://en.wikipedia.org/wiki/TCP_Fast_Open) for faster startup of high-latency connections
* [SO_REUSEPORT](https://lwn.net/Articles/542629/) for zero-downtime updates
* Data flow features:
* Rapid detection and disconnection of receive <-> receive connections
* Less rapid detection and disconnection of send <-> send connections
* Format features:
* Autodetection of received data format
* [MLAT](https://en.wikipedia.org/wiki/Multilateration) scaling for different clock rates and counter bit widths
* [RSSI](https://en.wikipedia.org/wiki/Received_signal_strength_indication) scaling for different bit widths
* Introduces json format for balanced human and machine readability with forward compatibility
* Introduces proto format for fast serialization and deserialization with forward compatibility
* Federation:
* Federation allows linking multiple instances of adsbus for:
* Scalability (cores, number of input or output clients, etc.)
* Efficient long-haul links (hub and spoke models on both ends)
* json and proto formats carry information about original source across multiple hops
* SO_REUSEPORT allows multiple adsbus instances to accept connections on the same IP and port without a load balancer

View File

@@ -6,6 +6,7 @@
#include <unistd.h>
#include "beast.h"
#include "exec.h"
#include "hex.h"
#include "incoming.h"
#include "json.h"
@@ -32,6 +33,11 @@ static void print_usage(const char *name) {
"\t--connect-send=FORMAT=HOST/PORT\n"
"\t--listen-receive=[HOST/]PORT\n"
"\t--listen-send=FORMAT=[HOST/]PORT\n"
"\t--file-read=PATH\n"
"\t--file-write=FORMAT=PATH\n"
"\t--file-append=FORMAT=PATH\n"
"\t--exec-receive=COMMAND\n"
"\t--exec-send=FORMAT=COMMAND\n"
"\t--stdin\n"
"\t--stdout=FORMAT\n"
, name);
@@ -45,6 +51,11 @@ static bool parse_opts(int argc, char *argv[]) {
{"connect-send", required_argument, 0, 's'},
{"listen-receive", required_argument, 0, 'l'},
{"listen-send", required_argument, 0, 'm'},
{"file-read", required_argument, 0, 'r'},
{"file-write", required_argument, 0, 'w'},
{"file-append", required_argument, 0, 'a'},
{"exec-receive", required_argument, 0, 'e'},
{"exec-send", required_argument, 0, 'f'},
{"stdin", no_argument, 0, 'i'},
{"stdout", required_argument, 0, 'o'},
{"help", no_argument, 0, 'h'},
@@ -71,6 +82,26 @@ static bool parse_opts(int argc, char *argv[]) {
handler = opts_add_listen_send;
break;
case 'r':
handler = opts_add_file_read;
break;
case 'w':
handler = opts_add_file_write;
break;
case 'a':
handler = opts_add_file_append;
break;
case 'e':
handler = opts_add_exec_receive;
break;
case 'f':
handler = opts_add_exec_send;
break;
case 'i':
handler = opts_add_stdin;
break;
@@ -133,6 +164,7 @@ int main(int argc, char *argv[]) {
send_cleanup();
incoming_cleanup();
outgoing_cleanup();
exec_cleanup();
json_cleanup();
proto_cleanup();

126
adsbus/asyncaddrinfo.c Normal file
View File

@@ -0,0 +1,126 @@
#include <assert.h>
#include <fcntl.h>
#include <netdb.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include "asyncaddrinfo.h"
struct asyncaddrinfo_resolution {
int return_fd;
char *node;
char *service;
struct addrinfo _hints, *hints;
int err;
struct addrinfo *addrs;
};
static size_t asyncaddrinfo_num_threads;
static pthread_t *asyncaddrinfo_threads = NULL;
static int asyncaddrinfo_write_fd;
static void *asyncaddrinfo_main(void *arg) {
int fd = (int) (intptr_t) arg;
struct asyncaddrinfo_resolution *res;
ssize_t len;
while ((len = recv(fd, &res, sizeof(res), 0)) == sizeof(res)) {
res->err = getaddrinfo(res->node, res->service, res->hints, &res->addrs);
int return_fd = res->return_fd;
res->return_fd = -1;
assert(send(return_fd, &res, sizeof(res), MSG_EOR) == sizeof(res));
// Main thread now owns res
assert(!close(return_fd));
}
assert(!len);
assert(!close(fd));
return NULL;
}
static void asyncaddrinfo_del(struct asyncaddrinfo_resolution *res) {
if (res->node) {
free(res->node);
res->node = NULL;
}
if (res->service) {
free(res->service);
res->service = NULL;
}
free(res);
}
void asyncaddrinfo_init(size_t threads) {
assert(!asyncaddrinfo_threads);
int fds[2];
assert(!socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_CLOEXEC, 0, fds));
asyncaddrinfo_write_fd = fds[1];
asyncaddrinfo_num_threads = threads;
asyncaddrinfo_threads = malloc(asyncaddrinfo_num_threads * sizeof(*asyncaddrinfo_threads));
assert(asyncaddrinfo_threads);
for (size_t i = 0; i < asyncaddrinfo_num_threads; i++) {
int subfd = fcntl(fds[0], F_DUPFD_CLOEXEC, 0);
assert(subfd >= 0);
assert(!pthread_create(&asyncaddrinfo_threads[i], NULL, asyncaddrinfo_main, (void *) (intptr_t) subfd));
}
assert(!close(fds[0]));
}
void asyncaddrinfo_cleanup() {
assert(asyncaddrinfo_threads);
assert(!close(asyncaddrinfo_write_fd));
asyncaddrinfo_write_fd = -1;
for (size_t i = 0; i < asyncaddrinfo_num_threads; i++) {
assert(!pthread_join(asyncaddrinfo_threads[i], NULL));
}
free(asyncaddrinfo_threads);
asyncaddrinfo_threads = NULL;
}
int asyncaddrinfo_resolve(const char *node, const char *service, const struct addrinfo *hints) {
int fds[2];
assert(!socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_CLOEXEC, 0, fds));
struct asyncaddrinfo_resolution *res = malloc(sizeof(*res));
assert(res);
res->return_fd = fds[1];
if (node) {
res->node = strdup(node);
assert(res->node);
} else {
res->node = NULL;
}
if (service) {
res->service = strdup(service);
assert(res->service);
} else {
res->service = NULL;
}
if (hints) {
memcpy(&res->_hints, hints, sizeof(res->_hints));
res->hints = &res->_hints;
} else {
res->hints = NULL;
}
assert(send(asyncaddrinfo_write_fd, &res, sizeof(res), MSG_EOR) == sizeof(res));
// Resolve thread now owns res
return fds[0];
}
int asyncaddrinfo_result(int fd, struct addrinfo **addrs) {
struct asyncaddrinfo_resolution *res;
assert(recv(fd, &res, sizeof(res), 0) == sizeof(res));
assert(!close(fd));
*addrs = res->addrs;
int err = res->err;
asyncaddrinfo_del(res);
return err;
}

8
adsbus/asyncaddrinfo.h Normal file
View File

@@ -0,0 +1,8 @@
#pragma once
struct addrinfo;
void asyncaddrinfo_init(size_t threads);
void asyncaddrinfo_cleanup(void);
int asyncaddrinfo_resolve(const char *node, const char *service, const struct addrinfo *hints);
int asyncaddrinfo_result(int fd, struct addrinfo **addrs);

135
adsbus/exec.c Normal file
View File

@@ -0,0 +1,135 @@
#include <assert.h>
#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include "buf.h"
#include "flow.h"
#include "list.h"
#include "peer.h"
#include "uuid.h"
#include "wakeup.h"
#include "exec.h"
struct exec {
struct peer peer;
uint8_t id[UUID_LEN];
char *command;
struct flow *flow;
void *passthrough;
pid_t child;
struct list_head exec_list;
};
static struct list_head exec_head = LIST_HEAD_INIT(exec_head);
static void exec_spawn_wrapper(struct peer *);
static void exec_del(struct exec *exec) {
(*exec->flow->ref_count)--;
if (exec->child > 0) {
fprintf(stderr, "E %s: Sending SIGTERM to child process %d\n", exec->id, exec->child);
// Racy with the process terminating, so don't assert on it
kill(exec->child, SIGTERM);
assert(waitpid(exec->child, NULL, 0) == exec->child);
}
free(exec->command);
free(exec);
}
static void exec_close_handler(struct peer *peer) {
struct exec *exec = (struct exec *) peer;
int status;
assert(waitpid(exec->child, &status, WNOHANG) == exec->child);
exec->child = -1;
if (WIFEXITED(status)) {
fprintf(stderr, "E %s: Client exited with status %d\n", exec->id, WEXITSTATUS(status));
} else {
assert(WIFSIGNALED(status));
fprintf(stderr, "E %s: Client exited with signal %d\n", exec->id, WTERMSIG(status));
}
uint32_t delay = wakeup_get_retry_delay_ms(1);
fprintf(stderr, "E %s: Will retry in %ds\n", exec->id, delay / 1000);
exec->peer.event_handler = exec_spawn_wrapper;
wakeup_add((struct peer *) exec, delay);
}
static void exec_parent(struct exec *exec, pid_t child, int fd) {
exec->child = child;
fprintf(stderr, "E %s: Child started as process %d\n", exec->id, exec->child);
if (!flow_hello(fd, exec->flow, exec->passthrough)) {
assert(!close(fd));
exec_close_handler((struct peer *) exec);
return;
}
exec->peer.event_handler = exec_close_handler;
exec->flow->new(fd, exec->passthrough, (struct peer *) exec);
}
static void __attribute__ ((noreturn)) exec_child(const struct exec *exec, int fd) {
assert(setsid() != -1);
// We leave stderr open from child to parent
// Other than that, fds should have CLOEXEC set
if (fd != 0) {
assert(dup2(fd, 0) == 0);
}
if (fd != 1) {
assert(dup2(fd, 1) == 1);
}
if (fd != 0 && fd != 1) {
assert(!close(fd));
}
assert(!execl("/bin/sh", "sh", "-c", exec->command, NULL));
abort();
}
static void exec_spawn(struct exec *exec) {
fprintf(stderr, "E %s: Executing: %s\n", exec->id, exec->command);
int fds[2];
assert(!socketpair(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0, fds));
int res = fork();
assert(res >= 0);
if (res) {
assert(!close(fds[1]));
exec_parent(exec, res, fds[0]);
} else {
assert(!close(fds[0]));
exec_child(exec, fds[1]);
}
}
static void exec_spawn_wrapper(struct peer *peer) {
struct exec *exec = (struct exec *) peer;
exec_spawn(exec);
}
void exec_cleanup() {
struct exec *iter, *next;
list_for_each_entry_safe(iter, next, &exec_head, exec_list) {
exec_del(iter);
}
}
void exec_new(char *command, struct flow *flow, void *passthrough) {
(*flow->ref_count)++;
struct exec *exec = malloc(sizeof(*exec));
exec->peer.fd = -1;
uuid_gen(exec->id);
exec->command = strdup(command);
exec->flow = flow;
exec->passthrough = passthrough;
list_add(&exec->exec_list, &exec_head);
exec_spawn(exec);
}

6
adsbus/exec.h Normal file
View File

@@ -0,0 +1,6 @@
#pragma once
struct flow;
void exec_cleanup(void);
void exec_new(char *, struct flow *, void *);

43
adsbus/file.c Normal file
View File

@@ -0,0 +1,43 @@
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "flow.h"
#include "receive.h"
#include "send.h"
#include "file.h"
void file_fd_new(int fd, struct flow *flow, void *passthrough) {
flow->new(fd, passthrough, NULL);
// TODO: log error; retry?
flow_hello(fd, flow, passthrough);
}
void file_read_new(char *path, struct flow *flow, void *passthrough) {
int fd = open(path, O_RDONLY | O_CLOEXEC);
if (fd == -1) {
// TODO: log error; retry?
return;
}
file_fd_new(fd, flow, passthrough);
}
void file_write_new(char *path, struct flow *flow, void *passthrough) {
int fd = open(path, O_WRONLY | O_CREAT | O_NOFOLLOW | O_TRUNC | O_CLOEXEC, S_IRUSR | S_IWUSR);
if (fd == -1) {
// TODO: log error; retry?
return;
}
file_fd_new(fd, flow, passthrough);
}
void file_append_new(char *path, struct flow *flow, void *passthrough) {
int fd = open(path, O_WRONLY | O_CREAT | O_NOFOLLOW | O_CLOEXEC, S_IRWXU);
if (fd == -1) {
// TODO: log error; retry?
return;
}
flow->new(fd, flow, passthrough);
}

8
adsbus/file.h Normal file
View File

@@ -0,0 +1,8 @@
#pragma once
struct flow;
void file_fd_new(int, struct flow *, void *);
void file_read_new(char *, struct flow *, void *);
void file_write_new(char *, struct flow *, void *);
void file_append_new(char *, struct flow *, void *);

25
adsbus/flow.c Normal file
View File

@@ -0,0 +1,25 @@
#include <unistd.h>
#include "buf.h"
#include "socket.h"
#include "flow.h"
void flow_socket_connected(int fd, struct flow *flow) {
socket_connected(fd);
if (flow->socket_connected) {
flow->socket_connected(fd);
}
}
bool flow_hello(int fd, struct flow *flow, void *passthrough) {
if (!flow->get_hello) {
return true;
}
struct buf buf = BUF_INIT, *buf_ptr = &buf;
flow->get_hello(&buf_ptr, passthrough);
if (!buf_ptr->length) {
return true;
}
return (write(fd, buf_at(buf_ptr, 0), buf_ptr->length) == (ssize_t) buf_ptr->length);
}

18
adsbus/flow.h Normal file
View File

@@ -0,0 +1,18 @@
#pragma once
#include <stdbool.h>
#include <stdint.h>
struct buf;
struct peer;
struct flow {
const char *name;
void (*socket_connected)(int);
void (*new)(int, void *, struct peer *);
void (*get_hello)(struct buf **, void *);
uint32_t *ref_count;
};
void flow_socket_connected(int, struct flow *);
bool flow_hello(int, struct flow *, void *);

View File

@@ -9,6 +9,7 @@
#include <unistd.h>
#include "buf.h"
#include "flow.h"
#include "list.h"
#include "peer.h"
#include "resolve.h"
@@ -23,13 +24,9 @@ struct incoming {
uint8_t id[UUID_LEN];
char *node;
char *service;
struct addrinfo *addrs;
const char *error;
uint32_t attempt;
incoming_connection_handler handler;
incoming_get_hello hello;
struct flow *flow;
void *passthrough;
uint32_t *count;
struct list_head incoming_list;
};
@@ -44,18 +41,6 @@ static void incoming_retry(struct incoming *incoming) {
wakeup_add((struct peer *) incoming, delay);
}
static bool incoming_hello(int fd, struct incoming *incoming) {
if (!incoming->hello) {
return true;
}
struct buf buf = BUF_INIT, *buf_ptr = &buf;
incoming->hello(&buf_ptr, incoming->passthrough);
if (!buf_ptr->length) {
return true;
}
return (write(fd, buf_at(buf_ptr, 0), buf_ptr->length) == (ssize_t) buf_ptr->length);
}
static void incoming_handler(struct peer *peer) {
struct incoming *incoming = (struct incoming *) peer;
@@ -79,19 +64,17 @@ static void incoming_handler(struct peer *peer) {
local_hbuf, local_sbuf,
peer_hbuf, peer_sbuf);
socket_connected_init(fd);
if (!incoming_hello(fd, incoming)) {
if (!flow_hello(fd, incoming->flow, incoming->passthrough)) {
fprintf(stderr, "I %s: Error writing greeting\n", incoming->id);
assert(!close(fd));
return;
}
incoming->handler(fd, incoming->passthrough, NULL);
incoming->flow->new(fd, incoming->passthrough, NULL);
}
static void incoming_del(struct incoming *incoming) {
(*incoming->count)--;
(*incoming->flow->ref_count)--;
if (incoming->peer.fd >= 0) {
assert(!close(incoming->peer.fd));
}
@@ -100,9 +83,19 @@ static void incoming_del(struct incoming *incoming) {
free(incoming);
}
static void incoming_listen(struct incoming *incoming) {
static void incoming_listen(struct peer *peer) {
struct incoming *incoming = (struct incoming *) peer;
struct addrinfo *addrs;
int err = resolve_result(peer, &addrs);
if (err) {
fprintf(stderr, "I %s: Failed to resolve %s/%s: %s\n", incoming->id, incoming->node, incoming->service, gai_strerror(err));
incoming_retry(incoming);
return;
}
struct addrinfo *addr;
for (addr = incoming->addrs; addr; addr = addr->ai_next) {
for (addr = addrs; addr; addr = addr->ai_next) {
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
assert(getnameinfo(addr->ai_addr, addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0);
fprintf(stderr, "I %s: Listening on %s/%s...\n", incoming->id, hbuf, sbuf);
@@ -110,8 +103,7 @@ static void incoming_listen(struct incoming *incoming) {
incoming->peer.fd = socket(addr->ai_family, addr->ai_socktype | SOCK_CLOEXEC, addr->ai_protocol);
assert(incoming->peer.fd >= 0);
int optval = 1;
setsockopt(incoming->peer.fd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval));
socket_pre_bind(incoming->peer.fd);
if (bind(incoming->peer.fd, addr->ai_addr, addr->ai_addrlen) != 0) {
fprintf(stderr, "I %s: Failed to bind to %s/%s: %s\n", incoming->id, hbuf, sbuf, strerror(errno));
@@ -119,13 +111,15 @@ static void incoming_listen(struct incoming *incoming) {
continue;
}
socket_bound_init(incoming->peer.fd);
socket_pre_listen(incoming->peer.fd);
// Options are inherited through accept()
flow_socket_connected(incoming->peer.fd, incoming->flow);
assert(listen(incoming->peer.fd, 255) == 0);
break;
}
freeaddrinfo(incoming->addrs);
freeaddrinfo(addrs);
if (addr == NULL) {
fprintf(stderr, "I %s: Failed to bind any addresses for %s/%s...\n", incoming->id, incoming->node, incoming->service);
@@ -138,21 +132,10 @@ static void incoming_listen(struct incoming *incoming) {
peer_epoll_add((struct peer *) incoming, EPOLLIN);
}
static void incoming_resolve_handler(struct peer *peer) {
struct incoming *incoming = (struct incoming *) peer;
if (incoming->addrs) {
incoming_listen(incoming);
} else {
fprintf(stderr, "I %s: Failed to resolve %s/%s: %s\n", incoming->id, incoming->node, incoming->service, incoming->error);
incoming_retry(incoming);
}
}
static void incoming_resolve(struct incoming *incoming) {
fprintf(stderr, "I %s: Resolving %s/%s...\n", incoming->id, incoming->node, incoming->service);
incoming->peer.fd = -1;
incoming->peer.event_handler = incoming_resolve_handler;
resolve((struct peer *) incoming, incoming->node, incoming->service, AI_PASSIVE, &incoming->addrs, &incoming->error);
incoming->peer.event_handler = incoming_listen;
resolve((struct peer *) incoming, incoming->node, incoming->service, AI_PASSIVE);
}
static void incoming_resolve_wrapper(struct peer *peer) {
@@ -166,19 +149,17 @@ void incoming_cleanup() {
}
}
void incoming_new(char *node, char *service, incoming_connection_handler handler, incoming_get_hello hello, void *passthrough, uint32_t *count) {
(*count)++;
void incoming_new(char *node, char *service, struct flow *flow, void *passthrough) {
(*flow->ref_count)++;
struct incoming *incoming = malloc(sizeof(*incoming));
incoming->peer.event_handler = incoming_handler;
uuid_gen(incoming->id);
incoming->node = strdup(node);
incoming->node = node ? strdup(node) : NULL;
incoming->service = strdup(service);
incoming->attempt = 0;
incoming->handler = handler;
incoming->hello = hello;
incoming->flow = flow;
incoming->passthrough = passthrough;
incoming->count = count;
list_add(&incoming->incoming_list, &incoming_head);

View File

@@ -1,11 +1,6 @@
#pragma once
#include <stdint.h>
struct buf;
struct peer;
struct flow;
void incoming_cleanup(void);
typedef void (*incoming_connection_handler)(int fd, void *, struct peer *);
typedef void (*incoming_get_hello)(struct buf **, void *);
void incoming_new(char *, char *, incoming_connection_handler, incoming_get_hello, void *, uint32_t *);
void incoming_new(char *, char *, struct flow *, void *);

View File

@@ -1,11 +1,15 @@
#include <assert.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include "buf.h"
#include "exec.h"
#include "file.h"
#include "incoming.h"
#include "outgoing.h"
#include "peer.h"
#include "receive.h"
#include "send.h"
@@ -21,13 +25,13 @@ static char *opts_split(char **arg, char delim) {
return ret;
}
static void opts_add_listen(char *host_port, incoming_connection_handler handler, incoming_get_hello hello, void *passthrough, uint32_t *count) {
static void opts_add_listen(char *host_port, struct flow *flow, void *passthrough) {
char *host = opts_split(&host_port, '/');
if (host) {
incoming_new(host, host_port, handler, hello, passthrough, count);
incoming_new(host, host_port, flow, passthrough);
free(host);
} else {
incoming_new(NULL, host_port, handler, hello, passthrough, count);
incoming_new(NULL, host_port, flow, passthrough);
}
}
@@ -52,7 +56,7 @@ bool opts_add_connect_receive(char *arg) {
return false;
}
outgoing_new(host, arg, receive_new, NULL, NULL, &peer_count_in);
outgoing_new(host, arg, receive_flow, NULL);
free(host);
return true;
}
@@ -68,13 +72,13 @@ bool opts_add_connect_send(char *arg) {
return false;
}
outgoing_new(host, arg, send_new_wrapper, send_hello, serializer, &peer_count_out);
outgoing_new(host, arg, send_flow, serializer);
free(host);
return true;
}
bool opts_add_listen_receive(char *arg) {
opts_add_listen(arg, receive_new, NULL, NULL, &peer_count_in);
opts_add_listen(arg, receive_flow, NULL);
return true;
}
@@ -84,12 +88,54 @@ bool opts_add_listen_send(char *arg) {
return false;
}
opts_add_listen(arg, send_new_wrapper, send_hello, serializer, &peer_count_out);
opts_add_listen(arg, send_flow, serializer);
return true;
}
bool opts_add_file_read(char *arg) {
file_read_new(arg, receive_flow, NULL);
return true;
}
bool opts_add_file_write(char *arg) {
struct serializer *serializer = opts_get_serializer(&arg);
if (!serializer) {
return false;
}
file_write_new(arg, send_flow, serializer);
return true;
}
bool opts_add_file_append(char *arg) {
struct serializer *serializer = opts_get_serializer(&arg);
if (!serializer) {
return false;
}
file_append_new(arg, send_flow, serializer);
return true;
}
bool opts_add_exec_receive(char *arg) {
exec_new(arg, receive_flow, NULL);
return true;
}
bool opts_add_exec_send(char *arg) {
struct serializer *serializer = opts_get_serializer(&arg);
if (!serializer) {
return false;
}
exec_new(arg, send_flow, serializer);
return true;
}
bool opts_add_stdin(char __attribute__((unused)) *arg) {
receive_new(dup(0), NULL, NULL);
int fd = fcntl(0, F_DUPFD_CLOEXEC, 0);
assert(fd >= 0);
file_fd_new(fd, receive_flow, NULL);
return true;
}
@@ -98,17 +144,8 @@ bool opts_add_stdout(char *arg) {
if (!serializer) {
return false;
}
int fd = dup(1);
{
// TODO: move into standard location for non-socket fd handling
struct buf buf = BUF_INIT, *buf_ptr = &buf;
send_hello(&buf_ptr, serializer);
if (buf_ptr->length) {
if (write(fd, buf_at(buf_ptr, 0), buf_ptr->length) != (ssize_t) buf_ptr->length) {
return false;
}
}
}
send_new(fd, serializer, NULL);
int fd = fcntl(1, F_DUPFD_CLOEXEC, 0);
assert(fd >= 0);
file_fd_new(fd, send_flow, serializer);
return true;
}

View File

@@ -6,5 +6,10 @@ bool opts_add_connect_receive(char *);
bool opts_add_connect_send(char *);
bool opts_add_listen_receive(char *);
bool opts_add_listen_send(char *);
bool opts_add_file_read(char *);
bool opts_add_file_write(char *);
bool opts_add_file_append(char *);
bool opts_add_exec_receive(char *);
bool opts_add_exec_send(char *);
bool opts_add_stdout(char *);
bool opts_add_stdin(char *);

View File

@@ -9,6 +9,7 @@
#include <unistd.h>
#include "buf.h"
#include "flow.h"
#include "list.h"
#include "peer.h"
#include "resolve.h"
@@ -25,12 +26,9 @@ struct outgoing {
char *service;
struct addrinfo *addrs;
struct addrinfo *addr;
const char *error;
uint32_t attempt;
outgoing_connection_handler handler;
outgoing_get_hello hello;
struct flow *flow;
void *passthrough;
uint32_t *count;
struct list_head outgoing_list;
};
@@ -41,7 +39,7 @@ static void outgoing_resolve(struct outgoing *);
static void outgoing_resolve_wrapper(struct peer *);
static void outgoing_retry(struct outgoing *outgoing) {
uint32_t delay = wakeup_get_retry_delay_ms(outgoing->attempt++);
uint32_t delay = wakeup_get_retry_delay_ms(++outgoing->attempt);
fprintf(stderr, "O %s: Will retry in %ds\n", outgoing->id, delay / 1000);
outgoing->peer.event_handler = outgoing_resolve_wrapper;
wakeup_add((struct peer *) outgoing, delay);
@@ -63,11 +61,11 @@ static void outgoing_connect_next(struct outgoing *outgoing) {
assert(outgoing->peer.fd >= 0);
struct buf buf = BUF_INIT, *buf_ptr = &buf;
if (outgoing->hello) {
outgoing->hello(&buf_ptr, outgoing->passthrough);
if (outgoing->flow->get_hello) {
outgoing->flow->get_hello(&buf_ptr, outgoing->passthrough);
}
int result = (int) sendto(outgoing->peer.fd, buf_at(buf_ptr, 0), buf_ptr->length, MSG_FASTOPEN, outgoing->addr->ai_addr, outgoing->addr->ai_addrlen);
outgoing_connect_result(outgoing, result == 0 ? result : errno);
ssize_t result = sendto(outgoing->peer.fd, buf_at(buf_ptr, 0), buf_ptr->length, MSG_FASTOPEN, outgoing->addr->ai_addr, outgoing->addr->ai_addrlen);
outgoing_connect_result(outgoing, result == (ssize_t) buf_ptr->length ? EINPROGRESS : errno);
}
static void outgoing_connect_handler(struct peer *peer) {
@@ -97,12 +95,12 @@ static void outgoing_connect_result(struct outgoing *outgoing, int result) {
case 0:
fprintf(stderr, "O %s: Connected to %s/%s\n", outgoing->id, hbuf, sbuf);
freeaddrinfo(outgoing->addrs);
socket_connected_init(outgoing->peer.fd);
flow_socket_connected(outgoing->peer.fd, outgoing->flow);
outgoing->attempt = 0;
int fd = outgoing->peer.fd;
outgoing->peer.fd = -1;
outgoing->peer.event_handler = outgoing_disconnect_handler;
outgoing->handler(fd, outgoing->passthrough, (struct peer *) outgoing);
outgoing->flow->new(fd, outgoing->passthrough, (struct peer *) outgoing);
break;
case EINPROGRESS:
@@ -123,20 +121,20 @@ static void outgoing_connect_result(struct outgoing *outgoing, int result) {
static void outgoing_resolve_handler(struct peer *peer) {
struct outgoing *outgoing = (struct outgoing *) peer;
if (outgoing->addrs) {
int err = resolve_result(peer, &outgoing->addrs);
if (err) {
fprintf(stderr, "O %s: Failed to resolve %s/%s: %s\n", outgoing->id, outgoing->node, outgoing->service, gai_strerror(err));
outgoing_retry(outgoing);
} else {
outgoing->addr = outgoing->addrs;
outgoing_connect_next(outgoing);
} else {
fprintf(stderr, "O %s: Failed to resolve %s/%s: %s\n", outgoing->id, outgoing->node, outgoing->service, outgoing->error);
outgoing_retry(outgoing);
}
}
static void outgoing_resolve(struct outgoing *outgoing) {
fprintf(stderr, "O %s: Resolving %s/%s...\n", outgoing->id, outgoing->node, outgoing->service);
outgoing->peer.fd = -1;
outgoing->peer.event_handler = outgoing_resolve_handler;
resolve((struct peer *) outgoing, outgoing->node, outgoing->service, 0, &outgoing->addrs, &outgoing->error);
resolve((struct peer *) outgoing, outgoing->node, outgoing->service, 0);
}
static void outgoing_resolve_wrapper(struct peer *peer) {
@@ -144,7 +142,7 @@ static void outgoing_resolve_wrapper(struct peer *peer) {
}
static void outgoing_del(struct outgoing *outgoing) {
(*outgoing->count)--;
(*outgoing->flow->ref_count)--;
if (outgoing->peer.fd >= 0) {
assert(!close(outgoing->peer.fd));
}
@@ -160,18 +158,16 @@ void outgoing_cleanup() {
}
}
void outgoing_new(char *node, char *service, outgoing_connection_handler handler, outgoing_get_hello hello, void *passthrough, uint32_t *count) {
(*count)++;
void outgoing_new(char *node, char *service, struct flow *flow, void *passthrough) {
(*flow->ref_count)++;
struct outgoing *outgoing = malloc(sizeof(*outgoing));
uuid_gen(outgoing->id);
outgoing->node = strdup(node);
outgoing->service = strdup(service);
outgoing->attempt = 0;
outgoing->handler = handler;
outgoing->hello = hello;
outgoing->flow = flow;
outgoing->passthrough = passthrough;
outgoing->count = count;
list_add(&outgoing->outgoing_list, &outgoing_head);

View File

@@ -1,9 +1,6 @@
#pragma once
struct buf;
struct peer;
struct flow;
void outgoing_cleanup(void);
typedef void (*outgoing_connection_handler)(int fd, void *, struct peer *);
typedef void (*outgoing_get_hello)(struct buf **, void *);
void outgoing_new(char *, char *, outgoing_connection_handler, outgoing_get_hello, void *, uint32_t *);
void outgoing_new(char *, char *, struct flow *, void *);

View File

@@ -16,7 +16,7 @@ static struct buf rand_buf = BUF_INIT;
static int rand_fd;
void rand_init() {
rand_fd = open("/dev/urandom", O_RDONLY);
rand_fd = open("/dev/urandom", O_RDONLY | O_CLOEXEC);
assert(rand_fd >= 0);
assert(read(rand_fd, buf_at(&rand_buf, 0), BUF_LEN_MAX) == BUF_LEN_MAX);
rand_buf.length = BUF_LEN_MAX;

View File

@@ -9,11 +9,13 @@
#include "airspy_adsb.h"
#include "beast.h"
#include "buf.h"
#include "flow.h"
#include "json.h"
#include "packet.h"
#include "peer.h"
#include "proto.h"
#include "raw.h"
#include "socket.h"
#include "send.h"
#include "uuid.h"
@@ -35,6 +37,15 @@ struct receive {
};
static struct receive *receive_head = NULL;
static void receive_new(int, void *, struct peer *);
static struct flow _receive_flow = {
.name = "receive",
.new = receive_new,
.ref_count = &peer_count_in,
};
struct flow *receive_flow = &_receive_flow;
static struct parser {
char *name;
parser parse;
@@ -123,17 +134,10 @@ static void receive_read(struct peer *peer) {
}
}
void receive_cleanup() {
while (receive_head) {
receive_del(receive_head);
}
}
void receive_new(int fd, void __attribute__((unused)) *passthrough, struct peer *on_close) {
static void receive_new(int fd, void __attribute__((unused)) *passthrough, struct peer *on_close) {
peer_count_in++;
int res = shutdown(fd, SHUT_WR);
assert(res == 0 || (res == -1 && errno == ENOTSOCK));
socket_receive(fd);
struct receive *receive = malloc(sizeof(*receive));
assert(receive);
@@ -156,6 +160,12 @@ void receive_new(int fd, void __attribute__((unused)) *passthrough, struct peer
fprintf(stderr, "R %s: New receive connection\n", receive->id);
}
void receive_cleanup() {
while (receive_head) {
receive_del(receive_head);
}
}
void receive_print_usage() {
fprintf(stderr, "\nSupported receive formats (auto-detected):\n");
for (size_t i = 0; i < NUM_PARSERS; i++) {

View File

@@ -1,11 +1,9 @@
#pragma once
#include <stdbool.h>
#define PARSER_STATE_LEN 256
struct peer;
struct flow;
void receive_cleanup(void);
void receive_new(int, void *, struct peer *);
void receive_print_usage(void);
extern struct flow *receive_flow;

View File

@@ -1,93 +1,33 @@
#include <assert.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include "asyncaddrinfo.h"
#include "peer.h"
#include "resolve.h"
struct resolve_request {
int fd;
const char *node;
const char *service;
int flags;
struct addrinfo **addrs;
const char **error;
};
struct resolve_peer {
struct peer peer;
struct peer *inner_peer;
};
static pthread_t resolve_thread;
static int resolve_write_fd;
static void resolve_handler(struct peer *peer) {
struct resolve_peer *resolve_peer = (struct resolve_peer *) peer;
assert(!close(resolve_peer->peer.fd));
peer_call(resolve_peer->inner_peer);
free(resolve_peer);
}
static void *resolve_main(void *arg) {
int fd = (int) (intptr_t) arg;
struct resolve_request *request;
ssize_t ret;
while ((ret = read(fd, &request, sizeof(request))) == sizeof(request)) {
struct addrinfo hints = {
.ai_family = AF_UNSPEC,
.ai_socktype = SOCK_STREAM,
.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG | request->flags,
};
int err = getaddrinfo(request->node, request->service, &hints, request->addrs);
if (err) {
*request->addrs = NULL;
*request->error = gai_strerror(err);
} else {
*request->error = NULL;
}
close(request->fd);
free(request);
}
assert(!ret);
assert(!close(fd));
return NULL;
}
void resolve_init() {
int fds[2];
assert(!pipe2(fds, O_CLOEXEC));
resolve_write_fd = fds[1];
assert(!pthread_create(&resolve_thread, NULL, resolve_main, (void *) (intptr_t) fds[0]));
asyncaddrinfo_init(2);
}
void resolve_cleanup() {
assert(!close(resolve_write_fd));
assert(!pthread_join(resolve_thread, NULL));
asyncaddrinfo_cleanup();
}
void resolve(struct peer *peer, const char *node, const char *service, int flags, struct addrinfo **addrs, const char **error) {
int fds[2];
assert(!pipe2(fds, O_CLOEXEC));
void resolve(struct peer *peer, const char *node, const char *service, int flags) {
struct addrinfo hints = {
.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG | flags,
.ai_family = AF_UNSPEC,
.ai_socktype = SOCK_STREAM,
};
struct resolve_request *request = malloc(sizeof(*request));
request->fd = fds[1];
request->node = node;
request->service = service;
request->flags = flags;
request->addrs = addrs;
request->error = error;
assert(write(resolve_write_fd, &request, sizeof(request)) == sizeof(request));
struct resolve_peer *resolve_peer = malloc(sizeof(*resolve_peer));
resolve_peer->peer.fd = fds[0];
resolve_peer->peer.event_handler = resolve_handler;
resolve_peer->inner_peer = peer;
peer_epoll_add((struct peer *) resolve_peer, EPOLLRDHUP);
peer->fd = asyncaddrinfo_resolve(node, service, &hints);
peer_epoll_add(peer, EPOLLIN);
}
int resolve_result(struct peer *peer, struct addrinfo **addrs) {
int err = asyncaddrinfo_result(peer->fd, addrs);
peer->fd = -1;
return err;
}

View File

@@ -5,4 +5,5 @@ struct addrinfo;
void resolve_init(void);
void resolve_cleanup(void);
void resolve(struct peer *, const char *, const char *, int, struct addrinfo **, const char **);
void resolve(struct peer *, const char *, const char *, int);
int resolve_result(struct peer *, struct addrinfo **addrs);

View File

@@ -12,12 +12,14 @@
#include "airspy_adsb.h"
#include "beast.h"
#include "buf.h"
#include "flow.h"
#include "json.h"
#include "list.h"
#include "packet.h"
#include "peer.h"
#include "proto.h"
#include "raw.h"
#include "socket.h"
#include "stats.h"
#include "uuid.h"
@@ -31,6 +33,18 @@ struct send {
struct list_head send_list;
};
static void send_new(int, void *, struct peer *);
static void send_get_hello(struct buf **, void *);
static struct flow _send_flow = {
.name = "send",
.socket_connected = socket_connected_send,
.new = send_new,
.get_hello = send_get_hello,
.ref_count = &peer_count_out,
};
struct flow *send_flow = &_send_flow;
typedef void (*serialize)(struct packet *, struct buf *);
typedef void (*hello)(struct buf **);
static struct serializer {
@@ -86,36 +100,12 @@ static void send_del_wrapper(struct peer *peer) {
send_del((struct send *) peer);
}
void send_init() {
assert(signal(SIGPIPE, SIG_IGN) != SIG_ERR);
for (size_t i = 0; i < NUM_SERIALIZERS; i++) {
list_head_init(&serializers[i].send_head);
}
}
static void send_new(int fd, void *passthrough, struct peer *on_close) {
struct serializer *serializer = (struct serializer *) passthrough;
void send_cleanup() {
for (size_t i = 0; i < NUM_SERIALIZERS; i++) {
struct send *iter, *next;
list_for_each_entry_safe(iter, next, &serializers[i].send_head, send_list) {
send_del(iter);
}
}
}
struct serializer *send_get_serializer(char *name) {
for (size_t i = 0; i < NUM_SERIALIZERS; i++) {
if (strcasecmp(serializers[i].name, name) == 0) {
return &serializers[i];
}
}
return NULL;
}
void send_new(int fd, struct serializer *serializer, struct peer *on_close) {
peer_count_out++;
int res = shutdown(fd, SHUT_RD);
assert(res == 0 || (res == -1 && errno == ENOTSOCK));
socket_send(fd);
struct send *send = malloc(sizeof(*send));
assert(send);
@@ -132,17 +122,38 @@ void send_new(int fd, struct serializer *serializer, struct peer *on_close) {
fprintf(stderr, "S %s (%s): New send connection\n", send->id, serializer->name);
}
void send_new_wrapper(int fd, void *passthrough, struct peer *on_close) {
send_new(fd, (struct serializer *) passthrough, on_close);
}
void send_hello(struct buf **buf_pp, void *passthrough) {
static void send_get_hello(struct buf **buf_pp, void *passthrough) {
struct serializer *serializer = (struct serializer *) passthrough;
if (serializer->hello) {
serializer->hello(buf_pp);
}
}
void send_init() {
assert(signal(SIGPIPE, SIG_IGN) != SIG_ERR);
for (size_t i = 0; i < NUM_SERIALIZERS; i++) {
list_head_init(&serializers[i].send_head);
}
}
void send_cleanup() {
for (size_t i = 0; i < NUM_SERIALIZERS; i++) {
struct send *iter, *next;
list_for_each_entry_safe(iter, next, &serializers[i].send_head, send_list) {
send_del(iter);
}
}
}
void *send_get_serializer(char *name) {
for (size_t i = 0; i < NUM_SERIALIZERS; i++) {
if (strcasecmp(serializers[i].name, name) == 0) {
return &serializers[i];
}
}
return NULL;
}
void send_write(struct packet *packet) {
packet_sanity_check(packet);
for (size_t i = 0; i < NUM_SERIALIZERS; i++) {

View File

@@ -1,14 +1,14 @@
#pragma once
struct buf;
#include <stdbool.h>
struct flow;
struct packet;
struct peer;
void send_init(void);
void send_cleanup(void);
struct serializer *send_get_serializer(char *);
void send_new(int, struct serializer *, struct peer *);
void send_new_wrapper(int, void *, struct peer *);
void send_hello(struct buf **, void *);
void *send_get_serializer(char *);
void send_write(struct packet *);
void send_print_usage(void);
extern struct flow *send_flow;

View File

@@ -7,12 +7,20 @@
#include "socket.h"
void socket_bound_init(int fd) {
void socket_pre_bind(int fd) {
// Called by transport code; safe to assume that fd is a socket
int optval = 1;
assert(!setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval)));
}
void socket_pre_listen(int fd) {
// Called by transport code; safe to assume that fd is a socket
int qlen = 5;
assert(!setsockopt(fd, SOL_TCP, TCP_FASTOPEN, &qlen, sizeof(qlen)));
}
void socket_connected_init(int fd) {
void socket_connected(int fd) {
// Called by transport code; safe to assume that fd is a socket
int optval = 1;
assert(!setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval)));
optval = 30;
@@ -22,3 +30,32 @@ void socket_connected_init(int fd) {
optval = 3;
assert(!setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &optval, sizeof(optval)));
}
void socket_connected_send(int fd) {
int optval = 128; // Lowest value that the kernel will accept
int res = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &optval, sizeof(optval));
if (res == -1 && errno == ENOTSOCK) {
return;
}
assert(res == 0);
optval = 128; // Lowest value that the kernel will accept
res = setsockopt(fd, IPPROTO_TCP, TCP_WINDOW_CLAMP, &optval, sizeof(optval));
assert(res == 0);
optval = 60000; // 60s
res = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &optval, sizeof(optval));
assert(res == 0);
}
void socket_send(int fd) {
// Called by data flow code; NOT safe to assume that fd is a socket
int res = shutdown(fd, SHUT_RD);
assert(res == 0 || (res == -1 && errno == ENOTSOCK));
}
void socket_receive(int fd) {
// Called by data flow code; NOT safe to assume that fd is a socket
int res = shutdown(fd, SHUT_WR);
assert(res == 0 || (res == -1 && errno == ENOTSOCK));
}

View File

@@ -1,4 +1,8 @@
#pragma once
void socket_bound_init(int);
void socket_connected_init(int);
void socket_pre_bind(int);
void socket_pre_listen(int);
void socket_connected(int);
void socket_connected_send(int);
void socket_send(int);
void socket_receive(int);

View File

@@ -1,20 +1,54 @@
message AdsbHeader {
// Always "aDsB"
required string magic = 1;
// Unique identifier for this server implementation
// Recommended: "https://url/of/source#version"
required string server_version = 2;
// Unique identifier for this server instance
// UUID recommended
// 36 character limit
required string server_id = 3;
// MHz of the clock used in subsequent mlat_timestamp fields
required fixed32 mlat_timestamp_mhz = 4;
// Maximum value of subsequent mlat_timestamp fields, at which point values are expected to wrap
required fixed64 mlat_timestamp_max = 5;
// Maximum value of subsequent rssi fields
required fixed32 rssi_max = 6;
}
message AdsbPacket {
// Unique value for the source that recorded this packet
// UUID recommended
// 36 character limit
required string source_id = 1;
// Value of the MLAT counter when this packet arrived at the recorder
// Range [0, mlat_timestamp_max]
// Units of 1 / (mlat_timestamp_mhz * 10^6) Hz
optional fixed64 mlat_timestamp = 2;
// RSSI of the received packet at the recorder
// Range [0, rssi_max]
// Units unspecified
optional fixed32 rssi = 3;
// Binary packet payload.
// Length:
// mode_ac: 2 bytes
// mode_s_short: 7 bytes
// mode_s_long: 14 bytes
required bytes payload = 4;
}
message Adsb {
// Each message must contain exactly one; zero is invalid.
// The first record of a stream must be a header.
// Subsequent records may be in any order, including additional headers.
oneof record {
AdsbHeader header = 1;
AdsbPacket mode_ac = 2;
@@ -23,6 +57,9 @@ message Adsb {
}
}
// adsbus proto serialization takes advantage of the fact that an AdsbStream
// with many messages and many AdsbStreams each with a single message encode
// identically.
message AdsbStream {
repeated Adsb msg = 1;
}

View File

@@ -12,7 +12,7 @@ level data.
## Frame structure
* `*` (`0x2a`)
* Uppercase hex-encoded 2, 7, or 14 byte frame (4, 14, or 28 bytes after encoding)
* Uppercase hex-encoded 4, 14, or 28 byte payload (2, 7, or 14 bytes when decoded)
* `;` (`0x3b`)
* 4 byte MLAT timestamp (see below)
* `;` (`0x3b`)

78
protocols/json.md Normal file
View File

@@ -0,0 +1,78 @@
# JSON protocol
This protocol was created by adsb-tools. This specification is official.
## Format
[JSON streaming](https://en.wikipedia.org/wiki/JSON_Streaming) encoding, line delimited.
Each line must contain a single outer JSON object; no other outer types are permitted.
First frame must always be a header; frames can otherwise appear in any order, including
additional headers.
## Common fields
* `type` (string): one of:
* `"header"` (see [Header](#header))
* `"Mode-AC"` (see [Packet](#packet))
* `"Mode-S short"` (see [Packet](#packet))
* `"Mode-S long"` (see [Packet](#packet))
## Header
* `type`:
* String
* `"header"`
* `magic`:
* String
* `"aDsB"`
* `server_version`:
* String
* Unique identifier for this server implementation
* Recommended: `"https://url/of/source#version"`
* `server_id`:
* String
* Unique identifier for this server instance
* [UUID](https://en.wikipedia.org/wiki/Universally_unique_identifier) recommended
* 36 character limit
* `mlat_timestamp_mhz`:
* Integer
* MHz of the clock used in subsequent `mlat_timestamp` fields
* `mlat_timestamp_max`:
* Integer
* Maximum value of subsequent `mlat_timestamp` fields, at which point values are expected to wrap
* `rssi_max`:
* Integer
* Maximum value of subsequent `rssi` fields
## Packet
* `type`:
* String
* One of:
* `"Mode-AC"` (4 byte payload, 2 bytes when decoded)
* `"Mode-S short"` (14 byte payload, 7 bytes when decoded)
* `"Mode-S long"` (28 byte payload, 14 bytes when decoded)
* `source_id`:
* String
* Unique value for the source that recorded this packet.
* [UUID](https://en.wikipedia.org/wiki/Universally_unique_identifier) recommended
* 36 character limit
* `mlat_timestamp`:
* Integer
* Value of the [MLAT](https://en.wikipedia.org/wiki/Multilateration) counter when this packet arrived at the recorder
* Range [0, `mlat_timestamp_max`]
* Units of 1 / (`mlat_timestamp_mhz` * 10^6) Hz
* `rssi`:
* Integer
* [RSSI](https://en.wikipedia.org/wiki/Received_signal_strength_indication) of the received packet at the recorder
* Range [0, `rssi_max`]
* Units unspecified
* `payload`:
* Upper-case, hex-encoded
* See `type` for length
## Examples
* `{"mlat_timestamp_mhz": 120, "type": "header", "magic": "aDsB", "server_version": "https://github.com/flamingcowtv/adsb-tools#1", "server_id": "fba76102-c39a-4c4e-af7c-ddd4ec0d45e2", "mlat_timestamp_max": 9223372036854775807, "rssi_max": 4294967295}\n`
* `{"payload": "02C58939D0B3C5", "type": "Mode-S short", "rssi": 269488144, "source_id": "f432c867-4108-4927-ba1f-1cfa71709bc4", "mlat_timestamp": 247651683709560}\n`
* `{"payload": "A8000B0B10010680A600003E4A72", "type": "Mode-S long", "rssi": 2206434179, "source_id": "f432c867-4108-4927-ba1f-1cfa71709bc4", "mlat_timestamp": 247651683777900}\n`

28
protocols/proto.md Normal file
View File

@@ -0,0 +1,28 @@
# Protobuf protocol
This protocol was created by adsb-tools. This specification is official.
## Format
[Protocol buffer](https://developers.google.com/protocol-buffers/docs/overview) encoding.
Proto definition is [here](https://github.com/flamingcowtv/adsb-tools/blob/master/proto/adsb.proto).
Stream is a series of Adsb records with length encoded in a prefix (protobuf isn't self
delimiting). The prefix is structured such that an entire stream is a valid AdsbStream
record.
First frame must always be an AdsbHeader; frames can otherwise appear in any order, including
additional headers.
## Prefix
* `0x0a`: in protobuf encoding, field #1, type 2 (length-encoded bytes); (1 << 3) | 2
* Length of packet not including prefix, encoded as a [base 128 varint](https://developers.google.com/protocol-buffers/docs/encoding#varints)
## Packet
See [definition file](https://github.com/flamingcowtv/adsb-tools/blob/master/proto/adsb.proto)
for details.
## Tips
To decode a stream file:
`$ protoc-c --proto_path=adsb-tools/proto --decode=AdsbStream adsb-tools/proto/adsb.proto < streamfile`

View File

@@ -14,7 +14,7 @@ Simple textual format.
## Frame structure
* `*` (`0x2a`)
* Uppercase hex-encoded 2, 7, or 14 byte frame (4, 14, or 28 bytes after encoding)
* Uppercase hex-encoded 4, 14, or 28 byte payload (2, 7, or 14 bytes when decoded)
* `;` (`0x3b`)
* `\n` (`0x0a`) **OR** `\r\n` (`0x0d 0x0a`)