Skip to content

Commit

Permalink
fix tests - add postgres support - swap logger - add pg job queue
Browse files Browse the repository at this point in the history
  • Loading branch information
Zwiterrion committed Dec 7, 2023
1 parent 07f988f commit f2cdf78
Show file tree
Hide file tree
Showing 26 changed files with 494 additions and 177 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ cli/go
cli/rust
cli/ts
*.wasmo
*.log
5 changes: 5 additions & 0 deletions server/configuration.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ module.exports = {
POOL_SIZE: process.env.PG_POOL_SIZE || 20,
PG_IDLE_TIMEOUT_MILLIS: process.env.PG_IDLE_TIMEOUT_MILLIS || 30000,
CONNECTION_TIMEOUT_MILLIS: process.env.PG_CONNECTION_TIMEOUT_MILLIS || 2000,
},

LOGGER: {
FILE: process.env.LOGGER_FILE || false,
LEVEL: process.env.LOGGER_LEVEL || 'info'
}
}
}
26 changes: 25 additions & 1 deletion server/datastores/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module.exports = class Datastore {
/**
* Initialize the datastore
*/
async initialize() { console.log('initialize') }
async initialize() { Promise.resolve() }
/**
* Get current user
*/
Expand Down Expand Up @@ -119,4 +119,28 @@ module.exports = class Datastore {
* @param {string} newName
*/
patchPluginName = (email, pluginId, newName) => Promise.resolve()

/**
 * Check whether a build job for this plugin is currently running.
 * Base-class stub: always resolves to undefined; real datastores override it.
 * @param {string} pluginId
 * @returns {Promise<undefined>}
 */
isJobRunning = async pluginId => undefined

/**
 * Clean up all legacy tasks on startup.
 * Base-class stub: no-op that resolves immediately; real datastores override it.
 * @returns {Promise<undefined>}
 */
cleanJobs = async () => undefined

/**
 * Remove a specific job from the datastore.
 * Base-class stub: no-op that resolves immediately; real datastores override it.
 * @param {string} pluginId
 * @returns {Promise<undefined>}
 */
removeJob = async pluginId => undefined

/**
 * Minimum time (ms) to wait before the plugin may run again.
 * Base-class stub: -1 signals "no wait information available";
 * real datastores override it.
 * @param {string} pluginId
 * @returns {Promise<number>}
 */
waitingTimeBeforeNextRun = async pluginId => -1
};
2 changes: 1 addition & 1 deletion server/datastores/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ let datastore;

if ([STORAGE.S3, STORAGE.DOCKER_S3].includes(ENV.STORAGE)) {
datastore = new S3Datastore();
} else if (STORAGE.DOCKER_S3_POSTGRES, STORAGE.S3_POSTGRES) {
} else if ([STORAGE.DOCKER_S3_POSTGRES, STORAGE.S3_POSTGRES].includes(ENV.STORAGE)) {
datastore = new PgDatastore(new S3Datastore());
} else {
datastore = new Datastore()
Expand Down
125 changes: 104 additions & 21 deletions server/datastores/postgres.js
Original file line number Diff line number Diff line change
@@ -1,25 +1,14 @@
const crypto = require('crypto')
const cron = require('node-cron');

const { format } = require('../utils');
const { ENV } = require("../configuration");

const Datastore = require('./api');
const S3Datastore = require('./s3');
const { Pool } = require('pg');

const manager = require("../logger");
const log = manager.createLogger('PG');

const configuration = {
host: ENV.PG.HOST,
port: ENV.PG.PORT,
database: ENV.PG.DATABASE,
user: ENV.PG.USER,
password: ENV.PG.PASSWORD,
max: ENV.PG.POOL_SIZE,
idleTimeoutMillis: 30000,
connectionTimeoutMillis: 2000,
}
const logger = require("../logger");
const { isAString } = require('../utils');

/**
* Class representing PG.
Expand All @@ -42,7 +31,7 @@ module.exports = class PgDatastore extends Datastore {


initialize = async () => {
log.info("Initialize pg client");
logger.info("Initialize pg client");

await this.#sourcesDatastore.initialize();

Expand All @@ -56,12 +45,15 @@ module.exports = class PgDatastore extends Datastore {
idleTimeoutMillis: 30000,
connectionTimeoutMillis: 2000,
})
.on('error', err => log.error(err));
.on('error', err => logger.error(err));


return this.#pool.connect()
.then(client => {
client.query("CREATE TABLE IF NOT EXISTS users(id SERIAL, email VARCHAR, content JSONB)",)
Promise.all([
client.query("CREATE TABLE IF NOT EXISTS users(id SERIAL, email VARCHAR, content JSONB)"),
client.query("CREATE TABLE IF NOT EXISTS jobs(id SERIAL, pluginId VARCHAR UNIQUE, created_at TIMESTAMP default current_timestamp)"),
])
.then(() => client.release())
})
}
Expand All @@ -79,7 +71,7 @@ module.exports = class PgDatastore extends Datastore {
getUserPlugins = (email) => {
return this.getUser(email)
.then(user => user ? user.plugins : [])
.catch(err => log.error(err))
.catch(err => logger.error(err))
}

createUserIfNotExists = async (email) => {
Expand Down Expand Up @@ -127,7 +119,46 @@ module.exports = class PgDatastore extends Datastore {
}

putWasmInformationsToS3 = (email, pluginId, newHash, generateWasmName) => {
return this.#sourcesDatastore.putWasmInformationsToS3(email, pluginId, newHash, generateWasmName);
return this.getUser(email)
.then(data => {
const newPlugins = data.plugins.map(plugin => {
if (plugin.pluginId !== pluginId) {
return plugin;
}
let versions = plugin.versions || [];

// convert legacy array
if (versions.length > 0 && isAString(versions[0])) {
versions = versions.map(name => ({ name }))
}

const index = versions.findIndex(item => item.name === generateWasmName);
if (index === -1)
versions.push({
name: generateWasmName,
updated_at: Date.now(),
creator: email
})
else {
versions[index] = {
...versions[index],
updated_at: Date.now(),
creator: email
}
}

return {
...plugin,
last_hash: newHash,
wasm: generateWasmName,
versions
}
});
return this.updateUser(email, {
...data,
plugins: newPlugins
})
})
}

getWasm = (wasmId) => {
Expand Down Expand Up @@ -158,8 +189,8 @@ module.exports = class PgDatastore extends Datastore {
})
}

getConfigurations = (email, pluginId) => {
const plugin = this.#getPlugin(email, pluginId);
getConfigurations = async (email, pluginId) => {
const plugin = await this.#getPlugin(email, pluginId);

const files = [{
ext: 'json',
Expand Down Expand Up @@ -288,4 +319,56 @@ module.exports = class PgDatastore extends Datastore {
})
}))
}

isJobRunning = pluginId => {
return this.#pool.connect()
.then(client => {
return client.query("INSERT INTO jobs(pluginId) VALUES($1) on conflict (pluginId) do nothing", [pluginId])
.then(res => {
client.release()

return res.rowCount === 0;
})
})
}

/**
 * Schedule the stale-job purge to run every minute, and run it once
 * immediately at startup.
 * Returns the initial run's promise (the original discarded it, leaving an
 * unhandled floating promise); callers that ignore the return are unaffected.
 * @returns {Promise<void>}
 */
cleanJobs = () => {
    cron.schedule('0 */1 * * * *', this.cleanJobsRunner);
    return this.cleanJobsRunner();
}

cleanJobsRunner = () => {
return this.#pool.connect()
.then(client => {
return client.query(`DELETE from jobs WHERE created_at < NOW() - make_interval(mins => 1)`)
.then(() => {
client.release()
})
})
}

removeJob = pluginId => {
return this.#pool.connect()
.then(client => {
return client.query("DELETE FROM jobs WHERE pluginId = $1", [pluginId])
.then(() => client.release());
});
}

waitingTimeBeforeNextRun = pluginId => {
return this.#pool.connect()
.then(client => {
return client.query("SELECT created_at FROM jobs WHERE pluginId = $1", [pluginId])
.then(res => {
client.release();

if (res.rowCount === 0)
return null

const interval = 5 * 60 * 1000 - new Date() - new Date(res.rows[0]?.created_at);

return interval > 0 ? interval : 0;
});
});
}
};
65 changes: 35 additions & 30 deletions server/datastores/s3.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@ const fs = require('fs-extra');
const { format, isAString } = require('../utils');
const Datastore = require('./api');
const { ENV, STORAGE } = require("../configuration");
const manager = require("../logger");
const logger = require("../logger");
const consumers = require('node:stream/consumers');
const AdmZip = require("adm-zip");

const log = manager.createLogger('S3');

/**
* Class representing S3.
* @extends Datastore
Expand All @@ -39,21 +37,21 @@ module.exports = class S3Datastore extends Datastore {

return this.#state.instance.send(new HeadBucketCommand(params))
.then(() => {
log.info("Using existing bucket")
logger.info("Using existing bucket")
})
.catch(res => {
if (res || res.$metadata.httpStatusCode === 404 ||
res.$metadata.httpStatusCode === 403 ||
res.$metadata.httpStatusCode === 400) {
log.info(`Bucket ${this.#state.Bucket} is missing.`)
logger.info(`Bucket ${this.#state.Bucket} is missing.`)
return new Promise(resolve => {
this.#state.instance.send(new CreateBucketCommand(params), err => {
if (err) {
console.log("Failed to create missing bucket")
console.log(err)
// process.exit(1)
logger.error("Failed to create missing bucket")
logger.error(err)
process.exit(1)
} else {
log.info(`Bucket ${this.#state.Bucket} created.`)
logger.info(`Bucket ${this.#state.Bucket} created.`)
resolve()
}
});
Expand All @@ -66,17 +64,17 @@ module.exports = class S3Datastore extends Datastore {

initialize = async () => {
if (!ENV.S3_BUCKET) {
console.log("[S3 INITIALIZATION](failed): S3 Bucket is missing");
logger.error("[S3 INITIALIZATION](failed): S3 Bucket is missing");
process.exit(1);
}

log.info("Initialize s3 client");
logger.info("Initialize s3 client");

if (ENV.STORAGE === STORAGE.DOCKER_S3 || ENV.STORAGE === STORAGE.DOCKER_S3_POSTGRES) {
const URL = url.parse(ENV.S3_ENDPOINT);

const ip = await new Promise(resolve => dns.lookup(URL.hostname, (_, ip) => resolve(ip)));
log.debug(`${URL.protocol}//${ip}:${URL.port}${URL.pathname}`)
logger.debug(`${URL.protocol}//${ip}:${URL.port}${URL.pathname}`)
this.#state = {
instance: new S3Client({
region: ENV.AWS_DEFAULT_REGION,
Expand All @@ -95,7 +93,7 @@ module.exports = class S3Datastore extends Datastore {
Bucket: ENV.S3_BUCKET
}

log.info("Bucket initialized");
logger.info("Bucket initialized");
}

return this.#createBucketIfMissing();
Expand Down Expand Up @@ -188,7 +186,7 @@ module.exports = class S3Datastore extends Datastore {
Key: 'users.json'
}))
.then(data => new fetch.Response(data.Body).json())
.catch(err => log.error(err))
.catch(err => logger.error(err))
}

updateUser = (email, content) => {
Expand Down Expand Up @@ -300,25 +298,32 @@ module.exports = class S3Datastore extends Datastore {
})
}

getWasm = (wasmId) => {
#getWasm = async name => {
const { instance, Bucket } = this.#state;

return new Promise(resolve => {
instance.send(new GetObjectCommand({
try {
const data = await instance.send(new GetObjectCommand({
Bucket,
wasmId
Key: name
}))
.then(data => new fetch.Response(data.Body).buffer())
.then(data => {
resolve({ content: data });
})
.catch(err => {
resolve({
error: err.Code,
status: err?.$metadata?.httpStatusCode || 404
})
});
});
const content = await new fetch.Response(data.Body).buffer();
return { content };
} catch (err) {
return {
error: err.Code,
status: err?.$metadata?.httpStatusCode || 404
}
}
}

getWasm = (wasmId) => {
return this.#getWasm(wasmId)
.then(out => {
if (out.error) {
return this.#getWasm(wasmId.replace('.wasm', ''));
} else
return out;
})
}

run = (wasm, { input, functionName, wasi }) => {
Expand Down Expand Up @@ -452,7 +457,7 @@ module.exports = class S3Datastore extends Datastore {
}

return instance.send(new DeleteObjectCommand(params))
.catch(err => { log.error(err) });
.catch(err => { logger.error(err) });
}

deletePlugin = (email, pluginId) => {
Expand Down
Loading

0 comments on commit f2cdf78

Please sign in to comment.