Add bulk subscribe support.

This commit is contained in:
Ian Gulliver
2014-06-11 21:46:50 -07:00
parent bc42e8d5e3
commit 828a9c50ea
2 changed files with 93 additions and 42 deletions

View File

@@ -254,57 +254,84 @@ Cosmopolite.prototype.shutdown = function() {
* *
* Start receiving messages sent to this subject via the onMessage callback. * Start receiving messages sent to this subject via the onMessage callback.
* *
* @param {Cosmopolite.typeSubjectLoose} subject * @param {Cosmopolite.typeSubjectLoose|Array.<Cosmopolite.typeSubjectLoose>}
* subjects
* @param {?number=} opt_messages Number of recent messages to request; * @param {?number=} opt_messages Number of recent messages to request;
* 0 for none, -1 for all * 0 for none, -1 for all
* @param {?number=} opt_lastID ID of last message received; fetch messages * @param {?number=} opt_lastID ID of last message received; fetch messages
* since * since
* @return {Promise} * @return {Promise|Array.<Promise>}
*/ */
Cosmopolite.prototype.subscribe = function(subject, opt_messages, opt_lastID) { Cosmopolite.prototype.subscribe = function(subjects, opt_messages, opt_lastID) {
return this.newPromise_(function(resolve, reject) { var single = false;
/** @type {Cosmopolite.typeSubject} */
var canonicalSubject = this.canonicalSubject_(subject);
/** @type {string} */
var subjectString = this.subjectString_(canonicalSubject);
if (!(subjectString in this.subscriptions_)) {
this.subscriptions_[subjectString] = {
'subject': canonicalSubject,
'messages': [],
'pins': [],
'state': Cosmopolite.SubscriptionState_.PENDING
};
}
var args = { if (!(subjects instanceof Array)) {
'subject': canonicalSubject single = true;
}; subjects = [subjects];
if (opt_messages) { }
args['messages'] = opt_messages;
}
if (opt_lastID != null) {
args['last_id'] = opt_lastID;
}
this.sendRPC_('subscribe', args, function(response) { var ret = [];
var rpcs = [];
for (var i = 0; i < subjects.length; i++) {
var subject = subjects[i];
ret.push(this.newPromise_(function(resolve, reject) {
/** @type {Cosmopolite.typeSubject} */
var canonicalSubject = this.canonicalSubject_(subject);
/** @type {string} */ /** @type {string} */
var result = response['result']; var subjectString = this.subjectString_(canonicalSubject);
if (result == 'ok') { if (!(subjectString in this.subscriptions_)) {
// unsubscribe may have been called since we sent the RPC. That's racy this.subscriptions_[subjectString] = {
// without waiting for the promise, but do our best 'subject': canonicalSubject,
if (subjectString in this.subscriptions_) { 'messages': [],
this.subscriptions_[subjectString].state = 'pins': [],
Cosmopolite.SubscriptionState_.ACTIVE; 'state': Cosmopolite.SubscriptionState_.PENDING
} };
resolve();
this.trackEvent(
'send', 'event', 'cosmopolite', 'subscribe', subjectString);
} else {
delete this.subscriptions_[subjectString];
reject(new Error(result));
} }
});
}.bind(this)); var args = {
'subject': canonicalSubject
};
if (opt_messages) {
args['messages'] = opt_messages;
}
if (opt_lastID != null) {
args['last_id'] = opt_lastID;
}
var onSuccess = function(response) {
/** @type {string} */
var result = response['result'];
if (result == 'ok') {
// unsubscribe may have been called since we sent the RPC. That's racy
// without waiting for the promise, but do our best
if (subjectString in this.subscriptions_) {
this.subscriptions_[subjectString].state =
Cosmopolite.SubscriptionState_.ACTIVE;
}
resolve();
this.trackEvent(
'send', 'event', 'cosmopolite', 'subscribe', subjectString);
} else {
delete this.subscriptions_[subjectString];
reject(new Error(result));
}
};
rpcs.push({
'command': 'subscribe',
'arguments': args,
'onSuccess': onSuccess
});
}.bind(this)));
}
this.sendRPCs_(rpcs);
if (single) {
return ret[0];
} else {
return ret;
}
}; };

View File

@@ -138,6 +138,30 @@ asyncTest('Message round trip without channel', function() {
cosmo.subscribe(subject, -1); cosmo.subscribe(subject, -1);
}); });
asyncTest('Bulk subscribe', function() {
expect(2);
var subject1 = randstring();
var subject2 = randstring();
var message = randstring();
var messages = 0;
var callbacks = {
'onMessage': function(e) {
equal(e['message'], message, 'message matches');
if (++messages == 2) {
cosmo.shutdown();
start();
}
}
};
var cosmo = new Cosmopolite(callbacks, null, randstring());
cosmo.sendMessage(subject1, message);
cosmo.sendMessage(subject2, message);
cosmo.subscribe([subject1, subject2], -1);
});
asyncTest('Complex object', function() { asyncTest('Complex object', function() {
expect(2); expect(2);