diff --git a/adsbus/Makefile b/adsbus/Makefile index 98a406d..c8f8619 100644 --- a/adsbus/Makefile +++ b/adsbus/Makefile @@ -11,7 +11,7 @@ VALGRIND_FLAGS ?= --error-exitcode=1 --trace-children=yes --track-fds=yes --show ADSBUS_TEST_FLAGS ?= --stdin --stdout=airspy_adsb --stdout=beast --stdout=json --stdout=proto --stdout=raw --stdout=stats OBJ_TRANSPORT = exec.o file.o incoming.o outgoing.o -OBJ_FLOW = flow.o receive.o send.o +OBJ_FLOW = flow.o receive.o send.o send_receive.o OBJ_PROTOCOL = airspy_adsb.o beast.o json.o proto.o raw.o stats.o OBJ_UTIL = asyncaddrinfo.o buf.o hex.o list.o opts.o packet.o peer.o rand.o resolve.o server.o socket.o uuid.o wakeup.o OBJ_PROTO = adsb.pb-c.o diff --git a/adsbus/adsbus.c b/adsbus/adsbus.c index ac9ebab..a54742d 100644 --- a/adsbus/adsbus.c +++ b/adsbus/adsbus.c @@ -19,6 +19,7 @@ #include "receive.h" #include "resolve.h" #include "send.h" +#include "send_receive.h" #include "server.h" #include "stats.h" #include "wakeup.h" @@ -32,13 +33,18 @@ static void print_usage(const char *name) { "\t--help\n" "\t--connect-receive=HOST/PORT\n" "\t--connect-send=FORMAT=HOST/PORT\n" + "\t--connect-send-receive=FORMAT=HOST/PORT\n" "\t--listen-receive=[HOST/]PORT\n" "\t--listen-send=FORMAT=[HOST/]PORT\n" + "\t--listen-send-receive=FORMAT=[HOST/]PORT\n" "\t--file-read=PATH\n" "\t--file-write=FORMAT=PATH\n" + "\t--file-write-read=FORMAT=PATH\n" "\t--file-append=FORMAT=PATH\n" + "\t--file-append-read=FORMAT=PATH\n" "\t--exec-receive=COMMAND\n" "\t--exec-send=FORMAT=COMMAND\n" + "\t--exec-send-receive=FORMAT=COMMAND\n" "\t--stdin\n" "\t--stdout=FORMAT\n" , name); @@ -48,19 +54,24 @@ static void print_usage(const char *name) { static bool parse_opts(int argc, char *argv[]) { static struct option long_options[] = { - {"connect-receive", required_argument, 0, 'c'}, - {"connect-send", required_argument, 0, 's'}, - {"listen-receive", required_argument, 0, 'l'}, - {"listen-send", required_argument, 0, 'm'}, - {"file-read", required_argument, 0, 'r'}, - {"file-write", required_argument, 0, 'w'}, - {"file-append", required_argument, 0, 'a'}, - {"exec-receive", required_argument, 0, 'e'}, - {"exec-send", required_argument, 0, 'f'}, - {"stdin", no_argument, 0, 'i'}, - {"stdout", required_argument, 0, 'o'}, - {"help", no_argument, 0, 'h'}, - {0, 0, 0, 0 }, + {"connect-receive", required_argument, 0, 'c'}, + {"connect-send", required_argument, 0, 's'}, + {"connect-send-receive", required_argument, 0, 't'}, + {"listen-receive", required_argument, 0, 'l'}, + {"listen-send", required_argument, 0, 'm'}, + {"listen-send-receive", required_argument, 0, 'n'}, + {"file-read", required_argument, 0, 'r'}, + {"file-write", required_argument, 0, 'w'}, + {"file-write-read", required_argument, 0, 'x'}, + {"file-append", required_argument, 0, 'a'}, + {"file-append-read", required_argument, 0, 'b'}, + {"exec-receive", required_argument, 0, 'e'}, + {"exec-send", required_argument, 0, 'f'}, + {"exec-send-receive", required_argument, 0, 'g'}, + {"stdin", no_argument, 0, 'i'}, + {"stdout", required_argument, 0, 'o'}, + {"help", no_argument, 0, 'h'}, + {0, 0, 0, 0 }, }; int opt; @@ -75,6 +86,10 @@ static bool parse_opts(int argc, char *argv[]) { handler = opts_add_connect_send; break; + case 't': + handler = opts_add_connect_send_receive; + break; + case 'l': handler = opts_add_listen_receive; break; @@ -83,6 +98,10 @@ static bool parse_opts(int argc, char *argv[]) { handler = opts_add_listen_send; break; + case 'n': + handler = opts_add_listen_send_receive; + break; + case 'r': handler = opts_add_file_read; break; @@ -91,10 +110,18 @@ static bool parse_opts(int argc, char *argv[]) { handler = opts_add_file_write; break; + case 'x': + handler = opts_add_file_write_read; + break; + case 'a': handler = opts_add_file_append; break; + case 'b': + handler = opts_add_file_append_read; + break; + case 'e': handler = opts_add_exec_receive; break; @@ -103,6 +130,10 @@ static bool parse_opts(int argc, char *argv[]) { handler = opts_add_exec_send; break; + case 'g': + handler = opts_add_exec_send_receive; + break; + case 'i': handler = opts_add_stdin; break; @@ -163,6 +194,7 @@ int main(int argc, char *argv[]) { receive_cleanup(); send_cleanup(); + send_receive_cleanup(); incoming_cleanup(); outgoing_cleanup(); exec_cleanup(); diff --git a/adsbus/opts.c b/adsbus/opts.c index d149b4f..b577a46 100644 --- a/adsbus/opts.c +++ b/adsbus/opts.c @@ -13,6 +13,7 @@ #include "outgoing.h" #include "receive.h" #include "send.h" +#include "send_receive.h" #include "opts.h" @@ -26,7 +27,7 @@ static char *opts_split(char **arg, char delim) { return ret; } -static void opts_add_listen(char *host_port, struct flow *flow, void *passthrough) { +static bool opts_add_listen(char *host_port, struct flow *flow, void *passthrough) { char *host = opts_split(&host_port, '/'); if (host) { incoming_new(host, host_port, flow, passthrough); @@ -34,6 +35,33 @@ static void opts_add_listen(char *host_port, struct flow *flow, void *passthroug } else { incoming_new(NULL, host_port, flow, passthrough); } + return true; +} + +static bool opts_add_connect(char *host_port, struct flow *flow, void *passthrough) { + char *host = opts_split(&host_port, '/'); + if (!host) { + return false; + } + + outgoing_new(host, host_port, flow, passthrough); + free(host); + return true; +} + +static bool opts_add_file_write_int(char *path, struct flow *flow, void *passthrough) { + file_write_new(path, flow, passthrough); + return true; +} + +static bool opts_add_file_append_int(char *path, struct flow *flow, void *passthrough) { + file_append_new(path, flow, passthrough); + return true; +} + +static bool opts_add_exec(char *cmd, struct flow *flow, void *passthrough) { + exec_new(cmd, flow, passthrough); + return true; } static struct serializer *opts_get_serializer(char **arg) { @@ -51,46 +79,36 @@ static struct serializer *opts_get_serializer(char **arg) { return serializer; } -bool opts_add_connect_receive(char *arg) { - char *host = opts_split(&arg, '/'); - if (!host) { +static bool opts_add_send(bool (*next)(char *, struct flow *, void *), struct flow *flow, char *arg) { + struct serializer *serializer = opts_get_serializer(&arg); + if (!serializer) { return false; } + return next(arg, flow, serializer); +} - outgoing_new(host, arg, receive_flow, NULL); - free(host); - return true; +bool opts_add_connect_receive(char *arg) { + return opts_add_connect(arg, receive_flow, NULL); } bool opts_add_connect_send(char *arg) { - struct serializer *serializer = opts_get_serializer(&arg); - if (!serializer) { - return false; - } + return opts_add_send(opts_add_connect, send_flow, arg); +} - char *host = opts_split(&arg, '/'); - if (!host) { - return false; - } - - outgoing_new(host, arg, send_flow, serializer); - free(host); - return true; +bool opts_add_connect_send_receive(char *arg) { + return opts_add_send(opts_add_connect, send_receive_flow, arg); } bool opts_add_listen_receive(char *arg) { - opts_add_listen(arg, receive_flow, NULL); - return true; + return opts_add_listen(arg, receive_flow, NULL); } bool opts_add_listen_send(char *arg) { - struct serializer *serializer = opts_get_serializer(&arg); - if (!serializer) { - return false; - } + return opts_add_send(opts_add_listen, send_flow, arg); +} - opts_add_listen(arg, send_flow, serializer); - return true; +bool opts_add_listen_send_receive(char *arg) { + return opts_add_send(opts_add_listen, send_receive_flow, arg); } bool opts_add_file_read(char *arg) { @@ -99,23 +117,19 @@ bool opts_add_file_read(char *arg) { } bool opts_add_file_write(char *arg) { - struct serializer *serializer = opts_get_serializer(&arg); - if (!serializer) { - return false; - } + return opts_add_send(opts_add_file_write_int, send_flow, arg); +} - file_write_new(arg, send_flow, serializer); - return true; +bool opts_add_file_write_read(char *arg) { + return opts_add_send(opts_add_file_write_int, send_receive_flow, arg); } bool opts_add_file_append(char *arg) { - struct serializer *serializer = opts_get_serializer(&arg); - if (!serializer) { - return false; - } + return opts_add_send(opts_add_file_append_int, send_flow, arg); +} - file_append_new(arg, send_flow, serializer); - return true; +bool opts_add_file_append_read(char *arg) { + return opts_add_send(opts_add_file_append_int, send_receive_flow, arg); } bool opts_add_exec_receive(char *arg) { @@ -124,13 +138,11 @@ bool opts_add_exec_receive(char *arg) { } bool opts_add_exec_send(char *arg) { - struct serializer *serializer = opts_get_serializer(&arg); - if (!serializer) { - return false; - } + return opts_add_send(opts_add_exec, send_flow, arg); +} - exec_new(arg, send_flow, serializer); - return true; +bool opts_add_exec_send_receive(char *arg) { + return opts_add_send(opts_add_exec, send_receive_flow, arg); } bool opts_add_stdin(char __attribute__((unused)) *arg) { diff --git a/adsbus/opts.h b/adsbus/opts.h index e6a9013..3c03ce8 100644 --- a/adsbus/opts.h +++ b/adsbus/opts.h @@ -4,12 +4,17 @@ bool opts_add_connect_receive(char *); bool opts_add_connect_send(char *); +bool opts_add_connect_send_receive(char *); bool opts_add_listen_receive(char *); bool opts_add_listen_send(char *); +bool opts_add_listen_send_receive(char *); bool opts_add_file_read(char *); bool opts_add_file_write(char *); +bool opts_add_file_write_read(char *); bool opts_add_file_append(char *); +bool opts_add_file_append_read(char *); bool opts_add_exec_receive(char *); bool opts_add_exec_send(char *); +bool opts_add_exec_send_receive(char *); bool opts_add_stdout(char *); bool opts_add_stdin(char *); diff --git a/adsbus/packet.h b/adsbus/packet.h index 66f50f5..37d5df9 100644 --- a/adsbus/packet.h +++ b/adsbus/packet.h @@ -3,9 +3,12 @@ #include #include +struct stat; + #define PACKET_DATA_LEN_MAX 14 struct packet { const uint8_t *source_id; + struct stat *input_stat; enum packet_type { PACKET_TYPE_NONE, PACKET_TYPE_MODE_AC, diff --git a/adsbus/receive.c b/adsbus/receive.c index 4eb80dc..fca11e3 100644 --- a/adsbus/receive.c +++ b/adsbus/receive.c @@ -4,6 +4,7 @@ #include #include #include +#include #include #include "airspy_adsb.h" @@ -27,6 +28,7 @@ typedef bool (*parser_wrapper)(struct receive *, struct packet *); typedef bool (*parser)(struct buf *, struct packet *, void *state); struct receive { struct peer peer; + struct stat stat; struct peer *on_close; uint8_t id[UUID_LEN]; struct buf buf; @@ -114,6 +116,7 @@ static void receive_read(struct peer *peer) { while (receive->buf.length) { struct packet packet = { .source_id = receive->id, + .input_stat = &receive->stat, }; if (!receive->parser_wrapper(receive, &packet)) { break; @@ -144,6 +147,7 @@ static void receive_new(int fd, void __attribute__((unused)) *passthrough, struc buf_init(&receive->buf); memset(receive->parser_state, 0, PARSER_STATE_LEN); receive->parser_wrapper = receive_autodetect_parse; + assert(!fstat(fd, &receive->stat)); list_add(&receive->receive_list, &receive_head); diff --git a/adsbus/send.c b/adsbus/send.c index 2dc6ce4..5522442 100644 --- a/adsbus/send.c +++ b/adsbus/send.c @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -27,6 +28,7 @@ struct send { struct peer peer; + struct stat stat; struct peer *on_close; uint8_t id[UUID_LEN]; struct serializer *serializer; @@ -113,6 +115,8 @@ static void send_new(int fd, void *passthrough, struct peer *on_close) { send->on_close = on_close; uuid_gen(send->id); send->serializer = serializer; + assert(!fstat(fd, &send->stat)); + list_add(&send->send_list, &serializer->send_head); peer_epoll_add((struct peer *) send, 0); @@ -166,6 +170,11 @@ void send_write(struct packet *packet) { } struct send *iter, *next; list_for_each_entry_safe(iter, next, &serializer->send_head, send_list) { + if (iter->stat.st_dev == packet->input_stat->st_dev && + iter->stat.st_ino == packet->input_stat->st_ino) { + // Same socket that this packet came from + continue; + } if (write(iter->peer.fd, buf_at(&buf, 0), buf.length) != (ssize_t) buf.length) { // peer_loop() will see this shutdown and call send_del int res = shutdown(iter->peer.fd, SHUT_WR); diff --git a/adsbus/send_receive.c b/adsbus/send_receive.c new file mode 100644 index 0000000..0285cf2 --- /dev/null +++ b/adsbus/send_receive.c @@ -0,0 +1,68 @@ +#include +#include +#include +#include + +#include "flow.h" +#include "list.h" +#include "peer.h" +#include "receive.h" +#include "send.h" + +#include "send_receive.h" + +struct send_receive { + struct peer peer; + struct peer *on_close; + uint8_t ref_count; + struct list_head send_receive_list; +}; + +static struct list_head send_receive_head = LIST_HEAD_INIT(send_receive_head); + +static void send_receive_new(int, void *, struct peer *); + +static struct flow _send_receive_flow = { + .name = "send_receive", + .new = send_receive_new, + .get_hello = send_get_hello, + .ref_count = &peer_count_out_in, +}; +struct flow *send_receive_flow = &_send_receive_flow; + +static void send_receive_del(struct send_receive *send_receive) { + list_del(&send_receive->send_receive_list); + peer_call(send_receive->on_close); + free(send_receive); +} + +static void send_receive_on_close(struct peer *peer) { + struct send_receive *send_receive = (struct send_receive *) peer; + + if (!--(send_receive->ref_count)) { + send_receive_del(send_receive); + } +} + +static void send_receive_new(int fd, void *passthrough, struct peer *on_close) { + struct send_receive *send_receive = malloc(sizeof(*send_receive)); + assert(send_receive); + + send_receive->peer.fd = -1; + send_receive->peer.event_handler = send_receive_on_close; + send_receive->on_close = on_close; + send_receive->ref_count = 2; + list_add(&send_receive->send_receive_list, &send_receive_head); + + flow_new(fd, send_flow, passthrough, on_close); + int fd2 = fcntl(fd, F_DUPFD_CLOEXEC, 0); + assert(fd2 >= 0); + flow_new(fd2, receive_flow, NULL, on_close); +} + +void send_receive_cleanup() { + struct send_receive *iter, *next; + list_for_each_entry_safe(iter, next, &send_receive_head, send_receive_list) { + send_receive_del(iter); + } +} diff --git a/adsbus/send_receive.h b/adsbus/send_receive.h new file mode 100644 index 0000000..0f5583b --- /dev/null +++ b/adsbus/send_receive.h @@ -0,0 +1,4 @@ +#pragma once + +void send_receive_cleanup(void); +extern struct flow *send_receive_flow;