diff --git a/clients/c/cosmopolite.c b/clients/c/cosmopolite.c index c9e2190..4ca15e5 100644 --- a/clients/c/cosmopolite.c +++ b/clients/c/cosmopolite.c @@ -321,7 +321,7 @@ static void cosmo_complete_subscribe(cosmo *instance, struct cosmo_command *comm if (strcmp(result, "ok")) { cosmo_remove_subscription(instance, subject); - promise_fail(command->promise, NULL); + promise_fail(command->promise, NULL, NULL); return; } @@ -333,20 +333,27 @@ static void cosmo_complete_subscribe(cosmo *instance, struct cosmo_command *comm } assert(!pthread_mutex_unlock(&instance->lock)); - promise_succeed(command->promise, NULL); + promise_succeed(command->promise, NULL, NULL); } static void cosmo_complete_unsubscribe(cosmo *instance, struct cosmo_command *command, json_t *response, char *result) { - promise_complete(command->promise, NULL, (strcmp(result, "ok") == 0)); + promise_complete(command->promise, NULL, NULL, (strcmp(result, "ok") == 0)); } static void cosmo_complete_send_message(cosmo *instance, struct cosmo_command *command, json_t *response, char *result) { json_t *message; int err = json_unpack(response, "{so}", "message", &message); if (err || strcmp(result, "ok")) { - promise_fail(command->promise, NULL); + promise_fail(command->promise, NULL, NULL); } else { - promise_succeed(command->promise, message); + char *message_content; + assert(!json_unpack(message, "{ss}", "message", &message_content)); + json_t *message_object = json_loads(message_content, JSON_DECODE_ANY, NULL); + assert(message_object); + json_object_set_new(message, "message", message_object); + + json_incref(message); + promise_succeed(command->promise, message, (promise_cleanup) json_decref); } } diff --git a/clients/c/promise.c b/clients/c/promise.c index a980c29..8c25760 100644 --- a/clients/c/promise.c +++ b/clients/c/promise.c @@ -5,7 +5,6 @@ #include "promise.h" struct promise { - bool will_wait; promise_callback on_success; promise_callback on_failure; void *passthrough; @@ -16,11 +15,11 @@ struct promise { bool success; void *result; + promise_cleanup cleanup; }; -promise *promise_create(bool will_wait, promise_callback on_success, promise_callback on_failure, void *passthrough) { +promise *promise_create(promise_callback on_success, promise_callback on_failure, void *passthrough) { promise *promise_obj = malloc(sizeof(*promise_obj)); - promise_obj->will_wait = will_wait; promise_obj->on_success = on_success; promise_obj->on_failure = on_failure; promise_obj->passthrough = passthrough; @@ -31,32 +30,35 @@ promise *promise_create(bool will_wait, promise_callback on_success, promise_cal return promise_obj; } -static void promise_destroy(promise *promise_obj) { +void promise_destroy(promise *promise_obj) { assert(!pthread_mutex_destroy(&promise_obj->lock)); assert(!pthread_cond_destroy(&promise_obj->cond)); + if (promise_obj->result && promise_obj->cleanup) { + promise_obj->cleanup(promise_obj->result); + } free(promise_obj); } bool promise_wait(promise *promise_obj, void **result) { assert(promise_obj); assert(!pthread_mutex_lock(&promise_obj->lock)); - assert(promise_obj->will_wait); while (!promise_obj->fulfilled) { pthread_cond_wait(&promise_obj->cond, &promise_obj->lock); } assert(!pthread_mutex_unlock(&promise_obj->lock)); - // promise_obj is now filled in, and owned solely by us. bool success = promise_obj->success; if (result) { *result = promise_obj->result; } - promise_destroy(promise_obj); return success; } -void promise_complete(promise *promise_obj, void *result, bool success) { +void promise_complete(promise *promise_obj, void *result, promise_cleanup cleanup, bool success) { if (!promise_obj) { + if (result && cleanup) { + cleanup(result); + } return; } @@ -68,24 +70,18 @@ void promise_complete(promise *promise_obj, void *result, bool success) { promise_obj->on_failure(promise_obj->passthrough, result); } - if (promise_obj->will_wait) { - // We don't own promise_obj; pass to promise_wait() - promise_obj->result = result; - promise_obj->success = success; - promise_obj->fulfilled = true; - assert(!pthread_cond_signal(&promise_obj->cond)); - assert(!pthread_mutex_unlock(&promise_obj->lock)); - } else { - // We own promise_obj - assert(!pthread_mutex_unlock(&promise_obj->lock)); - promise_destroy(promise_obj); - } + promise_obj->result = result; + promise_obj->cleanup = cleanup; + promise_obj->success = success; + promise_obj->fulfilled = true; + assert(!pthread_cond_signal(&promise_obj->cond)); + assert(!pthread_mutex_unlock(&promise_obj->lock)); } -void promise_succeed(promise *promise_obj, void *result) { - promise_complete(promise_obj, result, true); +void promise_succeed(promise *promise_obj, void *result, promise_cleanup cleanup) { + promise_complete(promise_obj, result, cleanup, true); } -void promise_fail(promise *promise_obj, void *result) { - promise_complete(promise_obj, result, false); +void promise_fail(promise *promise_obj, void *result, promise_cleanup cleanup) { + promise_complete(promise_obj, result, cleanup, false); } diff --git a/clients/c/promise.h b/clients/c/promise.h index 0303dfc..862e100 100644 --- a/clients/c/promise.h +++ b/clients/c/promise.h @@ -7,13 +7,15 @@ typedef struct promise promise; // (passthrough, result) typedef void (*promise_callback)(void *, void *); +typedef void (*promise_cleanup)(void *); -promise *promise_create(bool will_wait, promise_callback on_success, promise_callback on_failure, void *passthrough); +promise *promise_create(promise_callback on_success, promise_callback on_failure, void *passthrough); bool promise_wait(promise *promise_obj, void **result); +void promise_destroy(promise *promise_obj); -void promise_complete(promise *promise_obj, void *result, bool success); +void promise_complete(promise *promise_obj, void *result, promise_cleanup cleanup, bool success); // Shortcuts for promise_complete() -void promise_succeed(promise *promise_obj, void *result); -void promise_fail(promise *promise_obj, void *result); +void promise_succeed(promise *promise_obj, void *result, promise_cleanup cleanup); +void promise_fail(promise *promise_obj, void *result, promise_cleanup cleanup); #endif diff --git a/clients/c/test.c b/clients/c/test.c index 3c73535..b43d59f 100644 --- a/clients/c/test.c +++ b/clients/c/test.c @@ -306,6 +306,27 @@ bool test_complex_object(test_state *state) { return true; } +bool test_send_message_promise(test_state *state) { + cosmo *client = create_client(state); + + json_t *subject = random_subject(NULL, NULL); + json_t *message_out = random_message(); + + promise *promise_obj = promise_create(NULL, NULL, NULL); + cosmo_send_message(client, subject, message_out, promise_obj); + json_t *result; + assert(promise_wait(promise_obj, (void **) &result)); + assert(json_equal(subject, json_object_get(result, "subject"))); + assert(json_equal(message_out, json_object_get(result, "message"))); + promise_destroy(promise_obj); + + json_decref(subject); + json_decref(message_out); + + cosmo_shutdown(client); + return true; +} + bool test_getmessages_subscribe(test_state *state) { cosmo *client = create_client(state); @@ -350,6 +371,7 @@ int main(int argc, char *argv[]) { RUN_TEST(test_reconnect); RUN_TEST(test_bulk_subscribe); RUN_TEST(test_complex_object); + RUN_TEST(test_send_message_promise); RUN_TEST(test_getmessages_subscribe); RUN_TEST(test_resubscribe);