diff --git a/clients/c/cosmopolite.c b/clients/c/cosmopolite.c index 7978038..c9e2190 100644 --- a/clients/c/cosmopolite.c +++ b/clients/c/cosmopolite.c @@ -87,6 +87,17 @@ static json_t *cosmo_find_subscription(cosmo *instance, json_t *subject) { return NULL; } +void cosmo_remove_subscription(cosmo *instance, json_t *subject) { + size_t i; + json_t *subscription; + json_array_foreach(instance->subscriptions, i, subscription) { + if (json_equal(json_object_get(subscription, "subject"), subject)) { + json_array_remove(instance->subscriptions, i); + break; + } + } +} + static void cosmo_send_command_locked(cosmo *instance, json_t *command, promise *promise_obj) { struct cosmo_command *command_obj = malloc(sizeof(*command_obj)); command_obj->command = command; @@ -304,6 +315,54 @@ static void cosmo_handle_event(cosmo *instance, json_t *event) { } } +static void cosmo_complete_subscribe(cosmo *instance, struct cosmo_command *command, json_t *response, char *result) { + json_t *subject; + assert(!json_unpack(command->command, "{s{so}}", "arguments", "subject", &subject)); + + if (strcmp(result, "ok")) { + cosmo_remove_subscription(instance, subject); + promise_fail(command->promise, NULL); + return; + } + + assert(!pthread_mutex_lock(&instance->lock)); + json_t *subscription = cosmo_find_subscription(instance, subject); + if (subscription) { + // Might have unsubscribed later + json_object_set_new(subscription, "state", json_integer(SUBSCRIPTION_ACTIVE)); + } + assert(!pthread_mutex_unlock(&instance->lock)); + + promise_succeed(command->promise, NULL); +} + +static void cosmo_complete_unsubscribe(cosmo *instance, struct cosmo_command *command, json_t *response, char *result) { + promise_complete(command->promise, NULL, (strcmp(result, "ok") == 0)); +} + +static void cosmo_complete_send_message(cosmo *instance, struct cosmo_command *command, json_t *response, char *result) { + json_t *message; + int err = json_unpack(response, "{so}", "message", &message); + if (err || strcmp(result, "ok")) { + promise_fail(command->promise, NULL); + } else { + promise_succeed(command->promise, message); + } +} + +static void cosmo_complete_rpc(cosmo *instance, struct cosmo_command *command, json_t *response) { + char *command_name, *result; + assert(!json_unpack(command->command, "{ss}", "command", &command_name)); + assert(!json_unpack(response, "{ss}", "result", &result)); + if (!strcmp(command_name, "subscribe")) { + cosmo_complete_subscribe(instance, command, response, result); + } else if (!strcmp(command_name, "unsubscribe")) { + cosmo_complete_unsubscribe(instance, command, response, result); + } else if (!strcmp(command_name, "sendMessage")) { + cosmo_complete_send_message(instance, command, response, result); + } +} + // Takes ownership of arguments. static json_t *cosmo_command(const char *name, const json_t *arguments) { return json_pack("{ssso}", "command", name, "arguments", arguments); @@ -440,19 +499,7 @@ static struct cosmo_command *cosmo_send_rpc(cosmo *instance, struct cosmo_comman continue; } - char *command_name; - assert(!json_unpack(command_iter->command, "{ss}", "command", &command_name)); - if (!strcmp(command_name, "subscribe")) { - json_t *subject = NULL; - assert(!json_unpack(command_iter->command, "{s{so}}", "arguments", "subject", &subject)); - assert(!pthread_mutex_lock(&instance->lock)); - json_t *subscription = cosmo_find_subscription(instance, subject); - if (subscription) { - // Might have unsubscribed later - json_object_set_new(subscription, "state", json_integer(SUBSCRIPTION_ACTIVE)); - } - assert(!pthread_mutex_unlock(&instance->lock)); - } + cosmo_complete_rpc(instance, command_iter, command_response); json_decref(command_iter->command); free(command_iter); @@ -574,14 +621,7 @@ void cosmo_subscribe(cosmo *instance, json_t *subjects, const json_int_t message void cosmo_unsubscribe(cosmo *instance, json_t *subject, promise *promise_obj) { assert(!pthread_mutex_lock(&instance->lock)); - size_t i; - json_t *subscription; - json_array_foreach(instance->subscriptions, i, subscription) { - if (json_equal(json_object_get(subscription, "subject"), subject)) { - json_array_remove(instance->subscriptions, i); - break; - } - } + cosmo_remove_subscription(instance, subject); assert(!pthread_mutex_unlock(&instance->lock)); json_t *arguments = json_pack("{sO}", "subject", subject); diff --git a/clients/c/promise.c b/clients/c/promise.c index a5b1848..a980c29 100644 --- a/clients/c/promise.c +++ b/clients/c/promise.c @@ -38,6 +38,7 @@ static void promise_destroy(promise *promise_obj) { } bool promise_wait(promise *promise_obj, void **result) { + assert(promise_obj); assert(!pthread_mutex_lock(&promise_obj->lock)); assert(promise_obj->will_wait); while (!promise_obj->fulfilled) { @@ -55,6 +56,10 @@ bool promise_wait(promise *promise_obj, void **result) { } void promise_complete(promise *promise_obj, void *result, bool success) { + if (!promise_obj) { + return; + } + assert(!pthread_mutex_lock(&promise_obj->lock)); if (success && promise_obj->on_success) {