Support last_id in subscribe() API. Add a subscription state machine to avoid sending subscribe requests with bad parameters.

This commit is contained in:
Ian Gulliver
2014-05-18 19:06:27 +03:00
parent c8c968a2aa
commit 6d7745c6cd
2 changed files with 47 additions and 13 deletions

View File

@@ -85,6 +85,18 @@ Cosmopolite.prototype.ChannelState = {
};
/**
* Subscription states
* @enum {number}
* @const
* @private
*/
Cosmopolite.prototype.SubscriptionState = {
PENDING: 1,
ACTIVE: 2,
};
/**
* Shutdown this instance.
*
@@ -107,28 +119,41 @@ Cosmopolite.prototype.shutdown = function() {
* Start receiving messages sent to this subject via the onMessage callback.
*
* @param {!string} subject Subject name
* @param {!number} messages Number of recent messages to request; 0 for none, -1 for all
* @param {number=} messages Number of recent messages to request; 0 for none, -1 for all
* @param {number=} last_id ID of last message received; fetch all messages since
* @param {Array.<string>=} keys Key names to ensure we receive at least 1 message defining
*/
Cosmopolite.prototype.subscribe = function(subject, messages, keys) {
Cosmopolite.prototype.subscribe = function(subject, messages, last_id, keys) {
return new Promise(function(resolve, reject) {
keys = keys || [];
if (subject in this.subscriptions_) {
console.log(
this.loggingPrefix_(),
'not sending duplication subscription request for subject:', subject);
'not sending duplicate subscription request for subject:', subject);
resolve();
}
var args = {
'subject': subject,
'messages': messages,
'keys': keys,
};
this.subscriptions_[subject] = {
if (messages) {
args['messages'] = messages;
}
if (last_id != null) {
args['last_id'] = last_id;
}
if (keys != null) {
args['keys'] = keys;
}
var subscription = {
'messages': [],
'keys': {},
'keys': {},
'state': this.SubscriptionState.PENDING,
};
this.sendRPC_('subscribe', args, resolve);
this.subscriptions_[subject] = subscription;
this.sendRPC_('subscribe', args, function() {
subscription.state = this.SubscriptionState.ACTIVE;
resolve();
}.bind(this));
}.bind(this));
};
@@ -390,6 +415,7 @@ Cosmopolite.prototype.sendRPCs_ = function(commands, delay) {
return;
}
var data = xhr.response;
if ('google_user_id' in data) {
localStorage[this.namespace_ + ':google_user_id'] =
data['google_user_id'];
@@ -402,6 +428,7 @@ Cosmopolite.prototype.sendRPCs_ = function(commands, delay) {
this.rpcQueue_ = [];
}
}
if (data['status'] == 'retry') {
// Discard delay
this.sendRPCs_(commands);
@@ -413,14 +440,18 @@ Cosmopolite.prototype.sendRPCs_ = function(commands, delay) {
// TODO(flamingcow): Refresh the page? Show an alert?
return;
}
// Handle events that were immediately available as if they came over the
// channel. Fire them before the message callbacks, so clients can use
// events like the subscribe promise fulfillment as a barrier for initial
// data.
data['events'].forEach(this.onServerEvent_, this);
for (var i = 0; i < data['responses'].length; i++) {
if (commands[i]['onSuccess']) {
commands[i]['onSuccess'].bind(this)(data['responses'][i]);
}
}
// Handle events that were immediately available as if they came over the
// channel.
data['events'].forEach(this.onServerEvent_, this);
}.bind(this));
xhr.addEventListener('error', retryAfterDelay);
@@ -446,6 +477,9 @@ Cosmopolite.prototype.createChannel_ = function() {
];
for (var subject in this.subscriptions_) {
var subscription = this.subscriptions_[subject];
if (subscription.state != this.SubscriptionState.ACTIVE) {
continue;
}
var last_id = 0;
if (subscription.messages.length > 0) {
last_id = subscription.messages[subscription.messages.length - 1]['id'];

View File

@@ -226,7 +226,7 @@ asyncTest('Duplicate message suppression', function() {
};
cosmo.sendMessage(subject, message1, key).then(function() {
cosmo.sendMessage(subject, message2, key).then(function() {
cosmo.subscribe(subject, 0, [key]);
cosmo.subscribe(subject, 0, null, [key]);
});
});
});