From adfc23cba7c5be253dd2abcbc2d157030b8b1615 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Tue, 1 Mar 2016 19:43:11 -0800 Subject: [PATCH] Split out generic resolver functions. --- adsbus/Makefile | 2 +- adsbus/asyncaddrinfo.c | 125 +++++++++++++++++++++++++++++++++++++++++ adsbus/asyncaddrinfo.h | 8 +++ adsbus/resolve.c | 77 ++++--------------------- 4 files changed, 146 insertions(+), 66 deletions(-) create mode 100644 adsbus/asyncaddrinfo.c create mode 100644 adsbus/asyncaddrinfo.h diff --git a/adsbus/Makefile b/adsbus/Makefile index beec994..65b33cf 100644 --- a/adsbus/Makefile +++ b/adsbus/Makefile @@ -13,7 +13,7 @@ ADSBUS_TEST_FLAGS ?= --stdin --stdout=airspy_adsb --stdout=beast --stdout=json - OBJ_TRANSPORT = exec.o file.o incoming.o outgoing.o OBJ_FLOW = flow.o receive.o send.o OBJ_PROTOCOL = airspy_adsb.o beast.o json.o proto.o raw.o stats.o -OBJ_UTIL = buf.o hex.o list.o opts.o packet.o peer.o rand.o resolve.o server.o socket.o uuid.o wakeup.o +OBJ_UTIL = asyncaddrinfo.o buf.o hex.o list.o opts.o packet.o peer.o rand.o resolve.o server.o socket.o uuid.o wakeup.o OBJ_PROTO = adsb.pb-c.o all: adsbus diff --git a/adsbus/asyncaddrinfo.c b/adsbus/asyncaddrinfo.c new file mode 100644 index 0000000..74edf69 --- /dev/null +++ b/adsbus/asyncaddrinfo.c @@ -0,0 +1,125 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include "asyncaddrinfo.h" + +struct asyncaddrinfo_resolution { + int return_fd; + + char *node; + char *service; + struct addrinfo _hints, *hints; + + int err; + struct addrinfo *addrs; +}; + +static size_t asyncaddrinfo_num_threads; +static pthread_t *asyncaddrinfo_threads = NULL; +static int asyncaddrinfo_write_fd; + +static void *asyncaddrinfo_main(void *arg) { + int fd = (int) (intptr_t) arg; + struct asyncaddrinfo_resolution *res; + ssize_t len; + while ((len = recv(fd, &res, sizeof(res), 0)) == sizeof(res)) { + res->err = getaddrinfo(res->node, res->service, res->hints, &res->addrs); + int return_fd = res->return_fd; + res->return_fd = -1; + assert(send(return_fd, &res, sizeof(res), MSG_EOR) == sizeof(res)); + // Main thread now owns res + assert(!close(return_fd)); + } + assert(!len); + assert(!close(fd)); + return NULL; +} + +static void asyncaddrinfo_del(struct asyncaddrinfo_resolution *res) { + if (res->node) { + free(res->node); + res->node = NULL; + } + if (res->service) { + free(res->service); + res->service = NULL; + } + free(res); +} + +void asyncaddrinfo_init(size_t threads) { + assert(!asyncaddrinfo_threads); + + int fds[2]; + assert(!socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_CLOEXEC, 0, fds)); + asyncaddrinfo_write_fd = fds[1]; + + asyncaddrinfo_num_threads = threads; + asyncaddrinfo_threads = malloc(asyncaddrinfo_num_threads * sizeof(*asyncaddrinfo_threads)); + assert(asyncaddrinfo_threads); + + for (size_t i = 0; i < asyncaddrinfo_num_threads; i++) { + int subfd = dup(fds[0]); + assert(subfd >= 0); + assert(!pthread_create(&asyncaddrinfo_threads[i], NULL, asyncaddrinfo_main, (void *) (intptr_t) subfd)); + } + assert(!close(fds[0])); +} + +void asyncaddrinfo_cleanup() { + assert(asyncaddrinfo_threads); + assert(!close(asyncaddrinfo_write_fd)); + asyncaddrinfo_write_fd = -1; + for (size_t i = 0; i < asyncaddrinfo_num_threads; i++) { + assert(!pthread_join(asyncaddrinfo_threads[i], NULL)); + } + free(asyncaddrinfo_threads); + asyncaddrinfo_threads = NULL; +} + +int asyncaddrinfo_resolve(const char *node, const char *service, struct addrinfo *hints) { + int fds[2]; + assert(!socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_CLOEXEC, 0, fds)); + + struct asyncaddrinfo_resolution *res = malloc(sizeof(*res)); + assert(res); + res->return_fd = fds[1]; + if (node) { + res->node = strdup(node); + assert(res->node); + } else { + res->node = NULL; + } + if (service) { + res->service = strdup(service); + assert(res->service); + } else { + res->service = NULL; + } + if (hints) { + memcpy(&res->_hints, hints, sizeof(res->_hints)); + res->hints = &res->_hints; + } else { + res->hints = NULL; + } + assert(send(asyncaddrinfo_write_fd, &res, sizeof(res), MSG_EOR) == sizeof(res)); + // Resolve thread now owns res + + return fds[0]; +} + +int asyncaddrinfo_result(int fd, struct addrinfo **addrs) { + struct asyncaddrinfo_resolution *res; + assert(recv(fd, &res, sizeof(res), 0) == sizeof(res)); + assert(!close(fd)); + *addrs = res->addrs; + int err = res->err; + asyncaddrinfo_del(res); + return err; +} diff --git a/adsbus/asyncaddrinfo.h b/adsbus/asyncaddrinfo.h new file mode 100644 index 0000000..a0a7ef6 --- /dev/null +++ b/adsbus/asyncaddrinfo.h @@ -0,0 +1,8 @@ +#pragma once + +struct addrinfo; + +void asyncaddrinfo_init(size_t threads); +void asyncaddrinfo_cleanup(void); +int asyncaddrinfo_resolve(const char *node, const char *service, struct addrinfo *hints); +int asyncaddrinfo_result(int fd, struct addrinfo **addrs); diff --git a/adsbus/resolve.c b/adsbus/resolve.c index 51990a2..35d4864 100644 --- a/adsbus/resolve.c +++ b/adsbus/resolve.c @@ -1,86 +1,33 @@ -#include -#include +#include +#include #include -#include -#include -#include -#include +#include "asyncaddrinfo.h" #include "peer.h" #include "resolve.h" -struct resolve { - int fd; - - char *node; - char *service; - int flags; - - int err; - struct addrinfo *addrs; -}; - -static pthread_t resolve_thread; -static int resolve_write_fd; - -static void *resolve_main(void *arg) { - int fd = (int) (intptr_t) arg; - struct resolve *res; - ssize_t len; - while ((len = read(fd, &res, sizeof(res))) == sizeof(res)) { - struct addrinfo hints = { - .ai_family = AF_UNSPEC, - .ai_socktype = SOCK_STREAM, - .ai_flags = AI_V4MAPPED | AI_ADDRCONFIG | res->flags, - }; - res->err = getaddrinfo(res->node, res->service, &hints, &res->addrs); - free(res->node); - free(res->service); - res->node = res->service = NULL; - assert(write(res->fd, &res, sizeof(res)) == sizeof(res)); - close(res->fd); - res->fd = -1; - } - assert(!len); - assert(!close(fd)); - return NULL; -} - void resolve_init() { - int fds[2]; - assert(!pipe2(fds, O_CLOEXEC)); - resolve_write_fd = fds[1]; - assert(!pthread_create(&resolve_thread, NULL, resolve_main, (void *) (intptr_t) fds[0])); + asyncaddrinfo_init(2); } void resolve_cleanup() { - assert(!close(resolve_write_fd)); - assert(!pthread_join(resolve_thread, NULL)); + asyncaddrinfo_cleanup(); } void resolve(struct peer *peer, const char *node, const char *service, int flags) { - int fds[2]; - assert(!pipe2(fds, O_CLOEXEC)); + struct addrinfo hints = { + .ai_flags = AI_V4MAPPED | AI_ADDRCONFIG | flags, + .ai_family = AF_UNSPEC, + .ai_socktype = SOCK_STREAM, + }; - struct resolve *res = malloc(sizeof(*res)); - res->fd = fds[1]; - res->node = strdup(node); - res->service = strdup(service); - res->flags = flags; - assert(write(resolve_write_fd, &res, sizeof(res)) == sizeof(res)); - - peer->fd = fds[0]; + peer->fd = asyncaddrinfo_resolve(node, service, &hints); peer_epoll_add(peer, EPOLLIN); } int resolve_result(struct peer *peer, struct addrinfo **addrs) { - struct resolve *res; - assert(read(peer->fd, &res, sizeof(res)) == sizeof(res)); - assert(!close(peer->fd)); + int err = asyncaddrinfo_result(peer->fd, addrs); peer->fd = -1; - *addrs = res->addrs; - int err = res->err; - free(res); return err; }