Add real subject ACL support and tests.
This commit is contained in:
@@ -56,7 +56,8 @@ var Cosmopolite = function(callbacks, urlPrefix, namespace) {
|
||||
// message's client_message_id, which is intentionally not exposed via
|
||||
// the sendMessage API
|
||||
this.sendRPC_(
|
||||
'sendMessage', message, this.onMessageSent_.bind(this, message, null));
|
||||
'sendMessage', message,
|
||||
this.onMessageSent_.bind(this, message, null, null));
|
||||
}.bind(this));
|
||||
} else {
|
||||
localStorage[this.messageQueueKey_] = JSON.stringify([]);
|
||||
@@ -126,15 +127,17 @@ Cosmopolite.prototype.shutdown = function() {
|
||||
*
|
||||
* Start receiving messages sent to this subject via the onMessage callback.
|
||||
*
|
||||
* @param {!string} subject Subject name
|
||||
* @param {!*} subject Subject name or object
|
||||
* @param {number=} messages Number of recent messages to request; 0 for none, -1 for all
|
||||
* @param {number=} last_id ID of last message received; fetch all messages since
|
||||
* @param {Array.<string>=} keys Key names to ensure we receive at least 1 message defining
|
||||
*/
|
||||
Cosmopolite.prototype.subscribe = function(subject, messages, last_id, keys) {
|
||||
return new Promise(function(resolve, reject) {
|
||||
if (!(subject in this.subscriptions_)) {
|
||||
this.subscriptions_[subject] = {
|
||||
var canonicalSubject = this.canonicalSubject_(subject);
|
||||
var subjectString = JSON.stringify(canonicalSubject);
|
||||
if (!(subjectString in this.subscriptions_)) {
|
||||
this.subscriptions_[subjectString] = {
|
||||
'messages': [],
|
||||
'keys': {},
|
||||
'state': this.SubscriptionState.PENDING,
|
||||
@@ -142,7 +145,7 @@ Cosmopolite.prototype.subscribe = function(subject, messages, last_id, keys) {
|
||||
}
|
||||
|
||||
var args = {
|
||||
'subject': subject,
|
||||
'subject': canonicalSubject,
|
||||
};
|
||||
if (messages) {
|
||||
args['messages'] = messages;
|
||||
@@ -154,13 +157,18 @@ Cosmopolite.prototype.subscribe = function(subject, messages, last_id, keys) {
|
||||
args['keys'] = keys;
|
||||
}
|
||||
|
||||
this.sendRPC_('subscribe', args, function() {
|
||||
this.sendRPC_('subscribe', args, function(response) {
|
||||
// unsubscribe may have been called since we sent the RPC. That's racy
|
||||
// without waiting for the promise, but do our best
|
||||
if (subject in this.subscriptions_) {
|
||||
this.subscriptions_[subject].state = this.SubscriptionState.ACTIVE;
|
||||
if (subjectString in this.subscriptions_) {
|
||||
this.subscriptions_[subjectString].state = this.SubscriptionState.ACTIVE;
|
||||
}
|
||||
var result = response['result'];
|
||||
if (result == 'ok') {
|
||||
resolve();
|
||||
} else {
|
||||
reject();
|
||||
}
|
||||
resolve();
|
||||
}.bind(this));
|
||||
}.bind(this));
|
||||
};
|
||||
@@ -175,9 +183,11 @@ Cosmopolite.prototype.subscribe = function(subject, messages, last_id, keys) {
|
||||
*/
|
||||
Cosmopolite.prototype.unsubscribe = function(subject) {
|
||||
return new Promise(function(resolve, reject) {
|
||||
delete this.subscriptions_[subject];
|
||||
var canonicalSubject = this.canonicalSubject_(subject);
|
||||
var subjectString = JSON.stringify(canonicalSubject);
|
||||
delete this.subscriptions_[subjectString];
|
||||
var args = {
|
||||
'subject': subject,
|
||||
'subject': canonicalSubject,
|
||||
}
|
||||
this.sendRPC_('unsubscribe', args, resolve);
|
||||
}.bind(this));
|
||||
@@ -193,7 +203,7 @@ Cosmopolite.prototype.unsubscribe = function(subject) {
|
||||
Cosmopolite.prototype.sendMessage = function(subject, message, key) {
|
||||
return new Promise(function(resolve, reject) {
|
||||
var args = {
|
||||
'subject': subject,
|
||||
'subject': this.canonicalSubject_(subject),
|
||||
'message': JSON.stringify(message),
|
||||
'sender_message_id': this.uuid_(),
|
||||
};
|
||||
@@ -207,7 +217,8 @@ Cosmopolite.prototype.sendMessage = function(subject, message, key) {
|
||||
localStorage[this.messageQueueKey_] = JSON.stringify(messageQueue);
|
||||
|
||||
this.sendRPC_(
|
||||
'sendMessage', args, this.onMessageSent_.bind(this, args, resolve));
|
||||
'sendMessage', args,
|
||||
this.onMessageSent_.bind(this, args, resolve, reject));
|
||||
}.bind(this));
|
||||
};
|
||||
|
||||
@@ -218,7 +229,9 @@ Cosmopolite.prototype.sendMessage = function(subject, message, key) {
|
||||
* @const
|
||||
*/
|
||||
Cosmopolite.prototype.getMessages = function(subject) {
|
||||
return this.subscriptions_[subject].messages;
|
||||
var canonicalSubject = this.canonicalSubject_(subject);
|
||||
var subjectString = JSON.stringify(canonicalSubject);
|
||||
return this.subscriptions_[subjectString].messages;
|
||||
};
|
||||
|
||||
/**
|
||||
@@ -229,7 +242,9 @@ Cosmopolite.prototype.getMessages = function(subject) {
|
||||
* @const
|
||||
*/
|
||||
Cosmopolite.prototype.getKeyMessage = function(subject, key) {
|
||||
return this.subscriptions_[subject].keys[key];
|
||||
var canonicalSubject = this.canonicalSubject_(subject);
|
||||
var subjectString = JSON.stringify(canonicalSubject);
|
||||
return this.subscriptions_[subjectString].keys[key];
|
||||
};
|
||||
|
||||
/**
|
||||
@@ -269,6 +284,31 @@ Cosmopolite.prototype.uuid_ = function() {
|
||||
});
|
||||
};
|
||||
|
||||
/**
|
||||
* Canonicalize a subject name or object
|
||||
*
|
||||
* @param {!*} subject A simple or complex representation of a subject
|
||||
* @return {Object} A canonicalized object for RPCs
|
||||
*/
|
||||
Cosmopolite.prototype.canonicalSubject_ = function(subject) {
|
||||
if (typeof(subject) == 'number') {
|
||||
subject = subject.toString();
|
||||
}
|
||||
if (typeof(subject) == 'string') {
|
||||
subject = {
|
||||
'name': subject,
|
||||
}
|
||||
}
|
||||
if (subject['readable_only_by'] === null) {
|
||||
delete subject['readable_only_by'];
|
||||
};
|
||||
if (subject['writable_only_by'] === null) {
|
||||
delete subject['writable_only_by'];
|
||||
};
|
||||
return subject;
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* Callback when a script loads.
|
||||
*/
|
||||
@@ -332,16 +372,26 @@ Cosmopolite.prototype.registerMessageHandlers_ = function() {
|
||||
*
|
||||
* @param {Object} message Message details.
|
||||
* @param {function()=} resolve Promise resolution callback.
|
||||
* @param {function()=} reject Promise rejection callback.
|
||||
* @param {Object=} response Server RPC response.
|
||||
*/
|
||||
Cosmopolite.prototype.onMessageSent_ = function(message, resolve) {
|
||||
Cosmopolite.prototype.onMessageSent_ = function(
|
||||
message, resolve, reject, response) {
|
||||
// No message left behind.
|
||||
var messageQueue = JSON.parse(localStorage[this.messageQueueKey_]);
|
||||
messageQueue = messageQueue.filter(function(queuedMessage) {
|
||||
return message['sender_message_id'] != queuedMessage['sender_message_id'];
|
||||
});
|
||||
localStorage[this.messageQueueKey_] = JSON.stringify(messageQueue);
|
||||
if (resolve) {
|
||||
resolve();
|
||||
var result = response['result'];
|
||||
if (result == 'ok' || result == 'duplicate_message') {
|
||||
if (resolve) {
|
||||
resolve();
|
||||
}
|
||||
} else {
|
||||
if (reject) {
|
||||
reject();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -500,6 +550,7 @@ Cosmopolite.prototype.resubscribe_ = function() {
|
||||
var rpcs = [];
|
||||
for (var subject in this.subscriptions_) {
|
||||
var subscription = this.subscriptions_[subject];
|
||||
var canonicalSubject = JSON.parse(subject);
|
||||
if (subscription.state != this.SubscriptionState.ACTIVE) {
|
||||
continue;
|
||||
}
|
||||
@@ -510,7 +561,7 @@ Cosmopolite.prototype.resubscribe_ = function() {
|
||||
rpcs.push({
|
||||
'command': 'subscribe',
|
||||
'arguments': {
|
||||
'subject': subject,
|
||||
'subject': canonicalSubject,
|
||||
'last_id': last_id,
|
||||
}
|
||||
});
|
||||
@@ -657,7 +708,8 @@ Cosmopolite.prototype.onLogout_ = function(e) {
|
||||
* @param {!Object} e Event object
|
||||
*/
|
||||
Cosmopolite.prototype.onMessage_ = function(e) {
|
||||
var subscription = this.subscriptions_[e['subject']['name']];
|
||||
var subjectString = JSON.stringify(e['subject']);
|
||||
var subscription = this.subscriptions_[subjectString];
|
||||
if (!subscription) {
|
||||
console.log(
|
||||
this.loggingPrefix_(),
|
||||
|
||||
Reference in New Issue
Block a user