Working model for triggering reconnects on outgoing connections.
This commit is contained in:
20
outgoing.c
20
outgoing.c
@@ -22,11 +22,13 @@ struct outgoing {
|
|||||||
};
|
};
|
||||||
|
|
||||||
static void outgoing_connect_result(struct outgoing *, int);
|
static void outgoing_connect_result(struct outgoing *, int);
|
||||||
|
static void outgoing_resolve(struct outgoing *);
|
||||||
|
|
||||||
static void outgoing_connect_next(struct outgoing *outgoing) {
|
static void outgoing_connect_next(struct outgoing *outgoing) {
|
||||||
if (outgoing->addr == NULL) {
|
if (outgoing->addr == NULL) {
|
||||||
freeaddrinfo(outgoing->addrs);
|
freeaddrinfo(outgoing->addrs);
|
||||||
fprintf(stderr, "O %s: Can't connect to any addresses of %s/%s\n", outgoing->id, outgoing->node, outgoing->service);
|
fprintf(stderr, "O %s: Can't connect to any addresses of %s/%s\n", outgoing->id, outgoing->node, outgoing->service);
|
||||||
|
// TODO: timed retry
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -44,7 +46,7 @@ static void outgoing_connect_next(struct outgoing *outgoing) {
|
|||||||
static void outgoing_connect_handler(struct peer *peer) {
|
static void outgoing_connect_handler(struct peer *peer) {
|
||||||
struct outgoing *outgoing = (struct outgoing *) peer;
|
struct outgoing *outgoing = (struct outgoing *) peer;
|
||||||
|
|
||||||
peer_epoll_del(peer);
|
peer_epoll_del(&outgoing->peer);
|
||||||
|
|
||||||
int error;
|
int error;
|
||||||
socklen_t len = sizeof(error);
|
socklen_t len = sizeof(error);
|
||||||
@@ -52,6 +54,14 @@ static void outgoing_connect_handler(struct peer *peer) {
|
|||||||
outgoing_connect_result(outgoing, error);
|
outgoing_connect_result(outgoing, error);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void outgoing_disconnect_handler(struct peer *peer) {
|
||||||
|
struct outgoing *outgoing = (struct outgoing *) peer;
|
||||||
|
close(outgoing->peer.fd);
|
||||||
|
fprintf(stderr, "O %s: Peer disconnected; reconnecting...\n", outgoing->id);
|
||||||
|
|
||||||
|
outgoing_resolve(outgoing);
|
||||||
|
}
|
||||||
|
|
||||||
static void outgoing_connect_result(struct outgoing *outgoing, int result) {
|
static void outgoing_connect_result(struct outgoing *outgoing, int result) {
|
||||||
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
|
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
|
||||||
assert(getnameinfo(outgoing->addr->ai_addr, outgoing->addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0);
|
assert(getnameinfo(outgoing->addr->ai_addr, outgoing->addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0);
|
||||||
@@ -59,7 +69,12 @@ static void outgoing_connect_result(struct outgoing *outgoing, int result) {
|
|||||||
case 0:
|
case 0:
|
||||||
fprintf(stderr, "O %s: Connected to %s/%s\n", outgoing->id, hbuf, sbuf);
|
fprintf(stderr, "O %s: Connected to %s/%s\n", outgoing->id, hbuf, sbuf);
|
||||||
freeaddrinfo(outgoing->addrs);
|
freeaddrinfo(outgoing->addrs);
|
||||||
outgoing->handler(outgoing->peer.fd, outgoing->passthrough);
|
|
||||||
|
// We listen for just hangup on this fd. We pass a duplicate to the handler, so our epoll setings are independent.
|
||||||
|
outgoing->peer.event_handler = outgoing_disconnect_handler;
|
||||||
|
peer_epoll_add((struct peer *) outgoing, EPOLLRDHUP);
|
||||||
|
|
||||||
|
outgoing->handler(dup(outgoing->peer.fd), outgoing->passthrough);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case EINPROGRESS:
|
case EINPROGRESS:
|
||||||
@@ -88,6 +103,7 @@ static void outgoing_resolve(struct outgoing *outgoing) {
|
|||||||
int gai_err = getaddrinfo(outgoing->node, outgoing->service, &hints, &outgoing->addrs);
|
int gai_err = getaddrinfo(outgoing->node, outgoing->service, &hints, &outgoing->addrs);
|
||||||
if (gai_err) {
|
if (gai_err) {
|
||||||
fprintf(stderr, "O %s: Failed to resolve %s/%s: %s\n", outgoing->id, outgoing->node, outgoing->service, gai_strerror(gai_err));
|
fprintf(stderr, "O %s: Failed to resolve %s/%s: %s\n", outgoing->id, outgoing->node, outgoing->service, gai_strerror(gai_err));
|
||||||
|
// TODO: timed retry
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
outgoing->addr = outgoing->addrs;
|
outgoing->addr = outgoing->addrs;
|
||||||
|
|||||||
@@ -67,7 +67,6 @@ static void receive_read(struct peer *peer) {
|
|||||||
if (buf_fill(&receive->buf, receive->peer.fd) <= 0) {
|
if (buf_fill(&receive->buf, receive->peer.fd) <= 0) {
|
||||||
fprintf(stderr, "R %s: Connection closed by peer\n", receive->id);
|
fprintf(stderr, "R %s: Connection closed by peer\n", receive->id);
|
||||||
close(receive->peer.fd);
|
close(receive->peer.fd);
|
||||||
// TODO: reconnect
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -79,7 +78,6 @@ static void receive_read(struct peer *peer) {
|
|||||||
if (receive->buf.length == BUF_LEN_MAX) {
|
if (receive->buf.length == BUF_LEN_MAX) {
|
||||||
fprintf(stderr, "R %s: Input buffer overrun. This probably means that adsbus doesn't understand the protocol that this source is speaking.\n", receive->id);
|
fprintf(stderr, "R %s: Input buffer overrun. This probably means that adsbus doesn't understand the protocol that this source is speaking.\n", receive->id);
|
||||||
close(receive->peer.fd);
|
close(receive->peer.fd);
|
||||||
// TODO: reconnect
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user