From 4398deb2d4e8cbe98139892b7fefd8a73a98b819 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Wed, 17 Feb 2016 22:14:04 -0800 Subject: [PATCH] Working model for triggering reconnects on outgoing connections. --- outgoing.c | 20 ++++++++++++++++++-- receive.c | 2 -- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/outgoing.c b/outgoing.c index b2e3755..2bc4606 100644 --- a/outgoing.c +++ b/outgoing.c @@ -22,11 +22,13 @@ struct outgoing { }; static void outgoing_connect_result(struct outgoing *, int); +static void outgoing_resolve(struct outgoing *); static void outgoing_connect_next(struct outgoing *outgoing) { if (outgoing->addr == NULL) { freeaddrinfo(outgoing->addrs); fprintf(stderr, "O %s: Can't connect to any addresses of %s/%s\n", outgoing->id, outgoing->node, outgoing->service); + // TODO: timed retry return; } @@ -44,7 +46,7 @@ static void outgoing_connect_next(struct outgoing *outgoing) { static void outgoing_connect_handler(struct peer *peer) { struct outgoing *outgoing = (struct outgoing *) peer; - peer_epoll_del(peer); + peer_epoll_del(&outgoing->peer); int error; socklen_t len = sizeof(error); @@ -52,6 +54,14 @@ static void outgoing_connect_handler(struct peer *peer) { outgoing_connect_result(outgoing, error); } +static void outgoing_disconnect_handler(struct peer *peer) { + struct outgoing *outgoing = (struct outgoing *) peer; + close(outgoing->peer.fd); + fprintf(stderr, "O %s: Peer disconnected; reconnecting...\n", outgoing->id); + + outgoing_resolve(outgoing); +} + static void outgoing_connect_result(struct outgoing *outgoing, int result) { char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; assert(getnameinfo(outgoing->addr->ai_addr, outgoing->addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0); @@ -59,7 +69,12 @@ static void outgoing_connect_result(struct outgoing *outgoing, int result) { case 0: fprintf(stderr, "O %s: Connected to %s/%s\n", outgoing->id, hbuf, sbuf); freeaddrinfo(outgoing->addrs); - outgoing->handler(outgoing->peer.fd, outgoing->passthrough); + + // We listen for just hangup on this fd. We pass a duplicate to the handler, so our epoll setings are independent. + outgoing->peer.event_handler = outgoing_disconnect_handler; + peer_epoll_add((struct peer *) outgoing, EPOLLRDHUP); + + outgoing->handler(dup(outgoing->peer.fd), outgoing->passthrough); break; case EINPROGRESS: @@ -88,6 +103,7 @@ static void outgoing_resolve(struct outgoing *outgoing) { int gai_err = getaddrinfo(outgoing->node, outgoing->service, &hints, &outgoing->addrs); if (gai_err) { fprintf(stderr, "O %s: Failed to resolve %s/%s: %s\n", outgoing->id, outgoing->node, outgoing->service, gai_strerror(gai_err)); + // TODO: timed retry return; } outgoing->addr = outgoing->addrs; diff --git a/receive.c b/receive.c index 3f33701..c712b82 100644 --- a/receive.c +++ b/receive.c @@ -67,7 +67,6 @@ static void receive_read(struct peer *peer) { if (buf_fill(&receive->buf, receive->peer.fd) <= 0) { fprintf(stderr, "R %s: Connection closed by peer\n", receive->id); close(receive->peer.fd); - // TODO: reconnect return; } @@ -79,7 +78,6 @@ static void receive_read(struct peer *peer) { if (receive->buf.length == BUF_LEN_MAX) { fprintf(stderr, "R %s: Input buffer overrun. This probably means that adsbus doesn't understand the protocol that this source is speaking.\n", receive->id); close(receive->peer.fd); - // TODO: reconnect return; } }