Non-blocking connection support.

This commit is contained in:
Ian Gulliver
2016-02-17 08:05:18 +00:00
parent a8f1cb5767
commit 0db4d15897
5 changed files with 124 additions and 96 deletions

View File

@@ -56,12 +56,8 @@ static bool parse_opts(int argc, char *argv[], int epoll_fd) {
delim++; delim++;
// TODO: Fix orphan malloc. // TODO: Fix orphan malloc.
struct backend *backend = malloc(sizeof(*backend)); struct backend *backend = backend_new(optarg, delim, epoll_fd);
assert(backend); assert(backend);
backend_init(backend);
if (!backend_connect(optarg, delim, backend, epoll_fd)) {
return false;
}
break; break;
case 'd': case 'd':
@@ -101,17 +97,7 @@ static int loop(int epoll_fd) {
for (int n = 0; n < nfds; n++) { for (int n = 0; n < nfds; n++) {
struct peer *peer = events[n].data.ptr; struct peer *peer = events[n].data.ptr;
switch (peer->type) { peer->event_handler(peer, epoll_fd);
case PEER_BACKEND:
if (!backend_read((struct backend *) peer)) {
return -1;
}
break;
default:
fprintf(stderr, "Unpossible: unknown peer type.\n");
return -1;
}
} }
} }
} }

131
backend.c
View File

