Skip to content

Commit

Permalink
Re-merge large function def, skipping test failing on Windows (ray-pr…
Browse files Browse the repository at this point in the history
  • Loading branch information
ericl committed Jul 20, 2021
1 parent fe9a6b6 commit fabba96
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 28 deletions.
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ Install Ray with: ``pip install ray``. For nightly wheels, see the
.. _`Dask`: https://docs.ray.io/en/master/dask-on-ray.html
.. _`Horovod`: https://horovod.readthedocs.io/en/stable/ray_include.html
.. _`Scikit-learn`: joblib.html

.. _`Datasets`: https://docs.ray.io/en/master/data/dataset.html


Quick Start
Expand Down
5 changes: 5 additions & 0 deletions doc/source/data/dataset.rst
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,11 @@ Datasets can read and write in parallel to `custom datasources <package-ref.html
# Write to a custom datasource.
ds.write_datasource(YourCustomDatasource(), **write_args)
Tensor-typed values
-------------------

Datasets does not currently have native support for tensor-typed values in records (e.g., TFRecord, the Petastorm format, or multi-dimensional arrays). Native support is planned but has not been implemented yet.

Pipelining data processing and ML computations
----------------------------------------------

Expand Down
14 changes: 7 additions & 7 deletions python/ray/_private/function_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from ray import cloudpickle as pickle
from ray._raylet import PythonFunctionDescriptor
from ray._private.utils import (
check_oversized_pickle,
check_oversized_function,
decode,
ensure_str,
format_error_message,
Expand Down Expand Up @@ -136,9 +136,9 @@ def export(self, remote_function):
function = remote_function._function
pickled_function = pickle.dumps(function)

check_oversized_pickle(pickled_function,
remote_function._function_name,
"remote function", self._worker)
check_oversized_function(pickled_function,
remote_function._function_name,
"remote function", self._worker)
key = (b"RemoteFunction:" + self._worker.current_job_id.binary() + b":"
+ remote_function._function_descriptor.function_id.binary())
self._worker.redis_client.hset(
Expand Down Expand Up @@ -367,9 +367,9 @@ def export_actor_class(self, Class, actor_creation_function_descriptor,
"actor_method_names": json.dumps(list(actor_method_names))
}

check_oversized_pickle(actor_class_info["class"],
actor_class_info["class_name"], "actor",
self._worker)
check_oversized_function(actor_class_info["class"],
actor_class_info["class_name"], "actor",
self._worker)

self._publish_actor_class_to_key(key, actor_class_info)
# TODO(rkn): Currently we allow actor classes to be defined
Expand Down
41 changes: 26 additions & 15 deletions python/ray/_private/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -599,29 +599,40 @@ def get_shared_memory_bytes():
return shm_avail


def check_oversized_pickle(pickled, name, obj_type, worker):
"""Send a warning message if the pickled object is too large.
def check_oversized_function(pickled, name, obj_type, worker):
"""Send a warning message if the pickled function is too large.
Args:
pickled: the pickled object.
pickled: the pickled function.
name: name of the pickled object.
obj_type: type of the pickled object, can be 'function',
'remote function', 'actor', or 'object'.
'remote function', or 'actor'.
worker: the worker used to send warning message.
"""
length = len(pickled)
if length <= ray_constants.PICKLE_OBJECT_WARNING_SIZE:
if length <= ray_constants.FUNCTION_SIZE_WARN_THRESHOLD:
return
warning_message = (
"Warning: The {} {} has size {} when pickled. "
"It will be stored in Redis, which could cause memory issues. "
"This may mean that its definition uses a large array or other object."
).format(obj_type, name, length)
push_error_to_driver(
worker,
ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR,
warning_message,
job_id=worker.current_job_id)
elif length < ray_constants.FUNCTION_SIZE_ERROR_THRESHOLD:
warning_message = (
"The {} {} is very large ({} MiB). "
"Check that its definition is not implicitly capturing a large "
"array or other object in scope. Tip: use ray.put() to put large "
"objects in the Ray object store.").format(obj_type, name,
length // (1024 * 1024))
push_error_to_driver(
worker,
ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR,
"Warning: " + warning_message,
job_id=worker.current_job_id)
else:
error = (
"The {} {} is too large ({} MiB > FUNCTION_SIZE_ERROR_THRESHOLD={}"
" MiB). Check that its definition is not implicitly capturing a "
"large array or other object in scope. Tip: use ray.put() to "
"put large objects in the Ray object store.").format(
obj_type, name, length // (1024 * 1024),
ray_constants.FUNCTION_SIZE_ERROR_THRESHOLD // (1024 * 1024))
raise ValueError(error)


def is_main_thread():
Expand Down
4 changes: 3 additions & 1 deletion python/ray/ray_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ def env_bool(key, default):

# If a remote function or actor (or some other export) has serialized size
# greater than this quantity, print a warning.
PICKLE_OBJECT_WARNING_SIZE = 10**7
FUNCTION_SIZE_WARN_THRESHOLD = 10**7
FUNCTION_SIZE_ERROR_THRESHOLD = env_integer("FUNCTION_SIZE_ERROR_THRESHOLD",
(10**8))

# If remote functions with the same source are imported this many times, then
# print a warning.
Expand Down
20 changes: 20 additions & 0 deletions python/ray/tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,26 @@ def test_function(fn, remote_fn):
ray.get(remote_test_function.remote(local_method, actor_method))


@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows")
def test_oversized_function(ray_start_shared_local_modes):
    """Check that oversized remote functions and actors are rejected.

    Both the remote function and the actor class below capture ``bar`` in
    their closure, so their pickled definitions are at least as large as
    ``bar`` itself. A pickled definition whose size is not below
    ``FUNCTION_SIZE_ERROR_THRESHOLD`` is expected to raise ``ValueError``
    when it is exported.
    """
    import ray.ray_constants as ray_constants

    # One byte per element (uint8) sized from the configured threshold: the
    # pickled payload always contains the full buffer, so it reaches the
    # error threshold without allocating 8 bytes per element (the float64
    # default) and stays valid if the threshold is overridden via the
    # environment.
    bar = np.zeros(
        ray_constants.FUNCTION_SIZE_ERROR_THRESHOLD, dtype=np.uint8)

    @ray.remote
    class Actor:
        def foo(self):
            return len(bar)

    @ray.remote
    def f():
        return len(bar)

    # The remote function is exported on first invocation.
    with pytest.raises(ValueError):
        f.remote()

    # The actor class is exported on instantiation.
    with pytest.raises(ValueError):
        Actor.remote()


def test_args_stars_after(ray_start_shared_local_modes):
def star_args_after(a="hello", b="heo", *args, **kwargs):
return a, b, args, kwargs
Expand Down
3 changes: 2 additions & 1 deletion python/ray/tests/test_failure.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,8 @@ def test_export_large_objects(ray_start_regular, error_pubsub):
p = error_pubsub
import ray.ray_constants as ray_constants

large_object = np.zeros(2 * ray_constants.PICKLE_OBJECT_WARNING_SIZE)
large_object = np.zeros(
2 * ray_constants.FUNCTION_SIZE_WARN_THRESHOLD, dtype=np.uint8)

@ray.remote
def f():
Expand Down
6 changes: 3 additions & 3 deletions python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
from ray._private.function_manager import FunctionActorManager
from ray._private.ray_logging import setup_logger
from ray._private.ray_logging import global_worker_stdstream_dispatcher
from ray._private.utils import check_oversized_pickle
from ray._private.utils import check_oversized_function
from ray.util.inspect import is_cython
from ray.experimental.internal_kv import _internal_kv_get, \
_internal_kv_initialized
Expand Down Expand Up @@ -392,8 +392,8 @@ def run_function_on_all_workers(self, function,
# we don't need to export it again.
return

check_oversized_pickle(pickled_function, function.__name__,
"function", self)
check_oversized_function(pickled_function, function.__name__,
"function", self)

# Run the function on all workers.
self.redis_client.hset(
Expand Down

0 comments on commit fabba96

Please sign in to comment.