Merge remote-tracking branch 'upstream/master' into turn-on-lwr
fishbone committed Feb 22, 2023
2 parents b999fda + 7592e5a commit a7098c4
Showing 13 changed files with 99 additions and 45 deletions.
10 changes: 10 additions & 0 deletions .buildkite/pipeline.ml.yml
@@ -297,6 +297,16 @@
    - sudo service mongodb stop
    - sudo apt-get purge -y mongodb*

- label: "[unstable] Dataset tests (streaming executor)"
  conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_DATA_AFFECTED"]
  instance_size: medium
  commands:
    - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
    - DATA_PROCESSING_TESTING=1 ARROW_VERSION=7.* ./ci/env/install-dependencies.sh
    - ./ci/env/env_info.sh
    - bazel test --config=ci $(./ci/run/bazel_export_options) --action_env=RAY_DATASET_USE_STREAMING_EXECUTOR=1 --build_tests_only --test_tag_filters=-dataset_integration python/ray/data/...
    - bazel test --config=ci $(./ci/run/bazel_export_options) --action_env=RAY_DATASET_USE_STREAMING_EXECUTOR=1 --build_tests_only --test_tag_filters=ray_data python/ray/air/...

- label: "Dataset tests (Arrow nightly)"
  conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_PYTHON_AFFECTED", "RAY_CI_DATA_AFFECTED"]
  instance_size: medium
2 changes: 2 additions & 0 deletions cpp/src/ray/config_internal.cc
@@ -138,6 +138,8 @@ void ConfigInternal::Init(RayConfig &config, int argc, char **argv) {

if (!FLAGS_ray_code_search_path.CurrentValue().empty()) {
// Code search path like this "/path1/xxx.so:/path2".
RAY_LOG(DEBUG) << "The code search path is "
<< FLAGS_ray_code_search_path.CurrentValue();
code_search_path = absl::StrSplit(
FLAGS_ray_code_search_path.CurrentValue(), ':', absl::SkipEmpty());
}
3 changes: 3 additions & 0 deletions cpp/src/ray/util/function_helper.cc
@@ -17,7 +17,9 @@
#include <boost/range/iterator_range.hpp>
#include <memory>

#include "ray/common/constants.h"
#include "ray/util/logging.h"
#include "util.h"

namespace ray {
namespace internal {
@@ -151,6 +153,7 @@ void FunctionHelper::LoadFunctionsFromPaths(const std::vector<std::string> &path
}
}

RAY_LOG(INFO) << std::string(kLibraryPathEnvName) << ": " << getLibraryPathEnv();
// Try to load all found libraries.
for (auto lib : dynamic_libraries) {
LoadDll(lib);
10 changes: 10 additions & 0 deletions cpp/src/ray/util/util.cc
@@ -17,6 +17,7 @@
#include <boost/algorithm/string.hpp>
#include <boost/asio.hpp>

#include "ray/common/constants.h"
#include "ray/util/logging.h"

namespace ray {
@@ -43,5 +44,14 @@ std::string GetNodeIpAddress(const std::string &address) {
return "";
}
}

std::string getLibraryPathEnv() {
  auto path_env_p = std::getenv(kLibraryPathEnvName);
  if (path_env_p != nullptr && strlen(path_env_p) != 0) {
    return std::string(path_env_p);
  }
  return {};
}

} // namespace internal
} // namespace ray
3 changes: 3 additions & 0 deletions cpp/src/ray/util/util.h
@@ -31,5 +31,8 @@ namespace internal {
/// you care about.
/// \return The IP address by which the local node can be reached from the address.
std::string GetNodeIpAddress(const std::string &address = "8.8.8.8:53");

std::string getLibraryPathEnv();

} // namespace internal
} // namespace ray
2 changes: 1 addition & 1 deletion python/ray/data/_internal/execution/streaming_executor.py
@@ -230,7 +230,7 @@ def _get_or_refresh_resource_limits(self) -> ExecutionResources:
)

def _get_current_usage(self, topology: Topology) -> ExecutionResources:
cur_usage = ExecutionResources()
cur_usage = ExecutionResources(0, 0, 0)
for op, state in topology.items():
cur_usage = cur_usage.add(op.current_resource_usage())
if isinstance(op, InputDataBuffer):
4 changes: 2 additions & 2 deletions python/ray/data/tests/test_dataset.py
@@ -90,7 +90,7 @@ def test_bulk_lazy_eval_split_mode(shutdown_only, block_split, tmp_path):

