[Core][deprecate run_function_on_all_workers 3/n] delete run_function_on_all_workers (ray-project#30895)

This function is deprecated. The only use of run_function_on_all_workers in Ray core has been replaced by ray-project#31383.
Delete it to make our worker prestart change simpler.
This PR depends on ray-project#31383 and ray-project#31528.
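For context, the deprecation notice removed in this diff points users to Runtime Environments for per-worker setup. A minimal, hypothetical sketch of that replacement (the environment-variable name and value are illustrative, not taken from this PR):

```python
import os
import ray

# Hypothetical replacement for run_function_on_all_workers: instead of shipping
# a setup function to every worker, declare per-worker state up front via a
# runtime environment when connecting to the cluster.
ray.init(runtime_env={"env_vars": {"EXAMPLE_FLAG": "1"}})


@ray.remote
def read_flag() -> str:
    # Every worker started for this job sees the runtime_env settings.
    return os.environ.get("EXAMPLE_FLAG", "unset")


print(ray.get(read_flag.remote()))  # prints "1"
```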
scv119 committed Jun 8, 2023
1 parent a2023c4 commit 30e6b29
Showing 13 changed files with 18 additions and 294 deletions.
33 changes: 0 additions & 33 deletions python/ray/_private/import_thread.py
@@ -1,10 +1,8 @@
import logging
import threading
import traceback
from collections import defaultdict

import ray
import ray._private.profiling as profiling
from ray import JobID
from ray import cloudpickle as pickle
from ray._private import ray_constants
@@ -163,9 +161,6 @@ def _process_key(self, key):
# for profiling).
# with profiling.profile("register_remote_function"):
(self.worker.function_actor_manager.fetch_and_register_remote_function(key))
elif key.startswith(b"FunctionsToRun:"):
with profiling.profile("fetch_and_run_function"):
self.fetch_and_execute_function_to_run(key)
elif key.startswith(b"ActorClass:"):
# Keep track of the fact that this actor class has been
# exported so that we know it is safe to turn this worker
@@ -181,34 +176,6 @@ def _process_key(self, key):
else:
assert False, "This code should be unreachable."

def fetch_and_execute_function_to_run(self, key):
"""Run on arbitrary function on the worker."""
(job_id, serialized_function) = self._internal_kv_multiget(
key, ["job_id", "function"]
)
if self.worker.mode == ray.SCRIPT_MODE:
return

try:
# FunctionActorManager may call pickle.loads at the same time.
# Importing the same module in different threads causes deadlock.
with self.worker.function_actor_manager.lock:
# Deserialize the function.
function = pickle.loads(serialized_function)
# Run the function.
function({"worker": self.worker})
except Exception:
# If an exception was thrown when the function was run, we record
# the traceback and notify the scheduler of the failure.
traceback_str = traceback.format_exc()
# Log the error message.
ray._private.utils.push_error_to_driver(
self.worker,
ray_constants.FUNCTION_TO_RUN_PUSH_ERROR,
traceback_str,
job_id=ray.JobID(job_id),
)

def _internal_kv_multiget(self, key, fields):
vals = self.gcs_client.internal_kv_get(
key, ray_constants.KV_NAMESPACE_FUNCTION_TABLE
103 changes: 3 additions & 100 deletions python/ray/_private/worker.py
@@ -1,7 +1,6 @@
import atexit
import faulthandler
import functools
import hashlib
import inspect
import io
import json
@@ -56,13 +55,13 @@

# Ray modules
import ray.actor
import ray.cloudpickle as pickle
import ray.cloudpickle as pickle # noqa
import ray.job_config
import ray.remote_function
from ray import ActorID, JobID, Language, ObjectRef
from ray._private import ray_option_utils
from ray._private.client_mode_hook import client_mode_hook
from ray._private.function_manager import FunctionActorManager, make_function_table_key
from ray._private.function_manager import FunctionActorManager

from ray._private.inspect_util import is_cython
from ray._private.ray_logging import (
@@ -76,7 +75,7 @@
from ray._private.runtime_env.working_dir import upload_working_dir_if_needed
from ray._private.runtime_env.setup_hook import upload_worker_setup_hook_if_needed
from ray._private.storage import _load_class
from ray._private.utils import check_oversized_function, get_ray_doc_version
from ray._private.utils import get_ray_doc_version
from ray.exceptions import ObjectStoreFullError, RayError, RaySystemError, RayTaskError
from ray.experimental.internal_kv import (
_initialize_internal_kv,
@@ -417,15 +416,12 @@ class Worker:
node (ray._private.node.Node): The node this worker is attached to.
mode: The mode of the worker. One of SCRIPT_MODE, LOCAL_MODE, and
WORKER_MODE.
cached_functions_to_run: A list of functions to run on all of
the workers that should be exported as soon as connect is called.
"""

def __init__(self):
"""Initialize a Worker object."""
self.node = None
self.mode = None
self.cached_functions_to_run: list = []
self.actors = {}
# When the worker is constructed. Record the original value of the
# CUDA_VISIBLE_DEVICES environment variable.
@@ -770,82 +766,6 @@ def get_objects(self, object_refs: list, timeout: Optional[float] = None):
debugger_breakpoint,
)

@Deprecated(
message="This function is deprecated and will be removed by Ray 2.4. "
"Please use Runtime Environments "
f"https://docs.ray.io/en/{get_ray_doc_version()}/ray-core"
"/handling-dependencies.html "
"to manage dependencies in workers.",
warning=True,
)
def run_function_on_all_workers(self, function: callable):
"""This function has been deprecated given the following issues:
- no guarantee that the function run before the remote function run.
- pubsub signal might be lost in some failure cases.
This API will be deleted once we move the working dir init away.
NO NEW CODE SHOULD USE THIS API.
Run arbitrary code on all of the workers.
This function will first be run on the driver, and then it will be
exported to all of the workers to be run. It will also be run on any
new workers that register later. If ray.init has not been called yet,
then cache the function and export it later.
Args:
function: The function to run on all of the workers. It
takes only one argument, a worker info dict. If it returns
anything, its return values will not be used.
"""
# If ray.init has not been called yet, then cache the function and
# export it when connect is called. Otherwise, run the function on all
# workers.
if self.mode is None:
self.cached_functions_to_run.append(function)
else:
# Attempt to pickle the function before we need it. This could
# fail, and it is more convenient if the failure happens before we
# actually run the function locally.
pickled_function = pickle.dumps(function)

function_to_run_id = hashlib.shake_128(pickled_function).digest(
ray_constants.ID_SIZE
)
key = make_function_table_key(
b"FunctionsToRun", self.current_job_id, function_to_run_id
)
# First run the function on the driver.
# We always run the task locally.
function({"worker": self})

check_oversized_function(
pickled_function, function.__name__, "function", self
)

# Run the function on all workers.
if (
self.gcs_client.internal_kv_put(
key,
pickle.dumps(
{
"job_id": self.current_job_id.binary(),
"function_id": function_to_run_id,
"function": pickled_function,
}
),
True,
ray_constants.KV_NAMESPACE_FUNCTION_TABLE,
)
!= 0
):
self.function_actor_manager.export_key(key)
# TODO(rkn): If the worker fails after it calls setnx and before it
# successfully completes the hset and rpush, then the program will
# most likely hang. This could be fixed by making these three
# operations into a transaction (or by implementing a custom
# command that does all three things).

def main_loop(self):
"""The main loop a worker runs to receive and execute tasks."""

@@ -2134,7 +2054,6 @@ def connect(
# Do some basic checking to make sure we didn't call ray.init twice.
error_message = "Perhaps you called ray.init twice by accident?"
assert not worker.connected, error_message
assert worker.cached_functions_to_run is not None, error_message

# Enable nice stack traces on SIGSEGV etc.
try:
@@ -2360,21 +2279,6 @@ def connect(
worker.logger_thread.daemon = True
worker.logger_thread.start()

if mode == SCRIPT_MODE:
# TODO(rkn): Here we first export functions to run, then remote
# functions. The order matters. For example, one of the functions to
# run may set the Python path, which is needed to import a module used
# to define a remote function. We may want to change the order to
# simply be the order in which the exports were defined on the driver.
# In addition, we will need to retain the ability to decide what the
# first few exports are (mostly to set the Python path). Additionally,
# note that the first exports to be defined on the driver will be the
# ones defined in separate modules that are imported by the driver.
# Export cached functions_to_run.
for function in worker.cached_functions_to_run:
worker.run_function_on_all_workers(function)
worker.cached_functions_to_run = None

# Setup tracing here
tracing_hook_val = worker.gcs_client.internal_kv_get(
b"tracing_startup_hook", ray_constants.KV_NAMESPACE_TRACING
@@ -2422,7 +2326,6 @@ def disconnect(exiting_interpreter=False):
global_worker_stdstream_dispatcher.remove_handler("ray_print_logs")

worker.node = None # Disconnect the worker from the node.
worker.cached_functions_to_run = []
worker.serialization_context_map.clear()
try:
ray_actor = ray.actor
4 changes: 4 additions & 0 deletions python/ray/data/tests/test_ecosystem.py
@@ -1,3 +1,5 @@
import sys

import numpy as np
import pandas as pd
import pyarrow as pa
@@ -111,6 +113,7 @@ def test_to_dask_tensor_column_cast_arrow(ray_start_regular_shared):
ctx.enable_tensor_extension_casting = original


@pytest.mark.skipif(sys.version_info < (3, 8), reason="requires python3.8 or higher")
def test_from_modin(ray_start_regular_shared):
import modin.pandas as mopd

@@ -123,6 +126,7 @@ def test_from_modin(ray_start_regular_shared):
assert df.equals(dfds)


@pytest.mark.skipif(sys.version_info < (3, 8), reason="requires python3.8 or higher")
def test_to_modin(ray_start_regular_shared):
# create two modin dataframes
# one directly from a pandas dataframe, and
3 changes: 3 additions & 0 deletions python/ray/data/tests/test_execution_optimizer.py
@@ -1,4 +1,5 @@
import itertools
import sys
from typing import List, Optional

import pandas as pd
@@ -1021,6 +1022,7 @@ def test_from_dask_e2e(ray_start_regular_shared, enable_optimizer):
_check_usage_record(["FromDask"])


@pytest.mark.skipif(sys.version_info < (3, 8), reason="requires python3.8 or higher")
@pytest.mark.parametrize("enable_pandas_block", [False, True])
def test_from_modin_operator(
ray_start_regular_shared,
@@ -1050,6 +1052,7 @@ def test_from_modin_operator(
ctx.enable_pandas_block = old_enable_pandas_block


@pytest.mark.skipif(sys.version_info < (3, 8), reason="requires python3.8 or higher")
def test_from_modin_e2e(ray_start_regular_shared, enable_optimizer):
import modin.pandas as mopd

6 changes: 5 additions & 1 deletion python/ray/tests/modin/test_modin.py
@@ -33,7 +33,11 @@

skip = not modin_installed

# These tests are written for versions of Modin that require python 3.7+
if sys.version_info < (3, 8):
# Modin requires python 3.8+
skip = True

# These tests are written for versions of Modin that require python 3.8+
pytestmark = pytest.mark.skipif(skip, reason="Outdated or missing Modin dependency")

if not skip:
79 changes: 0 additions & 79 deletions python/ray/tests/test_advanced.py
@@ -147,85 +147,6 @@ def h(input_list):
ray.get([h.remote([x]), h.remote([x])])


@pytest.mark.skip(reason="Flaky tests")
def test_caching_functions_to_run(shutdown_only):
# Test that we export functions to run on all workers before the driver
# is connected.
def f(worker_info):
sys.path.append(1)

ray._private.worker.global_worker.run_function_on_all_workers(f)

def f(worker_info):
sys.path.append(2)

ray._private.worker.global_worker.run_function_on_all_workers(f)

def g(worker_info):
sys.path.append(3)

ray._private.worker.global_worker.run_function_on_all_workers(g)

def f(worker_info):
sys.path.append(4)

ray._private.worker.global_worker.run_function_on_all_workers(f)

ray.init(num_cpus=1)

@ray.remote
def get_state():
time.sleep(1)
return sys.path[-4], sys.path[-3], sys.path[-2], sys.path[-1]

res1 = get_state.remote()
res2 = get_state.remote()
assert ray.get(res1) == (1, 2, 3, 4)
assert ray.get(res2) == (1, 2, 3, 4)

# Clean up the path on the workers.
def f(worker_info):
sys.path.pop()
sys.path.pop()
sys.path.pop()
sys.path.pop()

ray._private.worker.global_worker.run_function_on_all_workers(f)


@pytest.mark.skipif(client_test_enabled(), reason="internal api")
def test_running_function_on_all_workers(ray_start_regular):
def f(worker_info):
sys.path.append("fake_directory")

ray._private.worker.global_worker.run_function_on_all_workers(f)

@ray.remote
def get_path1():
return sys.path

assert "fake_directory" == ray.get(get_path1.remote())[-1]

# the function should only run on the current driver once.
assert sys.path[-1] == "fake_directory"
if len(sys.path) > 1:
assert sys.path[-2] != "fake_directory"

def f(worker_info):
sys.path.pop(-1)

ray._private.worker.global_worker.run_function_on_all_workers(f)

# Create a second remote function to guarantee that when we call
# get_path2.remote(), the second function to run will have been run on
# the worker.
@ray.remote
def get_path2():
return sys.path

assert "fake_directory" not in ray.get(get_path2.remote())


@pytest.mark.skipif(
"RAY_PROFILING" not in os.environ, reason="Only tested in client/profiling build."
)