From d596ff1444455dad3ee3fdf5724d69321c053525 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Fri, 5 Jun 2015 23:20:07 -0700 Subject: [PATCH] Retry and backoff working properly. --- clients/c/test.c | 63 +++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 59 insertions(+), 4 deletions(-) diff --git a/clients/c/test.c b/clients/c/test.c index 87629de..3e92de0 100644 --- a/clients/c/test.c +++ b/clients/c/test.c @@ -1,8 +1,11 @@ #include +#include +#include #include #include #include #include +#include #include #include @@ -14,6 +17,12 @@ #define COSMO_CHECK_SECONDS 10 #define min(a, b) ((a) < (b) ? (a) : (b)) +#define max(a, b) ((a) > (b) ? (a) : (b)) + +#define DELAY_MIN_MS 250 +#define DELAY_MAX_MS 32000 +#define DELAY_EXPONENT 1.1 +#define DELAY_STAGGER_FACTOR 10 typedef struct { char client_id[COSMO_UUID_SIZE]; @@ -23,6 +32,7 @@ typedef struct { pthread_cond_t cond; bool shutdown; json_t *command_queue; + uint64_t next_delay_ms; pthread_t thread; } cosmo; @@ -33,6 +43,8 @@ typedef struct { char *recv_buf; size_t recv_buf_len; + + int64_t retry_after; } cosmo_transfer; static size_t cosmo_read_callback(void *ptr, size_t size, size_t nmemb, void *userp) { @@ -55,6 +67,18 @@ static size_t cosmo_write_callback(void *ptr, size_t size, size_t nmemb, void *u return to_read; } +static size_t cosmo_header_callback(char *ptr, size_t size, size_t nmemb, void *userp) { + cosmo_transfer *transfer = userp; + size_t length = size * nmemb; +#define RETRY_AFTER_HEADER "Retry-After: 0\r\n" +#define RETRY_AFTER_HEADER_SIZE (sizeof(RETRY_AFTER_HEADER) - 1) + if (length == RETRY_AFTER_HEADER_SIZE && + strncasecmp(ptr, RETRY_AFTER_HEADER, RETRY_AFTER_HEADER_SIZE) == 0) { + transfer->retry_after = 0; + } + return length; +} + static char *cosmo_build_rpc(cosmo *instance, json_t *commands) { json_t *to_send = json_pack("{sssssO}", "client_id", instance->client_id, "instance_id", instance->instance_id, "commands", commands); assert(to_send); @@ -77,6 +101,8 @@ static bool cosmo_send_http_int(cosmo *instance, cosmo_transfer *transfer, CURL curl_easy_setopt(curl, CURLOPT_READDATA, transfer); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, cosmo_write_callback); curl_easy_setopt(curl, CURLOPT_WRITEDATA, transfer); + curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, cosmo_header_callback); + curl_easy_setopt(curl, CURLOPT_HEADERDATA, transfer); curl_easy_setopt(curl, CURLOPT_TIMEOUT, COSMO_CHECK_SECONDS); res = curl_easy_perform(curl); @@ -103,11 +129,16 @@ static char *cosmo_send_http(cosmo *instance, char *request) { .send_buf = request, .send_buf_len = strlen(request), .recv_buf = NULL, - .recv_buf_len = 0 + .recv_buf_len = 0, + .retry_after = -1 }; int ret = cosmo_send_http_int(instance, &transfer, curl); + if (transfer.retry_after >= 0) { + instance->next_delay_ms = transfer.retry_after * 1000; + } + curl_easy_cleanup(curl); free(request); @@ -168,9 +199,12 @@ static void *cosmo_thread_main(void *arg) { assert(!pthread_mutex_lock(&instance->lock)); while (!instance->shutdown) { - while (json_array_size(instance->command_queue)) { + if (json_array_size(instance->command_queue)) { json_t *commands = instance->command_queue; instance->command_queue = json_array(); + instance->next_delay_ms = pow(instance->next_delay_ms, DELAY_EXPONENT); + instance->next_delay_ms = min(DELAY_MAX_MS, max(DELAY_MIN_MS, instance->next_delay_ms)); + instance->next_delay_ms += random() % (instance->next_delay_ms / DELAY_STAGGER_FACTOR); assert(!pthread_mutex_unlock(&instance->lock)); json_t *to_retry = cosmo_send_rpc(instance, commands); @@ -179,7 +213,25 @@ static void *cosmo_thread_main(void *arg) { json_array_extend(instance->command_queue, to_retry); json_decref(to_retry); } - assert(!pthread_cond_wait(&instance->cond, &instance->lock)); + + if (json_array_size(instance->command_queue)) { + struct timeval tv; + assert(!gettimeofday(&tv, NULL)); + + struct timespec ts; + if (tv.tv_usec + ((instance->next_delay_ms % 1000) * 1000) > 1000000) { + // Carry + ts.tv_sec = tv.tv_sec + (instance->next_delay_ms / 1000) + 1; + ts.tv_nsec = (tv.tv_usec * 1000) + ((instance->next_delay_ms % 1000) * 1000000) - 1000000000; + } else { + ts.tv_sec = tv.tv_sec + (instance->next_delay_ms / 1000); + } + + int wait = pthread_cond_timedwait(&instance->cond, &instance->lock, &ts); + assert(wait == 0 || wait == ETIMEDOUT); + } else { + assert(!pthread_cond_wait(&instance->cond, &instance->lock)); + } } assert(!pthread_mutex_unlock(&instance->lock)); return NULL; @@ -190,6 +242,7 @@ static void cosmo_send_command(cosmo *instance, json_t *command) { assert(command); assert(!pthread_mutex_lock(&instance->lock)); json_array_append_new(instance->command_queue, command); + instance->next_delay_ms = 0; assert(!pthread_cond_signal(&instance->cond)); assert(!pthread_mutex_unlock(&instance->lock)); } @@ -232,6 +285,7 @@ void cosmo_subscribe(cosmo *instance, json_t *subject, json_int_t messages, json cosmo *cosmo_create(char *client_id) { curl_global_init(CURL_GLOBAL_DEFAULT); + srandomdev(); cosmo *instance = malloc(sizeof(cosmo)); assert(instance); @@ -249,6 +303,7 @@ cosmo *cosmo_create(char *client_id) { void cosmo_destroy(cosmo *instance) { pthread_mutex_lock(&instance->lock); instance->shutdown = 1; + instance->next_delay_ms = 0; pthread_cond_signal(&instance->cond); pthread_mutex_unlock(&instance->lock); assert(!pthread_join(instance->thread, NULL)); @@ -268,7 +323,7 @@ int main(int argc, char *argv[]) { json_t *subject = cosmo_subject("foobar", NULL, NULL); cosmo_subscribe(instance, subject, -1, 0); json_decref(subject); - sleep(5); + sleep(120); cosmo_destroy(instance); return 0;