Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[runtime env] URI reference refactor #22828

Merged
merged 27 commits into from
Mar 21, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add comments
  • Loading branch information
SongGuyang committed Mar 8, 2022
commit d8fee9e14468e2562113706b21120fb2917b0a6a
7 changes: 6 additions & 1 deletion dashboard/modules/runtime_env/runtime_env_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ def __init__(self, dashboard_agent):
# invalidate the env cache when a URI is deleted.
# This is a temporary mechanism until we have per-URI caching.
self._uris_to_envs: Dict[str, Set[str]] = defaultdict(set)

# The URI reference table used for GC; it maps each URI to its reference
# count. When a URI's reference count drops to zero, the URI is removed from
# this table and added to the cache if needed.
self._uri_reference: Dict[str, int] = dict()
# Initialize internal KV to be used by the working_dir setup code.
_initialize_internal_kv(self._dashboard_agent.gcs_client)
Expand Down Expand Up @@ -165,6 +167,8 @@ def decrease_reference_for_uris(self, uris: list):
self._logger.info(f"Unused uris {unused_uris}.")
return unused_uris

# Don't change URI reference for `client_server` because `client_server` doesn't
# send the `DecreaseRuntimeEnvReference` RPC when the client exits.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, thanks for catching this! I think the only downside of this for the user is that when they connect with ray.init() with Ray Client, it will install the runtime environment, and then when the user runs their first task it will install the exact same runtime environment again, which could be slow.

I think the downside is not too severe.

reference_exclude_sources: Set[str] = {
"client_server",
}
Expand Down Expand Up @@ -325,6 +329,7 @@ def setup_plugins():
self._logger.info(f"Sleeping for {SLEEP_FOR_TESTING_S}s.")
time.sleep(int(SLEEP_FOR_TESTING_S))

self._logger.info(f"Creating runtime env: {serialized_env}")
runtime_env_context: RuntimeEnvContext = None
error_message = None
for _ in range(runtime_env_consts.RUNTIME_ENV_RETRY_TIMES):
Expand Down
39 changes: 0 additions & 39 deletions python/ray/util/client/server/proxier.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ class SpecificServer:
port: int
process_handle_future: futures.Future
channel: "grpc._channel.Channel"
# serialized_runtime_env: str

def is_ready(self) -> bool:
"""Check if the server is ready or not (doesn't block)."""
Expand Down Expand Up @@ -96,10 +95,6 @@ def set_result(self, proc: Optional[ProcessInfo]) -> None:
if not self.is_ready():
self.process_handle_future.set_result(proc)

# def set_serialized_runtime_env(self, serialized_runtime_env) -> None:
# """Set serialized runtime env."""
# self.serialized_runtime_env = serialized_runtime_env


