Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[graph diffing] Class for computing diff between parent and branch asset graphs- FOU-22 #19681

Merged
merged 20 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 150 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/asset_graph_differ.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
from enum import Enum
from typing import TYPE_CHECKING, Callable, Optional, Sequence, Union

import dagster._check as check
from dagster._core.definitions.external_asset_graph import ExternalAssetGraph
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.host_representation import ExternalRepository
from dagster._core.workspace.context import BaseWorkspaceRequestContext

if TYPE_CHECKING:
from dagster._core.definitions.events import (
AssetKey,
)


class ChangeReason(Enum):
NEW = "NEW"
CODE_VERSION = "CODE_VERSION"
INPUTS = "INPUTS"


def _get_external_repo_from_context(
context: BaseWorkspaceRequestContext, code_location_name: str, repository_name: str
) -> Optional[ExternalRepository]:
"""Returns the ExternalRepository specified by the code location name and repository name
for the provided workspace context. If the repository doesn't exist, return None.
"""
if context.has_code_location(code_location_name):
cl = context.get_code_location(code_location_name)
if cl.has_repository(repository_name):
return cl.get_repository(repository_name)


class AssetGraphDiffer:
"""Given two asset graphs, base_asset_graph and branch_asset_graph, we can compute how the
assets in branch_asset_graph have changed with respect to base_asset_graph. The ChangeReason
enum contains the list of potential changes an asset can undergo. If the base_asset_graph is None,
this indicates that the branch_asset_graph does not yet exist in the base deployment. In this case
we will consider every asset New.
"""

_branch_asset_graph: Optional["ExternalAssetGraph"]
_branch_asset_graph_load_fn: Optional[Callable[[], "ExternalAssetGraph"]]
_base_asset_graph: Optional["ExternalAssetGraph"]
_base_asset_graph_load_fn: Optional[Callable[[], "ExternalAssetGraph"]]

def __init__(
self,
branch_asset_graph: Union["ExternalAssetGraph", Callable[[], "ExternalAssetGraph"]],
base_asset_graph: Optional[
Union["ExternalAssetGraph", Callable[[], "ExternalAssetGraph"]]
] = None,
):
if base_asset_graph is None:
# if base_asset_graph is None, then the asset graph in the branch deployment does not exist
# in the parent deployment
self._base_asset_graph = None
self._base_asset_graph_load_fn = None
elif isinstance(base_asset_graph, ExternalAssetGraph):
self._base_asset_graph = base_asset_graph
self._base_asset_graph_load_fn = None
else:
self._base_asset_graph = None
self._base_asset_graph_load_fn = base_asset_graph

if isinstance(branch_asset_graph, ExternalAssetGraph):
self._branch_asset_graph = branch_asset_graph
self._branch_asset_graph_load_fn = None
else:
self._branch_asset_graph = None
self._branch_asset_graph_load_fn = branch_asset_graph

@classmethod
def from_external_repositories(
cls,
code_location_name: str,
repository_name: str,
branch_workspace: BaseWorkspaceRequestContext,
parent_workspace: BaseWorkspaceRequestContext,
) -> "AssetGraphDiffer":
"""Constructs a ParentAssetGraphDiffer for a particular repository in a code location for two
deployment workspaces, the parent deployment and the branch deployment.

We cannot make ExternalAssetGraphs directly from the workspaces because if multiple code locations
use the same asset key, those asset keys will override each other in the dictionaries the ExternalAssetGraph
creates (see from_repository_handles_and_external_asset_nodes in ExternalAssetGraph). We need to ensure
that we are comparing assets in the same code location and repository, so we need to make the
ExternalAssetGraph from an ExternalRepository to ensure that there are no duplicate asset keys
that could override each other.
"""
check.inst_param(branch_workspace, "branch_workspace", BaseWorkspaceRequestContext)
check.inst_param(parent_workspace, "parent_workspace", BaseWorkspaceRequestContext)

branch_repo = _get_external_repo_from_context(
branch_workspace, code_location_name, repository_name
)
if branch_repo is None:
raise DagsterInvariantViolationError(
f"Repository {repository_name} does not exist in code location {code_location_name} for the branch deployment."
)
parent_repo = _get_external_repo_from_context(
parent_workspace, code_location_name, repository_name
)
return AssetGraphDiffer(
branch_asset_graph=lambda: ExternalAssetGraph.from_external_repository(branch_repo),
base_asset_graph=(lambda: ExternalAssetGraph.from_external_repository(parent_repo))
if parent_repo is not None
else None,
)

