Skip to content

Commit

Permalink
Rename ChangeReason to AssetDefinitionChangeType (dagster-io#23156)
Browse files Browse the repository at this point in the history
## Summary

Based on suggestion from @sryza, updates the name of `ChangeReason` to
`AssetDefinitionChangeType` now that the situations in which we're using
it have expanded.
  • Loading branch information
benpankow committed Jul 22, 2024
1 parent a0bb38e commit 075b254
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
DagsterError,
_check as check,
)
from dagster._core.definitions.asset_graph_differ import AssetGraphDiffer, ChangeReason
from dagster._core.definitions.asset_graph_differ import AssetDefinitionChangeType, AssetGraphDiffer
from dagster._core.definitions.asset_job import ASSET_BASE_JOB_PREFIX
from dagster._core.definitions.data_time import CachingDataTimeResolver
from dagster._core.definitions.data_version import (
Expand Down Expand Up @@ -103,7 +103,7 @@
StaleCauseCategory, name="StaleCauseCategory"
)

GrapheneAssetChangedReason = graphene.Enum.from_enum(ChangeReason, name="ChangeReason")
GrapheneAssetChangedReason = graphene.Enum.from_enum(AssetDefinitionChangeType, name="ChangeReason")


class GrapheneUserAssetOwner(graphene.ObjectType):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@


@whitelist_for_serdes
class ChangeReason(Enum):
class AssetDefinitionChangeType(Enum):
"""What change an asset has undergone between two deployments. Used
in distinguishing asset definition changes in branch deployment and
in subsequent other deployments.
"""

NEW = "NEW"
CODE_VERSION = "CODE_VERSION"
DEPENDENCIES = "DEPENDENCIES"
Expand Down Expand Up @@ -110,52 +115,54 @@ def from_external_repositories(
base_asset_graph=(lambda: base_repo.asset_graph) if base_repo is not None else None,
)

def _compare_base_and_branch_assets(self, asset_key: "AssetKey") -> Sequence[ChangeReason]:
def _compare_base_and_branch_assets(
self, asset_key: "AssetKey"
) -> Sequence[AssetDefinitionChangeType]:
"""Computes the diff between a branch deployment asset and the
corresponding base deployment asset.
"""
if self.base_asset_graph is None:
# if the base asset graph is None, it is because the asset graph in the branch deployment
# is new and doesn't exist in the base deployment. Thus all assets are new.
return [ChangeReason.NEW]
return [AssetDefinitionChangeType.NEW]

if asset_key not in self.base_asset_graph.all_asset_keys:
return [ChangeReason.NEW]
return [AssetDefinitionChangeType.NEW]

if asset_key not in self.branch_asset_graph.all_asset_keys:
return [ChangeReason.REMOVED]
return [AssetDefinitionChangeType.REMOVED]

branch_asset = self.branch_asset_graph.get(asset_key)
base_asset = self.base_asset_graph.get(asset_key)

changes = []
if branch_asset.code_version != base_asset.code_version:
changes.append(ChangeReason.CODE_VERSION)
changes.append(AssetDefinitionChangeType.CODE_VERSION)

if branch_asset.parent_keys != base_asset.parent_keys:
changes.append(ChangeReason.DEPENDENCIES)
changes.append(AssetDefinitionChangeType.DEPENDENCIES)
else:
# if the set of upstream dependencies is different, then we don't need to check if the partition mappings
# for dependencies have changed since ChangeReason.DEPENDENCIES is already in the list of changes
for upstream_asset in branch_asset.parent_keys:
if self.branch_asset_graph.get_partition_mapping(
asset_key, upstream_asset
) != self.base_asset_graph.get_partition_mapping(asset_key, upstream_asset):
changes.append(ChangeReason.DEPENDENCIES)
changes.append(AssetDefinitionChangeType.DEPENDENCIES)
break

if branch_asset.partitions_def != base_asset.partitions_def:
changes.append(ChangeReason.PARTITIONS_DEFINITION)
changes.append(AssetDefinitionChangeType.PARTITIONS_DEFINITION)

if branch_asset.tags != base_asset.tags:
changes.append(ChangeReason.TAGS)
changes.append(AssetDefinitionChangeType.TAGS)

if branch_asset.metadata != base_asset.metadata:
changes.append(ChangeReason.METADATA)
changes.append(AssetDefinitionChangeType.METADATA)

return changes

def get_changes_for_asset(self, asset_key: "AssetKey") -> Sequence[ChangeReason]:
def get_changes_for_asset(self, asset_key: "AssetKey") -> Sequence[AssetDefinitionChangeType]:
"""Returns list of ChangeReasons for asset_key as compared to the base deployment."""
return self._compare_base_and_branch_assets(asset_key)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import pytest
from dagster import DagsterInstance, instance_for_test
from dagster._core.definitions.asset_graph_differ import AssetGraphDiffer, ChangeReason
from dagster._core.definitions.asset_graph_differ import AssetDefinitionChangeType, AssetGraphDiffer
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.repository_definition.valid_definitions import (
SINGLETON_REPOSITORY_NAME,
Expand Down Expand Up @@ -112,7 +112,7 @@ def test_new_asset(instance):
branch_code_location_to_definitions={"basic_asset_graph": "branch_deployment_new_asset"},
)

