Add a third level construct under Profile and Client, Instance. This allows more than one connected object with the same client credentials.
This commit is contained in:
18
api.py
18
api.py
@@ -27,9 +27,9 @@ from cosmopolite.lib import utils
|
|||||||
import config
|
import config
|
||||||
|
|
||||||
|
|
||||||
def CreateChannel(google_user, client, args):
|
def CreateChannel(google_user, client, instance_id, args):
|
||||||
token = channel.create_channel(
|
token = channel.create_channel(
|
||||||
client_id=str(client.key()),
|
client_id=str(client.key()) + '/' + instance_id,
|
||||||
duration_minutes=config.CHANNEL_DURATION_SECONDS / 60)
|
duration_minutes=config.CHANNEL_DURATION_SECONDS / 60)
|
||||||
events = []
|
events = []
|
||||||
if google_user:
|
if google_user:
|
||||||
@@ -50,7 +50,7 @@ def CreateChannel(google_user, client, args):
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def SendMessage(google_user, client, args):
|
def SendMessage(google_user, client, instance_id, args):
|
||||||
subject = args['subject']
|
subject = args['subject']
|
||||||
message = args['message']
|
message = args['message']
|
||||||
sender_message_id = args['sender_message_id']
|
sender_message_id = args['sender_message_id']
|
||||||
@@ -75,7 +75,8 @@ def SendMessage(google_user, client, args):
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def Subscribe(google_user, client, args):
|
def Subscribe(google_user, client, instance_id, args):
|
||||||
|
instance = models.Instance.FromID(instance_id, client)
|
||||||
subject = models.Subject.FindOrCreate(args['subject'])
|
subject = models.Subject.FindOrCreate(args['subject'])
|
||||||
messages = args.get('messages', 0)
|
messages = args.get('messages', 0)
|
||||||
last_id = args.get('last_id', None)
|
last_id = args.get('last_id', None)
|
||||||
@@ -84,7 +85,8 @@ def Subscribe(google_user, client, args):
|
|||||||
try:
|
try:
|
||||||
ret = {
|
ret = {
|
||||||
'result': 'ok',
|
'result': 'ok',
|
||||||
'events': models.Subscription.FindOrCreate(subject, client, messages, last_id),
|
'events': models.Subscription.FindOrCreate(
|
||||||
|
subject, instance, messages, last_id),
|
||||||
}
|
}
|
||||||
except models.AccessDenied:
|
except models.AccessDenied:
|
||||||
logging.exception('Subscribe access denied')
|
logging.exception('Subscribe access denied')
|
||||||
@@ -100,9 +102,10 @@ def Subscribe(google_user, client, args):
|
|||||||
return ret
|
return ret
|
||||||
|
|
||||||
|
|
||||||
def Unsubscribe(google_user, client, args):
|
def Unsubscribe(google_user, client, instance_id, args):
|
||||||
|
instance = models.Instance.FromID(instance_id, client)
|
||||||
subject = models.Subject.FindOrCreate(args['subject'])
|
subject = models.Subject.FindOrCreate(args['subject'])
|
||||||
models.Subscription.Remove(subject, client)
|
models.Subscription.Remove(subject, instance)
|
||||||
|
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
@@ -134,6 +137,7 @@ class APIWrapper(webapp2.RequestHandler):
|
|||||||
result = callback(
|
result = callback(
|
||||||
self.verified_google_user,
|
self.verified_google_user,
|
||||||
self.client,
|
self.client,
|
||||||
|
self.request_json['instance_id'],
|
||||||
command.get('arguments', {}))
|
command.get('arguments', {}))
|
||||||
# Magic: if result contains "events", haul them up a level so the
|
# Magic: if result contains "events", haul them up a level so the
|
||||||
# client can see them as a single stream.
|
# client can see them as a single stream.
|
||||||
|
|||||||
16
channel.py
16
channel.py
@@ -25,22 +25,24 @@ class OnChannelConnect(webapp2.RequestHandler):
|
|||||||
@utils.local_namespace
|
@utils.local_namespace
|
||||||
@db.transactional()
|
@db.transactional()
|
||||||
def post(self):
|
def post(self):
|
||||||
client = models.Client.get(self.request.get('from'))
|
client_key, instance_id = self.request.get('from').split('/', 1)
|
||||||
client.channel_active = True
|
client = models.Client.get(client_key)
|
||||||
client.put()
|
instance = models.Instance.FindOrCreate(instance_id, client)
|
||||||
|
|
||||||
|
|
||||||
class OnChannelDisconnect(webapp2.RequestHandler):
|
class OnChannelDisconnect(webapp2.RequestHandler):
|
||||||
@utils.local_namespace
|
@utils.local_namespace
|
||||||
def post(self):
|
def post(self):
|
||||||
client = models.Client.get(self.request.get('from'))
|
client_key, instance_id = self.request.get('from').split('/', 1)
|
||||||
client.channel_active = False
|
client = models.Client.get(client_key)
|
||||||
client.put()
|
instance = models.Instance.FindOrCreate(instance_id, client)
|
||||||
|
|
||||||
subscriptions = models.Subscription.all().filter('client =', client)
|
subscriptions = models.Subscription.all().filter('instance =', instance)
|
||||||
for subscription in subscriptions:
|
for subscription in subscriptions:
|
||||||
subscription.delete()
|
subscription.delete()
|
||||||
|
|
||||||
|
models.Instance.get(instance).delete()
|
||||||
|
|
||||||
|
|
||||||
app = webapp2.WSGIApplication([
|
app = webapp2.WSGIApplication([
|
||||||
('/_ah/channel/connected/', OnChannelConnect),
|
('/_ah/channel/connected/', OnChannelConnect),
|
||||||
|
|||||||
@@ -24,10 +24,11 @@ import utils
|
|||||||
|
|
||||||
# Profile
|
# Profile
|
||||||
# ↳ Client
|
# ↳ Client
|
||||||
|
# ↳ Instance
|
||||||
#
|
#
|
||||||
# Subject
|
# Subject
|
||||||
# ↳ Message
|
# ↳ Message
|
||||||
# ↳ Subscription (⤴︎ Client)
|
# ↳ Subscription (⤴︎ Instance)
|
||||||
|
|
||||||
|
|
||||||
class DuplicateMessage(Exception):
|
class DuplicateMessage(Exception):
|
||||||
@@ -71,7 +72,6 @@ class Client(db.Model):
|
|||||||
# parent=Profile
|
# parent=Profile
|
||||||
|
|
||||||
first_seen = db.DateTimeProperty(required=True, auto_now_add=True)
|
first_seen = db.DateTimeProperty(required=True, auto_now_add=True)
|
||||||
channel_active = db.BooleanProperty(required=True, default=False)
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def FromProfile(cls, profile):
|
def FromProfile(cls, profile):
|
||||||
@@ -84,12 +84,38 @@ class Client(db.Model):
|
|||||||
profile = Profile.FromGoogleUser(google_user)
|
profile = Profile.FromGoogleUser(google_user)
|
||||||
return cls.FromProfile(profile)
|
return cls.FromProfile(profile)
|
||||||
|
|
||||||
def SendMessage(self, msg):
|
|
||||||
self.SendByKey(self.key(), msg)
|
|
||||||
|
|
||||||
@staticmethod
|
class Instance(db.Model):
|
||||||
def SendByKey(key, msg):
|
# parent=Client
|
||||||
channel.send_message(str(key), json.dumps(msg, default=utils.EncodeJSON))
|
|
||||||
|
id_ = db.StringProperty(required=True)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
@db.transactional()
|
||||||
|
def FromID(cls, instance_id, client):
|
||||||
|
instances = (
|
||||||
|
cls.all(keys_only=True)
|
||||||
|
.filter('id_ =', instance_id)
|
||||||
|
.ancestor(client)
|
||||||
|
.fetch(1))
|
||||||
|
if instances:
|
||||||
|
return instances[0]
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
@db.transactional()
|
||||||
|
def FindOrCreate(cls, instance_id, client):
|
||||||
|
instance = cls.FromID(instance_id, client)
|
||||||
|
if instance:
|
||||||
|
return instance
|
||||||
|
else:
|
||||||
|
return cls(parent=client, id_=instance_id).put()
|
||||||
|
|
||||||
|
def SendMessage(self, msg):
|
||||||
|
channel.send_message(
|
||||||
|
str(self.parent_key()) + '/' + self.id_,
|
||||||
|
json.dumps(msg, default=utils.EncodeJSON))
|
||||||
|
|
||||||
|
|
||||||
class Subject(db.Model):
|
class Subject(db.Model):
|
||||||
@@ -195,10 +221,7 @@ class Subject(db.Model):
|
|||||||
key_=key)
|
key_=key)
|
||||||
obj.put()
|
obj.put()
|
||||||
|
|
||||||
return (
|
return (obj, list(Subscription.all().ancestor(subject)))
|
||||||
obj,
|
|
||||||
[Subscription.client.get_value_for_datastore(subscription)
|
|
||||||
for subscription in Subscription.all().ancestor(subject)])
|
|
||||||
|
|
||||||
def SendMessage(self, message, sender, sender_message_id, key=None):
|
def SendMessage(self, message, sender, sender_message_id, key=None):
|
||||||
writable_only_by = Subject.writable_only_by.get_value_for_datastore(self)
|
writable_only_by = Subject.writable_only_by.get_value_for_datastore(self)
|
||||||
@@ -208,7 +231,7 @@ class Subject(db.Model):
|
|||||||
obj, subscriptions = self.PutMessage(message, sender, sender_message_id, key)
|
obj, subscriptions = self.PutMessage(message, sender, sender_message_id, key)
|
||||||
event = obj.ToEvent()
|
event = obj.ToEvent()
|
||||||
for subscription in subscriptions:
|
for subscription in subscriptions:
|
||||||
Client.SendByKey(subscription, event)
|
subscription.instance.SendMessage(event)
|
||||||
|
|
||||||
def ToDict(self):
|
def ToDict(self):
|
||||||
ret = {
|
ret = {
|
||||||
@@ -226,24 +249,24 @@ class Subject(db.Model):
|
|||||||
class Subscription(db.Model):
|
class Subscription(db.Model):
|
||||||
# parent=Subject
|
# parent=Subject
|
||||||
|
|
||||||
client = db.ReferenceProperty(reference_class=Client)
|
instance = db.ReferenceProperty(reference_class=Instance, required=True)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@db.transactional()
|
@db.transactional()
|
||||||
def FindOrCreate(cls, subject, client, messages=0, last_id=None):
|
def FindOrCreate(cls, subject, instance, messages=0, last_id=None):
|
||||||
readable_only_by = (
|
readable_only_by = (
|
||||||
Subject.readable_only_by.get_value_for_datastore(subject))
|
Subject.readable_only_by.get_value_for_datastore(subject))
|
||||||
if (readable_only_by and
|
if (readable_only_by and
|
||||||
readable_only_by != client.parent_key()):
|
readable_only_by != instance.parent().parent()):
|
||||||
raise AccessDenied
|
raise AccessDenied
|
||||||
|
|
||||||
subscriptions = (
|
subscriptions = (
|
||||||
cls.all(keys_only=True)
|
cls.all(keys_only=True)
|
||||||
.ancestor(subject)
|
.ancestor(subject)
|
||||||
.filter('client =', client)
|
.filter('instance =', instance)
|
||||||
.fetch(1))
|
.fetch(1))
|
||||||
if not subscriptions:
|
if not subscriptions:
|
||||||
cls(parent=subject, client=client).put()
|
cls(parent=subject, instance=instance).put()
|
||||||
events = []
|
events = []
|
||||||
if messages:
|
if messages:
|
||||||
events.extend(m.ToEvent() for m in subject.GetRecentMessages(messages))
|
events.extend(m.ToEvent() for m in subject.GetRecentMessages(messages))
|
||||||
@@ -253,11 +276,11 @@ class Subscription(db.Model):
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@db.transactional()
|
@db.transactional()
|
||||||
def Remove(cls, subject, client):
|
def Remove(cls, subject, instance):
|
||||||
subscriptions = (
|
subscriptions = (
|
||||||
cls.all()
|
cls.all()
|
||||||
.ancestor(subject)
|
.ancestor(subject)
|
||||||
.filter('client =', client))
|
.filter('instance =', instance))
|
||||||
for subscription in subscriptions:
|
for subscription in subscriptions:
|
||||||
subscription.delete()
|
subscription.delete()
|
||||||
|
|
||||||
|
|||||||
@@ -45,6 +45,8 @@ var Cosmopolite = function(callbacks, urlPrefix, namespace) {
|
|||||||
this.subscriptions_ = {};
|
this.subscriptions_ = {};
|
||||||
this.profilePromises_ = [];
|
this.profilePromises_ = [];
|
||||||
|
|
||||||
|
this.instanceId_ = this.uuid_();
|
||||||
|
|
||||||
this.messageQueueKey_ = this.namespace_ + ':message_queue';
|
this.messageQueueKey_ = this.namespace_ + ':message_queue';
|
||||||
if (this.messageQueueKey_ in localStorage) {
|
if (this.messageQueueKey_ in localStorage) {
|
||||||
var messages = JSON.parse(localStorage[this.messageQueueKey_]);
|
var messages = JSON.parse(localStorage[this.messageQueueKey_]);
|
||||||
@@ -447,6 +449,7 @@ Cosmopolite.prototype.sendRPCs_ = function(commands, delay) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
var request = {
|
var request = {
|
||||||
|
'instance_id': this.instanceId_,
|
||||||
'commands': [],
|
'commands': [],
|
||||||
};
|
};
|
||||||
commands.forEach(function(command) {
|
commands.forEach(function(command) {
|
||||||
|
|||||||
@@ -441,6 +441,32 @@ asyncTest('sendMessage ACL', function() {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
asyncTest('Two channels, one client', function() {
|
||||||
|
expect(2);
|
||||||
|
|
||||||
|
var namespace = randstring();
|
||||||
|
var subject = randstring();
|
||||||
|
var message = randstring();
|
||||||
|
|
||||||
|
var callbacks = {
|
||||||
|
'onMessage': function(msg) {
|
||||||
|
console.log('onMessage');
|
||||||
|
equal(msg['subject']['name'], subject, 'subject matches');
|
||||||
|
equal(msg['message'], message, 'message matches');
|
||||||
|
cosmo1.shutdown();
|
||||||
|
start();
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
var cosmo1 = new Cosmopolite(callbacks, null, namespace);
|
||||||
|
cosmo1.subscribe(subject).then(function() {
|
||||||
|
var cosmo2 = new Cosmopolite({}, null, namespace);
|
||||||
|
cosmo2.sendMessage(subject, message).then(function() {
|
||||||
|
cosmo2.shutdown();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
module('dev_appserver only');
|
module('dev_appserver only');
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user