From c361d41fcf0a1de26e602bc6e8b4acf2f9dd068c Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Mon, 19 May 2014 15:21:21 +0300 Subject: [PATCH] Delay RPC sending and subject resubscription until we have an open channel on the client side, to try to work around prod appengine disappearing messages. Add a test for channel reconnection and re-subscription. --- static/cosmopolite.js | 98 ++++++++++++++++++++++++++++++++----------- static/test.js | 23 ++++++++++ 2 files changed, 97 insertions(+), 24 deletions(-) diff --git a/static/cosmopolite.js b/static/cosmopolite.js index b9b05cd..3df4533 100644 --- a/static/cosmopolite.js +++ b/static/cosmopolite.js @@ -38,7 +38,7 @@ var Cosmopolite = function(callbacks, urlPrefix, namespace) { this.urlPrefix_ = urlPrefix || '/cosmopolite'; this.namespace_ = namespace || 'cosmopolite'; - this.channelState_ = this.ChannelState.DOWN; + this.channelState_ = this.ChannelState.CLOSED; this.shutdown_ = false; this.rpcQueue_ = []; @@ -82,9 +82,14 @@ var Cosmopolite = function(callbacks, urlPrefix, namespace) { * @private */ Cosmopolite.prototype.ChannelState = { - DOWN: 1, + // No channel open, no RPC pending + CLOSED: 1, + // No channel open, RPC pending PENDING: 2, - UP: 3, + // RPC complete, channel opening + OPENING: 3, + // Channel opened + OPEN: 3, }; @@ -355,10 +360,10 @@ Cosmopolite.prototype.sendRPC_ = function(command, args, onSuccess) { 'arguments': args, 'onSuccess': onSuccess, }; - if (this.namespace_ + ':client_id' in localStorage) { + if (this.maySendRPC_()) { this.sendRPCs_([rpc]); } else { - // Initial RPC hasn't returned. Queue instead of sending. + // Queue instead of sending. this.rpcQueue_.push(rpc); } }; @@ -374,7 +379,7 @@ Cosmopolite.prototype.sendRPC_ = function(command, args, onSuccess) { * @param {number=} delay Seconds waited before executing this call (for backoff) */ Cosmopolite.prototype.sendRPCs_ = function(commands, delay) { - if (this.shutdown_) { + if (this.shutdown_ || !commands.length) { return; } var request = { @@ -425,11 +430,6 @@ Cosmopolite.prototype.sendRPCs_ = function(commands, delay) { } if ('client_id' in data) { localStorage[this.namespace_ + ':client_id'] = data['client_id']; - // We may have queued RPCs for lack of a client_id. - if (this.rpcQueue_.length) { - this.sendRPCs_(this.rpcQueue_); - this.rpcQueue_ = []; - } } if (data['status'] == 'retry') { @@ -444,6 +444,8 @@ Cosmopolite.prototype.sendRPCs_ = function(commands, delay) { return; } + this.flushRPCQueue_(); + // Handle events that were immediately available as if they came over the // channel. Fire them before the message callbacks, so clients can use // events like the subscribe promise fulfillment as a barrier for initial @@ -463,21 +465,39 @@ Cosmopolite.prototype.sendRPCs_ = function(commands, delay) { }; /** - * Send RPCs to create a server -> client channel and (re-)subscribe to subjects + * Are we currently clear to put RPCs on the wire? + * + * @return {Boolean} Yes or no? */ -Cosmopolite.prototype.createChannel_ = function() { - if (this.channelState_ == this.ChannelState.DOWN) { - this.channelState_ = this.ChannelState.PENDING; - } else { +Cosmopolite.prototype.maySendRPC_ = function() { + if (!(this.namespace_ + ':client_id' in localStorage)) { + return false; + } + + if (this.channelState_ != this.ChannelState.OPEN) { + return false; + } + + return true; +} + +/** + * Send queued RPCs + */ +Cosmopolite.prototype.flushRPCQueue_ = function() { + if (!this.maySendRPC_() || !this.rpcQueue_.length) { return; } - var rpcs = [ - { - 'command': 'createChannel', - 'onSuccess': this.onCreateChannel_, - }, - ]; + this.sendRPCs_(this.rpcQueue_); + this.rpcQueue_ = []; +}; + +/** + * Resubscribe to subjects (i.e. after reconnection) + */ +Cosmopolite.prototype.resubscribe_ = function() { + var rpcs = []; for (var subject in this.subscriptions_) { var subscription = this.subscriptions_[subject]; if (subscription.state != this.SubscriptionState.ACTIVE) { @@ -498,6 +518,26 @@ Cosmopolite.prototype.createChannel_ = function() { this.sendRPCs_(rpcs); }; +/** + * Send RPC to create a server -> client channel + */ +Cosmopolite.prototype.createChannel_ = function() { + if (this.channelState_ == this.ChannelState.CLOSED) { + this.channelState_ = this.ChannelState.PENDING; + } else { + return; + } + + var rpcs = [ + { + 'command': 'createChannel', + 'onSuccess': this.onCreateChannel_, + }, + ]; + // sendRPCs instead of sendRPC so we don't queue. + this.sendRPCs_(rpcs); +}; + /** * Callback for channel creation on the server side * @@ -511,7 +551,7 @@ Cosmopolite.prototype.onCreateChannel_ = function(data) { } if (this.channelState_ == this.ChannelState.PENDING) { - this.channelState_ = this.ChannelState.OPEN; + this.channelState_ = this.ChannelState.OPENING; } else { return; } @@ -531,9 +571,19 @@ Cosmopolite.prototype.onCreateChannel_ = function(data) { */ Cosmopolite.prototype.onSocketOpen_ = function() { console.log(this.loggingPrefix_(), 'channel opened'); + if (this.shutdown_ && this.socket_) { this.socket_.close(); }; + + if (this.channelState_ == this.ChannelState.OPENING) { + this.channelState_ = this.ChannelState.OPEN; + } else { + return; + } + + this.flushRPCQueue_(); + this.resubscribe_(); }; /** @@ -547,7 +597,7 @@ Cosmopolite.prototype.onSocketClose_ = function() { } if (this.channelState_ == this.ChannelState.OPEN) { - this.channelState_ = this.ChannelState.DOWN; + this.channelState_ = this.ChannelState.CLOSED; } else { return; } diff --git a/static/test.js b/static/test.js index 88d7771..f93f874 100644 --- a/static/test.js +++ b/static/test.js @@ -351,6 +351,29 @@ asyncTest('Message ordering', function() { sendNextMessage(); }); +asyncTest('Reconnect channel', function() { + expect(2); + + var subject = randstring(); + var message = randstring(); + + var callbacks = { + 'onMessage': function(msg) { + equal(msg['subject']['name'], subject, 'subject matches'); + equal(msg['message'], message, 'message matches'); + cosmo.shutdown(); + start(); + }, + }; + + var cosmo = new Cosmopolite(callbacks, null, randstring()); + cosmo.subscribe(subject, 0).then(function() { + // Reach inside to forcefully close the socket + cosmo.socket_.close(); + cosmo.sendMessage(subject, message); + }); +}); + module('dev_appserver only');