diff --git a/api.py b/api.py index 235747a..6b98055 100644 --- a/api.py +++ b/api.py @@ -52,6 +52,32 @@ def CreateChannel(google_user, client, instance_id, args): } +def Pin(google_user, client, instance_id, args): + instance = models.Instance.FromID(instance_id) + + subject = args['subject'] + message = args['message'] + sender_message_id = args['sender_message_id'] + + try: + models.Subject.FindOrCreate(subject).Pin( + message, client.parent_key(), sender_message_id, instance) + except models.DuplicateMessage: + logging.exception('Duplicate message: %s', sender_message_id) + return { + 'result': 'duplicate_message', + } + except models.AccessDenied: + logging.exception('Pin access denied') + return { + 'result': 'access_denied', + } + + return { + 'result': 'ok', + } + + def SendMessage(google_user, client, instance_id, args): subject = args['subject'] message = args['message'] @@ -110,6 +136,25 @@ def Subscribe(google_user, client, instance_id, args): return ret +def Unpin(google_user, client, instance_id, args): + instance = models.Instance.FromID(instance_id) + subject = args['subject'] + sender_message_id = args['sender_message_id'] + + try: + models.Subject.FindOrCreate(subject).Unpin( + client.parent_key(), sender_message_id, instance) + except models.AccessDenied: + logging.exception('Pin access denied') + return { + 'result': 'access_denied', + } + + return { + 'result': 'ok', + } + + def Unsubscribe(google_user, client, instance_id, args): instance = models.Instance.FromID(instance_id) subject = models.Subject.FindOrCreate(args['subject']) @@ -122,8 +167,10 @@ class APIWrapper(webapp2.RequestHandler): _COMMANDS = { 'createChannel': CreateChannel, + 'pin': Pin, 'sendMessage': SendMessage, 'subscribe': Subscribe, + 'unpin': Unpin, 'unsubscribe': Unsubscribe, } diff --git a/channel.py b/channel.py index 39db032..9644414 100644 --- a/channel.py +++ b/channel.py @@ -33,23 +33,21 @@ class OnChannelConnect(webapp2.RequestHandler): class OnChannelDisconnect(webapp2.RequestHandler): - @staticmethod - @db.transactional() - def DeleteInstance(instance_id): - instance = models.Instance.FromID(instance_id) - ret = instance.key() - instance.delete() - return ret - @utils.local_namespace def post(self): instance_id = self.request.get('from') - instance_key = self.DeleteInstance(instance_id) + instance = models.Instance.FromID(instance_id) - subscriptions = models.Subscription.all().filter('instance =', instance_key) + subscriptions = models.Subscription.all().filter('instance =', instance) for subscription in subscriptions: subscription.delete() + pins = models.Pin.all().filter('instance =', instance) + for pin in pins: + pin.Delete() + + instance.delete() + app = webapp2.WSGIApplication([ ('/_ah/channel/connected/', OnChannelConnect), diff --git a/lib/models.py b/lib/models.py index c83fd23..e108921 100644 --- a/lib/models.py +++ b/lib/models.py @@ -29,6 +29,7 @@ import utils # # Subject # ↳ Message +# ↳ Pin (⤴︎ Instance) # ↳ Subscription (⤴︎ Instance) @@ -172,6 +173,13 @@ class Subject(db.Model): return messages[0] return None + @db.transactional() + def GetPins(self): + query = ( + Pin.all() + .ancestor(self)) + return list(query) + @db.transactional() def PutMessage(self, message, sender, sender_message_id, key=None): """Internal helper for SendMessage(). @@ -209,16 +217,81 @@ class Subject(db.Model): return (obj, list(Subscription.all().ancestor(subject))) - def SendMessage(self, message, sender, sender_message_id, key=None): + def VerifyWritable(self, sender): writable_only_by = Subject.writable_only_by.get_value_for_datastore(self) if (writable_only_by and writable_only_by != sender): raise AccessDenied + + def SendMessage(self, message, sender, sender_message_id, key=None): + self.VerifyWritable(sender) obj, subscriptions = self.PutMessage(message, sender, sender_message_id, key) event = obj.ToEvent() for subscription in subscriptions: subscription.SendMessage(event) + @db.transactional(xg=True) + def PutPin(self, message, sender, sender_message_id, instance): + """Internal helper for Pin().""" + # Reload the subject and instance to establish a barrier + subject = Subject.get(self.key()) + instance = Instance.get(instance.key()) + + # sender_message_id should be universal across all subjects, but we check + # it within just this subject to allow in-transaction verification. + pins = ( + Pin.all() + .ancestor(subject) + .filter('sender_message_id =', sender_message_id) + .fetch(1)) + if pins: + raise DuplicateMessage(sender_message_id) + + obj = Pin( + parent=subject, + message=message, + sender=sender, + sender_message_id=sender_message_id, + instance=instance) + obj.put() + + return (obj, list(Subscription.all().ancestor(subject))) + + def Pin(self, message, sender, sender_message_id, instance): + self.VerifyWritable(sender) + obj, subscriptions = self.PutPin( + message, sender, sender_message_id, instance) + event = obj.ToEvent() + for subscription in subscriptions: + subscription.SendMessage(event) + + @db.transactional(xg=True) + def RemovePin(self, sender, sender_message_id, instance): + # Reload the subject and instance to establish a barrier + subject = Subject.get(self.key()) + instance = Instance.get(instance.key()) + + pins = ( + Pin.all() + .ancestor(subject) + .filter('sender =', sender) + .filter('sender_message_id =', sender_message_id) + .filter('instance =', instance)) + + events = [] + for pin in pins: + events.append(pin.ToEvent(event_type='unpin')) + pin.delete() + + return (events, list(Subscription.all().ancestor(subject))) + + def Unpin(self, sender, sender_message_id, instance): + self.VerifyWritable(sender) + events, subscriptions = self.RemovePin(sender, sender_message_id, instance) + for event in events: + for subscription in subscriptions: + subscription.SendMessage(event) + def ToDict(self): ret = { 'name': self.name, @@ -253,7 +326,7 @@ class Subscription(db.Model): .fetch(1)) if not subscriptions: cls(parent=subject, instance=instance).put() - events = [] + events = [m.ToEvent() for m in subject.GetPins()] if messages: events.extend(m.ToEvent() for m in subject.GetRecentMessages(messages)) if last_id is not None: @@ -301,3 +374,26 @@ class Message(db.Model): if self.key_: ret['key'] = self.key_ return ret + + +class Pin(db.Model): + # parent=Subject + + created = db.DateTimeProperty(required=True, auto_now_add=True) + instance = db.ReferenceProperty(required=True, reference_class=Instance) + sender = db.ReferenceProperty(required=True, reference_class=Profile) + message = db.TextProperty(required=True) + sender_message_id = db.StringProperty(required=True) + + def ToEvent(self, event_type='pin'): + return { + 'event_type': event_type, + 'id': str(self.key()), + 'sender': str(Pin.sender.get_value_for_datastore(self)), + 'subject': self.parent().ToDict(), + 'created': self.created, + 'message': self.message, + } + + def Delete(self): + self.parent().Unpin(self.sender, self.sender_message_id, self.instance) diff --git a/static/cosmopolite.js b/static/cosmopolite.js index 1db088b..2989e8b 100644 --- a/static/cosmopolite.js +++ b/static/cosmopolite.js @@ -43,6 +43,7 @@ var Cosmopolite = function(callbacks, urlPrefix, namespace) { this.rpcQueue_ = []; this.subscriptions_ = {}; + this.pins_ = {}; this.profilePromises_ = []; this.messageQueueKey_ = this.namespace_ + ':message_queue'; @@ -140,6 +141,7 @@ Cosmopolite.prototype.subscribe = function(subject, messages, last_id, keys) { if (!(subjectString in this.subscriptions_)) { this.subscriptions_[subjectString] = { 'messages': [], + 'pins': [], 'keys': {}, 'state': this.SubscriptionState.PENDING, }; @@ -235,6 +237,18 @@ Cosmopolite.prototype.getMessages = function(subject) { return this.subscriptions_[subjectString].messages; }; +/** + * Fetch all current pins for a subject + * + * @param {!string} subject Subject name + * @const + */ +Cosmopolite.prototype.getPins = function(subject) { + var canonicalSubject = this.canonicalSubject_(subject); + var subjectString = JSON.stringify(canonicalSubject); + return this.subscriptions_[subjectString].pins; +}; + /** * Fetch the most recent message that defined a key * @@ -271,6 +285,50 @@ Cosmopolite.prototype.currentProfile = function() { return this.profile_; }; +/** + * Pin a message to the given subject, storing it and notifying all listeners. + * + * The message is deleted on unpin() or when we disconnect. + * + * The resulting Promise resolve callback is passed an ID that can later be + * passed to unpin(). + * + * @param {!*} subject Subject name or object + * @param {!*} message Message string or object + */ +Cosmopolite.prototype.pin = function(subject, message) { + return new Promise(function(resolve, reject) { + var id = this.uuid_(); + var args = { + 'subject': this.canonicalSubject_(subject), + 'message': JSON.stringify(message), + 'sender_message_id': id, + }; + + this.pins_[id] = args; + + this.sendRPC_('pin', args, resolve.bind(null, id)); + }.bind(this)); +}; + +/** + * Unpin a message from the given subject, storing it and notifying all listeners. + * + * @param {!string} id ID returned by pin()'s resolve callback + */ +Cosmopolite.prototype.unpin = function(id) { + return new Promise(function(resolve, reject) { + var args = { + 'subject': this.pins_[id]['subject'], + 'sender_message_id': id, + }; + + delete this.pins_[id]; + + this.sendRPC_('unpin', args, resolve); + }.bind(this)); +}; + /** * Generate a string identifying us to be included in log messages. * @@ -301,7 +359,7 @@ Cosmopolite.prototype.uuid_ = function() { /** * Canonicalize a subject name or object * - * @param {!*} subject A simple or complex representation of a subject + * @param {!Object|string|number} subject A simple or complex representation of a subject * @return {Object} A canonicalized object for RPCs */ Cosmopolite.prototype.canonicalSubject_ = function(subject) { @@ -543,7 +601,7 @@ Cosmopolite.prototype.sendRPCs_ = function(commands, delay) { /** * Are we currently clear to put RPCs on the wire? * - * @return {Boolean} Yes or no? + * @return {boolean} Yes or no? */ Cosmopolite.prototype.maySendRPC_ = function() { if (!(this.namespace_ + ':client_id' in localStorage)) { @@ -591,6 +649,12 @@ Cosmopolite.prototype.resubscribe_ = function() { 'last_id': last_id, } }); + subscription.pins.forEach(function(pin) { + rpcs.push({ + 'command': 'pin', + 'arguments': pin, + }); + }, this); } this.sendRPCs_(rpcs); }; @@ -681,6 +745,12 @@ Cosmopolite.prototype.onSocketClose_ = function() { return; } + // We treat a disconnection as if all pins disappeared + for (var subject in this.subscriptions_) { + var subscription = this.subscriptions_[subject]; + subscription.pins.forEach(this.onUnpin_, this); + } + this.createChannel_(); }; @@ -773,6 +843,68 @@ Cosmopolite.prototype.onMessage_ = function(e) { } }; +/** + * Callback on receiving a 'pin' event from the server + * + * @param {!Object} e Event object + */ +Cosmopolite.prototype.onPin_ = function(e) { + var subjectString = JSON.stringify(e['subject']); + var subscription = this.subscriptions_[subjectString]; + if (!subscription) { + console.log( + this.loggingPrefix_(), + 'message from unrecognized subject:', e); + return; + } + var duplicate = subscription.pins.some(function(pin) { + return pin['id'] == e.id; + }); + if (duplicate) { + console.log(this.loggingPrefix_(), 'duplicate pin:', e); + return; + } + e['message'] = JSON.parse(e['message']); + + subscription.pins.push(e); + if ('onPin' in this.callbacks_) { + this.callbacks_['onPin'](e); + } +}; + +/** + * Callback on receiving an 'unpin' event from the server + * + * @param {!Object} e Event object + */ +Cosmopolite.prototype.onUnpin_ = function(e) { + var subjectString = JSON.stringify(e['subject']); + var subscription = this.subscriptions_[subjectString]; + if (!subscription) { + console.log( + this.loggingPrefix_(), + 'message from unrecognized subject:', e); + return; + } + var index; + for (index = 0; index < subscription.pins.length; index++) { + var pin = subscription.pins[index]; + if (pin['id'] == e['id']) { + break; + } + }; + if (index == subscription.pins.length) { + console.log(this.loggingPrefix_(), 'unknown pin:', e); + return; + } + e['message'] = JSON.parse(e['message']); + + subscription.pins.splice(index, 1)[0]; + if ('onUnpin' in this.callbacks_) { + this.callbacks_['onUnpin'](e); + } +}; + /** * Callback for Cosmopolite event (received via channel or pseudo-channel) * @@ -799,6 +931,12 @@ Cosmopolite.prototype.onServerEvent_ = function(e) { case 'message': this.onMessage_(e); break; + case 'pin': + this.onPin_(e); + break; + case 'unpin': + this.onUnpin_(e); + break; default: // Client out of date? Force refresh? console.log(this.loggingPrefix_(), 'unknown channel event:', e); diff --git a/static/debug.html b/static/debug.html index 465600d..1113d04 100644 --- a/static/debug.html +++ b/static/debug.html @@ -29,6 +29,7 @@ a {
+

@@ -37,6 +38,10 @@ a {
+
+ +
+