diff --git a/clients/c/cosmopolite.c b/clients/c/cosmopolite.c index a4e3904..58aff61 100644 --- a/clients/c/cosmopolite.c +++ b/clients/c/cosmopolite.c @@ -225,29 +225,31 @@ static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands, json_t *ack) { } free(response); - json_t *profile = json_object_get(received, "profile"); - if (profile) { - if (!instance->profile || strcmp(instance->profile, json_string_value(profile))) { - free(instance->profile); - instance->profile = strdup(json_string_value(profile)); - } - } else { - fprintf(stderr, "response lacks \"profile\" key\n"); - } - - json_t *command_responses = json_object_get(received, "responses"); - if (!command_responses) { - fprintf(stderr, "response lacks \"responses\" key\n"); + json_t *command_responses, *events; + char *profile; + if (json_unpack(received, "{sssoso}", "profile", &profile, "responses", &command_responses, "events", &events)) { + fprintf(stderr, "invalid server response\n"); return commands; } - size_t index; + if (!instance->profile || strcmp(instance->profile, profile)) { + free(instance->profile); + instance->profile = strdup(profile); + } - json_t *events = json_object_get(received, "events"); - if (events) { - json_t *event; - json_array_foreach(events, index, event) { - cosmo_handle_event(instance, event); + size_t index; + json_t *event; + json_array_foreach(events, index, event) { + cosmo_handle_event(instance, event); + } + + json_t *poll_response = json_array_get(command_responses, 0); + int new_poll; + if (json_unpack(poll_response, "{sb}", "new", &new_poll)) { + fprintf(stderr, "invalid poll response\n"); + } else { + if (new_poll) { + // TODO: resubscribe to ACTIVE subscriptions } } @@ -256,17 +258,20 @@ static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands, json_t *ack) { json_array_foreach(commands, index, command) { // +1 for the poll offset from int_commands. json_t *command_response = json_array_get(command_responses, index + 1); - json_t *result = json_object_get(command_response, "result"); - if (!result) { - fprintf(stderr, "response lacks \"result\" key\n"); + char *result; + if (json_unpack(command_response, "{ss}", "result", &result)) { + fprintf(stderr, "invalid command response\n"); json_array_append(to_retry, command); continue; } - if (!strcmp(json_string_value(result), "retry")) { + if (!strcmp(result, "retry")) { json_array_append(to_retry, command); continue; } - if (!strcmp(json_string_value(json_object_get(command, "command")), "subscribe")) { + + char *command_name; + assert(!json_unpack(command, "{ss}", "command", &command_name)); + if (!strcmp(command_name, "subscribe")) { json_t *subject = NULL; assert(!json_unpack(command, "{s{so}}", "arguments", "subject", &subject)); assert(!pthread_mutex_lock(&instance->lock)); @@ -353,18 +358,21 @@ json_t *cosmo_subject(const char *name, const char *readable_only_by, const char } void cosmo_subscribe(cosmo *instance, json_t *subject, const json_int_t messages, const json_int_t last_id) { - json_t *arguments = json_pack("{sO}", "subject", subject); - if (messages) { - json_object_set_new(arguments, "messages", json_integer(messages)); - } - if (last_id) { - json_object_set_new(arguments, "last_id", json_integer(last_id)); - } - assert(!pthread_mutex_lock(&instance->lock)); json_t *subscription = cosmo_find_subscription(instance, subject); if (!subscription) { - json_array_append_new(instance->subscriptions, json_pack("{sOs[]si}", "subject", subject, "messages", "state", SUBSCRIPTION_PENDING)); + subscription = json_pack("{sOs[]si}", "subject", subject, "messages", "state", SUBSCRIPTION_PENDING); + json_array_append_new(instance->subscriptions, subscription); + } + + json_t *arguments = json_pack("{sO}", "subject", subject); + if (messages) { + json_object_set_new(arguments, "messages", json_integer(messages)); + json_object_set_new(subscription, "messages", json_integer(messages)); + } + if (last_id) { + json_object_set_new(arguments, "last_id", json_integer(last_id)); + json_object_set_new(subscription, "last_id", json_integer(last_id)); } assert(!pthread_mutex_unlock(&instance->lock));