@@ -5,6 +5,7 @@
#include <netdb.h> #include <netdb.h>
#include <sys/epoll.h> #include <sys/epoll.h>
#include <string.h> #include <string.h>
#include <errno.h>
#include "airspy_adsb.h" #include "airspy_adsb.h"
#include "beast.h" #include "beast.h"
@@ -12,7 +13,12 @@
#include "backend.h" #include "backend.h"
bool backend_autodetect_parse(struct backend *, struct packet *); static bool backend_autodetect_parse(struct backend *, struct packet *);
static void backend_connect(struct backend *, int);
static void backend_connect_next(struct backend *, int);
static void backend_connect_handler(struct peer *, int);
static void backend_read(struct peer *, int);
static parser parsers[] = { static parser parsers[] = {
@@ -22,84 +28,101 @@ static parser parsers[] = {
#define NUM_PARSERS (sizeof(parsers) / sizeof(*parsers)) #define NUM_PARSERS (sizeof(parsers) / sizeof(*parsers))
void backend_init(struct backend *backend) { struct backend *backend_new(char *node, char *service, int epoll_fd) {
backend->type = PEER_BACKEND; struct backend *backend = malloc(sizeof(*backend));
backend->fd = -1; assert(backend);
backend->peer.fd = -1;
uuid_gen(backend->id);
backend->node = node;
backend->service = service;
buf_init(&backend->buf); buf_init(&backend->buf);
memset(backend->parser_state, 0, PARSER_STATE_LEN); memset(backend->parser_state, 0, PARSER_STATE_LEN);
backend->parser = backend_autodetect_parse; backend->parser = backend_autodetect_parse;
backend_connect(backend, epoll_fd);
return backend;
} }
bool backend_connect(char *node, char *service, struct backend *backend, int epoll_fd) { static void backend_connect(struct backend *backend, int epoll_fd) {
assert(backend->type == PEER_BACKEND); fprintf(stderr, "B %s: Connecting to %s %s...\n", backend->id, backend->node, backend->service);
uuid_gen(backend->id);
struct addrinfo *addrs;
{
struct addrinfo hints = { struct addrinfo hints = {
.ai_family = AF_UNSPEC, .ai_family = AF_UNSPEC,
.ai_socktype = SOCK_STREAM, .ai_socktype = SOCK_STREAM,
}; };
int gai_err = getaddrinfo(node, service, &hints, &addrs); int gai_err = getaddrinfo(backend->node, backend->service, &hints, &backend->addrs);
if (gai_err) { if (gai_err) {
fprintf(stderr, "B %s: getaddrinfo(%s %s): %s\n", backend->id, node, service, gai_strerror(gai_err)); fprintf(stderr, "B %s: getaddrinfo(%s %s): %s\n", backend->id, backend->node, backend->service, gai_strerror(gai_err));
return false; return;
} }
backend->addr = backend->addrs;
backend_connect_next(backend, epoll_fd);
} }
{ static void backend_connect_result(struct backend *backend, int epoll_fd, int result) {
struct addrinfo *addr; char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
for (addr = addrs; addr != NULL; addr = addr->ai_next) { assert(getnameinfo(backend->addr->ai_addr, backend->addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0);
backend->fd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); switch (result) {
if (backend->fd == -1) { case 0:
perror("socket"); fprintf(stderr, "B %s: Connected to %s %s\n", backend->id, hbuf, sbuf);
continue; freeaddrinfo(backend->addrs);
} backend->peer.event_handler = backend_read;
peer_epoll_add((struct peer *) backend, epoll_fd, EPOLLIN);
break;
if (connect(backend->fd, addr->ai_addr, addr->ai_addrlen) != -1) { case EINPROGRESS:
backend->peer.event_handler = backend_connect_handler;
peer_epoll_add((struct peer *) backend, epoll_fd, EPOLLOUT);
break;
default:
fprintf(stderr, "B %s: Can't connect to %s %s: %s\n", backend->id, hbuf, sbuf, strerror(result));
close(backend->peer.fd);
backend->addr = backend->addr->ai_next;
// Tail recursion :/
backend_connect_next(backend, epoll_fd);
break; break;
} }
close(backend->fd);
} }
if (addr == NULL) { static void backend_connect_next(struct backend *backend, int epoll_fd) {
freeaddrinfo(addrs); if (backend->addr == NULL) {
fprintf(stderr, "B %s: Can't connect to %s %s\n", backend->id, node, service); freeaddrinfo(backend->addrs);
return false; fprintf(stderr, "B %s: Can't connect to %s %s\n", backend->id, backend->node, backend->service);
return;
} }
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
if (getnameinfo(addr->ai_addr, addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0) { assert(getnameinfo(backend->addr->ai_addr, backend->addr->ai_addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0);
fprintf(stderr, "B %s: Connected to %s %s\n", backend->id, hbuf, sbuf); fprintf(stderr, "B %s: Connecting to %s %s...\n", backend->id, hbuf, sbuf);
}
backend->peer.fd = socket(backend->addr->ai_family, backend->addr->ai_socktype | SOCK_NONBLOCK, backend->addr->ai_protocol);
assert(backend->peer.fd >= 0);
int result = connect(backend->peer.fd, backend->addr->ai_addr, backend->addr->ai_addrlen);
backend_connect_result(backend, epoll_fd, result == 0 ? result : errno);
} }
freeaddrinfo(addrs); static void backend_connect_handler(struct peer *peer, int epoll_fd) {
struct backend *backend = (struct backend *) peer;
{ peer_epoll_del(peer, epoll_fd);
struct epoll_event ev = {
.events = EPOLLIN, int error;
.data = { socklen_t len = sizeof(error);
.ptr = backend, assert(getsockopt(backend->peer.fd, SOL_SOCKET, SO_ERROR, &error, &len) == 0);
}, backend_connect_result(backend, epoll_fd, error);
};
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, backend->fd, &ev) == -1) {
perror("epoll_ctl");
return false;
}
} }
return true; static void backend_read(struct peer *peer, int epoll_fd) {
} struct backend *backend = (struct backend *) peer;
bool backend_read(struct backend *backend) { if (buf_fill(&backend->buf, backend->peer.fd) < 0) {
if (buf_fill(&backend->buf, backend->fd) < 0) {
fprintf(stderr, "B %s: Connection closed by backend\n", backend->id); fprintf(stderr, "B %s: Connection closed by backend\n", backend->id);
return false; close(backend->peer.fd);
peer_epoll_del(peer, epoll_fd);
// TODO: reconnect
return;
} }
struct packet packet = { struct packet packet = {
@@ -111,14 +134,14 @@ bool backend_read(struct backend *backend) {
if (backend->buf.length == BUF_LEN_MAX) { if (backend->buf.length == BUF_LEN_MAX) {
fprintf(stderr, "B %s: Input buffer overrun. This probably means that adsbus doesn't understand the protocol that this source is speaking.\n", backend->id); fprintf(stderr, "B %s: Input buffer overrun. This probably means that adsbus doesn't understand the protocol that this source is speaking.\n", backend->id);
return false; close(backend->peer.fd);
peer_epoll_del(peer, epoll_fd);
// TODO: reconnect
return;
} }
return true;
} }
bool backend_autodetect_parse(struct backend *backend, struct packet *packet) { static bool backend_autodetect_parse(struct backend *backend, struct packet *packet) {
assert(backend->type == PEER_BACKEND);
for (int i = 0; i < NUM_PARSERS; i++) { for (int i = 0; i < NUM_PARSERS; i++) {
if (parsers[i](backend, packet)) { if (parsers[i](backend, packet)) {
backend->parser = parsers[i]; backend->parser = parsers[i];

View File

@@ -7,17 +7,18 @@
#define PARSER_STATE_LEN 256 #define PARSER_STATE_LEN 256
struct backend; struct backend;
struct addrinfo;
typedef bool (*parser)(struct backend *, struct packet *); typedef bool (*parser)(struct backend *, struct packet *);
struct backend { struct backend {
enum peer_type type; struct peer peer;
char id[UUID_LEN]; char id[UUID_LEN];
int fd; char *node;
char *service;
struct addrinfo *addrs;
struct addrinfo *addr;
struct buf buf; struct buf buf;
char parser_state[PARSER_STATE_LEN]; char parser_state[PARSER_STATE_LEN];
parser parser; parser parser;
}; };
struct backend *backend_new(char *node, char *service, int epoll_fd);
void backend_init(struct backend *);
bool backend_connect(char *, char *, struct backend *, int);
bool backend_read(struct backend *);

View File

@@ -3,11 +3,27 @@
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
#include <sys/epoll.h>
#include <uuid/uuid.h> #include <uuid/uuid.h>
#include "common.h" #include "common.h"
void peer_epoll_add(struct peer *peer, int epoll_fd, uint32_t events) {
struct epoll_event ev = {
.events = events,
.data = {
.ptr = peer,
},
};
assert(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peer->fd, &ev) == 0);
}
void peer_epoll_del(struct peer *peer, int epoll_fd) {
assert(epoll_ctl(epoll_fd, EPOLL_CTL_DEL, peer->fd, NULL) == 0);
}
void buf_init(struct buf *buf) { void buf_init(struct buf *buf) {
buf->start = 0; buf->start = 0;
buf->length = 0; buf->length = 0;

View File

@@ -7,12 +7,14 @@
//////// peer //////// peer
// All specific peer structs must be castable to this. // All specific peer structs must be castable to this.
struct peer;
typedef void (*peer_event_handler)(struct peer *, int epoll_fd);
struct peer { struct peer {
enum peer_type { int fd;
PEER_BACKEND, peer_event_handler event_handler;
PEER_CLIENT,
} type;
}; };
void peer_epoll_add(struct peer *, int, uint32_t);
void peer_epoll_del(struct peer *, int);
//////// buf //////// buf