Skip to content

Commit

Permalink
[runtime env] Async pip runtime env (ray-project#22381)
Browse files Browse the repository at this point in the history
In order to initialize runtime env concurrently, this PR makes pip runtime env asynchronous. It includes,

- [x] New `check_output_cmd` in runtime env utils.
- [x] Async PipProcessor.
- [x] The `asynccontextmanager` from `https://github.com/python-trio/async_generator` for Python 3.6
- [x] Remove pip runtime env lock.
- [x] Disable pip cache.

Co-authored-by: 刘宝 <[email protected]>
  • Loading branch information
fyrestone and 刘宝 committed Feb 24, 2022
1 parent eb99607 commit 6a9a286
Show file tree
Hide file tree
Showing 5 changed files with 409 additions and 51 deletions.
18 changes: 18 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -383,3 +383,21 @@ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

--------------------------------------------------------------------------------
Code in python/ray/_private/async_compat.py is adapted from
https://github.com/python-trio/async_generator/blob/master/async_generator/_util.py

Copyright (c) 2022, Nathaniel J. Smith

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http:https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
129 changes: 129 additions & 0 deletions python/ray/_private/async_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,20 @@
uvloop = None


try:
# This function has been added in Python 3.7. Prior to Python 3.7,
# the low-level asyncio.ensure_future() function can be used instead.
from asyncio import create_task # noqa: F401
except ImportError:
from asyncio import ensure_future as create_task # noqa: F401


try:
from asyncio import get_running_loop # noqa: F401
except ImportError:
from asyncio import _get_running_loop as get_running_loop # noqa: F401


def get_new_event_loop():
"""Construct a new event loop. Ray will use uvloop if it exists"""
if uvloop:
Expand All @@ -29,3 +43,118 @@ async def wrapper(*args, **kwargs):
return func(*args, **kwargs)

return wrapper


try:
from contextlib import asynccontextmanager
except ImportError:
# Copy from https://github.com/python-trio/async_generator
# for compatible with Python 3.6
import sys
from functools import wraps
from inspect import isasyncgenfunction

class _aclosing:
def __init__(self, aiter):
self._aiter = aiter

async def __aenter__(self):
return self._aiter

async def __aexit__(self, *args):
await self._aiter.aclose()

# Very much derived from the one in contextlib, by copy/pasting and then
# asyncifying everything. (Also I dropped the obscure support for using
# context managers as function decorators. It could be re-added; I just
# couldn't be bothered.)
# So this is a derivative work licensed under the PSF License, which requires
# the following notice:
#
# Copyright © 2001-2017 Python Software Foundation; All Rights Reserved
class _AsyncGeneratorContextManager:
def __init__(self, func, args, kwds):
self._func_name = func.__name__
self._agen = func(*args, **kwds).__aiter__()

async def __aenter__(self):
if sys.version_info < (3, 5, 2):
self._agen = await self._agen
try:
return await self._agen.asend(None)
except StopAsyncIteration:
raise RuntimeError("async generator didn't yield") from None

async def __aexit__(self, type, value, traceback):
async with _aclosing(self._agen):
if type is None:
try:
await self._agen.asend(None)
except StopAsyncIteration:
return False
else:
raise RuntimeError("async generator didn't stop")
else:
# It used to be possible to have type != None, value == None:
# https://bugs.python.org/issue1705170
# but AFAICT this can't happen anymore.
assert value is not None
try:
await self._agen.athrow(type, value, traceback)
raise RuntimeError("async generator didn't stop after athrow()")
except StopAsyncIteration as exc:
# Suppress StopIteration *unless* it's the same exception
# that was passed to throw(). This prevents a
# StopIteration raised inside the "with" statement from
# being suppressed.
return exc is not value
except RuntimeError as exc:
# Don't re-raise the passed in exception. (issue27112)
if exc is value:
return False
# Likewise, avoid suppressing if a StopIteration exception
# was passed to throw() and later wrapped into a
# RuntimeError (see PEP 479).
if (
isinstance(value, (StopIteration, StopAsyncIteration))
and exc.__cause__ is value
):
return False
raise
except: # noqa: E722
# only re-raise if it's *not* the exception that was
# passed to throw(), because __exit__() must not raise an
# exception unless __exit__() itself failed. But throw()
# has to raise the exception to signal propagation, so
# this fixes the impedance mismatch between the throw()
# protocol and the __exit__() protocol.
#
if sys.exc_info()[1] is value:
return False
raise

def __enter__(self):
raise RuntimeError(
"use 'async with {func_name}(...)', not 'with {func_name}(...)'".format(
func_name=self._func_name
)
)

def __exit__(self): # pragma: no cover
assert False, """Never called, but should be defined"""

def asynccontextmanager(func):
"""Like @contextmanager, but async."""
if not isasyncgenfunction(func):
raise TypeError(
"must be an async generator (native or from async_generator; "
"if using @async_generator then @acontextmanager must be on top."
)

@wraps(func)
def helper(*args, **kwds):
return _AsyncGeneratorContextManager(func, args, kwds)

# A hint for sphinxcontrib-trio:
helper.__returns_acontextmanager__ = True
return helper
111 changes: 63 additions & 48 deletions python/ray/_private/runtime_env/pip.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
import contextlib
import os
import sys
import json
import logging
import hashlib
import shutil

from filelock import FileLock
from typing import Optional, List, Dict, Tuple

from ray._private.runtime_env.conda_utils import exec_cmd_stream_to_logger
from ray._private.async_compat import asynccontextmanager, create_task, get_running_loop
from ray._private.runtime_env.context import RuntimeEnvContext
from ray._private.runtime_env.packaging import Protocol, parse_uri
from ray._private.runtime_env.utils import RuntimeEnv
from ray._private.utils import get_directory_size_bytes, try_to_create_directory
from ray._private.runtime_env.utils import RuntimeEnv, check_output_cmd
from ray._private.utils import (
get_directory_size_bytes,
try_to_create_directory,
)

default_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -86,8 +87,8 @@ def _is_in_virtualenv() -> bool:
)

