Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[runtime env] URI reference refactor #22828

Merged
merged 27 commits into from
Mar 21, 2022

Conversation

SongGuyang
Copy link
Contributor

@SongGuyang SongGuyang commented Mar 4, 2022

Why are these changes needed?

Future works

  • We don't remove the RuntimeEnvUris from RuntimeEnv protobuf in current PR because gcs also uses those URIs to do GC by runtime_env_manager. We should also clear this.
  • Ray client server shouldn't interact with agent directly. Or Ray client server should also decrease the reference count.
  • Currently, WorkerPool::HandleJobStarted will be called multiple times for one job. So we should make sure this function is idempotent. Can we change this logic and make this function be called only once?

Related issue number

#21695

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@SongGuyang
Copy link
Contributor Author

@edoakes @architkulkarni @rkooo567 This PR is not completed yet. But I have one question needed to discuss first. I saw that we also create runtime env in client server, but it doesn't delete the URI by RPC. Do we will keep this logic in future? Maybe there will be some corner cases which bring URI leakage(for example, client-server dies before the client job started) . I'm not sure the real effect.
Another thought, can we setup the environment of client-server by the way like job submission? If we can start client-server in an actor, we can avoid proxier communicating with agent.

@rkooo567
Copy link
Contributor

rkooo567 commented Mar 4, 2022

I believe that code path is a hack and we should remove it!

@rkooo567 rkooo567 added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Mar 7, 2022
@SongGuyang SongGuyang changed the title [WIP][runtime env] URI reference refactor [runtime env] URI reference refactor Mar 8, 2022
@SongGuyang
Copy link
Contributor Author

@architkulkarni @edoakes @rkooo567 This PR is ready to review! I have updated some issues in the Future works part of the description. I'd like to discuss about this.

@SongGuyang SongGuyang removed the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Mar 8, 2022
Copy link
Contributor

@architkulkarni architkulkarni left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great! Just some minor comments and questions.

  • Regarding the HandleJobStarted future work item in the PR description, I don't know if it's possible to make it only be called once per job -- @wuisawesome maybe you have some thoughts about this?

  • I'm not too familiar with the logic of num_registered_workers and num_starting_workers so it's hard for me to review that part carefully. @SongGuyang maybe you could summarize the change, or someone else could review that part?

  • It seems like the worker pool is now calling AddURIReference, which creates the runtime env as a side effect. The name might be a little confusing. Shouldn't these methods be called CreateRuntimeEnvIfNeeded and DeleteRuntimeEnvIfNeeded, and internally the agent adjusts the refcounts if needed? I don't feel strongly about this though, I'm fine with leaving it.

return unused_uris

# Don't change URI reference for `client_server` because `client_server` doesn't
# send the `DecreaseRuntimeEnvReference` RPC when the client exits.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, thanks for catching this! I think the only downside of this for the user is that when they connect with ray.init() with Ray Client, it will install the runtime environment, and then when the user runs their first task it will install the exact same runtime environment again, which could be slow.

I think the downside is not too severe.

