Merge branch 'master' of github.com:flamingcowtv/adsb-tools

This commit is contained in:
Ian Gulliver
2016-03-09 13:29:41 -08:00
40 changed files with 904 additions and 604 deletions

3
adsbus/.gitignore vendored
View File

@@ -33,3 +33,6 @@
# Binaries # Binaries
adsbus adsbus
# Generated
build.h

View File

@@ -10,20 +10,26 @@ VALGRIND ?= valgrind
VALGRIND_FLAGS ?= --error-exitcode=1 --trace-children=yes --track-fds=yes --show-leak-kinds=all --leak-check=full VALGRIND_FLAGS ?= --error-exitcode=1 --trace-children=yes --track-fds=yes --show-leak-kinds=all --leak-check=full
ADSBUS_TEST_FLAGS ?= --stdin --stdout=airspy_adsb --stdout=beast --stdout=json --stdout=proto --stdout=raw --stdout=stats ADSBUS_TEST_FLAGS ?= --stdin --stdout=airspy_adsb --stdout=beast --stdout=json --stdout=proto --stdout=raw --stdout=stats
OBJ_TRANSPORT = exec.o file.o incoming.o outgoing.o OBJ_TRANSPORT = exec.o file.o incoming.o outgoing.o stdinout.o
OBJ_FLOW = flow.o receive.o send.o send_receive.o OBJ_FLOW = flow.o receive.o send.o send_receive.o
OBJ_PROTOCOL = airspy_adsb.o beast.o json.o proto.o raw.o stats.o OBJ_PROTOCOL = airspy_adsb.o beast.o json.o proto.o raw.o stats.o
OBJ_UTIL = asyncaddrinfo.o buf.o hex.o list.o opts.o packet.o peer.o rand.o resolve.o server.o socket.o uuid.o wakeup.o OBJ_UTIL = asyncaddrinfo.o buf.o hex.o list.o log.o opts.o packet.o peer.o rand.o resolve.o server.o socket.o uuid.o wakeup.o
OBJ_PROTO = adsb.pb-c.o OBJ_PROTO = adsb.pb-c.o
all: adsbus all: adsbus
clean: clean:
rm -rf *.o adsbus testout findings rm -rf *.o adsbus testout findings build.h
%.o: %.c *.h %.o: %.c *.h build.h
$(COMP) -c $(CFLAGS) $< -o $@ $(COMP) -c $(CFLAGS) $< -o $@
build.h:
echo "#define GIT_LAST_CHANGE \"$$(git log --format=%H -n 1)\"" > $@
echo "#define GIT_LOCAL_CLEAN $$(git diff --exit-code > /dev/null && echo true || echo false)" >> $@
echo "#define HOSTNAME \"$$(hostname --fqdn)\"" >> $@
echo "#define USERNAME \"$$(whoami)\"" >> $@
adsb.pb-c.c: ../proto/adsb.proto adsb.pb-c.c: ../proto/adsb.proto
protoc-c --c_out=./ --proto_path=$(dir $<) $< protoc-c --c_out=./ --proto_path=$(dir $<) $<

View File

@@ -1,9 +1,6 @@
#include <assert.h> #include <assert.h>
#include <fcntl.h> #include <fcntl.h>
#include <getopt.h>
#include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <unistd.h> #include <unistd.h>
@@ -14,6 +11,7 @@
#include "hex.h" #include "hex.h"
#include "incoming.h" #include "incoming.h"
#include "json.h" #include "json.h"
#include "log.h"
#include "opts.h" #include "opts.h"
#include "outgoing.h" #include "outgoing.h"
#include "peer.h" #include "peer.h"
@@ -25,164 +23,43 @@
#include "send_receive.h" #include "send_receive.h"
#include "server.h" #include "server.h"
#include "stats.h" #include "stats.h"
#include "stdinout.h"
#include "wakeup.h" #include "wakeup.h"
static void print_usage(const char *name) {
fprintf(stderr,
"\n"
"Usage: %s [OPTION]...\n"
"\n"
"Options:\n"
"\t--help\n"
"\t--connect-receive=HOST/PORT\n"
"\t--connect-send=FORMAT=HOST/PORT\n"
"\t--connect-send-receive=FORMAT=HOST/PORT\n"
"\t--listen-receive=[HOST/]PORT\n"
"\t--listen-send=FORMAT=[HOST/]PORT\n"
"\t--listen-send-receive=FORMAT=[HOST/]PORT\n"
"\t--file-read=PATH\n"
"\t--file-write=FORMAT=PATH\n"
"\t--file-write-read=FORMAT=PATH\n"
"\t--file-append=FORMAT=PATH\n"
"\t--file-append-read=FORMAT=PATH\n"
"\t--exec-receive=COMMAND\n"
"\t--exec-send=FORMAT=COMMAND\n"
"\t--exec-send-receive=FORMAT=COMMAND\n"
"\t--stdin\n"
"\t--stdout=FORMAT\n"
, name);
receive_print_usage();
send_print_usage();
}
static bool parse_opts(int argc, char *argv[]) {
static struct option long_options[] = {
{"connect-receive", required_argument, 0, 'c'},
{"connect-send", required_argument, 0, 's'},
{"connect-send-receive", required_argument, 0, 't'},
{"listen-receive", required_argument, 0, 'l'},
{"listen-send", required_argument, 0, 'm'},
{"listen-send-receive", required_argument, 0, 'n'},
{"file-read", required_argument, 0, 'r'},
{"file-write", required_argument, 0, 'w'},
{"file-write-read", required_argument, 0, 'x'},
{"file-append", required_argument, 0, 'a'},
{"file-append-read", required_argument, 0, 'b'},
{"exec-receive", required_argument, 0, 'e'},
{"exec-send", required_argument, 0, 'f'},
{"exec-send-receive", required_argument, 0, 'g'},
{"stdin", no_argument, 0, 'i'},
{"stdout", required_argument, 0, 'o'},
{"help", no_argument, 0, 'h'},
{0, 0, 0, 0 },
};
int opt;
while ((opt = getopt_long_only(argc, argv, "", long_options, NULL)) != -1) {
bool (*handler)(char *) = NULL;
switch (opt) {
case 'c':
handler = opts_add_connect_receive;
break;
case 's':
handler = opts_add_connect_send;
break;
case 't':
handler = opts_add_connect_send_receive;
break;
case 'l':
handler = opts_add_listen_receive;
break;
case 'm':
handler = opts_add_listen_send;
break;
case 'n':
handler = opts_add_listen_send_receive;
break;
case 'r':
handler = opts_add_file_read;
break;
case 'w':
handler = opts_add_file_write;
break;
case 'x':
handler = opts_add_file_write_read;
break;
case 'a':
handler = opts_add_file_append;
break;
case 'b':
handler = opts_add_file_append_read;
break;
case 'e':
handler = opts_add_exec_receive;
break;
case 'f':
handler = opts_add_exec_send;
break;
case 'g':
handler = opts_add_exec_send_receive;
break;
case 'i':
handler = opts_add_stdin;
break;
case 'o':
handler = opts_add_stdout;
break;
case 'h':
default:
print_usage(argv[0]);
return false;
}
if (handler) {
if (!handler(optarg)) {
fprintf(stderr, "Invalid flag value: %s\n", optarg);
print_usage(argv[0]);
return false;
}
}
}
if (optind != argc) {
fprintf(stderr, "Not a flag: %s\n", argv[optind]);
print_usage(argv[0]);
return false;
}
return true;
}
static void reopen(int fd, char *path, int flags) { static void reopen(int fd, char *path, int flags) {
// Presumes that all fds < fd are open // Presumes that all fds < fd are open
assert(!close(fd)); assert(!close(fd));
assert(open(path, flags | O_CLOEXEC) == fd); assert(open(path, flags | O_CLOEXEC | O_NOCTTY) == fd);
}
static void adsbus_opts_add() {
// This order controls the order in --help, but nothing else.
server_opts_add();
log_opts_add();
outgoing_opts_add();
incoming_opts_add();
exec_opts_add();
file_opts_add();
stdinout_opts_add();
} }
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
adsbus_opts_add();
opts_init(argc, argv);
hex_init(); hex_init();
rand_init(); rand_init();
resolve_init();
log_init();
server_init(); server_init();
resolve_init();
wakeup_init(); wakeup_init();
peer_init(); peer_init();
log_init_peer();
receive_init(); receive_init();
send_init(); send_init();
@@ -191,9 +68,11 @@ int main(int argc, char *argv[]) {
proto_init(); proto_init();
stats_init(); stats_init();
if (!parse_opts(argc, argv)) { outgoing_init();
peer_shutdown(0); incoming_init();
} exec_init();
file_init();
stdinout_init();
reopen(STDIN_FILENO, "/dev/null", O_RDONLY); reopen(STDIN_FILENO, "/dev/null", O_RDONLY);
reopen(STDOUT_FILENO, "/dev/full", O_WRONLY); reopen(STDOUT_FILENO, "/dev/full", O_WRONLY);
@@ -216,11 +95,14 @@ int main(int argc, char *argv[]) {
rand_cleanup(); rand_cleanup();
wakeup_cleanup(); wakeup_cleanup();
log_cleanup_peer();
peer_cleanup(); peer_cleanup();
log_cleanup();
assert(!close(STDIN_FILENO)); assert(!close(STDIN_FILENO));
assert(!close(STDOUT_FILENO)); assert(!close(STDOUT_FILENO));
assert(!close(STDERR_FILENO));
return EXIT_SUCCESS; return EXIT_SUCCESS;
} }

View File

@@ -1,12 +1,9 @@
#include <assert.h> #include <assert.h>
#include <stdio.h>
#include <string.h>
#include "buf.h" #include "buf.h"
#include "hex.h" #include "hex.h"
#include "packet.h" #include "packet.h"
#include "receive.h" #include "receive.h"
#include "uuid.h"
#include "airspy_adsb.h" #include "airspy_adsb.h"

View File

