Move onReady callback to after channel response from server, so we have a client_id before we start sending other RPCs.

This commit is contained in:
Ian Gulliver
2014-05-14 23:28:56 +03:00
parent 3a3cba9be3
commit 1e755c9693
3 changed files with 37 additions and 23 deletions

1
api.py
View File

@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging
import webapp2 import webapp2
from google.appengine.api import channel from google.appengine.api import channel

View File

@@ -38,6 +38,7 @@ Cosmopolite = function(callbacks, urlPrefix, namespace) {
this.urlPrefix_ = urlPrefix || '/cosmopolite'; this.urlPrefix_ = urlPrefix || '/cosmopolite';
this.namespace_ = namespace || 'cosmopolite'; this.namespace_ = namespace || 'cosmopolite';
this.ready_ = false;
this.shutdown_ = false; this.shutdown_ = false;
this.subscriptions_ = {}; this.subscriptions_ = {};
@@ -77,9 +78,12 @@ Cosmopolite.prototype.shutdown = function() {
* @param {Array.<string>=} keys Key names to ensure we receive at least 1 message defining * @param {Array.<string>=} keys Key names to ensure we receive at least 1 message defining
*/ */
Cosmopolite.prototype.subscribe = function(subject, messages, keys) { Cosmopolite.prototype.subscribe = function(subject, messages, keys) {
if (!this.ready_) {
throw "cosmopolite: not ready";
}
keys = keys || []; keys = keys || [];
if (subject in this.subscriptions_) { if (subject in this.subscriptions_) {
console.log('Not sending duplication subscription request for subject:', subject); console.log('cosmopolite: not sending duplication subscription request for subject:', subject);
return; return;
} }
this.subscriptions_[subject] = { this.subscriptions_[subject] = {
@@ -102,6 +106,9 @@ Cosmopolite.prototype.subscribe = function(subject, messages, keys) {
* @param {!string} subject Subject name, as passed to subscribe() * @param {!string} subject Subject name, as passed to subscribe()
*/ */
Cosmopolite.prototype.unsubscribe = function(subject) { Cosmopolite.prototype.unsubscribe = function(subject) {
if (!this.ready_) {
throw "cosmopolite: not ready";
}
delete this.subscriptions_[subject]; delete this.subscriptions_[subject];
this.sendRPC_('unsubscribe', { this.sendRPC_('unsubscribe', {
'subject': subject, 'subject': subject,
@@ -116,7 +123,10 @@ Cosmopolite.prototype.unsubscribe = function(subject) {
* @param {string=} key Key name to associate this message with * @param {string=} key Key name to associate this message with
*/ */
Cosmopolite.prototype.sendMessage = function(subject, message, key) { Cosmopolite.prototype.sendMessage = function(subject, message, key) {
args = { if (!this.ready_) {
throw "cosmopolite: not ready";
}
var args = {
'subject': subject, 'subject': subject,
'message': JSON.stringify(message), 'message': JSON.stringify(message),
}; };
@@ -133,6 +143,9 @@ Cosmopolite.prototype.sendMessage = function(subject, message, key) {
* @const * @const
*/ */
Cosmopolite.prototype.getMessages = function(subject) { Cosmopolite.prototype.getMessages = function(subject) {
if (!this.ready_) {
throw "cosmopolite: not ready";
}
return this.subscriptions_[subject].messages; return this.subscriptions_[subject].messages;
}; };
@@ -144,6 +157,9 @@ Cosmopolite.prototype.getMessages = function(subject) {
* @const * @const
*/ */
Cosmopolite.prototype.getKeyMessage = function(subject, key) { Cosmopolite.prototype.getKeyMessage = function(subject, key) {
if (!this.ready_) {
throw "cosmopolite: not ready";
}
return this.subscriptions_[subject].keys[key]; return this.subscriptions_[subject].keys[key];
}; };
@@ -161,9 +177,6 @@ Cosmopolite.prototype.onLoad_ = function() {
} }
this.registerMessageHandlers_(); this.registerMessageHandlers_();
this.createChannel_(); this.createChannel_();
if ('onReady' in this.callbacks_) {
this.callbacks_['onReady']();
};
}; };
/** /**
@@ -183,7 +196,7 @@ Cosmopolite.prototype.onReceiveMessage_ = function(data) {
this.socket_.close(); this.socket_.close();
break; break;
default: default:
console.log('Unknown event type:', data); console.log('cosmopolite: unknown event type:', data);
break; break;
} }
}; };
@@ -198,10 +211,10 @@ Cosmopolite.prototype.registerMessageHandlers_ = function() {
this.$(window).on('message', this.$.proxy(function(e) { this.$(window).on('message', this.$.proxy(function(e) {
if (e.originalEvent.origin != window.location.origin) { if (e.originalEvent.origin != window.location.origin) {
console.log( console.log(
'Received message from bad origin:', e.originalEvent.origin); 'cosmopolite: received message from bad origin:', e.originalEvent.origin);
return; return;
} }
console.log('Received browser message:', e.originalEvent.data); console.log('cosmopolite: received browser message:', e.originalEvent.data);
this.onReceiveMessage_(e.originalEvent.data); this.onReceiveMessage_(e.originalEvent.data);
}, this)); }, this));
}; };
@@ -275,7 +288,7 @@ Cosmopolite.prototype.sendRPCs_ = function(commands, delay) {
return; return;
} }
if (data['status'] != 'ok') { if (data['status'] != 'ok') {
console.log('Server returned unknown status:', data['status']); console.log('cosmopolite: server returned unknown status:', data['status']);
// TODO(flamingcow): Refresh the page? Show an alert? // TODO(flamingcow): Refresh the page? Show an alert?
return; return;
} }
@@ -293,7 +306,7 @@ Cosmopolite.prototype.sendRPCs_ = function(commands, delay) {
xhr.getResponseHeader('Retry-After') || xhr.getResponseHeader('Retry-After') ||
Math.min(32, Math.max(2, delay || 2)); Math.min(32, Math.max(2, delay || 2));
console.log( console.log(
'RPC failed. Will retry in ' + intDelay + ' seconds'); 'cosmopolite: RPC failed; will retry in ' + intDelay + ' seconds');
function retry() { function retry() {
this.sendRPCs_(commands, Math.pow(intDelay, 2)); this.sendRPCs_(commands, Math.pow(intDelay, 2));
} }
@@ -335,8 +348,14 @@ Cosmopolite.prototype.onCreateChannel_ = function(data) {
if (this.shutdown_) { if (this.shutdown_) {
return; return;
} }
if (!this.ready_) {
this.ready_ = true;
if ('onReady' in this.callbacks_) {
this.callbacks_['onReady']();
}
}
var channel = new goog.appengine.Channel(data['token']); var channel = new goog.appengine.Channel(data['token']);
console.log('Opening channel...'); console.log('cosmopolite: opening channel:', data['token']);
this.socket_ = channel.open({ this.socket_ = channel.open({
onopen: this.$.proxy(this.onSocketOpen_, this), onopen: this.$.proxy(this.onSocketOpen_, this),
onclose: this.$.proxy(this.onSocketClose_, this), onclose: this.$.proxy(this.onSocketClose_, this),
@@ -349,7 +368,7 @@ Cosmopolite.prototype.onCreateChannel_ = function(data) {
* Callback from channel library for successful open * Callback from channel library for successful open
*/ */
Cosmopolite.prototype.onSocketOpen_ = function() { Cosmopolite.prototype.onSocketOpen_ = function() {
console.log('Channel opened'); console.log('cosmopolite: channel opened');
}; };
/** /**
@@ -359,7 +378,7 @@ Cosmopolite.prototype.onSocketClose_ = function() {
if (!this.socket_) { if (!this.socket_) {
return; return;
} }
console.log('Channel closed'); console.log('cosmopolite: channel closed');
this.socket_ = null; this.socket_ = null;
this.createChannel_(); this.createChannel_();
}; };
@@ -396,14 +415,14 @@ Cosmopolite.prototype.onServerEvent_ = function(e) {
case 'message': case 'message':
var subscription = this.subscriptions_[e['subject']]; var subscription = this.subscriptions_[e['subject']];
if (!subscription) { if (!subscription) {
console.log('Message from unrecognized subject:', e); console.log('cosmopolite: message from unrecognized subject:', e);
break; break;
} }
var duplicate = subscription.messages.some(function(message) { var duplicate = subscription.messages.some(function(message) {
return message['id'] == e.id; return message['id'] == e.id;
}); });
if (duplicate) { if (duplicate) {
console.log('Duplicate message:', e); console.log('cosmopolite: duplicate message:', e);
break; break;
} }
e['message'] = JSON.parse(e['message']); e['message'] = JSON.parse(e['message']);
@@ -417,7 +436,7 @@ Cosmopolite.prototype.onServerEvent_ = function(e) {
break; break;
default: default:
// Client out of date? Force refresh? // Client out of date? Force refresh?
console.log('Unknown channel event:', e); console.log('cosmopolite: unknown channel event:', e);
break; break;
} }
}; };
@@ -428,7 +447,7 @@ Cosmopolite.prototype.onServerEvent_ = function(e) {
* @param {!string} msg Descriptive text * @param {!string} msg Descriptive text
*/ */
Cosmopolite.prototype.onSocketError_ = function(msg) { Cosmopolite.prototype.onSocketError_ = function(msg) {
console.log('Socket error:', msg); console.log('cosmopolite: socket error:', msg);
this.socket_.close(); this.socket_.close();
}; };

View File

@@ -54,23 +54,17 @@ asyncTest('Message round trip', function() {
var subject = randstring(); var subject = randstring();
var message = randstring(); var message = randstring();
console.log('subject:', subject);
console.log('message:', message);
var callbacks1 = { var callbacks1 = {
'onReady': function() { 'onReady': function() {
console.log('onReady 1');
cosmo1.sendMessage(subject, message); cosmo1.sendMessage(subject, message);
}, },
}; };
var callbacks2 = { var callbacks2 = {
'onReady': function() { 'onReady': function() {
console.log('onReady 2');
cosmo2.subscribe(subject, -1); cosmo2.subscribe(subject, -1);
}, },
'onMessage': function(e) { 'onMessage': function(e) {
console.log('onMessage');
equal(e['subject'], subject, 'subject matches'); equal(e['subject'], subject, 'subject matches');
equal(e['message'], message, 'message matches'); equal(e['message'], message, 'message matches');
cosmo1.shutdown(); cosmo1.shutdown();