Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[graph diffing] Class for computing diff between parent and branch asset graphs- FOU-22 #19681

Merged
merged 20 commits into from
Feb 20, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
update test suite
  • Loading branch information
jamiedemaria committed Feb 20, 2024
commit c206a6d4f1652cc2561a6930c0ffc82966a6229e
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import pytest
from dagster import DagsterInstance, asset, instance_for_test
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.branch_changes import BranchChangeResolver
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.external_asset_graph import ExternalAssetGraph
from dagster._core.host_representation.origin import InProcessCodeLocationOrigin
from dagster._core.types.loadable_target_origin import LoadableTargetOrigin
Expand Down Expand Up @@ -55,7 +57,7 @@ def _make_location_entry(parent_graph_name: str, instance: DagsterInstance):
)


def _make_context(instance: DagsterInstance, parent_graph_names):
def _make_workspace_context(instance: DagsterInstance, parent_graph_names):
return WorkspaceRequestContext(
instance=mock.MagicMock(),
workspace_snapshot={
Expand All @@ -68,13 +70,13 @@ def _make_context(instance: DagsterInstance, parent_graph_names):
)


def get_parent_deployment_graph(parent_graph_name, instance):
return ExternalAssetGraph.from_workspace(_make_context(instance, [parent_graph_name]))


def get_branch_deployment_graph_with_code_changes(
def _get_branch_deployment_graph_with_code_changes(
parent_graph_name, new_assets: Optional[List] = None, updated_assets: Optional[List] = None
):
"""Applies the provided changes to a parent asset graph. We do this by getting the list of
assets from the parent Definitions object and adding in the new_assets and replacing an
updated_assets. Then we can make a new Definitions object and make an AssetGraph.
"""
parent_asset_graph = parent_deployment_defs_by_name[parent_graph_name].get_asset_graph()
parent_assets_by_key = {
asset.key: asset
Expand All @@ -94,36 +96,44 @@ def get_branch_deployment_graph_with_code_changes(
)


def compute_graph_diff(parent_graph: AssetGraph, branch_graph: AssetGraph):
changes = {}
for asset_key in branch_graph.all_asset_keys:
if asset_key in parent_graph.all_asset_keys:
all_changes = []
if branch_graph.get_code_version(asset_key) != parent_graph.get_code_version(asset_key):
all_changes.append("code_version")
if branch_graph.get_parents(asset_key) != parent_graph.get_parents(asset_key):
all_changes.append("inputs")
changes[asset_key] = all_changes
else:
changes[asset_key] = ["new"]
def get_branch_change_resolver_for_parent_graph(
instance,
parent_graph_name: str,
new_assets: Optional[List] = None,
updated_assets: Optional[List] = None,
):
"""Returns a subclass of BranchChangeResolver with some deployment-specific methods overwritten so that we can
effectively run unit tests. In our tests we want to always be considered in a branch deployment, and
the method for getting the parent asset graph is different than in a real branch deployment.
"""
branch_graph = _get_branch_deployment_graph_with_code_changes(
parent_graph_name=parent_graph_name, new_assets=new_assets, updated_assets=updated_assets
)

class TestingBranchChangeResolver(BranchChangeResolver):
def _get_parent_deployment_asset_graph(self):
return ExternalAssetGraph.from_workspace(
_make_workspace_context(instance, [parent_graph_name])
)

def _is_branch_deployment(self) -> bool:
# for testing, we want to always be in a branch deployment
return True

return changes
return TestingBranchChangeResolver(instance=instance, branch_asset_graph=branch_graph)


def test_new_asset(instance):
parent_deployment_asset_graph = get_parent_deployment_graph("basic_asset_graph", instance)

@asset
def new_asset():
return 1

branch_deployment_asset_graph = get_branch_deployment_graph_with_code_changes(
parent_graph_name="basic_asset_graph", new_assets=[new_asset]
resolver = get_branch_change_resolver_for_parent_graph(
instance=instance, parent_graph_name="basic_asset_graph", new_assets=[new_asset]
)

diff = compute_graph_diff(parent_deployment_asset_graph, branch_deployment_asset_graph)

assert diff.get(new_asset.key) == ["new"]
assert resolver.is_changed_in_branch(new_asset.key)
assert not resolver.is_changed_in_branch(AssetKey("upstream"))


"""
Expand Down