Make subscribe/unsubscribe/sendMessage return Promises that fire on RPC return.

This commit is contained in:
Ian Gulliver
2014-05-17 16:52:28 +03:00
parent 96b17ad6ff
commit 994afde51f
2 changed files with 81 additions and 29 deletions

View File

@@ -84,22 +84,26 @@ Cosmopolite.prototype.subscribe = function(subject, messages, keys) {
if (!this.ready_) { if (!this.ready_) {
throw "cosmopolite: not ready"; throw "cosmopolite: not ready";
} }
return new Promise(function(resolve, reject) {
keys = keys || []; keys = keys || [];
if (subject in this.subscriptions_) { if (subject in this.subscriptions_) {
console.log( console.log(
this.loggingPrefix_(), this.loggingPrefix_(),
'not sending duplication subscription request for subject:', subject); 'not sending duplication subscription request for subject:', subject);
return; resolve();
} }
this.subscriptions_[subject] = { this.subscriptions_[subject] = {
'messages': [], 'messages': [],
'keys': {}, 'keys': {},
}; };
this.sendRPC_('subscribe', { var args = {
'subject': subject, 'subject': subject,
'messages': messages, 'messages': messages,
'keys': keys, 'keys': keys,
}); };
this.sendRPC_('subscribe', args, resolve);
}.bind(this));
}; };
/** /**
@@ -114,10 +118,14 @@ Cosmopolite.prototype.unsubscribe = function(subject) {
if (!this.ready_) { if (!this.ready_) {
throw "cosmopolite: not ready"; throw "cosmopolite: not ready";
} }
return new Promise(function(resolve, reject) {
delete this.subscriptions_[subject]; delete this.subscriptions_[subject];
this.sendRPC_('unsubscribe', { var args = {
'subject': subject, 'subject': subject,
}); }
this.sendRPC_('unsubscribe', args, resolve);
}.bind(this));
}; };
/** /**
@@ -131,6 +139,8 @@ Cosmopolite.prototype.sendMessage = function(subject, message, key) {
if (!this.ready_) { if (!this.ready_) {
throw "cosmopolite: not ready"; throw "cosmopolite: not ready";
} }
return new Promise(function(resolve, reject) {
var args = { var args = {
'subject': subject, 'subject': subject,
'message': JSON.stringify(message), 'message': JSON.stringify(message),
@@ -139,7 +149,8 @@ Cosmopolite.prototype.sendMessage = function(subject, message, key) {
if (key) { if (key) {
args['key'] = key; args['key'] = key;
} }
this.sendRPC_('sendMessage', args); this.sendRPC_('sendMessage', args, resolve);
}.bind(this));
}; };
/** /**

View File

@@ -174,6 +174,47 @@ asyncTest('Complex object', function() {
var cosmo = new Cosmopolite(callbacks, null, randstring()); var cosmo = new Cosmopolite(callbacks, null, randstring());
}); });
asyncTest('sendMessage Promise', function() {
expect(1);
var subject = randstring();
var message = randstring();
var callbacks = {
'onReady': function() {
cosmo.sendMessage(subject, message).then(function() {
ok(true, 'sendMessage Promise fulfilled');
cosmo.shutdown();
start();
});
},
};
var cosmo = new Cosmopolite(callbacks, null, randstring());
});
asyncTest('subscribe/unsubscribe Promise', function() {
expect(2);
var subject = randstring();
var message = randstring();
var callbacks = {
'onReady': function() {
cosmo.subscribe(subject).then(function() {
ok(true, 'subscribe Promise fulfilled');
cosmo.unsubscribe(subject).then(function() {
ok(true, 'unsubscribe Promise fulfilled');
cosmo.shutdown();
start();
});
});
},
};
var cosmo = new Cosmopolite(callbacks, null, randstring());
});
module('dev_appserver only'); module('dev_appserver only');