@staticmethod
@contextlib.contextmanager
def _check_ray(python: str, cwd: str, logger: logging.Logger):
@asynccontextmanager
async def _check_ray(python: str, cwd: str, logger: logging.Logger):
"""A context manager to check ray is not overwritten.
Currently, we only check ray version and path. It works for virtualenv,
Expand All @@ -96,24 +97,22 @@ def _check_ray(python: str, cwd: str, logger: logging.Logger):
- ray is in virtualenv's site-packages.
"""

def _get_ray_version_and_path() -> Tuple[str, str]:
async def _get_ray_version_and_path() -> Tuple[str, str]:
check_ray_cmd = [
python,
"-c",
"import ray; print(ray.__version__, ray.__path__[0])",
]
exit_code, output = exec_cmd_stream_to_logger(
check_ray_cmd, logger, cwd=cwd, env={}
output = await check_output_cmd(
check_ray_cmd, logger=logger, cwd=cwd, env={}
)
if exit_code != 0:
raise RuntimeError("Get ray version and path failed.")
# print after import ray may have  endings, so we strip them by *_
ray_version, ray_path, *_ = [s.strip() for s in output.split()]
return ray_version, ray_path

version, path = _get_ray_version_and_path()
version, path = await _get_ray_version_and_path()
yield
actual_version, actual_path = _get_ray_version_and_path()
actual_version, actual_path = await _get_ray_version_and_path()
if actual_version != version or actual_path != path:
raise RuntimeError(
"Changing the ray version is not allowed: \n"
Expand All @@ -126,7 +125,9 @@ def _get_ray_version_and_path() -> Tuple[str, str]:
)

@classmethod
def _create_or_get_virtualenv(cls, path: str, cwd: str, logger: logging.Logger):
async def _create_or_get_virtualenv(
cls, path: str, cwd: str, logger: logging.Logger
):
"""Create or get a virtualenv from path."""

python = sys.executable
Expand Down Expand Up @@ -189,16 +190,10 @@ def _create_or_get_virtualenv(cls, path: str, cwd: str, logger: logging.Logger):
virtualenv_path,
current_python_dir,
)
exit_code, output = exec_cmd_stream_to_logger(
create_venv_cmd, logger, cwd=cwd, env={}
)
if exit_code != 0:
raise RuntimeError(
f"Failed to create virtualenv {virtualenv_path}:\n{output}"
)
await check_output_cmd(create_venv_cmd, logger=logger, cwd=cwd, env={})

