diff --git a/clients/c/cosmopolite.c b/clients/c/cosmopolite.c index 58aff61..72ab98d 100644 --- a/clients/c/cosmopolite.c +++ b/clients/c/cosmopolite.c @@ -97,7 +97,6 @@ static bool cosmo_send_http_int(cosmo *instance, cosmo_transfer *transfer) { long return_code; assert(curl_easy_getinfo(instance->curl, CURLINFO_RESPONSE_CODE, &return_code) == CURLE_OK); if (return_code != 200) { - fprintf(stderr, "server returned error: %ld\n", return_code); return false; } @@ -229,6 +228,7 @@ static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands, json_t *ack) { char *profile; if (json_unpack(received, "{sssoso}", "profile", &profile, "responses", &command_responses, "events", &events)) { fprintf(stderr, "invalid server response\n"); + json_decref(received); return commands; } @@ -249,7 +249,38 @@ static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands, json_t *ack) { fprintf(stderr, "invalid poll response\n"); } else { if (new_poll) { - // TODO: resubscribe to ACTIVE subscriptions + assert(!pthread_mutex_lock(&instance->lock)); + size_t i; + json_t *subscription; + json_array_foreach(instance->subscriptions, i, subscription) { + int state; + json_t *subject, *messages; + assert(!json_unpack(subscription, "{sisoso}", "state", &state, "subject", &subject, "messages", &messages)); + + if (state == SUBSCRIPTION_PENDING) { + continue; + } + + json_t *arguments = json_pack("{sO}", "subject", subject); + if (json_array_size(messages)) { + // Restart at the last actual ID we received. + json_t *last_message = json_array_get(messages, json_array_size(messages) - 1); + json_object_set(arguments, "last_id", json_object_get(last_message, "id")); + } else { + json_t *num_messages = json_object_get(subscription, "num_messages"); + if (num_messages) { + json_object_set(arguments, "messages", num_messages); + } + json_t *last_id = json_object_get(subscription, "last_id"); + if (last_id) { + json_object_set(arguments, "last_id", last_id); + } + } + + json_array_append_new(instance->command_queue, cosmo_command("subscribe", arguments)); + instance->next_delay_ms = 0; + } + assert(!pthread_mutex_unlock(&instance->lock)); } } @@ -368,7 +399,7 @@ void cosmo_subscribe(cosmo *instance, json_t *subject, const json_int_t messages json_t *arguments = json_pack("{sO}", "subject", subject); if (messages) { json_object_set_new(arguments, "messages", json_integer(messages)); - json_object_set_new(subscription, "messages", json_integer(messages)); + json_object_set_new(subscription, "num_messages", json_integer(messages)); } if (last_id) { json_object_set_new(arguments, "last_id", json_integer(last_id)); diff --git a/clients/c/test.c b/clients/c/test.c index 17b6884..6f8590b 100644 --- a/clients/c/test.c +++ b/clients/c/test.c @@ -142,10 +142,38 @@ bool test_logout_fires(test_state *state) { return true; } +bool test_resubscribe(test_state *state) { + cosmo *client = create_client(state); + + json_t *subject = random_subject(NULL, NULL); + cosmo_subscribe(client, subject, -1, 0); + + json_t *message_out = random_message(); + cosmo_send_message(client, subject, message_out); + const json_t *message_in = wait_for_message(state); + assert(json_equal(message_out, json_object_get(message_in, "message"))); + json_decref(message_out); + + // Reach in and reset the instance ID so we look new. + cosmo_uuid(client->instance_id); + + message_out = random_message(); + cosmo_send_message(client, subject, message_out); + message_in = wait_for_message(state); + assert(json_equal(message_out, json_object_get(message_in, "message"))); + json_decref(message_out); + + json_decref(subject); + + cosmo_shutdown(client); + return true; +} + int main(int argc, char *argv[]) { RUN_TEST(test_create_destroy); RUN_TEST(test_message_round_trip); RUN_TEST(test_logout_fires); + RUN_TEST(test_resubscribe); return 0; }