diff --git a/static/cosmopolite.js b/static/cosmopolite.js index 63b62d4..b8e8039 100644 --- a/static/cosmopolite.js +++ b/static/cosmopolite.js @@ -43,6 +43,21 @@ Cosmopolite = function(callbacks, urlPrefix, namespace) { this.rpcQueue_ = []; this.subscriptions_ = {}; + this.messageQueueKey_ = this.namespace_ + ':message_queue'; + if (this.messageQueueKey_ in localStorage) { + var messages = JSON.parse(localStorage[this.messageQueueKey_]); + if (messages.length) { + console.log( + this.loggingPrefix_(), '(re-)sending queued messages:', messages); + } + messages.forEach(function(message) { + this.sendRPC_( + 'sendMessage', message, this.onMessageSent_.bind(this, message, null)); + }.bind(this)); + } else { + localStorage[this.messageQueueKey_] = JSON.stringify([]); + } + var scriptUrls = [ '/_ah/channel/jsapi', ]; @@ -89,16 +104,18 @@ Cosmopolite.prototype.subscribe = function(subject, messages, keys) { 'not sending duplication subscription request for subject:', subject); resolve(); } - this.subscriptions_[subject] = { - 'messages': [], - 'keys': {}, - }; var args = { 'subject': subject, 'messages': messages, 'keys': keys, }; - this.sendRPC_('subscribe', args, resolve); + this.sendRPC_('subscribe', args, function() { + this.subscriptions_[subject] = { + 'messages': [], + 'keys': {}, + }; + resolve(); + }.bind(this)); }.bind(this)); }; @@ -137,7 +154,14 @@ Cosmopolite.prototype.sendMessage = function(subject, message, key) { if (key) { args['key'] = key; } - this.sendRPC_('sendMessage', args, resolve); + + // No message left behind. + var messageQueue = JSON.parse(localStorage[this.messageQueueKey_]); + messageQueue.push(args); + localStorage[this.messageQueueKey_] = JSON.stringify(messageQueue); + + this.sendRPC_( + 'sendMessage', args, this.onMessageSent_.bind(this, args, resolve)); }.bind(this)); }; @@ -257,6 +281,24 @@ Cosmopolite.prototype.registerMessageHandlers_ = function() { window.addEventListener('message', this.messageHandler_); }; +/** + * Callback for a sendMessage RPC ack by the server. + * + * @param {Object} message Message details. + * @param {function()=} resolve Promise resolution callback. + */ +Cosmopolite.prototype.onMessageSent_ = function(message, resolve) { + // No message left behind. + var messageQueue = JSON.parse(localStorage[this.messageQueueKey_]); + messageQueue = messageQueue.filter(function(queuedMessage) { + return message['sender_message_id'] != queuedMessage['sender_message_id']; + }); + localStorage[this.messageQueueKey_] = JSON.stringify(messageQueue); + if (resolve) { + resolve(); + } +}; + /** * Send a single RPC to the server. * @@ -291,6 +333,9 @@ Cosmopolite.prototype.sendRPC_ = function(command, args, onSuccess) { * @param {number=} delay Seconds waited before executing this call (for backoff) */ Cosmopolite.prototype.sendRPCs_ = function(commands, delay) { + if (this.shutdown_) { + return; + } var request = { 'commands': [], }; diff --git a/static/test.js b/static/test.js index 49d11a9..84e1903 100644 --- a/static/test.js +++ b/static/test.js @@ -231,6 +231,32 @@ asyncTest('Duplicate message suppression', function() { }); }); +asyncTest('Message persistence', function() { + expect(2); + + var subject = randstring(); + var message = randstring(); + var namespace = randstring(); + + // Send a message and shut down too fast for it to hit the wire. + var cosmo1 = new Cosmopolite({}, null, namespace); + cosmo1.sendMessage(subject, message); + cosmo1.shutdown(); + + var callbacks = { + 'onMessage': function(msg) { + equal(msg['subject'], subject, 'subject matches'); + equal(msg['message'], message, 'message matches'); + cosmo2.shutdown(); + start(); + }, + }; + + var cosmo2 = new Cosmopolite(callbacks, null, namespace); + cosmo2.subscribe(subject, -1); + // Should pick up the message from the persistent queue. +}); + module('dev_appserver only');