Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/drizzle postgres #894

Draft
wants to merge 16 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ lib-cov
*.gz
.vscode
.yarn-cache
.DS_Store

pids
logs
Expand Down
23 changes: 11 additions & 12 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
version: "3.9"

volumes:
mongo-data:
opensensemap_backend:

services:
db:
image: mongo:5
container_name: osem-dev-mongo
image: timescale/timescaledb-ha:pg15.8-ts2.17.1
command:
- -cshared_preload_libraries=timescaledb,pg_cron
restart: always
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=opensensemap
ports:
- "27017:27017"
- 5432:5432
volumes:
- mongo-data:/data/db
# - ./dumps/boxes:/exports/boxes
# - ./dumps/measurements:/exports/measurements
- ./.scripts/mongodb/osem_admin.sh:/docker-entrypoint-initdb.d/osem_admin.sh
# - ./.scripts/mongodb/osem_seed_boxes.sh:/docker-entrypoint-initdb.d/osem_seed_boxes.sh
# - ./.scripts/mongodb/osem_seed_measurements.sh:/docker-entrypoint-initdb.d/osem_seed_measurements.sh
- opensensemap_backend:/home/postgres/pgdata
19 changes: 12 additions & 7 deletions packages/api/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,26 @@
server.use(debugLogger);
}

db.connect()
.then(function () {
// attach Routes
const run = async function () {
try {
// TODO: Get a client from the Pool and test connection

Check failure on line 58 in packages/api/app.js

View workflow job for this annotation

GitHub Actions / 🔬 Lint code

Unexpected 'todo' comment: 'TODO: Get a client from the Pool and...'
await db.connect();

routes(server);

// start the server
server.listen(Number(config.get('port')), function () {
stdLogger.logger.info(`${server.name} listening at ${server.url}`);
postToMattermost(`openSenseMap API started. Version: ${getVersion}`);
});
})
.catch(function (err) {
stdLogger.logger.fatal(err, 'Couldn\'t connect to MongoDB. Exiting...');
} catch (error) {
stdLogger.logger.fatal(error, 'Couldn\'t connect to PostgreSQL. Exiting...');
process.exit(1);
});
}
};

// 🔥 Fire up API
run();

