diff --git a/lib/models.py b/lib/models.py index 4bfc593..15f1141 100644 --- a/lib/models.py +++ b/lib/models.py @@ -146,22 +146,26 @@ class Subject(db.Model): the datastore work (and any retries) before we start transmitting to channels. """ + # We have to reload the Subject inside the transaction to get transactional + # ID generation + subject = Subject.get(self.key()) + # sender_message_id should be universal across all subjects, but we check # it within just this subject to allow in-transaction verification. messages = ( Message.all() - .ancestor(self) + .ancestor(subject) .filter('sender_message_id =', sender_message_id) .fetch(1)) if messages: raise DuplicateMessage(sender_message_id) - message_id = self.next_message_id - self.next_message_id += 1 - self.put() + message_id = subject.next_message_id + subject.next_message_id += 1 + subject.put() obj = Message( - parent=self, + parent=subject, message=message, sender=sender, sender_message_id=sender_message_id, @@ -172,7 +176,7 @@ class Subject(db.Model): return ( obj, [Subscription.client.get_value_for_datastore(subscription) - for subscription in Subscription.all().ancestor(self)]) + for subscription in Subscription.all().ancestor(subject)]) def SendMessage(self, message, sender, sender_message_id, key=None): obj, subscriptions = self.PutMessage(message, sender, sender_message_id, key) diff --git a/static/cosmopolite.js b/static/cosmopolite.js index 87f4bf7..6b810aa 100644 --- a/static/cosmopolite.js +++ b/static/cosmopolite.js @@ -627,7 +627,7 @@ Cosmopolite.prototype.onMessage_ = function(e) { // likely be at the end. var insertAfter; for (var insertAfter = subscription.messages.length - 1; - insertAfter >= 0; insertAfter++) { + insertAfter >= 0; insertAfter--) { var message = subscription.messages[insertAfter]; if (message['id'] < e['message']['id']) { break; diff --git a/static/test.js b/static/test.js index 943b029..1a97d63 100644 --- a/static/test.js +++ b/static/test.js @@ -307,9 +307,10 @@ asyncTest('resubscribe', function() { cosmo.subscribe(subject).then(function() { equal(cosmo.getMessages(subject).length, 0, 'zero messages'); cosmo.subscribe(subject, -1).then(function() { - equal(cosmo.getMessages(subject).length, 1, 'one message'); - equal(cosmo.getMessages(subject)[0]['subject']['name'], subject, 'subject matches'); - equal(cosmo.getMessages(subject)[0]['message'], message, 'message matches'); + var messages = cosmo.getMessages(subject); + equal(messages.length, 1, 'one message'); + equal(messages[0]['subject']['name'], subject, 'subject matches'); + equal(messages[0]['message'], message, 'message matches'); cosmo.shutdown(); start(); }); @@ -317,6 +318,39 @@ asyncTest('resubscribe', function() { }); }); +asyncTest('Message ordering', function() { + expect(5); + + var subject = randstring(); + var messages = [ 'A', 'B', 'C', 'D', 'E', 'F' ]; + var keys = [ null, 'X', 'X', null, null, null ]; + + var cosmo = new Cosmopolite({}, null, randstring()); + + var sendNextMessage = function() { + if (messages.length) { + cosmo.sendMessage(subject, messages.shift(), keys.shift()).then(sendNextMessage); + } else { + cosmo.subscribe(subject, 1).then(function() { + cosmo.subscribe(subject, 2).then(function() { + cosmo.subscribe(subject, 0, null, ['X']).then(function() { + var fetched = cosmo.getMessages(subject); + equal(fetched.length, 3, 'three messages'); + equal(fetched[0]['message'], 'C', 'message 0: C matches'); + equal(fetched[1]['message'], 'E', 'message 1: E matches'); + equal(fetched[2]['message'], 'F', 'message 2: F matches'); + equal(cosmo.getKeyMessage(subject, 'X')['message'], 'C', 'key X matches'); + cosmo.shutdown(); + start(); + }); + }); + }); + } + }; + + sendNextMessage(); +}); + module('dev_appserver only');