Decode incoming message JSON.
This commit is contained in:
@@ -141,6 +141,20 @@ static void cosmo_handle_message(cosmo *instance, json_t *event) {
|
|||||||
}
|
}
|
||||||
json_int_t id = json_integer_value(id_json);
|
json_int_t id = json_integer_value(id_json);
|
||||||
|
|
||||||
|
json_t *message_content = json_object_get(event, "message");
|
||||||
|
if (!message_content) {
|
||||||
|
fprintf(stderr, "message event without content\n");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
json_error_t err;
|
||||||
|
json_t *message_object = json_loads(json_string_value(message_content), 0, &err);
|
||||||
|
if (!message_object) {
|
||||||
|
fprintf(stderr, "error parsing message content: %s\n", err.text);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
json_object_set_new(event, "message", message_object);
|
||||||
|
|
||||||
assert(!pthread_mutex_lock(&instance->lock));
|
assert(!pthread_mutex_lock(&instance->lock));
|
||||||
json_t *subscription = cosmo_find_subscription(instance, subject);
|
json_t *subscription = cosmo_find_subscription(instance, subject);
|
||||||
if (!subscription) {
|
if (!subscription) {
|
||||||
@@ -148,6 +162,7 @@ static void cosmo_handle_message(cosmo *instance, json_t *event) {
|
|||||||
assert(!pthread_mutex_unlock(&instance->lock));
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
json_t *messages = json_object_get(subscription, "messages");
|
json_t *messages = json_object_get(subscription, "messages");
|
||||||
ssize_t insert_after;
|
ssize_t insert_after;
|
||||||
for (insert_after = json_array_size(messages) - 1; insert_after >= 0; insert_after--) {
|
for (insert_after = json_array_size(messages) - 1; insert_after >= 0; insert_after--) {
|
||||||
|
|||||||
Reference in New Issue
Block a user