// InternalServerError is the only error we want to report to Honeybadger..
server.on('InternalServer', function (req, res, err, callback) {
Expand Down
123 changes: 69 additions & 54 deletions packages/api/lib/controllers/boxesController.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
{ Box, User, Claim } = require('@sensebox/opensensemap-api-models'),
{ addCache, clearCache, checkContentType, redactEmail, postToMattermost } = require('../helpers/apiUtils'),
{ point } = require('@turf/helpers'),
classifyTransformer = require('../transformers/classifyTransformer'),

Check failure on line 63 in packages/api/lib/controllers/boxesController.js

View workflow job for this annotation

GitHub Actions / 🔬 Lint code

'classifyTransformer' is assigned a value but never used
{
retrieveParameters,
parseAndValidateTimeParamsForFindAllBoxes,
Expand All @@ -70,6 +70,11 @@
} = require('../helpers/userParamHelpers'),
handleError = require('../helpers/errorHandler'),
jsonstringify = require('stringify-stream');
const { findDeviceById } = require('@sensebox/opensensemap-api-models/src/box/box');
const { createDevice, findDevices, findDevicesMinimal, findTags, updateDevice, findById, generateSketch } = require('@sensebox/opensensemap-api-models/src/device');
const { findByUserId } = require('@sensebox/opensensemap-api-models/src/password');
const { getSensorsWithLastMeasurement } = require('@sensebox/opensensemap-api-models/src/sensor');
const { removeDevice, checkPassword } = require('@sensebox/opensensemap-api-models/src/user/user');

/**
* @apiDefine Addons
Expand Down Expand Up @@ -154,13 +159,14 @@
*/
const updateBox = async function updateBox (req, res) {
try {
let box = await Box.findBoxById(req._userParams.boxId, { lean: false, populate: false });
box = await box.updateBox(req._userParams);
if (box._sensorsChanged === true) {
req.user.mail('newSketch', box);
}

res.send({ code: 'Ok', data: box.toJSON({ includeSecrets: true }) });
let device = await findDeviceById(req._userParams.boxId);
device = await updateDevice(device.id, req._userParams);
// if (box._sensorsChanged === true) {
// req.user.mail('newSketch', box);
// }

// res.send({ code: 'Ok', data: box.toJSON({ includeSecrets: true }) });
res.send({ code: 'Ok', data: device });
clearCache(['getBoxes']);
} catch (err) {
return handleError(err);
Expand Down Expand Up @@ -220,7 +226,7 @@
* @apiParam {String=json,geojson} [format=json] the format the sensor data is returned in.
* @apiParam {String} [grouptag] only return boxes with this grouptag, allows to specify multiple separated with a comma
* @apiParam {String="homeEthernet","homeWifi","homeEthernetFeinstaub","homeWifiFeinstaub","luftdaten_sds011","luftdaten_sds011_dht11","luftdaten_sds011_dht22","luftdaten_sds011_bmp180","luftdaten_sds011_bme280"} [model] only return boxes with this model, allows to specify multiple separated with a comma
* @apiParam {Boolean="true","false"} [classify=false] if specified, the api will classify the boxes accordingly to their last measurements.
* @apiParam @deprecated {Boolean="true","false"} [classify=false] if specified, the api will classify the boxes accordingly to their last measurements.
* @apiParam {Boolean="true","false"} [minimal=false] if specified, the api will only return a minimal set of box metadata consisting of [_id, updatedAt, currentLocation, exposure, name] for a fast response.
* @apiParam {Boolean="true","false"} [full=false] if true the API will return populated lastMeasurements (use this with caution for now, expensive on the database)
* @apiParam {Number} [near] A comma separated coordinate, if specified, the api will only return senseBoxes within maxDistance (in m) of this location
Expand Down Expand Up @@ -305,38 +311,41 @@
let stringifier = jsonstringify({ open: '[', close: ']' });
// format
if (req._userParams.format === 'geojson') {
stringifier = jsonstringify({ open: '{"type":"FeatureCollection","features":[', close: ']}' }, geoJsonStringifyReplacer);

Check failure on line 314 in packages/api/lib/controllers/boxesController.js

View workflow job for this annotation

GitHub Actions / 🔬 Lint code

'stringifier' is assigned a value but never used
}

try {
let stream;
let devices;

// Search boxes by name
// Directly return results and do nothing else
if (req._userParams.name) {
stream = await Box.findBoxes(req._userParams);
// stream = await Box.findBoxes(req._userParams);
devices = await findDevices(req._userParams, { id: true, name: true, location: true });
} else if (req._userParams.minimal === 'true') {
devices = await findDevicesMinimal(req._userParams, { id: true, name: true, exposure: true, location: true, status: true });
// stream = await Box.findBoxesMinimal(req._userParams);
} else {
if (req._userParams.minimal === 'true') {
stream = await Box.findBoxesMinimal(req._userParams);
} else {
stream = await Box.findBoxesLastMeasurements(req._userParams);
}

if (req._userParams.classify === 'true') {
stream = stream
.pipe(new classifyTransformer())
.on('error', function (err) {
res.end(`Error: ${err.message}`);
});
}
// stream = await Box.findBoxesLastMeasurements(req._userParams);
}

stream
.pipe(stringifier)
.on('error', function (err) {
res.end(`Error: ${err.message}`);
})
.pipe(res);
// Deprecated: classify is performed by database
// if (req._userParams.classify === 'true') {
// stream = stream
// .pipe(new classifyTransformer())
// .on('error', function (err) {
// res.end(`Error: ${err.message}`);
// });
// }
// }

// stream
// .pipe(stringifier)
// .on('error', function (err) {
// res.end(`Error: ${err.message}`);
// })
// .pipe(res);
res.send(devices);
} catch (err) {
return handleError(err);
}
Expand Down Expand Up @@ -451,16 +460,17 @@
const { format, boxId } = req._userParams;

try {
const box = await Box.findBoxById(boxId);
const device = await findDeviceById(boxId);
const sensorsWithMeasurements = await getSensorsWithLastMeasurement(boxId);

if (format === 'geojson') {
const coordinates = box.currentLocation.coordinates;
box.currentLocation = undefined;
box.loc = undefined;
device.sensors = sensorsWithMeasurements;

return res.send(point(coordinates, box));
if (format === 'geojson') { // Handle with PostGIS Extension
const coordinates = [device.longitude, device.latitude];

return res.send(point(coordinates, device));
}
res.send(box);
res.send(device);
} catch (err) {
return handleError(err);
}
Expand Down Expand Up @@ -499,18 +509,19 @@
*/
const postNewBox = async function postNewBox (req, res) {
try {
let newBox = await req.user.addBox(req._userParams);
newBox = await Box.populate(newBox, Box.BOX_SUB_PROPS_FOR_POPULATION);
res.send(201, { message: 'Box successfully created', data: newBox });
const newDevice = await createDevice(req.user.id, req._userParams);
// TODO: only return specific fields newBox = await Box.populate(newBox, Box.BOX_SUB_PROPS_FOR_POPULATION);

Check failure on line 513 in packages/api/lib/controllers/boxesController.js

View workflow job for this annotation

GitHub Actions / 🔬 Lint code

Unexpected 'todo' comment: 'TODO: only return specific fields newBox...'

res.send(201, { message: 'Box successfully created', data: newDevice });
clearCache(['getBoxes', 'getStats']);
postToMattermost(
`New Box: ${req.user.name} (${redactEmail(
req.user.email
)}) just registered "${newBox.name}" (${
newBox.model
)}) just registered "${newDevice.name}" (${
newDevice.model
}): [https://opensensemap.org/explore/${
newBox._id
}](https://opensensemap.org/explore/${newBox._id})`
newDevice.id
}](https://opensensemap.org/explore/${newDevice.id})`
);
} catch (err) {
return handleError(err);
Expand All @@ -537,7 +548,7 @@
const getSketch = async function getSketch (req, res) {
res.header('Content-Type', 'text/plain; charset=utf-8');
try {
const box = await Box.findBoxById(req._userParams.boxId, { populate: false, lean: false });
const device = await findById(req._userParams.boxId, { accessToken: true, sensors: true });

const params = {
serialPort: req._userParams.serialPort,
Expand All @@ -553,11 +564,11 @@
};

// pass access token only if useAuth is true and access_token is available
if (box.access_token) {
params.access_token = box.access_token;
if (device.useAuth && device.accessToken) {
params.access_token = device.accessToken.token;
}

res.send(box.getSketch(params));
res.send(generateSketch(device, params));
} catch (err) {
return handleError(err);
}
Expand All @@ -577,11 +588,14 @@
const { password, boxId } = req._userParams;

try {
await req.user.checkPassword(password);
const box = await req.user.removeBox(boxId);
res.send({ code: 'Ok', message: 'box and all associated measurements marked for deletion' });
const hashedPassword = await findByUserId(req.user.id);

await checkPassword(password, hashedPassword);
const device = await removeDevice(boxId);

res.send({ code: 'Ok', message: 'device and all associated measurements marked for deletion' });
clearCache(['getBoxes', 'getStats']);
postToMattermost(`Box deleted: ${req.user.name} (${redactEmail(req.user.email)}) just deleted "${box.name}" (${boxId})`);
postToMattermost(`Device deleted: ${req.user.name} (${redactEmail(req.user.email)}) just deleted "${device.name}" (${boxId})`);

} catch (err) {
return handleError(err);
Expand Down Expand Up @@ -698,10 +712,11 @@

const getAllTags = async function getAllTags (req, res) {
try {
const grouptags = await Box.find().distinct('grouptag')
.exec();
const tags = await findTags();
// const grouptags = await Box.find().distinct('grouptag')
// .exec();

res.send({ code: 'Ok', data: grouptags });
res.send({ code: 'Ok', data: tags });
} catch (err) {
return handleError(err);
}
Expand Down
35 changes: 21 additions & 14 deletions packages/api/lib/controllers/measurementsController.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
'use strict';

const { findAccessToken, findById, saveMeasurement, saveMeasurements } = require('@sensebox/opensensemap-api-models/src/device');
const { hasDecoder, decodeMeasurements } = require('@sensebox/opensensemap-api-models/src/measurement');
const {
BadRequestError,
UnsupportedMediaTypeError,
Expand Down Expand Up @@ -353,22 +355,24 @@ const getDataByGroupTag = async function getDataByGroupTag (req, res) {
* @apiHeader {String} Authorization Box' unique access_token. Will be used as authorization token if box has auth enabled (e.g. useAuth: true)
*/
const postNewMeasurement = async function postNewMeasurement (req, res) {
const { boxId, sensorId, value, createdAt, location } = req._userParams;
const { boxId: deviceId, sensorId, value, createdAt, location } = req._userParams;

try {
const box = await Box.findBoxById(boxId, { populate: false, lean: false });
if (box.useAuth && box.access_token && box.access_token !== req.headers.authorization) {
return Promise.reject(new UnauthorizedError('Box access token not valid!'));
const device = await findById(deviceId, { sensors: true });
const deviceAccessToken = await findAccessToken(deviceId);
if (device.useAuth && deviceAccessToken.token && deviceAccessToken.token !== req.headers.authorization) {
return Promise.reject(new UnauthorizedError('Device access token not valid!'));
}

const [measurement] = await Measurement.decodeMeasurements([{
const [measurement] = await decodeMeasurements([{
sensor_id: sensorId,
value,
createdAt,
location
}]);
await box.saveMeasurement(measurement);
res.send(201, 'Measurement saved in box');
// await box.saveMeasurement(measurement);
await saveMeasurement(device, measurement);
res.send(201, 'Measurement saved in device');
} catch (err) {
return handleError(err);
}
Expand Down Expand Up @@ -454,7 +458,7 @@ const postNewMeasurement = async function postNewMeasurement (req, res) {
* }
*/
const postNewMeasurements = async function postNewMeasurements (req, res) {
const { boxId, luftdaten, hackair } = req._userParams;
const { boxId: deviceId, luftdaten, hackair } = req._userParams;
let contentType = req.getContentType();

if (hackair) {
Expand All @@ -463,21 +467,24 @@ const postNewMeasurements = async function postNewMeasurements (req, res) {
contentType = 'luftdaten';
}

if (Measurement.hasDecoder(contentType)) {
if (hasDecoder(contentType)) {
try {
const box = await Box.findBoxById(boxId, { populate: false, lean: false, projection: { sensors: 1, locations: 1, lastMeasurementAt: 1, currentLocation: 1, model: 1, access_token: 1, useAuth: 1 } });
const device = await findById(deviceId, { sensors: true });
const deviceAccessToken = await findAccessToken(deviceId);
// const box = await Box.findBoxById(boxId, { populate: false, lean: false, projection: { sensors: 1, locations: 1, lastMeasurementAt: 1, currentLocation: 1, model: 1, access_token: 1, useAuth: 1 } });

// if (contentType === 'hackair' && box.access_token !== req.headers.authorization) {
// throw new UnauthorizedError('Box access token not valid!');
// }

// authorization for all boxes that have not opt out
if ((box.useAuth || contentType === 'hackair') && box.access_token && box.access_token !== req.headers.authorization) {
return Promise.reject(new UnauthorizedError('Box access token not valid!'));
if ((device.useAuth || contentType === 'hackair') && deviceAccessToken.token && deviceAccessToken.token !== req.headers.authorization) {
return Promise.reject(new UnauthorizedError('Device access token not valid!'));
}

const measurements = await Measurement.decodeMeasurements(req.body, { contentType, sensors: box.sensors });
await box.saveMeasurementsArray(measurements);
const measurements = await decodeMeasurements(req.body, { contentType, sensors: device.sensors });
// await box.saveMeasurementsArray(measurements);
await saveMeasurements(device, measurements);
res.send(201, 'Measurements saved in box');
} catch (err) {
return handleError(err);
Expand Down
16 changes: 7 additions & 9 deletions packages/api/lib/controllers/statisticsController.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
'use strict';

const { Box, Measurement } = require('@sensebox/opensensemap-api-models'),
const { measurementTable } = require('@sensebox/opensensemap-api-models/schema/schema');
const { rowCount, rowCountTimeBucket } = require('@sensebox/opensensemap-api-models/src/stats');

const { Box } = require('@sensebox/opensensemap-api-models'),
{ UnprocessableEntityError, BadRequestError } = require('restify-errors'),
idwTransformer = require('../transformers/idwTransformer'),
{ addCache, createDownloadFilename, computeTimestampTruncationLength, csvStringifier } = require('../helpers/apiUtils'),
Expand Down Expand Up @@ -28,14 +31,9 @@ const getStatistics = async function getStatistics (req, res) {
const { human } = req._userParams;
try {
let results = await Promise.all([
Box.count({}),
Measurement.count({}),
Measurement.count({
createdAt: {
'$gt': new Date(Date.now() - 60000),
'$lt': new Date()
}
})
rowCount('device'),
rowCount('sensor'),
rowCountTimeBucket(measurementTable, 'time', 60000)
]);
if (human === 'true') {
results = results.map(r => millify.default(r).toString());
Expand Down
Loading
Loading