From e6885042f551e95f7ccf79cd846e114943a35a34 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Tue, 6 May 2014 22:46:07 -0700 Subject: [PATCH] Add pubsub server API, without client API or UI yet. --- api.py | 54 +++++++++++++++++++++----------- channel.py | 6 +++- lib/models.py | 71 ++++++++++++++++++++++++++++++++++++++++--- static/cosmopolite.js | 36 ++++++++++++++++------ 4 files changed, 135 insertions(+), 32 deletions(-) diff --git a/api.py b/api.py index 202d8db..c901866 100644 --- a/api.py +++ b/api.py @@ -26,6 +26,37 @@ from cosmopolite.lib import utils import config +def CreateChannel(google_user, client, args): + token = channel.create_channel( + client_id=str(client.key()), + duration_minutes=config.CHANNEL_DURATION_SECONDS / 60) + messages = [x.ToMessage() + for x in client.parent().GetStateEntries()] + if google_user: + messages.append({ + 'message_type': 'login', + 'google_user': google_user.email(), + }) + else: + messages.append({ + 'message_type': 'logout', + }) + + return { + 'token': token, + 'messages': messages, + } + + +def SendMessage(google_user, client, args): + subject = args['subject'] + message = args['message'] + + models.Subject.FindOrCreate(subject).SendMessage(message) + + return {} + + @db.transactional() def SetValue(google_user, client, args): entry_key = args['key'] @@ -57,25 +88,12 @@ def SetValue(google_user, client, args): return {} -def CreateChannel(google_user, client, args): - token = channel.create_channel( - client_id=str(client.key()), - duration_minutes=config.CHANNEL_DURATION_SECONDS / 60) - messages = [x.ToMessage() - for x in client.parent().GetStateEntries()] - if google_user: - messages.append({ - 'message_type': 'login', - 'google_user': google_user.email(), - }) - else: - messages.append({ - 'message_type': 'logout', - }) +def Subscribe(google_user, client, args): + subject = models.Subject.FindOrCreate(args['subject']) + messages = args.get('messages', 0) return { - 'token': token, - 'messages': messages, + 'messages': models.Subscription.FindOrCreate(subject, client, messages), } @@ -83,7 +101,9 @@ class APIWrapper(webapp2.RequestHandler): _COMMANDS = { 'createChannel': CreateChannel, + 'sendMessage': SendMessage, 'setValue': SetValue, + 'subscribe': Subscribe, } @utils.chaos_monkey diff --git a/channel.py b/channel.py index 17f9807..e502bf8 100644 --- a/channel.py +++ b/channel.py @@ -29,14 +29,18 @@ class OnChannelConnect(webapp2.RequestHandler): client.channel_active = True client.put() + class OnChannelDisconnect(webapp2.RequestHandler): @utils.local_namespace - @db.transactional() def post(self): client = models.Client.get(self.request.get('from')) client.channel_active = False client.put() + subscriptions = models.Subscription.all().filter('client =', client) + for subscription in subscriptions: + subscription.delete() + app = webapp2.WSGIApplication([ ('/_ah/channel/connected/', OnChannelConnect), diff --git a/lib/models.py b/lib/models.py index b20b513..36db2e3 100644 --- a/lib/models.py +++ b/lib/models.py @@ -79,7 +79,7 @@ class Profile(db.Model): class Client(db.Model): - # Parent: Profile + # parent=Profile first_seen = db.DateTimeProperty(required=True, auto_now_add=True) channel_active = db.BooleanProperty(required=True, default=False) @@ -96,11 +96,15 @@ class Client(db.Model): return cls.FromProfile(profile) def SendMessage(self, msg): - channel.send_message(str(self.key()), json.dumps(msg, default=utils.EncodeJSON)) + self.SendByKey(self.key(), msg) + + @staticmethod + def SendByKey(key, msg): + channel.send_message(str(key), json.dumps(msg, default=utils.EncodeJSON)) class StateEntry(db.Model): - # Parent: Profile + # parent=Profile last_set = db.DateTimeProperty(required=True, auto_now=True) entry_key = db.StringProperty(required=True) @@ -118,10 +122,67 @@ class StateEntry(db.Model): class Subject(db.Model): - name = db.StringProperty(required=True) + # key_name=name + + @classmethod + def FindOrCreate(cls, name): + subject = cls.get_by_key_name(name) + if subject: + return subject + return cls(key_name=name).put() + + @db.transactional() + def RecentMessages(self, num_messages): + query = ( + Message.all() + .ancestor(self) + .order('-created')) + if num_messages <= 0: + num_messages = None + return query.run(limit=num_messages) + + @db.transactional() + def SendMessage(self, message): + obj = Message(parent=self, message=message) + obj.put() + + json_message = obj.ToMessage() + + for subscription in Subscription.all().ancestor(self): + Client.SendByKey(Subscription.client.get_value_for_datastore(subscription), json_message) class Subscription(db.Model): - # Parent: Subject + # parent=Subject client = db.ReferenceProperty(reference_class=Client) + + @classmethod + @db.transactional() + def FindOrCreate(cls, subject, client, messages): + subscriptions = ( + cls.all(keys_only=True) + .ancestor(subject) + .filter('client =', client) + .fetch(1)) + if not subscriptions: + logging.info('no subscriptions found') + cls(parent=subject, client=client).put() + if messages == 0: + return [] + return [m.ToMessage() for m in subject.RecentMessages(messages)] + + +class Message(db.Model): + # parent=Subject + + created = db.DateTimeProperty(required=True, auto_now_add=True) + message = db.TextProperty(required=True) + + def ToMessage(self): + return { + 'message_type': 'message', + 'subject': self.parent_key().name(), + 'created': self.created, + 'message': self.message, + } diff --git a/static/cosmopolite.js b/static/cosmopolite.js index cb9fcd5..4deb88a 100644 --- a/static/cosmopolite.js +++ b/static/cosmopolite.js @@ -58,7 +58,7 @@ cosmopolite.Client.prototype.onReceiveMessage_ = function(data) { this.socket.close(); break; default: - console.log('Unknown message type: ' + data); + console.log('Unknown event type:', data); break; } }; @@ -67,10 +67,10 @@ cosmopolite.Client.prototype.registerMessageHandlers_ = function() { this.$(window).on('message', this.$.proxy(function(e) { if (e.originalEvent.origin != window.location.origin) { console.log( - 'Received message from bad origin: ' + e.originalEvent.origin); + 'Received message from bad origin:', e.originalEvent.origin); return; } - console.log('Received message: ' + e.originalEvent.data); + console.log('Received message:', e.originalEvent.data); this.onReceiveMessage_(e.originalEvent.data); }, this)); }; @@ -125,9 +125,7 @@ cosmopolite.Client.prototype.sendRPCs_ = function(commands, delay) { return; } if (data['status'] != 'ok') { - console.log( - 'Server returned unknown status (' + data['status'] + ') for RPC ' - + command); + console.log('Server returned unknown status:', data['status']); // TODO(flamingcow): Refresh the page? Show an alert? return; } @@ -172,7 +170,27 @@ cosmopolite.Client.prototype.getValue = function(key) { }; cosmopolite.Client.prototype.createChannel_ = function() { - this.sendRPC_('createChannel', {}, this.onCreateChannel_); + this.sendRPCs_([ + { + 'command': 'createChannel', + 'onSuccess': this.onCreateChannel_, + }, + // TODO(flamingcow): Remove debugging below. + { + 'command': 'subscribe', + 'arguments': { + 'subject': 'books', + 'messages': 5, + } + }, + { + 'command': 'sendMessage', + 'arguments': { + 'subject': 'books', + 'message': 'foobar', + } + }, + ]); }; cosmopolite.Client.prototype.onCreateChannel_ = function(data) { @@ -238,12 +256,12 @@ cosmopolite.Client.prototype.onServerMessage_ = function(msg) { break; default: // Client out of date? Force refresh? - console.log('Unknown message type: ' + msg.message_type); + console.log('Unknown channel message:', msg); break; } }; cosmopolite.Client.prototype.onSocketError_ = function(msg) { - console.log('Socket error: ' + msg); + console.log('Socket error:', msg); this.socket.close(); };