Move command queue to our own linked list, so we can attach promises.

Fix OS X build.
This commit is contained in:
Ian Gulliver
2015-06-28 09:54:03 -07:00
parent f3e38ec8a6
commit b29690734a
2 changed files with 93 additions and 24 deletions

View File

@@ -3,6 +3,12 @@
// Declarations that aren't in the public API but are available to the test suite.
struct cosmo_command {
struct cosmo_command *prev;
struct cosmo_command *next;
json_t *command;
};
struct cosmo {
char client_id[COSMO_UUID_SIZE];
char instance_id[COSMO_UUID_SIZE];
@@ -15,7 +21,8 @@ struct cosmo {
bool shutdown;
char *profile;
char *generation;
json_t *command_queue;
struct cosmo_command *command_queue_head;
struct cosmo_command *command_queue_tail;
json_t *ack;
json_t *subscriptions;
uint64_t next_delay_ms;

View File

@@ -18,6 +18,22 @@
#define CYCLE_STAGGER_FACTOR 10
#define CONNECT_TIMEOUT_S 60
#ifdef __MACH__
// OS X is missing clock_gettime()
#define CLOCK_MONOTONIC 0
#define CLOCK_REALTIME 0
int clock_gettime(int clk_id, struct timespec *ts) {
struct timeval tv;
int err = gettimeofday(&tv, NULL);
if (err) {
return err;
}
ts->tv_sec = tv.tv_sec;
ts->tv_nsec = tv.tv_usec * 1000;
return 0;
}
#endif
enum {
SUBSCRIPTION_PENDING,
SUBSCRIPTION_ACTIVE,
@@ -48,6 +64,18 @@ static void cosmo_log(cosmo *instance, const char *fmt, ...) {
va_end(ap);
}
static void cosmo_append_command(struct cosmo_command **head, struct cosmo_command **tail, struct cosmo_command *command) {
command->prev = *tail;
if (command->prev) {
command->prev->next = command;
}
command->next = NULL;
*tail = command;
if (!*head) {
*head = command;
}
}
static json_t *cosmo_find_subscription(cosmo *instance, json_t *subject) {
size_t i;
json_t *subscription;
@@ -60,7 +88,9 @@ static json_t *cosmo_find_subscription(cosmo *instance, json_t *subject) {
}
static void cosmo_send_command_locked(cosmo *instance, json_t *command) {
json_array_append_new(instance->command_queue, command);
struct cosmo_command *command_obj = malloc(sizeof(*command_obj));
command_obj->command = command;
cosmo_append_command(&instance->command_queue_head, &instance->command_queue_tail, command_obj);
instance->next_delay_ms = 0;
}
@@ -280,13 +310,17 @@ static json_t *cosmo_command(const char *name, const json_t *arguments) {
// Takes ownership of commands.
// Takes ownership of ack.
static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands, json_t *ack) {
static struct cosmo_command *cosmo_send_rpc(cosmo *instance, struct cosmo_command *commands, json_t *ack) {
json_t *int_commands = json_array();
// Always poll.
json_t *arguments = json_pack("{so}", "ack", ack);
json_array_append_new(int_commands, cosmo_command("poll", arguments));
json_array_extend(int_commands, commands);
struct cosmo_command *command_iter = commands;
while (command_iter) {
json_array_append(int_commands, command_iter->command);
command_iter = command_iter->next;
}
char *request = cosmo_build_rpc(instance, int_commands);
cosmo_log(instance, "--> %s", request);
@@ -372,27 +406,39 @@ static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands, json_t *ack) {
assert(!pthread_mutex_unlock(&instance->lock));
}
json_t *to_retry = json_array();
json_t *command;
json_array_foreach(commands, index, command) {
// +1 for the poll offset from int_commands.
json_t *command_response = json_array_get(command_responses, index + 1);
command_iter = commands;
struct cosmo_command *to_retry_head = NULL, *to_retry_tail = NULL;
json_t *command_response;
json_array_foreach(command_responses, index, command_response) {
if (index == 0) {
// Skip poll response; don't increment command_iter
continue;
}
if (!command_iter) {
cosmo_log(instance, "more responses than requests");
continue;
}
struct cosmo_command *command_next = command_iter->next;
char *result;
if (json_unpack(command_response, "{ss}", "result", &result)) {
cosmo_log(instance, "invalid command response");
json_array_append(to_retry, command);
cosmo_append_command(&to_retry_head, &to_retry_tail, command_iter);
command_iter = command_next;
continue;
}
if (!strcmp(result, "retry")) {
json_array_append(to_retry, command);
cosmo_append_command(&to_retry_head, &to_retry_tail, command_iter);
command_iter = command_next;
continue;
}
char *command_name;
assert(!json_unpack(command, "{ss}", "command", &command_name));
assert(!json_unpack(command_iter->command, "{ss}", "command", &command_name));
if (!strcmp(command_name, "subscribe")) {
json_t *subject = NULL;
assert(!json_unpack(command, "{s{so}}", "arguments", "subject", &subject));
assert(!json_unpack(command_iter->command, "{s{so}}", "arguments", "subject", &subject));
assert(!pthread_mutex_lock(&instance->lock));
json_t *subscription = cosmo_find_subscription(instance, subject);
if (subscription) {
@@ -401,12 +447,15 @@ static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands, json_t *ack) {
}
assert(!pthread_mutex_unlock(&instance->lock));
}
json_decref(command_iter->command);
free(command_iter);
command_iter = command_next;
}
json_decref(commands);
json_decref(received);
return to_retry;
return to_retry_head;
}
static void *cosmo_thread_main(void *arg) {
@@ -414,8 +463,8 @@ static void *cosmo_thread_main(void *arg) {
assert(!pthread_mutex_lock(&instance->lock));
while (!instance->shutdown) {
json_t *commands = instance->command_queue;
instance->command_queue = json_array();
struct cosmo_command *commands = instance->command_queue_head;
instance->command_queue_head = instance->command_queue_tail = NULL;
json_t *ack = instance->ack;
instance->ack = json_array();
@@ -423,7 +472,7 @@ static void *cosmo_thread_main(void *arg) {
instance->next_delay_ms += rand_r(&instance->seedp) % (instance->next_delay_ms / CYCLE_STAGGER_FACTOR);
assert(!pthread_mutex_unlock(&instance->lock));
json_t *to_retry = cosmo_send_rpc(instance, commands, ack);
struct cosmo_command *to_retry = cosmo_send_rpc(instance, commands, ack);
{
struct timespec now;
assert(!clock_gettime(CLOCK_MONOTONIC, &now));
@@ -433,8 +482,16 @@ static void *cosmo_thread_main(void *arg) {
}
assert(!pthread_mutex_lock(&instance->lock));
json_array_extend(instance->command_queue, to_retry);
json_decref(to_retry);
if (to_retry) {
to_retry->prev = instance->command_queue_tail;
if (to_retry->prev) {
to_retry->prev->next = to_retry;
}
instance->command_queue_tail = to_retry;
if (!instance->command_queue_head) {
instance->command_queue_head = to_retry;
}
}
#define MS_PER_S 1000
#define NS_PER_MS 1000000
@@ -584,13 +641,13 @@ cosmo *cosmo_create(const char *base_url, const char *client_id, const cosmo_cal
}
instance->passthrough = passthrough;
cosmo_uuid(instance->instance_id);
if (client_id) {
strcpy(instance->client_id, client_id);
} else {
cosmo_uuid(instance->client_id);
cosmo_handle_client_id_change(instance);
}
cosmo_uuid(instance->instance_id);
assert(!pthread_mutex_init(&instance->lock, NULL));
assert(!pthread_cond_init(&instance->cond, NULL));
@@ -613,8 +670,7 @@ cosmo *cosmo_create(const char *base_url, const char *client_id, const cosmo_cal
instance->shutdown = false;
instance->profile = NULL;
instance->generation = NULL;
instance->command_queue = json_array();
assert(instance->command_queue);
instance->command_queue_head = instance->command_queue_tail = NULL;
instance->ack = json_array();
assert(instance->ack);
instance->subscriptions = json_array();
@@ -639,7 +695,13 @@ void cosmo_shutdown(cosmo *instance) {
assert(!pthread_mutex_destroy(&instance->lock));
assert(!pthread_cond_destroy(&instance->cond));
json_decref(instance->command_queue);
struct cosmo_command *command_iter = instance->command_queue_head;
while (command_iter) {
json_decref(command_iter->command);
struct cosmo_command *next = command_iter->next;
free(command_iter);
command_iter = next;
}
json_decref(instance->ack);
json_decref(instance->subscriptions);
free(instance->profile);