From 45d75172bf6a4789d1a5478d532950653fc5a87a Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sat, 6 Jun 2015 19:17:42 -0700 Subject: [PATCH] Handle incoming message events. Fix unsubscribe. --- clients/c/cosmopolite.c | 65 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 62 insertions(+), 3 deletions(-) diff --git a/clients/c/cosmopolite.c b/clients/c/cosmopolite.c index 686e040..4d97cc0 100644 --- a/clients/c/cosmopolite.c +++ b/clients/c/cosmopolite.c @@ -34,7 +34,7 @@ static json_t *cosmo_find_subscription(cosmo *instance, json_t *subject) { size_t i; json_t *subscription; json_array_foreach(instance->subscriptions, i, subscription) { - if (json_equal(json_object_get(subscription, "subject"), subject) == 0) { + if (json_equal(json_object_get(subscription, "subject"), subject)) { return subscription; } } @@ -127,6 +127,54 @@ static char *cosmo_send_http(cosmo *instance, char *request) { return ret ? transfer.recv_buf : NULL; } +static void cosmo_handle_message(cosmo *instance, json_t *event) { + json_t *subject = json_object_get(event, "subject"); + if (!subject) { + fprintf(stderr, "message event without subject\n"); + return; + } + + json_t *id_json = json_object_get(event, "id"); + if (!id_json) { + fprintf(stderr, "message event without id\n"); + return; + } + json_int_t id = json_integer_value(id_json); + + assert(!pthread_mutex_lock(&instance->lock)); + json_t *subscription = cosmo_find_subscription(instance, subject); + if (!subscription) { + // Normal to sometimes get events after unsubscribe. + assert(!pthread_mutex_unlock(&instance->lock)); + return; + } + json_t *messages = json_object_get(subscription, "messages"); + ssize_t insert_after; + for (insert_after = json_array_size(messages) - 1; insert_after >= 0; insert_after--) { + json_t *message = json_array_get(messages, insert_after); + json_int_t message_id = json_integer_value(json_object_get(message, "id")); + if (message_id == id) { + assert(!pthread_mutex_unlock(&instance->lock)); + return; + } + if (message_id < id) { + break; + } + } + printf("new message: %lld\n", id); + json_array_insert(messages, insert_after + 1, event); + assert(!pthread_mutex_unlock(&instance->lock)); +} + +static void cosmo_handle_event(cosmo *instance, json_t *event) { + const char *event_type = json_string_value(json_object_get(event, "event_type")); + if (strcmp(event_type, "message") == 0) { + cosmo_handle_message(instance, event); + } else { + fprintf(stderr, "unknown event type: %s\n", event_type); + } +} + // Takes ownership of commands. static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands) { char *request = cosmo_build_rpc(instance, commands); @@ -178,8 +226,16 @@ static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands) { continue; } } - json_decref(commands); + + json_t *events = json_object_get(received, "events"); + if (events) { + json_t *event; + json_array_foreach(events, index, event) { + cosmo_handle_event(instance, event); + } + } + json_decref(received); return to_retry; @@ -291,12 +347,15 @@ void cosmo_unsubscribe(cosmo *instance, json_t *subject) { size_t i; json_t *subscription; json_array_foreach(instance->subscriptions, i, subscription) { - if (json_equal(json_object_get(subscription, "subject"), subject) == 0) { + if (json_equal(json_object_get(subscription, "subject"), subject)) { json_array_remove(instance->subscriptions, i); break; } } assert(!pthread_mutex_unlock(&instance->lock)); + + json_t *arguments = json_pack("{sO}", "subject", subject); + cosmo_send_command(instance, cosmo_command("unsubscribe", arguments)); } void cosmo_send_message(cosmo *instance, json_t *subject, json_t *message) {