@pytest.mark.parametrize("pipelined", [False, True])
def test_basic_actors(shutdown_only, pipelined):
ray.init(num_cpus=2)
ray.init(num_cpus=6)
n = 5
ds = ray.data.range(n)
ds = maybe_pipeline(ds, pipelined)
@@ -152,7 +152,7 @@ def run():


def test_callable_classes(shutdown_only):
ray.init(num_cpus=1)
ray.init(num_cpus=2)
ds = ray.data.range(10, parallelism=10)

class StatefulFn:
5 changes: 4 additions & 1 deletion python/ray/data/tests/test_dataset_image.py
@@ -114,13 +114,16 @@ def get_relative_path(path: str) -> str:
"image-datasets/simple/image3.jpg",
]

def test_e2e_prediction(self, ray_start_regular_shared):
def test_e2e_prediction(self, shutdown_only):
from ray.train.torch import TorchCheckpoint, TorchPredictor
from ray.train.batch_predictor import BatchPredictor

from torchvision import transforms
from torchvision.models import resnet18

ray.shutdown()
ray.init(num_cpus=2)

dataset = ray.data.read_images("example://image-datasets/simple")
transform = transforms.ToTensor()

69 changes: 40 additions & 29 deletions python/ray/data/tests/test_dataset_parquet.py
@@ -31,6 +31,17 @@
from pytest_lazyfixture import lazy_fixture


def check_num_computed(ds, expected, streaming_expected) -> None:
    # When the streaming executor is on, _num_computed() is affected only by
    # ds.schema(), which still partially reads the blocks; it is not affected
    # by operations like take(), since those are executed via the streaming
    # executor.
    if not ray.data.context.DatasetContext.get_current().use_streaming_executor:
        assert ds._plan.execute()._num_computed() == expected
    else:
        assert ds._plan.execute()._num_computed() == streaming_expected


@pytest.mark.parametrize(
"fs,data_path",
[
@@ -122,11 +133,11 @@ def test_parquet_read_basic(ray_start_regular_shared, fs, data_path):
ds = ray.data.read_parquet(data_path, filesystem=fs)

# Test metadata-only parquet ops.
assert ds._plan.execute()._num_computed() == 0
check_num_computed(ds, 0, 0)
assert ds.count() == 6
assert ds.size_bytes() > 0
assert ds.schema() is not None
assert ds._plan.execute()._num_computed() == 1
check_num_computed(ds, 1, 1)
input_files = ds.input_files()
assert len(input_files) == 2, input_files
assert "test1.parquet" in str(input_files)
@@ -139,11 +150,11 @@ def test_parquet_read_basic(ray_start_regular_shared, fs, data_path):
repr(ds) == "Dataset(num_blocks=2, num_rows=6, "
"schema={one: int64, two: string})"
), ds
assert ds._plan.execute()._num_computed() == 1
check_num_computed(ds, 1, 1)

# Forces a data read.
values = [[s["one"], s["two"]] for s in ds.take()]
assert ds._plan.execute()._num_computed() == 2
values = [[s["one"], s["two"]] for s in ds.take_all()]
check_num_computed(ds, 2, 1)
assert sorted(values) == [
[1, "a"],
[2, "b"],
@@ -201,7 +212,7 @@ def prefetch_file_metadata(self, pieces):
)

# Expect to lazily compute all metadata correctly.
assert ds._plan.execute()._num_computed() == 0
check_num_computed(ds, 0, 0)
assert ds.count() == 6
assert ds.size_bytes() > 0
assert ds.schema() is not None
@@ -217,11 +228,11 @@ def prefetch_file_metadata(self, pieces):
repr(ds) == "Dataset(num_blocks=2, num_rows=6, "
"schema={one: int64, two: string})"
), ds
assert ds._plan.execute()._num_computed() == 2
check_num_computed(ds, 2, 2)

