Response parsing and RPC retries.
This commit is contained in:
@@ -11,13 +11,12 @@
|
|||||||
#include <uuid/uuid.h>
|
#include <uuid/uuid.h>
|
||||||
|
|
||||||
#define COSMO_UUID_SIZE 37
|
#define COSMO_UUID_SIZE 37
|
||||||
#define COSMO_CLIENT_ID_SIZE COSMO_UUID_SIZE
|
|
||||||
#define COSMO_CHECK_SECONDS 10
|
#define COSMO_CHECK_SECONDS 10
|
||||||
|
|
||||||
#define min(a, b) ((a) < (b) ? (a) : (b))
|
#define min(a, b) ((a) < (b) ? (a) : (b))
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char client_id[COSMO_CLIENT_ID_SIZE];
|
char client_id[COSMO_UUID_SIZE];
|
||||||
char instance_id[COSMO_UUID_SIZE];
|
char instance_id[COSMO_UUID_SIZE];
|
||||||
|
|
||||||
pthread_mutex_t lock;
|
pthread_mutex_t lock;
|
||||||
@@ -36,12 +35,6 @@ typedef struct {
|
|||||||
size_t recv_buf_len;
|
size_t recv_buf_len;
|
||||||
} cosmo_transfer;
|
} cosmo_transfer;
|
||||||
|
|
||||||
static void cosmo_generate_uuid(char *uuid) {
|
|
||||||
uuid_t uu;
|
|
||||||
uuid_generate(uu);
|
|
||||||
uuid_unparse_lower(uu, uuid);
|
|
||||||
}
|
|
||||||
|
|
||||||
static size_t cosmo_read_callback(void *ptr, size_t size, size_t nmemb, void *userp) {
|
static size_t cosmo_read_callback(void *ptr, size_t size, size_t nmemb, void *userp) {
|
||||||
cosmo_transfer *transfer = userp;
|
cosmo_transfer *transfer = userp;
|
||||||
size_t to_write = min(transfer->send_buf_len, size * nmemb);
|
size_t to_write = min(transfer->send_buf_len, size * nmemb);
|
||||||
@@ -102,7 +95,7 @@ static bool cosmo_send_http_int(cosmo *instance, cosmo_transfer *transfer, CURL
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Takes ownership of request
|
// Takes ownership of request.
|
||||||
static char *cosmo_send_http(cosmo *instance, char *request) {
|
static char *cosmo_send_http(cosmo *instance, char *request) {
|
||||||
CURL *curl = curl_easy_init();
|
CURL *curl = curl_easy_init();
|
||||||
assert(curl);
|
assert(curl);
|
||||||
@@ -121,12 +114,13 @@ static char *cosmo_send_http(cosmo *instance, char *request) {
|
|||||||
return ret ? transfer.recv_buf : NULL;
|
return ret ? transfer.recv_buf : NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cosmo_send_rpc(cosmo *instance, json_t *commands) {
|
// Takes ownership of commands.
|
||||||
|
static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands) {
|
||||||
char *request = cosmo_build_rpc(instance, commands);
|
char *request = cosmo_build_rpc(instance, commands);
|
||||||
|
|
||||||
char *response = cosmo_send_http(instance, request);
|
char *response = cosmo_send_http(instance, request);
|
||||||
if (!response) {
|
if (!response) {
|
||||||
return;
|
return commands;
|
||||||
}
|
}
|
||||||
|
|
||||||
json_error_t error;
|
json_error_t error;
|
||||||
@@ -134,12 +128,39 @@ static void cosmo_send_rpc(cosmo *instance, json_t *commands) {
|
|||||||
if (!received) {
|
if (!received) {
|
||||||
fprintf(stderr, "json_loads() failed: %s (json: \"%s\")\n", error.text, response);
|
fprintf(stderr, "json_loads() failed: %s (json: \"%s\")\n", error.text, response);
|
||||||
free(response);
|
free(response);
|
||||||
return;
|
return commands;
|
||||||
}
|
}
|
||||||
printf("response: %s\n", response);
|
printf("response: %s\n", response);
|
||||||
free(response);
|
free(response);
|
||||||
|
|
||||||
|
json_t *command_responses = json_object_get(received, "responses");
|
||||||
|
if (!command_responses) {
|
||||||
|
fprintf(stderr, "response lacks \"responses\" key\n");
|
||||||
|
return commands;
|
||||||
|
}
|
||||||
|
|
||||||
|
json_t *to_retry = json_array();
|
||||||
|
size_t index;
|
||||||
|
json_t *command;
|
||||||
|
json_array_foreach(commands, index, command) {
|
||||||
|
json_t *command_response = json_array_get(command_responses, index);
|
||||||
|
json_t *result = json_object_get(command_response, "result");
|
||||||
|
if (!result) {
|
||||||
|
fprintf(stderr, "response lacks \"result\" key\n");
|
||||||
|
json_array_append(to_retry, command);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (!strcmp(json_string_value(result), "retry")) {
|
||||||
|
json_array_append(to_retry, command);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// Other result code.
|
||||||
|
}
|
||||||
|
|
||||||
|
json_decref(commands);
|
||||||
json_decref(received);
|
json_decref(received);
|
||||||
|
|
||||||
|
return to_retry;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *cosmo_thread_main(void *arg) {
|
static void *cosmo_thread_main(void *arg) {
|
||||||
@@ -150,9 +171,13 @@ static void *cosmo_thread_main(void *arg) {
|
|||||||
while (json_array_size(instance->command_queue)) {
|
while (json_array_size(instance->command_queue)) {
|
||||||
json_t *commands = instance->command_queue;
|
json_t *commands = instance->command_queue;
|
||||||
instance->command_queue = json_array();
|
instance->command_queue = json_array();
|
||||||
|
|
||||||
assert(!pthread_mutex_unlock(&instance->lock));
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
cosmo_send_rpc(instance, commands);
|
json_t *to_retry = cosmo_send_rpc(instance, commands);
|
||||||
assert(!pthread_mutex_lock(&instance->lock));
|
assert(!pthread_mutex_lock(&instance->lock));
|
||||||
|
|
||||||
|
json_array_extend(instance->command_queue, to_retry);
|
||||||
|
json_decref(to_retry);
|
||||||
}
|
}
|
||||||
assert(!pthread_cond_wait(&instance->cond, &instance->lock));
|
assert(!pthread_cond_wait(&instance->cond, &instance->lock));
|
||||||
}
|
}
|
||||||
@@ -177,8 +202,10 @@ static json_t *cosmo_command(char *name, json_t *arguments) {
|
|||||||
|
|
||||||
// Public interface below
|
// Public interface below
|
||||||
|
|
||||||
void cosmo_generate_client_id(char *client_id) {
|
void cosmo_generate_uuid(char *uuid) {
|
||||||
cosmo_generate_uuid(client_id);
|
uuid_t uu;
|
||||||
|
uuid_generate(uu);
|
||||||
|
uuid_unparse_lower(uu, uuid);
|
||||||
}
|
}
|
||||||
|
|
||||||
json_t *cosmo_subject(char *name, char *readable_only_by, char *writeable_only_by) {
|
json_t *cosmo_subject(char *name, char *readable_only_by, char *writeable_only_by) {
|
||||||
@@ -235,8 +262,8 @@ void cosmo_destroy(cosmo *instance) {
|
|||||||
|
|
||||||
|
|
||||||
int main(int argc, char *argv[]) {
|
int main(int argc, char *argv[]) {
|
||||||
char client_id[COSMO_CLIENT_ID_SIZE];
|
char client_id[COSMO_UUID_SIZE];
|
||||||
cosmo_generate_client_id(client_id);
|
cosmo_generate_uuid(client_id);
|
||||||
cosmo *instance = cosmo_create(client_id);
|
cosmo *instance = cosmo_create(client_id);
|
||||||
json_t *subject = cosmo_subject("foobar", NULL, NULL);
|
json_t *subject = cosmo_subject("foobar", NULL, NULL);
|
||||||
cosmo_subscribe(instance, subject, -1, 0);
|
cosmo_subscribe(instance, subject, -1, 0);
|
||||||
|
|||||||
Reference in New Issue
Block a user