When rebuilding the channel, subscribe via last seen ID, so we don't miss messages from the time we were disconnected.

This commit is contained in:
Ian Gulliver
2014-05-18 11:20:44 +03:00
parent 63ac321537
commit 0380679ddb
3 changed files with 24 additions and 7 deletions

3
api.py
View File

@@ -71,10 +71,11 @@ def SendMessage(google_user, client, args):
def Subscribe(google_user, client, args): def Subscribe(google_user, client, args):
subject = models.Subject.FindOrCreate(args['subject']) subject = models.Subject.FindOrCreate(args['subject'])
messages = args.get('messages', 0) messages = args.get('messages', 0)
last_id = args.get('last_id', None)
keys = args.get('keys', []) keys = args.get('keys', [])
ret = { ret = {
'events': models.Subscription.FindOrCreate(subject, client, messages), 'events': models.Subscription.FindOrCreate(subject, client, messages, last_id),
} }
for key in keys: for key in keys:
message = subject.GetKey(key) message = subject.GetKey(key)

View File

@@ -117,6 +117,15 @@ class Subject(db.Model):
num_messages = None num_messages = None
return reversed(query.fetch(limit=num_messages)) return reversed(query.fetch(limit=num_messages))
@db.transactional()
def GetMessagesSince(self, last_id):
query = (
Message.all()
.ancestor(self)
.filter('id_ >', last_id)
.order('id_'))
return list(query)
@db.transactional() @db.transactional()
def GetKey(self, key): def GetKey(self, key):
messages = ( messages = (
@@ -179,7 +188,7 @@ class Subscription(db.Model):
@classmethod @classmethod
@db.transactional() @db.transactional()
def FindOrCreate(cls, subject, client, messages): def FindOrCreate(cls, subject, client, messages=0, last_id=None):
subscriptions = ( subscriptions = (
cls.all(keys_only=True) cls.all(keys_only=True)
.ancestor(subject) .ancestor(subject)
@@ -187,9 +196,12 @@ class Subscription(db.Model):
.fetch(1)) .fetch(1))
if not subscriptions: if not subscriptions:
cls(parent=subject, client=client).put() cls(parent=subject, client=client).put()
if messages == 0: events = []
return [] if messages:
return [m.ToEvent() for m in subject.GetRecentMessages(messages)] events.extend(m.ToEvent() for m in subject.GetRecentMessages(messages))
if last_id:
events.extend(m.ToEvent() for m in subject.GetMessagesSince(last_id))
return events
@classmethod @classmethod
@db.transactional() @db.transactional()

View File

@@ -423,13 +423,17 @@ Cosmopolite.prototype.createChannel_ = function() {
'onSuccess': this.onCreateChannel_, 'onSuccess': this.onCreateChannel_,
}, },
]; ];
// TODO(flamingcow): Need to restart from the latest message.
for (var subject in this.subscriptions_) { for (var subject in this.subscriptions_) {
var subscription = this.subscriptions_[subject];
var last_id = 0;
if (subscription.messages.length > 0) {
last_id = subscription.messages[subscription.messages.length - 1]['id'];
}
rpcs.push({ rpcs.push({
'command': 'subscribe', 'command': 'subscribe',
'arguments': { 'arguments': {
'subject': subject, 'subject': subject,
'messages': 0, 'last_id': last_id,
} }
}); });
} }