Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix backfill stalling if first tick does not successfully submit runs #21454

Merged
merged 3 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions python_modules/dagster/dagster/_core/execution/asset_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ def with_latest_storage_id(self, latest_storage_id: Optional[int]) -> "AssetBack
latest_storage_id=latest_storage_id,
)

def with_requested_runs_for_target_roots(
    self, requested_runs_for_target_roots: bool
) -> "AssetBackfillData":
    """Return a copy of this object with ``requested_runs_for_target_roots`` replaced.

    The receiver is not modified: the new value is applied through ``_replace``, which
    produces a separate instance carrying every other field over unchanged.

    Args:
        requested_runs_for_target_roots: The flag value the returned copy should carry.

    Returns:
        The copy produced by ``_replace``.
    """
    return self._replace(requested_runs_for_target_roots=requested_runs_for_target_roots)

def is_complete(self) -> bool:
"""The asset backfill is complete when all runs to be requested have finished (success,
failure, or cancellation). Since the AssetBackfillData object stores materialization states
Expand Down Expand Up @@ -727,10 +730,13 @@ def _submit_runs_and_update_backfill_in_chunks(
if retryable_error_raised:
# Code server became unavailable mid-backfill. Rewind the cursor back to the cursor
# from the previous iteration, to allow next iteration to reevaluate the same
# events.
# events. If the previous iteration had not requested the target roots, this will also
# ensure the next iteration requests the target roots
backfill_data_with_submitted_runs = (
backfill_data_with_submitted_runs.with_latest_storage_id(
previous_asset_backfill_data.latest_storage_id
).with_requested_runs_for_target_roots(
previous_asset_backfill_data.requested_runs_for_target_roots
)
)

Expand Down Expand Up @@ -1190,9 +1196,13 @@ def execute_asset_backfill_iteration_inner(
initial_candidates: Set[AssetKeyPartitionKey] = set()
request_roots = not asset_backfill_data.requested_runs_for_target_roots
if request_roots:
initial_candidates.update(
asset_backfill_data.get_target_root_asset_partitions(instance_queryer)
)
target_roots = asset_backfill_data.get_target_root_asset_partitions(instance_queryer)
# Because the code server may have failed while requesting roots, some roots may have
# already been requested. Checking here will reduce the amount of BFS work later in the iteration.
not_yet_requested = [
root for root in target_roots if root not in asset_backfill_data.requested_subset
]
initial_candidates.update(not_yet_requested)

yield None

Expand Down
192 changes: 192 additions & 0 deletions python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,9 @@ def reusable():

partitions_d = StaticPartitionsDefinition(["foo_d"])

# Two-key static partition sets for asset_f and asset_g. The keys are paired one-to-one
# (foo_f <-> foo_g, bar_f <-> bar_g) by the StaticPartitionMapping declared on asset_g.
partitions_f = StaticPartitionsDefinition(["foo_f", "bar_f"])
partitions_g = StaticPartitionsDefinition(["foo_g", "bar_g"])


@asset(partitions_def=partitions_a)
def asset_a():
Expand Down Expand Up @@ -291,6 +294,23 @@ def asset_e(asset_a):
pass


# Root test asset partitioned by partitions_f ("foo_f", "bar_f"); it takes no inputs and
# does no work. Documented with comments rather than a docstring so the decorated
# function object itself is unchanged.
@asset(partitions_def=partitions_f)
def asset_f():
    pass


# Test asset partitioned by partitions_g that takes asset_f as its only input.
# The StaticPartitionMapping on that input pairs the partition keys one-to-one:
# "foo_f" with "foo_g" and "bar_f" with "bar_g".
# Documented with comments rather than a docstring so the decorated function object
# itself is unchanged.
@asset(
    partitions_def=partitions_g,
    ins={
        "asset_f": AssetIn(
            partition_mapping=StaticPartitionMapping({"foo_f": "foo_g", "bar_f": "bar_g"})
        )
    },
)
def asset_g(asset_f):
    pass


daily_partitions_def = DailyPartitionsDefinition("2023-01-01")


Expand Down Expand Up @@ -348,6 +368,8 @@ def the_repo():
daily_1,
daily_2,
asset_e,
asset_f,
asset_g,
asset_with_single_run_backfill_policy,
asset_with_multi_run_backfill_policy,
]
Expand Down Expand Up @@ -1327,6 +1349,176 @@ def raise_code_unreachable_error_on_second_call(*args, **kwargs):
)


def test_asset_backfill_first_iteration_code_location_unreachable_error_no_runs_submitted(
    instance: DagsterInstance, workspace_context: WorkspaceProcessContext
):
    # Scenario: the code location is unreachable during the very first tick, while the
    # root assets are being requested, so that tick submits nothing. The following tick
    # must still request the roots.
    selected_assets = [AssetKey("asset_a"), AssetKey("asset_e")]
    graph = workspace_context.create_request_context().asset_graph

    backfill_id = "backfill_with_roots"
    # Only the first partition of partitions_a is targeted.
    first_partition_only = partitions_a.get_partition_keys()[:1]
    instance.add_backfill(
        PartitionBackfill.from_asset_partitions(
            asset_graph=graph,
            backfill_id=backfill_id,
            tags={},
            backfill_timestamp=pendulum.now().timestamp(),
            asset_selection=selected_assets,
            partition_names=first_partition_only,
            dynamic_partitions_store=instance,
            all_partitions=False,
        )
    )
    assert instance.get_runs_count() == 0
    stored_backfill = instance.get_backfill(backfill_id)
    assert stored_backfill
    assert stored_backfill.status == BulkActionStatus.REQUESTED

    # First tick: every attempt to build job execution data raises
    # DagsterUserCodeUnreachableError, so no run can be submitted for asset_a's partition.
    def always_unreachable(*args, **kwargs):
        raise DagsterUserCodeUnreachableError()

    with mock.patch(
        "dagster._core.execution.submit_asset_runs._get_job_execution_data_from_run_request",
        side_effect=always_unreachable,
    ):
        # list() drains the generator, so the whole iteration runs before the yielded
        # values are checked.
        first_tick_errors = list(
            execute_backfill_iteration(
                workspace_context, get_default_daemon_logger("BackfillDaemon")
            )
        )
        assert not any(first_tick_errors)

    # Nothing was submitted, nothing is recorded as requested, and the roots are still
    # flagged as not yet requested.
    assert instance.get_runs_count() == 0
    after_failed_tick = instance.get_backfill(backfill_id)
    assert after_failed_tick
    data_after_failed_tick = after_failed_tick.asset_backfill_data
    assert data_after_failed_tick
    assert data_after_failed_tick.requested_subset.num_partitions_and_non_partitioned_assets == 0
    assert not data_after_failed_tick.requested_runs_for_target_roots

    # Second tick, without the patch: the partition for asset_a is requested this time.
    second_tick_errors = list(
        execute_backfill_iteration(
            workspace_context, get_default_daemon_logger("BackfillDaemon")
        )
    )
    assert not any(second_tick_errors)

    # Exactly one run now exists and one asset partition is recorded as requested.
    assert instance.get_runs_count() == 1

    after_recovery_tick = instance.get_backfill(backfill_id)
    assert after_recovery_tick
    data_after_recovery_tick = after_recovery_tick.asset_backfill_data
    assert data_after_recovery_tick
    assert data_after_recovery_tick.requested_subset.num_partitions_and_non_partitioned_assets == 1


def test_asset_backfill_first_iteration_code_location_unreachable_error_some_runs_submitted(
    instance: DagsterInstance, workspace_context: WorkspaceProcessContext
):
    # Scenario: the code location becomes unreachable part-way through the first tick,
    # after one of the two root runs has already been submitted. The following tick must
    # request the remaining root.

    # Imported here, before patching, so the wrapper below can delegate to the real
    # implementation.
    from dagster._core.execution.submit_asset_runs import _get_job_execution_data_from_run_request

    selected_assets = [AssetKey("asset_f"), AssetKey("asset_g")]
    graph = workspace_context.create_request_context().asset_graph

    backfill_id = "backfill_with_roots_multiple_partitions"
    # Both partitions of partitions_f are targeted.
    first_two_partitions = partitions_f.get_partition_keys()[:2]
    instance.add_backfill(
        PartitionBackfill.from_asset_partitions(
            asset_graph=graph,
            backfill_id=backfill_id,
            tags={},
            backfill_timestamp=pendulum.now().timestamp(),
            asset_selection=selected_assets,
            partition_names=first_two_partitions,
            dynamic_partitions_store=instance,
            all_partitions=False,
        )
    )
    assert instance.get_runs_count() == 0
    stored_backfill = instance.get_backfill(backfill_id)
    assert stored_backfill
    assert stored_backfill.status == BulkActionStatus.REQUESTED

    # First tick: the first call to _get_job_execution_data_from_run_request goes through
    # to the real implementation, the second raises DagsterUserCodeUnreachableError, and
    # any call after that fails the test. Only one of asset_f's two partitions is therefore
    # submitted.
    calls_seen = 0

    def succeed_once_then_unreachable(*args, **kwargs):
        nonlocal calls_seen
        calls_seen += 1
        if calls_seen == 1:
            return _get_job_execution_data_from_run_request(*args, **kwargs)
        if calls_seen == 2:
            raise DagsterUserCodeUnreachableError()
        # No further run creation should be attempted in this tick once the unreachable
        # error has been raised.
        raise Exception("Should not reach")

    with mock.patch(
        "dagster._core.execution.submit_asset_runs._get_job_execution_data_from_run_request",
        side_effect=succeed_once_then_unreachable,
    ):
        # list() drains the generator, so the whole iteration runs before the yielded
        # values are checked.
        first_tick_errors = list(
            execute_backfill_iteration(
                workspace_context, get_default_daemon_logger("BackfillDaemon")
            )
        )
        assert not any(first_tick_errors)

    # One run was submitted and recorded as requested, but the roots are still flagged as
    # not fully requested.
    assert instance.get_runs_count() == 1
    after_partial_tick = instance.get_backfill(backfill_id)
    assert after_partial_tick
    data_after_partial_tick = after_partial_tick.asset_backfill_data
    assert data_after_partial_tick
    assert data_after_partial_tick.requested_subset.num_partitions_and_non_partitioned_assets == 1
    assert not data_after_partial_tick.requested_runs_for_target_roots

    # Second tick, without the patch: the remaining partition of asset_f is requested.
    second_tick_errors = list(
        execute_backfill_iteration(
            workspace_context, get_default_daemon_logger("BackfillDaemon")
        )
    )
    assert not any(second_tick_errors)

    # One more run was submitted, for two runs and two requested asset partitions in total.
    assert instance.get_runs_count() == 2

    after_recovery_tick = instance.get_backfill(backfill_id)
    assert after_recovery_tick
    data_after_recovery_tick = after_recovery_tick.asset_backfill_data
    assert data_after_recovery_tick
    assert data_after_recovery_tick.requested_subset.num_partitions_and_non_partitioned_assets == 2


def test_fail_backfill_when_runs_completed_but_partitions_marked_as_in_progress(
instance: DagsterInstance, workspace_context: WorkspaceProcessContext
):
Expand Down