Fix handling of initial messages on channel creation by returning them in the

RPC response instead of over the channel.
This commit is contained in:
Ian Gulliver
2014-03-25 14:19:13 -07:00
parent f01d7a4dc9
commit 7069d16aaa
3 changed files with 34 additions and 22 deletions

11
api.py
View File

@@ -66,10 +66,11 @@ class SetValue(webapp2.RequestHandler):
entry_value=entry_value) entry_value=entry_value)
entry.put() entry.put()
clients = (models.Client.all(keys_only=True) msg = entry.ToMessage()
clients = (models.Client.all()
.ancestor(self.client.parent_key())) .ancestor(self.client.parent_key()))
for client in clients: for client in clients:
entry.SendToClient(client) client.SendMessage(msg)
return {} return {}
@@ -108,12 +109,10 @@ class CreateChannel(webapp2.RequestHandler):
token = channel.create_channel( token = channel.create_channel(
client_id=str(self.client.key()), client_id=str(self.client.key()),
duration_minutes=config.CHANNEL_DURATION_SECONDS / 60) duration_minutes=config.CHANNEL_DURATION_SECONDS / 60)
entries = (models.StateEntry.all()
.ancestor(self.client.parent_key()))
for entry in entries:
entry.SendToClient(self.client.key())
return { return {
'token': token, 'token': token,
'messages': [x.ToMessage()
for x in self.client.parent().GetStateEntries()],
} }

View File

@@ -52,9 +52,7 @@ class Profile(db.Model):
# Merge from another profile into this one, using last_set time as the # Merge from another profile into this one, using last_set time as the
# arbiter. # arbiter.
my_states = {} my_states = {}
for state_entry in (StateEntry.all() for state_entry in self.GetStateEntries():
.ancestor(self)
.run()):
my_states[state_entry.entry_key] = state_entry my_states[state_entry.entry_key] = state_entry
for state_entry in (StateEntry.all() for state_entry in (StateEntry.all()
@@ -73,6 +71,10 @@ class Profile(db.Model):
entry_value=state_entry.entry_value entry_value=state_entry.entry_value
).put() ).put()
@db.transactional()
def GetStateEntries(self):
return StateEntry.all().ancestor(self)
class Client(db.Model): class Client(db.Model):
first_seen = db.DateTimeProperty(required=True, auto_now_add=True) first_seen = db.DateTimeProperty(required=True, auto_now_add=True)
@@ -89,18 +91,21 @@ class Client(db.Model):
profile = Profile.FromGoogleUser(google_user) profile = Profile.FromGoogleUser(google_user)
return cls.FromProfile(profile) return cls.FromProfile(profile)
def SendMessage(self, msg):
channel.send_message(str(self.key()), json.dumps(msg))
class StateEntry(db.Model): class StateEntry(db.Model):
last_set = db.DateTimeProperty(required=True, auto_now=True) last_set = db.DateTimeProperty(required=True, auto_now=True)
entry_key = db.StringProperty(required=True) entry_key = db.StringProperty(required=True)
entry_value = db.StringProperty(required=True) entry_value = db.StringProperty(required=True)
def SendToClient(self, client_id): def ToMessage(self):
channel.send_message(str(client_id), json.dumps({ return {
'message_type': 'state', 'message_type': 'state',
'key': self.entry_key, 'key': self.entry_key,
'value': self.entry_value, 'value': self.entry_value,
})) }
class Subject(db.Model): class Subject(db.Model):

View File

@@ -168,6 +168,11 @@ cosmopolite.Client.prototype.onCreateChannel_ = function(data) {
onmessage: this.$.proxy(this.onSocketMessage_, this), onmessage: this.$.proxy(this.onSocketMessage_, this),
onerror: this.$.proxy(this.onSocketError_, this), onerror: this.$.proxy(this.onSocketError_, this),
}); });
// Handle messages that were immediately available as if they came over the
// channel.
for (var i = 0; i < data['messages'].length; ++i) {
this.onServerMessage_(data['messages'][i]);
}
}; };
cosmopolite.Client.prototype.onSocketOpen_ = function() { cosmopolite.Client.prototype.onSocketOpen_ = function() {
@@ -184,12 +189,14 @@ cosmopolite.Client.prototype.onSocketClose_ = function() {
}; };
cosmopolite.Client.prototype.onSocketMessage_ = function(msg) { cosmopolite.Client.prototype.onSocketMessage_ = function(msg) {
console.log('message: ' + msg.data); this.onServerMessage_(JSON.parse(msg.data));
var parsed = JSON.parse(msg.data); };
switch (parsed.message_type) {
cosmopolite.Client.prototype.onServerMessage_ = function(msg) {
switch (msg.message_type) {
case 'state': case 'state':
var key = parsed['key']; var key = msg['key'];
var value = parsed['value']; var value = msg['value'];
if (this.stateCache_[key] == value) { if (this.stateCache_[key] == value) {
// Duplicate message. // Duplicate message.
break; break;
@@ -200,10 +207,11 @@ cosmopolite.Client.prototype.onSocketMessage_ = function(msg) {
} }
break; break;
default: default:
console.log('Unknown message type: ' + parsed.message_type); // Client out of date? Force refresh?
console.log('Unknown message type: ' + msg.message_type);
break; break;
} }
} };
cosmopolite.Client.prototype.onSocketError_ = function(msg) { cosmopolite.Client.prototype.onSocketError_ = function(msg) {
console.log('Socket error: ' + msg); console.log('Socket error: ' + msg);