From 2f6db632b2ef023447e175231aee50b68218a40b Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Wed, 2 Mar 2016 22:25:56 -0800 Subject: [PATCH] Get everyone out of flow's business. --- adsbus/exec.c | 7 ++----- adsbus/file.c | 5 ++--- adsbus/flow.c | 36 +++++++++++++++++++++++------------- adsbus/flow.h | 5 +++-- adsbus/incoming.c | 5 +---- adsbus/opts.c | 4 ++-- adsbus/outgoing.c | 6 ++---- 7 files changed, 35 insertions(+), 33 deletions(-) diff --git a/adsbus/exec.c b/adsbus/exec.c index 21fff8e..e072c74 100644 --- a/adsbus/exec.c +++ b/adsbus/exec.c @@ -66,14 +66,11 @@ static void exec_parent(struct exec *exec, pid_t child, int fd) { exec->child = child; fprintf(stderr, "E %s: Child started as process %d\n", exec->id, exec->child); - if (!flow_hello(fd, exec->flow, exec->passthrough)) { - assert(!close(fd)); + exec->peer.event_handler = exec_close_handler; + if (!flow_new_send_hello(fd, exec->flow, exec->passthrough, (struct peer *) exec)) { exec_close_handler((struct peer *) exec); return; } - - exec->peer.event_handler = exec_close_handler; - exec->flow->new(fd, exec->passthrough, (struct peer *) exec); } static void __attribute__ ((noreturn)) exec_child(const struct exec *exec, int fd) { diff --git a/adsbus/file.c b/adsbus/file.c index 7df7460..2d09009 100644 --- a/adsbus/file.c +++ b/adsbus/file.c @@ -94,13 +94,12 @@ static void file_open(struct file *file) { file->retry = file_should_retry(fd, file); file->peer.event_handler = file_handle_close; - if (!flow_hello(fd, file->flow, file->passthrough)) { + file->attempt = 0; + if (!flow_new_send_hello(fd, file->flow, file->passthrough, (struct peer *) file)) { fprintf(stderr, "F %s: Error writing greeting\n", file->id); file_retry(file); return; } - file->attempt = 0; - file->flow->new(fd, file->passthrough, (struct peer *) file); } static void file_open_wrapper(struct peer *peer) { diff --git a/adsbus/flow.c b/adsbus/flow.c index 9cc4b17..9bebc33 100644 --- a/adsbus/flow.c +++ b/adsbus/flow.c @@ -1,3 +1,4 @@ +#include #include #include "buf.h" @@ -5,6 +6,15 @@ #include "flow.h" +static bool flow_send_hello(int fd, struct flow *flow, void *passthrough) { + struct buf buf = BUF_INIT, *buf_ptr = &buf; + flow_get_hello(flow, &buf_ptr, passthrough); + if (!buf_ptr->length) { + return true; + } + return (write(fd, buf_at(buf_ptr, 0), buf_ptr->length) == (ssize_t) buf_ptr->length); +} + void flow_socket_ready(int fd, struct flow *flow) { socket_ready(fd); if (flow->socket_ready) { @@ -18,26 +28,26 @@ void flow_socket_connected(int fd, struct flow *flow) { } } -bool flow_hello(int fd, struct flow *flow, void *passthrough) { - if (!flow->get_hello) { - return true; - } - struct buf buf = BUF_INIT, *buf_ptr = &buf; - flow->get_hello(&buf_ptr, passthrough); - if (!buf_ptr->length) { - return true; - } - return (write(fd, buf_at(buf_ptr, 0), buf_ptr->length) == (ssize_t) buf_ptr->length); +void flow_new(int fd, struct flow *flow, void *passthrough, struct peer *on_close) { + flow->new(fd, passthrough, on_close); } -bool flow_new(int fd, struct flow *flow, void *passthrough) { - if (!flow_hello(fd, flow, passthrough)) { +bool flow_new_send_hello(int fd, struct flow *flow, void *passthrough, struct peer *on_close) { + if (!flow_send_hello(fd, flow, passthrough)) { + assert(!close(fd)); return false; } - flow->new(fd, passthrough, NULL); + flow_new(fd, flow, passthrough, on_close); return true; } +void flow_get_hello(struct flow *flow, struct buf **buf_ptr, void *passthrough) { + if (!flow->get_hello) { + return; + } + flow->get_hello(buf_ptr, passthrough); +} + void flow_ref_inc(struct flow *flow) { (*flow->ref_count)++; } diff --git a/adsbus/flow.h b/adsbus/flow.h index c3bfc1d..301c8ec 100644 --- a/adsbus/flow.h +++ b/adsbus/flow.h @@ -17,7 +17,8 @@ struct flow { void flow_socket_ready(int, struct flow *); void flow_socket_connected(int, struct flow *); -bool flow_hello(int, struct flow *, void *); -bool flow_new(int, struct flow *, void *); +void flow_new(int, struct flow *, void *, struct peer *); +bool flow_new_send_hello(int, struct flow *, void *, struct peer *); +void flow_get_hello(struct flow *, struct buf **, void *); void flow_ref_inc(struct flow *); void flow_ref_dec(struct flow *); diff --git a/adsbus/incoming.c b/adsbus/incoming.c index 344710b..daced14 100644 --- a/adsbus/incoming.c +++ b/adsbus/incoming.c @@ -66,13 +66,10 @@ static void incoming_handler(struct peer *peer) { flow_socket_connected(fd, incoming->flow); - if (!flow_hello(fd, incoming->flow, incoming->passthrough)) { + if (!flow_new_send_hello(fd, incoming->flow, incoming->passthrough, NULL)) { fprintf(stderr, "I %s: Error writing greeting\n", incoming->id); - assert(!close(fd)); return; } - - incoming->flow->new(fd, incoming->passthrough, NULL); } static void incoming_del(struct incoming *incoming) { diff --git a/adsbus/opts.c b/adsbus/opts.c index e29ec7f..d149b4f 100644 --- a/adsbus/opts.c +++ b/adsbus/opts.c @@ -136,7 +136,7 @@ bool opts_add_exec_send(char *arg) { bool opts_add_stdin(char __attribute__((unused)) *arg) { int fd = fcntl(0, F_DUPFD_CLOEXEC, 0); assert(fd >= 0); - return flow_new(fd, receive_flow, NULL); + return flow_new_send_hello(fd, receive_flow, NULL, NULL); } bool opts_add_stdout(char *arg) { @@ -146,5 +146,5 @@ bool opts_add_stdout(char *arg) { } int fd = fcntl(1, F_DUPFD_CLOEXEC, 0); assert(fd >= 0); - return flow_new(fd, send_flow, serializer); + return flow_new_send_hello(fd, send_flow, serializer, NULL); } diff --git a/adsbus/outgoing.c b/adsbus/outgoing.c index b0d141b..6ccf25b 100644 --- a/adsbus/outgoing.c +++ b/adsbus/outgoing.c @@ -61,9 +61,7 @@ static void outgoing_connect_next(struct outgoing *outgoing) { assert(outgoing->peer.fd >= 0); struct buf buf = BUF_INIT, *buf_ptr = &buf; - if (outgoing->flow->get_hello) { - outgoing->flow->get_hello(&buf_ptr, outgoing->passthrough); - } + flow_get_hello(outgoing->flow, &buf_ptr, outgoing->passthrough); ssize_t result = sendto(outgoing->peer.fd, buf_at(buf_ptr, 0), buf_ptr->length, MSG_FASTOPEN, outgoing->addr->ai_addr, outgoing->addr->ai_addrlen); outgoing_connect_result(outgoing, result == (ssize_t) buf_ptr->length ? EINPROGRESS : errno); } @@ -101,7 +99,7 @@ static void outgoing_connect_result(struct outgoing *outgoing, int result) { outgoing->peer.event_handler = outgoing_disconnect_handler; flow_socket_ready(fd, outgoing->flow); flow_socket_connected(fd, outgoing->flow); - outgoing->flow->new(fd, outgoing->passthrough, (struct peer *) outgoing); + flow_new(fd, outgoing->flow, outgoing->passthrough, (struct peer *) outgoing); break; case EINPROGRESS: