Delay RPC sending and subject resubscription until we have an open channel on the client side, to try to work around prod appengine disappearing messages. Add a test for channel reconnection and re-subscription.

This commit is contained in:
Ian Gulliver
2014-05-19 15:21:21 +03:00
parent 640da84124
commit c361d41fcf
2 changed files with 97 additions and 24 deletions

View File

@@ -38,7 +38,7 @@ var Cosmopolite = function(callbacks, urlPrefix, namespace) {
this.urlPrefix_ = urlPrefix || '/cosmopolite';
this.namespace_ = namespace || 'cosmopolite';
this.channelState_ = this.ChannelState.DOWN;
this.channelState_ = this.ChannelState.CLOSED;
this.shutdown_ = false;
this.rpcQueue_ = [];
@@ -82,9 +82,14 @@ var Cosmopolite = function(callbacks, urlPrefix, namespace) {
* @private
*/
Cosmopolite.prototype.ChannelState = {
DOWN: 1,
// No channel open, no RPC pending
CLOSED: 1,
// No channel open, RPC pending
PENDING: 2,
UP: 3,
// RPC complete, channel opening
OPENING: 3,
// Channel opened
OPEN: 3,
};
@@ -355,10 +360,10 @@ Cosmopolite.prototype.sendRPC_ = function(command, args, onSuccess) {
'arguments': args,
'onSuccess': onSuccess,
};
if (this.namespace_ + ':client_id' in localStorage) {
if (this.maySendRPC_()) {
this.sendRPCs_([rpc]);
} else {
// Initial RPC hasn't returned. Queue instead of sending.
// Queue instead of sending.
this.rpcQueue_.push(rpc);
}
};
@@ -374,7 +379,7 @@ Cosmopolite.prototype.sendRPC_ = function(command, args, onSuccess) {
* @param {number=} delay Seconds waited before executing this call (for backoff)
*/
Cosmopolite.prototype.sendRPCs_ = function(commands, delay) {
if (this.shutdown_) {
if (this.shutdown_ || !commands.length) {
return;
}
var request = {
@@ -425,11 +430,6 @@ Cosmopolite.prototype.sendRPCs_ = function(commands, delay) {
}
if ('client_id' in data) {
localStorage[this.namespace_ + ':client_id'] = data['client_id'];
// We may have queued RPCs for lack of a client_id.
if (this.rpcQueue_.length) {
this.sendRPCs_(this.rpcQueue_);
this.rpcQueue_ = [];
}
}
if (data['status'] == 'retry') {
@@ -444,6 +444,8 @@ Cosmopolite.prototype.sendRPCs_ = function(commands, delay) {
return;
}
this.flushRPCQueue_();
// Handle events that were immediately available as if they came over the
// channel. Fire them before the message callbacks, so clients can use
// events like the subscribe promise fulfillment as a barrier for initial
@@ -463,21 +465,39 @@ Cosmopolite.prototype.sendRPCs_ = function(commands, delay) {
};
/**
* Send RPCs to create a server -> client channel and (re-)subscribe to subjects
* Are we currently clear to put RPCs on the wire?
*
* @return {Boolean} Yes or no?
*/
Cosmopolite.prototype.createChannel_ = function() {
if (this.channelState_ == this.ChannelState.DOWN) {
this.channelState_ = this.ChannelState.PENDING;
} else {
Cosmopolite.prototype.maySendRPC_ = function() {
if (!(this.namespace_ + ':client_id' in localStorage)) {
return false;
}
if (this.channelState_ != this.ChannelState.OPEN) {
return false;
}
return true;
}
/**
* Send queued RPCs
*/
Cosmopolite.prototype.flushRPCQueue_ = function() {
if (!this.maySendRPC_() || !this.rpcQueue_.length) {
return;
}
var rpcs = [
{
'command': 'createChannel',
'onSuccess': this.onCreateChannel_,
},
];
this.sendRPCs_(this.rpcQueue_);
this.rpcQueue_ = [];
};
/**
* Resubscribe to subjects (i.e. after reconnection)
*/
Cosmopolite.prototype.resubscribe_ = function() {
var rpcs = [];
for (var subject in this.subscriptions_) {
var subscription = this.subscriptions_[subject];
if (subscription.state != this.SubscriptionState.ACTIVE) {
@@ -498,6 +518,26 @@ Cosmopolite.prototype.createChannel_ = function() {
this.sendRPCs_(rpcs);
};
/**
* Send RPC to create a server -> client channel
*/
Cosmopolite.prototype.createChannel_ = function() {
if (this.channelState_ == this.ChannelState.CLOSED) {
this.channelState_ = this.ChannelState.PENDING;
} else {
return;
}
var rpcs = [
{
'command': 'createChannel',
'onSuccess': this.onCreateChannel_,
},
];
// sendRPCs instead of sendRPC so we don't queue.
this.sendRPCs_(rpcs);
};
/**
* Callback for channel creation on the server side
*
@@ -511,7 +551,7 @@ Cosmopolite.prototype.onCreateChannel_ = function(data) {
}
if (this.channelState_ == this.ChannelState.PENDING) {
this.channelState_ = this.ChannelState.OPEN;
this.channelState_ = this.ChannelState.OPENING;
} else {
return;
}
@@ -531,9 +571,19 @@ Cosmopolite.prototype.onCreateChannel_ = function(data) {
*/
Cosmopolite.prototype.onSocketOpen_ = function() {
console.log(this.loggingPrefix_(), 'channel opened');
if (this.shutdown_ && this.socket_) {
this.socket_.close();
};
if (this.channelState_ == this.ChannelState.OPENING) {
this.channelState_ = this.ChannelState.OPEN;
} else {
return;
}
this.flushRPCQueue_();
this.resubscribe_();
};
/**
@@ -547,7 +597,7 @@ Cosmopolite.prototype.onSocketClose_ = function() {
}
if (this.channelState_ == this.ChannelState.OPEN) {
this.channelState_ = this.ChannelState.DOWN;
this.channelState_ = this.ChannelState.CLOSED;
} else {
return;
}

View File

@@ -351,6 +351,29 @@ asyncTest('Message ordering', function() {
sendNextMessage();
});
asyncTest('Reconnect channel', function() {
expect(2);
var subject = randstring();
var message = randstring();
var callbacks = {
'onMessage': function(msg) {
equal(msg['subject']['name'], subject, 'subject matches');
equal(msg['message'], message, 'message matches');
cosmo.shutdown();
start();
},
};
var cosmo = new Cosmopolite(callbacks, null, randstring());
cosmo.subscribe(subject, 0).then(function() {
// Reach inside to forcefully close the socket
cosmo.socket_.close();
cosmo.sendMessage(subject, message);
});
});
module('dev_appserver only');