diff --git a/api.py b/api.py index 6054ac0..cd5b10b 100644 --- a/api.py +++ b/api.py @@ -61,11 +61,18 @@ def SendMessage(google_user, client, args): message, client.parent_key(), sender_message_id, key) except models.DuplicateMessage: logging.exception('Duplicate message: %s', sender_message_id) - # We still return success since we assume that the message was already - # delivered. If it's really a client ID generation bug, we just swallowed - # a message. + return { + 'result': 'duplicate_message', + } + except models.AccessDenied: + logging.exception('SendMessage access denied') + return { + 'result': 'access_denied', + } - return {} + return { + 'result': 'ok', + } def Subscribe(google_user, client, args): @@ -74,13 +81,22 @@ def Subscribe(google_user, client, args): last_id = args.get('last_id', None) keys = args.get('keys', []) - ret = { - 'events': models.Subscription.FindOrCreate(subject, client, messages, last_id), - } + try: + ret = { + 'result': 'ok', + 'events': models.Subscription.FindOrCreate(subject, client, messages, last_id), + } + except models.AccessDenied: + logging.exception('Subscribe access denied') + return { + 'result': 'access_denied', + } + for key in keys: message = subject.GetKey(key) if message: ret['events'].append(message.ToEvent()) + return ret diff --git a/lib/models.py b/lib/models.py index 15f1141..9d988fc 100644 --- a/lib/models.py +++ b/lib/models.py @@ -34,6 +34,10 @@ class DuplicateMessage(Exception): pass +class AccessDenied(Exception): + pass + + class Profile(db.Model): google_user = db.UserProperty() @@ -99,11 +103,29 @@ class Subject(db.Model): next_message_id = db.IntegerProperty(required=True, default=1) @classmethod - def FindOrCreate(cls, name): - subjects = cls.all().filter('name =', name).fetch(1) + def FindOrCreate(cls, subject): + if 'readable_only_by' in subject: + readable_only_by = Profile.get(subject['readable_only_by']) + else: + readable_only_by = None + + if 'writable_only_by' in subject: + writable_only_by = Profile.get(subject['writable_only_by']) + else: + writable_only_by = None + + subjects = ( + cls.all() + .filter('name =', subject['name']) + .filter('readable_only_by =', readable_only_by) + .filter('writable_only_by =', writable_only_by) + .fetch(1)) if subjects: return subjects[0] - subject = cls(name=name) + subject = cls( + name=subject['name'], + readable_only_by=readable_only_by, + writable_only_by=writable_only_by) subject.put() return subject @@ -179,11 +201,27 @@ class Subject(db.Model): for subscription in Subscription.all().ancestor(subject)]) def SendMessage(self, message, sender, sender_message_id, key=None): + writable_only_by = Subject.writable_only_by.get_value_for_datastore(self) + if (writable_only_by and + writable_only_by != sender): + raise AccessDenied obj, subscriptions = self.PutMessage(message, sender, sender_message_id, key) event = obj.ToEvent() for subscription in subscriptions: Client.SendByKey(subscription, event) + def ToDict(self): + ret = { + 'name': self.name, + } + readable_only_by = Subject.readable_only_by.get_value_for_datastore(self) + if readable_only_by: + ret['readable_only_by'] = readable_only_by + writable_only_by = Subject.writable_only_by.get_value_for_datastore(self) + if writable_only_by: + ret['writable_only_by'] = writable_only_by + return ret + class Subscription(db.Model): # parent=Subject @@ -193,6 +231,12 @@ class Subscription(db.Model): @classmethod @db.transactional() def FindOrCreate(cls, subject, client, messages=0, last_id=None): + readable_only_by = ( + Subject.readable_only_by.get_value_for_datastore(subject)) + if (readable_only_by and + readable_only_by != client.parent_key()): + raise AccessDenied + subscriptions = ( cls.all(keys_only=True) .ancestor(subject) @@ -235,9 +279,7 @@ class Message(db.Model): 'event_type': 'message', 'id': self.id_, 'sender': str(Message.sender.get_value_for_datastore(self)), - 'subject': { - 'name': self.parent().name, - }, + 'subject': self.parent().ToDict(), 'created': self.created, 'message': self.message, } diff --git a/static/cosmopolite.js b/static/cosmopolite.js index 3df4533..3a2066e 100644 --- a/static/cosmopolite.js +++ b/static/cosmopolite.js @@ -56,7 +56,8 @@ var Cosmopolite = function(callbacks, urlPrefix, namespace) { // message's client_message_id, which is intentionally not exposed via // the sendMessage API this.sendRPC_( - 'sendMessage', message, this.onMessageSent_.bind(this, message, null)); + 'sendMessage', message, + this.onMessageSent_.bind(this, message, null, null)); }.bind(this)); } else { localStorage[this.messageQueueKey_] = JSON.stringify([]); @@ -126,15 +127,17 @@ Cosmopolite.prototype.shutdown = function() { * * Start receiving messages sent to this subject via the onMessage callback. * - * @param {!string} subject Subject name + * @param {!*} subject Subject name or object * @param {number=} messages Number of recent messages to request; 0 for none, -1 for all * @param {number=} last_id ID of last message received; fetch all messages since * @param {Array.=} keys Key names to ensure we receive at least 1 message defining */ Cosmopolite.prototype.subscribe = function(subject, messages, last_id, keys) { return new Promise(function(resolve, reject) { - if (!(subject in this.subscriptions_)) { - this.subscriptions_[subject] = { + var canonicalSubject = this.canonicalSubject_(subject); + var subjectString = JSON.stringify(canonicalSubject); + if (!(subjectString in this.subscriptions_)) { + this.subscriptions_[subjectString] = { 'messages': [], 'keys': {}, 'state': this.SubscriptionState.PENDING, @@ -142,7 +145,7 @@ Cosmopolite.prototype.subscribe = function(subject, messages, last_id, keys) { } var args = { - 'subject': subject, + 'subject': canonicalSubject, }; if (messages) { args['messages'] = messages; @@ -154,13 +157,18 @@ Cosmopolite.prototype.subscribe = function(subject, messages, last_id, keys) { args['keys'] = keys; } - this.sendRPC_('subscribe', args, function() { + this.sendRPC_('subscribe', args, function(response) { // unsubscribe may have been called since we sent the RPC. That's racy // without waiting for the promise, but do our best - if (subject in this.subscriptions_) { - this.subscriptions_[subject].state = this.SubscriptionState.ACTIVE; + if (subjectString in this.subscriptions_) { + this.subscriptions_[subjectString].state = this.SubscriptionState.ACTIVE; + } + var result = response['result']; + if (result == 'ok') { + resolve(); + } else { + reject(); } - resolve(); }.bind(this)); }.bind(this)); }; @@ -175,9 +183,11 @@ Cosmopolite.prototype.subscribe = function(subject, messages, last_id, keys) { */ Cosmopolite.prototype.unsubscribe = function(subject) { return new Promise(function(resolve, reject) { - delete this.subscriptions_[subject]; + var canonicalSubject = this.canonicalSubject_(subject); + var subjectString = JSON.stringify(canonicalSubject); + delete this.subscriptions_[subjectString]; var args = { - 'subject': subject, + 'subject': canonicalSubject, } this.sendRPC_('unsubscribe', args, resolve); }.bind(this)); @@ -193,7 +203,7 @@ Cosmopolite.prototype.unsubscribe = function(subject) { Cosmopolite.prototype.sendMessage = function(subject, message, key) { return new Promise(function(resolve, reject) { var args = { - 'subject': subject, + 'subject': this.canonicalSubject_(subject), 'message': JSON.stringify(message), 'sender_message_id': this.uuid_(), }; @@ -207,7 +217,8 @@ Cosmopolite.prototype.sendMessage = function(subject, message, key) { localStorage[this.messageQueueKey_] = JSON.stringify(messageQueue); this.sendRPC_( - 'sendMessage', args, this.onMessageSent_.bind(this, args, resolve)); + 'sendMessage', args, + this.onMessageSent_.bind(this, args, resolve, reject)); }.bind(this)); }; @@ -218,7 +229,9 @@ Cosmopolite.prototype.sendMessage = function(subject, message, key) { * @const */ Cosmopolite.prototype.getMessages = function(subject) { - return this.subscriptions_[subject].messages; + var canonicalSubject = this.canonicalSubject_(subject); + var subjectString = JSON.stringify(canonicalSubject); + return this.subscriptions_[subjectString].messages; }; /** @@ -229,7 +242,9 @@ Cosmopolite.prototype.getMessages = function(subject) { * @const */ Cosmopolite.prototype.getKeyMessage = function(subject, key) { - return this.subscriptions_[subject].keys[key]; + var canonicalSubject = this.canonicalSubject_(subject); + var subjectString = JSON.stringify(canonicalSubject); + return this.subscriptions_[subjectString].keys[key]; }; /** @@ -269,6 +284,31 @@ Cosmopolite.prototype.uuid_ = function() { }); }; +/** + * Canonicalize a subject name or object + * + * @param {!*} subject A simple or complex representation of a subject + * @return {Object} A canonicalized object for RPCs + */ +Cosmopolite.prototype.canonicalSubject_ = function(subject) { + if (typeof(subject) == 'number') { + subject = subject.toString(); + } + if (typeof(subject) == 'string') { + subject = { + 'name': subject, + } + } + if (subject['readable_only_by'] === null) { + delete subject['readable_only_by']; + }; + if (subject['writable_only_by'] === null) { + delete subject['writable_only_by']; + }; + return subject; +}; + + /** * Callback when a script loads. */ @@ -332,16 +372,26 @@ Cosmopolite.prototype.registerMessageHandlers_ = function() { * * @param {Object} message Message details. * @param {function()=} resolve Promise resolution callback. + * @param {function()=} reject Promise rejection callback. + * @param {Object=} response Server RPC response. */ -Cosmopolite.prototype.onMessageSent_ = function(message, resolve) { +Cosmopolite.prototype.onMessageSent_ = function( + message, resolve, reject, response) { // No message left behind. var messageQueue = JSON.parse(localStorage[this.messageQueueKey_]); messageQueue = messageQueue.filter(function(queuedMessage) { return message['sender_message_id'] != queuedMessage['sender_message_id']; }); localStorage[this.messageQueueKey_] = JSON.stringify(messageQueue); - if (resolve) { - resolve(); + var result = response['result']; + if (result == 'ok' || result == 'duplicate_message') { + if (resolve) { + resolve(); + } + } else { + if (reject) { + reject(); + } } }; @@ -500,6 +550,7 @@ Cosmopolite.prototype.resubscribe_ = function() { var rpcs = []; for (var subject in this.subscriptions_) { var subscription = this.subscriptions_[subject]; + var canonicalSubject = JSON.parse(subject); if (subscription.state != this.SubscriptionState.ACTIVE) { continue; } @@ -510,7 +561,7 @@ Cosmopolite.prototype.resubscribe_ = function() { rpcs.push({ 'command': 'subscribe', 'arguments': { - 'subject': subject, + 'subject': canonicalSubject, 'last_id': last_id, } }); @@ -657,7 +708,8 @@ Cosmopolite.prototype.onLogout_ = function(e) { * @param {!Object} e Event object */ Cosmopolite.prototype.onMessage_ = function(e) { - var subscription = this.subscriptions_[e['subject']['name']]; + var subjectString = JSON.stringify(e['subject']); + var subscription = this.subscriptions_[subjectString]; if (!subscription) { console.log( this.loggingPrefix_(), diff --git a/static/test.js b/static/test.js index f93f874..1c062b0 100644 --- a/static/test.js +++ b/static/test.js @@ -374,6 +374,83 @@ asyncTest('Reconnect channel', function() { }); }); +asyncTest('subscribe ACL', function() { + expect(2); + + var subject = randstring(); + + logout(function() { + var tempCallbacks = { + 'onLogout': function() { + var tempProfile = tempCosmo.profile(); + tempCosmo.shutdown(); + + var callbacks = { + 'onLogout': function() { + cosmo.subscribe({ + 'name': subject, + 'readable_only_by': cosmo.profile(), + }).then(function() { + ok(true, 'correct ACL succeeds'); + + cosmo.subscribe({ + 'name': subject, + 'readable_only_by': tempProfile, + }).then(null, function() { + ok(true, 'bad ACL fails'); + cosmo.shutdown(); + start(); + }); + + }); + }, + }; + var cosmo = new Cosmopolite(callbacks, null, randstring()); + }, + }; + var tempCosmo = new Cosmopolite(tempCallbacks, null, randstring()); + }); +}); + +asyncTest('sendMessage ACL', function() { + expect(2); + + var subject = randstring(); + var message = randstring(); + + logout(function() { + var tempCallbacks = { + 'onLogout': function() { + var tempProfile = tempCosmo.profile(); + tempCosmo.shutdown(); + + var callbacks = { + 'onLogout': function() { + cosmo.sendMessage({ + 'name': subject, + 'writable_only_by': cosmo.profile(), + }, message).then(function() { + ok(true, 'correct ACL succeeds'); + + cosmo.sendMessage({ + 'name': subject, + 'writable_only_by': tempProfile, + }, message).then(null, function() { + ok(true, 'bad ACL fails'); + cosmo.shutdown(); + start(); + }); + + }); + }, + }; + var cosmo = new Cosmopolite(callbacks, null, randstring()); + }, + }; + var tempCosmo = new Cosmopolite(tempCallbacks, null, randstring()); + }); +}); + module('dev_appserver only');