fix backfill stalling if first tick does not successfully submit runs (
jamiedemaria authored May 3, 2024
1 parent a9f82c2 commit 957cc2e
Showing 2 changed files with 206 additions and 4 deletions.
18 changes: 14 additions & 4 deletions python_modules/dagster/dagster/_core/execution/asset_backfill.py
@@ -164,6 +164,9 @@ def with_latest_storage_id(self, latest_storage_id: Optional[int]) -> "AssetBack
latest_storage_id=latest_storage_id,
)

def with_requested_runs_for_target_roots(self, requested_runs_for_target_roots: bool):
return self._replace(requested_runs_for_target_roots=requested_runs_for_target_roots)

def is_complete(self) -> bool:
"""The asset backfill is complete when all runs to be requested have finished (success,
failure, or cancellation). Since the AssetBackfillData object stores materialization states
@@ -727,10 +730,13 @@ def _submit_runs_and_update_backfill_in_chunks(
if retryable_error_raised:
# Code server became unavailable mid-backfill. Rewind the cursor to its value from
# the previous iteration so that the next iteration re-evaluates the same events.
# If the previous iteration had not yet requested the target roots, this also
# ensures that the next iteration requests them.
backfill_data_with_submitted_runs = (
backfill_data_with_submitted_runs.with_latest_storage_id(
previous_asset_backfill_data.latest_storage_id
).with_requested_runs_for_target_roots(
previous_asset_backfill_data.requested_runs_for_target_roots
)
)
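
The rewind above is what keeps the daemon from stalling: if the tick that was supposed to request the root partitions dies with a retryable error, both the storage-id cursor and the requested_runs_for_target_roots flag are restored, so the next tick behaves like a fresh first tick. A minimal sketch of that state handling, using a plain NamedTuple in place of Dagster's actual AssetBackfillData (the class and function names below are illustrative, not Dagster's API):

from typing import NamedTuple, Optional


class BackfillState(NamedTuple):
    latest_storage_id: Optional[int] = None
    requested_runs_for_target_roots: bool = False

    def with_latest_storage_id(self, latest_storage_id: Optional[int]) -> "BackfillState":
        # Copy-with-update: NamedTuple._replace returns a new instance.
        return self._replace(latest_storage_id=latest_storage_id)

    def with_requested_runs_for_target_roots(self, requested: bool) -> "BackfillState":
        return self._replace(requested_runs_for_target_roots=requested)


def resolve_state_after_chunk(
    updated: BackfillState, previous: BackfillState, retryable_error_raised: bool
) -> BackfillState:
    if not retryable_error_raised:
        return updated
    # Code server became unreachable mid-chunk: rewind both the event cursor and the
    # roots flag so the next iteration re-evaluates the same work.
    return updated.with_latest_storage_id(
        previous.latest_storage_id
    ).with_requested_runs_for_target_roots(previous.requested_runs_for_target_roots)


previous = BackfillState()
partially_updated = BackfillState(latest_storage_id=42, requested_runs_for_target_roots=True)
assert resolve_state_after_chunk(partially_updated, previous, retryable_error_raised=True) == previous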

@@ -1190,9 +1196,13 @@ def execute_asset_backfill_iteration_inner(
initial_candidates: Set[AssetKeyPartitionKey] = set()
request_roots = not asset_backfill_data.requested_runs_for_target_roots
if request_roots:
initial_candidates.update(
asset_backfill_data.get_target_root_asset_partitions(instance_queryer)
)
target_roots = asset_backfill_data.get_target_root_asset_partitions(instance_queryer)
# Because the code server may have failed while requesting roots, some roots may have
# already been requested. Checking here will reduce the amount of BFS work later in the iteration.
not_yet_requested = [
root for root in target_roots if root not in asset_backfill_data.requested_subset
]
initial_candidates.update(not_yet_requested)

yield None
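
The not_yet_requested filter is what makes the retried first tick idempotent: roots that already made it into requested_subset before the failure are not re-seeded. A minimal sketch of the idea, with plain sets standing in for Dagster's asset-partition subsets (function and variable names are illustrative):

def seed_initial_candidates(target_roots, requested_subset, requested_runs_for_target_roots):
    """Return the root asset partitions that still need run requests on this tick."""
    initial_candidates = set()
    if not requested_runs_for_target_roots:
        # A previous tick may have submitted some roots before the code server became
        # unreachable; skip those to avoid duplicate requests and extra BFS work.
        initial_candidates.update(
            root for root in target_roots if root not in requested_subset
        )
    return initial_candidates


# Example: the first tick requested "foo_f" before failing on "bar_f".
assert seed_initial_candidates(
    target_roots={"foo_f", "bar_f"},
    requested_subset={"foo_f"},
    requested_runs_for_target_roots=False,
) == {"bar_f"}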

192 changes: 192 additions & 0 deletions python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
@@ -253,6 +253,9 @@ def reusable():

partitions_d = StaticPartitionsDefinition(["foo_d"])

partitions_f = StaticPartitionsDefinition(["foo_f", "bar_f"])
partitions_g = StaticPartitionsDefinition(["foo_g", "bar_g"])


@asset(partitions_def=partitions_a)
def asset_a():
@@ -291,6 +294,23 @@ def asset_e(asset_a):
pass


@asset(partitions_def=partitions_f)
def asset_f():
pass


@asset(
partitions_def=partitions_g,
ins={
"asset_f": AssetIn(
partition_mapping=StaticPartitionMapping({"foo_f": "foo_g", "bar_f": "bar_g"})
)
},
)
def asset_g(asset_f):
pass


daily_partitions_def = DailyPartitionsDefinition("2023-01-01")


@@ -348,6 +368,8 @@ def the_repo():
daily_1,
daily_2,
asset_e,
asset_f,
asset_g,
asset_with_single_run_backfill_policy,
asset_with_multi_run_backfill_policy,
]
@@ -1327,6 +1349,176 @@ def raise_code_unreachable_error_on_second_call(*args, **kwargs):
)


def test_asset_backfill_first_iteration_code_location_unreachable_error_no_runs_submitted(
instance: DagsterInstance, workspace_context: WorkspaceProcessContext
):
# Tests that we can recover from an unreachable code location error during the first tick,
# when we are requesting the root assets and no runs were submitted before the error.

asset_selection = [AssetKey("asset_a"), AssetKey("asset_e")]
asset_graph = workspace_context.create_request_context().asset_graph

num_partitions = 1
target_partitions = partitions_a.get_partition_keys()[0:num_partitions]
backfill_id = "backfill_with_roots"
backfill = PartitionBackfill.from_asset_partitions(
asset_graph=asset_graph,
backfill_id=backfill_id,
tags={},
backfill_timestamp=pendulum.now().timestamp(),
asset_selection=asset_selection,
partition_names=target_partitions,
dynamic_partitions_store=instance,
all_partitions=False,
)
instance.add_backfill(backfill)
assert instance.get_runs_count() == 0
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.status == BulkActionStatus.REQUESTED

# The following backfill iteration will attempt to submit a run request for asset_a's partition.
# The call will raise a DagsterUserCodeUnreachableError, so no runs will be submitted.

def raise_code_unreachable_error(*args, **kwargs):
raise DagsterUserCodeUnreachableError()

with mock.patch(
"dagster._core.execution.submit_asset_runs._get_job_execution_data_from_run_request",
side_effect=raise_code_unreachable_error,
):
assert all(
not error
for error in list(
execute_backfill_iteration(
workspace_context, get_default_daemon_logger("BackfillDaemon")
)
)
)

assert instance.get_runs_count() == 0
updated_backfill = instance.get_backfill(backfill_id)
assert updated_backfill
assert updated_backfill.asset_backfill_data
assert (
updated_backfill.asset_backfill_data.requested_subset.num_partitions_and_non_partitioned_assets
== 0
)
assert not updated_backfill.asset_backfill_data.requested_runs_for_target_roots

# Execute backfill iteration again, confirming that the partition for asset_a is requested again
assert all(
not error
for error in list(
execute_backfill_iteration(
workspace_context, get_default_daemon_logger("BackfillDaemon")
)
)
)
# Assert that one run is submitted
assert instance.get_runs_count() == 1

updated_backfill = instance.get_backfill(backfill_id)
assert updated_backfill
assert updated_backfill.asset_backfill_data
assert (
updated_backfill.asset_backfill_data.requested_subset.num_partitions_and_non_partitioned_assets
== 1
)


def test_asset_backfill_first_iteration_code_location_unreachable_error_some_runs_submitted(
instance: DagsterInstance, workspace_context: WorkspaceProcessContext
):
# Tests that we can recover from an unreachable code location error during the first tick,
# when only some of the root asset runs were submitted before the error.
from dagster._core.execution.submit_asset_runs import _get_job_execution_data_from_run_request

asset_selection = [AssetKey("asset_f"), AssetKey("asset_g")]
asset_graph = workspace_context.create_request_context().asset_graph

num_partitions = 2
target_partitions = partitions_f.get_partition_keys()[0:num_partitions]
backfill_id = "backfill_with_roots_multiple_partitions"
backfill = PartitionBackfill.from_asset_partitions(
asset_graph=asset_graph,
backfill_id=backfill_id,
tags={},
backfill_timestamp=pendulum.now().timestamp(),
asset_selection=asset_selection,
partition_names=target_partitions,
dynamic_partitions_store=instance,
all_partitions=False,
)
instance.add_backfill(backfill)
assert instance.get_runs_count() == 0
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.status == BulkActionStatus.REQUESTED

# The following backfill iteration will attempt to submit run requests for asset_f's two partitions.
# The first call to _get_job_execution_data_from_run_request will succeed, but the second call will
# raise a DagsterUserCodeUnreachableError. As a result, only the first partition will be successfully
# submitted.
counter = 0

def raise_code_unreachable_error_on_second_call(*args, **kwargs):
nonlocal counter
if counter == 0:
counter += 1
return _get_job_execution_data_from_run_request(*args, **kwargs)
elif counter == 1:
counter += 1
raise DagsterUserCodeUnreachableError()
else:
# Should not attempt to create a run for any further partition once the second
# call has errored with DagsterUserCodeUnreachableError
raise Exception("Should not reach")

with mock.patch(
"dagster._core.execution.submit_asset_runs._get_job_execution_data_from_run_request",
side_effect=raise_code_unreachable_error_on_second_call,
):
assert all(
not error
for error in list(
execute_backfill_iteration(
workspace_context, get_default_daemon_logger("BackfillDaemon")
)
)
)

assert instance.get_runs_count() == 1
updated_backfill = instance.get_backfill(backfill_id)
assert updated_backfill
assert updated_backfill.asset_backfill_data
assert (
updated_backfill.asset_backfill_data.requested_subset.num_partitions_and_non_partitioned_assets
== 1
)
assert not updated_backfill.asset_backfill_data.requested_runs_for_target_roots

# Execute the backfill iteration again, confirming that the remaining partition for asset_f is now requested
assert all(
not error
for error in list(
execute_backfill_iteration(
workspace_context, get_default_daemon_logger("BackfillDaemon")
)
)
)
# Assert that one additional run is submitted, for two runs total
assert instance.get_runs_count() == 2

updated_backfill = instance.get_backfill(backfill_id)
assert updated_backfill
assert updated_backfill.asset_backfill_data
assert (
updated_backfill.asset_backfill_data.requested_subset.num_partitions_and_non_partitioned_assets
== 2
)
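
The counter-based side_effect used in these tests is a general unittest.mock pattern for letting the first call through and failing the second. A standalone sketch of that pattern under the same assumption — the submit function and error type below are stand-ins, not Dagster APIs:

from unittest import mock


class FakeUnreachableError(Exception):
    """Stand-in for an error that should stop further submissions."""


def submit_run(partition_key):
    # Real implementation would talk to the code server.
    return f"submitted {partition_key}"


def submit_all(partition_keys):
    submitted = []
    for key in partition_keys:
        try:
            submitted.append(submit_run(key))
        except FakeUnreachableError:
            break  # stop submitting once the code server is unreachable
    return submitted


original_submit_run = submit_run  # keep a reference to the unpatched function
call_count = 0


def fail_on_second_call(partition_key):
    global call_count
    call_count += 1
    if call_count == 1:
        return original_submit_run(partition_key)
    raise FakeUnreachableError()


with mock.patch(f"{__name__}.submit_run", side_effect=fail_on_second_call):
    # Only the first partition is submitted; the second call raises and submission stops.
    assert submit_all(["foo_f", "bar_f"]) == ["submitted foo_f"]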


def test_fail_backfill_when_runs_completed_but_partitions_marked_as_in_progress(
instance: DagsterInstance, workspace_context: WorkspaceProcessContext
):