assert differ.get_changes_for_asset(AssetKey("new_asset")) == [ChangeReason.NEW]
assert differ.get_changes_for_asset(AssetKey("new_asset")) == [AssetDefinitionChangeType.NEW]
assert len(differ.get_changes_for_asset(AssetKey("upstream"))) == 0


Expand All @@ -126,7 +126,9 @@ def test_removed_asset(instance) -> None:
},
)

assert differ.get_changes_for_asset(AssetKey("downstream")) == [ChangeReason.REMOVED]
assert differ.get_changes_for_asset(AssetKey("downstream")) == [
AssetDefinitionChangeType.REMOVED
]
assert len(differ.get_changes_for_asset(AssetKey("upstream"))) == 0


Expand All @@ -140,8 +142,10 @@ def test_new_asset_connected(instance):
},
)

assert differ.get_changes_for_asset(AssetKey("new_asset")) == [ChangeReason.NEW]
assert differ.get_changes_for_asset(AssetKey("downstream")) == [ChangeReason.DEPENDENCIES]
assert differ.get_changes_for_asset(AssetKey("new_asset")) == [AssetDefinitionChangeType.NEW]
assert differ.get_changes_for_asset(AssetKey("downstream")) == [
AssetDefinitionChangeType.DEPENDENCIES
]
assert len(differ.get_changes_for_asset(AssetKey("upstream"))) == 0


Expand All @@ -155,7 +159,9 @@ def test_update_code_version(instance):
},
)

assert differ.get_changes_for_asset(AssetKey("upstream")) == [ChangeReason.CODE_VERSION]
assert differ.get_changes_for_asset(AssetKey("upstream")) == [
AssetDefinitionChangeType.CODE_VERSION
]
assert len(differ.get_changes_for_asset(AssetKey("downstream"))) == 0


Expand All @@ -169,7 +175,9 @@ def test_change_inputs(instance):
},
)

assert differ.get_changes_for_asset(AssetKey("downstream")) == [ChangeReason.DEPENDENCIES]
assert differ.get_changes_for_asset(AssetKey("downstream")) == [
AssetDefinitionChangeType.DEPENDENCIES
]
assert len(differ.get_changes_for_asset(AssetKey("upstream"))) == 0


Expand All @@ -184,8 +192,8 @@ def test_multiple_changes_for_one_asset(instance):
)

assert differ.get_changes_for_asset(AssetKey("downstream")) == [
ChangeReason.CODE_VERSION,
ChangeReason.DEPENDENCIES,
AssetDefinitionChangeType.CODE_VERSION,
AssetDefinitionChangeType.DEPENDENCIES,
]
assert len(differ.get_changes_for_asset(AssetKey("upstream"))) == 0

Expand All @@ -200,7 +208,9 @@ def test_change_then_revert(instance):
},
)

assert differ.get_changes_for_asset(AssetKey("upstream")) == [ChangeReason.CODE_VERSION]
assert differ.get_changes_for_asset(AssetKey("upstream")) == [
AssetDefinitionChangeType.CODE_VERSION
]
assert len(differ.get_changes_for_asset(AssetKey("downstream"))) == 0

differ = get_asset_graph_differ(
Expand All @@ -226,7 +236,7 @@ def test_large_asset_graph(instance):

for i in range(6, 1000):
key = AssetKey(f"asset_{i}")
assert differ.get_changes_for_asset(key) == [ChangeReason.DEPENDENCIES]
assert differ.get_changes_for_asset(key) == [AssetDefinitionChangeType.DEPENDENCIES]

for i in range(6):
key = AssetKey(f"asset_{i}")
Expand All @@ -247,8 +257,10 @@ def test_multiple_code_locations(instance):
)

# if the code_versions_asset_graph were in the diff computation, ChangeReason.CODE_VERSION would be in the list
assert differ.get_changes_for_asset(AssetKey("new_asset")) == [ChangeReason.NEW]
assert differ.get_changes_for_asset(AssetKey("downstream")) == [ChangeReason.DEPENDENCIES]
assert differ.get_changes_for_asset(AssetKey("new_asset")) == [AssetDefinitionChangeType.NEW]
assert differ.get_changes_for_asset(AssetKey("downstream")) == [
AssetDefinitionChangeType.DEPENDENCIES
]
assert len(differ.get_changes_for_asset(AssetKey("upstream"))) == 0


Expand All @@ -259,9 +271,9 @@ def test_new_code_location(instance):
base_code_locations=[],
branch_code_location_to_definitions={"basic_asset_graph": "branch_deployment_new_asset"},
)
assert differ.get_changes_for_asset(AssetKey("new_asset")) == [ChangeReason.NEW]
assert differ.get_changes_for_asset(AssetKey("upstream")) == [ChangeReason.NEW]
assert differ.get_changes_for_asset(AssetKey("downstream")) == [ChangeReason.NEW]
assert differ.get_changes_for_asset(AssetKey("new_asset")) == [AssetDefinitionChangeType.NEW]
assert differ.get_changes_for_asset(AssetKey("upstream")) == [AssetDefinitionChangeType.NEW]
assert differ.get_changes_for_asset(AssetKey("downstream")) == [AssetDefinitionChangeType.NEW]


