Skip to content

Commit

Permalink
[Tune] Enable tune.ExperimentAnalysis to pull experiment checkpoint…
Browse files Browse the repository at this point in the history
… files from the cloud if needed (ray-project#34461)

For post-experiment analysis of a Tune run that uploaded results and checkpoints to S3, the node where the analysis is being done may not contain the experiment directory. In this case, the experiment checkpoint and other files (JSON + CSV result files and the param space) should be pulled to a temp directory on the local filesystem.

While this adds functionality to `ExperimentAnalysis`, it also makes the following work:
1. `ResultGrid(ExperimentAnalysis("s3://..."))`, which is what we do in `tuner.fit()`
2. `Tuner.restore("s3://...").get_results()`

Point 2 was the error that flagged this issue in the first place.

This PR also cleans up some confusing trial metadata loading code in `ExperimentAnalysis`.

Signed-off-by: Justin Yu <[email protected]>
  • Loading branch information
justinvyu committed Apr 21, 2023
1 parent 46fc663 commit 333c300
Show file tree
Hide file tree
Showing 19 changed files with 447 additions and 179 deletions.
39 changes: 27 additions & 12 deletions python/ray/air/_internal/checkpoint_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@ class _TrackedCheckpoint:
into `"evaluation/episode_reward_mean"`.
node_ip: IP of the node where the checkpoint was generated. Defaults
to the current node.
local_dir_to_remote_uri_fn: Function that takes in this checkpoint's local
directory path and returns the corresponding remote URI in the cloud.
This should only be specified if the data was synced to cloud.
Only applied during conversion to AIR checkpoint and only
if ``dir_or_data`` is or resolves to a directory path.
"""

def __init__(
Expand All @@ -69,16 +64,12 @@ def __init__(
checkpoint_id: Optional[int] = None,
metrics: Optional[Dict] = None,
node_ip: Optional[str] = None,
local_to_remote_path_fn: Optional[Callable[[str], str]] = None,
):
from ray.tune.result import NODE_IP

self.dir_or_data = dir_or_data
self.id = checkpoint_id
self.storage_mode = storage_mode
# This is a function because dir_or_data may be an object ref
# and we need to wait until its resolved first.
self.local_to_remote_path_fn = local_to_remote_path_fn

self.metrics = flatten_dict(metrics) if metrics else {}
self.node_ip = node_ip or self.metrics.get(NODE_IP, None)
Expand Down Expand Up @@ -142,7 +133,31 @@ def delete(
except Exception as e:
logger.warning(f"Checkpoint deletion failed: {e}")

def to_air_checkpoint(self) -> Optional[Checkpoint]:
def to_air_checkpoint(
self, local_to_remote_path_fn: Optional[Callable[[str], str]] = None
) -> Optional[Checkpoint]:
"""Converter from a `_TrackedCheckpoint` to a `ray.air.Checkpoint`.
This method Resolves the checkpoint data if it is an object reference.
This method handles multiple types of checkpoint data:
- If the data is a string (local checkpoint path), this returns a
directory-backed checkpoint.
- If a `local_to_remote_path_fn` is provided, this converts
local path to a remote URI, then returns a URI-backed checkpoint.
- If the data is bytes or a dictionary, it returns an in-memory
bytes/dict-backed checkpoint.
Args:
local_to_remote_path_fn: Function that takes in this checkpoint's local
directory path and returns the corresponding remote URI in the cloud.
This should only be specified if the data was synced to cloud.
Only applied during conversion to AIR checkpoint and only
if ``dir_or_data`` is or resolves to a directory path.
Returns:
Checkpoint: The AIR checkpoint backed by the resolved data.
"""
from ray.tune.trainable.util import TrainableUtil

checkpoint_data = self.dir_or_data
Expand All @@ -158,9 +173,9 @@ def to_air_checkpoint(self) -> Optional[Checkpoint]:

if isinstance(checkpoint_data, str):
# Prefer cloud checkpoints
if self.local_to_remote_path_fn:
if local_to_remote_path_fn:
checkpoint = Checkpoint.from_uri(
self.local_to_remote_path_fn(checkpoint_data)
local_to_remote_path_fn(checkpoint_data)
)
else:
try:
Expand Down
35 changes: 19 additions & 16 deletions python/ray/air/_internal/remote_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,25 +102,24 @@ def is_non_local_path_uri(uri: str) -> bool:
_cached_fs = {}


def _is_local_path(uri: str) -> bool:
"""Check if the path points to the local filesystem."""
if len(uri) >= 1 and uri[0] == "/":
return True

def is_local_path(path: str) -> bool:
"""Check if a given path is a local path or a remote URI."""
if sys.platform == "win32":
return _is_local_windows_path(uri)
return False
return _is_local_windows_path(path)

scheme = urllib.parse.urlparse(path).scheme
return scheme in ("", "file")

def _is_local_windows_path(uri: str) -> bool:

def _is_local_windows_path(path: str) -> bool:
"""Determines if path is a Windows file-system location."""
if len(uri) >= 1 and uri[0] == "\\":
if len(path) >= 1 and path[0] == "\\":
return True
if (
len(uri) >= 3
and uri[1] == ":"
and (uri[2] == "/" or uri[2] == "\\")
and uri[0].isalpha()
len(path) >= 3
and path[1] == ":"
and (path[2] == "/" or path[2] == "\\")
and path[0].isalpha()
):
return True
return False
Expand All @@ -132,8 +131,9 @@ def get_fs_and_path(
if not pyarrow:
return None, None

if _is_local_path(uri):
# Append protocol such that the downstream operations work
scheme = urllib.parse.urlparse(uri).scheme
if is_local_path(uri) and not scheme:
# Append local filesys scheme such that the downstream operations work
# properly on Linux and Windows.
uri = "file:https://" + pathlib.Path(uri).as_posix()

Expand Down Expand Up @@ -284,10 +284,13 @@ def download_from_uri(uri: str, local_path: str, filelock: bool = True):
f"Hint: {fs_hint(uri)}"
)

_local_path = Path(local_path)
_local_path = Path(local_path).resolve()
exists_before = _local_path.exists()
if is_directory(uri):
_local_path.mkdir(parents=True, exist_ok=True)
else:
_local_path.parent.mkdir(parents=True, exist_ok=True)

try:
if filelock:
with TempFileLock(f"{os.path.normpath(local_path)}.lock"):
Expand Down
28 changes: 28 additions & 0 deletions python/ray/air/_internal/uri_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ def parent(self) -> "URI":
assert self._path.parent != ".", f"{str(self)} has no valid parent URI"
return URI(self._get_str_representation(self._parsed, self._path.parent))

@property
def scheme(self) -> str:
    """Return the scheme component of the parsed URI held in ``_parsed``."""
    parsed = self._parsed
    return parsed.scheme

@property
def path(self) -> str:
    """Return the path held in ``_path`` converted to a string."""
    as_text = str(self._path)
    return as_text

def __truediv__(self, path_to_append):
assert isinstance(path_to_append, str)
return URI(
Expand All @@ -59,3 +67,23 @@ def __repr__(self):

def __str__(self):
return self._get_str_representation(self._parsed, self._path)


def _join_path_or_uri(base_path: str, path_to_join: str) -> str:
    """Join ``path_to_join`` onto a base that is either a local path or a URI.

    A base that ``is_local_path`` reports as local is joined with
    ``pathlib.Path``. Any other base is wrapped in ``URI`` before joining, so
    that (as in the example below) URL parameters stay at the end of the
    resulting URI.

    Example:

        >>> local_path = "/a/b"
        >>> uri = "s3://bucket/a?scheme=http"
        >>> path_to_join = "c/d"
        >>> _join_path_or_uri(local_path, path_to_join)
        '/a/b/c/d'
        >>> _join_path_or_uri(uri, path_to_join)
        's3://bucket/a/c/d?scheme=http'

    Args:
        base_path: Local filesystem path or remote URI to extend.
        path_to_join: Relative path to append to ``base_path``.

    Returns:
        The joined local path or URI, as a string.
    """
    # Function-scope import, kept as in the original (presumably to avoid a
    # circular import between uri_utils and remote_storage -- TODO confirm).
    from ray.air._internal.remote_storage import is_local_path

    if is_local_path(base_path):
        # NOTE(review): if `is_local_path` also accepts "file://" URIs, wrapping
        # them in `Path` collapses the double slash after the scheme -- confirm
        # whether callers ever pass such a base here.
        return str(Path(base_path) / path_to_join)
    return str(URI(base_path) / path_to_join)
6 changes: 3 additions & 3 deletions python/ray/tune/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ py_test(
name = "test_experiment_analysis",
size = "medium",
srcs = ["tests/test_experiment_analysis.py"],
deps = [":tune_lib"],
deps = [":tune_lib", ":conftest"],
tags = ["team:ml", "exclusive"],
)

Expand Down Expand Up @@ -236,7 +236,7 @@ py_test(
name = "test_result_grid",
size = "medium",
srcs = ["tests/test_result_grid.py"],
deps = [":tune_lib"],
deps = [":tune_lib", ":conftest"],
tags = ["team:ml", "exclusive"],
)

Expand Down Expand Up @@ -308,7 +308,7 @@ py_test(
name = "test_syncer",
size = "medium",
srcs = ["tests/test_syncer.py"],
deps = [":tune_lib"],
deps = [":tune_lib", ":conftest"],
tags = ["team:ml", "exclusive"],
)

Expand Down
Loading

0 comments on commit 333c300

Please sign in to comment.