diff --git a/lib/models.py b/lib/models.py index 239f404..d968677 100644 --- a/lib/models.py +++ b/lib/models.py @@ -135,14 +135,26 @@ class Subject(db.Model): return None @db.transactional() - def SendMessage(self, message, sender, key=None): + def PutMessage(self, message, sender, key=None): + """Internal helper for SendMessage(). + + Unless/until channel.send_message becomes transactional, we have to finish + the datastore work (and any retries) before we start transmitting to + channels. + """ obj = Message(parent=self, message=message, sender=sender, key_=key) obj.put() - event = obj.ToEvent() + return ( + obj, + [Subscription.client.get_value_for_datastore(subscription) + for subscription in Subscription.all().ancestor(self)]) - for subscription in Subscription.all().ancestor(self): - Client.SendByKey(Subscription.client.get_value_for_datastore(subscription), event) + def SendMessage(self, message, sender, key=None): + obj, subscriptions = self.PutMessage(message, sender, key) + event = obj.ToEvent() + for subscription in subscriptions: + Client.SendByKey(subscription, event) class Subscription(db.Model): diff --git a/static/cosmopolite.js b/static/cosmopolite.js index 688cf95..99c41de 100644 --- a/static/cosmopolite.js +++ b/static/cosmopolite.js @@ -448,7 +448,9 @@ Cosmopolite.prototype.onServerEvent_ = function(e) { */ Cosmopolite.prototype.onSocketError_ = function(msg) { console.log('cosmopolite: socket error:', msg); - this.socket_.close(); + if (this.socket_) { + this.socket_.close(); + } }; /* Exported values */ diff --git a/static/test.html b/static/test.html index 22fe1d0..7242235 100644 --- a/static/test.html +++ b/static/test.html @@ -77,6 +77,56 @@ asyncTest('Message round trip', function() { var cosmo2 = new Cosmopolite(callbacks2, null, randstring()); }); +asyncTest('Overwrite key', function() { + expect(8); + + var subject = randstring(); + var message1 = randstring(); + var message2 = randstring(); + var key = randstring(); + + var messages1 = 0; + + var callbacks1 = { + 'onReady': function() { + cosmo1.subscribe(subject, -1); + cosmo1.sendMessage(subject, message1, key); + }, + 'onMessage': function(e) { + messages1++; + if (messages1 == 1) { + cosmo1.sendMessage(subject, message2, key); + } + }, + }; + + var messages2 = 0; + + var callbacks2 = { + 'onReady': function() { + cosmo2.subscribe(subject, -1); + }, + 'onMessage': function(e) { + messages2++; + equal(e['subject'], subject, 'subject matches'); + equal(e['key'], key, 'key matches'); + if (messages2 == 1) { + equal(e['message'], message1, 'message #1 matches'); + equal(cosmo2.getKeyMessage(subject, key)['message'], message1, 'message #1 matches by key') + return; + } + equal(e['message'], message2, 'message #2 matches'); + equal(cosmo2.getKeyMessage(subject, key)['message'], message2, 'message #2 matches by key') + cosmo1.shutdown(); + cosmo2.shutdown(); + start(); + }, + }; + + var cosmo1 = new Cosmopolite(callbacks1, null, randstring()); + var cosmo2 = new Cosmopolite(callbacks2, null, randstring()); +}); +