831 lines
27 KiB
C
831 lines
27 KiB
C
#include <assert.h>
|
|
#include <errno.h>
|
|
#include <fcntl.h>
|
|
#include <stdarg.h>
|
|
#include <stdio.h>
|
|
#include <string.h>
|
|
#include <strings.h>
|
|
#include <sys/stat.h>
|
|
#include <sys/types.h>
|
|
#include <time.h>
|
|
#include <unistd.h>
|
|
|
|
#include <uuid/uuid.h>
|
|
|
|
#include "cosmopolite.h"
|
|
#include "cosmopolite-int.h"
|
|
|
|
#define min(a, b) ((a) < (b) ? (a) : (b))
|
|
#define max(a, b) ((a) > (b) ? (a) : (b))
|
|
|
|
#define CYCLE_MS 10000
|
|
#define CYCLE_STAGGER_FACTOR 10
|
|
#define CONNECT_TIMEOUT_S 60
|
|
|
|
enum {
|
|
SUBSCRIPTION_PENDING,
|
|
SUBSCRIPTION_ACTIVE,
|
|
};
|
|
|
|
typedef struct {
|
|
char *send_buf;
|
|
size_t send_buf_len;
|
|
|
|
char *recv_buf;
|
|
size_t recv_buf_len;
|
|
|
|
int64_t retry_after;
|
|
} cosmo_transfer;
|
|
|
|
static int cosmo_random_fd = -1;
|
|
|
|
static void cosmo_random_cleanup() {
|
|
assert(!close(cosmo_random_fd));
|
|
}
|
|
|
|
static uint64_t cosmo_random() {
|
|
if (cosmo_random_fd == -1) {
|
|
cosmo_random_fd = open("/dev/urandom", O_RDONLY);
|
|
assert(cosmo_random_fd >= 0);
|
|
atexit(cosmo_random_cleanup);
|
|
}
|
|
|
|
uint64_t ret;
|
|
assert(read(cosmo_random_fd, &ret, sizeof(ret)) == sizeof(ret));
|
|
return ret;
|
|
}
|
|
|
|
static void cosmo_log(cosmo *instance, const char *fmt, ...) {
|
|
if (!instance->debug) {
|
|
return;
|
|
}
|
|
|
|
va_list ap;
|
|
va_start(ap, fmt);
|
|
|
|
fprintf(stderr, "%s: ", instance->instance_id);
|
|
vfprintf(stderr, fmt, ap);
|
|
fprintf(stderr, "\n");
|
|
|
|
va_end(ap);
|
|
}
|
|
|
|
static void cosmo_append_command(struct cosmo_command **head, struct cosmo_command **tail, struct cosmo_command *command) {
|
|
command->prev = *tail;
|
|
if (command->prev) {
|
|
command->prev->next = command;
|
|
}
|
|
command->next = NULL;
|
|
*tail = command;
|
|
if (!*head) {
|
|
*head = command;
|
|
}
|
|
}
|
|
|
|
static json_t *cosmo_find_subscription(cosmo *instance, json_t *subject) {
|
|
size_t i;
|
|
json_t *subscription;
|
|
json_array_foreach(instance->subscriptions, i, subscription) {
|
|
if (json_equal(json_object_get(subscription, "subject"), subject)) {
|
|
return subscription;
|
|
}
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
void cosmo_remove_subscription(cosmo *instance, json_t *subject) {
|
|
size_t i;
|
|
json_t *subscription;
|
|
json_array_foreach(instance->subscriptions, i, subscription) {
|
|
if (json_equal(json_object_get(subscription, "subject"), subject)) {
|
|
json_array_remove(instance->subscriptions, i);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
static void cosmo_send_command_locked(cosmo *instance, json_t *command, promise *promise_obj) {
|
|
struct cosmo_command *command_obj = malloc(sizeof(*command_obj));
|
|
command_obj->command = command;
|
|
command_obj->promise = promise_obj;
|
|
cosmo_append_command(&instance->command_queue_head, &instance->command_queue_tail, command_obj);
|
|
instance->next_delay_ms = 0;
|
|
}
|
|
|
|
// Takes ownership of command.
|
|
static void cosmo_send_command(cosmo *instance, json_t *command, promise *promise_obj) {
|
|
assert(command);
|
|
assert(!pthread_mutex_lock(&instance->lock));
|
|
cosmo_send_command_locked(instance, command, promise_obj);
|
|
assert(!pthread_cond_signal(&instance->cond));
|
|
assert(!pthread_mutex_unlock(&instance->lock));
|
|
}
|
|
|
|
static size_t cosmo_read_callback(void *ptr, size_t size, size_t nmemb, void *userp) {
|
|
cosmo_transfer *transfer = userp;
|
|
size_t to_write = min(transfer->send_buf_len, size * nmemb);
|
|
memcpy(ptr, transfer->send_buf, to_write);
|
|
transfer->send_buf += to_write;
|
|
transfer->send_buf_len -= to_write;
|
|
return to_write;
|
|
}
|
|
|
|
static size_t cosmo_write_callback(void *ptr, size_t size, size_t nmemb, void *userp) {
|
|
cosmo_transfer *transfer = userp;
|
|
size_t to_read = size * nmemb;
|
|
transfer->recv_buf = realloc(transfer->recv_buf, transfer->recv_buf_len + to_read + 1);
|
|
assert(transfer->recv_buf);
|
|
memcpy(transfer->recv_buf + transfer->recv_buf_len, ptr, to_read);
|
|
transfer->recv_buf_len += to_read;
|
|
transfer->recv_buf[transfer->recv_buf_len] = '\0';
|
|
return to_read;
|
|
}
|
|
|
|
static size_t cosmo_header_callback(char *ptr, size_t size, size_t nmemb, void *userp) {
|
|
cosmo_transfer *transfer = userp;
|
|
size_t length = size * nmemb;
|
|
#define RETRY_AFTER_HEADER "Retry-After: 0\r\n"
|
|
#define RETRY_AFTER_HEADER_SIZE (sizeof(RETRY_AFTER_HEADER) - 1)
|
|
if (length == RETRY_AFTER_HEADER_SIZE &&
|
|
strncasecmp(ptr, RETRY_AFTER_HEADER, RETRY_AFTER_HEADER_SIZE) == 0) {
|
|
transfer->retry_after = 0;
|
|
}
|
|
return length;
|
|
}
|
|
|
|
static char *cosmo_build_rpc(const cosmo *instance, const json_t *commands) {
|
|
json_t *to_send = json_pack("{sssssO}", "client_id", instance->client_id, "instance_id", instance->instance_id, "commands", commands);
|
|
assert(to_send);
|
|
char *ret = json_dumps(to_send, 0);
|
|
assert(ret);
|
|
json_decref(to_send);
|
|
return ret;
|
|
}
|
|
|
|
static bool cosmo_send_http_int(cosmo *instance, cosmo_transfer *transfer) {
|
|
CURLcode res;
|
|
|
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_POSTFIELDSIZE, transfer->send_buf_len));
|
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_READDATA, transfer));
|
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_WRITEDATA, transfer));
|
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_HEADERDATA, transfer));
|
|
|
|
assert(!pthread_mutex_unlock(&instance->lock));
|
|
res = curl_easy_perform(instance->curl);
|
|
assert(!pthread_mutex_lock(&instance->lock));
|
|
|
|
if (res) {
|
|
return false;
|
|
}
|
|
|
|
long return_code;
|
|
assert(curl_easy_getinfo(instance->curl, CURLINFO_RESPONSE_CODE, &return_code) == CURLE_OK);
|
|
if (return_code != 200) {
|
|
return false;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
// Takes ownership of request.
|
|
static char *cosmo_send_http(cosmo *instance, char *request) {
|
|
cosmo_transfer transfer = {
|
|
.send_buf = request,
|
|
.send_buf_len = strlen(request),
|
|
.recv_buf = NULL,
|
|
.recv_buf_len = 0,
|
|
.retry_after = -1
|
|
};
|
|
|
|
int ret = cosmo_send_http_int(instance, &transfer);
|
|
|
|
if (transfer.retry_after >= 0) {
|
|
instance->next_delay_ms = transfer.retry_after * 1000;
|
|
}
|
|
|
|
free(request);
|
|
|
|
return ret ? transfer.recv_buf : NULL;
|
|
}
|
|
|
|
static void cosmo_handle_message(cosmo *instance, json_t *event) {
|
|
json_t *subject;
|
|
int id;
|
|
char *message_content;
|
|
if (json_unpack(event, "{sosiss}", "subject", &subject, "id", &id, "message", &message_content)) {
|
|
cosmo_log(instance, "invalid message event");
|
|
return;
|
|
}
|
|
|
|
json_error_t err;
|
|
json_t *message_object = json_loads(message_content, JSON_DECODE_ANY, &err);
|
|
if (!message_object) {
|
|
cosmo_log(instance, "error parsing message content: %s", err.text);
|
|
return;
|
|
}
|
|
json_object_set_new(event, "message", message_object);
|
|
|
|
json_t *subscription = cosmo_find_subscription(instance, subject);
|
|
if (!subscription) {
|
|
cosmo_log(instance, "message from unknown subject");
|
|
return;
|
|
}
|
|
|
|
json_t *messages = json_object_get(subscription, "messages");
|
|
ssize_t insert_after;
|
|
for (insert_after = json_array_size(messages) - 1; insert_after >= 0; insert_after--) {
|
|
json_t *message = json_array_get(messages, insert_after);
|
|
json_int_t message_id = json_integer_value(json_object_get(message, "id"));
|
|
if (message_id == id) {
|
|
return;
|
|
}
|
|
if (message_id < id) {
|
|
break;
|
|
}
|
|
}
|
|
json_array_insert(messages, insert_after + 1, event);
|
|
|
|
if (instance->callbacks.message) {
|
|
cosmo_log(instance, "callbacks.message()");
|
|
assert(!pthread_mutex_unlock(&instance->lock));
|
|
instance->callbacks.message(event, instance->passthrough);
|
|
assert(!pthread_mutex_lock(&instance->lock));
|
|
}
|
|
}
|
|
|
|
static void cosmo_handle_client_id_change(cosmo *instance) {
|
|
if (instance->callbacks.client_id_change) {
|
|
cosmo_log(instance, "callbacks.client_id_change()");
|
|
assert(!pthread_mutex_unlock(&instance->lock));
|
|
instance->callbacks.client_id_change(instance->passthrough, instance->client_id);
|
|
assert(!pthread_mutex_lock(&instance->lock));
|
|
}
|
|
}
|
|
|
|
static void cosmo_handle_connect(cosmo *instance) {
|
|
if (instance->connect_state == CONNECTED) {
|
|
return;
|
|
}
|
|
instance->connect_state = CONNECTED;
|
|
if (instance->callbacks.connect) {
|
|
cosmo_log(instance, "callbacks.connect()");
|
|
assert(!pthread_mutex_unlock(&instance->lock));
|
|
instance->callbacks.connect(instance->passthrough);
|
|
assert(!pthread_mutex_lock(&instance->lock));
|
|
}
|
|
}
|
|
|
|
static void cosmo_handle_disconnect(cosmo *instance) {
|
|
if (instance->connect_state == DISCONNECTED) {
|
|
return;
|
|
}
|
|
instance->connect_state = DISCONNECTED;
|
|
if (instance->callbacks.disconnect) {
|
|
cosmo_log(instance, "callbacks.disconnect()");
|
|
assert(!pthread_mutex_unlock(&instance->lock));
|
|
instance->callbacks.disconnect(instance->passthrough);
|
|
assert(!pthread_mutex_lock(&instance->lock));
|
|
}
|
|
}
|
|
|
|
static void cosmo_handle_login(cosmo *instance, json_t *event) {
|
|
if (instance->login_state == LOGGED_IN) {
|
|
return;
|
|
}
|
|
instance->login_state = LOGGED_IN;
|
|
if (instance->callbacks.login) {
|
|
cosmo_log(instance, "callbacks.login()");
|
|
assert(!pthread_mutex_unlock(&instance->lock));
|
|
instance->callbacks.login(instance->passthrough);
|
|
assert(!pthread_mutex_lock(&instance->lock));
|
|
}
|
|
}
|
|
|
|
static void cosmo_handle_logout(cosmo *instance, json_t *event) {
|
|
if (instance->login_state == LOGGED_OUT) {
|
|
return;
|
|
}
|
|
instance->login_state = LOGGED_OUT;
|
|
if (instance->callbacks.logout) {
|
|
cosmo_log(instance, "callbacks.logout()");
|
|
assert(!pthread_mutex_unlock(&instance->lock));
|
|
instance->callbacks.logout(instance->passthrough);
|
|
assert(!pthread_mutex_lock(&instance->lock));
|
|
}
|
|
}
|
|
|
|
static void cosmo_handle_event(cosmo *instance, json_t *event) {
|
|
json_t *event_id = json_object_get(event, "event_id");
|
|
if (event_id) {
|
|
json_array_append(instance->ack, event_id);
|
|
}
|
|
|
|
const char *event_type = json_string_value(json_object_get(event, "event_type"));
|
|
if (!strcmp(event_type, "message")) {
|
|
cosmo_handle_message(instance, event);
|
|
} else if (!strcmp(event_type, "login")) {
|
|
cosmo_handle_login(instance, event);
|
|
} else if (!strcmp(event_type, "logout")) {
|
|
cosmo_handle_logout(instance, event);
|
|
} else {
|
|
cosmo_log(instance, "unknown event type: %s", event_type);
|
|
}
|
|
}
|
|
|
|
static void cosmo_complete_subscribe(cosmo *instance, struct cosmo_command *command, json_t *response, char *result) {
|
|
json_t *subject;
|
|
assert(!json_unpack(command->command, "{s{so}}", "arguments", "subject", &subject));
|
|
|
|
if (strcmp(result, "ok")) {
|
|
cosmo_remove_subscription(instance, subject);
|
|
assert(!pthread_mutex_unlock(&instance->lock));
|
|
promise_fail(command->promise, NULL, NULL);
|
|
assert(!pthread_mutex_lock(&instance->lock));
|
|
return;
|
|
}
|
|
|
|
json_t *subscription = cosmo_find_subscription(instance, subject);
|
|
if (subscription) {
|
|
// Might have unsubscribed later
|
|
json_object_set_new(subscription, "state", json_integer(SUBSCRIPTION_ACTIVE));
|
|
}
|
|
|
|
assert(!pthread_mutex_unlock(&instance->lock));
|
|
promise_succeed(command->promise, NULL, NULL);
|
|
assert(!pthread_mutex_lock(&instance->lock));
|
|
}
|
|
|
|
static void cosmo_complete_unsubscribe(cosmo *instance, struct cosmo_command *command, json_t *response, char *result) {
|
|
assert(!pthread_mutex_unlock(&instance->lock));
|
|
promise_complete(command->promise, NULL, NULL, (strcmp(result, "ok") == 0));
|
|
assert(!pthread_mutex_lock(&instance->lock));
|
|
}
|
|
|
|
static void cosmo_complete_send_message(cosmo *instance, struct cosmo_command *command, json_t *response, char *result) {
|
|
json_t *message;
|
|
int err = json_unpack(response, "{so}", "message", &message);
|
|
if (err || (strcmp(result, "ok") && strcmp(result, "duplicate_message"))) {
|
|
assert(!pthread_mutex_unlock(&instance->lock));
|
|
promise_fail(command->promise, NULL, NULL);
|
|
assert(!pthread_mutex_lock(&instance->lock));
|
|
} else {
|
|
char *message_content;
|
|
assert(!json_unpack(message, "{ss}", "message", &message_content));
|
|
json_t *message_object = json_loads(message_content, JSON_DECODE_ANY, NULL);
|
|
assert(message_object);
|
|
json_object_set_new(message, "message", message_object);
|
|
|
|
json_incref(message);
|
|
assert(!pthread_mutex_unlock(&instance->lock));
|
|
promise_succeed(command->promise, message, (promise_cleanup) json_decref);
|
|
assert(!pthread_mutex_lock(&instance->lock));
|
|
}
|
|
}
|
|
|
|
static void cosmo_complete_rpc(cosmo *instance, struct cosmo_command *command, json_t *response) {
|
|
char *command_name, *result;
|
|
assert(!json_unpack(command->command, "{ss}", "command", &command_name));
|
|
assert(!json_unpack(response, "{ss}", "result", &result));
|
|
if (!strcmp(command_name, "subscribe")) {
|
|
cosmo_complete_subscribe(instance, command, response, result);
|
|
} else if (!strcmp(command_name, "unsubscribe")) {
|
|
cosmo_complete_unsubscribe(instance, command, response, result);
|
|
} else if (!strcmp(command_name, "sendMessage")) {
|
|
cosmo_complete_send_message(instance, command, response, result);
|
|
}
|
|
}
|
|
|
|
// Takes ownership of arguments.
|
|
static json_t *cosmo_command(const char *name, const json_t *arguments) {
|
|
return json_pack("{ssso}", "command", name, "arguments", arguments);
|
|
}
|
|
|
|
static void cosmo_resubscribe(cosmo *instance) {
|
|
size_t i;
|
|
json_t *subscription;
|
|
json_array_foreach(instance->subscriptions, i, subscription) {
|
|
int state;
|
|
json_t *subject, *messages;
|
|
assert(!json_unpack(subscription, "{sisoso}", "state", &state, "subject", &subject, "messages", &messages));
|
|
|
|
if (state == SUBSCRIPTION_PENDING) {
|
|
continue;
|
|
}
|
|
|
|
json_t *arguments = json_pack("{sO}", "subject", subject);
|
|
if (json_array_size(messages)) {
|
|
// Restart at the last actual ID we received.
|
|
json_t *last_message = json_array_get(messages, json_array_size(messages) - 1);
|
|
json_object_set(arguments, "last_id", json_object_get(last_message, "id"));
|
|
} else {
|
|
json_t *num_messages = json_object_get(subscription, "num_messages");
|
|
if (num_messages) {
|
|
json_object_set(arguments, "messages", num_messages);
|
|
}
|
|
json_t *last_id = json_object_get(subscription, "last_id");
|
|
if (last_id) {
|
|
json_object_set(arguments, "last_id", last_id);
|
|
}
|
|
}
|
|
|
|
cosmo_send_command_locked(instance, cosmo_command("subscribe", arguments), NULL);
|
|
}
|
|
}
|
|
|
|
// Takes ownership of commands.
|
|
// Takes ownership of ack.
|
|
static struct cosmo_command *cosmo_send_rpc(cosmo *instance, struct cosmo_command *commands, json_t *ack) {
|
|
json_t *int_commands = json_array();
|
|
|
|
// Always poll.
|
|
json_t *arguments = json_pack("{so}", "ack", ack);
|
|
json_array_append_new(int_commands, cosmo_command("poll", arguments));
|
|
struct cosmo_command *command_iter = commands;
|
|
while (command_iter) {
|
|
json_array_append(int_commands, command_iter->command);
|
|
command_iter = command_iter->next;
|
|
}
|
|
|
|
char *request = cosmo_build_rpc(instance, int_commands);
|
|
cosmo_log(instance, "--> %s", request);
|
|
|
|
char *response = cosmo_send_http(instance, request);
|
|
json_decref(int_commands);
|
|
if (!response) {
|
|
return commands;
|
|
}
|
|
cosmo_log(instance, "<-- %s", response);
|
|
|
|
json_error_t error;
|
|
json_t *received = json_loads(response, 0, &error);
|
|
if (!received) {
|
|
cosmo_log(instance, "json_loads() failed: %s (json: \"%s\")", error.text, response);
|
|
free(response);
|
|
return commands;
|
|
}
|
|
free(response);
|
|
|
|
json_t *command_responses, *events, *profile;
|
|
if (json_unpack(received, "{sososo}", "profile", &profile, "responses", &command_responses, "events", &events)) {
|
|
cosmo_log(instance, "invalid server response");
|
|
json_decref(received);
|
|
return commands;
|
|
}
|
|
|
|
// TODO: major locking problems through here
|
|
if (!json_equal(instance->profile, profile)) {
|
|
json_decref(instance->profile);
|
|
json_incref(profile);
|
|
instance->profile = profile;
|
|
struct cosmo_get_profile *get_profile_iter = instance->get_profile_head;
|
|
while (get_profile_iter) {
|
|
struct cosmo_get_profile *next = get_profile_iter->next;
|
|
json_incref(instance->profile);
|
|
assert(!pthread_mutex_unlock(&instance->lock));
|
|
promise_succeed(get_profile_iter->promise, instance->profile, (promise_cleanup)json_decref);
|
|
assert(!pthread_mutex_lock(&instance->lock));
|
|
free(get_profile_iter);
|
|
get_profile_iter = next;
|
|
}
|
|
instance->get_profile_head = NULL;
|
|
}
|
|
|
|
// Should actually be a monotonic clock.
|
|
assert(timespec_get(&instance->last_success, TIME_UTC) == TIME_UTC);
|
|
cosmo_handle_connect(instance);
|
|
|
|
size_t index;
|
|
json_t *event;
|
|
json_array_foreach(events, index, event) {
|
|
cosmo_handle_event(instance, event);
|
|
}
|
|
|
|
json_t *poll_response = json_array_get(command_responses, 0);
|
|
json_t *instance_generation;
|
|
if (json_unpack(poll_response, "{so}", "instance_generation", &instance_generation)) {
|
|
cosmo_log(instance, "invalid poll response");
|
|
} else {
|
|
if (!json_equal(instance_generation, instance->generation)) {
|
|
json_decref(instance->generation);
|
|
json_incref(instance_generation);
|
|
instance->generation = instance_generation;
|
|
cosmo_resubscribe(instance);
|
|
}
|
|
}
|
|
|
|
command_iter = commands;
|
|
struct cosmo_command *to_retry_head = NULL, *to_retry_tail = NULL;
|
|
json_t *command_response;
|
|
json_array_foreach(command_responses, index, command_response) {
|
|
if (index == 0) {
|
|
// Skip poll response; don't increment command_iter
|
|
continue;
|
|
}
|
|
|
|
if (!command_iter) {
|
|
cosmo_log(instance, "more responses than requests");
|
|
continue;
|
|
}
|
|
struct cosmo_command *command_next = command_iter->next;
|
|
|
|
char *result;
|
|
if (json_unpack(command_response, "{ss}", "result", &result)) {
|
|
cosmo_log(instance, "invalid command response");
|
|
cosmo_append_command(&to_retry_head, &to_retry_tail, command_iter);
|
|
command_iter = command_next;
|
|
continue;
|
|
}
|
|
if (!strcmp(result, "retry")) {
|
|
cosmo_append_command(&to_retry_head, &to_retry_tail, command_iter);
|
|
command_iter = command_next;
|
|
continue;
|
|
}
|
|
|
|
cosmo_complete_rpc(instance, command_iter, command_response);
|
|
|
|
json_decref(command_iter->command);
|
|
free(command_iter);
|
|
command_iter = command_next;
|
|
}
|
|
|
|
json_decref(received);
|
|
|
|
return to_retry_head;
|
|
}
|
|
|
|
static void *cosmo_thread_main(void *arg) {
|
|
cosmo *instance = arg;
|
|
|
|
assert(!pthread_mutex_lock(&instance->lock));
|
|
while (!instance->shutdown) {
|
|
struct cosmo_command *commands = instance->command_queue_head;
|
|
instance->command_queue_head = instance->command_queue_tail = NULL;
|
|
json_t *ack = instance->ack;
|
|
instance->ack = json_array();
|
|
|
|
instance->next_delay_ms = CYCLE_MS;
|
|
instance->next_delay_ms += cosmo_random() % (instance->next_delay_ms / CYCLE_STAGGER_FACTOR);
|
|
|
|
struct cosmo_command *to_retry = cosmo_send_rpc(instance, commands, ack);
|
|
{
|
|
struct timespec now;
|
|
assert(timespec_get(&now, TIME_UTC) == TIME_UTC);
|
|
if (now.tv_sec - instance->last_success.tv_sec > CONNECT_TIMEOUT_S) {
|
|
cosmo_handle_disconnect(instance);
|
|
}
|
|
}
|
|
|
|
if (to_retry) {
|
|
to_retry->prev = instance->command_queue_tail;
|
|
if (to_retry->prev) {
|
|
to_retry->prev->next = to_retry;
|
|
}
|
|
instance->command_queue_tail = to_retry;
|
|
if (!instance->command_queue_head) {
|
|
instance->command_queue_head = to_retry;
|
|
}
|
|
}
|
|
|
|
#define MS_PER_S 1000
|
|
#define NS_PER_MS 1000000
|
|
struct timespec ts;
|
|
assert(timespec_get(&ts, TIME_UTC) == TIME_UTC);
|
|
uint64_t target_ms = (ts.tv_sec * MS_PER_S) + (ts.tv_nsec / NS_PER_MS) + instance->next_delay_ms;
|
|
ts.tv_sec = target_ms / MS_PER_S;
|
|
ts.tv_nsec = (target_ms % MS_PER_S) * NS_PER_MS;
|
|
pthread_cond_timedwait(&instance->cond, &instance->lock, &ts);
|
|
}
|
|
assert(!pthread_mutex_unlock(&instance->lock));
|
|
return NULL;
|
|
}
|
|
|
|
|
|
// Public interface below
|
|
|
|
void cosmo_uuid(char *uuid) {
|
|
uuid_t uu;
|
|
uuid_generate(uu);
|
|
uuid_unparse_lower(uu, uuid);
|
|
}
|
|
|
|
void cosmo_get_profile(cosmo *instance, promise *promise_obj) {
|
|
assert(!pthread_mutex_lock(&instance->lock));
|
|
if (json_is_string(instance->profile)) {
|
|
json_t *profile = instance->profile;
|
|
json_incref(profile);
|
|
assert(!pthread_mutex_unlock(&instance->lock));
|
|
promise_succeed(promise_obj, instance->profile, (promise_cleanup)json_decref);
|
|
return;
|
|
}
|
|
struct cosmo_get_profile *entry = malloc(sizeof(*entry));
|
|
assert(entry);
|
|
entry->next = instance->get_profile_head;
|
|
entry->promise = promise_obj;
|
|
instance->get_profile_head = entry;
|
|
assert(!pthread_mutex_unlock(&instance->lock));
|
|
}
|
|
|
|
json_t *cosmo_current_profile(cosmo *instance) {
|
|
assert(!pthread_mutex_lock(&instance->lock));
|
|
json_t *profile = instance->profile;
|
|
json_incref(profile);
|
|
assert(!pthread_mutex_unlock(&instance->lock));
|
|
return profile;
|
|
}
|
|
|
|
json_t *cosmo_subject(const char *name, const char *readable_only_by, const char *writeable_only_by) {
|
|
json_t *ret = json_pack("{ss}", "name", name);
|
|
if (readable_only_by) {
|
|
json_object_set_new(ret, "readable_only_by", json_string(readable_only_by));
|
|
}
|
|
if (writeable_only_by) {
|
|
json_object_set_new(ret, "writeable_only_by", json_string(writeable_only_by));
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
void cosmo_subscribe(cosmo *instance, json_t *subjects, const json_int_t messages, const json_int_t last_id, promise *promise_obj) {
|
|
if (json_is_array(subjects)) {
|
|
json_incref(subjects);
|
|
} else {
|
|
subjects = json_pack("[O]", subjects);
|
|
assert(subjects);
|
|
}
|
|
|
|
assert(!pthread_mutex_lock(&instance->lock));
|
|
size_t i;
|
|
json_t *subject;
|
|
json_array_foreach(subjects, i, subject) {
|
|
json_t *subscription = cosmo_find_subscription(instance, subject);
|
|
if (!subscription) {
|
|
subscription = json_pack("{sOs[]si}", "subject", subject, "messages", "state", SUBSCRIPTION_PENDING);
|
|
json_array_append_new(instance->subscriptions, subscription);
|
|
}
|
|
|
|
json_t *arguments = json_pack("{sO}", "subject", subject);
|
|
if (messages) {
|
|
json_object_set_new(arguments, "messages", json_integer(messages));
|
|
json_object_set_new(subscription, "num_messages", json_integer(messages));
|
|
}
|
|
if (last_id) {
|
|
json_object_set_new(arguments, "last_id", json_integer(last_id));
|
|
json_object_set_new(subscription, "last_id", json_integer(last_id));
|
|
}
|
|
cosmo_send_command_locked(instance, cosmo_command("subscribe", arguments), promise_obj);
|
|
}
|
|
assert(!pthread_cond_signal(&instance->cond));
|
|
assert(!pthread_mutex_unlock(&instance->lock));
|
|
|
|
json_decref(subjects);
|
|
}
|
|
|
|
void cosmo_unsubscribe(cosmo *instance, json_t *subject, promise *promise_obj) {
|
|
assert(!pthread_mutex_lock(&instance->lock));
|
|
cosmo_remove_subscription(instance, subject);
|
|
json_t *arguments = json_pack("{sO}", "subject", subject);
|
|
cosmo_send_command_locked(instance, cosmo_command("unsubscribe", arguments), promise_obj);
|
|
assert(!pthread_mutex_unlock(&instance->lock));
|
|
}
|
|
|
|
void cosmo_send_message(cosmo *instance, json_t *subject, json_t *message, promise *promise_obj) {
|
|
char sender_message_id[COSMO_UUID_SIZE];
|
|
cosmo_uuid(sender_message_id);
|
|
char *encoded = json_dumps(message, JSON_ENCODE_ANY);
|
|
json_t *arguments = json_pack("{sOssss}",
|
|
"subject", subject,
|
|
"message", encoded,
|
|
"sender_message_id", sender_message_id);
|
|
cosmo_send_command(instance, cosmo_command("sendMessage", arguments), promise_obj);
|
|
free(encoded);
|
|
}
|
|
|
|
json_t *cosmo_get_messages(cosmo *instance, json_t *subject) {
|
|
assert(!pthread_mutex_lock(&instance->lock));
|
|
json_t *subscription = cosmo_find_subscription(instance, subject);
|
|
if (!subscription) {
|
|
assert(!pthread_mutex_unlock(&instance->lock));
|
|
return NULL;
|
|
}
|
|
json_t *messages = json_object_get(subscription, "messages");
|
|
json_t *ret = json_deep_copy(messages);
|
|
assert(!pthread_mutex_unlock(&instance->lock));
|
|
|
|
return ret;
|
|
}
|
|
|
|
json_t *cosmo_get_last_message(cosmo *instance, json_t *subject) {
|
|
assert(!pthread_mutex_lock(&instance->lock));
|
|
json_t *subscription = cosmo_find_subscription(instance, subject);
|
|
if (!subscription) {
|
|
assert(!pthread_mutex_unlock(&instance->lock));
|
|
return NULL;
|
|
}
|
|
json_t *messages = json_object_get(subscription, "messages");
|
|
json_t *last_message = json_array_get(messages, json_array_size(messages) - 1);
|
|
json_t *ret = json_deep_copy(last_message);
|
|
assert(!pthread_mutex_unlock(&instance->lock));
|
|
|
|
return ret;
|
|
}
|
|
|
|
cosmo *cosmo_create(const char *base_url, const char *client_id, const cosmo_callbacks *callbacks, const cosmo_options *options, void *passthrough) {
|
|
curl_global_init(CURL_GLOBAL_DEFAULT);
|
|
|
|
cosmo *instance = malloc(sizeof(cosmo));
|
|
assert(instance);
|
|
|
|
assert(!pthread_mutex_init(&instance->lock, NULL));
|
|
assert(!pthread_cond_init(&instance->cond, NULL));
|
|
|
|
assert(!pthread_mutex_lock(&instance->lock));
|
|
|
|
instance->debug = getenv("COSMO_DEBUG");
|
|
|
|
memcpy(&instance->callbacks, callbacks, sizeof(instance->callbacks));
|
|
if (options) {
|
|
memcpy(&instance->options, options, sizeof(instance->options));
|
|
} else {
|
|
memset(&instance->options, 0, sizeof(instance->options));
|
|
}
|
|
instance->passthrough = passthrough;
|
|
|
|
cosmo_uuid(instance->instance_id);
|
|
if (client_id) {
|
|
strcpy(instance->client_id, client_id);
|
|
} else {
|
|
cosmo_uuid(instance->client_id);
|
|
cosmo_handle_client_id_change(instance);
|
|
}
|
|
|
|
instance->curl = curl_easy_init();
|
|
assert(instance->curl);
|
|
char api_url[strlen(base_url) + 5];
|
|
sprintf(api_url, "%s/api", base_url);
|
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_URL, api_url));
|
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_PROTOCOLS, CURLPROTO_HTTPS));
|
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_REDIR_PROTOCOLS, CURLPROTO_HTTPS));
|
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_2));
|
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_SSL_CIPHER_LIST, "EECDH+AESGCM:EDH+AESGCM:AES256+EECDH:AES256+EDH"));
|
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_TIMEOUT_MS, CYCLE_MS));
|
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_POST, 1L));
|
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_READFUNCTION, cosmo_read_callback));
|
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_WRITEFUNCTION, cosmo_write_callback));
|
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_HEADERFUNCTION, cosmo_header_callback));
|
|
|
|
instance->shutdown = false;
|
|
instance->profile = json_null();
|
|
instance->get_profile_head = NULL;
|
|
instance->generation = json_null();
|
|
instance->command_queue_head = instance->command_queue_tail = NULL;
|
|
instance->ack = json_array();
|
|
assert(instance->ack);
|
|
instance->subscriptions = json_array();
|
|
assert(instance->subscriptions);
|
|
instance->next_delay_ms = 0;
|
|
|
|
instance->connect_state = INITIAL_CONNECT;
|
|
instance->login_state = LOGIN_UNKNOWN;
|
|
instance->last_success.tv_sec = 0;
|
|
|
|
assert(!pthread_mutex_unlock(&instance->lock));
|
|
|
|
assert(!pthread_create(&instance->thread, NULL, cosmo_thread_main, instance));
|
|
return instance;
|
|
}
|
|
|
|
void cosmo_shutdown(cosmo *instance) {
|
|
pthread_mutex_lock(&instance->lock);
|
|
instance->shutdown = true;
|
|
instance->next_delay_ms = 0;
|
|
pthread_cond_signal(&instance->cond);
|
|
pthread_mutex_unlock(&instance->lock);
|
|
assert(!pthread_join(instance->thread, NULL));
|
|
|
|
assert(!pthread_mutex_destroy(&instance->lock));
|
|
assert(!pthread_cond_destroy(&instance->cond));
|
|
struct cosmo_command *command_iter = instance->command_queue_head;
|
|
while (command_iter) {
|
|
json_decref(command_iter->command);
|
|
struct cosmo_command *next = command_iter->next;
|
|
free(command_iter);
|
|
command_iter = next;
|
|
}
|
|
json_decref(instance->ack);
|
|
json_decref(instance->subscriptions);
|
|
json_decref(instance->profile);
|
|
struct cosmo_get_profile *get_profile_iter = instance->get_profile_head;
|
|
while (get_profile_iter) {
|
|
struct cosmo_get_profile *next = get_profile_iter->next;
|
|
promise_fail(get_profile_iter->promise, NULL, NULL);
|
|
free(get_profile_iter);
|
|
get_profile_iter = next;
|
|
}
|
|
json_decref(instance->generation);
|
|
curl_easy_cleanup(instance->curl);
|
|
|
|
free(instance);
|
|
|
|
curl_global_cleanup();
|
|
}
|
|
|