Much closer to complete cleanup before exit.
This commit is contained in:
@@ -2,10 +2,15 @@
|
||||
#include <stdio.h>
|
||||
#include <getopt.h>
|
||||
#include <string.h>
|
||||
#include <unistd.h>
|
||||
#include <assert.h>
|
||||
|
||||
#include "common.h"
|
||||
#include "wakeup.h"
|
||||
|
||||
#include "incoming.h"
|
||||
#include "outgoing.h"
|
||||
|
||||
#include "receive.h"
|
||||
#include "send.h"
|
||||
|
||||
@@ -92,6 +97,8 @@ static bool parse_opts(int argc, char *argv[]) {
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
assert(!close(0));
|
||||
|
||||
hex_init();
|
||||
|
||||
peer_init();
|
||||
@@ -112,5 +119,8 @@ int main(int argc, char *argv[]) {
|
||||
wakeup_cleanup();
|
||||
send_cleanup();
|
||||
|
||||
incoming_cleanup();
|
||||
outgoing_cleanup();
|
||||
|
||||
return EXIT_SUCCESS;
|
||||
}
|
||||
|
||||
@@ -1,26 +1,52 @@
|
||||
#define _GNU_SOURCE
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdbool.h>
|
||||
#include <assert.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <fcntl.h>
|
||||
#include <unistd.h>
|
||||
#include <signal.h>
|
||||
#include <errno.h>
|
||||
#include <uuid/uuid.h>
|
||||
|
||||
#include "common.h"
|
||||
|
||||
|
||||
static int epoll_fd;
|
||||
static char server_id[UUID_LEN];
|
||||
static int peer_epoll_fd;
|
||||
static int peer_cancel_fd;
|
||||
static bool peer_canceled = false;
|
||||
|
||||
static void peer_cancel(int signal) {
|
||||
assert(!close(peer_cancel_fd));
|
||||
}
|
||||
|
||||
static void peer_cancel_handler(struct peer *peer) {
|
||||
fprintf(stderr, "X %s: Shutting down\n", server_id);
|
||||
assert(!close(peer->fd));
|
||||
free(peer);
|
||||
peer_canceled = true;
|
||||
}
|
||||
|
||||
void peer_init() {
|
||||
uuid_gen(server_id);
|
||||
|
||||
peer_epoll_fd = epoll_create1(0);
|
||||
assert(peer_epoll_fd >= 0);
|
||||
|
||||
int cancel_fds[2];
|
||||
assert(!pipe2(cancel_fds, O_NONBLOCK));
|
||||
|
||||
struct peer *cancel_peer = malloc(sizeof(*cancel_peer));
|
||||
assert(cancel_peer);
|
||||
cancel_peer->fd = cancel_fds[0];
|
||||
cancel_peer->event_handler = peer_cancel_handler;
|
||||
peer_epoll_add(cancel_peer, EPOLLRDHUP);
|
||||
|
||||
peer_cancel_fd = cancel_fds[1];
|
||||
signal(SIGINT, peer_cancel);
|
||||
epoll_fd = epoll_create1(0);
|
||||
assert(epoll_fd >= 0);
|
||||
}
|
||||
|
||||
void peer_epoll_add(struct peer *peer, uint32_t events) {
|
||||
@@ -30,28 +56,30 @@ void peer_epoll_add(struct peer *peer, uint32_t events) {
|
||||
.ptr = peer,
|
||||
},
|
||||
};
|
||||
assert(!epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peer->fd, &ev));
|
||||
assert(!epoll_ctl(peer_epoll_fd, EPOLL_CTL_ADD, peer->fd, &ev));
|
||||
}
|
||||
|
||||
void peer_epoll_del(struct peer *peer) {
|
||||
assert(!epoll_ctl(epoll_fd, EPOLL_CTL_DEL, peer->fd, NULL));
|
||||
assert(!epoll_ctl(peer_epoll_fd, EPOLL_CTL_DEL, peer->fd, NULL));
|
||||
}
|
||||
|
||||
void peer_loop() {
|
||||
fprintf(stderr, "X %s: Starting event loop\n", server_id);
|
||||
while (!peer_canceled) {
|
||||
#define MAX_EVENTS 10
|
||||
struct epoll_event events[MAX_EVENTS];
|
||||
int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
|
||||
if (nfds < 0) {
|
||||
perror("epoll_wait");
|
||||
break;
|
||||
int nfds = epoll_wait(peer_epoll_fd, events, MAX_EVENTS, -1);
|
||||
if (nfds == -1 && errno == EINTR) {
|
||||
continue;
|
||||
}
|
||||
assert(nfds > 0);
|
||||
|
||||
for (int n = 0; n < nfds; n++) {
|
||||
struct peer *peer = events[n].data.ptr;
|
||||
peer->event_handler(peer);
|
||||
}
|
||||
}
|
||||
assert(!close(peer_epoll_fd));
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -13,15 +13,19 @@
|
||||
#include "common.h"
|
||||
#include "incoming.h"
|
||||
|
||||
struct incoming;
|
||||
struct incoming {
|
||||
struct peer peer;
|
||||
char id[UUID_LEN];
|
||||
const char *node;
|
||||
const char *service;
|
||||
char *node;
|
||||
char *service;
|
||||
incoming_connection_handler handler;
|
||||
void *passthrough;
|
||||
struct incoming *next;
|
||||
};
|
||||
|
||||
static struct incoming *incoming_head = NULL;
|
||||
|
||||
static void incoming_handler(struct peer *peer) {
|
||||
struct incoming *incoming = (struct incoming *) peer;
|
||||
|
||||
@@ -48,7 +52,23 @@ static void incoming_handler(struct peer *peer) {
|
||||
incoming->handler(fd, incoming->passthrough);
|
||||
}
|
||||
|
||||
void incoming_new(const char *node, const char *service, incoming_connection_handler handler, void *passthrough) {
|
||||
static void incoming_del(struct incoming *incoming) {
|
||||
assert(!close(incoming->peer.fd));
|
||||
free(incoming->node);
|
||||
free(incoming->service);
|
||||
free(incoming);
|
||||
}
|
||||
|
||||
void incoming_cleanup() {
|
||||
struct incoming *iter = incoming_head;
|
||||
while (iter) {
|
||||
struct incoming *next = iter->next;
|
||||
incoming_del(iter);
|
||||
iter = next;
|
||||
}
|
||||
}
|
||||
|
||||
void incoming_new(char *node, char *service, incoming_connection_handler handler, void *passthrough) {
|
||||
struct incoming *incoming = malloc(sizeof(*incoming));
|
||||
incoming->peer.event_handler = incoming_handler;
|
||||
uuid_gen(incoming->id);
|
||||
@@ -69,7 +89,7 @@ void incoming_new(const char *node, const char *service, incoming_connection_han
|
||||
int gai_err = getaddrinfo(incoming->node, incoming->service, &hints, &addrs);
|
||||
if (gai_err) {
|
||||
fprintf(stderr, "I %s: Failed to resolve %s/%s: %s\n", incoming->id, incoming->node, incoming->service, gai_strerror(gai_err));
|
||||
// TODO: add incoming_del, free strdup values
|
||||
// TODO: retry
|
||||
free(incoming);
|
||||
return;
|
||||
}
|
||||
@@ -88,7 +108,7 @@ void incoming_new(const char *node, const char *service, incoming_connection_han
|
||||
|
||||
if (bind(incoming->peer.fd, addr->ai_addr, addr->ai_addrlen) != 0) {
|
||||
fprintf(stderr, "I %s: Failed to bind to %s/%s: %s\n", incoming->id, hbuf, sbuf, strerror(errno));
|
||||
close(incoming->peer.fd);
|
||||
assert(!close(incoming->peer.fd));
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -100,10 +120,13 @@ void incoming_new(const char *node, const char *service, incoming_connection_han
|
||||
|
||||
if (addr == NULL) {
|
||||
fprintf(stderr, "I %s: Failed to bind any addresses for %s/%s...\n", incoming->id, incoming->node, incoming->service);
|
||||
// TODO: use incoming_del
|
||||
// TODO: retry
|
||||
free(incoming);
|
||||
return;
|
||||
}
|
||||
|
||||
peer_epoll_add((struct peer *) incoming, EPOLLIN);
|
||||
|
||||
incoming->next = incoming_head;
|
||||
incoming_head = incoming;
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
#pragma once
|
||||
|
||||
void incoming_cleanup();
|
||||
typedef void (*incoming_connection_handler)(int fd, void *);
|
||||
void incoming_new(const char *, const char *, incoming_connection_handler, void *);
|
||||
void incoming_new(char *, char *, incoming_connection_handler, void *);
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
#include <assert.h>
|
||||
#include <sys/types.h>
|
||||
@@ -10,17 +11,21 @@
|
||||
#include "common.h"
|
||||
#include "outgoing.h"
|
||||
|
||||
struct outgoing;
|
||||
struct outgoing {
|
||||
struct peer peer;
|
||||
char id[UUID_LEN];
|
||||
const char *node;
|
||||
const char *service;
|
||||
char *node;
|
||||
char *service;
|
||||
struct addrinfo *addrs;
|
||||
struct addrinfo *addr;
|
||||
outgoing_connection_handler handler;
|
||||
void *passthrough;
|
||||
struct outgoing *next;
|
||||
};
|
||||
|
||||
static struct outgoing *outgoing_head = NULL;
|
||||
|
||||
static void outgoing_connect_result(struct outgoing *, int);
|
||||
static void outgoing_resolve(struct outgoing *);
|
||||
|
||||
@@ -56,7 +61,7 @@ static void outgoing_connect_handler(struct peer *peer) {
|
||||
|
||||
static void outgoing_disconnect_handler(struct peer *peer) {
|
||||
struct outgoing *outgoing = (struct outgoing *) peer;
|
||||
close(outgoing->peer.fd);
|
||||
assert(!close(outgoing->peer.fd));
|
||||
fprintf(stderr, "O %s: Peer disconnected; reconnecting...\n", outgoing->id);
|
||||
|
||||
outgoing_resolve(outgoing);
|
||||
@@ -84,7 +89,7 @@ static void outgoing_connect_result(struct outgoing *outgoing, int result) {
|
||||
|
||||
default:
|
||||
fprintf(stderr, "O %s: Can't connect to %s/%s: %s\n", outgoing->id, hbuf, sbuf, strerror(result));
|
||||
close(outgoing->peer.fd);
|
||||
assert(!close(outgoing->peer.fd));
|
||||
outgoing->addr = outgoing->addr->ai_next;
|
||||
// Tail recursion :/
|
||||
outgoing_connect_next(outgoing);
|
||||
@@ -110,12 +115,32 @@ static void outgoing_resolve(struct outgoing *outgoing) {
|
||||
outgoing_connect_next(outgoing);
|
||||
}
|
||||
|
||||
void outgoing_new(const char *node, const char *service, outgoing_connection_handler handler, void *passthrough) {
|
||||
static void outgoing_del(struct outgoing *outgoing) {
|
||||
assert(!close(outgoing->peer.fd));
|
||||
free(outgoing->node);
|
||||
free(outgoing->service);
|
||||
free(outgoing);
|
||||
}
|
||||
|
||||
void outgoing_cleanup() {
|
||||
struct outgoing *iter = outgoing_head;
|
||||
while (iter) {
|
||||
struct outgoing *next = iter->next;
|
||||
outgoing_del(iter);
|
||||
iter = next;
|
||||
}
|
||||
}
|
||||
|
||||
void outgoing_new(char *node, char *service, outgoing_connection_handler handler, void *passthrough) {
|
||||
struct outgoing *outgoing = malloc(sizeof(*outgoing));
|
||||
uuid_gen(outgoing->id);
|
||||
outgoing->node = strdup(node);
|
||||
outgoing->service = strdup(service);
|
||||
outgoing->handler = handler;
|
||||
outgoing->passthrough = passthrough;
|
||||
|
||||
outgoing->next = outgoing_head;
|
||||
outgoing_head = outgoing;
|
||||
|
||||
outgoing_resolve(outgoing);
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
#pragma once
|
||||
|
||||
void outgoing_cleanup();
|
||||
typedef void (*outgoing_connection_handler)(int fd, void *);
|
||||
void outgoing_new(const char *, const char *, outgoing_connection_handler, void *);
|
||||
void outgoing_new(char *, char *, outgoing_connection_handler, void *);
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
#include <assert.h>
|
||||
#include <string.h>
|
||||
@@ -56,12 +57,17 @@ static bool receive_autodetect_parse(struct receive *receive, struct packet *pac
|
||||
return false;
|
||||
}
|
||||
|
||||
static void receive_del(struct receive *receive) {
|
||||
assert(!close(receive->peer.fd));
|
||||
free(receive);
|
||||
}
|
||||
|
||||
static void receive_read(struct peer *peer) {
|
||||
struct receive *receive = (struct receive *) peer;
|
||||
|
||||
if (buf_fill(&receive->buf, receive->peer.fd) <= 0) {
|
||||
fprintf(stderr, "R %s: Connection closed by peer\n", receive->id);
|
||||
close(receive->peer.fd);
|
||||
receive_del(receive);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -72,7 +78,7 @@ static void receive_read(struct peer *peer) {
|
||||
|
||||
if (receive->buf.length == BUF_LEN_MAX) {
|
||||
fprintf(stderr, "R %s: Input buffer overrun. This probably means that adsbus doesn't understand the protocol that this source is speaking.\n", receive->id);
|
||||
close(receive->peer.fd);
|
||||
receive_del(receive);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
#include <stdio.h>
|
||||
#include <assert.h>
|
||||
#include <unistd.h>
|
||||
#include <errno.h>
|
||||
#include <sys/epoll.h>
|
||||
#include <pthread.h>
|
||||
|
||||
@@ -22,15 +23,19 @@ static void *wakeup_main(void *arg) {
|
||||
};
|
||||
assert(!epoll_ctl(epoll_fd, EPOLL_CTL_ADD, read_fd, &ev));
|
||||
|
||||
while (1) {
|
||||
#define MAX_EVENTS 10
|
||||
struct epoll_event events[MAX_EVENTS];
|
||||
int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
|
||||
if (nfds < 0) {
|
||||
perror("epoll_wait");
|
||||
struct epoll_event events[MAX_EVENTS];
|
||||
int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
|
||||
if (nfds == -1 && errno == EINTR) {
|
||||
continue;
|
||||
}
|
||||
assert(nfds >= 0);
|
||||
break; // XXX
|
||||
}
|
||||
|
||||
close(read_fd);
|
||||
close(epoll_fd);
|
||||
assert(!close(read_fd));
|
||||
assert(!close(epoll_fd));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@@ -42,7 +47,7 @@ void wakeup_init() {
|
||||
}
|
||||
|
||||
void wakeup_cleanup() {
|
||||
close(wakeup_write_fd);
|
||||
assert(!close(wakeup_write_fd));
|
||||
assert(!pthread_join(wakeup_thread, NULL));
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user