Cleanup how we cast and close peers.

This commit is contained in:
Ian Gulliver
2016-03-08 20:41:00 -08:00
parent cd302a1746
commit d39cd62991
9 changed files with 33 additions and 37 deletions

View File

@@ -50,10 +50,7 @@ static void exec_harvest(struct exec *exec) {
LOG(exec->id, "Client exited with signal %d", WTERMSIG(status)); LOG(exec->id, "Client exited with signal %d", WTERMSIG(status));
} }
} }
if (exec->log_peer.fd >= 0) { peer_close(&exec->log_peer);
assert(!close(exec->log_peer.fd));
exec->log_peer.fd = -1;
}
} }
static void exec_del(struct exec *exec) { static void exec_del(struct exec *exec) {
@@ -76,7 +73,7 @@ static void exec_close_handler(struct peer *peer) {
uint32_t delay = wakeup_get_retry_delay_ms(1); uint32_t delay = wakeup_get_retry_delay_ms(1);
LOG(exec->id, "Will retry in %ds", delay / 1000); LOG(exec->id, "Will retry in %ds", delay / 1000);
exec->peer.event_handler = exec_spawn_wrapper; exec->peer.event_handler = exec_spawn_wrapper;
wakeup_add((struct peer *) exec, delay); wakeup_add(&exec->peer, delay);
} }
static void exec_log_handler(struct peer *peer) { static void exec_log_handler(struct peer *peer) {
@@ -87,8 +84,7 @@ static void exec_log_handler(struct peer *peer) {
ssize_t ret = read(exec->log_peer.fd, linebuf, 4096); ssize_t ret = read(exec->log_peer.fd, linebuf, 4096);
if (ret <= 0) { if (ret <= 0) {
LOG(exec->id, "Log input stream closed"); LOG(exec->id, "Log input stream closed");
assert(!close(exec->log_peer.fd)); peer_close(&exec->log_peer);
exec->log_peer.fd = -1;
return; return;
} }
size_t len = (size_t) ret; size_t len = (size_t) ret;
@@ -114,8 +110,8 @@ static void exec_parent(struct exec *exec, pid_t child, int data_fd, int log_fd)
peer_epoll_add(&exec->log_peer, EPOLLIN); peer_epoll_add(&exec->log_peer, EPOLLIN);
exec->peer.event_handler = exec_close_handler; exec->peer.event_handler = exec_close_handler;
if (!flow_new_send_hello(data_fd, exec->flow, exec->passthrough, (struct peer *) exec)) { if (!flow_new_send_hello(data_fd, exec->flow, exec->passthrough, &exec->peer)) {
exec_close_handler((struct peer *) exec); exec_close_handler(&exec->peer);
return; return;
} }
} }

View File

@@ -72,7 +72,7 @@ static void file_retry(struct file *file) {
uint32_t delay = wakeup_get_retry_delay_ms(file->attempt++); uint32_t delay = wakeup_get_retry_delay_ms(file->attempt++);
LOG(file->id, "Will retry in %ds", delay / 1000); LOG(file->id, "Will retry in %ds", delay / 1000);
file->peer.event_handler = file_open_wrapper; file->peer.event_handler = file_open_wrapper;
wakeup_add((struct peer *) file, delay); wakeup_add(&file->peer, delay);
} }
static void file_handle_close(struct peer *peer) { static void file_handle_close(struct peer *peer) {
@@ -98,7 +98,7 @@ static void file_open(struct file *file) {
file->retry = file_should_retry(fd, file); file->retry = file_should_retry(fd, file);
file->peer.event_handler = file_handle_close; file->peer.event_handler = file_handle_close;
file->attempt = 0; file->attempt = 0;
if (!flow_new_send_hello(fd, file->flow, file->passthrough, (struct peer *) file)) { if (!flow_new_send_hello(fd, file->flow, file->passthrough, &file->peer)) {
LOG(file->id, "Error writing greeting"); LOG(file->id, "Error writing greeting");
file_retry(file); file_retry(file);
return; return;

View File

@@ -44,7 +44,7 @@ static void incoming_retry(struct incoming *incoming) {
uint32_t delay = wakeup_get_retry_delay_ms(incoming->attempt++); uint32_t delay = wakeup_get_retry_delay_ms(incoming->attempt++);
LOG(incoming->id, "Will retry in %ds", delay / 1000); LOG(incoming->id, "Will retry in %ds", delay / 1000);
incoming->peer.event_handler = incoming_resolve_wrapper; incoming->peer.event_handler = incoming_resolve_wrapper;
wakeup_add((struct peer *) incoming, delay); wakeup_add(&incoming->peer, delay);
} }
static void incoming_handler(struct peer *peer) { static void incoming_handler(struct peer *peer) {
@@ -79,9 +79,7 @@ static void incoming_handler(struct peer *peer) {
static void incoming_del(struct incoming *incoming) { static void incoming_del(struct incoming *incoming) {
flow_ref_dec(incoming->flow); flow_ref_dec(incoming->flow);
if (incoming->peer.fd >= 0) { peer_close(&incoming->peer);
assert(!close(incoming->peer.fd));
}
list_del(&incoming->incoming_list); list_del(&incoming->incoming_list);
free(incoming->node); free(incoming->node);
free(incoming->service); free(incoming->service);
@@ -134,13 +132,13 @@ static void incoming_listen(struct peer *peer) {
incoming->attempt = 0; incoming->attempt = 0;
incoming->peer.event_handler = incoming_handler; incoming->peer.event_handler = incoming_handler;
peer_epoll_add((struct peer *) incoming, EPOLLIN); peer_epoll_add(&incoming->peer, EPOLLIN);
} }
static void incoming_resolve(struct incoming *incoming) { static void incoming_resolve(struct incoming *incoming) {
LOG(incoming->id, "Resolving %s/%s...", incoming->node, incoming->service); LOG(incoming->id, "Resolving %s/%s...", incoming->node, incoming->service);
incoming->peer.event_handler = incoming_listen; incoming->peer.event_handler = incoming_listen;
resolve((struct peer *) incoming, incoming->node, incoming->service, AI_PASSIVE); resolve(&incoming->peer, incoming->node, incoming->service, AI_PASSIVE);
} }
static void incoming_resolve_wrapper(struct peer *peer) { static void incoming_resolve_wrapper(struct peer *peer) {

View File

@@ -48,7 +48,7 @@ static void outgoing_retry(struct outgoing *outgoing) {
uint32_t delay = wakeup_get_retry_delay_ms(++outgoing->attempt); uint32_t delay = wakeup_get_retry_delay_ms(++outgoing->attempt);
LOG(outgoing->id, "Will retry in %ds", delay / 1000); LOG(outgoing->id, "Will retry in %ds", delay / 1000);
outgoing->peer.event_handler = outgoing_resolve_wrapper; outgoing->peer.event_handler = outgoing_resolve_wrapper;
wakeup_add((struct peer *) outgoing, delay); wakeup_add(&outgoing->peer, delay);
} }
static void outgoing_connect_next(struct outgoing *outgoing) { static void outgoing_connect_next(struct outgoing *outgoing) {
@@ -85,9 +85,6 @@ static void outgoing_connect_handler(struct peer *peer) {
static void outgoing_disconnect_handler(struct peer *peer) { static void outgoing_disconnect_handler(struct peer *peer) {
struct outgoing *outgoing = (struct outgoing *) peer; struct outgoing *outgoing = (struct outgoing *) peer;
if (outgoing->peer.fd != -1) {
assert(!close(outgoing->peer.fd));
}
LOG(outgoing->id, "Peer disconnected; reconnecting..."); LOG(outgoing->id, "Peer disconnected; reconnecting...");
outgoing_retry(outgoing); outgoing_retry(outgoing);
} }
@@ -105,18 +102,17 @@ static void outgoing_connect_result(struct outgoing *outgoing, int result) {
outgoing->peer.event_handler = outgoing_disconnect_handler; outgoing->peer.event_handler = outgoing_disconnect_handler;
flow_socket_ready(fd, outgoing->flow); flow_socket_ready(fd, outgoing->flow);
flow_socket_connected(fd, outgoing->flow); flow_socket_connected(fd, outgoing->flow);
flow_new(fd, outgoing->flow, outgoing->passthrough, (struct peer *) outgoing); flow_new(fd, outgoing->flow, outgoing->passthrough, &outgoing->peer);
break; break;
case EINPROGRESS: case EINPROGRESS:
outgoing->peer.event_handler = outgoing_connect_handler; outgoing->peer.event_handler = outgoing_connect_handler;
peer_epoll_add((struct peer *) outgoing, EPOLLOUT); peer_epoll_add(&outgoing->peer, EPOLLOUT);
break; break;
default: default:
LOG(outgoing->id, "Can't connect to %s/%s: %s", hbuf, sbuf, strerror(result)); LOG(outgoing->id, "Can't connect to %s/%s: %s", hbuf, sbuf, strerror(result));
assert(!close(outgoing->peer.fd)); peer_close(&outgoing->peer);
outgoing->peer.fd = -1;
outgoing->addr = outgoing->addr->ai_next; outgoing->addr = outgoing->addr->ai_next;
// Tail recursion :/ // Tail recursion :/
outgoing_connect_next(outgoing); outgoing_connect_next(outgoing);
@@ -139,7 +135,7 @@ static void outgoing_resolve_handler(struct peer *peer) {
static void outgoing_resolve(struct outgoing *outgoing) { static void outgoing_resolve(struct outgoing *outgoing) {
LOG(outgoing->id, "Resolving %s/%s...", outgoing->node, outgoing->service); LOG(outgoing->id, "Resolving %s/%s...", outgoing->node, outgoing->service);
outgoing->peer.event_handler = outgoing_resolve_handler; outgoing->peer.event_handler = outgoing_resolve_handler;
resolve((struct peer *) outgoing, outgoing->node, outgoing->service, 0); resolve(&outgoing->peer, outgoing->node, outgoing->service, 0);
} }
static void outgoing_resolve_wrapper(struct peer *peer) { static void outgoing_resolve_wrapper(struct peer *peer) {
@@ -148,9 +144,7 @@ static void outgoing_resolve_wrapper(struct peer *peer) {
static void outgoing_del(struct outgoing *outgoing) { static void outgoing_del(struct outgoing *outgoing) {
flow_ref_dec(outgoing->flow); flow_ref_dec(outgoing->flow);
if (outgoing->peer.fd >= 0) { peer_close(&outgoing->peer);
assert(!close(outgoing->peer.fd));
}
list_del(&outgoing->outgoing_list); list_del(&outgoing->outgoing_list);
free(outgoing->node); free(outgoing->node);
free(outgoing->service); free(outgoing->service);

View File

@@ -90,6 +90,15 @@ void peer_epoll_del(struct peer *peer) {
} }
} }
void peer_close(struct peer *peer) {
if (peer->fd == -1) {
return;
}
peer_epoll_del(peer);
assert(!close(peer->fd));
peer->fd = -1;
}
void peer_call(struct peer *peer) { void peer_call(struct peer *peer) {
if (peer_shutdown_flag || !peer) { if (peer_shutdown_flag || !peer) {
return; return;

View File

@@ -21,5 +21,6 @@ void peer_cleanup(void);
void peer_shutdown(int signal); void peer_shutdown(int signal);
void peer_epoll_add(struct peer *, uint32_t); void peer_epoll_add(struct peer *, uint32_t);
void peer_epoll_del(struct peer *); void peer_epoll_del(struct peer *);
void peer_close(struct peer *);
void peer_call(struct peer *); void peer_call(struct peer *);
void peer_loop(void); void peer_loop(void);

View File

@@ -108,8 +108,7 @@ static bool receive_autodetect_parse(struct receive *receive, struct packet *pac
static void receive_del(struct receive *receive) { static void receive_del(struct receive *receive) {
LOG(receive->id, "Connection closed"); LOG(receive->id, "Connection closed");
peer_count_in--; peer_count_in--;
peer_epoll_del((struct peer *) receive); peer_close(&receive->peer);
assert(!close(receive->peer.fd));
list_del(&receive->receive_list); list_del(&receive->receive_list);
peer_call(receive->on_close); peer_call(receive->on_close);
free(receive); free(receive);
@@ -165,7 +164,7 @@ static void receive_new(int fd, void __attribute__((unused)) *passthrough, struc
list_add(&receive->receive_list, &receive_head); list_add(&receive->receive_list, &receive_head);
peer_epoll_add((struct peer *) receive, EPOLLIN); peer_epoll_add(&receive->peer, EPOLLIN);
LOG(receive->id, "New receive connection"); LOG(receive->id, "New receive connection");
} }

View File

@@ -92,8 +92,7 @@ static struct serializer {
static void send_del(struct send *send) { static void send_del(struct send *send) {
LOG(send->id, "Connection closed"); LOG(send->id, "Connection closed");
peer_count_out--; peer_count_out--;
peer_epoll_del((struct peer *) send); peer_close(&send->peer);
assert(!close(send->peer.fd));
list_del(&send->send_list); list_del(&send->send_list);
peer_call(send->on_close); peer_call(send->on_close);
free(send); free(send);
@@ -120,7 +119,7 @@ static void send_new(int fd, void *passthrough, struct peer *on_close) {
list_add(&send->send_list, &serializer->send_head); list_add(&send->send_list, &serializer->send_head);
peer_epoll_add((struct peer *) send, 0); peer_epoll_add(&send->peer, 0);
LOG(send->id, "New send connection: %s", serializer->name); LOG(send->id, "New send connection: %s", serializer->name);
} }

View File

@@ -52,10 +52,10 @@ static void send_receive_new(int fd, void *passthrough, struct peer *on_close) {
send_receive->ref_count = 2; send_receive->ref_count = 2;
list_add(&send_receive->send_receive_list, &send_receive_head); list_add(&send_receive->send_receive_list, &send_receive_head);
flow_new(fd, send_flow, passthrough, (struct peer *) send_receive); flow_new(fd, send_flow, passthrough, &send_receive->peer);
int fd2 = fcntl(fd, F_DUPFD_CLOEXEC, 0); int fd2 = fcntl(fd, F_DUPFD_CLOEXEC, 0);
assert(fd2 >= 0); assert(fd2 >= 0);
flow_new(fd2, receive_flow, NULL, (struct peer *) send_receive); flow_new(fd2, receive_flow, NULL, &send_receive->peer);
} }
void send_receive_cleanup() { void send_receive_cleanup() {