From f25517096794f961d598c3e89f85a97dd12e1283 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Mon, 22 Feb 2016 14:37:00 -0800 Subject: [PATCH] Move wakeup into the main thread to remove complexity. Looks like CLOCK_MONOTONIC_COARSE doesn't need a syscall. --- adsbus/Makefile | 2 +- adsbus/common.c | 8 ++- adsbus/outgoing.c | 5 +- adsbus/wakeup.c | 166 ++++++++++++---------------------------------- adsbus/wakeup.h | 2 + 5 files changed, 53 insertions(+), 130 deletions(-) diff --git a/adsbus/Makefile b/adsbus/Makefile index 92fb54c..f48b412 100644 --- a/adsbus/Makefile +++ b/adsbus/Makefile @@ -1,5 +1,5 @@ CC ?= clang -CFLAGS ?= -Wall -Werror -O4 -g --std=gnu11 --pedantic-errors -fPIE -pie -fstack-protector-strong -pthread +CFLAGS ?= -Wall -Werror -O4 -g --std=gnu11 --pedantic-errors -fPIE -pie -fstack-protector-strong LDFLAGS ?= $(CFLAGS) -Wl,-z,relro -Wl,-z,now LIBS ?= -luuid -ljansson diff --git a/adsbus/common.c b/adsbus/common.c index c707963..5c85d03 100644 --- a/adsbus/common.c +++ b/adsbus/common.c @@ -12,7 +12,7 @@ #include #include "common.h" - +#include "wakeup.h" static char server_id[UUID_LEN]; static int peer_epoll_fd; @@ -68,16 +68,18 @@ void peer_loop() { while (!peer_canceled) { #define MAX_EVENTS 10 struct epoll_event events[MAX_EVENTS]; - int nfds = epoll_wait(peer_epoll_fd, events, MAX_EVENTS, -1); + int nfds = epoll_wait(peer_epoll_fd, events, MAX_EVENTS, wakeup_get_delay()); if (nfds == -1 && errno == EINTR) { continue; } - assert(nfds > 0); + assert(nfds >= 0); for (int n = 0; n < nfds; n++) { struct peer *peer = events[n].data.ptr; peer->event_handler(peer); } + + wakeup_dispatch(); } assert(!close(peer_epoll_fd)); } diff --git a/adsbus/outgoing.c b/adsbus/outgoing.c index 43919ea..2a87665 100644 --- a/adsbus/outgoing.c +++ b/adsbus/outgoing.c @@ -100,6 +100,7 @@ static void outgoing_connect_result(struct outgoing *outgoing, int result) { default: fprintf(stderr, "O %s: Can't connect to %s/%s: %s\n", outgoing->id, hbuf, sbuf, strerror(result)); assert(!close(outgoing->peer.fd)); + outgoing->peer.fd = -1; outgoing->addr = outgoing->addr->ai_next; // Tail recursion :/ outgoing_connect_next(outgoing); @@ -130,7 +131,9 @@ static void outgoing_resolve_wrapper(struct peer *peer) { } static void outgoing_del(struct outgoing *outgoing) { - assert(!close(outgoing->peer.fd)); + if (outgoing->peer.fd >= 0) { + assert(!close(outgoing->peer.fd)); + } free(outgoing->node); free(outgoing->service); free(outgoing); diff --git a/adsbus/wakeup.c b/adsbus/wakeup.c index 662d505..116dbc5 100644 --- a/adsbus/wakeup.c +++ b/adsbus/wakeup.c @@ -9,29 +9,19 @@ #include #include #include -#include #include "common.h" #include "wakeup.h" -struct wakeup_peer { - struct peer peer; - struct peer *inner_peer; -}; - -struct wakeup_request { +struct wakeup_entry { int fd; uint64_t absolute_time_ms; -}; - -struct wakeup_entry { - struct wakeup_request request; + struct peer *peer; struct wakeup_entry *next; }; -static pthread_t wakeup_thread; -static int wakeup_write_fd; +struct wakeup_entry *head = NULL; static uint64_t wakeup_get_time_ms() { struct timespec tp; @@ -41,13 +31,44 @@ static uint64_t wakeup_get_time_ms() { return (tp.tv_sec * MS_PER_S) + (tp.tv_nsec / NS_PER_MS); } -static void wakeup_request_add(struct wakeup_entry **head, struct wakeup_request *request) { - struct wakeup_entry *entry = malloc(sizeof(*entry)); - memcpy(&entry->request, request, sizeof(entry->request)); +void wakeup_init() { +} - struct wakeup_entry *prev = NULL, *iter = *head; +void wakeup_cleanup() { + while (head) { + struct wakeup_entry *next = head->next; + free(head); + head = next; + } +} + +int wakeup_get_delay() { + if (!head) { + return -1; + } + int delay = head->absolute_time_ms - wakeup_get_time_ms(); + return delay < 0 ? 0 : delay; +} + +void wakeup_dispatch() { + uint64_t now = wakeup_get_time_ms(); + while (head && head->absolute_time_ms <= now) { + struct peer *peer = head->peer; + struct wakeup_entry *next = head->next; + free(head); + head = next; + peer->event_handler(peer); + } +} + +void wakeup_add(struct peer *peer, uint32_t delay_ms) { + struct wakeup_entry *entry = malloc(sizeof(*entry)); + entry->absolute_time_ms = wakeup_get_time_ms() + delay_ms; + entry->peer = peer; + + struct wakeup_entry *prev = NULL, *iter = head; while (iter) { - if (iter->request.absolute_time_ms > entry->request.absolute_time_ms) { + if (iter->absolute_time_ms > entry->absolute_time_ms) { break; } prev = iter; @@ -58,112 +79,7 @@ static void wakeup_request_add(struct wakeup_entry **head, struct wakeup_request entry->next = prev->next; prev->next = entry; } else { - entry->next = *head; - *head = entry; + entry->next = head; + head = entry; } } - -static void *wakeup_main(void *arg) { - int read_fd = (intptr_t) arg; - - int epoll_fd = epoll_create1(0); - assert(epoll_fd >= 0); - - struct epoll_event ev = { - .events = EPOLLIN, - .data = { - .fd = read_fd, - }, - }; - assert(!epoll_ctl(epoll_fd, EPOLL_CTL_ADD, read_fd, &ev)); - - struct wakeup_entry *head = NULL; - - while (1) { - int delay = -1; - if (head) { - // This call is the whole reason we need a separate thread: we want to avoid getting the time inside the tight main thread loop. - delay = head->request.absolute_time_ms - wakeup_get_time_ms(); - delay = delay < 0 ? 0 : delay; - } - -#define MAX_EVENTS 1 - struct epoll_event events[MAX_EVENTS]; - int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, delay); - if (nfds == -1 && errno == EINTR) { - continue; - } - - if (nfds == 1) { - assert(events[0].data.fd == read_fd); - struct wakeup_request request; - ssize_t result = read(read_fd, &request, sizeof(request)); - if (result == 0) { - // Peer closed connection, shutdown thread - break; - } - assert(result == sizeof(request)); - wakeup_request_add(&head, &request); - } else { - assert(nfds == 0); - } - - uint64_t now = wakeup_get_time_ms(); - while (head && head->request.absolute_time_ms <= now) { - close(head->request.fd); - struct wakeup_entry *next = head->next; - free(head); - head = next; - } - } - - while (head) { - close(head->request.fd); - struct wakeup_entry *next = head->next; - free(head); - head = next; - } - - assert(!close(read_fd)); - assert(!close(epoll_fd)); - return NULL; -} - -static void wakeup_handler(struct peer *peer) { - struct wakeup_peer *outer_peer = (struct wakeup_peer *) peer; - assert(!close(outer_peer->peer.fd)); - - struct peer *inner_peer = outer_peer->inner_peer; - free(outer_peer); - inner_peer->event_handler(inner_peer); -} - -void wakeup_init() { - int pipefd[2]; - assert(!pipe2(pipefd, O_NONBLOCK)); - assert(!pthread_create(&wakeup_thread, NULL, wakeup_main, (void *) (intptr_t) pipefd[0])); - wakeup_write_fd = pipefd[1]; -} - -void wakeup_cleanup() { - assert(!close(wakeup_write_fd)); - assert(!pthread_join(wakeup_thread, NULL)); -} - -void wakeup_add(struct peer *peer, uint32_t delay_ms) { - int pipefd[2]; - assert(!pipe2(pipefd, O_NONBLOCK)); - - struct wakeup_request request = { 0 }; - request.fd = pipefd[1], - request.absolute_time_ms = wakeup_get_time_ms() + delay_ms; - assert(write(wakeup_write_fd, &request, sizeof(request)) == sizeof(request)); - - struct wakeup_peer *outer_peer = malloc(sizeof(*outer_peer)); - assert(outer_peer); - outer_peer->peer.fd = pipefd[0]; - outer_peer->peer.event_handler = wakeup_handler; - outer_peer->inner_peer = peer; - - peer_epoll_add((struct peer *) outer_peer, EPOLLRDHUP); -} diff --git a/adsbus/wakeup.h b/adsbus/wakeup.h index 7ac34ee..b09291f 100644 --- a/adsbus/wakeup.h +++ b/adsbus/wakeup.h @@ -4,4 +4,6 @@ struct peer; void wakeup_init(); void wakeup_cleanup(); +int wakeup_get_delay(); +void wakeup_dispatch(); void wakeup_add(struct peer *, uint32_t);