Subscription state tracking, some parsing cleanups.
This commit is contained in:
@@ -15,6 +15,9 @@
|
|||||||
#define CYCLE_MS 10000
|
#define CYCLE_MS 10000
|
||||||
#define CYCLE_STAGGER_FACTOR 10
|
#define CYCLE_STAGGER_FACTOR 10
|
||||||
|
|
||||||
|
#define SUBSCRIPTION_PENDING 1
|
||||||
|
#define SUBSCRIPTION_ACTIVE 2
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char *send_buf;
|
char *send_buf;
|
||||||
size_t send_buf_len;
|
size_t send_buf_len;
|
||||||
@@ -123,27 +126,16 @@ static char *cosmo_send_http(cosmo *instance, char *request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
static void cosmo_handle_message(cosmo *instance, json_t *event) {
|
static void cosmo_handle_message(cosmo *instance, json_t *event) {
|
||||||
json_t *subject = json_object_get(event, "subject");
|
json_t *subject;
|
||||||
if (!subject) {
|
int id;
|
||||||
fprintf(stderr, "message event without subject\n");
|
char *message_content;
|
||||||
return;
|
if (json_unpack(event, "{sosiss}", "subject", &subject, "id", &id, "message", &message_content)) {
|
||||||
}
|
fprintf(stderr, "invalid message event\n");
|
||||||
|
|
||||||
json_t *id_json = json_object_get(event, "id");
|
|
||||||
if (!id_json) {
|
|
||||||
fprintf(stderr, "message event without id\n");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
json_int_t id = json_integer_value(id_json);
|
|
||||||
|
|
||||||
json_t *message_content = json_object_get(event, "message");
|
|
||||||
if (!message_content) {
|
|
||||||
fprintf(stderr, "message event without content\n");
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
json_error_t err;
|
json_error_t err;
|
||||||
json_t *message_object = json_loads(json_string_value(message_content), JSON_DECODE_ANY, &err);
|
json_t *message_object = json_loads(message_content, JSON_DECODE_ANY, &err);
|
||||||
if (!message_object) {
|
if (!message_object) {
|
||||||
fprintf(stderr, "error parsing message content: %s\n", err.text);
|
fprintf(stderr, "error parsing message content: %s\n", err.text);
|
||||||
return;
|
return;
|
||||||
@@ -274,6 +266,17 @@ static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands, json_t *ack) {
|
|||||||
json_array_append(to_retry, command);
|
json_array_append(to_retry, command);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
if (!strcmp(json_string_value(json_object_get(command, "command")), "subscribe")) {
|
||||||
|
json_t *subject = NULL;
|
||||||
|
assert(!json_unpack(command, "{s{so}}", "arguments", "subject", &subject));
|
||||||
|
assert(!pthread_mutex_lock(&instance->lock));
|
||||||
|
json_t *subscription = cosmo_find_subscription(instance, subject);
|
||||||
|
if (subscription) {
|
||||||
|
// Might have unsubscribed later
|
||||||
|
json_object_set_new(subscription, "state", json_integer(SUBSCRIPTION_ACTIVE));
|
||||||
|
}
|
||||||
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
json_decref(commands);
|
json_decref(commands);
|
||||||
|
|
||||||
@@ -361,7 +364,7 @@ void cosmo_subscribe(cosmo *instance, json_t *subject, const json_int_t messages
|
|||||||
assert(!pthread_mutex_lock(&instance->lock));
|
assert(!pthread_mutex_lock(&instance->lock));
|
||||||
json_t *subscription = cosmo_find_subscription(instance, subject);
|
json_t *subscription = cosmo_find_subscription(instance, subject);
|
||||||
if (!subscription) {
|
if (!subscription) {
|
||||||
json_array_append_new(instance->subscriptions, json_pack("{sOs[]}", "subject", subject, "messages"));
|
json_array_append_new(instance->subscriptions, json_pack("{sOs[]si}", "subject", subject, "messages", "state", SUBSCRIPTION_PENDING));
|
||||||
}
|
}
|
||||||
assert(!pthread_mutex_unlock(&instance->lock));
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user