diff --git a/static/cosmopolite.js b/static/cosmopolite.js index 2749b57..1dce891 100644 --- a/static/cosmopolite.js +++ b/static/cosmopolite.js @@ -290,6 +290,12 @@ Cosmopolite.prototype.subscribe = function(subjects, opt_messages, opt_lastID) { var subject = subjects[i]; ret.push(this.newPromise_(function(resolve, reject) { + if (subject['local'] && (subject['readable_only_by'] || subject['writable_only_by'])) { + console.log(this.loggingPrefix_(), 'local subjects can\'t have ACLs:', subject); + reject(); + return; + } + /** @type {Cosmopolite.typeSubject} */ var canonicalSubject = this.canonicalSubject_(subject); /** @type {string} */ @@ -303,6 +309,13 @@ Cosmopolite.prototype.subscribe = function(subjects, opt_messages, opt_lastID) { }; } + if (subject['local']) { + this.subscriptions_[subjectString].state = + Cosmopolite.SubscriptionState_.ACTIVE; + resolve(); + return; + } + var args = { 'subject': canonicalSubject }; @@ -374,6 +387,10 @@ Cosmopolite.prototype.unsubscribe = function(subject) { /** @type {string} */ var subjectString = this.subjectString_(canonicalSubject); delete this.subscriptions_[subjectString]; + if (subject['local']) { + resolve(); + return; + } var args = { 'subject': canonicalSubject }; @@ -390,12 +407,25 @@ Cosmopolite.prototype.unsubscribe = function(subject) { */ Cosmopolite.prototype.sendMessage = function(subject, message) { return this.newPromise_(function(resolve, reject) { + if (subject['local'] && (subject['readable_only_by'] || subject['writable_only_by'])) { + console.log(this.loggingPrefix_(), 'local subjects can\'t have ACLs:', subject); + reject(); + return; + } + var args = /** @type {Cosmopolite.typeMessage} */ ({ 'subject': this.canonicalSubject_(subject), 'message': JSON.stringify(message), 'sender_message_id': this.uuid() }); + if (subject['local']) { + resolve(message); + args['id'] = this.uuid(); + this.onMessage_(args); + return; + } + // No message left behind. var messageQueue = JSON.parse(localStorage[this.messageQueueKey_]); messageQueue.push(args); @@ -496,11 +526,20 @@ Cosmopolite.prototype.pin = function(subject, message) { 'sender_message_id': id }; - - this.sendRPC_('pin', args, function() { + var onSuccess = function() { this.pins_[id] = args; resolve(id); - }); + } + + if (subject['local']) { + onSuccess.bind(this)(); + // Ugly hack, but we need this to promise to resolve before the callback fires. + // This is the equivalent of sched_yield(), and probably about as reliable. + window.setTimeout(this.onPin_.bind(this, Object.create(args)), 0); + return; + } + + this.sendRPC_('pin', args, onSuccess); }.bind(this)); }; @@ -520,6 +559,12 @@ Cosmopolite.prototype.unpin = function(id) { delete this.pins_[id]; + if (pin['subject']['local']) { + resolve(); + window.setTimeout(this.onUnpin_.bind(this, Object.create(pin)), 0); + return; + } + this.sendRPC_('unpin', args, resolve); }.bind(this)); }; @@ -682,6 +727,9 @@ Cosmopolite.prototype.canonicalSubject_ = function(subject) { if (subject['writable_only_by'] === null) { delete subject['writable_only_by']; } + if (!subject['local']) { + delete subject['local']; + } return subject; }; @@ -698,7 +746,8 @@ Cosmopolite.prototype.subjectString_ = function(subject) { return [ subject['name'], subject['readable_only_by'], - subject['writable_only_by'] + subject['writable_only_by'], + subject['local'] ].toString(); }; @@ -970,6 +1019,9 @@ Cosmopolite.prototype.onReconnect_ = function() { if (subscription.state != Cosmopolite.SubscriptionState_.ACTIVE) { continue; } + if (subject['local']) { + continue; + } /** @type {number} */ var lastID = 0; if (subscription.messages.length > 0) { @@ -986,6 +1038,9 @@ Cosmopolite.prototype.onReconnect_ = function() { for (var id in this.pins_) { /** @type {Cosmopolite.typeMessage} */ var pin = this.pins_[id]; + if (pin.subject['local']) { + continue; + } rpcs.push({ 'command': 'pin', 'arguments': pin diff --git a/static/test.js b/static/test.js index 1b467c1..c920393 100644 --- a/static/test.js +++ b/static/test.js @@ -648,6 +648,137 @@ QUnit.asyncTest('stopImmediatePropagation', function(assert) { cosmo.subscribe(subject, -1); }); +QUnit.asyncTest('Local subject -- Message round trip', function(assert) { + assert.expect(3); + + var subject = { + 'name': randstring(), + 'local': true + }; + var message = randstring(); + + var cosmo = new Cosmopolite(null, randstring()); + + cosmo.addEventListener('message', function(e) { + assert.equal(e.detail['subject']['name'], subject['name'], 'subject matches'); + assert.ok(e.detail['subject']['local'], 'subject still local'); + assert.equal(e.detail['message'], message, 'message matches'); + cosmo.shutdown(); + QUnit.start(); + }); + + cosmo.subscribe(subject, -1); + cosmo.sendMessage(subject, message); +}); + +QUnit.asyncTest('Local subject -- Subject is distinct', function(assert) { + assert.expect(1); + + var subject1 = { + 'name': randstring(), + 'local': true + }; + var subject2 = subject1['name']; + var message = randstring(); + + var cosmo = new Cosmopolite(null, randstring()); + + cosmo.addEventListener('message', function(e) { + assert.ok(false, 'message received on wrong subject'); + }); + + cosmo.subscribe(subject2, -1).then(function() { + assert.ok(true, 'subscribe resolved'); + cosmo.sendMessage(subject1, message); + window.setTimeout(function() { + cosmo.shutdown(); + QUnit.start(); + }, 5000); + }); +}); + +QUnit.asyncTest('Local subject -- ACLs are rejected', function(assert) { + assert.expect(4); + + var subject_read = { + 'name': randstring(), + 'readable_only_by': 'foo', + 'local': true + }; + var subject_write = { + 'name': randstring(), + 'writable_only_by': 'foo', + 'local': true + }; + var message = randstring(); + + var cosmo = new Cosmopolite(null, randstring()); + + cosmo.subscribe(subject_read, -1).then(function() { + assert.ok(false, 'subscribe of readable_only_by/local resolved'); + }).catch(function() { + assert.ok(true, 'subscribe of readable_only_by/local failed'); + }); + + cosmo.subscribe(subject_write, -1).then(function() { + assert.ok(false, 'subscribe of writable_only_by/local resolved'); + }).catch(function() { + assert.ok(true, 'subscribe of writable_only_by/local failed'); + }); + + cosmo.sendMessage(subject_read, message).then(function() { + assert.ok(false, 'sendMessage of readable_only_by/local resolved'); + }).catch(function() { + assert.ok(true, 'sendMessage of readable_only_by/local failed'); + }); + + cosmo.sendMessage(subject_write, message).then(function() { + assert.ok(false, 'sendMessage of writable_only_by/local resolved'); + }).catch(function() { + assert.ok(true, 'sendMessage of writable_only_by/local failed'); + cosmo.shutdown(); + QUnit.start(); + }); +}); + +QUnit.asyncTest('Local subject -- pin/unpin', function(assert) { + assert.expect(7); + + var subject = { + 'name': randstring(), + 'local': true + }; + var message = randstring(); + + var cosmo = new Cosmopolite(null, randstring()); + + cosmo.addEventListener('pin', function(e) { + assert.equal(subject['name'], e.detail['subject']['name'], + 'onPin: subject matches'); + assert.ok(e.detail['subject']['local'], 'onPin: local set'); + assert.equal(message, e.detail['message'], + 'onPin: message matches'); + assert.equal(cosmo.getPins(subject).length, 1); + pin.then(function(id) { + cosmo.unpin(id); + }); + }); + + cosmo.addEventListener('unpin', function(e) { + assert.equal(subject['name'], e.detail['subject']['name'], + 'onUnpin: subject matches'); + assert.ok(e.detail['subject']['local'], 'onUnpin: local set'); + assert.equal(message, e.detail['message'], + 'onUnpin: message matches'); + cosmo.shutdown(); + QUnit.start(); + }); + + cosmo.subscribe(subject); + var pin = cosmo.pin(subject, message); +}); + + module('dev_appserver only');