diff --git a/clients/c/Makefile b/clients/c/Makefile index 2c77cce..bfad189 100644 --- a/clients/c/Makefile +++ b/clients/c/Makefile @@ -5,9 +5,6 @@ LIBS ?= -lcurl -ljansson -luuid -lpthread all: libcosmopolite.so -test: test.o cosmopolite.o - $(CC) $(LDFLAGS) -o test test.o cosmopolite.o $(LIBS) - libcosmopolite.so: cosmopolite.o $(CC) -shared $(LDFLAGS) -o libcosmopolite.so cosmopolite.o $(LIBS) @@ -23,5 +20,8 @@ install: libcosmopolite.so cosmopolite.h clean: rm -f test libcosmopolite.so *.o -valgrind: +test: test.o cosmopolite.o + $(CC) $(LDFLAGS) -o test test.o cosmopolite.o $(LIBS) + +runtest: test valgrind --leak-check=full --show-reachable=yes --num-callers=20 --suppressions=suppressions ./test diff --git a/clients/c/cosmopolite.c b/clients/c/cosmopolite.c index 7b80853..6ff7fc5 100644 --- a/clients/c/cosmopolite.c +++ b/clients/c/cosmopolite.c @@ -180,6 +180,11 @@ static void cosmo_handle_message(cosmo *instance, json_t *event) { } static void cosmo_handle_event(cosmo *instance, json_t *event) { + json_t *event_id = json_object_get(event, "event_id"); + if (event_id) { + json_array_append(instance->ack, event_id); + } + const char *event_type = json_string_value(json_object_get(event, "event_type")); if (strcmp(event_type, "message") == 0) { cosmo_handle_message(instance, event); @@ -194,13 +199,19 @@ static json_t *cosmo_command(const char *name, const json_t *arguments) { } // Takes ownership of commands. -static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands) { - // Always poll. - json_array_append_new(commands, cosmo_command("poll", json_array())); +// Takes ownership of ack. +static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands, json_t *ack) { + json_t *int_commands = json_array(); - char *request = cosmo_build_rpc(instance, commands); + // Always poll. + json_t *arguments = json_pack("{so}", "ack", ack); + json_array_append_new(int_commands, cosmo_command("poll", arguments)); + json_array_extend(int_commands, commands); + + char *request = cosmo_build_rpc(instance, int_commands); char *response = cosmo_send_http(instance, request); + json_decref(int_commands); if (!response) { return commands; } @@ -243,7 +254,8 @@ static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands) { json_t *to_retry = json_array(); json_t *command; json_array_foreach(commands, index, command) { - json_t *command_response = json_array_get(command_responses, index); + // +1 for the poll offset from int_commands. + json_t *command_response = json_array_get(command_responses, index + 1); json_t *result = json_object_get(command_response, "result"); if (!result) { fprintf(stderr, "response lacks \"result\" key\n"); @@ -251,24 +263,6 @@ static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands) { continue; } if (!strcmp(json_string_value(result), "retry")) { - // One giant hack to change queued subscribe messages to ask for less data. - // Remove this once we get channels working; it's not worth the yuck. - const char *name = json_string_value(json_object_get(command, "command")); - if (!strcmp(name, "subscribe")) { - json_t *arguments = json_object_get(command, "arguments"); - json_t *subject = json_object_get(arguments, "subject"); - json_t *last_message = cosmo_get_last_message(instance, subject); - if (last_message) { - json_t *last_id = json_object_get(arguments, "last_id"); - json_int_t last_id_val = last_id ? json_integer_value(last_id) : 0; - json_int_t local_last_id = json_integer_value(json_object_get(last_message, "id")); - if (local_last_id > last_id_val) { - json_object_del(arguments, "messages"); - json_object_set_new(arguments, "last_id", json_integer(local_last_id)); - } - json_decref(last_message); - } - } json_array_append(to_retry, command); continue; } @@ -287,12 +281,14 @@ static void *cosmo_thread_main(void *arg) { while (!instance->shutdown) { json_t *commands = instance->command_queue; instance->command_queue = json_array(); + json_t *ack = instance->ack; + instance->ack = json_array(); instance->next_delay_ms = CYCLE_MS; instance->next_delay_ms += rand_r(&instance->seedp) % (instance->next_delay_ms / CYCLE_STAGGER_FACTOR); assert(!pthread_mutex_unlock(&instance->lock)); - json_t *to_retry = cosmo_send_rpc(instance, commands); + json_t *to_retry = cosmo_send_rpc(instance, commands, ack); assert(!pthread_mutex_lock(&instance->lock)); json_array_extend(instance->command_queue, to_retry); @@ -456,6 +452,8 @@ cosmo *cosmo_create(const char *base_url, const char *client_id, const cosmo_cal instance->profile = NULL; instance->command_queue = json_array(); assert(instance->command_queue); + instance->ack = json_array(); + assert(instance->ack); instance->subscriptions = json_array(); assert(instance->subscriptions); instance->next_delay_ms = 0; @@ -475,6 +473,7 @@ void cosmo_shutdown(cosmo *instance) { assert(!pthread_mutex_destroy(&instance->lock)); assert(!pthread_cond_destroy(&instance->cond)); json_decref(instance->command_queue); + json_decref(instance->ack); json_decref(instance->subscriptions); free(instance->profile); curl_easy_cleanup(instance->curl); diff --git a/clients/c/cosmopolite.h b/clients/c/cosmopolite.h index b1b9071..77c02f5 100644 --- a/clients/c/cosmopolite.h +++ b/clients/c/cosmopolite.h @@ -24,6 +24,7 @@ typedef struct { bool shutdown; char *profile; json_t *command_queue; + json_t *ack; json_t *subscriptions; uint64_t next_delay_ms; unsigned int seedp; diff --git a/clients/c/test.c b/clients/c/test.c index 53f81e8..2c14d72 100644 --- a/clients/c/test.c +++ b/clients/c/test.c @@ -87,6 +87,7 @@ bool test_message_round_trip() { json_decref(subject); json_decref(message_out); + cosmo_shutdown(client); return true; }