From 6d7745c6cd3765f38dae127937bc570bb1848e78 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sun, 18 May 2014 19:06:27 +0300 Subject: [PATCH] Support last_id in subscribe() API. Add a subscription state machine to avoid sending subscribe requests with bad parameters. --- static/cosmopolite.js | 58 ++++++++++++++++++++++++++++++++++--------- static/test.js | 2 +- 2 files changed, 47 insertions(+), 13 deletions(-) diff --git a/static/cosmopolite.js b/static/cosmopolite.js index e033ae1..c6e8d69 100644 --- a/static/cosmopolite.js +++ b/static/cosmopolite.js @@ -85,6 +85,18 @@ Cosmopolite.prototype.ChannelState = { }; +/** + * Subscription states + * @enum {number} + * @const + * @private + */ +Cosmopolite.prototype.SubscriptionState = { + PENDING: 1, + ACTIVE: 2, +}; + + /** * Shutdown this instance. * @@ -107,28 +119,41 @@ Cosmopolite.prototype.shutdown = function() { * Start receiving messages sent to this subject via the onMessage callback. * * @param {!string} subject Subject name - * @param {!number} messages Number of recent messages to request; 0 for none, -1 for all + * @param {number=} messages Number of recent messages to request; 0 for none, -1 for all + * @param {number=} last_id ID of last message received; fetch all messages since * @param {Array.=} keys Key names to ensure we receive at least 1 message defining */ -Cosmopolite.prototype.subscribe = function(subject, messages, keys) { +Cosmopolite.prototype.subscribe = function(subject, messages, last_id, keys) { return new Promise(function(resolve, reject) { - keys = keys || []; if (subject in this.subscriptions_) { console.log( this.loggingPrefix_(), - 'not sending duplication subscription request for subject:', subject); + 'not sending duplicate subscription request for subject:', subject); resolve(); } var args = { 'subject': subject, - 'messages': messages, - 'keys': keys, }; - this.subscriptions_[subject] = { + if (messages) { + args['messages'] = messages; + } + if (last_id != null) { + args['last_id'] = last_id; + } + if (keys != null) { + args['keys'] = keys; + } + var subscription = { 'messages': [], - 'keys': {}, + 'keys': {}, + 'state': this.SubscriptionState.PENDING, }; - this.sendRPC_('subscribe', args, resolve); + this.subscriptions_[subject] = subscription; + + this.sendRPC_('subscribe', args, function() { + subscription.state = this.SubscriptionState.ACTIVE; + resolve(); + }.bind(this)); }.bind(this)); }; @@ -390,6 +415,7 @@ Cosmopolite.prototype.sendRPCs_ = function(commands, delay) { return; } var data = xhr.response; + if ('google_user_id' in data) { localStorage[this.namespace_ + ':google_user_id'] = data['google_user_id']; @@ -402,6 +428,7 @@ Cosmopolite.prototype.sendRPCs_ = function(commands, delay) { this.rpcQueue_ = []; } } + if (data['status'] == 'retry') { // Discard delay this.sendRPCs_(commands); @@ -413,14 +440,18 @@ Cosmopolite.prototype.sendRPCs_ = function(commands, delay) { // TODO(flamingcow): Refresh the page? Show an alert? return; } + + // Handle events that were immediately available as if they came over the + // channel. Fire them before the message callbacks, so clients can use + // events like the subscribe promise fulfillment as a barrier for initial + // data. + data['events'].forEach(this.onServerEvent_, this); + for (var i = 0; i < data['responses'].length; i++) { if (commands[i]['onSuccess']) { commands[i]['onSuccess'].bind(this)(data['responses'][i]); } } - // Handle events that were immediately available as if they came over the - // channel. - data['events'].forEach(this.onServerEvent_, this); }.bind(this)); xhr.addEventListener('error', retryAfterDelay); @@ -446,6 +477,9 @@ Cosmopolite.prototype.createChannel_ = function() { ]; for (var subject in this.subscriptions_) { var subscription = this.subscriptions_[subject]; + if (subscription.state != this.SubscriptionState.ACTIVE) { + continue; + } var last_id = 0; if (subscription.messages.length > 0) { last_id = subscription.messages[subscription.messages.length - 1]['id']; diff --git a/static/test.js b/static/test.js index 59cf186..c7bc8dd 100644 --- a/static/test.js +++ b/static/test.js @@ -226,7 +226,7 @@ asyncTest('Duplicate message suppression', function() { }; cosmo.sendMessage(subject, message1, key).then(function() { cosmo.sendMessage(subject, message2, key).then(function() { - cosmo.subscribe(subject, 0, [key]); + cosmo.subscribe(subject, 0, null, [key]); }); }); });