Skip to content

Commit

Permalink
make Definitions a record (#23119)
Browse files Browse the repository at this point in the history
## Summary & Motivation

This reflects the `Definitions` class's role, following #22909,
as a "dumb" data class.

## How I Tested These Changes
  • Loading branch information
sryza committed Jul 23, 2024
1 parent a722f24 commit f00450d
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 91 deletions.
128 changes: 52 additions & 76 deletions python_modules/dagster/dagster/_core/definitions/definitions_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
Optional,
Sequence,
Type,
TypedDict,
Union,
)

Expand All @@ -28,6 +27,7 @@
from dagster._core.execution.with_resources import with_resources
from dagster._core.executor.base import Executor
from dagster._core.instance import DagsterInstance
from dagster._record import IHaveNew, record_custom
from dagster._utils.cached_method import cached_method

from .assets import AssetsDefinition, SourceAsset
Expand Down Expand Up @@ -253,7 +253,7 @@ def _create_repository_using_definitions_args(
resources: Optional[Mapping[str, Any]] = None,
executor: Optional[Union[ExecutorDefinition, Executor]] = None,
loggers: Optional[Mapping[str, LoggerDefinition]] = None,
asset_checks: Optional[Iterable[AssetChecksDefinition]] = None,
asset_checks: Optional[Iterable[AssetsDefinition]] = None,
) -> Union[RepositoryDefinition, PendingRepositoryDefinition]:
# First, dedupe all definition types.
sensors = dedupe_object_refs(sensors)
Expand Down Expand Up @@ -323,20 +323,8 @@ class BindResourcesToJobs(list):
"""


class DefinitionsArgs(TypedDict):
assets: Optional[Iterable[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]]
schedules: Optional[
Iterable[Union[ScheduleDefinition, UnresolvedPartitionedAssetScheduleDefinition]]
]
sensors: Optional[Iterable[SensorDefinition]]
jobs: Optional[Iterable[Union[JobDefinition, UnresolvedAssetJobDefinition]]]
resources: Optional[Mapping[str, Any]]
executor: Optional[Union[ExecutorDefinition, Executor]]
loggers: Optional[Mapping[str, LoggerDefinition]]
asset_checks: Optional[Iterable[AssetChecksDefinition]]


class Definitions:
@record_custom
class Definitions(IHaveNew):
"""A set of definitions explicitly available and loadable by Dagster tools.
Parameters:
Expand Down Expand Up @@ -425,10 +413,24 @@ class Definitions:
Any other object is coerced to a :py:class:`ResourceDefinition`.
"""

_original_args: DefinitionsArgs

def __init__(
self,
assets: Optional[Iterable[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]] = (
None
)
schedules: Optional[
Iterable[Union[ScheduleDefinition, UnresolvedPartitionedAssetScheduleDefinition]]
] = None
sensors: Optional[Iterable[SensorDefinition]] = None
jobs: Optional[Iterable[Union[JobDefinition, UnresolvedAssetJobDefinition]]] = None
resources: Optional[Mapping[str, Any]] = None
executor: Optional[Union[ExecutorDefinition, Executor]] = None
loggers: Optional[Mapping[str, LoggerDefinition]] = None
# There's a bug that means that sometimes it's Dagster's fault when AssetsDefinitions are
# passed here instead of AssetChecksDefinitions: https://github.com/dagster-io/dagster/issues/22064.
# After we fix the bug, we should remove AssetsDefinition from the set of accepted types.
asset_checks: Optional[Iterable[AssetsDefinition]] = None

def __new__(
cls,
assets: Optional[
Iterable[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]
] = None,
Expand All @@ -440,45 +442,19 @@ def __init__(
resources: Optional[Mapping[str, Any]] = None,
executor: Optional[Union[ExecutorDefinition, Executor]] = None,
loggers: Optional[Mapping[str, LoggerDefinition]] = None,
asset_checks: Optional[Iterable[AssetChecksDefinition]] = None,
asset_checks: Optional[Iterable[AssetsDefinition]] = None,
):
self._original_args = {
"assets": check.opt_iterable_param(
assets,
"assets",
(AssetsDefinition, SourceAsset, CacheableAssetsDefinition),
),
"schedules": check.opt_iterable_param(
schedules,
"schedules",
(ScheduleDefinition, UnresolvedPartitionedAssetScheduleDefinition),
),
"sensors": check.opt_iterable_param(sensors, "sensors", SensorDefinition),
"jobs": check.opt_iterable_param(
jobs, "jobs", (JobDefinition, UnresolvedAssetJobDefinition)
),
"resources": check.opt_mapping_param(resources, "resources", key_type=str),
"executor": check.opt_inst_param(executor, "executor", (ExecutorDefinition, Executor)),
"loggers": check.opt_mapping_param(
loggers, "loggers", key_type=str, value_type=LoggerDefinition
),
# There's a bug that means that sometimes it's Dagster's fault when AssetsDefinitions are
# passed here instead of AssetChecksDefinitions: https://github.com/dagster-io/dagster/issues/22064.
# After we fix the bug, we should remove AssetsDefinition from the set of accepted types.
"asset_checks": check.opt_iterable_param(
asset_checks, "asset_checks", (AssetChecksDefinition, AssetsDefinition)
),
}

@property
def original_args(self) -> DefinitionsArgs:
return self._original_args

@property
def assets(
self,
) -> Optional[Iterable[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]]:
return self._original_args["assets"]
return super().__new__(
cls,
assets=assets,
schedules=schedules,
sensors=sensors,
jobs=jobs,
resources=resources,
executor=executor,
loggers=loggers,
asset_checks=asset_checks,
)

@public
def get_job_def(self, name: str) -> JobDefinition:
Expand Down Expand Up @@ -619,14 +595,14 @@ def get_inner_repository(
"""
return _create_repository_using_definitions_args(
name=SINGLETON_REPOSITORY_NAME,
assets=self.original_args["assets"],
schedules=self.original_args["schedules"],
sensors=self.original_args["sensors"],
jobs=self.original_args["jobs"],
resources=self.original_args["resources"],
executor=self.original_args["executor"],
loggers=self.original_args["loggers"],
asset_checks=self.original_args["asset_checks"],
assets=self.assets,
schedules=self.schedules,
sensors=self.sensors,
jobs=self.jobs,
resources=self.resources,
executor=self.executor,
loggers=self.loggers,
asset_checks=self.asset_checks,
)

