diff --git a/clients/c/Makefile b/clients/c/Makefile index 998bf6f..2c77cce 100644 --- a/clients/c/Makefile +++ b/clients/c/Makefile @@ -1,7 +1,7 @@ CC ?= gcc CFLAGS ?= -Wall -Werror -I/usr/local/include -fpic -O -g LDFLAGS ?= -Wall -L/usr/local/lib -L. -O -LIBS ?= -lcurl -ljansson -luuid -lm -lpthread +LIBS ?= -lcurl -ljansson -luuid -lpthread all: libcosmopolite.so diff --git a/clients/c/cosmopolite.c b/clients/c/cosmopolite.c index 536cd30..6fefd6e 100644 --- a/clients/c/cosmopolite.c +++ b/clients/c/cosmopolite.c @@ -1,6 +1,5 @@ #include #include -#include #include #include #include @@ -10,15 +9,11 @@ #include "cosmopolite.h" -#define COSMO_CHECK_SECONDS 10 - #define min(a, b) ((a) < (b) ? (a) : (b)) #define max(a, b) ((a) > (b) ? (a) : (b)) -#define DELAY_MIN_MS 250 -#define DELAY_MAX_MS 32000 -#define DELAY_EXPONENT 1.1 -#define DELAY_STAGGER_FACTOR 10 +#define CYCLE_MS 10000 +#define CYCLE_STAGGER_FACTOR 10 typedef struct { char *send_buf; @@ -193,8 +188,16 @@ static void cosmo_handle_event(cosmo *instance, json_t *event) { } } +// Takes ownership of arguments. +static json_t *cosmo_command(const char *name, const json_t *arguments) { + return json_pack("{ssso}", "command", name, "arguments", arguments); +} + // Takes ownership of commands. static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands) { + // Always poll. + json_array_append(commands, cosmo_command("poll", json_array())); + char *request = cosmo_build_rpc(instance, commands); char *response = cosmo_send_http(instance, request); @@ -282,38 +285,27 @@ static void *cosmo_thread_main(void *arg) { assert(!pthread_mutex_lock(&instance->lock)); while (!instance->shutdown) { - if (json_array_size(instance->command_queue)) { - json_t *commands = instance->command_queue; - instance->command_queue = json_array(); - instance->next_delay_ms = pow(instance->next_delay_ms, DELAY_EXPONENT); - instance->next_delay_ms = min(DELAY_MAX_MS, max(DELAY_MIN_MS, instance->next_delay_ms)); - instance->next_delay_ms += rand_r(&instance->seedp) % (instance->next_delay_ms / DELAY_STAGGER_FACTOR); + json_t *commands = instance->command_queue; + instance->command_queue = json_array(); - assert(!pthread_mutex_unlock(&instance->lock)); - json_t *to_retry = cosmo_send_rpc(instance, commands); - assert(!pthread_mutex_lock(&instance->lock)); + instance->next_delay_ms = CYCLE_MS; + instance->next_delay_ms += rand_r(&instance->seedp) % (instance->next_delay_ms / CYCLE_STAGGER_FACTOR); - json_array_extend(instance->command_queue, to_retry); - json_decref(to_retry); - } + assert(!pthread_mutex_unlock(&instance->lock)); + json_t *to_retry = cosmo_send_rpc(instance, commands); + assert(!pthread_mutex_lock(&instance->lock)); - if (json_array_size(instance->command_queue)) { - struct timeval tv; - assert(!gettimeofday(&tv, NULL)); + json_array_extend(instance->command_queue, to_retry); + json_decref(to_retry); - struct timespec ts; - if (tv.tv_usec + ((instance->next_delay_ms % 1000) * 1000) > 1000000) { - // Carry - ts.tv_sec = tv.tv_sec + (instance->next_delay_ms / 1000) + 1; - ts.tv_nsec = (tv.tv_usec * 1000) + ((instance->next_delay_ms % 1000) * 1000000) - 1000000000; - } else { - ts.tv_sec = tv.tv_sec + (instance->next_delay_ms / 1000); - } - - pthread_cond_timedwait(&instance->cond, &instance->lock, &ts); - } else { - assert(!pthread_cond_wait(&instance->cond, &instance->lock)); - } +#define MS_PER_S 1000 +#define NS_PER_MS 1000000 + struct timespec ts; + assert(!clock_gettime(CLOCK_REALTIME, &ts)); + uint64_t target_ms = (ts.tv_sec * MS_PER_S) + (ts.tv_nsec / NS_PER_MS) + instance->next_delay_ms; + ts.tv_sec = target_ms / MS_PER_S; + ts.tv_nsec = (target_ms % MS_PER_S) * NS_PER_MS; + pthread_cond_timedwait(&instance->cond, &instance->lock, &ts); } assert(!pthread_mutex_unlock(&instance->lock)); return NULL; @@ -329,11 +321,6 @@ static void cosmo_send_command(cosmo *instance, json_t *command) { assert(!pthread_mutex_unlock(&instance->lock)); } -// Takes ownership of arguments. -static json_t *cosmo_command(const char *name, const json_t *arguments) { - return json_pack("{ssso}", "command", name, "arguments", arguments); -} - // Public interface below @@ -459,7 +446,7 @@ cosmo *cosmo_create(const char *base_url, const char *client_id, const cosmo_cal assert(!curl_easy_setopt(instance->curl, CURLOPT_PROTOCOLS, CURLPROTO_HTTPS)); assert(!curl_easy_setopt(instance->curl, CURLOPT_REDIR_PROTOCOLS, CURLPROTO_HTTPS)); assert(!curl_easy_setopt(instance->curl, CURLOPT_SSL_CIPHER_LIST, "ECDH+AESGCM:EDH+AESGCM:AES256+EECDH:AES256+EDH")); - assert(!curl_easy_setopt(instance->curl, CURLOPT_TIMEOUT, COSMO_CHECK_SECONDS)); + assert(!curl_easy_setopt(instance->curl, CURLOPT_TIMEOUT, CYCLE_MS / MS_PER_S)); assert(!curl_easy_setopt(instance->curl, CURLOPT_POST, 1L)); assert(!curl_easy_setopt(instance->curl, CURLOPT_READFUNCTION, cosmo_read_callback)); assert(!curl_easy_setopt(instance->curl, CURLOPT_WRITEFUNCTION, cosmo_write_callback));