
[workflow] Fast workflow indexing #24767

Merged: 12 commits into ray-project:master on May 25, 2022

Conversation

@suquark (Member) commented May 13, 2022

Why are these changes needed?

This PR enables indexing for workflow status, which makes listing workflows and their statuses much faster.

The indexing is done by creating keys under the corresponding status directories. For example, the RUNNING directory contains a key (named with the workflow id) for every workflow that is currently running.

One issue is that the cluster or the workflow may crash while the status is being updated, leaving the index in an inconsistent state: updating a status requires creating the new key, deleting the old key, and updating the workflow metadata, and these actions cannot be combined into a single atomic operation. We use a special directory to mark that a status update is underway, which makes it possible to detect unfinished status updates and repair them. (See examples in the newly added tests.)
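To make the update sequence concrete, here is a minimal sketch, assuming a simple key-value storage interface (put/delete) and illustrative key paths; the real implementation lives in the workflow storage layer and may differ in details:

# Illustrative key layout (not the exact paths used by this PR):
#   workflow_status/<STATUS>/<workflow_id>   -> index entry for one workflow
#   workflow_status/dirty/<workflow_id>      -> marker: an update is underway
def update_status_index(storage, workflow_id, prev_status, new_status):
    dirty_key = f"workflow_status/dirty/{workflow_id}"
    storage.put(dirty_key, b"")  # 1. mark the update as underway
    storage.put(f"workflow_status/{new_status}/{workflow_id}", b"")  # 2. create the new key
    storage.delete(f"workflow_status/{prev_status}/{workflow_id}")   # 3. delete the old key
    storage.put(f"workflows/{workflow_id}/status", new_status)       # 4. update workflow metadata
    storage.delete(dirty_key)  # 5. done; a crash before this point leaves the marker behind,
                               #    so a later operation can detect and repair the half-update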

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@suquark suquark marked this pull request as ready for review May 13, 2022 19:53
@suquark suquark mentioned this pull request May 14, 2022
@suquark suquark requested a review from fishbone May 16, 2022 22:53
@suquark suquark requested a review from fishbone May 17, 2022 00:14
@fishbone (Contributor)

Sorry for the late review and thanks for working on this!

Overall, I'm fine with the protocol. My concern is that this protocol forces readers to be single-threaded as well, which doesn't seem to work in the current system.

One option is to make reads work without fixing the half-finished updates, and only fix them when there is a write.

@fishbone fishbone added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label May 17, 2022
@suquark (Member, Author) commented May 17, 2022

Let me try to implement another protocol using key creation time. I think it might address the multi-threading issues.

@suquark suquark removed the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label May 20, 2022
@suquark suquark requested a review from fishbone May 20, 2022 00:33
@suquark (Member, Author) commented May 20, 2022

@iycheng ready for review again

Comment on lines 127 to +138
def cancel(workflow_id: str) -> None:
    try:
        workflow_manager = get_management_actor()
-       ray.get(workflow_manager.cancel_workflow.remote(workflow_id))
    except ValueError:
        wf_store = workflow_storage.get_workflow_storage(workflow_id)
-       wf_store.save_workflow_meta(WorkflowMetaData(WorkflowStatus.CANCELED))
+       # TODO(suquark): Here we update workflow status "offline", so it is likely
+       # thread-safe because there is no workflow management actor updating the
+       # workflow concurrently. But we should be careful if we are going to
+       # update more workflow status offline in the future.
+       wf_store.update_workflow_status(WorkflowStatus.CANCELED)
+       return
+   ray.get(workflow_manager.cancel_workflow.remote(workflow_id))
Contributor:

Should we move ray.get back into the try block?

Member Author:

No, I think we put ray.get in the try block accidentally. ray.get cannot raise ValueError; only get_management_actor raises ValueError.

            return WorkflowStatus(metadata["status"])
        return WorkflowStatus.NONE

    def list_workflow(self) -> List[Tuple[str, WorkflowStatus]]:
Contributor:

I think we need an extra param here: list_workflow(self, status=None).

If status is set, we'll only check the dirty directory and the specified status directory. I'm OK with another PR fixing this.

Member Author:

I just implemented it.
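For reference, a rough sketch of what a status-filtered variant could look like; _key_status_dir, _storage.list, and the exact signature below are illustrative assumptions rather than the PR's actual code:

from typing import List, Optional, Set, Tuple

def list_workflow(
    self, status_filter: Optional[Set[WorkflowStatus]] = None
) -> List[Tuple[str, WorkflowStatus]]:
    # With no filter, scan every status directory; with a filter, only the
    # requested ones (dirty-directory handling is omitted here for brevity).
    statuses = status_filter if status_filter is not None else set(WorkflowStatus)
    results: List[Tuple[str, WorkflowStatus]] = []
    for status in statuses:
        # _key_status_dir / _storage.list are assumed helper names.
        for workflow_id in self._storage.list(self._key_status_dir(status)):
            results.append((workflow_id, status))
    return results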

Comment on lines +144 to +147
if s != prev_status:
    self._storage.delete(
        self._key_workflow_with_status(workflow_id, s)
    )
Contributor:

No need to update this PR. But do you think in the future we can put the status in the dirty flag and only delete that?

Member Author:

Good idea.
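Not part of this PR, but a quick sketch of that idea; _key_dirty_marker and _storage.put are assumed helper names:

def _commit_status_change(self, workflow_id, prev_status, new_status):
    # Record the previous status inside the dirty marker, so that crash
    # repair only has to delete one stale key instead of sweeping every
    # status directory.
    dirty_key = self._key_dirty_marker(workflow_id)
    self._storage.put(dirty_key, prev_status.value.encode())
    self._storage.put(self._key_workflow_with_status(workflow_id, new_status), b"")
    self._storage.delete(self._key_workflow_with_status(workflow_id, prev_status))
    self._storage.delete(dirty_key)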

@fishbone (Contributor) commented May 21, 2022

It seems there is a bug here: I think we should always set the flag.

Another thing: I think we need to put the status filter into the storage layer, so that we don't need to read the status of successful workflows, which is not useful for resume_all/list_all with a filter. I'm OK with merging this PR and doing that optimization in another one.

@fishbone fishbone added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label May 21, 2022
@suquark (Member, Author) commented May 21, 2022

@iycheng I think we did not set the dirty flag because in that branch we already detected the dirty flag. Since workflow status updating is single-threaded, there is no need to create it again. (Creating it again also does not work in the concurrent case, because another faster process could delete the newly created flag anyway; the order of creates/deletes from different processes could be arbitrary.)

@fishbone (Contributor)

Got it, and thanks for the explanation.

@suquark suquark requested a review from fishbone May 23, 2022 18:40
@suquark suquark removed the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label May 23, 2022
@suquark (Member, Author) commented May 23, 2022

@iycheng I just updated the PR to support the status filter. I also fixed a bug: in the original workflow.api.list_all(), status_filter = set(WorkflowStatus.__members__.keys()) returns a set of strings instead of a set of WorkflowStatus values. This is clearly inconsistent with the behavior of the other branches in the function, and the bug surfaced when I implemented the status filter in the storage layer.
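To illustrate the bug with a stripped-down stand-in enum (the real WorkflowStatus has more members):

from enum import Enum

class WorkflowStatus(Enum):  # stand-in for the real enum, for illustration only
    RUNNING = "RUNNING"
    SUCCESSFUL = "SUCCESSFUL"

# Buggy: a set of member *names* (strings), e.g. {"RUNNING", "SUCCESSFUL"}.
status_filter = set(WorkflowStatus.__members__.keys())
assert WorkflowStatus.RUNNING not in status_filter

# Consistent with the other branches: a set of enum members.
status_filter = set(WorkflowStatus)
assert WorkflowStatus.RUNNING in status_filter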

@fishbone (Contributor) left a comment

LGTM!

@fishbone fishbone added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label May 23, 2022
@suquark (Member, Author) commented May 25, 2022

CI failures seem unrelated. I'll merge this PR.

@suquark suquark merged commit f67871c into ray-project:master May 25, 2022
@suquark suquark deleted the workflow_indexing branch May 25, 2022 03:21