def get_asset_graph(self) -> AssetGraph:
Expand Down Expand Up @@ -679,13 +655,13 @@ def merge(*def_sets: "Definitions") -> "Definitions":
executor_index: Optional[int] = None

for i, def_set in enumerate(def_sets):
assets.extend(def_set.original_args["assets"] or [])
asset_checks.extend(def_set.original_args["asset_checks"] or [])
schedules.extend(def_set.original_args["schedules"] or [])
sensors.extend(def_set.original_args["sensors"] or [])
jobs.extend(def_set.original_args["jobs"] or [])
assets.extend(def_set.assets or [])
asset_checks.extend(def_set.asset_checks or [])
schedules.extend(def_set.schedules or [])
sensors.extend(def_set.sensors or [])
jobs.extend(def_set.jobs or [])

for resource_key, resource_value in (def_set.original_args["resources"] or {}).items():
for resource_key, resource_value in (def_set.resources or {}).items():
if resource_key in resources:
raise DagsterInvariantViolationError(
f"Definitions objects {resource_key_indexes[resource_key]} and {i} both have a "
Expand All @@ -694,7 +670,7 @@ def merge(*def_sets: "Definitions") -> "Definitions":
resources[resource_key] = resource_value
resource_key_indexes[resource_key] = i

for logger_key, logger_value in (def_set.original_args["loggers"] or {}).items():
for logger_key, logger_value in (def_set.loggers or {}).items():
if logger_key in loggers:
raise DagsterInvariantViolationError(
f"Definitions objects {logger_key_indexes[logger_key]} and {i} both have a "
Expand All @@ -703,13 +679,13 @@ def merge(*def_sets: "Definitions") -> "Definitions":
loggers[logger_key] = logger_value
logger_key_indexes[logger_key] = i

if def_set.original_args["executor"] is not None:
if executor is not None and executor != def_set.original_args["executor"]:
if def_set.executor is not None:
if executor is not None and executor != def_set.executor:
raise DagsterInvariantViolationError(
f"Definitions objects {executor_index} and {i} both have an executor"
)

executor = def_set.original_args["executor"]
executor = def_set.executor
executor_index = i

return Definitions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -836,16 +836,16 @@ def logger2(_):
)

merged = Definitions.merge(defs1, defs2)
assert merged.original_args == {
"assets": [asset1, asset2],
"jobs": [job1, job2],
"schedules": [schedule1, schedule2],
"sensors": [sensor1, sensor2],
"resources": {"resource1": resource1, "resource2": resource2},
"loggers": {"logger1": logger1, "logger2": logger2},
"executor": in_process_executor,
"asset_checks": [],
}
assert merged == Definitions(
assets=[asset1, asset2],
jobs=[job1, job2],
schedules=[schedule1, schedule2],
sensors=[sensor1, sensor2],
resources={"resource1": resource1, "resource2": resource2},
loggers={"logger1": logger1, "logger2": logger2},
executor=in_process_executor,
asset_checks=[],
)


def test_resource_conflict_on_merge():
Expand Down Expand Up @@ -1022,8 +1022,8 @@ def the_schedule():
assert len(list(underlying_repo.schedule_defs)) == 1

# properties on the definitions object do not dedupe
assert len(defs.original_args["assets"]) == 2
assert len(defs.original_args["asset_checks"]) == 2
assert len(defs.original_args["jobs"]) == 2
assert len(defs.original_args["sensors"]) == 2
assert len(defs.original_args["schedules"]) == 2
assert len(defs.assets) == 2
assert len(defs.asset_checks) == 2
assert len(defs.jobs) == 2
assert len(defs.sensors) == 2
assert len(defs.schedules) == 2

0 comments on commit f00450d

Please sign in to comment.