From 6bfa10e82ad60d130181d80ed406a16b470c3086 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Fri, 23 May 2014 10:31:52 -0700 Subject: [PATCH] Add a third level construct under Profile and Client, Instance. This allows more than one connected object with the same client credentials. --- api.py | 18 ++++++++----- channel.py | 16 +++++++----- lib/models.py | 61 +++++++++++++++++++++++++++++-------------- static/cosmopolite.js | 3 +++ static/test.js | 26 ++++++++++++++++++ 5 files changed, 91 insertions(+), 33 deletions(-) diff --git a/api.py b/api.py index cd5b10b..b7f8873 100644 --- a/api.py +++ b/api.py @@ -27,9 +27,9 @@ from cosmopolite.lib import utils import config -def CreateChannel(google_user, client, args): +def CreateChannel(google_user, client, instance_id, args): token = channel.create_channel( - client_id=str(client.key()), + client_id=str(client.key()) + '/' + instance_id, duration_minutes=config.CHANNEL_DURATION_SECONDS / 60) events = [] if google_user: @@ -50,7 +50,7 @@ def CreateChannel(google_user, client, args): } -def SendMessage(google_user, client, args): +def SendMessage(google_user, client, instance_id, args): subject = args['subject'] message = args['message'] sender_message_id = args['sender_message_id'] @@ -75,7 +75,8 @@ def SendMessage(google_user, client, args): } -def Subscribe(google_user, client, args): +def Subscribe(google_user, client, instance_id, args): + instance = models.Instance.FromID(instance_id, client) subject = models.Subject.FindOrCreate(args['subject']) messages = args.get('messages', 0) last_id = args.get('last_id', None) @@ -84,7 +85,8 @@ def Subscribe(google_user, client, args): try: ret = { 'result': 'ok', - 'events': models.Subscription.FindOrCreate(subject, client, messages, last_id), + 'events': models.Subscription.FindOrCreate( + subject, instance, messages, last_id), } except models.AccessDenied: logging.exception('Subscribe access denied') @@ -100,9 +102,10 @@ def Subscribe(google_user, client, args): return ret -def Unsubscribe(google_user, client, args): +def Unsubscribe(google_user, client, instance_id, args): + instance = models.Instance.FromID(instance_id, client) subject = models.Subject.FindOrCreate(args['subject']) - models.Subscription.Remove(subject, client) + models.Subscription.Remove(subject, instance) return {} @@ -134,6 +137,7 @@ class APIWrapper(webapp2.RequestHandler): result = callback( self.verified_google_user, self.client, + self.request_json['instance_id'], command.get('arguments', {})) # Magic: if result contains "events", haul them up a level so the # client can see them as a single stream. diff --git a/channel.py b/channel.py index e502bf8..4fb4259 100644 --- a/channel.py +++ b/channel.py @@ -25,22 +25,24 @@ class OnChannelConnect(webapp2.RequestHandler): @utils.local_namespace @db.transactional() def post(self): - client = models.Client.get(self.request.get('from')) - client.channel_active = True - client.put() + client_key, instance_id = self.request.get('from').split('/', 1) + client = models.Client.get(client_key) + instance = models.Instance.FindOrCreate(instance_id, client) class OnChannelDisconnect(webapp2.RequestHandler): @utils.local_namespace def post(self): - client = models.Client.get(self.request.get('from')) - client.channel_active = False - client.put() + client_key, instance_id = self.request.get('from').split('/', 1) + client = models.Client.get(client_key) + instance = models.Instance.FindOrCreate(instance_id, client) - subscriptions = models.Subscription.all().filter('client =', client) + subscriptions = models.Subscription.all().filter('instance =', instance) for subscription in subscriptions: subscription.delete() + models.Instance.get(instance).delete() + app = webapp2.WSGIApplication([ ('/_ah/channel/connected/', OnChannelConnect), diff --git a/lib/models.py b/lib/models.py index 973298a..6d88bd9 100644 --- a/lib/models.py +++ b/lib/models.py @@ -24,10 +24,11 @@ import utils # Profile # ↳ Client +# ↳ Instance # # Subject # ↳ Message -# ↳ Subscription (⤴︎ Client) +# ↳ Subscription (⤴︎ Instance) class DuplicateMessage(Exception): @@ -71,7 +72,6 @@ class Client(db.Model): # parent=Profile first_seen = db.DateTimeProperty(required=True, auto_now_add=True) - channel_active = db.BooleanProperty(required=True, default=False) @classmethod def FromProfile(cls, profile): @@ -84,12 +84,38 @@ class Client(db.Model): profile = Profile.FromGoogleUser(google_user) return cls.FromProfile(profile) - def SendMessage(self, msg): - self.SendByKey(self.key(), msg) - @staticmethod - def SendByKey(key, msg): - channel.send_message(str(key), json.dumps(msg, default=utils.EncodeJSON)) +class Instance(db.Model): + # parent=Client + + id_ = db.StringProperty(required=True) + + @classmethod + @db.transactional() + def FromID(cls, instance_id, client): + instances = ( + cls.all(keys_only=True) + .filter('id_ =', instance_id) + .ancestor(client) + .fetch(1)) + if instances: + return instances[0] + else: + return None + + @classmethod + @db.transactional() + def FindOrCreate(cls, instance_id, client): + instance = cls.FromID(instance_id, client) + if instance: + return instance + else: + return cls(parent=client, id_=instance_id).put() + + def SendMessage(self, msg): + channel.send_message( + str(self.parent_key()) + '/' + self.id_, + json.dumps(msg, default=utils.EncodeJSON)) class Subject(db.Model): @@ -195,10 +221,7 @@ class Subject(db.Model): key_=key) obj.put() - return ( - obj, - [Subscription.client.get_value_for_datastore(subscription) - for subscription in Subscription.all().ancestor(subject)]) + return (obj, list(Subscription.all().ancestor(subject))) def SendMessage(self, message, sender, sender_message_id, key=None): writable_only_by = Subject.writable_only_by.get_value_for_datastore(self) @@ -208,7 +231,7 @@ class Subject(db.Model): obj, subscriptions = self.PutMessage(message, sender, sender_message_id, key) event = obj.ToEvent() for subscription in subscriptions: - Client.SendByKey(subscription, event) + subscription.instance.SendMessage(event) def ToDict(self): ret = { @@ -226,24 +249,24 @@ class Subject(db.Model): class Subscription(db.Model): # parent=Subject - client = db.ReferenceProperty(reference_class=Client) + instance = db.ReferenceProperty(reference_class=Instance, required=True) @classmethod @db.transactional() - def FindOrCreate(cls, subject, client, messages=0, last_id=None): + def FindOrCreate(cls, subject, instance, messages=0, last_id=None): readable_only_by = ( Subject.readable_only_by.get_value_for_datastore(subject)) if (readable_only_by and - readable_only_by != client.parent_key()): + readable_only_by != instance.parent().parent()): raise AccessDenied subscriptions = ( cls.all(keys_only=True) .ancestor(subject) - .filter('client =', client) + .filter('instance =', instance) .fetch(1)) if not subscriptions: - cls(parent=subject, client=client).put() + cls(parent=subject, instance=instance).put() events = [] if messages: events.extend(m.ToEvent() for m in subject.GetRecentMessages(messages)) @@ -253,11 +276,11 @@ class Subscription(db.Model): @classmethod @db.transactional() - def Remove(cls, subject, client): + def Remove(cls, subject, instance): subscriptions = ( cls.all() .ancestor(subject) - .filter('client =', client)) + .filter('instance =', instance)) for subscription in subscriptions: subscription.delete() diff --git a/static/cosmopolite.js b/static/cosmopolite.js index fcdf7d5..fd0c3f2 100644 --- a/static/cosmopolite.js +++ b/static/cosmopolite.js @@ -45,6 +45,8 @@ var Cosmopolite = function(callbacks, urlPrefix, namespace) { this.subscriptions_ = {}; this.profilePromises_ = []; + this.instanceId_ = this.uuid_(); + this.messageQueueKey_ = this.namespace_ + ':message_queue'; if (this.messageQueueKey_ in localStorage) { var messages = JSON.parse(localStorage[this.messageQueueKey_]); @@ -447,6 +449,7 @@ Cosmopolite.prototype.sendRPCs_ = function(commands, delay) { return; } var request = { + 'instance_id': this.instanceId_, 'commands': [], }; commands.forEach(function(command) { diff --git a/static/test.js b/static/test.js index dddfe7f..9d9eb67 100644 --- a/static/test.js +++ b/static/test.js @@ -441,6 +441,32 @@ asyncTest('sendMessage ACL', function() { }); }); +asyncTest('Two channels, one client', function() { + expect(2); + + var namespace = randstring(); + var subject = randstring(); + var message = randstring(); + + var callbacks = { + 'onMessage': function(msg) { + console.log('onMessage'); + equal(msg['subject']['name'], subject, 'subject matches'); + equal(msg['message'], message, 'message matches'); + cosmo1.shutdown(); + start(); + }, + }; + + var cosmo1 = new Cosmopolite(callbacks, null, namespace); + cosmo1.subscribe(subject).then(function() { + var cosmo2 = new Cosmopolite({}, null, namespace); + cosmo2.sendMessage(subject, message).then(function() { + cosmo2.shutdown(); + }); + }); +}); + module('dev_appserver only');