-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[core][experimental] Fix bug when propagating DAG application exceptions #45237
Conversation
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
@@ -251,7 +251,7 @@ def get_actor_id(self) -> Optional[str]: | |||
""" | |||
# only worker mode has actor_id | |||
if self.worker.mode != ray._private.worker.WORKER_MODE: | |||
logger.warning( | |||
logger.debug( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also changed this to DEBUG because it's very spammy when the driver calls this method. It seems fine to me if the driver calls it, not sure why this should be a warning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, thanks!
Signed-off-by: Stephanie Wang <[email protected]>
@@ -251,7 +251,7 @@ def get_actor_id(self) -> Optional[str]: | |||
""" | |||
# only worker mode has actor_id | |||
if self.worker.mode != ray._private.worker.WORKER_MODE: | |||
logger.warning( | |||
logger.debug( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, thanks!
return True | ||
except Exception as exc: | ||
# Previous task raised an application-level exception. | ||
# Propagate it and skip the actual task. | ||
output_writer.write(exc) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does this not need _wrap_exception()?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this is because the exception has already been wrapped by the original task that errored. Will add a comment!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I see. It's wrapped in a RayTaskError, which is itself an exception. So exc
is an instance of RayTaskError here.
self.count = 0 | ||
|
||
def _fail_if_needed(self): | ||
if self.fail_after and self.count > self.fail_after: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor preference: self.count >= self.fail_after (because self.count starts at 0)
output_channels.end_read() | ||
|
||
with pytest.raises(RuntimeError): | ||
for i in range(99): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why loop 99 times? Shouldn't this already throw an exception on the first iteration because fail_after is set to 100?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah the failures are randomized.
except ValueError as exc: | ||
# ValueError is raised if a type hint was set and the returned | ||
# type did not match the hint. | ||
except IOError: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
QQ: should we also string match the error message, or is this certain IOError == channel closed in this case? (it could be wrong if we have different close API?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm true, we should probably introduce a different Ray system error instead of using IOError. I'll add an issue to track this.
Signed-off-by: Stephanie Wang <[email protected]>
…ons (ray-project#45237) If a task in the DAG raised an application-level exception, we would re-raise correctly if it was read directly by the driver, but not if it was read by another actor in the DAG. This PR fixes the issue by writing the exception to the next actor. --------- Signed-off-by: Stephanie Wang <[email protected]> Signed-off-by: Ryan O'Leary <[email protected]>
…ons (ray-project#45237) If a task in the DAG raised an application-level exception, we would re-raise correctly if it was read directly by the driver, but not if it was read by another actor in the DAG. This PR fixes the issue by writing the exception to the next actor. --------- Signed-off-by: Stephanie Wang <[email protected]> Signed-off-by: Ryan O'Leary <[email protected]>
…ons (ray-project#45237) If a task in the DAG raised an application-level exception, we would re-raise correctly if it was read directly by the driver, but not if it was read by another actor in the DAG. This PR fixes the issue by writing the exception to the next actor. --------- Signed-off-by: Stephanie Wang <[email protected]>
…ons (ray-project#45237) If a task in the DAG raised an application-level exception, we would re-raise correctly if it was read directly by the driver, but not if it was read by another actor in the DAG. This PR fixes the issue by writing the exception to the next actor. --------- Signed-off-by: Stephanie Wang <[email protected]> Signed-off-by: gchurch <[email protected]>
Why are these changes needed?
If a task in the DAG raised an application-level exception, we would re-raise correctly if it was read directly by the driver, but not if it was read by another actor in the DAG. This PR fixes the issue by writing the exception to the next actor.