Event cleanup pipeline.

This commit is contained in:
Ian Gulliver
2015-06-14 21:48:36 -07:00
parent 4a6cb1f9bd
commit 1325ad570e
4 changed files with 36 additions and 15 deletions

3
api.py
View File

@@ -56,6 +56,9 @@ def Poll(google_user, client, client_address, instance_id, args):
instance = models.Instance.FindOrCreate(instance_id, polling=True, active=True)
assert instance.polling
# Update last_poll
instance.save()
events = []
if google_user:
events.append({

View File

@@ -29,7 +29,6 @@ class OnChannelConnect(webapp2.RequestHandler):
@db.transactional()
def post(self):
instance_id = self.request.get('from')
logging.info('Instance: %s', instance_id)
instance = models.Instance.FromID(instance_id)
if not instance:
logging.warning('Channel opened with invalid instance_id: %s', instance_id)
@@ -47,18 +46,8 @@ class OnChannelDisconnect(webapp2.RequestHandler):
@utils.local_namespace
def post(self):
instance_id = self.request.get('from')
logging.info('Instance: %s', instance_id)
instance_key = db.Key.from_path('Instance', instance_id)
subscriptions = list(models.Subscription.all().filter('instance =', instance_key))
if subscriptions:
db.delete(subscriptions)
pins = models.Pin.all().filter('instance =', instance_key)
for pin in pins:
pin.Delete()
db.delete(instance_key)
instance = models.Instance.FromID(instance_id)
instance.Delete()
app = webapp2.WSGIApplication([

View File

@@ -7,6 +7,10 @@ handlers:
script: cosmopolite.auth.app
secure: always
- url: /cosmopolite/cron/.*
script: cosmopolite.cron.app
login: admin
- url: /_ah/channel/.*
script: cosmopolite.channel.app

View File

@@ -94,6 +94,7 @@ class Instance(db.Model):
# key_name=instance_id
active = db.BooleanProperty(required=True, default=False)
polling = db.BooleanProperty(required=True, default=False)
last_poll = db.DateTimeProperty(required=True, auto_now=True)
@classmethod
def FromID(cls, instance_id):
@@ -105,6 +106,19 @@ class Instance(db.Model):
logging.info('Instance: %s', instance_id)
return cls.get_or_insert(instance_id, **kwargs)
def Delete(self):
logging.info('Deleting instance %s', self.key().name())
subscriptions = Subscription.all().filter('instance =', self)
for subscription in subscriptions:
subscription.Delete()
pins = Pin.all().filter('instance =', self)
for pin in pins:
pin.Delete()
self.delete()
def GetSubscriptions(self):
return (
Subscription.all()
@@ -327,6 +341,7 @@ class Subject(db.Model):
.filter('instance =', instance_key))
events = []
# TODO: bulk delete
for pin in pins:
events.append(pin.ToEvent(event_type='unpin'))
pin.delete()
@@ -428,7 +443,7 @@ class Subscription(db.Model):
.filter('readable_only_by_me =', readable_only_by_me)
.filter('writable_only_by_me =', writable_only_by_me))
for subscription in subscriptions:
subscription.delete()
subscription.Delete()
def SendMessage(self, msg):
encoded = json.dumps(msg, default=utils.EncodeJSON)
@@ -445,13 +460,23 @@ class Subscription(db.Model):
Event.all()
.ancestor(self))
ret = []
to_delete = []
for e in events:
if str(e.key().id()) in acks:
e.delete()
to_delete.append(e)
else:
ret.append(e.ToEvent())
db.delete(events)
return ret
def Delete(self):
events = (
Event.all()
.ancestor(self))
db.delete(events)
self.delete()
class Event(db.Model):
# parent=Subscription