From be831f40d3cb386eea5515b1705ef14705471bf8 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Mon, 6 Jul 2015 04:12:33 +0000 Subject: [PATCH] Fix the thread locking model to be harder to screw up. --- clients/c/cosmopolite.c | 55 +++++++++++++++++++++++++++-------------- 1 file changed, 37 insertions(+), 18 deletions(-) diff --git a/clients/c/cosmopolite.c b/clients/c/cosmopolite.c index be8436c..7998f97 100644 --- a/clients/c/cosmopolite.c +++ b/clients/c/cosmopolite.c @@ -169,7 +169,10 @@ static bool cosmo_send_http_int(cosmo *instance, cosmo_transfer *transfer) { assert(!curl_easy_setopt(instance->curl, CURLOPT_READDATA, transfer)); assert(!curl_easy_setopt(instance->curl, CURLOPT_WRITEDATA, transfer)); assert(!curl_easy_setopt(instance->curl, CURLOPT_HEADERDATA, transfer)); + + assert(!pthread_mutex_unlock(&instance->lock)); res = curl_easy_perform(instance->curl); + assert(!pthread_mutex_lock(&instance->lock)); if (res) { return false; @@ -222,11 +225,9 @@ static void cosmo_handle_message(cosmo *instance, json_t *event) { } json_object_set_new(event, "message", message_object); - assert(!pthread_mutex_lock(&instance->lock)); json_t *subscription = cosmo_find_subscription(instance, subject); if (!subscription) { cosmo_log(instance, "message from unknown subject"); - assert(!pthread_mutex_unlock(&instance->lock)); return; } @@ -236,7 +237,6 @@ static void cosmo_handle_message(cosmo *instance, json_t *event) { json_t *message = json_array_get(messages, insert_after); json_int_t message_id = json_integer_value(json_object_get(message, "id")); if (message_id == id) { - assert(!pthread_mutex_unlock(&instance->lock)); return; } if (message_id < id) { @@ -244,18 +244,21 @@ static void cosmo_handle_message(cosmo *instance, json_t *event) { } } json_array_insert(messages, insert_after + 1, event); - assert(!pthread_mutex_unlock(&instance->lock)); if (instance->callbacks.message) { cosmo_log(instance, "callbacks.message()"); + assert(!pthread_mutex_unlock(&instance->lock)); instance->callbacks.message(event, instance->passthrough); + assert(!pthread_mutex_lock(&instance->lock)); } } static void cosmo_handle_client_id_change(cosmo *instance) { if (instance->callbacks.client_id_change) { cosmo_log(instance, "callbacks.client_id_change()"); + assert(!pthread_mutex_unlock(&instance->lock)); instance->callbacks.client_id_change(instance->passthrough, instance->client_id); + assert(!pthread_mutex_lock(&instance->lock)); } } @@ -266,7 +269,9 @@ static void cosmo_handle_connect(cosmo *instance) { instance->connect_state = CONNECTED; if (instance->callbacks.connect) { cosmo_log(instance, "callbacks.connect()"); + assert(!pthread_mutex_unlock(&instance->lock)); instance->callbacks.connect(instance->passthrough); + assert(!pthread_mutex_lock(&instance->lock)); } } @@ -277,7 +282,9 @@ static void cosmo_handle_disconnect(cosmo *instance) { instance->connect_state = DISCONNECTED; if (instance->callbacks.disconnect) { cosmo_log(instance, "callbacks.disconnect()"); + assert(!pthread_mutex_unlock(&instance->lock)); instance->callbacks.disconnect(instance->passthrough); + assert(!pthread_mutex_lock(&instance->lock)); } } @@ -288,7 +295,9 @@ static void cosmo_handle_login(cosmo *instance, json_t *event) { instance->login_state = LOGGED_IN; if (instance->callbacks.login) { cosmo_log(instance, "callbacks.login()"); + assert(!pthread_mutex_unlock(&instance->lock)); instance->callbacks.login(instance->passthrough); + assert(!pthread_mutex_lock(&instance->lock)); } } @@ -299,7 +308,9 @@ static void cosmo_handle_logout(cosmo *instance, json_t *event) { instance->login_state = LOGGED_OUT; if (instance->callbacks.logout) { cosmo_log(instance, "callbacks.logout()"); + assert(!pthread_mutex_unlock(&instance->lock)); instance->callbacks.logout(instance->passthrough); + assert(!pthread_mutex_lock(&instance->lock)); } } @@ -327,30 +338,36 @@ static void cosmo_complete_subscribe(cosmo *instance, struct cosmo_command *comm if (strcmp(result, "ok")) { cosmo_remove_subscription(instance, subject); + assert(!pthread_mutex_unlock(&instance->lock)); promise_fail(command->promise, NULL, NULL); + assert(!pthread_mutex_lock(&instance->lock)); return; } - assert(!pthread_mutex_lock(&instance->lock)); json_t *subscription = cosmo_find_subscription(instance, subject); if (subscription) { // Might have unsubscribed later json_object_set_new(subscription, "state", json_integer(SUBSCRIPTION_ACTIVE)); } - assert(!pthread_mutex_unlock(&instance->lock)); + assert(!pthread_mutex_unlock(&instance->lock)); promise_succeed(command->promise, NULL, NULL); + assert(!pthread_mutex_lock(&instance->lock)); } static void cosmo_complete_unsubscribe(cosmo *instance, struct cosmo_command *command, json_t *response, char *result) { + assert(!pthread_mutex_unlock(&instance->lock)); promise_complete(command->promise, NULL, NULL, (strcmp(result, "ok") == 0)); + assert(!pthread_mutex_lock(&instance->lock)); } static void cosmo_complete_send_message(cosmo *instance, struct cosmo_command *command, json_t *response, char *result) { json_t *message; int err = json_unpack(response, "{so}", "message", &message); if (err || (strcmp(result, "ok") && strcmp(result, "duplicate_message"))) { + assert(!pthread_mutex_unlock(&instance->lock)); promise_fail(command->promise, NULL, NULL); + assert(!pthread_mutex_lock(&instance->lock)); } else { char *message_content; assert(!json_unpack(message, "{ss}", "message", &message_content)); @@ -359,7 +376,9 @@ static void cosmo_complete_send_message(cosmo *instance, struct cosmo_command *c json_object_set_new(message, "message", message_object); json_incref(message); + assert(!pthread_mutex_unlock(&instance->lock)); promise_succeed(command->promise, message, (promise_cleanup) json_decref); + assert(!pthread_mutex_lock(&instance->lock)); } } @@ -462,7 +481,9 @@ static struct cosmo_command *cosmo_send_rpc(cosmo *instance, struct cosmo_comman while (get_profile_iter) { struct cosmo_get_profile *next = get_profile_iter->next; json_incref(instance->profile); + assert(!pthread_mutex_unlock(&instance->lock)); promise_succeed(get_profile_iter->promise, instance->profile, (promise_cleanup)json_decref); + assert(!pthread_mutex_lock(&instance->lock)); free(get_profile_iter); get_profile_iter = next; } @@ -484,14 +505,12 @@ static struct cosmo_command *cosmo_send_rpc(cosmo *instance, struct cosmo_comman if (json_unpack(poll_response, "{so}", "instance_generation", &instance_generation)) { cosmo_log(instance, "invalid poll response"); } else { - assert(!pthread_mutex_lock(&instance->lock)); if (!json_equal(instance_generation, instance->generation)) { json_decref(instance->generation); json_incref(instance_generation); instance->generation = instance_generation; cosmo_resubscribe(instance); } - assert(!pthread_mutex_unlock(&instance->lock)); } command_iter = commands; @@ -547,7 +566,6 @@ static void *cosmo_thread_main(void *arg) { instance->next_delay_ms = CYCLE_MS; instance->next_delay_ms += cosmo_random() % (instance->next_delay_ms / CYCLE_STAGGER_FACTOR); - assert(!pthread_mutex_unlock(&instance->lock)); struct cosmo_command *to_retry = cosmo_send_rpc(instance, commands, ack); { struct timespec now; @@ -556,7 +574,6 @@ static void *cosmo_thread_main(void *arg) { cosmo_handle_disconnect(instance); } } - assert(!pthread_mutex_lock(&instance->lock)); if (to_retry) { to_retry->prev = instance->command_queue_tail; @@ -656,20 +673,18 @@ void cosmo_subscribe(cosmo *instance, json_t *subjects, const json_int_t message } cosmo_send_command_locked(instance, cosmo_command("subscribe", arguments), promise_obj); } + assert(!pthread_cond_signal(&instance->cond)); assert(!pthread_mutex_unlock(&instance->lock)); - pthread_cond_signal(&instance->cond); - json_decref(subjects); } void cosmo_unsubscribe(cosmo *instance, json_t *subject, promise *promise_obj) { assert(!pthread_mutex_lock(&instance->lock)); cosmo_remove_subscription(instance, subject); - assert(!pthread_mutex_unlock(&instance->lock)); - json_t *arguments = json_pack("{sO}", "subject", subject); - cosmo_send_command(instance, cosmo_command("unsubscribe", arguments), promise_obj); + cosmo_send_command_locked(instance, cosmo_command("unsubscribe", arguments), promise_obj); + assert(!pthread_mutex_unlock(&instance->lock)); } void cosmo_send_message(cosmo *instance, json_t *subject, json_t *message, promise *promise_obj) { @@ -719,6 +734,11 @@ cosmo *cosmo_create(const char *base_url, const char *client_id, const cosmo_cal cosmo *instance = malloc(sizeof(cosmo)); assert(instance); + assert(!pthread_mutex_init(&instance->lock, NULL)); + assert(!pthread_cond_init(&instance->cond, NULL)); + + assert(!pthread_mutex_lock(&instance->lock)); + instance->debug = getenv("COSMO_DEBUG"); memcpy(&instance->callbacks, callbacks, sizeof(instance->callbacks)); @@ -737,9 +757,6 @@ cosmo *cosmo_create(const char *base_url, const char *client_id, const cosmo_cal cosmo_handle_client_id_change(instance); } - assert(!pthread_mutex_init(&instance->lock, NULL)); - assert(!pthread_cond_init(&instance->cond, NULL)); - instance->curl = curl_easy_init(); assert(instance->curl); char api_url[strlen(base_url) + 5]; @@ -770,6 +787,8 @@ cosmo *cosmo_create(const char *base_url, const char *client_id, const cosmo_cal instance->login_state = LOGIN_UNKNOWN; instance->last_success.tv_sec = 0; + assert(!pthread_mutex_unlock(&instance->lock)); + assert(!pthread_create(&instance->thread, NULL, cosmo_thread_main, instance)); return instance; }