Move to condition variable communication.

This commit is contained in:
Ian Gulliver
2015-06-03 20:53:27 -07:00
parent 9e4cefb27e
commit c4ed549342

View File

@@ -1,5 +1,6 @@
#include <assert.h> #include <assert.h>
#include <pthread.h> #include <pthread.h>
#include <stdbool.h>
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#include <time.h> #include <time.h>
@@ -20,7 +21,9 @@ typedef struct {
char instance_id[COSMO_UUID_SIZE]; char instance_id[COSMO_UUID_SIZE];
pthread_mutex_t lock; pthread_mutex_t lock;
int shutdown; pthread_cond_t cond;
bool shutdown;
json_t *command_queue;
pthread_t thread; pthread_t thread;
} cosmo; } cosmo;
@@ -68,7 +71,7 @@ static char *cosmo_build_rpc(cosmo *instance) {
return ret; return ret;
} }
static int cosmo_send_http_int(cosmo *instance, cosmo_transfer *transfer, CURL *curl) { static bool cosmo_send_http_int(cosmo *instance, cosmo_transfer *transfer, CURL *curl) {
CURLcode res; CURLcode res;
curl_easy_setopt(curl, CURLOPT_URL, "https://playground.cosmopolite.org/cosmopolite/api"); curl_easy_setopt(curl, CURLOPT_URL, "https://playground.cosmopolite.org/cosmopolite/api");
@@ -86,17 +89,17 @@ static int cosmo_send_http_int(cosmo *instance, cosmo_transfer *transfer, CURL *
if (res != CURLE_OK) { if (res != CURLE_OK) {
fprintf(stderr, "curl_easy_perform() failed: %s\n", curl_easy_strerror(res)); fprintf(stderr, "curl_easy_perform() failed: %s\n", curl_easy_strerror(res));
return -1; return false;
} }
long return_code; long return_code;
assert(curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &return_code) == CURLE_OK); assert(curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &return_code) == CURLE_OK);
if (return_code != 200) { if (return_code != 200) {
fprintf(stderr, "server returned error: %ld\n", return_code); fprintf(stderr, "server returned error: %ld\n", return_code);
return -1; return false;
} }
return 0; return true;
} }
// Takes ownership of request // Takes ownership of request
@@ -115,7 +118,7 @@ static char *cosmo_send_http(cosmo *instance, char *request) {
curl_easy_cleanup(curl); curl_easy_cleanup(curl);
free(request); free(request);
return ret ? NULL : transfer.recv_buf; return ret ? transfer.recv_buf : NULL;
} }
static void cosmo_send_rpc(cosmo *instance) { static void cosmo_send_rpc(cosmo *instance) {
@@ -145,32 +148,10 @@ static void *cosmo_thread_main(void *arg) {
assert(!pthread_mutex_lock(&instance->lock)); assert(!pthread_mutex_lock(&instance->lock));
while (!instance->shutdown) { while (!instance->shutdown) {
pthread_mutex_unlock(&instance->lock);
{
time_t t1, t2;
assert(time(&t1) != -1);
cosmo_send_rpc(instance); cosmo_send_rpc(instance);
assert(!pthread_cond_wait(&instance->cond, &instance->lock));
pthread_mutex_lock(&instance->lock);
if (instance->shutdown) {
break;
} }
pthread_mutex_unlock(&instance->lock); assert(!pthread_mutex_unlock(&instance->lock));
assert(time(&t2) != -1);
time_t elapsed = t2 - t1;
time_t to_wait = COSMO_CHECK_SECONDS - elapsed;
if (to_wait > 0) {
sleep(to_wait);
}
}
pthread_mutex_lock(&instance->lock);
}
pthread_mutex_unlock(&instance->lock);
return NULL; return NULL;
} }
@@ -186,7 +167,9 @@ cosmo *cosmo_create(char *client_id) {
strcpy(instance->client_id, client_id); strcpy(instance->client_id, client_id);
cosmo_generate_uuid(instance->instance_id); cosmo_generate_uuid(instance->instance_id);
assert(!pthread_mutex_init(&instance->lock, NULL)); assert(!pthread_mutex_init(&instance->lock, NULL));
instance->shutdown = 0; assert(!pthread_cond_init(&instance->cond, NULL));
instance->shutdown = false;
instance->command_queue = json_array();
assert(!pthread_create(&instance->thread, NULL, cosmo_thread_main, instance)); assert(!pthread_create(&instance->thread, NULL, cosmo_thread_main, instance));
return instance; return instance;
@@ -195,9 +178,12 @@ cosmo *cosmo_create(char *client_id) {
void cosmo_destroy(cosmo *instance) { void cosmo_destroy(cosmo *instance) {
pthread_mutex_lock(&instance->lock); pthread_mutex_lock(&instance->lock);
instance->shutdown = 1; instance->shutdown = 1;
pthread_cond_signal(&instance->cond);
pthread_mutex_unlock(&instance->lock); pthread_mutex_unlock(&instance->lock);
assert(!pthread_join(instance->thread, NULL)); assert(!pthread_join(instance->thread, NULL));
assert(!pthread_mutex_destroy(&instance->lock)); assert(!pthread_mutex_destroy(&instance->lock));
assert(!pthread_cond_destroy(&instance->cond));
json_decref(instance->command_queue);
free(instance); free(instance);
curl_global_cleanup(); curl_global_cleanup();
@@ -208,7 +194,7 @@ int main(int argc, char *argv[]) {
char client_id[COSMO_CLIENT_ID_SIZE]; char client_id[COSMO_CLIENT_ID_SIZE];
cosmo_generate_client_id(client_id); cosmo_generate_client_id(client_id);
cosmo *instance = cosmo_create(client_id); cosmo *instance = cosmo_create(client_id);
sleep(60); sleep(5);
cosmo_destroy(instance); cosmo_destroy(instance);
return 0; return 0;