From 4b79ef06defb237c2193735215c50e2b30e4f26c Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sun, 28 Jun 2015 11:18:05 -0700 Subject: [PATCH] Plumbing for promise integration. --- clients/c/Makefile | 15 +++---- clients/c/cosmopolite.c | 6 +-- clients/c/cosmopolite.h | 15 ++++--- clients/c/promise.c | 86 +++++++++++++++++++++++++++++++++++++++++ clients/c/promise.h | 19 +++++++++ clients/c/test.c | 28 +++++++------- 6 files changed, 139 insertions(+), 30 deletions(-) create mode 100644 clients/c/promise.c create mode 100644 clients/c/promise.h diff --git a/clients/c/Makefile b/clients/c/Makefile index bfad189..23cc6bd 100644 --- a/clients/c/Makefile +++ b/clients/c/Makefile @@ -5,23 +5,24 @@ LIBS ?= -lcurl -ljansson -luuid -lpthread all: libcosmopolite.so -libcosmopolite.so: cosmopolite.o - $(CC) -shared $(LDFLAGS) -o libcosmopolite.so cosmopolite.o $(LIBS) +libcosmopolite.so: cosmopolite.o promise.o + $(CC) -shared $(LDFLAGS) -o libcosmopolite.so cosmopolite.o promise.o $(LIBS) %.o: %.c *.h $(CC) -c $(CFLAGS) $< -o $@ -install: libcosmopolite.so cosmopolite.h +install: libcosmopolite.so cosmopolite.h promise.h cp libcosmopolite.so /usr/local/lib cp cosmopolite.h /usr/local/include - chown root /usr/local/lib/libcosmopolite.so /usr/local/include/cosmopolite.h - chmod 0644 /usr/local/lib/libcosmopolite.so /usr/local/include/cosmopolite.h + cp promise.h /usr/local/include + chown root /usr/local/lib/libcosmopolite.so /usr/local/include/cosmopolite.h /usr/local/include/promise.h + chmod 0644 /usr/local/lib/libcosmopolite.so /usr/local/include/cosmopolite.h /usr/local/include/promise.h clean: rm -f test libcosmopolite.so *.o -test: test.o cosmopolite.o - $(CC) $(LDFLAGS) -o test test.o cosmopolite.o $(LIBS) +test: test.o cosmopolite.o promise.o + $(CC) $(LDFLAGS) -o test test.o cosmopolite.o promise.o $(LIBS) runtest: test valgrind --leak-check=full --show-reachable=yes --num-callers=20 --suppressions=suppressions ./test diff --git a/clients/c/cosmopolite.c b/clients/c/cosmopolite.c index e05dd99..89e39e2 100644 --- a/clients/c/cosmopolite.c +++ b/clients/c/cosmopolite.c @@ -530,7 +530,7 @@ json_t *cosmo_subject(const char *name, const char *readable_only_by, const char return ret; } -void cosmo_subscribe(cosmo *instance, json_t *subjects, const json_int_t messages, const json_int_t last_id) { +void cosmo_subscribe(cosmo *instance, json_t *subjects, const json_int_t messages, const json_int_t last_id, promise *promise_obj) { if (json_is_array(subjects)) { json_incref(subjects); } else { @@ -566,7 +566,7 @@ void cosmo_subscribe(cosmo *instance, json_t *subjects, const json_int_t message json_decref(subjects); } -void cosmo_unsubscribe(cosmo *instance, json_t *subject) { +void cosmo_unsubscribe(cosmo *instance, json_t *subject, promise *promise_obj) { assert(!pthread_mutex_lock(&instance->lock)); size_t i; json_t *subscription; @@ -582,7 +582,7 @@ void cosmo_unsubscribe(cosmo *instance, json_t *subject) { cosmo_send_command(instance, cosmo_command("unsubscribe", arguments)); } -void cosmo_send_message(cosmo *instance, json_t *subject, json_t *message) { +void cosmo_send_message(cosmo *instance, json_t *subject, json_t *message, promise *promise_obj) { char sender_message_id[COSMO_UUID_SIZE]; cosmo_uuid(sender_message_id); char *encoded = json_dumps(message, JSON_ENCODE_ANY); diff --git a/clients/c/cosmopolite.h b/clients/c/cosmopolite.h index 5945d2a..8c23120 100644 --- a/clients/c/cosmopolite.h +++ b/clients/c/cosmopolite.h @@ -8,6 +8,8 @@ #include #include +#include "promise.h" + #define COSMO_UUID_SIZE 37 typedef struct { @@ -32,16 +34,17 @@ void cosmo_shutdown(cosmo *instance); const char *cosmo_current_profile(cosmo *instance); json_t *cosmo_subject(const char *name, const char *readable_only_by, const char *writeable_only_by); -void cosmo_subscribe(cosmo *instance, json_t *subjects, const json_int_t messages, const json_int_t last_id); -void cosmo_unsubscribe(cosmo *instance, json_t *subject); -void cosmo_send_message(cosmo *instance, json_t *subject, json_t *message); +void cosmo_subscribe(cosmo *instance, json_t *subjects, const json_int_t messages, const json_int_t last_id, promise *promise_obj); +void cosmo_unsubscribe(cosmo *instance, json_t *subject, promise *promise_obj); +void cosmo_send_message(cosmo *instance, json_t *subject, json_t *message, promise *promise_obj); json_t *cosmo_get_messages(cosmo *instance, json_t *subject); json_t *cosmo_get_last_message(cosmo *instance, json_t *subject); // TODO -json_t *cosmo_get_pins(cosmo *instance, json_t *subject); -void cosmo_pin(cosmo *instance, json_t *subject, json_t *message); -void cosmo_unpin(cosmo *instance, json_t *subject, json_t *message); +void cosmo_get_profile(cosmo *instance, promise *promise_obj); +json_t *cosmo_get_pins(cosmo *instance, json_t *subject, promise *promise_obj); +void cosmo_pin(cosmo *instance, json_t *subject, json_t *message, promise *promise_obj); +void cosmo_unpin(cosmo *instance, json_t *subject, json_t *message, promise *promise_obj); #endif diff --git a/clients/c/promise.c b/clients/c/promise.c new file mode 100644 index 0000000..a5b1848 --- /dev/null +++ b/clients/c/promise.c @@ -0,0 +1,86 @@ +#include +#include +#include + +#include "promise.h" + +struct promise { + bool will_wait; + promise_callback on_success; + promise_callback on_failure; + void *passthrough; + + bool fulfilled; + pthread_mutex_t lock; + pthread_cond_t cond; + + bool success; + void *result; +}; + +promise *promise_create(bool will_wait, promise_callback on_success, promise_callback on_failure, void *passthrough) { + promise *promise_obj = malloc(sizeof(*promise_obj)); + promise_obj->will_wait = will_wait; + promise_obj->on_success = on_success; + promise_obj->on_failure = on_failure; + promise_obj->passthrough = passthrough; + + promise_obj->fulfilled = false; + assert(!pthread_mutex_init(&promise_obj->lock, NULL)); + assert(!pthread_cond_init(&promise_obj->cond, NULL)); + return promise_obj; +} + +static void promise_destroy(promise *promise_obj) { + assert(!pthread_mutex_destroy(&promise_obj->lock)); + assert(!pthread_cond_destroy(&promise_obj->cond)); + free(promise_obj); +} + +bool promise_wait(promise *promise_obj, void **result) { + assert(!pthread_mutex_lock(&promise_obj->lock)); + assert(promise_obj->will_wait); + while (!promise_obj->fulfilled) { + pthread_cond_wait(&promise_obj->cond, &promise_obj->lock); + } + assert(!pthread_mutex_unlock(&promise_obj->lock)); + + // promise_obj is now filled in, and owned solely by us. + bool success = promise_obj->success; + if (result) { + *result = promise_obj->result; + } + promise_destroy(promise_obj); + return success; +} + +void promise_complete(promise *promise_obj, void *result, bool success) { + assert(!pthread_mutex_lock(&promise_obj->lock)); + + if (success && promise_obj->on_success) { + promise_obj->on_success(promise_obj->passthrough, result); + } else if (!success && promise_obj->on_failure) { + promise_obj->on_failure(promise_obj->passthrough, result); + } + + if (promise_obj->will_wait) { + // We don't own promise_obj; pass to promise_wait() + promise_obj->result = result; + promise_obj->success = success; + promise_obj->fulfilled = true; + assert(!pthread_cond_signal(&promise_obj->cond)); + assert(!pthread_mutex_unlock(&promise_obj->lock)); + } else { + // We own promise_obj + assert(!pthread_mutex_unlock(&promise_obj->lock)); + promise_destroy(promise_obj); + } +} + +void promise_succeed(promise *promise_obj, void *result) { + promise_complete(promise_obj, result, true); +} + +void promise_fail(promise *promise_obj, void *result) { + promise_complete(promise_obj, result, false); +} diff --git a/clients/c/promise.h b/clients/c/promise.h new file mode 100644 index 0000000..0303dfc --- /dev/null +++ b/clients/c/promise.h @@ -0,0 +1,19 @@ +#ifndef _PROMISE_H +#define _PROMISE_H + +#include + +typedef struct promise promise; + +// (passthrough, result) +typedef void (*promise_callback)(void *, void *); + +promise *promise_create(bool will_wait, promise_callback on_success, promise_callback on_failure, void *passthrough); +bool promise_wait(promise *promise_obj, void **result); + +void promise_complete(promise *promise_obj, void *result, bool success); +// Shortcuts for promise_complete() +void promise_succeed(promise *promise_obj, void *result); +void promise_fail(promise *promise_obj, void *result); + +#endif diff --git a/clients/c/test.c b/clients/c/test.c index 04efcd9..3c73535 100644 --- a/clients/c/test.c +++ b/clients/c/test.c @@ -191,10 +191,10 @@ bool test_message_round_trip(test_state *state) { cosmo *client = create_client(state); json_t *subject = random_subject(NULL, NULL); - cosmo_subscribe(client, subject, -1, 0); + cosmo_subscribe(client, subject, -1, 0, NULL); json_t *message_out = random_message(); - cosmo_send_message(client, subject, message_out); + cosmo_send_message(client, subject, message_out, NULL); const json_t *message_in = wait_for_message(state); assert(json_equal(message_out, json_object_get(message_in, "message"))); @@ -235,10 +235,10 @@ bool test_resubscribe_after_reconnect(test_state *state) { cosmo *client = create_client(state); json_t *subject = random_subject(NULL, NULL); - cosmo_subscribe(client, subject, -1, 0); + cosmo_subscribe(client, subject, -1, 0, NULL); json_t *message_out = random_message(); - cosmo_send_message(client, subject, message_out); + cosmo_send_message(client, subject, message_out, NULL); const json_t *message_in = wait_for_message(state); assert(json_equal(message_out, json_object_get(message_in, "message"))); json_decref(message_out); @@ -247,7 +247,7 @@ bool test_resubscribe_after_reconnect(test_state *state) { cosmo_uuid(client->instance_id); message_out = random_message(); - cosmo_send_message(client, subject, message_out); + cosmo_send_message(client, subject, message_out, NULL); message_in = wait_for_message(state); assert(json_equal(message_out, json_object_get(message_in, "message"))); json_decref(message_out); @@ -264,16 +264,16 @@ bool test_bulk_subscribe(test_state *state) { json_t *subject1 = random_subject(NULL, NULL); json_t *subject2 = random_subject(NULL, NULL); json_t *subjects = json_pack("[oo]", subject1, subject2); - cosmo_subscribe(client, subjects, -1, 0); + cosmo_subscribe(client, subjects, -1, 0, NULL); json_t *message_out = random_message(); - cosmo_send_message(client, subject1, message_out); + cosmo_send_message(client, subject1, message_out, NULL); const json_t *message_in = wait_for_message(state); assert(json_equal(message_out, json_object_get(message_in, "message"))); json_decref(message_out); message_out = random_message(); - cosmo_send_message(client, subject2, message_out); + cosmo_send_message(client, subject2, message_out, NULL); message_in = wait_for_message(state); assert(json_equal(message_out, json_object_get(message_in, "message"))); json_decref(message_out); @@ -288,14 +288,14 @@ bool test_complex_object(test_state *state) { cosmo *client = create_client(state); json_t *subject = random_subject(NULL, NULL); - cosmo_subscribe(client, subject, -1, 0); + cosmo_subscribe(client, subject, -1, 0, NULL); json_t *message_out = json_pack("{sssis[iiii]s{sssi}}", "foo", "bar", "zig", 5, "zag", 16, 22, 59, 76, "boo", "nested", "object", "eek", 100); - cosmo_send_message(client, subject, message_out); + cosmo_send_message(client, subject, message_out, NULL); const json_t *message_in = wait_for_message(state); assert(json_equal(message_out, json_object_get(message_in, "message"))); @@ -311,7 +311,7 @@ bool test_getmessages_subscribe(test_state *state) { json_t *subject = random_subject(NULL, NULL); assert(!cosmo_get_messages(client, subject)); - cosmo_subscribe(client, subject, -1, 0); + cosmo_subscribe(client, subject, -1, 0, NULL); json_t *messages = cosmo_get_messages(client, subject); assert(messages); json_decref(messages); @@ -327,10 +327,10 @@ bool test_resubscribe(test_state *state) { json_t *subject = random_subject(NULL, NULL); json_t *message_out = random_message(); - cosmo_send_message(client, subject, message_out); + cosmo_send_message(client, subject, message_out, NULL); - cosmo_subscribe(client, subject, 0, 0); - cosmo_subscribe(client, subject, -1, 0); + cosmo_subscribe(client, subject, 0, 0, NULL); + cosmo_subscribe(client, subject, -1, 0, NULL); const json_t *message_in = wait_for_message(state); assert(json_equal(message_out, json_object_get(message_in, "message")));