[Observability] Support ray cluster events (ray-project#28573)
This PR adds support for cluster events.

After this PR is merged, cluster events will be available via (1) ray list cluster-events and (2) the dashboard Event tab.

It also emits cluster events for the existing job submission and autoscaler events.

Note that most of the design follows https://docs.google.com/document/d/1eGHp9FkrLxrwW0tNjCqvvcGVEBQ5Us3jrnxlqJOhiIY/edit#, but the intermediate layer is the dashboard agent and the aggregator is the dashboard (instead of log_monitor + GCS as in the design doc). I will update the design doc accordingly. Note that the agent + dashboard pipeline was already implemented.

I also found that some events (e.g., events from global_event_system) cannot be reported with the same mechanism because they happen outside the Ray cluster (e.g., from the CLI). I will discuss with Alex to find the right solution here.

The following is not implemented in this PR:

- GC of old events. For now, all historical events are kept in dashboard memory.
- Statelessness. The dashboard is supposed to write events to a log file so that it stays stateless, but that is not implemented here. We can revisit this when we improve dashboard reliability.
- The actual events specified in the PRD (this PR only prototypes with the existing autoscaler events plus some new job events).
- Schema. We can revisit this soon.
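For reference, a minimal usage sketch of the new Python state API entry point added in python/ray/experimental/state/api.py below; the printed fields (time, source_type, message) are taken from this diff and its commented-out tests, not from a finalized schema:

import ray
from ray.experimental.state.api import list_cluster_events

ray.init()
# ... submit jobs / trigger autoscaler activity so that events are emitted ...

# Returns a list of dicts; each dict mirrors the ClusterEventState schema.
events = list_cluster_events(limit=100, detail=True)
for event in events:
    print(event["time"], event["source_type"], event["message"])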
rkooo567 committed Oct 7, 2022
1 parent 4014207 commit dff18fd
Showing 13 changed files with 377 additions and 11 deletions.
4 changes: 1 addition & 3 deletions dashboard/modules/event/event_agent.py
@@ -85,9 +85,7 @@ async def run(self, server):
self._stub = await self._connect_to_dashboard()
# Start monitor task.
self._monitor = monitor_events(
-            self._event_dir,
-            lambda data: create_task(self._cached_events.put(data)),
-            source_types=event_consts.EVENT_AGENT_MONITOR_SOURCE_TYPES,
+            self._event_dir, lambda data: create_task(self._cached_events.put(data))
)
# Start reporting events.
await self.report_events()
4 changes: 0 additions & 4 deletions dashboard/modules/event/event_consts.py
@@ -16,8 +16,4 @@
EVENT_AGENT_RETRY_TIMES = 10
EVENT_AGENT_CACHE_SIZE = 10240
# Event sources
-EVENT_HEAD_MONITOR_SOURCE_TYPES = [event_pb2.Event.SourceType.Name(event_pb2.Event.GCS)]
-EVENT_AGENT_MONITOR_SOURCE_TYPES = list(
-    set(event_pb2.Event.SourceType.keys()) - set(EVENT_HEAD_MONITOR_SOURCE_TYPES)
-)
EVENT_SOURCE_ALL = event_pb2.Event.SourceType.keys()
5 changes: 1 addition & 4 deletions dashboard/modules/event/event_head.py
@@ -8,7 +8,6 @@

import ray.dashboard.utils as dashboard_utils
import ray.dashboard.optional_utils as dashboard_optional_utils
-from ray.dashboard.modules.event import event_consts
from ray.dashboard.modules.event.event_utils import (
parse_event_strings,
monitor_events,
@@ -85,9 +84,7 @@ async def get_event(self, req) -> aiohttp.web.Response:
async def run(self, server):
event_pb2_grpc.add_ReportEventServiceServicer_to_server(self, server)
self._monitor = monitor_events(
-            self._event_dir,
-            lambda data: self._update_events(parse_event_strings(data)),
-            source_types=event_consts.EVENT_HEAD_MONITOR_SOURCE_TYPES,
+            self._event_dir, lambda data: self._update_events(parse_event_strings(data))
)

@staticmethod
63 changes: 63 additions & 0 deletions dashboard/modules/event/tests/test_event.py
@@ -8,12 +8,14 @@
import asyncio
import random
import tempfile
import socket

import pytest
import numpy as np

import ray
from ray._private.utils import binary_to_hex
from ray._private.event.event_logger import get_event_logger
from ray.dashboard.tests.conftest import * # noqa
from ray.dashboard.modules.event import event_consts
from ray.core.generated import event_pb2
@@ -66,6 +68,35 @@ def _test_logger(name, log_file, max_bytes, backup_count):
return logger


def test_python_global_event_logger(tmp_path):
logger = get_event_logger(event_pb2.Event.SourceType.GCS, str(tmp_path))
logger.set_global_context({"test_meta": "1"})
logger.info("message", a="a", b="b")
logger.error("message", a="a", b="b")
logger.warning("message", a="a", b="b")
logger.fatal("message", a="a", b="b")
event_dir = tmp_path / "events"
assert event_dir.exists()
event_file = event_dir / "event_GCS.log"
assert event_file.exists()

line_severities = ["INFO", "ERROR", "WARNING", "FATAL"]

with event_file.open() as f:
for line, severity in zip(f.readlines(), line_severities):
data = json.loads(line)
assert data["severity"] == severity
assert data["label"] == ""
assert "timestamp" in data
assert len(data["event_id"]) == 36
assert data["message"] == "message"
assert data["source_type"] == "GCS"
assert data["source_hostname"] == socket.gethostname()
assert data["source_pid"] == os.getpid()
assert data["custom_fields"]["a"] == "a"
assert data["custom_fields"]["b"] == "b"


def test_event_basic(disable_aiohttp_cache, ray_start_with_dashboard):
assert wait_until_server_available(ray_start_with_dashboard["webui_url"])
webui_url = format_web_url(ray_start_with_dashboard["webui_url"])
@@ -257,5 +288,37 @@ async def _check_events(expect_events, read_events, timeout=10):
assert len(os.listdir(temp_dir)) > 1, "Event log should have rollovers."


# TODO(sang): Enable it.
# def test_autoscaler_cluster_events(shutdown_only):
# ray.init()

# @ray.remote(num_gpus=1)
# def f():
# pass

# f.remote()

# wait_for_condition(lambda: len(list_cluster_events()) == 1)
# infeasible_event = list_cluster_events()[0]
# assert infeasible_event["source_type"] == "AUTOSCALER"


# def test_jobs_cluster_events(shutdown_only):
# ray.init()
# address = ray._private.worker._global_node.webui_url
# address = format_web_url(address)
# client = JobSubmissionClient(address)
# client.submit_job(entrypoint="ls")

# def verify():
# assert len(list_cluster_events()) == 3
# for e in list_cluster_events():
# e["source_type"] = "JOBS"
# return True

# wait_for_condition(verify)
# print(list_cluster_events())


if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))
7 changes: 7 additions & 0 deletions dashboard/modules/state/state_head.py
@@ -308,6 +308,13 @@ async def list_objects(self, req: aiohttp.web.Request) -> aiohttp.web.Response:
async def list_runtime_envs(self, req: aiohttp.web.Request) -> aiohttp.web.Response:
return await self._handle_list_api(self._state_api.list_runtime_envs, req)

@routes.get("/api/v0/cluster_events")
@RateLimitedModule.enforce_max_concurrent_calls
async def list_cluster_events(
self, req: aiohttp.web.Request
) -> aiohttp.web.Response:
return await self._handle_list_api(self._state_api.list_cluster_events, req)

@routes.get("/api/v0/logs")
@RateLimitedModule.enforce_max_concurrent_calls
async def list_logs(self, req: aiohttp.web.Request) -> aiohttp.web.Response:
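The new route can also be queried directly over HTTP like the other /api/v0 list endpoints; a hedged sketch (the dashboard address/port and the query parameters are assumptions, and the handler simply delegates to self._state_api.list_cluster_events shown above):

import requests

# Assumes a local cluster with the dashboard on the default port (8265).
resp = requests.get(
    "http://127.0.0.1:8265/api/v0/cluster_events",
    params={"limit": 100},  # assumption: same query parameters as the other list endpoints
)
resp.raise_for_status()
print(resp.json())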
31 changes: 31 additions & 0 deletions dashboard/state_aggregator.py
@@ -4,6 +4,7 @@
from dataclasses import asdict, fields
from itertools import islice
from typing import List, Tuple
from datetime import datetime

from ray._private.ray_constants import env_integer

@@ -30,6 +31,7 @@
StateSummary,
ActorSummaries,
ObjectSummaries,
ClusterEventState,
filter_fields,
PredicateType,
)
@@ -595,6 +597,35 @@ def sort_func(entry):
num_filtered=num_filtered,
)

async def list_cluster_events(self, *, option: ListApiOptions) -> ListApiResponse:
"""List all cluster events from the cluster.
Returns:
A list of cluster events in the cluster.
The schema of returned "dict" is equivalent to the
`ClusterEventState` protobuf message.
"""
result = []
all_events = await self._client.get_all_cluster_events()
for _, events in all_events.items():
for _, event in events.items():
event["time"] = str(datetime.utcfromtimestamp(int(event["timestamp"])))
result.append(event)

num_after_truncation = len(result)
result.sort(key=lambda entry: entry["timestamp"])
total = len(result)
result = self._filter(result, option.filters, ClusterEventState, option.detail)
num_filtered = len(result)
# Sort to make the output deterministic.
result = list(islice(result, option.limit))
return ListApiResponse(
result=result,
total=total,
num_after_truncation=num_after_truncation,
num_filtered=num_filtered,
)

async def summarize_tasks(self, option: SummaryApiOptions) -> SummaryApiResponse:
# For summary, try getting as many entries as possible to minimize data loss.
result = await self.list_tasks(
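For orientation, a sketch of the nested structure that list_cluster_events above appears to expect from self._client.get_all_cluster_events(); the keys and example values here are hypothetical, inferred only from the two-level iteration and the timestamp conversion in the code:

# Hypothetical shape of `await self._client.get_all_cluster_events()`:
# an outer mapping (e.g. per source or node), then event_id -> event dict.
all_events = {
    "<outer_key>": {
        "<event_id>": {
            "event_id": "<event_id>",
            "source_type": "AUTOSCALER",
            "severity": "INFO",
            "message": "example autoscaler event message",
            "timestamp": 1665100000,  # seconds; converted to a human-readable "time" above
            "custom_fields": {},
        },
    },
}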
140 changes: 140 additions & 0 deletions python/ray/_private/event/event_logger.py
@@ -0,0 +1,140 @@
import logging
import pathlib
import json
import random
import string
import socket
import os
import threading

from typing import Dict
from datetime import datetime

from google.protobuf.json_format import MessageToDict

from ray.core.generated.event_pb2 import Event


def get_event_id():
return "".join([random.choice(string.hexdigits) for _ in range(36)])


class EventLoggerAdapter:
def __init__(self, source: Event.SourceType, logger: logging.Logger):
"""Adapter for the Python logger that's used to emit events.
When events are emitted, they are aggregated and available via
state API and dashboard.
This class is thread-safe.
"""
self.logger = logger
# Aligned with `event.proto`'s `message Event``
self.source = source
self.source_hostname = socket.gethostname()
self.source_pid = os.getpid()

# The below fields must be protected by this lock.
self.lock = threading.Lock()
# {str -> str} typed dict
self.global_context = {}

def set_global_context(self, global_context: Dict[str, str] = None):
"""Set the global metadata.
This method overwrites the global metadata if it is called more than once.
"""
with self.lock:
self.global_context = {} if not global_context else global_context

def info(self, message: str, **kwargs):
self._emit(Event.Severity.INFO, message, **kwargs)

def warning(self, message: str, **kwargs):
self._emit(Event.Severity.WARNING, message, **kwargs)

def error(self, message: str, **kwargs):
self._emit(Event.Severity.ERROR, message, **kwargs)

def fatal(self, message: str, **kwargs):
self._emit(Event.Severity.FATAL, message, **kwargs)

def _emit(self, severity: Event.Severity, message: str, **kwargs):
# NOTE: Python logger is thread-safe,
# so we don't need to protect it using locks.
event = Event()
event.event_id = get_event_id()
event.timestamp = int(datetime.now().timestamp())
event.message = message
event.severity = severity
# TODO(sang): Support event type & schema.
event.label = ""
event.source_type = self.source
event.source_hostname = self.source_hostname
event.source_pid = self.source_pid
custom_fields = event.custom_fields
with self.lock:
for k, v in self.global_context.items():
if v is not None and k is not None:
custom_fields[k] = v
for k, v in kwargs.items():
if v is not None and k is not None:
custom_fields[k] = v

self.logger.info(
json.dumps(
MessageToDict(
event,
including_default_value_fields=True,
preserving_proto_field_name=True,
use_integers_for_enums=False,
)
)
)

# Force flush so that we won't lose events
self.logger.handlers[0].flush()


def _build_event_file_logger(source: Event.SourceType, sink_dir: str):
logger = logging.getLogger("_ray_event_logger")
logger.setLevel(logging.INFO)
dir_path = pathlib.Path(sink_dir) / "events"
filepath = dir_path / f"event_{source}.log"
dir_path.mkdir(exist_ok=True)
filepath.touch(exist_ok=True)
# Configure the logger.
handler = logging.FileHandler(filepath)
formatter = logging.Formatter("%(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.propagate = False
return logger


# This lock must be used when accessing or updating global event logger dict.
_event_logger_lock = threading.Lock()
_event_logger = {}


def get_event_logger(source: Event.SourceType, sink_dir: str):
"""Get the event logger of the current process.
There's only 1 event logger per (process, source).
TODO(sang): Support more impl than file-based logging.
Currently, the interface also ties to the
file-based logging impl.
Args:
source: The source of the event.
sink_dir: The directory to sink event logs.
"""
with _event_logger_lock:
global _event_logger
source_name = Event.SourceType.Name(source)
if source_name not in _event_logger:
logger = _build_event_file_logger(source_name, sink_dir)
_event_logger[source_name] = EventLoggerAdapter(source, logger)

return _event_logger[source_name]
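A brief usage sketch of this logger, mirroring test_python_global_event_logger earlier in this diff; the sink directory and the custom fields are illustrative only:

from ray._private.event.event_logger import get_event_logger
from ray.core.generated import event_pb2

# Illustrative sink directory; events are written to <dir>/events/event_GCS.log.
logger = get_event_logger(event_pb2.Event.SourceType.GCS, "/tmp/ray_session")
logger.set_global_context({"cluster_name": "demo"})  # attached to every event
# Extra keyword arguments end up in the event's custom_fields.
logger.info("Some cluster event happened.", job_id="raysubmit_example")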
1 change: 1 addition & 0 deletions python/ray/autoscaler/_private/monitor.py
@@ -384,6 +384,7 @@ def _run(self):
ray_constants.LOG_PREFIX_EVENT_SUMMARY, line
)
)

self.event_summarizer.clear()

as_json = json.dumps(status)
19 changes: 19 additions & 0 deletions python/ray/experimental/state/api.py
@@ -1076,6 +1076,25 @@ def list_runtime_envs(
)


def list_cluster_events(
address: Optional[str] = None,
filters: Optional[List[Tuple[str, PredicateType, SupportedFilterType]]] = None,
limit: int = DEFAULT_LIMIT,
timeout: int = DEFAULT_RPC_TIMEOUT,
detail: bool = False,
raise_on_missing_output: bool = True,
_explain: bool = False,
) -> List[Dict]:
return StateApiClient(address=address).list(
StateResource.CLUSTER_EVENTS,
options=ListApiOptions(
limit=limit, timeout=timeout, filters=filters, detail=detail
),
raise_on_missing_output=raise_on_missing_output,
_explain=_explain,
)


"""
Log APIs
"""
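As with the other state APIs, filters are passed as (key, predicate, value) tuples; a hedged example (treating source_type as a filterable key, as in the commented-out tests above):

from ray.experimental.state.api import list_cluster_events

# Assumption: "source_type" is a filterable key on cluster events.
autoscaler_events = list_cluster_events(
    filters=[("source_type", "=", "AUTOSCALER")],
    limit=50,
)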