Plumbing for promise integration.

This commit is contained in:
Ian Gulliver
2015-06-28 11:18:05 -07:00
parent b29690734a
commit 4b79ef06de
6 changed files with 139 additions and 30 deletions

View File

@@ -5,23 +5,24 @@ LIBS ?= -lcurl -ljansson -luuid -lpthread
all: libcosmopolite.so
libcosmopolite.so: cosmopolite.o
$(CC) -shared $(LDFLAGS) -o libcosmopolite.so cosmopolite.o $(LIBS)
libcosmopolite.so: cosmopolite.o promise.o
$(CC) -shared $(LDFLAGS) -o libcosmopolite.so cosmopolite.o promise.o $(LIBS)
%.o: %.c *.h
$(CC) -c $(CFLAGS) $< -o $@
install: libcosmopolite.so cosmopolite.h
install: libcosmopolite.so cosmopolite.h promise.h
cp libcosmopolite.so /usr/local/lib
cp cosmopolite.h /usr/local/include
chown root /usr/local/lib/libcosmopolite.so /usr/local/include/cosmopolite.h
chmod 0644 /usr/local/lib/libcosmopolite.so /usr/local/include/cosmopolite.h
cp promise.h /usr/local/include
chown root /usr/local/lib/libcosmopolite.so /usr/local/include/cosmopolite.h /usr/local/include/promise.h
chmod 0644 /usr/local/lib/libcosmopolite.so /usr/local/include/cosmopolite.h /usr/local/include/promise.h
clean:
rm -f test libcosmopolite.so *.o
test: test.o cosmopolite.o
$(CC) $(LDFLAGS) -o test test.o cosmopolite.o $(LIBS)
test: test.o cosmopolite.o promise.o
$(CC) $(LDFLAGS) -o test test.o cosmopolite.o promise.o $(LIBS)
runtest: test
valgrind --leak-check=full --show-reachable=yes --num-callers=20 --suppressions=suppressions ./test

View File

