Skip to content

Commit

Permalink
Record data provenance in Data nodes. Closes #1705 (#1708)
Browse files Browse the repository at this point in the history
* WIP Add ImplicitOperation to metamodel

* Update pipeline seed to 0.20.0

* remove auto-created operation in implicit operation

* Record data provenance in Data nodes. Closes #1705

* Add ExecutionHelpers

* WIP clean up debug logs and comments

* Update tests

* Update pipeline library in project seed

* Update devProject seed

* Update devProject

* Update pipeline seed

* Update project, devProject seeds

* Include assets in devProject

* Use all valid names - not just set values
  • Loading branch information
brollb committed May 21, 2020
1 parent 2e3fd9b commit b67d6ba
Show file tree
Hide file tree
Showing 12 changed files with 218 additions and 136 deletions.
79 changes: 79 additions & 0 deletions src/common/plugin/ExecutionHelpers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*globals define*/
define([
'underscore',
], function(
_,
) {
/**
 * Helpers for building detached copies of operation nodes through the
 * model `core` API.
 */
class ExecutionHelpers {
    /**
     * @param {Object} core - model API used for every node read/write below
     * @param {Object} rootNode - node handed to `core.loadByPath` when resolving paths
     */
    constructor(core, rootNode) {
        this.core = core;
        this.rootNode = rootNode;
    }

    /**
     * Create a new node under `dst` that mirrors `node`: attribute values are
     * copied, every set pointer (other than `base`) is redirected to a copy of
     * its target stored inside the new node, and the children of the first two
     * child containers are duplicated.
     *
     * @param {Object} node - the operation to snapshot
     * @param {Object} dst - parent for the new node
     * @param {Object} [base] - base for the new node; falls back to the base of `node`
     * @returns {Promise<{snapshot: Object, pairs: Array}>} the new node and a
     *     list of `[original, copy]` pairs for the duplicated container
     *     children, ending with `[node, snapshot]`
     */
    async snapshotOperation(node, dst, base) {
        const core = this.core;

        const snapshot = core.createNode({
            base: base || core.getBase(node),
            parent: dst
        });

        // Read every valid attribute first, then write them to the snapshot
        const attributes = core.getValidAttributeNames(node)
            .map(name => [name, core.getAttribute(node, name)]);
        for (const [name, value] of attributes) {
            core.setAttribute(snapshot, name, value);
        }

        // Pointer targets are copied into the snapshot so that it no longer
        // references nodes outside of itself
        const copyPointerTarget = async name => {
            const targetPath = core.getPointerPath(node, name);
            if (!targetPath) {
                return;
            }
            const target = await core.loadByPath(this.rootNode, targetPath);
            const targetCopy = core.copyNode(target, snapshot);
            core.setPointer(snapshot, name, targetCopy);
        };
        const pointerNames = core.getValidPointerNames(node)
            .filter(name => name !== 'base');
        await Promise.all(pointerNames.map(name => copyPointerTarget(name)));

        // Containers are matched up between source and snapshot by ordering
        // both lists on the path of their meta type.
        // NOTE(review): the pointer-target copies made above presumably also
        // show up in loadChildren(snapshot); this relies on the two data
        // containers sorting ahead of them - confirm against the metamodel.
        const byMetaTypePath = (left, right) => {
            const leftPath = core.getPath(core.getMetaType(left));
            const rightPath = core.getPath(core.getMetaType(right));

            return leftPath < rightPath ? -1 : 1;
        };

        const srcContainers = await core.loadChildren(node);
        srcContainers.sort(byMetaTypePath);
        const dstContainers = await core.loadChildren(snapshot);
        dstContainers.sort(byMetaTypePath);
        const [dstInput, dstOutput] = dstContainers;

        const [srcInputs, srcOutputs] = await Promise.all(
            srcContainers.map(container => core.loadChildren(container))
        );

        // Children of the first container are copied wholesale; children of
        // the second only get a fresh node with the same attribute values
        const inputCopies = srcInputs.map(child => core.copyNode(child, dstInput));
        const outputCopies = srcOutputs.map(child => this.shallowCopy(child, dstOutput));

        const pairs = _.zip(
            [...srcInputs, ...srcOutputs],
            [...inputCopies, ...outputCopies]
        );
        pairs.push([node, snapshot]);

        return {snapshot, pairs};
    }

    /**
     * Create a node under `dst` whose base is the meta type of `original` and
     * which carries the same values for the attribute names reported by
     * `core.getAttributeNames(original)`. Nothing else is copied.
     *
     * @param {Object} original - node to mirror
     * @param {Object} dst - parent for the new node
     * @returns {Object} the newly created node
     */
    shallowCopy (original, dst) {
        const core = this.core;
        const attrNames = core.getAttributeNames(original);
        const copy = core.createNode({
            base: core.getMetaType(original),
            parent: dst
        });

        const attributes = attrNames.map(name => [name, core.getAttribute(original, name)]);
        for (const [name, value] of attributes) {
            core.setAttribute(copy, name, value);
        }

        return copy;
    }
}

return ExecutionHelpers;
});
4 changes: 1 addition & 3 deletions src/common/plugin/LocalExecutor.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,8 @@ define([
const dstStorage = await this.getStorageClient();
jobLogger.append(`Saving output data to ${dstStorage.name}...`);

const createParams = {base: this.META.Data, parent: artifactsDir};
for (let i = dataNodes.length; i--;) {
const artifact = this.core.createNode(createParams);
const artifact = this.core.copyNode(dataNodes[i], artifactsDir);
const createdAt = Date.now();
const originalData = JSON.parse(this.core.getAttribute(dataNodes[i], 'data'));

Expand All @@ -98,7 +97,6 @@ define([
this.core.setAttribute(artifact, 'data', JSON.stringify(userAsset));
this.core.setAttribute(artifact, 'name', name);
this.core.setAttribute(artifact, 'createdAt', createdAt);
this.core.setPointer(artifact, 'origin', inputs[0][2]);
}

this.logger.info(`Saved ${dataNodes.length} artifacts in ${this.projectId}.`);
Expand Down
99 changes: 11 additions & 88 deletions src/plugins/CreateExecution/CreateExecution.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@

define([
'q',
'deepforge/plugin/ExecutionHelpers',
'deepforge/plugin/LocalExecutor',
'text!./metadata.json',
'underscore',
'plugin/PluginBase'
], function (
Q,
ExecutionHelpers,
LocalExecutor,
pluginMetadata,
_,
Expand Down Expand Up @@ -72,7 +74,7 @@ define([
const ctnrCtnr = this.core.getParent(pipelineContainer) || this.rootNode;
return this.core.loadChildren(ctnrCtnr)
.then(children => {
var execPath = this.core.getPath(this.META.Execution);
const execPath = this.core.getPath(this.META.Execution);

// Find a node in the root that can contain only executions
return children.find(child => {
Expand Down Expand Up @@ -195,23 +197,24 @@ define([
});
};

CreateExecution.prototype.copyOperations = function (nodes, dst) {
var snapshot = !this.getCurrentConfig().debug;
CreateExecution.prototype.copyOperations = async function (nodes, dst) {
const snapshot = !this.getCurrentConfig().debug;

if (snapshot) {
const helpers = new ExecutionHelpers(this.core, this.rootNode);
this.logger.debug('Execution is a snapshot -> severing the inheritance');
return Q.all(nodes.map(node => {

return Promise.all(nodes.map(async node => {
if (this.isLocalOperation(node) ||
this.isMetaTypeOf(node, this.META.Transporter)) {

return [[node, this.core.copyNode(node, dst)]];
} else if (this.isMetaTypeOf(node, this.META.Operation)) {
return this.snapshotNode(node, dst);
const {pairs} = await helpers.snapshotOperation(node, dst, this.META.Operation);
return pairs;
}
}))
.then(pairs => pairs.filter(pair => !!pair)
.reduce((l1, l2) => l1.concat(l2))
);
.then(pairs => pairs.filter(pair => !!pair).flat());

} else if (nodes.length) {
this.logger.debug('Execution is not a snapshot -> doing a simple copy');
Expand All @@ -221,86 +224,6 @@ define([
return [];
};

/**
 * Create a node under `dst` that mirrors `op` without inheriting from it.
 *
 * The new node's base is the base of `op`'s base. Valid attribute values are
 * copied over, every set pointer is pointed at the same target as on `op`,
 * and the children of each of `op`'s child containers are re-created in the
 * matching container of the new node via `copyDataNode`.
 *
 * @param {Object} op - the operation node to snapshot
 * @param {Object} dst - parent for the new node
 * @returns {Promise<Array>} list of `[original, copy]` pairs for the
 *     re-created container children, ending with `[op, snapshot]`
 */
CreateExecution.prototype.snapshotNode = async function (op, dst) {
// If we are making a snapshot, we should copy the base operation
// and set the attributes, add the child nodes, etc
var base = this.core.getBase(this.core.getBase(op)),
values,
snapshot = this.core.createNode({
base: base,
parent: dst
});

// Copy over the attributes
// (all values are read from `op` before any are written to the snapshot)
const names = this.core.getValidAttributeNames(op);
values = names.map(name => this.core.getAttribute(op, name));
names.forEach((name, i) =>
this.core.setAttribute(snapshot, name, values[i]));

// Copy the pointers
// (only pointers with a target path are set; the snapshot references the
// original target node rather than a copy of it)
const ptrNames = this.core.getValidPointerNames(op);
const setPointers = Promise.all(
ptrNames.map(async name => {
const targetPath = this.core.getPointerPath(op, name);
if (targetPath) {
const target = await this.core.loadByPath(this.rootNode, targetPath);
this.core.setPointer(snapshot, name, target);
}
})
);
await setPointers;

// Copy the data I/O
var srcCntrs = this.core.getChildrenPaths(op),
dstCntrs = this.core.getChildrenPaths(snapshot);

// Load the child containers of both the source and the snapshot
return Q.all([srcCntrs, dstCntrs].map(ids =>
Q.all(ids.map(id => this.core.loadByPath(this.rootNode, id)))))
.then(cntrs => {
var srcCntrs,
dstCntrs;

// Sort all containers by metatype id
// (so that index i in the source list lines up with index i in the
// snapshot list below; the comparator never returns 0)
cntrs.map(l => l.sort((a, b) => {
var aId = this.core.getPath(this.core.getMetaType(a)),
bId = this.core.getPath(this.core.getMetaType(b));

return aId < bId ? -1 : 1;
}));

srcCntrs = cntrs[0];
dstCntrs = cntrs[1];
// Load the children of every source container and re-create each one in
// the snapshot container at the same index
return Q.all(srcCntrs.map(ctr => Q.all(this.core.getChildrenPaths(ctr)
.map(id => this.core.loadByPath(this.rootNode, id)))))
.then(cntrs =>
cntrs.map((nodes, i) =>
nodes.map(n => [n, this.copyDataNode(n, dstCntrs[i])]))
);
})
.then(nodes => {
// Flatten the per-container lists into one list of [original, copy] pairs
nodes = nodes.reduce((l1, l2) => l1.concat(l2), []);
nodes.push([op, snapshot]);
return nodes;
});
};

/**
 * Create a node under `dst` whose base is the meta type of `original` and
 * which carries the same values for every attribute name reported by
 * `core.getAttributeNames(original)`.
 *
 * @param {Object} original - node to mirror
 * @param {Object} dst - parent for the new node
 * @returns {Object} the newly created node
 */
CreateExecution.prototype.copyDataNode = function (original, dst) {
    const core = this.core;
    const attrNames = core.getAttributeNames(original);

    // The new node derives from the meta type rather than from `original`
    const copy = core.createNode({
        base: core.getMetaType(original),
        parent: dst
    });

    // Read all values first, then write them to the new node
    const attributes = attrNames.map(name => [name, core.getAttribute(original, name)]);
    for (const [name, value] of attributes) {
        core.setAttribute(copy, name, value);
    }

    return copy;
};

CreateExecution.prototype.addDataToMap = function (srcOp, dstOp, map) {
return Q.all(
[srcOp, dstOp]
Expand Down
35 changes: 28 additions & 7 deletions src/plugins/ExecuteJob/ExecuteJob.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ define([
'deepforge/plugin/LocalExecutor',
'deepforge/plugin/PtrCodeGen',
'deepforge/plugin/Operation',
'deepforge/plugin/ExecutionHelpers',
'deepforge/api/JobLogsClient',
'deepforge/api/JobOriginClient',
'deepforge/api/ExecPulseClient',
Expand All @@ -28,6 +29,7 @@ define([
LocalExecutor, // DeepForge operation primitives
PtrCodeGen,
OperationPlugin,
ExecutionHelpers,
JobLogsClient,
JobOriginClient,
ExecPulseClient,
Expand Down Expand Up @@ -656,7 +658,7 @@ define([
this.logManager.deleteLog(jobId);
if (status === this.compute.SUCCESS) {
const results = await this.compute.getResultsInfo(jobInfo);
this.onDistOperationComplete(op, results);
await this.recordOperationOutputs(op, results);
} else {
// Parse the most precise error and present it in the toast...
const lastline = result.stdout.split('\n').filter(l => !!l).pop() || '';
Expand All @@ -676,31 +678,50 @@ define([
}
};

ExecuteJob.prototype.onDistOperationComplete = async function (node, results) {
ExecuteJob.prototype.recordOperationOutputs = async function (node, results) {
const nodeId = this.core.getPath(node);
const outputPorts = await this.getOutputs(node);
const outputs = outputPorts.map(tuple => [tuple[0], tuple[2]]);

for (let i = outputs.length; i--;) {
const [name, node] = outputs[i];
const [name, dataNode] = outputs[i];
const {type, dataInfo} = results[name];

if (type) {
this.core.setAttribute(node, 'type', type);
this.core.setAttribute(dataNode, 'type', type);
this.logger.info(`Setting ${nodeId} data type to ${type}`);
} else {
this.logger.warn(`No data type found for ${nodeId}`);
}

if (dataInfo) {
this.core.setAttribute(node, 'data', JSON.stringify(dataInfo));
this.core.setAttribute(dataNode, 'data', JSON.stringify(dataInfo));
this.logger.info(`Setting ${nodeId} data to ${dataInfo}`);
}

await this.recordProvenance(dataNode, node);
}

return this.onOperationComplete(node);
};

/**
 * Attach a record of the operation that produced `dataNode`.
 *
 * Any node currently referenced by the `provenance` pointer of `dataNode` is
 * deleted. A new `ExecutedJob` node is then created inside `dataNode`, a
 * snapshot of `opNode` is stored inside it (referenced by its `operation`
 * pointer), and `dataNode`'s `provenance` pointer is set to the new node.
 *
 * @param {Object} dataNode - the data node to annotate
 * @param {Object} opNode - the operation to snapshot
 * @returns {Promise<undefined>}
 */
ExecuteJob.prototype.recordProvenance = async function (dataNode, opNode) {
    const core = this.core;

    // Remove the previous record (if any) before creating its replacement
    const previousPath = core.getPointerPath(dataNode, 'provenance');
    if (previousPath) {
        const previousJob = await core.loadByPath(this.rootNode, previousPath);
        core.deleteNode(previousJob);
    }

    const snapshotter = new ExecutionHelpers(core, this.rootNode);
    const executedJob = core.createNode({
        base: this.META.ExecutedJob,
        parent: dataNode
    });

    const result = await snapshotter.snapshotOperation(
        opNode,
        executedJob,
        this.META.Operation
    );
    core.setPointer(executedJob, 'operation', result.snapshot);
    core.setPointer(dataNode, 'provenance', executedJob);
};

//////////////////////////// Special Operations ////////////////////////////
ExecuteJob.prototype.executeLocalOperation = async function (node) {
const type = this.getLocalOperationType(node);
Expand All @@ -714,14 +735,14 @@ define([

try {
await this[type](node);
this.onOperationEnd(null, node);
await this.onOperationEnd(null, node);
} catch (err) {
const job = this.core.getParent(node);
const stdout = this.core.getAttribute(job, 'stdout') +
'\n' + red(err.toString());

this.core.setAttribute(job, 'stdout', stdout);
this.onOperationEnd(err, node);
await this.onOperationEnd(err, node);
}
};

Expand Down
Loading

0 comments on commit b67d6ba

Please sign in to comment.