Suppress duplicate messages from a client (e.g. when network failure causes retries for RPCs that actually went through).

This commit is contained in:
Ian Gulliver
2014-05-16 23:07:38 +03:00
parent 81d7db3678
commit 0247b78c6b
3 changed files with 53 additions and 7 deletions

10
api.py
View File

@@ -53,9 +53,17 @@ def CreateChannel(google_user, client, args):
def SendMessage(google_user, client, args):
subject = args['subject']
message = args['message']
sender_message_id = args['sender_message_id']
key = args.get('key', None)
models.Subject.FindOrCreate(subject).SendMessage(message, client.parent_key(), key)
try:
models.Subject.FindOrCreate(subject).SendMessage(
message, client.parent_key(), sender_message_id, key)
except models.DuplicateMessage:
logging.exception('Duplicate message: %s', sender_message_id)
# We still return success since we assume that the message was already
# delivered. If it's really a client ID generation bug, we just swallowed
# a message.
return {}

View File

@@ -30,6 +30,10 @@ import utils
# ↳ Subscription (⤴︎ Client)
class DuplicateMessage(Exception):
pass
class Profile(db.Model):
google_user = db.UserProperty()
@@ -119,14 +123,29 @@ class Subject(db.Model):
return None
@db.transactional()
def PutMessage(self, message, sender, key=None):
def PutMessage(self, message, sender, sender_message_id, key=None):
"""Internal helper for SendMessage().
Unless/until channel.send_message becomes transactional, we have to finish
the datastore work (and any retries) before we start transmitting to
channels.
"""
obj = Message(parent=self, message=message, sender=sender, key_=key)
# sender_message_id should be universal across all subjects, but we check
# it within just this subject to allow in-transaction verification.
messages = (
Message.all()
.ancestor(self)
.filter('sender_message_id =', sender_message_id)
.fetch(1))
if messages:
raise DuplicateMessage(sender_message_id)
obj = Message(
parent=self,
message=message,
sender=sender,
sender_message_id=sender_message_id,
key_=key)
obj.put()
return (
@@ -134,8 +153,8 @@ class Subject(db.Model):
[Subscription.client.get_value_for_datastore(subscription)
for subscription in Subscription.all().ancestor(self)])
def SendMessage(self, message, sender, key=None):
obj, subscriptions = self.PutMessage(message, sender, key)
def SendMessage(self, message, sender, sender_message_id, key=None):
obj, subscriptions = self.PutMessage(message, sender, sender_message_id, key)
event = obj.ToEvent()
for subscription in subscriptions:
Client.SendByKey(subscription, event)
@@ -177,6 +196,7 @@ class Message(db.Model):
created = db.DateTimeProperty(required=True, auto_now_add=True)
message = db.TextProperty(required=True)
sender = db.ReferenceProperty(required=True, reference_class=Profile)
sender_message_id = db.StringProperty(required=True)
# key and key_name are reserved
key_ = db.StringProperty()

View File

@@ -133,8 +133,9 @@ Cosmopolite.prototype.sendMessage = function(subject, message, key) {
throw "cosmopolite: not ready";
}
var args = {
'subject': subject,
'message': JSON.stringify(message),
'subject': subject,
'message': JSON.stringify(message),
'sender_message_id': this.uuid_(),
};
if (key) {
args['key'] = key;
@@ -189,6 +190,23 @@ Cosmopolite.prototype.loggingPrefix_ = function() {
return 'cosmopolite (' + this.namespace_ + '):';
};
/**
* Generate a v4 UUID.
*
* @return {string} A universally-unique random value.
* @const
*/
Cosmopolite.prototype.uuid_ = function() {
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
var r = (Math.random() * 16) | 0;
if (c == 'x') {
return r.toString(16);
} else {
return (r & (0x03 | 0x08)).toString(16);
}
});
};
/**
* Callback when a script loads.
*/