async def _setup_runtime_env(
serialized_runtime_env, serialized_allocated_resource_instances
runtime_env, serialized_runtime_env, serialized_allocated_resource_instances
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems slightly odd to have both serialized_runtime_env and runtime_env args; do we need both?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I add runtime_env here because I don't want to deserialize twice. Just an optimization for performance.

if r.status == agent_manager_pb2.AgentRpcStatus.AGENT_RPC_STATUS_OK:
# specific_server.set_serialized_runtime_env(serialized_runtime_env)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we remove this comment?

// Kill all the workers that have been started but not registered.
for (const auto &starting_worker : entry.second.starting_worker_processes) {
procs_to_kill.insert(starting_worker.second.proc);
// Kill all the worker processes.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we leave the NOTE in, or does it no longer apply?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it applies. Here we kill all the worker processes when the worker pool is deconstructed. I didn't modify this logic. But I have redefine starting_worker_processes to worker_processes which means that we will store the alive processes all the way. So we can kill all the processes by this.

@@ -596,28 +579,35 @@ void WorkerPool::MarkPortAsFree(int port) {
}
}

static bool RuntimeEnvNotEmpty(const std::string &serialized_runtime_env) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We currently have IsRuntimeEnvEmpty() in runtime_env_common.h, does it make sense to use that one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good! I will reuse yours.

@@ -746,7 +734,7 @@ Status WorkerPool::RegisterDriver(const std::shared_ptr<WorkerInterface> &driver
auto &state = GetStateForLanguage(driver->GetLanguage());
state.registered_drivers.insert(std::move(driver));
const auto job_id = driver->GetAssignedJobId();
all_jobs_[job_id] = job_config;
HandleJobStarted(job_id, job_config);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have much context here, do you know why HandleJobStarted wasn't called here previously? Was this a bug?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to Archit's comment. Either way, can we get a unit test to ensure the ref counting behavior is correct and stays correct?

Copy link
Contributor Author

@SongGuyang SongGuyang Mar 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HandleJobStarted only do two things: First, add job to all_jobs_ . Second, install runtime env eagerly if needed.

Before current PR:
HandleJobStarted will be triggered by the publishing channel of job. So here, in RegisterDriver, it's ok to only add job to all_jobs_.

In current PR:
To guarantee the creating runtime env RPC only be called once, I have modified the logic of HandleJobStarted. HandleJobStarted will check if the job already been added. So, I should also use HandleJobStarted here to make sure the creating runtime env RPC could be called.

@@ -461,11 +461,13 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
};

/// Some basic information about the starting worker process.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Some basic information about the starting worker process.
/// Some basic information about the worker process.

/// | | |
/// +------------------------------------------------------+
///
/// Now, we can delete the runtime env resources safely.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't there still value in keeping this? (Maybe moving it above the Increase/DecreaseURIReference methods)? It's true that the actual adding and deleting of the references is now happening in the agent, but the AddURIReference and DeleteURIReference calls are still coming from this file. So the information here is still accurate and useful, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I can add a doc to Increase/DecreaseURIReference methods. But it will be more simple which is entire different from this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@SongGuyang
Copy link
Contributor Author

@rkooo567 please help to review the part of worker_pool?

@SongGuyang
Copy link
Contributor Author

  • It seems like the worker pool is now calling AddURIReference, which creates the runtime env as a side effect. The name might be a little confusing. Shouldn't these methods be called CreateRuntimeEnvIfNeeded and DeleteRuntimeEnvIfNeeded, and internally the agent adjusts the refcounts if needed? I don't feel strongly about this though, I'm fine with leaving it.

@edoakes @rkooo567 what do you think about the RPC name? CreateRuntimeEnvIfNeeded and DeleteRuntimeEnvIfNeeded seem also good for me, but there is no meaning of reference count. I'm a little entangled with the names. 😅

@rkooo567
Copy link
Contributor

rkooo567 commented Mar 9, 2022

I think it is okay not to expose ref count to worker pool, so “ifNeeded” sounds good to me?

@rkooo567
Copy link
Contributor

rkooo567 commented Mar 9, 2022

And I will review it today!

Copy link
Contributor

@rkooo567 rkooo567 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had the initial pass! One of concerns I have is that gRPC requests are usually unordered afaik, and this could cause issues? (If Delete request comes before Create). Do you know if this is possible when the caller is always the same?

PIP = 3
CONDA = 4

def get_uris_from_runtime_env(self, runtime_env: RuntimeEnv):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we just put all of them to a single class? I think it could be

class URIReferenceTable:
    def add_uris(self, runtime_env):
        uris = self. get_uris_from_runtime_env(runtime_env)
        self. increase_reference_for_uris(uris)

?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can also expose some gRPC endpoints to query ref count information in the future.

try:
serialized_env = request.serialized_runtime_env
runtime_env = RuntimeEnv.deserialize(serialized_env)
uris = self.get_uris_from_runtime_env(runtime_env)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you put these two lines out of the try/except? They seem to be unrelated to the except block (it basically says it fails to parse runtime env?)

uris = self.get_uris_from_runtime_env(runtime_env)
if request.source_process not in self.reference_exclude_sources:
    self.increase_reference_for_uris(uris)

So something like

        try:
            serialized_env = request.serialized_runtime_env
            runtime_env = RuntimeEnv.deserialize(serialized_env)
        except Exception as e:
            self._logger.exception(
                "[Increase] Failed to parse runtime env: " f"{serialized_env}"
            )
            return runtime_env_agent_pb2.CreateRuntimeEnvIfNeededReply(
                status=agent_manager_pb2.AGENT_RPC_STATUS_FAILED,
                error_message="".join(
                    traceback.format_exception(type(e), e, e.__traceback__)
                ),
            )

        uris = self.get_uris_from_runtime_env(runtime_env)
        if request.source_process not in self.reference_exclude_sources:
            self.increase_reference_for_uris(uris)

@@ -243,7 +318,9 @@ def setup_plugins():
"Runtime env already failed. "
f"Env: {serialized_env}, err: {error_message}"
)
return runtime_env_agent_pb2.CreateRuntimeEnvReply(
if request.source_process not in self.reference_exclude_sources:
self.decrease_reference_for_uris(uris)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The python code with try/except seems to make it difficult to make correct atomic ref counting. Why don't we refactor code in this way?

increase_ref_count(runtime_env)
status = create_runtime_env(runtime_env)
if status != success:
    decrease_ref_count(runtime_env)

reply(status)
    


try:
runtime_env = RuntimeEnv.deserialize(request.serialized_runtime_env)
uris = self.get_uris_from_runtime_env(runtime_env)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as above?

@@ -197,24 +186,19 @@ void WorkerPool::update_worker_startup_token_counter() {
worker_startup_token_counter_ += 1;
}

void WorkerPool::AddStartingWorkerProcess(
void WorkerPool::AddWorkerProcess(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the motivation for the name change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before current PR, we will delete the process when the worker finishes registration.
In current PR, I change this logic. The process will be stored all the way before the worker exits.

///
/// `CreateRuntimeEnvIfNeeded` means increasing the reference count for the runtime env
/// and `DeleteRuntimeEnvIfNeeded` means decreasing the reference count. We increase or
/// decrease runtime env reference in the cases below:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you mention this happens from the agent now?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actual ref counting

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

if (worker_state.starting_worker_processes.count(worker_startup_token) > 0) {
auto it = worker_state.worker_processes.find(worker_startup_token);
if (it != worker_state.worker_processes.end() &&
it->second.num_starting_workers != 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can num_starting_workers be negative?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it can't. But I also could change this code to it->second.num_starting_workers > 0.

auto &state = GetStateForLanguage(worker->GetLanguage());
auto it = state.worker_processes.find(worker->GetStartupToken());
if (it != state.worker_processes.end()) {
it->second.num_registered_workers--;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it impossible the worker disconnected was a "starting workers"?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this code could be problematic if that's the case

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I think this could happen by low probability. I have deleted this field and add a field alive_started_workers to record the started workers.

CreateRuntimeEnv(
RAY_LOG(DEBUG) << "[dedicated] Creating runtime env for task "
<< task_spec.TaskId();
CreateRuntimeEnvIfNeeded(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, why don't we call it GetOrCreateRuntimeEnv?

if (it != state.worker_processes.end()) {
it->second.num_registered_workers--;
if (it->second.num_registered_workers == 0 && it->second.num_starting_workers == 0) {
DeleteRuntimeEnvIfNeeded(it->second.runtime_env_info.serialized_runtime_env());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this API, my impression is the right name is

DeleteRuntimeEnvIfPossible

or

MarkRuntimeEnvUnused

Wdyt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CreateRuntimeEnvOrGet and DeleteRuntimeEnvIfPossible? I like symmetrical names. 😄

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm imo these two has a slightly different semantics to have symmetric names (create is get Or create , and delete it Delete or do nothing). But it is up to you.

@SongGuyang
Copy link
Contributor Author

SongGuyang commented Mar 10, 2022

I had the initial pass! One of concerns I have is that gRPC requests are usually unordered afaik, and this could cause issues? (If Delete request comes before Create). Do you know if this is possible when the caller is always the same?

TCP can guarantee the order, I'm not sure if grpc local buffer will break this(I guess it won't). But I think there is no consistency issue even though the Delete request comes before Create. If the Delete request comes first, agent will mark the runtime env unused and put it into the cache. When Create request comes, if the cache is not evicted, the runtime env will be marked used. Otherwise, agent will setup the runtime env again. If will bring extra workload, but not serious.

@rkooo567 rkooo567 added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Mar 10, 2022
@SongGuyang
Copy link
Contributor Author

@rkooo567 Take a look again? We will merge today if no new comments.


def __init__(
self,
uris_parser: Callable[[RuntimeEnv], None],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm this makes me think all logic here should be just within RuntimeEnvAgent class. Having a separate class like this requires you to pass callbacks, which are weird.

Can you just remove this class and make them a private method of RuntimeEnvAgent?

class RuntimeEnvAgent:
    def _increase_ref_count():
    def _decrease_ref_count():

self._unused_uris_callback = unused_uris_callback
self._unused_runtime_env_callback = unused_runtime_env_callback
# Don't change URI reference for `client_server` because `client_server` doesn't
# send the `DeleteRuntimeEnvIfPossible` RPC when the client exits.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also comment that here? Othewise, this part will be very confusing when somebody looks at it

@@ -216,7 +343,67 @@ def setup_plugins():

return context

serialized_env = request.serialized_runtime_env
# Create runtime env with retry times. This function won't raise exceptions.
# Returns a tuple which contains result(bool), runtime env context(str), and
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you follow the proper Python-style docstring (https://docs.ray.io/en/master/ray-contribute/getting-involved.html#code-style)? Also, consider adding type annotation t

        async def _create_runtime_env_with_retry(
            runtime_env: RuntimeEnv, 
            serialized_runtime_env: str, 
            serialized_allocated_resource_instances: str
        ) -> Tuple[X, Y, Z]:
        """
         Blah blah
         
         Args:
         Returns:

        """

@rkooo567
Copy link
Contributor

^ last 3 comments !

@rkooo567 rkooo567 added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Mar 17, 2022
@SongGuyang
Copy link
Contributor Author

@rkooo567 Do you remember that you comment here #22828 (comment) to ask me using a single class? I think the current class is clear and the callback is reasonable. If you insist, I will add a TODO here and change it in future PR.
btw, Only left one day for me to merge this PR. Can you approve first? I don't think the remaining comment issues are big.

@rkooo567
Copy link
Contributor

@rkooo567 Do you remember that you comment here #22828 (comment) to ask me using a single class?

Yeah, I remember that. However, when I made a suggestion, I didn't know this would require the callback arguments to modify the parent's states. I don't agree this is reasonable & clear because you don't have any unit test on this class (so there's not much meaning having this additional layer) & this sort of callback is generally harder to read and used to avoid poor class structure.

I really think we should avoid merging PRs just because we want to merge it before branch cut. This PR doesn't fix any existing user issues or critical bugs. There's no urgent features that have a hard dependency on it. I really don't understand why you rush on this. This is how we accumulate lots of tech debt to the codebase...

If you really believe the new class is more reasonable, then go ahead. But please at least address these two comments.

#22828 (comment)
#22828 (comment)

@SongGuyang
Copy link
Contributor Author

I have pushed new commit here alipay@57f1bbc but current PR don't change.

@rkooo567
Copy link
Contributor

#22828 (comment) is this a bug? I heard from some people that Github has some outage now https://www.githubstatus.com/

@SongGuyang
Copy link
Contributor Author

Oh, no😭

@rkooo567
Copy link
Contributor

If it takes time to resolve it, I can probably just merge it and you can create a follow up PR for the fix

@SongGuyang
Copy link
Contributor Author

SongGuyang commented Mar 17, 2022

@rkooo567 I really appreciate for your careful review. This help me a lot to enhance my code. Respect for your pay out. But I don't think I'm producing tech debt. I also try to address or feed back all the comments carefully, right? If my PR don't meet the requirement of merging, no problem, I will improve.

About the class, I think the benefit is that it could make the responsibility clearly. ReferenceTable only maintains the reference count and RuntimeEnvAgent handles others. If you think I lost the unit test, I'm ok to add.

@SongGuyang
Copy link
Contributor Author

If it takes time to resolve it, I can probably just merge it and you can create a follow up PR for the fix

Not so urgent. I will wait github recovered tomorrow. I believe it will not invalid more than one day. 😁

@SongGuyang
Copy link
Contributor Author

Add a reference table test b80289e.

@SongGuyang SongGuyang removed the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Mar 18, 2022
Copy link
Contributor

@rkooo567 rkooo567 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like it has conflicts?

Conflicts:
	dashboard/modules/runtime_env/runtime_env_agent.py
	python/ray/tests/test_runtime_env_2.py
	python/ray/util/client/server/proxier.py
	src/ray/protobuf/runtime_env_agent.proto
	src/ray/raylet/agent_manager.cc
	src/ray/raylet/worker_pool.cc
@SongGuyang
Copy link
Contributor Author

Looks like it has conflicts?

Yep, I merged master just now. Let's merge this if no tests broken.

@architkulkarni architkulkarni added the tests-ok The tagger certifies test failures are unrelated and assumes personal liability. label Mar 18, 2022
@architkulkarni
Copy link
Contributor

serve:test_cli flaky on master, Mac tests stalled

@SongGuyang
Copy link
Contributor Author

@rkooo567 Is time to merge?

@edoakes edoakes merged commit 69af976 into ray-project:master Mar 21, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
tests-ok The tagger certifies test failures are unrelated and assumes personal liability.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants