Add resubscription after reconnect, with a test.
This commit is contained in:
@@ -97,7 +97,6 @@ static bool cosmo_send_http_int(cosmo *instance, cosmo_transfer *transfer) {
|
|||||||
long return_code;
|
long return_code;
|
||||||
assert(curl_easy_getinfo(instance->curl, CURLINFO_RESPONSE_CODE, &return_code) == CURLE_OK);
|
assert(curl_easy_getinfo(instance->curl, CURLINFO_RESPONSE_CODE, &return_code) == CURLE_OK);
|
||||||
if (return_code != 200) {
|
if (return_code != 200) {
|
||||||
fprintf(stderr, "server returned error: %ld\n", return_code);
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -229,6 +228,7 @@ static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands, json_t *ack) {
|
|||||||
char *profile;
|
char *profile;
|
||||||
if (json_unpack(received, "{sssoso}", "profile", &profile, "responses", &command_responses, "events", &events)) {
|
if (json_unpack(received, "{sssoso}", "profile", &profile, "responses", &command_responses, "events", &events)) {
|
||||||
fprintf(stderr, "invalid server response\n");
|
fprintf(stderr, "invalid server response\n");
|
||||||
|
json_decref(received);
|
||||||
return commands;
|
return commands;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -249,7 +249,38 @@ static json_t *cosmo_send_rpc(cosmo *instance, json_t *commands, json_t *ack) {
|
|||||||
fprintf(stderr, "invalid poll response\n");
|
fprintf(stderr, "invalid poll response\n");
|
||||||
} else {
|
} else {
|
||||||
if (new_poll) {
|
if (new_poll) {
|
||||||
// TODO: resubscribe to ACTIVE subscriptions
|
assert(!pthread_mutex_lock(&instance->lock));
|
||||||
|
size_t i;
|
||||||
|
json_t *subscription;
|
||||||
|
json_array_foreach(instance->subscriptions, i, subscription) {
|
||||||
|
int state;
|
||||||
|
json_t *subject, *messages;
|
||||||
|
assert(!json_unpack(subscription, "{sisoso}", "state", &state, "subject", &subject, "messages", &messages));
|
||||||
|
|
||||||
|
if (state == SUBSCRIPTION_PENDING) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
json_t *arguments = json_pack("{sO}", "subject", subject);
|
||||||
|
if (json_array_size(messages)) {
|
||||||
|
// Restart at the last actual ID we received.
|
||||||
|
json_t *last_message = json_array_get(messages, json_array_size(messages) - 1);
|
||||||
|
json_object_set(arguments, "last_id", json_object_get(last_message, "id"));
|
||||||
|
} else {
|
||||||
|
json_t *num_messages = json_object_get(subscription, "num_messages");
|
||||||
|
if (num_messages) {
|
||||||
|
json_object_set(arguments, "messages", num_messages);
|
||||||
|
}
|
||||||
|
json_t *last_id = json_object_get(subscription, "last_id");
|
||||||
|
if (last_id) {
|
||||||
|
json_object_set(arguments, "last_id", last_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
json_array_append_new(instance->command_queue, cosmo_command("subscribe", arguments));
|
||||||
|
instance->next_delay_ms = 0;
|
||||||
|
}
|
||||||
|
assert(!pthread_mutex_unlock(&instance->lock));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -368,7 +399,7 @@ void cosmo_subscribe(cosmo *instance, json_t *subject, const json_int_t messages
|
|||||||
json_t *arguments = json_pack("{sO}", "subject", subject);
|
json_t *arguments = json_pack("{sO}", "subject", subject);
|
||||||
if (messages) {
|
if (messages) {
|
||||||
json_object_set_new(arguments, "messages", json_integer(messages));
|
json_object_set_new(arguments, "messages", json_integer(messages));
|
||||||
json_object_set_new(subscription, "messages", json_integer(messages));
|
json_object_set_new(subscription, "num_messages", json_integer(messages));
|
||||||
}
|
}
|
||||||
if (last_id) {
|
if (last_id) {
|
||||||
json_object_set_new(arguments, "last_id", json_integer(last_id));
|
json_object_set_new(arguments, "last_id", json_integer(last_id));
|
||||||
|
|||||||
@@ -142,10 +142,38 @@ bool test_logout_fires(test_state *state) {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool test_resubscribe(test_state *state) {
|
||||||
|
cosmo *client = create_client(state);
|
||||||
|
|
||||||
|
json_t *subject = random_subject(NULL, NULL);
|
||||||
|
cosmo_subscribe(client, subject, -1, 0);
|
||||||
|
|
||||||
|
json_t *message_out = random_message();
|
||||||
|
cosmo_send_message(client, subject, message_out);
|
||||||
|
const json_t *message_in = wait_for_message(state);
|
||||||
|
assert(json_equal(message_out, json_object_get(message_in, "message")));
|
||||||
|
json_decref(message_out);
|
||||||
|
|
||||||
|
// Reach in and reset the instance ID so we look new.
|
||||||
|
cosmo_uuid(client->instance_id);
|
||||||
|
|
||||||
|
message_out = random_message();
|
||||||
|
cosmo_send_message(client, subject, message_out);
|
||||||
|
message_in = wait_for_message(state);
|
||||||
|
assert(json_equal(message_out, json_object_get(message_in, "message")));
|
||||||
|
json_decref(message_out);
|
||||||
|
|
||||||
|
json_decref(subject);
|
||||||
|
|
||||||
|
cosmo_shutdown(client);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
int main(int argc, char *argv[]) {
|
int main(int argc, char *argv[]) {
|
||||||
RUN_TEST(test_create_destroy);
|
RUN_TEST(test_create_destroy);
|
||||||
RUN_TEST(test_message_round_trip);
|
RUN_TEST(test_message_round_trip);
|
||||||
RUN_TEST(test_logout_fires);
|
RUN_TEST(test_logout_fires);
|
||||||
|
RUN_TEST(test_resubscribe);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user