def _compare_parent_and_branch_assets(self, asset_key: "AssetKey") -> Sequence[ChangeReason]:
"""Computes the diff between a branch deployment asset and the
corresponding parent deployment asset.
"""
if self.base_asset_graph is None:
# if the parent asset graph is None, it is because the the asset graph in the branch deployment
# is new and doesn't exist in the parent deployment. Thus all assets are new.
return [ChangeReason.NEW]

if asset_key not in self.base_asset_graph.all_asset_keys:
return [ChangeReason.NEW]

changes = []
if self.branch_asset_graph.get_code_version(
asset_key
) != self.base_asset_graph.get_code_version(asset_key):
changes.append(ChangeReason.CODE_VERSION)

if self.branch_asset_graph.get_parents(asset_key) != self.base_asset_graph.get_parents(
asset_key
):
changes.append(ChangeReason.INPUTS)

return changes

def get_changes_for_asset(self, asset_key: "AssetKey") -> Sequence[ChangeReason]:
"""Returns list of ChangeReasons for asset_key as compared to the parent deployment."""
return self._compare_parent_and_branch_assets(asset_key)

@property
def branch_asset_graph(self) -> "ExternalAssetGraph":
if self._branch_asset_graph is None:
self._branch_asset_graph = check.not_none(self._branch_asset_graph_load_fn)()
return self._branch_asset_graph

@property
def base_asset_graph(self) -> Optional["ExternalAssetGraph"]:
if self._base_asset_graph is None and self._base_asset_graph_load_fn is not None:
self._base_asset_graph = self._base_asset_graph_load_fn()
return self._base_asset_graph
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from dagster import Definitions, asset


@asset
def upstream():
return 1


@asset
def downstream(upstream):
return upstream + 1


defs = Definitions(assets=[upstream, downstream])
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from dagster import Definitions, asset


@asset
def upstream():
return 1


@asset
def downstream():
return 2


defs = Definitions(assets=[upstream, downstream])
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from dagster import Definitions, asset


@asset
def upstream():
return 1


@asset
def downstream(upstream):
return upstream + 1


@asset
def new_asset():
return 1


defs = Definitions(assets=[upstream, downstream, new_asset])
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from dagster import Definitions, asset


@asset
def new_asset():
return 1


@asset
def upstream():
return 1


@asset
def downstream(upstream, new_asset):
return upstream + new_asset


defs = Definitions(assets=[new_asset, upstream, downstream])
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from dagster import Definitions, asset


@asset(code_version="1")
def upstream():
return 1


@asset(code_version="1")
def downstream(upstream):
return upstream + 1


defs = Definitions(assets=[upstream, downstream])
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from dagster import Definitions, asset


@asset(code_version="1")
def upstream():
return 1


@asset(code_version="2")
def downstream():
return 2


defs = Definitions(assets=[upstream, downstream])
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from dagster import Definitions, asset


@asset(code_version="2")
def upstream():
return 1


@asset(code_version="1")
def downstream(upstream):
return upstream + 1


defs = Definitions(assets=[upstream, downstream])
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from dagster import Definitions, asset


def create_large_asset_graph():
NUM_ASSETS = 1000

all_assets = []
for i in range(NUM_ASSETS):
dep_start = i - 50 if i - 50 > 0 else 0

@asset(key=f"asset_{i}", deps=[f"asset_{j}" for j in range(dep_start, i, 5)])
def the_asset():
return

all_assets.append(the_asset)

return all_assets


defs = Definitions(create_large_asset_graph())
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from dagster import Definitions, asset


def create_large_asset_graph():
NUM_ASSETS = 1000

all_assets = []
for i in range(NUM_ASSETS):
dep_start = i - 60 if i - 60 > 0 else 0

@asset(key=f"asset_{i}", deps=[f"asset_{j}" for j in range(dep_start, i, 6)])
def the_asset():
return

all_assets.append(the_asset)

return all_assets


defs = Definitions(create_large_asset_graph())
Loading