@classmethod
def _install_pip_packages(
async def _install_pip_packages(
cls,
path: str,
pip_packages: List[str],
Expand All @@ -209,28 +204,39 @@ def _install_pip_packages(
python = _PathHelper.get_virtualenv_python(path)
# TODO(fyrestone): Support -i, --no-deps, --no-cache-dir, ...
pip_requirements_file = _PathHelper.get_requirements_file(path)
with open(pip_requirements_file, "w") as file:
for line in pip_packages:
file.write(line + "\n")

def _gen_requirements_txt():
with open(pip_requirements_file, "w") as file:
for line in pip_packages:
file.write(line + "\n")

# Avoid blocking the event loop.
loop = get_running_loop()
await loop.run_in_executor(None, _gen_requirements_txt)

# pip options
#
# --disable-pip-version-check
# Don't periodically check PyPI to determine whether a new version
# of pip is available for download.
#
# --no-cache-dir
# Disable the cache, the pip runtime env is a one-time installation,
# and we don't need to handle the pip cache broken.
pip_install_cmd = [
python,
"-m",
"pip",
"install",
"--disable-pip-version-check",
"--no-cache-dir",
"-r",
pip_requirements_file,
]
logger.info("Installing python requirements to %s", virtualenv_path)
exit_code, output = exec_cmd_stream_to_logger(
pip_install_cmd, logger, cwd=cwd, env={}
)
if exit_code != 0:
raise RuntimeError(
f"Failed to install python requirements to {virtualenv_path}:\n{output}"
)
await check_output_cmd(pip_install_cmd, logger=logger, cwd=cwd, env={})

def run(self):
async def _run(self):
path = self._target_dir
logger = self._logger
pip_packages = self._runtime_env.pip_packages()
Expand All @@ -240,27 +246,26 @@ def run(self):
exec_cwd = os.path.join(path, "exec_cwd")
os.makedirs(exec_cwd, exist_ok=True)
try:
self._create_or_get_virtualenv(path, exec_cwd, logger)
await self._create_or_get_virtualenv(path, exec_cwd, logger)
python = _PathHelper.get_virtualenv_python(path)
with self._check_ray(python, exec_cwd, logger):
self._install_pip_packages(path, pip_packages, exec_cwd, logger)
async with self._check_ray(python, exec_cwd, logger):
await self._install_pip_packages(path, pip_packages, exec_cwd, logger)
# TODO(fyrestone): pip check.
except Exception:
logger.info("Delete incomplete virtualenv: %s", path)
shutil.rmtree(path, ignore_errors=True)
logger.exception("Failed to install pip packages.")
raise

def __await__(self):
return self._run().__await__()


class PipManager:
def __init__(self, resources_dir: str):
self._pip_resources_dir = os.path.join(resources_dir, "pip")
self._creating_task = {}
try_to_create_directory(self._pip_resources_dir)
# Concurrent pip installs are unsafe. This lock prevents concurrent
# installs (and deletions).
self._installs_and_deletions_file_lock = os.path.join(
self._pip_resources_dir, "ray-pip-installs-and-deletions.lock"
)

def _get_path_from_hash(self, hash: str) -> str:
"""Generate a path from the hash of a pip spec.
Expand Down Expand Up @@ -290,11 +295,15 @@ def delete_uri(
f"pip. Received protocol {protocol}, URI {uri}"
)

# Cancel running create task.
task = self._creating_task.pop(hash, None)
if task is not None:
task.cancel()

pip_env_path = self._get_path_from_hash(hash)
local_dir_size = get_directory_size_bytes(pip_env_path)
try:
with FileLock(self._installs_and_deletions_file_lock):
shutil.rmtree(pip_env_path)
shutil.rmtree(pip_env_path)
except OSError as e:
logger.warning(f"Error when deleting pip env {pip_env_path}: {str(e)}")
return 0
Expand All @@ -314,11 +323,17 @@ async def create(
protocol, hash = parse_uri(uri)
target_dir = self._get_path_from_hash(hash)

with FileLock(self._installs_and_deletions_file_lock):
pip_processor = PipProcessor(target_dir, runtime_env, logger)
pip_processor.run()
async def _create_for_hash():
await PipProcessor(target_dir, runtime_env, logger)

loop = get_running_loop()
return await loop.run_in_executor(
None, get_directory_size_bytes, target_dir
)

return get_directory_size_bytes(target_dir)
self._creating_task[hash] = task = create_task(_create_for_hash())
task.add_done_callback(lambda _: self._creating_task.pop(hash, None))
return await task

def modify_context(
self,
Expand Down
Loading

0 comments on commit 6a9a286

Please sign in to comment.