From 28c1f9612c735f58d16d4c5d2a8f9df4c9a3bd64 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Wed, 3 Jun 2015 22:24:07 -0700 Subject: [PATCH] Actually send commands! --- clients/c/test.c | 61 ++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 54 insertions(+), 7 deletions(-) diff --git a/clients/c/test.c b/clients/c/test.c index 6f7d4e5..a8b77a7 100644 --- a/clients/c/test.c +++ b/clients/c/test.c @@ -62,8 +62,8 @@ static size_t cosmo_write_callback(void *ptr, size_t size, size_t nmemb, void *u return to_read; } -static char *cosmo_build_rpc(cosmo *instance) { - json_t *to_send = json_pack("{sssss[]}", "client_id", instance->client_id, "instance_id", instance->instance_id, "commands"); +static char *cosmo_build_rpc(cosmo *instance, json_t *commands) { + json_t *to_send = json_pack("{sssssO}", "client_id", instance->client_id, "instance_id", instance->instance_id, "commands", commands); assert(to_send); char *ret = json_dumps(to_send, 0); assert(ret); @@ -121,8 +121,8 @@ static char *cosmo_send_http(cosmo *instance, char *request) { return ret ? transfer.recv_buf : NULL; } -static void cosmo_send_rpc(cosmo *instance) { - char *request = cosmo_build_rpc(instance); +static void cosmo_send_rpc(cosmo *instance, json_t *commands) { + char *request = cosmo_build_rpc(instance, commands); char *response = cosmo_send_http(instance, request); if (!response) { @@ -136,10 +136,9 @@ static void cosmo_send_rpc(cosmo *instance) { free(response); return; } + printf("response: %s\n", response); free(response); - printf("profile: %s\n", json_string_value(json_object_get(received, "profile"))); - json_decref(received); } @@ -148,17 +147,62 @@ static void *cosmo_thread_main(void *arg) { assert(!pthread_mutex_lock(&instance->lock)); while (!instance->shutdown) { - cosmo_send_rpc(instance); + while (json_array_size(instance->command_queue)) { + json_t *commands = instance->command_queue; + instance->command_queue = json_array(); + assert(!pthread_mutex_unlock(&instance->lock)); + cosmo_send_rpc(instance, commands); + assert(!pthread_mutex_lock(&instance->lock)); + } assert(!pthread_cond_wait(&instance->cond, &instance->lock)); } assert(!pthread_mutex_unlock(&instance->lock)); return NULL; } +// Takes ownership of command. +static void cosmo_send_command(cosmo *instance, json_t *command) { + assert(command); + assert(!pthread_mutex_lock(&instance->lock)); + json_array_append_new(instance->command_queue, command); + assert(!pthread_cond_signal(&instance->cond)); + assert(!pthread_mutex_unlock(&instance->lock)); +} + +// Takes ownership of arguments. +static json_t *cosmo_command(char *name, json_t *arguments) { + return json_pack("{ssso}", "command", name, "arguments", arguments); +} + + +// Public interface below + void cosmo_generate_client_id(char *client_id) { cosmo_generate_uuid(client_id); } +json_t *cosmo_subject(char *name, char *readable_only_by, char *writeable_only_by) { + json_t *ret = json_pack("{ss}", "name", name); + if (readable_only_by) { + json_object_set_new(ret, "readable_only_by", json_string(readable_only_by)); + } + if (writeable_only_by) { + json_object_set_new(ret, "writeable_only_by", json_string(writeable_only_by)); + } + return ret; +} + +void cosmo_subscribe(cosmo *instance, json_t *subject, json_int_t messages, json_int_t last_id) { + json_t *arguments = json_pack("{sO}", "subject", subject); + if (messages) { + json_object_set_new(arguments, "messages", json_integer(messages)); + } + if (last_id) { + json_object_set_new(arguments, "last_id", json_integer(last_id)); + } + cosmo_send_command(instance, cosmo_command("subscribe", arguments)); +} + cosmo *cosmo_create(char *client_id) { curl_global_init(CURL_GLOBAL_DEFAULT); @@ -194,6 +238,9 @@ int main(int argc, char *argv[]) { char client_id[COSMO_CLIENT_ID_SIZE]; cosmo_generate_client_id(client_id); cosmo *instance = cosmo_create(client_id); + json_t *subject = cosmo_subject("foobar", NULL, NULL); + cosmo_subscribe(instance, subject, -1, 0); + json_decref(subject); sleep(5); cosmo_destroy(instance);