diff --git a/adsbus/flow.c b/adsbus/flow.c index caec90f..9cc4b17 100644 --- a/adsbus/flow.c +++ b/adsbus/flow.c @@ -12,6 +12,12 @@ void flow_socket_ready(int fd, struct flow *flow) { } } +void flow_socket_connected(int fd, struct flow *flow) { + if (flow->socket_connected) { + flow->socket_connected(fd); + } +} + bool flow_hello(int fd, struct flow *flow, void *passthrough) { if (!flow->get_hello) { return true; diff --git a/adsbus/flow.h b/adsbus/flow.h index 8365131..c3bfc1d 100644 --- a/adsbus/flow.h +++ b/adsbus/flow.h @@ -9,12 +9,14 @@ struct peer; struct flow { const char *name; void (*socket_ready)(int); + void (*socket_connected)(int); void (*new)(int, void *, struct peer *); void (*get_hello)(struct buf **, void *); uint32_t *ref_count; }; void flow_socket_ready(int, struct flow *); +void flow_socket_connected(int, struct flow *); bool flow_hello(int, struct flow *, void *); bool flow_new(int, struct flow *, void *); void flow_ref_inc(struct flow *); diff --git a/adsbus/incoming.c b/adsbus/incoming.c index eeee032..344710b 100644 --- a/adsbus/incoming.c +++ b/adsbus/incoming.c @@ -64,6 +64,8 @@ static void incoming_handler(struct peer *peer) { local_hbuf, local_sbuf, peer_hbuf, peer_sbuf); + flow_socket_connected(fd, incoming->flow); + if (!flow_hello(fd, incoming->flow, incoming->passthrough)) { fprintf(stderr, "I %s: Error writing greeting\n", incoming->id); assert(!close(fd)); diff --git a/adsbus/outgoing.c b/adsbus/outgoing.c index 6b8b4d9..b0d141b 100644 --- a/adsbus/outgoing.c +++ b/adsbus/outgoing.c @@ -95,11 +95,12 @@ static void outgoing_connect_result(struct outgoing *outgoing, int result) { case 0: fprintf(stderr, "O %s: Connected to %s/%s\n", outgoing->id, hbuf, sbuf); freeaddrinfo(outgoing->addrs); - flow_socket_ready(outgoing->peer.fd, outgoing->flow); outgoing->attempt = 0; int fd = outgoing->peer.fd; outgoing->peer.fd = -1; outgoing->peer.event_handler = outgoing_disconnect_handler; + flow_socket_ready(fd, outgoing->flow); + flow_socket_connected(fd, outgoing->flow); outgoing->flow->new(fd, outgoing->passthrough, (struct peer *) outgoing); break; diff --git a/adsbus/receive.c b/adsbus/receive.c index bdde8a5..4eb80dc 100644 --- a/adsbus/receive.c +++ b/adsbus/receive.c @@ -41,6 +41,7 @@ static void receive_new(int, void *, struct peer *); static struct flow _receive_flow = { .name = "receive", + .socket_connected = socket_connected_receive, .new = receive_new, .ref_count = &peer_count_in, }; @@ -133,8 +134,6 @@ static void receive_read(struct peer *peer) { static void receive_new(int fd, void __attribute__((unused)) *passthrough, struct peer *on_close) { peer_count_in++; - socket_receive(fd); - struct receive *receive = malloc(sizeof(*receive)); assert(receive); diff --git a/adsbus/send.c b/adsbus/send.c index 822697a..d7bc01e 100644 --- a/adsbus/send.c +++ b/adsbus/send.c @@ -39,6 +39,7 @@ static void send_get_hello(struct buf **, void *); static struct flow _send_flow = { .name = "send", .socket_ready = socket_ready_send, + .socket_connected = socket_connected_send, .new = send_new, .get_hello = send_get_hello, .ref_count = &peer_count_out, @@ -105,8 +106,6 @@ static void send_new(int fd, void *passthrough, struct peer *on_close) { peer_count_out++; - socket_send(fd); - struct send *send = malloc(sizeof(*send)); assert(send); diff --git a/adsbus/socket.c b/adsbus/socket.c index 29cbb37..aa998a8 100644 --- a/adsbus/socket.c +++ b/adsbus/socket.c @@ -48,13 +48,13 @@ void socket_ready_send(int fd) { assert(res == 0); } -void socket_send(int fd) { +void socket_connected_send(int fd) { // Called by data flow code; NOT safe to assume that fd is a socket int res = shutdown(fd, SHUT_RD); assert(res == 0 || (res == -1 && errno == ENOTSOCK)); } -void socket_receive(int fd) { +void socket_connected_receive(int fd) { // Called by data flow code; NOT safe to assume that fd is a socket int res = shutdown(fd, SHUT_WR); assert(res == 0 || (res == -1 && errno == ENOTSOCK)); diff --git a/adsbus/socket.h b/adsbus/socket.h index 7b0eadb..a79e361 100644 --- a/adsbus/socket.h +++ b/adsbus/socket.h @@ -4,5 +4,5 @@ void socket_pre_bind(int); void socket_pre_listen(int); void socket_ready(int); void socket_ready_send(int); -void socket_send(int); -void socket_receive(int); +void socket_connected_send(int); +void socket_connected_receive(int);