Split out generic resolver functions.
This commit is contained in:
@@ -13,7 +13,7 @@ ADSBUS_TEST_FLAGS ?= --stdin --stdout=airspy_adsb --stdout=beast --stdout=json -
|
|||||||
OBJ_TRANSPORT = exec.o file.o incoming.o outgoing.o
|
OBJ_TRANSPORT = exec.o file.o incoming.o outgoing.o
|
||||||
OBJ_FLOW = flow.o receive.o send.o
|
OBJ_FLOW = flow.o receive.o send.o
|
||||||
OBJ_PROTOCOL = airspy_adsb.o beast.o json.o proto.o raw.o stats.o
|
OBJ_PROTOCOL = airspy_adsb.o beast.o json.o proto.o raw.o stats.o
|
||||||
OBJ_UTIL = buf.o hex.o list.o opts.o packet.o peer.o rand.o resolve.o server.o socket.o uuid.o wakeup.o
|
OBJ_UTIL = asyncaddrinfo.o buf.o hex.o list.o opts.o packet.o peer.o rand.o resolve.o server.o socket.o uuid.o wakeup.o
|
||||||
OBJ_PROTO = adsb.pb-c.o
|
OBJ_PROTO = adsb.pb-c.o
|
||||||
|
|
||||||
all: adsbus
|
all: adsbus
|
||||||
|
|||||||
125
adsbus/asyncaddrinfo.c
Normal file
125
adsbus/asyncaddrinfo.c
Normal file
@@ -0,0 +1,125 @@
|
|||||||
|
#include <assert.h>
|
||||||
|
#include <netdb.h>
|
||||||
|
#include <pthread.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include <sys/types.h>
|
||||||
|
#include <sys/socket.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
#include "asyncaddrinfo.h"
|
||||||
|
|
||||||
|
struct asyncaddrinfo_resolution {
|
||||||
|
int return_fd;
|
||||||
|
|
||||||
|
char *node;
|
||||||
|
char *service;
|
||||||
|
struct addrinfo _hints, *hints;
|
||||||
|
|
||||||
|
int err;
|
||||||
|
struct addrinfo *addrs;
|
||||||
|
};
|
||||||
|
|
||||||
|
static size_t asyncaddrinfo_num_threads;
|
||||||
|
static pthread_t *asyncaddrinfo_threads = NULL;
|
||||||
|
static int asyncaddrinfo_write_fd;
|
||||||
|
|
||||||
|
static void *asyncaddrinfo_main(void *arg) {
|
||||||
|
int fd = (int) (intptr_t) arg;
|
||||||
|
struct asyncaddrinfo_resolution *res;
|
||||||
|
ssize_t len;
|
||||||
|
while ((len = recv(fd, &res, sizeof(res), 0)) == sizeof(res)) {
|
||||||
|
res->err = getaddrinfo(res->node, res->service, res->hints, &res->addrs);
|
||||||
|
int return_fd = res->return_fd;
|
||||||
|
res->return_fd = -1;
|
||||||
|
assert(send(return_fd, &res, sizeof(res), MSG_EOR) == sizeof(res));
|
||||||
|
// Main thread now owns res
|
||||||
|
assert(!close(return_fd));
|
||||||
|
}
|
||||||
|
assert(!len);
|
||||||
|
assert(!close(fd));
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void asyncaddrinfo_del(struct asyncaddrinfo_resolution *res) {
|
||||||
|
if (res->node) {
|
||||||
|
free(res->node);
|
||||||
|
res->node = NULL;
|
||||||
|
}
|
||||||
|
if (res->service) {
|
||||||
|
free(res->service);
|
||||||
|
res->service = NULL;
|
||||||
|
}
|
||||||
|
free(res);
|
||||||
|
}
|
||||||
|
|
||||||
|
void asyncaddrinfo_init(size_t threads) {
|
||||||
|
assert(!asyncaddrinfo_threads);
|
||||||
|
|
||||||
|
int fds[2];
|
||||||
|
assert(!socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_CLOEXEC, 0, fds));
|
||||||
|
asyncaddrinfo_write_fd = fds[1];
|
||||||
|
|
||||||
|
asyncaddrinfo_num_threads = threads;
|
||||||
|
asyncaddrinfo_threads = malloc(asyncaddrinfo_num_threads * sizeof(*asyncaddrinfo_threads));
|
||||||
|
assert(asyncaddrinfo_threads);
|
||||||
|
|
||||||
|
for (size_t i = 0; i < asyncaddrinfo_num_threads; i++) {
|
||||||
|
int subfd = dup(fds[0]);
|
||||||
|
assert(subfd >= 0);
|
||||||
|
assert(!pthread_create(&asyncaddrinfo_threads[i], NULL, asyncaddrinfo_main, (void *) (intptr_t) subfd));
|
||||||
|
}
|
||||||
|
assert(!close(fds[0]));
|
||||||
|
}
|
||||||
|
|
||||||
|
void asyncaddrinfo_cleanup() {
|
||||||
|
assert(asyncaddrinfo_threads);
|
||||||
|
assert(!close(asyncaddrinfo_write_fd));
|
||||||
|
asyncaddrinfo_write_fd = -1;
|
||||||
|
for (size_t i = 0; i < asyncaddrinfo_num_threads; i++) {
|
||||||
|
assert(!pthread_join(asyncaddrinfo_threads[i], NULL));
|
||||||
|
}
|
||||||
|
free(asyncaddrinfo_threads);
|
||||||
|
asyncaddrinfo_threads = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int asyncaddrinfo_resolve(const char *node, const char *service, struct addrinfo *hints) {
|
||||||
|
int fds[2];
|
||||||
|
assert(!socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_CLOEXEC, 0, fds));
|
||||||
|
|
||||||
|
struct asyncaddrinfo_resolution *res = malloc(sizeof(*res));
|
||||||
|
assert(res);
|
||||||
|
res->return_fd = fds[1];
|
||||||
|
if (node) {
|
||||||
|
res->node = strdup(node);
|
||||||
|
assert(res->node);
|
||||||
|
} else {
|
||||||
|
res->node = NULL;
|
||||||
|
}
|
||||||
|
if (service) {
|
||||||
|
res->service = strdup(service);
|
||||||
|
assert(res->service);
|
||||||
|
} else {
|
||||||
|
res->service = NULL;
|
||||||
|
}
|
||||||
|
if (hints) {
|
||||||
|
memcpy(&res->_hints, hints, sizeof(res->_hints));
|
||||||
|
res->hints = &res->_hints;
|
||||||
|
} else {
|
||||||
|
res->hints = NULL;
|
||||||
|
}
|
||||||
|
assert(send(asyncaddrinfo_write_fd, &res, sizeof(res), MSG_EOR) == sizeof(res));
|
||||||
|
// Resolve thread now owns res
|
||||||
|
|
||||||
|
return fds[0];
|
||||||
|
}
|
||||||
|
|
||||||
|
int asyncaddrinfo_result(int fd, struct addrinfo **addrs) {
|
||||||
|
struct asyncaddrinfo_resolution *res;
|
||||||
|
assert(recv(fd, &res, sizeof(res), 0) == sizeof(res));
|
||||||
|
assert(!close(fd));
|
||||||
|
*addrs = res->addrs;
|
||||||
|
int err = res->err;
|
||||||
|
asyncaddrinfo_del(res);
|
||||||
|
return err;
|
||||||
|
}
|
||||||
8
adsbus/asyncaddrinfo.h
Normal file
8
adsbus/asyncaddrinfo.h
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
struct addrinfo;
|
||||||
|
|
||||||
|
void asyncaddrinfo_init(size_t threads);
|
||||||
|
void asyncaddrinfo_cleanup(void);
|
||||||
|
int asyncaddrinfo_resolve(const char *node, const char *service, struct addrinfo *hints);
|
||||||
|
int asyncaddrinfo_result(int fd, struct addrinfo **addrs);
|
||||||
@@ -1,86 +1,33 @@
|
|||||||
#include <assert.h>
|
#include <sys/types.h>
|
||||||
#include <fcntl.h>
|
#include <sys/socket.h>
|
||||||
#include <netdb.h>
|
#include <netdb.h>
|
||||||
#include <stdlib.h>
|
|
||||||
#include <string.h>
|
|
||||||
#include <unistd.h>
|
|
||||||
#include <pthread.h>
|
|
||||||
|
|
||||||
|
#include "asyncaddrinfo.h"
|
||||||
#include "peer.h"
|
#include "peer.h"
|
||||||
|
|
||||||
#include "resolve.h"
|
#include "resolve.h"
|
||||||
|
|
||||||
struct resolve {
|
|
||||||
int fd;
|
|
||||||
|
|
||||||
char *node;
|
|
||||||
char *service;
|
|
||||||
int flags;
|
|
||||||
|
|
||||||
int err;
|
|
||||||
struct addrinfo *addrs;
|
|
||||||
};
|
|
||||||
|
|
||||||
static pthread_t resolve_thread;
|
|
||||||
static int resolve_write_fd;
|
|
||||||
|
|
||||||
static void *resolve_main(void *arg) {
|
|
||||||
int fd = (int) (intptr_t) arg;
|
|
||||||
struct resolve *res;
|
|
||||||
ssize_t len;
|
|
||||||
while ((len = read(fd, &res, sizeof(res))) == sizeof(res)) {
|
|
||||||
struct addrinfo hints = {
|
|
||||||
.ai_family = AF_UNSPEC,
|
|
||||||
.ai_socktype = SOCK_STREAM,
|
|
||||||
.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG | res->flags,
|
|
||||||
};
|
|
||||||
res->err = getaddrinfo(res->node, res->service, &hints, &res->addrs);
|
|
||||||
free(res->node);
|
|
||||||
free(res->service);
|
|
||||||
res->node = res->service = NULL;
|
|
||||||
assert(write(res->fd, &res, sizeof(res)) == sizeof(res));
|
|
||||||
close(res->fd);
|
|
||||||
res->fd = -1;
|
|
||||||
}
|
|
||||||
assert(!len);
|
|
||||||
assert(!close(fd));
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
void resolve_init() {
|
void resolve_init() {
|
||||||
int fds[2];
|
asyncaddrinfo_init(2);
|
||||||
assert(!pipe2(fds, O_CLOEXEC));
|
|
||||||
resolve_write_fd = fds[1];
|
|
||||||
assert(!pthread_create(&resolve_thread, NULL, resolve_main, (void *) (intptr_t) fds[0]));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void resolve_cleanup() {
|
void resolve_cleanup() {
|
||||||
assert(!close(resolve_write_fd));
|
asyncaddrinfo_cleanup();
|
||||||
assert(!pthread_join(resolve_thread, NULL));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void resolve(struct peer *peer, const char *node, const char *service, int flags) {
|
void resolve(struct peer *peer, const char *node, const char *service, int flags) {
|
||||||
int fds[2];
|
struct addrinfo hints = {
|
||||||
assert(!pipe2(fds, O_CLOEXEC));
|
.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG | flags,
|
||||||
|
.ai_family = AF_UNSPEC,
|
||||||
|
.ai_socktype = SOCK_STREAM,
|
||||||
|
};
|
||||||
|
|
||||||
struct resolve *res = malloc(sizeof(*res));
|
peer->fd = asyncaddrinfo_resolve(node, service, &hints);
|
||||||
res->fd = fds[1];
|
|
||||||
res->node = strdup(node);
|
|
||||||
res->service = strdup(service);
|
|
||||||
res->flags = flags;
|
|
||||||
assert(write(resolve_write_fd, &res, sizeof(res)) == sizeof(res));
|
|
||||||
|
|
||||||
peer->fd = fds[0];
|
|
||||||
peer_epoll_add(peer, EPOLLIN);
|
peer_epoll_add(peer, EPOLLIN);
|
||||||
}
|
}
|
||||||
|
|
||||||
int resolve_result(struct peer *peer, struct addrinfo **addrs) {
|
int resolve_result(struct peer *peer, struct addrinfo **addrs) {
|
||||||
struct resolve *res;
|
int err = asyncaddrinfo_result(peer->fd, addrs);
|
||||||
assert(read(peer->fd, &res, sizeof(res)) == sizeof(res));
|
|
||||||
assert(!close(peer->fd));
|
|
||||||
peer->fd = -1;
|
peer->fd = -1;
|
||||||
*addrs = res->addrs;
|
|
||||||
int err = res->err;
|
|
||||||
free(res);
|
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user