Implement subscription tracking, cosmo_unsubscribe()
This commit is contained in:
@@ -30,6 +30,17 @@ typedef struct {
|
|||||||
int64_t retry_after;
|
int64_t retry_after;
|
||||||
} cosmo_transfer;
|
} cosmo_transfer;
|
||||||
|
|
||||||
|
static json_t *cosmo_find_subscription(cosmo *instance, json_t *subject) {
|
||||||
|
size_t i;
|
||||||
|
json_t *subscription;
|
||||||
|
json_array_foreach(instance->subscriptions, i, subscription) {
|
||||||
|
if (json_equal(json_object_get(subscription, "subject"), subject) == 0) {
|
||||||
|
return subscription;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
static size_t cosmo_read_callback(void *ptr, size_t size, size_t nmemb, void *userp) {
|
static size_t cosmo_read_callback(void *ptr, size_t size, size_t nmemb, void *userp) {
|
||||||
cosmo_transfer *transfer = userp;
|
cosmo_transfer *transfer = userp;
|
||||||
size_t to_write = min(transfer->send_buf_len, size * nmemb);
|
size_t to_write = min(transfer->send_buf_len, size * nmemb);
|
||||||
@@ -74,13 +85,13 @@ static char *cosmo_build_rpc(const cosmo *instance, const json_t *commands) {
|
|||||||
static bool cosmo_send_http_int(cosmo *instance, cosmo_transfer *transfer) {
|
static bool cosmo_send_http_int(cosmo *instance, cosmo_transfer *transfer) {
|
||||||
CURLcode res;
|
CURLcode res;
|
||||||
|
|
||||||
curl_easy_setopt(instance->curl, CURLOPT_POSTFIELDSIZE, transfer->send_buf_len);
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_POSTFIELDSIZE, transfer->send_buf_len));
|
||||||
curl_easy_setopt(instance->curl, CURLOPT_READDATA, transfer);
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_READDATA, transfer));
|
||||||
curl_easy_setopt(instance->curl, CURLOPT_WRITEDATA, transfer);
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_WRITEDATA, transfer));
|
||||||
curl_easy_setopt(instance->curl, CURLOPT_HEADERDATA, transfer);
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_HEADERDATA, transfer));
|
||||||
res = curl_easy_perform(instance->curl);
|
res = curl_easy_perform(instance->curl);
|
||||||
|
|
||||||
if (res != CURLE_OK) {
|
if (res) {
|
||||||
fprintf(stderr, "curl_easy_perform() failed: %s\n", curl_easy_strerror(res));
|
fprintf(stderr, "curl_easy_perform() failed: %s\n", curl_easy_strerror(res));
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@@ -256,7 +267,7 @@ json_t *cosmo_subject(const char *name, const char *readable_only_by, const char
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
void cosmo_subscribe(cosmo *instance, const json_t *subject, const json_int_t messages, const json_int_t last_id) {
|
void cosmo_subscribe(cosmo *instance, json_t *subject, const json_int_t messages, const json_int_t last_id) {
|
||||||
json_t *arguments = json_pack("{sO}", "subject", subject);
|
json_t *arguments = json_pack("{sO}", "subject", subject);
|
||||||
if (messages) {
|
if (messages) {
|
||||||
json_object_set_new(arguments, "messages", json_integer(messages));
|
json_object_set_new(arguments, "messages", json_integer(messages));
|
||||||
@@ -264,9 +275,30 @@ void cosmo_subscribe(cosmo *instance, const json_t *subject, const json_int_t me
|
|||||||
if (last_id) {
|
if (last_id) {
|
||||||
json_object_set_new(arguments, "last_id", json_integer(last_id));
|
json_object_set_new(arguments, "last_id", json_integer(last_id));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
assert(!pthread_mutex_lock(&instance->lock));
|
||||||
|
json_t *subscription = cosmo_find_subscription(instance, subject);
|
||||||
|
if (!subscription) {
|
||||||
|
json_array_append_new(instance->subscriptions, json_pack("{sOs[]s[]}", "subject", subject, "messages", "pins"));
|
||||||
|
}
|
||||||
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
|
|
||||||
cosmo_send_command(instance, cosmo_command("subscribe", arguments));
|
cosmo_send_command(instance, cosmo_command("subscribe", arguments));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void cosmo_unsubscribe(cosmo *instance, json_t *subject) {
|
||||||
|
assert(!pthread_mutex_lock(&instance->lock));
|
||||||
|
size_t i;
|
||||||
|
json_t *subscription;
|
||||||
|
json_array_foreach(instance->subscriptions, i, subscription) {
|
||||||
|
if (json_equal(json_object_get(subscription, "subject"), subject) == 0) {
|
||||||
|
json_array_remove(instance->subscriptions, i);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
|
}
|
||||||
|
|
||||||
void cosmo_send_message(cosmo *instance, const json_t *subject, json_t *message) {
|
void cosmo_send_message(cosmo *instance, const json_t *subject, json_t *message) {
|
||||||
char sender_message_id[COSMO_UUID_SIZE];
|
char sender_message_id[COSMO_UUID_SIZE];
|
||||||
cosmo_uuid(sender_message_id);
|
cosmo_uuid(sender_message_id);
|
||||||
@@ -296,18 +328,23 @@ cosmo *cosmo_create(const char *base_url, const char *client_id) {
|
|||||||
assert(instance->curl);
|
assert(instance->curl);
|
||||||
char api_url[strlen(base_url) + 5];
|
char api_url[strlen(base_url) + 5];
|
||||||
sprintf(api_url, "%s/api", base_url);
|
sprintf(api_url, "%s/api", base_url);
|
||||||
curl_easy_setopt(instance->curl, CURLOPT_URL, api_url);
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_URL, api_url));
|
||||||
curl_easy_setopt(instance->curl, CURLOPT_PROTOCOLS, CURLPROTO_HTTPS);
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_PROTOCOLS, CURLPROTO_HTTPS));
|
||||||
curl_easy_setopt(instance->curl, CURLOPT_REDIR_PROTOCOLS, CURLPROTO_HTTPS);
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_REDIR_PROTOCOLS, CURLPROTO_HTTPS));
|
||||||
curl_easy_setopt(instance->curl, CURLOPT_SSL_CIPHER_LIST, "ECDH+AESGCM:EDH+AESGCM:AES256+EECDH:AES256+EDH");
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_SSL_CIPHER_LIST, "ECDH+AESGCM:EDH+AESGCM:AES256+EECDH:AES256+EDH"));
|
||||||
curl_easy_setopt(instance->curl, CURLOPT_TIMEOUT, COSMO_CHECK_SECONDS);
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_TIMEOUT, COSMO_CHECK_SECONDS));
|
||||||
curl_easy_setopt(instance->curl, CURLOPT_POST, 1L);
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_POST, 1L));
|
||||||
curl_easy_setopt(instance->curl, CURLOPT_READFUNCTION, cosmo_read_callback);
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_READFUNCTION, cosmo_read_callback));
|
||||||
curl_easy_setopt(instance->curl, CURLOPT_WRITEFUNCTION, cosmo_write_callback);
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_WRITEFUNCTION, cosmo_write_callback));
|
||||||
curl_easy_setopt(instance->curl, CURLOPT_HEADERFUNCTION, cosmo_header_callback);
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_HEADERFUNCTION, cosmo_header_callback));
|
||||||
|
|
||||||
instance->shutdown = false;
|
instance->shutdown = false;
|
||||||
|
instance->profile = NULL;
|
||||||
instance->command_queue = json_array();
|
instance->command_queue = json_array();
|
||||||
|
assert(instance->command_queue);
|
||||||
|
instance->subscriptions = json_array();
|
||||||
|
assert(instance->subscriptions);
|
||||||
|
instance->next_delay_ms = 0;
|
||||||
|
|
||||||
assert(!pthread_create(&instance->thread, NULL, cosmo_thread_main, instance));
|
assert(!pthread_create(&instance->thread, NULL, cosmo_thread_main, instance));
|
||||||
return instance;
|
return instance;
|
||||||
@@ -324,6 +361,7 @@ void cosmo_shutdown(cosmo *instance) {
|
|||||||
assert(!pthread_mutex_destroy(&instance->lock));
|
assert(!pthread_mutex_destroy(&instance->lock));
|
||||||
assert(!pthread_cond_destroy(&instance->cond));
|
assert(!pthread_cond_destroy(&instance->cond));
|
||||||
json_decref(instance->command_queue);
|
json_decref(instance->command_queue);
|
||||||
|
json_decref(instance->subscriptions);
|
||||||
free(instance->profile);
|
free(instance->profile);
|
||||||
curl_easy_cleanup(instance->curl);
|
curl_easy_cleanup(instance->curl);
|
||||||
|
|
||||||
|
|||||||
@@ -15,10 +15,10 @@ typedef struct {
|
|||||||
pthread_mutex_t lock;
|
pthread_mutex_t lock;
|
||||||
pthread_cond_t cond;
|
pthread_cond_t cond;
|
||||||
bool shutdown;
|
bool shutdown;
|
||||||
json_t *command_queue;
|
|
||||||
uint64_t next_delay_ms;
|
|
||||||
|
|
||||||
char *profile;
|
char *profile;
|
||||||
|
json_t *command_queue;
|
||||||
|
json_t *subscriptions;
|
||||||
|
uint64_t next_delay_ms;
|
||||||
|
|
||||||
pthread_t thread;
|
pthread_t thread;
|
||||||
CURL *curl;
|
CURL *curl;
|
||||||
@@ -32,11 +32,11 @@ void cosmo_shutdown(cosmo *instance);
|
|||||||
const char *cosmo_current_profile(cosmo *instance);
|
const char *cosmo_current_profile(cosmo *instance);
|
||||||
|
|
||||||
json_t *cosmo_subject(const char *name, const char *readable_only_by, const char *writeable_only_by);
|
json_t *cosmo_subject(const char *name, const char *readable_only_by, const char *writeable_only_by);
|
||||||
void cosmo_subscribe(cosmo *instance, const json_t *subject, const json_int_t messages, const json_int_t last_id);
|
void cosmo_subscribe(cosmo *instance, json_t *subject, const json_int_t messages, const json_int_t last_id);
|
||||||
|
void cosmo_unsubscribe(cosmo *instance, json_t *subject);
|
||||||
void cosmo_send_message(cosmo *instance, const json_t *subject, json_t *message);
|
void cosmo_send_message(cosmo *instance, const json_t *subject, json_t *message);
|
||||||
|
|
||||||
// TODO
|
// TODO
|
||||||
void cosmo_unsubscribe(cosmo *instance, const json_t *subject);
|
|
||||||
json_t *cosmo_get_messages(cosmo *instance, const json_t *subject);
|
json_t *cosmo_get_messages(cosmo *instance, const json_t *subject);
|
||||||
json_t *cosmo_get_last_message(cosmo *instance, const json_t *subject);
|
json_t *cosmo_get_last_message(cosmo *instance, const json_t *subject);
|
||||||
json_t *cosmo_get_pins(cosmo *instance, const json_t *subject);
|
json_t *cosmo_get_pins(cosmo *instance, const json_t *subject);
|
||||||
|
|||||||
Reference in New Issue
Block a user