Skip to content

Commit

Permalink
time system is required for telemetry
Browse files Browse the repository at this point in the history
Simplify realtime provider

Due to nasa/openmct#1594 we no longer need
to worry about multiple calls for the same identifier.

Host all routes on same port, use host-relative urls for client

Update readme
  • Loading branch information
Pete Richards authored and psarram committed Oct 17, 2017
1 parent 88c54bf commit b1c24b4
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 111 deletions.
39 changes: 15 additions & 24 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ We're going to define a single `index.html` page. We'll include the Open MCT li
openmct.install(openmct.plugins.MyItems());
openmct.install(openmct.plugins.UTCTimeSystem());
openmct.time.clock('local', {start: -15 * 60 * 1000, end: 0});
openmct.time.timeSystem('utc');
openmct.install(openmct.plugins.Espresso());
openmct.start();
Expand Down Expand Up @@ -128,6 +129,7 @@ Next, we'll update index.html to include the file:
openmct.install(openmct.plugins.MyItems());
openmct.install(openmct.plugins.UTCTimeSystem());
openmct.time.clock('local', {start: -15 * 60 * 1000, end: 0});
openmct.time.timeSystem('utc');
openmct.install(openmct.plugins.Espresso());
openmct.install(DictionaryPlugin());
Expand Down Expand Up @@ -416,7 +418,7 @@ function HistoricalTelemetryPlugin() {
return domainObject.type === 'example.telemetry';
},
request: function (domainObject, options) {
var url = 'http://localhost:8081/telemetry/' +
var url = '/history/' +
domainObject.identifier.key +
'?start=' + options.start +
'&end=' + options.end;
Expand Down Expand Up @@ -457,6 +459,7 @@ With our adapter defined, we need to update `index.html` to include it.
openmct.install(openmct.plugins.MyItems());
openmct.install(openmct.plugins.UTCTimeSystem());
openmct.time.clock('local', {start: -15 * 60 * 1000, end: 0});
openmct.time.timeSystem('utc');
openmct.install(openmct.plugins.Espresso());
openmct.install(DictionaryPlugin());
Expand Down Expand Up @@ -484,39 +487,26 @@ Let's define our new plugin in a file named `realtime-telemetry-plugin.js`.
*/
function RealtimeTelemetryPlugin() {
return function (openmct) {
var socket = new WebSocket('ws://localhost:8082');
var listeners = {};
var socket = new WebSocket(location.origin.replace(/^http/, 'ws') + '/realtime/');
var listener = {};

socket.onmessage = function (event) {
point = JSON.parse(event.data);
if (listeners[point.id]) {
listeners[point.id].forEach(function (l) {
l(point);
});
if (listener[point.id]) {
listener[point.id](point);
}
};

var provider = {
supportsSubscribe: function (domainObject) {
return domainObject.type === 'example.telemetry';
},
subscribe: function (domainObject, callback, options) {
if (!listeners[domainObject.identifier.key]) {
listeners[domainObject.identifier.key] = [];
}
if (!listeners[domainObject.identifier.key].length) {
socket.send('subscribe ' + domainObject.identifier.key);
}
listeners[domainObject.identifier.key].push(callback);
return function () {
listeners[domainObject.identifier.key] =
listeners[domainObject.identifier.key].filter(function (c) {
return c !== callback;
});

if (!listeners[domainObject.identifier.key].length) {
socket.send('unsubscribe ' + domainObject.identifier.key);
}
subscribe: function (domainObject, callback) {
listener[domainObject.identifier.key] = callback;
socket.send('subscribe ' + domainObject.identifier.key);
return function unsubscribe() {
delete listener[domainObject.identifier.key];
socket.send('unsubscribe ' + domainObject.identifier.key);
};
}
};
Expand Down Expand Up @@ -549,6 +539,7 @@ With our realtime telemetry plugin defined, let's include it from `index.html`.
openmct.install(openmct.plugins.MyItems());
openmct.install(openmct.plugins.UTCTimeSystem());
openmct.time.clock('local', {start: -15 * 60 * 1000, end: 0});
openmct.time.timeSystem('utc');
openmct.install(openmct.plugins.Espresso());
openmct.install(DictionaryPlugin());
Expand Down
18 changes: 5 additions & 13 deletions example-server/history-server.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,13 @@


var express = require('express');

function HistoryServer(spacecraft, port) {
server = express();
function HistoryServer(spacecraft) {
var router = express.Router();

server.use(function (req, res, next) {
res.set('Access-Control-Allow-Origin', '*');
next();
});

server.get('/telemetry/:pointId', function (req, res) {
router.get('/:pointId', function (req, res) {
var start = +req.query.start;
var end = +req.query.end;
var ids = req.params.pointId.split(',');

var response = ids.reduce(function (resp, id) {
return resp.concat(spacecraft.history[id].filter(function (p) {
return p.timestamp > start && p.timestamp < end;
Expand All @@ -23,8 +16,7 @@ function HistoryServer(spacecraft, port) {
res.status(200).json(response).end();
});

server.listen(port);
console.log('History server now running at http://localhost:' + port);
return router;
}

module.exports = HistoryServer;
Expand Down
70 changes: 34 additions & 36 deletions example-server/realtime-server.js
Original file line number Diff line number Diff line change
@@ -1,43 +1,41 @@
var WebSocketServer = require('ws').Server;

function RealtimeServer(spacecraft, port) {
this.spacecraft = spacecraft;
this.server = new WebSocketServer({ port: port });
this.server.on('connection', this.handleConnection.bind(this));
console.log('Realtime server started at ws://localhost:' + port);
};
var express = require('express');

function RealtimeServer(spacecraft) {

var router = express.Router();

router.ws('/', function (ws) {
var unlisten = spacecraft.listen(notifySubscribers);
var subscribed = {}; // Active subscriptions for this connection
var handlers = { // Handlers for specific requests
subscribe: function (id) {
subscribed[id] = true;
},
unsubscribe: function (id) {
delete subscribed[id];
}
};

function notifySubscribers(point) {
if (subscribed[point.id]) {
ws.send(JSON.stringify(point));
}
}

RealtimeServer.prototype.handleConnection = function (ws) {
var unlisten = this.spacecraft.listen(notifySubscribers);
subscribed = {}, // Active subscriptions for this connection
handlers = { // Handlers for specific requests
subscribe: function (id) {
subscribed[id] = true;
},
unsubscribe: function (id) {
delete subscribed[id];
// Listen for requests
ws.on('message', function (message) {
var parts = message.split(' '),
handler = handlers[parts[0]];
if (handler) {
handler.apply(handlers, parts.slice(1));
}
};
});

function notifySubscribers(point) {
if (subscribed[point.id]) {
ws.send(JSON.stringify(point));
}
}

// Listen for requests
ws.on('message', function (message) {
var parts = message.split(' '),
handler = handlers[parts[0]];
if (handler) {
handler.apply(handlers, parts.slice(1));
}
// Stop sending telemetry updates for this connection when closed
ws.on('close', unlisten);
});

// Stop sending telemetry updates for this connection when closed
ws.on('close', unlisten);
return router;
};



module.exports = RealtimeServer;
module.exports = RealtimeServer;
20 changes: 17 additions & 3 deletions example-server/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,23 @@ var RealtimeServer = require('./realtime-server');
var HistoryServer = require('./history-server');
var StaticServer = require('./static-server');

var expressWs = require('express-ws');
var app = require('express')();
expressWs(app);

var spacecraft = new Spacecraft();
var realtimeServer = new RealtimeServer(spacecraft, 8082);
var historyServer = new HistoryServer(spacecraft, 8081);
var staticServer = new StaticServer(8080);
var realtimeServer = new RealtimeServer(spacecraft);
var historyServer = new HistoryServer(spacecraft);
var staticServer = new StaticServer();

app.use('/realtime', realtimeServer);
app.use('/history', historyServer);
app.use('/', staticServer);

var port = process.env.PORT || 8080

app.listen(port, function () {
console.log('Open MCT hosted at http://localhost:' + port);
console.log('History hosted at http://localhost:' + port + '/history');
console.log('Realtime hosted at ws://localhost:' + port + '/realtime');
});
16 changes: 7 additions & 9 deletions example-server/static-server.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
var express = require('express');

function StaticServer(port) {
var server = express();

server.use('/', express.static(__dirname + '/..'));

console.log('Open MCT hosted at http://localhost:' + port);

server.listen(port);
function StaticServer() {
var router = express.Router();

router.use('/', express.static(__dirname + '/..'));

return router
}

module.exports = StaticServer;
module.exports = StaticServer;
2 changes: 1 addition & 1 deletion historical-telemetry-plugin.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ function HistoricalTelemetryPlugin() {
return domainObject.type === 'example.telemetry';
},
request: function (domainObject, options) {
var url = 'http://localhost:8081/telemetry/' +
var url = '/history/' +
domainObject.identifier.key +
'?start=' + options.start +
'&end=' + options.end;
Expand Down
1 change: 1 addition & 0 deletions index.html
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
openmct.install(openmct.plugins.MyItems());
openmct.install(openmct.plugins.UTCTimeSystem());
openmct.time.clock('local', {start: -15 * 60 * 1000, end: 0});
openmct.time.timeSystem('utc');
openmct.install(openmct.plugins.Espresso());

openmct.install(DictionaryPlugin());
Expand Down
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
"homepage": "https://github.com/nasa/openmct-tutorial#readme",
"dependencies": {
"express": "^4.14.1",
"ws": "^2.0.3",
"openmct": "nasa/openmct"
"express-ws": "^3.0.0",
"openmct": "nasa/openmct",
"ws": "^2.0.3"
}
}
33 changes: 10 additions & 23 deletions realtime-telemetry-plugin.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,26 @@
*/
function RealtimeTelemetryPlugin() {
return function (openmct) {
var socket = new WebSocket('ws://localhost:8082');
var listeners = {};
var socket = new WebSocket(location.origin.replace(/^http/, 'ws') + '/realtime/');
var listener = {};

socket.onmessage = function (event) {
point = JSON.parse(event.data);
if (listeners[point.id]) {
listeners[point.id].forEach(function (l) {
l(point);
});
if (listener[point.id]) {
listener[point.id](point);
}
};

var provider = {
supportsSubscribe: function (domainObject) {
return domainObject.type === 'example.telemetry';
},
subscribe: function (domainObject, callback, options) {
if (!listeners[domainObject.identifier.key]) {
listeners[domainObject.identifier.key] = [];
}
if (!listeners[domainObject.identifier.key].length) {
socket.send('subscribe ' + domainObject.identifier.key);
}
listeners[domainObject.identifier.key].push(callback);
return function () {
listeners[domainObject.identifier.key] =
listeners[domainObject.identifier.key].filter(function (c) {
return c !== callback;
});

if (!listeners[domainObject.identifier.key].length) {
socket.send('unsubscribe ' + domainObject.identifier.key);
}
subscribe: function (domainObject, callback) {
listener[domainObject.identifier.key] = callback;
socket.send('subscribe ' + domainObject.identifier.key);
return function unsubscribe() {
delete listener[domainObject.identifier.key];
socket.send('unsubscribe ' + domainObject.identifier.key);
};
}
};
Expand Down

0 comments on commit b1c24b4

Please sign in to comment.