Add a message ordering test. Fix a nasty ID generation bug.
This commit is contained in:
@@ -146,22 +146,26 @@ class Subject(db.Model):
|
|||||||
the datastore work (and any retries) before we start transmitting to
|
the datastore work (and any retries) before we start transmitting to
|
||||||
channels.
|
channels.
|
||||||
"""
|
"""
|
||||||
|
# We have to reload the Subject inside the transaction to get transactional
|
||||||
|
# ID generation
|
||||||
|
subject = Subject.get(self.key())
|
||||||
|
|
||||||
# sender_message_id should be universal across all subjects, but we check
|
# sender_message_id should be universal across all subjects, but we check
|
||||||
# it within just this subject to allow in-transaction verification.
|
# it within just this subject to allow in-transaction verification.
|
||||||
messages = (
|
messages = (
|
||||||
Message.all()
|
Message.all()
|
||||||
.ancestor(self)
|
.ancestor(subject)
|
||||||
.filter('sender_message_id =', sender_message_id)
|
.filter('sender_message_id =', sender_message_id)
|
||||||
.fetch(1))
|
.fetch(1))
|
||||||
if messages:
|
if messages:
|
||||||
raise DuplicateMessage(sender_message_id)
|
raise DuplicateMessage(sender_message_id)
|
||||||
|
|
||||||
message_id = self.next_message_id
|
message_id = subject.next_message_id
|
||||||
self.next_message_id += 1
|
subject.next_message_id += 1
|
||||||
self.put()
|
subject.put()
|
||||||
|
|
||||||
obj = Message(
|
obj = Message(
|
||||||
parent=self,
|
parent=subject,
|
||||||
message=message,
|
message=message,
|
||||||
sender=sender,
|
sender=sender,
|
||||||
sender_message_id=sender_message_id,
|
sender_message_id=sender_message_id,
|
||||||
@@ -172,7 +176,7 @@ class Subject(db.Model):
|
|||||||
return (
|
return (
|
||||||
obj,
|
obj,
|
||||||
[Subscription.client.get_value_for_datastore(subscription)
|
[Subscription.client.get_value_for_datastore(subscription)
|
||||||
for subscription in Subscription.all().ancestor(self)])
|
for subscription in Subscription.all().ancestor(subject)])
|
||||||
|
|
||||||
def SendMessage(self, message, sender, sender_message_id, key=None):
|
def SendMessage(self, message, sender, sender_message_id, key=None):
|
||||||
obj, subscriptions = self.PutMessage(message, sender, sender_message_id, key)
|
obj, subscriptions = self.PutMessage(message, sender, sender_message_id, key)
|
||||||
|
|||||||
@@ -627,7 +627,7 @@ Cosmopolite.prototype.onMessage_ = function(e) {
|
|||||||
// likely be at the end.
|
// likely be at the end.
|
||||||
var insertAfter;
|
var insertAfter;
|
||||||
for (var insertAfter = subscription.messages.length - 1;
|
for (var insertAfter = subscription.messages.length - 1;
|
||||||
insertAfter >= 0; insertAfter++) {
|
insertAfter >= 0; insertAfter--) {
|
||||||
var message = subscription.messages[insertAfter];
|
var message = subscription.messages[insertAfter];
|
||||||
if (message['id'] < e['message']['id']) {
|
if (message['id'] < e['message']['id']) {
|
||||||
break;
|
break;
|
||||||
|
|||||||
@@ -307,9 +307,10 @@ asyncTest('resubscribe', function() {
|
|||||||
cosmo.subscribe(subject).then(function() {
|
cosmo.subscribe(subject).then(function() {
|
||||||
equal(cosmo.getMessages(subject).length, 0, 'zero messages');
|
equal(cosmo.getMessages(subject).length, 0, 'zero messages');
|
||||||
cosmo.subscribe(subject, -1).then(function() {
|
cosmo.subscribe(subject, -1).then(function() {
|
||||||
equal(cosmo.getMessages(subject).length, 1, 'one message');
|
var messages = cosmo.getMessages(subject);
|
||||||
equal(cosmo.getMessages(subject)[0]['subject']['name'], subject, 'subject matches');
|
equal(messages.length, 1, 'one message');
|
||||||
equal(cosmo.getMessages(subject)[0]['message'], message, 'message matches');
|
equal(messages[0]['subject']['name'], subject, 'subject matches');
|
||||||
|
equal(messages[0]['message'], message, 'message matches');
|
||||||
cosmo.shutdown();
|
cosmo.shutdown();
|
||||||
start();
|
start();
|
||||||
});
|
});
|
||||||
@@ -317,6 +318,39 @@ asyncTest('resubscribe', function() {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
asyncTest('Message ordering', function() {
|
||||||
|
expect(5);
|
||||||
|
|
||||||
|
var subject = randstring();
|
||||||
|
var messages = [ 'A', 'B', 'C', 'D', 'E', 'F' ];
|
||||||
|
var keys = [ null, 'X', 'X', null, null, null ];
|
||||||
|
|
||||||
|
var cosmo = new Cosmopolite({}, null, randstring());
|
||||||
|
|
||||||
|
var sendNextMessage = function() {
|
||||||
|
if (messages.length) {
|
||||||
|
cosmo.sendMessage(subject, messages.shift(), keys.shift()).then(sendNextMessage);
|
||||||
|
} else {
|
||||||
|
cosmo.subscribe(subject, 1).then(function() {
|
||||||
|
cosmo.subscribe(subject, 2).then(function() {
|
||||||
|
cosmo.subscribe(subject, 0, null, ['X']).then(function() {
|
||||||
|
var fetched = cosmo.getMessages(subject);
|
||||||
|
equal(fetched.length, 3, 'three messages');
|
||||||
|
equal(fetched[0]['message'], 'C', 'message 0: C matches');
|
||||||
|
equal(fetched[1]['message'], 'E', 'message 1: E matches');
|
||||||
|
equal(fetched[2]['message'], 'F', 'message 2: F matches');
|
||||||
|
equal(cosmo.getKeyMessage(subject, 'X')['message'], 'C', 'key X matches');
|
||||||
|
cosmo.shutdown();
|
||||||
|
start();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
sendNextMessage();
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
module('dev_appserver only');
|
module('dev_appserver only');
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user