diff --git a/clients/c/cosmopolite.c b/clients/c/cosmopolite.c index 1c9da9d..4d4348d 100644 --- a/clients/c/cosmopolite.c +++ b/clients/c/cosmopolite.c @@ -30,6 +30,17 @@ typedef struct { int64_t retry_after; } cosmo_transfer; +static json_t *cosmo_find_subscription(cosmo *instance, json_t *subject) { + size_t i; + json_t *subscription; + json_array_foreach(instance->subscriptions, i, subscription) { + if (json_equal(json_object_get(subscription, "subject"), subject) == 0) { + return subscription; + } + } + return NULL; +} + static size_t cosmo_read_callback(void *ptr, size_t size, size_t nmemb, void *userp) { cosmo_transfer *transfer = userp; size_t to_write = min(transfer->send_buf_len, size * nmemb); @@ -74,13 +85,13 @@ static char *cosmo_build_rpc(const cosmo *instance, const json_t *commands) { static bool cosmo_send_http_int(cosmo *instance, cosmo_transfer *transfer) { CURLcode res; - curl_easy_setopt(instance->curl, CURLOPT_POSTFIELDSIZE, transfer->send_buf_len); - curl_easy_setopt(instance->curl, CURLOPT_READDATA, transfer); - curl_easy_setopt(instance->curl, CURLOPT_WRITEDATA, transfer); - curl_easy_setopt(instance->curl, CURLOPT_HEADERDATA, transfer); + assert(!curl_easy_setopt(instance->curl, CURLOPT_POSTFIELDSIZE, transfer->send_buf_len)); + assert(!curl_easy_setopt(instance->curl, CURLOPT_READDATA, transfer)); + assert(!curl_easy_setopt(instance->curl, CURLOPT_WRITEDATA, transfer)); + assert(!curl_easy_setopt(instance->curl, CURLOPT_HEADERDATA, transfer)); res = curl_easy_perform(instance->curl); - if (res != CURLE_OK) { + if (res) { fprintf(stderr, "curl_easy_perform() failed: %s\n", curl_easy_strerror(res)); return false; } @@ -256,7 +267,7 @@ json_t *cosmo_subject(const char *name, const char *readable_only_by, const char return ret; } -void cosmo_subscribe(cosmo *instance, const json_t *subject, const json_int_t messages, const json_int_t last_id) { +void cosmo_subscribe(cosmo *instance, json_t *subject, const json_int_t messages, const json_int_t last_id) { json_t *arguments = json_pack("{sO}", "subject", subject); if (messages) { json_object_set_new(arguments, "messages", json_integer(messages)); @@ -264,9 +275,30 @@ void cosmo_subscribe(cosmo *instance, const json_t *subject, const json_int_t me if (last_id) { json_object_set_new(arguments, "last_id", json_integer(last_id)); } + + assert(!pthread_mutex_lock(&instance->lock)); + json_t *subscription = cosmo_find_subscription(instance, subject); + if (!subscription) { + json_array_append_new(instance->subscriptions, json_pack("{sOs[]s[]}", "subject", subject, "messages", "pins")); + } + assert(!pthread_mutex_unlock(&instance->lock)); + cosmo_send_command(instance, cosmo_command("subscribe", arguments)); } +void cosmo_unsubscribe(cosmo *instance, json_t *subject) { + assert(!pthread_mutex_lock(&instance->lock)); + size_t i; + json_t *subscription; + json_array_foreach(instance->subscriptions, i, subscription) { + if (json_equal(json_object_get(subscription, "subject"), subject) == 0) { + json_array_remove(instance->subscriptions, i); + break; + } + } + assert(!pthread_mutex_unlock(&instance->lock)); +} + void cosmo_send_message(cosmo *instance, const json_t *subject, json_t *message) { char sender_message_id[COSMO_UUID_SIZE]; cosmo_uuid(sender_message_id); @@ -296,18 +328,23 @@ cosmo *cosmo_create(const char *base_url, const char *client_id) { assert(instance->curl); char api_url[strlen(base_url) + 5]; sprintf(api_url, "%s/api", base_url); - curl_easy_setopt(instance->curl, CURLOPT_URL, api_url); - curl_easy_setopt(instance->curl, CURLOPT_PROTOCOLS, CURLPROTO_HTTPS); - curl_easy_setopt(instance->curl, CURLOPT_REDIR_PROTOCOLS, CURLPROTO_HTTPS); - curl_easy_setopt(instance->curl, CURLOPT_SSL_CIPHER_LIST, "ECDH+AESGCM:EDH+AESGCM:AES256+EECDH:AES256+EDH"); - curl_easy_setopt(instance->curl, CURLOPT_TIMEOUT, COSMO_CHECK_SECONDS); - curl_easy_setopt(instance->curl, CURLOPT_POST, 1L); - curl_easy_setopt(instance->curl, CURLOPT_READFUNCTION, cosmo_read_callback); - curl_easy_setopt(instance->curl, CURLOPT_WRITEFUNCTION, cosmo_write_callback); - curl_easy_setopt(instance->curl, CURLOPT_HEADERFUNCTION, cosmo_header_callback); + assert(!curl_easy_setopt(instance->curl, CURLOPT_URL, api_url)); + assert(!curl_easy_setopt(instance->curl, CURLOPT_PROTOCOLS, CURLPROTO_HTTPS)); + assert(!curl_easy_setopt(instance->curl, CURLOPT_REDIR_PROTOCOLS, CURLPROTO_HTTPS)); + assert(!curl_easy_setopt(instance->curl, CURLOPT_SSL_CIPHER_LIST, "ECDH+AESGCM:EDH+AESGCM:AES256+EECDH:AES256+EDH")); + assert(!curl_easy_setopt(instance->curl, CURLOPT_TIMEOUT, COSMO_CHECK_SECONDS)); + assert(!curl_easy_setopt(instance->curl, CURLOPT_POST, 1L)); + assert(!curl_easy_setopt(instance->curl, CURLOPT_READFUNCTION, cosmo_read_callback)); + assert(!curl_easy_setopt(instance->curl, CURLOPT_WRITEFUNCTION, cosmo_write_callback)); + assert(!curl_easy_setopt(instance->curl, CURLOPT_HEADERFUNCTION, cosmo_header_callback)); instance->shutdown = false; + instance->profile = NULL; instance->command_queue = json_array(); + assert(instance->command_queue); + instance->subscriptions = json_array(); + assert(instance->subscriptions); + instance->next_delay_ms = 0; assert(!pthread_create(&instance->thread, NULL, cosmo_thread_main, instance)); return instance; @@ -324,6 +361,7 @@ void cosmo_shutdown(cosmo *instance) { assert(!pthread_mutex_destroy(&instance->lock)); assert(!pthread_cond_destroy(&instance->cond)); json_decref(instance->command_queue); + json_decref(instance->subscriptions); free(instance->profile); curl_easy_cleanup(instance->curl); diff --git a/clients/c/cosmopolite.h b/clients/c/cosmopolite.h index 980a0a0..9677fdb 100644 --- a/clients/c/cosmopolite.h +++ b/clients/c/cosmopolite.h @@ -15,10 +15,10 @@ typedef struct { pthread_mutex_t lock; pthread_cond_t cond; bool shutdown; - json_t *command_queue; - uint64_t next_delay_ms; - char *profile; + json_t *command_queue; + json_t *subscriptions; + uint64_t next_delay_ms; pthread_t thread; CURL *curl; @@ -32,11 +32,11 @@ void cosmo_shutdown(cosmo *instance); const char *cosmo_current_profile(cosmo *instance); json_t *cosmo_subject(const char *name, const char *readable_only_by, const char *writeable_only_by); -void cosmo_subscribe(cosmo *instance, const json_t *subject, const json_int_t messages, const json_int_t last_id); +void cosmo_subscribe(cosmo *instance, json_t *subject, const json_int_t messages, const json_int_t last_id); +void cosmo_unsubscribe(cosmo *instance, json_t *subject); void cosmo_send_message(cosmo *instance, const json_t *subject, json_t *message); // TODO -void cosmo_unsubscribe(cosmo *instance, const json_t *subject); json_t *cosmo_get_messages(cosmo *instance, const json_t *subject); json_t *cosmo_get_last_message(cosmo *instance, const json_t *subject); json_t *cosmo_get_pins(cosmo *instance, const json_t *subject);