From df6f32b27d63cbe2b843e49608816d22b02963c6 Mon Sep 17 00:00:00 2001 From: Brian Broll Date: Wed, 11 Nov 2020 09:14:30 -0600 Subject: [PATCH 1/2] Purge compute job after recording results. Closes #1978 --- src/common/compute/backends/local/Client.js | 2 +- src/plugins/ExecuteJob/ExecuteJob.js | 12 +++++++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/src/common/compute/backends/local/Client.js b/src/common/compute/backends/local/Client.js index c92b4b92c..4d70c5720 100644 --- a/src/common/compute/backends/local/Client.js +++ b/src/common/compute/backends/local/Client.js @@ -96,7 +96,7 @@ define([ return JSON.parse(resultsTxt); } - async purge (job) { + async purgeJob (job) { const {hash} = job; if (hash === this.currentJob) { throw new Error('Cannot purge running job.'); diff --git a/src/plugins/ExecuteJob/ExecuteJob.js b/src/plugins/ExecuteJob/ExecuteJob.js index 0869d1007..e121c806c 100644 --- a/src/plugins/ExecuteJob/ExecuteJob.js +++ b/src/plugins/ExecuteJob/ExecuteJob.js @@ -62,6 +62,7 @@ define([ ExecuteJobMetadata.call(this); this.pluginMetadata = pluginMetadata; this._running = null; + this._computeJobs = {}; // Metadata updating this.lastAppliedCmd = {}; @@ -190,6 +191,7 @@ define([ compute.on('end', async (id/*, info*/) => { + const computeJob = this._computeJobs[id]; try { const job = this.getNodeForJobId(id); if (job === null) { @@ -197,13 +199,16 @@ define([ this.canceled, `Cannot find node for job ID in running pipeline: ${id}` ); + await compute.purgeJob(computeJob); return; } this.cleanJobHashInfo(id); await this.onOperationEnd(null, job); + await compute.purgeJob(computeJob); } catch (err) { this.logger.error(`Error when processing operation end: ${err}`); await this.save('Saving remaining edits before pipeline exits w/ error.'); + await compute.purgeJob(computeJob); throw err; } } @@ -534,6 +539,7 @@ define([ this.startExecHeartBeat(); } + this._computeJobs[jobInfo.hash] = computeJob; return await this.recordJobOrigin(jobInfo.hash, job); }; @@ -673,14 +679,14 @@ define([ this.logManager.deleteLog(jobId); if (status === this.compute.SUCCESS) { const results = await this.compute.getResultsInfo(jobInfo); - await this.recordOperationOutputs(op, results); + return this.recordOperationOutputs(op, results); } else { // Parse the most precise error and present it in the toast... const lastline = result.stdout.split('\n').filter(l => !!l).pop() || ''; if (lastline.includes('Error')) { - this.onOperationFail(op, lastline); + return this.onOperationFail(op, lastline); } else { - this.onOperationFail(op, `Operation "${opName}" failed!`); + return this.onOperationFail(op, `Operation "${opName}" failed!`); } } } else { // something bad happened... From 4ecc033da69570e73505d170b06e6215e6e8b538 Mon Sep 17 00:00:00 2001 From: Brian Broll Date: Wed, 11 Nov 2020 09:19:34 -0600 Subject: [PATCH 2/2] Replace return with await --- src/plugins/ExecuteJob/ExecuteJob.js | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/plugins/ExecuteJob/ExecuteJob.js b/src/plugins/ExecuteJob/ExecuteJob.js index e121c806c..46557d334 100644 --- a/src/plugins/ExecuteJob/ExecuteJob.js +++ b/src/plugins/ExecuteJob/ExecuteJob.js @@ -648,9 +648,11 @@ define([ ExecuteJob.prototype.onOperationEnd = async function (err, job) { if (err) { - return this.onOperationFail(job, err); + await this.onOperationFail(job, err); + return; } else if (this.isLocalOperation(job)) { - return this.onOperationComplete(job); + await this.onOperationComplete(job); + return; } const op = await this.getOperation(job); @@ -666,7 +668,7 @@ define([ this.logger.debug(`"${name}" has been CANCELED!`); const stdout = await this.logManager.getLog(jobId); this.core.setAttribute(job, 'stdout', stdout); - return this.onOperationCanceled(op); + await this.onOperationCanceled(op); } if (status === this.compute.SUCCESS || status === this.compute.FAILED) { @@ -679,14 +681,14 @@ define([ this.logManager.deleteLog(jobId); if (status === this.compute.SUCCESS) { const results = await this.compute.getResultsInfo(jobInfo); - return this.recordOperationOutputs(op, results); + await this.recordOperationOutputs(op, results); } else { // Parse the most precise error and present it in the toast... const lastline = result.stdout.split('\n').filter(l => !!l).pop() || ''; if (lastline.includes('Error')) { - return this.onOperationFail(op, lastline); + await this.onOperationFail(op, lastline); } else { - return this.onOperationFail(op, `Operation "${opName}" failed!`); + await this.onOperationFail(op, `Operation "${opName}" failed!`); } } } else { // something bad happened... @@ -695,7 +697,7 @@ define([ this.core.setAttribute(job, 'stdout', consoleErr); this.logger.error(err); - return this.onOperationFail(op, err); + await this.onOperationFail(op, err); } };