diff --git a/adsbus/exec.c b/adsbus/exec.c index 9a3cceb..93770aa 100644 --- a/adsbus/exec.c +++ b/adsbus/exec.c @@ -9,6 +9,7 @@ #include #include "buf.h" +#include "flow.h" #include "list.h" #include "peer.h" #include "uuid.h" @@ -20,10 +21,8 @@ struct exec { struct peer peer; uint8_t id[UUID_LEN]; char *command; - exec_connection_handler handler; - exec_get_hello hello; + struct flow *flow; void *passthrough; - uint32_t *count; pid_t child; struct list_head exec_list; }; @@ -33,7 +32,7 @@ static struct list_head exec_head = LIST_HEAD_INIT(exec_head); static void exec_spawn_wrapper(struct peer *); static void exec_del(struct exec *exec) { - (*exec->count)--; + (*exec->flow->ref_count)--; if (exec->child > 0) { fprintf(stderr, "E %s: Sending SIGTERM to child process %d\n", exec->id, exec->child); // Racy with the process terminating, so don't assert on it @@ -62,11 +61,11 @@ static void exec_close_handler(struct peer *peer) { } static bool exec_hello(int fd, struct exec *exec) { - if (!exec->hello) { + if (!exec->flow->get_hello) { return true; } struct buf buf = BUF_INIT, *buf_ptr = &buf; - exec->hello(&buf_ptr, exec->passthrough); + exec->flow->get_hello(&buf_ptr, exec->passthrough); if (!buf_ptr->length) { return true; } @@ -84,7 +83,7 @@ static void exec_parent(struct exec *exec, pid_t child, int fd) { } exec->peer.event_handler = exec_close_handler; - exec->handler(fd, exec->passthrough, (struct peer *) exec); + exec->flow->new(fd, exec->passthrough, (struct peer *) exec); } static void __attribute__ ((noreturn)) exec_child(const struct exec *exec, int fd) { @@ -132,17 +131,15 @@ void exec_cleanup() { } } -void exec_new(char *command, exec_connection_handler handler, exec_get_hello hello, void *passthrough, uint32_t *count) { - (*count)++; +void exec_new(char *command, struct flow *flow, void *passthrough) { + (*flow->ref_count)++; struct exec *exec = malloc(sizeof(*exec)); exec->peer.fd = -1; uuid_gen(exec->id); exec->command = strdup(command); - exec->handler = handler; - exec->hello = hello; + exec->flow = flow; exec->passthrough = passthrough; - exec->count = count; list_add(&exec->exec_list, &exec_head); diff --git a/adsbus/exec.h b/adsbus/exec.h index 2429f17..0c8c525 100644 --- a/adsbus/exec.h +++ b/adsbus/exec.h @@ -1,11 +1,6 @@ #pragma once -#include - -struct buf; -struct peer; +struct flow; void exec_cleanup(void); -typedef void (*exec_connection_handler)(int fd, void *, struct peer *); -typedef void (*exec_get_hello)(struct buf **, void *); -void exec_new(char *, exec_connection_handler, exec_get_hello, void *, uint32_t *); +void exec_new(char *, struct flow *, void *); diff --git a/adsbus/flow.h b/adsbus/flow.h new file mode 100644 index 0000000..e7ff369 --- /dev/null +++ b/adsbus/flow.h @@ -0,0 +1,17 @@ +#pragma once + +#include + +struct buf; +struct peer; + +typedef void (*flow_bound_socket_init)(int); +typedef void (*flow_new)(int, void *, struct peer *); +typedef void (*flow_get_hello)(struct buf **, void *); +struct flow { + const char *name; + flow_bound_socket_init bound_socket_init; + flow_new new; + flow_get_hello get_hello; + uint32_t *ref_count; +}; diff --git a/adsbus/incoming.c b/adsbus/incoming.c index 48dcbfd..74603b9 100644 --- a/adsbus/incoming.c +++ b/adsbus/incoming.c @@ -9,6 +9,7 @@ #include #include "buf.h" +#include "flow.h" #include "list.h" #include "peer.h" #include "resolve.h" @@ -26,10 +27,8 @@ struct incoming { struct addrinfo *addrs; const char *error; uint32_t attempt; - incoming_connection_handler handler; - incoming_get_hello hello; + struct flow *flow; void *passthrough; - uint32_t *count; struct list_head incoming_list; }; @@ -45,11 +44,11 @@ static void incoming_retry(struct incoming *incoming) { } static bool incoming_hello(int fd, struct incoming *incoming) { - if (!incoming->hello) { + if (!incoming->flow->get_hello) { return true; } struct buf buf = BUF_INIT, *buf_ptr = &buf; - incoming->hello(&buf_ptr, incoming->passthrough); + incoming->flow->get_hello(&buf_ptr, incoming->passthrough); if (!buf_ptr->length) { return true; } @@ -85,11 +84,11 @@ static void incoming_handler(struct peer *peer) { return; } - incoming->handler(fd, incoming->passthrough, NULL); + incoming->flow->new(fd, incoming->passthrough, NULL); } static void incoming_del(struct incoming *incoming) { - (*incoming->count)--; + (*incoming->flow->ref_count)--; if (incoming->peer.fd >= 0) { assert(!close(incoming->peer.fd)); } @@ -163,8 +162,8 @@ void incoming_cleanup() { } } -void incoming_new(char *node, char *service, incoming_connection_handler handler, incoming_get_hello hello, void *passthrough, uint32_t *count) { - (*count)++; +void incoming_new(char *node, char *service, struct flow *flow, void *passthrough) { + (*flow->ref_count)++; struct incoming *incoming = malloc(sizeof(*incoming)); incoming->peer.event_handler = incoming_handler; @@ -172,10 +171,8 @@ void incoming_new(char *node, char *service, incoming_connection_handler handler incoming->node = node ? strdup(node) : NULL; incoming->service = strdup(service); incoming->attempt = 0; - incoming->handler = handler; - incoming->hello = hello; + incoming->flow = flow; incoming->passthrough = passthrough; - incoming->count = count; list_add(&incoming->incoming_list, &incoming_head); diff --git a/adsbus/incoming.h b/adsbus/incoming.h index a6978ca..4691c1d 100644 --- a/adsbus/incoming.h +++ b/adsbus/incoming.h @@ -1,11 +1,6 @@ #pragma once -#include - -struct buf; -struct peer; +struct flow; void incoming_cleanup(void); -typedef void (*incoming_connection_handler)(int fd, void *, struct peer *); -typedef void (*incoming_get_hello)(struct buf **, void *); -void incoming_new(char *, char *, incoming_connection_handler, incoming_get_hello, void *, uint32_t *); +void incoming_new(char *, char *, struct flow *, void *); diff --git a/adsbus/opts.c b/adsbus/opts.c index f3b4475..ad90250 100644 --- a/adsbus/opts.c +++ b/adsbus/opts.c @@ -6,11 +6,10 @@ #include #include -#include "buf.h" #include "exec.h" +#include "flow.h" #include "incoming.h" #include "outgoing.h" -#include "peer.h" #include "receive.h" #include "send.h" @@ -26,13 +25,13 @@ static char *opts_split(char **arg, char delim) { return ret; } -static void opts_add_listen(char *host_port, incoming_connection_handler handler, incoming_get_hello hello, void *passthrough, uint32_t *count) { +static void opts_add_listen(char *host_port, struct flow *flow, void *passthrough) { char *host = opts_split(&host_port, '/'); if (host) { - incoming_new(host, host_port, handler, hello, passthrough, count); + incoming_new(host, host_port, flow, passthrough); free(host); } else { - incoming_new(NULL, host_port, handler, hello, passthrough, count); + incoming_new(NULL, host_port, flow, passthrough); } } @@ -57,7 +56,7 @@ bool opts_add_connect_receive(char *arg) { return false; } - outgoing_new(host, arg, receive_new, NULL, NULL, &peer_count_in); + outgoing_new(host, arg, receive_flow, NULL); free(host); return true; } @@ -73,13 +72,13 @@ bool opts_add_connect_send(char *arg) { return false; } - outgoing_new(host, arg, send_new_wrapper, send_hello, serializer, &peer_count_out); + outgoing_new(host, arg, send_flow, serializer); free(host); return true; } bool opts_add_listen_receive(char *arg) { - opts_add_listen(arg, receive_new, NULL, NULL, &peer_count_in); + opts_add_listen(arg, receive_flow, NULL); return true; } @@ -89,7 +88,7 @@ bool opts_add_listen_send(char *arg) { return false; } - opts_add_listen(arg, send_new_wrapper, send_hello, serializer, &peer_count_out); + opts_add_listen(arg, send_flow, serializer); return true; } @@ -129,7 +128,7 @@ bool opts_add_file_append(char *arg) { } bool opts_add_exec_receive(char *arg) { - exec_new(arg, receive_new, NULL, NULL, &peer_count_in); + exec_new(arg, receive_flow, NULL); return true; } @@ -139,7 +138,7 @@ bool opts_add_exec_send(char *arg) { return NULL; } - exec_new(arg, send_new_wrapper, send_hello, serializer, &peer_count_out); + exec_new(arg, send_flow, serializer); return true; } diff --git a/adsbus/outgoing.c b/adsbus/outgoing.c index 928d80d..23bdeb1 100644 --- a/adsbus/outgoing.c +++ b/adsbus/outgoing.c @@ -9,6 +9,7 @@ #include #include "buf.h" +#include "flow.h" #include "list.h" #include "peer.h" #include "resolve.h" @@ -27,10 +28,8 @@ struct outgoing { struct addrinfo *addr; const char *error; uint32_t attempt; - outgoing_connection_handler handler; - outgoing_get_hello hello; + struct flow *flow; void *passthrough; - uint32_t *count; struct list_head outgoing_list; }; @@ -63,8 +62,8 @@ static void outgoing_connect_next(struct outgoing *outgoing) { assert(outgoing->peer.fd >= 0); struct buf buf = BUF_INIT, *buf_ptr = &buf; - if (outgoing->hello) { - outgoing->hello(&buf_ptr, outgoing->passthrough); + if (outgoing->flow->get_hello) { + outgoing->flow->get_hello(&buf_ptr, outgoing->passthrough); } ssize_t result = sendto(outgoing->peer.fd, buf_at(buf_ptr, 0), buf_ptr->length, MSG_FASTOPEN, outgoing->addr->ai_addr, outgoing->addr->ai_addrlen); outgoing_connect_result(outgoing, result == (ssize_t) buf_ptr->length ? EINPROGRESS : errno); @@ -102,7 +101,7 @@ static void outgoing_connect_result(struct outgoing *outgoing, int result) { int fd = outgoing->peer.fd; outgoing->peer.fd = -1; outgoing->peer.event_handler = outgoing_disconnect_handler; - outgoing->handler(fd, outgoing->passthrough, (struct peer *) outgoing); + outgoing->flow->new(fd, outgoing->passthrough, (struct peer *) outgoing); break; case EINPROGRESS: @@ -144,7 +143,7 @@ static void outgoing_resolve_wrapper(struct peer *peer) { } static void outgoing_del(struct outgoing *outgoing) { - (*outgoing->count)--; + (*outgoing->flow->ref_count)--; if (outgoing->peer.fd >= 0) { assert(!close(outgoing->peer.fd)); } @@ -160,18 +159,16 @@ void outgoing_cleanup() { } } -void outgoing_new(char *node, char *service, outgoing_connection_handler handler, outgoing_get_hello hello, void *passthrough, uint32_t *count) { - (*count)++; +void outgoing_new(char *node, char *service, struct flow *flow, void *passthrough) { + (*flow->ref_count)++; struct outgoing *outgoing = malloc(sizeof(*outgoing)); uuid_gen(outgoing->id); outgoing->node = strdup(node); outgoing->service = strdup(service); outgoing->attempt = 0; - outgoing->handler = handler; - outgoing->hello = hello; + outgoing->flow = flow; outgoing->passthrough = passthrough; - outgoing->count = count; list_add(&outgoing->outgoing_list, &outgoing_head); diff --git a/adsbus/outgoing.h b/adsbus/outgoing.h index a669b94..c39fdc5 100644 --- a/adsbus/outgoing.h +++ b/adsbus/outgoing.h @@ -1,9 +1,6 @@ #pragma once -struct buf; -struct peer; +struct flow; void outgoing_cleanup(void); -typedef void (*outgoing_connection_handler)(int fd, void *, struct peer *); -typedef void (*outgoing_get_hello)(struct buf **, void *); -void outgoing_new(char *, char *, outgoing_connection_handler, outgoing_get_hello, void *, uint32_t *); +void outgoing_new(char *, char *, struct flow *, void *); diff --git a/adsbus/receive.c b/adsbus/receive.c index bb463f5..67126bf 100644 --- a/adsbus/receive.c +++ b/adsbus/receive.c @@ -9,6 +9,7 @@ #include "airspy_adsb.h" #include "beast.h" #include "buf.h" +#include "flow.h" #include "json.h" #include "packet.h" #include "peer.h" @@ -36,6 +37,13 @@ struct receive { }; static struct receive *receive_head = NULL; +static struct flow _receive_flow = { + .name = "receive", + .new = receive_new, + .ref_count = &peer_count_in, +}; +struct flow *receive_flow = &_receive_flow; + static struct parser { char *name; parser parse; diff --git a/adsbus/receive.h b/adsbus/receive.h index 1c7bd92..a600a48 100644 --- a/adsbus/receive.h +++ b/adsbus/receive.h @@ -1,11 +1,11 @@ #pragma once -#include - #define PARSER_STATE_LEN 256 +struct flow; struct peer; void receive_cleanup(void); void receive_new(int, void *, struct peer *); void receive_print_usage(void); +extern struct flow *receive_flow; diff --git a/adsbus/send.c b/adsbus/send.c index b93dd2a..2db9b23 100644 --- a/adsbus/send.c +++ b/adsbus/send.c @@ -12,6 +12,7 @@ #include "airspy_adsb.h" #include "beast.h" #include "buf.h" +#include "flow.h" #include "json.h" #include "list.h" #include "packet.h" @@ -32,6 +33,14 @@ struct send { struct list_head send_list; }; +static struct flow _send_flow = { + .name = "send", + .new = send_new_wrapper, + .get_hello = send_hello, + .ref_count = &peer_count_out, +}; +struct flow *send_flow = &_send_flow; + typedef void (*serialize)(struct packet *, struct buf *); typedef void (*hello)(struct buf **); static struct serializer { diff --git a/adsbus/send.h b/adsbus/send.h index a50fa3f..8375d48 100644 --- a/adsbus/send.h +++ b/adsbus/send.h @@ -1,6 +1,9 @@ #pragma once +#include + struct buf; +struct flow; struct packet; struct peer; @@ -13,3 +16,4 @@ bool send_new_hello(int, struct serializer *, struct peer *); void send_hello(struct buf **, void *); void send_write(struct packet *); void send_print_usage(void); +extern struct flow *send_flow;