Skip to content

Commit

Permalink
Clean up RAY_DATASET_FORCE_LOCAL_METADATA flag (ray-project#32483)
Browse files: browse the repository at this point in the history.
Follow-up to ray-project#32015.
  • Loading branch information
jianoaix authored and cassidylaidlaw committed Mar 28, 2023
1 parent 1a6d843 commit 8938d31
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 30 deletions.
45 changes: 18 additions & 27 deletions python/ray/data/dataset.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import collections
import itertools
import logging
import os
import sys
import time
import html
Expand Down Expand Up @@ -2712,35 +2711,27 @@ def transform(blocks: Iterable[Block], ctx, fn) -> Iterable[Block]:
datasource.on_write_failed([], e)
raise
else:
logger.warning(
"The Datasource.do_write() is deprecated in "
"Ray 2.4 and will be removed in future release. Use "
"Datasource.write() instead."
)

ctx = DatasetContext.get_current()
blocks, metadata = zip(*self._plan.execute().get_blocks_with_metadata())

# TODO(ekl) remove this feature flag.
if "RAY_DATASET_FORCE_LOCAL_METADATA" in os.environ:
write_results: List[ObjectRef[WriteResult]] = datasource.do_write(
blocks, metadata, ray_remote_args=ray_remote_args, **write_args
)
else:
logger.warning(
"The Datasource.do_write() is deprecated in "
"Ray 2.4 and will be removed in future release. Use "
"Datasource.write() instead."
)
# Prepare write in a remote task so that in Ray client mode, we
# don't do metadata resolution from the client machine.
do_write = cached_remote_fn(
_do_write, retry_exceptions=False, num_cpus=0
)
write_results: List[ObjectRef[WriteResult]] = ray.get(
do_write.remote(
datasource,
ctx,
blocks,
metadata,
ray_remote_args,
_wrap_arrow_serialization_workaround(write_args),
)
# Prepare write in a remote task so that in Ray client mode, we
# don't do metadata resolution from the client machine.
do_write = cached_remote_fn(_do_write, retry_exceptions=False, num_cpus=0)
write_results: List[ObjectRef[WriteResult]] = ray.get(
do_write.remote(
datasource,
ctx,
blocks,
metadata,
ray_remote_args,
_wrap_arrow_serialization_workaround(write_args),
)
)

progress = ProgressBar("Write Progress", len(write_results))
try:
Expand Down
4 changes: 1 addition & 3 deletions python/ray/data/read_api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import logging
import os
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, TypeVar, Union

import numpy as np
Expand Down Expand Up @@ -279,8 +278,7 @@ def read_datasource(
):
ray_remote_args["scheduling_strategy"] = "SPREAD"

# TODO(ekl) remove this feature flag.
force_local = "RAY_DATASET_FORCE_LOCAL_METADATA" in os.environ
force_local = False
cur_pg = ray.util.get_current_placement_group()
pa_ds = _lazy_import_pyarrow_dataset()
if pa_ds:
Expand Down

0 comments on commit 8938d31

Please sign in to comment.