From 161ea56d4553ee4ae65d1bcfc8587373b9318cb4 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sun, 28 Feb 2016 12:34:43 -0800 Subject: [PATCH 01/35] Corral the last setsockopt() call into one place. --- adsbus/incoming.c | 3 +-- adsbus/socket.c | 5 +++++ adsbus/socket.h | 1 + 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/adsbus/incoming.c b/adsbus/incoming.c index 70e4a89..934d3b1 100644 --- a/adsbus/incoming.c +++ b/adsbus/incoming.c @@ -110,8 +110,7 @@ static void incoming_listen(struct incoming *incoming) { incoming->peer.fd = socket(addr->ai_family, addr->ai_socktype | SOCK_CLOEXEC, addr->ai_protocol); assert(incoming->peer.fd >= 0); - int optval = 1; - setsockopt(incoming->peer.fd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval)); + socket_pre_bind_init(incoming->peer.fd); if (bind(incoming->peer.fd, addr->ai_addr, addr->ai_addrlen) != 0) { fprintf(stderr, "I %s: Failed to bind to %s/%s: %s\n", incoming->id, hbuf, sbuf, strerror(errno)); diff --git a/adsbus/socket.c b/adsbus/socket.c index 19e3ce1..89bd56d 100644 --- a/adsbus/socket.c +++ b/adsbus/socket.c @@ -7,6 +7,11 @@ #include "socket.h" +void socket_pre_bind_init(int fd) { + int optval = 1; + assert(!setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval))); +} + void socket_bound_init(int fd) { int qlen = 5; assert(!setsockopt(fd, SOL_TCP, TCP_FASTOPEN, &qlen, sizeof(qlen))); diff --git a/adsbus/socket.h b/adsbus/socket.h index a7e6e27..12de4af 100644 --- a/adsbus/socket.h +++ b/adsbus/socket.h @@ -1,4 +1,5 @@ #pragma once +void socket_pre_bind_init(int); void socket_bound_init(int); void socket_connected_init(int); From fdafe2e055d20e50381831d496ad4995e6605f42 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sun, 28 Feb 2016 12:45:00 -0800 Subject: [PATCH 02/35] Convenience function for simple hello. --- adsbus/opts.c | 14 +------------- adsbus/send.c | 12 ++++++++++++ adsbus/send.h | 1 + 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/adsbus/opts.c b/adsbus/opts.c index 3a833e9..e805809 100644 --- a/adsbus/opts.c +++ b/adsbus/opts.c @@ -98,17 +98,5 @@ bool opts_add_stdout(char *arg) { if (!serializer) { return false; } - int fd = dup(1); - { - // TODO: move into standard location for non-socket fd handling - struct buf buf = BUF_INIT, *buf_ptr = &buf; - send_hello(&buf_ptr, serializer); - if (buf_ptr->length) { - if (write(fd, buf_at(buf_ptr, 0), buf_ptr->length) != (ssize_t) buf_ptr->length) { - return false; - } - } - } - send_new(fd, serializer, NULL); - return true; + return send_new_hello(dup(1), serializer, NULL); } diff --git a/adsbus/send.c b/adsbus/send.c index 551cc0d..073c7e4 100644 --- a/adsbus/send.c +++ b/adsbus/send.c @@ -136,6 +136,18 @@ void send_new_wrapper(int fd, void *passthrough, struct peer *on_close) { send_new(fd, (struct serializer *) passthrough, on_close); } +bool send_new_hello(int fd, struct serializer *serializer, struct peer *on_close) { + struct buf buf = BUF_INIT, *buf_ptr = &buf; + send_hello(&buf_ptr, serializer); + if (buf_ptr->length) { + if (write(fd, buf_at(buf_ptr, 0), buf_ptr->length) != (ssize_t) buf_ptr->length) { + return false; + } + } + send_new(fd, serializer, on_close); + return true; +} + void send_hello(struct buf **buf_pp, void *passthrough) { struct serializer *serializer = (struct serializer *) passthrough; if (serializer->hello) { diff --git a/adsbus/send.h b/adsbus/send.h index fdc3d67..a50fa3f 100644 --- a/adsbus/send.h +++ b/adsbus/send.h @@ -9,6 +9,7 @@ void send_cleanup(void); struct serializer *send_get_serializer(char *); void send_new(int, struct serializer *, struct peer *); void send_new_wrapper(int, void *, struct peer *); +bool send_new_hello(int, struct serializer *, struct peer *); void send_hello(struct buf **, void *); void send_write(struct packet *); void send_print_usage(void); From e86ea976715e3c9bc521b93019976a75daf2d2c6 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sun, 28 Feb 2016 13:08:04 -0800 Subject: [PATCH 03/35] --file-{read,write,append} --- adsbus/adsbus.c | 18 ++++++++++++++++++ adsbus/opts.c | 38 ++++++++++++++++++++++++++++++++++++++ adsbus/opts.h | 3 +++ adsbus/send.c | 1 + 4 files changed, 60 insertions(+) diff --git a/adsbus/adsbus.c b/adsbus/adsbus.c index 7357a60..822400e 100644 --- a/adsbus/adsbus.c +++ b/adsbus/adsbus.c @@ -32,6 +32,9 @@ static void print_usage(const char *name) { "\t--connect-send=FORMAT=HOST/PORT\n" "\t--listen-receive=[HOST/]PORT\n" "\t--listen-send=FORMAT=[HOST/]PORT\n" + "\t--file-read=PATH\n" + "\t--file-write=FORMAT=PATH\n" + "\t--file-append=FORMAT=PATH\n" "\t--stdin\n" "\t--stdout=FORMAT\n" , name); @@ -45,6 +48,9 @@ static bool parse_opts(int argc, char *argv[]) { {"connect-send", required_argument, 0, 's'}, {"listen-receive", required_argument, 0, 'l'}, {"listen-send", required_argument, 0, 'm'}, + {"file-read", required_argument, 0, 'r'}, + {"file-write", required_argument, 0, 'w'}, + {"file-append", required_argument, 0, 'a'}, {"stdin", no_argument, 0, 'i'}, {"stdout", required_argument, 0, 'o'}, {"help", no_argument, 0, 'h'}, @@ -71,6 +77,18 @@ static bool parse_opts(int argc, char *argv[]) { handler = opts_add_listen_send; break; + case 'r': + handler = opts_add_file_read; + break; + + case 'w': + handler = opts_add_file_write; + break; + + case 'a': + handler = opts_add_file_append; + break; + case 'i': handler = opts_add_stdin; break; diff --git a/adsbus/opts.c b/adsbus/opts.c index e805809..bef8bd2 100644 --- a/adsbus/opts.c +++ b/adsbus/opts.c @@ -1,5 +1,8 @@ +#include #include #include +#include +#include #include #include "buf.h" @@ -88,6 +91,41 @@ bool opts_add_listen_send(char *arg) { return true; } +bool opts_add_file_read(char *arg) { + int fd = open(arg, O_RDONLY | O_CLOEXEC); + if (fd == -1) { + return false; + } + receive_new(fd, NULL, NULL); + return true; +} + +bool opts_add_file_write(char *arg) { + struct serializer *serializer = opts_get_serializer(&arg); + if (!serializer) { + return NULL; + } + + int fd = open(arg, O_WRONLY | O_CREAT | O_NOFOLLOW | O_TRUNC | O_CLOEXEC, S_IRWXU); + if (fd == -1) { + return false; + } + return send_new_hello(fd, serializer, NULL); +} + +bool opts_add_file_append(char *arg) { + struct serializer *serializer = opts_get_serializer(&arg); + if (!serializer) { + return NULL; + } + + int fd = open(arg, O_WRONLY | O_CREAT | O_NOFOLLOW | O_CLOEXEC, S_IRWXU); + if (fd == -1) { + return false; + } + return send_new_hello(fd, serializer, NULL); +} + bool opts_add_stdin(char __attribute__((unused)) *arg) { receive_new(dup(0), NULL, NULL); return true; diff --git a/adsbus/opts.h b/adsbus/opts.h index b1f4c42..b27161d 100644 --- a/adsbus/opts.h +++ b/adsbus/opts.h @@ -6,5 +6,8 @@ bool opts_add_connect_receive(char *); bool opts_add_connect_send(char *); bool opts_add_listen_receive(char *); bool opts_add_listen_send(char *); +bool opts_add_file_read(char *); +bool opts_add_file_write(char *); +bool opts_add_file_append(char *); bool opts_add_stdout(char *); bool opts_add_stdin(char *); diff --git a/adsbus/send.c b/adsbus/send.c index 073c7e4..43221d4 100644 --- a/adsbus/send.c +++ b/adsbus/send.c @@ -141,6 +141,7 @@ bool send_new_hello(int fd, struct serializer *serializer, struct peer *on_close send_hello(&buf_ptr, serializer); if (buf_ptr->length) { if (write(fd, buf_at(buf_ptr, 0), buf_ptr->length) != (ssize_t) buf_ptr->length) { + assert(!close(fd)); return false; } } From b9d179b957787670caa371af7944f0be4f4bb4d5 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sun, 28 Feb 2016 13:11:32 -0800 Subject: [PATCH 04/35] Largefile support. --- adsbus/Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adsbus/Makefile b/adsbus/Makefile index 2c299ac..9e454e8 100644 --- a/adsbus/Makefile +++ b/adsbus/Makefile @@ -1,6 +1,6 @@ COMP ?= clang DISABLED_WARNINGS ?= -Wno-padded -Wno-disabled-macro-expansion -CFLAGS ?= -Weverything -Werror -O3 -g --std=gnu11 --pedantic-errors -fPIE -fstack-protector-strong -pthread -D_GNU_SOURCE $(DISABLED_WARNINGS) +CFLAGS ?= -Weverything -Werror -O3 -g --std=gnu11 --pedantic-errors -fPIE -fstack-protector-strong -pthread -D_GNU_SOURCE -D_FILE_OFFSET_BITS=64 $(DISABLED_WARNINGS) LDFLAGS ?= $(CFLAGS) -Wl,-z,relro -Wl,-z,now -pie LIBS ?= -ljansson -lprotobuf-c From 29d0b33161b0ea67ce38dba9b692c1ae22ed9b99 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sun, 28 Feb 2016 13:15:12 -0800 Subject: [PATCH 05/35] Set FD_CLOEXEC after dup() --- adsbus/opts.c | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/adsbus/opts.c b/adsbus/opts.c index bef8bd2..7a98824 100644 --- a/adsbus/opts.c +++ b/adsbus/opts.c @@ -1,3 +1,4 @@ +#include #include #include #include @@ -127,7 +128,9 @@ bool opts_add_file_append(char *arg) { } bool opts_add_stdin(char __attribute__((unused)) *arg) { - receive_new(dup(0), NULL, NULL); + int fd = dup(0); + assert(!fcntl(fd, F_SETFD, FD_CLOEXEC)); + receive_new(fd, NULL, NULL); return true; } @@ -136,5 +139,7 @@ bool opts_add_stdout(char *arg) { if (!serializer) { return false; } - return send_new_hello(dup(1), serializer, NULL); + int fd = dup(1); + assert(!fcntl(fd, F_SETFD, FD_CLOEXEC)); + return send_new_hello(fd, serializer, NULL); } From 58a8576da881070f7755f391a63aca4a4e00758b Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sun, 28 Feb 2016 15:53:55 -0800 Subject: [PATCH 06/35] Add --exec-{send,receive} --- adsbus/Makefile | 6 +- adsbus/adsbus.c | 14 +++++ adsbus/exec.c | 150 ++++++++++++++++++++++++++++++++++++++++++++++ adsbus/exec.h | 11 ++++ adsbus/opts.c | 16 +++++ adsbus/opts.h | 2 + adsbus/outgoing.c | 2 +- adsbus/rand.c | 2 +- 8 files changed, 198 insertions(+), 5 deletions(-) create mode 100644 adsbus/exec.c create mode 100644 adsbus/exec.h diff --git a/adsbus/Makefile b/adsbus/Makefile index 9e454e8..0c345da 100644 --- a/adsbus/Makefile +++ b/adsbus/Makefile @@ -10,7 +10,7 @@ VALGRIND ?= valgrind VALGRIND_FLAGS ?= --error-exitcode=1 --track-fds=yes --show-leak-kinds=all --leak-check=full ADSBUS_TEST_FLAGS ?= --stdin --stdout=airspy_adsb --stdout=beast --stdout=json --stdout=proto --stdout=raw --stdout=stats -OBJ_NETWORK = incoming.o outgoing.o receive.o send.o +OBJ_TRANSPORT = exec.o incoming.o outgoing.o receive.o send.o OBJ_PROTOCOL = airspy_adsb.o beast.o json.o proto.o raw.o stats.o OBJ_UTIL = buf.o hex.o list.o opts.o packet.o peer.o rand.o resolve.o server.o socket.o uuid.o wakeup.o OBJ_PROTO = adsb.pb-c.o @@ -26,8 +26,8 @@ clean: adsb.pb-c.c: ../proto/adsb.proto protoc-c --c_out=./ --proto_path=$(dir $<) $< -adsbus: adsbus.o $(OBJ_NETWORK) $(OBJ_PROTOCOL) $(OBJ_UTIL) $(OBJ_PROTO) - $(COMP) $(LDFLAGS) -o adsbus adsbus.o $(OBJ_NETWORK) $(OBJ_PROTOCOL) $(OBJ_UTIL) $(OBJ_PROTO) $(LIBS) +adsbus: adsbus.o $(OBJ_TRANSPORT) $(OBJ_PROTOCOL) $(OBJ_UTIL) $(OBJ_PROTO) + $(COMP) $(LDFLAGS) -o adsbus adsbus.o $(OBJ_TRANSPORT) $(OBJ_PROTOCOL) $(OBJ_UTIL) $(OBJ_PROTO) $(LIBS) fuzz: rm -rf findings diff --git a/adsbus/adsbus.c b/adsbus/adsbus.c index 822400e..0a96bbd 100644 --- a/adsbus/adsbus.c +++ b/adsbus/adsbus.c @@ -6,6 +6,7 @@ #include #include "beast.h" +#include "exec.h" #include "hex.h" #include "incoming.h" #include "json.h" @@ -35,6 +36,8 @@ static void print_usage(const char *name) { "\t--file-read=PATH\n" "\t--file-write=FORMAT=PATH\n" "\t--file-append=FORMAT=PATH\n" + "\t--exec-receive=COMMAND\n" + "\t--exec-send=FORMAT=COMMAND\n" "\t--stdin\n" "\t--stdout=FORMAT\n" , name); @@ -51,6 +54,8 @@ static bool parse_opts(int argc, char *argv[]) { {"file-read", required_argument, 0, 'r'}, {"file-write", required_argument, 0, 'w'}, {"file-append", required_argument, 0, 'a'}, + {"exec-receive", required_argument, 0, 'e'}, + {"exec-send", required_argument, 0, 'f'}, {"stdin", no_argument, 0, 'i'}, {"stdout", required_argument, 0, 'o'}, {"help", no_argument, 0, 'h'}, @@ -89,6 +94,14 @@ static bool parse_opts(int argc, char *argv[]) { handler = opts_add_file_append; break; + case 'e': + handler = opts_add_exec_receive; + break; + + case 'f': + handler = opts_add_exec_send; + break; + case 'i': handler = opts_add_stdin; break; @@ -151,6 +164,7 @@ int main(int argc, char *argv[]) { send_cleanup(); incoming_cleanup(); outgoing_cleanup(); + exec_cleanup(); json_cleanup(); proto_cleanup(); diff --git a/adsbus/exec.c b/adsbus/exec.c new file mode 100644 index 0000000..9a3cceb --- /dev/null +++ b/adsbus/exec.c @@ -0,0 +1,150 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "buf.h" +#include "list.h" +#include "peer.h" +#include "uuid.h" +#include "wakeup.h" + +#include "exec.h" + +struct exec { + struct peer peer; + uint8_t id[UUID_LEN]; + char *command; + exec_connection_handler handler; + exec_get_hello hello; + void *passthrough; + uint32_t *count; + pid_t child; + struct list_head exec_list; +}; + +static struct list_head exec_head = LIST_HEAD_INIT(exec_head); + +static void exec_spawn_wrapper(struct peer *); + +static void exec_del(struct exec *exec) { + (*exec->count)--; + if (exec->child > 0) { + fprintf(stderr, "E %s: Sending SIGTERM to child process %d\n", exec->id, exec->child); + // Racy with the process terminating, so don't assert on it + kill(exec->child, SIGTERM); + assert(waitpid(exec->child, NULL, 0) == exec->child); + } + free(exec->command); + free(exec); +} + +static void exec_close_handler(struct peer *peer) { + struct exec *exec = (struct exec *) peer; + int status; + assert(waitpid(exec->child, &status, WNOHANG) == exec->child); + exec->child = -1; + if (WIFEXITED(status)) { + fprintf(stderr, "E %s: Client exited with status %d\n", exec->id, WEXITSTATUS(status)); + } else { + assert(WIFSIGNALED(status)); + fprintf(stderr, "E %s: Client exited with signal %d\n", exec->id, WTERMSIG(status)); + } + uint32_t delay = wakeup_get_retry_delay_ms(1); + fprintf(stderr, "E %s: Will retry in %ds\n", exec->id, delay / 1000); + exec->peer.event_handler = exec_spawn_wrapper; + wakeup_add((struct peer *) exec, delay); +} + +static bool exec_hello(int fd, struct exec *exec) { + if (!exec->hello) { + return true; + } + struct buf buf = BUF_INIT, *buf_ptr = &buf; + exec->hello(&buf_ptr, exec->passthrough); + if (!buf_ptr->length) { + return true; + } + return (write(fd, buf_at(buf_ptr, 0), buf_ptr->length) == (ssize_t) buf_ptr->length); +} + +static void exec_parent(struct exec *exec, pid_t child, int fd) { + exec->child = child; + fprintf(stderr, "E %s: Child started as process %d\n", exec->id, exec->child); + + if (!exec_hello(fd, exec)) { + assert(!close(fd)); + exec_close_handler((struct peer *) exec); + return; + } + + exec->peer.event_handler = exec_close_handler; + exec->handler(fd, exec->passthrough, (struct peer *) exec); +} + +static void __attribute__ ((noreturn)) exec_child(const struct exec *exec, int fd) { + assert(setsid() != -1); + // We leave stderr open from child to parent + // Other than that, fds should have CLOEXEC set + if (fd != 0) { + assert(dup2(fd, 0) == 0); + } + if (fd != 1) { + assert(dup2(fd, 1) == 1); + } + if (fd != 0 && fd != 1) { + assert(!close(fd)); + } + assert(!execl("/bin/sh", "sh", "-c", exec->command, NULL)); + abort(); +} + +static void exec_spawn(struct exec *exec) { + fprintf(stderr, "E %s: Executing: %s\n", exec->id, exec->command); + int fds[2]; + assert(!socketpair(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0, fds)); + + int res = fork(); + assert(res >= 0); + if (res) { + assert(!close(fds[1])); + exec_parent(exec, res, fds[0]); + } else { + assert(!close(fds[0])); + exec_child(exec, fds[1]); + } +} + +static void exec_spawn_wrapper(struct peer *peer) { + struct exec *exec = (struct exec *) peer; + exec_spawn(exec); +} + +void exec_cleanup() { + struct exec *iter, *next; + list_for_each_entry_safe(iter, next, &exec_head, exec_list) { + exec_del(iter); + } +} + +void exec_new(char *command, exec_connection_handler handler, exec_get_hello hello, void *passthrough, uint32_t *count) { + (*count)++; + + struct exec *exec = malloc(sizeof(*exec)); + exec->peer.fd = -1; + uuid_gen(exec->id); + exec->command = strdup(command); + exec->handler = handler; + exec->hello = hello; + exec->passthrough = passthrough; + exec->count = count; + + list_add(&exec->exec_list, &exec_head); + + exec_spawn(exec); +} diff --git a/adsbus/exec.h b/adsbus/exec.h new file mode 100644 index 0000000..2429f17 --- /dev/null +++ b/adsbus/exec.h @@ -0,0 +1,11 @@ +#pragma once + +#include + +struct buf; +struct peer; + +void exec_cleanup(void); +typedef void (*exec_connection_handler)(int fd, void *, struct peer *); +typedef void (*exec_get_hello)(struct buf **, void *); +void exec_new(char *, exec_connection_handler, exec_get_hello, void *, uint32_t *); diff --git a/adsbus/opts.c b/adsbus/opts.c index 7a98824..f3b4475 100644 --- a/adsbus/opts.c +++ b/adsbus/opts.c @@ -7,6 +7,7 @@ #include #include "buf.h" +#include "exec.h" #include "incoming.h" #include "outgoing.h" #include "peer.h" @@ -127,6 +128,21 @@ bool opts_add_file_append(char *arg) { return send_new_hello(fd, serializer, NULL); } +bool opts_add_exec_receive(char *arg) { + exec_new(arg, receive_new, NULL, NULL, &peer_count_in); + return true; +} + +bool opts_add_exec_send(char *arg) { + struct serializer *serializer = opts_get_serializer(&arg); + if (!serializer) { + return NULL; + } + + exec_new(arg, send_new_wrapper, send_hello, serializer, &peer_count_out); + return true; +} + bool opts_add_stdin(char __attribute__((unused)) *arg) { int fd = dup(0); assert(!fcntl(fd, F_SETFD, FD_CLOEXEC)); diff --git a/adsbus/opts.h b/adsbus/opts.h index b27161d..e6a9013 100644 --- a/adsbus/opts.h +++ b/adsbus/opts.h @@ -9,5 +9,7 @@ bool opts_add_listen_send(char *); bool opts_add_file_read(char *); bool opts_add_file_write(char *); bool opts_add_file_append(char *); +bool opts_add_exec_receive(char *); +bool opts_add_exec_send(char *); bool opts_add_stdout(char *); bool opts_add_stdin(char *); diff --git a/adsbus/outgoing.c b/adsbus/outgoing.c index 37feac2..52b6629 100644 --- a/adsbus/outgoing.c +++ b/adsbus/outgoing.c @@ -41,7 +41,7 @@ static void outgoing_resolve(struct outgoing *); static void outgoing_resolve_wrapper(struct peer *); static void outgoing_retry(struct outgoing *outgoing) { - uint32_t delay = wakeup_get_retry_delay_ms(outgoing->attempt++); + uint32_t delay = wakeup_get_retry_delay_ms(++outgoing->attempt); fprintf(stderr, "O %s: Will retry in %ds\n", outgoing->id, delay / 1000); outgoing->peer.event_handler = outgoing_resolve_wrapper; wakeup_add((struct peer *) outgoing, delay); diff --git a/adsbus/rand.c b/adsbus/rand.c index c7027ec..82d6563 100644 --- a/adsbus/rand.c +++ b/adsbus/rand.c @@ -16,7 +16,7 @@ static struct buf rand_buf = BUF_INIT; static int rand_fd; void rand_init() { - rand_fd = open("/dev/urandom", O_RDONLY); + rand_fd = open("/dev/urandom", O_RDONLY | O_CLOEXEC); assert(rand_fd >= 0); assert(read(rand_fd, buf_at(&rand_buf, 0), BUF_LEN_MAX) == BUF_LEN_MAX); rand_buf.length = BUF_LEN_MAX; From 6bee996d5a701a93fc12b2ebb6be78f1041d60b6 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sun, 28 Feb 2016 15:58:11 -0800 Subject: [PATCH 07/35] Trace children in valgrind. --- adsbus/Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adsbus/Makefile b/adsbus/Makefile index 0c345da..0297916 100644 --- a/adsbus/Makefile +++ b/adsbus/Makefile @@ -7,7 +7,7 @@ LIBS ?= -ljansson -lprotobuf-c TESTCASE_DIR ?= testcase TESTOUT_DIR ?= testout VALGRIND ?= valgrind -VALGRIND_FLAGS ?= --error-exitcode=1 --track-fds=yes --show-leak-kinds=all --leak-check=full +VALGRIND_FLAGS ?= --error-exitcode=1 --trace-children=yes --track-fds=yes --show-leak-kinds=all --leak-check=full ADSBUS_TEST_FLAGS ?= --stdin --stdout=airspy_adsb --stdout=beast --stdout=json --stdout=proto --stdout=raw --stdout=stats OBJ_TRANSPORT = exec.o incoming.o outgoing.o receive.o send.o From 05a67df06f17b8c55ba0021f39d4a130c63ffb86 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sun, 28 Feb 2016 18:36:12 -0800 Subject: [PATCH 08/35] Clearer language about hex string lengths. --- protocols/airspy_adsb.md | 2 +- protocols/raw.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/protocols/airspy_adsb.md b/protocols/airspy_adsb.md index 21df429..1e9ae32 100644 --- a/protocols/airspy_adsb.md +++ b/protocols/airspy_adsb.md @@ -12,7 +12,7 @@ level data. ## Frame structure * `*` (`0x2a`) -* Uppercase hex-encoded 2, 7, or 14 byte frame (4, 14, or 28 bytes after encoding) +* Uppercase hex-encoded 4, 14, or 28 byte payload (2, 7, or 14 bytes when decoded) * `;` (`0x3b`) * 4 byte MLAT timestamp (see below) * `;` (`0x3b`) diff --git a/protocols/raw.md b/protocols/raw.md index 8dc6ecd..427ede0 100644 --- a/protocols/raw.md +++ b/protocols/raw.md @@ -14,7 +14,7 @@ Simple textual format. ## Frame structure * `*` (`0x2a`) -* Uppercase hex-encoded 2, 7, or 14 byte frame (4, 14, or 28 bytes after encoding) +* Uppercase hex-encoded 4, 14, or 28 byte payload (2, 7, or 14 bytes when decoded) * `;` (`0x3b`) * `\n` (`0x0a`) **OR** `\r\n` (`0x0d 0x0a`) From 4f9d9af660411c1ee84c4ccd7ede3c2b1b3d06b7 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sun, 28 Feb 2016 18:56:37 -0800 Subject: [PATCH 09/35] JSON protocol description start. --- protocols/json.md | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 protocols/json.md diff --git a/protocols/json.md b/protocols/json.md new file mode 100644 index 0000000..84fe783 --- /dev/null +++ b/protocols/json.md @@ -0,0 +1,45 @@ +# JSON protocol + +This protocol was created by adsb-tools. This specification is official. + +## Format + +[JSON streaming](https://en.wikipedia.org/wiki/JSON_Streaming) encoding, line delimited. +Each line must contain a single outer JSON object; no other types are permitted. + +First frame must always be a header; frames can otherwise appear in any order, including +additional headers. + +## Common fields +* `type` (string): one of: + * `header` (see [Header]) + * `Mode-AC` (see [Packet]) + * `Mode-S short` (see [Packet]) + * `Mode-S long` (see [Packet]) + + +## Header +* `type`: `header` +* `magic`: `aDsB` +* `server_version`: (string) +* `server_id`: (string) unique identifier for this server instance. [UUID](https://en.wikipedia.org/wiki/Universally_unique_identifier) recommended; 36 character limit +* `mlat_timestamp_mhz`: (integer) MHz of the clock used in subsequent `mlat_timestamp` fields +* `mlat_timestamp_max`: (integer) maximum value of subsequent `mlat_timestamp` fields, at which point values are expected to wrap +* `rssi_max`: (integer) maximum value of subsequent `rssi` fields + + +## Packet +* `type`: (string): one of: + * `Mode-AC` (see [Packet]; 4 byte payload, 2 bytes when decoded) + * `Mode-S short` (see [Packet]; 14 byte payload, 7 bytes when decoded) + * `Mode-S long` (see [Packet]; 28 byte payload, 14 bytes when decoded) +* `source_id`: (string) unique value for the source that recorded this packet. [UUID](https://en.wikipedia.org/wiki/Universally_unique_identifier) recommended; 36 character limit +* `mlat_timestamp`: (integer) value of the [MLAT](https://en.wikipedia.org/wiki/Multilateration) counter when this packet arrived at the recorder, range [0, `mlat_timestamp_max`], in units of 1 / (`mlat_timestamp_mhz` * 10^6) Hz +* `rssi`: (integer) [RSSI](https://en.wikipedia.org/wiki/Received_signal_strength_indication) of the receiver packet at the recorder, range [0, `rssi_max`], units unspecified +* `payload`: upper-case, hex-encoded. see `type` for length + + +## Examples +* `{"mlat_timestamp_mhz": 120, "type": "header", "magic": "aDsB", "server_version": "https://github.com/flamingcowtv/adsb-tools#1", "server_id": "fba76102-c39a-4c4e-af7c-ddd4ec0d45e2", "mlat_timestamp_max": 9223372036854775807, "rssi_max": 4294967295}\n` +* `{"payload": "02C58939D0B3C5", "type": "Mode-S short", "rssi": 269488144, "source_id": "f432c867-4108-4927-ba1f-1cfa71709bc4", "mlat_timestamp": 247651683709560}\n` +* `{"payload": "A8000B0B10010680A600003E4A72", "type": "Mode-S long", "rssi": 2206434179, "source_id": "f432c867-4108-4927-ba1f-1cfa71709bc4", "mlat_timestamp": 247651683777900}\n` From 1c9eec7cf78f63f8bb0cc0dd7d4a46abb5b41820 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sun, 28 Feb 2016 19:02:12 -0800 Subject: [PATCH 10/35] Minor cleanups. --- protocols/json.md | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/protocols/json.md b/protocols/json.md index 84fe783..94d6b7e 100644 --- a/protocols/json.md +++ b/protocols/json.md @@ -5,23 +5,23 @@ This protocol was created by adsb-tools. This specification is official. ## Format [JSON streaming](https://en.wikipedia.org/wiki/JSON_Streaming) encoding, line delimited. -Each line must contain a single outer JSON object; no other types are permitted. +Each line must contain a single outer JSON object; no other outer types are permitted. First frame must always be a header; frames can otherwise appear in any order, including additional headers. ## Common fields * `type` (string): one of: - * `header` (see [Header]) - * `Mode-AC` (see [Packet]) - * `Mode-S short` (see [Packet]) - * `Mode-S long` (see [Packet]) + * `header` (see [Header](#header)) + * `Mode-AC` (see [Packet](#packet)) + * `Mode-S short` (see [Packet](#packet)) + * `Mode-S long` (see [Packet](#packet)) ## Header * `type`: `header` * `magic`: `aDsB` -* `server_version`: (string) +* `server_version`: (string) unqiue identifier for this server implementation. `https://url/of/source#version` recommended * `server_id`: (string) unique identifier for this server instance. [UUID](https://en.wikipedia.org/wiki/Universally_unique_identifier) recommended; 36 character limit * `mlat_timestamp_mhz`: (integer) MHz of the clock used in subsequent `mlat_timestamp` fields * `mlat_timestamp_max`: (integer) maximum value of subsequent `mlat_timestamp` fields, at which point values are expected to wrap @@ -30,9 +30,9 @@ additional headers. ## Packet * `type`: (string): one of: - * `Mode-AC` (see [Packet]; 4 byte payload, 2 bytes when decoded) - * `Mode-S short` (see [Packet]; 14 byte payload, 7 bytes when decoded) - * `Mode-S long` (see [Packet]; 28 byte payload, 14 bytes when decoded) + * `Mode-AC` (see [Packet](#packet); 4 byte payload, 2 bytes when decoded) + * `Mode-S short` (see [Packet](#packet); 14 byte payload, 7 bytes when decoded) + * `Mode-S long` (see [Packet](#packet); 28 byte payload, 14 bytes when decoded) * `source_id`: (string) unique value for the source that recorded this packet. [UUID](https://en.wikipedia.org/wiki/Universally_unique_identifier) recommended; 36 character limit * `mlat_timestamp`: (integer) value of the [MLAT](https://en.wikipedia.org/wiki/Multilateration) counter when this packet arrived at the recorder, range [0, `mlat_timestamp_max`], in units of 1 / (`mlat_timestamp_mhz` * 10^6) Hz * `rssi`: (integer) [RSSI](https://en.wikipedia.org/wiki/Received_signal_strength_indication) of the receiver packet at the recorder, range [0, `rssi_max`], units unspecified From 388741d3e0e503393dd4757a31e3c576aae61ce4 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sun, 28 Feb 2016 19:16:13 -0800 Subject: [PATCH 11/35] Proto definition. --- protocols/proto.md | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 protocols/proto.md diff --git a/protocols/proto.md b/protocols/proto.md new file mode 100644 index 0000000..7b8bf01 --- /dev/null +++ b/protocols/proto.md @@ -0,0 +1,27 @@ +# Protobuf protocol + +This protocol was created by adsb-tools. This specification is official. + +## Format + +[Protocol buffer](https://developers.google.com/protocol-buffers/docs/overview) encoding. +Proto definition is [here](https://github.com/flamingcowtv/adsb-tools/blob/master/proto/adsb.proto). +Stream is a series of Adsb records with length encoded in a prefix (protobuf isn't self +delimiting). The prefix is structured such that an entire stream is a valid AdsbStream +record. + +First frame must always be an AdsbHeader; frames can otherwise appear in any order, including +additional headers. + +## Prefix +* `0x0a`: in protobuf encoding, field #1, type 2 (length-encoded bytes); (1 << 3) | 2 +* Length of packet not including prefix, encoded as a [base 128 varint](https://developers.google.com/protocol-buffers/docs/encoding#varints) + +## Packet +See [definition file](https://github.com/flamingcowtv/adsb-tools/blob/master/proto/adsb.proto) +for details. + +## Tips + +To decode a stream file: +`$ protoc-c --proto_path=adsb-tools/proto --decode=AdsbStream adsb-tools/proto/adsb.proto < streamfile` From 5f99768ab3552d7b5c504101b565806e07e62436 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sun, 28 Feb 2016 19:49:06 -0800 Subject: [PATCH 12/35] Update build instructions. --- adsbus/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/adsbus/README.md b/adsbus/README.md index f821924..06783f6 100644 --- a/adsbus/README.md +++ b/adsbus/README.md @@ -6,8 +6,8 @@ adsbus is a hub and protocol translator for [ADS-B](https://en.wikipedia.org/wik ## Building ```bash -sudo apt-get -y install uuid-dev libjansson-dev +sudo apt-get -y install build-essential git clang libjansson-dev libprotobuf-c-dev protobuf-c-compiler git clone https://github.com/flamingcowtv/adsb-tools.git -cd adsb-tools +cd adsb-tools/adsbus make ``` From 68fbe187b1c5471dbf666d56ac8054d87a74e77b Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sun, 28 Feb 2016 19:59:04 -0800 Subject: [PATCH 13/35] More protocol docs. --- proto/adsb.proto | 37 +++++++++++++++++++++++++++++++++++++ protocols/json.md | 4 ++-- 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/proto/adsb.proto b/proto/adsb.proto index 870d215..aa0d6a6 100644 --- a/proto/adsb.proto +++ b/proto/adsb.proto @@ -1,20 +1,54 @@ message AdsbHeader { + // Always "aDsB" required string magic = 1; + + // Unique identifier for this server implementation + // Recommended: "https://url/of/source#version" required string server_version = 2; + + // Unique identifier for this server instance + // UUID recommended + // 36 character limit required string server_id = 3; + + // MHz of the clock used in subsequent mlat_timestamp fields required fixed32 mlat_timestamp_mhz = 4; + + // Maximum value of subsequent mlat_timestamp fields, at which point values are expected to wrap required fixed64 mlat_timestamp_max = 5; + + // Maximum value of subsequent rssi fields required fixed32 rssi_max = 6; } message AdsbPacket { + // Unique value for the source that recorded this packet + // UUID recommended + // 36 character limit required string source_id = 1; + + // Value of the MLAT counter when this packet arrived at the recorder + // Range [0, mlat_timestamp_max] + // Units of 1 / (mlat_timestamp_mhz * 10^6) Hz optional fixed64 mlat_timestamp = 2; + + // RSSI of the received packet at the recorder + // Range [0, rssi_max] + // Units unspecified optional fixed32 rssi = 3; + + // Binary packet payload. + // Length: + // mode_ac: 2 bytes + // mode_s_short: 7 bytes + // mode_s_long: 14 bytes required bytes payload = 4; } message Adsb { + // Each message must contain exactly one; zero is invalid. + // The first record of a stream must be a header. + // Subsequent records may be in any order, including additional headers. oneof record { AdsbHeader header = 1; AdsbPacket mode_ac = 2; @@ -23,6 +57,9 @@ message Adsb { } } +// adsbus proto serialization takes advantage of the fact that an AdsbStream +// with many messages and many AdsbStreams each with a single message encode +// identically. message AdsbStream { repeated Adsb msg = 1; } diff --git a/protocols/json.md b/protocols/json.md index 94d6b7e..b0d1152 100644 --- a/protocols/json.md +++ b/protocols/json.md @@ -21,7 +21,7 @@ additional headers. ## Header * `type`: `header` * `magic`: `aDsB` -* `server_version`: (string) unqiue identifier for this server implementation. `https://url/of/source#version` recommended +* `server_version`: (string) unique identifier for this server implementation. `https://url/of/source#version` recommended * `server_id`: (string) unique identifier for this server instance. [UUID](https://en.wikipedia.org/wiki/Universally_unique_identifier) recommended; 36 character limit * `mlat_timestamp_mhz`: (integer) MHz of the clock used in subsequent `mlat_timestamp` fields * `mlat_timestamp_max`: (integer) maximum value of subsequent `mlat_timestamp` fields, at which point values are expected to wrap @@ -35,7 +35,7 @@ additional headers. * `Mode-S long` (see [Packet](#packet); 28 byte payload, 14 bytes when decoded) * `source_id`: (string) unique value for the source that recorded this packet. [UUID](https://en.wikipedia.org/wiki/Universally_unique_identifier) recommended; 36 character limit * `mlat_timestamp`: (integer) value of the [MLAT](https://en.wikipedia.org/wiki/Multilateration) counter when this packet arrived at the recorder, range [0, `mlat_timestamp_max`], in units of 1 / (`mlat_timestamp_mhz` * 10^6) Hz -* `rssi`: (integer) [RSSI](https://en.wikipedia.org/wiki/Received_signal_strength_indication) of the receiver packet at the recorder, range [0, `rssi_max`], units unspecified +* `rssi`: (integer) [RSSI](https://en.wikipedia.org/wiki/Received_signal_strength_indication) of the received packet at the recorder, range [0, `rssi_max`], units unspecified * `payload`: upper-case, hex-encoded. see `type` for length From 31dd90fea27390c4973d57e6c3eae558174429c0 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sun, 28 Feb 2016 20:03:25 -0800 Subject: [PATCH 14/35] Layout cleanup. --- protocols/json.md | 63 ++++++++++++++++++++++++++++++++++++----------- 1 file changed, 48 insertions(+), 15 deletions(-) diff --git a/protocols/json.md b/protocols/json.md index b0d1152..0fcc57d 100644 --- a/protocols/json.md +++ b/protocols/json.md @@ -19,24 +19,57 @@ additional headers. ## Header -* `type`: `header` -* `magic`: `aDsB` -* `server_version`: (string) unique identifier for this server implementation. `https://url/of/source#version` recommended -* `server_id`: (string) unique identifier for this server instance. [UUID](https://en.wikipedia.org/wiki/Universally_unique_identifier) recommended; 36 character limit -* `mlat_timestamp_mhz`: (integer) MHz of the clock used in subsequent `mlat_timestamp` fields -* `mlat_timestamp_max`: (integer) maximum value of subsequent `mlat_timestamp` fields, at which point values are expected to wrap -* `rssi_max`: (integer) maximum value of subsequent `rssi` fields +* `type`: + * String + * `header` +* `magic`: + * String + * `aDsB` +* `server_version`: + * String + * Unique identifier for this server implementation + * Recommended: `https://url/of/source#version` +* `server_id`: + * String + * Unique identifier for this server instance + * [UUID](https://en.wikipedia.org/wiki/Universally_unique_identifier) recommended + * 36 character limit +* `mlat_timestamp_mhz`: + * Integer + * MHz of the clock used in subsequent `mlat_timestamp` fields +* `mlat_timestamp_max`: + * Integer + * Maximum value of subsequent `mlat_timestamp` fields, at which point values are expected to wrap +* `rssi_max`: + * Integer + * Maximum value of subsequent `rssi` fields ## Packet -* `type`: (string): one of: - * `Mode-AC` (see [Packet](#packet); 4 byte payload, 2 bytes when decoded) - * `Mode-S short` (see [Packet](#packet); 14 byte payload, 7 bytes when decoded) - * `Mode-S long` (see [Packet](#packet); 28 byte payload, 14 bytes when decoded) -* `source_id`: (string) unique value for the source that recorded this packet. [UUID](https://en.wikipedia.org/wiki/Universally_unique_identifier) recommended; 36 character limit -* `mlat_timestamp`: (integer) value of the [MLAT](https://en.wikipedia.org/wiki/Multilateration) counter when this packet arrived at the recorder, range [0, `mlat_timestamp_max`], in units of 1 / (`mlat_timestamp_mhz` * 10^6) Hz -* `rssi`: (integer) [RSSI](https://en.wikipedia.org/wiki/Received_signal_strength_indication) of the received packet at the recorder, range [0, `rssi_max`], units unspecified -* `payload`: upper-case, hex-encoded. see `type` for length +* `type`: + * String + * One of: + * `Mode-AC` (4 byte payload, 2 bytes when decoded) + * `Mode-S short` (14 byte payload, 7 bytes when decoded) + * `Mode-S long` (28 byte payload, 14 bytes when decoded) +* `source_id`: + * String + * Unique value for the source that recorded this packet. + * [UUID](https://en.wikipedia.org/wiki/Universally_unique_identifier) recommended + * 36 character limit +* `mlat_timestamp`: + * Integer + * Value of the [MLAT](https://en.wikipedia.org/wiki/Multilateration) counter when this packet arrived at the recorder + * Range [0, `mlat_timestamp_max`] + * Units of 1 / (`mlat_timestamp_mhz` * 10^6) Hz +* `rssi`: + * Integer + * [RSSI](https://en.wikipedia.org/wiki/Received_signal_strength_indication) of the received packet at the recorder + * Range [0, `rssi_max`] + * Units unspecified +* `payload`: + * Upper-case, hex-encoded + * See `type` for length ## Examples From 35d5a1f8832c604979c4f9bcd08a2b9f0805bf9f Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sun, 28 Feb 2016 20:05:02 -0800 Subject: [PATCH 15/35] More formatting. --- protocols/json.md | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/protocols/json.md b/protocols/json.md index 0fcc57d..5cf2b9e 100644 --- a/protocols/json.md +++ b/protocols/json.md @@ -12,23 +12,23 @@ additional headers. ## Common fields * `type` (string): one of: - * `header` (see [Header](#header)) - * `Mode-AC` (see [Packet](#packet)) - * `Mode-S short` (see [Packet](#packet)) - * `Mode-S long` (see [Packet](#packet)) + * `"header"` (see [Header](#header)) + * `"Mode-AC"` (see [Packet](#packet)) + * `"Mode-S short"` (see [Packet](#packet)) + * `"Mode-S long"` (see [Packet](#packet)) ## Header * `type`: * String - * `header` + * `"header"` * `magic`: * String - * `aDsB` + * `"aDsB"` * `server_version`: * String * Unique identifier for this server implementation - * Recommended: `https://url/of/source#version` + * Recommended: `"https://url/of/source#version"` * `server_id`: * String * Unique identifier for this server instance @@ -49,9 +49,9 @@ additional headers. * `type`: * String * One of: - * `Mode-AC` (4 byte payload, 2 bytes when decoded) - * `Mode-S short` (14 byte payload, 7 bytes when decoded) - * `Mode-S long` (28 byte payload, 14 bytes when decoded) + * `"Mode-AC"` (4 byte payload, 2 bytes when decoded) + * `"Mode-S short"` (14 byte payload, 7 bytes when decoded) + * `"Mode-S long"` (28 byte payload, 14 bytes when decoded) * `source_id`: * String * Unique value for the source that recorded this packet. From 0b2a8469cee753d71a23b85b01d38deeb59624d0 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sun, 28 Feb 2016 20:05:40 -0800 Subject: [PATCH 16/35] Formatting --- protocols/proto.md | 1 + 1 file changed, 1 insertion(+) diff --git a/protocols/proto.md b/protocols/proto.md index 7b8bf01..062db97 100644 --- a/protocols/proto.md +++ b/protocols/proto.md @@ -24,4 +24,5 @@ for details. ## Tips To decode a stream file: + `$ protoc-c --proto_path=adsb-tools/proto --decode=AdsbStream adsb-tools/proto/adsb.proto < streamfile` From a3d6adb73bef13bb81ed377f0a56b4ac30b1150e Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sun, 28 Feb 2016 21:23:43 -0800 Subject: [PATCH 17/35] Fix null node name for incoming socket. --- adsbus/incoming.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adsbus/incoming.c b/adsbus/incoming.c index 934d3b1..c8ae367 100644 --- a/adsbus/incoming.c +++ b/adsbus/incoming.c @@ -171,7 +171,7 @@ void incoming_new(char *node, char *service, incoming_connection_handler handler struct incoming *incoming = malloc(sizeof(*incoming)); incoming->peer.event_handler = incoming_handler; uuid_gen(incoming->id); - incoming->node = strdup(node); + incoming->node = node ? strdup(node) : NULL; incoming->service = strdup(service); incoming->attempt = 0; incoming->handler = handler; From 8285011efde2116c0e49d6be49d10ff17dd47a77 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sun, 28 Feb 2016 21:41:35 -0800 Subject: [PATCH 18/35] PR --- adsbus/README.md | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/adsbus/README.md b/adsbus/README.md index 06783f6..b6d7d54 100644 --- a/adsbus/README.md +++ b/adsbus/README.md @@ -2,6 +2,10 @@ adsbus is a hub and protocol translator for [ADS-B](https://en.wikipedia.org/wiki/Automatic_dependent_surveillance_%E2%80%93_broadcast) messages. +It is conceptually similar to `dump1090 --net-only`, but supports more protocols and configurations. It doesn't talk to your radio itself; it +hooks programs that do, then handles the network distribution and format translation. It doesn't output to a web interface or send data to +services like FlightAware; it provides hooks for programs that do. + ## Building @@ -11,3 +15,42 @@ git clone https://github.com/flamingcowtv/adsb-tools.git cd adsb-tools/adsbus make ``` + + +## Features + +* Separates the concepts of transport, data direction, and format +* Transports: + * Outgoing TCP connection + * Incoming TCP connection + * Local files or [named pipes](https://en.wikipedia.org/wiki/Named_pipe) + * [stdin/stdout](https://en.wikipedia.org/wiki/Standard_streams) + * Execute a command and use its stdin/stdout +* Data directions: + * Send (data flows out of adsbus) + * Receive (data flows in to adsbus) +* Formats: + * [airspy_adsb](../airspy_adsb.md) (a.k.a. ASAVR) + * [beast](../beast.md) + * [json](../json.md) + * [proto](../proto.md) (a.k.a. ProtoBuf, Protocol Buffers) + * [raw](../raw.md) (a.k.a. AVR) + * stats (outgoing only, summary aggregated data) +* Transport features: + * [IPv4](https://en.wikipedia.org/wiki/IPv4) and [IPv6](https://en.wikipedia.org/wiki/IPv6) support + * Reresolution/reconnection on disconnect, with backoff and jitter + * [TCP keepalives](https://en.wikipedia.org/wiki/Keepalive#TCP_keepalive) for dead connection detection + * [TCP fast open](https://en.wikipedia.org/wiki/TCP_Fast_Open) for faster startup on high-latency connections + * [SO_REUSEPORT](https://lwn.net/Articles/542629/) for zero-downtime updates +* Format features: + * Autodetection of received data format + * [MLAT](https://en.wikipedia.org/wiki/Multilateration) scaling for different clock rates and counter bit widths + * [RSSI](https://en.wikipedia.org/wiki/Received_signal_strength_indication) scaling for different bit widths + * Introduces json format for balanced human and machine readability with forward compatibility + * Introduces proto format for fast serialization and deserialization with forward compatibility +* Federation: + * Federation allows linking multiple instances of adsbus for: + * Scalability (cores, number of input or output clients, etc.) + * Efficient long-haul links (hub and spoke models on both ends) + * json and proto formats carry information about original source across multiple hops + * SO_REUSEPORT allows multiple adsbus instances to accept connections on the same IP and port without a load balancer From 31aa3fa9a31a9fe8bc79d99672e47805f65ee381 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sun, 28 Feb 2016 21:44:21 -0800 Subject: [PATCH 19/35] Minor fixes --- adsbus/README.md | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/adsbus/README.md b/adsbus/README.md index b6d7d54..87f7847 100644 --- a/adsbus/README.md +++ b/adsbus/README.md @@ -25,22 +25,22 @@ make * Incoming TCP connection * Local files or [named pipes](https://en.wikipedia.org/wiki/Named_pipe) * [stdin/stdout](https://en.wikipedia.org/wiki/Standard_streams) - * Execute a command and use its stdin/stdout + * Execute a command and talk to its stdin/stdout * Data directions: * Send (data flows out of adsbus) * Receive (data flows in to adsbus) * Formats: - * [airspy_adsb](../airspy_adsb.md) (a.k.a. ASAVR) - * [beast](../beast.md) - * [json](../json.md) - * [proto](../proto.md) (a.k.a. ProtoBuf, Protocol Buffers) - * [raw](../raw.md) (a.k.a. AVR) - * stats (outgoing only, summary aggregated data) + * [airspy_adsb](../protocols/airspy_adsb.md) (a.k.a. ASAVR) + * [beast](../protocols/beast.md) + * [json](../protocols/json.md) + * [proto](../protocols/proto.md) (a.k.a. ProtoBuf, Protocol Buffers) + * [raw](../protocols/raw.md) (a.k.a. AVR) + * stats (send only, summary aggregated data) * Transport features: * [IPv4](https://en.wikipedia.org/wiki/IPv4) and [IPv6](https://en.wikipedia.org/wiki/IPv6) support - * Reresolution/reconnection on disconnect, with backoff and jitter + * Reresolution and reconnection on disconnect, with backoff and jitter * [TCP keepalives](https://en.wikipedia.org/wiki/Keepalive#TCP_keepalive) for dead connection detection - * [TCP fast open](https://en.wikipedia.org/wiki/TCP_Fast_Open) for faster startup on high-latency connections + * [TCP fast open](https://en.wikipedia.org/wiki/TCP_Fast_Open) for faster startup of high-latency connections * [SO_REUSEPORT](https://lwn.net/Articles/542629/) for zero-downtime updates * Format features: * Autodetection of received data format From 669b289e8412c4afbfbf2eff7d7bc6a5db0cff5e Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sun, 28 Feb 2016 21:49:29 -0800 Subject: [PATCH 20/35] Data flow hype --- adsbus/README.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/adsbus/README.md b/adsbus/README.md index 87f7847..4456f11 100644 --- a/adsbus/README.md +++ b/adsbus/README.md @@ -19,14 +19,14 @@ make ## Features -* Separates the concepts of transport, data direction, and format +* Separates the concepts of transport, data flow, and format * Transports: * Outgoing TCP connection * Incoming TCP connection * Local files or [named pipes](https://en.wikipedia.org/wiki/Named_pipe) * [stdin/stdout](https://en.wikipedia.org/wiki/Standard_streams) * Execute a command and talk to its stdin/stdout -* Data directions: +* Data flows: * Send (data flows out of adsbus) * Receive (data flows in to adsbus) * Formats: @@ -42,6 +42,8 @@ make * [TCP keepalives](https://en.wikipedia.org/wiki/Keepalive#TCP_keepalive) for dead connection detection * [TCP fast open](https://en.wikipedia.org/wiki/TCP_Fast_Open) for faster startup of high-latency connections * [SO_REUSEPORT](https://lwn.net/Articles/542629/) for zero-downtime updates +* Data flow features: + * Rapid detection and disconnection of receive <-> receive connections * Format features: * Autodetection of received data format * [MLAT](https://en.wikipedia.org/wiki/Multilateration) scaling for different clock rates and counter bit widths From aa11eea121614ecf87f73818b6a97189b21cd7b0 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sun, 28 Feb 2016 22:21:07 -0800 Subject: [PATCH 21/35] Move more socket code into socket.c --- adsbus/receive.c | 4 ++-- adsbus/send.c | 4 ++-- adsbus/socket.c | 15 +++++++++++++++ adsbus/socket.h | 2 ++ 4 files changed, 21 insertions(+), 4 deletions(-) diff --git a/adsbus/receive.c b/adsbus/receive.c index 8076d3b..bb463f5 100644 --- a/adsbus/receive.c +++ b/adsbus/receive.c @@ -14,6 +14,7 @@ #include "peer.h" #include "proto.h" #include "raw.h" +#include "socket.h" #include "send.h" #include "uuid.h" @@ -132,8 +133,7 @@ void receive_cleanup() { void receive_new(int fd, void __attribute__((unused)) *passthrough, struct peer *on_close) { peer_count_in++; - int res = shutdown(fd, SHUT_WR); - assert(res == 0 || (res == -1 && errno == ENOTSOCK)); + socket_receive_init(fd); struct receive *receive = malloc(sizeof(*receive)); assert(receive); diff --git a/adsbus/send.c b/adsbus/send.c index 43221d4..b93dd2a 100644 --- a/adsbus/send.c +++ b/adsbus/send.c @@ -18,6 +18,7 @@ #include "peer.h" #include "proto.h" #include "raw.h" +#include "socket.h" #include "stats.h" #include "uuid.h" @@ -114,8 +115,7 @@ struct serializer *send_get_serializer(char *name) { void send_new(int fd, struct serializer *serializer, struct peer *on_close) { peer_count_out++; - int res = shutdown(fd, SHUT_RD); - assert(res == 0 || (res == -1 && errno == ENOTSOCK)); + socket_send_init(fd); struct send *send = malloc(sizeof(*send)); assert(send); diff --git a/adsbus/socket.c b/adsbus/socket.c index 89bd56d..0dfaa57 100644 --- a/adsbus/socket.c +++ b/adsbus/socket.c @@ -8,16 +8,19 @@ #include "socket.h" void socket_pre_bind_init(int fd) { + // Called by transport code; safe to assume that fd is a socket int optval = 1; assert(!setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval))); } void socket_bound_init(int fd) { + // Called by transport code; safe to assume that fd is a socket int qlen = 5; assert(!setsockopt(fd, SOL_TCP, TCP_FASTOPEN, &qlen, sizeof(qlen))); } void socket_connected_init(int fd) { + // Called by transport code; safe to assume that fd is a socket int optval = 1; assert(!setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval))); optval = 30; @@ -27,3 +30,15 @@ void socket_connected_init(int fd) { optval = 3; assert(!setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &optval, sizeof(optval))); } + +void socket_send_init(int fd) { + // Called by data flow code; NOT safe to assume that fd is a socket + int res = shutdown(fd, SHUT_RD); + assert(res == 0 || (res == -1 && errno == ENOTSOCK)); +} + +void socket_receive_init(int fd) { + // Called by data flow code; NOT safe to assume that fd is a socket + int res = shutdown(fd, SHUT_WR); + assert(res == 0 || (res == -1 && errno == ENOTSOCK)); +} diff --git a/adsbus/socket.h b/adsbus/socket.h index 12de4af..d5ae8dd 100644 --- a/adsbus/socket.h +++ b/adsbus/socket.h @@ -3,3 +3,5 @@ void socket_pre_bind_init(int); void socket_bound_init(int); void socket_connected_init(int); +void socket_send_init(int); +void socket_receive_init(int); From 49b6b0083bf1954c24a27f2a1854343c512602ac Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sun, 28 Feb 2016 22:34:53 -0800 Subject: [PATCH 22/35] Fix misunderstanding of sendto(MSG_FASTOPEN) --- adsbus/outgoing.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/adsbus/outgoing.c b/adsbus/outgoing.c index 52b6629..928d80d 100644 --- a/adsbus/outgoing.c +++ b/adsbus/outgoing.c @@ -66,8 +66,8 @@ static void outgoing_connect_next(struct outgoing *outgoing) { if (outgoing->hello) { outgoing->hello(&buf_ptr, outgoing->passthrough); } - int result = (int) sendto(outgoing->peer.fd, buf_at(buf_ptr, 0), buf_ptr->length, MSG_FASTOPEN, outgoing->addr->ai_addr, outgoing->addr->ai_addrlen); - outgoing_connect_result(outgoing, result == 0 ? result : errno); + ssize_t result = sendto(outgoing->peer.fd, buf_at(buf_ptr, 0), buf_ptr->length, MSG_FASTOPEN, outgoing->addr->ai_addr, outgoing->addr->ai_addrlen); + outgoing_connect_result(outgoing, result == (ssize_t) buf_ptr->length ? EINPROGRESS : errno); } static void outgoing_connect_handler(struct peer *peer) { From 5c7bf0b3958ecbc95dabdbd64d7af80a4479e92e Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sun, 28 Feb 2016 22:50:26 -0800 Subject: [PATCH 23/35] If we get a write <-> write connection, don't waste space buffering data we won't use. --- adsbus/socket.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/adsbus/socket.c b/adsbus/socket.c index 0dfaa57..b451e85 100644 --- a/adsbus/socket.c +++ b/adsbus/socket.c @@ -35,6 +35,10 @@ void socket_send_init(int fd) { // Called by data flow code; NOT safe to assume that fd is a socket int res = shutdown(fd, SHUT_RD); assert(res == 0 || (res == -1 && errno == ENOTSOCK)); + + int optval = 128; // Lowest value that the kernel will accept + res = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &optval, sizeof(optval)); + assert(res == 0 || (res == -1 && errno == ENOTSOCK)); } void socket_receive_init(int fd) { From d842ad65bbeb3c6163d6b799fc5d8bb93c5bf3b6 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sun, 28 Feb 2016 23:03:18 -0800 Subject: [PATCH 24/35] More limits to damage done by write <-> write and disconnected hsots. --- adsbus/socket.c | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/adsbus/socket.c b/adsbus/socket.c index b451e85..0c37ceb 100644 --- a/adsbus/socket.c +++ b/adsbus/socket.c @@ -39,6 +39,14 @@ void socket_send_init(int fd) { int optval = 128; // Lowest value that the kernel will accept res = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &optval, sizeof(optval)); assert(res == 0 || (res == -1 && errno == ENOTSOCK)); + + optval = 128; // Lowest value that the kernel will accept + res = setsockopt(fd, IPPROTO_TCP, TCP_WINDOW_CLAMP, &optval, sizeof(optval)); + assert(res == 0 || (res == -1 && errno == ENOTSOCK)); + + optval = 60000; // 60s + res = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &optval, sizeof(optval)); + assert(res == 0 || (res == -1 && errno == ENOTSOCK)); } void socket_receive_init(int fd) { From 2e9582d822e72501fc79f3bfbdfa386fed671ed3 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sun, 28 Feb 2016 23:04:12 -0800 Subject: [PATCH 25/35] New feature addition. --- adsbus/README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/adsbus/README.md b/adsbus/README.md index 4456f11..aa47a83 100644 --- a/adsbus/README.md +++ b/adsbus/README.md @@ -44,6 +44,7 @@ make * [SO_REUSEPORT](https://lwn.net/Articles/542629/) for zero-downtime updates * Data flow features: * Rapid detection and disconnection of receive <-> receive connections + * Slight less rapid detection and disconnection of send <-> send connections * Format features: * Autodetection of received data format * [MLAT](https://en.wikipedia.org/wiki/Multilateration) scaling for different clock rates and counter bit widths From e44edeba53abaaeab3a3d771ca1c89961c5c70ea Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sun, 28 Feb 2016 23:07:02 -0800 Subject: [PATCH 26/35] A word --- adsbus/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adsbus/README.md b/adsbus/README.md index aa47a83..7877616 100644 --- a/adsbus/README.md +++ b/adsbus/README.md @@ -44,7 +44,7 @@ make * [SO_REUSEPORT](https://lwn.net/Articles/542629/) for zero-downtime updates * Data flow features: * Rapid detection and disconnection of receive <-> receive connections - * Slight less rapid detection and disconnection of send <-> send connections + * Less rapid detection and disconnection of send <-> send connections * Format features: * Autodetection of received data format * [MLAT](https://en.wikipedia.org/wiki/Multilateration) scaling for different clock rates and counter bit widths From 3539f9d6409e1cabf233dd05054164c8ecca7137 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Mon, 29 Feb 2016 15:22:10 -0800 Subject: [PATCH 27/35] Call less syscalls after each accept() --- adsbus/incoming.c | 2 -- adsbus/socket.c | 4 ++++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/adsbus/incoming.c b/adsbus/incoming.c index c8ae367..48dcbfd 100644 --- a/adsbus/incoming.c +++ b/adsbus/incoming.c @@ -79,8 +79,6 @@ static void incoming_handler(struct peer *peer) { local_hbuf, local_sbuf, peer_hbuf, peer_sbuf); - socket_connected_init(fd); - if (!incoming_hello(fd, incoming)) { fprintf(stderr, "I %s: Error writing greeting\n", incoming->id); assert(!close(fd)); diff --git a/adsbus/socket.c b/adsbus/socket.c index 0c37ceb..5e34ccb 100644 --- a/adsbus/socket.c +++ b/adsbus/socket.c @@ -17,6 +17,10 @@ void socket_bound_init(int fd) { // Called by transport code; safe to assume that fd is a socket int qlen = 5; assert(!setsockopt(fd, SOL_TCP, TCP_FASTOPEN, &qlen, sizeof(qlen))); + + // These options are inherited through accept() + // Verified by flamingcow on kernel 4.2.0 + socket_connected_init(fd); } void socket_connected_init(int fd) { From 3fb3c02cf55b7c7efd36b0f06a360bc9ab0b5bc3 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Mon, 29 Feb 2016 17:12:06 -0800 Subject: [PATCH 28/35] Encapsulate flow descriptions inside a struct. --- adsbus/exec.c | 21 +++++++++------------ adsbus/exec.h | 9 ++------- adsbus/flow.h | 17 +++++++++++++++++ adsbus/incoming.c | 21 +++++++++------------ adsbus/incoming.h | 9 ++------- adsbus/opts.c | 21 ++++++++++----------- adsbus/outgoing.c | 21 +++++++++------------ adsbus/outgoing.h | 7 ++----- adsbus/receive.c | 8 ++++++++ adsbus/receive.h | 4 ++-- adsbus/send.c | 9 +++++++++ adsbus/send.h | 4 ++++ 12 files changed, 83 insertions(+), 68 deletions(-) create mode 100644 adsbus/flow.h diff --git a/adsbus/exec.c b/adsbus/exec.c index 9a3cceb..93770aa 100644 --- a/adsbus/exec.c +++ b/adsbus/exec.c @@ -9,6 +9,7 @@ #include #include "buf.h" +#include "flow.h" #include "list.h" #include "peer.h" #include "uuid.h" @@ -20,10 +21,8 @@ struct exec { struct peer peer; uint8_t id[UUID_LEN]; char *command; - exec_connection_handler handler; - exec_get_hello hello; + struct flow *flow; void *passthrough; - uint32_t *count; pid_t child; struct list_head exec_list; }; @@ -33,7 +32,7 @@ static struct list_head exec_head = LIST_HEAD_INIT(exec_head); static void exec_spawn_wrapper(struct peer *); static void exec_del(struct exec *exec) { - (*exec->count)--; + (*exec->flow->ref_count)--; if (exec->child > 0) { fprintf(stderr, "E %s: Sending SIGTERM to child process %d\n", exec->id, exec->child); // Racy with the process terminating, so don't assert on it @@ -62,11 +61,11 @@ static void exec_close_handler(struct peer *peer) { } static bool exec_hello(int fd, struct exec *exec) { - if (!exec->hello) { + if (!exec->flow->get_hello) { return true; } struct buf buf = BUF_INIT, *buf_ptr = &buf; - exec->hello(&buf_ptr, exec->passthrough); + exec->flow->get_hello(&buf_ptr, exec->passthrough); if (!buf_ptr->length) { return true; } @@ -84,7 +83,7 @@ static void exec_parent(struct exec *exec, pid_t child, int fd) { } exec->peer.event_handler = exec_close_handler; - exec->handler(fd, exec->passthrough, (struct peer *) exec); + exec->flow->new(fd, exec->passthrough, (struct peer *) exec); } static void __attribute__ ((noreturn)) exec_child(const struct exec *exec, int fd) { @@ -132,17 +131,15 @@ void exec_cleanup() { } } -void exec_new(char *command, exec_connection_handler handler, exec_get_hello hello, void *passthrough, uint32_t *count) { - (*count)++; +void exec_new(char *command, struct flow *flow, void *passthrough) { + (*flow->ref_count)++; struct exec *exec = malloc(sizeof(*exec)); exec->peer.fd = -1; uuid_gen(exec->id); exec->command = strdup(command); - exec->handler = handler; - exec->hello = hello; + exec->flow = flow; exec->passthrough = passthrough; - exec->count = count; list_add(&exec->exec_list, &exec_head); diff --git a/adsbus/exec.h b/adsbus/exec.h index 2429f17..0c8c525 100644 --- a/adsbus/exec.h +++ b/adsbus/exec.h @@ -1,11 +1,6 @@ #pragma once -#include - -struct buf; -struct peer; +struct flow; void exec_cleanup(void); -typedef void (*exec_connection_handler)(int fd, void *, struct peer *); -typedef void (*exec_get_hello)(struct buf **, void *); -void exec_new(char *, exec_connection_handler, exec_get_hello, void *, uint32_t *); +void exec_new(char *, struct flow *, void *); diff --git a/adsbus/flow.h b/adsbus/flow.h new file mode 100644 index 0000000..e7ff369 --- /dev/null +++ b/adsbus/flow.h @@ -0,0 +1,17 @@ +#pragma once + +#include + +struct buf; +struct peer; + +typedef void (*flow_bound_socket_init)(int); +typedef void (*flow_new)(int, void *, struct peer *); +typedef void (*flow_get_hello)(struct buf **, void *); +struct flow { + const char *name; + flow_bound_socket_init bound_socket_init; + flow_new new; + flow_get_hello get_hello; + uint32_t *ref_count; +}; diff --git a/adsbus/incoming.c b/adsbus/incoming.c index 48dcbfd..74603b9 100644 --- a/adsbus/incoming.c +++ b/adsbus/incoming.c @@ -9,6 +9,7 @@ #include #include "buf.h" +#include "flow.h" #include "list.h" #include "peer.h" #include "resolve.h" @@ -26,10 +27,8 @@ struct incoming { struct addrinfo *addrs; const char *error; uint32_t attempt; - incoming_connection_handler handler; - incoming_get_hello hello; + struct flow *flow; void *passthrough; - uint32_t *count; struct list_head incoming_list; }; @@ -45,11 +44,11 @@ static void incoming_retry(struct incoming *incoming) { } static bool incoming_hello(int fd, struct incoming *incoming) { - if (!incoming->hello) { + if (!incoming->flow->get_hello) { return true; } struct buf buf = BUF_INIT, *buf_ptr = &buf; - incoming->hello(&buf_ptr, incoming->passthrough); + incoming->flow->get_hello(&buf_ptr, incoming->passthrough); if (!buf_ptr->length) { return true; } @@ -85,11 +84,11 @@ static void incoming_handler(struct peer *peer) { return; } - incoming->handler(fd, incoming->passthrough, NULL); + incoming->flow->new(fd, incoming->passthrough, NULL); } static void incoming_del(struct incoming *incoming) { - (*incoming->count)--; + (*incoming->flow->ref_count)--; if (incoming->peer.fd >= 0) { assert(!close(incoming->peer.fd)); } @@ -163,8 +162,8 @@ void incoming_cleanup() { } } -void incoming_new(char *node, char *service, incoming_connection_handler handler, incoming_get_hello hello, void *passthrough, uint32_t *count) { - (*count)++; +void incoming_new(char *node, char *service, struct flow *flow, void *passthrough) { + (*flow->ref_count)++; struct incoming *incoming = malloc(sizeof(*incoming)); incoming->peer.event_handler = incoming_handler; @@ -172,10 +171,8 @@ void incoming_new(char *node, char *service, incoming_connection_handler handler incoming->node = node ? strdup(node) : NULL; incoming->service = strdup(service); incoming->attempt = 0; - incoming->handler = handler; - incoming->hello = hello; + incoming->flow = flow; incoming->passthrough = passthrough; - incoming->count = count; list_add(&incoming->incoming_list, &incoming_head); diff --git a/adsbus/incoming.h b/adsbus/incoming.h index a6978ca..4691c1d 100644 --- a/adsbus/incoming.h +++ b/adsbus/incoming.h @@ -1,11 +1,6 @@ #pragma once -#include - -struct buf; -struct peer; +struct flow; void incoming_cleanup(void); -typedef void (*incoming_connection_handler)(int fd, void *, struct peer *); -typedef void (*incoming_get_hello)(struct buf **, void *); -void incoming_new(char *, char *, incoming_connection_handler, incoming_get_hello, void *, uint32_t *); +void incoming_new(char *, char *, struct flow *, void *); diff --git a/adsbus/opts.c b/adsbus/opts.c index f3b4475..ad90250 100644 --- a/adsbus/opts.c +++ b/adsbus/opts.c @@ -6,11 +6,10 @@ #include #include -#include "buf.h" #include "exec.h" +#include "flow.h" #include "incoming.h" #include "outgoing.h" -#include "peer.h" #include "receive.h" #include "send.h" @@ -26,13 +25,13 @@ static char *opts_split(char **arg, char delim) { return ret; } -static void opts_add_listen(char *host_port, incoming_connection_handler handler, incoming_get_hello hello, void *passthrough, uint32_t *count) { +static void opts_add_listen(char *host_port, struct flow *flow, void *passthrough) { char *host = opts_split(&host_port, '/'); if (host) { - incoming_new(host, host_port, handler, hello, passthrough, count); + incoming_new(host, host_port, flow, passthrough); free(host); } else { - incoming_new(NULL, host_port, handler, hello, passthrough, count); + incoming_new(NULL, host_port, flow, passthrough); } } @@ -57,7 +56,7 @@ bool opts_add_connect_receive(char *arg) { return false; } - outgoing_new(host, arg, receive_new, NULL, NULL, &peer_count_in); + outgoing_new(host, arg, receive_flow, NULL); free(host); return true; } @@ -73,13 +72,13 @@ bool opts_add_connect_send(char *arg) { return false; } - outgoing_new(host, arg, send_new_wrapper, send_hello, serializer, &peer_count_out); + outgoing_new(host, arg, send_flow, serializer); free(host); return true; } bool opts_add_listen_receive(char *arg) { - opts_add_listen(arg, receive_new, NULL, NULL, &peer_count_in); + opts_add_listen(arg, receive_flow, NULL); return true; } @@ -89,7 +88,7 @@ bool opts_add_listen_send(char *arg) { return false; } - opts_add_listen(arg, send_new_wrapper, send_hello, serializer, &peer_count_out); + opts_add_listen(arg, send_flow, serializer); return true; } @@ -129,7 +128,7 @@ bool opts_add_file_append(char *arg) { } bool opts_add_exec_receive(char *arg) { - exec_new(arg, receive_new, NULL, NULL, &peer_count_in); + exec_new(arg, receive_flow, NULL); return true; } @@ -139,7 +138,7 @@ bool opts_add_exec_send(char *arg) { return NULL; } - exec_new(arg, send_new_wrapper, send_hello, serializer, &peer_count_out); + exec_new(arg, send_flow, serializer); return true; } diff --git a/adsbus/outgoing.c b/adsbus/outgoing.c index 928d80d..23bdeb1 100644 --- a/adsbus/outgoing.c +++ b/adsbus/outgoing.c @@ -9,6 +9,7 @@ #include #include "buf.h" +#include "flow.h" #include "list.h" #include "peer.h" #include "resolve.h" @@ -27,10 +28,8 @@ struct outgoing { struct addrinfo *addr; const char *error; uint32_t attempt; - outgoing_connection_handler handler; - outgoing_get_hello hello; + struct flow *flow; void *passthrough; - uint32_t *count; struct list_head outgoing_list; }; @@ -63,8 +62,8 @@ static void outgoing_connect_next(struct outgoing *outgoing) { assert(outgoing->peer.fd >= 0); struct buf buf = BUF_INIT, *buf_ptr = &buf; - if (outgoing->hello) { - outgoing->hello(&buf_ptr, outgoing->passthrough); + if (outgoing->flow->get_hello) { + outgoing->flow->get_hello(&buf_ptr, outgoing->passthrough); } ssize_t result = sendto(outgoing->peer.fd, buf_at(buf_ptr, 0), buf_ptr->length, MSG_FASTOPEN, outgoing->addr->ai_addr, outgoing->addr->ai_addrlen); outgoing_connect_result(outgoing, result == (ssize_t) buf_ptr->length ? EINPROGRESS : errno); @@ -102,7 +101,7 @@ static void outgoing_connect_result(struct outgoing *outgoing, int result) { int fd = outgoing->peer.fd; outgoing->peer.fd = -1; outgoing->peer.event_handler = outgoing_disconnect_handler; - outgoing->handler(fd, outgoing->passthrough, (struct peer *) outgoing); + outgoing->flow->new(fd, outgoing->passthrough, (struct peer *) outgoing); break; case EINPROGRESS: @@ -144,7 +143,7 @@ static void outgoing_resolve_wrapper(struct peer *peer) { } static void outgoing_del(struct outgoing *outgoing) { - (*outgoing->count)--; + (*outgoing->flow->ref_count)--; if (outgoing->peer.fd >= 0) { assert(!close(outgoing->peer.fd)); } @@ -160,18 +159,16 @@ void outgoing_cleanup() { } } -void outgoing_new(char *node, char *service, outgoing_connection_handler handler, outgoing_get_hello hello, void *passthrough, uint32_t *count) { - (*count)++; +void outgoing_new(char *node, char *service, struct flow *flow, void *passthrough) { + (*flow->ref_count)++; struct outgoing *outgoing = malloc(sizeof(*outgoing)); uuid_gen(outgoing->id); outgoing->node = strdup(node); outgoing->service = strdup(service); outgoing->attempt = 0; - outgoing->handler = handler; - outgoing->hello = hello; + outgoing->flow = flow; outgoing->passthrough = passthrough; - outgoing->count = count; list_add(&outgoing->outgoing_list, &outgoing_head); diff --git a/adsbus/outgoing.h b/adsbus/outgoing.h index a669b94..c39fdc5 100644 --- a/adsbus/outgoing.h +++ b/adsbus/outgoing.h @@ -1,9 +1,6 @@ #pragma once -struct buf; -struct peer; +struct flow; void outgoing_cleanup(void); -typedef void (*outgoing_connection_handler)(int fd, void *, struct peer *); -typedef void (*outgoing_get_hello)(struct buf **, void *); -void outgoing_new(char *, char *, outgoing_connection_handler, outgoing_get_hello, void *, uint32_t *); +void outgoing_new(char *, char *, struct flow *, void *); diff --git a/adsbus/receive.c b/adsbus/receive.c index bb463f5..67126bf 100644 --- a/adsbus/receive.c +++ b/adsbus/receive.c @@ -9,6 +9,7 @@ #include "airspy_adsb.h" #include "beast.h" #include "buf.h" +#include "flow.h" #include "json.h" #include "packet.h" #include "peer.h" @@ -36,6 +37,13 @@ struct receive { }; static struct receive *receive_head = NULL; +static struct flow _receive_flow = { + .name = "receive", + .new = receive_new, + .ref_count = &peer_count_in, +}; +struct flow *receive_flow = &_receive_flow; + static struct parser { char *name; parser parse; diff --git a/adsbus/receive.h b/adsbus/receive.h index 1c7bd92..a600a48 100644 --- a/adsbus/receive.h +++ b/adsbus/receive.h @@ -1,11 +1,11 @@ #pragma once -#include - #define PARSER_STATE_LEN 256 +struct flow; struct peer; void receive_cleanup(void); void receive_new(int, void *, struct peer *); void receive_print_usage(void); +extern struct flow *receive_flow; diff --git a/adsbus/send.c b/adsbus/send.c index b93dd2a..2db9b23 100644 --- a/adsbus/send.c +++ b/adsbus/send.c @@ -12,6 +12,7 @@ #include "airspy_adsb.h" #include "beast.h" #include "buf.h" +#include "flow.h" #include "json.h" #include "list.h" #include "packet.h" @@ -32,6 +33,14 @@ struct send { struct list_head send_list; }; +static struct flow _send_flow = { + .name = "send", + .new = send_new_wrapper, + .get_hello = send_hello, + .ref_count = &peer_count_out, +}; +struct flow *send_flow = &_send_flow; + typedef void (*serialize)(struct packet *, struct buf *); typedef void (*hello)(struct buf **); static struct serializer { diff --git a/adsbus/send.h b/adsbus/send.h index a50fa3f..8375d48 100644 --- a/adsbus/send.h +++ b/adsbus/send.h @@ -1,6 +1,9 @@ #pragma once +#include + struct buf; +struct flow; struct packet; struct peer; @@ -13,3 +16,4 @@ bool send_new_hello(int, struct serializer *, struct peer *); void send_hello(struct buf **, void *); void send_write(struct packet *); void send_print_usage(void); +extern struct flow *send_flow; From 90de5b80bc2bfe02cb63b86edb89694bd55aceb6 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Mon, 29 Feb 2016 20:16:57 -0800 Subject: [PATCH 29/35] Hide more interfaces behind the flow code. --- adsbus/opts.c | 6 ++-- adsbus/receive.c | 16 +++++----- adsbus/receive.h | 2 -- adsbus/send.c | 79 ++++++++++++++++++++++++------------------------ adsbus/send.h | 8 ++--- 5 files changed, 55 insertions(+), 56 deletions(-) diff --git a/adsbus/opts.c b/adsbus/opts.c index ad90250..694af5e 100644 --- a/adsbus/opts.c +++ b/adsbus/opts.c @@ -97,7 +97,8 @@ bool opts_add_file_read(char *arg) { if (fd == -1) { return false; } - receive_new(fd, NULL, NULL); + // TODO: add file.[ch] + receive_flow->new(fd, NULL, NULL); return true; } @@ -145,7 +146,8 @@ bool opts_add_exec_send(char *arg) { bool opts_add_stdin(char __attribute__((unused)) *arg) { int fd = dup(0); assert(!fcntl(fd, F_SETFD, FD_CLOEXEC)); - receive_new(fd, NULL, NULL); + // TODO: add file.[ch] + receive_flow->new(fd, NULL, NULL); return true; } diff --git a/adsbus/receive.c b/adsbus/receive.c index 67126bf..3839498 100644 --- a/adsbus/receive.c +++ b/adsbus/receive.c @@ -37,6 +37,8 @@ struct receive { }; static struct receive *receive_head = NULL; +static void receive_new(int, void *, struct peer *); + static struct flow _receive_flow = { .name = "receive", .new = receive_new, @@ -132,13 +134,7 @@ static void receive_read(struct peer *peer) { } } -void receive_cleanup() { - while (receive_head) { - receive_del(receive_head); - } -} - -void receive_new(int fd, void __attribute__((unused)) *passthrough, struct peer *on_close) { +static void receive_new(int fd, void __attribute__((unused)) *passthrough, struct peer *on_close) { peer_count_in++; socket_receive_init(fd); @@ -164,6 +160,12 @@ void receive_new(int fd, void __attribute__((unused)) *passthrough, struct peer fprintf(stderr, "R %s: New receive connection\n", receive->id); } +void receive_cleanup() { + while (receive_head) { + receive_del(receive_head); + } +} + void receive_print_usage() { fprintf(stderr, "\nSupported receive formats (auto-detected):\n"); for (size_t i = 0; i < NUM_PARSERS; i++) { diff --git a/adsbus/receive.h b/adsbus/receive.h index a600a48..9652f2c 100644 --- a/adsbus/receive.h +++ b/adsbus/receive.h @@ -3,9 +3,7 @@ #define PARSER_STATE_LEN 256 struct flow; -struct peer; void receive_cleanup(void); -void receive_new(int, void *, struct peer *); void receive_print_usage(void); extern struct flow *receive_flow; diff --git a/adsbus/send.c b/adsbus/send.c index 2db9b23..cd9a56d 100644 --- a/adsbus/send.c +++ b/adsbus/send.c @@ -33,10 +33,13 @@ struct send { struct list_head send_list; }; +static void send_new(int, void *, struct peer *); +static void send_get_hello(struct buf **, void *); + static struct flow _send_flow = { .name = "send", - .new = send_new_wrapper, - .get_hello = send_hello, + .new = send_new, + .get_hello = send_get_hello, .ref_count = &peer_count_out, }; struct flow *send_flow = &_send_flow; @@ -96,32 +99,9 @@ static void send_del_wrapper(struct peer *peer) { send_del((struct send *) peer); } -void send_init() { - assert(signal(SIGPIPE, SIG_IGN) != SIG_ERR); - for (size_t i = 0; i < NUM_SERIALIZERS; i++) { - list_head_init(&serializers[i].send_head); - } -} +static void send_new(int fd, void *passthrough, struct peer *on_close) { + struct serializer *serializer = (struct serializer *) passthrough; -void send_cleanup() { - for (size_t i = 0; i < NUM_SERIALIZERS; i++) { - struct send *iter, *next; - list_for_each_entry_safe(iter, next, &serializers[i].send_head, send_list) { - send_del(iter); - } - } -} - -struct serializer *send_get_serializer(char *name) { - for (size_t i = 0; i < NUM_SERIALIZERS; i++) { - if (strcasecmp(serializers[i].name, name) == 0) { - return &serializers[i]; - } - } - return NULL; -} - -void send_new(int fd, struct serializer *serializer, struct peer *on_close) { peer_count_out++; socket_send_init(fd); @@ -141,30 +121,51 @@ void send_new(int fd, struct serializer *serializer, struct peer *on_close) { fprintf(stderr, "S %s (%s): New send connection\n", send->id, serializer->name); } -void send_new_wrapper(int fd, void *passthrough, struct peer *on_close) { - send_new(fd, (struct serializer *) passthrough, on_close); +static void send_get_hello(struct buf **buf_pp, void *passthrough) { + struct serializer *serializer = (struct serializer *) passthrough; + if (serializer->hello) { + serializer->hello(buf_pp); + } } -bool send_new_hello(int fd, struct serializer *serializer, struct peer *on_close) { +void send_init() { + assert(signal(SIGPIPE, SIG_IGN) != SIG_ERR); + for (size_t i = 0; i < NUM_SERIALIZERS; i++) { + list_head_init(&serializers[i].send_head); + } +} + +void send_cleanup() { + for (size_t i = 0; i < NUM_SERIALIZERS; i++) { + struct send *iter, *next; + list_for_each_entry_safe(iter, next, &serializers[i].send_head, send_list) { + send_del(iter); + } + } +} + +void *send_get_serializer(char *name) { + for (size_t i = 0; i < NUM_SERIALIZERS; i++) { + if (strcasecmp(serializers[i].name, name) == 0) { + return &serializers[i]; + } + } + return NULL; +} + +bool send_new_hello(int fd, void *passthrough, struct peer *on_close) { struct buf buf = BUF_INIT, *buf_ptr = &buf; - send_hello(&buf_ptr, serializer); + send_get_hello(&buf_ptr, passthrough); if (buf_ptr->length) { if (write(fd, buf_at(buf_ptr, 0), buf_ptr->length) != (ssize_t) buf_ptr->length) { assert(!close(fd)); return false; } } - send_new(fd, serializer, on_close); + send_new(fd, passthrough, on_close); return true; } -void send_hello(struct buf **buf_pp, void *passthrough) { - struct serializer *serializer = (struct serializer *) passthrough; - if (serializer->hello) { - serializer->hello(buf_pp); - } -} - void send_write(struct packet *packet) { packet_sanity_check(packet); for (size_t i = 0; i < NUM_SERIALIZERS; i++) { diff --git a/adsbus/send.h b/adsbus/send.h index 8375d48..17316cc 100644 --- a/adsbus/send.h +++ b/adsbus/send.h @@ -2,18 +2,14 @@ #include -struct buf; struct flow; struct packet; struct peer; void send_init(void); void send_cleanup(void); -struct serializer *send_get_serializer(char *); -void send_new(int, struct serializer *, struct peer *); -void send_new_wrapper(int, void *, struct peer *); -bool send_new_hello(int, struct serializer *, struct peer *); -void send_hello(struct buf **, void *); +void *send_get_serializer(char *); +bool send_new_hello(int, void *, struct peer *); void send_write(struct packet *); void send_print_usage(void); extern struct flow *send_flow; From b106887617ce6b8da07835121c4cf20e407654f2 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Mon, 29 Feb 2016 20:49:36 -0800 Subject: [PATCH 30/35] Clean up file and hello messes. --- adsbus/Makefile | 7 ++++--- adsbus/exec.c | 14 +------------- adsbus/file.c | 43 +++++++++++++++++++++++++++++++++++++++++++ adsbus/file.h | 8 ++++++++ adsbus/flow.c | 17 +++++++++++++++++ adsbus/flow.h | 3 +++ adsbus/incoming.c | 14 +------------- adsbus/opts.c | 35 ++++++++++++----------------------- adsbus/send.c | 13 ------------- adsbus/send.h | 1 - 10 files changed, 89 insertions(+), 66 deletions(-) create mode 100644 adsbus/file.c create mode 100644 adsbus/file.h create mode 100644 adsbus/flow.c diff --git a/adsbus/Makefile b/adsbus/Makefile index 0297916..beec994 100644 --- a/adsbus/Makefile +++ b/adsbus/Makefile @@ -10,7 +10,8 @@ VALGRIND ?= valgrind VALGRIND_FLAGS ?= --error-exitcode=1 --trace-children=yes --track-fds=yes --show-leak-kinds=all --leak-check=full ADSBUS_TEST_FLAGS ?= --stdin --stdout=airspy_adsb --stdout=beast --stdout=json --stdout=proto --stdout=raw --stdout=stats -OBJ_TRANSPORT = exec.o incoming.o outgoing.o receive.o send.o +OBJ_TRANSPORT = exec.o file.o incoming.o outgoing.o +OBJ_FLOW = flow.o receive.o send.o OBJ_PROTOCOL = airspy_adsb.o beast.o json.o proto.o raw.o stats.o OBJ_UTIL = buf.o hex.o list.o opts.o packet.o peer.o rand.o resolve.o server.o socket.o uuid.o wakeup.o OBJ_PROTO = adsb.pb-c.o @@ -26,8 +27,8 @@ clean: adsb.pb-c.c: ../proto/adsb.proto protoc-c --c_out=./ --proto_path=$(dir $<) $< -adsbus: adsbus.o $(OBJ_TRANSPORT) $(OBJ_PROTOCOL) $(OBJ_UTIL) $(OBJ_PROTO) - $(COMP) $(LDFLAGS) -o adsbus adsbus.o $(OBJ_TRANSPORT) $(OBJ_PROTOCOL) $(OBJ_UTIL) $(OBJ_PROTO) $(LIBS) +adsbus: adsbus.o $(OBJ_TRANSPORT) $(OBJ_FLOW) $(OBJ_PROTOCOL) $(OBJ_UTIL) $(OBJ_PROTO) + $(COMP) $(LDFLAGS) -o adsbus adsbus.o $(OBJ_TRANSPORT) $(OBJ_FLOW) $(OBJ_PROTOCOL) $(OBJ_UTIL) $(OBJ_PROTO) $(LIBS) fuzz: rm -rf findings diff --git a/adsbus/exec.c b/adsbus/exec.c index 93770aa..426fa0e 100644 --- a/adsbus/exec.c +++ b/adsbus/exec.c @@ -60,23 +60,11 @@ static void exec_close_handler(struct peer *peer) { wakeup_add((struct peer *) exec, delay); } -static bool exec_hello(int fd, struct exec *exec) { - if (!exec->flow->get_hello) { - return true; - } - struct buf buf = BUF_INIT, *buf_ptr = &buf; - exec->flow->get_hello(&buf_ptr, exec->passthrough); - if (!buf_ptr->length) { - return true; - } - return (write(fd, buf_at(buf_ptr, 0), buf_ptr->length) == (ssize_t) buf_ptr->length); -} - static void exec_parent(struct exec *exec, pid_t child, int fd) { exec->child = child; fprintf(stderr, "E %s: Child started as process %d\n", exec->id, exec->child); - if (!exec_hello(fd, exec)) { + if (!flow_hello(fd, exec->flow, exec->passthrough)) { assert(!close(fd)); exec_close_handler((struct peer *) exec); return; diff --git a/adsbus/file.c b/adsbus/file.c new file mode 100644 index 0000000..7844fac --- /dev/null +++ b/adsbus/file.c @@ -0,0 +1,43 @@ +#include +#include +#include +#include + +#include "flow.h" +#include "receive.h" +#include "send.h" + +#include "file.h" + +void file_fd_new(int fd, struct flow *flow, void *passthrough) { + flow->new(fd, passthrough, NULL); + // TODO: log error; retry? + flow_hello(fd, flow, passthrough); +} + +void file_read_new(char *path, struct flow *flow, void *passthrough) { + int fd = open(path, O_RDONLY | O_CLOEXEC); + if (fd == -1) { + // TODO: log error; retry? + return; + } + file_fd_new(fd, flow, passthrough); +} + +void file_write_new(char *path, struct flow *flow, void *passthrough) { + int fd = open(path, O_WRONLY | O_CREAT | O_NOFOLLOW | O_TRUNC | O_CLOEXEC, S_IRUSR | S_IWUSR); + if (fd == -1) { + // TODO: log error; retry? + return; + } + file_fd_new(fd, flow, passthrough); +} + +void file_append_new(char *path, struct flow *flow, void *passthrough) { + int fd = open(path, O_WRONLY | O_CREAT | O_NOFOLLOW | O_CLOEXEC, S_IRWXU); + if (fd == -1) { + // TODO: log error; retry? + return; + } + flow->new(fd, flow, passthrough); +} diff --git a/adsbus/file.h b/adsbus/file.h new file mode 100644 index 0000000..7053e91 --- /dev/null +++ b/adsbus/file.h @@ -0,0 +1,8 @@ +#pragma once + +struct flow; + +void file_fd_new(int, struct flow *, void *); +void file_read_new(char *, struct flow *, void *); +void file_write_new(char *, struct flow *, void *); +void file_append_new(char *, struct flow *, void *); diff --git a/adsbus/flow.c b/adsbus/flow.c new file mode 100644 index 0000000..4aa46a7 --- /dev/null +++ b/adsbus/flow.c @@ -0,0 +1,17 @@ +#include + +#include "buf.h" + +#include "flow.h" + +bool flow_hello(int fd, struct flow *flow, void *passthrough) { + if (!flow->get_hello) { + return true; + } + struct buf buf = BUF_INIT, *buf_ptr = &buf; + flow->get_hello(&buf_ptr, passthrough); + if (!buf_ptr->length) { + return true; + } + return (write(fd, buf_at(buf_ptr, 0), buf_ptr->length) == (ssize_t) buf_ptr->length); +} diff --git a/adsbus/flow.h b/adsbus/flow.h index e7ff369..dfe9d07 100644 --- a/adsbus/flow.h +++ b/adsbus/flow.h @@ -1,5 +1,6 @@ #pragma once +#include #include struct buf; @@ -15,3 +16,5 @@ struct flow { flow_get_hello get_hello; uint32_t *ref_count; }; + +bool flow_hello(int, struct flow *, void *); diff --git a/adsbus/incoming.c b/adsbus/incoming.c index 74603b9..62da1bd 100644 --- a/adsbus/incoming.c +++ b/adsbus/incoming.c @@ -43,18 +43,6 @@ static void incoming_retry(struct incoming *incoming) { wakeup_add((struct peer *) incoming, delay); } -static bool incoming_hello(int fd, struct incoming *incoming) { - if (!incoming->flow->get_hello) { - return true; - } - struct buf buf = BUF_INIT, *buf_ptr = &buf; - incoming->flow->get_hello(&buf_ptr, incoming->passthrough); - if (!buf_ptr->length) { - return true; - } - return (write(fd, buf_at(buf_ptr, 0), buf_ptr->length) == (ssize_t) buf_ptr->length); -} - static void incoming_handler(struct peer *peer) { struct incoming *incoming = (struct incoming *) peer; @@ -78,7 +66,7 @@ static void incoming_handler(struct peer *peer) { local_hbuf, local_sbuf, peer_hbuf, peer_sbuf); - if (!incoming_hello(fd, incoming)) { + if (!flow_hello(fd, incoming->flow, incoming->passthrough)) { fprintf(stderr, "I %s: Error writing greeting\n", incoming->id); assert(!close(fd)); return; diff --git a/adsbus/opts.c b/adsbus/opts.c index 694af5e..9d5ab71 100644 --- a/adsbus/opts.c +++ b/adsbus/opts.c @@ -7,7 +7,7 @@ #include #include "exec.h" -#include "flow.h" +#include "file.h" #include "incoming.h" #include "outgoing.h" #include "receive.h" @@ -93,39 +93,28 @@ bool opts_add_listen_send(char *arg) { } bool opts_add_file_read(char *arg) { - int fd = open(arg, O_RDONLY | O_CLOEXEC); - if (fd == -1) { - return false; - } - // TODO: add file.[ch] - receive_flow->new(fd, NULL, NULL); + file_read_new(arg, receive_flow, NULL); return true; } bool opts_add_file_write(char *arg) { struct serializer *serializer = opts_get_serializer(&arg); if (!serializer) { - return NULL; - } - - int fd = open(arg, O_WRONLY | O_CREAT | O_NOFOLLOW | O_TRUNC | O_CLOEXEC, S_IRWXU); - if (fd == -1) { return false; } - return send_new_hello(fd, serializer, NULL); + + file_write_new(arg, send_flow, serializer); + return true; } bool opts_add_file_append(char *arg) { struct serializer *serializer = opts_get_serializer(&arg); if (!serializer) { - return NULL; - } - - int fd = open(arg, O_WRONLY | O_CREAT | O_NOFOLLOW | O_CLOEXEC, S_IRWXU); - if (fd == -1) { return false; } - return send_new_hello(fd, serializer, NULL); + + file_append_new(arg, send_flow, serializer); + return true; } bool opts_add_exec_receive(char *arg) { @@ -136,7 +125,7 @@ bool opts_add_exec_receive(char *arg) { bool opts_add_exec_send(char *arg) { struct serializer *serializer = opts_get_serializer(&arg); if (!serializer) { - return NULL; + return false; } exec_new(arg, send_flow, serializer); @@ -146,8 +135,7 @@ bool opts_add_exec_send(char *arg) { bool opts_add_stdin(char __attribute__((unused)) *arg) { int fd = dup(0); assert(!fcntl(fd, F_SETFD, FD_CLOEXEC)); - // TODO: add file.[ch] - receive_flow->new(fd, NULL, NULL); + file_fd_new(fd, receive_flow, NULL); return true; } @@ -158,5 +146,6 @@ bool opts_add_stdout(char *arg) { } int fd = dup(1); assert(!fcntl(fd, F_SETFD, FD_CLOEXEC)); - return send_new_hello(fd, serializer, NULL); + file_fd_new(fd, send_flow, serializer); + return true; } diff --git a/adsbus/send.c b/adsbus/send.c index cd9a56d..6d3cd7f 100644 --- a/adsbus/send.c +++ b/adsbus/send.c @@ -153,19 +153,6 @@ void *send_get_serializer(char *name) { return NULL; } -bool send_new_hello(int fd, void *passthrough, struct peer *on_close) { - struct buf buf = BUF_INIT, *buf_ptr = &buf; - send_get_hello(&buf_ptr, passthrough); - if (buf_ptr->length) { - if (write(fd, buf_at(buf_ptr, 0), buf_ptr->length) != (ssize_t) buf_ptr->length) { - assert(!close(fd)); - return false; - } - } - send_new(fd, passthrough, on_close); - return true; -} - void send_write(struct packet *packet) { packet_sanity_check(packet); for (size_t i = 0; i < NUM_SERIALIZERS; i++) { diff --git a/adsbus/send.h b/adsbus/send.h index 17316cc..eab1c4e 100644 --- a/adsbus/send.h +++ b/adsbus/send.h @@ -9,7 +9,6 @@ struct peer; void send_init(void); void send_cleanup(void); void *send_get_serializer(char *); -bool send_new_hello(int, void *, struct peer *); void send_write(struct packet *); void send_print_usage(void); extern struct flow *send_flow; From a093b8a1b6560d41009c35664fd76d8e88a3b34d Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Mon, 29 Feb 2016 22:03:04 -0800 Subject: [PATCH 31/35] Switch from dup() + fcntl(F_SETFL, FD_CLOEXEC) to fcntl(F_DUPFD_CLOEXEC) to save a second syscall. --- adsbus/opts.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/adsbus/opts.c b/adsbus/opts.c index 9d5ab71..a4ba5c6 100644 --- a/adsbus/opts.c +++ b/adsbus/opts.c @@ -133,8 +133,8 @@ bool opts_add_exec_send(char *arg) { } bool opts_add_stdin(char __attribute__((unused)) *arg) { - int fd = dup(0); - assert(!fcntl(fd, F_SETFD, FD_CLOEXEC)); + int fd = fcntl(0, F_DUPFD_CLOEXEC, 0); + assert(fd >= 0); file_fd_new(fd, receive_flow, NULL); return true; } @@ -144,8 +144,8 @@ bool opts_add_stdout(char *arg) { if (!serializer) { return false; } - int fd = dup(1); - assert(!fcntl(fd, F_SETFD, FD_CLOEXEC)); + int fd = fcntl(1, F_DUPFD_CLOEXEC, 0); + assert(fd >= 0); file_fd_new(fd, send_flow, serializer); return true; } From b6a582e95a073542000b31ee6fb73e8c7681c523 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Mon, 29 Feb 2016 22:17:37 -0800 Subject: [PATCH 32/35] Do flow-specific setsockopt() before accept() --- adsbus/flow.c | 8 ++++++++ adsbus/flow.h | 10 ++++------ adsbus/incoming.c | 6 ++++-- adsbus/outgoing.c | 2 +- adsbus/receive.c | 2 +- adsbus/send.c | 3 ++- adsbus/socket.c | 33 +++++++++++++++++---------------- adsbus/socket.h | 11 ++++++----- 8 files changed, 43 insertions(+), 32 deletions(-) diff --git a/adsbus/flow.c b/adsbus/flow.c index 4aa46a7..b7db29c 100644 --- a/adsbus/flow.c +++ b/adsbus/flow.c @@ -1,9 +1,17 @@ #include #include "buf.h" +#include "socket.h" #include "flow.h" +void flow_socket_connected(int fd, struct flow *flow) { + socket_connected(fd); + if (flow->socket_connected) { + flow->socket_connected(fd); + } +} + bool flow_hello(int fd, struct flow *flow, void *passthrough) { if (!flow->get_hello) { return true; diff --git a/adsbus/flow.h b/adsbus/flow.h index dfe9d07..ddacf0a 100644 --- a/adsbus/flow.h +++ b/adsbus/flow.h @@ -6,15 +6,13 @@ struct buf; struct peer; -typedef void (*flow_bound_socket_init)(int); -typedef void (*flow_new)(int, void *, struct peer *); -typedef void (*flow_get_hello)(struct buf **, void *); struct flow { const char *name; - flow_bound_socket_init bound_socket_init; - flow_new new; - flow_get_hello get_hello; + void (*socket_connected)(int); + void (*new)(int, void *, struct peer *); + void (*get_hello)(struct buf **, void *); uint32_t *ref_count; }; +void flow_socket_connected(int, struct flow *); bool flow_hello(int, struct flow *, void *); diff --git a/adsbus/incoming.c b/adsbus/incoming.c index 62da1bd..1b93991 100644 --- a/adsbus/incoming.c +++ b/adsbus/incoming.c @@ -95,7 +95,7 @@ static void incoming_listen(struct incoming *incoming) { incoming->peer.fd = socket(addr->ai_family, addr->ai_socktype | SOCK_CLOEXEC, addr->ai_protocol); assert(incoming->peer.fd >= 0); - socket_pre_bind_init(incoming->peer.fd); + socket_pre_bind(incoming->peer.fd); if (bind(incoming->peer.fd, addr->ai_addr, addr->ai_addrlen) != 0) { fprintf(stderr, "I %s: Failed to bind to %s/%s: %s\n", incoming->id, hbuf, sbuf, strerror(errno)); @@ -103,7 +103,9 @@ static void incoming_listen(struct incoming *incoming) { continue; } - socket_bound_init(incoming->peer.fd); + socket_pre_listen(incoming->peer.fd); + // Options are inherited through accept() + flow_socket_connected(incoming->peer.fd, incoming->flow); assert(listen(incoming->peer.fd, 255) == 0); break; diff --git a/adsbus/outgoing.c b/adsbus/outgoing.c index 23bdeb1..ce37b9f 100644 --- a/adsbus/outgoing.c +++ b/adsbus/outgoing.c @@ -96,7 +96,7 @@ static void outgoing_connect_result(struct outgoing *outgoing, int result) { case 0: fprintf(stderr, "O %s: Connected to %s/%s\n", outgoing->id, hbuf, sbuf); freeaddrinfo(outgoing->addrs); - socket_connected_init(outgoing->peer.fd); + flow_socket_connected(outgoing->peer.fd, outgoing->flow); outgoing->attempt = 0; int fd = outgoing->peer.fd; outgoing->peer.fd = -1; diff --git a/adsbus/receive.c b/adsbus/receive.c index 3839498..3537139 100644 --- a/adsbus/receive.c +++ b/adsbus/receive.c @@ -137,7 +137,7 @@ static void receive_read(struct peer *peer) { static void receive_new(int fd, void __attribute__((unused)) *passthrough, struct peer *on_close) { peer_count_in++; - socket_receive_init(fd); + socket_receive(fd); struct receive *receive = malloc(sizeof(*receive)); assert(receive); diff --git a/adsbus/send.c b/adsbus/send.c index 6d3cd7f..326c365 100644 --- a/adsbus/send.c +++ b/adsbus/send.c @@ -38,6 +38,7 @@ static void send_get_hello(struct buf **, void *); static struct flow _send_flow = { .name = "send", + .socket_connected = socket_connected_send, .new = send_new, .get_hello = send_get_hello, .ref_count = &peer_count_out, @@ -104,7 +105,7 @@ static void send_new(int fd, void *passthrough, struct peer *on_close) { peer_count_out++; - socket_send_init(fd); + socket_send(fd); struct send *send = malloc(sizeof(*send)); assert(send); diff --git a/adsbus/socket.c b/adsbus/socket.c index 5e34ccb..f90bdf0 100644 --- a/adsbus/socket.c +++ b/adsbus/socket.c @@ -7,23 +7,19 @@ #include "socket.h" -void socket_pre_bind_init(int fd) { +void socket_pre_bind(int fd) { // Called by transport code; safe to assume that fd is a socket int optval = 1; assert(!setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval))); } -void socket_bound_init(int fd) { +void socket_pre_listen(int fd) { // Called by transport code; safe to assume that fd is a socket int qlen = 5; assert(!setsockopt(fd, SOL_TCP, TCP_FASTOPEN, &qlen, sizeof(qlen))); - - // These options are inherited through accept() - // Verified by flamingcow on kernel 4.2.0 - socket_connected_init(fd); } -void socket_connected_init(int fd) { +void socket_connected(int fd) { // Called by transport code; safe to assume that fd is a socket int optval = 1; assert(!setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval))); @@ -35,25 +31,30 @@ void socket_connected_init(int fd) { assert(!setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &optval, sizeof(optval))); } -void socket_send_init(int fd) { - // Called by data flow code; NOT safe to assume that fd is a socket - int res = shutdown(fd, SHUT_RD); - assert(res == 0 || (res == -1 && errno == ENOTSOCK)); - +void socket_connected_send(int fd) { int optval = 128; // Lowest value that the kernel will accept - res = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &optval, sizeof(optval)); - assert(res == 0 || (res == -1 && errno == ENOTSOCK)); + int res = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &optval, sizeof(optval)); + if (res == -1 && errno == ENOTSOCK) { + return; + } + assert(res == 0); optval = 128; // Lowest value that the kernel will accept res = setsockopt(fd, IPPROTO_TCP, TCP_WINDOW_CLAMP, &optval, sizeof(optval)); - assert(res == 0 || (res == -1 && errno == ENOTSOCK)); + assert(res == 0); optval = 60000; // 60s res = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &optval, sizeof(optval)); + assert(res == 0); +} + +void socket_send(int fd) { + // Called by data flow code; NOT safe to assume that fd is a socket + int res = shutdown(fd, SHUT_RD); assert(res == 0 || (res == -1 && errno == ENOTSOCK)); } -void socket_receive_init(int fd) { +void socket_receive(int fd) { // Called by data flow code; NOT safe to assume that fd is a socket int res = shutdown(fd, SHUT_WR); assert(res == 0 || (res == -1 && errno == ENOTSOCK)); diff --git a/adsbus/socket.h b/adsbus/socket.h index d5ae8dd..b1989a6 100644 --- a/adsbus/socket.h +++ b/adsbus/socket.h @@ -1,7 +1,8 @@ #pragma once -void socket_pre_bind_init(int); -void socket_bound_init(int); -void socket_connected_init(int); -void socket_send_init(int); -void socket_receive_init(int); +void socket_pre_bind(int); +void socket_pre_listen(int); +void socket_connected(int); +void socket_connected_send(int); +void socket_send(int); +void socket_receive(int); From 1a45e4361ad5a08b45551ae2a55574c77dd64815 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Mon, 29 Feb 2016 22:37:41 -0800 Subject: [PATCH 33/35] Finally find a clean ownership solution for resolve(). --- adsbus/incoming.c | 33 ++++++++---------- adsbus/outgoing.c | 13 ++++--- adsbus/resolve.c | 87 ++++++++++++++++++++++------------------------- adsbus/resolve.h | 3 +- 4 files changed, 63 insertions(+), 73 deletions(-) diff --git a/adsbus/incoming.c b/adsbus/incoming.c index 1b93991..6ee0c54 100644 --- a/adsbus/incoming.c +++ b/adsbus/incoming.c @@ -24,8 +24,6 @@ struct incoming { uint8_t id[UUID_LEN]; char *node; char *service; - struct addrinfo *addrs; - const char *error; uint32_t attempt; struct flow *flow; void *passthrough; @@ -85,9 +83,19 @@ static void incoming_del(struct incoming *incoming) { free(incoming); } -static void incoming_listen(struct incoming *incoming) { +static void incoming_listen(struct peer *peer) { + struct incoming *incoming = (struct incoming *) peer; + + struct addrinfo *addrs; + int err = resolve_result(peer, &addrs); + if (err) { + fprintf(stderr, "I %s: Failed to resolve %s/%s: %s\n", incoming->id, incoming->node, incoming->service, gai_strerror(err)); + incoming_retry(incoming); + return; + } + struct addrinfo *addr; - for (addr = incoming->addrs; addr; addr = addr->ai_next) { + for (addr = addrs; addr; addr = addr->ai_next) { char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; assert(getnameinfo(addr->ai_addr, addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0); fprintf(stderr, "I %s: Listening on %s/%s...\n", incoming->id, hbuf, sbuf); @@ -111,7 +119,7 @@ static void incoming_listen(struct incoming *incoming) { break; } - freeaddrinfo(incoming->addrs); + freeaddrinfo(addrs); if (addr == NULL) { fprintf(stderr, "I %s: Failed to bind any addresses for %s/%s...\n", incoming->id, incoming->node, incoming->service); @@ -124,21 +132,10 @@ static void incoming_listen(struct incoming *incoming) { peer_epoll_add((struct peer *) incoming, EPOLLIN); } -static void incoming_resolve_handler(struct peer *peer) { - struct incoming *incoming = (struct incoming *) peer; - if (incoming->addrs) { - incoming_listen(incoming); - } else { - fprintf(stderr, "I %s: Failed to resolve %s/%s: %s\n", incoming->id, incoming->node, incoming->service, incoming->error); - incoming_retry(incoming); - } -} - static void incoming_resolve(struct incoming *incoming) { fprintf(stderr, "I %s: Resolving %s/%s...\n", incoming->id, incoming->node, incoming->service); - incoming->peer.fd = -1; - incoming->peer.event_handler = incoming_resolve_handler; - resolve((struct peer *) incoming, incoming->node, incoming->service, AI_PASSIVE, &incoming->addrs, &incoming->error); + incoming->peer.event_handler = incoming_listen; + resolve((struct peer *) incoming, incoming->node, incoming->service, AI_PASSIVE); } static void incoming_resolve_wrapper(struct peer *peer) { diff --git a/adsbus/outgoing.c b/adsbus/outgoing.c index ce37b9f..9f30c40 100644 --- a/adsbus/outgoing.c +++ b/adsbus/outgoing.c @@ -26,7 +26,6 @@ struct outgoing { char *service; struct addrinfo *addrs; struct addrinfo *addr; - const char *error; uint32_t attempt; struct flow *flow; void *passthrough; @@ -122,20 +121,20 @@ static void outgoing_connect_result(struct outgoing *outgoing, int result) { static void outgoing_resolve_handler(struct peer *peer) { struct outgoing *outgoing = (struct outgoing *) peer; - if (outgoing->addrs) { + int err = resolve_result(peer, &outgoing->addrs); + if (err) { + fprintf(stderr, "O %s: Failed to resolve %s/%s: %s\n", outgoing->id, outgoing->node, outgoing->service, gai_strerror(err)); + outgoing_retry(outgoing); + } else { outgoing->addr = outgoing->addrs; outgoing_connect_next(outgoing); - } else { - fprintf(stderr, "O %s: Failed to resolve %s/%s: %s\n", outgoing->id, outgoing->node, outgoing->service, outgoing->error); - outgoing_retry(outgoing); } } static void outgoing_resolve(struct outgoing *outgoing) { fprintf(stderr, "O %s: Resolving %s/%s...\n", outgoing->id, outgoing->node, outgoing->service); - outgoing->peer.fd = -1; outgoing->peer.event_handler = outgoing_resolve_handler; - resolve((struct peer *) outgoing, outgoing->node, outgoing->service, 0, &outgoing->addrs, &outgoing->error); + resolve((struct peer *) outgoing, outgoing->node, outgoing->service, 0); } static void outgoing_resolve_wrapper(struct peer *peer) { diff --git a/adsbus/resolve.c b/adsbus/resolve.c index 1b20a9c..51990a2 100644 --- a/adsbus/resolve.c +++ b/adsbus/resolve.c @@ -2,6 +2,7 @@ #include #include #include +#include #include #include @@ -9,53 +10,39 @@ #include "resolve.h" -struct resolve_request { +struct resolve { int fd; - const char *node; - const char *service; - int flags; - struct addrinfo **addrs; - const char **error; -}; -struct resolve_peer { - struct peer peer; - struct peer *inner_peer; + char *node; + char *service; + int flags; + + int err; + struct addrinfo *addrs; }; static pthread_t resolve_thread; static int resolve_write_fd; -static void resolve_handler(struct peer *peer) { - struct resolve_peer *resolve_peer = (struct resolve_peer *) peer; - - assert(!close(resolve_peer->peer.fd)); - - peer_call(resolve_peer->inner_peer); - free(resolve_peer); -} - static void *resolve_main(void *arg) { int fd = (int) (intptr_t) arg; - struct resolve_request *request; - ssize_t ret; - while ((ret = read(fd, &request, sizeof(request))) == sizeof(request)) { + struct resolve *res; + ssize_t len; + while ((len = read(fd, &res, sizeof(res))) == sizeof(res)) { struct addrinfo hints = { .ai_family = AF_UNSPEC, .ai_socktype = SOCK_STREAM, - .ai_flags = AI_V4MAPPED | AI_ADDRCONFIG | request->flags, + .ai_flags = AI_V4MAPPED | AI_ADDRCONFIG | res->flags, }; - int err = getaddrinfo(request->node, request->service, &hints, request->addrs); - if (err) { - *request->addrs = NULL; - *request->error = gai_strerror(err); - } else { - *request->error = NULL; - } - close(request->fd); - free(request); + res->err = getaddrinfo(res->node, res->service, &hints, &res->addrs); + free(res->node); + free(res->service); + res->node = res->service = NULL; + assert(write(res->fd, &res, sizeof(res)) == sizeof(res)); + close(res->fd); + res->fd = -1; } - assert(!ret); + assert(!len); assert(!close(fd)); return NULL; } @@ -72,22 +59,28 @@ void resolve_cleanup() { assert(!pthread_join(resolve_thread, NULL)); } -void resolve(struct peer *peer, const char *node, const char *service, int flags, struct addrinfo **addrs, const char **error) { +void resolve(struct peer *peer, const char *node, const char *service, int flags) { int fds[2]; assert(!pipe2(fds, O_CLOEXEC)); - struct resolve_request *request = malloc(sizeof(*request)); - request->fd = fds[1]; - request->node = node; - request->service = service; - request->flags = flags; - request->addrs = addrs; - request->error = error; - assert(write(resolve_write_fd, &request, sizeof(request)) == sizeof(request)); + struct resolve *res = malloc(sizeof(*res)); + res->fd = fds[1]; + res->node = strdup(node); + res->service = strdup(service); + res->flags = flags; + assert(write(resolve_write_fd, &res, sizeof(res)) == sizeof(res)); - struct resolve_peer *resolve_peer = malloc(sizeof(*resolve_peer)); - resolve_peer->peer.fd = fds[0]; - resolve_peer->peer.event_handler = resolve_handler; - resolve_peer->inner_peer = peer; - peer_epoll_add((struct peer *) resolve_peer, EPOLLRDHUP); + peer->fd = fds[0]; + peer_epoll_add(peer, EPOLLIN); +} + +int resolve_result(struct peer *peer, struct addrinfo **addrs) { + struct resolve *res; + assert(read(peer->fd, &res, sizeof(res)) == sizeof(res)); + assert(!close(peer->fd)); + peer->fd = -1; + *addrs = res->addrs; + int err = res->err; + free(res); + return err; } diff --git a/adsbus/resolve.h b/adsbus/resolve.h index 92afb8c..8e91df5 100644 --- a/adsbus/resolve.h +++ b/adsbus/resolve.h @@ -5,4 +5,5 @@ struct addrinfo; void resolve_init(void); void resolve_cleanup(void); -void resolve(struct peer *, const char *, const char *, int, struct addrinfo **, const char **); +void resolve(struct peer *, const char *, const char *, int); +int resolve_result(struct peer *, struct addrinfo **addrs); From adfc23cba7c5be253dd2abcbc2d157030b8b1615 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Tue, 1 Mar 2016 19:43:11 -0800 Subject: [PATCH 34/35] Split out generic resolver functions. --- adsbus/Makefile | 2 +- adsbus/asyncaddrinfo.c | 125 +++++++++++++++++++++++++++++++++++++++++ adsbus/asyncaddrinfo.h | 8 +++ adsbus/resolve.c | 77 ++++--------------------- 4 files changed, 146 insertions(+), 66 deletions(-) create mode 100644 adsbus/asyncaddrinfo.c create mode 100644 adsbus/asyncaddrinfo.h diff --git a/adsbus/Makefile b/adsbus/Makefile index beec994..65b33cf 100644 --- a/adsbus/Makefile +++ b/adsbus/Makefile @@ -13,7 +13,7 @@ ADSBUS_TEST_FLAGS ?= --stdin --stdout=airspy_adsb --stdout=beast --stdout=json - OBJ_TRANSPORT = exec.o file.o incoming.o outgoing.o OBJ_FLOW = flow.o receive.o send.o OBJ_PROTOCOL = airspy_adsb.o beast.o json.o proto.o raw.o stats.o -OBJ_UTIL = buf.o hex.o list.o opts.o packet.o peer.o rand.o resolve.o server.o socket.o uuid.o wakeup.o +OBJ_UTIL = asyncaddrinfo.o buf.o hex.o list.o opts.o packet.o peer.o rand.o resolve.o server.o socket.o uuid.o wakeup.o OBJ_PROTO = adsb.pb-c.o all: adsbus diff --git a/adsbus/asyncaddrinfo.c b/adsbus/asyncaddrinfo.c new file mode 100644 index 0000000..74edf69 --- /dev/null +++ b/adsbus/asyncaddrinfo.c @@ -0,0 +1,125 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include "asyncaddrinfo.h" + +struct asyncaddrinfo_resolution { + int return_fd; + + char *node; + char *service; + struct addrinfo _hints, *hints; + + int err; + struct addrinfo *addrs; +}; + +static size_t asyncaddrinfo_num_threads; +static pthread_t *asyncaddrinfo_threads = NULL; +static int asyncaddrinfo_write_fd; + +static void *asyncaddrinfo_main(void *arg) { + int fd = (int) (intptr_t) arg; + struct asyncaddrinfo_resolution *res; + ssize_t len; + while ((len = recv(fd, &res, sizeof(res), 0)) == sizeof(res)) { + res->err = getaddrinfo(res->node, res->service, res->hints, &res->addrs); + int return_fd = res->return_fd; + res->return_fd = -1; + assert(send(return_fd, &res, sizeof(res), MSG_EOR) == sizeof(res)); + // Main thread now owns res + assert(!close(return_fd)); + } + assert(!len); + assert(!close(fd)); + return NULL; +} + +static void asyncaddrinfo_del(struct asyncaddrinfo_resolution *res) { + if (res->node) { + free(res->node); + res->node = NULL; + } + if (res->service) { + free(res->service); + res->service = NULL; + } + free(res); +} + +void asyncaddrinfo_init(size_t threads) { + assert(!asyncaddrinfo_threads); + + int fds[2]; + assert(!socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_CLOEXEC, 0, fds)); + asyncaddrinfo_write_fd = fds[1]; + + asyncaddrinfo_num_threads = threads; + asyncaddrinfo_threads = malloc(asyncaddrinfo_num_threads * sizeof(*asyncaddrinfo_threads)); + assert(asyncaddrinfo_threads); + + for (size_t i = 0; i < asyncaddrinfo_num_threads; i++) { + int subfd = dup(fds[0]); + assert(subfd >= 0); + assert(!pthread_create(&asyncaddrinfo_threads[i], NULL, asyncaddrinfo_main, (void *) (intptr_t) subfd)); + } + assert(!close(fds[0])); +} + +void asyncaddrinfo_cleanup() { + assert(asyncaddrinfo_threads); + assert(!close(asyncaddrinfo_write_fd)); + asyncaddrinfo_write_fd = -1; + for (size_t i = 0; i < asyncaddrinfo_num_threads; i++) { + assert(!pthread_join(asyncaddrinfo_threads[i], NULL)); + } + free(asyncaddrinfo_threads); + asyncaddrinfo_threads = NULL; +} + +int asyncaddrinfo_resolve(const char *node, const char *service, struct addrinfo *hints) { + int fds[2]; + assert(!socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_CLOEXEC, 0, fds)); + + struct asyncaddrinfo_resolution *res = malloc(sizeof(*res)); + assert(res); + res->return_fd = fds[1]; + if (node) { + res->node = strdup(node); + assert(res->node); + } else { + res->node = NULL; + } + if (service) { + res->service = strdup(service); + assert(res->service); + } else { + res->service = NULL; + } + if (hints) { + memcpy(&res->_hints, hints, sizeof(res->_hints)); + res->hints = &res->_hints; + } else { + res->hints = NULL; + } + assert(send(asyncaddrinfo_write_fd, &res, sizeof(res), MSG_EOR) == sizeof(res)); + // Resolve thread now owns res + + return fds[0]; +} + +int asyncaddrinfo_result(int fd, struct addrinfo **addrs) { + struct asyncaddrinfo_resolution *res; + assert(recv(fd, &res, sizeof(res), 0) == sizeof(res)); + assert(!close(fd)); + *addrs = res->addrs; + int err = res->err; + asyncaddrinfo_del(res); + return err; +} diff --git a/adsbus/asyncaddrinfo.h b/adsbus/asyncaddrinfo.h new file mode 100644 index 0000000..a0a7ef6 --- /dev/null +++ b/adsbus/asyncaddrinfo.h @@ -0,0 +1,8 @@ +#pragma once + +struct addrinfo; + +void asyncaddrinfo_init(size_t threads); +void asyncaddrinfo_cleanup(void); +int asyncaddrinfo_resolve(const char *node, const char *service, struct addrinfo *hints); +int asyncaddrinfo_result(int fd, struct addrinfo **addrs); diff --git a/adsbus/resolve.c b/adsbus/resolve.c index 51990a2..35d4864 100644 --- a/adsbus/resolve.c +++ b/adsbus/resolve.c @@ -1,86 +1,33 @@ -#include -#include +#include +#include #include -#include -#include -#include -#include +#include "asyncaddrinfo.h" #include "peer.h" #include "resolve.h" -struct resolve { - int fd; - - char *node; - char *service; - int flags; - - int err; - struct addrinfo *addrs; -}; - -static pthread_t resolve_thread; -static int resolve_write_fd; - -static void *resolve_main(void *arg) { - int fd = (int) (intptr_t) arg; - struct resolve *res; - ssize_t len; - while ((len = read(fd, &res, sizeof(res))) == sizeof(res)) { - struct addrinfo hints = { - .ai_family = AF_UNSPEC, - .ai_socktype = SOCK_STREAM, - .ai_flags = AI_V4MAPPED | AI_ADDRCONFIG | res->flags, - }; - res->err = getaddrinfo(res->node, res->service, &hints, &res->addrs); - free(res->node); - free(res->service); - res->node = res->service = NULL; - assert(write(res->fd, &res, sizeof(res)) == sizeof(res)); - close(res->fd); - res->fd = -1; - } - assert(!len); - assert(!close(fd)); - return NULL; -} - void resolve_init() { - int fds[2]; - assert(!pipe2(fds, O_CLOEXEC)); - resolve_write_fd = fds[1]; - assert(!pthread_create(&resolve_thread, NULL, resolve_main, (void *) (intptr_t) fds[0])); + asyncaddrinfo_init(2); } void resolve_cleanup() { - assert(!close(resolve_write_fd)); - assert(!pthread_join(resolve_thread, NULL)); + asyncaddrinfo_cleanup(); } void resolve(struct peer *peer, const char *node, const char *service, int flags) { - int fds[2]; - assert(!pipe2(fds, O_CLOEXEC)); + struct addrinfo hints = { + .ai_flags = AI_V4MAPPED | AI_ADDRCONFIG | flags, + .ai_family = AF_UNSPEC, + .ai_socktype = SOCK_STREAM, + }; - struct resolve *res = malloc(sizeof(*res)); - res->fd = fds[1]; - res->node = strdup(node); - res->service = strdup(service); - res->flags = flags; - assert(write(resolve_write_fd, &res, sizeof(res)) == sizeof(res)); - - peer->fd = fds[0]; + peer->fd = asyncaddrinfo_resolve(node, service, &hints); peer_epoll_add(peer, EPOLLIN); } int resolve_result(struct peer *peer, struct addrinfo **addrs) { - struct resolve *res; - assert(read(peer->fd, &res, sizeof(res)) == sizeof(res)); - assert(!close(peer->fd)); + int err = asyncaddrinfo_result(peer->fd, addrs); peer->fd = -1; - *addrs = res->addrs; - int err = res->err; - free(res); return err; } From 761460886aea8c09db87e26c7656ee0ba26a5017 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Tue, 1 Mar 2016 20:17:58 -0800 Subject: [PATCH 35/35] Sync with upstream --- adsbus/asyncaddrinfo.c | 5 +++-- adsbus/asyncaddrinfo.h | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/adsbus/asyncaddrinfo.c b/adsbus/asyncaddrinfo.c index 74edf69..c2578c4 100644 --- a/adsbus/asyncaddrinfo.c +++ b/adsbus/asyncaddrinfo.c @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -65,7 +66,7 @@ void asyncaddrinfo_init(size_t threads) { assert(asyncaddrinfo_threads); for (size_t i = 0; i < asyncaddrinfo_num_threads; i++) { - int subfd = dup(fds[0]); + int subfd = fcntl(fds[0], F_DUPFD_CLOEXEC, 0); assert(subfd >= 0); assert(!pthread_create(&asyncaddrinfo_threads[i], NULL, asyncaddrinfo_main, (void *) (intptr_t) subfd)); } @@ -83,7 +84,7 @@ void asyncaddrinfo_cleanup() { asyncaddrinfo_threads = NULL; } -int asyncaddrinfo_resolve(const char *node, const char *service, struct addrinfo *hints) { +int asyncaddrinfo_resolve(const char *node, const char *service, const struct addrinfo *hints) { int fds[2]; assert(!socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_CLOEXEC, 0, fds)); diff --git a/adsbus/asyncaddrinfo.h b/adsbus/asyncaddrinfo.h index a0a7ef6..bd42d09 100644 --- a/adsbus/asyncaddrinfo.h +++ b/adsbus/asyncaddrinfo.h @@ -4,5 +4,5 @@ struct addrinfo; void asyncaddrinfo_init(size_t threads); void asyncaddrinfo_cleanup(void); -int asyncaddrinfo_resolve(const char *node, const char *service, struct addrinfo *hints); +int asyncaddrinfo_resolve(const char *node, const char *service, const struct addrinfo *hints); int asyncaddrinfo_result(int fd, struct addrinfo **addrs);