diff --git a/clients/c/cosmopolite.c b/clients/c/cosmopolite.c index daf47ee..e210727 100644 --- a/clients/c/cosmopolite.c +++ b/clients/c/cosmopolite.c @@ -43,6 +43,20 @@ static json_t *cosmo_find_subscription(cosmo *instance, json_t *subject) { return NULL; } +static void cosmo_send_command_locked(cosmo *instance, json_t *command) { + json_array_append_new(instance->command_queue, command); + instance->next_delay_ms = 0; +} + +// Takes ownership of command. +static void cosmo_send_command(cosmo *instance, json_t *command) { + assert(command); + assert(!pthread_mutex_lock(&instance->lock)); + cosmo_send_command_locked(instance, command); + assert(!pthread_cond_signal(&instance->cond)); + assert(!pthread_mutex_unlock(&instance->lock)); +} + static size_t cosmo_read_callback(void *ptr, size_t size, size_t nmemb, void *userp) { cosmo_transfer *transfer = userp; size_t to_write = min(transfer->send_buf_len, size * nmemb); @@ -326,8 +340,7 @@ static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands, json_t *ack) { } } - json_array_append_new(instance->command_queue, cosmo_command("subscribe", arguments)); - instance->next_delay_ms = 0; + cosmo_send_command_locked(instance, cosmo_command("subscribe", arguments)); } assert(!pthread_mutex_unlock(&instance->lock)); } @@ -410,16 +423,6 @@ static void *cosmo_thread_main(void *arg) { return NULL; } -// Takes ownership of command. -static void cosmo_send_command(cosmo *instance, json_t *command) { - assert(command); - assert(!pthread_mutex_lock(&instance->lock)); - json_array_append_new(instance->command_queue, command); - instance->next_delay_ms = 0; - assert(!pthread_cond_signal(&instance->cond)); - assert(!pthread_mutex_unlock(&instance->lock)); -} - // Public interface below @@ -444,26 +447,40 @@ json_t *cosmo_subject(const char *name, const char *readable_only_by, const char return ret; } -void cosmo_subscribe(cosmo *instance, json_t *subject, const json_int_t messages, const json_int_t last_id) { - assert(!pthread_mutex_lock(&instance->lock)); - json_t *subscription = cosmo_find_subscription(instance, subject); - if (!subscription) { - subscription = json_pack("{sOs[]si}", "subject", subject, "messages", "state", SUBSCRIPTION_PENDING); - json_array_append_new(instance->subscriptions, subscription); +void cosmo_subscribe(cosmo *instance, json_t *subjects, const json_int_t messages, const json_int_t last_id) { + if (json_is_array(subjects)) { + json_incref(subjects); + } else { + subjects = json_pack("[O]", subjects); + assert(subjects); } - json_t *arguments = json_pack("{sO}", "subject", subject); - if (messages) { - json_object_set_new(arguments, "messages", json_integer(messages)); - json_object_set_new(subscription, "num_messages", json_integer(messages)); - } - if (last_id) { - json_object_set_new(arguments, "last_id", json_integer(last_id)); - json_object_set_new(subscription, "last_id", json_integer(last_id)); + assert(!pthread_mutex_lock(&instance->lock)); + size_t i; + json_t *subject; + json_array_foreach(subjects, i, subject) { + json_t *subscription = cosmo_find_subscription(instance, subject); + if (!subscription) { + subscription = json_pack("{sOs[]si}", "subject", subject, "messages", "state", SUBSCRIPTION_PENDING); + json_array_append_new(instance->subscriptions, subscription); + } + + json_t *arguments = json_pack("{sO}", "subject", subject); + if (messages) { + json_object_set_new(arguments, "messages", json_integer(messages)); + json_object_set_new(subscription, "num_messages", json_integer(messages)); + } + if (last_id) { + json_object_set_new(arguments, "last_id", json_integer(last_id)); + json_object_set_new(subscription, "last_id", json_integer(last_id)); + } + cosmo_send_command_locked(instance, cosmo_command("subscribe", arguments)); } assert(!pthread_mutex_unlock(&instance->lock)); - cosmo_send_command(instance, cosmo_command("subscribe", arguments)); + pthread_cond_signal(&instance->cond); + + json_decref(subjects); } void cosmo_unsubscribe(cosmo *instance, json_t *subject) { diff --git a/clients/c/cosmopolite.h b/clients/c/cosmopolite.h index 96d6ec0..4526e9f 100644 --- a/clients/c/cosmopolite.h +++ b/clients/c/cosmopolite.h @@ -28,7 +28,7 @@ void cosmo_shutdown(cosmo *instance); const char *cosmo_current_profile(cosmo *instance); json_t *cosmo_subject(const char *name, const char *readable_only_by, const char *writeable_only_by); -void cosmo_subscribe(cosmo *instance, json_t *subject, const json_int_t messages, const json_int_t last_id); +void cosmo_subscribe(cosmo *instance, json_t *subjects, const json_int_t messages, const json_int_t last_id); void cosmo_unsubscribe(cosmo *instance, json_t *subject); void cosmo_send_message(cosmo *instance, json_t *subject, json_t *message); diff --git a/clients/c/test.c b/clients/c/test.c index 70b5936..2b5eeac 100644 --- a/clients/c/test.c +++ b/clients/c/test.c @@ -226,12 +226,39 @@ bool test_resubscribe(test_state *state) { return true; } +bool test_bulk_subscribe(test_state *state) { + cosmo *client = create_client(state); + + json_t *subject1 = random_subject(NULL, NULL); + json_t *subject2 = random_subject(NULL, NULL); + json_t *subjects = json_pack("[oo]", subject1, subject2); + cosmo_subscribe(client, subjects, -1, 0); + + json_t *message_out = random_message(); + cosmo_send_message(client, subject1, message_out); + const json_t *message_in = wait_for_message(state); + assert(json_equal(message_out, json_object_get(message_in, "message"))); + json_decref(message_out); + + message_out = random_message(); + cosmo_send_message(client, subject2, message_out); + message_in = wait_for_message(state); + assert(json_equal(message_out, json_object_get(message_in, "message"))); + json_decref(message_out); + + json_decref(subjects); + + cosmo_shutdown(client); + return true; +} + int main(int argc, char *argv[]) { RUN_TEST(test_create_shutdown); RUN_TEST(test_connect_logout_fires); RUN_TEST(test_message_round_trip); RUN_TEST(test_resubscribe); RUN_TEST(test_reconnect); + RUN_TEST(test_bulk_subscribe); return 0; }