def test_change_partitions_definitions(instance):
Expand All @@ -274,22 +286,22 @@ def test_change_partitions_definitions(instance):
},
)
assert differ.get_changes_for_asset(AssetKey("daily_upstream")) == [
ChangeReason.PARTITIONS_DEFINITION
AssetDefinitionChangeType.PARTITIONS_DEFINITION
]
assert differ.get_changes_for_asset(AssetKey("daily_downstream")) == [
ChangeReason.PARTITIONS_DEFINITION
AssetDefinitionChangeType.PARTITIONS_DEFINITION
]
assert differ.get_changes_for_asset(AssetKey("static_upstream")) == [
ChangeReason.PARTITIONS_DEFINITION
AssetDefinitionChangeType.PARTITIONS_DEFINITION
]
assert differ.get_changes_for_asset(AssetKey("static_downstream")) == [
ChangeReason.PARTITIONS_DEFINITION
AssetDefinitionChangeType.PARTITIONS_DEFINITION
]
assert differ.get_changes_for_asset(AssetKey("multi_partitioned_upstream")) == [
ChangeReason.PARTITIONS_DEFINITION
AssetDefinitionChangeType.PARTITIONS_DEFINITION
]
assert differ.get_changes_for_asset(AssetKey("multi_partitioned_downstream")) == [
ChangeReason.PARTITIONS_DEFINITION
AssetDefinitionChangeType.PARTITIONS_DEFINITION
]


Expand All @@ -303,14 +315,16 @@ def test_change_partition_mapping(instance):
},
)
assert len(differ.get_changes_for_asset(AssetKey("daily_upstream"))) == 0
assert differ.get_changes_for_asset(AssetKey("daily_downstream")) == [ChangeReason.DEPENDENCIES]
assert differ.get_changes_for_asset(AssetKey("daily_downstream")) == [
AssetDefinitionChangeType.DEPENDENCIES
]
assert len(differ.get_changes_for_asset(AssetKey("static_upstream"))) == 0
assert differ.get_changes_for_asset(AssetKey("static_downstream")) == [
ChangeReason.DEPENDENCIES
AssetDefinitionChangeType.DEPENDENCIES
]
assert len(differ.get_changes_for_asset(AssetKey("multi_partitioned_upstream"))) == 0
assert differ.get_changes_for_asset(AssetKey("multi_partitioned_downstream")) == [
ChangeReason.DEPENDENCIES
AssetDefinitionChangeType.DEPENDENCIES
]


Expand All @@ -321,10 +335,10 @@ def test_change_tags(instance):
base_code_locations=["tags_asset_graph"],
branch_code_location_to_definitions={"tags_asset_graph": "branch_deployment_change_tags"},
)
assert differ.get_changes_for_asset(AssetKey("upstream")) == [ChangeReason.TAGS]
assert differ.get_changes_for_asset(AssetKey("downstream")) == [ChangeReason.TAGS]
assert differ.get_changes_for_asset(AssetKey("fruits")) == [ChangeReason.TAGS]
assert differ.get_changes_for_asset(AssetKey("letters")) == [ChangeReason.TAGS]
assert differ.get_changes_for_asset(AssetKey("upstream")) == [AssetDefinitionChangeType.TAGS]
assert differ.get_changes_for_asset(AssetKey("downstream")) == [AssetDefinitionChangeType.TAGS]
assert differ.get_changes_for_asset(AssetKey("fruits")) == [AssetDefinitionChangeType.TAGS]
assert differ.get_changes_for_asset(AssetKey("letters")) == [AssetDefinitionChangeType.TAGS]
assert len(differ.get_changes_for_asset(AssetKey("numbers"))) == 0


Expand All @@ -337,8 +351,12 @@ def test_change_metadata(instance):
"metadata_asset_graph": "branch_deployment_change_metadata"
},
)
assert differ.get_changes_for_asset(AssetKey("upstream")) == [ChangeReason.METADATA]
assert differ.get_changes_for_asset(AssetKey("downstream")) == [ChangeReason.METADATA]
assert differ.get_changes_for_asset(AssetKey("fruits")) == [ChangeReason.METADATA]
assert differ.get_changes_for_asset(AssetKey("letters")) == [ChangeReason.METADATA]
assert differ.get_changes_for_asset(AssetKey("upstream")) == [
AssetDefinitionChangeType.METADATA
]
assert differ.get_changes_for_asset(AssetKey("downstream")) == [
AssetDefinitionChangeType.METADATA
]
assert differ.get_changes_for_asset(AssetKey("fruits")) == [AssetDefinitionChangeType.METADATA]
assert differ.get_changes_for_asset(AssetKey("letters")) == [AssetDefinitionChangeType.METADATA]
assert len(differ.get_changes_for_asset(AssetKey("numbers"))) == 0

0 comments on commit 075b254

Please sign in to comment.