Skip to content

Commit

Permalink
Merge pull request #204 from skadefro/master
Browse files Browse the repository at this point in the history
close 1.4.7
  • Loading branch information
skadefro authored Apr 29, 2022
2 parents 36df9d1 + 006fe4d commit 012724b
Show file tree
Hide file tree
Showing 25 changed files with 385 additions and 302 deletions.
3 changes: 2 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
"runtimeArgs": [
"--nolazy",
"--trace-warnings",
"--inspect"
"--inspect",
"--preserve-symlinks"
],
"showAsyncStacks": true,
"trace": true,
Expand Down
14 changes: 4 additions & 10 deletions OpenFlow/src/Config.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { fetch, toPassportConfig } from "passport-saml-metadata";
import * as fs from "fs";
import * as path from "path";
import * as retry from "async-retry";
import { DatabaseConnection } from "./DatabaseConnection";
// import { Logger } from "./Logger";
import { NoderedUtil } from "@openiap/openflow-api";
import { promiseRetry } from "./Logger";
export class Config {
public static getversion(): string {
let versionfile: string = path.join(__dirname, "VERSION");
Expand Down Expand Up @@ -428,22 +428,16 @@ export class Config {
}
public static async parse_federation_metadata(url: string): Promise<any> {
// if anything throws, we retry
return retry(async bail => {
return promiseRetry(async () => {
const reader: any = await fetch({ url });
if (NoderedUtil.IsNullUndefinded(reader)) { bail(new Error("Failed getting result")); return; }
if (NoderedUtil.IsNullUndefinded(reader)) { throw new Error("Failed getting result"); return; }
const config: any = toPassportConfig(reader);
// we need this, for Office 365 :-/
if (reader.signingCerts && reader.signingCerts.length > 1) {
config.cert = reader.signingCerts;
}
return config;
}, {
retries: 50,
onRetry: function (error: Error, count: number): void {
console.log("retry " + count + " error " + error.message + " getting " + url);
// Logger.instanse.warn("retry " + count + " error " + error.message + " getting " + url);
}
});
}, 10, 1000);
}
public static parseArray(s: string): string[] {
let arr = s.split(",");
Expand Down
7 changes: 7 additions & 0 deletions OpenFlow/src/Crypt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ export class Crypt {
throw new Error('jwt must be provided');
}
const o: any = jsonwebtoken.verify(token, Crypt.encryption_key);
let impostor: string = null;
if (!NoderedUtil.IsNullUndefinded(o) && !NoderedUtil.IsNullUndefinded(o.data) && !NoderedUtil.IsNullEmpty(o.data._id)) {
if (!NoderedUtil.IsNullEmpty(o.data.impostor)) {
impostor = o.data.impostor;
}
}
if (!NoderedUtil.IsNullUndefinded(o) && !NoderedUtil.IsNullUndefinded(o.data) && !NoderedUtil.IsNullEmpty(o.data._id) && o.data._id != WellknownIds.root) {
var id = o.data._id;
o.data = await DBHelper.FindById(o.data._id, token, null);
Expand All @@ -136,6 +142,7 @@ export class Crypt {
var b = true;
}
}
if (!NoderedUtil.IsNullEmpty(impostor)) o.data.impostor = impostor;
return TokenUser.assign(o.data);

}
Expand Down
66 changes: 46 additions & 20 deletions OpenFlow/src/DBHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,30 @@ import { LoginProvider } from "./LoginProvider";
import * as cacheManager from "cache-manager";
// var cacheManager = require('cache-manager');
var redisStore = require('cache-manager-ioredis');
var mongoStore = require('@skadefro/cache-manager-mongodb');

export class DBHelper {

public static memoryCache: any;
public static mongoCache: any;
public static async init() {
if (!NoderedUtil.IsNullUndefinded(this.memoryCache)) return;
if (Config.cache_store_type == "redis") {

this.mongoCache = cacheManager.caching({
store: mongoStore,
uri: Config.mongodb_url,
options: {
collection: "_cache",
compression: false,
poolSize: 5
}
});

if (Config.cache_store_type == "mongodb") {
this.memoryCache = this.mongoCache;
DBHelper.ensureotel();
return;
} else if (Config.cache_store_type == "redis") {
this.memoryCache = cacheManager.caching({
store: redisStore,
host: Config.cache_store_redis_host,
Expand All @@ -32,6 +49,7 @@ export class DBHelper {
redisClient.on('error', (error) => {
console.log(error);
});

DBHelper.ensureotel();
return;
}
Expand All @@ -45,8 +63,6 @@ export class DBHelper {
}
public static async clearCache(reason: string) {
this.init();
// Auth.ensureotel();
// this.memoryCache.reset();
var keys: string[];
if (Config.cache_store_type == "redis") {
keys = await this.memoryCache.keys('*');
Expand All @@ -71,16 +87,6 @@ export class DBHelper {
DBHelper.item_cache = Logger.otel.meter.createValueObserver("openflow_item_cache_count", {
description: 'Total number of cached items'
}, async (res) => {
// let keys: string[] = Object.keys(this.authorizationCache);
// let types = {};
// for (let i = keys.length - 1; i >= 0; i--) {
// if (!types[this.authorizationCache[keys[i]].type]) types[this.authorizationCache[keys[i]].type] = 0;
// types[this.authorizationCache[keys[i]].type]++;
// }
// keys = Object.keys(types);
// for (let i = keys.length - 1; i >= 0; i--) {
// res.observe(types[keys[i]], { ...Logger.otel.defaultlabels, type: keys[i] })
// }
var keys: any = null;
try {
if (Config.cache_store_type == "redis") {
Expand All @@ -105,7 +111,6 @@ export class DBHelper {
try {
if (NoderedUtil.IsNullEmpty(_id)) return null;
let item = await this.memoryCache.wrap("users" + _id, () => {
// if (jwt === null || jwt == undefined || jwt == "") { jwt = Crypt.rootToken(); }
if (Config.log_cache) Logger.instanse.debug("Add user to cache : " + _id);
return Config.db.getbyid<User>(_id, "users", Crypt.rootToken(), true, span);
});
Expand All @@ -120,36 +125,48 @@ export class DBHelper {
Logger.otel.endSpan(span);
}
}
public static async FindRequestTokenID(key: string, parent: Span): Promise<any> {
public static async FindRequestTokenID(key: string, parent: Span): Promise<TokenRequest> {
this.init();
const span: Span = Logger.otel.startSubSpan("dbhelper.FindRequestTokenID", parent);
try {
if (NoderedUtil.IsNullEmpty(key)) return null;
return await this.memoryCache.get("requesttoken" + key);
if (Config.cache_store_type == "redis") {
return await this.memoryCache.get("requesttoken" + key);
} else {
return await this.mongoCache.get("requesttoken" + key);
}
} catch (error) {
span?.recordException(error);
throw error;
} finally {
Logger.otel.endSpan(span);
}
}
public static async AdddRequestTokenID(key: string, data: any, parent: Span): Promise<any> {
public static async AdddRequestTokenID(key: string, data: any, parent: Span): Promise<TokenRequest> {
this.init();
const span: Span = Logger.otel.startSubSpan("dbhelper.FindRequestTokenID", parent);
try {
return await this.memoryCache.set("requesttoken" + key, data);
if (Config.cache_store_type == "redis") {
return await this.memoryCache.set("requesttoken" + key, data);
} else {
return await this.mongoCache.set("requesttoken" + key, data);
}
} catch (error) {
span?.recordException(error);
throw error;
} finally {
Logger.otel.endSpan(span);
}
}
public static async RemoveRequestTokenID(key: string, parent: Span): Promise<any> {
public static async RemoveRequestTokenID(key: string, parent: Span): Promise<TokenRequest> {
this.init();
const span: Span = Logger.otel.startSubSpan("dbhelper.FindRequestTokenID", parent);
try {
return await this.memoryCache.del("requesttoken" + key);
if (Config.cache_store_type == "redis") {
return await this.memoryCache.del("requesttoken" + key);
} else {
return await this.mongoCache.del("requesttoken" + key);
}
} catch (error) {
span?.recordException(error);
throw error;
Expand Down Expand Up @@ -598,4 +615,13 @@ export class DBHelper {
return { $set: updatedoc };
}
}
}
export class TokenRequest extends Base {
constructor(code: string) {
super();
this._type = "tokenrequest";
if (NoderedUtil.IsNullEmpty(code)) this.code = "";
}
public code: string;
public jwt: string;
}
42 changes: 16 additions & 26 deletions OpenFlow/src/DatabaseConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,6 @@ const jsondiffpatch = require('jsondiffpatch').create({
});


Object.defineProperty(Promise, 'retry', {
configurable: true,
writable: true,
value: function retry(retries, executor) {
// console.warn(`${retries} retries left!`)
if (typeof retries !== 'number') {
throw new TypeError('retries is not a number')
}
return new Promise(executor).catch(error => retries > 0
? (Promise as any).retry(retries - 1, executor)
: Promise.reject(error)
)
}
})
export type GetDocumentVersionOptions = {
collectionname: string,
id: string,
Expand Down Expand Up @@ -159,18 +145,22 @@ export class DatabaseConnection extends events.EventEmitter {
}
const span: Span = Logger.otel.startSubSpan("db.connect", parent);
this.streams = [];
this.cli = await (Promise as any).retry(100, (resolve, reject) => {
const options: MongoClientOptions = { minPoolSize: Config.mongodb_minpoolsize, autoReconnect: false, useNewUrlParser: true, useUnifiedTopology: true };
MongoClient.connect(this.mongodburl, options).then((cli) => {
this.replicat = (cli as any).s.options.replicaSet;
resolve(cli);
span?.addEvent("Connected to mongodb");
}).catch((reason) => {
span?.recordException(reason);
console.error(reason);
reject(reason);
});
});
// this.cli = await (Promise as any).retry(100, (resolve, reject) => {
// const options: MongoClientOptions = { minPoolSize: Config.mongodb_minpoolsize, autoReconnect: false, useNewUrlParser: true, useUnifiedTopology: true };
// MongoClient.connect(this.mongodburl, options).then((cli) => {
// this.replicat = (cli as any).s.options.replicaSet;
// resolve(cli);
// span?.addEvent("Connected to mongodb");
// }).catch((reason) => {
// span?.recordException(reason);
// console.error(reason);
// reject(reason);
// });
// });
span?.addEvent("connecting to mongodb");
const options: MongoClientOptions = { minPoolSize: Config.mongodb_minpoolsize, autoReconnect: false, useNewUrlParser: true, useUnifiedTopology: true };
this.cli = await MongoClient.connect(this.mongodburl, options);
span?.addEvent("Connected to mongodb");

Logger.instanse.silly(`Really connected to mongodb`);
const errEvent = (error) => {
Expand Down
16 changes: 16 additions & 0 deletions OpenFlow/src/Logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,22 @@ import { i_license_file, i_nodered_driver, i_otel } from "./commoninterfaces";
import { Config } from "./Config";
import { dockerdriver } from "./dockerdriver";
const path = require('path');

const MAX_RETRIES_DEFAULT = 5
export async function promiseRetry<T>(
fn: () => Promise<T>,
retries = MAX_RETRIES_DEFAULT,
retryIntervalMillis: number,
previousError?: Error
): Promise<T> {
return !retries
? Promise.reject(previousError)
: fn().catch(async (error) => {
await new Promise((resolve) => setTimeout(resolve, retryIntervalMillis))
return promiseRetry(fn, retries - 1, retryIntervalMillis, error)
})
}

export class Logger {
public static otel: i_otel;
public static License: i_license_file;
Expand Down
17 changes: 10 additions & 7 deletions OpenFlow/src/LoginProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ const multer = require('multer');
// const GridFsStorage = require('multer-gridfs-storage');
import { GridFsStorage } from "multer-gridfs-storage";
import { GridFSBucket, ObjectID, Binary } from "mongodb";
import { Base, User, NoderedUtil, TokenUser, WellknownIds, Rights, Role } from "@openiap/openflow-api";
import { DBHelper } from "./DBHelper";
import { Base, User, NoderedUtil, TokenUser, WellknownIds, Rights, Role, InsertOrUpdateOneMessage } from "@openiap/openflow-api";
import { DBHelper, TokenRequest } from "./DBHelper";
import { Span } from "@opentelemetry/api";
import { Logger } from "./Logger";
import { DatabaseConnection } from "./DatabaseConnection";
Expand Down Expand Up @@ -409,7 +409,7 @@ export class LoginProvider {
const span: Span = Logger.otel.startSpan("LoginProvider.login");
try {
const key = req.body.key;
var exists = await DBHelper.FindRequestTokenID(key, span);
let exists: TokenRequest = await DBHelper.FindRequestTokenID(key, span);
if (!NoderedUtil.IsNullUndefinded(exists)) return res.status(500).send({ message: "Illegal key" });
await DBHelper.AdddRequestTokenID(key, {}, span);
res.status(200).send({ message: "ok" });
Expand All @@ -424,11 +424,11 @@ export class LoginProvider {
const span: Span = Logger.otel.startSpan("LoginProvider.login");
try {
const key = req.query.key;
var exists = await DBHelper.FindRequestTokenID(key, span);
let exists: TokenRequest = null;
exists = await DBHelper.FindRequestTokenID(key, span);
if (NoderedUtil.IsNullUndefinded(exists)) {
res.status(200).send({ message: "Illegal key" });
return;
// return res.status(500).send({ message: "Illegal key" });
}

if (!NoderedUtil.IsNullEmpty(exists.jwt)) {
Expand Down Expand Up @@ -478,7 +478,7 @@ export class LoginProvider {
if (!NoderedUtil.IsNullEmpty(key)) {
if (req.user) {
const user: User = await DBHelper.FindById(req.user._id, undefined, span);
var exists = await DBHelper.FindRequestTokenID(key, span);
var exists: TokenRequest = await DBHelper.FindRequestTokenID(key, span);
if (!NoderedUtil.IsNullUndefinded(exists)) {
await DBHelper.AdddRequestTokenID(key, { jwt: Crypt.createToken(user, Config.longtoken_expires_in) }, span);
res.cookie("requesttoken", "", { expires: new Date(0) });
Expand Down Expand Up @@ -523,7 +523,10 @@ export class LoginProvider {
} catch (error) {
span?.recordException(error);
console.error(error.message ? error.message : error);
return res.status(500).send({ message: error.message ? error.message : error });
try {
return res.status(500).send({ message: error.message ? error.message : error });
} catch (error) {
}
}
Logger.otel.endSpan(span);
});
Expand Down
Loading

0 comments on commit 012724b

Please sign in to comment.