|
|
|
|
@@ -180,6 +180,11 @@ static void cosmo_handle_message(cosmo *instance, json_t *event) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void cosmo_handle_event(cosmo *instance, json_t *event) {
|
|
|
|
|
json_t *event_id = json_object_get(event, "event_id");
|
|
|
|
|
if (event_id) {
|
|
|
|
|
json_array_append(instance->ack, event_id);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const char *event_type = json_string_value(json_object_get(event, "event_type"));
|
|
|
|
|
if (strcmp(event_type, "message") == 0) {
|
|
|
|
|
cosmo_handle_message(instance, event);
|
|
|
|
|
@@ -194,13 +199,19 @@ static json_t *cosmo_command(const char *name, const json_t *arguments) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Takes ownership of commands.
|
|
|
|
|
static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands) {
|
|
|
|
|
// Always poll.
|
|
|
|
|
json_array_append_new(commands, cosmo_command("poll", json_array()));
|
|
|
|
|
// Takes ownership of ack.
|
|
|
|
|
static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands, json_t *ack) {
|
|
|
|
|
json_t *int_commands = json_array();
|
|
|
|
|
|
|
|
|
|
char *request = cosmo_build_rpc(instance, commands);
|
|
|
|
|
// Always poll.
|
|
|
|
|
json_t *arguments = json_pack("{so}", "ack", ack);
|
|
|
|
|
json_array_append_new(int_commands, cosmo_command("poll", arguments));
|
|
|
|
|
json_array_extend(int_commands, commands);
|
|
|
|
|
|
|
|
|
|
char *request = cosmo_build_rpc(instance, int_commands);
|
|
|
|
|
|
|
|
|
|
char *response = cosmo_send_http(instance, request);
|
|
|
|
|
json_decref(int_commands);
|
|
|
|
|
if (!response) {
|
|
|
|
|
return commands;
|
|
|
|
|
}
|
|
|
|
|
@@ -243,7 +254,8 @@ static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands) {
|
|
|
|
|
json_t *to_retry = json_array();
|
|
|
|
|
json_t *command;
|
|
|
|
|
json_array_foreach(commands, index, command) {
|
|
|
|
|
json_t *command_response = json_array_get(command_responses, index);
|
|
|
|
|
// +1 for the poll offset from int_commands.
|
|
|
|
|
json_t *command_response = json_array_get(command_responses, index + 1);
|
|
|
|
|
json_t *result = json_object_get(command_response, "result");
|
|
|
|
|
if (!result) {
|
|
|
|
|
fprintf(stderr, "response lacks \"result\" key\n");
|
|
|
|
|
@@ -251,24 +263,6 @@ static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
if (!strcmp(json_string_value(result), "retry")) {
|
|
|
|
|
// One giant hack to change queued subscribe messages to ask for less data.
|
|
|
|
|
// Remove this once we get channels working; it's not worth the yuck.
|
|
|
|
|
const char *name = json_string_value(json_object_get(command, "command"));
|
|
|
|
|
if (!strcmp(name, "subscribe")) {
|
|
|
|
|
json_t *arguments = json_object_get(command, "arguments");
|
|
|
|
|
json_t *subject = json_object_get(arguments, "subject");
|
|
|
|
|
json_t *last_message = cosmo_get_last_message(instance, subject);
|
|
|
|
|
if (last_message) {
|
|
|
|
|
json_t *last_id = json_object_get(arguments, "last_id");
|
|
|
|
|
json_int_t last_id_val = last_id ? json_integer_value(last_id) : 0;
|
|
|
|
|
json_int_t local_last_id = json_integer_value(json_object_get(last_message, "id"));
|
|
|
|
|
if (local_last_id > last_id_val) {
|
|
|
|
|
json_object_del(arguments, "messages");
|
|
|
|
|
json_object_set_new(arguments, "last_id", json_integer(local_last_id));
|
|
|
|
|
}
|
|
|
|
|
json_decref(last_message);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
json_array_append(to_retry, command);
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
@@ -287,12 +281,14 @@ static void *cosmo_thread_main(void *arg) {
|
|
|
|
|
while (!instance->shutdown) {
|
|
|
|
|
json_t *commands = instance->command_queue;
|
|
|
|
|
instance->command_queue = json_array();
|
|
|
|
|
json_t *ack = instance->ack;
|
|
|
|
|
instance->ack = json_array();
|
|
|
|
|
|
|
|
|
|
instance->next_delay_ms = CYCLE_MS;
|
|
|
|
|
instance->next_delay_ms += rand_r(&instance->seedp) % (instance->next_delay_ms / CYCLE_STAGGER_FACTOR);
|
|
|
|
|
|
|
|
|
|
assert(!pthread_mutex_unlock(&instance->lock));
|
|
|
|
|
json_t *to_retry = cosmo_send_rpc(instance, commands);
|
|
|
|
|
json_t *to_retry = cosmo_send_rpc(instance, commands, ack);
|
|
|
|
|
assert(!pthread_mutex_lock(&instance->lock));
|
|
|
|
|
|
|
|
|
|
json_array_extend(instance->command_queue, to_retry);
|
|
|
|
|
@@ -456,6 +452,8 @@ cosmo *cosmo_create(const char *base_url, const char *client_id, const cosmo_cal
|
|
|
|
|
instance->profile = NULL;
|
|
|
|
|
instance->command_queue = json_array();
|
|
|
|
|
assert(instance->command_queue);
|
|
|
|
|
instance->ack = json_array();
|
|
|
|
|
assert(instance->ack);
|
|
|
|
|
instance->subscriptions = json_array();
|
|
|
|
|
assert(instance->subscriptions);
|
|
|
|
|
instance->next_delay_ms = 0;
|
|
|
|
|
@@ -475,6 +473,7 @@ void cosmo_shutdown(cosmo *instance) {
|
|
|
|
|
assert(!pthread_mutex_destroy(&instance->lock));
|
|
|
|
|
assert(!pthread_cond_destroy(&instance->cond));
|
|
|
|
|
json_decref(instance->command_queue);
|
|
|
|
|
json_decref(instance->ack);
|
|
|
|
|
json_decref(instance->subscriptions);
|
|
|
|
|
free(instance->profile);
|
|
|
|
|
curl_easy_cleanup(instance->curl);
|
|
|
|
|
|