# Forces a data read.
values = [[s["one"], s["two"]] for s in ds.take()]
assert ds._plan.execute()._num_computed() == 2
check_num_computed(ds, 2, 2)
assert sorted(values) == [
[1, "a"],
[2, "b"],
@@ -278,7 +289,7 @@ def test_parquet_read_bulk(ray_start_regular_shared, fs, data_path):
assert ds._meta_count() is None

# Expect to lazily compute all metadata correctly.
assert ds._plan.execute()._num_computed() == 0
check_num_computed(ds, 0, 0)
assert ds.count() == 6
assert ds.size_bytes() > 0
assert ds.schema() is not None
@@ -294,11 +305,11 @@ def test_parquet_read_bulk(ray_start_regular_shared, fs, data_path):
repr(ds) == "Dataset(num_blocks=2, num_rows=6, "
"schema={one: int64, two: string})"
), ds
assert ds._plan.execute()._num_computed() == 2
check_num_computed(ds, 2, 2)

# Forces a data read.
values = [[s["one"], s["two"]] for s in ds.take()]
assert ds._plan.execute()._num_computed() == 2
check_num_computed(ds, 2, 2)
assert sorted(values) == [
[1, "a"],
[2, "b"],
@@ -319,7 +330,7 @@ def test_parquet_read_bulk(ray_start_regular_shared, fs, data_path):

# Forces a data read.
values = [[s["one"], s["two"]] for s in ds.take()]
assert ds._plan.execute()._num_computed() == 2
check_num_computed(ds, 2, 0)
assert sorted(values) == [
[1, "a"],
[2, "b"],
@@ -368,7 +379,7 @@ def test_parquet_read_bulk_meta_provider(ray_start_regular_shared, fs, data_path
assert ds._meta_count() is None

# Expect to lazily compute all metadata correctly.
assert ds._plan.execute()._num_computed() == 0
check_num_computed(ds, 0, 0)
assert ds.count() == 6
assert ds.size_bytes() > 0
assert ds.schema() is not None
@@ -384,11 +395,11 @@ def test_parquet_read_bulk_meta_provider(ray_start_regular_shared, fs, data_path
repr(ds) == "Dataset(num_blocks=2, num_rows=6, "
"schema={one: int64, two: string})"
), ds
assert ds._plan.execute()._num_computed() == 2
check_num_computed(ds, 2, 2)

# Forces a data read.
values = [[s["one"], s["two"]] for s in ds.take()]
assert ds._plan.execute()._num_computed() == 2
check_num_computed(ds, 2, 2)
assert sorted(values) == [
[1, "a"],
[2, "b"],
@@ -427,7 +438,7 @@ def test_parquet_read_partitioned(ray_start_regular_shared, fs, data_path):
ds = ray.data.read_parquet(data_path, filesystem=fs)

# Test metadata-only parquet ops.
assert ds._plan.execute()._num_computed() == 0
check_num_computed(ds, 0, 0)
assert ds.count() == 6
assert ds.size_bytes() > 0
assert ds.schema() is not None
@@ -443,11 +454,11 @@ def test_parquet_read_partitioned(ray_start_regular_shared, fs, data_path):
"schema={two: string, "
"one: dictionary<values=int32, indices=int32, ordered=0>})"
), ds
assert ds._plan.execute()._num_computed() == 1
check_num_computed(ds, 1, 1)

# Forces a data read.
values = [[s["one"], s["two"]] for s in ds.take()]
assert ds._plan.execute()._num_computed() == 2
check_num_computed(ds, 2, 1)
assert sorted(values) == [
[1, "a"],
[1, "b"],
@@ -479,7 +490,7 @@ def test_parquet_read_partitioned_with_filter(ray_start_regular_shared, tmp_path
)

values = [[s["one"], s["two"]] for s in ds.take()]
assert ds._plan.execute()._num_computed() == 1
check_num_computed(ds, 1, 0)
assert sorted(values) == [[1, "a"], [1, "a"]]

# 2 partitions, 1 empty partition, 2 block/read tasks, 1 empty block
@@ -489,7 +500,7 @@
)

values = [[s["one"], s["two"]] for s in ds.take()]
assert ds._plan.execute()._num_computed() == 2
check_num_computed(ds, 2, 0)
assert sorted(values) == [[1, "a"], [1, "a"]]


@@ -513,7 +524,7 @@ def test_parquet_read_partitioned_explicit(ray_start_regular_shared, tmp_path):
)

# Test metadata-only parquet ops.
assert ds._plan.execute()._num_computed() == 0
check_num_computed(ds, 0, 0)
assert ds.count() == 6
assert ds.size_bytes() > 0
assert ds.schema() is not None
@@ -527,11 +538,11 @@ def test_parquet_read_partitioned_explicit(ray_start_regular_shared, tmp_path):
repr(ds) == "Dataset(num_blocks=2, num_rows=6, "
"schema={two: string, one: int32})"
), ds
assert ds._plan.execute()._num_computed() == 1
check_num_computed(ds, 1, 1)

# Forces a data read.
values = [[s["one"], s["two"]] for s in ds.take()]
assert ds._plan.execute()._num_computed() == 2
check_num_computed(ds, 2, 1)
assert sorted(values) == [
[1, "a"],
[1, "b"],
@@ -560,15 +571,15 @@ def _block_udf(block: pa.Table):
ds = ray.data.read_parquet(str(tmp_path), parallelism=1, _block_udf=_block_udf)

ones, twos = zip(*[[s["one"], s["two"]] for s in ds.take()])
assert ds._plan.execute()._num_computed() == 1
check_num_computed(ds, 1, 0)
np.testing.assert_array_equal(sorted(ones), np.array(one_data) + 1)

# 2 blocks/read tasks

ds = ray.data.read_parquet(str(tmp_path), parallelism=2, _block_udf=_block_udf)

ones, twos = zip(*[[s["one"], s["two"]] for s in ds.take()])
assert ds._plan.execute()._num_computed() == 2
check_num_computed(ds, 2, 0)
np.testing.assert_array_equal(sorted(ones), np.array(one_data) + 1)

# 2 blocks/read tasks, 1 empty block
Expand All @@ -581,7 +592,7 @@ def _block_udf(block: pa.Table):
)

ones, twos = zip(*[[s["one"], s["two"]] for s in ds.take()])
assert ds._plan.execute()._num_computed() == 2
check_num_computed(ds, 2, 0)
np.testing.assert_array_equal(sorted(ones), np.array(one_data[:2]) + 1)


@@ -611,17 +622,17 @@ def test_parquet_read_parallel_meta_fetch(ray_start_regular_shared, fs, data_pat
ds = ray.data.read_parquet(data_path, filesystem=fs, parallelism=parallelism)

# Test metadata-only parquet ops.
assert ds._plan.execute()._num_computed() == 0
check_num_computed(ds, 0, 0)
assert ds.count() == num_dfs * 3
assert ds.size_bytes() > 0
assert ds.schema() is not None
input_files = ds.input_files()
assert len(input_files) == num_dfs, input_files
assert ds._plan.execute()._num_computed() == 1
check_num_computed(ds, 1, 1)

# Forces a data read.
values = [s["one"] for s in ds.take(limit=3 * num_dfs)]
assert ds._plan.execute()._num_computed() == parallelism
check_num_computed(ds, parallelism, 1)
assert sorted(values) == list(range(3 * num_dfs))


8 changes: 6 additions & 2 deletions python/ray/data/tests/test_optimize.py
@@ -508,7 +508,9 @@ def test_optimize_equivalent_remote_args(ray_start_regular_shared):
)


def test_optimize_incompatible_stages(ray_start_regular_shared):
def test_optimize_incompatible_stages(shutdown_only):
ray.shutdown()
ray.init(num_cpus=2)
context = DatasetContext.get_current()
context.optimize_fuse_stages = True
context.optimize_fuse_read_stages = True
@@ -548,7 +550,9 @@ def test_optimize_incompatible_stages(ray_start_regular_shared):
)


def test_optimize_callable_classes(ray_start_regular_shared, tmp_path):
def test_optimize_callable_classes(shutdown_only, tmp_path):
ray.shutdown()
ray.init(num_cpus=2)
context = DatasetContext.get_current()
context.optimize_fuse_stages = True
context.optimize_fuse_read_stages = True
4 changes: 3 additions & 1 deletion python/ray/data/tests/test_size_estimation.py
@@ -193,7 +193,9 @@ def gen(name):


@pytest.mark.parametrize("use_actors", [False, True])
def test_split_map(ray_start_regular_shared, use_actors):
def test_split_map(shutdown_only, use_actors):
ray.shutdown()
ray.init(num_cpus=2)
kwargs = {}
if use_actors:
kwargs = {"compute": "actors"}