[1.8] remove op job versioning & memoization (dagster-io#23126)
## Summary & Motivation

This is a pre-1.0 experimental feature that has been superseded by asset
versioning. It was pretty cool stuff in its day, but I believe that day
has passed.
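
For anyone migrating off the removed APIs, here is a minimal sketch of the replacement feature, asset code versioning. The asset body and the `"v1"` string are illustrative assumptions; `code_version` is the `@asset` parameter that asset versioning is built on.

```python
from dagster import asset


# Minimal sketch of the asset-versioning replacement. Bump `code_version`
# whenever the computation changes; Dagster then flags downstream
# materializations as stale, rather than memoizing op outputs per run.
@asset(code_version="v1")
def processed_numbers() -> list:
    return [n * 2 for n in [1, 2, 3]]
```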

## How I Tested These Changes
sryza committed Jul 22, 2024
1 parent 075b254 commit 91c1716
Showing 46 changed files with 3,578 additions and 7,292 deletions.
15 changes: 0 additions & 15 deletions docs/content/_apidocs.mdx
@@ -238,21 +238,6 @@ APIs from the core `dagster` package, divided roughly by topic:
machinery, storage, schedulers.
</td>
</tr>
<tr>
<td>
<a href="/_apidocs/memoization">Job-level versioning & memoization</a>{" "}
<Deprecated />
</td>
<td>
<strong>
Deprecated in favor of{" "}
<a href="/concepts/assets/software-defined-assets#asset-code-versions">
asset versioning
</a>
</strong>
. Code versioning and memoization of previous outputs based upon that versioning.
</td>
</tr>
<tr>
<td>
<a href="/_apidocs/repositories">Repositories</a> <Legacy />
7 changes: 1 addition & 6 deletions docs/content/_navigation.json
@@ -1162,7 +1162,6 @@
"title": "Observe your Airflow pipelines with Dagster",
"path": "/guides/migrations/observe-your-airflow-pipelines-with-dagster"
}

]
},
{
@@ -1320,10 +1319,6 @@
}
]
},
{
"title": "Job versioning & memoization (Deprecated)",
"path": "/_apidocs/memoization"
},
{
"title": "Repositories",
"path": "/_apidocs/repositories"
@@ -1593,4 +1588,4 @@
}
]
}
]
]
1 change: 0 additions & 1 deletion docs/sphinx/index.rst
@@ -27,7 +27,6 @@
sections/api/apidocs/dynamic
sections/api/apidocs/types
sections/api/apidocs/utilities
sections/api/apidocs/memoization
sections/api/apidocs/libraries/dagster-airbyte
sections/api/apidocs/libraries/dagster-airflow
sections/api/apidocs/libraries/dagster-aws
32 changes: 0 additions & 32 deletions docs/sphinx/sections/api/apidocs/memoization.rst

This file was deleted.

@@ -3,13 +3,13 @@
from typing import Optional

import pandas as pd
from dagster import AssetKey, ConfigurableIOManager, MemoizableIOManager, TableSchemaMetadataValue
from dagster import AssetKey, ConfigurableIOManager, TableSchemaMetadataValue
from dagster._core.definitions.metadata import MetadataValue
from dagster._utils.cached_method import cached_method
from pydantic import Field


class LocalCsvIOManager(ConfigurableIOManager, MemoizableIOManager):
class LocalCsvIOManager(ConfigurableIOManager):
"""Translates between Pandas DataFrames and CSVs on the local filesystem."""

base_dir: Optional[str] = Field(default=None)
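
For context on the hunk above: `MemoizableIOManager` was an `IOManager` variant that added a single `has_output` check, which the memoized execution path used to decide whether a step could be skipped. A rough sketch of the removed contract, reconstructed from the pre-1.8 API (not importable after this commit):

```python
from dagster import InputContext, IOManager, OutputContext


# Rough sketch of the removed MemoizableIOManager contract (pre-1.8 API; no
# longer importable after this commit). Implementations answered one question:
# does a stored output already exist for this step's code version?
class SketchedMemoizableIOManager(IOManager):
    def has_output(self, context: OutputContext) -> bool:
        # Return True when an output for this step/version is already stored,
        # allowing memoized runs to skip re-executing the step.
        raise NotImplementedError

    def handle_output(self, context: OutputContext, obj) -> None:
        raise NotImplementedError

    def load_input(self, context: InputContext):
        raise NotImplementedError
```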

This file was deleted.

This file was deleted.

@@ -1,7 +1,6 @@
import datetime
import os
import time
import uuid
from typing import Any, Mapping

import dagster._check as check
@@ -10,7 +9,7 @@
from dagster._core.instance import DagsterInstance
from dagster._core.storage.dagster_run import DagsterRunStatus
from dagster._core.storage.tags import DOCKER_IMAGE_TAG
from dagster._utils.merger import deep_merge_dicts, merge_dicts
from dagster._utils.merger import merge_dicts
from dagster._utils.yaml_utils import load_yaml_from_path
from dagster_k8s.client import DagsterKubernetesClient
from dagster_k8s.job import get_k8s_job_name
@@ -30,11 +29,9 @@
terminate_run_over_graphql,
)
from dagster_test.test_project import (
cleanup_memoized_results,
get_test_project_docker_image,
get_test_project_environments_path,
)
from dagster_test.test_project.test_jobs.repo import define_memoization_job


@pytest.mark.integration
@@ -431,66 +428,3 @@ def test_execute_on_k8s_retry_job(
assert DagsterEventType.STEP_SUCCESS in [
event.dagster_event.event_type for event in all_logs if event.is_dagster_event
]


@pytest.mark.integration
def test_memoization_k8s_executor(
dagster_instance_for_k8s_run_launcher,
user_code_namespace_for_k8s_run_launcher,
dagster_docker_image,
webserver_url_for_k8s_run_launcher,
):
ephemeral_path = str(uuid.uuid4())
run_config = deep_merge_dicts(
load_yaml_from_path(os.path.join(get_test_project_environments_path(), "env_s3.yaml")),
{
"execution": {
"config": {
"job_namespace": user_code_namespace_for_k8s_run_launcher,
"job_image": dagster_docker_image,
"image_pull_policy": image_pull_policy(),
}
},
},
)

run_config = deep_merge_dicts(
run_config,
{"resources": {"io_manager": {"config": {"s3_prefix": ephemeral_path}}}},
)

# wrap in try-catch to ensure that memoized results are always cleaned from s3 bucket
try:
job_name = "memoization_job_k8s"

run_ids = []
for _ in range(2):
run_id = launch_run_over_graphql(
webserver_url_for_k8s_run_launcher,
run_config=run_config,
job_name=job_name,
)

result = wait_for_job_and_get_raw_logs(
job_name="dagster-run-%s" % run_id,
namespace=user_code_namespace_for_k8s_run_launcher,
)

assert "RUN_SUCCESS" in result, f"no match, result: {result}"

run_ids.append(run_id)

# We expect that first run should have to run the step, since it has not yet been
# memoized.
unmemoized_run_id = run_ids[0]
events = dagster_instance_for_k8s_run_launcher.all_logs(unmemoized_run_id)
assert len(_get_step_execution_events(events)) == 1

# We expect that second run should not have to run the step, since it has been memoized.
memoized_run_id = run_ids[1]
events = dagster_instance_for_k8s_run_launcher.all_logs(memoized_run_id)
assert len(_get_step_execution_events(events)) == 0
finally:
cleanup_memoized_results(
define_memoization_job("k8s")(), dagster_instance_for_k8s_run_launcher, run_config
)
