[dagster-airflow 2/2] Remove class-based airflow test fixtures
Summary: Replaces the class-scoped (`scope='class'`) airflow test fixtures, which pulled their parameters off `request.cls` attributes, with function-scoped fixtures that return a `_pipeline_fn` callable taking those parameters directly. Also moves the `DAGSTER_DOCKER_IMAGE` lookup out of `test_fixtures.py` and into a `dagster_docker_image` session fixture in the test suite's conftest.

Test Plan: unit

Reviewers:

Subscribers:
Nate Kupp committed Jan 3, 2020
1 parent 29c38a8 commit 449e9b8
Showing 6 changed files with 287 additions and 293 deletions.
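
To make the change concrete, here is a minimal before/after sketch of how a test consumes these fixtures, assembled from the docstrings in the diff below; `define_pipeline`, the pipeline name, and the YAML glob are illustrative placeholders:

    # Before this commit: class-scoped fixture, configured via class attributes
    class TestMyPipeline(object):
        handle = ExecutionTargetHandle.for_pipeline_fn(define_pipeline)
        pipeline_name = 'test_pipeline'
        environment_yaml = ['environments/test_*.yaml']

        def test_pipeline_results(self, dagster_airflow_python_operator_pipeline):
            # the fixture itself resolves to the list of per-solid results
            results = dagster_airflow_python_operator_pipeline
            assert len(results) == 3

    # After this commit: function-scoped fixture that returns a callable
    def test_pipeline_results(dagster_airflow_python_operator_pipeline):
        results = dagster_airflow_python_operator_pipeline(
            pipeline_name='test_pipeline',
            handle=ExecutionTargetHandle.for_pipeline_fn(define_pipeline),
            environment_yaml=['environments/test_*.yaml'],
        )
        assert len(results) == 3
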
214 changes: 102 additions & 112 deletions python_modules/dagster-airflow/dagster_airflow/test_fixtures.py
@@ -13,12 +13,6 @@
 from dagster.utils import load_yaml_from_glob_list
 
 
-def get_dagster_docker_image():
-    # Will be set in environment by pipeline.py -> tox.ini to:
-    # ${AWS_ACCOUNT_ID}.dkr.ecr.us-west-1.amazonaws.com/dagster-docker-buildkite:${BUILDKITE_BUILD_ID}-${TOX_PY_VERSION}
-    return os.environ['DAGSTER_DOCKER_IMAGE']
-
-
 def execute_tasks_in_dag(dag, tasks, run_id, execution_date):
     assert isinstance(dag, DAG)
@@ -45,138 +39,134 @@ def execute_tasks_in_dag(dag, tasks, run_id, execution_date):
     return results
 
 
-@pytest.fixture(scope='class')
-def dagster_airflow_python_operator_pipeline(request):
+@pytest.fixture(scope='function')
+def dagster_airflow_python_operator_pipeline():
     '''This is a test fixture for running Dagster pipelines as Airflow DAGs.
     Usage:
         # alternatively, import this fixture into your conftest.py
         from dagster_airflow.test_fixtures import dagster_airflow_python_operator_pipeline
-        class TestMyPipeline(object):
-            handle = ExecutionTargetHandle.for_pipeline_fn(define_pipeline)
-            config = {'solids': {'my_solid': 'foo'}}
-            # alternatively, pass a list of globs to be assembled into a config yaml
-            # config_yaml = ['environments/test_*.yaml']
-            run_id = 'test_run_3'
-            execution_date = datetime.datetime(2019, 1, 1)
         def test_pipeline_results(dagster_airflow_python_operator_pipeline):
-            # This is a list of the parsed JSON returned by calling executePlan for each
-            # solid in the pipeline
-            results = dagster_airflow_python_operator_pipeline
-            assert len(results) == 3
+            # This produces a list of the parsed JSON returned by calling executePlan for each
+            # solid in the pipeline
+            results = dagster_airflow_python_operator_pipeline(
+                pipeline_name='test_pipeline',
+                handle=ExecutionTargetHandle.for_pipeline_fn(define_pipeline),
+                environment_yaml=['environments/test_*.yaml']
+            )
+            assert len(results) == 3
     '''
     from .factory import make_airflow_dag_for_handle
     from .vendor.python_operator import PythonOperator
 
-    handle = getattr(request.cls, 'handle')
-    pipeline_name = getattr(request.cls, 'pipeline_name')
-    environment_dict = getattr(request.cls, 'environment_dict', None)
-    environment_yaml = getattr(request.cls, 'environment_yaml', None)
-    op_kwargs = getattr(request.cls, 'op_kwargs', {})
-    mode = getattr(request.cls, 'mode', None)
-
-    if environment_dict is None and environment_yaml is not None:
-        environment_dict = load_yaml_from_glob_list(environment_yaml)
-    run_id = getattr(request.cls, 'run_id', str(uuid.uuid4()))
-    execution_date = getattr(request.cls, 'execution_date', timezone.utcnow())
-
-    dag, tasks = make_airflow_dag_for_handle(
-        handle, pipeline_name, environment_dict, mode=mode, op_kwargs=op_kwargs
-    )
-
-    assert isinstance(dag, DAG)
-
-    for task in tasks:
-        assert isinstance(task, PythonOperator)
-
-    return execute_tasks_in_dag(dag, tasks, run_id, execution_date)
+    def _pipeline_fn(
+        handle,
+        pipeline_name,
+        environment_dict=None,
+        environment_yaml=None,
+        op_kwargs=None,
+        mode=None,
+        execution_date=timezone.utcnow(),
+    ):
+        if environment_dict is None and environment_yaml is not None:
+            environment_dict = load_yaml_from_glob_list(environment_yaml)
+
+        dag, tasks = make_airflow_dag_for_handle(
+            handle, pipeline_name, environment_dict, mode=mode, op_kwargs=op_kwargs
+        )
+        assert isinstance(dag, DAG)
+
+        for task in tasks:
+            assert isinstance(task, PythonOperator)
+
+        return execute_tasks_in_dag(
+            dag, tasks, run_id=str(uuid.uuid4()), execution_date=execution_date
+        )
+
+    return _pipeline_fn
 
 
-@pytest.fixture(scope='class')
-def dagster_airflow_docker_operator_pipeline(request):
+@pytest.fixture(scope='function')
+def dagster_airflow_docker_operator_pipeline():
     '''This is a test fixture for running Dagster pipelines as containerized Airflow DAGs.
     Usage:
         # alternatively, import this fixture into your conftest.py
         from dagster_airflow.test_fixtures import dagster_airflow_docker_operator_pipeline
         class TestMyPipeline(object):
             pipeline = define_my_pipeline()
             config = {'solids': {'my_solid': 'foo'}}
             # alternatively, pass a list of globs to be assembled into a config yaml
             # config_yaml = ['environments/test_*.yaml']
             run_id = 'test_run_3'
             execution_date = datetime.datetime(2019, 1, 1)
             def test_pipeline_results(dagster_airflow_docker_operator_pipeline):
                 # This is a list of the parsed JSON returned by calling executePlan for each
                 # solid in the pipeline
                 results = dagster_airflow_docker_operator_pipeline
                 assert len(results) == 3
     '''
     from .factory import make_airflow_dag_containerized_for_handle
     from .operators.docker_operator import DagsterDockerOperator
 
-    handle = getattr(request.cls, 'handle')
-    pipeline_name = getattr(request.cls, 'pipeline_name')
-    # Removed image parameter and made it hard-coded. See:
-    # https://github.com/dagster-io/dagster/issues/2041
-    # getattr(request.cls, 'image')
-    image = get_dagster_docker_image()
-    environment_dict = getattr(request.cls, 'environment_dict', None)
-    environment_yaml = getattr(request.cls, 'environment_yaml', [])
-    op_kwargs = getattr(request.cls, 'op_kwargs', {})
-
-    if environment_dict is None and environment_yaml is not None:
-        environment_dict = load_yaml_from_glob_list(environment_yaml)
-    run_id = getattr(request.cls, 'run_id', str(uuid.uuid4()))
-    execution_date = getattr(request.cls, 'execution_date', timezone.utcnow())
-
-    dag, tasks = make_airflow_dag_containerized_for_handle(
-        handle, pipeline_name, image, environment_dict, op_kwargs=op_kwargs
-    )
-
-    for task in tasks:
-        assert isinstance(task, DagsterDockerOperator)
-
-    return execute_tasks_in_dag(dag, tasks, run_id, execution_date)
-
-
-@pytest.fixture(scope='class')
-def dagster_airflow_k8s_operator_pipeline(request):
+    def _pipeline_fn(
+        handle,
+        pipeline_name,
+        image,
+        environment_dict=None,
+        environment_yaml=None,
+        op_kwargs=None,
+        mode=None,
+        execution_date=timezone.utcnow(),
+    ):
+        if environment_dict is None and environment_yaml is not None:
+            environment_dict = load_yaml_from_glob_list(environment_yaml)
+
+        dag, tasks = make_airflow_dag_containerized_for_handle(
+            handle=handle,
+            pipeline_name=pipeline_name,
+            image=image,
+            mode=mode,
+            environment_dict=environment_dict,
+            op_kwargs=op_kwargs,
+        )
+        assert isinstance(dag, DAG)
+
+        for task in tasks:
+            assert isinstance(task, DagsterDockerOperator)
+
+        return execute_tasks_in_dag(
+            dag, tasks, run_id=str(uuid.uuid4()), execution_date=execution_date
+        )
+
+    return _pipeline_fn
+
+
+@pytest.fixture(scope='function')
+def dagster_airflow_k8s_operator_pipeline():
     '''This is a test fixture for running Dagster pipelines on Airflow + K8s.
     '''
     from .factory import make_airflow_dag_kubernetized_for_handle
     from .operators.kubernetes_operator import DagsterKubernetesPodOperator
 
-    handle = getattr(request.cls, 'handle')
-    pipeline_name = getattr(request.cls, 'pipeline_name')
-    # Removed image parameter and made it hard-coded. See:
-    # https://github.com/dagster-io/dagster/issues/2041
-    # getattr(request.cls, 'image')
-    image = get_dagster_docker_image()
-    namespace = getattr(request.cls, 'namespace', 'default')
-    environment_dict = getattr(request.cls, 'environment_dict', None)
-    environment_yaml = getattr(request.cls, 'environment_yaml', [])
-    op_kwargs = getattr(request.cls, 'op_kwargs', {})
-
-    if environment_dict is None and environment_yaml is not None:
-        environment_dict = load_yaml_from_glob_list(environment_yaml)
-    run_id = getattr(request.cls, 'run_id', str(uuid.uuid4()))
-    execution_date = getattr(request.cls, 'execution_date', timezone.utcnow())
-
-    dag, tasks = make_airflow_dag_kubernetized_for_handle(
-        handle=handle,
-        pipeline_name=pipeline_name,
-        image=image,
-        namespace=namespace,
-        environment_dict=environment_dict,
-        op_kwargs=op_kwargs,
-    )
-
-    for task in tasks:
-        assert isinstance(task, DagsterKubernetesPodOperator)
-
-    return execute_tasks_in_dag(dag, tasks, run_id, execution_date)
+    def _pipeline_fn(
+        handle,
+        pipeline_name,
+        image,
+        environment_dict=None,
+        environment_yaml=None,
+        op_kwargs=None,
+        mode=None,
+        namespace='default',
+        execution_date=timezone.utcnow(),
+    ):
+        if environment_dict is None and environment_yaml is not None:
+            environment_dict = load_yaml_from_glob_list(environment_yaml)
+
+        dag, tasks = make_airflow_dag_kubernetized_for_handle(
+            handle=handle,
+            pipeline_name=pipeline_name,
+            image=image,
+            mode=mode,
+            namespace=namespace,
+            environment_dict=environment_dict,
+            op_kwargs=op_kwargs,
+        )
+        assert isinstance(dag, DAG)
+
+        for task in tasks:
+            assert isinstance(task, DagsterKubernetesPodOperator)
+
+        return execute_tasks_in_dag(
+            dag, tasks, run_id=str(uuid.uuid4()), execution_date=execution_date
+        )
+
+    return _pipeline_fn
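
Note that this hunk leaves the docker fixture's docstring untouched, so it still documents the removed class-based usage. Under the new API, a containerized test would presumably be written like the following sketch, where the pipeline name and glob are placeholders and `dagster_docker_image` is the session fixture added to conftest.py below:

    def test_docker_pipeline(dagster_airflow_docker_operator_pipeline, dagster_docker_image):
        results = dagster_airflow_docker_operator_pipeline(
            pipeline_name='test_pipeline',
            handle=ExecutionTargetHandle.for_pipeline_fn(define_pipeline),
            image=dagster_docker_image,
            environment_yaml=['environments/test_*.yaml'],
        )
        assert len(results) == 3

One Python caveat in the new code: the `execution_date=timezone.utcnow()` defaults are evaluated once, when `_pipeline_fn` is defined, not per call, so tests that depend on a fresh execution date should pass one explicitly.
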
16 changes: 13 additions & 3 deletions python_modules/dagster-airflow/dagster_airflow_tests/conftest.py
@@ -14,7 +14,6 @@
 import docker
 import pytest
 import six
-from dagster_airflow.test_fixtures import get_dagster_docker_image
 
 from dagster import check
 from dagster.utils import load_yaml_from_path, mkdir_p, pushd, script_relative_path
@@ -105,11 +104,22 @@ def docker_client():
 
 
 @pytest.fixture(scope='session')
-def build_docker_image(test_repo_path, docker_client):
+def dagster_docker_image():
+    assert (
+        'DAGSTER_DOCKER_IMAGE' in os.environ
+    ), 'DAGSTER_DOCKER_IMAGE must be set in your environment for these tests'
+
+    # Will be set in environment by .buildkite/pipeline.py -> tox.ini to:
+    # ${AWS_ACCOUNT_ID}.dkr.ecr.us-west-1.amazonaws.com/dagster-docker-buildkite:${BUILDKITE_BUILD_ID}-${TOX_PY_VERSION}
+    return os.environ['DAGSTER_DOCKER_IMAGE']
+
+
+@pytest.fixture(scope='session')
+def build_docker_image(test_repo_path, docker_client, dagster_docker_image):
     with pushd(test_repo_path):
         subprocess.check_output(['./build.sh'], shell=True)
 
-    return get_dagster_docker_image()
+    return dagster_docker_image
 
 
 @pytest.fixture(scope='session')
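
Since the image lookup now lives in a session fixture rather than in the library module, any test in the suite can depend on it directly. A minimal sketch of how the fixtures compose, assuming a hypothetical test in dagster_airflow_tests:

    def test_build_docker_image(build_docker_image):
        # build_docker_image runs ./build.sh in the test repo, then returns the
        # tag supplied by DAGSTER_DOCKER_IMAGE via the dagster_docker_image fixture
        assert build_docker_image == os.environ['DAGSTER_DOCKER_IMAGE']
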
