Skip to content

Commit

Permalink
[air] Use filesystem wrapper to exclude files from upload (ray-projec…
Browse files Browse the repository at this point in the history
…t#34102)

Ray Tune uploads experiment state using pyarrow. When cloud checkpointing is configured, the driver excludes any trial-level checkpoints from this upload. However, pyarrow does not natively support file exclusion — instead, we repeatedly call `pyarrow.fs.copy_files` on each individual non-excluded file.

This is inefficient, as the connection to the remote filesystem is opened and closed repeatedly. It also means we can never leverage multi-threaded upload. This PR implements a custom fsspec-based local filesystem that excludes files at the selector level. Thus, we can call `pyarrow.fs.copy_files` exactly once, with a selector that does not see the excluded files.

Edit: [See here for benchmark results](ray-project#34102 (comment))

Signed-off-by: Kai Fricke <[email protected]>
  • Loading branch information
krfricke committed Apr 25, 2023
1 parent 790ef9e commit 8728c77
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 5 deletions.
2 changes: 2 additions & 0 deletions doc/source/tune/api/env.rst
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ These are the environment variables Ray Tune currently considers:
repeatedly every this amount of seconds. Defaults to 60 (seconds).
* **TUNE_WARN_EXCESSIVE_EXPERIMENT_CHECKPOINT_SYNC_THRESHOLD_S**: Threshold for throwing a warning if the experiment state is synced
multiple times in that many seconds. Defaults to 30 (seconds).
* **TUNE_WARN_SLOW_EXPERIMENT_CHECKPOINT_SYNC_THRESHOLD_S**: Threshold for throwing a warning if the experiment state syncing
takes longer than this time in seconds. Defaults to 30 (seconds).
* **TUNE_STATE_REFRESH_PERIOD**: Frequency of updating the resource tracking from Ray. Defaults to 10 (seconds).
* **TUNE_RESTORE_RETRY_NUM**: The number of retries that are done before a particular trial's restore is determined
unsuccessful. After that, the trial is not restored to its previous checkpoint but rather from scratch.
Expand Down
71 changes: 69 additions & 2 deletions python/ray/air/_internal/remote_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@

try:
import fsspec
from fsspec.implementations.local import LocalFileSystem

except ImportError:
fsspec = None
LocalFileSystem = object

try:
import pyarrow
Expand All @@ -40,6 +42,52 @@ def create_dir(self, path, recursive):
from ray import logger


class _ExcludingLocalFilesystem(LocalFileSystem):
    """Local filesystem that hides files matching exclusion patterns.

    Args:
        exclude: List of patterns that are applied to files returned by
            ``self.find()``. If a file path matches this pattern, it will
            be excluded.
    """

    def __init__(self, exclude: List[str], **kwargs):
        super().__init__(**kwargs)
        self._exclude = exclude

    @property
    def fsid(self):
        return "_excluding_local"

    def _should_exclude(self, name: str) -> bool:
        """Return True if `name` matches any of the `self._exclude` patterns."""
        candidates = [name]
        if os.path.isdir(name):
            # Directories are additionally tested with a trailing separator,
            # so patterns written against the "dir/" form also match.
            candidates.append(os.path.join(name, ""))
        return any(
            fnmatch.fnmatch(candidate, pattern)
            for candidate in candidates
            for pattern in self._exclude
        )

    def find(self, path, maxdepth=None, withdirs=False, detail=False, **kwargs):
        """Call parent find() and exclude from result."""
        found = super().find(
            path, maxdepth=maxdepth, withdirs=withdirs, detail=detail, **kwargs
        )
        if not detail:
            return [name for name in found if not self._should_exclude(name)]
        return {
            name: info
            for name, info in found.items()
            if not self._should_exclude(name)
        }


def _pyarrow_fs_copy_files(
source, destination, source_filesystem=None, destination_filesystem=None, **kwargs
):
Expand Down Expand Up @@ -334,14 +382,33 @@ def upload_to_uri(
if not exclude:
_ensure_directory(bucket_path, fs=fs)
_pyarrow_fs_copy_files(local_path, bucket_path, destination_filesystem=fs)
elif fsspec:
# If fsspec is available, prefer it because it's more efficient than
# calling pyarrow.fs.copy_files multiple times
_upload_to_uri_with_exclude_fsspec(
local_path=local_path, fs=fs, bucket_path=bucket_path, exclude=exclude
)
else:
# Walk the filetree and upload
_upload_to_uri_with_exclude(
_upload_to_uri_with_exclude_pyarrow(
local_path=local_path, fs=fs, bucket_path=bucket_path, exclude=exclude
)


def _upload_to_uri_with_exclude(
def _upload_to_uri_with_exclude_fsspec(
    local_path: str, fs: "pyarrow.fs", bucket_path: str, exclude: Optional[List[str]]
) -> None:
    """Upload ``local_path`` to ``bucket_path``, skipping excluded files.

    Exclusion is handled at the source-filesystem level via
    ``_ExcludingLocalFilesystem``, so ``pyarrow.fs.copy_files`` only has to
    be invoked once for the whole tree.
    """
    # Wrap the fsspec-based excluding filesystem so pyarrow can consume it
    # as the source filesystem of the copy.
    excluding_fs = pyarrow.fs.PyFileSystem(
        pyarrow.fs.FSSpecHandler(_ExcludingLocalFilesystem(exclude=exclude))
    )

    _ensure_directory(bucket_path, fs=fs)
    _pyarrow_fs_copy_files(
        local_path,
        bucket_path,
        source_filesystem=excluding_fs,
        destination_filesystem=fs,
    )


def _upload_to_uri_with_exclude_pyarrow(
local_path: str, fs: "pyarrow.fs", bucket_path: str, exclude: Optional[List[str]]
) -> None:
def _should_exclude(candidate: str) -> bool:
Expand Down
40 changes: 40 additions & 0 deletions python/ray/air/tests/test_remote_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,46 @@ def test_upload_exclude_multimatch(temp_data_dirs):
assert_file(False, tmp_target, "subdir_exclude/something/somewhere.txt")


@pytest.mark.parametrize("no_fsspec", [False, True])
def test_upload_local_exclude_multi(temp_data_dirs, no_fsspec):
    if no_fsspec:
        # Re-run the same test with fsspec disabled to exercise the
        # pyarrow-based fallback upload path.
        with patch("ray.air._internal.remote_storage.fsspec", None):
            return test_upload_local_exclude_multi(temp_data_dirs, no_fsspec=False)

    tmp_source, tmp_target = temp_data_dirs

    upload_to_uri(tmp_source, tmp_target, exclude=["*_exclude.txt", "*_exclude/*"])

    # (should_exist, relative path) expectations after the filtered upload.
    expectations = [
        (True, "level0.txt"),
        (False, "level0_exclude.txt"),
        (True, "subdir/level1.txt"),
        (False, "subdir/level1_exclude.txt"),
        (True, "subdir/nested/level2.txt"),
        (False, "subdir_nested_level2_exclude.txt"),
        (False, "subdir_exclude"),
        (False, "subdir_exclude/something/somewhere.txt"),
    ]
    for should_exist, rel_path in expectations:
        assert_file(should_exist, tmp_target, rel_path)


@pytest.mark.parametrize("no_fsspec", [False, True])
def test_upload_local_exclude_multimatch(temp_data_dirs, no_fsspec):
    if no_fsspec:
        # Re-run the same test with fsspec disabled to exercise the
        # pyarrow-based fallback upload path.
        with patch("ray.air._internal.remote_storage.fsspec", None):
            return test_upload_local_exclude_multimatch(temp_data_dirs, no_fsspec=False)

    tmp_source, tmp_target = temp_data_dirs

    upload_to_uri(tmp_source, tmp_target, exclude=["*_exclude*"])

    # (should_exist, relative path) expectations after the filtered upload.
    expectations = [
        (True, "level0.txt"),
        (False, "level0_exclude.txt"),
        (True, "subdir/level1.txt"),
        (False, "subdir/level1_exclude.txt"),
        (True, "subdir/nested/level2.txt"),
        (False, "subdir_nested_level2_exclude.txt"),
        (False, "subdir_exclude"),
        (False, "subdir_exclude/something/somewhere.txt"),
    ]
    for should_exist, rel_path in expectations:
        assert_file(should_exist, tmp_target, rel_path)


def test_get_recursive_files_race_con(temp_data_dirs):
tmp_source, _ = temp_data_dirs

Expand Down
36 changes: 33 additions & 3 deletions python/ray/tune/execution/experiment_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from ray.tune.impl.out_of_band_serialize_dataset import out_of_band_serialize_dataset
from ray.tune.syncer import SyncConfig, get_node_to_storage_syncer


logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -149,6 +148,13 @@ def __init__(
# Upload triggered by trial checkpoints
self._sync_every_n_trial_checkpoints = sync_every_n_trial_checkpoints
self._trial_num_checkpoints_since_last_sync: Dict[Trial, int] = Counter()

self._slow_sync_threshold = float(
os.environ.get(
"TUNE_WARN_SLOW_EXPERIMENT_CHECKPOINT_SYNC_THRESHOLD_S", "30"
)
)

self._excessive_sync_threshold = float(
os.environ.get(
"TUNE_WARN_EXCESSIVE_EXPERIMENT_CHECKPOINT_SYNC_THRESHOLD_S", "30"
Expand Down Expand Up @@ -277,17 +283,41 @@ def sync_up(self, force: bool = False, wait: bool = False) -> bool:
exclude=exclude,
)

start_time = time.monotonic()
if wait:
self._syncer.wait()

now = time.monotonic()
sync_time_taken = now - start_time

if sync_time_taken > self._slow_sync_threshold:
try:
import fsspec
except Exception:
fsspec = None

fsspec_msg = ""
if fsspec is None:
fsspec_msg = (
"If your data is small, try installing fsspec "
"(`pip install fsspec`) for more efficient local file parsing. "
)

logger.warning(
"Syncing the experiment checkpoint to cloud took a long time with "
f"{sync_time_taken:.2f} seconds. This can be due to a large number "
f"of trials, large logfiles, or throttling from the "
f"remote storage provider for too frequent syncs. {fsspec_msg}"
f"If your `CheckpointConfig.num_to_keep` is a low number, this can "
f"trigger frequent syncing, in which case you should increase it. "
)

if not synced:
return False

self._should_force_cloud_sync = False
self._trial_num_checkpoints_since_last_sync.clear()

# syncing might have taken some time, so we grab the current timestamp again
now = time.time()
if now - self._last_sync_time < self._excessive_sync_threshold:
logger.warning(
"Experiment checkpoint syncing has been triggered multiple "
Expand Down

0 comments on commit 8728c77

Please sign in to comment.