diff --git a/clients/c/cosmopolite-int.h b/clients/c/cosmopolite-int.h index 6858d4b..2da8957 100644 --- a/clients/c/cosmopolite-int.h +++ b/clients/c/cosmopolite-int.h @@ -20,8 +20,8 @@ struct cosmo { pthread_mutex_t lock; pthread_cond_t cond; bool shutdown; - char *profile; - char *generation; + json_t *profile; + json_t *generation; struct cosmo_command *command_queue_head; struct cosmo_command *command_queue_tail; json_t *ack; diff --git a/clients/c/cosmopolite.c b/clients/c/cosmopolite.c index efa0239..b866377 100644 --- a/clients/c/cosmopolite.c +++ b/clients/c/cosmopolite.c @@ -309,6 +309,38 @@ static json_t *cosmo_command(const char *name, const json_t *arguments) { return json_pack("{ssso}", "command", name, "arguments", arguments); } +static void cosmo_resubscribe(cosmo *instance) { + size_t i; + json_t *subscription; + json_array_foreach(instance->subscriptions, i, subscription) { + int state; + json_t *subject, *messages; + assert(!json_unpack(subscription, "{sisoso}", "state", &state, "subject", &subject, "messages", &messages)); + + if (state == SUBSCRIPTION_PENDING) { + continue; + } + + json_t *arguments = json_pack("{sO}", "subject", subject); + if (json_array_size(messages)) { + // Restart at the last actual ID we received. + json_t *last_message = json_array_get(messages, json_array_size(messages) - 1); + json_object_set(arguments, "last_id", json_object_get(last_message, "id")); + } else { + json_t *num_messages = json_object_get(subscription, "num_messages"); + if (num_messages) { + json_object_set(arguments, "messages", num_messages); + } + json_t *last_id = json_object_get(subscription, "last_id"); + if (last_id) { + json_object_set(arguments, "last_id", last_id); + } + } + + cosmo_send_command_locked(instance, cosmo_command("subscribe", arguments), NULL); + } +} + // Takes ownership of commands. // Takes ownership of ack. static struct cosmo_command *cosmo_send_rpc(cosmo *instance, struct cosmo_command *commands, json_t *ack) { @@ -342,17 +374,17 @@ static struct cosmo_command *cosmo_send_rpc(cosmo *instance, struct cosmo_comman } free(response); - json_t *command_responses, *events; - char *profile; - if (json_unpack(received, "{sssoso}", "profile", &profile, "responses", &command_responses, "events", &events)) { + json_t *command_responses, *events, *profile; + if (json_unpack(received, "{sososo}", "profile", &profile, "responses", &command_responses, "events", &events)) { cosmo_log(instance, "invalid server response"); json_decref(received); return commands; } - if (!instance->profile || strcmp(instance->profile, profile)) { - free(instance->profile); - instance->profile = strdup(profile); + if (!json_equal(instance->profile, profile)) { + json_decref(instance->profile); + json_incref(profile); + instance->profile = profile; } assert(!clock_gettime(CLOCK_MONOTONIC, &instance->last_success)); @@ -365,44 +397,16 @@ static struct cosmo_command *cosmo_send_rpc(cosmo *instance, struct cosmo_comman } json_t *poll_response = json_array_get(command_responses, 0); - const char *instance_generation; - if (json_unpack(poll_response, "{ss}", "instance_generation", &instance_generation)) { + json_t *instance_generation; + if (json_unpack(poll_response, "{so}", "instance_generation", &instance_generation)) { cosmo_log(instance, "invalid poll response"); } else { assert(!pthread_mutex_lock(&instance->lock)); - if (!instance->generation || strcmp(instance_generation, instance->generation)) { - free(instance->generation); - instance->generation = strdup(instance_generation); - - size_t i; - json_t *subscription; - json_array_foreach(instance->subscriptions, i, subscription) { - int state; - json_t *subject, *messages; - assert(!json_unpack(subscription, "{sisoso}", "state", &state, "subject", &subject, "messages", &messages)); - - if (state == SUBSCRIPTION_PENDING) { - continue; - } - - json_t *arguments = json_pack("{sO}", "subject", subject); - if (json_array_size(messages)) { - // Restart at the last actual ID we received. - json_t *last_message = json_array_get(messages, json_array_size(messages) - 1); - json_object_set(arguments, "last_id", json_object_get(last_message, "id")); - } else { - json_t *num_messages = json_object_get(subscription, "num_messages"); - if (num_messages) { - json_object_set(arguments, "messages", num_messages); - } - json_t *last_id = json_object_get(subscription, "last_id"); - if (last_id) { - json_object_set(arguments, "last_id", last_id); - } - } - - cosmo_send_command_locked(instance, cosmo_command("subscribe", arguments), NULL); - } + if (!json_equal(instance_generation, instance->generation)) { + json_decref(instance->generation); + json_incref(instance_generation); + instance->generation = instance_generation; + cosmo_resubscribe(instance); } assert(!pthread_mutex_unlock(&instance->lock)); } @@ -517,7 +521,7 @@ void cosmo_uuid(char *uuid) { } const char *cosmo_current_profile(cosmo *instance) { - return instance->profile; + return json_string_value(instance->profile); } json_t *cosmo_subject(const char *name, const char *readable_only_by, const char *writeable_only_by) { @@ -669,8 +673,8 @@ cosmo *cosmo_create(const char *base_url, const char *client_id, const cosmo_cal assert(!curl_easy_setopt(instance->curl, CURLOPT_HEADERFUNCTION, cosmo_header_callback)); instance->shutdown = false; - instance->profile = NULL; - instance->generation = NULL; + instance->profile = json_null(); + instance->generation = json_null(); instance->command_queue_head = instance->command_queue_tail = NULL; instance->ack = json_array(); assert(instance->ack); @@ -705,8 +709,8 @@ void cosmo_shutdown(cosmo *instance) { } json_decref(instance->ack); json_decref(instance->subscriptions); - free(instance->profile); - free(instance->generation); + json_decref(instance->profile); + json_decref(instance->generation); curl_easy_cleanup(instance->curl); free(instance);