diff --git a/clients/c/test.c b/clients/c/test.c index 4a693ed..5ddf4e2 100644 --- a/clients/c/test.c +++ b/clients/c/test.c @@ -25,7 +25,7 @@ typedef struct { } test_state; -void on_client_id_change(void *passthrough, const char *client_id) { +static void on_client_id_change(void *passthrough, const char *client_id) { test_state *state = passthrough; assert(!pthread_mutex_lock(&state->lock)); state->client_id_change_fired = true; @@ -34,7 +34,7 @@ void on_client_id_change(void *passthrough, const char *client_id) { assert(!pthread_mutex_unlock(&state->lock)); } -void on_connect(void *passthrough) { +static void on_connect(void *passthrough) { test_state *state = passthrough; assert(!pthread_mutex_lock(&state->lock)); state->disconnect_fired = false; @@ -43,7 +43,7 @@ void on_connect(void *passthrough) { assert(!pthread_mutex_unlock(&state->lock)); } -void on_disconnect(void *passthrough) { +static void on_disconnect(void *passthrough) { test_state *state = passthrough; assert(!pthread_mutex_lock(&state->lock)); state->connect_fired = false; @@ -52,7 +52,7 @@ void on_disconnect(void *passthrough) { assert(!pthread_mutex_unlock(&state->lock)); } -void on_logout(void *passthrough) { +static void on_logout(void *passthrough) { test_state *state = passthrough; assert(!pthread_mutex_lock(&state->lock)); state->logout_fired = true; @@ -60,7 +60,7 @@ void on_logout(void *passthrough) { assert(!pthread_mutex_unlock(&state->lock)); } -void on_message(const json_t *message, void *passthrough) { +static void on_message(const json_t *message, void *passthrough) { test_state *state = passthrough; assert(!pthread_mutex_lock(&state->lock)); state->last_message = message; @@ -68,7 +68,7 @@ void on_message(const json_t *message, void *passthrough) { assert(!pthread_mutex_unlock(&state->lock)); } -void wait_for_client_id_change(test_state *state) { +static void wait_for_client_id_change(test_state *state) { assert(!pthread_mutex_lock(&state->lock)); while (!state->client_id_change_fired) { assert(!pthread_cond_wait(&state->cond, &state->lock)); @@ -80,7 +80,7 @@ void wait_for_client_id_change(test_state *state) { assert(strlen(state->client_id)); } -const json_t *wait_for_message(test_state *state) { +static const json_t *wait_for_message(test_state *state) { assert(!pthread_mutex_lock(&state->lock)); while (!state->last_message) { assert(!pthread_cond_wait(&state->cond, &state->lock)); @@ -92,7 +92,7 @@ const json_t *wait_for_message(test_state *state) { return ret; } -void wait_for_logout(test_state *state) { +static void wait_for_logout(test_state *state) { assert(!pthread_mutex_lock(&state->lock)); while (!state->logout_fired) { assert(!pthread_cond_wait(&state->cond, &state->lock)); @@ -102,7 +102,7 @@ void wait_for_logout(test_state *state) { assert(!pthread_mutex_unlock(&state->lock)); } -void wait_for_connect(test_state *state) { +static void wait_for_connect(test_state *state) { assert(!pthread_mutex_lock(&state->lock)); while (!state->connect_fired) { assert(!pthread_cond_wait(&state->cond, &state->lock)); @@ -112,7 +112,7 @@ void wait_for_connect(test_state *state) { assert(!pthread_mutex_unlock(&state->lock)); } -void wait_for_disconnect(test_state *state) { +static void wait_for_disconnect(test_state *state) { assert(!pthread_mutex_lock(&state->lock)); while (!state->disconnect_fired) { assert(!pthread_cond_wait(&state->cond, &state->lock)); @@ -122,7 +122,7 @@ void wait_for_disconnect(test_state *state) { assert(!pthread_mutex_unlock(&state->lock)); } -test_state *create_test_state() { +static test_state *create_test_state() { test_state *ret = malloc(sizeof(test_state)); assert(ret); @@ -137,13 +137,13 @@ test_state *create_test_state() { return ret; } -void destroy_test_state(test_state *state) { +static void destroy_test_state(test_state *state) { assert(!pthread_mutex_destroy(&state->lock)); assert(!pthread_cond_destroy(&state->cond)); free(state); } -cosmo *create_client(test_state *state) { +static cosmo *create_client(test_state *state) { cosmo_callbacks callbacks = { .client_id_change = on_client_id_change, .connect = on_connect, @@ -156,7 +156,7 @@ cosmo *create_client(test_state *state) { return ret; } -json_t *random_subject(const char *readable_only_by, const char *writeable_only_by) { +static json_t *random_subject(const char *readable_only_by, const char *writeable_only_by) { char uuid[COSMO_UUID_SIZE]; cosmo_uuid(uuid); char name[COSMO_UUID_SIZE + 20]; @@ -164,13 +164,13 @@ json_t *random_subject(const char *readable_only_by, const char *writeable_only_ return cosmo_subject(name, readable_only_by, writeable_only_by); } -json_t *random_message() { +static json_t *random_message() { char uuid[COSMO_UUID_SIZE]; cosmo_uuid(uuid); return json_string(uuid); } -void run_test(const char *func_name, bool (*test)(test_state *)) { +static void run_test(const char *func_name, bool (*test)(test_state *)) { test_state *state = create_test_state(); fprintf(stderr, ANSI_COLOR_YELLOW "%s" ANSI_COLOR_RESET ":\n", func_name); if (test(state)) { @@ -181,13 +181,13 @@ void run_test(const char *func_name, bool (*test)(test_state *)) { destroy_test_state(state); } -bool test_create_shutdown(test_state *state) { +static bool test_create_shutdown(test_state *state) { cosmo *client = create_client(state); cosmo_shutdown(client); return true; } -bool test_message_round_trip(test_state *state) { +static bool test_message_round_trip(test_state *state) { cosmo *client = create_client(state); json_t *subject = random_subject(NULL, NULL); @@ -205,14 +205,14 @@ bool test_message_round_trip(test_state *state) { return true; } -bool test_client_id_change_fires(test_state *state) { +static bool test_client_id_change_fires(test_state *state) { cosmo *client = create_client(state); wait_for_client_id_change(state); cosmo_shutdown(client); return true; } -bool test_connect_logout_fires(test_state *state) { +static bool test_connect_logout_fires(test_state *state) { cosmo *client = create_client(state); wait_for_connect(state); wait_for_logout(state); @@ -220,7 +220,7 @@ bool test_connect_logout_fires(test_state *state) { return true; } -bool test_reconnect(test_state *state) { +static bool test_reconnect(test_state *state) { cosmo *client = create_client(state); wait_for_connect(state); assert(!curl_easy_setopt(client->curl, CURLOPT_PORT, 444)); @@ -231,7 +231,7 @@ bool test_reconnect(test_state *state) { return true; } -bool test_resubscribe_after_reconnect(test_state *state) { +static bool test_resubscribe_after_reconnect(test_state *state) { cosmo *client = create_client(state); json_t *subject = random_subject(NULL, NULL); @@ -258,7 +258,7 @@ bool test_resubscribe_after_reconnect(test_state *state) { return true; } -bool test_bulk_subscribe(test_state *state) { +static bool test_bulk_subscribe(test_state *state) { cosmo *client = create_client(state); json_t *subject1 = random_subject(NULL, NULL); @@ -284,7 +284,7 @@ bool test_bulk_subscribe(test_state *state) { return true; } -bool test_complex_object(test_state *state) { +static bool test_complex_object(test_state *state) { cosmo *client = create_client(state); json_t *subject = random_subject(NULL, NULL); @@ -306,7 +306,7 @@ bool test_complex_object(test_state *state) { return true; } -bool test_send_message_promise(test_state *state) { +static bool test_send_message_promise(test_state *state) { cosmo *client = create_client(state); json_t *subject = random_subject(NULL, NULL); @@ -327,7 +327,7 @@ bool test_send_message_promise(test_state *state) { return true; } -bool test_subscribe_unsubscribe_promise(test_state *state) { +static bool test_subscribe_unsubscribe_promise(test_state *state) { cosmo *client = create_client(state); json_t *subject = random_subject(NULL, NULL); @@ -348,7 +348,7 @@ bool test_subscribe_unsubscribe_promise(test_state *state) { return true; } -bool test_getmessages_subscribe(test_state *state) { +static bool test_getmessages_subscribe(test_state *state) { cosmo *client = create_client(state); json_t *subject = random_subject(NULL, NULL); @@ -363,7 +363,7 @@ bool test_getmessages_subscribe(test_state *state) { return true; } -bool test_subscribe_barrier(test_state *state) { +static bool test_subscribe_barrier(test_state *state) { cosmo *client = create_client(state); json_t *subject = random_subject(NULL, NULL); @@ -391,7 +391,7 @@ bool test_subscribe_barrier(test_state *state) { return true; } -bool test_resubscribe(test_state *state) { +static bool test_resubscribe(test_state *state) { cosmo *client = create_client(state); json_t *subject = random_subject(NULL, NULL); @@ -411,6 +411,44 @@ bool test_resubscribe(test_state *state) { return true; } +static bool test_message_ordering(test_state *state) { + cosmo *client = create_client(state); + + json_t *subject = random_subject(NULL, NULL); + char *messages[] = {"A", "B", "C", "D"}; + + int i; + for (i = 0; i < (sizeof(messages) / sizeof(*messages)); i++) { + char *message = messages[i]; + json_t *message_out = json_string(message); + promise *promise_obj = promise_create(NULL, NULL, NULL); + cosmo_send_message(client, subject, message_out, promise_obj); + json_decref(message_out); + assert(promise_wait(promise_obj, NULL)); + promise_destroy(promise_obj); + } + + promise *promise_obj = promise_create(NULL, NULL, NULL); + cosmo_subscribe(client, subject, 1, 0, promise_obj); + assert(promise_wait(promise_obj, NULL)); + promise_destroy(promise_obj); + + promise_obj = promise_create(NULL, NULL, NULL); + cosmo_subscribe(client, subject, 2, 0, promise_obj); + assert(promise_wait(promise_obj, NULL)); + promise_destroy(promise_obj); + + json_t *messages_in = cosmo_get_messages(client, subject); + assert(messages_in); + assert(json_array_size(messages_in) == 2); + json_decref(messages_in); + + json_decref(subject); + + cosmo_shutdown(client); + return true; +} + int main(int argc, char *argv[]) { RUN_TEST(test_create_shutdown); RUN_TEST(test_client_id_change_fires); @@ -425,6 +463,7 @@ int main(int argc, char *argv[]) { RUN_TEST(test_getmessages_subscribe); RUN_TEST(test_subscribe_barrier); RUN_TEST(test_resubscribe); + RUN_TEST(test_message_ordering); return 0; }