Actually start calling promises.
This commit is contained in:
@@ -87,6 +87,17 @@ static json_t *cosmo_find_subscription(cosmo *instance, json_t *subject) {
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void cosmo_remove_subscription(cosmo *instance, json_t *subject) {
|
||||||
|
size_t i;
|
||||||
|
json_t *subscription;
|
||||||
|
json_array_foreach(instance->subscriptions, i, subscription) {
|
||||||
|
if (json_equal(json_object_get(subscription, "subject"), subject)) {
|
||||||
|
json_array_remove(instance->subscriptions, i);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void cosmo_send_command_locked(cosmo *instance, json_t *command, promise *promise_obj) {
|
static void cosmo_send_command_locked(cosmo *instance, json_t *command, promise *promise_obj) {
|
||||||
struct cosmo_command *command_obj = malloc(sizeof(*command_obj));
|
struct cosmo_command *command_obj = malloc(sizeof(*command_obj));
|
||||||
command_obj->command = command;
|
command_obj->command = command;
|
||||||
@@ -304,6 +315,54 @@ static void cosmo_handle_event(cosmo *instance, json_t *event) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void cosmo_complete_subscribe(cosmo *instance, struct cosmo_command *command, json_t *response, char *result) {
|
||||||
|
json_t *subject;
|
||||||
|
assert(!json_unpack(command->command, "{s{so}}", "arguments", "subject", &subject));
|
||||||
|
|
||||||
|
if (strcmp(result, "ok")) {
|
||||||
|
cosmo_remove_subscription(instance, subject);
|
||||||
|
promise_fail(command->promise, NULL);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(!pthread_mutex_lock(&instance->lock));
|
||||||
|
json_t *subscription = cosmo_find_subscription(instance, subject);
|
||||||
|
if (subscription) {
|
||||||
|
// Might have unsubscribed later
|
||||||
|
json_object_set_new(subscription, "state", json_integer(SUBSCRIPTION_ACTIVE));
|
||||||
|
}
|
||||||
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
|
|
||||||
|
promise_succeed(command->promise, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void cosmo_complete_unsubscribe(cosmo *instance, struct cosmo_command *command, json_t *response, char *result) {
|
||||||
|
promise_complete(command->promise, NULL, (strcmp(result, "ok") == 0));
|
||||||
|
}
|
||||||
|
|
||||||
|
static void cosmo_complete_send_message(cosmo *instance, struct cosmo_command *command, json_t *response, char *result) {
|
||||||
|
json_t *message;
|
||||||
|
int err = json_unpack(response, "{so}", "message", &message);
|
||||||
|
if (err || strcmp(result, "ok")) {
|
||||||
|
promise_fail(command->promise, NULL);
|
||||||
|
} else {
|
||||||
|
promise_succeed(command->promise, message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void cosmo_complete_rpc(cosmo *instance, struct cosmo_command *command, json_t *response) {
|
||||||
|
char *command_name, *result;
|
||||||
|
assert(!json_unpack(command->command, "{ss}", "command", &command_name));
|
||||||
|
assert(!json_unpack(response, "{ss}", "result", &result));
|
||||||
|
if (!strcmp(command_name, "subscribe")) {
|
||||||
|
cosmo_complete_subscribe(instance, command, response, result);
|
||||||
|
} else if (!strcmp(command_name, "unsubscribe")) {
|
||||||
|
cosmo_complete_unsubscribe(instance, command, response, result);
|
||||||
|
} else if (!strcmp(command_name, "sendMessage")) {
|
||||||
|
cosmo_complete_send_message(instance, command, response, result);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Takes ownership of arguments.
|
// Takes ownership of arguments.
|
||||||
static json_t *cosmo_command(const char *name, const json_t *arguments) {
|
static json_t *cosmo_command(const char *name, const json_t *arguments) {
|
||||||
return json_pack("{ssso}", "command", name, "arguments", arguments);
|
return json_pack("{ssso}", "command", name, "arguments", arguments);
|
||||||
@@ -440,19 +499,7 @@ static struct cosmo_command *cosmo_send_rpc(cosmo *instance, struct cosmo_comman
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
char *command_name;
|
cosmo_complete_rpc(instance, command_iter, command_response);
|
||||||
assert(!json_unpack(command_iter->command, "{ss}", "command", &command_name));
|
|
||||||
if (!strcmp(command_name, "subscribe")) {
|
|
||||||
json_t *subject = NULL;
|
|
||||||
assert(!json_unpack(command_iter->command, "{s{so}}", "arguments", "subject", &subject));
|
|
||||||
assert(!pthread_mutex_lock(&instance->lock));
|
|
||||||
json_t *subscription = cosmo_find_subscription(instance, subject);
|
|
||||||
if (subscription) {
|
|
||||||
// Might have unsubscribed later
|
|
||||||
json_object_set_new(subscription, "state", json_integer(SUBSCRIPTION_ACTIVE));
|
|
||||||
}
|
|
||||||
assert(!pthread_mutex_unlock(&instance->lock));
|
|
||||||
}
|
|
||||||
|
|
||||||
json_decref(command_iter->command);
|
json_decref(command_iter->command);
|
||||||
free(command_iter);
|
free(command_iter);
|
||||||
@@ -574,14 +621,7 @@ void cosmo_subscribe(cosmo *instance, json_t *subjects, const json_int_t message
|
|||||||
|
|
||||||
void cosmo_unsubscribe(cosmo *instance, json_t *subject, promise *promise_obj) {
|
void cosmo_unsubscribe(cosmo *instance, json_t *subject, promise *promise_obj) {
|
||||||
assert(!pthread_mutex_lock(&instance->lock));
|
assert(!pthread_mutex_lock(&instance->lock));
|
||||||
size_t i;
|
cosmo_remove_subscription(instance, subject);
|
||||||
json_t *subscription;
|
|
||||||
json_array_foreach(instance->subscriptions, i, subscription) {
|
|
||||||
if (json_equal(json_object_get(subscription, "subject"), subject)) {
|
|
||||||
json_array_remove(instance->subscriptions, i);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
assert(!pthread_mutex_unlock(&instance->lock));
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
|
|
||||||
json_t *arguments = json_pack("{sO}", "subject", subject);
|
json_t *arguments = json_pack("{sO}", "subject", subject);
|
||||||
|
|||||||
@@ -38,6 +38,7 @@ static void promise_destroy(promise *promise_obj) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
bool promise_wait(promise *promise_obj, void **result) {
|
bool promise_wait(promise *promise_obj, void **result) {
|
||||||
|
assert(promise_obj);
|
||||||
assert(!pthread_mutex_lock(&promise_obj->lock));
|
assert(!pthread_mutex_lock(&promise_obj->lock));
|
||||||
assert(promise_obj->will_wait);
|
assert(promise_obj->will_wait);
|
||||||
while (!promise_obj->fulfilled) {
|
while (!promise_obj->fulfilled) {
|
||||||
@@ -55,6 +56,10 @@ bool promise_wait(promise *promise_obj, void **result) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void promise_complete(promise *promise_obj, void *result, bool success) {
|
void promise_complete(promise *promise_obj, void *result, bool success) {
|
||||||
|
if (!promise_obj) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
assert(!pthread_mutex_lock(&promise_obj->lock));
|
assert(!pthread_mutex_lock(&promise_obj->lock));
|
||||||
|
|
||||||
if (success && promise_obj->on_success) {
|
if (success && promise_obj->on_success) {
|
||||||
|
|||||||
Reference in New Issue
Block a user