Reliable message delivery across instances using the same namespace.

This commit is contained in:
Ian Gulliver
2014-05-17 18:43:46 +03:00
parent 1a66b86c13
commit 60c44a25a2
2 changed files with 77 additions and 6 deletions

View File

@@ -43,6 +43,21 @@ Cosmopolite = function(callbacks, urlPrefix, namespace) {
this.rpcQueue_ = [];
this.subscriptions_ = {};
this.messageQueueKey_ = this.namespace_ + ':message_queue';
if (this.messageQueueKey_ in localStorage) {
var messages = JSON.parse(localStorage[this.messageQueueKey_]);
if (messages.length) {
console.log(
this.loggingPrefix_(), '(re-)sending queued messages:', messages);
}
messages.forEach(function(message) {
this.sendRPC_(
'sendMessage', message, this.onMessageSent_.bind(this, message, null));
}.bind(this));
} else {
localStorage[this.messageQueueKey_] = JSON.stringify([]);
}
var scriptUrls = [
'/_ah/channel/jsapi',
];
@@ -89,16 +104,18 @@ Cosmopolite.prototype.subscribe = function(subject, messages, keys) {
'not sending duplication subscription request for subject:', subject);
resolve();
}
this.subscriptions_[subject] = {
'messages': [],
'keys': {},
};
var args = {
'subject': subject,
'messages': messages,
'keys': keys,
};
this.sendRPC_('subscribe', args, resolve);
this.sendRPC_('subscribe', args, function() {
this.subscriptions_[subject] = {
'messages': [],
'keys': {},
};
resolve();
}.bind(this));
}.bind(this));
};
@@ -137,7 +154,14 @@ Cosmopolite.prototype.sendMessage = function(subject, message, key) {
if (key) {
args['key'] = key;
}
this.sendRPC_('sendMessage', args, resolve);
// No message left behind.
var messageQueue = JSON.parse(localStorage[this.messageQueueKey_]);
messageQueue.push(args);
localStorage[this.messageQueueKey_] = JSON.stringify(messageQueue);
this.sendRPC_(
'sendMessage', args, this.onMessageSent_.bind(this, args, resolve));
}.bind(this));
};
@@ -257,6 +281,24 @@ Cosmopolite.prototype.registerMessageHandlers_ = function() {
window.addEventListener('message', this.messageHandler_);
};
/**
* Callback for a sendMessage RPC ack by the server.
*
* @param {Object} message Message details.
* @param {function()=} resolve Promise resolution callback.
*/
Cosmopolite.prototype.onMessageSent_ = function(message, resolve) {
// No message left behind.
var messageQueue = JSON.parse(localStorage[this.messageQueueKey_]);
messageQueue = messageQueue.filter(function(queuedMessage) {
return message['sender_message_id'] != queuedMessage['sender_message_id'];
});
localStorage[this.messageQueueKey_] = JSON.stringify(messageQueue);
if (resolve) {
resolve();
}
};
/**
* Send a single RPC to the server.
*
@@ -291,6 +333,9 @@ Cosmopolite.prototype.sendRPC_ = function(command, args, onSuccess) {
* @param {number=} delay Seconds waited before executing this call (for backoff)
*/
Cosmopolite.prototype.sendRPCs_ = function(commands, delay) {
if (this.shutdown_) {
return;
}
var request = {
'commands': [],
};

View File

@@ -231,6 +231,32 @@ asyncTest('Duplicate message suppression', function() {
});
});
asyncTest('Message persistence', function() {
expect(2);
var subject = randstring();
var message = randstring();
var namespace = randstring();
// Send a message and shut down too fast for it to hit the wire.
var cosmo1 = new Cosmopolite({}, null, namespace);
cosmo1.sendMessage(subject, message);
cosmo1.shutdown();
var callbacks = {
'onMessage': function(msg) {
equal(msg['subject'], subject, 'subject matches');
equal(msg['message'], message, 'message matches');
cosmo2.shutdown();
start();
},
};
var cosmo2 = new Cosmopolite(callbacks, null, namespace);
cosmo2.subscribe(subject, -1);
// Should pick up the message from the persistent queue.
});
module('dev_appserver only');