diff --git a/clients/c/cosmopolite-int.h b/clients/c/cosmopolite-int.h index c9973f5..6858d4b 100644 --- a/clients/c/cosmopolite-int.h +++ b/clients/c/cosmopolite-int.h @@ -7,6 +7,7 @@ struct cosmo_command { struct cosmo_command *prev; struct cosmo_command *next; json_t *command; + promise *promise; }; struct cosmo { diff --git a/clients/c/cosmopolite.c b/clients/c/cosmopolite.c index 89e39e2..efa0239 100644 --- a/clients/c/cosmopolite.c +++ b/clients/c/cosmopolite.c @@ -87,18 +87,19 @@ static json_t *cosmo_find_subscription(cosmo *instance, json_t *subject) { return NULL; } -static void cosmo_send_command_locked(cosmo *instance, json_t *command) { +static void cosmo_send_command_locked(cosmo *instance, json_t *command, promise *promise_obj) { struct cosmo_command *command_obj = malloc(sizeof(*command_obj)); command_obj->command = command; + command_obj->promise = promise_obj; cosmo_append_command(&instance->command_queue_head, &instance->command_queue_tail, command_obj); instance->next_delay_ms = 0; } // Takes ownership of command. -static void cosmo_send_command(cosmo *instance, json_t *command) { +static void cosmo_send_command(cosmo *instance, json_t *command, promise *promise_obj) { assert(command); assert(!pthread_mutex_lock(&instance->lock)); - cosmo_send_command_locked(instance, command); + cosmo_send_command_locked(instance, command, promise_obj); assert(!pthread_cond_signal(&instance->cond)); assert(!pthread_mutex_unlock(&instance->lock)); } @@ -400,7 +401,7 @@ static struct cosmo_command *cosmo_send_rpc(cosmo *instance, struct cosmo_comman } } - cosmo_send_command_locked(instance, cosmo_command("subscribe", arguments)); + cosmo_send_command_locked(instance, cosmo_command("subscribe", arguments), NULL); } } assert(!pthread_mutex_unlock(&instance->lock)); @@ -557,7 +558,7 @@ void cosmo_subscribe(cosmo *instance, json_t *subjects, const json_int_t message json_object_set_new(arguments, "last_id", json_integer(last_id)); json_object_set_new(subscription, "last_id", json_integer(last_id)); } - cosmo_send_command_locked(instance, cosmo_command("subscribe", arguments)); + cosmo_send_command_locked(instance, cosmo_command("subscribe", arguments), promise_obj); } assert(!pthread_mutex_unlock(&instance->lock)); @@ -579,7 +580,7 @@ void cosmo_unsubscribe(cosmo *instance, json_t *subject, promise *promise_obj) { assert(!pthread_mutex_unlock(&instance->lock)); json_t *arguments = json_pack("{sO}", "subject", subject); - cosmo_send_command(instance, cosmo_command("unsubscribe", arguments)); + cosmo_send_command(instance, cosmo_command("unsubscribe", arguments), promise_obj); } void cosmo_send_message(cosmo *instance, json_t *subject, json_t *message, promise *promise_obj) { @@ -590,7 +591,7 @@ void cosmo_send_message(cosmo *instance, json_t *subject, json_t *message, promi "subject", subject, "message", encoded, "sender_message_id", sender_message_id); - cosmo_send_command(instance, cosmo_command("sendMessage", arguments)); + cosmo_send_command(instance, cosmo_command("sendMessage", arguments), promise_obj); free(encoded); }