From d39cd629916523d7f9df5115b04112cee8c03646 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Tue, 8 Mar 2016 20:41:00 -0800 Subject: [PATCH] Cleanup how we cast and close peers. --- adsbus/exec.c | 14 +++++--------- adsbus/file.c | 4 ++-- adsbus/incoming.c | 10 ++++------ adsbus/outgoing.c | 18 ++++++------------ adsbus/peer.c | 9 +++++++++ adsbus/peer.h | 1 + adsbus/receive.c | 5 ++--- adsbus/send.c | 5 ++--- adsbus/send_receive.c | 4 ++-- 9 files changed, 33 insertions(+), 37 deletions(-) diff --git a/adsbus/exec.c b/adsbus/exec.c index 4248129..b239320 100644 --- a/adsbus/exec.c +++ b/adsbus/exec.c @@ -50,10 +50,7 @@ static void exec_harvest(struct exec *exec) { LOG(exec->id, "Client exited with signal %d", WTERMSIG(status)); } } - if (exec->log_peer.fd >= 0) { - assert(!close(exec->log_peer.fd)); - exec->log_peer.fd = -1; - } + peer_close(&exec->log_peer); } static void exec_del(struct exec *exec) { @@ -76,7 +73,7 @@ static void exec_close_handler(struct peer *peer) { uint32_t delay = wakeup_get_retry_delay_ms(1); LOG(exec->id, "Will retry in %ds", delay / 1000); exec->peer.event_handler = exec_spawn_wrapper; - wakeup_add((struct peer *) exec, delay); + wakeup_add(&exec->peer, delay); } static void exec_log_handler(struct peer *peer) { @@ -87,8 +84,7 @@ static void exec_log_handler(struct peer *peer) { ssize_t ret = read(exec->log_peer.fd, linebuf, 4096); if (ret <= 0) { LOG(exec->id, "Log input stream closed"); - assert(!close(exec->log_peer.fd)); - exec->log_peer.fd = -1; + peer_close(&exec->log_peer); return; } size_t len = (size_t) ret; @@ -114,8 +110,8 @@ static void exec_parent(struct exec *exec, pid_t child, int data_fd, int log_fd) peer_epoll_add(&exec->log_peer, EPOLLIN); exec->peer.event_handler = exec_close_handler; - if (!flow_new_send_hello(data_fd, exec->flow, exec->passthrough, (struct peer *) exec)) { - exec_close_handler((struct peer *) exec); + if (!flow_new_send_hello(data_fd, exec->flow, exec->passthrough, &exec->peer)) { + exec_close_handler(&exec->peer); return; } } diff --git a/adsbus/file.c b/adsbus/file.c index 65638a1..dd4d2ef 100644 --- a/adsbus/file.c +++ b/adsbus/file.c @@ -72,7 +72,7 @@ static void file_retry(struct file *file) { uint32_t delay = wakeup_get_retry_delay_ms(file->attempt++); LOG(file->id, "Will retry in %ds", delay / 1000); file->peer.event_handler = file_open_wrapper; - wakeup_add((struct peer *) file, delay); + wakeup_add(&file->peer, delay); } static void file_handle_close(struct peer *peer) { @@ -98,7 +98,7 @@ static void file_open(struct file *file) { file->retry = file_should_retry(fd, file); file->peer.event_handler = file_handle_close; file->attempt = 0; - if (!flow_new_send_hello(fd, file->flow, file->passthrough, (struct peer *) file)) { + if (!flow_new_send_hello(fd, file->flow, file->passthrough, &file->peer)) { LOG(file->id, "Error writing greeting"); file_retry(file); return; diff --git a/adsbus/incoming.c b/adsbus/incoming.c index 55130ae..db243bd 100644 --- a/adsbus/incoming.c +++ b/adsbus/incoming.c @@ -44,7 +44,7 @@ static void incoming_retry(struct incoming *incoming) { uint32_t delay = wakeup_get_retry_delay_ms(incoming->attempt++); LOG(incoming->id, "Will retry in %ds", delay / 1000); incoming->peer.event_handler = incoming_resolve_wrapper; - wakeup_add((struct peer *) incoming, delay); + wakeup_add(&incoming->peer, delay); } static void incoming_handler(struct peer *peer) { @@ -79,9 +79,7 @@ static void incoming_handler(struct peer *peer) { static void incoming_del(struct incoming *incoming) { flow_ref_dec(incoming->flow); - if (incoming->peer.fd >= 0) { - assert(!close(incoming->peer.fd)); - } + peer_close(&incoming->peer); list_del(&incoming->incoming_list); free(incoming->node); free(incoming->service); @@ -134,13 +132,13 @@ static void incoming_listen(struct peer *peer) { incoming->attempt = 0; incoming->peer.event_handler = incoming_handler; - peer_epoll_add((struct peer *) incoming, EPOLLIN); + peer_epoll_add(&incoming->peer, EPOLLIN); } static void incoming_resolve(struct incoming *incoming) { LOG(incoming->id, "Resolving %s/%s...", incoming->node, incoming->service); incoming->peer.event_handler = incoming_listen; - resolve((struct peer *) incoming, incoming->node, incoming->service, AI_PASSIVE); + resolve(&incoming->peer, incoming->node, incoming->service, AI_PASSIVE); } static void incoming_resolve_wrapper(struct peer *peer) { diff --git a/adsbus/outgoing.c b/adsbus/outgoing.c index cce6dc9..7188a29 100644 --- a/adsbus/outgoing.c +++ b/adsbus/outgoing.c @@ -48,7 +48,7 @@ static void outgoing_retry(struct outgoing *outgoing) { uint32_t delay = wakeup_get_retry_delay_ms(++outgoing->attempt); LOG(outgoing->id, "Will retry in %ds", delay / 1000); outgoing->peer.event_handler = outgoing_resolve_wrapper; - wakeup_add((struct peer *) outgoing, delay); + wakeup_add(&outgoing->peer, delay); } static void outgoing_connect_next(struct outgoing *outgoing) { @@ -85,9 +85,6 @@ static void outgoing_connect_handler(struct peer *peer) { static void outgoing_disconnect_handler(struct peer *peer) { struct outgoing *outgoing = (struct outgoing *) peer; - if (outgoing->peer.fd != -1) { - assert(!close(outgoing->peer.fd)); - } LOG(outgoing->id, "Peer disconnected; reconnecting..."); outgoing_retry(outgoing); } @@ -105,18 +102,17 @@ static void outgoing_connect_result(struct outgoing *outgoing, int result) { outgoing->peer.event_handler = outgoing_disconnect_handler; flow_socket_ready(fd, outgoing->flow); flow_socket_connected(fd, outgoing->flow); - flow_new(fd, outgoing->flow, outgoing->passthrough, (struct peer *) outgoing); + flow_new(fd, outgoing->flow, outgoing->passthrough, &outgoing->peer); break; case EINPROGRESS: outgoing->peer.event_handler = outgoing_connect_handler; - peer_epoll_add((struct peer *) outgoing, EPOLLOUT); + peer_epoll_add(&outgoing->peer, EPOLLOUT); break; default: LOG(outgoing->id, "Can't connect to %s/%s: %s", hbuf, sbuf, strerror(result)); - assert(!close(outgoing->peer.fd)); - outgoing->peer.fd = -1; + peer_close(&outgoing->peer); outgoing->addr = outgoing->addr->ai_next; // Tail recursion :/ outgoing_connect_next(outgoing); @@ -139,7 +135,7 @@ static void outgoing_resolve_handler(struct peer *peer) { static void outgoing_resolve(struct outgoing *outgoing) { LOG(outgoing->id, "Resolving %s/%s...", outgoing->node, outgoing->service); outgoing->peer.event_handler = outgoing_resolve_handler; - resolve((struct peer *) outgoing, outgoing->node, outgoing->service, 0); + resolve(&outgoing->peer, outgoing->node, outgoing->service, 0); } static void outgoing_resolve_wrapper(struct peer *peer) { @@ -148,9 +144,7 @@ static void outgoing_resolve_wrapper(struct peer *peer) { static void outgoing_del(struct outgoing *outgoing) { flow_ref_dec(outgoing->flow); - if (outgoing->peer.fd >= 0) { - assert(!close(outgoing->peer.fd)); - } + peer_close(&outgoing->peer); list_del(&outgoing->outgoing_list); free(outgoing->node); free(outgoing->service); diff --git a/adsbus/peer.c b/adsbus/peer.c index 2f3e66e..f1aa29a 100644 --- a/adsbus/peer.c +++ b/adsbus/peer.c @@ -90,6 +90,15 @@ void peer_epoll_del(struct peer *peer) { } } +void peer_close(struct peer *peer) { + if (peer->fd == -1) { + return; + } + peer_epoll_del(peer); + assert(!close(peer->fd)); + peer->fd = -1; +} + void peer_call(struct peer *peer) { if (peer_shutdown_flag || !peer) { return; diff --git a/adsbus/peer.h b/adsbus/peer.h index 1b380ea..8c23cd4 100644 --- a/adsbus/peer.h +++ b/adsbus/peer.h @@ -21,5 +21,6 @@ void peer_cleanup(void); void peer_shutdown(int signal); void peer_epoll_add(struct peer *, uint32_t); void peer_epoll_del(struct peer *); +void peer_close(struct peer *); void peer_call(struct peer *); void peer_loop(void); diff --git a/adsbus/receive.c b/adsbus/receive.c index 95dbc3b..308672e 100644 --- a/adsbus/receive.c +++ b/adsbus/receive.c @@ -108,8 +108,7 @@ static bool receive_autodetect_parse(struct receive *receive, struct packet *pac static void receive_del(struct receive *receive) { LOG(receive->id, "Connection closed"); peer_count_in--; - peer_epoll_del((struct peer *) receive); - assert(!close(receive->peer.fd)); + peer_close(&receive->peer); list_del(&receive->receive_list); peer_call(receive->on_close); free(receive); @@ -165,7 +164,7 @@ static void receive_new(int fd, void __attribute__((unused)) *passthrough, struc list_add(&receive->receive_list, &receive_head); - peer_epoll_add((struct peer *) receive, EPOLLIN); + peer_epoll_add(&receive->peer, EPOLLIN); LOG(receive->id, "New receive connection"); } diff --git a/adsbus/send.c b/adsbus/send.c index 85bcc47..ffd49f4 100644 --- a/adsbus/send.c +++ b/adsbus/send.c @@ -92,8 +92,7 @@ static struct serializer { static void send_del(struct send *send) { LOG(send->id, "Connection closed"); peer_count_out--; - peer_epoll_del((struct peer *) send); - assert(!close(send->peer.fd)); + peer_close(&send->peer); list_del(&send->send_list); peer_call(send->on_close); free(send); @@ -120,7 +119,7 @@ static void send_new(int fd, void *passthrough, struct peer *on_close) { list_add(&send->send_list, &serializer->send_head); - peer_epoll_add((struct peer *) send, 0); + peer_epoll_add(&send->peer, 0); LOG(send->id, "New send connection: %s", serializer->name); } diff --git a/adsbus/send_receive.c b/adsbus/send_receive.c index 408b600..15af572 100644 --- a/adsbus/send_receive.c +++ b/adsbus/send_receive.c @@ -52,10 +52,10 @@ static void send_receive_new(int fd, void *passthrough, struct peer *on_close) { send_receive->ref_count = 2; list_add(&send_receive->send_receive_list, &send_receive_head); - flow_new(fd, send_flow, passthrough, (struct peer *) send_receive); + flow_new(fd, send_flow, passthrough, &send_receive->peer); int fd2 = fcntl(fd, F_DUPFD_CLOEXEC, 0); assert(fd2 >= 0); - flow_new(fd2, receive_flow, NULL, (struct peer *) send_receive); + flow_new(fd2, receive_flow, NULL, &send_receive->peer); } void send_receive_cleanup() {