def _match_running_client_server(command: List[str]) -> bool:
"""
Expand Down Expand Up @@ -290,30 +285,6 @@ def _increase_runtime_env_reference(
f"IncreaseRuntimeEnvReference request failed after {max_retries} attempts."
)

def _decrease_runtime_env_reference(
    self, serialized_runtime_env: str, specific_server: SpecificServer
):
    """Decrease the runtime_env reference by sending an RPC to the agent.

    Args:
        serialized_runtime_env: The serialized runtime env whose reference
            should be decreased.
        specific_server: The Ray client server associated with the runtime
            env; only its port is read here, for logging.

    Raises:
        RuntimeError: If the agent replies with AGENT_RPC_STATUS_FAILED.
        AssertionError: If the agent replies with any other non-OK status.
    """
    logger.info(
        f"Decreasing runtime env reference for "
        f"ray_client_server_{specific_server.port}. "
        f"Serialized runtime env is {serialized_runtime_env}."
    )
    decrease_request = runtime_env_agent_pb2.DecreaseRuntimeEnvReferenceRequest(
        serialized_runtime_env=serialized_runtime_env
    )

    # Call the RPC by its method name (`DecreaseRuntimeEnvReference`);
    # `DecreaseRuntimeEnvReferenceRequest` is the request message type built
    # above, not a method on the stub.
    r = self._runtime_env_stub.DecreaseRuntimeEnvReference(decrease_request)
    if r.status == agent_manager_pb2.AgentRpcStatus.AGENT_RPC_STATUS_OK:
        return
    elif r.status == agent_manager_pb2.AgentRpcStatus.AGENT_RPC_STATUS_FAILED:
        raise RuntimeError(
            "Failed to decrease runtime_env reference for Ray client "
            f"server, it is caused by:\n{r.error_message}"
        )
    else:
        # Raise explicitly instead of `assert False` so that an unknown status
        # is still reported when Python runs with assertions disabled (-O).
        raise AssertionError(f"Unknown status: {r.status}.")

def start_specific_server(self, client_id: str, job_config: JobConfig) -> bool:
"""
Start up a RayClient Server for an incoming client to
Expand Down Expand Up @@ -421,12 +392,6 @@ def _check_processes(self):
f"Specific server {client_id} is no longer running"
f", freeing its port {specific_server.port}"
)
# logger.info(f"serialized_env {specific_server}")
# serialized_env = specific_server.serialized_runtime_env
# if serialized_env is not None:
# self._decrease_runtime_env_reference(
# specific_server.serialized_runtime_env,
# specific_server)
del self.servers[client_id]
# Port is available to use again.
self._free_ports.append(specific_server.port)
Expand All @@ -439,10 +404,6 @@ def _cleanup(self) -> None:
for platforms where fate sharing is not supported.
"""
for server in self.servers.values():
# if server.serialized_runtime_env:
# self._decrease_runtime_env_reference(
# server.serialized_runtime_env,
# server)
server.kill()


Expand Down
3 changes: 2 additions & 1 deletion src/ray/raylet/agent_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ class AgentManager : public rpc::AgentManagerServiceHandler {
/// \param[in] job_id The job id which the runtime env belongs to.
/// \param[in] serialized_runtime_env The serialized runtime environment.
/// \param[in] serialized_allocated_resource_instances The serialized allocated resource
/// instances. \param[in] callback The callback function.
/// instances.
/// \param[in] callback The callback function.
virtual void IncreaseRuntimeEnvReference(
const JobID &job_id, const std::string &serialized_runtime_env,
const std::string &serialized_allocated_resource_instances,
Expand Down
4 changes: 2 additions & 2 deletions src/ray/raylet/scheduling/cluster_resource_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ void ClusterResourceManager::AddOrUpdateNode(

void ClusterResourceManager::AddOrUpdateNode(scheduling::NodeID node_id,
const NodeResources &node_resources) {
// RAY_LOG(DEBUG) << "Update node info, node_id: " << node_id.ToInt()
// << ", node_resources: " << node_resources.DebugString();
RAY_LOG(DEBUG) << "Update node info, node_id: " << node_id.ToInt()
<< ", node_resources: " << node_resources.DebugString();
auto it = nodes_.find(node_id);
if (it == nodes_.end()) {
// This node is new, so add it to the map.
Expand Down
7 changes: 4 additions & 3 deletions src/ray/raylet/worker_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1134,7 +1134,7 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec,

if (task_spec.HasRuntimeEnv()) {
// create runtime env.
RAY_LOG(DEBUG) << "[dedicated] IncreaseRuntimeEnvReference for task "
RAY_LOG(DEBUG) << "[dedicated] Creating runtime env for task "
<< task_spec.TaskId();
IncreaseRuntimeEnvReference(
task_spec.SerializedRuntimeEnv(), task_spec.JobId(),
Expand Down Expand Up @@ -1197,7 +1197,7 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec,
// Start a new worker process.
if (task_spec.HasRuntimeEnv()) {
// create runtime env.
RAY_LOG(DEBUG) << "IncreaseRuntimeEnvReference for task " << task_spec.TaskId();
RAY_LOG(DEBUG) << "Creating runtime env for task " << task_spec.TaskId();
IncreaseRuntimeEnvReference(
task_spec.SerializedRuntimeEnv(), task_spec.JobId(),
[this, start_worker_process_fn, callback, &state, task_spec](
Expand Down Expand Up @@ -1510,7 +1510,7 @@ void WorkerPool::IncreaseRuntimeEnvReference(
const std::string &serialized_runtime_env, const JobID &job_id,
const IncreaseRuntimeEnvReferenceCallback &callback,
const std::string &serialized_allocated_resource_instances) {
// create runtime env.
RAY_LOG(DEBUG) << "IncreaseRuntimeEnvReference " << serialized_runtime_env;
agent_manager_->IncreaseRuntimeEnvReference(
job_id, serialized_runtime_env, serialized_allocated_resource_instances,
[job_id, serialized_runtime_env = std::move(serialized_runtime_env), callback](
Expand All @@ -1528,6 +1528,7 @@ void WorkerPool::IncreaseRuntimeEnvReference(
}

void WorkerPool::DecreaseRuntimeEnvReference(const std::string &serialized_runtime_env) {
RAY_LOG(DEBUG) << "DecreaseRuntimeEnvReference " << serialized_runtime_env;
if (RuntimeEnvNotEmpty(serialized_runtime_env)) {
agent_manager_->DecreaseRuntimeEnvReference(
serialized_runtime_env, [serialized_runtime_env](bool success) {
Expand Down