diff --git a/api.py b/api.py index 0f6e4a3..3b5a7e6 100644 --- a/api.py +++ b/api.py @@ -53,9 +53,17 @@ def CreateChannel(google_user, client, args): def SendMessage(google_user, client, args): subject = args['subject'] message = args['message'] + sender_message_id = args['sender_message_id'] key = args.get('key', None) - models.Subject.FindOrCreate(subject).SendMessage(message, client.parent_key(), key) + try: + models.Subject.FindOrCreate(subject).SendMessage( + message, client.parent_key(), sender_message_id, key) + except models.DuplicateMessage: + logging.exception('Duplicate message: %s', sender_message_id) + # We still return success since we assume that the message was already + # delivered. If it's really a client ID generation bug, we just swallowed + # a message. return {} diff --git a/lib/models.py b/lib/models.py index 3e02b7e..06c8fe6 100644 --- a/lib/models.py +++ b/lib/models.py @@ -30,6 +30,10 @@ import utils # ↳ Subscription (⤴︎ Client) +class DuplicateMessage(Exception): + pass + + class Profile(db.Model): google_user = db.UserProperty() @@ -119,14 +123,29 @@ class Subject(db.Model): return None @db.transactional() - def PutMessage(self, message, sender, key=None): + def PutMessage(self, message, sender, sender_message_id, key=None): """Internal helper for SendMessage(). Unless/until channel.send_message becomes transactional, we have to finish the datastore work (and any retries) before we start transmitting to channels. """ - obj = Message(parent=self, message=message, sender=sender, key_=key) + # sender_message_id should be universal across all subjects, but we check + # it within just this subject to allow in-transaction verification. + messages = ( + Message.all() + .ancestor(self) + .filter('sender_message_id =', sender_message_id) + .fetch(1)) + if messages: + raise DuplicateMessage(sender_message_id) + + obj = Message( + parent=self, + message=message, + sender=sender, + sender_message_id=sender_message_id, + key_=key) obj.put() return ( @@ -134,8 +153,8 @@ class Subject(db.Model): [Subscription.client.get_value_for_datastore(subscription) for subscription in Subscription.all().ancestor(self)]) - def SendMessage(self, message, sender, key=None): - obj, subscriptions = self.PutMessage(message, sender, key) + def SendMessage(self, message, sender, sender_message_id, key=None): + obj, subscriptions = self.PutMessage(message, sender, sender_message_id, key) event = obj.ToEvent() for subscription in subscriptions: Client.SendByKey(subscription, event) @@ -177,6 +196,7 @@ class Message(db.Model): created = db.DateTimeProperty(required=True, auto_now_add=True) message = db.TextProperty(required=True) sender = db.ReferenceProperty(required=True, reference_class=Profile) + sender_message_id = db.StringProperty(required=True) # key and key_name are reserved key_ = db.StringProperty() diff --git a/static/cosmopolite.js b/static/cosmopolite.js index 1598229..7e0b1ab 100644 --- a/static/cosmopolite.js +++ b/static/cosmopolite.js @@ -133,8 +133,9 @@ Cosmopolite.prototype.sendMessage = function(subject, message, key) { throw "cosmopolite: not ready"; } var args = { - 'subject': subject, - 'message': JSON.stringify(message), + 'subject': subject, + 'message': JSON.stringify(message), + 'sender_message_id': this.uuid_(), }; if (key) { args['key'] = key; @@ -189,6 +190,23 @@ Cosmopolite.prototype.loggingPrefix_ = function() { return 'cosmopolite (' + this.namespace_ + '):'; }; +/** + * Generate a v4 UUID. + * + * @return {string} A universally-unique random value. + * @const + */ +Cosmopolite.prototype.uuid_ = function() { + return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) { + var r = (Math.random() * 16) | 0; + if (c == 'x') { + return r.toString(16); + } else { + return (r & (0x03 | 0x08)).toString(16); + } + }); +}; + /** * Callback when a script loads. */