Merge branch 'master' of github.com:flamingcowtv/cosmopolite
This commit is contained in:
@@ -225,29 +225,31 @@ static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands, json_t *ack) {
|
|||||||
}
|
}
|
||||||
free(response);
|
free(response);
|
||||||
|
|
||||||
json_t *profile = json_object_get(received, "profile");
|
json_t *command_responses, *events;
|
||||||
if (profile) {
|
char *profile;
|
||||||
if (!instance->profile || strcmp(instance->profile, json_string_value(profile))) {
|
if (json_unpack(received, "{sssoso}", "profile", &profile, "responses", &command_responses, "events", &events)) {
|
||||||
free(instance->profile);
|
fprintf(stderr, "invalid server response\n");
|
||||||
instance->profile = strdup(json_string_value(profile));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
fprintf(stderr, "response lacks \"profile\" key\n");
|
|
||||||
}
|
|
||||||
|
|
||||||
json_t *command_responses = json_object_get(received, "responses");
|
|
||||||
if (!command_responses) {
|
|
||||||
fprintf(stderr, "response lacks \"responses\" key\n");
|
|
||||||
return commands;
|
return commands;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t index;
|
if (!instance->profile || strcmp(instance->profile, profile)) {
|
||||||
|
free(instance->profile);
|
||||||
|
instance->profile = strdup(profile);
|
||||||
|
}
|
||||||
|
|
||||||
json_t *events = json_object_get(received, "events");
|
size_t index;
|
||||||
if (events) {
|
json_t *event;
|
||||||
json_t *event;
|
json_array_foreach(events, index, event) {
|
||||||
json_array_foreach(events, index, event) {
|
cosmo_handle_event(instance, event);
|
||||||
cosmo_handle_event(instance, event);
|
}
|
||||||
|
|
||||||
|
json_t *poll_response = json_array_get(command_responses, 0);
|
||||||
|
int new_poll;
|
||||||
|
if (json_unpack(poll_response, "{sb}", "new", &new_poll)) {
|
||||||
|
fprintf(stderr, "invalid poll response\n");
|
||||||
|
} else {
|
||||||
|
if (new_poll) {
|
||||||
|
// TODO: resubscribe to ACTIVE subscriptions
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -256,17 +258,20 @@ static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands, json_t *ack) {
|
|||||||
json_array_foreach(commands, index, command) {
|
json_array_foreach(commands, index, command) {
|
||||||
// +1 for the poll offset from int_commands.
|
// +1 for the poll offset from int_commands.
|
||||||
json_t *command_response = json_array_get(command_responses, index + 1);
|
json_t *command_response = json_array_get(command_responses, index + 1);
|
||||||
json_t *result = json_object_get(command_response, "result");
|
char *result;
|
||||||
if (!result) {
|
if (json_unpack(command_response, "{ss}", "result", &result)) {
|
||||||
fprintf(stderr, "response lacks \"result\" key\n");
|
fprintf(stderr, "invalid command response\n");
|
||||||
json_array_append(to_retry, command);
|
json_array_append(to_retry, command);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (!strcmp(json_string_value(result), "retry")) {
|
if (!strcmp(result, "retry")) {
|
||||||
json_array_append(to_retry, command);
|
json_array_append(to_retry, command);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (!strcmp(json_string_value(json_object_get(command, "command")), "subscribe")) {
|
|
||||||
|
char *command_name;
|
||||||
|
assert(!json_unpack(command, "{ss}", "command", &command_name));
|
||||||
|
if (!strcmp(command_name, "subscribe")) {
|
||||||
json_t *subject = NULL;
|
json_t *subject = NULL;
|
||||||
assert(!json_unpack(command, "{s{so}}", "arguments", "subject", &subject));
|
assert(!json_unpack(command, "{s{so}}", "arguments", "subject", &subject));
|
||||||
assert(!pthread_mutex_lock(&instance->lock));
|
assert(!pthread_mutex_lock(&instance->lock));
|
||||||
@@ -353,18 +358,21 @@ json_t *cosmo_subject(const char *name, const char *readable_only_by, const char
|
|||||||
}
|
}
|
||||||
|
|
||||||
void cosmo_subscribe(cosmo *instance, json_t *subject, const json_int_t messages, const json_int_t last_id) {
|
void cosmo_subscribe(cosmo *instance, json_t *subject, const json_int_t messages, const json_int_t last_id) {
|
||||||
json_t *arguments = json_pack("{sO}", "subject", subject);
|
|
||||||
if (messages) {
|
|
||||||
json_object_set_new(arguments, "messages", json_integer(messages));
|
|
||||||
}
|
|
||||||
if (last_id) {
|
|
||||||
json_object_set_new(arguments, "last_id", json_integer(last_id));
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(!pthread_mutex_lock(&instance->lock));
|
assert(!pthread_mutex_lock(&instance->lock));
|
||||||
json_t *subscription = cosmo_find_subscription(instance, subject);
|
json_t *subscription = cosmo_find_subscription(instance, subject);
|
||||||
if (!subscription) {
|
if (!subscription) {
|
||||||
json_array_append_new(instance->subscriptions, json_pack("{sOs[]si}", "subject", subject, "messages", "state", SUBSCRIPTION_PENDING));
|
subscription = json_pack("{sOs[]si}", "subject", subject, "messages", "state", SUBSCRIPTION_PENDING);
|
||||||
|
json_array_append_new(instance->subscriptions, subscription);
|
||||||
|
}
|
||||||
|
|
||||||
|
json_t *arguments = json_pack("{sO}", "subject", subject);
|
||||||
|
if (messages) {
|
||||||
|
json_object_set_new(arguments, "messages", json_integer(messages));
|
||||||
|
json_object_set_new(subscription, "messages", json_integer(messages));
|
||||||
|
}
|
||||||
|
if (last_id) {
|
||||||
|
json_object_set_new(arguments, "last_id", json_integer(last_id));
|
||||||
|
json_object_set_new(subscription, "last_id", json_integer(last_id));
|
||||||
}
|
}
|
||||||
assert(!pthread_mutex_unlock(&instance->lock));
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user