disconnect callback

This commit is contained in:
Ian Gulliver
2015-06-18 05:43:05 +00:00
parent d0d0ed1e21
commit d89d1cc2f6
3 changed files with 60 additions and 3 deletions

View File

@@ -14,9 +14,12 @@
#define CYCLE_MS 10000
#define CYCLE_STAGGER_FACTOR 10
#define CONNECT_TIMEOUT_S 60
#define SUBSCRIPTION_PENDING 1
#define SUBSCRIPTION_ACTIVE 2
enum {
SUBSCRIPTION_PENDING,
SUBSCRIPTION_ACTIVE,
};
typedef struct {
char *send_buf;
@@ -90,7 +93,6 @@ static bool cosmo_send_http_int(cosmo *instance, cosmo_transfer *transfer) {
res = curl_easy_perform(instance->curl);
if (res) {
fprintf(stderr, "curl_easy_perform() failed: %s\n", curl_easy_strerror(res));
return false;
}
@@ -180,6 +182,16 @@ static void cosmo_handle_connect(cosmo *instance) {
}
}
static void cosmo_handle_disconnect(cosmo *instance) {
if (instance->connect_state == DISCONNECTED) {
return;
}
instance->connect_state = DISCONNECTED;
if (instance->callbacks.disconnect) {
instance->callbacks.disconnect(instance->passthrough);
}
}
static void cosmo_handle_logout(cosmo *instance, json_t *event) {
if (instance->login_state == LOGGED_OUT) {
return;
@@ -251,6 +263,7 @@ static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands, json_t *ack) {
instance->profile = strdup(profile);
}
assert(!clock_gettime(CLOCK_MONOTONIC, &instance->last_success));
cosmo_handle_connect(instance);
size_t index;
@@ -352,6 +365,13 @@ static void *cosmo_thread_main(void *arg) {
assert(!pthread_mutex_unlock(&instance->lock));
json_t *to_retry = cosmo_send_rpc(instance, commands, ack);
{
struct timespec now;
assert(!clock_gettime(CLOCK_MONOTONIC, &now));
if (now.tv_sec - instance->last_success.tv_sec > CONNECT_TIMEOUT_S) {
cosmo_handle_disconnect(instance);
}
}
assert(!pthread_mutex_lock(&instance->lock));
json_array_extend(instance->command_queue, to_retry);
@@ -526,6 +546,7 @@ cosmo *cosmo_create(const char *base_url, const char *client_id, const cosmo_cal
instance->connect_state = INITIAL_CONNECT;
instance->login_state = LOGIN_UNKNOWN;
instance->last_success.tv_sec = 0;
assert(!pthread_create(&instance->thread, NULL, cosmo_thread_main, instance));
return instance;

View File

@@ -6,11 +6,13 @@
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>
#include <time.h>
#define COSMO_UUID_SIZE 37
typedef struct {
void (*connect)(void *);
void (*disconnect)(void *);
void (*logout)(void *);
void (*message)(const json_t *, void *);
} cosmo_callbacks;
@@ -36,6 +38,7 @@ typedef struct {
CONNECTED,
DISCONNECTED,
} connect_state;
struct timespec last_success;
enum {
LOGIN_UNKNOWN,

View File

@@ -17,6 +17,7 @@ typedef struct {
const json_t *last_message;
bool logout_fired;
bool connect_fired;
bool disconnect_fired;
} test_state;
@@ -28,6 +29,14 @@ void on_connect(void *passthrough) {
assert(!pthread_mutex_unlock(&state->lock));
}
void on_disconnect(void *passthrough) {
test_state *state = passthrough;
assert(!pthread_mutex_lock(&state->lock));
state->disconnect_fired = true;
assert(!pthread_cond_signal(&state->cond));
assert(!pthread_mutex_unlock(&state->lock));
}
void on_logout(void *passthrough) {
test_state *state = passthrough;
assert(!pthread_mutex_lock(&state->lock));
@@ -76,6 +85,16 @@ void wait_for_connect(test_state *state) {
assert(!pthread_mutex_unlock(&state->lock));
}
void wait_for_disconnect(test_state *state) {
assert(!pthread_mutex_lock(&state->lock));
while (!state->disconnect_fired) {
assert(!pthread_cond_wait(&state->cond, &state->lock));
}
state->disconnect_fired = false;
assert(!pthread_mutex_unlock(&state->lock));
}
test_state *create_test_state() {
test_state *ret = malloc(sizeof(test_state));
assert(ret);
@@ -85,6 +104,7 @@ test_state *create_test_state() {
ret->last_message = NULL;
ret->logout_fired = false;
ret->connect_fired = false;
ret->disconnect_fired = false;
return ret;
}
@@ -100,6 +120,7 @@ cosmo *create_client(test_state *state) {
cosmo_callbacks callbacks = {
.connect = on_connect,
.disconnect = on_disconnect,
.logout = on_logout,
.message = on_message,
};
@@ -163,6 +184,17 @@ bool test_connect_fires(test_state *state) {
return true;
}
bool test_disconnect_fires(test_state *state) {
cosmo *client = create_client(state);
wait_for_connect(state);
assert(!curl_easy_setopt(client->curl, CURLOPT_TIMEOUT_MS, 1));
wait_for_disconnect(state);
assert(!curl_easy_setopt(client->curl, CURLOPT_TIMEOUT_MS, 10000));
wait_for_connect(state);
cosmo_shutdown(client);
return true;
}
bool test_logout_fires(test_state *state) {
cosmo *client = create_client(state);
wait_for_logout(state);
@@ -200,6 +232,7 @@ bool test_resubscribe(test_state *state) {
int main(int argc, char *argv[]) {
RUN_TEST(test_create_destroy);
RUN_TEST(test_connect_fires);
RUN_TEST(test_disconnect_fires);
RUN_TEST(test_logout_fires);
RUN_TEST(test_message_round_trip);
RUN_TEST(test_resubscribe);