Skip to content

Commit

Permalink
Revert "Revert "[runtime env] plugin refactor[2/n]: support json sche… (
Browse files Browse the repository at this point in the history
  • Loading branch information
SongGuyang authored Jul 12, 2022
1 parent adfdc26 commit 22dfd1f
Show file tree
Hide file tree
Showing 13 changed files with 283 additions and 3 deletions.
6 changes: 6 additions & 0 deletions python/ray/_private/runtime_env/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,9 @@

# The plugins which should be loaded when ray cluster starts.
RAY_RUNTIME_ENV_PLUGINS_ENV_VAR = "RAY_RUNTIME_ENV_PLUGINS"

# The schema files or directories of plugins which should be loaded in workers.
# The value is a comma-separated list; each entry is either the path of a
# schema file or a directory that is searched recursively for schema files,
# e.g. "/path/to/env_1_schema.json,/path/to/schemas_dir/".
RAY_RUNTIME_ENV_PLUGIN_SCHEMAS_ENV_VAR = "RAY_RUNTIME_ENV_PLUGIN_SCHEMAS"

# The file suffix of runtime env plugin schemas.
# Only paths ending with this suffix are treated as schema files.
RAY_RUNTIME_ENV_PLUGIN_SCHEMA_SUFFIX = ".json"
2 changes: 1 addition & 1 deletion python/ray/_private/runtime_env/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def delete_uri(self, uri: str, logger: logging.Logger) -> float:


class RuntimeEnvPluginManager:
"""This mananger is used to load plugins in runtime env agent."""
"""This manager is used to load plugins in runtime env agent."""

def __init__(self):
self.plugins = {}
Expand Down
88 changes: 88 additions & 0 deletions python/ray/_private/runtime_env/plugin_schema_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import os
import jsonschema
import logging
from typing import List
import json
from ray._private.runtime_env.constants import (
RAY_RUNTIME_ENV_PLUGIN_SCHEMAS_ENV_VAR,
RAY_RUNTIME_ENV_PLUGIN_SCHEMA_SUFFIX,
)

logger = logging.getLogger(__name__)


class RuntimeEnvPluginSchemaManager:
    """This manager is used to load plugin json schemas.

    Schemas are registered under their top-level ``"title"`` and are loaded
    lazily on the first call to ``validate``: first from the ``schemas``
    directory next to this module, then from the paths listed in the
    environment variable named by ``RAY_RUNTIME_ENV_PLUGIN_SCHEMAS_ENV_VAR``.
    All state lives on the class, so it is shared by every user of the manager
    in the process; call ``clear`` to force a reload.
    """

    # Directory holding the schemas shipped with this package.
    default_schema_path = os.path.join(os.path.dirname(__file__), "schemas")
    # Loaded schemas, keyed by schema title.
    schemas = {}
    # Whether the lazy load has already run.
    loaded = False

    @staticmethod
    def _find_schema_files(directory: str) -> List[str]:
        """Return every schema file found recursively under ``directory``."""
        return [
            os.path.join(root, file_name)
            for root, _, file_names in os.walk(directory)
            for file_name in file_names
            if file_name.endswith(RAY_RUNTIME_ENV_PLUGIN_SCHEMA_SUFFIX)
        ]

    @classmethod
    def _load_schemas(cls, schema_paths: List[str]):
        """Parse the given schema files and register them by title.

        A file is logged and skipped when it is not valid JSON, when it is not
        a JSON object with a string ``"title"``, or when its title is already
        registered (the first schema loaded for a title wins).

        Args:
            schema_paths: Paths of the schema files to load.
        """
        for schema_path in schema_paths:
            try:
                # Use a context manager so the handle is always closed.
                with open(schema_path) as schema_file:
                    schema = json.load(schema_file)
            except json.decoder.JSONDecodeError:
                logger.error("Invalid runtime env schema %s, skip it.", schema_path)
                # Without this, `schema` would be unbound or left over from
                # the previous file.
                continue
            title = schema.get("title") if isinstance(schema, dict) else None
            if not isinstance(title, str):
                logger.error(
                    "No valid title in runtime env schema %s, skip it.", schema_path
                )
                continue
            if title in cls.schemas:
                logger.error(
                    "The 'title' of runtime env schema %s conflicts with %s, skip it.",
                    schema_path,
                    cls.schemas[title],
                )
                continue
            cls.schemas[title] = schema

    @classmethod
    def _load_default_schemas(cls):
        """Load the schemas found under ``default_schema_path``."""
        schema_json_files = cls._find_schema_files(cls.default_schema_path)
        logger.info(
            f"Loading the default runtime env schemas: {schema_json_files}."
        )
        cls._load_schemas(schema_json_files)

    @classmethod
    def _load_schemas_from_env_var(cls):
        """Load the schemas listed in the plugin-schemas environment variable.

        Does nothing when the variable is unset or empty.
        """
        # The format of env var:
        # "/path/to/env_1_schema.json,/path/to/env_2_schema.json,/path/to/schemas_dir/"
        schema_paths = os.environ.get(RAY_RUNTIME_ENV_PLUGIN_SCHEMAS_ENV_VAR)
        if not schema_paths:
            return
        schema_json_files = []
        for path in schema_paths.split(","):
            if path.endswith(RAY_RUNTIME_ENV_PLUGIN_SCHEMA_SUFFIX):
                # An explicit schema file.
                schema_json_files.append(path)
            elif os.path.isdir(path):
                schema_json_files.extend(cls._find_schema_files(path))
        logger.info(
            f"Loading the runtime env schemas from env var: {schema_json_files}."
        )
        cls._load_schemas(schema_json_files)

    @classmethod
    def validate(cls, name, instance):
        """Validate ``instance`` against the schema registered as ``name``.

        The first call triggers the lazy load of all schemas. Validation is
        skipped when no schema is registered under ``name``.

        Args:
            name: The schema title to look up (the runtime env field name).
            instance: The value to validate.

        Raises:
            jsonschema.exceptions.ValidationError: If ``instance`` does not
                conform to the matching schema.
        """
        if not cls.loaded:
            # Load the schemas lazily.
            cls._load_default_schemas()
            cls._load_schemas_from_env_var()
            cls.loaded = True
        # if no schema matches, skip the validation.
        if name in cls.schemas:
            jsonschema.validate(instance=instance, schema=cls.schemas[name])

    @classmethod
    def clear(cls):
        """Drop all loaded schemas so the next ``validate`` reloads them."""
        cls.schemas.clear()
        cls.loaded = False
50 changes: 50 additions & 0 deletions python/ray/_private/runtime_env/schemas/pip_schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
{
"$schema": "https://json-schema.org/draft-07/schema#",
"$id": "https://github.com/ray-project/ray/runtime_env/pip_schema.json",
"title": "pip",
"description": "A pip environment specification.",
"oneOf": [
{
"type": "object",
"properties": {
"packages": {
"oneOf": [
{
"type": "array",
"items": {
"type": "string"
},
"description": "a list of pip packages"
},
{
"type": "string",
"description": "the path to a pip `requirements.txt` file"
}
]
},
"pip_check": {
"type": "boolean",
"description": "whether to enable pip check at the end of pip install"
},
"pip_version": {
"type": "string",
"description": "the version of pip"
}
},
"required": [
"packages"
]
},
{
"type": "string",
"description": "the path to a pip `requirements.txt` file"
},
{
"type": "array",
"items": {
"type": "string"
},
"description": "a list of pip requirements specifiers"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"$schema": "https://json-schema.org/draft-07/schema#",
"$id": "https://github.com/ray-project/ray/runtime_env/working_dir_schema.json",
"title": "working_dir",
"type": "string",
"description": "Specifies the working directory for the Ray workers."
}
3 changes: 2 additions & 1 deletion python/ray/runtime_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from ray._private.ray_constants import DEFAULT_RUNTIME_ENV_TIMEOUT_SECONDS
from ray._private.runtime_env.conda import get_uri as get_conda_uri
from ray._private.runtime_env.pip import get_uri as get_pip_uri
from ray._private.runtime_env.plugin_schema_manager import RuntimeEnvPluginSchemaManager
from ray._private.runtime_env.validation import OPTION_TO_VALIDATION_FN
from ray.core.generated.runtime_env_common_pb2 import RuntimeEnv as ProtoRuntimeEnv
from ray.core.generated.runtime_env_common_pb2 import (
Expand Down Expand Up @@ -430,8 +431,8 @@ def get_uris(self) -> List[str]:
return plugin_uris

def __setitem__(self, key: str, value: Any) -> None:
# TODO(SongGuyang): Validate the schemas of plugins by json schema.
res_value = value
RuntimeEnvPluginSchemaManager.validate(key, res_value)
if key in RuntimeEnv.known_fields and key in OPTION_TO_VALIDATION_FN:
res_value = OPTION_TO_VALIDATION_FN[key](value)
if res_value is None:
Expand Down
13 changes: 13 additions & 0 deletions python/ray/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import ray._private.ray_constants as ray_constants
import ray.util.client.server.server as ray_client_server
from ray._private.runtime_env.pip import PipProcessor
from ray._private.runtime_env.plugin_schema_manager import RuntimeEnvPluginSchemaManager
from ray._private.services import (
REDIS_EXECUTABLE,
_start_redis_instance,
Expand Down Expand Up @@ -941,3 +942,15 @@ def set_runtime_env_plugins(request):
yield runtime_env_plugins
finally:
del os.environ["RAY_RUNTIME_ENV_PLUGINS"]


@pytest.fixture
def set_runtime_env_plugin_schemas(request):
    """Point RAY_RUNTIME_ENV_PLUGIN_SCHEMAS at the parametrized schema paths.

    Yields the value that was written to the environment variable (``"0"``
    when the fixture is not parametrized). On teardown the variable is put
    back to its previous state and the cached schemas are dropped, so schemas
    loaded for this test do not leak into later tests.
    """
    env_var = "RAY_RUNTIME_ENV_PLUGIN_SCHEMAS"
    runtime_env_plugin_schemas = getattr(request, "param", "0")
    previous_value = os.environ.get(env_var)
    os.environ[env_var] = runtime_env_plugin_schemas
    # Clear and reload schemas.
    RuntimeEnvPluginSchemaManager.clear()
    try:
        yield runtime_env_plugin_schemas
    finally:
        if previous_value is None:
            # pop() tolerates a test that already removed the variable.
            os.environ.pop(env_var, None)
        else:
            os.environ[env_var] = previous_value
        # Forget the schemas that were loaded from this test's env var.
        RuntimeEnvPluginSchemaManager.clear()
79 changes: 79 additions & 0 deletions python/ray/tests/test_runtime_env_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pathlib import Path
from ray import job_config
import yaml
import jsonschema

from ray._private.runtime_env.validation import (
parse_and_validate_excludes,
Expand All @@ -14,6 +15,7 @@
parse_and_validate_env_vars,
parse_and_validate_py_modules,
)
from ray._private.runtime_env.plugin_schema_manager import RuntimeEnvPluginSchemaManager
from ray.runtime_env import RuntimeEnv

CONDA_DICT = {"dependencies": ["pip", {"pip": ["pip-install-test==0.5"]}]}
Expand Down Expand Up @@ -298,6 +300,83 @@ def test_parse_runtime_env_from_json_env_variable(self):
assert config.metadata == {}


# Directory of this test module; the schema fixtures used below live next to it.
schemas_dir = os.path.dirname(__file__)
test_env_1 = os.path.join(schemas_dir, "test_runtime_env_validation_1_schema.json")
test_env_2 = os.path.join(schemas_dir, "test_runtime_env_validation_2_schema.json")


@pytest.mark.parametrize(
    "set_runtime_env_plugin_schemas",
    [schemas_dir, f"{test_env_1},{test_env_2}"],
    indirect=True,
)
@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
class TestValidateByJsonSchema:
    """Check that RuntimeEnv fields are validated against the json schemas.

    Every test runs twice: once with the env var pointing at the schema
    directory and once with it listing the two test schema files explicitly.
    """

    def test_validate_pip(self, set_runtime_env_plugin_schemas):
        env = RuntimeEnv()
        # Exercise both the `set` method and item assignment, in that order.
        for assign in (env.set, env.__setitem__):
            assign("pip", {"packages": ["requests"], "pip_check": True})
            with pytest.raises(
                jsonschema.exceptions.ValidationError, match="pip_check"
            ):
                assign("pip", {"packages": ["requests"], "pip_check": "1"})

    def test_validate_working_dir(self, set_runtime_env_plugin_schemas):
        env = RuntimeEnv()
        # Exercise both the `set` method and item assignment, in that order.
        for assign in (env.set, env.__setitem__):
            assign("working_dir", "https://abc/file.zip")
            with pytest.raises(
                jsonschema.exceptions.ValidationError, match="working_dir"
            ):
                assign("working_dir", ["https://abc/file.zip"])

    def test_validate_test_env_1(self, set_runtime_env_plugin_schemas):
        env = RuntimeEnv()
        env.set("test_env_1", {"array": ["123"], "bool": True})
        expected_failure = pytest.raises(
            jsonschema.exceptions.ValidationError, match="bool"
        )
        with expected_failure:
            env.set("test_env_1", {"array": ["123"], "bool": "1"})

    def test_validate_test_env_2(self, set_runtime_env_plugin_schemas):
        env = RuntimeEnv()
        env.set("test_env_2", "123")
        expected_failure = pytest.raises(
            jsonschema.exceptions.ValidationError, match="test_env_2"
        )
        with expected_failure:
            env.set("test_env_2", ["123"])


@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
class TestRuntimeEnvPluginSchemaManager:
    """Check the lazy loading and reloading of RuntimeEnvPluginSchemaManager."""

    def test(self):
        env_var = "RAY_RUNTIME_ENV_PLUGIN_SCHEMAS"
        # Start without the env var so that only the default schemas are
        # loaded, and remember the old value to put it back afterwards.
        previous_value = os.environ.pop(env_var, None)
        RuntimeEnvPluginSchemaManager.clear()
        try:
            # No schemas when starts.
            assert len(RuntimeEnvPluginSchemaManager.schemas) == 0
            # When the `validate` is used first time, the schemas will be
            # loaded lazily.
            # The validation of pip is enabled.
            with pytest.raises(
                jsonschema.exceptions.ValidationError, match="pip_check"
            ):
                RuntimeEnvPluginSchemaManager.validate(
                    "pip", {"packages": ["requests"], "pip_check": "123"}
                )
            # The validation of test_env_1 is disabled because we haven't set
            # the env var.
            RuntimeEnvPluginSchemaManager.validate(
                "test_env_1", {"array": ["123"], "bool": "123"}
            )
            assert len(RuntimeEnvPluginSchemaManager.schemas) != 0
            # Set the thirdparty schemas.
            os.environ[env_var] = schemas_dir
            # Clear the loaded schemas to make sure the schemas could be
            # reloaded next time.
            RuntimeEnvPluginSchemaManager.clear()
            assert len(RuntimeEnvPluginSchemaManager.schemas) == 0
            # The validation of test_env_1 is enabled.
            with pytest.raises(jsonschema.exceptions.ValidationError, match="bool"):
                RuntimeEnvPluginSchemaManager.validate(
                    "test_env_1", {"array": ["123"], "bool": "123"}
                )
        finally:
            # Do not leak the env var or the test schemas into other tests.
            if previous_value is None:
                os.environ.pop(env_var, None)
            else:
                os.environ[env_var] = previous_value
            RuntimeEnvPluginSchemaManager.clear()


if __name__ == "__main__":
if os.environ.get("PARALLEL_CI"):
sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
Expand Down
20 changes: 20 additions & 0 deletions python/ray/tests/test_runtime_env_validation_1_schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"$schema": "https://json-schema.org/draft-07/schema#",
"$id": "https://github.com/ray-project/ray/runtime_env/pip_schema.json",
"title": "test_env_1",
"type": "object",
"properties": {
"array": {
"type": "array",
"items": {
"type": "string"
}
},
"bool": {
"type": "boolean"
}
},
"required": [
"array"
]
}
6 changes: 6 additions & 0 deletions python/ray/tests/test_runtime_env_validation_2_schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"$schema": "https://json-schema.org/draft-07/schema#",
"$id": "https://github.com/ray-project/ray/runtime_env/working_dir_schema.json",
"title": "test_env_2",
"type": "string"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"$schema": "https://json-schema.org/draft-07/schema#",
"$id": "https://github.com/ray-project/ray/runtime_env/working_dir_schema.json",
"type": "string"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"$schema": "https://json-schema.org/draft-07/schema#",
"$id": "https://github.com/ray-project/ray/runtime_env/working_dir_schema.json",
"type": "string"

2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_resource_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ namespace gcs {
/// from being too large to review.
///
/// 1). Remove `node_resource_usages_` related code as it could be calculated from
/// `cluseter_resource_mananger`
/// `cluster_resource_manager`
/// 2). Move all resource-write-related logic out from `gcs_resource_manager`
/// 3). Move `placement_group_load_` from `gcs_resource_manager` to
/// `placement_group_manager` and make `gcs_resource_manager` depend on
Expand Down

0 comments on commit 22dfd1f

Please sign in to comment.