Skip to content

Commit

Permalink
in Definitions.merge, don't error on duplicate resource and logger va…
Browse files Browse the repository at this point in the history
…lues (dagster-io#22945)

## Summary & Motivation

After this PR, the following no longer errors:
```python
defs1 = Definitions(resources={"resource1": 4})
defs2 = Definitions(resources={"resource1": 4})

merged = Definitions.merge(defs1, defs2)
```

## How I Tested These Changes
  • Loading branch information
sryza committed Jul 23, 2024
1 parent 1174ea8 commit e3faf83
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -662,25 +662,25 @@ def merge(*def_sets: "Definitions") -> "Definitions":
jobs.extend(def_set.jobs or [])

for resource_key, resource_value in (def_set.resources or {}).items():
if resource_key in resources:
if resource_key in resources and resources[resource_key] is not resource_value:
raise DagsterInvariantViolationError(
f"Definitions objects {resource_key_indexes[resource_key]} and {i} both have a "
f"resource with key '{resource_key}'"
f"Definitions objects {resource_key_indexes[resource_key]} and {i} have "
f"different resources with same key '{resource_key}'"
)
resources[resource_key] = resource_value
resource_key_indexes[resource_key] = i

for logger_key, logger_value in (def_set.loggers or {}).items():
if logger_key in loggers:
if logger_key in loggers and loggers[logger_key] is not logger_value:
raise DagsterInvariantViolationError(
f"Definitions objects {logger_key_indexes[logger_key]} and {i} both have a "
f"logger with key '{logger_key}'"
f"Definitions objects {logger_key_indexes[logger_key]} and {i} have "
f"different loggers with same key '{logger_key}'"
)
loggers[logger_key] = logger_value
logger_key_indexes[logger_key] = i

if def_set.executor is not None:
if executor is not None and executor != def_set.executor:
if executor is not None and executor is not def_set.executor:
raise DagsterInvariantViolationError(
f"Definitions objects {executor_index} and {i} both have an executor"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -850,30 +850,54 @@ def logger2(_):

def test_resource_conflict_on_merge():
defs1 = Definitions(resources={"resource1": 4})
defs2 = Definitions(resources={"resource1": 4})
defs2 = Definitions(resources={"resource1": 5})

with pytest.raises(
DagsterInvariantViolationError,
match="Definitions objects 0 and 1 both have a resource with key 'resource1'",
match="Definitions objects 0 and 1 have different resources with same key 'resource1'",
):
Definitions.merge(defs1, defs2)


def test_resource_conflict_on_merge_same_value():
defs1 = Definitions(resources={"resource1": 4})
defs2 = Definitions(resources={"resource1": 4})

merged = Definitions.merge(defs1, defs2)
assert merged.resources == {"resource1": 4}


def test_logger_conflict_on_merge():
@logger
def logger1(_):
raise Exception("not executed")

@logger
def logger2(_):
raise Exception("also not executed")

defs1 = Definitions(loggers={"logger1": logger1})
defs2 = Definitions(loggers={"logger1": logger1})
defs2 = Definitions(loggers={"logger1": logger2})

with pytest.raises(
DagsterInvariantViolationError,
match="Definitions objects 0 and 1 both have a logger with key 'logger1'",
match="Definitions objects 0 and 1 have different loggers with same key 'logger1'",
):
Definitions.merge(defs1, defs2)


def test_logger_conflict_on_merge_same_vlaue():
@logger
def logger1(_):
raise Exception("not executed")

defs1 = Definitions(loggers={"logger1": logger1})
defs2 = Definitions(loggers={"logger1": logger1})

merged = Definitions.merge(defs1, defs2)
assert merged.loggers == {"logger1": logger1}


def test_executor_conflict_on_merge():
defs1 = Definitions(executor=in_process_executor)
defs2 = Definitions(executor=multiprocess_executor)
Expand All @@ -884,6 +908,13 @@ def test_executor_conflict_on_merge():
Definitions.merge(defs1, defs2)


def test_executor_conflict_on_merge_same_value():
defs1 = Definitions(executor=in_process_executor)
defs2 = Definitions(executor=in_process_executor)

assert Definitions.merge(defs1, defs2).executor == in_process_executor


def test_get_all_asset_specs():
@asset(tags={"foo": "fooval"})
def asset1(): ...
Expand Down

0 comments on commit e3faf83

Please sign in to comment.