From 2790b6bce126ba1030c847a469c2d46cdb9811b3 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Tue, 16 Jun 2015 02:15:17 +0000 Subject: [PATCH] Subscription state tracking, some parsing cleanups. --- clients/c/cosmopolite.c | 39 +++++++++++++++++++++------------------ 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/clients/c/cosmopolite.c b/clients/c/cosmopolite.c index 3cf8a59..a4e3904 100644 --- a/clients/c/cosmopolite.c +++ b/clients/c/cosmopolite.c @@ -15,6 +15,9 @@ #define CYCLE_MS 10000 #define CYCLE_STAGGER_FACTOR 10 +#define SUBSCRIPTION_PENDING 1 +#define SUBSCRIPTION_ACTIVE 2 + typedef struct { char *send_buf; size_t send_buf_len; @@ -123,27 +126,16 @@ static char *cosmo_send_http(cosmo *instance, char *request) { } static void cosmo_handle_message(cosmo *instance, json_t *event) { - json_t *subject = json_object_get(event, "subject"); - if (!subject) { - fprintf(stderr, "message event without subject\n"); - return; - } - - json_t *id_json = json_object_get(event, "id"); - if (!id_json) { - fprintf(stderr, "message event without id\n"); - return; - } - json_int_t id = json_integer_value(id_json); - - json_t *message_content = json_object_get(event, "message"); - if (!message_content) { - fprintf(stderr, "message event without content\n"); + json_t *subject; + int id; + char *message_content; + if (json_unpack(event, "{sosiss}", "subject", &subject, "id", &id, "message", &message_content)) { + fprintf(stderr, "invalid message event\n"); return; } json_error_t err; - json_t *message_object = json_loads(json_string_value(message_content), JSON_DECODE_ANY, &err); + json_t *message_object = json_loads(message_content, JSON_DECODE_ANY, &err); if (!message_object) { fprintf(stderr, "error parsing message content: %s\n", err.text); return; @@ -274,6 +266,17 @@ static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands, json_t *ack) { json_array_append(to_retry, command); continue; } + if (!strcmp(json_string_value(json_object_get(command, "command")), "subscribe")) { + json_t *subject = NULL; + assert(!json_unpack(command, "{s{so}}", "arguments", "subject", &subject)); + assert(!pthread_mutex_lock(&instance->lock)); + json_t *subscription = cosmo_find_subscription(instance, subject); + if (subscription) { + // Might have unsubscribed later + json_object_set_new(subscription, "state", json_integer(SUBSCRIPTION_ACTIVE)); + } + assert(!pthread_mutex_unlock(&instance->lock)); + } } json_decref(commands); @@ -361,7 +364,7 @@ void cosmo_subscribe(cosmo *instance, json_t *subject, const json_int_t messages assert(!pthread_mutex_lock(&instance->lock)); json_t *subscription = cosmo_find_subscription(instance, subject); if (!subscription) { - json_array_append_new(instance->subscriptions, json_pack("{sOs[]}", "subject", subject, "messages")); + json_array_append_new(instance->subscriptions, json_pack("{sOs[]si}", "subject", subject, "messages", "state", SUBSCRIPTION_PENDING)); } assert(!pthread_mutex_unlock(&instance->lock));