diff --git a/adsbus/.gitignore b/adsbus/.gitignore index 55c3eac..14d0f0c 100644 --- a/adsbus/.gitignore +++ b/adsbus/.gitignore @@ -33,3 +33,6 @@ # Binaries adsbus + +# Generated +build.h diff --git a/adsbus/Makefile b/adsbus/Makefile index fe6dabd..d5549a0 100644 --- a/adsbus/Makefile +++ b/adsbus/Makefile @@ -10,20 +10,26 @@ VALGRIND ?= valgrind VALGRIND_FLAGS ?= --error-exitcode=1 --trace-children=yes --track-fds=yes --show-leak-kinds=all --leak-check=full ADSBUS_TEST_FLAGS ?= --stdin --stdout=airspy_adsb --stdout=beast --stdout=json --stdout=proto --stdout=raw --stdout=stats -OBJ_TRANSPORT = exec.o file.o incoming.o outgoing.o +OBJ_TRANSPORT = exec.o file.o incoming.o outgoing.o stdinout.o OBJ_FLOW = flow.o receive.o send.o send_receive.o OBJ_PROTOCOL = airspy_adsb.o beast.o json.o proto.o raw.o stats.o -OBJ_UTIL = asyncaddrinfo.o buf.o hex.o list.o opts.o packet.o peer.o rand.o resolve.o server.o socket.o uuid.o wakeup.o +OBJ_UTIL = asyncaddrinfo.o buf.o hex.o list.o log.o opts.o packet.o peer.o rand.o resolve.o server.o socket.o uuid.o wakeup.o OBJ_PROTO = adsb.pb-c.o all: adsbus clean: - rm -rf *.o adsbus testout findings + rm -rf *.o adsbus testout findings build.h -%.o: %.c *.h +%.o: %.c *.h build.h $(COMP) -c $(CFLAGS) $< -o $@ +build.h: + echo "#define GIT_LAST_CHANGE \"$$(git log --format=%H -n 1)\"" > $@ + echo "#define GIT_LOCAL_CLEAN $$(git diff --exit-code > /dev/null && echo true || echo false)" >> $@ + echo "#define HOSTNAME \"$$(hostname --fqdn)\"" >> $@ + echo "#define USERNAME \"$$(whoami)\"" >> $@ + adsb.pb-c.c: ../proto/adsb.proto protoc-c --c_out=./ --proto_path=$(dir $<) $< diff --git a/adsbus/adsbus.c b/adsbus/adsbus.c index 5621eb3..c1bdded 100644 --- a/adsbus/adsbus.c +++ b/adsbus/adsbus.c @@ -1,9 +1,6 @@ #include #include -#include -#include #include -#include #include #include #include @@ -14,6 +11,7 @@ #include "hex.h" #include "incoming.h" #include "json.h" +#include "log.h" #include "opts.h" #include "outgoing.h" #include "peer.h" @@ -25,164 +23,43 @@ #include "send_receive.h" #include "server.h" #include "stats.h" +#include "stdinout.h" #include "wakeup.h" -static void print_usage(const char *name) { - fprintf(stderr, - "\n" - "Usage: %s [OPTION]...\n" - "\n" - "Options:\n" - "\t--help\n" - "\t--connect-receive=HOST/PORT\n" - "\t--connect-send=FORMAT=HOST/PORT\n" - "\t--connect-send-receive=FORMAT=HOST/PORT\n" - "\t--listen-receive=[HOST/]PORT\n" - "\t--listen-send=FORMAT=[HOST/]PORT\n" - "\t--listen-send-receive=FORMAT=[HOST/]PORT\n" - "\t--file-read=PATH\n" - "\t--file-write=FORMAT=PATH\n" - "\t--file-write-read=FORMAT=PATH\n" - "\t--file-append=FORMAT=PATH\n" - "\t--file-append-read=FORMAT=PATH\n" - "\t--exec-receive=COMMAND\n" - "\t--exec-send=FORMAT=COMMAND\n" - "\t--exec-send-receive=FORMAT=COMMAND\n" - "\t--stdin\n" - "\t--stdout=FORMAT\n" - , name); - receive_print_usage(); - send_print_usage(); -} - -static bool parse_opts(int argc, char *argv[]) { - static struct option long_options[] = { - {"connect-receive", required_argument, 0, 'c'}, - {"connect-send", required_argument, 0, 's'}, - {"connect-send-receive", required_argument, 0, 't'}, - {"listen-receive", required_argument, 0, 'l'}, - {"listen-send", required_argument, 0, 'm'}, - {"listen-send-receive", required_argument, 0, 'n'}, - {"file-read", required_argument, 0, 'r'}, - {"file-write", required_argument, 0, 'w'}, - {"file-write-read", required_argument, 0, 'x'}, - {"file-append", required_argument, 0, 'a'}, - {"file-append-read", required_argument, 0, 'b'}, - {"exec-receive", required_argument, 0, 'e'}, - {"exec-send", required_argument, 0, 'f'}, - {"exec-send-receive", required_argument, 0, 'g'}, - {"stdin", no_argument, 0, 'i'}, - {"stdout", required_argument, 0, 'o'}, - {"help", no_argument, 0, 'h'}, - {0, 0, 0, 0 }, - }; - - int opt; - while ((opt = getopt_long_only(argc, argv, "", long_options, NULL)) != -1) { - bool (*handler)(char *) = NULL; - switch (opt) { - case 'c': - handler = opts_add_connect_receive; - break; - - case 's': - handler = opts_add_connect_send; - break; - - case 't': - handler = opts_add_connect_send_receive; - break; - - case 'l': - handler = opts_add_listen_receive; - break; - - case 'm': - handler = opts_add_listen_send; - break; - - case 'n': - handler = opts_add_listen_send_receive; - break; - - case 'r': - handler = opts_add_file_read; - break; - - case 'w': - handler = opts_add_file_write; - break; - - case 'x': - handler = opts_add_file_write_read; - break; - - case 'a': - handler = opts_add_file_append; - break; - - case 'b': - handler = opts_add_file_append_read; - break; - - case 'e': - handler = opts_add_exec_receive; - break; - - case 'f': - handler = opts_add_exec_send; - break; - - case 'g': - handler = opts_add_exec_send_receive; - break; - - case 'i': - handler = opts_add_stdin; - break; - - case 'o': - handler = opts_add_stdout; - break; - - case 'h': - default: - print_usage(argv[0]); - return false; - } - - if (handler) { - if (!handler(optarg)) { - fprintf(stderr, "Invalid flag value: %s\n", optarg); - print_usage(argv[0]); - return false; - } - } - } - - if (optind != argc) { - fprintf(stderr, "Not a flag: %s\n", argv[optind]); - print_usage(argv[0]); - return false; - } - - return true; -} - static void reopen(int fd, char *path, int flags) { // Presumes that all fds < fd are open assert(!close(fd)); - assert(open(path, flags | O_CLOEXEC) == fd); + assert(open(path, flags | O_CLOEXEC | O_NOCTTY) == fd); +} + +static void adsbus_opts_add() { + // This order controls the order in --help, but nothing else. + server_opts_add(); + log_opts_add(); + outgoing_opts_add(); + incoming_opts_add(); + exec_opts_add(); + file_opts_add(); + stdinout_opts_add(); } int main(int argc, char *argv[]) { + adsbus_opts_add(); + + opts_init(argc, argv); + hex_init(); rand_init(); - resolve_init(); + + log_init(); server_init(); + + resolve_init(); wakeup_init(); peer_init(); + log_init_peer(); + receive_init(); send_init(); @@ -191,9 +68,11 @@ int main(int argc, char *argv[]) { proto_init(); stats_init(); - if (!parse_opts(argc, argv)) { - peer_shutdown(0); - } + outgoing_init(); + incoming_init(); + exec_init(); + file_init(); + stdinout_init(); reopen(STDIN_FILENO, "/dev/null", O_RDONLY); reopen(STDOUT_FILENO, "/dev/full", O_WRONLY); @@ -216,11 +95,14 @@ int main(int argc, char *argv[]) { rand_cleanup(); wakeup_cleanup(); + log_cleanup_peer(); + peer_cleanup(); + log_cleanup(); + assert(!close(STDIN_FILENO)); assert(!close(STDOUT_FILENO)); - assert(!close(STDERR_FILENO)); return EXIT_SUCCESS; } diff --git a/adsbus/airspy_adsb.c b/adsbus/airspy_adsb.c index eeac285..876bbc1 100644 --- a/adsbus/airspy_adsb.c +++ b/adsbus/airspy_adsb.c @@ -1,12 +1,9 @@ #include -#include -#include #include "buf.h" #include "hex.h" #include "packet.h" #include "receive.h" -#include "uuid.h" #include "airspy_adsb.h" diff --git a/adsbus/asyncaddrinfo.c b/adsbus/asyncaddrinfo.c index c2578c4..6e2411a 100644 --- a/adsbus/asyncaddrinfo.c +++ b/adsbus/asyncaddrinfo.c @@ -2,10 +2,12 @@ #include #include #include +#include #include #include -#include #include +#include +#include #include #include "asyncaddrinfo.h" @@ -27,6 +29,11 @@ static int asyncaddrinfo_write_fd; static void *asyncaddrinfo_main(void *arg) { int fd = (int) (intptr_t) arg; + + sigset_t sigmask; + assert(!sigfillset(&sigmask)); + assert(!pthread_sigmask(SIG_BLOCK, &sigmask, NULL)); + struct asyncaddrinfo_resolution *res; ssize_t len; while ((len = recv(fd, &res, sizeof(res), 0)) == sizeof(res)) { diff --git a/adsbus/beast.c b/adsbus/beast.c index b808210..31c2846 100644 --- a/adsbus/beast.c +++ b/adsbus/beast.c @@ -1,7 +1,5 @@ #include -#include #include -#include #include "buf.h" #include "packet.h" diff --git a/adsbus/exec.c b/adsbus/exec.c index e072c74..f5826ed 100644 --- a/adsbus/exec.c +++ b/adsbus/exec.c @@ -1,17 +1,20 @@ #include #include #include -#include #include #include +#include #include #include #include -#include "buf.h" #include "flow.h" -#include "list.h" +#include "log.h" +#include "opts.h" #include "peer.h" +#include "receive.h" +#include "send.h" +#include "send_receive.h" #include "uuid.h" #include "wakeup.h" @@ -19,6 +22,7 @@ struct exec { struct peer peer; + struct peer log_peer; uint8_t id[UUID_LEN]; char *command; struct flow *flow; @@ -28,89 +32,164 @@ struct exec { }; static struct list_head exec_head = LIST_HEAD_INIT(exec_head); +static opts_group exec_opts; + +static char log_module = 'E'; static void exec_spawn_wrapper(struct peer *); +static void exec_harvest(struct exec *exec) { + if (exec->child > 0) { + int status; + assert(waitpid(exec->child, &status, 0) == exec->child); + exec->child = -1; + if (WIFEXITED(status)) { + LOG(exec->id, "Client exited with status %d", WEXITSTATUS(status)); + } else { + assert(WIFSIGNALED(status)); + LOG(exec->id, "Client exited with signal %d", WTERMSIG(status)); + } + } + peer_close(&exec->log_peer); +} + static void exec_del(struct exec *exec) { flow_ref_dec(exec->flow); if (exec->child > 0) { - fprintf(stderr, "E %s: Sending SIGTERM to child process %d\n", exec->id, exec->child); + LOG(exec->id, "Sending SIGTERM to child process %d", exec->child); // Racy with the process terminating, so don't assert on it kill(exec->child, SIGTERM); - assert(waitpid(exec->child, NULL, 0) == exec->child); } + exec_harvest(exec); list_del(&exec->exec_list); free(exec->command); free(exec); } static void exec_close_handler(struct peer *peer) { - struct exec *exec = (struct exec *) peer; - int status; - assert(waitpid(exec->child, &status, WNOHANG) == exec->child); - exec->child = -1; - if (WIFEXITED(status)) { - fprintf(stderr, "E %s: Client exited with status %d\n", exec->id, WEXITSTATUS(status)); - } else { - assert(WIFSIGNALED(status)); - fprintf(stderr, "E %s: Client exited with signal %d\n", exec->id, WTERMSIG(status)); - } + struct exec *exec = container_of(peer, struct exec, peer); + exec_harvest(exec); uint32_t delay = wakeup_get_retry_delay_ms(1); - fprintf(stderr, "E %s: Will retry in %ds\n", exec->id, delay / 1000); + LOG(exec->id, "Will retry in %ds", delay / 1000); exec->peer.event_handler = exec_spawn_wrapper; - wakeup_add((struct peer *) exec, delay); + wakeup_add(&exec->peer, delay); } -static void exec_parent(struct exec *exec, pid_t child, int fd) { +static void exec_log_handler(struct peer *peer) { + // Do you believe in magic? + struct exec *exec = container_of(peer, struct exec, log_peer); + + char linebuf[4096]; + ssize_t ret = read(exec->log_peer.fd, linebuf, 4096); + if (ret <= 0) { + LOG(exec->id, "Log input stream closed"); + peer_close(&exec->log_peer); + return; + } + size_t len = (size_t) ret; + char *iter = linebuf, *eol; + while ((eol = memchr(iter, '\n', len))) { + assert(eol >= iter); + size_t linelen = (size_t) (eol - iter); + LOG(exec->id, "(child output) %.*s", (int) linelen, iter); + iter += (linelen + 1); + len -= (linelen + 1); + } + if (len) { + LOG(exec->id, "(child output) %.*s", (int) len, iter); + } +} + +static void exec_parent(struct exec *exec, pid_t child, int data_fd, int log_fd) { exec->child = child; - fprintf(stderr, "E %s: Child started as process %d\n", exec->id, exec->child); + LOG(exec->id, "Child started as process %d", exec->child); + + exec->log_peer.fd = log_fd; + exec->log_peer.event_handler = exec_log_handler; + peer_epoll_add(&exec->log_peer, EPOLLIN); exec->peer.event_handler = exec_close_handler; - if (!flow_new_send_hello(fd, exec->flow, exec->passthrough, (struct peer *) exec)) { - exec_close_handler((struct peer *) exec); + if (!flow_new_send_hello(data_fd, exec->flow, exec->passthrough, &exec->peer)) { + exec_close_handler(&exec->peer); return; } } -static void __attribute__ ((noreturn)) exec_child(const struct exec *exec, int fd) { +static void __attribute__ ((noreturn)) exec_child(const struct exec *exec, int data_fd, int log_fd) { assert(setsid() != -1); // We leave stderr open from child to parent // Other than that, fds should have CLOEXEC set - if (fd != 0) { - assert(dup2(fd, 0) == 0); + if (data_fd != STDIN_FILENO) { + assert(dup2(data_fd, STDIN_FILENO) == STDIN_FILENO); } - if (fd != 1) { - assert(dup2(fd, 1) == 1); + if (data_fd != STDOUT_FILENO) { + assert(dup2(data_fd, STDOUT_FILENO) == STDOUT_FILENO); } - if (fd != 0 && fd != 1) { - assert(!close(fd)); + if (data_fd != STDIN_FILENO && data_fd != STDOUT_FILENO) { + assert(!close(data_fd)); + } + if (log_fd != STDERR_FILENO) { + assert(dup2(log_fd, STDERR_FILENO) == STDERR_FILENO); + assert(!close(log_fd)); } assert(!execl("/bin/sh", "sh", "-c", exec->command, NULL)); abort(); } static void exec_spawn(struct exec *exec) { - fprintf(stderr, "E %s: Executing: %s\n", exec->id, exec->command); - int fds[2]; - assert(!socketpair(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0, fds)); + LOG(exec->id, "Executing: %s", exec->command); + int data_fds[2], log_fds[2]; + // Leave these sockets blocking; we move in lock step with subprograms + assert(!socketpair(AF_UNIX, SOCK_STREAM, 0, data_fds)); + assert(!socketpair(AF_UNIX, SOCK_STREAM, 0, log_fds)); int res = fork(); assert(res >= 0); if (res) { - assert(!close(fds[1])); - exec_parent(exec, res, fds[0]); + assert(!close(data_fds[1])); + assert(!close(log_fds[1])); + assert(!shutdown(log_fds[0], SHUT_WR)); + exec_parent(exec, res, data_fds[0], log_fds[0]); } else { - assert(!close(fds[0])); - exec_child(exec, fds[1]); + assert(!close(data_fds[0])); + assert(!close(log_fds[0])); + exec_child(exec, data_fds[1], log_fds[1]); } } static void exec_spawn_wrapper(struct peer *peer) { - struct exec *exec = (struct exec *) peer; + struct exec *exec = container_of(peer, struct exec, peer); exec_spawn(exec); } +static bool exec_add(const char *cmd, struct flow *flow, void *passthrough) { + exec_new(cmd, flow, passthrough); + return true; +} + +static bool exec_receive(const char *arg) { + return exec_add(arg, receive_flow, NULL); +} + +static bool exec_send(const char *arg) { + return send_add(exec_add, send_flow, arg); +} + +static bool exec_send_receive(const char *arg) { + return send_add(exec_add, send_receive_flow, arg); +} + +void exec_opts_add() { + opts_add("exec-receive", "COMMAND", exec_receive, exec_opts); + opts_add("exec-send", "FORMAT=COMMAND", exec_send, exec_opts); + opts_add("exec-send-receive", "FORMAT=COMMAND", exec_send_receive, exec_opts); +} + +void exec_init() { + opts_call(exec_opts); +} + void exec_cleanup() { struct exec *iter, *next; list_for_each_entry_safe(iter, next, &exec_head, exec_list) { @@ -118,7 +197,7 @@ void exec_cleanup() { } } -void exec_new(char *command, struct flow *flow, void *passthrough) { +void exec_new(const char *command, struct flow *flow, void *passthrough) { flow_ref_inc(flow); struct exec *exec = malloc(sizeof(*exec)); diff --git a/adsbus/exec.h b/adsbus/exec.h index 0c8c525..adb0c79 100644 --- a/adsbus/exec.h +++ b/adsbus/exec.h @@ -2,5 +2,7 @@ struct flow; +void exec_opts_add(void); +void exec_init(void); void exec_cleanup(void); -void exec_new(char *, struct flow *, void *); +void exec_new(const char *, struct flow *, void *); diff --git a/adsbus/file.c b/adsbus/file.c index 2d09009..89f1782 100644 --- a/adsbus/file.c +++ b/adsbus/file.c @@ -2,18 +2,18 @@ #include #include #include -#include #include -#include #include #include #include #include "flow.h" -#include "list.h" +#include "log.h" +#include "opts.h" #include "peer.h" #include "receive.h" #include "send.h" +#include "send_receive.h" #include "uuid.h" #include "wakeup.h" @@ -32,6 +32,9 @@ struct file { }; static struct list_head file_head = LIST_HEAD_INIT(file_head); +static opts_group file_opts; + +static char log_module = 'F'; static void file_open_wrapper(struct peer *); @@ -67,14 +70,14 @@ static void file_del(struct file *file) { static void file_retry(struct file *file) { uint32_t delay = wakeup_get_retry_delay_ms(file->attempt++); - fprintf(stderr, "F %s: Will retry in %ds\n", file->id, delay / 1000); + LOG(file->id, "Will retry in %ds", delay / 1000); file->peer.event_handler = file_open_wrapper; - wakeup_add((struct peer *) file, delay); + wakeup_add(&file->peer, delay); } static void file_handle_close(struct peer *peer) { - struct file *file = (struct file *) peer; - fprintf(stderr, "F %s: File closed: %s\n", file->id, file->path); + struct file *file = container_of(peer, struct file, peer); + LOG(file->id, "File closed: %s", file->path); if (file->retry) { file_retry(file); @@ -84,10 +87,10 @@ static void file_handle_close(struct peer *peer) { } static void file_open(struct file *file) { - fprintf(stderr, "F %s: Opening file: %s\n", file->id, file->path); - int fd = open(file->path, file->flags | O_CLOEXEC, S_IRUSR | S_IWUSR); + LOG(file->id, "Opening file: %s", file->path); + int fd = open(file->path, file->flags | O_CLOEXEC | O_NOCTTY, S_IRUSR | S_IWUSR); if (fd == -1) { - fprintf(stderr, "F %s: Error opening file: %s\n", file->id, strerror(errno)); + LOG(file->id, "Error opening file: %s", strerror(errno)); file_retry(file); return; } @@ -95,19 +98,19 @@ static void file_open(struct file *file) { file->retry = file_should_retry(fd, file); file->peer.event_handler = file_handle_close; file->attempt = 0; - if (!flow_new_send_hello(fd, file->flow, file->passthrough, (struct peer *) file)) { - fprintf(stderr, "F %s: Error writing greeting\n", file->id); + if (!flow_new_send_hello(fd, file->flow, file->passthrough, &file->peer)) { + LOG(file->id, "Error writing greeting"); file_retry(file); return; } } static void file_open_wrapper(struct peer *peer) { - struct file *file = (struct file *) peer; + struct file *file = container_of(peer, struct file, peer); file_open(file); } -static void file_new(char *path, int flags, struct flow *flow, void *passthrough) { +static void file_new(const char *path, int flags, struct flow *flow, void *passthrough) { flow_ref_inc(flow); struct file *file = malloc(sizeof(*file)); @@ -126,6 +129,49 @@ static void file_new(char *path, int flags, struct flow *flow, void *passthrough file_open(file); } +static bool file_write_add(const char *path, struct flow *flow, void *passthrough) { + file_write_new(path, flow, passthrough); + return true; +} + +static bool file_append_add(const char *path, struct flow *flow, void *passthrough) { + file_append_new(path, flow, passthrough); + return true; +} + +static bool file_read(const char *arg) { + file_read_new(arg, receive_flow, NULL); + return true; +} + +static bool file_write(const char *arg) { + return send_add(file_write_add, send_flow, arg); +} + +static bool file_write_read(const char *arg) { + return send_add(file_write_add, send_receive_flow, arg); +} + +static bool file_append(const char *arg) { + return send_add(file_append_add, send_flow, arg); +} + +static bool file_append_read(const char *arg) { + return send_add(file_append_add, send_receive_flow, arg); +} + +void file_opts_add() { + opts_add("file-read", "PATH", file_read, file_opts); + opts_add("file-write", "FORMAT=PATH", file_write, file_opts); + opts_add("file-write-read", "FORMAT=PATH", file_write_read, file_opts); + opts_add("file-append", "FORMAT=PATH", file_append, file_opts); + opts_add("file-append-read", "FORMAT=PATH", file_append_read, file_opts); +} + +void file_init() { + opts_call(file_opts); +} + void file_cleanup() { struct file *iter, *next; list_for_each_entry_safe(iter, next, &file_head, file_list) { @@ -133,14 +179,14 @@ void file_cleanup() { } } -void file_read_new(char *path, struct flow *flow, void *passthrough) { +void file_read_new(const char *path, struct flow *flow, void *passthrough) { file_new(path, O_RDONLY, flow, passthrough); } -void file_write_new(char *path, struct flow *flow, void *passthrough) { +void file_write_new(const char *path, struct flow *flow, void *passthrough) { file_new(path, O_WRONLY | O_CREAT | O_NOFOLLOW | O_TRUNC, flow, passthrough); } -void file_append_new(char *path, struct flow *flow, void *passthrough) { +void file_append_new(const char *path, struct flow *flow, void *passthrough) { file_new(path, O_WRONLY | O_CREAT | O_NOFOLLOW, flow, passthrough); } diff --git a/adsbus/file.h b/adsbus/file.h index 7727a49..659b626 100644 --- a/adsbus/file.h +++ b/adsbus/file.h @@ -2,7 +2,9 @@ struct flow; +void file_opts_add(void); +void file_init(void); void file_cleanup(void); -void file_read_new(char *, struct flow *, void *); -void file_write_new(char *, struct flow *, void *); -void file_append_new(char *, struct flow *, void *); +void file_read_new(const char *, struct flow *, void *); +void file_write_new(const char *, struct flow *, void *); +void file_append_new(const char *, struct flow *, void *); diff --git a/adsbus/incoming.c b/adsbus/incoming.c index daced14..60e8cf5 100644 --- a/adsbus/incoming.c +++ b/adsbus/incoming.c @@ -1,18 +1,21 @@ #include -#include +#include +#include #include +#include #include #include -#include -#include -#include +#include #include -#include "buf.h" #include "flow.h" -#include "list.h" +#include "log.h" +#include "opts.h" #include "peer.h" +#include "receive.h" #include "resolve.h" +#include "send.h" +#include "send_receive.h" #include "socket.h" #include "wakeup.h" #include "uuid.h" @@ -31,25 +34,28 @@ struct incoming { }; static struct list_head incoming_head = LIST_HEAD_INIT(incoming_head); +static opts_group incoming_opts; + +static char log_module = 'I'; static void incoming_resolve_wrapper(struct peer *); static void incoming_retry(struct incoming *incoming) { uint32_t delay = wakeup_get_retry_delay_ms(incoming->attempt++); - fprintf(stderr, "I %s: Will retry in %ds\n", incoming->id, delay / 1000); + LOG(incoming->id, "Will retry in %ds", delay / 1000); incoming->peer.event_handler = incoming_resolve_wrapper; - wakeup_add((struct peer *) incoming, delay); + wakeup_add(&incoming->peer, delay); } static void incoming_handler(struct peer *peer) { - struct incoming *incoming = (struct incoming *) peer; + struct incoming *incoming = container_of(peer, struct incoming, peer); struct sockaddr peer_addr, local_addr; socklen_t peer_addrlen = sizeof(peer_addr), local_addrlen = sizeof(local_addr); int fd = accept4(incoming->peer.fd, &peer_addr, &peer_addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC); if (fd == -1) { - fprintf(stderr, "I %s: Failed to accept new connection on %s/%s: %s\n", incoming->id, incoming->node, incoming->service, strerror(errno)); + LOG(incoming->id, "Failed to accept new connection on %s/%s: %s", incoming->node, incoming->service, strerror(errno)); return; } @@ -58,8 +64,7 @@ static void incoming_handler(struct peer *peer) { assert(getnameinfo(&peer_addr, peer_addrlen, peer_hbuf, sizeof(peer_hbuf), peer_sbuf, sizeof(peer_sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0); assert(getnameinfo(&local_addr, local_addrlen, local_hbuf, sizeof(local_hbuf), local_sbuf, sizeof(local_sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0); - fprintf(stderr, "I %s: New incoming connection on %s/%s (%s/%s) from %s/%s\n", - incoming->id, + LOG(incoming->id, "New incoming connection on %s/%s (%s/%s) from %s/%s", incoming->node, incoming->service, local_hbuf, local_sbuf, peer_hbuf, peer_sbuf); @@ -67,16 +72,14 @@ static void incoming_handler(struct peer *peer) { flow_socket_connected(fd, incoming->flow); if (!flow_new_send_hello(fd, incoming->flow, incoming->passthrough, NULL)) { - fprintf(stderr, "I %s: Error writing greeting\n", incoming->id); + LOG(incoming->id, "Error writing greeting"); return; } } static void incoming_del(struct incoming *incoming) { flow_ref_dec(incoming->flow); - if (incoming->peer.fd >= 0) { - assert(!close(incoming->peer.fd)); - } + peer_close(&incoming->peer); list_del(&incoming->incoming_list); free(incoming->node); free(incoming->service); @@ -84,12 +87,12 @@ static void incoming_del(struct incoming *incoming) { } static void incoming_listen(struct peer *peer) { - struct incoming *incoming = (struct incoming *) peer; + struct incoming *incoming = container_of(peer, struct incoming, peer); struct addrinfo *addrs; int err = resolve_result(peer, &addrs); if (err) { - fprintf(stderr, "I %s: Failed to resolve %s/%s: %s\n", incoming->id, incoming->node, incoming->service, gai_strerror(err)); + LOG(incoming->id, "Failed to resolve %s/%s: %s", incoming->node, incoming->service, gai_strerror(err)); incoming_retry(incoming); return; } @@ -98,7 +101,7 @@ static void incoming_listen(struct peer *peer) { for (addr = addrs; addr; addr = addr->ai_next) { char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; assert(getnameinfo(addr->ai_addr, addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0); - fprintf(stderr, "I %s: Listening on %s/%s...\n", incoming->id, hbuf, sbuf); + LOG(incoming->id, "Listening on %s/%s...", hbuf, sbuf); incoming->peer.fd = socket(addr->ai_family, addr->ai_socktype | SOCK_CLOEXEC, addr->ai_protocol); assert(incoming->peer.fd >= 0); @@ -106,7 +109,7 @@ static void incoming_listen(struct peer *peer) { socket_pre_bind(incoming->peer.fd); if (bind(incoming->peer.fd, addr->ai_addr, addr->ai_addrlen) != 0) { - fprintf(stderr, "I %s: Failed to bind to %s/%s: %s\n", incoming->id, hbuf, sbuf, strerror(errno)); + LOG(incoming->id, "Failed to bind to %s/%s: %s", hbuf, sbuf, strerror(errno)); assert(!close(incoming->peer.fd)); continue; } @@ -122,24 +125,57 @@ static void incoming_listen(struct peer *peer) { freeaddrinfo(addrs); if (addr == NULL) { - fprintf(stderr, "I %s: Failed to bind any addresses for %s/%s...\n", incoming->id, incoming->node, incoming->service); + LOG(incoming->id, "Failed to bind any addresses for %s/%s...", incoming->node, incoming->service); incoming_retry(incoming); return; } incoming->attempt = 0; incoming->peer.event_handler = incoming_handler; - peer_epoll_add((struct peer *) incoming, EPOLLIN); + peer_epoll_add(&incoming->peer, EPOLLIN); } static void incoming_resolve(struct incoming *incoming) { - fprintf(stderr, "I %s: Resolving %s/%s...\n", incoming->id, incoming->node, incoming->service); + LOG(incoming->id, "Resolving %s/%s...", incoming->node, incoming->service); incoming->peer.event_handler = incoming_listen; - resolve((struct peer *) incoming, incoming->node, incoming->service, AI_PASSIVE); + resolve(&incoming->peer, incoming->node, incoming->service, AI_PASSIVE); } static void incoming_resolve_wrapper(struct peer *peer) { - incoming_resolve((struct incoming *) peer); + incoming_resolve(container_of(peer, struct incoming, peer)); +} + +static bool incoming_add(const char *host_port, struct flow *flow, void *passthrough) { + char *host = opts_split(&host_port, '/'); + if (host) { + incoming_new(host, host_port, flow, passthrough); + free(host); + } else { + incoming_new(NULL, host_port, flow, passthrough); + } + return true; +} + +static bool incoming_listen_receive(const char *arg) { + return incoming_add(arg, receive_flow, NULL); +} + +static bool incoming_listen_send(const char *arg) { + return send_add(incoming_add, send_flow, arg); +} + +static bool incoming_listen_send_receive(const char *arg) { + return send_add(incoming_add, send_receive_flow, arg); +} + +void incoming_opts_add() { + opts_add("listen-receive", "[HOST/]PORT", incoming_listen_receive, incoming_opts); + opts_add("listen-send", "FORMAT=[HOST/]PORT", incoming_listen_send, incoming_opts); + opts_add("listen-send-receive", "FORMAT=[HOST/]PORT", incoming_listen_send_receive, incoming_opts); +} + +void incoming_init() { + opts_call(incoming_opts); } void incoming_cleanup() { @@ -149,7 +185,7 @@ void incoming_cleanup() { } } -void incoming_new(char *node, char *service, struct flow *flow, void *passthrough) { +void incoming_new(const char *node, const char *service, struct flow *flow, void *passthrough) { flow_ref_inc(flow); struct incoming *incoming = malloc(sizeof(*incoming)); diff --git a/adsbus/incoming.h b/adsbus/incoming.h index 4691c1d..785a164 100644 --- a/adsbus/incoming.h +++ b/adsbus/incoming.h @@ -2,5 +2,7 @@ struct flow; +void incoming_opts_add(void); +void incoming_init(void); void incoming_cleanup(void); -void incoming_new(char *, char *, struct flow *, void *); +void incoming_new(const char *, const char *, struct flow *, void *); diff --git a/adsbus/json.c b/adsbus/json.c index 98a0836..bc6f7de 100644 --- a/adsbus/json.c +++ b/adsbus/json.c @@ -1,16 +1,15 @@ #include -#include +#include #include #include #include "hex.h" #include "buf.h" +#include "log.h" #include "packet.h" #include "rand.h" #include "receive.h" -#include "send.h" #include "server.h" -#include "uuid.h" #include "json.h" @@ -27,6 +26,8 @@ struct json_parser_state { static json_t *json_prev = NULL; static struct buf json_hello_buf = BUF_INIT; +static char log_module = 'R'; // borrowing + static void json_serialize_to_buf(json_t *obj, struct buf *buf) { assert(json_dump_callback(obj, json_buf_append_callback, buf, 0) == 0); json_decref(obj); @@ -80,11 +81,11 @@ static bool json_parse_header(json_t *in, struct packet *packet, struct json_par } if (!strcmp(json_server_id, (const char *) server_id)) { - fprintf(stderr, "R %s: Attempt to receive json data from our own server ID (%s); loop!\n", packet->source_id, server_id); + LOG(packet->source_id, "Attempt to receive json data from our own server ID (%s); loop!", server_id); return false; } - fprintf(stderr, "R %s: Connected to server ID: %s\n", packet->source_id, json_server_id); + LOG(packet->source_id, "Connected to server ID: %s", json_server_id); state->mlat_timestamp_mhz = (uint16_t) mlat_timestamp_mhz; state->mlat_timestamp_max = (uint64_t) mlat_timestamp_max; diff --git a/adsbus/list.c b/adsbus/list.c index f8c2873..e37101b 100644 --- a/adsbus/list.c +++ b/adsbus/list.c @@ -1,5 +1,3 @@ -#include - #include "list.h" void list_head_init(struct list_head *head) { diff --git a/adsbus/list.h b/adsbus/list.h index fb9af51..c4f4433 100644 --- a/adsbus/list.h +++ b/adsbus/list.h @@ -1,16 +1,15 @@ #pragma once #include +#include #pragma GCC diagnostic ignored "-Wcast-align" #pragma GCC diagnostic ignored "-Wgnu-statement-expression" #pragma GCC diagnostic ignored "-Wlanguage-extension-token" -#define offset_of(type, member) ((size_t) &((type *) NULL)->member) - #define container_of(ptr, type, member) ({ \ typeof( ((type *) NULL)->member ) *__mptr = (ptr); \ - (type *)( (char *)__mptr - offset_of(type, member) );}) + (type *)( (char *)__mptr - offsetof(type, member) );}) struct list_head { struct list_head *next; diff --git a/adsbus/log.c b/adsbus/log.c new file mode 100644 index 0000000..9fabea2 --- /dev/null +++ b/adsbus/log.c @@ -0,0 +1,148 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "log.h" +#include "opts.h" +#include "peer.h" +#include "uuid.h" + +#pragma GCC diagnostic ignored "-Wformat-nonliteral" + +static FILE *log_stream = NULL; +static char *log_path = NULL; +static uint8_t log_id[UUID_LEN]; +static struct peer log_rotate_peer; +static bool log_timestamps = false; +static bool log_quiet = false; +static opts_group log_opts; + +static char log_module = 'L'; + +static void log_open() { + if (!log_path) { + return; + } + int fd = open(log_path, O_WRONLY | O_CREAT | O_APPEND | O_NOCTTY | O_CLOEXEC, S_IRUSR | S_IWUSR); + assert(fd >= 0); + assert(dup3(fd, STDERR_FILENO, O_CLOEXEC) == STDERR_FILENO); + assert(!close(fd)); +} + +static void log_rotate() { + assert(log_path); + + uint8_t old_log_id[UUID_LEN], new_log_id[UUID_LEN]; + uuid_gen(new_log_id); + LOG(log_id, "Switching to new log with ID %s at: %s", new_log_id, log_path); + memcpy(old_log_id, log_id, UUID_LEN); + memcpy(log_id, new_log_id, UUID_LEN); + log_open(); + LOG(log_id, "Log start after switch from log ID %s", old_log_id); +} + +static void log_rotate_handler(struct peer *peer) { + struct signalfd_siginfo siginfo; + assert(read(peer->fd, &siginfo, sizeof(siginfo)) == sizeof(siginfo)); + LOG(log_id, "Received signal %u; rotating logs", siginfo.ssi_signo); + log_rotate(); +} + +static bool log_set_path(const char *path) { + if (log_path) { + return false; + } + log_path = strdup(path); + assert(log_path); + return true; +} + +static bool log_enable_timestamps(const char __attribute__ ((unused)) *arg) { + log_timestamps = true; + return true; +} + +static bool log_set_quiet(const char __attribute__ ((unused)) *arg) { + log_quiet = true; + return true; +} + +void log_opts_add() { + opts_add("log-file", "PATH", log_set_path, log_opts); + opts_add("log-timestamps", NULL, log_enable_timestamps, log_opts); + opts_add("quiet", NULL, log_set_quiet, log_opts); +} + +void log_init() { + opts_call(log_opts); + log_open(); + log_stream = fdopen(STDERR_FILENO, "a"); + assert(log_stream); + setlinebuf(log_stream); + + uuid_gen(log_id); + LOG(log_id, "Log start"); +} + +void log_init_peer() { + if (log_path) { + sigset_t sigmask; + assert(!sigemptyset(&sigmask)); + assert(!sigaddset(&sigmask, SIGHUP)); + log_rotate_peer.fd = signalfd(-1, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC); + assert(log_rotate_peer.fd >= 0); + log_rotate_peer.event_handler = log_rotate_handler; + peer_epoll_add(&log_rotate_peer, EPOLLIN); + + assert(!sigprocmask(SIG_BLOCK, &sigmask, NULL)); + } else { + log_rotate_peer.fd = -1; + } +} + +void log_cleanup_peer() { + peer_close(&log_rotate_peer); +} + +void log_cleanup() { + LOG(log_id, "Log end"); + assert(!fclose(log_stream)); + if (log_path) { + free(log_path); + log_path = NULL; + } +} + +void log_write(char type, const char *loc, const uint8_t *id, const char *fmt, ...) { + if (log_quiet) { + return; + } + + va_list ap; + va_start(ap, fmt); + + char datetime[64] = ""; + if (log_timestamps) { + time_t now; + assert(time(&now) >= 0); + struct tm tmnow; + assert(gmtime_r(&now, &tmnow)); + assert(strftime(datetime, sizeof(datetime), "%FT%TZ ", &tmnow) > 0); + } + + assert(fprintf(log_stream, "%s[%18s] %c %s: ", datetime, loc, type, id) > 0); + assert(vfprintf(log_stream, fmt, ap) > 0); + assert(fprintf(log_stream, "\n") == 1); + va_end(ap); +} diff --git a/adsbus/log.h b/adsbus/log.h new file mode 100644 index 0000000..6494eb6 --- /dev/null +++ b/adsbus/log.h @@ -0,0 +1,16 @@ +#pragma once + +#include +#include + +#define LOG_STR(line) #line +#define LOG_LOC(file, line) (file ":" LOG_STR(line)) +#define LOG(id, ...) log_write((log_module), LOG_LOC(__FILE__, __LINE__), (id), __VA_ARGS__) + +void log_opts_add(void); +void log_init(void); +void log_cleanup(void); +void log_init_peer(void); +void log_cleanup_peer(void); +void log_write(char, const char *, const uint8_t *, const char *, ...) + __attribute__ ((__format__ (__printf__, 4, 5))); diff --git a/adsbus/opts.c b/adsbus/opts.c index 0b39e88..7218fdb 100644 --- a/adsbus/opts.c +++ b/adsbus/opts.c @@ -1,23 +1,91 @@ #include -#include +#include #include +#include #include -#include -#include -#include - -#include "exec.h" -#include "file.h" -#include "flow.h" -#include "incoming.h" -#include "outgoing.h" -#include "receive.h" -#include "send.h" -#include "send_receive.h" #include "opts.h" -static char *opts_split(char **arg, char delim) { +#define OPTS_MAX 128 + +static struct { + const char *arg_help; + opts_handler handler; + void *group; +} opts[OPTS_MAX]; + +static struct option opts_long[OPTS_MAX]; +static size_t opts_num = 0; +static int opts_argc; +static char **opts_argv; +static opts_group opts_group_internal; + +static void opts_print_usage() { + fprintf(stderr, + "Usage: %s [OPTION]...\n" + "\n" + "Options:\n" + , opts_argv[0]); + for (size_t i = 0; i < opts_num; i++) { + fprintf(stderr, "\t--%s%s%s\n", opts_long[i].name, opts[i].arg_help ? "=" : "", opts[i].arg_help ? opts[i].arg_help : ""); + } +} + +static bool opts_help(const char __attribute__ ((unused)) *arg) { + opts_print_usage(); + exit(EXIT_SUCCESS); +} + +void opts_init(int argc, char *argv[]) { + opts_argc = argc; + opts_argv = argv; + opts_add("help", NULL, opts_help, opts_group_internal); + + assert(opts_num < OPTS_MAX); + opts_long[opts_num].name = NULL; + opts_long[opts_num].has_arg = 0; + opts_long[opts_num].flag = NULL; + opts_long[opts_num].val = 0; + + opts_call(opts_group_internal); +} + +void opts_add(const char *name, const char *arg_help, opts_handler handler, opts_group group) { + assert(opts_num < OPTS_MAX); + opts[opts_num].arg_help = arg_help; + opts[opts_num].handler = handler; + opts[opts_num].group = group; + opts_long[opts_num].name = name; + opts_long[opts_num].has_arg = arg_help ? required_argument : no_argument; + opts_long[opts_num].flag = NULL; + opts_long[opts_num].val = 0; + opts_num++; +} + +void opts_call(opts_group group) { + optind = 1; + int opt, longindex; + while ((opt = getopt_long_only(opts_argc, opts_argv, "", opts_long, &longindex)) == 0) { + if (opts[longindex].group != group) { + continue; + } + if (!opts[longindex].handler(optarg)) { + fprintf(stderr, "Invalid option value: %s\n", opts_argv[optind - 1]); + exit(EXIT_FAILURE); + } + } + if (opt != -1) { + opts_print_usage(); + exit(EXIT_FAILURE); + } + + if (optind != opts_argc) { + fprintf(stderr, "Not a flag: %s\n", opts_argv[optind]); + exit(EXIT_FAILURE); + } +} + +char *opts_split(const char **arg, char delim) { char *split = strchr(*arg, delim); if (!split) { return NULL; @@ -26,137 +94,3 @@ static char *opts_split(char **arg, char delim) { *arg = split + 1; return ret; } - -static bool opts_add_listen(char *host_port, struct flow *flow, void *passthrough) { - char *host = opts_split(&host_port, '/'); - if (host) { - incoming_new(host, host_port, flow, passthrough); - free(host); - } else { - incoming_new(NULL, host_port, flow, passthrough); - } - return true; -} - -static bool opts_add_connect(char *host_port, struct flow *flow, void *passthrough) { - char *host = opts_split(&host_port, '/'); - if (!host) { - return false; - } - - outgoing_new(host, host_port, flow, passthrough); - free(host); - return true; -} - -static bool opts_add_file_write_int(char *path, struct flow *flow, void *passthrough) { - file_write_new(path, flow, passthrough); - return true; -} - -static bool opts_add_file_append_int(char *path, struct flow *flow, void *passthrough) { - file_append_new(path, flow, passthrough); - return true; -} - -static bool opts_add_exec(char *cmd, struct flow *flow, void *passthrough) { - exec_new(cmd, flow, passthrough); - return true; -} - -static struct serializer *opts_get_serializer(char **arg) { - char *format = opts_split(arg, '='); - if (!format) { - return NULL; - } - - struct serializer *serializer = send_get_serializer(format); - free(format); - if (!serializer) { - return NULL; - } - - return serializer; -} - -static bool opts_add_send(bool (*next)(char *, struct flow *, void *), struct flow *flow, char *arg) { - struct serializer *serializer = opts_get_serializer(&arg); - if (!serializer) { - return false; - } - return next(arg, flow, serializer); -} - -bool opts_add_connect_receive(char *arg) { - return opts_add_connect(arg, receive_flow, NULL); -} - -bool opts_add_connect_send(char *arg) { - return opts_add_send(opts_add_connect, send_flow, arg); -} - -bool opts_add_connect_send_receive(char *arg) { - return opts_add_send(opts_add_connect, send_receive_flow, arg); -} - -bool opts_add_listen_receive(char *arg) { - return opts_add_listen(arg, receive_flow, NULL); -} - -bool opts_add_listen_send(char *arg) { - return opts_add_send(opts_add_listen, send_flow, arg); -} - -bool opts_add_listen_send_receive(char *arg) { - return opts_add_send(opts_add_listen, send_receive_flow, arg); -} - -bool opts_add_file_read(char *arg) { - file_read_new(arg, receive_flow, NULL); - return true; -} - -bool opts_add_file_write(char *arg) { - return opts_add_send(opts_add_file_write_int, send_flow, arg); -} - -bool opts_add_file_write_read(char *arg) { - return opts_add_send(opts_add_file_write_int, send_receive_flow, arg); -} - -bool opts_add_file_append(char *arg) { - return opts_add_send(opts_add_file_append_int, send_flow, arg); -} - -bool opts_add_file_append_read(char *arg) { - return opts_add_send(opts_add_file_append_int, send_receive_flow, arg); -} - -bool opts_add_exec_receive(char *arg) { - exec_new(arg, receive_flow, NULL); - return true; -} - -bool opts_add_exec_send(char *arg) { - return opts_add_send(opts_add_exec, send_flow, arg); -} - -bool opts_add_exec_send_receive(char *arg) { - return opts_add_send(opts_add_exec, send_receive_flow, arg); -} - -bool opts_add_stdin(char __attribute__((unused)) *arg) { - int fd = fcntl(STDIN_FILENO, F_DUPFD_CLOEXEC, 0); - assert(fd >= 0); - return flow_new_send_hello(fd, receive_flow, NULL, NULL); -} - -bool opts_add_stdout(char *arg) { - struct serializer *serializer = send_get_serializer(arg); - if (!serializer) { - return false; - } - int fd = fcntl(STDOUT_FILENO, F_DUPFD_CLOEXEC, 0); - assert(fd >= 0); - return flow_new_send_hello(fd, send_flow, serializer, NULL); -} diff --git a/adsbus/opts.h b/adsbus/opts.h index 3c03ce8..4f832ae 100644 --- a/adsbus/opts.h +++ b/adsbus/opts.h @@ -2,19 +2,10 @@ #include -bool opts_add_connect_receive(char *); -bool opts_add_connect_send(char *); -bool opts_add_connect_send_receive(char *); -bool opts_add_listen_receive(char *); -bool opts_add_listen_send(char *); -bool opts_add_listen_send_receive(char *); -bool opts_add_file_read(char *); -bool opts_add_file_write(char *); -bool opts_add_file_write_read(char *); -bool opts_add_file_append(char *); -bool opts_add_file_append_read(char *); -bool opts_add_exec_receive(char *); -bool opts_add_exec_send(char *); -bool opts_add_exec_send_receive(char *); -bool opts_add_stdout(char *); -bool opts_add_stdin(char *); +typedef bool (*opts_handler)(const char *); +typedef char opts_group[1]; + +void opts_init(int, char *[]); +void opts_add(const char *, const char *, opts_handler, opts_group); +void opts_call(opts_group); +char *opts_split(const char **, char); diff --git a/adsbus/outgoing.c b/adsbus/outgoing.c index 6ccf25b..a868579 100644 --- a/adsbus/outgoing.c +++ b/adsbus/outgoing.c @@ -1,19 +1,22 @@ -#include -#include #include -#include -#include -#include #include +#include +#include #include +#include +#include +#include #include #include "buf.h" #include "flow.h" -#include "list.h" +#include "log.h" +#include "opts.h" #include "peer.h" +#include "receive.h" #include "resolve.h" -#include "socket.h" +#include "send.h" +#include "send_receive.h" #include "wakeup.h" #include "uuid.h" @@ -33,29 +36,33 @@ struct outgoing { }; static struct list_head outgoing_head = LIST_HEAD_INIT(outgoing_head); +static opts_group outgoing_opts; + +static char log_module = 'O'; static void outgoing_connect_result(struct outgoing *, int); static void outgoing_resolve(struct outgoing *); static void outgoing_resolve_wrapper(struct peer *); static void outgoing_retry(struct outgoing *outgoing) { + outgoing->peer.fd = -1; uint32_t delay = wakeup_get_retry_delay_ms(++outgoing->attempt); - fprintf(stderr, "O %s: Will retry in %ds\n", outgoing->id, delay / 1000); + LOG(outgoing->id, "Will retry in %ds", delay / 1000); outgoing->peer.event_handler = outgoing_resolve_wrapper; - wakeup_add((struct peer *) outgoing, delay); + wakeup_add(&outgoing->peer, delay); } static void outgoing_connect_next(struct outgoing *outgoing) { if (outgoing->addr == NULL) { freeaddrinfo(outgoing->addrs); - fprintf(stderr, "O %s: Can't connect to any addresses of %s/%s\n", outgoing->id, outgoing->node, outgoing->service); + LOG(outgoing->id, "Can't connect to any addresses of %s/%s", outgoing->node, outgoing->service); outgoing_retry(outgoing); return; } char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; assert(getnameinfo(outgoing->addr->ai_addr, outgoing->addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0); - fprintf(stderr, "O %s: Connecting to %s/%s...\n", outgoing->id, hbuf, sbuf); + LOG(outgoing->id, "Connecting to %s/%s...", hbuf, sbuf); outgoing->peer.fd = socket(outgoing->addr->ai_family, outgoing->addr->ai_socktype | SOCK_NONBLOCK | SOCK_CLOEXEC, outgoing->addr->ai_protocol); assert(outgoing->peer.fd >= 0); @@ -67,7 +74,7 @@ static void outgoing_connect_next(struct outgoing *outgoing) { } static void outgoing_connect_handler(struct peer *peer) { - struct outgoing *outgoing = (struct outgoing *) peer; + struct outgoing *outgoing = container_of(peer, struct outgoing, peer); peer_epoll_del(&outgoing->peer); @@ -78,11 +85,8 @@ static void outgoing_connect_handler(struct peer *peer) { } static void outgoing_disconnect_handler(struct peer *peer) { - struct outgoing *outgoing = (struct outgoing *) peer; - if (outgoing->peer.fd != -1) { - assert(!close(outgoing->peer.fd)); - } - fprintf(stderr, "O %s: Peer disconnected; reconnecting...\n", outgoing->id); + struct outgoing *outgoing = container_of(peer, struct outgoing, peer); + LOG(outgoing->id, "Peer disconnected; reconnecting..."); outgoing_retry(outgoing); } @@ -91,7 +95,7 @@ static void outgoing_connect_result(struct outgoing *outgoing, int result) { assert(getnameinfo(outgoing->addr->ai_addr, outgoing->addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0); switch (result) { case 0: - fprintf(stderr, "O %s: Connected to %s/%s\n", outgoing->id, hbuf, sbuf); + LOG(outgoing->id, "Connected to %s/%s", hbuf, sbuf); freeaddrinfo(outgoing->addrs); outgoing->attempt = 0; int fd = outgoing->peer.fd; @@ -99,18 +103,17 @@ static void outgoing_connect_result(struct outgoing *outgoing, int result) { outgoing->peer.event_handler = outgoing_disconnect_handler; flow_socket_ready(fd, outgoing->flow); flow_socket_connected(fd, outgoing->flow); - flow_new(fd, outgoing->flow, outgoing->passthrough, (struct peer *) outgoing); + flow_new(fd, outgoing->flow, outgoing->passthrough, &outgoing->peer); break; case EINPROGRESS: outgoing->peer.event_handler = outgoing_connect_handler; - peer_epoll_add((struct peer *) outgoing, EPOLLOUT); + peer_epoll_add(&outgoing->peer, EPOLLOUT); break; default: - fprintf(stderr, "O %s: Can't connect to %s/%s: %s\n", outgoing->id, hbuf, sbuf, strerror(result)); + LOG(outgoing->id, "Can't connect to %s/%s: %s", hbuf, sbuf, strerror(result)); assert(!close(outgoing->peer.fd)); - outgoing->peer.fd = -1; outgoing->addr = outgoing->addr->ai_next; // Tail recursion :/ outgoing_connect_next(outgoing); @@ -119,10 +122,10 @@ static void outgoing_connect_result(struct outgoing *outgoing, int result) { } static void outgoing_resolve_handler(struct peer *peer) { - struct outgoing *outgoing = (struct outgoing *) peer; + struct outgoing *outgoing = container_of(peer, struct outgoing, peer); int err = resolve_result(peer, &outgoing->addrs); if (err) { - fprintf(stderr, "O %s: Failed to resolve %s/%s: %s\n", outgoing->id, outgoing->node, outgoing->service, gai_strerror(err)); + LOG(outgoing->id, "Failed to resolve %s/%s: %s", outgoing->node, outgoing->service, gai_strerror(err)); outgoing_retry(outgoing); } else { outgoing->addr = outgoing->addrs; @@ -131,26 +134,57 @@ static void outgoing_resolve_handler(struct peer *peer) { } static void outgoing_resolve(struct outgoing *outgoing) { - fprintf(stderr, "O %s: Resolving %s/%s...\n", outgoing->id, outgoing->node, outgoing->service); + LOG(outgoing->id, "Resolving %s/%s...", outgoing->node, outgoing->service); outgoing->peer.event_handler = outgoing_resolve_handler; - resolve((struct peer *) outgoing, outgoing->node, outgoing->service, 0); + resolve(&outgoing->peer, outgoing->node, outgoing->service, 0); } static void outgoing_resolve_wrapper(struct peer *peer) { - outgoing_resolve((struct outgoing *) peer); + outgoing_resolve(container_of(peer, struct outgoing, peer)); } static void outgoing_del(struct outgoing *outgoing) { flow_ref_dec(outgoing->flow); - if (outgoing->peer.fd >= 0) { - assert(!close(outgoing->peer.fd)); - } + peer_close(&outgoing->peer); list_del(&outgoing->outgoing_list); free(outgoing->node); free(outgoing->service); free(outgoing); } +static bool outgoing_add(const char *host_port, struct flow *flow, void *passthrough) { + char *host = opts_split(&host_port, '/'); + if (!host) { + return false; + } + + outgoing_new(host, host_port, flow, passthrough); + free(host); + return true; +} + +static bool outgoing_connect_receive(const char *arg) { + return outgoing_add(arg, receive_flow, NULL); +} + +static bool outgoing_connect_send(const char *arg) { + return send_add(outgoing_add, send_flow, arg); +} + +static bool outgoing_connect_send_receive(const char *arg) { + return send_add(outgoing_add, send_receive_flow, arg); +} + +void outgoing_opts_add() { + opts_add("connect-receive", "HOST/PORT", outgoing_connect_receive, outgoing_opts); + opts_add("connect-send", "FORMAT=HOST/PORT", outgoing_connect_send, outgoing_opts); + opts_add("connect-send-receive", "FORMAT=HOST/PORT", outgoing_connect_send_receive, outgoing_opts); +} + +void outgoing_init() { + opts_call(outgoing_opts); +} + void outgoing_cleanup() { struct outgoing *iter, *next; list_for_each_entry_safe(iter, next, &outgoing_head, outgoing_list) { @@ -158,7 +192,7 @@ void outgoing_cleanup() { } } -void outgoing_new(char *node, char *service, struct flow *flow, void *passthrough) { +void outgoing_new(const char *node, const char *service, struct flow *flow, void *passthrough) { flow_ref_inc(flow); struct outgoing *outgoing = malloc(sizeof(*outgoing)); diff --git a/adsbus/outgoing.h b/adsbus/outgoing.h index c39fdc5..4333dd0 100644 --- a/adsbus/outgoing.h +++ b/adsbus/outgoing.h @@ -2,5 +2,7 @@ struct flow; +void outgoing_opts_add(void); +void outgoing_init(void); void outgoing_cleanup(void); -void outgoing_new(char *, char *, struct flow *, void *); +void outgoing_new(const char *, const char *, struct flow *, void *); diff --git a/adsbus/packet.c b/adsbus/packet.c index 6232e81..3dfd461 100644 --- a/adsbus/packet.c +++ b/adsbus/packet.c @@ -1,7 +1,6 @@ #pragma GCC diagnostic ignored "-Wtautological-constant-out-of-range-compare" #include -#include #include #include "uuid.h" diff --git a/adsbus/peer.c b/adsbus/peer.c index 3c8f70e..067f533 100644 --- a/adsbus/peer.c +++ b/adsbus/peer.c @@ -1,61 +1,61 @@ #include #include -#include #include #include -#include #include -#include +#include +#include +#include +#include +#include #include +#include "log.h" #include "server.h" -#include "uuid.h" -#include "wakeup.h" #include "peer.h" +static char log_module = 'X'; + uint32_t peer_count_in = 0, peer_count_out = 0, peer_count_out_in = 0; static int peer_epoll_fd; -static int peer_shutdown_fd; +static struct peer peer_shutdown_peer; static bool peer_shutdown_flag = false; static struct list_head peer_always_trigger_head = LIST_HEAD_INIT(peer_always_trigger_head); -static void peer_shutdown_handler(struct peer *peer) { - fprintf(stderr, "X %s: Shutting down\n", server_id); - assert(!close(peer->fd)); - free(peer); +static void peer_shutdown() { + peer_close(&peer_shutdown_peer); peer_shutdown_flag = true; } +static void peer_shutdown_handler(struct peer *peer) { + struct signalfd_siginfo siginfo; + assert(read(peer->fd, &siginfo, sizeof(siginfo)) == sizeof(siginfo)); + LOG(server_id, "Received signal %u; shutting down", siginfo.ssi_signo); + peer_shutdown(); +} + void peer_init() { peer_epoll_fd = epoll_create1(EPOLL_CLOEXEC); assert(peer_epoll_fd >= 0); - int shutdown_fds[2]; - assert(!pipe2(shutdown_fds, O_CLOEXEC)); + sigset_t sigmask; + assert(!sigemptyset(&sigmask)); + assert(!sigaddset(&sigmask, SIGINT)); + assert(!sigaddset(&sigmask, SIGTERM)); + peer_shutdown_peer.fd = signalfd(-1, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC); + assert(peer_shutdown_peer.fd >= 0); + peer_shutdown_peer.event_handler = peer_shutdown_handler; + peer_epoll_add(&peer_shutdown_peer, EPOLLIN); - struct peer *shutdown_peer = malloc(sizeof(*shutdown_peer)); - assert(shutdown_peer); - shutdown_peer->fd = shutdown_fds[0]; - shutdown_peer->event_handler = peer_shutdown_handler; - peer_epoll_add(shutdown_peer, EPOLLRDHUP); - - peer_shutdown_fd = shutdown_fds[1]; - signal(SIGINT, peer_shutdown); + assert(!sigprocmask(SIG_BLOCK, &sigmask, NULL)); } void peer_cleanup() { assert(!close(peer_epoll_fd)); } -void peer_shutdown(int __attribute__((unused)) signal) { - if (peer_shutdown_fd != -1) { - assert(!close(peer_shutdown_fd)); - peer_shutdown_fd = -1; - } -} - void peer_epoll_add(struct peer *peer, uint32_t events) { struct epoll_event ev = { .events = events, @@ -87,6 +87,15 @@ void peer_epoll_del(struct peer *peer) { } } +void peer_close(struct peer *peer) { + if (peer->fd == -1) { + return; + } + peer_epoll_del(peer); + assert(!close(peer->fd)); + peer->fd = -1; +} + void peer_call(struct peer *peer) { if (peer_shutdown_flag || !peer) { return; @@ -95,18 +104,20 @@ void peer_call(struct peer *peer) { } void peer_loop() { - fprintf(stderr, "X %s: Starting event loop\n", server_id); + LOG(server_id, "Starting event loop"); while (!peer_shutdown_flag) { if (!(peer_count_in + peer_count_out_in)) { - fprintf(stderr, "X %s: No remaining inputs\n", server_id); - peer_shutdown(0); + LOG(server_id, "No remaining inputs"); + peer_shutdown(); + break; } else if (!(peer_count_out + peer_count_out_in)) { - fprintf(stderr, "X %s: No remaining outputs\n", server_id); - peer_shutdown(0); + LOG(server_id, "No remaining outputs"); + peer_shutdown(); + break; } #define MAX_EVENTS 10 struct epoll_event events[MAX_EVENTS]; - int delay = list_is_empty(&peer_always_trigger_head) ? wakeup_get_delay() : 0; + int delay = list_is_empty(&peer_always_trigger_head) ? -1 : 0; int nfds = epoll_wait(peer_epoll_fd, events, MAX_EVENTS, delay); if (nfds == -1 && errno == EINTR) { continue; @@ -124,7 +135,5 @@ void peer_loop() { peer_call(iter); } } - - wakeup_dispatch(); } } diff --git a/adsbus/peer.h b/adsbus/peer.h index 1b380ea..9b39f94 100644 --- a/adsbus/peer.h +++ b/adsbus/peer.h @@ -18,8 +18,8 @@ extern uint32_t peer_count_in, peer_count_out, peer_count_out_in; void peer_init(void); void peer_cleanup(void); -void peer_shutdown(int signal); void peer_epoll_add(struct peer *, uint32_t); void peer_epoll_del(struct peer *); +void peer_close(struct peer *); void peer_call(struct peer *); void peer_loop(void); diff --git a/adsbus/proto.c b/adsbus/proto.c index d85cc25..36a6cf9 100644 --- a/adsbus/proto.c +++ b/adsbus/proto.c @@ -1,19 +1,17 @@ -#pragma GCC diagnostic ignored "-Wcast-qual" -#pragma GCC diagnostic ignored "-Wpacked" - -#include -#include -#include +#include #include #include "buf.h" +#include "log.h" #include "packet.h" #include "server.h" -#include "uuid.h" #include "adsb.pb-c.h" #include "proto.h" +#pragma GCC diagnostic ignored "-Wcast-qual" +#pragma GCC diagnostic ignored "-Wpacked" + #define PROTO_MAGIC "aDsB" struct proto_parser_state { @@ -24,6 +22,8 @@ struct proto_parser_state { bool have_header; }; +static char log_module = 'R'; // borrowing + static Adsb *proto_prev = NULL; static struct buf proto_hello_buf = BUF_INIT; @@ -145,12 +145,12 @@ static bool proto_parse_header(AdsbHeader *header, struct packet *packet, struct state->rssi_max = header->rssi_max; if (!strcmp(header->server_id, (const char *) server_id)) { - fprintf(stderr, "R %s: Attempt to receive proto data from our own server ID (%s); loop!\n", packet->source_id, server_id); + LOG(packet->source_id, "Attempt to receive proto data from our own server ID (%s); loop!", server_id); return false; } state->have_header = true; - fprintf(stderr, "R %s: Connected to server ID: %s\n", packet->source_id, header->server_id); + LOG(packet->source_id, "Connected to server ID: %s", header->server_id); return true; } diff --git a/adsbus/rand.c b/adsbus/rand.c index 82d6563..b4d27f1 100644 --- a/adsbus/rand.c +++ b/adsbus/rand.c @@ -1,7 +1,6 @@ #include #include #include -#include #include #include #include @@ -16,7 +15,7 @@ static struct buf rand_buf = BUF_INIT; static int rand_fd; void rand_init() { - rand_fd = open("/dev/urandom", O_RDONLY | O_CLOEXEC); + rand_fd = open("/dev/urandom", O_RDONLY | O_CLOEXEC | O_NOCTTY); assert(rand_fd >= 0); assert(read(rand_fd, buf_at(&rand_buf, 0), BUF_LEN_MAX) == BUF_LEN_MAX); rand_buf.length = BUF_LEN_MAX; diff --git a/adsbus/raw.c b/adsbus/raw.c index bcceb3d..b176b06 100644 --- a/adsbus/raw.c +++ b/adsbus/raw.c @@ -1,11 +1,8 @@ #include -#include -#include #include "buf.h" #include "hex.h" #include "packet.h" -#include "uuid.h" #include "raw.h" diff --git a/adsbus/receive.c b/adsbus/receive.c index 8b34095..189e6b6 100644 --- a/adsbus/receive.c +++ b/adsbus/receive.c @@ -1,18 +1,16 @@ #include -#include #include #include #include -#include #include -#include +#include #include "airspy_adsb.h" #include "beast.h" #include "buf.h" #include "flow.h" #include "json.h" -#include "list.h" +#include "log.h" #include "packet.h" #include "peer.h" #include "proto.h" @@ -39,6 +37,8 @@ struct receive { }; static struct list_head receive_head = LIST_HEAD_INIT(receive_head); +static char log_module = 'R'; + static void receive_new(int, void *, struct peer *); static struct flow _receive_flow = { @@ -92,7 +92,7 @@ static bool receive_autodetect_parse(struct receive *receive, struct packet *pac for (size_t i = 0; i < NUM_PARSERS; i++) { if (parsers[i].parse(buf, packet, state)) { - fprintf(stderr, "R %s: Detected input format %s\n", receive->id, parsers[i].name); + LOG(receive->id, "Detected input format: %s", parsers[i].name); receive->parser_wrapper = receive_parse_wrapper; receive->parser = parsers[i].parse; return true; @@ -105,17 +105,16 @@ static bool receive_autodetect_parse(struct receive *receive, struct packet *pac } static void receive_del(struct receive *receive) { - fprintf(stderr, "R %s: Connection closed\n", receive->id); + LOG(receive->id, "Connection closed"); peer_count_in--; - peer_epoll_del((struct peer *) receive); - assert(!close(receive->peer.fd)); + peer_close(&receive->peer); list_del(&receive->receive_list); peer_call(receive->on_close); free(receive); } static void receive_read(struct peer *peer) { - struct receive *receive = (struct receive *) peer; + struct receive *receive = container_of(peer, struct receive, peer); if (buf_fill(&receive->buf, receive->peer.fd) <= 0) { receive_del(receive); @@ -134,14 +133,14 @@ static void receive_read(struct peer *peer) { continue; } if (++packet.hops > receive_max_hops) { - fprintf(stderr, "R %s: Packet exceeded hop limit (%u > %u); dropping. You may have a loop in your configuration.\n", receive->id, packet.hops, receive_max_hops); + LOG(receive->id, "Packet exceeded hop limit (%u > %u); dropping. You may have a loop in your configuration.", packet.hops, receive_max_hops); continue; } send_write(&packet); } if (receive->buf.length == BUF_LEN_MAX) { - fprintf(stderr, "R %s: Input buffer overrun. This probably means that adsbus doesn't understand the protocol that this source is speaking.\n", receive->id); + LOG(receive->id, "Input buffer overrun. This probably means that adsbus doesn't understand the protocol that this source is speaking."); receive_del(receive); return; } @@ -164,9 +163,9 @@ static void receive_new(int fd, void __attribute__((unused)) *passthrough, struc list_add(&receive->receive_list, &receive_head); - peer_epoll_add((struct peer *) receive, EPOLLIN); + peer_epoll_add(&receive->peer, EPOLLIN); - fprintf(stderr, "R %s: New receive connection\n", receive->id); + LOG(receive->id, "New receive connection"); } void receive_init() { diff --git a/adsbus/resolve.c b/adsbus/resolve.c index 35d4864..57f05c6 100644 --- a/adsbus/resolve.c +++ b/adsbus/resolve.c @@ -1,5 +1,3 @@ -#include -#include #include #include "asyncaddrinfo.h" diff --git a/adsbus/send.c b/adsbus/send.c index 5522442..77b0732 100644 --- a/adsbus/send.c +++ b/adsbus/send.c @@ -1,7 +1,5 @@ #include -#include #include -#include #include #include #include @@ -15,7 +13,8 @@ #include "buf.h" #include "flow.h" #include "json.h" -#include "list.h" +#include "log.h" +#include "opts.h" #include "packet.h" #include "peer.h" #include "proto.h" @@ -47,6 +46,8 @@ static struct flow _send_flow = { }; struct flow *send_flow = &_send_flow; +static char log_module = 'S'; + typedef void (*serialize)(struct packet *, struct buf *); typedef void (*hello)(struct buf **); static struct serializer { @@ -89,17 +90,16 @@ static struct serializer { #define NUM_SERIALIZERS (sizeof(serializers) / sizeof(*serializers)) static void send_del(struct send *send) { - fprintf(stderr, "S %s (%s): Connection closed\n", send->id, send->serializer->name); + LOG(send->id, "Connection closed"); peer_count_out--; - peer_epoll_del((struct peer *) send); - assert(!close(send->peer.fd)); + peer_close(&send->peer); list_del(&send->send_list); peer_call(send->on_close); free(send); } static void send_del_wrapper(struct peer *peer) { - send_del((struct send *) peer); + send_del(container_of(peer, struct send, peer)); } static void send_new(int fd, void *passthrough, struct peer *on_close) { @@ -119,9 +119,24 @@ static void send_new(int fd, void *passthrough, struct peer *on_close) { list_add(&send->send_list, &serializer->send_head); - peer_epoll_add((struct peer *) send, 0); + peer_epoll_add(&send->peer, 0); - fprintf(stderr, "S %s (%s): New send connection\n", send->id, serializer->name); + LOG(send->id, "New send connection: %s", serializer->name); +} + +static struct serializer *send_parse_format(const char **arg) { + char *format = opts_split(arg, '='); + if (!format) { + return NULL; + } + + struct serializer *serializer = send_get_serializer(format); + free(format); + if (!serializer) { + return NULL; + } + + return serializer; } void send_init() { @@ -140,7 +155,7 @@ void send_cleanup() { } } -void *send_get_serializer(char *name) { +void *send_get_serializer(const char *name) { for (size_t i = 0; i < NUM_SERIALIZERS; i++) { if (strcasecmp(serializers[i].name, name) == 0) { return &serializers[i]; @@ -177,8 +192,8 @@ void send_write(struct packet *packet) { } if (write(iter->peer.fd, buf_at(&buf, 0), buf.length) != (ssize_t) buf.length) { // peer_loop() will see this shutdown and call send_del - int res = shutdown(iter->peer.fd, SHUT_WR); - assert(res == 0 || (res == -1 && errno == ENOTSOCK)); + // Ignore error + shutdown(iter->peer.fd, SHUT_WR); } } } @@ -190,3 +205,11 @@ void send_print_usage() { fprintf(stderr, "\t%s\n", serializers[i].name); } } + +bool send_add(bool (*next)(const char *, struct flow *, void *), struct flow *flow, const char *arg) { + struct serializer *serializer = send_parse_format(&arg); + if (!serializer) { + return false; + } + return next(arg, flow, serializer); +} diff --git a/adsbus/send.h b/adsbus/send.h index a3c380f..31d6752 100644 --- a/adsbus/send.h +++ b/adsbus/send.h @@ -1,13 +1,16 @@ #pragma once +#include + struct buf; struct flow; struct packet; void send_init(void); void send_cleanup(void); -void *send_get_serializer(char *); +void *send_get_serializer(const char *); void send_get_hello(struct buf **, void *); void send_write(struct packet *); void send_print_usage(void); +bool send_add(bool (*)(const char *, struct flow *, void *), struct flow *, const char *); extern struct flow *send_flow; diff --git a/adsbus/send_receive.c b/adsbus/send_receive.c index 0285cf2..6e72b23 100644 --- a/adsbus/send_receive.c +++ b/adsbus/send_receive.c @@ -1,10 +1,8 @@ #include #include #include -#include #include "flow.h" -#include "list.h" #include "peer.h" #include "receive.h" #include "send.h" @@ -37,7 +35,7 @@ static void send_receive_del(struct send_receive *send_receive) { } static void send_receive_on_close(struct peer *peer) { - struct send_receive *send_receive = (struct send_receive *) peer; + struct send_receive *send_receive = container_of(peer, struct send_receive, peer); if (!--(send_receive->ref_count)) { send_receive_del(send_receive); @@ -54,10 +52,10 @@ static void send_receive_new(int fd, void *passthrough, struct peer *on_close) { send_receive->ref_count = 2; list_add(&send_receive->send_receive_list, &send_receive_head); - flow_new(fd, send_flow, passthrough, on_close); + flow_new(fd, send_flow, passthrough, &send_receive->peer); int fd2 = fcntl(fd, F_DUPFD_CLOEXEC, 0); assert(fd2 >= 0); - flow_new(fd2, receive_flow, NULL, on_close); + flow_new(fd2, receive_flow, NULL, &send_receive->peer); } void send_receive_cleanup() { diff --git a/adsbus/server.c b/adsbus/server.c index 288996f..4978c39 100644 --- a/adsbus/server.c +++ b/adsbus/server.c @@ -1,13 +1,85 @@ +#include #include +#include +#include +#include +#include +#include +#include + +#include "build.h" +#include "log.h" +#include "opts.h" #include "uuid.h" #include "server.h" +#pragma GCC diagnostic ignored "-Wdate-time" + uint8_t server_id[UUID_LEN]; char server_version[] = "https://github.com/flamingcowtv/adsb-tools#1"; +static opts_group server_opts1, server_opts2; + +static char log_module = 'X'; + +static bool server_detach(const char __attribute__ ((unused)) *arg) { + LOG(server_id, "Detaching"); + + int ret = fork(); + assert(ret >= 0); + if (ret > 0) { + // We are the parent + exit(EXIT_SUCCESS); + } + + LOG(server_id, "Background process ID: %u", getpid()); + + assert(setsid() >= 0); + + return true; +} + +static bool server_pid_file(const char *path) { + pid_t pid = getpid(); + LOG(server_id, "Writing process ID %d to pid-file: %s", pid, path); + FILE *fh = fopen(path, "w"); + assert(fh); + assert(fprintf(fh, "%d\n", pid) > 0); + assert(!fclose(fh)); + return true; +} + +void server_opts_add() { + opts_add("detach", NULL, server_detach, server_opts1); + opts_add("pid-file", "PATH", server_pid_file, server_opts2); +} void server_init() { uuid_gen(server_id); - fprintf(stderr, "X %s: Server start\n", server_id); + LOG(server_id, "Server start"); + LOG(server_id, "Build data:"); + LOG(server_id, "\tgit_last_change: %s", GIT_LAST_CHANGE); + LOG(server_id, "\tgit_local_clean: %s", GIT_LOCAL_CLEAN ? "true" : "false"); + LOG(server_id, "\tclang_version: %s", __clang_version__); + LOG(server_id, "\tglibc_version: %d.%d", __GLIBC__, __GLIBC_MINOR__); + LOG(server_id, "\tjansson_version: %s", JANSSON_VERSION); + LOG(server_id, "\tprotobuf-c_version: %s", PROTOBUF_C_VERSION); + LOG(server_id, "\tdatetime: %s", __DATE__ " " __TIME__); + LOG(server_id, "\tusername: %s", USERNAME); + LOG(server_id, "\thostname: %s", HOSTNAME); + + LOG(server_id, "Runtime data:"); + struct utsname utsname; + assert(!uname(&utsname)); + LOG(server_id, "\tusername: %s", getlogin()); + LOG(server_id, "\thostname: %s", utsname.nodename); + LOG(server_id, "\tprocess_id: %d", getpid()); + LOG(server_id, "\tsystem: %s", utsname.sysname); + LOG(server_id, "\trelease: %s", utsname.release); + LOG(server_id, "\tversion: %s", utsname.version); + LOG(server_id, "\tmachine: %s", utsname.machine); + + opts_call(server_opts1); + opts_call(server_opts2); } diff --git a/adsbus/server.h b/adsbus/server.h index b5fcfc8..8e9e889 100644 --- a/adsbus/server.h +++ b/adsbus/server.h @@ -3,4 +3,5 @@ extern uint8_t server_id[]; extern char server_version[]; +void server_opts_add(void); void server_init(void); diff --git a/adsbus/socket.c b/adsbus/socket.c index 207460e..7782ded 100644 --- a/adsbus/socket.c +++ b/adsbus/socket.c @@ -2,8 +2,9 @@ #include #include #include -#include #include +#include +#include #include "socket.h" diff --git a/adsbus/stdinout.c b/adsbus/stdinout.c new file mode 100644 index 0000000..c1c7f38 --- /dev/null +++ b/adsbus/stdinout.c @@ -0,0 +1,37 @@ +#include +#include +#include + +#include "flow.h" +#include "opts.h" +#include "receive.h" +#include "send.h" + +#include "stdinout.h" + +static opts_group stdinout_opts; + +static bool stdinout_stdin(const char __attribute__((unused)) *arg) { + int fd = fcntl(STDIN_FILENO, F_DUPFD_CLOEXEC, 0); + assert(fd >= 0); + return flow_new_send_hello(fd, receive_flow, NULL, NULL); +} + +static bool stdinout_stdout(const char *arg) { + struct serializer *serializer = send_get_serializer(arg); + if (!serializer) { + return false; + } + int fd = fcntl(STDOUT_FILENO, F_DUPFD_CLOEXEC, 0); + assert(fd >= 0); + return flow_new_send_hello(fd, send_flow, serializer, NULL); +} + +void stdinout_opts_add() { + opts_add("stdin", NULL, stdinout_stdin, stdinout_opts); + opts_add("stdout", "FORMAT", stdinout_stdout, stdinout_opts); +} + +void stdinout_init() { + opts_call(stdinout_opts); +} diff --git a/adsbus/stdinout.h b/adsbus/stdinout.h new file mode 100644 index 0000000..e1291f4 --- /dev/null +++ b/adsbus/stdinout.h @@ -0,0 +1,4 @@ +#pragma once + +void stdinout_opts_add(void); +void stdinout_init(void); diff --git a/adsbus/uuid.c b/adsbus/uuid.c index 792bf49..b5c0bca 100644 --- a/adsbus/uuid.c +++ b/adsbus/uuid.c @@ -1,5 +1,3 @@ -#include - #include "hex.h" #include "rand.h" diff --git a/adsbus/wakeup.c b/adsbus/wakeup.c index ed89abd..1dfb5b9 100644 --- a/adsbus/wakeup.c +++ b/adsbus/wakeup.c @@ -1,91 +1,72 @@ #include -#include -#include -#include #include -#include -#include -#include -#include -#include +#include #include -#include "list.h" #include "peer.h" #include "rand.h" #include "wakeup.h" -struct wakeup_entry { - int fd; - uint64_t absolute_time_ms; - struct peer *peer; +struct wakeup { + struct peer peer; + struct peer *inner_peer; struct list_head wakeup_list; }; static struct list_head wakeup_head = LIST_HEAD_INIT(wakeup_head); -static uint64_t wakeup_get_time_ms() { - struct timespec tp; - assert(!clock_gettime(CLOCK_MONOTONIC_COARSE, &tp)); -#define MS_PER_S UINT64_C(1000) -#define NS_PER_MS UINT64_C(1000000) - assert(tp.tv_sec >= 0); - assert(tp.tv_nsec >= 0); - return ((uint64_t) tp.tv_sec * MS_PER_S) + ((uint64_t) tp.tv_nsec / NS_PER_MS); +static void wakeup_del(struct wakeup *wakeup) { + peer_close(&wakeup->peer); + list_del(&wakeup->wakeup_list); + free(wakeup); +} + +static void wakeup_handler(struct peer *peer) { + struct wakeup *wakeup = container_of(peer, struct wakeup, peer); + + uint64_t events; + assert(read(wakeup->peer.fd, &events, sizeof(events)) == sizeof(events)); + assert(events == 1); + + peer_call(wakeup->inner_peer); + wakeup_del(wakeup); } void wakeup_init() { } void wakeup_cleanup() { - struct wakeup_entry *iter, *next; + struct wakeup *iter, *next; list_for_each_entry_safe(iter, next, &wakeup_head, wakeup_list) { - free(iter); - } -} - -int wakeup_get_delay() { - if (list_is_empty(&wakeup_head)) { - return -1; - } - uint64_t now = wakeup_get_time_ms(); - struct wakeup_entry *next_to_fire = list_entry(wakeup_head.next, struct wakeup_entry, wakeup_list); - if (next_to_fire->absolute_time_ms > now) { - uint64_t delta = next_to_fire->absolute_time_ms - now; - assert(delta < INT_MAX); - return (int) delta; - } else { - return 0; - } -} - -void wakeup_dispatch() { - uint64_t now = wakeup_get_time_ms(); - struct wakeup_entry *iter, *next; - list_for_each_entry_safe(iter, next, &wakeup_head, wakeup_list) { - if (iter->absolute_time_ms > now) { - break; - } - peer_call(iter->peer); - list_del(&iter->wakeup_list); - free(iter); + wakeup_del(iter); } } void wakeup_add(struct peer *peer, uint32_t delay_ms) { - struct wakeup_entry *entry = malloc(sizeof(*entry)); - entry->absolute_time_ms = wakeup_get_time_ms() + delay_ms; - entry->peer = peer; + struct wakeup *wakeup = malloc(sizeof(*wakeup)); + wakeup->inner_peer = peer; + list_add(&wakeup->wakeup_list, &wakeup_head); - struct wakeup_entry *iter, *next; - list_for_each_entry_safe(iter, next, &wakeup_head, wakeup_list) { - if (iter->absolute_time_ms > entry->absolute_time_ms) { - list_add(&entry->wakeup_list, &iter->wakeup_list); - return; - } - } - list_add(&entry->wakeup_list, &wakeup_head); + wakeup->peer.fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); + assert(wakeup->peer.fd >= 0); + +#define MS_PER_S UINT64_C(1000) +#define NS_PER_MS UINT64_C(1000000) + const struct itimerspec ts = { + .it_interval = { + .tv_sec = 0, + .tv_nsec = 0, + }, + .it_value = { + .tv_sec = delay_ms / MS_PER_S, + .tv_nsec = (delay_ms % MS_PER_S) * NS_PER_MS, + }, + }; + assert(!timerfd_settime(wakeup->peer.fd, 0, &ts, NULL)); + + wakeup->peer.event_handler = wakeup_handler; + peer_epoll_add(&wakeup->peer, EPOLLIN); } #define RETRY_MIN_MS 2000 diff --git a/adsbus/wakeup.h b/adsbus/wakeup.h index 77ffb3b..c11f7e3 100644 --- a/adsbus/wakeup.h +++ b/adsbus/wakeup.h @@ -6,7 +6,5 @@ struct peer; void wakeup_init(void); void wakeup_cleanup(void); -int wakeup_get_delay(void); -void wakeup_dispatch(void); void wakeup_add(struct peer *, uint32_t); uint32_t wakeup_get_retry_delay_ms(uint32_t);