Skip to content

Commit

Permalink
add migrations
Browse files Browse the repository at this point in the history
  • Loading branch information
Zwiterrion committed Jun 7, 2024
1 parent 307634e commit 7ccdcc4
Show file tree
Hide file tree
Showing 7 changed files with 254 additions and 56 deletions.
10 changes: 7 additions & 3 deletions server/datastores/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ module.exports = class Datastore {
/**
* Initialize the datastore
*/
async initialize() { Promise.resolve() }
initialize() { Promise.resolve() }
/**
* Check and apply migrations to legacy database
*/
applyMigrations() { Promise.resolve() }
/**
* List of created plugins, whole database
*/
Expand Down Expand Up @@ -107,14 +111,14 @@ module.exports = class Datastore {
* @returns sources as buffer
*/
getSources = pluginId => { return Promise.resolve() }

/**
* Get log files of specific plugin at add them to files
* @param {string} pluginId
* @param {any} files
* @returns
*/
getConfigurationsFile (pluginId, files) { return Promise.resolve() }
getConfigurationsFile(pluginId, files) { return Promise.resolve() }

/**
* Fetch configuration file
Expand Down
50 changes: 50 additions & 0 deletions server/datastores/migrations/pg-1.22.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
const consumers = require('node:stream/consumers');
const { GetObjectCommand, PutObjectCommand } = require("@aws-sdk/client-s3");
const fetch = require('node-fetch');
const { fromUtf8 } = require("@aws-sdk/util-utf8-node");

async function hasPluginsFile(pgDatastore) {
const client = await pgDatastore.pool.connect();

const res = await client.query("SELECT * FROM plugins");
client.release()

return res.rowCount > 0
}

async function getUsers(pgDatastore) {
const client = await pgDatastore.pool.connect();

const res = await client.query("SELECT * FROM users");
client.release()

return res.rows
}

async function createPlugin(pgDatastore, plugin) {
pgDatastore.pool.connect()
.then(client => {
return client.query("INSERT INTO plugins(id, content) VALUES($1, $2::jsonb)", [plugin.pluginId, JSON.stringify({ plugin })])
.then(() => client.release())
})
}

async function PG_1_22(pgDatastore) {
const migrated = await hasPluginsFile(pgDatastore)

console.log('apply PG_1_22', migrated)

if (!migrated) {
const usersPlugins = await getUsers(pgDatastore)

console.log('migrate users : ', users)

return usersPlugins.map(user => {
return Promise.all(user.plugins.map(plugin => createPlugin(pgDatastore, plugin)))
})
}
}

module.exports = {
PG_1_22
}
126 changes: 126 additions & 0 deletions server/datastores/migrations/s3-1.22.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
const consumers = require('node:stream/consumers');
const { GetObjectCommand, PutObjectCommand } = require("@aws-sdk/client-s3");
const fetch = require('node-fetch');
const { fromUtf8 } = require("@aws-sdk/util-utf8-node");

async function hasPluginsFile(s3Datastore) {
const { instance, Bucket } = s3Datastore.state;

try {
const rawData = await instance.send(new GetObjectCommand({
Bucket,
Key: 'plugins.json'
}))
await new fetch.Response(rawData.Body).json();

return true
} catch (err) {
return false
}
}

async function getUsers(s3Datastore) {
const { instance, Bucket } = s3Datastore.state;
try {
const rawData = await instance.send(new GetObjectCommand({
Bucket,
Key: 'users.json'
}))
return new fetch.Response(rawData.Body).json();
} catch (err) {
return []
}
}

function getUser(s3Datastore, email) {
const { instance, Bucket } = s3Datastore.state;

return new Promise(resolve => {
instance.send(new GetObjectCommand({
Bucket,
Key: `${email}.json`
}))
.then(data => {
try {
if (data && data.Body) {
consumers.json(data.Body)
.then(resolve)
}
else
resolve({})
} catch (_err) {
resolve({})
}
})
.catch(_err => {
resolve({})
})
})
}

function getUserPlugins(s3Datastore, email) {
return getUser(s3Datastore, email)
.then(data => data.plugins || [])
}

async function createPlugin(s3Datastore, email, pluginId, content) {
const { instance, Bucket } = s3Datastore.state;

console.log(content)

const params = {
Bucket,
Key: `${pluginId}.json`,
Body: fromUtf8(JSON.stringify(content)),
ContentType: 'application/json',
}

return instance.send(new PutObjectCommand(params))
}

async function S3_1_22(s3Datastore) {
const migrated = await hasPluginsFile(s3Datastore)

console.log('apply S3_1_22', migrated)

if (!migrated) {
const users = await getUsers(s3Datastore)

console.log('migrate users : ', users)

const usersPlugins = await Promise.all(users.map(user => getUserPlugins(s3Datastore, user)));

await Promise.all(usersPlugins.map((userPlugins, idx) => {
return Promise.all(userPlugins.map(plugin => createPlugin(s3Datastore, users[idx], plugin.pluginId, {
...plugin,
admins: [users[idx]],
users: []
})))
}))

// TODO - problem came from the email formatter [email protected] and wasmototools

const pluginsByUser = usersPlugins.reduce((acc, userPlugins, idx) => {
return [...acc, ...userPlugins.map(plugin => ({
plugin: {
...plugin,
admins: [users[idx]],
users: []
}, email: users[idx]
}))]
}, [])

pluginsByUser
.reduce((promise, item) => promise.then(() => new Promise(resolve => {
const { plugin, email } = item;

console.log('add ', email, plugin)
s3Datastore.addPluginToList(email, plugin)
.then(resolve)
})), Promise.resolve())
}
}

module.exports = {
S3_1_22
}
46 changes: 26 additions & 20 deletions server/datastores/postgres.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ const { Pool } = require('pg');

const logger = require("../logger");
const { isAString } = require('../utils');
const { PG_1_22 } = require('./migrations/pg-1.22');

module.exports = class PgDatastore extends Datastore {
/** @type {S3Datastore} */
#sourcesDatastore = undefined;

/** @type {Pool} */
#pool = undefined;
pool = undefined;

/**
* @param {S3Datastore} sourcesDatastore
Expand All @@ -29,9 +30,9 @@ module.exports = class PgDatastore extends Datastore {
initialize = async () => {
logger.info("Initialize pg client");

await this.#sourcesDatastore.initialize();
await this.#sourcesDatastore.initialize(true);

this.#pool = new Pool({
this.pool = new Pool({
host: ENV.PG.HOST,
port: ENV.PG.PORT,
database: ENV.PG.DATABASE,
Expand All @@ -44,17 +45,22 @@ module.exports = class PgDatastore extends Datastore {
.on('error', err => logger.error(err));


const client = await this.#pool.connect();
const client = await this.pool.connect();

await Promise.all([
client.query("CREATE TABLE IF NOT EXISTS plugins(id VARCHAR(200), content JSONB)"),
client.query("CREATE TABLE IF NOT EXISTS jobs(id SERIAL, plugin_id VARCHAR UNIQUE, created_at TIMESTAMP default current_timestamp)"),
]);
client.release()


await this.applyMigrations()
}

applyMigrations = () => PG_1_22(this)

getPlugins = () => {
return this.#pool.connect()
return this.pool.connect()
.then(async client => {
const res = await client.query("SELECT content FROM plugins", [])
client.release();
Expand All @@ -64,7 +70,7 @@ module.exports = class PgDatastore extends Datastore {
}

getUserPlugins = async email => {
return this.#pool.connect()
return this.pool.connect()
.then(async client => {
const res = await client.query("SELECT content FROM plugins WHERE content->'admins' ? $1::text OR content->'users' ? $1::text", [email])
client.release();
Expand All @@ -77,7 +83,7 @@ module.exports = class PgDatastore extends Datastore {
const optUser = await this.getUser(email);

if (!optUser) {
await this.#pool.connect()
await this.pool.connect()
.then(client => {
return client.query("INSERT INTO users(email, content) VALUES($1, $2::jsonb)", [email, {}])
.then(() => client.release())
Expand All @@ -86,7 +92,7 @@ module.exports = class PgDatastore extends Datastore {
}

getUsers = () => {
return this.#pool.connect()
return this.pool.connect()
.then(async client => {
const res = await client.query("SELECT * from users");
client.release();
Expand Down Expand Up @@ -158,7 +164,7 @@ module.exports = class PgDatastore extends Datastore {
if (email === "*")
return Promise.resolve()

return this.#pool.connect()
return this.pool.connect()
.then(client => client.query("SELECT * FROM plugins WHERE id = $1::text AND (content->'admins' ? $2::text OR content->'users' ? $2::text)", [pluginId, email])
.then(res => {
client.release()
Expand All @@ -177,7 +183,7 @@ module.exports = class PgDatastore extends Datastore {
return {}
}

return this.#pool.connect()
return this.pool.connect()
.then(client => client.query("SELECT * FROM plugins WHERE id = $1::text", [pluginId])
.then(res => {
client.release()
Expand Down Expand Up @@ -256,7 +262,7 @@ module.exports = class PgDatastore extends Datastore {

console.log('generate pluginID', pluginId, newPlugin.pluginId)

return this.#pool.connect()
return this.pool.connect()
.then(client => {
return client.query("INSERT INTO plugins(id, content) VALUES($1, $2::jsonb)", [newPlugin.pluginId, JSON.stringify({
...newPlugin,
Expand Down Expand Up @@ -326,7 +332,7 @@ module.exports = class PgDatastore extends Datastore {
}

addPluginToList = async (email, plugin) => {
return this.#pool.connect()
return this.pool.connect()
.then(client => {
return client.query("INSERT INTO plugins(id, content) VALUES($1, $2::jsonb)", [plugin.pluginId, JSON.stringify({
pluginId: plugin.pluginId,
Expand All @@ -338,23 +344,23 @@ module.exports = class PgDatastore extends Datastore {
}

updatePluginList = async (pluginId, content) => {
return this.#pool.connect()
return this.pool.connect()
.then(client => {
return client.query("UPDATE plugins SET content = $1::jsonb WHERE id = $2", [JSON.stringify(content), pluginId])
.then(() => client.release())
})
}

removePluginFromList = async (pluginId) => {
return this.#pool.connect()
return this.pool.connect()
.then(client => {
return client.query("DELETE FROM plugins WHERE id = $1", [pluginId])
.then(() => client.release())
})
}

isJobRunning = pluginId => {
return this.#pool.connect()
return this.pool.connect()
.then(client => {
return client.query("INSERT INTO jobs(plugin_id) VALUES($1) on conflict (plugin_id) do nothing", [pluginId])
.then(res => {
Expand All @@ -371,7 +377,7 @@ module.exports = class PgDatastore extends Datastore {
}

cleanJobsRunner = () => {
return this.#pool.connect()
return this.pool.connect()
.then(client => {
return client.query(`DELETE from jobs WHERE created_at < NOW() - make_interval(mins => 1)`)
.then(() => {
Expand All @@ -381,15 +387,15 @@ module.exports = class PgDatastore extends Datastore {
}

removeJob = async pluginId => {
const client = await this.#pool.connect();
const client = await this.pool.connect();

await client.query("DELETE FROM jobs WHERE plugin_id = $1", [pluginId]);

client.release();
}

waitingTimeBeforeNextRun = pluginId => {
return this.#pool.connect()
return this.pool.connect()
.then(client => {
return client.query("SELECT created_at FROM jobs WHERE plugin_id = $1", [pluginId])
.then(res => {
Expand All @@ -408,7 +414,7 @@ module.exports = class PgDatastore extends Datastore {
getInvitation = (email, pluginId) => this.getPlugin(email, pluginId)

canSharePlugin = async (email, pluginId) => {
return this.#pool.connect()
return this.pool.connect()
.then(client => client.query("SELECT * FROM plugins WHERE id = $1::text AND content->'admins' ? $2::text", [pluginId, email])
.then(res => {
client.release()
Expand All @@ -417,7 +423,7 @@ module.exports = class PgDatastore extends Datastore {
}

getPluginUsers = (email, pluginId) => {
return this.#pool.connect()
return this.pool.connect()
.then(client => client.query(`SELECT content->'admins' as admins, content->'users' as users
FROM plugins
WHERE id = $1::text AND content->'admins' ? $2::text`, [pluginId, email])
Expand Down
Loading

0 comments on commit 7ccdcc4

Please sign in to comment.