@@ -530,7 +530,7 @@ json_t *cosmo_subject(const char *name, const char *readable_only_by, const char
return ret;
}
void cosmo_subscribe(cosmo *instance, json_t *subjects, const json_int_t messages, const json_int_t last_id) {
void cosmo_subscribe(cosmo *instance, json_t *subjects, const json_int_t messages, const json_int_t last_id, promise *promise_obj) {
if (json_is_array(subjects)) {
json_incref(subjects);
} else {
@@ -566,7 +566,7 @@ void cosmo_subscribe(cosmo *instance, json_t *subjects, const json_int_t message
json_decref(subjects);
}
void cosmo_unsubscribe(cosmo *instance, json_t *subject) {
void cosmo_unsubscribe(cosmo *instance, json_t *subject, promise *promise_obj) {
assert(!pthread_mutex_lock(&instance->lock));
size_t i;
json_t *subscription;
@@ -582,7 +582,7 @@ void cosmo_unsubscribe(cosmo *instance, json_t *subject) {
cosmo_send_command(instance, cosmo_command("unsubscribe", arguments));
}
void cosmo_send_message(cosmo *instance, json_t *subject, json_t *message) {
void cosmo_send_message(cosmo *instance, json_t *subject, json_t *message, promise *promise_obj) {
char sender_message_id[COSMO_UUID_SIZE];
cosmo_uuid(sender_message_id);
char *encoded = json_dumps(message, JSON_ENCODE_ANY);

View File

@@ -8,6 +8,8 @@
#include <stdint.h>
#include <time.h>
#include "promise.h"
#define COSMO_UUID_SIZE 37
typedef struct {
@@ -32,16 +34,17 @@ void cosmo_shutdown(cosmo *instance);
const char *cosmo_current_profile(cosmo *instance);
json_t *cosmo_subject(const char *name, const char *readable_only_by, const char *writeable_only_by);
void cosmo_subscribe(cosmo *instance, json_t *subjects, const json_int_t messages, const json_int_t last_id);
void cosmo_unsubscribe(cosmo *instance, json_t *subject);
void cosmo_send_message(cosmo *instance, json_t *subject, json_t *message);
void cosmo_subscribe(cosmo *instance, json_t *subjects, const json_int_t messages, const json_int_t last_id, promise *promise_obj);
void cosmo_unsubscribe(cosmo *instance, json_t *subject, promise *promise_obj);
void cosmo_send_message(cosmo *instance, json_t *subject, json_t *message, promise *promise_obj);
json_t *cosmo_get_messages(cosmo *instance, json_t *subject);
json_t *cosmo_get_last_message(cosmo *instance, json_t *subject);
// TODO
json_t *cosmo_get_pins(cosmo *instance, json_t *subject);
void cosmo_pin(cosmo *instance, json_t *subject, json_t *message);
void cosmo_unpin(cosmo *instance, json_t *subject, json_t *message);
void cosmo_get_profile(cosmo *instance, promise *promise_obj);
json_t *cosmo_get_pins(cosmo *instance, json_t *subject, promise *promise_obj);
void cosmo_pin(cosmo *instance, json_t *subject, json_t *message, promise *promise_obj);
void cosmo_unpin(cosmo *instance, json_t *subject, json_t *message, promise *promise_obj);
#endif

86
clients/c/promise.c Normal file
View File

@@ -0,0 +1,86 @@
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>
#include "promise.h"
struct promise {
bool will_wait;
promise_callback on_success;
promise_callback on_failure;
void *passthrough;
bool fulfilled;
pthread_mutex_t lock;
pthread_cond_t cond;
bool success;
void *result;
};
promise *promise_create(bool will_wait, promise_callback on_success, promise_callback on_failure, void *passthrough) {
promise *promise_obj = malloc(sizeof(*promise_obj));
promise_obj->will_wait = will_wait;
promise_obj->on_success = on_success;
promise_obj->on_failure = on_failure;
promise_obj->passthrough = passthrough;
promise_obj->fulfilled = false;
assert(!pthread_mutex_init(&promise_obj->lock, NULL));
assert(!pthread_cond_init(&promise_obj->cond, NULL));
return promise_obj;
}
static void promise_destroy(promise *promise_obj) {
assert(!pthread_mutex_destroy(&promise_obj->lock));
assert(!pthread_cond_destroy(&promise_obj->cond));
free(promise_obj);
}
bool promise_wait(promise *promise_obj, void **result) {
assert(!pthread_mutex_lock(&promise_obj->lock));
assert(promise_obj->will_wait);
while (!promise_obj->fulfilled) {
pthread_cond_wait(&promise_obj->cond, &promise_obj->lock);
}
assert(!pthread_mutex_unlock(&promise_obj->lock));
// promise_obj is now filled in, and owned solely by us.
bool success = promise_obj->success;
if (result) {
*result = promise_obj->result;
}
promise_destroy(promise_obj);
return success;
}
void promise_complete(promise *promise_obj, void *result, bool success) {
assert(!pthread_mutex_lock(&promise_obj->lock));
if (success && promise_obj->on_success) {
promise_obj->on_success(promise_obj->passthrough, result);
} else if (!success && promise_obj->on_failure) {
promise_obj->on_failure(promise_obj->passthrough, result);
}
if (promise_obj->will_wait) {
// We don't own promise_obj; pass to promise_wait()
promise_obj->result = result;
promise_obj->success = success;
promise_obj->fulfilled = true;
assert(!pthread_cond_signal(&promise_obj->cond));
assert(!pthread_mutex_unlock(&promise_obj->lock));
} else {
// We own promise_obj
assert(!pthread_mutex_unlock(&promise_obj->lock));
promise_destroy(promise_obj);
}
}
void promise_succeed(promise *promise_obj, void *result) {
promise_complete(promise_obj, result, true);
}
void promise_fail(promise *promise_obj, void *result) {
promise_complete(promise_obj, result, false);
}

19
clients/c/promise.h Normal file
View File

@@ -0,0 +1,19 @@
#ifndef _PROMISE_H
#define _PROMISE_H
#include <stdbool.h>
typedef struct promise promise;
// (passthrough, result)
typedef void (*promise_callback)(void *, void *);
promise *promise_create(bool will_wait, promise_callback on_success, promise_callback on_failure, void *passthrough);
bool promise_wait(promise *promise_obj, void **result);
void promise_complete(promise *promise_obj, void *result, bool success);
// Shortcuts for promise_complete()
void promise_succeed(promise *promise_obj, void *result);
void promise_fail(promise *promise_obj, void *result);
#endif

View File

@@ -191,10 +191,10 @@ bool test_message_round_trip(test_state *state) {
cosmo *client = create_client(state);
json_t *subject = random_subject(NULL, NULL);
cosmo_subscribe(client, subject, -1, 0);
cosmo_subscribe(client, subject, -1, 0, NULL);
json_t *message_out = random_message();
cosmo_send_message(client, subject, message_out);
cosmo_send_message(client, subject, message_out, NULL);
const json_t *message_in = wait_for_message(state);
assert(json_equal(message_out, json_object_get(message_in, "message")));
@@ -235,10 +235,10 @@ bool test_resubscribe_after_reconnect(test_state *state) {
cosmo *client = create_client(state);
json_t *subject = random_subject(NULL, NULL);
cosmo_subscribe(client, subject, -1, 0);
cosmo_subscribe(client, subject, -1, 0, NULL);
json_t *message_out = random_message();
cosmo_send_message(client, subject, message_out);
cosmo_send_message(client, subject, message_out, NULL);
const json_t *message_in = wait_for_message(state);
assert(json_equal(message_out, json_object_get(message_in, "message")));
json_decref(message_out);
@@ -247,7 +247,7 @@ bool test_resubscribe_after_reconnect(test_state *state) {
cosmo_uuid(client->instance_id);
message_out = random_message();
cosmo_send_message(client, subject, message_out);
cosmo_send_message(client, subject, message_out, NULL);
message_in = wait_for_message(state);
assert(json_equal(message_out, json_object_get(message_in, "message")));
json_decref(message_out);
@@ -264,16 +264,16 @@ bool test_bulk_subscribe(test_state *state) {
json_t *subject1 = random_subject(NULL, NULL);
json_t *subject2 = random_subject(NULL, NULL);
json_t *subjects = json_pack("[oo]", subject1, subject2);
cosmo_subscribe(client, subjects, -1, 0);
cosmo_subscribe(client, subjects, -1, 0, NULL);
json_t *message_out = random_message();
cosmo_send_message(client, subject1, message_out);
cosmo_send_message(client, subject1, message_out, NULL);
const json_t *message_in = wait_for_message(state);
assert(json_equal(message_out, json_object_get(message_in, "message")));
json_decref(message_out);
message_out = random_message();
cosmo_send_message(client, subject2, message_out);
cosmo_send_message(client, subject2, message_out, NULL);
message_in = wait_for_message(state);
assert(json_equal(message_out, json_object_get(message_in, "message")));
json_decref(message_out);
@@ -288,14 +288,14 @@ bool test_complex_object(test_state *state) {
cosmo *client = create_client(state);
json_t *subject = random_subject(NULL, NULL);
cosmo_subscribe(client, subject, -1, 0);
cosmo_subscribe(client, subject, -1, 0, NULL);
json_t *message_out = json_pack("{sssis[iiii]s{sssi}}",
"foo", "bar",
"zig", 5,
"zag", 16, 22, 59, 76,
"boo", "nested", "object", "eek", 100);
cosmo_send_message(client, subject, message_out);
cosmo_send_message(client, subject, message_out, NULL);
const json_t *message_in = wait_for_message(state);
assert(json_equal(message_out, json_object_get(message_in, "message")));
@@ -311,7 +311,7 @@ bool test_getmessages_subscribe(test_state *state) {
json_t *subject = random_subject(NULL, NULL);
assert(!cosmo_get_messages(client, subject));
cosmo_subscribe(client, subject, -1, 0);
cosmo_subscribe(client, subject, -1, 0, NULL);
json_t *messages = cosmo_get_messages(client, subject);
assert(messages);
json_decref(messages);
@@ -327,10 +327,10 @@ bool test_resubscribe(test_state *state) {
json_t *subject = random_subject(NULL, NULL);
json_t *message_out = random_message();
cosmo_send_message(client, subject, message_out);
cosmo_send_message(client, subject, message_out, NULL);
cosmo_subscribe(client, subject, 0, 0);
cosmo_subscribe(client, subject, -1, 0);
cosmo_subscribe(client, subject, 0, 0, NULL);
cosmo_subscribe(client, subject, -1, 0, NULL);
const json_t *message_in = wait_for_message(state);
assert(json_equal(message_out, json_object_get(message_in, "message")));