Skip to content

Commit

Permalink
(execution-plan-snapshot-3) Make execution plan available on Instance…
Browse files Browse the repository at this point in the history
…CreateRunArgs

Summary:
We are persisting execution plan snapshots in addition to
pipeline snaphosts, so execution plan snapshot must be made available
to the run creation process in DagsterInstance.

Depends on D2621

Test Plan: BK

Reviewers: sashank, alangenfeld, max

Reviewed By: sashank

Differential Revision: https://dagster.phacility.com/D2624
  • Loading branch information
schrockn committed Apr 21, 2020
1 parent b616fcc commit d62487a
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from dagster.core.events import EngineEventData
from dagster.core.execution.api import create_execution_plan
from dagster.core.instance import InstanceCreateRunArgs
from dagster.core.snap.execution_plan_snapshot import snapshot_from_execution_plan
from dagster.core.storage.pipeline_run import PipelineRun, PipelineRunStatus
from dagster.core.utils import make_new_run_id
from dagster.utils.error import SerializableErrorInfo
Expand Down Expand Up @@ -94,9 +95,13 @@ def _start_pipeline_execution(graphene_info, execution_params, is_reexecuted=Fal
# Otherwise we know we are creating a new run, and we can
# use the new machinery that persists a pipeline snapshot
# with the run.

run = instance.create_run_with_snapshot(
InstanceCreateRunArgs(
pipeline_snapshot=pipeline_def.get_pipeline_snapshot(),
execution_plan_snapshot=snapshot_from_execution_plan(
execution_plan, pipeline_def.get_pipeline_snapshot_id()
),
run_id=execution_params.execution_metadata.run_id
if execution_params.execution_metadata.run_id
else make_new_run_id(),
Expand Down
3 changes: 3 additions & 0 deletions python_modules/dagster/dagster/core/definitions/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,9 @@ def new_with(self, name=None, mode_defs=None, preset_defs=None):
def get_pipeline_snapshot(self):
return self.get_pipeline_index().pipeline_snapshot

def get_pipeline_snapshot_id(self):
return self.get_pipeline_index().pipeline_snapshot_id

def get_pipeline_index(self):
if self._cached_pipeline_index is None:
from dagster.core.snap.pipeline_snapshot import PipelineIndex, PipelineSnapshot
Expand Down
53 changes: 44 additions & 9 deletions python_modules/dagster/dagster/core/execution/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from dagster.core.execution.plan.plan import ExecutionPlan
from dagster.core.execution.retries import Retries
from dagster.core.instance import DagsterInstance, InstanceCreateRunArgs
from dagster.core.snap.execution_plan_snapshot import snapshot_from_execution_plan
from dagster.core.storage.pipeline_run import PipelineRun, PipelineRunStatus
from dagster.core.system_config.objects import EnvironmentConfig
from dagster.core.telemetry import telemetry_wrapper
Expand Down Expand Up @@ -122,13 +123,24 @@ def _pipeline_execution_iterator(pipeline_context, execution_plan, pipeline_run)

def execute_run_iterator(pipeline, pipeline_run, instance):
check.inst_param(pipeline, 'pipeline', PipelineDefinition)
instance = check.inst_param(instance, 'instance', DagsterInstance)
check.inst_param(pipeline_run, 'pipeline_run', PipelineRun)
check.inst_param(instance, 'instance', DagsterInstance)
check.invariant(pipeline_run.status == PipelineRunStatus.NOT_STARTED)

execution_plan = create_execution_plan(
pipeline, environment_dict=pipeline_run.environment_dict, pipeline_run=pipeline_run
)

return _execute_run_iterator_with_plan(pipeline, pipeline_run, instance, execution_plan)


def _execute_run_iterator_with_plan(pipeline, pipeline_run, instance, execution_plan):
check.inst_param(pipeline, 'pipeline', PipelineDefinition)
check.inst_param(pipeline_run, 'pipeline_run', PipelineRun)
check.inst_param(instance, 'instance', DagsterInstance)
check.inst_param(execution_plan, 'execution_plan', ExecutionPlan)
check.invariant(pipeline_run.status == PipelineRunStatus.NOT_STARTED)

initialization_manager = pipeline_initialization_manager(
pipeline, pipeline_run.environment_dict, pipeline_run, instance, execution_plan,
)
Expand Down Expand Up @@ -215,14 +227,23 @@ def _check_execute_pipeline_args(
check.opt_inst_param(instance, 'instance', DagsterInstance)
instance = instance or DagsterInstance.ephemeral()

fake_pipeline_run_to_pass_to_create_run = check_pipeline_run(
pipeline_run_from_run_config(run_config), pipeline
)

execution_plan = create_execution_plan(
pipeline, environment_dict, fake_pipeline_run_to_pass_to_create_run,
)

pipeline_run = _create_run(
instance,
pipeline,
check_pipeline_run(pipeline_run_from_run_config(run_config), pipeline),
fake_pipeline_run_to_pass_to_create_run,
environment_dict,
execution_plan,
)

return pipeline, environment_dict, instance, pipeline_run
return pipeline, environment_dict, instance, pipeline_run, execution_plan


def execute_pipeline_iterator(
Expand Down Expand Up @@ -267,7 +288,13 @@ def execute_pipeline_iterator(
Returns:
Iterator[DagsterEvent]: The stream of events resulting from pipeline execution.
'''
pipeline, environment_dict, instance, pipeline_run = _check_execute_pipeline_args(
(
pipeline,
environment_dict,
instance,
pipeline_run,
execution_plan,
) = _check_execute_pipeline_args(
'execute_pipeline_iterator',
pipeline=pipeline,
environment_dict=environment_dict,
Expand All @@ -278,7 +305,7 @@ def execute_pipeline_iterator(
instance=instance,
)

return execute_run_iterator(pipeline, pipeline_run, instance)
return _execute_run_iterator_with_plan(pipeline, pipeline_run, instance, execution_plan)


@telemetry_wrapper
Expand Down Expand Up @@ -328,7 +355,13 @@ def execute_pipeline(
This is the entrypoint for dagster CLI execution. For the dagster-graphql entrypoint, see
``dagster.core.execution.api.execute_plan()``.
'''
pipeline, environment_dict, instance, pipeline_run = _check_execute_pipeline_args(
(
pipeline,
environment_dict,
instance,
pipeline_run,
execution_plan,
) = _check_execute_pipeline_args(
'execute_pipeline',
pipeline=pipeline,
environment_dict=environment_dict,
Expand All @@ -339,8 +372,6 @@ def execute_pipeline(
instance=instance,
)

execution_plan = create_execution_plan(pipeline, environment_dict, pipeline_run)

initialization_manager = pipeline_initialization_manager(
pipeline,
environment_dict,
Expand Down Expand Up @@ -440,10 +471,14 @@ def step_output_event_filter(pipe_iterator):
yield step_event


def _create_run(instance, pipeline_def, run_config, environment_dict):
def _create_run(instance, pipeline_def, run_config, environment_dict, execution_plan):

return instance.create_run_with_snapshot(
InstanceCreateRunArgs(
pipeline_snapshot=pipeline_def.get_pipeline_snapshot(),
execution_plan_snapshot=snapshot_from_execution_plan(
execution_plan, pipeline_def.get_pipeline_snapshot_id()
),
run_id=run_config.run_id,
environment_dict=environment_dict,
mode=run_config.mode,
Expand Down
10 changes: 8 additions & 2 deletions python_modules/dagster/dagster/core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,15 @@ def emit(self, record):
class InstanceCreateRunArgs(
namedtuple(
'_InstanceCreateRunArgs',
'pipeline_snapshot run_id environment_dict mode selector '
'step_keys_to_execute status tags root_run_id parent_run_id',
'pipeline_snapshot execution_plan_snapshot run_id environment_dict '
'mode selector step_keys_to_execute status tags root_run_id '
'parent_run_id',
),
):
def __new__(
cls,
pipeline_snapshot,
execution_plan_snapshot,
run_id,
environment_dict,
mode,
Expand All @@ -89,13 +91,17 @@ def __new__(
parent_run_id,
):

from dagster.core.snap.execution_plan_snapshot import ExecutionPlanSnapshot
from dagster.core.snap.pipeline_snapshot import PipelineSnapshot

return super(InstanceCreateRunArgs, cls).__new__(
cls,
pipeline_snapshot=check.inst_param(
pipeline_snapshot, 'pipeline_snapshot', PipelineSnapshot
),
execution_plan_snapshot=check.inst_param(
execution_plan_snapshot, 'execution_plan_snapshot', ExecutionPlanSnapshot
),
run_id=check.str_param(run_id, 'run_id'),
environment_dict=check.opt_dict_param(
environment_dict, 'environment_dict', key_type=str
Expand Down

0 comments on commit d62487a

Please sign in to comment.