Add pubsub server API, without client API or UI yet.
This commit is contained in:
54
api.py
54
api.py
@@ -26,6 +26,37 @@ from cosmopolite.lib import utils
|
|||||||
import config
|
import config
|
||||||
|
|
||||||
|
|
||||||
|
def CreateChannel(google_user, client, args):
|
||||||
|
token = channel.create_channel(
|
||||||
|
client_id=str(client.key()),
|
||||||
|
duration_minutes=config.CHANNEL_DURATION_SECONDS / 60)
|
||||||
|
messages = [x.ToMessage()
|
||||||
|
for x in client.parent().GetStateEntries()]
|
||||||
|
if google_user:
|
||||||
|
messages.append({
|
||||||
|
'message_type': 'login',
|
||||||
|
'google_user': google_user.email(),
|
||||||
|
})
|
||||||
|
else:
|
||||||
|
messages.append({
|
||||||
|
'message_type': 'logout',
|
||||||
|
})
|
||||||
|
|
||||||
|
return {
|
||||||
|
'token': token,
|
||||||
|
'messages': messages,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def SendMessage(google_user, client, args):
|
||||||
|
subject = args['subject']
|
||||||
|
message = args['message']
|
||||||
|
|
||||||
|
models.Subject.FindOrCreate(subject).SendMessage(message)
|
||||||
|
|
||||||
|
return {}
|
||||||
|
|
||||||
|
|
||||||
@db.transactional()
|
@db.transactional()
|
||||||
def SetValue(google_user, client, args):
|
def SetValue(google_user, client, args):
|
||||||
entry_key = args['key']
|
entry_key = args['key']
|
||||||
@@ -57,25 +88,12 @@ def SetValue(google_user, client, args):
|
|||||||
return {}
|
return {}
|
||||||
|
|
||||||
|
|
||||||
def CreateChannel(google_user, client, args):
|
def Subscribe(google_user, client, args):
|
||||||
token = channel.create_channel(
|
subject = models.Subject.FindOrCreate(args['subject'])
|
||||||
client_id=str(client.key()),
|
messages = args.get('messages', 0)
|
||||||
duration_minutes=config.CHANNEL_DURATION_SECONDS / 60)
|
|
||||||
messages = [x.ToMessage()
|
|
||||||
for x in client.parent().GetStateEntries()]
|
|
||||||
if google_user:
|
|
||||||
messages.append({
|
|
||||||
'message_type': 'login',
|
|
||||||
'google_user': google_user.email(),
|
|
||||||
})
|
|
||||||
else:
|
|
||||||
messages.append({
|
|
||||||
'message_type': 'logout',
|
|
||||||
})
|
|
||||||
|
|
||||||
return {
|
return {
|
||||||
'token': token,
|
'messages': models.Subscription.FindOrCreate(subject, client, messages),
|
||||||
'messages': messages,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -83,7 +101,9 @@ class APIWrapper(webapp2.RequestHandler):
|
|||||||
|
|
||||||
_COMMANDS = {
|
_COMMANDS = {
|
||||||
'createChannel': CreateChannel,
|
'createChannel': CreateChannel,
|
||||||
|
'sendMessage': SendMessage,
|
||||||
'setValue': SetValue,
|
'setValue': SetValue,
|
||||||
|
'subscribe': Subscribe,
|
||||||
}
|
}
|
||||||
|
|
||||||
@utils.chaos_monkey
|
@utils.chaos_monkey
|
||||||
|
|||||||
@@ -29,14 +29,18 @@ class OnChannelConnect(webapp2.RequestHandler):
|
|||||||
client.channel_active = True
|
client.channel_active = True
|
||||||
client.put()
|
client.put()
|
||||||
|
|
||||||
|
|
||||||
class OnChannelDisconnect(webapp2.RequestHandler):
|
class OnChannelDisconnect(webapp2.RequestHandler):
|
||||||
@utils.local_namespace
|
@utils.local_namespace
|
||||||
@db.transactional()
|
|
||||||
def post(self):
|
def post(self):
|
||||||
client = models.Client.get(self.request.get('from'))
|
client = models.Client.get(self.request.get('from'))
|
||||||
client.channel_active = False
|
client.channel_active = False
|
||||||
client.put()
|
client.put()
|
||||||
|
|
||||||
|
subscriptions = models.Subscription.all().filter('client =', client)
|
||||||
|
for subscription in subscriptions:
|
||||||
|
subscription.delete()
|
||||||
|
|
||||||
|
|
||||||
app = webapp2.WSGIApplication([
|
app = webapp2.WSGIApplication([
|
||||||
('/_ah/channel/connected/', OnChannelConnect),
|
('/_ah/channel/connected/', OnChannelConnect),
|
||||||
|
|||||||
@@ -79,7 +79,7 @@ class Profile(db.Model):
|
|||||||
|
|
||||||
|
|
||||||
class Client(db.Model):
|
class Client(db.Model):
|
||||||
# Parent: Profile
|
# parent=Profile
|
||||||
|
|
||||||
first_seen = db.DateTimeProperty(required=True, auto_now_add=True)
|
first_seen = db.DateTimeProperty(required=True, auto_now_add=True)
|
||||||
channel_active = db.BooleanProperty(required=True, default=False)
|
channel_active = db.BooleanProperty(required=True, default=False)
|
||||||
@@ -96,11 +96,15 @@ class Client(db.Model):
|
|||||||
return cls.FromProfile(profile)
|
return cls.FromProfile(profile)
|
||||||
|
|
||||||
def SendMessage(self, msg):
|
def SendMessage(self, msg):
|
||||||
channel.send_message(str(self.key()), json.dumps(msg, default=utils.EncodeJSON))
|
self.SendByKey(self.key(), msg)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def SendByKey(key, msg):
|
||||||
|
channel.send_message(str(key), json.dumps(msg, default=utils.EncodeJSON))
|
||||||
|
|
||||||
|
|
||||||
class StateEntry(db.Model):
|
class StateEntry(db.Model):
|
||||||
# Parent: Profile
|
# parent=Profile
|
||||||
|
|
||||||
last_set = db.DateTimeProperty(required=True, auto_now=True)
|
last_set = db.DateTimeProperty(required=True, auto_now=True)
|
||||||
entry_key = db.StringProperty(required=True)
|
entry_key = db.StringProperty(required=True)
|
||||||
@@ -118,10 +122,67 @@ class StateEntry(db.Model):
|
|||||||
|
|
||||||
|
|
||||||
class Subject(db.Model):
|
class Subject(db.Model):
|
||||||
name = db.StringProperty(required=True)
|
# key_name=name
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def FindOrCreate(cls, name):
|
||||||
|
subject = cls.get_by_key_name(name)
|
||||||
|
if subject:
|
||||||
|
return subject
|
||||||
|
return cls(key_name=name).put()
|
||||||
|
|
||||||
|
@db.transactional()
|
||||||
|
def RecentMessages(self, num_messages):
|
||||||
|
query = (
|
||||||
|
Message.all()
|
||||||
|
.ancestor(self)
|
||||||
|
.order('-created'))
|
||||||
|
if num_messages <= 0:
|
||||||
|
num_messages = None
|
||||||
|
return query.run(limit=num_messages)
|
||||||
|
|
||||||
|
@db.transactional()
|
||||||
|
def SendMessage(self, message):
|
||||||
|
obj = Message(parent=self, message=message)
|
||||||
|
obj.put()
|
||||||
|
|
||||||
|
json_message = obj.ToMessage()
|
||||||
|
|
||||||
|
for subscription in Subscription.all().ancestor(self):
|
||||||
|
Client.SendByKey(Subscription.client.get_value_for_datastore(subscription), json_message)
|
||||||
|
|
||||||
|
|
||||||
class Subscription(db.Model):
|
class Subscription(db.Model):
|
||||||
# Parent: Subject
|
# parent=Subject
|
||||||
|
|
||||||
client = db.ReferenceProperty(reference_class=Client)
|
client = db.ReferenceProperty(reference_class=Client)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
@db.transactional()
|
||||||
|
def FindOrCreate(cls, subject, client, messages):
|
||||||
|
subscriptions = (
|
||||||
|
cls.all(keys_only=True)
|
||||||
|
.ancestor(subject)
|
||||||
|
.filter('client =', client)
|
||||||
|
.fetch(1))
|
||||||
|
if not subscriptions:
|
||||||
|
logging.info('no subscriptions found')
|
||||||
|
cls(parent=subject, client=client).put()
|
||||||
|
if messages == 0:
|
||||||
|
return []
|
||||||
|
return [m.ToMessage() for m in subject.RecentMessages(messages)]
|
||||||
|
|
||||||
|
|
||||||
|
class Message(db.Model):
|
||||||
|
# parent=Subject
|
||||||
|
|
||||||
|
created = db.DateTimeProperty(required=True, auto_now_add=True)
|
||||||
|
message = db.TextProperty(required=True)
|
||||||
|
|
||||||
|
def ToMessage(self):
|
||||||
|
return {
|
||||||
|
'message_type': 'message',
|
||||||
|
'subject': self.parent_key().name(),
|
||||||
|
'created': self.created,
|
||||||
|
'message': self.message,
|
||||||
|
}
|
||||||
|
|||||||
@@ -58,7 +58,7 @@ cosmopolite.Client.prototype.onReceiveMessage_ = function(data) {
|
|||||||
this.socket.close();
|
this.socket.close();
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
console.log('Unknown message type: ' + data);
|
console.log('Unknown event type:', data);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -67,10 +67,10 @@ cosmopolite.Client.prototype.registerMessageHandlers_ = function() {
|
|||||||
this.$(window).on('message', this.$.proxy(function(e) {
|
this.$(window).on('message', this.$.proxy(function(e) {
|
||||||
if (e.originalEvent.origin != window.location.origin) {
|
if (e.originalEvent.origin != window.location.origin) {
|
||||||
console.log(
|
console.log(
|
||||||
'Received message from bad origin: ' + e.originalEvent.origin);
|
'Received message from bad origin:', e.originalEvent.origin);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
console.log('Received message: ' + e.originalEvent.data);
|
console.log('Received message:', e.originalEvent.data);
|
||||||
this.onReceiveMessage_(e.originalEvent.data);
|
this.onReceiveMessage_(e.originalEvent.data);
|
||||||
}, this));
|
}, this));
|
||||||
};
|
};
|
||||||
@@ -125,9 +125,7 @@ cosmopolite.Client.prototype.sendRPCs_ = function(commands, delay) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (data['status'] != 'ok') {
|
if (data['status'] != 'ok') {
|
||||||
console.log(
|
console.log('Server returned unknown status:', data['status']);
|
||||||
'Server returned unknown status (' + data['status'] + ') for RPC '
|
|
||||||
+ command);
|
|
||||||
// TODO(flamingcow): Refresh the page? Show an alert?
|
// TODO(flamingcow): Refresh the page? Show an alert?
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -172,7 +170,27 @@ cosmopolite.Client.prototype.getValue = function(key) {
|
|||||||
};
|
};
|
||||||
|
|
||||||
cosmopolite.Client.prototype.createChannel_ = function() {
|
cosmopolite.Client.prototype.createChannel_ = function() {
|
||||||
this.sendRPC_('createChannel', {}, this.onCreateChannel_);
|
this.sendRPCs_([
|
||||||
|
{
|
||||||
|
'command': 'createChannel',
|
||||||
|
'onSuccess': this.onCreateChannel_,
|
||||||
|
},
|
||||||
|
// TODO(flamingcow): Remove debugging below.
|
||||||
|
{
|
||||||
|
'command': 'subscribe',
|
||||||
|
'arguments': {
|
||||||
|
'subject': 'books',
|
||||||
|
'messages': 5,
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'command': 'sendMessage',
|
||||||
|
'arguments': {
|
||||||
|
'subject': 'books',
|
||||||
|
'message': 'foobar',
|
||||||
|
}
|
||||||
|
},
|
||||||
|
]);
|
||||||
};
|
};
|
||||||
|
|
||||||
cosmopolite.Client.prototype.onCreateChannel_ = function(data) {
|
cosmopolite.Client.prototype.onCreateChannel_ = function(data) {
|
||||||
@@ -238,12 +256,12 @@ cosmopolite.Client.prototype.onServerMessage_ = function(msg) {
|
|||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
// Client out of date? Force refresh?
|
// Client out of date? Force refresh?
|
||||||
console.log('Unknown message type: ' + msg.message_type);
|
console.log('Unknown channel message:', msg);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
cosmopolite.Client.prototype.onSocketError_ = function(msg) {
|
cosmopolite.Client.prototype.onSocketError_ = function(msg) {
|
||||||
console.log('Socket error: ' + msg);
|
console.log('Socket error:', msg);
|
||||||
this.socket.close();
|
this.socket.close();
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user