Skip to content

Commit

Permalink
use record for Definitions
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza committed Jul 22, 2024
1 parent 7b92746 commit 746232c
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
Optional,
Sequence,
Type,
TypedDict,
Union,
)

Expand All @@ -28,7 +27,7 @@
from dagster._core.execution.with_resources import with_resources
from dagster._core.executor.base import Executor
from dagster._core.instance import DagsterInstance
from dagster._record import record
from dagster._record import IHaveNew, record_custom
from dagster._utils.cached_method import cached_method

from .assets import AssetsDefinition, SourceAsset
Expand Down Expand Up @@ -254,7 +253,7 @@ def _create_repository_using_definitions_args(
resources: Optional[Mapping[str, Any]] = None,
executor: Optional[Union[ExecutorDefinition, Executor]] = None,
loggers: Optional[Mapping[str, LoggerDefinition]] = None,
asset_checks: Optional[Iterable[AssetChecksDefinition]] = None,
asset_checks: Optional[Iterable[AssetsDefinition]] = None,
) -> Union[RepositoryDefinition, PendingRepositoryDefinition]:
# First, dedupe all definition types.
sensors = dedupe_object_refs(sensors)
Expand Down Expand Up @@ -324,21 +323,8 @@ class BindResourcesToJobs(list):
"""


class DefinitionsArgs(TypedDict):
assets: Optional[Iterable[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]]
schedules: Optional[
Iterable[Union[ScheduleDefinition, UnresolvedPartitionedAssetScheduleDefinition]]
]
sensors: Optional[Iterable[SensorDefinition]]
jobs: Optional[Iterable[Union[JobDefinition, UnresolvedAssetJobDefinition]]]
resources: Optional[Mapping[str, Any]]
executor: Optional[Union[ExecutorDefinition, Executor]]
loggers: Optional[Mapping[str, LoggerDefinition]]
asset_checks: Optional[Iterable[AssetChecksDefinition]]


@record
class Definitions:
@record_custom
class Definitions(IHaveNew):
"""A set of definitions explicitly available and loadable by Dagster tools.
Parameters:
Expand Down Expand Up @@ -441,7 +427,34 @@ class Definitions:
# There's a bug that means that sometimes it's Dagster's fault when AssetsDefinitions are
# passed here instead of AssetChecksDefinitions: https://github.com/dagster-io/dagster/issues/22064.
# After we fix the bug, we should remove AssetsDefinition from the set of accepted types.
asset_checks: Optional[Iterable[AssetChecksDefinition]] = None
asset_checks: Optional[Iterable[AssetsDefinition]] = None

def __new__(
cls,
assets: Optional[
Iterable[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]
] = None,
schedules: Optional[
Iterable[Union[ScheduleDefinition, UnresolvedPartitionedAssetScheduleDefinition]]
] = None,
sensors: Optional[Iterable[SensorDefinition]] = None,
jobs: Optional[Iterable[Union[JobDefinition, UnresolvedAssetJobDefinition]]] = None,
resources: Optional[Mapping[str, Any]] = None,
executor: Optional[Union[ExecutorDefinition, Executor]] = None,
loggers: Optional[Mapping[str, LoggerDefinition]] = None,
asset_checks: Optional[Iterable[AssetsDefinition]] = None,
):
return super().__new__(
cls,
assets=assets,
schedules=schedules,
sensors=sensors,
jobs=jobs,
resources=resources,
executor=executor,
loggers=loggers,
asset_checks=asset_checks,
)

@public
def get_job_def(self, name: str) -> JobDefinition:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -836,16 +836,16 @@ def logger2(_):
)

merged = Definitions.merge(defs1, defs2)
assert merged.original_args == {
"assets": [asset1, asset2],
"jobs": [job1, job2],
"schedules": [schedule1, schedule2],
"sensors": [sensor1, sensor2],
"resources": {"resource1": resource1, "resource2": resource2},
"loggers": {"logger1": logger1, "logger2": logger2},
"executor": in_process_executor,
"asset_checks": [],
}
assert merged == Definitions(
assets=[asset1, asset2],
jobs=[job1, job2],
schedules=[schedule1, schedule2],
sensors=[sensor1, sensor2],
resources={"resource1": resource1, "resource2": resource2},
loggers={"logger1": logger1, "logger2": logger2},
executor=in_process_executor,
asset_checks=[],
)


def test_resource_conflict_on_merge():
Expand Down Expand Up @@ -1022,8 +1022,8 @@ def the_schedule():
assert len(list(underlying_repo.schedule_defs)) == 1

# properties on the definitions object do not dedupe
assert len(defs.original_args["assets"]) == 2
assert len(defs.original_args["asset_checks"]) == 2
assert len(defs.original_args["jobs"]) == 2
assert len(defs.original_args["sensors"]) == 2
assert len(defs.original_args["schedules"]) == 2
assert len(defs.assets) == 2
assert len(defs.asset_checks) == 2
assert len(defs.jobs) == 2
assert len(defs.sensors) == 2
assert len(defs.schedules) == 2

0 comments on commit 746232c

Please sign in to comment.