Add support for "local" subjects, which don't get pushed to the serve and get destroyed on unsubscribe.

This commit is contained in:
Ian Gulliver
2015-10-15 19:05:39 +02:00
parent 9b5a55b737
commit a59f7f97f3
2 changed files with 190 additions and 4 deletions

View File

@@ -290,6 +290,12 @@ Cosmopolite.prototype.subscribe = function(subjects, opt_messages, opt_lastID) {
var subject = subjects[i];
ret.push(this.newPromise_(function(resolve, reject) {
if (subject['local'] && (subject['readable_only_by'] || subject['writable_only_by'])) {
console.log(this.loggingPrefix_(), 'local subjects can\'t have ACLs:', subject);
reject();
return;
}
/** @type {Cosmopolite.typeSubject} */
var canonicalSubject = this.canonicalSubject_(subject);
/** @type {string} */
@@ -303,6 +309,13 @@ Cosmopolite.prototype.subscribe = function(subjects, opt_messages, opt_lastID) {
};
}
if (subject['local']) {
this.subscriptions_[subjectString].state =
Cosmopolite.SubscriptionState_.ACTIVE;
resolve();
return;
}
var args = {
'subject': canonicalSubject
};
@@ -374,6 +387,10 @@ Cosmopolite.prototype.unsubscribe = function(subject) {
/** @type {string} */
var subjectString = this.subjectString_(canonicalSubject);
delete this.subscriptions_[subjectString];
if (subject['local']) {
resolve();
return;
}
var args = {
'subject': canonicalSubject
};
@@ -390,12 +407,25 @@ Cosmopolite.prototype.unsubscribe = function(subject) {
*/
Cosmopolite.prototype.sendMessage = function(subject, message) {
return this.newPromise_(function(resolve, reject) {
if (subject['local'] && (subject['readable_only_by'] || subject['writable_only_by'])) {
console.log(this.loggingPrefix_(), 'local subjects can\'t have ACLs:', subject);
reject();
return;
}
var args = /** @type {Cosmopolite.typeMessage} */ ({
'subject': this.canonicalSubject_(subject),
'message': JSON.stringify(message),
'sender_message_id': this.uuid()
});
if (subject['local']) {
resolve(message);
args['id'] = this.uuid();
this.onMessage_(args);
return;
}
// No message left behind.
var messageQueue = JSON.parse(localStorage[this.messageQueueKey_]);
messageQueue.push(args);
@@ -496,11 +526,20 @@ Cosmopolite.prototype.pin = function(subject, message) {
'sender_message_id': id
};
this.sendRPC_('pin', args, function() {
var onSuccess = function() {
this.pins_[id] = args;
resolve(id);
});
}
if (subject['local']) {
onSuccess.bind(this)();
// Ugly hack, but we need this to promise to resolve before the callback fires.
// This is the equivalent of sched_yield(), and probably about as reliable.
window.setTimeout(this.onPin_.bind(this, Object.create(args)), 0);
return;
}
this.sendRPC_('pin', args, onSuccess);
}.bind(this));
};
@@ -520,6 +559,12 @@ Cosmopolite.prototype.unpin = function(id) {
delete this.pins_[id];
if (pin['subject']['local']) {
resolve();
window.setTimeout(this.onUnpin_.bind(this, Object.create(pin)), 0);
return;
}
this.sendRPC_('unpin', args, resolve);
}.bind(this));
};
@@ -682,6 +727,9 @@ Cosmopolite.prototype.canonicalSubject_ = function(subject) {
if (subject['writable_only_by'] === null) {
delete subject['writable_only_by'];
}
if (!subject['local']) {
delete subject['local'];
}
return subject;
};
@@ -698,7 +746,8 @@ Cosmopolite.prototype.subjectString_ = function(subject) {
return [
subject['name'],
subject['readable_only_by'],
subject['writable_only_by']
subject['writable_only_by'],
subject['local']
].toString();
};
@@ -970,6 +1019,9 @@ Cosmopolite.prototype.onReconnect_ = function() {
if (subscription.state != Cosmopolite.SubscriptionState_.ACTIVE) {
continue;
}
if (subject['local']) {
continue;
}
/** @type {number} */
var lastID = 0;
if (subscription.messages.length > 0) {
@@ -986,6 +1038,9 @@ Cosmopolite.prototype.onReconnect_ = function() {
for (var id in this.pins_) {
/** @type {Cosmopolite.typeMessage} */
var pin = this.pins_[id];
if (pin.subject['local']) {
continue;
}
rpcs.push({
'command': 'pin',
'arguments': pin