Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clear intermediate execution data. Closes #1264 #1326

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 43 additions & 18 deletions src/plugins/ExecutePipeline/ExecutePipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
define([
'plugin/CreateExecution/CreateExecution/CreateExecution',
'plugin/ExecuteJob/ExecuteJob/ExecuteJob',
'deepforge/storage/index',
'common/storage/constants',
'common/core/constants',
'deepforge/Constants',
Expand All @@ -13,6 +14,7 @@ define([
], function (
CreateExecution,
ExecuteJob,
Storage,
STORAGE_CONSTANTS,
GME_CONSTANTS,
CONSTANTS,
Expand Down Expand Up @@ -427,10 +429,14 @@ define([
msg += 'finished!';
}

if (!this.isDebugMode()) {
await this.deleteIntermediateData();
}

const isDeleted = await this.isDeleted();
this.stopExecHeartBeat();
if (!isDeleted) {

if (!isDeleted) {
this.logger.debug(`Pipeline "${name}" complete!`);
this.core.setAttribute(this.activeNode, 'endTime', Date.now());
this.core.setAttribute(this.activeNode, 'status',
Expand All @@ -443,30 +449,49 @@ define([
this._finished = true;
this.resultMsg(msg);
await this.save('Pipeline execution finished');
this.result.setSuccess(!this.pipelineError);
this._callback(this.pipelineError || null, this.result);
} else { // deleted!
this.logger.debug('Execution has been deleted!');
this.result.setSuccess(!this.pipelineError);
this._callback(this.pipelineError || null, this.result);
}
this.result.setSuccess(!this.pipelineError);
this._callback(this.pipelineError || null, this.result);
};

ExecutePipeline.prototype.isDeleted = function () {
var activeId = this.core.getPath(this.activeNode);
ExecutePipeline.prototype.isDebugMode = function() {
return !this.core.getAttribute(this.activeNode, 'snapshot');
};

ExecutePipeline.prototype.getStorageConfig = function () {
const storage = this.getCurrentConfig().storage || {};
storage.id = storage.id || 'gme';
storage.config = storage.config || {};
return storage;
};

ExecutePipeline.prototype.deleteIntermediateData = async function() {
const storageDir = this.getStorageDir();
const config = this.getStorageConfig();
const client = await Storage.getClient(config.id, this.logger, config.config);
return client.deleteDir(storageDir);
};

ExecutePipeline.prototype.getStorageDir = function () {
const execName = this.core.getAttribute(this.activeNode, 'name').replace(/\//g, '_');
return `${this.projectId}/executions/${execName}/`;
};

ExecutePipeline.prototype.isDeleted = async function () {
const activeId = this.core.getPath(this.activeNode);

// Check if the current execution has been deleted
return this.project.getBranchHash(this.branchName)
.then(hash => this.updateNodes(hash))
.then(() => this.core.loadByPath(this.rootNode, activeId))
.then(node => {
var deleted = node === null,
msg = `Verified that execution is ${deleted ? '' : 'not '}deleted`;

this.logger.debug(msg);
return deleted;
})
.fail(err => this.logger.error(err));
const hash = await this.project.getBranchHash(this.branchName);
await this.updateNodes(hash);

const node = await this.core.loadByPath(this.rootNode, activeId);
const isDeleted = node === null;
const msg = `Verified that execution is ${isDeleted ? '' : 'not '}deleted`;

this.logger.debug(msg);
return isDeleted;
};

ExecutePipeline.prototype.onPipelineDeleted = function () {
Expand Down
12 changes: 10 additions & 2 deletions src/plugins/GenerateJob/GenerateJob.js
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,15 @@ define([
return configs;
};

GenerateJob.prototype.getStorageDir = function () {
const operation = this.activeNode;
const execution = this.core.getParent(this.core.getParent(operation));

const execName = this.core.getAttribute(execution, 'name').replace(/\//g, '_');
const jobId = this.core.getPath(this.activeNode).replace(/\//g, '_');
return `${this.projectId}/executions/${execName}/${jobId}/`;
};

GenerateJob.prototype.createInputs = async function (node, files) {
this.logger.info('Retrieving inputs and deserialize fns...');
const allInputs = await this.getInputs(node);
Expand All @@ -277,8 +286,7 @@ define([
.filter(pair => !!this.getAttribute(pair[2], 'data')); // remove empty inputs

const storage = this.getStorageConfig();
const jobId = this.core.getPath(this.activeNode).replace(/\//g, '_');
const storageDir = `${this.projectId}/executions/${jobId}`;
const storageDir = this.getStorageDir();

const configs = {
storage: {
Expand Down