Handle incoming message events. Fix unsubscribe.

This commit is contained in:
Ian Gulliver
2015-06-06 19:17:42 -07:00
parent 75853c47ac
commit 45d75172bf

View File

@@ -34,7 +34,7 @@ static json_t *cosmo_find_subscription(cosmo *instance, json_t *subject) {
size_t i; size_t i;
json_t *subscription; json_t *subscription;
json_array_foreach(instance->subscriptions, i, subscription) { json_array_foreach(instance->subscriptions, i, subscription) {
if (json_equal(json_object_get(subscription, "subject"), subject) == 0) { if (json_equal(json_object_get(subscription, "subject"), subject)) {
return subscription; return subscription;
} }
} }
@@ -127,6 +127,54 @@ static char *cosmo_send_http(cosmo *instance, char *request) {
return ret ? transfer.recv_buf : NULL; return ret ? transfer.recv_buf : NULL;
} }
static void cosmo_handle_message(cosmo *instance, json_t *event) {
json_t *subject = json_object_get(event, "subject");
if (!subject) {
fprintf(stderr, "message event without subject\n");
return;
}
json_t *id_json = json_object_get(event, "id");
if (!id_json) {
fprintf(stderr, "message event without id\n");
return;
}
json_int_t id = json_integer_value(id_json);
assert(!pthread_mutex_lock(&instance->lock));
json_t *subscription = cosmo_find_subscription(instance, subject);
if (!subscription) {
// Normal to sometimes get events after unsubscribe.
assert(!pthread_mutex_unlock(&instance->lock));
return;
}
json_t *messages = json_object_get(subscription, "messages");
ssize_t insert_after;
for (insert_after = json_array_size(messages) - 1; insert_after >= 0; insert_after--) {
json_t *message = json_array_get(messages, insert_after);
json_int_t message_id = json_integer_value(json_object_get(message, "id"));
if (message_id == id) {
assert(!pthread_mutex_unlock(&instance->lock));
return;
}
if (message_id < id) {
break;
}
}
printf("new message: %lld\n", id);
json_array_insert(messages, insert_after + 1, event);
assert(!pthread_mutex_unlock(&instance->lock));
}
static void cosmo_handle_event(cosmo *instance, json_t *event) {
const char *event_type = json_string_value(json_object_get(event, "event_type"));
if (strcmp(event_type, "message") == 0) {
cosmo_handle_message(instance, event);
} else {
fprintf(stderr, "unknown event type: %s\n", event_type);
}
}
// Takes ownership of commands. // Takes ownership of commands.
static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands) { static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands) {
char *request = cosmo_build_rpc(instance, commands); char *request = cosmo_build_rpc(instance, commands);
@@ -178,8 +226,16 @@ static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands) {
continue; continue;
} }
} }
json_decref(commands); json_decref(commands);
json_t *events = json_object_get(received, "events");
if (events) {
json_t *event;
json_array_foreach(events, index, event) {
cosmo_handle_event(instance, event);
}
}
json_decref(received); json_decref(received);
return to_retry; return to_retry;
@@ -291,12 +347,15 @@ void cosmo_unsubscribe(cosmo *instance, json_t *subject) {
size_t i; size_t i;
json_t *subscription; json_t *subscription;
json_array_foreach(instance->subscriptions, i, subscription) { json_array_foreach(instance->subscriptions, i, subscription) {
if (json_equal(json_object_get(subscription, "subject"), subject) == 0) { if (json_equal(json_object_get(subscription, "subject"), subject)) {
json_array_remove(instance->subscriptions, i); json_array_remove(instance->subscriptions, i);
break; break;
} }
} }
assert(!pthread_mutex_unlock(&instance->lock)); assert(!pthread_mutex_unlock(&instance->lock));
json_t *arguments = json_pack("{sO}", "subject", subject);
cosmo_send_command(instance, cosmo_command("unsubscribe", arguments));
} }
void cosmo_send_message(cosmo *instance, json_t *subject, json_t *message) { void cosmo_send_message(cosmo *instance, json_t *subject, json_t *message) {