Retry and backoff working properly.
This commit is contained in:
@@ -1,8 +1,11 @@
|
|||||||
#include <assert.h>
|
#include <assert.h>
|
||||||
|
#include <errno.h>
|
||||||
|
#include <math.h>
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
#include <stdbool.h>
|
#include <stdbool.h>
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
#include <strings.h>
|
||||||
#include <time.h>
|
#include <time.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
|
||||||
@@ -14,6 +17,12 @@
|
|||||||
#define COSMO_CHECK_SECONDS 10
|
#define COSMO_CHECK_SECONDS 10
|
||||||
|
|
||||||
#define min(a, b) ((a) < (b) ? (a) : (b))
|
#define min(a, b) ((a) < (b) ? (a) : (b))
|
||||||
|
#define max(a, b) ((a) > (b) ? (a) : (b))
|
||||||
|
|
||||||
|
#define DELAY_MIN_MS 250
|
||||||
|
#define DELAY_MAX_MS 32000
|
||||||
|
#define DELAY_EXPONENT 1.1
|
||||||
|
#define DELAY_STAGGER_FACTOR 10
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char client_id[COSMO_UUID_SIZE];
|
char client_id[COSMO_UUID_SIZE];
|
||||||
@@ -23,6 +32,7 @@ typedef struct {
|
|||||||
pthread_cond_t cond;
|
pthread_cond_t cond;
|
||||||
bool shutdown;
|
bool shutdown;
|
||||||
json_t *command_queue;
|
json_t *command_queue;
|
||||||
|
uint64_t next_delay_ms;
|
||||||
|
|
||||||
pthread_t thread;
|
pthread_t thread;
|
||||||
} cosmo;
|
} cosmo;
|
||||||
@@ -33,6 +43,8 @@ typedef struct {
|
|||||||
|
|
||||||
char *recv_buf;
|
char *recv_buf;
|
||||||
size_t recv_buf_len;
|
size_t recv_buf_len;
|
||||||
|
|
||||||
|
int64_t retry_after;
|
||||||
} cosmo_transfer;
|
} cosmo_transfer;
|
||||||
|
|
||||||
static size_t cosmo_read_callback(void *ptr, size_t size, size_t nmemb, void *userp) {
|
static size_t cosmo_read_callback(void *ptr, size_t size, size_t nmemb, void *userp) {
|
||||||
@@ -55,6 +67,18 @@ static size_t cosmo_write_callback(void *ptr, size_t size, size_t nmemb, void *u
|
|||||||
return to_read;
|
return to_read;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static size_t cosmo_header_callback(char *ptr, size_t size, size_t nmemb, void *userp) {
|
||||||
|
cosmo_transfer *transfer = userp;
|
||||||
|
size_t length = size * nmemb;
|
||||||
|
#define RETRY_AFTER_HEADER "Retry-After: 0\r\n"
|
||||||
|
#define RETRY_AFTER_HEADER_SIZE (sizeof(RETRY_AFTER_HEADER) - 1)
|
||||||
|
if (length == RETRY_AFTER_HEADER_SIZE &&
|
||||||
|
strncasecmp(ptr, RETRY_AFTER_HEADER, RETRY_AFTER_HEADER_SIZE) == 0) {
|
||||||
|
transfer->retry_after = 0;
|
||||||
|
}
|
||||||
|
return length;
|
||||||
|
}
|
||||||
|
|
||||||
static char *cosmo_build_rpc(cosmo *instance, json_t *commands) {
|
static char *cosmo_build_rpc(cosmo *instance, json_t *commands) {
|
||||||
json_t *to_send = json_pack("{sssssO}", "client_id", instance->client_id, "instance_id", instance->instance_id, "commands", commands);
|
json_t *to_send = json_pack("{sssssO}", "client_id", instance->client_id, "instance_id", instance->instance_id, "commands", commands);
|
||||||
assert(to_send);
|
assert(to_send);
|
||||||
@@ -77,6 +101,8 @@ static bool cosmo_send_http_int(cosmo *instance, cosmo_transfer *transfer, CURL
|
|||||||
curl_easy_setopt(curl, CURLOPT_READDATA, transfer);
|
curl_easy_setopt(curl, CURLOPT_READDATA, transfer);
|
||||||
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, cosmo_write_callback);
|
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, cosmo_write_callback);
|
||||||
curl_easy_setopt(curl, CURLOPT_WRITEDATA, transfer);
|
curl_easy_setopt(curl, CURLOPT_WRITEDATA, transfer);
|
||||||
|
curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, cosmo_header_callback);
|
||||||
|
curl_easy_setopt(curl, CURLOPT_HEADERDATA, transfer);
|
||||||
curl_easy_setopt(curl, CURLOPT_TIMEOUT, COSMO_CHECK_SECONDS);
|
curl_easy_setopt(curl, CURLOPT_TIMEOUT, COSMO_CHECK_SECONDS);
|
||||||
res = curl_easy_perform(curl);
|
res = curl_easy_perform(curl);
|
||||||
|
|
||||||
@@ -103,11 +129,16 @@ static char *cosmo_send_http(cosmo *instance, char *request) {
|
|||||||
.send_buf = request,
|
.send_buf = request,
|
||||||
.send_buf_len = strlen(request),
|
.send_buf_len = strlen(request),
|
||||||
.recv_buf = NULL,
|
.recv_buf = NULL,
|
||||||
.recv_buf_len = 0
|
.recv_buf_len = 0,
|
||||||
|
.retry_after = -1
|
||||||
};
|
};
|
||||||
|
|
||||||
int ret = cosmo_send_http_int(instance, &transfer, curl);
|
int ret = cosmo_send_http_int(instance, &transfer, curl);
|
||||||
|
|
||||||
|
if (transfer.retry_after >= 0) {
|
||||||
|
instance->next_delay_ms = transfer.retry_after * 1000;
|
||||||
|
}
|
||||||
|
|
||||||
curl_easy_cleanup(curl);
|
curl_easy_cleanup(curl);
|
||||||
free(request);
|
free(request);
|
||||||
|
|
||||||
@@ -168,9 +199,12 @@ static void *cosmo_thread_main(void *arg) {
|
|||||||
|
|
||||||
assert(!pthread_mutex_lock(&instance->lock));
|
assert(!pthread_mutex_lock(&instance->lock));
|
||||||
while (!instance->shutdown) {
|
while (!instance->shutdown) {
|
||||||
while (json_array_size(instance->command_queue)) {
|
if (json_array_size(instance->command_queue)) {
|
||||||
json_t *commands = instance->command_queue;
|
json_t *commands = instance->command_queue;
|
||||||
instance->command_queue = json_array();
|
instance->command_queue = json_array();
|
||||||
|
instance->next_delay_ms = pow(instance->next_delay_ms, DELAY_EXPONENT);
|
||||||
|
instance->next_delay_ms = min(DELAY_MAX_MS, max(DELAY_MIN_MS, instance->next_delay_ms));
|
||||||
|
instance->next_delay_ms += random() % (instance->next_delay_ms / DELAY_STAGGER_FACTOR);
|
||||||
|
|
||||||
assert(!pthread_mutex_unlock(&instance->lock));
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
json_t *to_retry = cosmo_send_rpc(instance, commands);
|
json_t *to_retry = cosmo_send_rpc(instance, commands);
|
||||||
@@ -179,8 +213,26 @@ static void *cosmo_thread_main(void *arg) {
|
|||||||
json_array_extend(instance->command_queue, to_retry);
|
json_array_extend(instance->command_queue, to_retry);
|
||||||
json_decref(to_retry);
|
json_decref(to_retry);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (json_array_size(instance->command_queue)) {
|
||||||
|
struct timeval tv;
|
||||||
|
assert(!gettimeofday(&tv, NULL));
|
||||||
|
|
||||||
|
struct timespec ts;
|
||||||
|
if (tv.tv_usec + ((instance->next_delay_ms % 1000) * 1000) > 1000000) {
|
||||||
|
// Carry
|
||||||
|
ts.tv_sec = tv.tv_sec + (instance->next_delay_ms / 1000) + 1;
|
||||||
|
ts.tv_nsec = (tv.tv_usec * 1000) + ((instance->next_delay_ms % 1000) * 1000000) - 1000000000;
|
||||||
|
} else {
|
||||||
|
ts.tv_sec = tv.tv_sec + (instance->next_delay_ms / 1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
int wait = pthread_cond_timedwait(&instance->cond, &instance->lock, &ts);
|
||||||
|
assert(wait == 0 || wait == ETIMEDOUT);
|
||||||
|
} else {
|
||||||
assert(!pthread_cond_wait(&instance->cond, &instance->lock));
|
assert(!pthread_cond_wait(&instance->cond, &instance->lock));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
assert(!pthread_mutex_unlock(&instance->lock));
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@@ -190,6 +242,7 @@ static void cosmo_send_command(cosmo *instance, json_t *command) {
|
|||||||
assert(command);
|
assert(command);
|
||||||
assert(!pthread_mutex_lock(&instance->lock));
|
assert(!pthread_mutex_lock(&instance->lock));
|
||||||
json_array_append_new(instance->command_queue, command);
|
json_array_append_new(instance->command_queue, command);
|
||||||
|
instance->next_delay_ms = 0;
|
||||||
assert(!pthread_cond_signal(&instance->cond));
|
assert(!pthread_cond_signal(&instance->cond));
|
||||||
assert(!pthread_mutex_unlock(&instance->lock));
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
}
|
}
|
||||||
@@ -232,6 +285,7 @@ void cosmo_subscribe(cosmo *instance, json_t *subject, json_int_t messages, json
|
|||||||
|
|
||||||
cosmo *cosmo_create(char *client_id) {
|
cosmo *cosmo_create(char *client_id) {
|
||||||
curl_global_init(CURL_GLOBAL_DEFAULT);
|
curl_global_init(CURL_GLOBAL_DEFAULT);
|
||||||
|
srandomdev();
|
||||||
|
|
||||||
cosmo *instance = malloc(sizeof(cosmo));
|
cosmo *instance = malloc(sizeof(cosmo));
|
||||||
assert(instance);
|
assert(instance);
|
||||||
@@ -249,6 +303,7 @@ cosmo *cosmo_create(char *client_id) {
|
|||||||
void cosmo_destroy(cosmo *instance) {
|
void cosmo_destroy(cosmo *instance) {
|
||||||
pthread_mutex_lock(&instance->lock);
|
pthread_mutex_lock(&instance->lock);
|
||||||
instance->shutdown = 1;
|
instance->shutdown = 1;
|
||||||
|
instance->next_delay_ms = 0;
|
||||||
pthread_cond_signal(&instance->cond);
|
pthread_cond_signal(&instance->cond);
|
||||||
pthread_mutex_unlock(&instance->lock);
|
pthread_mutex_unlock(&instance->lock);
|
||||||
assert(!pthread_join(instance->thread, NULL));
|
assert(!pthread_join(instance->thread, NULL));
|
||||||
@@ -268,7 +323,7 @@ int main(int argc, char *argv[]) {
|
|||||||
json_t *subject = cosmo_subject("foobar", NULL, NULL);
|
json_t *subject = cosmo_subject("foobar", NULL, NULL);
|
||||||
cosmo_subscribe(instance, subject, -1, 0);
|
cosmo_subscribe(instance, subject, -1, 0);
|
||||||
json_decref(subject);
|
json_decref(subject);
|
||||||
sleep(5);
|
sleep(120);
|
||||||
cosmo_destroy(instance);
|
cosmo_destroy(instance);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|||||||
Reference in New Issue
Block a user