Pin test, fixes, cleanup

This commit is contained in:
Ian Gulliver
2014-05-27 15:33:51 -07:00
parent d96c31abf7
commit 1dd1490518
3 changed files with 47 additions and 18 deletions

View File

@@ -217,18 +217,14 @@ class Subject(db.Model):
for subscription in subscriptions: for subscription in subscriptions:
subscription.SendMessage(event) subscription.SendMessage(event)
@db.transactional(xg=True) @db.transactional()
def PutPin(self, message, sender, sender_message_id, instance): def PutPin(self, message, sender, sender_message_id, instance):
"""Internal helper for Pin().""" """Internal helper for Pin()."""
# Reload the subject and instance to establish a barrier
subject = Subject.get(self.key())
instance = Instance.get(instance.key())
# sender_message_id should be universal across all subjects, but we check # sender_message_id should be universal across all subjects, but we check
# it within just this subject to allow in-transaction verification. # it within just this subject to allow in-transaction verification.
pins = ( pins = (
Pin.all() Pin.all()
.ancestor(subject) .ancestor(self)
.filter('sender_message_id =', sender_message_id) .filter('sender_message_id =', sender_message_id)
.filter('instance =', instance) .filter('instance =', instance)
.fetch(1)) .fetch(1))
@@ -236,14 +232,14 @@ class Subject(db.Model):
raise DuplicateMessage(sender_message_id) raise DuplicateMessage(sender_message_id)
obj = Pin( obj = Pin(
parent=subject, parent=self,
message=message, message=message,
sender=sender, sender=sender,
sender_message_id=sender_message_id, sender_message_id=sender_message_id,
instance=instance) instance=instance)
obj.put() obj.put()
return (obj, list(Subscription.all().ancestor(subject))) return (obj, list(Subscription.all().ancestor(self)))
def Pin(self, message, sender, sender_message_id, instance): def Pin(self, message, sender, sender_message_id, instance):
self.VerifyWritable(sender) self.VerifyWritable(sender)
@@ -253,15 +249,11 @@ class Subject(db.Model):
for subscription in subscriptions: for subscription in subscriptions:
subscription.SendMessage(event) subscription.SendMessage(event)
@db.transactional(xg=True) @db.transactional()
def RemovePin(self, sender, sender_message_id, instance_key): def RemovePin(self, sender, sender_message_id, instance_key):
# Reload the subject and instance to establish a barrier
subject = Subject.get(self.key())
Instance.get(instance_key)
pins = ( pins = (
Pin.all() Pin.all()
.ancestor(subject) .ancestor(self)
.filter('sender =', sender) .filter('sender =', sender)
.filter('sender_message_id =', sender_message_id) .filter('sender_message_id =', sender_message_id)
.filter('instance =', instance_key)) .filter('instance =', instance_key))
@@ -271,7 +263,7 @@ class Subject(db.Model):
events.append(pin.ToEvent(event_type='unpin')) events.append(pin.ToEvent(event_type='unpin'))
pin.delete() pin.delete()
return (events, list(Subscription.all().ancestor(subject))) return (events, list(Subscription.all().ancestor(self)))
def Unpin(self, sender, sender_message_id, instance_key): def Unpin(self, sender, sender_message_id, instance_key):
self.VerifyWritable(sender) self.VerifyWritable(sender)

View File

@@ -389,9 +389,11 @@ Cosmopolite.prototype.pin = function(subject, message) {
'sender_message_id': id, 'sender_message_id': id,
}; };
this.pins_[id] = args;
this.sendRPC_('pin', args, resolve.bind(null, id)); this.sendRPC_('pin', args, function() {
this.pins_[id] = args;
resolve(id);
});
}.bind(this)); }.bind(this));
}; };
@@ -874,7 +876,11 @@ Cosmopolite.prototype.onSocketClose_ = function() {
// We treat a disconnection as if all pins disappeared // We treat a disconnection as if all pins disappeared
for (var subject in this.subscriptions_) { for (var subject in this.subscriptions_) {
var subscription = this.subscriptions_[subject]; var subscription = this.subscriptions_[subject];
subscription.pins.forEach(this.onUnpin_, this); subscription.pins.forEach(function(pin) {
// Stupid hack that saves complexity elsewhere
pin['message'] = JSON.stringify(pin['message']);
this.onUnpin_(pin);
}, this);
} }
this.createChannel_(); this.createChannel_();

View File

@@ -430,6 +430,37 @@ asyncTest('pin/unpin', function() {
var pin = cosmo.pin(subject, message); var pin = cosmo.pin(subject, message);
}); });
asyncTest('Repin', function() {
expect(8);
var subject = randstring();
var message = randstring();
var pins = 0;
var callbacks = {
'onPin': function(e) {
equal(subject, e['subject']['name'], 'onPin: subject matches');
equal(message, e['message'], 'onPin: message matches');
equal(cosmo.getPins(subject).length, 1);
if (++pins == 1) {
cosmo.socket_.close();
} else {
cosmo.shutdown();
start();
}
},
'onUnpin': function(e) {
equal(subject, e['subject']['name'], 'onUnpin: subject matches');
equal(message, e['message'], 'onUnpin: message matches');
},
}
var cosmo = new Cosmopolite(callbacks, null, randstring());
cosmo.subscribe(subject);
var pin = cosmo.pin(subject, message);
});
module('dev_appserver only'); module('dev_appserver only');