diff --git a/src/plugins/ExecutePipeline/ExecutePipeline.js b/src/plugins/ExecutePipeline/ExecutePipeline.js index efbafdb38..b1f17037a 100644 --- a/src/plugins/ExecutePipeline/ExecutePipeline.js +++ b/src/plugins/ExecutePipeline/ExecutePipeline.js @@ -4,6 +4,7 @@ define([ 'plugin/CreateExecution/CreateExecution/CreateExecution', 'plugin/ExecuteJob/ExecuteJob/ExecuteJob', + 'deepforge/storage/index', 'common/storage/constants', 'common/core/constants', 'deepforge/Constants', @@ -13,6 +14,7 @@ define([ ], function ( CreateExecution, ExecuteJob, + Storage, STORAGE_CONSTANTS, GME_CONSTANTS, CONSTANTS, @@ -427,10 +429,14 @@ define([ msg += 'finished!'; } + if (!this.isDebugMode()) { + await this.deleteIntermediateData(); + } + const isDeleted = await this.isDeleted(); this.stopExecHeartBeat(); - if (!isDeleted) { + if (!isDeleted) { this.logger.debug(`Pipeline "${name}" complete!`); this.core.setAttribute(this.activeNode, 'endTime', Date.now()); this.core.setAttribute(this.activeNode, 'status', @@ -443,30 +449,49 @@ define([ this._finished = true; this.resultMsg(msg); await this.save('Pipeline execution finished'); - this.result.setSuccess(!this.pipelineError); - this._callback(this.pipelineError || null, this.result); } else { // deleted! this.logger.debug('Execution has been deleted!'); - this.result.setSuccess(!this.pipelineError); - this._callback(this.pipelineError || null, this.result); } + this.result.setSuccess(!this.pipelineError); + this._callback(this.pipelineError || null, this.result); }; - ExecutePipeline.prototype.isDeleted = function () { - var activeId = this.core.getPath(this.activeNode); + ExecutePipeline.prototype.isDebugMode = function() { + return !this.core.getAttribute(this.activeNode, 'snapshot'); + }; + + ExecutePipeline.prototype.getStorageConfig = function () { + const storage = this.getCurrentConfig().storage || {}; + storage.id = storage.id || 'gme'; + storage.config = storage.config || {}; + return storage; + }; + + ExecutePipeline.prototype.deleteIntermediateData = async function() { + const storageDir = this.getStorageDir(); + const config = this.getStorageConfig(); + const client = await Storage.getClient(config.id, this.logger, config.config); + return client.deleteDir(storageDir); + }; + + ExecutePipeline.prototype.getStorageDir = function () { + const execName = this.core.getAttribute(this.activeNode, 'name').replace(/\//g, '_'); + return `${this.projectId}/executions/${execName}/`; + }; + + ExecutePipeline.prototype.isDeleted = async function () { + const activeId = this.core.getPath(this.activeNode); // Check if the current execution has been deleted - return this.project.getBranchHash(this.branchName) - .then(hash => this.updateNodes(hash)) - .then(() => this.core.loadByPath(this.rootNode, activeId)) - .then(node => { - var deleted = node === null, - msg = `Verified that execution is ${deleted ? '' : 'not '}deleted`; - - this.logger.debug(msg); - return deleted; - }) - .fail(err => this.logger.error(err)); + const hash = await this.project.getBranchHash(this.branchName); + await this.updateNodes(hash); + + const node = await this.core.loadByPath(this.rootNode, activeId); + const isDeleted = node === null; + const msg = `Verified that execution is ${isDeleted ? '' : 'not '}deleted`; + + this.logger.debug(msg); + return isDeleted; }; ExecutePipeline.prototype.onPipelineDeleted = function () { diff --git a/src/plugins/GenerateJob/GenerateJob.js b/src/plugins/GenerateJob/GenerateJob.js index 3be2c20ef..663c97466 100644 --- a/src/plugins/GenerateJob/GenerateJob.js +++ b/src/plugins/GenerateJob/GenerateJob.js @@ -265,6 +265,15 @@ define([ return configs; }; + GenerateJob.prototype.getStorageDir = function () { + const operation = this.activeNode; + const execution = this.core.getParent(this.core.getParent(operation)); + + const execName = this.core.getAttribute(execution, 'name').replace(/\//g, '_'); + const jobId = this.core.getPath(this.activeNode).replace(/\//g, '_'); + return `${this.projectId}/executions/${execName}/${jobId}/`; + }; + GenerateJob.prototype.createInputs = async function (node, files) { this.logger.info('Retrieving inputs and deserialize fns...'); const allInputs = await this.getInputs(node); @@ -277,8 +286,7 @@ define([ .filter(pair => !!this.getAttribute(pair[2], 'data')); // remove empty inputs const storage = this.getStorageConfig(); - const jobId = this.core.getPath(this.activeNode).replace(/\//g, '_'); - const storageDir = `${this.projectId}/executions/${jobId}`; + const storageDir = this.getStorageDir(); const configs = { storage: {