Skip to content

Commit

Permalink
Collapse execute_pipeline_with_mode and _with_preset
Browse files Browse the repository at this point in the history
Summary: Seems straightforwardly easier to understand. Happy to add deprecation warnings rather than summarily deleting.

Test Plan: Unit

Reviewers: schrockn, alangenfeld, prha, sashank, nate, catherinewu, yuhan

Reviewed By: schrockn

Differential Revision: https://dagster.phacility.com/D2622
  • Loading branch information
mgasner committed Apr 21, 2020
1 parent e37c851 commit daa5837
Show file tree
Hide file tree
Showing 21 changed files with 256 additions and 190 deletions.
11 changes: 10 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
# Changelog

## (Future) 0.7.7
## 0.7.8 (Upcoming)

**Breaking Changes**

- The `execute_pipeline_with_mode` and `execute_pipeline_with_preset` APIs have been dropped in
favor of new top level arguments to `execute_pipeline`, `mode` and `preset`.
- The use of `RunConfig` to pass options to `execute_pipeline` has been deprecated, and `RunConfig`
will be removed in 0.8.0.

## 0.7.7

**Breaking Changes**

Expand Down
4 changes: 0 additions & 4 deletions docs/sections/api/apidocs/execution.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@ Executing pipelines

.. autofunction:: execute_pipeline_iterator

.. autofunction:: execute_pipeline_with_preset

.. autofunction:: execute_pipeline_with_mode

Executing solids
----------------

Expand Down
2 changes: 1 addition & 1 deletion docs/sections/tutorial/presets.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ From the CLI, use ``-p`` or ``--preset``:
$ dagster pipeline execute -f presets.py -n presets_pipeline -p unittest
From Python, you can use :py:func:`execute_pipeline_with_preset <dagster.execute_pipeline_with_preset>`:
From Python, you can use :py:func:`execute_pipeline <dagster.execute_pipeline>`:

.. literalinclude:: ../../../examples/dagster_examples/intro_tutorial/presets.py
:lines: 171
Expand Down
4 changes: 2 additions & 2 deletions examples/dagster_examples/intro_tutorial/modes.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
Field,
ModeDefinition,
String,
execute_pipeline_with_mode,
execute_pipeline,
pipeline,
resource,
solid,
Expand Down Expand Up @@ -148,7 +148,7 @@ def modes_pipeline():
},
'resources': {'warehouse': {'config': {'conn_str': ':memory:'}}},
}
result = execute_pipeline_with_mode(
result = execute_pipeline(
pipeline=modes_pipeline,
mode='unittest',
environment_dict=environment_dict,
Expand Down
4 changes: 2 additions & 2 deletions examples/dagster_examples/intro_tutorial/presets.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
ModeDefinition,
PresetDefinition,
String,
execute_pipeline_with_preset,
execute_pipeline,
pipeline,
resource,
solid,
Expand Down Expand Up @@ -170,5 +170,5 @@ def presets_pipeline():


if __name__ == '__main__':
result = execute_pipeline_with_preset(presets_pipeline, 'unittest')
result = execute_pipeline(presets_pipeline, preset='unittest')
assert result.success
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# pylint: disable=unused-argument
import pytest

from dagster import execute_pipeline, execute_pipeline_with_mode, file_relative_path
from dagster import execute_pipeline, file_relative_path
from dagster.cli.load_handle import handle_for_pipeline_cli_args
from dagster.core.instance import DagsterInstance
from dagster.utils import load_yaml_from_globs
Expand Down Expand Up @@ -39,7 +39,7 @@ def test_ingest_pipeline_fast(postgres, pg_hostname):
ingest_config_dict = load_yaml_from_globs(
config_path('test_base.yaml'), config_path('local_fast_ingest.yaml')
)
result_ingest = execute_pipeline_with_mode(
result_ingest = execute_pipeline(
pipeline=ingest_pipeline_def,
mode='local',
environment_dict=ingest_config_dict,
Expand All @@ -59,7 +59,7 @@ def test_ingest_pipeline_fast_filesystem_storage(postgres, pg_hostname):
config_path('local_fast_ingest.yaml'),
config_path('filesystem_storage.yaml'),
)
result_ingest = execute_pipeline_with_mode(
result_ingest = execute_pipeline(
pipeline=ingest_pipeline_def,
mode='local',
environment_dict=ingest_config_dict,
Expand All @@ -77,7 +77,7 @@ def test_airline_pipeline_1_warehouse(postgres, pg_hostname):
warehouse_config_object = load_yaml_from_globs(
config_path('test_base.yaml'), config_path('local_warehouse.yaml')
)
result_warehouse = execute_pipeline_with_mode(
result_warehouse = execute_pipeline(
pipeline=warehouse_pipeline_def,
mode='local',
environment_dict=warehouse_config_object,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from pandas import DataFrame, Timestamp
from requests import HTTPError

from dagster import ModeDefinition, execute_pipeline_with_mode, execute_solid, pipeline, seven
from dagster import ModeDefinition, execute_pipeline, execute_solid, pipeline, seven

START_TIME = 1514793600
VOLUME_TARGET_DIRECTORY = '/tmp/bar'
Expand Down Expand Up @@ -203,7 +203,7 @@ def test_generate_training_set(mocker):
mocker.patch('dagster_examples.bay_bikes.solids.read_sql_table', side_effect=mock_read_sql)

# Execute Pipeline
test_pipeline_result = execute_pipeline_with_mode(
test_pipeline_result = execute_pipeline(
pipeline=generate_test_training_set_pipeline,
mode='testing',
environment_dict=compose_training_data_env_dict(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from dagster_examples.intro_tutorial.presets import SqlAlchemyPostgresWarehouse as sapw2
from dagster_examples.intro_tutorial.presets import presets_pipeline

from dagster import execute_pipeline_with_mode, execute_pipeline_with_preset
from dagster import execute_pipeline
from dagster.utils import pushd, script_relative_path

BUILDKITE = bool(os.getenv('BUILDKITE'))
Expand All @@ -27,12 +27,12 @@ def test_warehouse_resource(postgres):
'resources': {'warehouse': {'config': {'conn_str': postgres}}},
}
with pushd(script_relative_path('../../dagster_examples/intro_tutorial/')):
result = execute_pipeline_with_mode(
result = execute_pipeline(
pipeline=modes_pipeline, mode='dev', environment_dict=environment_dict,
)
assert result.success

if not BUILDKITE:
with pushd(script_relative_path('../../dagster_examples/intro_tutorial/')):
result = execute_pipeline_with_preset(presets_pipeline, preset_name='dev')
result = execute_pipeline(presets_pipeline, preset='dev')
assert result.success
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,7 @@
sum_solid,
)

from dagster import (
execute_pipeline,
execute_pipeline_with_preset,
execute_solid,
file_relative_path,
)
from dagster import execute_pipeline, execute_solid, file_relative_path


def test_execute_pandas_hello_world_solid():
Expand Down Expand Up @@ -79,8 +74,8 @@ def test_execute_pandas_hello_world_pipeline_with_read_csv():


def test_execute_hello_world_with_preset_test():
assert execute_pipeline_with_preset(pandas_hello_world_pipeline, 'test').success
assert execute_pipeline(pandas_hello_world_pipeline, preset='test').success


def test_execute_hello_world_with_preset_prod():
assert execute_pipeline_with_preset(pandas_hello_world_pipeline, 'prod').success
assert execute_pipeline(pandas_hello_world_pipeline, preset='prod').success
5 changes: 2 additions & 3 deletions examples/dagster_examples_tests/test_toys/test_toys.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
DagsterResourceFunctionError,
DagsterTypeCheckDidNotPass,
execute_pipeline,
execute_pipeline_with_mode,
)


Expand Down Expand Up @@ -65,7 +64,7 @@ def test_error_monster_success():
},
).success

assert execute_pipeline_with_mode(
assert execute_pipeline(
pipeline=error_monster,
mode='errorable_mode',
environment_dict={
Expand All @@ -81,7 +80,7 @@ def test_error_monster_success():

def test_error_monster_wrong_mode():
with pytest.raises(DagsterInvariantViolationError):
execute_pipeline_with_mode(
execute_pipeline(
pipeline=error_monster,
mode='nope',
environment_dict={
Expand Down
4 changes: 0 additions & 4 deletions python_modules/dagster/dagster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@
execute_partition_set,
execute_pipeline,
execute_pipeline_iterator,
execute_pipeline_with_mode,
execute_pipeline_with_preset,
)
from dagster.core.execution.config import ExecutorConfig, RunConfig
from dagster.core.execution.context.compute import SolidExecutionContext
Expand Down Expand Up @@ -195,8 +193,6 @@
'default_executors',
'default_system_storage_defs',
'execute_pipeline_iterator',
'execute_pipeline_with_mode',
'execute_pipeline_with_preset',
'execute_pipeline',
'execute_solid_within_pipeline',
'fs_system_storage',
Expand Down
12 changes: 3 additions & 9 deletions python_modules/dagster/dagster/cli/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,7 @@
import six
import yaml

from dagster import (
PipelineDefinition,
RunConfig,
check,
execute_pipeline,
execute_pipeline_with_preset,
)
from dagster import PipelineDefinition, RunConfig, check, execute_pipeline
from dagster.cli.load_handle import handle_for_pipeline_cli_args, handle_for_repo_cli_args
from dagster.cli.load_snapshot import get_pipeline_snapshot_from_cli_args
from dagster.core.definitions import ExecutionTargetHandle
Expand Down Expand Up @@ -329,9 +323,9 @@ def execute_execute_command_with_preset(preset_name, cli_args, _mode):
cli_args.pop('pipeline_name')
tags = get_tags_from_args(cli_args)

return execute_pipeline_with_preset(
return execute_pipeline(
pipeline,
preset_name,
preset=preset_name,
instance=DagsterInstance.get(),
raise_on_error=False,
run_config=RunConfig(tags=tags),
Expand Down
2 changes: 1 addition & 1 deletion python_modules/dagster/dagster/core/definitions/preset.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class PresetDefinition(namedtuple('_PresetDefinition', 'name environment_dict so
.. code-block:: python
execute_pipeline_with_preset(pipeline_def, 'example_preset')
execute_pipeline(pipeline_def, preset='example_preset')
Presets may also be used with the command line tools:
Expand Down
Loading

0 comments on commit daa5837

Please sign in to comment.