[Jobs] [Logs] Stream runtime env log to job log file (#44405)

When a job is submitted, it stays in a "PENDING" state before becoming "RUNNING". In practice, this is nearly always because the runtime_env is being installed. However, it's not obvious where to see the progress of the runtime_env installation.

For better observability, this PR streams the runtime_env log to the existing job-driver-<submission_id>.log file.

Note that the Job log API itself currently won't return the runtime_env setup log until the job starts: the job_head dashboard module needs to know which node the runtime_env is being installed on before the job starts, and we don't yet have a way of surfacing that information. I'll file an issue and link it here. The log file itself will still have the runtime_env setup logs written to it in real time.
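
A minimal sketch (not from this PR): while the job is still PENDING, the streamed setup logs can be read straight from the driver log file on the head node. This assumes the default Ray session log directory, and the submission ID below is a placeholder.

from pathlib import Path

# Placeholder submission ID; use the value returned by JobSubmissionClient.submit_job().
submission_id = "raysubmit_XXXXXXXXXXXX"

# Job driver logs are written under the Ray session log directory on the head node.
log_path = Path("/tmp/ray/session_latest/logs") / f"job-driver-{submission_id}.log"

# Prints whatever has been streamed so far, including runtime_env setup output.
print(log_path.read_text())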

Currently this feature is gated behind an environment variable, RAY_STREAM_RUNTIME_ENV_LOG_TO_JOB_DRIVER_LOG (defaults to OFF), in case the logs are too spammy.
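
As a usage sketch, assuming a local cluster whose dashboard runs at the default address, and noting that the flag is read by the job manager on the cluster side (so it must be set before the Ray head starts):

import os

# Opt in before the Ray head (and its job manager) starts; defaults to OFF.
os.environ["RAY_STREAM_RUNTIME_ENV_LOG_TO_JOB_DRIVER_LOG"] = "1"

import ray
from ray.job_submission import JobSubmissionClient

ray.init()  # Local cluster; inherits the environment variable set above.

client = JobSubmissionClient("http://127.0.0.1:8265")
job_id = client.submit_job(
    entrypoint="echo hello world",
    # A pip dependency forces a runtime_env install step before RUNNING.
    runtime_env={"pip": ["requests==2.25.1"]},
)

# After the job finishes, the driver log also contains the runtime_env
# setup output (e.g. "Creating virtualenv at ...").
print(client.get_job_logs(job_id))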

---------

Signed-off-by: Archit Kulkarni <[email protected]>
architkulkarni committed Apr 10, 2024
1 parent 11810c6 commit 01d975e
Showing 9 changed files with 135 additions and 65 deletions.
3 changes: 3 additions & 0 deletions dashboard/consts.py
@@ -65,6 +65,9 @@
 WAIT_AVAILABLE_AGENT_TIMEOUT = 10
 TRY_TO_GET_AGENT_INFO_INTERVAL_SECONDS = 0.1
 RAY_JOB_ALLOW_DRIVER_ON_WORKER_NODES_ENV_VAR = "RAY_JOB_ALLOW_DRIVER_ON_WORKER_NODES"
+RAY_STREAM_RUNTIME_ENV_LOG_TO_JOB_DRIVER_LOG_ENV_VAR = (
+    "RAY_STREAM_RUNTIME_ENV_LOG_TO_JOB_DRIVER_LOG"
+)

 # The max time to wait for the JobSupervisor to start before failing the job.
 DEFAULT_JOB_START_TIMEOUT_SECONDS = 60 * 15

18 changes: 15 additions & 3 deletions dashboard/modules/job/job_manager.py
@@ -28,6 +28,7 @@
     RAY_JOB_ALLOW_DRIVER_ON_WORKER_NODES_ENV_VAR,
     DEFAULT_JOB_START_TIMEOUT_SECONDS,
     RAY_JOB_START_TIMEOUT_SECONDS_ENV_VAR,
+    RAY_STREAM_RUNTIME_ENV_LOG_TO_JOB_DRIVER_LOG_ENV_VAR,
 )
 from ray.dashboard.modules.job.common import (
     JOB_ID_METADATA_KEY,
@@ -43,6 +44,7 @@
 from ray.job_submission import JobStatus
 from ray._private.event.event_logger import get_event_logger
 from ray.core.generated.event_pb2 import Event
+from ray.runtime_env import RuntimeEnvConfig

 logger = logging.getLogger(__name__)

@@ -225,7 +227,9 @@ def _exec_entrypoint(self, logs_path: str) -> subprocess.Popen:
             child_process: Child process that runs the driver command. Can be
                 terminated or killed upon user calling stop().
         """
-        with open(logs_path, "w") as logs_file:
+        # Open in append mode to avoid overwriting runtime_env setup logs for the
+        # supervisor actor, which are also written to the same file.
+        with open(logs_path, "a") as logs_file:
             child_process = subprocess.Popen(
                 self._entrypoint,
                 shell=True,
@@ -771,7 +775,10 @@ def _handle_supervisor_startup(self, job_id: str, result: Optional[Exception]):
             return

     def _get_supervisor_runtime_env(
-        self, user_runtime_env: Dict[str, Any], resources_specified: bool = False
+        self,
+        user_runtime_env: Dict[str, Any],
+        submission_id: str,
+        resources_specified: bool = False,
     ) -> Dict[str, Any]:
         """Configure and return the runtime_env for the supervisor actor.
@@ -805,6 +812,11 @@ def _get_supervisor_runtime_env(
         # the driver's runtime_env so it isn't inherited by tasks & actors.
         env_vars[ray_constants.NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR] = "1"
         runtime_env["env_vars"] = env_vars
+
+        if os.getenv(RAY_STREAM_RUNTIME_ENV_LOG_TO_JOB_DRIVER_LOG_ENV_VAR, "0") == "1":
+            config = runtime_env.get("config", RuntimeEnvConfig())
+            config["log_files"] = [self._log_client.get_log_file_path(submission_id)]
+            runtime_env["config"] = config
         return runtime_env

     async def _get_scheduling_strategy(
@@ -975,7 +987,7 @@ async def submit_job(
                 resources=entrypoint_resources,
                 scheduling_strategy=scheduling_strategy,
                 runtime_env=self._get_supervisor_runtime_env(
-                    runtime_env, resources_specified
+                    runtime_env, submission_id, resources_specified
                 ),
                 namespace=SUPERVISOR_ACTOR_RAY_NAMESPACE,
             ).remote(submission_id, entrypoint, metadata or {}, self._gcs_address)

dashboard/modules/job/tests/subprocess_driver_scripts/driver_runtime_env_inheritance.py
@@ -4,6 +4,9 @@

 import ray

+# This prefix is used to identify the output log line that contains the runtime env.
+RUNTIME_ENV_LOG_LINE_PREFIX = "ray_job_test_runtime_env_output:"
+
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description="Dashboard agent.")
     parser.add_argument(
@@ -36,14 +39,20 @@ def f():

     if args.conflict == "pip":
         ray.init(runtime_env={"pip": ["numpy"]})
-        print(ray._private.worker.global_worker.runtime_env)
+        print(
+            RUNTIME_ENV_LOG_LINE_PREFIX + ray._private.worker.global_worker.runtime_env
+        )
     elif args.conflict == "env_vars":
         ray.init(runtime_env={"env_vars": {"A": "1"}})
-        print(ray._private.worker.global_worker.runtime_env)
+        print(
+            RUNTIME_ENV_LOG_LINE_PREFIX + ray._private.worker.global_worker.runtime_env
+        )
     else:
         ray.init(
             runtime_env={
                 "env_vars": {"C": "1"},
             }
         )
-        print(ray._private.worker.global_worker.runtime_env)
+        print(
+            RUNTIME_ENV_LOG_LINE_PREFIX + ray._private.worker.global_worker.runtime_env
+        )

88 changes: 55 additions & 33 deletions dashboard/modules/job/tests/test_job_inheritance.py
@@ -9,6 +9,10 @@
 from ray.dashboard.modules.job.tests.conftest import (
     _driver_script_path,
 )
+from ray.dashboard.consts import RAY_STREAM_RUNTIME_ENV_LOG_TO_JOB_DRIVER_LOG_ENV_VAR
+from ray.dashboard.modules.job.tests.subprocess_driver_scripts.driver_runtime_env_inheritance import (  # noqa: E501
+    RUNTIME_ENV_LOG_LINE_PREFIX,
+)
@@ -22,6 +26,29 @@ def wait_until_status(client, job_id, status_to_wait_for, timeout_seconds=20):
     raise Exception


+def wait(client, job_id):
+    wait_until_status(
+        client,
+        job_id,
+        {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED},
+        timeout_seconds=60,
+    )
+
+
+def get_runtime_env_from_logs(client, job_id):
+    wait(client, job_id)
+    logs = client.get_job_logs(job_id)
+    print(logs)
+    assert client.get_job_status(job_id) == JobStatus.SUCCEEDED
+    # Split logs by line, find the unique line that starts with
+    # RUNTIME_ENV_LOG_LINE_PREFIX, strip it and parse it as JSON.
+    lines = logs.strip().split("\n")
+    assert len(lines) > 0
+    for line in lines:
+        if line.startswith(RUNTIME_ENV_LOG_LINE_PREFIX):
+            return json.loads(line[len(RUNTIME_ENV_LOG_LINE_PREFIX) :])
+
+
 def test_job_driver_inheritance():
     try:
         c = Cluster()
@@ -37,21 +64,6 @@ def test_job_driver_inheritance():
             },
         )

-        def wait(job_id):
-            wait_until_status(
-                client,
-                job_id,
-                {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED},
-                timeout_seconds=60,
-            )
-
-        def get_runtime_env_from_logs(client, job_id):
-            wait(job_id)
-            logs = client.get_job_logs(job_id)
-            print(logs)
-            assert client.get_job_status(job_id) == JobStatus.SUCCEEDED
-            return json.loads(logs.strip().split("\n")[-1])
-
         # Test key is merged
         print("Test key merged")
         runtime_env = get_runtime_env_from_logs(client, job_id)
@@ -70,7 +82,7 @@ def get_runtime_env_from_logs(client, job_id):
                 "env_vars": {"A": "1", "B": "2"},
             },
         )
-        wait(job_id)
+        wait(client, job_id)
         logs = client.get_job_logs(job_id)
         assert expected_str in logs
@@ -80,7 +92,7 @@ def get_runtime_env_from_logs(client, job_id):
             entrypoint=f"python {driver_script_path} --conflict=pip",
             runtime_env={"pip": ["numpy"]},
         )
-        wait(job_id)
+        wait(client, job_id)
         status = client.get_job_status(job_id)
         logs = client.get_job_logs(job_id)
         assert status == JobStatus.FAILED
@@ -94,7 +106,7 @@ def get_runtime_env_from_logs(client, job_id):
                 "env_vars": {"A": "1"},
             },
         )
-        wait(job_id)
+        wait(client, job_id)
         status = client.get_job_status(job_id)
         logs = client.get_job_logs(job_id)
         assert status == JobStatus.FAILED
@@ -103,6 +115,31 @@ def get_runtime_env_from_logs(client, job_id):
     finally:
         c.shutdown()


+@pytest.mark.parametrize("stream_runtime_env_log", ["1", "0"])
+def test_runtime_env_logs_streamed_to_job_driver_log(
+    monkeypatch, stream_runtime_env_log
+):
+    monkeypatch.setenv(
+        RAY_STREAM_RUNTIME_ENV_LOG_TO_JOB_DRIVER_LOG_ENV_VAR, stream_runtime_env_log
+    )
+    try:
+        c = Cluster()
+        c.add_node(num_cpus=1)
+        client = JobSubmissionClient("http://127.0.0.1:8265")
+        job_id = client.submit_job(
+            entrypoint="echo hello world",
+            runtime_env={"pip": ["requests==2.25.1"]},
+        )
+        wait(client, job_id)
+        logs = client.get_job_logs(job_id)
+        if stream_runtime_env_log == "0":
+            assert "Creating virtualenv at" not in logs
+        else:
+            assert "Creating virtualenv at" in logs
+    finally:
+        c.shutdown()
+
+
 def test_job_driver_inheritance_override(monkeypatch):
     monkeypatch.setenv("RAY_OVERRIDE_JOB_RUNTIME_ENV", "1")
@@ -120,21 +157,6 @@ def test_job_driver_inheritance_override(monkeypatch):
             },
         )

-        def wait(job_id):
-            wait_until_status(
-                client,
-                job_id,
-                {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED},
-                timeout_seconds=60,
-            )
-
-        def get_runtime_env_from_logs(client, job_id):
-            wait(job_id)
-            logs = client.get_job_logs(job_id)
-            print(logs)
-            assert client.get_job_status(job_id) == JobStatus.SUCCEEDED
-            return json.loads(logs.strip().split("\n")[-1])
-
         # Test conflict resolution regular field
         job_id = client.submit_job(
             entrypoint=f"python {driver_script_path} --conflict=pip",

8 changes: 2 additions & 6 deletions dashboard/modules/job/tests/test_job_manager.py
@@ -570,9 +570,7 @@ async def test_multiple_runtime_envs(self, job_manager):
             check_job_succeeded, job_manager=job_manager, job_id=job_id_1
         )
         logs = job_manager.get_job_logs(job_id_1)
-        assert (
-            "{'env_vars': {'TEST_SUBPROCESS_JOB_CONFIG_ENV_VAR': 'JOB_1_VAR'}}" in logs
-        )  # noqa: E501
+        assert "'TEST_SUBPROCESS_JOB_CONFIG_ENV_VAR': 'JOB_1_VAR'" in logs

         job_id_2 = await job_manager.submit_job(
             entrypoint=f"python {_driver_script_path('print_runtime_env.py')}",
@@ -585,9 +583,7 @@
             check_job_succeeded, job_manager=job_manager, job_id=job_id_2
         )
         logs = job_manager.get_job_logs(job_id_2)
-        assert (
-            "{'env_vars': {'TEST_SUBPROCESS_JOB_CONFIG_ENV_VAR': 'JOB_2_VAR'}}" in logs
-        )  # noqa: E501
+        assert "'TEST_SUBPROCESS_JOB_CONFIG_ENV_VAR': 'JOB_2_VAR'" in logs

     async def test_failed_runtime_env_validation(self, job_manager):
         """Ensure job status is correctly set as failed if job has an invalid

37 changes: 21 additions & 16 deletions python/ray/_private/ray_logging.py
@@ -6,7 +6,7 @@
 import sys
 import threading
 import time
-from typing import Callable, Dict, List, Set, Tuple, Any, Optional
+from typing import Callable, Dict, List, Set, Tuple, Any, Optional, Union, Iterable

 import ray
 from ray.experimental.tqdm_ray import RAY_TQDM_MAGIC
@@ -35,7 +35,7 @@ def setup_component_logger(
     logging_level,
     logging_format,
     log_dir,
-    filename,
+    filename: Union[str, Iterable[str]],
     max_bytes,
     backup_count,
     logger_name=None,
@@ -58,8 +58,8 @@
         logging_format: Logging format string.
         log_dir: Log directory path. If empty, logs will go to
             stderr.
-        filename: Name of the file to write logs. If empty, logs will go
-            to stderr.
+        filename: A single filename or an iterable of filenames to write logs to.
+            If empty, logs will go to stderr.
         max_bytes: Same argument as RotatingFileHandler's maxBytes.
         backup_count: Same argument as RotatingFileHandler's backupCount.
         logger_name: Used to create or get the corresponding
@@ -71,20 +71,25 @@
     ray._private.log.clear_logger("ray")

     logger = logging.getLogger(logger_name)
-    if type(logging_level) is str:
+    if isinstance(logging_level, str):
         logging_level = logging.getLevelName(logging_level.upper())
-    if not filename or not log_dir:
-        handler = logging.StreamHandler()
-    else:
-        handler = logging.handlers.RotatingFileHandler(
-            os.path.join(log_dir, filename),
-            maxBytes=max_bytes,
-            backupCount=backup_count,
-        )
-    handler.setLevel(logging_level)
     logger.setLevel(logging_level)
-    handler.setFormatter(logging.Formatter(logging_format))
-    logger.addHandler(handler)
+
+    filenames = [filename] if isinstance(filename, str) else filename
+
+    for filename in filenames:
+        if not filename or not log_dir:
+            handler = logging.StreamHandler()
+        else:
+            handler = logging.handlers.RotatingFileHandler(
+                os.path.join(log_dir, filename),
+                maxBytes=max_bytes,
+                backupCount=backup_count,
+            )
+        handler.setLevel(logging_level)
+        handler.setFormatter(logging.Formatter(logging_format))
+        logger.addHandler(handler)

     logger.propagate = propagate
     return logger

8 changes: 5 additions & 3 deletions python/ray/_private/runtime_env/agent/runtime_env_agent.py
@@ -275,11 +275,11 @@ def delete_runtime_env():
         else:
             delete_runtime_env()

-    def get_or_create_logger(self, job_id: bytes):
+    def get_or_create_logger(self, job_id: bytes, log_files: List[str]):
         job_id = job_id.decode()
         if job_id not in self._per_job_logger_cache:
             params = self._logging_params.copy()
-            params["filename"] = f"runtime_env_setup-{job_id}.log"
+            params["filename"] = [f"runtime_env_setup-{job_id}.log", *log_files]
             params["logger_name"] = f"runtime_env_{job_id}"
             params["propagate"] = False
             per_job_logger = setup_component_logger(**params)
@@ -301,8 +301,10 @@ async def _setup_runtime_env(
         allocated_resource: dict = json.loads(
             serialized_allocated_resource_instances or "{}"
         )
+        runtime_env_config = RuntimeEnvConfig.from_proto(request.runtime_env_config)
+        log_files = runtime_env_config.get("log_files", [])
         # Use a separate logger for each job.
-        per_job_logger = self.get_or_create_logger(request.job_id)
+        per_job_logger = self.get_or_create_logger(request.job_id, log_files)
         # TODO(chenk008): Add log about allocated_resource to
         # avoid lint error. That will be moved to cgroup plugin.
         per_job_logger.debug(f"Worker has resource :" f"{allocated_resource}")