@@ -2,10 +2,12 @@
#include <fcntl.h> #include <fcntl.h>
#include <netdb.h> #include <netdb.h>
#include <pthread.h> #include <pthread.h>
#include <signal.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <sys/types.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h> #include <unistd.h>
#include "asyncaddrinfo.h" #include "asyncaddrinfo.h"
@@ -27,6 +29,11 @@ static int asyncaddrinfo_write_fd;
static void *asyncaddrinfo_main(void *arg) { static void *asyncaddrinfo_main(void *arg) {
int fd = (int) (intptr_t) arg; int fd = (int) (intptr_t) arg;
sigset_t sigmask;
assert(!sigfillset(&sigmask));
assert(!pthread_sigmask(SIG_BLOCK, &sigmask, NULL));
struct asyncaddrinfo_resolution *res; struct asyncaddrinfo_resolution *res;
ssize_t len; ssize_t len;
while ((len = recv(fd, &res, sizeof(res), 0)) == sizeof(res)) { while ((len = recv(fd, &res, sizeof(res), 0)) == sizeof(res)) {

View File

@@ -1,7 +1,5 @@
#include <assert.h> #include <assert.h>
#include <stdio.h>
#include <string.h> #include <string.h>
#include <stdint.h>
#include "buf.h" #include "buf.h"
#include "packet.h" #include "packet.h"

View File

@@ -1,17 +1,20 @@
#include <assert.h> #include <assert.h>
#include <signal.h> #include <signal.h>
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h>
#include <string.h> #include <string.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/wait.h> #include <sys/wait.h>
#include <unistd.h> #include <unistd.h>
#include "buf.h"
#include "flow.h" #include "flow.h"
#include "list.h" #include "log.h"
#include "opts.h"
#include "peer.h" #include "peer.h"
#include "receive.h"
#include "send.h"
#include "send_receive.h"
#include "uuid.h" #include "uuid.h"
#include "wakeup.h" #include "wakeup.h"
@@ -19,6 +22,7 @@
struct exec { struct exec {
struct peer peer; struct peer peer;
struct peer log_peer;
uint8_t id[UUID_LEN]; uint8_t id[UUID_LEN];
char *command; char *command;
struct flow *flow; struct flow *flow;
@@ -28,89 +32,164 @@ struct exec {
}; };
static struct list_head exec_head = LIST_HEAD_INIT(exec_head); static struct list_head exec_head = LIST_HEAD_INIT(exec_head);
static opts_group exec_opts;
static char log_module = 'E';
static void exec_spawn_wrapper(struct peer *); static void exec_spawn_wrapper(struct peer *);
static void exec_harvest(struct exec *exec) {
if (exec->child > 0) {
int status;
assert(waitpid(exec->child, &status, 0) == exec->child);
exec->child = -1;
if (WIFEXITED(status)) {
LOG(exec->id, "Client exited with status %d", WEXITSTATUS(status));
} else {
assert(WIFSIGNALED(status));
LOG(exec->id, "Client exited with signal %d", WTERMSIG(status));
}
}
peer_close(&exec->log_peer);
}
static void exec_del(struct exec *exec) { static void exec_del(struct exec *exec) {
flow_ref_dec(exec->flow); flow_ref_dec(exec->flow);
if (exec->child > 0) { if (exec->child > 0) {
fprintf(stderr, "E %s: Sending SIGTERM to child process %d\n", exec->id, exec->child); LOG(exec->id, "Sending SIGTERM to child process %d", exec->child);
// Racy with the process terminating, so don't assert on it // Racy with the process terminating, so don't assert on it
kill(exec->child, SIGTERM); kill(exec->child, SIGTERM);
assert(waitpid(exec->child, NULL, 0) == exec->child);
} }
exec_harvest(exec);
list_del(&exec->exec_list); list_del(&exec->exec_list);
free(exec->command); free(exec->command);
free(exec); free(exec);
} }
static void exec_close_handler(struct peer *peer) { static void exec_close_handler(struct peer *peer) {
struct exec *exec = (struct exec *) peer; struct exec *exec = container_of(peer, struct exec, peer);
int status; exec_harvest(exec);
assert(waitpid(exec->child, &status, WNOHANG) == exec->child);
exec->child = -1;
if (WIFEXITED(status)) {
fprintf(stderr, "E %s: Client exited with status %d\n", exec->id, WEXITSTATUS(status));
} else {
assert(WIFSIGNALED(status));
fprintf(stderr, "E %s: Client exited with signal %d\n", exec->id, WTERMSIG(status));
}
uint32_t delay = wakeup_get_retry_delay_ms(1); uint32_t delay = wakeup_get_retry_delay_ms(1);
fprintf(stderr, "E %s: Will retry in %ds\n", exec->id, delay / 1000); LOG(exec->id, "Will retry in %ds", delay / 1000);
exec->peer.event_handler = exec_spawn_wrapper; exec->peer.event_handler = exec_spawn_wrapper;
wakeup_add((struct peer *) exec, delay); wakeup_add(&exec->peer, delay);
} }
static void exec_parent(struct exec *exec, pid_t child, int fd) { static void exec_log_handler(struct peer *peer) {
// Do you believe in magic?
struct exec *exec = container_of(peer, struct exec, log_peer);
char linebuf[4096];
ssize_t ret = read(exec->log_peer.fd, linebuf, 4096);
if (ret <= 0) {
LOG(exec->id, "Log input stream closed");
peer_close(&exec->log_peer);
return;
}
size_t len = (size_t) ret;
char *iter = linebuf, *eol;
while ((eol = memchr(iter, '\n', len))) {
assert(eol >= iter);
size_t linelen = (size_t) (eol - iter);
LOG(exec->id, "(child output) %.*s", (int) linelen, iter);
iter += (linelen + 1);
len -= (linelen + 1);
}
if (len) {
LOG(exec->id, "(child output) %.*s", (int) len, iter);
}
}
static void exec_parent(struct exec *exec, pid_t child, int data_fd, int log_fd) {
exec->child = child; exec->child = child;
fprintf(stderr, "E %s: Child started as process %d\n", exec->id, exec->child); LOG(exec->id, "Child started as process %d", exec->child);
exec->log_peer.fd = log_fd;
exec->log_peer.event_handler = exec_log_handler;
peer_epoll_add(&exec->log_peer, EPOLLIN);
exec->peer.event_handler = exec_close_handler; exec->peer.event_handler = exec_close_handler;
if (!flow_new_send_hello(fd, exec->flow, exec->passthrough, (struct peer *) exec)) { if (!flow_new_send_hello(data_fd, exec->flow, exec->passthrough, &exec->peer)) {
exec_close_handler((struct peer *) exec); exec_close_handler(&exec->peer);
return; return;
} }
} }
static void __attribute__ ((noreturn)) exec_child(const struct exec *exec, int fd) { static void __attribute__ ((noreturn)) exec_child(const struct exec *exec, int data_fd, int log_fd) {
assert(setsid() != -1); assert(setsid() != -1);
// We leave stderr open from child to parent // We leave stderr open from child to parent
// Other than that, fds should have CLOEXEC set // Other than that, fds should have CLOEXEC set
if (fd != 0) { if (data_fd != STDIN_FILENO) {
assert(dup2(fd, 0) == 0); assert(dup2(data_fd, STDIN_FILENO) == STDIN_FILENO);
} }
if (fd != 1) { if (data_fd != STDOUT_FILENO) {
assert(dup2(fd, 1) == 1); assert(dup2(data_fd, STDOUT_FILENO) == STDOUT_FILENO);
} }
if (fd != 0 && fd != 1) { if (data_fd != STDIN_FILENO && data_fd != STDOUT_FILENO) {
assert(!close(fd)); assert(!close(data_fd));
}
if (log_fd != STDERR_FILENO) {
assert(dup2(log_fd, STDERR_FILENO) == STDERR_FILENO);
assert(!close(log_fd));
} }
assert(!execl("/bin/sh", "sh", "-c", exec->command, NULL)); assert(!execl("/bin/sh", "sh", "-c", exec->command, NULL));
abort(); abort();
} }
static void exec_spawn(struct exec *exec) { static void exec_spawn(struct exec *exec) {
fprintf(stderr, "E %s: Executing: %s\n", exec->id, exec->command); LOG(exec->id, "Executing: %s", exec->command);
int fds[2]; int data_fds[2], log_fds[2];
assert(!socketpair(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0, fds)); // Leave these sockets blocking; we move in lock step with subprograms
assert(!socketpair(AF_UNIX, SOCK_STREAM, 0, data_fds));
assert(!socketpair(AF_UNIX, SOCK_STREAM, 0, log_fds));
int res = fork(); int res = fork();
assert(res >= 0); assert(res >= 0);
if (res) { if (res) {
assert(!close(fds[1])); assert(!close(data_fds[1]));
exec_parent(exec, res, fds[0]); assert(!close(log_fds[1]));
assert(!shutdown(log_fds[0], SHUT_WR));
exec_parent(exec, res, data_fds[0], log_fds[0]);
} else { } else {
assert(!close(fds[0])); assert(!close(data_fds[0]));
exec_child(exec, fds[1]); assert(!close(log_fds[0]));
exec_child(exec, data_fds[1], log_fds[1]);
} }
} }
static void exec_spawn_wrapper(struct peer *peer) { static void exec_spawn_wrapper(struct peer *peer) {
struct exec *exec = (struct exec *) peer; struct exec *exec = container_of(peer, struct exec, peer);
exec_spawn(exec); exec_spawn(exec);
} }
static bool exec_add(const char *cmd, struct flow *flow, void *passthrough) {
exec_new(cmd, flow, passthrough);
return true;
}
static bool exec_receive(const char *arg) {
return exec_add(arg, receive_flow, NULL);
}
static bool exec_send(const char *arg) {
return send_add(exec_add, send_flow, arg);
}
static bool exec_send_receive(const char *arg) {
return send_add(exec_add, send_receive_flow, arg);
}
void exec_opts_add() {
opts_add("exec-receive", "COMMAND", exec_receive, exec_opts);
opts_add("exec-send", "FORMAT=COMMAND", exec_send, exec_opts);
opts_add("exec-send-receive", "FORMAT=COMMAND", exec_send_receive, exec_opts);
}
void exec_init() {
opts_call(exec_opts);
}
void exec_cleanup() { void exec_cleanup() {
struct exec *iter, *next; struct exec *iter, *next;
list_for_each_entry_safe(iter, next, &exec_head, exec_list) { list_for_each_entry_safe(iter, next, &exec_head, exec_list) {
@@ -118,7 +197,7 @@ void exec_cleanup() {
} }
} }
void exec_new(char *command, struct flow *flow, void *passthrough) { void exec_new(const char *command, struct flow *flow, void *passthrough) {
flow_ref_inc(flow); flow_ref_inc(flow);
struct exec *exec = malloc(sizeof(*exec)); struct exec *exec = malloc(sizeof(*exec));

View File

@@ -2,5 +2,7 @@
struct flow; struct flow;
void exec_opts_add(void);
void exec_init(void);
void exec_cleanup(void); void exec_cleanup(void);
void exec_new(char *, struct flow *, void *); void exec_new(const char *, struct flow *, void *);

View File

@@ -2,18 +2,18 @@
#include <errno.h> #include <errno.h>
#include <fcntl.h> #include <fcntl.h>
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h>
#include <string.h> #include <string.h>
#include <sys/epoll.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/types.h> #include <sys/types.h>
#include <unistd.h> #include <unistd.h>
#include "flow.h" #include "flow.h"
#include "list.h" #include "log.h"
#include "opts.h"
#include "peer.h" #include "peer.h"
#include "receive.h" #include "receive.h"
#include "send.h" #include "send.h"
#include "send_receive.h"
#include "uuid.h" #include "uuid.h"
#include "wakeup.h" #include "wakeup.h"
@@ -32,6 +32,9 @@ struct file {
}; };
static struct list_head file_head = LIST_HEAD_INIT(file_head); static struct list_head file_head = LIST_HEAD_INIT(file_head);
static opts_group file_opts;
static char log_module = 'F';
static void file_open_wrapper(struct peer *); static void file_open_wrapper(struct peer *);
@@ -67,14 +70,14 @@ static void file_del(struct file *file) {
static void file_retry(struct file *file) { static void file_retry(struct file *file) {
uint32_t delay = wakeup_get_retry_delay_ms(file->attempt++); uint32_t delay = wakeup_get_retry_delay_ms(file->attempt++);
fprintf(stderr, "F %s: Will retry in %ds\n", file->id, delay / 1000); LOG(file->id, "Will retry in %ds", delay / 1000);
file->peer.event_handler = file_open_wrapper; file->peer.event_handler = file_open_wrapper;
wakeup_add((struct peer *) file, delay); wakeup_add(&file->peer, delay);
} }
static void file_handle_close(struct peer *peer) { static void file_handle_close(struct peer *peer) {
struct file *file = (struct file *) peer; struct file *file = container_of(peer, struct file, peer);
fprintf(stderr, "F %s: File closed: %s\n", file->id, file->path); LOG(file->id, "File closed: %s", file->path);
if (file->retry) { if (file->retry) {
file_retry(file); file_retry(file);
@@ -84,10 +87,10 @@ static void file_handle_close(struct peer *peer) {
} }
static void file_open(struct file *file) { static void file_open(struct file *file) {
fprintf(stderr, "F %s: Opening file: %s\n", file->id, file->path); LOG(file->id, "Opening file: %s", file->path);
int fd = open(file->path, file->flags | O_CLOEXEC, S_IRUSR | S_IWUSR); int fd = open(file->path, file->flags | O_CLOEXEC | O_NOCTTY, S_IRUSR | S_IWUSR);
if (fd == -1) { if (fd == -1) {
fprintf(stderr, "F %s: Error opening file: %s\n", file->id, strerror(errno)); LOG(file->id, "Error opening file: %s", strerror(errno));
file_retry(file); file_retry(file);
return; return;
} }
@@ -95,19 +98,19 @@ static void file_open(struct file *file) {
file->retry = file_should_retry(fd, file); file->retry = file_should_retry(fd, file);
file->peer.event_handler = file_handle_close; file->peer.event_handler = file_handle_close;
file->attempt = 0; file->attempt = 0;
if (!flow_new_send_hello(fd, file->flow, file->passthrough, (struct peer *) file)) { if (!flow_new_send_hello(fd, file->flow, file->passthrough, &file->peer)) {
fprintf(stderr, "F %s: Error writing greeting\n", file->id); LOG(file->id, "Error writing greeting");
file_retry(file); file_retry(file);
return; return;
} }
} }
static void file_open_wrapper(struct peer *peer) { static void file_open_wrapper(struct peer *peer) {
struct file *file = (struct file *) peer; struct file *file = container_of(peer, struct file, peer);
file_open(file); file_open(file);
} }
static void file_new(char *path, int flags, struct flow *flow, void *passthrough) { static void file_new(const char *path, int flags, struct flow *flow, void *passthrough) {
flow_ref_inc(flow); flow_ref_inc(flow);
struct file *file = malloc(sizeof(*file)); struct file *file = malloc(sizeof(*file));
@@ -126,6 +129,49 @@ static void file_new(char *path, int flags, struct flow *flow, void *passthrough
file_open(file); file_open(file);
} }
static bool file_write_add(const char *path, struct flow *flow, void *passthrough) {
file_write_new(path, flow, passthrough);
return true;
}
static bool file_append_add(const char *path, struct flow *flow, void *passthrough) {
file_append_new(path, flow, passthrough);
return true;
}
static bool file_read(const char *arg) {
file_read_new(arg, receive_flow, NULL);
return true;
}
static bool file_write(const char *arg) {
return send_add(file_write_add, send_flow, arg);
}
static bool file_write_read(const char *arg) {
return send_add(file_write_add, send_receive_flow, arg);
}
static bool file_append(const char *arg) {
return send_add(file_append_add, send_flow, arg);
}
static bool file_append_read(const char *arg) {
return send_add(file_append_add, send_receive_flow, arg);
}
void file_opts_add() {
opts_add("file-read", "PATH", file_read, file_opts);
opts_add("file-write", "FORMAT=PATH", file_write, file_opts);
opts_add("file-write-read", "FORMAT=PATH", file_write_read, file_opts);
opts_add("file-append", "FORMAT=PATH", file_append, file_opts);
opts_add("file-append-read", "FORMAT=PATH", file_append_read, file_opts);
}
void file_init() {
opts_call(file_opts);
}
void file_cleanup() { void file_cleanup() {
struct file *iter, *next; struct file *iter, *next;
list_for_each_entry_safe(iter, next, &file_head, file_list) { list_for_each_entry_safe(iter, next, &file_head, file_list) {
@@ -133,14 +179,14 @@ void file_cleanup() {
} }
} }
void file_read_new(char *path, struct flow *flow, void *passthrough) { void file_read_new(const char *path, struct flow *flow, void *passthrough) {
file_new(path, O_RDONLY, flow, passthrough); file_new(path, O_RDONLY, flow, passthrough);
} }
void file_write_new(char *path, struct flow *flow, void *passthrough) { void file_write_new(const char *path, struct flow *flow, void *passthrough) {
file_new(path, O_WRONLY | O_CREAT | O_NOFOLLOW | O_TRUNC, flow, passthrough); file_new(path, O_WRONLY | O_CREAT | O_NOFOLLOW | O_TRUNC, flow, passthrough);
} }
void file_append_new(char *path, struct flow *flow, void *passthrough) { void file_append_new(const char *path, struct flow *flow, void *passthrough) {
file_new(path, O_WRONLY | O_CREAT | O_NOFOLLOW, flow, passthrough); file_new(path, O_WRONLY | O_CREAT | O_NOFOLLOW, flow, passthrough);
} }

View File

@@ -2,7 +2,9 @@
struct flow; struct flow;
void file_opts_add(void);
void file_init(void);
void file_cleanup(void); void file_cleanup(void);
void file_read_new(char *, struct flow *, void *); void file_read_new(const char *, struct flow *, void *);
void file_write_new(char *, struct flow *, void *); void file_write_new(const char *, struct flow *, void *);
void file_append_new(char *, struct flow *, void *); void file_append_new(const char *, struct flow *, void *);

View File

@@ -1,18 +1,21 @@
#include <assert.h> #include <assert.h>
#include <stdio.h> #include <errno.h>
#include <netdb.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <netdb.h> #include <sys/stat.h>
#include <string.h>
#include <errno.h>
#include <unistd.h> #include <unistd.h>
#include "buf.h"
#include "flow.h" #include "flow.h"
#include "list.h" #include "log.h"
#include "opts.h"
#include "peer.h" #include "peer.h"
#include "receive.h"
#include "resolve.h" #include "resolve.h"
#include "send.h"
#include "send_receive.h"
#include "socket.h" #include "socket.h"
#include "wakeup.h" #include "wakeup.h"
#include "uuid.h" #include "uuid.h"
@@ -31,25 +34,28 @@ struct incoming {
}; };
static struct list_head incoming_head = LIST_HEAD_INIT(incoming_head); static struct list_head incoming_head = LIST_HEAD_INIT(incoming_head);
static opts_group incoming_opts;
static char log_module = 'I';
static void incoming_resolve_wrapper(struct peer *); static void incoming_resolve_wrapper(struct peer *);
static void incoming_retry(struct incoming *incoming) { static void incoming_retry(struct incoming *incoming) {
uint32_t delay = wakeup_get_retry_delay_ms(incoming->attempt++); uint32_t delay = wakeup_get_retry_delay_ms(incoming->attempt++);
fprintf(stderr, "I %s: Will retry in %ds\n", incoming->id, delay / 1000); LOG(incoming->id, "Will retry in %ds", delay / 1000);
incoming->peer.event_handler = incoming_resolve_wrapper; incoming->peer.event_handler = incoming_resolve_wrapper;
wakeup_add((struct peer *) incoming, delay); wakeup_add(&incoming->peer, delay);
} }
static void incoming_handler(struct peer *peer) { static void incoming_handler(struct peer *peer) {
struct incoming *incoming = (struct incoming *) peer; struct incoming *incoming = container_of(peer, struct incoming, peer);
struct sockaddr peer_addr, local_addr; struct sockaddr peer_addr, local_addr;
socklen_t peer_addrlen = sizeof(peer_addr), local_addrlen = sizeof(local_addr); socklen_t peer_addrlen = sizeof(peer_addr), local_addrlen = sizeof(local_addr);
int fd = accept4(incoming->peer.fd, &peer_addr, &peer_addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC); int fd = accept4(incoming->peer.fd, &peer_addr, &peer_addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
if (fd == -1) { if (fd == -1) {
fprintf(stderr, "I %s: Failed to accept new connection on %s/%s: %s\n", incoming->id, incoming->node, incoming->service, strerror(errno)); LOG(incoming->id, "Failed to accept new connection on %s/%s: %s", incoming->node, incoming->service, strerror(errno));
return; return;
} }
@@ -58,8 +64,7 @@ static void incoming_handler(struct peer *peer) {
assert(getnameinfo(&peer_addr, peer_addrlen, peer_hbuf, sizeof(peer_hbuf), peer_sbuf, sizeof(peer_sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0); assert(getnameinfo(&peer_addr, peer_addrlen, peer_hbuf, sizeof(peer_hbuf), peer_sbuf, sizeof(peer_sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0);
assert(getnameinfo(&local_addr, local_addrlen, local_hbuf, sizeof(local_hbuf), local_sbuf, sizeof(local_sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0); assert(getnameinfo(&local_addr, local_addrlen, local_hbuf, sizeof(local_hbuf), local_sbuf, sizeof(local_sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0);
fprintf(stderr, "I %s: New incoming connection on %s/%s (%s/%s) from %s/%s\n", LOG(incoming->id, "New incoming connection on %s/%s (%s/%s) from %s/%s",
incoming->id,
incoming->node, incoming->service, incoming->node, incoming->service,
local_hbuf, local_sbuf, local_hbuf, local_sbuf,
peer_hbuf, peer_sbuf); peer_hbuf, peer_sbuf);
@@ -67,16 +72,14 @@ static void incoming_handler(struct peer *peer) {
flow_socket_connected(fd, incoming->flow); flow_socket_connected(fd, incoming->flow);
if (!flow_new_send_hello(fd, incoming->flow, incoming->passthrough, NULL)) { if (!flow_new_send_hello(fd, incoming->flow, incoming->passthrough, NULL)) {
fprintf(stderr, "I %s: Error writing greeting\n", incoming->id); LOG(incoming->id, "Error writing greeting");
return; return;
} }
} }
static void incoming_del(struct incoming *incoming) { static void incoming_del(struct incoming *incoming) {
flow_ref_dec(incoming->flow); flow_ref_dec(incoming->flow);
if (incoming->peer.fd >= 0) { peer_close(&incoming->peer);
assert(!close(incoming->peer.fd));
}
list_del(&incoming->incoming_list); list_del(&incoming->incoming_list);
free(incoming->node); free(incoming->node);
free(incoming->service); free(incoming->service);
@@ -84,12 +87,12 @@ static void incoming_del(struct incoming *incoming) {
} }
static void incoming_listen(struct peer *peer) { static void incoming_listen(struct peer *peer) {
struct incoming *incoming = (struct incoming *) peer; struct incoming *incoming = container_of(peer, struct incoming, peer);
struct addrinfo *addrs; struct addrinfo *addrs;
int err = resolve_result(peer, &addrs); int err = resolve_result(peer, &addrs);
if (err) { if (err) {
fprintf(stderr, "I %s: Failed to resolve %s/%s: %s\n", incoming->id, incoming->node, incoming->service, gai_strerror(err)); LOG(incoming->id, "Failed to resolve %s/%s: %s", incoming->node, incoming->service, gai_strerror(err));
incoming_retry(incoming); incoming_retry(incoming);
return; return;
} }
@@ -98,7 +101,7 @@ static void incoming_listen(struct peer *peer) {
for (addr = addrs; addr; addr = addr->ai_next) { for (addr = addrs; addr; addr = addr->ai_next) {
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
assert(getnameinfo(addr->ai_addr, addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0); assert(getnameinfo(addr->ai_addr, addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0);
fprintf(stderr, "I %s: Listening on %s/%s...\n", incoming->id, hbuf, sbuf); LOG(incoming->id, "Listening on %s/%s...", hbuf, sbuf);
incoming->peer.fd = socket(addr->ai_family, addr->ai_socktype | SOCK_CLOEXEC, addr->ai_protocol); incoming->peer.fd = socket(addr->ai_family, addr->ai_socktype | SOCK_CLOEXEC, addr->ai_protocol);
assert(incoming->peer.fd >= 0); assert(incoming->peer.fd >= 0);
@@ -106,7 +109,7 @@ static void incoming_listen(struct peer *peer) {
socket_pre_bind(incoming->peer.fd); socket_pre_bind(incoming->peer.fd);
if (bind(incoming->peer.fd, addr->ai_addr, addr->ai_addrlen) != 0) { if (bind(incoming->peer.fd, addr->ai_addr, addr->ai_addrlen) != 0) {
fprintf(stderr, "I %s: Failed to bind to %s/%s: %s\n", incoming->id, hbuf, sbuf, strerror(errno)); LOG(incoming->id, "Failed to bind to %s/%s: %s", hbuf, sbuf, strerror(errno));
assert(!close(incoming->peer.fd)); assert(!close(incoming->peer.fd));
continue; continue;
} }
@@ -122,24 +125,57 @@ static void incoming_listen(struct peer *peer) {
freeaddrinfo(addrs); freeaddrinfo(addrs);
if (addr == NULL) { if (addr == NULL) {
fprintf(stderr, "I %s: Failed to bind any addresses for %s/%s...\n", incoming->id, incoming->node, incoming->service); LOG(incoming->id, "Failed to bind any addresses for %s/%s...", incoming->node, incoming->service);
incoming_retry(incoming); incoming_retry(incoming);
return; return;
} }
incoming->attempt = 0; incoming->attempt = 0;
incoming->peer.event_handler = incoming_handler; incoming->peer.event_handler = incoming_handler;
peer_epoll_add((struct peer *) incoming, EPOLLIN); peer_epoll_add(&incoming->peer, EPOLLIN);
} }
static void incoming_resolve(struct incoming *incoming) { static void incoming_resolve(struct incoming *incoming) {
fprintf(stderr, "I %s: Resolving %s/%s...\n", incoming->id, incoming->node, incoming->service); LOG(incoming->id, "Resolving %s/%s...", incoming->node, incoming->service);
incoming->peer.event_handler = incoming_listen; incoming->peer.event_handler = incoming_listen;
resolve((struct peer *) incoming, incoming->node, incoming->service, AI_PASSIVE); resolve(&incoming->peer, incoming->node, incoming->service, AI_PASSIVE);
} }
static void incoming_resolve_wrapper(struct peer *peer) { static void incoming_resolve_wrapper(struct peer *peer) {
incoming_resolve((struct incoming *) peer); incoming_resolve(container_of(peer, struct incoming, peer));
}
static bool incoming_add(const char *host_port, struct flow *flow, void *passthrough) {
char *host = opts_split(&host_port, '/');
if (host) {
incoming_new(host, host_port, flow, passthrough);
free(host);
} else {
incoming_new(NULL, host_port, flow, passthrough);
}
return true;
}
static bool incoming_listen_receive(const char *arg) {
return incoming_add(arg, receive_flow, NULL);
}
static bool incoming_listen_send(const char *arg) {
return send_add(incoming_add, send_flow, arg);
}
static bool incoming_listen_send_receive(const char *arg) {
return send_add(incoming_add, send_receive_flow, arg);
}
void incoming_opts_add() {
opts_add("listen-receive", "[HOST/]PORT", incoming_listen_receive, incoming_opts);
opts_add("listen-send", "FORMAT=[HOST/]PORT", incoming_listen_send, incoming_opts);
opts_add("listen-send-receive", "FORMAT=[HOST/]PORT", incoming_listen_send_receive, incoming_opts);
}
void incoming_init() {
opts_call(incoming_opts);
} }
void incoming_cleanup() { void incoming_cleanup() {
@@ -149,7 +185,7 @@ void incoming_cleanup() {
} }
} }
void incoming_new(char *node, char *service, struct flow *flow, void *passthrough) { void incoming_new(const char *node, const char *service, struct flow *flow, void *passthrough) {
flow_ref_inc(flow); flow_ref_inc(flow);
struct incoming *incoming = malloc(sizeof(*incoming)); struct incoming *incoming = malloc(sizeof(*incoming));

View File

@@ -2,5 +2,7 @@
struct flow; struct flow;
void incoming_opts_add(void);
void incoming_init(void);
void incoming_cleanup(void); void incoming_cleanup(void);
void incoming_new(char *, char *, struct flow *, void *); void incoming_new(const char *, const char *, struct flow *, void *);

View File

@@ -1,16 +1,15 @@
#include <assert.h> #include <assert.h>
#include <stdio.h> #include <stdbool.h>
#include <string.h> #include <string.h>
#include <jansson.h> #include <jansson.h>
#include "hex.h" #include "hex.h"
#include "buf.h" #include "buf.h"
#include "log.h"
#include "packet.h" #include "packet.h"
#include "rand.h" #include "rand.h"
#include "receive.h" #include "receive.h"
#include "send.h"
#include "server.h" #include "server.h"
#include "uuid.h"
#include "json.h" #include "json.h"
@@ -27,6 +26,8 @@ struct json_parser_state {
static json_t *json_prev = NULL; static json_t *json_prev = NULL;
static struct buf json_hello_buf = BUF_INIT; static struct buf json_hello_buf = BUF_INIT;
static char log_module = 'R'; // borrowing
static void json_serialize_to_buf(json_t *obj, struct buf *buf) { static void json_serialize_to_buf(json_t *obj, struct buf *buf) {
assert(json_dump_callback(obj, json_buf_append_callback, buf, 0) == 0); assert(json_dump_callback(obj, json_buf_append_callback, buf, 0) == 0);
json_decref(obj); json_decref(obj);
@@ -80,11 +81,11 @@ static bool json_parse_header(json_t *in, struct packet *packet, struct json_par
} }
if (!strcmp(json_server_id, (const char *) server_id)) { if (!strcmp(json_server_id, (const char *) server_id)) {
fprintf(stderr, "R %s: Attempt to receive json data from our own server ID (%s); loop!\n", packet->source_id, server_id); LOG(packet->source_id, "Attempt to receive json data from our own server ID (%s); loop!", server_id);
return false; return false;
} }
fprintf(stderr, "R %s: Connected to server ID: %s\n", packet->source_id, json_server_id); LOG(packet->source_id, "Connected to server ID: %s", json_server_id);
state->mlat_timestamp_mhz = (uint16_t) mlat_timestamp_mhz; state->mlat_timestamp_mhz = (uint16_t) mlat_timestamp_mhz;
state->mlat_timestamp_max = (uint64_t) mlat_timestamp_max; state->mlat_timestamp_max = (uint64_t) mlat_timestamp_max;

View File

@@ -1,5 +1,3 @@
#include <stdlib.h>
#include "list.h" #include "list.h"
void list_head_init(struct list_head *head) { void list_head_init(struct list_head *head) {

View File

@@ -1,16 +1,15 @@
#pragma once #pragma once
#include <stdbool.h> #include <stdbool.h>
#include <stddef.h>
#pragma GCC diagnostic ignored "-Wcast-align" #pragma GCC diagnostic ignored "-Wcast-align"
#pragma GCC diagnostic ignored "-Wgnu-statement-expression" #pragma GCC diagnostic ignored "-Wgnu-statement-expression"
#pragma GCC diagnostic ignored "-Wlanguage-extension-token" #pragma GCC diagnostic ignored "-Wlanguage-extension-token"
#define offset_of(type, member) ((size_t) &((type *) NULL)->member)
#define container_of(ptr, type, member) ({ \ #define container_of(ptr, type, member) ({ \
typeof( ((type *) NULL)->member ) *__mptr = (ptr); \ typeof( ((type *) NULL)->member ) *__mptr = (ptr); \
(type *)( (char *)__mptr - offset_of(type, member) );}) (type *)( (char *)__mptr - offsetof(type, member) );})
struct list_head { struct list_head {
struct list_head *next; struct list_head *next;

148
adsbus/log.c Normal file
View File

@@ -0,0 +1,148 @@
#include <assert.h>
#include <fcntl.h>
#include <signal.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/signalfd.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include "log.h"
#include "opts.h"
#include "peer.h"
#include "uuid.h"
#pragma GCC diagnostic ignored "-Wformat-nonliteral"
static FILE *log_stream = NULL;
static char *log_path = NULL;
static uint8_t log_id[UUID_LEN];
static struct peer log_rotate_peer;
static bool log_timestamps = false;
static bool log_quiet = false;
static opts_group log_opts;
static char log_module = 'L';
static void log_open() {
if (!log_path) {
return;
}
int fd = open(log_path, O_WRONLY | O_CREAT | O_APPEND | O_NOCTTY | O_CLOEXEC, S_IRUSR | S_IWUSR);
assert(fd >= 0);
assert(dup3(fd, STDERR_FILENO, O_CLOEXEC) == STDERR_FILENO);
assert(!close(fd));
}
static void log_rotate() {
assert(log_path);
uint8_t old_log_id[UUID_LEN], new_log_id[UUID_LEN];
uuid_gen(new_log_id);
LOG(log_id, "Switching to new log with ID %s at: %s", new_log_id, log_path);
memcpy(old_log_id, log_id, UUID_LEN);
memcpy(log_id, new_log_id, UUID_LEN);
log_open();
LOG(log_id, "Log start after switch from log ID %s", old_log_id);
}
static void log_rotate_handler(struct peer *peer) {
struct signalfd_siginfo siginfo;
assert(read(peer->fd, &siginfo, sizeof(siginfo)) == sizeof(siginfo));
LOG(log_id, "Received signal %u; rotating logs", siginfo.ssi_signo);
log_rotate();
}
static bool log_set_path(const char *path) {
if (log_path) {
return false;
}
log_path = strdup(path);
assert(log_path);
return true;
}
static bool log_enable_timestamps(const char __attribute__ ((unused)) *arg) {
log_timestamps = true;
return true;
}
static bool log_set_quiet(const char __attribute__ ((unused)) *arg) {
log_quiet = true;
return true;
}
void log_opts_add() {
opts_add("log-file", "PATH", log_set_path, log_opts);
opts_add("log-timestamps", NULL, log_enable_timestamps, log_opts);
opts_add("quiet", NULL, log_set_quiet, log_opts);
}
void log_init() {
opts_call(log_opts);
log_open();
log_stream = fdopen(STDERR_FILENO, "a");
assert(log_stream);
setlinebuf(log_stream);
uuid_gen(log_id);
LOG(log_id, "Log start");
}
void log_init_peer() {
if (log_path) {
sigset_t sigmask;
assert(!sigemptyset(&sigmask));
assert(!sigaddset(&sigmask, SIGHUP));
log_rotate_peer.fd = signalfd(-1, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC);
assert(log_rotate_peer.fd >= 0);
log_rotate_peer.event_handler = log_rotate_handler;
peer_epoll_add(&log_rotate_peer, EPOLLIN);
assert(!sigprocmask(SIG_BLOCK, &sigmask, NULL));
} else {
log_rotate_peer.fd = -1;
}
}
void log_cleanup_peer() {
peer_close(&log_rotate_peer);
}
void log_cleanup() {
LOG(log_id, "Log end");
assert(!fclose(log_stream));
if (log_path) {
free(log_path);
log_path = NULL;
}
}
void log_write(char type, const char *loc, const uint8_t *id, const char *fmt, ...) {
if (log_quiet) {
return;
}
va_list ap;
va_start(ap, fmt);
char datetime[64] = "";
if (log_timestamps) {
time_t now;
assert(time(&now) >= 0);
struct tm tmnow;
assert(gmtime_r(&now, &tmnow));
assert(strftime(datetime, sizeof(datetime), "%FT%TZ ", &tmnow) > 0);
}
assert(fprintf(log_stream, "%s[%18s] %c %s: ", datetime, loc, type, id) > 0);
assert(vfprintf(log_stream, fmt, ap) > 0);
assert(fprintf(log_stream, "\n") == 1);
va_end(ap);
}

16
adsbus/log.h Normal file
View File

@@ -0,0 +1,16 @@
#pragma once
#include <stdbool.h>
#include <stdint.h>
#define LOG_STR(line) #line
#define LOG_LOC(file, line) (file ":" LOG_STR(line))
#define LOG(id, ...) log_write((log_module), LOG_LOC(__FILE__, __LINE__), (id), __VA_ARGS__)
void log_opts_add(void);
void log_init(void);
void log_cleanup(void);
void log_init_peer(void);
void log_cleanup_peer(void);
void log_write(char, const char *, const uint8_t *, const char *, ...)
__attribute__ ((__format__ (__printf__, 4, 5)));

View File

@@ -1,23 +1,91 @@
#include <assert.h> #include <assert.h>
#include <fcntl.h> #include <getopt.h>
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h>
#include <string.h> #include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include "exec.h"
#include "file.h"
#include "flow.h"
#include "incoming.h"
#include "outgoing.h"
#include "receive.h"
#include "send.h"
#include "send_receive.h"
#include "opts.h" #include "opts.h"
static char *opts_split(char **arg, char delim) { #define OPTS_MAX 128
static struct {
const char *arg_help;
opts_handler handler;
void *group;
} opts[OPTS_MAX];
static struct option opts_long[OPTS_MAX];
static size_t opts_num = 0;
static int opts_argc;
static char **opts_argv;
static opts_group opts_group_internal;
static void opts_print_usage() {
fprintf(stderr,
"Usage: %s [OPTION]...\n"
"\n"
"Options:\n"
, opts_argv[0]);
for (size_t i = 0; i < opts_num; i++) {
fprintf(stderr, "\t--%s%s%s\n", opts_long[i].name, opts[i].arg_help ? "=" : "", opts[i].arg_help ? opts[i].arg_help : "");
}
}
static bool opts_help(const char __attribute__ ((unused)) *arg) {
opts_print_usage();
exit(EXIT_SUCCESS);
}
void opts_init(int argc, char *argv[]) {
opts_argc = argc;
opts_argv = argv;
opts_add("help", NULL, opts_help, opts_group_internal);
assert(opts_num < OPTS_MAX);
opts_long[opts_num].name = NULL;
opts_long[opts_num].has_arg = 0;
opts_long[opts_num].flag = NULL;
opts_long[opts_num].val = 0;
opts_call(opts_group_internal);
}
void opts_add(const char *name, const char *arg_help, opts_handler handler, opts_group group) {
assert(opts_num < OPTS_MAX);
opts[opts_num].arg_help = arg_help;
opts[opts_num].handler = handler;
opts[opts_num].group = group;
opts_long[opts_num].name = name;
opts_long[opts_num].has_arg = arg_help ? required_argument : no_argument;
opts_long[opts_num].flag = NULL;
opts_long[opts_num].val = 0;
opts_num++;
}
void opts_call(opts_group group) {
optind = 1;
int opt, longindex;
while ((opt = getopt_long_only(opts_argc, opts_argv, "", opts_long, &longindex)) == 0) {
if (opts[longindex].group != group) {
continue;
}
if (!opts[longindex].handler(optarg)) {
fprintf(stderr, "Invalid option value: %s\n", opts_argv[optind - 1]);
exit(EXIT_FAILURE);
}
}
if (opt != -1) {
opts_print_usage();
exit(EXIT_FAILURE);
}
if (optind != opts_argc) {
fprintf(stderr, "Not a flag: %s\n", opts_argv[optind]);
exit(EXIT_FAILURE);
}
}
char *opts_split(const char **arg, char delim) {
char *split = strchr(*arg, delim); char *split = strchr(*arg, delim);
if (!split) { if (!split) {
return NULL; return NULL;
@@ -26,137 +94,3 @@ static char *opts_split(char **arg, char delim) {
*arg = split + 1; *arg = split + 1;
return ret; return ret;
} }
static bool opts_add_listen(char *host_port, struct flow *flow, void *passthrough) {
char *host = opts_split(&host_port, '/');
if (host) {
incoming_new(host, host_port, flow, passthrough);
free(host);
} else {
incoming_new(NULL, host_port, flow, passthrough);
}
return true;
}
static bool opts_add_connect(char *host_port, struct flow *flow, void *passthrough) {
char *host = opts_split(&host_port, '/');
if (!host) {
return false;
}
outgoing_new(host, host_port, flow, passthrough);
free(host);
return true;
}
static bool opts_add_file_write_int(char *path, struct flow *flow, void *passthrough) {
file_write_new(path, flow, passthrough);
return true;
}
static bool opts_add_file_append_int(char *path, struct flow *flow, void *passthrough) {
file_append_new(path, flow, passthrough);
return true;
}
static bool opts_add_exec(char *cmd, struct flow *flow, void *passthrough) {
exec_new(cmd, flow, passthrough);
return true;
}
static struct serializer *opts_get_serializer(char **arg) {
char *format = opts_split(arg, '=');
if (!format) {
return NULL;
}
struct serializer *serializer = send_get_serializer(format);
free(format);
if (!serializer) {
return NULL;
}
return serializer;
}
static bool opts_add_send(bool (*next)(char *, struct flow *, void *), struct flow *flow, char *arg) {
struct serializer *serializer = opts_get_serializer(&arg);
if (!serializer) {
return false;
}
return next(arg, flow, serializer);
}
bool opts_add_connect_receive(char *arg) {
return opts_add_connect(arg, receive_flow, NULL);
}
bool opts_add_connect_send(char *arg) {
return opts_add_send(opts_add_connect, send_flow, arg);
}
bool opts_add_connect_send_receive(char *arg) {
return opts_add_send(opts_add_connect, send_receive_flow, arg);
}
bool opts_add_listen_receive(char *arg) {
return opts_add_listen(arg, receive_flow, NULL);
}
bool opts_add_listen_send(char *arg) {
return opts_add_send(opts_add_listen, send_flow, arg);
}
bool opts_add_listen_send_receive(char *arg) {
return opts_add_send(opts_add_listen, send_receive_flow, arg);
}
bool opts_add_file_read(char *arg) {
file_read_new(arg, receive_flow, NULL);
return true;
}
bool opts_add_file_write(char *arg) {
return opts_add_send(opts_add_file_write_int, send_flow, arg);
}
bool opts_add_file_write_read(char *arg) {
return opts_add_send(opts_add_file_write_int, send_receive_flow, arg);
}
bool opts_add_file_append(char *arg) {
return opts_add_send(opts_add_file_append_int, send_flow, arg);
}
bool opts_add_file_append_read(char *arg) {
return opts_add_send(opts_add_file_append_int, send_receive_flow, arg);
}
bool opts_add_exec_receive(char *arg) {
exec_new(arg, receive_flow, NULL);
return true;
}
bool opts_add_exec_send(char *arg) {
return opts_add_send(opts_add_exec, send_flow, arg);
}
bool opts_add_exec_send_receive(char *arg) {
return opts_add_send(opts_add_exec, send_receive_flow, arg);
}
bool opts_add_stdin(char __attribute__((unused)) *arg) {
int fd = fcntl(STDIN_FILENO, F_DUPFD_CLOEXEC, 0);
assert(fd >= 0);
return flow_new_send_hello(fd, receive_flow, NULL, NULL);
}
bool opts_add_stdout(char *arg) {
struct serializer *serializer = send_get_serializer(arg);
if (!serializer) {
return false;
}
int fd = fcntl(STDOUT_FILENO, F_DUPFD_CLOEXEC, 0);
assert(fd >= 0);
return flow_new_send_hello(fd, send_flow, serializer, NULL);
}

View File

@@ -2,19 +2,10 @@
#include <stdbool.h> #include <stdbool.h>
bool opts_add_connect_receive(char *); typedef bool (*opts_handler)(const char *);
bool opts_add_connect_send(char *); typedef char opts_group[1];
bool opts_add_connect_send_receive(char *);
bool opts_add_listen_receive(char *); void opts_init(int, char *[]);
bool opts_add_listen_send(char *); void opts_add(const char *, const char *, opts_handler, opts_group);
bool opts_add_listen_send_receive(char *); void opts_call(opts_group);
bool opts_add_file_read(char *); char *opts_split(const char **, char);
bool opts_add_file_write(char *);
bool opts_add_file_write_read(char *);
bool opts_add_file_append(char *);
bool opts_add_file_append_read(char *);
bool opts_add_exec_receive(char *);
bool opts_add_exec_send(char *);
bool opts_add_exec_send_receive(char *);
bool opts_add_stdout(char *);
bool opts_add_stdin(char *);

View File

@@ -1,19 +1,22 @@
#include <stdlib.h>
#include <stdio.h>
#include <assert.h> #include <assert.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <errno.h> #include <errno.h>
#include <netdb.h>
#include <stdlib.h>
#include <string.h> #include <string.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h> #include <unistd.h>
#include "buf.h" #include "buf.h"
#include "flow.h" #include "flow.h"
#include "list.h" #include "log.h"
#include "opts.h"
#include "peer.h" #include "peer.h"
#include "receive.h"
#include "resolve.h" #include "resolve.h"
#include "socket.h" #include "send.h"
#include "send_receive.h"
#include "wakeup.h" #include "wakeup.h"
#include "uuid.h" #include "uuid.h"
@@ -33,29 +36,33 @@ struct outgoing {
}; };
static struct list_head outgoing_head = LIST_HEAD_INIT(outgoing_head); static struct list_head outgoing_head = LIST_HEAD_INIT(outgoing_head);
static opts_group outgoing_opts;
static char log_module = 'O';
static void outgoing_connect_result(struct outgoing *, int); static void outgoing_connect_result(struct outgoing *, int);
static void outgoing_resolve(struct outgoing *); static void outgoing_resolve(struct outgoing *);
static void outgoing_resolve_wrapper(struct peer *); static void outgoing_resolve_wrapper(struct peer *);
static void outgoing_retry(struct outgoing *outgoing) { static void outgoing_retry(struct outgoing *outgoing) {
outgoing->peer.fd = -1;
uint32_t delay = wakeup_get_retry_delay_ms(++outgoing->attempt); uint32_t delay = wakeup_get_retry_delay_ms(++outgoing->attempt);
fprintf(stderr, "O %s: Will retry in %ds\n", outgoing->id, delay / 1000); LOG(outgoing->id, "Will retry in %ds", delay / 1000);
outgoing->peer.event_handler = outgoing_resolve_wrapper; outgoing->peer.event_handler = outgoing_resolve_wrapper;
wakeup_add((struct peer *) outgoing, delay); wakeup_add(&outgoing->peer, delay);
} }
static void outgoing_connect_next(struct outgoing *outgoing) { static void outgoing_connect_next(struct outgoing *outgoing) {
if (outgoing->addr == NULL) { if (outgoing->addr == NULL) {
freeaddrinfo(outgoing->addrs); freeaddrinfo(outgoing->addrs);
fprintf(stderr, "O %s: Can't connect to any addresses of %s/%s\n", outgoing->id, outgoing->node, outgoing->service); LOG(outgoing->id, "Can't connect to any addresses of %s/%s", outgoing->node, outgoing->service);
outgoing_retry(outgoing); outgoing_retry(outgoing);
return; return;
} }
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
assert(getnameinfo(outgoing->addr->ai_addr, outgoing->addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0); assert(getnameinfo(outgoing->addr->ai_addr, outgoing->addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0);
fprintf(stderr, "O %s: Connecting to %s/%s...\n", outgoing->id, hbuf, sbuf); LOG(outgoing->id, "Connecting to %s/%s...", hbuf, sbuf);
outgoing->peer.fd = socket(outgoing->addr->ai_family, outgoing->addr->ai_socktype | SOCK_NONBLOCK | SOCK_CLOEXEC, outgoing->addr->ai_protocol); outgoing->peer.fd = socket(outgoing->addr->ai_family, outgoing->addr->ai_socktype | SOCK_NONBLOCK | SOCK_CLOEXEC, outgoing->addr->ai_protocol);
assert(outgoing->peer.fd >= 0); assert(outgoing->peer.fd >= 0);
@@ -67,7 +74,7 @@ static void outgoing_connect_next(struct outgoing *outgoing) {
} }
static void outgoing_connect_handler(struct peer *peer) { static void outgoing_connect_handler(struct peer *peer) {
struct outgoing *outgoing = (struct outgoing *) peer; struct outgoing *outgoing = container_of(peer, struct outgoing, peer);
peer_epoll_del(&outgoing->peer); peer_epoll_del(&outgoing->peer);
@@ -78,11 +85,8 @@ static void outgoing_connect_handler(struct peer *peer) {
} }
static void outgoing_disconnect_handler(struct peer *peer) { static void outgoing_disconnect_handler(struct peer *peer) {
struct outgoing *outgoing = (struct outgoing *) peer; struct outgoing *outgoing = container_of(peer, struct outgoing, peer);
if (outgoing->peer.fd != -1) { LOG(outgoing->id, "Peer disconnected; reconnecting...");
assert(!close(outgoing->peer.fd));
}
fprintf(stderr, "O %s: Peer disconnected; reconnecting...\n", outgoing->id);
outgoing_retry(outgoing); outgoing_retry(outgoing);
} }
@@ -91,7 +95,7 @@ static void outgoing_connect_result(struct outgoing *outgoing, int result) {
assert(getnameinfo(outgoing->addr->ai_addr, outgoing->addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0); assert(getnameinfo(outgoing->addr->ai_addr, outgoing->addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0);
switch (result) { switch (result) {
case 0: case 0:
fprintf(stderr, "O %s: Connected to %s/%s\n", outgoing->id, hbuf, sbuf); LOG(outgoing->id, "Connected to %s/%s", hbuf, sbuf);
freeaddrinfo(outgoing->addrs); freeaddrinfo(outgoing->addrs);
outgoing->attempt = 0; outgoing->attempt = 0;
int fd = outgoing->peer.fd; int fd = outgoing->peer.fd;
@@ -99,18 +103,17 @@ static void outgoing_connect_result(struct outgoing *outgoing, int result) {
outgoing->peer.event_handler = outgoing_disconnect_handler; outgoing->peer.event_handler = outgoing_disconnect_handler;
flow_socket_ready(fd, outgoing->flow); flow_socket_ready(fd, outgoing->flow);
flow_socket_connected(fd, outgoing->flow); flow_socket_connected(fd, outgoing->flow);
flow_new(fd, outgoing->flow, outgoing->passthrough, (struct peer *) outgoing); flow_new(fd, outgoing->flow, outgoing->passthrough, &outgoing->peer);
break; break;
case EINPROGRESS: case EINPROGRESS:
outgoing->peer.event_handler = outgoing_connect_handler; outgoing->peer.event_handler = outgoing_connect_handler;
peer_epoll_add((struct peer *) outgoing, EPOLLOUT); peer_epoll_add(&outgoing->peer, EPOLLOUT);
break; break;
default: default:
fprintf(stderr, "O %s: Can't connect to %s/%s: %s\n", outgoing->id, hbuf, sbuf, strerror(result)); LOG(outgoing->id, "Can't connect to %s/%s: %s", hbuf, sbuf, strerror(result));
assert(!close(outgoing->peer.fd)); assert(!close(outgoing->peer.fd));
outgoing->peer.fd = -1;
outgoing->addr = outgoing->addr->ai_next; outgoing->addr = outgoing->addr->ai_next;
// Tail recursion :/ // Tail recursion :/
outgoing_connect_next(outgoing); outgoing_connect_next(outgoing);
@@ -119,10 +122,10 @@ static void outgoing_connect_result(struct outgoing *outgoing, int result) {
} }
static void outgoing_resolve_handler(struct peer *peer) { static void outgoing_resolve_handler(struct peer *peer) {
struct outgoing *outgoing = (struct outgoing *) peer; struct outgoing *outgoing = container_of(peer, struct outgoing, peer);
int err = resolve_result(peer, &outgoing->addrs); int err = resolve_result(peer, &outgoing->addrs);
if (err) { if (err) {
fprintf(stderr, "O %s: Failed to resolve %s/%s: %s\n", outgoing->id, outgoing->node, outgoing->service, gai_strerror(err)); LOG(outgoing->id, "Failed to resolve %s/%s: %s", outgoing->node, outgoing->service, gai_strerror(err));
outgoing_retry(outgoing); outgoing_retry(outgoing);
} else { } else {
outgoing->addr = outgoing->addrs; outgoing->addr = outgoing->addrs;
@@ -131,26 +134,57 @@ static void outgoing_resolve_handler(struct peer *peer) {
} }
static void outgoing_resolve(struct outgoing *outgoing) { static void outgoing_resolve(struct outgoing *outgoing) {
fprintf(stderr, "O %s: Resolving %s/%s...\n", outgoing->id, outgoing->node, outgoing->service); LOG(outgoing->id, "Resolving %s/%s...", outgoing->node, outgoing->service);
outgoing->peer.event_handler = outgoing_resolve_handler; outgoing->peer.event_handler = outgoing_resolve_handler;
resolve((struct peer *) outgoing, outgoing->node, outgoing->service, 0); resolve(&outgoing->peer, outgoing->node, outgoing->service, 0);
} }
static void outgoing_resolve_wrapper(struct peer *peer) { static void outgoing_resolve_wrapper(struct peer *peer) {
outgoing_resolve((struct outgoing *) peer); outgoing_resolve(container_of(peer, struct outgoing, peer));
} }
static void outgoing_del(struct outgoing *outgoing) { static void outgoing_del(struct outgoing *outgoing) {
flow_ref_dec(outgoing->flow); flow_ref_dec(outgoing->flow);
if (outgoing->peer.fd >= 0) { peer_close(&outgoing->peer);
assert(!close(outgoing->peer.fd));
}
list_del(&outgoing->outgoing_list); list_del(&outgoing->outgoing_list);
free(outgoing->node); free(outgoing->node);
free(outgoing->service); free(outgoing->service);
free(outgoing); free(outgoing);
} }
static bool outgoing_add(const char *host_port, struct flow *flow, void *passthrough) {
char *host = opts_split(&host_port, '/');
if (!host) {
return false;
}
outgoing_new(host, host_port, flow, passthrough);
free(host);
return true;
}
static bool outgoing_connect_receive(const char *arg) {
return outgoing_add(arg, receive_flow, NULL);
}
static bool outgoing_connect_send(const char *arg) {
return send_add(outgoing_add, send_flow, arg);
}
static bool outgoing_connect_send_receive(const char *arg) {
return send_add(outgoing_add, send_receive_flow, arg);
}
void outgoing_opts_add() {
opts_add("connect-receive", "HOST/PORT", outgoing_connect_receive, outgoing_opts);
opts_add("connect-send", "FORMAT=HOST/PORT", outgoing_connect_send, outgoing_opts);
opts_add("connect-send-receive", "FORMAT=HOST/PORT", outgoing_connect_send_receive, outgoing_opts);
}
void outgoing_init() {
opts_call(outgoing_opts);
}
void outgoing_cleanup() { void outgoing_cleanup() {
struct outgoing *iter, *next; struct outgoing *iter, *next;
list_for_each_entry_safe(iter, next, &outgoing_head, outgoing_list) { list_for_each_entry_safe(iter, next, &outgoing_head, outgoing_list) {
@@ -158,7 +192,7 @@ void outgoing_cleanup() {
} }
} }
void outgoing_new(char *node, char *service, struct flow *flow, void *passthrough) { void outgoing_new(const char *node, const char *service, struct flow *flow, void *passthrough) {
flow_ref_inc(flow); flow_ref_inc(flow);
struct outgoing *outgoing = malloc(sizeof(*outgoing)); struct outgoing *outgoing = malloc(sizeof(*outgoing));

View File

@@ -2,5 +2,7 @@
struct flow; struct flow;
void outgoing_opts_add(void);
void outgoing_init(void);
void outgoing_cleanup(void); void outgoing_cleanup(void);
void outgoing_new(char *, char *, struct flow *, void *); void outgoing_new(const char *, const char *, struct flow *, void *);

View File

@@ -1,7 +1,6 @@
#pragma GCC diagnostic ignored "-Wtautological-constant-out-of-range-compare" #pragma GCC diagnostic ignored "-Wtautological-constant-out-of-range-compare"
#include <assert.h> #include <assert.h>
#include <stdio.h>
#include <string.h> #include <string.h>
#include "uuid.h" #include "uuid.h"

View File

@@ -1,61 +1,61 @@
#include <assert.h> #include <assert.h>
#include <errno.h> #include <errno.h>
#include <fcntl.h>
#include <signal.h> #include <signal.h>
#include <stdbool.h> #include <stdbool.h>
#include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <sys/epoll.h> #include <string.h>
#include <sys/signalfd.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h> #include <unistd.h>
#include "log.h"
#include "server.h" #include "server.h"
#include "uuid.h"
#include "wakeup.h"
#include "peer.h" #include "peer.h"
static char log_module = 'X';
uint32_t peer_count_in = 0, peer_count_out = 0, peer_count_out_in = 0; uint32_t peer_count_in = 0, peer_count_out = 0, peer_count_out_in = 0;
static int peer_epoll_fd; static int peer_epoll_fd;
static int peer_shutdown_fd; static struct peer peer_shutdown_peer;
static bool peer_shutdown_flag = false; static bool peer_shutdown_flag = false;
static struct list_head peer_always_trigger_head = LIST_HEAD_INIT(peer_always_trigger_head); static struct list_head peer_always_trigger_head = LIST_HEAD_INIT(peer_always_trigger_head);
static void peer_shutdown_handler(struct peer *peer) { static void peer_shutdown() {
fprintf(stderr, "X %s: Shutting down\n", server_id); peer_close(&peer_shutdown_peer);
assert(!close(peer->fd));
free(peer);
peer_shutdown_flag = true; peer_shutdown_flag = true;
} }
static void peer_shutdown_handler(struct peer *peer) {
struct signalfd_siginfo siginfo;
assert(read(peer->fd, &siginfo, sizeof(siginfo)) == sizeof(siginfo));
LOG(server_id, "Received signal %u; shutting down", siginfo.ssi_signo);
peer_shutdown();
}
void peer_init() { void peer_init() {
peer_epoll_fd = epoll_create1(EPOLL_CLOEXEC); peer_epoll_fd = epoll_create1(EPOLL_CLOEXEC);
assert(peer_epoll_fd >= 0); assert(peer_epoll_fd >= 0);
int shutdown_fds[2]; sigset_t sigmask;
assert(!pipe2(shutdown_fds, O_CLOEXEC)); assert(!sigemptyset(&sigmask));
assert(!sigaddset(&sigmask, SIGINT));
assert(!sigaddset(&sigmask, SIGTERM));
peer_shutdown_peer.fd = signalfd(-1, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC);
assert(peer_shutdown_peer.fd >= 0);
peer_shutdown_peer.event_handler = peer_shutdown_handler;
peer_epoll_add(&peer_shutdown_peer, EPOLLIN);
struct peer *shutdown_peer = malloc(sizeof(*shutdown_peer)); assert(!sigprocmask(SIG_BLOCK, &sigmask, NULL));
assert(shutdown_peer);
shutdown_peer->fd = shutdown_fds[0];
shutdown_peer->event_handler = peer_shutdown_handler;
peer_epoll_add(shutdown_peer, EPOLLRDHUP);
peer_shutdown_fd = shutdown_fds[1];
signal(SIGINT, peer_shutdown);
} }
void peer_cleanup() { void peer_cleanup() {
assert(!close(peer_epoll_fd)); assert(!close(peer_epoll_fd));
} }
void peer_shutdown(int __attribute__((unused)) signal) {
if (peer_shutdown_fd != -1) {
assert(!close(peer_shutdown_fd));
peer_shutdown_fd = -1;
}
}
void peer_epoll_add(struct peer *peer, uint32_t events) { void peer_epoll_add(struct peer *peer, uint32_t events) {
struct epoll_event ev = { struct epoll_event ev = {
.events = events, .events = events,
@@ -87,6 +87,15 @@ void peer_epoll_del(struct peer *peer) {
} }
} }
void peer_close(struct peer *peer) {
if (peer->fd == -1) {
return;
}
peer_epoll_del(peer);
assert(!close(peer->fd));
peer->fd = -1;
}
void peer_call(struct peer *peer) { void peer_call(struct peer *peer) {
if (peer_shutdown_flag || !peer) { if (peer_shutdown_flag || !peer) {
return; return;
@@ -95,18 +104,20 @@ void peer_call(struct peer *peer) {
} }
void peer_loop() { void peer_loop() {
fprintf(stderr, "X %s: Starting event loop\n", server_id); LOG(server_id, "Starting event loop");
while (!peer_shutdown_flag) { while (!peer_shutdown_flag) {
if (!(peer_count_in + peer_count_out_in)) { if (!(peer_count_in + peer_count_out_in)) {
fprintf(stderr, "X %s: No remaining inputs\n", server_id); LOG(server_id, "No remaining inputs");
peer_shutdown(0); peer_shutdown();
break;
} else if (!(peer_count_out + peer_count_out_in)) { } else if (!(peer_count_out + peer_count_out_in)) {
fprintf(stderr, "X %s: No remaining outputs\n", server_id); LOG(server_id, "No remaining outputs");
peer_shutdown(0); peer_shutdown();
break;
} }
#define MAX_EVENTS 10 #define MAX_EVENTS 10
struct epoll_event events[MAX_EVENTS]; struct epoll_event events[MAX_EVENTS];
int delay = list_is_empty(&peer_always_trigger_head) ? wakeup_get_delay() : 0; int delay = list_is_empty(&peer_always_trigger_head) ? -1 : 0;
int nfds = epoll_wait(peer_epoll_fd, events, MAX_EVENTS, delay); int nfds = epoll_wait(peer_epoll_fd, events, MAX_EVENTS, delay);
if (nfds == -1 && errno == EINTR) { if (nfds == -1 && errno == EINTR) {
continue; continue;
@@ -124,7 +135,5 @@ void peer_loop() {
peer_call(iter); peer_call(iter);
} }
} }
wakeup_dispatch();
} }
} }

View File

@@ -18,8 +18,8 @@ extern uint32_t peer_count_in, peer_count_out, peer_count_out_in;
void peer_init(void); void peer_init(void);
void peer_cleanup(void); void peer_cleanup(void);
void peer_shutdown(int signal);
void peer_epoll_add(struct peer *, uint32_t); void peer_epoll_add(struct peer *, uint32_t);
void peer_epoll_del(struct peer *); void peer_epoll_del(struct peer *);
void peer_close(struct peer *);
void peer_call(struct peer *); void peer_call(struct peer *);
void peer_loop(void); void peer_loop(void);

View File

@@ -1,19 +1,17 @@
#pragma GCC diagnostic ignored "-Wcast-qual" #include <stdbool.h>
#pragma GCC diagnostic ignored "-Wpacked"
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <string.h> #include <string.h>
#include "buf.h" #include "buf.h"
#include "log.h"
#include "packet.h" #include "packet.h"
#include "server.h" #include "server.h"
#include "uuid.h"
#include "adsb.pb-c.h" #include "adsb.pb-c.h"
#include "proto.h" #include "proto.h"
#pragma GCC diagnostic ignored "-Wcast-qual"
#pragma GCC diagnostic ignored "-Wpacked"
#define PROTO_MAGIC "aDsB" #define PROTO_MAGIC "aDsB"
struct proto_parser_state { struct proto_parser_state {
@@ -24,6 +22,8 @@ struct proto_parser_state {
bool have_header; bool have_header;
}; };
static char log_module = 'R'; // borrowing
static Adsb *proto_prev = NULL; static Adsb *proto_prev = NULL;
static struct buf proto_hello_buf = BUF_INIT; static struct buf proto_hello_buf = BUF_INIT;
@@ -145,12 +145,12 @@ static bool proto_parse_header(AdsbHeader *header, struct packet *packet, struct
state->rssi_max = header->rssi_max; state->rssi_max = header->rssi_max;
if (!strcmp(header->server_id, (const char *) server_id)) { if (!strcmp(header->server_id, (const char *) server_id)) {
fprintf(stderr, "R %s: Attempt to receive proto data from our own server ID (%s); loop!\n", packet->source_id, server_id); LOG(packet->source_id, "Attempt to receive proto data from our own server ID (%s); loop!", server_id);
return false; return false;
} }
state->have_header = true; state->have_header = true;
fprintf(stderr, "R %s: Connected to server ID: %s\n", packet->source_id, header->server_id); LOG(packet->source_id, "Connected to server ID: %s", header->server_id);
return true; return true;
} }

View File

@@ -1,7 +1,6 @@
#include <assert.h> #include <assert.h>
#include <fcntl.h> #include <fcntl.h>
#include <limits.h> #include <limits.h>
#include <stdio.h>
#include <string.h> #include <string.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/stat.h> #include <sys/stat.h>
@@ -16,7 +15,7 @@ static struct buf rand_buf = BUF_INIT;
static int rand_fd; static int rand_fd;
void rand_init() { void rand_init() {
rand_fd = open("/dev/urandom", O_RDONLY | O_CLOEXEC); rand_fd = open("/dev/urandom", O_RDONLY | O_CLOEXEC | O_NOCTTY);
assert(rand_fd >= 0); assert(rand_fd >= 0);
assert(read(rand_fd, buf_at(&rand_buf, 0), BUF_LEN_MAX) == BUF_LEN_MAX); assert(read(rand_fd, buf_at(&rand_buf, 0), BUF_LEN_MAX) == BUF_LEN_MAX);
rand_buf.length = BUF_LEN_MAX; rand_buf.length = BUF_LEN_MAX;

View File

@@ -1,11 +1,8 @@
#include <assert.h> #include <assert.h>
#include <stdio.h>
#include <string.h>
#include "buf.h" #include "buf.h"
#include "hex.h" #include "hex.h"
#include "packet.h" #include "packet.h"
#include "uuid.h"
#include "raw.h" #include "raw.h"

View File

@@ -1,18 +1,16 @@
#include <assert.h> #include <assert.h>
#include <errno.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <sys/socket.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <unistd.h> #include <sys/types.h>
#include "airspy_adsb.h" #include "airspy_adsb.h"
#include "beast.h" #include "beast.h"
#include "buf.h" #include "buf.h"
#include "flow.h" #include "flow.h"
#include "json.h" #include "json.h"
#include "list.h" #include "log.h"
#include "packet.h" #include "packet.h"
#include "peer.h" #include "peer.h"
#include "proto.h" #include "proto.h"
@@ -39,6 +37,8 @@ struct receive {
}; };
static struct list_head receive_head = LIST_HEAD_INIT(receive_head); static struct list_head receive_head = LIST_HEAD_INIT(receive_head);
static char log_module = 'R';
static void receive_new(int, void *, struct peer *); static void receive_new(int, void *, struct peer *);
static struct flow _receive_flow = { static struct flow _receive_flow = {
@@ -92,7 +92,7 @@ static bool receive_autodetect_parse(struct receive *receive, struct packet *pac
for (size_t i = 0; i < NUM_PARSERS; i++) { for (size_t i = 0; i < NUM_PARSERS; i++) {
if (parsers[i].parse(buf, packet, state)) { if (parsers[i].parse(buf, packet, state)) {
fprintf(stderr, "R %s: Detected input format %s\n", receive->id, parsers[i].name); LOG(receive->id, "Detected input format: %s", parsers[i].name);
receive->parser_wrapper = receive_parse_wrapper; receive->parser_wrapper = receive_parse_wrapper;
receive->parser = parsers[i].parse; receive->parser = parsers[i].parse;
return true; return true;
@@ -105,17 +105,16 @@ static bool receive_autodetect_parse(struct receive *receive, struct packet *pac
} }
static void receive_del(struct receive *receive) { static void receive_del(struct receive *receive) {
fprintf(stderr, "R %s: Connection closed\n", receive->id); LOG(receive->id, "Connection closed");
peer_count_in--; peer_count_in--;
peer_epoll_del((struct peer *) receive); peer_close(&receive->peer);
assert(!close(receive->peer.fd));
list_del(&receive->receive_list); list_del(&receive->receive_list);
peer_call(receive->on_close); peer_call(receive->on_close);
free(receive); free(receive);
} }
static void receive_read(struct peer *peer) { static void receive_read(struct peer *peer) {
struct receive *receive = (struct receive *) peer; struct receive *receive = container_of(peer, struct receive, peer);
if (buf_fill(&receive->buf, receive->peer.fd) <= 0) { if (buf_fill(&receive->buf, receive->peer.fd) <= 0) {
receive_del(receive); receive_del(receive);
@@ -134,14 +133,14 @@ static void receive_read(struct peer *peer) {
continue; continue;
} }
if (++packet.hops > receive_max_hops) { if (++packet.hops > receive_max_hops) {
fprintf(stderr, "R %s: Packet exceeded hop limit (%u > %u); dropping. You may have a loop in your configuration.\n", receive->id, packet.hops, receive_max_hops); LOG(receive->id, "Packet exceeded hop limit (%u > %u); dropping. You may have a loop in your configuration.", packet.hops, receive_max_hops);
continue; continue;
} }
send_write(&packet); send_write(&packet);
} }
if (receive->buf.length == BUF_LEN_MAX) { if (receive->buf.length == BUF_LEN_MAX) {
fprintf(stderr, "R %s: Input buffer overrun. This probably means that adsbus doesn't understand the protocol that this source is speaking.\n", receive->id); LOG(receive->id, "Input buffer overrun. This probably means that adsbus doesn't understand the protocol that this source is speaking.");
receive_del(receive); receive_del(receive);
return; return;
} }
@@ -164,9 +163,9 @@ static void receive_new(int fd, void __attribute__((unused)) *passthrough, struc
list_add(&receive->receive_list, &receive_head); list_add(&receive->receive_list, &receive_head);
peer_epoll_add((struct peer *) receive, EPOLLIN); peer_epoll_add(&receive->peer, EPOLLIN);
fprintf(stderr, "R %s: New receive connection\n", receive->id); LOG(receive->id, "New receive connection");
} }
void receive_init() { void receive_init() {

View File

@@ -1,5 +1,3 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h> #include <netdb.h>
#include "asyncaddrinfo.h" #include "asyncaddrinfo.h"

View File

@@ -1,7 +1,5 @@
#include <assert.h> #include <assert.h>
#include <errno.h>
#include <signal.h> #include <signal.h>
#include <stdbool.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <strings.h> #include <strings.h>
@@ -15,7 +13,8 @@
#include "buf.h" #include "buf.h"
#include "flow.h" #include "flow.h"
#include "json.h" #include "json.h"
#include "list.h" #include "log.h"
#include "opts.h"
#include "packet.h" #include "packet.h"
#include "peer.h" #include "peer.h"
#include "proto.h" #include "proto.h"
@@ -47,6 +46,8 @@ static struct flow _send_flow = {
}; };
struct flow *send_flow = &_send_flow; struct flow *send_flow = &_send_flow;
static char log_module = 'S';
typedef void (*serialize)(struct packet *, struct buf *); typedef void (*serialize)(struct packet *, struct buf *);
typedef void (*hello)(struct buf **); typedef void (*hello)(struct buf **);
static struct serializer { static struct serializer {
@@ -89,17 +90,16 @@ static struct serializer {
#define NUM_SERIALIZERS (sizeof(serializers) / sizeof(*serializers)) #define NUM_SERIALIZERS (sizeof(serializers) / sizeof(*serializers))
static void send_del(struct send *send) { static void send_del(struct send *send) {
fprintf(stderr, "S %s (%s): Connection closed\n", send->id, send->serializer->name); LOG(send->id, "Connection closed");
peer_count_out--; peer_count_out--;
peer_epoll_del((struct peer *) send); peer_close(&send->peer);
assert(!close(send->peer.fd));
list_del(&send->send_list); list_del(&send->send_list);
peer_call(send->on_close); peer_call(send->on_close);
free(send); free(send);
} }
static void send_del_wrapper(struct peer *peer) { static void send_del_wrapper(struct peer *peer) {
send_del((struct send *) peer); send_del(container_of(peer, struct send, peer));
} }
static void send_new(int fd, void *passthrough, struct peer *on_close) { static void send_new(int fd, void *passthrough, struct peer *on_close) {
@@ -119,9 +119,24 @@ static void send_new(int fd, void *passthrough, struct peer *on_close) {
list_add(&send->send_list, &serializer->send_head); list_add(&send->send_list, &serializer->send_head);
peer_epoll_add((struct peer *) send, 0); peer_epoll_add(&send->peer, 0);
fprintf(stderr, "S %s (%s): New send connection\n", send->id, serializer->name); LOG(send->id, "New send connection: %s", serializer->name);
}
static struct serializer *send_parse_format(const char **arg) {
char *format = opts_split(arg, '=');
if (!format) {
return NULL;
}
struct serializer *serializer = send_get_serializer(format);
free(format);
if (!serializer) {
return NULL;
}
return serializer;
} }
void send_init() { void send_init() {
@@ -140,7 +155,7 @@ void send_cleanup() {
} }
} }
void *send_get_serializer(char *name) { void *send_get_serializer(const char *name) {
for (size_t i = 0; i < NUM_SERIALIZERS; i++) { for (size_t i = 0; i < NUM_SERIALIZERS; i++) {
if (strcasecmp(serializers[i].name, name) == 0) { if (strcasecmp(serializers[i].name, name) == 0) {
return &serializers[i]; return &serializers[i];
@@ -177,8 +192,8 @@ void send_write(struct packet *packet) {
} }
if (write(iter->peer.fd, buf_at(&buf, 0), buf.length) != (ssize_t) buf.length) { if (write(iter->peer.fd, buf_at(&buf, 0), buf.length) != (ssize_t) buf.length) {
// peer_loop() will see this shutdown and call send_del // peer_loop() will see this shutdown and call send_del
int res = shutdown(iter->peer.fd, SHUT_WR); // Ignore error
assert(res == 0 || (res == -1 && errno == ENOTSOCK)); shutdown(iter->peer.fd, SHUT_WR);
} }
} }
} }
@@ -190,3 +205,11 @@ void send_print_usage() {
fprintf(stderr, "\t%s\n", serializers[i].name); fprintf(stderr, "\t%s\n", serializers[i].name);
} }
} }
bool send_add(bool (*next)(const char *, struct flow *, void *), struct flow *flow, const char *arg) {
struct serializer *serializer = send_parse_format(&arg);
if (!serializer) {
return false;
}
return next(arg, flow, serializer);
}

View File

@@ -1,13 +1,16 @@
#pragma once #pragma once
#include <stdbool.h>
struct buf; struct buf;
struct flow; struct flow;
struct packet; struct packet;
void send_init(void); void send_init(void);
void send_cleanup(void); void send_cleanup(void);
void *send_get_serializer(char *); void *send_get_serializer(const char *);
void send_get_hello(struct buf **, void *); void send_get_hello(struct buf **, void *);
void send_write(struct packet *); void send_write(struct packet *);
void send_print_usage(void); void send_print_usage(void);
bool send_add(bool (*)(const char *, struct flow *, void *), struct flow *, const char *);
extern struct flow *send_flow; extern struct flow *send_flow;

View File

@@ -1,10 +1,8 @@
#include <assert.h> #include <assert.h>
#include <fcntl.h> #include <fcntl.h>
#include <stdlib.h> #include <stdlib.h>
#include <unistd.h>
#include "flow.h" #include "flow.h"
#include "list.h"
#include "peer.h" #include "peer.h"
#include "receive.h" #include "receive.h"
#include "send.h" #include "send.h"
@@ -37,7 +35,7 @@ static void send_receive_del(struct send_receive *send_receive) {
} }
static void send_receive_on_close(struct peer *peer) { static void send_receive_on_close(struct peer *peer) {
struct send_receive *send_receive = (struct send_receive *) peer; struct send_receive *send_receive = container_of(peer, struct send_receive, peer);
if (!--(send_receive->ref_count)) { if (!--(send_receive->ref_count)) {
send_receive_del(send_receive); send_receive_del(send_receive);
@@ -54,10 +52,10 @@ static void send_receive_new(int fd, void *passthrough, struct peer *on_close) {
send_receive->ref_count = 2; send_receive->ref_count = 2;
list_add(&send_receive->send_receive_list, &send_receive_head); list_add(&send_receive->send_receive_list, &send_receive_head);
flow_new(fd, send_flow, passthrough, on_close); flow_new(fd, send_flow, passthrough, &send_receive->peer);
int fd2 = fcntl(fd, F_DUPFD_CLOEXEC, 0); int fd2 = fcntl(fd, F_DUPFD_CLOEXEC, 0);
assert(fd2 >= 0); assert(fd2 >= 0);
flow_new(fd2, receive_flow, NULL, on_close); flow_new(fd2, receive_flow, NULL, &send_receive->peer);
} }
void send_receive_cleanup() { void send_receive_cleanup() {

View File

@@ -1,13 +1,85 @@
#include <assert.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/utsname.h>
#include <unistd.h>
#include <jansson.h>
#include <protobuf-c/protobuf-c.h>
#include "build.h"
#include "log.h"
#include "opts.h"
#include "uuid.h" #include "uuid.h"
#include "server.h" #include "server.h"
#pragma GCC diagnostic ignored "-Wdate-time"
uint8_t server_id[UUID_LEN]; uint8_t server_id[UUID_LEN];
char server_version[] = "https://github.com/flamingcowtv/adsb-tools#1"; char server_version[] = "https://github.com/flamingcowtv/adsb-tools#1";
static opts_group server_opts1, server_opts2;
static char log_module = 'X';
static bool server_detach(const char __attribute__ ((unused)) *arg) {
LOG(server_id, "Detaching");
int ret = fork();
assert(ret >= 0);
if (ret > 0) {
// We are the parent
exit(EXIT_SUCCESS);
}
LOG(server_id, "Background process ID: %u", getpid());
assert(setsid() >= 0);
return true;
}
static bool server_pid_file(const char *path) {
pid_t pid = getpid();
LOG(server_id, "Writing process ID %d to pid-file: %s", pid, path);
FILE *fh = fopen(path, "w");
assert(fh);
assert(fprintf(fh, "%d\n", pid) > 0);
assert(!fclose(fh));
return true;
}
void server_opts_add() {
opts_add("detach", NULL, server_detach, server_opts1);
opts_add("pid-file", "PATH", server_pid_file, server_opts2);
}
void server_init() { void server_init() {
uuid_gen(server_id); uuid_gen(server_id);
fprintf(stderr, "X %s: Server start\n", server_id); LOG(server_id, "Server start");
LOG(server_id, "Build data:");
LOG(server_id, "\tgit_last_change: %s", GIT_LAST_CHANGE);
LOG(server_id, "\tgit_local_clean: %s", GIT_LOCAL_CLEAN ? "true" : "false");
LOG(server_id, "\tclang_version: %s", __clang_version__);
LOG(server_id, "\tglibc_version: %d.%d", __GLIBC__, __GLIBC_MINOR__);
LOG(server_id, "\tjansson_version: %s", JANSSON_VERSION);
LOG(server_id, "\tprotobuf-c_version: %s", PROTOBUF_C_VERSION);
LOG(server_id, "\tdatetime: %s", __DATE__ " " __TIME__);
LOG(server_id, "\tusername: %s", USERNAME);
LOG(server_id, "\thostname: %s", HOSTNAME);
LOG(server_id, "Runtime data:");
struct utsname utsname;
assert(!uname(&utsname));
LOG(server_id, "\tusername: %s", getlogin());
LOG(server_id, "\thostname: %s", utsname.nodename);
LOG(server_id, "\tprocess_id: %d", getpid());
LOG(server_id, "\tsystem: %s", utsname.sysname);
LOG(server_id, "\trelease: %s", utsname.release);
LOG(server_id, "\tversion: %s", utsname.version);
LOG(server_id, "\tmachine: %s", utsname.machine);
opts_call(server_opts1);
opts_call(server_opts2);
} }

View File

@@ -3,4 +3,5 @@
extern uint8_t server_id[]; extern uint8_t server_id[];
extern char server_version[]; extern char server_version[];
void server_opts_add(void);
void server_init(void); void server_init(void);

View File

@@ -2,8 +2,9 @@
#include <errno.h> #include <errno.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <netinet/tcp.h> #include <netinet/tcp.h>
#include <sys/types.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include "socket.h" #include "socket.h"

37
adsbus/stdinout.c Normal file
View File

@@ -0,0 +1,37 @@
#include <assert.h>
#include <fcntl.h>
#include <unistd.h>
#include "flow.h"
#include "opts.h"
#include "receive.h"
#include "send.h"
#include "stdinout.h"
static opts_group stdinout_opts;
static bool stdinout_stdin(const char __attribute__((unused)) *arg) {
int fd = fcntl(STDIN_FILENO, F_DUPFD_CLOEXEC, 0);
assert(fd >= 0);
return flow_new_send_hello(fd, receive_flow, NULL, NULL);
}
static bool stdinout_stdout(const char *arg) {
struct serializer *serializer = send_get_serializer(arg);
if (!serializer) {
return false;
}
int fd = fcntl(STDOUT_FILENO, F_DUPFD_CLOEXEC, 0);
assert(fd >= 0);
return flow_new_send_hello(fd, send_flow, serializer, NULL);
}
void stdinout_opts_add() {
opts_add("stdin", NULL, stdinout_stdin, stdinout_opts);
opts_add("stdout", "FORMAT", stdinout_stdout, stdinout_opts);
}
void stdinout_init() {
opts_call(stdinout_opts);
}

4
adsbus/stdinout.h Normal file
View File

@@ -0,0 +1,4 @@
#pragma once
void stdinout_opts_add(void);
void stdinout_init(void);

View File

@@ -1,5 +1,3 @@
#include <stdint.h>
#include "hex.h" #include "hex.h"
#include "rand.h" #include "rand.h"

View File

@@ -1,91 +1,72 @@
#include <assert.h> #include <assert.h>
#include <fcntl.h>
#include <errno.h>
#include <limits.h>
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h> #include <sys/timerfd.h>
#include <stdint.h>
#include <string.h>
#include <sys/epoll.h>
#include <time.h>
#include <unistd.h> #include <unistd.h>
#include "list.h"
#include "peer.h" #include "peer.h"
#include "rand.h" #include "rand.h"
#include "wakeup.h" #include "wakeup.h"
struct wakeup_entry { struct wakeup {
int fd; struct peer peer;
uint64_t absolute_time_ms; struct peer *inner_peer;
struct peer *peer;
struct list_head wakeup_list; struct list_head wakeup_list;
}; };
static struct list_head wakeup_head = LIST_HEAD_INIT(wakeup_head); static struct list_head wakeup_head = LIST_HEAD_INIT(wakeup_head);
static uint64_t wakeup_get_time_ms() { static void wakeup_del(struct wakeup *wakeup) {
struct timespec tp; peer_close(&wakeup->peer);
assert(!clock_gettime(CLOCK_MONOTONIC_COARSE, &tp)); list_del(&wakeup->wakeup_list);
#define MS_PER_S UINT64_C(1000) free(wakeup);
#define NS_PER_MS UINT64_C(1000000) }
assert(tp.tv_sec >= 0);
assert(tp.tv_nsec >= 0); static void wakeup_handler(struct peer *peer) {
return ((uint64_t) tp.tv_sec * MS_PER_S) + ((uint64_t) tp.tv_nsec / NS_PER_MS); struct wakeup *wakeup = container_of(peer, struct wakeup, peer);
uint64_t events;
assert(read(wakeup->peer.fd, &events, sizeof(events)) == sizeof(events));
assert(events == 1);
peer_call(wakeup->inner_peer);
wakeup_del(wakeup);
} }
void wakeup_init() { void wakeup_init() {
} }
void wakeup_cleanup() { void wakeup_cleanup() {
struct wakeup_entry *iter, *next; struct wakeup *iter, *next;
list_for_each_entry_safe(iter, next, &wakeup_head, wakeup_list) { list_for_each_entry_safe(iter, next, &wakeup_head, wakeup_list) {
free(iter); wakeup_del(iter);
}
}
int wakeup_get_delay() {
if (list_is_empty(&wakeup_head)) {
return -1;
}
uint64_t now = wakeup_get_time_ms();
struct wakeup_entry *next_to_fire = list_entry(wakeup_head.next, struct wakeup_entry, wakeup_list);
if (next_to_fire->absolute_time_ms > now) {
uint64_t delta = next_to_fire->absolute_time_ms - now;
assert(delta < INT_MAX);
return (int) delta;
} else {
return 0;
}
}
void wakeup_dispatch() {
uint64_t now = wakeup_get_time_ms();
struct wakeup_entry *iter, *next;
list_for_each_entry_safe(iter, next, &wakeup_head, wakeup_list) {
if (iter->absolute_time_ms > now) {
break;
}
peer_call(iter->peer);
list_del(&iter->wakeup_list);
free(iter);
} }
} }
void wakeup_add(struct peer *peer, uint32_t delay_ms) { void wakeup_add(struct peer *peer, uint32_t delay_ms) {
struct wakeup_entry *entry = malloc(sizeof(*entry)); struct wakeup *wakeup = malloc(sizeof(*wakeup));
entry->absolute_time_ms = wakeup_get_time_ms() + delay_ms; wakeup->inner_peer = peer;
entry->peer = peer; list_add(&wakeup->wakeup_list, &wakeup_head);
struct wakeup_entry *iter, *next; wakeup->peer.fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
list_for_each_entry_safe(iter, next, &wakeup_head, wakeup_list) { assert(wakeup->peer.fd >= 0);
if (iter->absolute_time_ms > entry->absolute_time_ms) {
list_add(&entry->wakeup_list, &iter->wakeup_list); #define MS_PER_S UINT64_C(1000)
return; #define NS_PER_MS UINT64_C(1000000)
} const struct itimerspec ts = {
} .it_interval = {
list_add(&entry->wakeup_list, &wakeup_head); .tv_sec = 0,
.tv_nsec = 0,
},
.it_value = {
.tv_sec = delay_ms / MS_PER_S,
.tv_nsec = (delay_ms % MS_PER_S) * NS_PER_MS,
},
};
assert(!timerfd_settime(wakeup->peer.fd, 0, &ts, NULL));
wakeup->peer.event_handler = wakeup_handler;
peer_epoll_add(&wakeup->peer, EPOLLIN);
} }
#define RETRY_MIN_MS 2000 #define RETRY_MIN_MS 2000

View File

@@ -6,7 +6,5 @@ struct peer;
void wakeup_init(void); void wakeup_init(void);
void wakeup_cleanup(void); void wakeup_cleanup(void);
int wakeup_get_delay(void);
void wakeup_dispatch(void);
void wakeup_add(struct peer *, uint32_t); void wakeup_add(struct peer *, uint32_t);
uint32_t wakeup_get_retry_delay_ms(uint32_t); uint32_t wakeup_get_retry_delay_ms(uint32_t);