Fix the thread locking model to be harder to screw up.
This commit is contained in:
@@ -169,7 +169,10 @@ static bool cosmo_send_http_int(cosmo *instance, cosmo_transfer *transfer) {
|
|||||||
assert(!curl_easy_setopt(instance->curl, CURLOPT_READDATA, transfer));
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_READDATA, transfer));
|
||||||
assert(!curl_easy_setopt(instance->curl, CURLOPT_WRITEDATA, transfer));
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_WRITEDATA, transfer));
|
||||||
assert(!curl_easy_setopt(instance->curl, CURLOPT_HEADERDATA, transfer));
|
assert(!curl_easy_setopt(instance->curl, CURLOPT_HEADERDATA, transfer));
|
||||||
|
|
||||||
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
res = curl_easy_perform(instance->curl);
|
res = curl_easy_perform(instance->curl);
|
||||||
|
assert(!pthread_mutex_lock(&instance->lock));
|
||||||
|
|
||||||
if (res) {
|
if (res) {
|
||||||
return false;
|
return false;
|
||||||
@@ -222,11 +225,9 @@ static void cosmo_handle_message(cosmo *instance, json_t *event) {
|
|||||||
}
|
}
|
||||||
json_object_set_new(event, "message", message_object);
|
json_object_set_new(event, "message", message_object);
|
||||||
|
|
||||||
assert(!pthread_mutex_lock(&instance->lock));
|
|
||||||
json_t *subscription = cosmo_find_subscription(instance, subject);
|
json_t *subscription = cosmo_find_subscription(instance, subject);
|
||||||
if (!subscription) {
|
if (!subscription) {
|
||||||
cosmo_log(instance, "message from unknown subject");
|
cosmo_log(instance, "message from unknown subject");
|
||||||
assert(!pthread_mutex_unlock(&instance->lock));
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -236,7 +237,6 @@ static void cosmo_handle_message(cosmo *instance, json_t *event) {
|
|||||||
json_t *message = json_array_get(messages, insert_after);
|
json_t *message = json_array_get(messages, insert_after);
|
||||||
json_int_t message_id = json_integer_value(json_object_get(message, "id"));
|
json_int_t message_id = json_integer_value(json_object_get(message, "id"));
|
||||||
if (message_id == id) {
|
if (message_id == id) {
|
||||||
assert(!pthread_mutex_unlock(&instance->lock));
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (message_id < id) {
|
if (message_id < id) {
|
||||||
@@ -244,18 +244,21 @@ static void cosmo_handle_message(cosmo *instance, json_t *event) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
json_array_insert(messages, insert_after + 1, event);
|
json_array_insert(messages, insert_after + 1, event);
|
||||||
assert(!pthread_mutex_unlock(&instance->lock));
|
|
||||||
|
|
||||||
if (instance->callbacks.message) {
|
if (instance->callbacks.message) {
|
||||||
cosmo_log(instance, "callbacks.message()");
|
cosmo_log(instance, "callbacks.message()");
|
||||||
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
instance->callbacks.message(event, instance->passthrough);
|
instance->callbacks.message(event, instance->passthrough);
|
||||||
|
assert(!pthread_mutex_lock(&instance->lock));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cosmo_handle_client_id_change(cosmo *instance) {
|
static void cosmo_handle_client_id_change(cosmo *instance) {
|
||||||
if (instance->callbacks.client_id_change) {
|
if (instance->callbacks.client_id_change) {
|
||||||
cosmo_log(instance, "callbacks.client_id_change()");
|
cosmo_log(instance, "callbacks.client_id_change()");
|
||||||
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
instance->callbacks.client_id_change(instance->passthrough, instance->client_id);
|
instance->callbacks.client_id_change(instance->passthrough, instance->client_id);
|
||||||
|
assert(!pthread_mutex_lock(&instance->lock));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -266,7 +269,9 @@ static void cosmo_handle_connect(cosmo *instance) {
|
|||||||
instance->connect_state = CONNECTED;
|
instance->connect_state = CONNECTED;
|
||||||
if (instance->callbacks.connect) {
|
if (instance->callbacks.connect) {
|
||||||
cosmo_log(instance, "callbacks.connect()");
|
cosmo_log(instance, "callbacks.connect()");
|
||||||
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
instance->callbacks.connect(instance->passthrough);
|
instance->callbacks.connect(instance->passthrough);
|
||||||
|
assert(!pthread_mutex_lock(&instance->lock));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -277,7 +282,9 @@ static void cosmo_handle_disconnect(cosmo *instance) {
|
|||||||
instance->connect_state = DISCONNECTED;
|
instance->connect_state = DISCONNECTED;
|
||||||
if (instance->callbacks.disconnect) {
|
if (instance->callbacks.disconnect) {
|
||||||
cosmo_log(instance, "callbacks.disconnect()");
|
cosmo_log(instance, "callbacks.disconnect()");
|
||||||
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
instance->callbacks.disconnect(instance->passthrough);
|
instance->callbacks.disconnect(instance->passthrough);
|
||||||
|
assert(!pthread_mutex_lock(&instance->lock));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -288,7 +295,9 @@ static void cosmo_handle_login(cosmo *instance, json_t *event) {
|
|||||||
instance->login_state = LOGGED_IN;
|
instance->login_state = LOGGED_IN;
|
||||||
if (instance->callbacks.login) {
|
if (instance->callbacks.login) {
|
||||||
cosmo_log(instance, "callbacks.login()");
|
cosmo_log(instance, "callbacks.login()");
|
||||||
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
instance->callbacks.login(instance->passthrough);
|
instance->callbacks.login(instance->passthrough);
|
||||||
|
assert(!pthread_mutex_lock(&instance->lock));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -299,7 +308,9 @@ static void cosmo_handle_logout(cosmo *instance, json_t *event) {
|
|||||||
instance->login_state = LOGGED_OUT;
|
instance->login_state = LOGGED_OUT;
|
||||||
if (instance->callbacks.logout) {
|
if (instance->callbacks.logout) {
|
||||||
cosmo_log(instance, "callbacks.logout()");
|
cosmo_log(instance, "callbacks.logout()");
|
||||||
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
instance->callbacks.logout(instance->passthrough);
|
instance->callbacks.logout(instance->passthrough);
|
||||||
|
assert(!pthread_mutex_lock(&instance->lock));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -327,30 +338,36 @@ static void cosmo_complete_subscribe(cosmo *instance, struct cosmo_command *comm
|
|||||||
|
|
||||||
if (strcmp(result, "ok")) {
|
if (strcmp(result, "ok")) {
|
||||||
cosmo_remove_subscription(instance, subject);
|
cosmo_remove_subscription(instance, subject);
|
||||||
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
promise_fail(command->promise, NULL, NULL);
|
promise_fail(command->promise, NULL, NULL);
|
||||||
|
assert(!pthread_mutex_lock(&instance->lock));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(!pthread_mutex_lock(&instance->lock));
|
|
||||||
json_t *subscription = cosmo_find_subscription(instance, subject);
|
json_t *subscription = cosmo_find_subscription(instance, subject);
|
||||||
if (subscription) {
|
if (subscription) {
|
||||||
// Might have unsubscribed later
|
// Might have unsubscribed later
|
||||||
json_object_set_new(subscription, "state", json_integer(SUBSCRIPTION_ACTIVE));
|
json_object_set_new(subscription, "state", json_integer(SUBSCRIPTION_ACTIVE));
|
||||||
}
|
}
|
||||||
assert(!pthread_mutex_unlock(&instance->lock));
|
|
||||||
|
|
||||||
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
promise_succeed(command->promise, NULL, NULL);
|
promise_succeed(command->promise, NULL, NULL);
|
||||||
|
assert(!pthread_mutex_lock(&instance->lock));
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cosmo_complete_unsubscribe(cosmo *instance, struct cosmo_command *command, json_t *response, char *result) {
|
static void cosmo_complete_unsubscribe(cosmo *instance, struct cosmo_command *command, json_t *response, char *result) {
|
||||||
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
promise_complete(command->promise, NULL, NULL, (strcmp(result, "ok") == 0));
|
promise_complete(command->promise, NULL, NULL, (strcmp(result, "ok") == 0));
|
||||||
|
assert(!pthread_mutex_lock(&instance->lock));
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cosmo_complete_send_message(cosmo *instance, struct cosmo_command *command, json_t *response, char *result) {
|
static void cosmo_complete_send_message(cosmo *instance, struct cosmo_command *command, json_t *response, char *result) {
|
||||||
json_t *message;
|
json_t *message;
|
||||||
int err = json_unpack(response, "{so}", "message", &message);
|
int err = json_unpack(response, "{so}", "message", &message);
|
||||||
if (err || (strcmp(result, "ok") && strcmp(result, "duplicate_message"))) {
|
if (err || (strcmp(result, "ok") && strcmp(result, "duplicate_message"))) {
|
||||||
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
promise_fail(command->promise, NULL, NULL);
|
promise_fail(command->promise, NULL, NULL);
|
||||||
|
assert(!pthread_mutex_lock(&instance->lock));
|
||||||
} else {
|
} else {
|
||||||
char *message_content;
|
char *message_content;
|
||||||
assert(!json_unpack(message, "{ss}", "message", &message_content));
|
assert(!json_unpack(message, "{ss}", "message", &message_content));
|
||||||
@@ -359,7 +376,9 @@ static void cosmo_complete_send_message(cosmo *instance, struct cosmo_command *c
|
|||||||
json_object_set_new(message, "message", message_object);
|
json_object_set_new(message, "message", message_object);
|
||||||
|
|
||||||
json_incref(message);
|
json_incref(message);
|
||||||
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
promise_succeed(command->promise, message, (promise_cleanup) json_decref);
|
promise_succeed(command->promise, message, (promise_cleanup) json_decref);
|
||||||
|
assert(!pthread_mutex_lock(&instance->lock));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -462,7 +481,9 @@ static struct cosmo_command *cosmo_send_rpc(cosmo *instance, struct cosmo_comman
|
|||||||
while (get_profile_iter) {
|
while (get_profile_iter) {
|
||||||
struct cosmo_get_profile *next = get_profile_iter->next;
|
struct cosmo_get_profile *next = get_profile_iter->next;
|
||||||
json_incref(instance->profile);
|
json_incref(instance->profile);
|
||||||
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
promise_succeed(get_profile_iter->promise, instance->profile, (promise_cleanup)json_decref);
|
promise_succeed(get_profile_iter->promise, instance->profile, (promise_cleanup)json_decref);
|
||||||
|
assert(!pthread_mutex_lock(&instance->lock));
|
||||||
free(get_profile_iter);
|
free(get_profile_iter);
|
||||||
get_profile_iter = next;
|
get_profile_iter = next;
|
||||||
}
|
}
|
||||||
@@ -484,14 +505,12 @@ static struct cosmo_command *cosmo_send_rpc(cosmo *instance, struct cosmo_comman
|
|||||||
if (json_unpack(poll_response, "{so}", "instance_generation", &instance_generation)) {
|
if (json_unpack(poll_response, "{so}", "instance_generation", &instance_generation)) {
|
||||||
cosmo_log(instance, "invalid poll response");
|
cosmo_log(instance, "invalid poll response");
|
||||||
} else {
|
} else {
|
||||||
assert(!pthread_mutex_lock(&instance->lock));
|
|
||||||
if (!json_equal(instance_generation, instance->generation)) {
|
if (!json_equal(instance_generation, instance->generation)) {
|
||||||
json_decref(instance->generation);
|
json_decref(instance->generation);
|
||||||
json_incref(instance_generation);
|
json_incref(instance_generation);
|
||||||
instance->generation = instance_generation;
|
instance->generation = instance_generation;
|
||||||
cosmo_resubscribe(instance);
|
cosmo_resubscribe(instance);
|
||||||
}
|
}
|
||||||
assert(!pthread_mutex_unlock(&instance->lock));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
command_iter = commands;
|
command_iter = commands;
|
||||||
@@ -547,7 +566,6 @@ static void *cosmo_thread_main(void *arg) {
|
|||||||
instance->next_delay_ms = CYCLE_MS;
|
instance->next_delay_ms = CYCLE_MS;
|
||||||
instance->next_delay_ms += cosmo_random() % (instance->next_delay_ms / CYCLE_STAGGER_FACTOR);
|
instance->next_delay_ms += cosmo_random() % (instance->next_delay_ms / CYCLE_STAGGER_FACTOR);
|
||||||
|
|
||||||
assert(!pthread_mutex_unlock(&instance->lock));
|
|
||||||
struct cosmo_command *to_retry = cosmo_send_rpc(instance, commands, ack);
|
struct cosmo_command *to_retry = cosmo_send_rpc(instance, commands, ack);
|
||||||
{
|
{
|
||||||
struct timespec now;
|
struct timespec now;
|
||||||
@@ -556,7 +574,6 @@ static void *cosmo_thread_main(void *arg) {
|
|||||||
cosmo_handle_disconnect(instance);
|
cosmo_handle_disconnect(instance);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assert(!pthread_mutex_lock(&instance->lock));
|
|
||||||
|
|
||||||
if (to_retry) {
|
if (to_retry) {
|
||||||
to_retry->prev = instance->command_queue_tail;
|
to_retry->prev = instance->command_queue_tail;
|
||||||
@@ -656,20 +673,18 @@ void cosmo_subscribe(cosmo *instance, json_t *subjects, const json_int_t message
|
|||||||
}
|
}
|
||||||
cosmo_send_command_locked(instance, cosmo_command("subscribe", arguments), promise_obj);
|
cosmo_send_command_locked(instance, cosmo_command("subscribe", arguments), promise_obj);
|
||||||
}
|
}
|
||||||
|
assert(!pthread_cond_signal(&instance->cond));
|
||||||
assert(!pthread_mutex_unlock(&instance->lock));
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
|
|
||||||
pthread_cond_signal(&instance->cond);
|
|
||||||
|
|
||||||
json_decref(subjects);
|
json_decref(subjects);
|
||||||
}
|
}
|
||||||
|
|
||||||
void cosmo_unsubscribe(cosmo *instance, json_t *subject, promise *promise_obj) {
|
void cosmo_unsubscribe(cosmo *instance, json_t *subject, promise *promise_obj) {
|
||||||
assert(!pthread_mutex_lock(&instance->lock));
|
assert(!pthread_mutex_lock(&instance->lock));
|
||||||
cosmo_remove_subscription(instance, subject);
|
cosmo_remove_subscription(instance, subject);
|
||||||
assert(!pthread_mutex_unlock(&instance->lock));
|
|
||||||
|
|
||||||
json_t *arguments = json_pack("{sO}", "subject", subject);
|
json_t *arguments = json_pack("{sO}", "subject", subject);
|
||||||
cosmo_send_command(instance, cosmo_command("unsubscribe", arguments), promise_obj);
|
cosmo_send_command_locked(instance, cosmo_command("unsubscribe", arguments), promise_obj);
|
||||||
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
}
|
}
|
||||||
|
|
||||||
void cosmo_send_message(cosmo *instance, json_t *subject, json_t *message, promise *promise_obj) {
|
void cosmo_send_message(cosmo *instance, json_t *subject, json_t *message, promise *promise_obj) {
|
||||||
@@ -719,6 +734,11 @@ cosmo *cosmo_create(const char *base_url, const char *client_id, const cosmo_cal
|
|||||||
cosmo *instance = malloc(sizeof(cosmo));
|
cosmo *instance = malloc(sizeof(cosmo));
|
||||||
assert(instance);
|
assert(instance);
|
||||||
|
|
||||||
|
assert(!pthread_mutex_init(&instance->lock, NULL));
|
||||||
|
assert(!pthread_cond_init(&instance->cond, NULL));
|
||||||
|
|
||||||
|
assert(!pthread_mutex_lock(&instance->lock));
|
||||||
|
|
||||||
instance->debug = getenv("COSMO_DEBUG");
|
instance->debug = getenv("COSMO_DEBUG");
|
||||||
|
|
||||||
memcpy(&instance->callbacks, callbacks, sizeof(instance->callbacks));
|
memcpy(&instance->callbacks, callbacks, sizeof(instance->callbacks));
|
||||||
@@ -737,9 +757,6 @@ cosmo *cosmo_create(const char *base_url, const char *client_id, const cosmo_cal
|
|||||||
cosmo_handle_client_id_change(instance);
|
cosmo_handle_client_id_change(instance);
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(!pthread_mutex_init(&instance->lock, NULL));
|
|
||||||
assert(!pthread_cond_init(&instance->cond, NULL));
|
|
||||||
|
|
||||||
instance->curl = curl_easy_init();
|
instance->curl = curl_easy_init();
|
||||||
assert(instance->curl);
|
assert(instance->curl);
|
||||||
char api_url[strlen(base_url) + 5];
|
char api_url[strlen(base_url) + 5];
|
||||||
@@ -770,6 +787,8 @@ cosmo *cosmo_create(const char *base_url, const char *client_id, const cosmo_cal
|
|||||||
instance->login_state = LOGIN_UNKNOWN;
|
instance->login_state = LOGIN_UNKNOWN;
|
||||||
instance->last_success.tv_sec = 0;
|
instance->last_success.tv_sec = 0;
|
||||||
|
|
||||||
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
|
|
||||||
assert(!pthread_create(&instance->thread, NULL, cosmo_thread_main, instance));
|
assert(!pthread_create(&instance->thread, NULL, cosmo_thread_main, instance));
|
||||||
return instance;
|
return instance;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user