Restructure promise API, use it in cosmo, add test.
This commit is contained in:
@@ -321,7 +321,7 @@ static void cosmo_complete_subscribe(cosmo *instance, struct cosmo_command *comm
|
|||||||
|
|
||||||
if (strcmp(result, "ok")) {
|
if (strcmp(result, "ok")) {
|
||||||
cosmo_remove_subscription(instance, subject);
|
cosmo_remove_subscription(instance, subject);
|
||||||
promise_fail(command->promise, NULL);
|
promise_fail(command->promise, NULL, NULL);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -333,20 +333,27 @@ static void cosmo_complete_subscribe(cosmo *instance, struct cosmo_command *comm
|
|||||||
}
|
}
|
||||||
assert(!pthread_mutex_unlock(&instance->lock));
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
|
|
||||||
promise_succeed(command->promise, NULL);
|
promise_succeed(command->promise, NULL, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cosmo_complete_unsubscribe(cosmo *instance, struct cosmo_command *command, json_t *response, char *result) {
|
static void cosmo_complete_unsubscribe(cosmo *instance, struct cosmo_command *command, json_t *response, char *result) {
|
||||||
promise_complete(command->promise, NULL, (strcmp(result, "ok") == 0));
|
promise_complete(command->promise, NULL, NULL, (strcmp(result, "ok") == 0));
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cosmo_complete_send_message(cosmo *instance, struct cosmo_command *command, json_t *response, char *result) {
|
static void cosmo_complete_send_message(cosmo *instance, struct cosmo_command *command, json_t *response, char *result) {
|
||||||
json_t *message;
|
json_t *message;
|
||||||
int err = json_unpack(response, "{so}", "message", &message);
|
int err = json_unpack(response, "{so}", "message", &message);
|
||||||
if (err || strcmp(result, "ok")) {
|
if (err || strcmp(result, "ok")) {
|
||||||
promise_fail(command->promise, NULL);
|
promise_fail(command->promise, NULL, NULL);
|
||||||
} else {
|
} else {
|
||||||
promise_succeed(command->promise, message);
|
char *message_content;
|
||||||
|
assert(!json_unpack(message, "{ss}", "message", &message_content));
|
||||||
|
json_t *message_object = json_loads(message_content, JSON_DECODE_ANY, NULL);
|
||||||
|
assert(message_object);
|
||||||
|
json_object_set_new(message, "message", message_object);
|
||||||
|
|
||||||
|
json_incref(message);
|
||||||
|
promise_succeed(command->promise, message, (promise_cleanup) json_decref);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -5,7 +5,6 @@
|
|||||||
#include "promise.h"
|
#include "promise.h"
|
||||||
|
|
||||||
struct promise {
|
struct promise {
|
||||||
bool will_wait;
|
|
||||||
promise_callback on_success;
|
promise_callback on_success;
|
||||||
promise_callback on_failure;
|
promise_callback on_failure;
|
||||||
void *passthrough;
|
void *passthrough;
|
||||||
@@ -16,11 +15,11 @@ struct promise {
|
|||||||
|
|
||||||
bool success;
|
bool success;
|
||||||
void *result;
|
void *result;
|
||||||
|
promise_cleanup cleanup;
|
||||||
};
|
};
|
||||||
|
|
||||||
promise *promise_create(bool will_wait, promise_callback on_success, promise_callback on_failure, void *passthrough) {
|
promise *promise_create(promise_callback on_success, promise_callback on_failure, void *passthrough) {
|
||||||
promise *promise_obj = malloc(sizeof(*promise_obj));
|
promise *promise_obj = malloc(sizeof(*promise_obj));
|
||||||
promise_obj->will_wait = will_wait;
|
|
||||||
promise_obj->on_success = on_success;
|
promise_obj->on_success = on_success;
|
||||||
promise_obj->on_failure = on_failure;
|
promise_obj->on_failure = on_failure;
|
||||||
promise_obj->passthrough = passthrough;
|
promise_obj->passthrough = passthrough;
|
||||||
@@ -31,32 +30,35 @@ promise *promise_create(bool will_wait, promise_callback on_success, promise_cal
|
|||||||
return promise_obj;
|
return promise_obj;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void promise_destroy(promise *promise_obj) {
|
void promise_destroy(promise *promise_obj) {
|
||||||
assert(!pthread_mutex_destroy(&promise_obj->lock));
|
assert(!pthread_mutex_destroy(&promise_obj->lock));
|
||||||
assert(!pthread_cond_destroy(&promise_obj->cond));
|
assert(!pthread_cond_destroy(&promise_obj->cond));
|
||||||
|
if (promise_obj->result && promise_obj->cleanup) {
|
||||||
|
promise_obj->cleanup(promise_obj->result);
|
||||||
|
}
|
||||||
free(promise_obj);
|
free(promise_obj);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool promise_wait(promise *promise_obj, void **result) {
|
bool promise_wait(promise *promise_obj, void **result) {
|
||||||
assert(promise_obj);
|
assert(promise_obj);
|
||||||
assert(!pthread_mutex_lock(&promise_obj->lock));
|
assert(!pthread_mutex_lock(&promise_obj->lock));
|
||||||
assert(promise_obj->will_wait);
|
|
||||||
while (!promise_obj->fulfilled) {
|
while (!promise_obj->fulfilled) {
|
||||||
pthread_cond_wait(&promise_obj->cond, &promise_obj->lock);
|
pthread_cond_wait(&promise_obj->cond, &promise_obj->lock);
|
||||||
}
|
}
|
||||||
assert(!pthread_mutex_unlock(&promise_obj->lock));
|
assert(!pthread_mutex_unlock(&promise_obj->lock));
|
||||||
|
|
||||||
// promise_obj is now filled in, and owned solely by us.
|
|
||||||
bool success = promise_obj->success;
|
bool success = promise_obj->success;
|
||||||
if (result) {
|
if (result) {
|
||||||
*result = promise_obj->result;
|
*result = promise_obj->result;
|
||||||
}
|
}
|
||||||
promise_destroy(promise_obj);
|
|
||||||
return success;
|
return success;
|
||||||
}
|
}
|
||||||
|
|
||||||
void promise_complete(promise *promise_obj, void *result, bool success) {
|
void promise_complete(promise *promise_obj, void *result, promise_cleanup cleanup, bool success) {
|
||||||
if (!promise_obj) {
|
if (!promise_obj) {
|
||||||
|
if (result && cleanup) {
|
||||||
|
cleanup(result);
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -68,24 +70,18 @@ void promise_complete(promise *promise_obj, void *result, bool success) {
|
|||||||
promise_obj->on_failure(promise_obj->passthrough, result);
|
promise_obj->on_failure(promise_obj->passthrough, result);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (promise_obj->will_wait) {
|
promise_obj->result = result;
|
||||||
// We don't own promise_obj; pass to promise_wait()
|
promise_obj->cleanup = cleanup;
|
||||||
promise_obj->result = result;
|
promise_obj->success = success;
|
||||||
promise_obj->success = success;
|
promise_obj->fulfilled = true;
|
||||||
promise_obj->fulfilled = true;
|
assert(!pthread_cond_signal(&promise_obj->cond));
|
||||||
assert(!pthread_cond_signal(&promise_obj->cond));
|
assert(!pthread_mutex_unlock(&promise_obj->lock));
|
||||||
assert(!pthread_mutex_unlock(&promise_obj->lock));
|
|
||||||
} else {
|
|
||||||
// We own promise_obj
|
|
||||||
assert(!pthread_mutex_unlock(&promise_obj->lock));
|
|
||||||
promise_destroy(promise_obj);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void promise_succeed(promise *promise_obj, void *result) {
|
void promise_succeed(promise *promise_obj, void *result, promise_cleanup cleanup) {
|
||||||
promise_complete(promise_obj, result, true);
|
promise_complete(promise_obj, result, cleanup, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void promise_fail(promise *promise_obj, void *result) {
|
void promise_fail(promise *promise_obj, void *result, promise_cleanup cleanup) {
|
||||||
promise_complete(promise_obj, result, false);
|
promise_complete(promise_obj, result, cleanup, false);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,13 +7,15 @@ typedef struct promise promise;
|
|||||||
|
|
||||||
// (passthrough, result)
|
// (passthrough, result)
|
||||||
typedef void (*promise_callback)(void *, void *);
|
typedef void (*promise_callback)(void *, void *);
|
||||||
|
typedef void (*promise_cleanup)(void *);
|
||||||
|
|
||||||
promise *promise_create(bool will_wait, promise_callback on_success, promise_callback on_failure, void *passthrough);
|
promise *promise_create(promise_callback on_success, promise_callback on_failure, void *passthrough);
|
||||||
bool promise_wait(promise *promise_obj, void **result);
|
bool promise_wait(promise *promise_obj, void **result);
|
||||||
|
void promise_destroy(promise *promise_obj);
|
||||||
|
|
||||||
void promise_complete(promise *promise_obj, void *result, bool success);
|
void promise_complete(promise *promise_obj, void *result, promise_cleanup cleanup, bool success);
|
||||||
// Shortcuts for promise_complete()
|
// Shortcuts for promise_complete()
|
||||||
void promise_succeed(promise *promise_obj, void *result);
|
void promise_succeed(promise *promise_obj, void *result, promise_cleanup cleanup);
|
||||||
void promise_fail(promise *promise_obj, void *result);
|
void promise_fail(promise *promise_obj, void *result, promise_cleanup cleanup);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|||||||
@@ -306,6 +306,27 @@ bool test_complex_object(test_state *state) {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool test_send_message_promise(test_state *state) {
|
||||||
|
cosmo *client = create_client(state);
|
||||||
|
|
||||||
|
json_t *subject = random_subject(NULL, NULL);
|
||||||
|
json_t *message_out = random_message();
|
||||||
|
|
||||||
|
promise *promise_obj = promise_create(NULL, NULL, NULL);
|
||||||
|
cosmo_send_message(client, subject, message_out, promise_obj);
|
||||||
|
json_t *result;
|
||||||
|
assert(promise_wait(promise_obj, (void **) &result));
|
||||||
|
assert(json_equal(subject, json_object_get(result, "subject")));
|
||||||
|
assert(json_equal(message_out, json_object_get(result, "message")));
|
||||||
|
promise_destroy(promise_obj);
|
||||||
|
|
||||||
|
json_decref(subject);
|
||||||
|
json_decref(message_out);
|
||||||
|
|
||||||
|
cosmo_shutdown(client);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
bool test_getmessages_subscribe(test_state *state) {
|
bool test_getmessages_subscribe(test_state *state) {
|
||||||
cosmo *client = create_client(state);
|
cosmo *client = create_client(state);
|
||||||
|
|
||||||
@@ -350,6 +371,7 @@ int main(int argc, char *argv[]) {
|
|||||||
RUN_TEST(test_reconnect);
|
RUN_TEST(test_reconnect);
|
||||||
RUN_TEST(test_bulk_subscribe);
|
RUN_TEST(test_bulk_subscribe);
|
||||||
RUN_TEST(test_complex_object);
|
RUN_TEST(test_complex_object);
|
||||||
|
RUN_TEST(test_send_message_promise);
|
||||||
RUN_TEST(test_getmessages_subscribe);
|
RUN_TEST(test_getmessages_subscribe);
|
||||||
RUN_TEST(test_resubscribe);
|
RUN_TEST(test_resubscribe);
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user