diff --git a/api.py b/api.py index d419df0..159737a 100644 --- a/api.py +++ b/api.py @@ -51,7 +51,7 @@ def SendMessage(google_user, client, args): subject = args['subject'] message = args['message'] - models.Subject.FindOrCreate(subject).SendMessage(message) + models.Subject.FindOrCreate(subject).SendMessage(message, client.parent_key()) return {} @@ -96,6 +96,13 @@ def Subscribe(google_user, client, args): } +def Unsubscribe(google_user, client, args): + subject = models.Subject.FindOrCreate(args['subject']) + models.Subscription.Remove(subject, client) + + return {} + + class APIWrapper(webapp2.RequestHandler): _COMMANDS = { @@ -103,6 +110,7 @@ class APIWrapper(webapp2.RequestHandler): 'sendMessage': SendMessage, 'setValue': SetValue, 'subscribe': Subscribe, + 'unsubscribe': Unsubscribe, } @utils.chaos_monkey diff --git a/lib/models.py b/lib/models.py index 7b79619..dec2ace 100644 --- a/lib/models.py +++ b/lib/models.py @@ -142,8 +142,8 @@ class Subject(db.Model): return query.run(limit=num_messages) @db.transactional() - def SendMessage(self, message): - obj = Message(parent=self, message=message) + def SendMessage(self, message, sender): + obj = Message(parent=self, message=message, sender=sender) obj.put() event = obj.ToEvent() @@ -166,22 +166,34 @@ class Subscription(db.Model): .filter('client =', client) .fetch(1)) if not subscriptions: - logging.info('no subscriptions found') cls(parent=subject, client=client).put() if messages == 0: return [] return [m.ToEvent() for m in subject.RecentMessages(messages)] + @classmethod + @db.transactional() + def Remove(cls, subject, client): + subscriptions = ( + cls.all() + .ancestor(subject) + .filter('client =', client)) + for subscription in subscriptions: + subscription.delete() + class Message(db.Model): # parent=Subject created = db.DateTimeProperty(required=True, auto_now_add=True) message = db.TextProperty(required=True) + sender = db.ReferenceProperty(required=True, reference_class=Profile) def ToEvent(self): return { 'event_type': 'message', + 'id': self.key().id(), + 'sender': str(Message.sender.get_value_for_datastore(self)), 'subject': self.parent_key().name(), 'created': self.created, 'message': self.message, diff --git a/static/cosmopolite.js b/static/cosmopolite.js index 6c4056c..748a81c 100644 --- a/static/cosmopolite.js +++ b/static/cosmopolite.js @@ -22,6 +22,7 @@ cosmopolite.Client = function(opt_callbacks, opt_urlPrefix, opt_namespace) { this.namespace_ = opt_namespace || 'cosmopolite'; this.stateCache_ = {}; + this.subscriptions_ = {}; var scriptUrls = [ 'https://ajax.googleapis.com/ajax/libs/jquery/2.1.0/jquery.min.js', @@ -36,6 +37,56 @@ cosmopolite.Client = function(opt_callbacks, opt_urlPrefix, opt_namespace) { }, this); }; +cosmopolite.Client.prototype.setValue = function(key, value, is_public) { + this.sendRPC_('setValue', { + 'key': key, + 'value': value, + 'public': is_public, + }); + // Provide immediate feedback without waiting for a round trip. + // We'll also get a response from the server, so this should be eventually + // consistent. + if ('onStateChange' in this.callbacks_) { + this.callbacks_['onStateChange'](key, value); + } +}; + +cosmopolite.Client.prototype.getValue = function(key) { + return this.stateCache_[key]; +}; + +cosmopolite.Client.prototype.subscribe = function(subject, messages) { + if (subject in this.subscriptions_) { + console.log('Not sending duplication subscription request for subject:', subject); + return; + } + this.subscriptions_[subject] = { + 'messages': [], + }; + this.sendRPC_('subscribe', { + 'subject': subject, + 'messages': messages, + }); +}; + +cosmopolite.Client.prototype.unsubscribe = function(subject) { + delete this.subscriptions_[subject]; + this.sendRPC_('unsubscribe', { + 'subject': subject, + }); +}; + +cosmopolite.Client.prototype.sendMessage = function(subject, message) { + this.sendRPC_('sendMessage', { + 'subject': subject, + 'message': message, + }); +}; + +cosmopolite.Client.prototype.getMessages = function(subject) { + return this.subscriptions_[subject].messages; +}; + cosmopolite.Client.prototype.onLoad_ = function() { if (--this.numScriptsToLoad_ > 0) { return; @@ -151,46 +202,24 @@ cosmopolite.Client.prototype.sendRPCs_ = function(commands, delay) { }); }; -cosmopolite.Client.prototype.setValue = function(key, value, is_public) { - this.sendRPC_('setValue', { - 'key': key, - 'value': value, - 'public': is_public, - }) - // Provide immediate feedback without waiting for a round trip. - // We'll also get a response from the server, so this should be eventually - // consistent. - if ('onStateChange' in this.callbacks_) { - this.callbacks_['onStateChange'](key, value); - } -}; - -cosmopolite.Client.prototype.getValue = function(key) { - return this.stateCache_[key]; -}; - cosmopolite.Client.prototype.createChannel_ = function() { - this.sendRPCs_([ - { - 'command': 'createChannel', - 'onSuccess': this.onCreateChannel_, - }, - // TODO(flamingcow): Remove debugging below. - { - 'command': 'subscribe', - 'arguments': { - 'subject': 'books', - 'messages': 5, - } - }, - { - 'command': 'sendMessage', - 'arguments': { - 'subject': 'books', - 'message': 'foobar', - } - }, - ]); + var rpcs = [ + { + 'command': 'createChannel', + 'onSuccess': this.onCreateChannel_, + }, + ]; + // TODO(flamingcow): Need to restart from the latest message. + for (var subject in this.subscriptions_) { + rpcs.push({ + 'command': 'subscribe', + 'arguments': { + 'subject': subject, + 'messages': 0, + } + }); + } + this.sendRPCs_(rpcs); }; cosmopolite.Client.prototype.onCreateChannel_ = function(data) { @@ -244,7 +273,7 @@ cosmopolite.Client.prototype.onServerEvent_ = function(e) { case 'login': if ('onLogin' in this.callbacks_) { this.callbacks_['onLogin']( - e.google_user, + e['google_user'], this.urlPrefix_ + '/auth/logout'); } break; @@ -254,6 +283,24 @@ cosmopolite.Client.prototype.onServerEvent_ = function(e) { this.urlPrefix_ + '/auth/login'); } break; + case 'message': + if ('onMessage' in this.callbacks_) { + if (!(e['subject'] in this.subscriptions_)) { + console.log('Message from unrecognized subject:', e); + break; + } + var subscription = this.subscriptions_[e['subject']]; + var duplicate = subscription.messages.some(function(message) { + return message['id'] == e.id; + }); + if (duplicate) { + console.log('Duplicate message:', e); + break; + } + subscription.messages.push(e); + this.callbacks_['onMessage'](e); + } + break; default: // Client out of date? Force refresh? console.log('Unknown channel event:', e); diff --git a/static/debug.html b/static/debug.html index 1a7234f..36d3b72 100644 --- a/static/debug.html +++ b/static/debug.html @@ -28,12 +28,33 @@ a { +