diff --git a/static/cosmopolite.js b/static/cosmopolite.js index 6c5cd6f..04e0c2c 100644 --- a/static/cosmopolite.js +++ b/static/cosmopolite.js @@ -254,57 +254,84 @@ Cosmopolite.prototype.shutdown = function() { * * Start receiving messages sent to this subject via the onMessage callback. * - * @param {Cosmopolite.typeSubjectLoose} subject + * @param {Cosmopolite.typeSubjectLoose|Array.} + * subjects * @param {?number=} opt_messages Number of recent messages to request; * 0 for none, -1 for all * @param {?number=} opt_lastID ID of last message received; fetch messages * since - * @return {Promise} + * @return {Promise|Array.} */ -Cosmopolite.prototype.subscribe = function(subject, opt_messages, opt_lastID) { - return this.newPromise_(function(resolve, reject) { - /** @type {Cosmopolite.typeSubject} */ - var canonicalSubject = this.canonicalSubject_(subject); - /** @type {string} */ - var subjectString = this.subjectString_(canonicalSubject); - if (!(subjectString in this.subscriptions_)) { - this.subscriptions_[subjectString] = { - 'subject': canonicalSubject, - 'messages': [], - 'pins': [], - 'state': Cosmopolite.SubscriptionState_.PENDING - }; - } +Cosmopolite.prototype.subscribe = function(subjects, opt_messages, opt_lastID) { + var single = false; - var args = { - 'subject': canonicalSubject - }; - if (opt_messages) { - args['messages'] = opt_messages; - } - if (opt_lastID != null) { - args['last_id'] = opt_lastID; - } + if (!(subjects instanceof Array)) { + single = true; + subjects = [subjects]; + } - this.sendRPC_('subscribe', args, function(response) { + var ret = []; + var rpcs = []; + for (var i = 0; i < subjects.length; i++) { + var subject = subjects[i]; + + ret.push(this.newPromise_(function(resolve, reject) { + /** @type {Cosmopolite.typeSubject} */ + var canonicalSubject = this.canonicalSubject_(subject); /** @type {string} */ - var result = response['result']; - if (result == 'ok') { - // unsubscribe may have been called since we sent the RPC. That's racy - // without waiting for the promise, but do our best - if (subjectString in this.subscriptions_) { - this.subscriptions_[subjectString].state = - Cosmopolite.SubscriptionState_.ACTIVE; - } - resolve(); - this.trackEvent( - 'send', 'event', 'cosmopolite', 'subscribe', subjectString); - } else { - delete this.subscriptions_[subjectString]; - reject(new Error(result)); + var subjectString = this.subjectString_(canonicalSubject); + if (!(subjectString in this.subscriptions_)) { + this.subscriptions_[subjectString] = { + 'subject': canonicalSubject, + 'messages': [], + 'pins': [], + 'state': Cosmopolite.SubscriptionState_.PENDING + }; } - }); - }.bind(this)); + + var args = { + 'subject': canonicalSubject + }; + if (opt_messages) { + args['messages'] = opt_messages; + } + if (opt_lastID != null) { + args['last_id'] = opt_lastID; + } + + var onSuccess = function(response) { + /** @type {string} */ + var result = response['result']; + if (result == 'ok') { + // unsubscribe may have been called since we sent the RPC. That's racy + // without waiting for the promise, but do our best + if (subjectString in this.subscriptions_) { + this.subscriptions_[subjectString].state = + Cosmopolite.SubscriptionState_.ACTIVE; + } + resolve(); + this.trackEvent( + 'send', 'event', 'cosmopolite', 'subscribe', subjectString); + } else { + delete this.subscriptions_[subjectString]; + reject(new Error(result)); + } + }; + rpcs.push({ + 'command': 'subscribe', + 'arguments': args, + 'onSuccess': onSuccess + }); + }.bind(this))); + } + + this.sendRPCs_(rpcs); + + if (single) { + return ret[0]; + } else { + return ret; + } }; diff --git a/static/test.js b/static/test.js index c82671a..00086c1 100644 --- a/static/test.js +++ b/static/test.js @@ -138,6 +138,30 @@ asyncTest('Message round trip without channel', function() { cosmo.subscribe(subject, -1); }); +asyncTest('Bulk subscribe', function() { + expect(2); + + var subject1 = randstring(); + var subject2 = randstring(); + var message = randstring(); + + var messages = 0; + + var callbacks = { + 'onMessage': function(e) { + equal(e['message'], message, 'message matches'); + if (++messages == 2) { + cosmo.shutdown(); + start(); + } + } + }; + + var cosmo = new Cosmopolite(callbacks, null, randstring()); + cosmo.sendMessage(subject1, message); + cosmo.sendMessage(subject2, message); + cosmo.subscribe([subject1, subject2], -1); +}); asyncTest('Complex object', function() { expect(2);