diff --git a/clients/c/test.c b/clients/c/test.c index a8b77a7..87629de 100644 --- a/clients/c/test.c +++ b/clients/c/test.c @@ -11,13 +11,12 @@ #include #define COSMO_UUID_SIZE 37 -#define COSMO_CLIENT_ID_SIZE COSMO_UUID_SIZE #define COSMO_CHECK_SECONDS 10 #define min(a, b) ((a) < (b) ? (a) : (b)) typedef struct { - char client_id[COSMO_CLIENT_ID_SIZE]; + char client_id[COSMO_UUID_SIZE]; char instance_id[COSMO_UUID_SIZE]; pthread_mutex_t lock; @@ -36,12 +35,6 @@ typedef struct { size_t recv_buf_len; } cosmo_transfer; -static void cosmo_generate_uuid(char *uuid) { - uuid_t uu; - uuid_generate(uu); - uuid_unparse_lower(uu, uuid); -} - static size_t cosmo_read_callback(void *ptr, size_t size, size_t nmemb, void *userp) { cosmo_transfer *transfer = userp; size_t to_write = min(transfer->send_buf_len, size * nmemb); @@ -102,7 +95,7 @@ static bool cosmo_send_http_int(cosmo *instance, cosmo_transfer *transfer, CURL return true; } -// Takes ownership of request +// Takes ownership of request. static char *cosmo_send_http(cosmo *instance, char *request) { CURL *curl = curl_easy_init(); assert(curl); @@ -121,12 +114,13 @@ static char *cosmo_send_http(cosmo *instance, char *request) { return ret ? transfer.recv_buf : NULL; } -static void cosmo_send_rpc(cosmo *instance, json_t *commands) { +// Takes ownership of commands. +static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands) { char *request = cosmo_build_rpc(instance, commands); char *response = cosmo_send_http(instance, request); if (!response) { - return; + return commands; } json_error_t error; @@ -134,12 +128,39 @@ static void cosmo_send_rpc(cosmo *instance, json_t *commands) { if (!received) { fprintf(stderr, "json_loads() failed: %s (json: \"%s\")\n", error.text, response); free(response); - return; + return commands; } printf("response: %s\n", response); free(response); + json_t *command_responses = json_object_get(received, "responses"); + if (!command_responses) { + fprintf(stderr, "response lacks \"responses\" key\n"); + return commands; + } + + json_t *to_retry = json_array(); + size_t index; + json_t *command; + json_array_foreach(commands, index, command) { + json_t *command_response = json_array_get(command_responses, index); + json_t *result = json_object_get(command_response, "result"); + if (!result) { + fprintf(stderr, "response lacks \"result\" key\n"); + json_array_append(to_retry, command); + continue; + } + if (!strcmp(json_string_value(result), "retry")) { + json_array_append(to_retry, command); + continue; + } + // Other result code. + } + + json_decref(commands); json_decref(received); + + return to_retry; } static void *cosmo_thread_main(void *arg) { @@ -150,9 +171,13 @@ static void *cosmo_thread_main(void *arg) { while (json_array_size(instance->command_queue)) { json_t *commands = instance->command_queue; instance->command_queue = json_array(); + assert(!pthread_mutex_unlock(&instance->lock)); - cosmo_send_rpc(instance, commands); + json_t *to_retry = cosmo_send_rpc(instance, commands); assert(!pthread_mutex_lock(&instance->lock)); + + json_array_extend(instance->command_queue, to_retry); + json_decref(to_retry); } assert(!pthread_cond_wait(&instance->cond, &instance->lock)); } @@ -177,8 +202,10 @@ static json_t *cosmo_command(char *name, json_t *arguments) { // Public interface below -void cosmo_generate_client_id(char *client_id) { - cosmo_generate_uuid(client_id); +void cosmo_generate_uuid(char *uuid) { + uuid_t uu; + uuid_generate(uu); + uuid_unparse_lower(uu, uuid); } json_t *cosmo_subject(char *name, char *readable_only_by, char *writeable_only_by) { @@ -235,8 +262,8 @@ void cosmo_destroy(cosmo *instance) { int main(int argc, char *argv[]) { - char client_id[COSMO_CLIENT_ID_SIZE]; - cosmo_generate_client_id(client_id); + char client_id[COSMO_UUID_SIZE]; + cosmo_generate_uuid(client_id); cosmo *instance = cosmo_create(client_id); json_t *subject = cosmo_subject("foobar", NULL, NULL); cosmo_subscribe(instance, subject, -1, 0);