diff --git a/static/cosmopolite.js b/static/cosmopolite.js index c6e8d69..87f4bf7 100644 --- a/static/cosmopolite.js +++ b/static/cosmopolite.js @@ -52,6 +52,9 @@ var Cosmopolite = function(callbacks, urlPrefix, namespace) { this.loggingPrefix_(), '(re-)sending queued messages:', messages); } messages.forEach(function(message) { + // We don't use sendMessage because we need to preserve the first + // message's client_message_id, which is intentionally not exposed via + // the sendMessage API this.sendRPC_( 'sendMessage', message, this.onMessageSent_.bind(this, message, null)); }.bind(this)); @@ -125,12 +128,14 @@ Cosmopolite.prototype.shutdown = function() { */ Cosmopolite.prototype.subscribe = function(subject, messages, last_id, keys) { return new Promise(function(resolve, reject) { - if (subject in this.subscriptions_) { - console.log( - this.loggingPrefix_(), - 'not sending duplicate subscription request for subject:', subject); - resolve(); + if (!(subject in this.subscriptions_)) { + this.subscriptions_[subject] = { + 'messages': [], + 'keys': {}, + 'state': this.SubscriptionState.PENDING, + }; } + var args = { 'subject': subject, }; @@ -143,15 +148,13 @@ Cosmopolite.prototype.subscribe = function(subject, messages, last_id, keys) { if (keys != null) { args['keys'] = keys; } - var subscription = { - 'messages': [], - 'keys': {}, - 'state': this.SubscriptionState.PENDING, - }; - this.subscriptions_[subject] = subscription; this.sendRPC_('subscribe', args, function() { - subscription.state = this.SubscriptionState.ACTIVE; + // unsubscribe may have been called since we sent the RPC. That's racy + // without waiting for the promise, but do our best + if (subject in this.subscriptions_) { + this.subscriptions_[subject].state = this.SubscriptionState.ACTIVE; + } resolve(); }.bind(this)); }.bind(this)); @@ -619,7 +622,19 @@ Cosmopolite.prototype.onMessage_ = function(e) { return; } e['message'] = JSON.parse(e['message']); - subscription.messages.push(e); + + // Reverse search for the position to insert this message, as iit will most + // likely be at the end. + var insertAfter; + for (var insertAfter = subscription.messages.length - 1; + insertAfter >= 0; insertAfter++) { + var message = subscription.messages[insertAfter]; + if (message['id'] < e['message']['id']) { + break; + } + } + subscription.messages.splice(insertAfter + 1, 0, e); + if (e['key']) { subscription.keys[e['key']] = e; } diff --git a/static/test.js b/static/test.js index 8bbbeff..943b029 100644 --- a/static/test.js +++ b/static/test.js @@ -275,7 +275,7 @@ test('getMessages/subscribe', function() { }); asyncTest('subscribe barrier', function() { - expect(2); + expect(3); var subject = randstring(); var message = randstring(); @@ -287,6 +287,7 @@ asyncTest('subscribe barrier', function() { // We are validating that the message event generated by the subscribe // call has already been processed by the time this promise fires equal(cosmo.getMessages(subject).length, 1, 'one message'); + equal(cosmo.getMessages(subject)[0]['subject']['name'], subject, 'subject matches'); equal(cosmo.getMessages(subject)[0]['message'], message, 'message matches'); cosmo.shutdown(); start(); @@ -294,6 +295,28 @@ asyncTest('subscribe barrier', function() { }); }); +asyncTest('resubscribe', function() { + expect(4); + + var subject = randstring(); + var message = randstring(); + + var cosmo = new Cosmopolite({}, null, randstring()); + + cosmo.sendMessage(subject, message).then(function() { + cosmo.subscribe(subject).then(function() { + equal(cosmo.getMessages(subject).length, 0, 'zero messages'); + cosmo.subscribe(subject, -1).then(function() { + equal(cosmo.getMessages(subject).length, 1, 'one message'); + equal(cosmo.getMessages(subject)[0]['subject']['name'], subject, 'subject matches'); + equal(cosmo.getMessages(subject)[0]['message'], message, 'message matches'); + cosmo.shutdown(); + start(); + }); + }); + }); +}); + module('dev_appserver only');