Get everyone out of flow's business.
This commit is contained in:
@@ -66,14 +66,11 @@ static void exec_parent(struct exec *exec, pid_t child, int fd) {
|
||||
exec->child = child;
|
||||
fprintf(stderr, "E %s: Child started as process %d\n", exec->id, exec->child);
|
||||
|
||||
if (!flow_hello(fd, exec->flow, exec->passthrough)) {
|
||||
assert(!close(fd));
|
||||
exec->peer.event_handler = exec_close_handler;
|
||||
if (!flow_new_send_hello(fd, exec->flow, exec->passthrough, (struct peer *) exec)) {
|
||||
exec_close_handler((struct peer *) exec);
|
||||
return;
|
||||
}
|
||||
|
||||
exec->peer.event_handler = exec_close_handler;
|
||||
exec->flow->new(fd, exec->passthrough, (struct peer *) exec);
|
||||
}
|
||||
|
||||
static void __attribute__ ((noreturn)) exec_child(const struct exec *exec, int fd) {
|
||||
|
||||
@@ -94,13 +94,12 @@ static void file_open(struct file *file) {
|
||||
|
||||
file->retry = file_should_retry(fd, file);
|
||||
file->peer.event_handler = file_handle_close;
|
||||
if (!flow_hello(fd, file->flow, file->passthrough)) {
|
||||
file->attempt = 0;
|
||||
if (!flow_new_send_hello(fd, file->flow, file->passthrough, (struct peer *) file)) {
|
||||
fprintf(stderr, "F %s: Error writing greeting\n", file->id);
|
||||
file_retry(file);
|
||||
return;
|
||||
}
|
||||
file->attempt = 0;
|
||||
file->flow->new(fd, file->passthrough, (struct peer *) file);
|
||||
}
|
||||
|
||||
static void file_open_wrapper(struct peer *peer) {
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
#include <assert.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include "buf.h"
|
||||
@@ -5,6 +6,15 @@
|
||||
|
||||
#include "flow.h"
|
||||
|
||||
static bool flow_send_hello(int fd, struct flow *flow, void *passthrough) {
|
||||
struct buf buf = BUF_INIT, *buf_ptr = &buf;
|
||||
flow_get_hello(flow, &buf_ptr, passthrough);
|
||||
if (!buf_ptr->length) {
|
||||
return true;
|
||||
}
|
||||
return (write(fd, buf_at(buf_ptr, 0), buf_ptr->length) == (ssize_t) buf_ptr->length);
|
||||
}
|
||||
|
||||
void flow_socket_ready(int fd, struct flow *flow) {
|
||||
socket_ready(fd);
|
||||
if (flow->socket_ready) {
|
||||
@@ -18,26 +28,26 @@ void flow_socket_connected(int fd, struct flow *flow) {
|
||||
}
|
||||
}
|
||||
|
||||
bool flow_hello(int fd, struct flow *flow, void *passthrough) {
|
||||
if (!flow->get_hello) {
|
||||
return true;
|
||||
}
|
||||
struct buf buf = BUF_INIT, *buf_ptr = &buf;
|
||||
flow->get_hello(&buf_ptr, passthrough);
|
||||
if (!buf_ptr->length) {
|
||||
return true;
|
||||
}
|
||||
return (write(fd, buf_at(buf_ptr, 0), buf_ptr->length) == (ssize_t) buf_ptr->length);
|
||||
void flow_new(int fd, struct flow *flow, void *passthrough, struct peer *on_close) {
|
||||
flow->new(fd, passthrough, on_close);
|
||||
}
|
||||
|
||||
bool flow_new(int fd, struct flow *flow, void *passthrough) {
|
||||
if (!flow_hello(fd, flow, passthrough)) {
|
||||
bool flow_new_send_hello(int fd, struct flow *flow, void *passthrough, struct peer *on_close) {
|
||||
if (!flow_send_hello(fd, flow, passthrough)) {
|
||||
assert(!close(fd));
|
||||
return false;
|
||||
}
|
||||
flow->new(fd, passthrough, NULL);
|
||||
flow_new(fd, flow, passthrough, on_close);
|
||||
return true;
|
||||
}
|
||||
|
||||
void flow_get_hello(struct flow *flow, struct buf **buf_ptr, void *passthrough) {
|
||||
if (!flow->get_hello) {
|
||||
return;
|
||||
}
|
||||
flow->get_hello(buf_ptr, passthrough);
|
||||
}
|
||||
|
||||
void flow_ref_inc(struct flow *flow) {
|
||||
(*flow->ref_count)++;
|
||||
}
|
||||
|
||||
@@ -17,7 +17,8 @@ struct flow {
|
||||
|
||||
void flow_socket_ready(int, struct flow *);
|
||||
void flow_socket_connected(int, struct flow *);
|
||||
bool flow_hello(int, struct flow *, void *);
|
||||
bool flow_new(int, struct flow *, void *);
|
||||
void flow_new(int, struct flow *, void *, struct peer *);
|
||||
bool flow_new_send_hello(int, struct flow *, void *, struct peer *);
|
||||
void flow_get_hello(struct flow *, struct buf **, void *);
|
||||
void flow_ref_inc(struct flow *);
|
||||
void flow_ref_dec(struct flow *);
|
||||
|
||||
@@ -66,13 +66,10 @@ static void incoming_handler(struct peer *peer) {
|
||||
|
||||
flow_socket_connected(fd, incoming->flow);
|
||||
|
||||
if (!flow_hello(fd, incoming->flow, incoming->passthrough)) {
|
||||
if (!flow_new_send_hello(fd, incoming->flow, incoming->passthrough, NULL)) {
|
||||
fprintf(stderr, "I %s: Error writing greeting\n", incoming->id);
|
||||
assert(!close(fd));
|
||||
return;
|
||||
}
|
||||
|
||||
incoming->flow->new(fd, incoming->passthrough, NULL);
|
||||
}
|
||||
|
||||
static void incoming_del(struct incoming *incoming) {
|
||||
|
||||
@@ -136,7 +136,7 @@ bool opts_add_exec_send(char *arg) {
|
||||
bool opts_add_stdin(char __attribute__((unused)) *arg) {
|
||||
int fd = fcntl(0, F_DUPFD_CLOEXEC, 0);
|
||||
assert(fd >= 0);
|
||||
return flow_new(fd, receive_flow, NULL);
|
||||
return flow_new_send_hello(fd, receive_flow, NULL, NULL);
|
||||
}
|
||||
|
||||
bool opts_add_stdout(char *arg) {
|
||||
@@ -146,5 +146,5 @@ bool opts_add_stdout(char *arg) {
|
||||
}
|
||||
int fd = fcntl(1, F_DUPFD_CLOEXEC, 0);
|
||||
assert(fd >= 0);
|
||||
return flow_new(fd, send_flow, serializer);
|
||||
return flow_new_send_hello(fd, send_flow, serializer, NULL);
|
||||
}
|
||||
|
||||
@@ -61,9 +61,7 @@ static void outgoing_connect_next(struct outgoing *outgoing) {
|
||||
assert(outgoing->peer.fd >= 0);
|
||||
|
||||
struct buf buf = BUF_INIT, *buf_ptr = &buf;
|
||||
if (outgoing->flow->get_hello) {
|
||||
outgoing->flow->get_hello(&buf_ptr, outgoing->passthrough);
|
||||
}
|
||||
flow_get_hello(outgoing->flow, &buf_ptr, outgoing->passthrough);
|
||||
ssize_t result = sendto(outgoing->peer.fd, buf_at(buf_ptr, 0), buf_ptr->length, MSG_FASTOPEN, outgoing->addr->ai_addr, outgoing->addr->ai_addrlen);
|
||||
outgoing_connect_result(outgoing, result == (ssize_t) buf_ptr->length ? EINPROGRESS : errno);
|
||||
}
|
||||
@@ -101,7 +99,7 @@ static void outgoing_connect_result(struct outgoing *outgoing, int result) {
|
||||
outgoing->peer.event_handler = outgoing_disconnect_handler;
|
||||
flow_socket_ready(fd, outgoing->flow);
|
||||
flow_socket_connected(fd, outgoing->flow);
|
||||
outgoing->flow->new(fd, outgoing->passthrough, (struct peer *) outgoing);
|
||||
flow_new(fd, outgoing->flow, outgoing->passthrough, (struct peer *) outgoing);
|
||||
break;
|
||||
|
||||
case EINPROGRESS:
|
||||
|
||||
Reference in New Issue
Block a user