Remove onReady callback and replace with a queue of RPCs ready to send when we have a client_id.

This commit is contained in:
Ian Gulliver
2014-05-17 17:48:08 +03:00
parent 994afde51f
commit 966c6c8c50
2 changed files with 40 additions and 73 deletions

View File

@@ -38,9 +38,9 @@ Cosmopolite = function(callbacks, urlPrefix, namespace) {
this.urlPrefix_ = urlPrefix || '/cosmopolite'; this.urlPrefix_ = urlPrefix || '/cosmopolite';
this.namespace_ = namespace || 'cosmopolite'; this.namespace_ = namespace || 'cosmopolite';
this.ready_ = false;
this.shutdown_ = false; this.shutdown_ = false;
this.rpcQueue_ = [];
this.subscriptions_ = {}; this.subscriptions_ = {};
var scriptUrls = [ var scriptUrls = [
@@ -81,10 +81,6 @@ Cosmopolite.prototype.shutdown = function() {
* @param {Array.<string>=} keys Key names to ensure we receive at least 1 message defining * @param {Array.<string>=} keys Key names to ensure we receive at least 1 message defining
*/ */
Cosmopolite.prototype.subscribe = function(subject, messages, keys) { Cosmopolite.prototype.subscribe = function(subject, messages, keys) {
if (!this.ready_) {
throw "cosmopolite: not ready";
}
return new Promise(function(resolve, reject) { return new Promise(function(resolve, reject) {
keys = keys || []; keys = keys || [];
if (subject in this.subscriptions_) { if (subject in this.subscriptions_) {
@@ -115,10 +111,6 @@ Cosmopolite.prototype.subscribe = function(subject, messages, keys) {
* @param {!string} subject Subject name, as passed to subscribe() * @param {!string} subject Subject name, as passed to subscribe()
*/ */
Cosmopolite.prototype.unsubscribe = function(subject) { Cosmopolite.prototype.unsubscribe = function(subject) {
if (!this.ready_) {
throw "cosmopolite: not ready";
}
return new Promise(function(resolve, reject) { return new Promise(function(resolve, reject) {
delete this.subscriptions_[subject]; delete this.subscriptions_[subject];
var args = { var args = {
@@ -136,10 +128,6 @@ Cosmopolite.prototype.unsubscribe = function(subject) {
* @param {string=} key Key name to associate this message with * @param {string=} key Key name to associate this message with
*/ */
Cosmopolite.prototype.sendMessage = function(subject, message, key) { Cosmopolite.prototype.sendMessage = function(subject, message, key) {
if (!this.ready_) {
throw "cosmopolite: not ready";
}
return new Promise(function(resolve, reject) { return new Promise(function(resolve, reject) {
var args = { var args = {
'subject': subject, 'subject': subject,
@@ -160,9 +148,6 @@ Cosmopolite.prototype.sendMessage = function(subject, message, key) {
* @const * @const
*/ */
Cosmopolite.prototype.getMessages = function(subject) { Cosmopolite.prototype.getMessages = function(subject) {
if (!this.ready_) {
throw "cosmopolite: not ready";
}
return this.subscriptions_[subject].messages; return this.subscriptions_[subject].messages;
}; };
@@ -174,9 +159,6 @@ Cosmopolite.prototype.getMessages = function(subject) {
* @const * @const
*/ */
Cosmopolite.prototype.getKeyMessage = function(subject, key) { Cosmopolite.prototype.getKeyMessage = function(subject, key) {
if (!this.ready_) {
throw "cosmopolite: not ready";
}
return this.subscriptions_[subject].keys[key]; return this.subscriptions_[subject].keys[key];
}; };
@@ -285,13 +267,17 @@ Cosmopolite.prototype.registerMessageHandlers_ = function() {
* @param {function(Object)=} onSuccess Success callback function * @param {function(Object)=} onSuccess Success callback function
*/ */
Cosmopolite.prototype.sendRPC_ = function(command, args, onSuccess) { Cosmopolite.prototype.sendRPC_ = function(command, args, onSuccess) {
this.sendRPCs_([ var rpc = {
{ 'command': command,
'command': command, 'arguments': args,
'arguments': args, 'onSuccess': onSuccess,
'onSuccess': onSuccess, };
} if (this.namespace_ + ':client_id' in localStorage) {
]); this.sendRPCs_([rpc]);
} else {
// Initial RPC hasn't returned. Queue instead of sending.
this.rpcQueue_.push(rpc);
}
}; };
/** /**
@@ -352,6 +338,11 @@ Cosmopolite.prototype.sendRPCs_ = function(commands, delay) {
} }
if ('client_id' in data) { if ('client_id' in data) {
localStorage[this.namespace_ + ':client_id'] = data['client_id']; localStorage[this.namespace_ + ':client_id'] = data['client_id'];
// We may have queued RPCs for lack of a client_id.
if (this.rpcQueue_.length) {
this.sendRPCs_(this.rpcQueue_);
this.rpcQueue_ = [];
}
} }
if (data['status'] == 'retry') { if (data['status'] == 'retry') {
// Discard delay // Discard delay
@@ -413,12 +404,6 @@ Cosmopolite.prototype.onCreateChannel_ = function(data) {
if (this.shutdown_) { if (this.shutdown_) {
return; return;
} }
if (!this.ready_) {
this.ready_ = true;
if ('onReady' in this.callbacks_) {
this.callbacks_['onReady']();
}
}
var channel = new goog.appengine.Channel(data['token']); var channel = new goog.appengine.Channel(data['token']);
console.log(this.loggingPrefix_(), 'opening channel:', data['token']); console.log(this.loggingPrefix_(), 'opening channel:', data['token']);
this.socket_ = channel.open({ this.socket_ = channel.open({

View File

@@ -93,10 +93,6 @@ asyncTest('Message round trip', function() {
var message = randstring(); var message = randstring();
var callbacks = { var callbacks = {
'onReady': function() {
cosmo.sendMessage(subject, message);
cosmo.subscribe(subject, -1);
},
'onMessage': function(e) { 'onMessage': function(e) {
equal(e['subject'], subject, 'subject matches'); equal(e['subject'], subject, 'subject matches');
equal(e['message'], message, 'message matches'); equal(e['message'], message, 'message matches');
@@ -106,6 +102,8 @@ asyncTest('Message round trip', function() {
}; };
var cosmo = new Cosmopolite(callbacks, null, randstring()); var cosmo = new Cosmopolite(callbacks, null, randstring());
cosmo.sendMessage(subject, message);
cosmo.subscribe(subject, -1);
}); });
asyncTest('Overwrite key', function() { asyncTest('Overwrite key', function() {
@@ -119,10 +117,6 @@ asyncTest('Overwrite key', function() {
var messages = 0; var messages = 0;
var callbacks = { var callbacks = {
'onReady': function() {
cosmo.subscribe(subject, -1);
cosmo.sendMessage(subject, message1, key);
},
'onMessage': function(e) { 'onMessage': function(e) {
messages++; messages++;
equal(e['subject'], subject, 'subject matches'); equal(e['subject'], subject, 'subject matches');
@@ -141,6 +135,8 @@ asyncTest('Overwrite key', function() {
}; };
var cosmo = new Cosmopolite(callbacks, null, randstring()); var cosmo = new Cosmopolite(callbacks, null, randstring());
cosmo.subscribe(subject, -1);
cosmo.sendMessage(subject, message1, key);
}); });
asyncTest('Complex object', function() { asyncTest('Complex object', function() {
@@ -159,10 +155,6 @@ asyncTest('Complex object', function() {
}; };
var callbacks = { var callbacks = {
'onReady': function() {
cosmo.sendMessage(subject, message);
cosmo.subscribe(subject, -1);
},
'onMessage': function(e) { 'onMessage': function(e) {
equal(e['subject'], subject, 'subject matches'); equal(e['subject'], subject, 'subject matches');
deepEqual(e['message'], message, 'message matches'); deepEqual(e['message'], message, 'message matches');
@@ -172,6 +164,8 @@ asyncTest('Complex object', function() {
}; };
var cosmo = new Cosmopolite(callbacks, null, randstring()); var cosmo = new Cosmopolite(callbacks, null, randstring());
cosmo.sendMessage(subject, message);
cosmo.subscribe(subject, -1);
}); });
asyncTest('sendMessage Promise', function() { asyncTest('sendMessage Promise', function() {
@@ -180,17 +174,12 @@ asyncTest('sendMessage Promise', function() {
var subject = randstring(); var subject = randstring();
var message = randstring(); var message = randstring();
var callbacks = { var cosmo = new Cosmopolite({}, null, randstring());
'onReady': function() { cosmo.sendMessage(subject, message).then(function() {
cosmo.sendMessage(subject, message).then(function() { ok(true, 'sendMessage Promise fulfilled');
ok(true, 'sendMessage Promise fulfilled'); cosmo.shutdown();
cosmo.shutdown(); start();
start(); });
});
},
};
var cosmo = new Cosmopolite(callbacks, null, randstring());
}); });
asyncTest('subscribe/unsubscribe Promise', function() { asyncTest('subscribe/unsubscribe Promise', function() {
@@ -199,20 +188,15 @@ asyncTest('subscribe/unsubscribe Promise', function() {
var subject = randstring(); var subject = randstring();
var message = randstring(); var message = randstring();
var callbacks = { var cosmo = new Cosmopolite({}, null, randstring());
'onReady': function() { cosmo.subscribe(subject).then(function() {
cosmo.subscribe(subject).then(function() { ok(true, 'subscribe Promise fulfilled');
ok(true, 'subscribe Promise fulfilled'); cosmo.unsubscribe(subject).then(function() {
cosmo.unsubscribe(subject).then(function() { ok(true, 'unsubscribe Promise fulfilled');
ok(true, 'unsubscribe Promise fulfilled'); cosmo.shutdown();
cosmo.shutdown(); start();
start(); });
}); });
});
},
};
var cosmo = new Cosmopolite(callbacks, null, randstring());
}); });
@@ -253,10 +237,6 @@ asyncTest('Profile merge', function() {
logout(function() { logout(function() {
var callbacks = { var callbacks = {
'onReady': function() {
cosmo.sendMessage(subject, message);
cosmo.subscribe(subject, -1);
},
'onMessage': function(msg) { 'onMessage': function(msg) {
messages++; messages++;
equal(msg['subject'], subject, 'message #' + messages + ': subject matches'); equal(msg['subject'], subject, 'message #' + messages + ': subject matches');
@@ -277,5 +257,7 @@ asyncTest('Profile merge', function() {
}, },
}; };
var cosmo = new Cosmopolite(callbacks); var cosmo = new Cosmopolite(callbacks);
cosmo.sendMessage(subject, message);
cosmo.subscribe(subject, -1);
}); });
}); });