Add support for re-subscribing to the same subject with different message fetching parameters.

This commit is contained in:
Ian Gulliver
2014-05-18 19:31:22 +03:00
parent 56f0f3aeba
commit 27e857197a
2 changed files with 52 additions and 14 deletions

View File

@@ -52,6 +52,9 @@ var Cosmopolite = function(callbacks, urlPrefix, namespace) {
this.loggingPrefix_(), '(re-)sending queued messages:', messages); this.loggingPrefix_(), '(re-)sending queued messages:', messages);
} }
messages.forEach(function(message) { messages.forEach(function(message) {
// We don't use sendMessage because we need to preserve the first
// message's client_message_id, which is intentionally not exposed via
// the sendMessage API
this.sendRPC_( this.sendRPC_(
'sendMessage', message, this.onMessageSent_.bind(this, message, null)); 'sendMessage', message, this.onMessageSent_.bind(this, message, null));
}.bind(this)); }.bind(this));
@@ -125,12 +128,14 @@ Cosmopolite.prototype.shutdown = function() {
*/ */
Cosmopolite.prototype.subscribe = function(subject, messages, last_id, keys) { Cosmopolite.prototype.subscribe = function(subject, messages, last_id, keys) {
return new Promise(function(resolve, reject) { return new Promise(function(resolve, reject) {
if (subject in this.subscriptions_) { if (!(subject in this.subscriptions_)) {
console.log( this.subscriptions_[subject] = {
this.loggingPrefix_(), 'messages': [],
'not sending duplicate subscription request for subject:', subject); 'keys': {},
resolve(); 'state': this.SubscriptionState.PENDING,
};
} }
var args = { var args = {
'subject': subject, 'subject': subject,
}; };
@@ -143,15 +148,13 @@ Cosmopolite.prototype.subscribe = function(subject, messages, last_id, keys) {
if (keys != null) { if (keys != null) {
args['keys'] = keys; args['keys'] = keys;
} }
var subscription = {
'messages': [],
'keys': {},
'state': this.SubscriptionState.PENDING,
};
this.subscriptions_[subject] = subscription;
this.sendRPC_('subscribe', args, function() { this.sendRPC_('subscribe', args, function() {
subscription.state = this.SubscriptionState.ACTIVE; // unsubscribe may have been called since we sent the RPC. That's racy
// without waiting for the promise, but do our best
if (subject in this.subscriptions_) {
this.subscriptions_[subject].state = this.SubscriptionState.ACTIVE;
}
resolve(); resolve();
}.bind(this)); }.bind(this));
}.bind(this)); }.bind(this));
@@ -619,7 +622,19 @@ Cosmopolite.prototype.onMessage_ = function(e) {
return; return;
} }
e['message'] = JSON.parse(e['message']); e['message'] = JSON.parse(e['message']);
subscription.messages.push(e);
// Reverse search for the position to insert this message, as iit will most
// likely be at the end.
var insertAfter;
for (var insertAfter = subscription.messages.length - 1;
insertAfter >= 0; insertAfter++) {
var message = subscription.messages[insertAfter];
if (message['id'] < e['message']['id']) {
break;
}
}
subscription.messages.splice(insertAfter + 1, 0, e);
if (e['key']) { if (e['key']) {
subscription.keys[e['key']] = e; subscription.keys[e['key']] = e;
} }

View File

@@ -275,7 +275,7 @@ test('getMessages/subscribe', function() {
}); });
asyncTest('subscribe barrier', function() { asyncTest('subscribe barrier', function() {
expect(2); expect(3);
var subject = randstring(); var subject = randstring();
var message = randstring(); var message = randstring();
@@ -287,6 +287,7 @@ asyncTest('subscribe barrier', function() {
// We are validating that the message event generated by the subscribe // We are validating that the message event generated by the subscribe
// call has already been processed by the time this promise fires // call has already been processed by the time this promise fires
equal(cosmo.getMessages(subject).length, 1, 'one message'); equal(cosmo.getMessages(subject).length, 1, 'one message');
equal(cosmo.getMessages(subject)[0]['subject']['name'], subject, 'subject matches');
equal(cosmo.getMessages(subject)[0]['message'], message, 'message matches'); equal(cosmo.getMessages(subject)[0]['message'], message, 'message matches');
cosmo.shutdown(); cosmo.shutdown();
start(); start();
@@ -294,6 +295,28 @@ asyncTest('subscribe barrier', function() {
}); });
}); });
asyncTest('resubscribe', function() {
expect(4);
var subject = randstring();
var message = randstring();
var cosmo = new Cosmopolite({}, null, randstring());
cosmo.sendMessage(subject, message).then(function() {
cosmo.subscribe(subject).then(function() {
equal(cosmo.getMessages(subject).length, 0, 'zero messages');
cosmo.subscribe(subject, -1).then(function() {
equal(cosmo.getMessages(subject).length, 1, 'one message');
equal(cosmo.getMessages(subject)[0]['subject']['name'], subject, 'subject matches');
equal(cosmo.getMessages(subject)[0]['message'], message, 'message matches');
cosmo.shutdown();
start();
});
});
});
});
module('dev_appserver only'); module('dev_appserver only');