More resilient method for determining when we've reconnected.

This commit is contained in:
Ian Gulliver
2015-06-20 05:45:26 +00:00
parent 9a726558ca
commit 33ce1fe805
2 changed files with 11 additions and 5 deletions

View File

@@ -13,6 +13,7 @@ struct cosmo {
pthread_cond_t cond; pthread_cond_t cond;
bool shutdown; bool shutdown;
char *profile; char *profile;
char *generation;
json_t *command_queue; json_t *command_queue;
json_t *ack; json_t *ack;
json_t *subscriptions; json_t *subscriptions;

View File

@@ -307,12 +307,15 @@ static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands, json_t *ack) {
} }
json_t *poll_response = json_array_get(command_responses, 0); json_t *poll_response = json_array_get(command_responses, 0);
int new_poll; const char *instance_generation;
if (json_unpack(poll_response, "{sb}", "new", &new_poll)) { if (json_unpack(poll_response, "{ss}", "instance_generation", &instance_generation)) {
fprintf(stderr, "invalid poll response\n"); fprintf(stderr, "invalid poll response\n");
} else { } else {
if (new_poll) { assert(!pthread_mutex_lock(&instance->lock));
assert(!pthread_mutex_lock(&instance->lock)); if (!instance->generation || strcmp(instance_generation, instance->generation)) {
free(instance->generation);
instance->generation = strdup(instance_generation);
size_t i; size_t i;
json_t *subscription; json_t *subscription;
json_array_foreach(instance->subscriptions, i, subscription) { json_array_foreach(instance->subscriptions, i, subscription) {
@@ -342,8 +345,8 @@ static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands, json_t *ack) {
cosmo_send_command_locked(instance, cosmo_command("subscribe", arguments)); cosmo_send_command_locked(instance, cosmo_command("subscribe", arguments));
} }
assert(!pthread_mutex_unlock(&instance->lock));
} }
assert(!pthread_mutex_unlock(&instance->lock));
} }
json_t *to_retry = json_array(); json_t *to_retry = json_array();
@@ -575,6 +578,7 @@ cosmo *cosmo_create(const char *base_url, const char *client_id, const cosmo_cal
instance->shutdown = false; instance->shutdown = false;
instance->profile = NULL; instance->profile = NULL;
instance->generation = NULL;
instance->command_queue = json_array(); instance->command_queue = json_array();
assert(instance->command_queue); assert(instance->command_queue);
instance->ack = json_array(); instance->ack = json_array();
@@ -605,6 +609,7 @@ void cosmo_shutdown(cosmo *instance) {
json_decref(instance->ack); json_decref(instance->ack);
json_decref(instance->subscriptions); json_decref(instance->subscriptions);
free(instance->profile); free(instance->profile);
free(instance->generation);
curl_easy_cleanup(instance->curl); curl_easy_cleanup(instance->curl);
free(instance); free(instance);