Skip to content

Commit

Permalink
Update execution cancelation to abort plugin. Closes #1220 (#1223)
Browse files Browse the repository at this point in the history
* WIP #1220 Add onAbort handler

* WIP #1220 Updated execution to check for executionId

* Add executionId to Execution and Job

* WIP removed references to the executor client
  • Loading branch information
brollb committed Sep 11, 2019
1 parent a1a5b3f commit ce3e43a
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 114 deletions.
142 changes: 32 additions & 110 deletions src/common/viz/Execute.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
/* globals define, WebGMEGlobal */
/* globals define */
// Mixin for executing jobs and pipelines
define([
'q',
'executor/ExecutorClient',
'deepforge/api/ExecPulseClient',
'deepforge/api/JobOriginClient',
'deepforge/Constants',
'panel/FloatingActionButton/styles/Materialize'
], function(
Q,
ExecutorClient,
ExecPulseClient,
JobOriginClient,
CONSTANTS,
Expand All @@ -22,11 +20,6 @@ define([
this.pulseClient = new ExecPulseClient({
logger: this.logger
});
this._executor = new ExecutorClient({
logger: this.logger.fork('ExecutorClient'),
serverPort: WebGMEGlobal.gmeConfig.server.port,
httpsecure: window.location.protocol === 'https:'
});
this.originManager = new JobOriginClient({logger: this.logger});
};

Expand All @@ -39,9 +32,11 @@ define([
};

Execute.prototype.runExecutionPlugin = function(pluginId, opts) {
var context = this.client.getCurrentPluginContext(pluginId),
var deferred = Q.defer(),
context = this.client.getCurrentPluginContext(pluginId),
node = opts.node || this.client.getNode(this._currentNodeId),
name = node.getAttribute('name'),
onPluginInitiated,
method;

// Set the activeNode
Expand All @@ -56,6 +51,17 @@ define([
return;
}

onPluginInitiated = (sender, event) => {
this.client.removeEventListener(this._client.CONSTANTS.PLUGIN_INITIATED, onPluginInitiated);
this.client.setAttribute(node.getId(), 'executionId', event.executionId);
deferred.resolve(event.executionId);
};

this.client.addEventListener(
this.client.CONSTANTS.PLUGIN_INITIATED,
onPluginInitiated
);

this.client[method](pluginId, context, (err, result) => {
var msg = err ? `${name} failed!` : `${name} executed successfully!`,
duration = err ? 4000 : 2000;
Expand All @@ -68,76 +74,15 @@ define([

Materialize.toast(msg, duration);
});

return deferred.promise;
};

Execute.prototype.isRunning = function(node) {
var baseId,
base,
type;

node = node || this.client.getNode(this._currentNodeId);
baseId = node.getBaseId();
base = this.client.getNode(baseId);
type = base.getAttribute('name');

if (type === 'Execution') {
return node.getAttribute('status') === 'running';
} else if (type === 'Job') {
return this.isRunningJob(node);
}
return false;
};

Execute.prototype.isRunningJob = function(job) {
var status = job.getAttribute('status');

return (status === 'running' || status === 'pending') &&
job.getAttribute('secret') && job.getAttribute('jobId');
};

Execute.prototype.silentStopJob = function(job) {
var jobHash,
secret;

job = job || this.client.getNode(this._currentNodeId);
jobHash = job.getAttribute('jobId');
secret = job.getAttribute('secret');
if (!jobHash || !secret) {
this.logger.error('Cannot stop job. Missing jobHash or secret');
return;
}

return this._executor.cancelJob(jobHash, secret)
.then(() => this.logger.info(`${jobHash} has been cancelled!`))
.fail(err => this.logger.error(`Job cancel failed: ${err}`));
};

Execute.prototype._setJobStopped = function(jobId, silent) {
if (!silent) {
var name = this.client.getNode(jobId).getAttribute('name');
this.client.startTransaction(`Stopping "${name}" job`);
}

this.client.delAttribute(jobId, 'jobId');
this.client.delAttribute(jobId, 'secret');
this.client.setAttribute(jobId, 'status', 'canceled');

if (!silent) {
this.client.completeTransaction();
}
return node.getAttribute('executionId');
};

Execute.prototype.stopJob = function(job, silent) {
var jobId;

job = job || this.client.getNode(this._currentNodeId);
jobId = job.getId();

this.silentStopJob(job);
this._setJobStopped(jobId, silent);
};


Execute.prototype.loadChildren = function(id) {
var deferred = Q.defer(),
execNode = this.client.getNode(id || this._currentNodeId),
Expand All @@ -163,49 +108,26 @@ define([
return deferred.promise;
};

Execute.prototype.stopExecution = function(id, inTransaction) {
var execNode = this.client.getNode(id || this._currentNodeId);

return this.loadChildren(id)
.then(() => this._stopExecution(execNode, inTransaction));
};

Execute.prototype.silentStopExecution = function(id) {
var execNode = this.client.getNode(id || this._currentNodeId);
Execute.prototype.stopExecution = function(nodeId=this._currentNodeId) {
const node = this.client.getNode(nodeId);
const base = this.client.getNode(node.getBaseId());
const type = base.getAttribute('name');

// Stop the execution w/o setting any attributes
return this.loadChildren(id)
.then(() => this._silentStopExecution(execNode));
};

Execute.prototype._stopExecution = function(execNode, inTransaction) {
var msg = `Canceling ${execNode.getAttribute('name')} execution`,
jobIds;

if (!inTransaction) {
this.client.startTransaction(msg);
let executionId = node.getAttribute('executionId');
this.client.delAttribute(nodeId, 'executionId');
if (type === 'Job' && !executionId) {
const execNode = this.client.getNode(node.getParentId());
executionId = execNode.getAttribute('executionId');
this.client.delAttribute(nodeId, 'executionId');
}

jobIds = this._silentStopExecution(execNode);

this.client.setAttribute(execNode.getId(), 'status', 'canceled');
jobIds.forEach(jobId => this._setJobStopped(jobId, true));

if (!inTransaction) {
this.client.completeTransaction();
if (executionId) {
this.client.abortPlugin(executionId);
} else {
this.logger.warn(`Could not find execution ID for ${nodeId}`);
}
};

Execute.prototype._silentStopExecution = function(execNode) {
var runningJobIds = execNode.getChildrenIds()
.map(id => this.client.getNode(id))
.filter(job => this.isRunning(job)); // get running jobs

runningJobIds.forEach(job => this.silentStopJob(job)); // stop them

return runningJobIds;
};

// Resuming Executions
Execute.prototype.checkJobExecution= function (job) {
var pipelineId = job.getParentId(),
Expand Down
4 changes: 4 additions & 0 deletions src/plugins/ExecuteJob/ExecuteJob.js
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ define([
.catch(err => this._callback(err, this.result));
};

ExecuteJob.prototype.onAbort = function () {
this.canceled = true;
};

ExecuteJob.prototype.checkExecutionEnv = function () {
// Throw an exception if no resources
this.logger.info(`Checking execution environment`);
Expand Down
3 changes: 2 additions & 1 deletion src/plugins/ExecuteJob/metadata.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@
"disableServerSideExecution": false,
"disableBrowserSideExecution": false,
"writeAccessRequired": false,
"canBeAborted": true,
"configStructure": []
}
}
1 change: 1 addition & 0 deletions src/plugins/ExecutePipeline/metadata.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
},
"disableServerSideExecution": false,
"disableBrowserSideExecution": false,
"canBeAborted": true,
"configStructure": [
{
"name": "name",
Expand Down
Binary file modified src/seeds/pipeline/pipeline.webgmex
Binary file not shown.
2 changes: 1 addition & 1 deletion src/seeds/pipeline/version.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.10.1
0.11.0
2 changes: 1 addition & 1 deletion src/visualizers/panels/ForgeActionButton/Actions.js
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ define([
return this.isRunning();
},
action: function() {
this.stopJob();
this.stopExecution();
}
}
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ define([
if (this.currentJobId) { // Only if nested in a job
job = this._client.getNode(this.currentJobId);
if (this.isRunning(job)) {
this.stopJob(job);
this.stopExecution(this.currentJobId);
} else {
this.executeJob(job);
}
Expand Down

0 comments on commit ce3e43a

Please sign in to comment.