Add test for two messages with the same key. Fixes to clean shutdown and server-side message duplication.
This commit is contained in:
@@ -135,14 +135,26 @@ class Subject(db.Model):
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
@db.transactional()
|
@db.transactional()
|
||||||
def SendMessage(self, message, sender, key=None):
|
def PutMessage(self, message, sender, key=None):
|
||||||
|
"""Internal helper for SendMessage().
|
||||||
|
|
||||||
|
Unless/until channel.send_message becomes transactional, we have to finish
|
||||||
|
the datastore work (and any retries) before we start transmitting to
|
||||||
|
channels.
|
||||||
|
"""
|
||||||
obj = Message(parent=self, message=message, sender=sender, key_=key)
|
obj = Message(parent=self, message=message, sender=sender, key_=key)
|
||||||
obj.put()
|
obj.put()
|
||||||
|
|
||||||
event = obj.ToEvent()
|
return (
|
||||||
|
obj,
|
||||||
|
[Subscription.client.get_value_for_datastore(subscription)
|
||||||
|
for subscription in Subscription.all().ancestor(self)])
|
||||||
|
|
||||||
for subscription in Subscription.all().ancestor(self):
|
def SendMessage(self, message, sender, key=None):
|
||||||
Client.SendByKey(Subscription.client.get_value_for_datastore(subscription), event)
|
obj, subscriptions = self.PutMessage(message, sender, key)
|
||||||
|
event = obj.ToEvent()
|
||||||
|
for subscription in subscriptions:
|
||||||
|
Client.SendByKey(subscription, event)
|
||||||
|
|
||||||
|
|
||||||
class Subscription(db.Model):
|
class Subscription(db.Model):
|
||||||
|
|||||||
@@ -448,7 +448,9 @@ Cosmopolite.prototype.onServerEvent_ = function(e) {
|
|||||||
*/
|
*/
|
||||||
Cosmopolite.prototype.onSocketError_ = function(msg) {
|
Cosmopolite.prototype.onSocketError_ = function(msg) {
|
||||||
console.log('cosmopolite: socket error:', msg);
|
console.log('cosmopolite: socket error:', msg);
|
||||||
this.socket_.close();
|
if (this.socket_) {
|
||||||
|
this.socket_.close();
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
/* Exported values */
|
/* Exported values */
|
||||||
|
|||||||
@@ -77,6 +77,56 @@ asyncTest('Message round trip', function() {
|
|||||||
var cosmo2 = new Cosmopolite(callbacks2, null, randstring());
|
var cosmo2 = new Cosmopolite(callbacks2, null, randstring());
|
||||||
});
|
});
|
||||||
|
|
||||||
|
asyncTest('Overwrite key', function() {
|
||||||
|
expect(8);
|
||||||
|
|
||||||
|
var subject = randstring();
|
||||||
|
var message1 = randstring();
|
||||||
|
var message2 = randstring();
|
||||||
|
var key = randstring();
|
||||||
|
|
||||||
|
var messages1 = 0;
|
||||||
|
|
||||||
|
var callbacks1 = {
|
||||||
|
'onReady': function() {
|
||||||
|
cosmo1.subscribe(subject, -1);
|
||||||
|
cosmo1.sendMessage(subject, message1, key);
|
||||||
|
},
|
||||||
|
'onMessage': function(e) {
|
||||||
|
messages1++;
|
||||||
|
if (messages1 == 1) {
|
||||||
|
cosmo1.sendMessage(subject, message2, key);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
var messages2 = 0;
|
||||||
|
|
||||||
|
var callbacks2 = {
|
||||||
|
'onReady': function() {
|
||||||
|
cosmo2.subscribe(subject, -1);
|
||||||
|
},
|
||||||
|
'onMessage': function(e) {
|
||||||
|
messages2++;
|
||||||
|
equal(e['subject'], subject, 'subject matches');
|
||||||
|
equal(e['key'], key, 'key matches');
|
||||||
|
if (messages2 == 1) {
|
||||||
|
equal(e['message'], message1, 'message #1 matches');
|
||||||
|
equal(cosmo2.getKeyMessage(subject, key)['message'], message1, 'message #1 matches by key')
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
equal(e['message'], message2, 'message #2 matches');
|
||||||
|
equal(cosmo2.getKeyMessage(subject, key)['message'], message2, 'message #2 matches by key')
|
||||||
|
cosmo1.shutdown();
|
||||||
|
cosmo2.shutdown();
|
||||||
|
start();
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
var cosmo1 = new Cosmopolite(callbacks1, null, randstring());
|
||||||
|
var cosmo2 = new Cosmopolite(callbacks2, null, randstring());
|
||||||
|
});
|
||||||
|
|
||||||
</script>
|
</script>
|
||||||
</body>
|
</body>
|
||||||
</html>
|
</html>
|
||||||
|
|||||||
Reference in New Issue
Block a user