diff --git a/clients/c/cosmopolite.c b/clients/c/cosmopolite.c index 48cd5c0..103464e 100644 --- a/clients/c/cosmopolite.c +++ b/clients/c/cosmopolite.c @@ -14,9 +14,12 @@ #define CYCLE_MS 10000 #define CYCLE_STAGGER_FACTOR 10 +#define CONNECT_TIMEOUT_S 60 -#define SUBSCRIPTION_PENDING 1 -#define SUBSCRIPTION_ACTIVE 2 +enum { + SUBSCRIPTION_PENDING, + SUBSCRIPTION_ACTIVE, +}; typedef struct { char *send_buf; @@ -90,7 +93,6 @@ static bool cosmo_send_http_int(cosmo *instance, cosmo_transfer *transfer) { res = curl_easy_perform(instance->curl); if (res) { - fprintf(stderr, "curl_easy_perform() failed: %s\n", curl_easy_strerror(res)); return false; } @@ -180,6 +182,16 @@ static void cosmo_handle_connect(cosmo *instance) { } } +static void cosmo_handle_disconnect(cosmo *instance) { + if (instance->connect_state == DISCONNECTED) { + return; + } + instance->connect_state = DISCONNECTED; + if (instance->callbacks.disconnect) { + instance->callbacks.disconnect(instance->passthrough); + } +} + static void cosmo_handle_logout(cosmo *instance, json_t *event) { if (instance->login_state == LOGGED_OUT) { return; @@ -251,6 +263,7 @@ static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands, json_t *ack) { instance->profile = strdup(profile); } + assert(!clock_gettime(CLOCK_MONOTONIC, &instance->last_success)); cosmo_handle_connect(instance); size_t index; @@ -352,6 +365,13 @@ static void *cosmo_thread_main(void *arg) { assert(!pthread_mutex_unlock(&instance->lock)); json_t *to_retry = cosmo_send_rpc(instance, commands, ack); + { + struct timespec now; + assert(!clock_gettime(CLOCK_MONOTONIC, &now)); + if (now.tv_sec - instance->last_success.tv_sec > CONNECT_TIMEOUT_S) { + cosmo_handle_disconnect(instance); + } + } assert(!pthread_mutex_lock(&instance->lock)); json_array_extend(instance->command_queue, to_retry); @@ -526,6 +546,7 @@ cosmo *cosmo_create(const char *base_url, const char *client_id, const cosmo_cal instance->connect_state = INITIAL_CONNECT; instance->login_state = LOGIN_UNKNOWN; + instance->last_success.tv_sec = 0; assert(!pthread_create(&instance->thread, NULL, cosmo_thread_main, instance)); return instance; diff --git a/clients/c/cosmopolite.h b/clients/c/cosmopolite.h index 88c3e84..53b34ee 100644 --- a/clients/c/cosmopolite.h +++ b/clients/c/cosmopolite.h @@ -6,11 +6,13 @@ #include #include #include +#include #define COSMO_UUID_SIZE 37 typedef struct { void (*connect)(void *); + void (*disconnect)(void *); void (*logout)(void *); void (*message)(const json_t *, void *); } cosmo_callbacks; @@ -36,6 +38,7 @@ typedef struct { CONNECTED, DISCONNECTED, } connect_state; + struct timespec last_success; enum { LOGIN_UNKNOWN, diff --git a/clients/c/test.c b/clients/c/test.c index 0eb0a08..5beb386 100644 --- a/clients/c/test.c +++ b/clients/c/test.c @@ -17,6 +17,7 @@ typedef struct { const json_t *last_message; bool logout_fired; bool connect_fired; + bool disconnect_fired; } test_state; @@ -28,6 +29,14 @@ void on_connect(void *passthrough) { assert(!pthread_mutex_unlock(&state->lock)); } +void on_disconnect(void *passthrough) { + test_state *state = passthrough; + assert(!pthread_mutex_lock(&state->lock)); + state->disconnect_fired = true; + assert(!pthread_cond_signal(&state->cond)); + assert(!pthread_mutex_unlock(&state->lock)); +} + void on_logout(void *passthrough) { test_state *state = passthrough; assert(!pthread_mutex_lock(&state->lock)); @@ -76,6 +85,16 @@ void wait_for_connect(test_state *state) { assert(!pthread_mutex_unlock(&state->lock)); } +void wait_for_disconnect(test_state *state) { + assert(!pthread_mutex_lock(&state->lock)); + while (!state->disconnect_fired) { + assert(!pthread_cond_wait(&state->cond, &state->lock)); + } + + state->disconnect_fired = false; + assert(!pthread_mutex_unlock(&state->lock)); +} + test_state *create_test_state() { test_state *ret = malloc(sizeof(test_state)); assert(ret); @@ -85,6 +104,7 @@ test_state *create_test_state() { ret->last_message = NULL; ret->logout_fired = false; ret->connect_fired = false; + ret->disconnect_fired = false; return ret; } @@ -100,6 +120,7 @@ cosmo *create_client(test_state *state) { cosmo_callbacks callbacks = { .connect = on_connect, + .disconnect = on_disconnect, .logout = on_logout, .message = on_message, }; @@ -163,6 +184,17 @@ bool test_connect_fires(test_state *state) { return true; } +bool test_disconnect_fires(test_state *state) { + cosmo *client = create_client(state); + wait_for_connect(state); + assert(!curl_easy_setopt(client->curl, CURLOPT_TIMEOUT_MS, 1)); + wait_for_disconnect(state); + assert(!curl_easy_setopt(client->curl, CURLOPT_TIMEOUT_MS, 10000)); + wait_for_connect(state); + cosmo_shutdown(client); + return true; +} + bool test_logout_fires(test_state *state) { cosmo *client = create_client(state); wait_for_logout(state); @@ -200,6 +232,7 @@ bool test_resubscribe(test_state *state) { int main(int argc, char *argv[]) { RUN_TEST(test_create_destroy); RUN_TEST(test_connect_fires); + RUN_TEST(test_disconnect_fires); RUN_TEST(test_logout_fires); RUN_TEST(test_message_round_trip); RUN_TEST(test_resubscribe);