Add support for ephemeral messages that are tied to an open channel, aka "pins".

This commit is contained in:
Ian Gulliver
2014-05-25 23:40:56 -07:00
parent 1e91f5babb
commit 8e7af2f5cf
6 changed files with 363 additions and 14 deletions

View File

@@ -43,6 +43,7 @@ var Cosmopolite = function(callbacks, urlPrefix, namespace) {
this.rpcQueue_ = [];
this.subscriptions_ = {};
this.pins_ = {};
this.profilePromises_ = [];
this.messageQueueKey_ = this.namespace_ + ':message_queue';
@@ -140,6 +141,7 @@ Cosmopolite.prototype.subscribe = function(subject, messages, last_id, keys) {
if (!(subjectString in this.subscriptions_)) {
this.subscriptions_[subjectString] = {
'messages': [],
'pins': [],
'keys': {},
'state': this.SubscriptionState.PENDING,
};
@@ -235,6 +237,18 @@ Cosmopolite.prototype.getMessages = function(subject) {
return this.subscriptions_[subjectString].messages;
};
/**
* Fetch all current pins for a subject
*
* @param {!string} subject Subject name
* @const
*/
Cosmopolite.prototype.getPins = function(subject) {
var canonicalSubject = this.canonicalSubject_(subject);
var subjectString = JSON.stringify(canonicalSubject);
return this.subscriptions_[subjectString].pins;
};
/**
* Fetch the most recent message that defined a key
*
@@ -271,6 +285,50 @@ Cosmopolite.prototype.currentProfile = function() {
return this.profile_;
};
/**
* Pin a message to the given subject, storing it and notifying all listeners.
*
* The message is deleted on unpin() or when we disconnect.
*
* The resulting Promise resolve callback is passed an ID that can later be
* passed to unpin().
*
* @param {!*} subject Subject name or object
* @param {!*} message Message string or object
*/
Cosmopolite.prototype.pin = function(subject, message) {
return new Promise(function(resolve, reject) {
var id = this.uuid_();
var args = {
'subject': this.canonicalSubject_(subject),
'message': JSON.stringify(message),
'sender_message_id': id,
};
this.pins_[id] = args;
this.sendRPC_('pin', args, resolve.bind(null, id));
}.bind(this));
};
/**
* Unpin a message from the given subject, storing it and notifying all listeners.
*
* @param {!string} id ID returned by pin()'s resolve callback
*/
Cosmopolite.prototype.unpin = function(id) {
return new Promise(function(resolve, reject) {
var args = {
'subject': this.pins_[id]['subject'],
'sender_message_id': id,
};
delete this.pins_[id];
this.sendRPC_('unpin', args, resolve);
}.bind(this));
};
/**
* Generate a string identifying us to be included in log messages.
*
@@ -301,7 +359,7 @@ Cosmopolite.prototype.uuid_ = function() {
/**
* Canonicalize a subject name or object
*
* @param {!*} subject A simple or complex representation of a subject
* @param {!Object|string|number} subject A simple or complex representation of a subject
* @return {Object} A canonicalized object for RPCs
*/
Cosmopolite.prototype.canonicalSubject_ = function(subject) {
@@ -543,7 +601,7 @@ Cosmopolite.prototype.sendRPCs_ = function(commands, delay) {
/**
* Are we currently clear to put RPCs on the wire?
*
* @return {Boolean} Yes or no?
* @return {boolean} Yes or no?
*/
Cosmopolite.prototype.maySendRPC_ = function() {
if (!(this.namespace_ + ':client_id' in localStorage)) {
@@ -591,6 +649,12 @@ Cosmopolite.prototype.resubscribe_ = function() {
'last_id': last_id,
}
});
subscription.pins.forEach(function(pin) {
rpcs.push({
'command': 'pin',
'arguments': pin,
});
}, this);
}
this.sendRPCs_(rpcs);
};
@@ -681,6 +745,12 @@ Cosmopolite.prototype.onSocketClose_ = function() {
return;
}
// We treat a disconnection as if all pins disappeared
for (var subject in this.subscriptions_) {
var subscription = this.subscriptions_[subject];
subscription.pins.forEach(this.onUnpin_, this);
}
this.createChannel_();
};
@@ -773,6 +843,68 @@ Cosmopolite.prototype.onMessage_ = function(e) {
}
};
/**
* Callback on receiving a 'pin' event from the server
*
* @param {!Object} e Event object
*/
Cosmopolite.prototype.onPin_ = function(e) {
var subjectString = JSON.stringify(e['subject']);
var subscription = this.subscriptions_[subjectString];
if (!subscription) {
console.log(
this.loggingPrefix_(),
'message from unrecognized subject:', e);
return;
}
var duplicate = subscription.pins.some(function(pin) {
return pin['id'] == e.id;
});
if (duplicate) {
console.log(this.loggingPrefix_(), 'duplicate pin:', e);
return;
}
e['message'] = JSON.parse(e['message']);
subscription.pins.push(e);
if ('onPin' in this.callbacks_) {
this.callbacks_['onPin'](e);
}
};
/**
* Callback on receiving an 'unpin' event from the server
*
* @param {!Object} e Event object
*/
Cosmopolite.prototype.onUnpin_ = function(e) {
var subjectString = JSON.stringify(e['subject']);
var subscription = this.subscriptions_[subjectString];
if (!subscription) {
console.log(
this.loggingPrefix_(),
'message from unrecognized subject:', e);
return;
}
var index;
for (index = 0; index < subscription.pins.length; index++) {
var pin = subscription.pins[index];
if (pin['id'] == e['id']) {
break;
}
};
if (index == subscription.pins.length) {
console.log(this.loggingPrefix_(), 'unknown pin:', e);
return;
}
e['message'] = JSON.parse(e['message']);
subscription.pins.splice(index, 1)[0];
if ('onUnpin' in this.callbacks_) {
this.callbacks_['onUnpin'](e);
}
};
/**
* Callback for Cosmopolite event (received via channel or pseudo-channel)
*
@@ -799,6 +931,12 @@ Cosmopolite.prototype.onServerEvent_ = function(e) {
case 'message':
this.onMessage_(e);
break;
case 'pin':
this.onPin_(e);
break;
case 'unpin':
this.onUnpin_(e);
break;
default:
// Client out of date? Force refresh?
console.log(this.loggingPrefix_(), 'unknown channel event:', e);

View File

@@ -29,6 +29,7 @@ a {
</div>
<div>
<input type="button" id="send" value="Send">
<input type="button" id="pin" value="Pin">
</div>
<hr>
@@ -37,6 +38,10 @@ a {
<hr>
<div id="pins"></div>
<hr>
<div id="messages"></div>
<script>
@@ -48,6 +53,8 @@ var key = document.getElementById('key');
var keys = document.getElementById('keys');
var pins = document.getElementById('pins');
var messages = document.getElementById('messages');
var addMessage = function(message) {
@@ -60,6 +67,17 @@ var addMessage = function(message) {
messages.appendChild(messageDiv);
};
var addPin = function(pin) {
var pinDiv = document.createElement('div');
pinDiv.appendChild(document.createTextNode(
(new Date(pin['created'] * 1000)).toString() +
' <????-' + (Math.abs(pin['sender'].hashCode()) % 10000) + '> ' +
pin['message']
));
pinDiv.pinId = pin['id'];
pins.appendChild(pinDiv);
};
window.addEventListener('load', function() {
var googleUser = document.getElementById('google_user');
@@ -99,6 +117,24 @@ window.addEventListener('load', function() {
}
}
},
onPin: function(pin) {
if (subscriptions.value != pin['subject']['name']) {
return;
}
addPin(pin);
},
onUnpin: function(pin) {
if (subscriptions.value != pin['subject']['name']) {
return;
}
for (var i = 0; i < pins.childNodes.length; i++) {
var pinDiv = pins.childNodes[i];
if (pinDiv.pinId == pin['id']) {
pins.removeChild(pinDiv);
break;
}
}
},
};
var debug = new Cosmopolite(callbacks);
@@ -121,12 +157,18 @@ window.addEventListener('load', function() {
debug.sendMessage(subscriptions.value, message.value, key.value);
});
document.getElementById('pin').addEventListener('click', function() {
debug.pin(subscriptions.value, message.value);
});
document.getElementById('subscriptions').addEventListener('change', function() {
messages.innerHTML = '';
pins.innerHTML = '';
if (!subscriptions.value) {
return;
}
debug.getMessages(subscriptions.value).forEach(addMessage);
debug.getPins(subscriptions.value).forEach(addPin);
});
});

View File

@@ -441,6 +441,34 @@ asyncTest('sendMessage ACL', function() {
});
});
asyncTest('pin/unpin', function() {
expect(5);
var subject = randstring();
var message = randstring();
var callbacks = {
'onPin': function(e) {
equal(subject, e['subject']['name'], 'onPin: subject matches');
equal(message, e['message'], 'onPin: message matches');
equal(cosmo.getPins(subject).length, 1);
pin.then(function(id) {
cosmo.unpin(id);
});
},
'onUnpin': function(e) {
equal(subject, e['subject']['name'], 'onUnpin: subject matches');
equal(message, e['message'], 'onUnpin: message matches');
cosmo.shutdown();
start();
},
}
var cosmo = new Cosmopolite(callbacks, null, randstring());
cosmo.subscribe(subject);
var pin = cosmo.pin(subject, message);
});
module('dev_appserver only');