Start of move to polling.
This commit is contained in:
@@ -1,7 +1,7 @@
|
|||||||
CC ?= gcc
|
CC ?= gcc
|
||||||
CFLAGS ?= -Wall -Werror -I/usr/local/include -fpic -O -g
|
CFLAGS ?= -Wall -Werror -I/usr/local/include -fpic -O -g
|
||||||
LDFLAGS ?= -Wall -L/usr/local/lib -L. -O
|
LDFLAGS ?= -Wall -L/usr/local/lib -L. -O
|
||||||
LIBS ?= -lcurl -ljansson -luuid -lm -lpthread
|
LIBS ?= -lcurl -ljansson -luuid -lpthread
|
||||||
|
|
||||||
all: libcosmopolite.so
|
all: libcosmopolite.so
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
#include <assert.h>
|
#include <assert.h>
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
#include <math.h>
|
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <strings.h>
|
#include <strings.h>
|
||||||
@@ -10,15 +9,11 @@
|
|||||||
|
|
||||||
#include "cosmopolite.h"
|
#include "cosmopolite.h"
|
||||||
|
|
||||||
#define COSMO_CHECK_SECONDS 10
|
|
||||||
|
|
||||||
#define min(a, b) ((a) < (b) ? (a) : (b))
|
#define min(a, b) ((a) < (b) ? (a) : (b))
|
||||||
#define max(a, b) ((a) > (b) ? (a) : (b))
|
#define max(a, b) ((a) > (b) ? (a) : (b))
|
||||||
|
|
||||||
#define DELAY_MIN_MS 250
|
#define CYCLE_MS 10000
|
||||||
#define DELAY_MAX_MS 32000
|
#define CYCLE_STAGGER_FACTOR 10
|
||||||
#define DELAY_EXPONENT 1.1
|
|
||||||
#define DELAY_STAGGER_FACTOR 10
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char *send_buf;
|
char *send_buf;
|
||||||
@@ -193,8 +188,16 @@ static void cosmo_handle_event(cosmo *instance, json_t *event) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Takes ownership of arguments.
|
||||||
|
static json_t *cosmo_command(const char *name, const json_t *arguments) {
|
||||||
|
return json_pack("{ssso}", "command", name, "arguments", arguments);
|
||||||
|
}
|
||||||
|
|
||||||
// Takes ownership of commands.
|
// Takes ownership of commands.
|
||||||
static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands) {
|
static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands) {
|
||||||
|
// Always poll.
|
||||||
|
json_array_append(commands, cosmo_command("poll", json_array()));
|
||||||
|
|
||||||
char *request = cosmo_build_rpc(instance, commands);
|
char *request = cosmo_build_rpc(instance, commands);
|
||||||
|
|
||||||
char *response = cosmo_send_http(instance, request);
|
char *response = cosmo_send_http(instance, request);
|
||||||
@@ -282,12 +285,11 @@ static void *cosmo_thread_main(void *arg) {
|
|||||||
|
|
||||||
assert(!pthread_mutex_lock(&instance->lock));
|
assert(!pthread_mutex_lock(&instance->lock));
|
||||||
while (!instance->shutdown) {
|
while (!instance->shutdown) {
|
||||||
if (json_array_size(instance->command_queue)) {
|
|
||||||
json_t *commands = instance->command_queue;
|
json_t *commands = instance->command_queue;
|
||||||
instance->command_queue = json_array();
|
instance->command_queue = json_array();
|
||||||
instance->next_delay_ms = pow(instance->next_delay_ms, DELAY_EXPONENT);
|
|
||||||
instance->next_delay_ms = min(DELAY_MAX_MS, max(DELAY_MIN_MS, instance->next_delay_ms));
|
instance->next_delay_ms = CYCLE_MS;
|
||||||
instance->next_delay_ms += rand_r(&instance->seedp) % (instance->next_delay_ms / DELAY_STAGGER_FACTOR);
|
instance->next_delay_ms += rand_r(&instance->seedp) % (instance->next_delay_ms / CYCLE_STAGGER_FACTOR);
|
||||||
|
|
||||||
assert(!pthread_mutex_unlock(&instance->lock));
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
json_t *to_retry = cosmo_send_rpc(instance, commands);
|
json_t *to_retry = cosmo_send_rpc(instance, commands);
|
||||||
@@ -295,25 +297,15 @@ static void *cosmo_thread_main(void *arg) {
|
|||||||
|
|
||||||
json_array_extend(instance->command_queue, to_retry);
|
json_array_extend(instance->command_queue, to_retry);
|
||||||
json_decref(to_retry);
|
json_decref(to_retry);
|
||||||
}
|
|
||||||
|
|
||||||
if (json_array_size(instance->command_queue)) {
|
|
||||||
struct timeval tv;
|
|
||||||
assert(!gettimeofday(&tv, NULL));
|
|
||||||
|
|
||||||
|
#define MS_PER_S 1000
|
||||||
|
#define NS_PER_MS 1000000
|
||||||
struct timespec ts;
|
struct timespec ts;
|
||||||
if (tv.tv_usec + ((instance->next_delay_ms % 1000) * 1000) > 1000000) {
|
assert(!clock_gettime(CLOCK_REALTIME, &ts));
|
||||||
// Carry
|
uint64_t target_ms = (ts.tv_sec * MS_PER_S) + (ts.tv_nsec / NS_PER_MS) + instance->next_delay_ms;
|
||||||
ts.tv_sec = tv.tv_sec + (instance->next_delay_ms / 1000) + 1;
|
ts.tv_sec = target_ms / MS_PER_S;
|
||||||
ts.tv_nsec = (tv.tv_usec * 1000) + ((instance->next_delay_ms % 1000) * 1000000) - 1000000000;
|
ts.tv_nsec = (target_ms % MS_PER_S) * NS_PER_MS;
|
||||||
} else {
|
|
||||||
ts.tv_sec = tv.tv_sec + (instance->next_delay_ms / 1000);
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_cond_timedwait(&instance->cond, &instance->lock, &ts);
|
pthread_cond_timedwait(&instance->cond, &instance->lock, &ts);
|
||||||
} else {
|
|
||||||
assert(!pthread_cond_wait(&instance->cond, &instance->lock));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
assert(!pthread_mutex_unlock(&instance->lock));
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
return NULL;
|
return NULL;
|
||||||
@@ -329,11 +321,6 @@ static void cosmo_send_command(cosmo *instance, json_t *command) {
|
|||||||
assert(!pthread_mutex_unlock(&instance->lock));
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Takes ownership of arguments.
|
|
||||||
static json_t *cosmo_command(const char *name, const json_t *arguments) {
|
|
||||||
return json_pack("{ssso}", "command", name, "arguments", arguments);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// Public interface below
|
// Public interface below
|
||||||
|
|
||||||
@@ -459,7 +446,7 @@ cosmo *cosmo_create(const char *base_url, const char *client_id, const cosmo_cal
|
|||||||
assert(!curl_easy_setopt(instance->curl, CURLOPT_PROTOCOLS, CURLPROTO_HTTPS));
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_PROTOCOLS, CURLPROTO_HTTPS));
|
||||||
assert(!curl_easy_setopt(instance->curl, CURLOPT_REDIR_PROTOCOLS, CURLPROTO_HTTPS));
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_REDIR_PROTOCOLS, CURLPROTO_HTTPS));
|
||||||
assert(!curl_easy_setopt(instance->curl, CURLOPT_SSL_CIPHER_LIST, "ECDH+AESGCM:EDH+AESGCM:AES256+EECDH:AES256+EDH"));
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_SSL_CIPHER_LIST, "ECDH+AESGCM:EDH+AESGCM:AES256+EECDH:AES256+EDH"));
|
||||||
assert(!curl_easy_setopt(instance->curl, CURLOPT_TIMEOUT, COSMO_CHECK_SECONDS));
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_TIMEOUT, CYCLE_MS / MS_PER_S));
|
||||||
assert(!curl_easy_setopt(instance->curl, CURLOPT_POST, 1L));
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_POST, 1L));
|
||||||
assert(!curl_easy_setopt(instance->curl, CURLOPT_READFUNCTION, cosmo_read_callback));
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_READFUNCTION, cosmo_read_callback));
|
||||||
assert(!curl_easy_setopt(instance->curl, CURLOPT_WRITEFUNCTION, cosmo_write_callback));
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_WRITEFUNCTION, cosmo_write_callback));
|
||||||
|
|||||||
Reference in New Issue
Block a user