[workflow] Fast workflow indexing #24767
Conversation
Sorry for the late review, and thanks for working on this! Overall, I'm good with the protocol. My concern is that this protocol forces readers to be single-threaded as well, which does not seem to work in the current system. One option is to make reads work without fixing the half-finished updates, and only fix them when there is a write.
Let me try to implement another protocol using key creation time. I think it might address the multi-threading issues.
@iycheng ready for review again.
def cancel(workflow_id: str) -> None:
    try:
        workflow_manager = get_management_actor()
    except ValueError:
        wf_store = workflow_storage.get_workflow_storage(workflow_id)
        wf_store.save_workflow_meta(WorkflowMetaData(WorkflowStatus.CANCELED))
        # TODO(suquark): Here we update the workflow status "offline", so it
        # is likely thread-safe because there is no workflow management actor
        # updating the workflow concurrently. But we should be careful if we
        # are going to update more workflow statuses offline in the future.
        wf_store.update_workflow_status(WorkflowStatus.CANCELED)
        return
    ray.get(workflow_manager.cancel_workflow.remote(workflow_id))
Should we move `ray.get` back into the try block?
No, I think we put `ray.get` in the try block accidentally. `ray.get` cannot raise `ValueError`; only `get_management_actor` raises `ValueError`.
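The narrow-try structure being discussed can be sketched with stubs. The stubs below are illustrative stand-ins for the real Ray workflow internals, not the actual API:

```python
# Sketch: only the actor lookup can raise ValueError, so it alone sits in
# the try block; the actor call stays outside, so a failure there is never
# mistaken for "management actor not found".

def get_management_actor():
    # Hypothetical stub: in Ray, this raises ValueError when the
    # management actor does not exist.
    raise ValueError("management actor not found")

def cancel(workflow_id: str) -> str:
    try:
        workflow_manager = get_management_actor()
    except ValueError:
        # Offline path: no management actor, so update storage directly.
        return "offline-cancel"
    # Online path: reached only when the actor lookup succeeded.
    return "actor-cancel"
```

With the stub always raising, `cancel` takes the offline path; in the real code, a healthy cluster takes the actor path.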
            return WorkflowStatus(metadata["status"])
        return WorkflowStatus.NONE

    def list_workflow(self) -> List[Tuple[str, WorkflowStatus]]:
I think we need an extra param here: list_workflow(self, status=None).
If status is set, we'll only check the dirty directory and the specified status directory. I'm OK with another PR to fix this.
I just implemented it.
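The status filter can be sketched over a toy in-memory index. The dict/set layout and names below are assumptions for illustration, not the real storage API:

```python
from enum import Enum
from typing import Dict, List, Optional, Set, Tuple

class WorkflowStatus(Enum):
    RUNNING = "RUNNING"
    SUCCESSFUL = "SUCCESSFUL"
    CANCELED = "CANCELED"

def list_workflow(
    index: Dict[WorkflowStatus, Set[str]],
    dirty: Set[str],
    status: Optional[WorkflowStatus] = None,
) -> List[Tuple[str, WorkflowStatus]]:
    # With a status filter, only that one directory is scanned; without
    # it, every status directory is scanned.
    statuses = [status] if status is not None else list(index)
    results: List[Tuple[str, WorkflowStatus]] = []
    for s in statuses:
        for workflow_id in index.get(s, set()):
            # Dirty workflows have a possibly stale index entry; the real
            # implementation would re-read their metadata instead of
            # trusting the index.
            if workflow_id not in dirty:
                results.append((workflow_id, s))
    return results
```

This is why the filter pays off: a query for RUNNING never touches the (typically much larger) SUCCESSFUL directory.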
        if s != prev_status:
            self._storage.delete(
                self._key_workflow_with_status(workflow_id, s)
            )
No need to update this PR, but do you think in the future we could put the status in the dirty flag and only delete that one?
Good idea.
It seems there is a bug here: we should always set the flag, I think. Another thing: I think we need to push the status filter into the storage layer, so that we don't need to read successful workflow statuses, which are not useful for resume_all/list_all with a filter. I'm OK with this PR and having another one for this optimization.
@iycheng I think we did not set the dirty flag because in that branch we already detected the dirty flag. Since workflow status updating is single-threaded, there is no need to create it again. (Creating it again also does not work in the concurrent case, because another, faster process could delete the newly created flag anyway; the order of creates/deletes across different processes can be arbitrary.)
Got it, and thanks for the explanation.
@iycheng I just updated the PR to support the status filter. I also fixed a bug: in the original …
LGTM!
CI failures seem unrelated. I'll merge this PR.
Why are these changes needed?
This PR adds indexing for workflow status, so listing workflows and their statuses is much faster.
The indexing is done by creating keys under the corresponding status directories. For example, the RUNNING directory contains a key (named with the workflow ID) for every workflow that is currently running.

One issue is that the cluster or workflow may crash while a status is being updated, leaving the index inconsistent: we have to create the new key, delete the old key, and update the workflow metadata, and these actions cannot be combined into a single atomic operation. We therefore use a special directory to mark that a status update is underway. This makes it possible to detect unfinished status updates and fix them. (See examples in the newly added tests.)
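The update-plus-recovery protocol described above can be sketched over an in-memory dict standing in for the key-value store. All names and the key layout here are illustrative, not the actual Ray workflow storage schema:

```python
# Sketch of the non-atomic status update guarded by a "dirty" marker.
storage = {}

def update_status(workflow_id: str, old: str, new: str) -> None:
    storage[f"dirty/{workflow_id}"] = new        # 1. mark update underway
    storage[f"{new}/{workflow_id}"] = True       # 2. create new index key
    storage.pop(f"{old}/{workflow_id}", None)    # 3. delete old index key
    storage[f"meta/{workflow_id}"] = new         # 4. update workflow metadata
    storage.pop(f"dirty/{workflow_id}")          # 5. clear the marker

def recover() -> None:
    # A surviving dirty marker means a crash happened mid-update; the
    # marker records the target status, so the (idempotent) steps 2-5
    # can simply be re-run to finish the update.
    for key in [k for k in storage if k.startswith("dirty/")]:
        workflow_id = key.split("/", 1)[1]
        new = storage[key]
        old = storage.get(f"meta/{workflow_id}", "")
        update_status(workflow_id, old, new)
```

Because each step either creates or deletes an individual key idempotently, re-running the sequence from the marker converges on a consistent index no matter where the crash occurred.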
Checks
I've run scripts/format.sh to lint the changes in this PR.