More promise plumbing.

This commit is contained in:
Ian Gulliver
2015-06-28 11:43:17 -07:00
parent 4b79ef06de
commit a3b6977088
2 changed files with 9 additions and 7 deletions

View File

@@ -7,6 +7,7 @@ struct cosmo_command {
struct cosmo_command *prev;
struct cosmo_command *next;
json_t *command;
promise *promise;
};
struct cosmo {

View File

@@ -87,18 +87,19 @@ static json_t *cosmo_find_subscription(cosmo *instance, json_t *subject) {
return NULL;
}
static void cosmo_send_command_locked(cosmo *instance, json_t *command) {
static void cosmo_send_command_locked(cosmo *instance, json_t *command, promise *promise_obj) {
struct cosmo_command *command_obj = malloc(sizeof(*command_obj));
command_obj->command = command;
command_obj->promise = promise_obj;
cosmo_append_command(&instance->command_queue_head, &instance->command_queue_tail, command_obj);
instance->next_delay_ms = 0;
}
// Takes ownership of command.
static void cosmo_send_command(cosmo *instance, json_t *command) {
static void cosmo_send_command(cosmo *instance, json_t *command, promise *promise_obj) {
assert(command);
assert(!pthread_mutex_lock(&instance->lock));
cosmo_send_command_locked(instance, command);
cosmo_send_command_locked(instance, command, promise_obj);
assert(!pthread_cond_signal(&instance->cond));
assert(!pthread_mutex_unlock(&instance->lock));
}
@@ -400,7 +401,7 @@ static struct cosmo_command *cosmo_send_rpc(cosmo *instance, struct cosmo_comman
}
}
cosmo_send_command_locked(instance, cosmo_command("subscribe", arguments));
cosmo_send_command_locked(instance, cosmo_command("subscribe", arguments), NULL);
}
}
assert(!pthread_mutex_unlock(&instance->lock));
@@ -557,7 +558,7 @@ void cosmo_subscribe(cosmo *instance, json_t *subjects, const json_int_t message
json_object_set_new(arguments, "last_id", json_integer(last_id));
json_object_set_new(subscription, "last_id", json_integer(last_id));
}
cosmo_send_command_locked(instance, cosmo_command("subscribe", arguments));
cosmo_send_command_locked(instance, cosmo_command("subscribe", arguments), promise_obj);
}
assert(!pthread_mutex_unlock(&instance->lock));
@@ -579,7 +580,7 @@ void cosmo_unsubscribe(cosmo *instance, json_t *subject, promise *promise_obj) {
assert(!pthread_mutex_unlock(&instance->lock));
json_t *arguments = json_pack("{sO}", "subject", subject);
cosmo_send_command(instance, cosmo_command("unsubscribe", arguments));
cosmo_send_command(instance, cosmo_command("unsubscribe", arguments), promise_obj);
}
void cosmo_send_message(cosmo *instance, json_t *subject, json_t *message, promise *promise_obj) {
@@ -590,7 +591,7 @@ void cosmo_send_message(cosmo *instance, json_t *subject, json_t *message, promi
"subject", subject,
"message", encoded,
"sender_message_id", sender_message_id);
cosmo_send_command(instance, cosmo_command("sendMessage", arguments));
cosmo_send_command(instance, cosmo_command("sendMessage", arguments), promise_obj);
free(encoded);
}