Bulk subscribe support (mainly just to mirror JS)

This commit is contained in:
Ian Gulliver
2015-06-20 01:16:50 +00:00
parent 65c118ce6c
commit aa5751c89d
3 changed files with 72 additions and 28 deletions

View File

@@ -43,6 +43,20 @@ static json_t *cosmo_find_subscription(cosmo *instance, json_t *subject) {
return NULL;
}
static void cosmo_send_command_locked(cosmo *instance, json_t *command) {
json_array_append_new(instance->command_queue, command);
instance->next_delay_ms = 0;
}
// Takes ownership of command.
static void cosmo_send_command(cosmo *instance, json_t *command) {
assert(command);
assert(!pthread_mutex_lock(&instance->lock));
cosmo_send_command_locked(instance, command);
assert(!pthread_cond_signal(&instance->cond));
assert(!pthread_mutex_unlock(&instance->lock));
}
static size_t cosmo_read_callback(void *ptr, size_t size, size_t nmemb, void *userp) {
cosmo_transfer *transfer = userp;
size_t to_write = min(transfer->send_buf_len, size * nmemb);
@@ -326,8 +340,7 @@ static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands, json_t *ack) {
}
}
json_array_append_new(instance->command_queue, cosmo_command("subscribe", arguments));
instance->next_delay_ms = 0;
cosmo_send_command_locked(instance, cosmo_command("subscribe", arguments));
}
assert(!pthread_mutex_unlock(&instance->lock));
}
@@ -410,16 +423,6 @@ static void *cosmo_thread_main(void *arg) {
return NULL;
}
// Takes ownership of command.
static void cosmo_send_command(cosmo *instance, json_t *command) {
assert(command);
assert(!pthread_mutex_lock(&instance->lock));
json_array_append_new(instance->command_queue, command);
instance->next_delay_ms = 0;
assert(!pthread_cond_signal(&instance->cond));
assert(!pthread_mutex_unlock(&instance->lock));
}
// Public interface below
@@ -444,26 +447,40 @@ json_t *cosmo_subject(const char *name, const char *readable_only_by, const char
return ret;
}
void cosmo_subscribe(cosmo *instance, json_t *subject, const json_int_t messages, const json_int_t last_id) {
assert(!pthread_mutex_lock(&instance->lock));
json_t *subscription = cosmo_find_subscription(instance, subject);
if (!subscription) {
subscription = json_pack("{sOs[]si}", "subject", subject, "messages", "state", SUBSCRIPTION_PENDING);
json_array_append_new(instance->subscriptions, subscription);
void cosmo_subscribe(cosmo *instance, json_t *subjects, const json_int_t messages, const json_int_t last_id) {
if (json_is_array(subjects)) {
json_incref(subjects);
} else {
subjects = json_pack("[O]", subjects);
assert(subjects);
}
json_t *arguments = json_pack("{sO}", "subject", subject);
if (messages) {
json_object_set_new(arguments, "messages", json_integer(messages));
json_object_set_new(subscription, "num_messages", json_integer(messages));
}
if (last_id) {
json_object_set_new(arguments, "last_id", json_integer(last_id));
json_object_set_new(subscription, "last_id", json_integer(last_id));
assert(!pthread_mutex_lock(&instance->lock));
size_t i;
json_t *subject;
json_array_foreach(subjects, i, subject) {
json_t *subscription = cosmo_find_subscription(instance, subject);
if (!subscription) {
subscription = json_pack("{sOs[]si}", "subject", subject, "messages", "state", SUBSCRIPTION_PENDING);
json_array_append_new(instance->subscriptions, subscription);
}
json_t *arguments = json_pack("{sO}", "subject", subject);
if (messages) {
json_object_set_new(arguments, "messages", json_integer(messages));
json_object_set_new(subscription, "num_messages", json_integer(messages));
}
if (last_id) {
json_object_set_new(arguments, "last_id", json_integer(last_id));
json_object_set_new(subscription, "last_id", json_integer(last_id));
}
cosmo_send_command_locked(instance, cosmo_command("subscribe", arguments));
}
assert(!pthread_mutex_unlock(&instance->lock));
cosmo_send_command(instance, cosmo_command("subscribe", arguments));
pthread_cond_signal(&instance->cond);
json_decref(subjects);
}
void cosmo_unsubscribe(cosmo *instance, json_t *subject) {

View File

@@ -28,7 +28,7 @@ void cosmo_shutdown(cosmo *instance);
const char *cosmo_current_profile(cosmo *instance);
json_t *cosmo_subject(const char *name, const char *readable_only_by, const char *writeable_only_by);
void cosmo_subscribe(cosmo *instance, json_t *subject, const json_int_t messages, const json_int_t last_id);
void cosmo_subscribe(cosmo *instance, json_t *subjects, const json_int_t messages, const json_int_t last_id);
void cosmo_unsubscribe(cosmo *instance, json_t *subject);
void cosmo_send_message(cosmo *instance, json_t *subject, json_t *message);

View File

@@ -226,12 +226,39 @@ bool test_resubscribe(test_state *state) {
return true;
}
bool test_bulk_subscribe(test_state *state) {
cosmo *client = create_client(state);
json_t *subject1 = random_subject(NULL, NULL);
json_t *subject2 = random_subject(NULL, NULL);
json_t *subjects = json_pack("[oo]", subject1, subject2);
cosmo_subscribe(client, subjects, -1, 0);
json_t *message_out = random_message();
cosmo_send_message(client, subject1, message_out);
const json_t *message_in = wait_for_message(state);
assert(json_equal(message_out, json_object_get(message_in, "message")));
json_decref(message_out);
message_out = random_message();
cosmo_send_message(client, subject2, message_out);
message_in = wait_for_message(state);
assert(json_equal(message_out, json_object_get(message_in, "message")));
json_decref(message_out);
json_decref(subjects);
cosmo_shutdown(client);
return true;
}
int main(int argc, char *argv[]) {
RUN_TEST(test_create_shutdown);
RUN_TEST(test_connect_logout_fires);
RUN_TEST(test_message_round_trip);
RUN_TEST(test_resubscribe);
RUN_TEST(test_reconnect);
RUN_TEST(test_bulk_subscribe);
return 0;
}