From b29690734a9299e1e28662d5dec0f617a739bc7f Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sun, 28 Jun 2015 09:54:03 -0700 Subject: [PATCH] Move command queue to our own linked list, so we can attach promises. Fix OS X build. --- clients/c/cosmopolite-int.h | 9 ++- clients/c/cosmopolite.c | 108 ++++++++++++++++++++++++++++-------- 2 files changed, 93 insertions(+), 24 deletions(-) diff --git a/clients/c/cosmopolite-int.h b/clients/c/cosmopolite-int.h index 93f2166..c9973f5 100644 --- a/clients/c/cosmopolite-int.h +++ b/clients/c/cosmopolite-int.h @@ -3,6 +3,12 @@ // Declarations that aren't in the public API but are available to the test suite. +struct cosmo_command { + struct cosmo_command *prev; + struct cosmo_command *next; + json_t *command; +}; + struct cosmo { char client_id[COSMO_UUID_SIZE]; char instance_id[COSMO_UUID_SIZE]; @@ -15,7 +21,8 @@ struct cosmo { bool shutdown; char *profile; char *generation; - json_t *command_queue; + struct cosmo_command *command_queue_head; + struct cosmo_command *command_queue_tail; json_t *ack; json_t *subscriptions; uint64_t next_delay_ms; diff --git a/clients/c/cosmopolite.c b/clients/c/cosmopolite.c index 6bf4879..e05dd99 100644 --- a/clients/c/cosmopolite.c +++ b/clients/c/cosmopolite.c @@ -18,6 +18,22 @@ #define CYCLE_STAGGER_FACTOR 10 #define CONNECT_TIMEOUT_S 60 +#ifdef __MACH__ +// OS X is missing clock_gettime() +#define CLOCK_MONOTONIC 0 +#define CLOCK_REALTIME 0 +int clock_gettime(int clk_id, struct timespec *ts) { + struct timeval tv; + int err = gettimeofday(&tv, NULL); + if (err) { + return err; + } + ts->tv_sec = tv.tv_sec; + ts->tv_nsec = tv.tv_usec * 1000; + return 0; +} +#endif + enum { SUBSCRIPTION_PENDING, SUBSCRIPTION_ACTIVE, @@ -48,6 +64,18 @@ static void cosmo_log(cosmo *instance, const char *fmt, ...) { va_end(ap); } +static void cosmo_append_command(struct cosmo_command **head, struct cosmo_command **tail, struct cosmo_command *command) { + command->prev = *tail; + if (command->prev) { + command->prev->next = command; + } + command->next = NULL; + *tail = command; + if (!*head) { + *head = command; + } +} + static json_t *cosmo_find_subscription(cosmo *instance, json_t *subject) { size_t i; json_t *subscription; @@ -60,7 +88,9 @@ static json_t *cosmo_find_subscription(cosmo *instance, json_t *subject) { } static void cosmo_send_command_locked(cosmo *instance, json_t *command) { - json_array_append_new(instance->command_queue, command); + struct cosmo_command *command_obj = malloc(sizeof(*command_obj)); + command_obj->command = command; + cosmo_append_command(&instance->command_queue_head, &instance->command_queue_tail, command_obj); instance->next_delay_ms = 0; } @@ -280,13 +310,17 @@ static json_t *cosmo_command(const char *name, const json_t *arguments) { // Takes ownership of commands. // Takes ownership of ack. -static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands, json_t *ack) { +static struct cosmo_command *cosmo_send_rpc(cosmo *instance, struct cosmo_command *commands, json_t *ack) { json_t *int_commands = json_array(); // Always poll. json_t *arguments = json_pack("{so}", "ack", ack); json_array_append_new(int_commands, cosmo_command("poll", arguments)); - json_array_extend(int_commands, commands); + struct cosmo_command *command_iter = commands; + while (command_iter) { + json_array_append(int_commands, command_iter->command); + command_iter = command_iter->next; + } char *request = cosmo_build_rpc(instance, int_commands); cosmo_log(instance, "--> %s", request); @@ -372,27 +406,39 @@ static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands, json_t *ack) { assert(!pthread_mutex_unlock(&instance->lock)); } - json_t *to_retry = json_array(); - json_t *command; - json_array_foreach(commands, index, command) { - // +1 for the poll offset from int_commands. - json_t *command_response = json_array_get(command_responses, index + 1); + command_iter = commands; + struct cosmo_command *to_retry_head = NULL, *to_retry_tail = NULL; + json_t *command_response; + json_array_foreach(command_responses, index, command_response) { + if (index == 0) { + // Skip poll response; don't increment command_iter + continue; + } + + if (!command_iter) { + cosmo_log(instance, "more responses than requests"); + continue; + } + struct cosmo_command *command_next = command_iter->next; + char *result; if (json_unpack(command_response, "{ss}", "result", &result)) { cosmo_log(instance, "invalid command response"); - json_array_append(to_retry, command); + cosmo_append_command(&to_retry_head, &to_retry_tail, command_iter); + command_iter = command_next; continue; } if (!strcmp(result, "retry")) { - json_array_append(to_retry, command); + cosmo_append_command(&to_retry_head, &to_retry_tail, command_iter); + command_iter = command_next; continue; } char *command_name; - assert(!json_unpack(command, "{ss}", "command", &command_name)); + assert(!json_unpack(command_iter->command, "{ss}", "command", &command_name)); if (!strcmp(command_name, "subscribe")) { json_t *subject = NULL; - assert(!json_unpack(command, "{s{so}}", "arguments", "subject", &subject)); + assert(!json_unpack(command_iter->command, "{s{so}}", "arguments", "subject", &subject)); assert(!pthread_mutex_lock(&instance->lock)); json_t *subscription = cosmo_find_subscription(instance, subject); if (subscription) { @@ -401,12 +447,15 @@ static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands, json_t *ack) { } assert(!pthread_mutex_unlock(&instance->lock)); } + + json_decref(command_iter->command); + free(command_iter); + command_iter = command_next; } - json_decref(commands); json_decref(received); - return to_retry; + return to_retry_head; } static void *cosmo_thread_main(void *arg) { @@ -414,8 +463,8 @@ static void *cosmo_thread_main(void *arg) { assert(!pthread_mutex_lock(&instance->lock)); while (!instance->shutdown) { - json_t *commands = instance->command_queue; - instance->command_queue = json_array(); + struct cosmo_command *commands = instance->command_queue_head; + instance->command_queue_head = instance->command_queue_tail = NULL; json_t *ack = instance->ack; instance->ack = json_array(); @@ -423,7 +472,7 @@ static void *cosmo_thread_main(void *arg) { instance->next_delay_ms += rand_r(&instance->seedp) % (instance->next_delay_ms / CYCLE_STAGGER_FACTOR); assert(!pthread_mutex_unlock(&instance->lock)); - json_t *to_retry = cosmo_send_rpc(instance, commands, ack); + struct cosmo_command *to_retry = cosmo_send_rpc(instance, commands, ack); { struct timespec now; assert(!clock_gettime(CLOCK_MONOTONIC, &now)); @@ -433,8 +482,16 @@ static void *cosmo_thread_main(void *arg) { } assert(!pthread_mutex_lock(&instance->lock)); - json_array_extend(instance->command_queue, to_retry); - json_decref(to_retry); + if (to_retry) { + to_retry->prev = instance->command_queue_tail; + if (to_retry->prev) { + to_retry->prev->next = to_retry; + } + instance->command_queue_tail = to_retry; + if (!instance->command_queue_head) { + instance->command_queue_head = to_retry; + } + } #define MS_PER_S 1000 #define NS_PER_MS 1000000 @@ -584,13 +641,13 @@ cosmo *cosmo_create(const char *base_url, const char *client_id, const cosmo_cal } instance->passthrough = passthrough; + cosmo_uuid(instance->instance_id); if (client_id) { strcpy(instance->client_id, client_id); } else { cosmo_uuid(instance->client_id); cosmo_handle_client_id_change(instance); } - cosmo_uuid(instance->instance_id); assert(!pthread_mutex_init(&instance->lock, NULL)); assert(!pthread_cond_init(&instance->cond, NULL)); @@ -613,8 +670,7 @@ cosmo *cosmo_create(const char *base_url, const char *client_id, const cosmo_cal instance->shutdown = false; instance->profile = NULL; instance->generation = NULL; - instance->command_queue = json_array(); - assert(instance->command_queue); + instance->command_queue_head = instance->command_queue_tail = NULL; instance->ack = json_array(); assert(instance->ack); instance->subscriptions = json_array(); @@ -639,7 +695,13 @@ void cosmo_shutdown(cosmo *instance) { assert(!pthread_mutex_destroy(&instance->lock)); assert(!pthread_cond_destroy(&instance->cond)); - json_decref(instance->command_queue); + struct cosmo_command *command_iter = instance->command_queue_head; + while (command_iter) { + json_decref(command_iter->command); + struct cosmo_command *next = command_iter->next; + free(command_iter); + command_iter = next; + } json_decref(instance->ack); json_decref(instance->subscriptions); free(instance->profile);