Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Dashboard][event] Basic event module #16283

Merged
merged 6 commits into from
Jun 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1892,6 +1892,7 @@ filegroup(
"//src/ray/protobuf:agent_manager_py_proto",
"//src/ray/protobuf:common_py_proto",
"//src/ray/protobuf:core_worker_py_proto",
"//src/ray/protobuf:event_py_proto",
"//src/ray/protobuf:gcs_py_proto",
"//src/ray/protobuf:gcs_service_py_proto",
"//src/ray/protobuf:job_agent_py_proto",
Expand Down
2 changes: 2 additions & 0 deletions dashboard/datacenter.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class DataSource:
job_actors = Dict()
# {worker id(str): core worker stats}
core_worker_stats = Dict()
# {job id hex(str): {event id(str): event dict}}
events = Dict()
# {node ip (str): log entries by pid
# (dict from pid to list of latest log entries)}
ip_and_pid_to_logs = Dict()
Expand Down
Empty file.
87 changes: 87 additions & 0 deletions dashboard/modules/event/event_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import os
import asyncio
import logging
from typing import Union
from grpc.experimental import aio as aiogrpc

import ray.new_dashboard.utils as dashboard_utils
import ray.new_dashboard.consts as dashboard_consts
from ray.new_dashboard.utils import async_loop_forever, create_task
from ray.new_dashboard.modules.event import event_consts
from ray.new_dashboard.modules.event.event_utils import monitor_events
from ray.core.generated import event_pb2
from ray.core.generated import event_pb2_grpc

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Alias for the dashboard's class-method route table.
# NOTE(review): no handler in the EventAgent class below is decorated with
# `routes` -- presumably kept for parity with other dashboard modules; confirm.
routes = dashboard_utils.ClassMethodRouteTable


class EventAgent(dashboard_utils.DashboardAgentModule):
    """Agent-side event module.

    Watches the ``events`` directory under the agent's log directory and
    forwards whatever the monitor hands over to the dashboard through the
    ``ReportEventService`` gRPC stub.
    """

    def __init__(self, dashboard_agent):
        super().__init__(dashboard_agent)
        log_dir = self._dashboard_agent.log_dir
        self._event_dir = os.path.join(log_dir, "events")
        os.makedirs(self._event_dir, exist_ok=True)
        # Both are populated in run().
        self._monitor: Union[asyncio.Task, None] = None
        self._stub: Union[event_pb2_grpc.ReportEventServiceStub, None] = None
        # Bounded buffer between the monitor callback and report_events().
        cache_size = event_consts.EVENT_AGENT_CACHE_SIZE
        self._cached_events = asyncio.Queue(cache_size)
        logger.info("Event agent cache buffer size: %s",
                    self._cached_events.maxsize)

    async def _connect_to_dashboard(self):
        """Create a gRPC stub for the dashboard's event report service.

        Polls the agent's aioredis client for the dashboard RPC address and
        keeps retrying until one is available, so this coroutine never
        returns while the dashboard has not been started.

        Returns:
            The ReportEventServiceStub object.
        """
        while True:
            try:
                redis_client = self._dashboard_agent.aioredis_client
                rpc_address = await redis_client.get(
                    dashboard_consts.REDIS_KEY_DASHBOARD_RPC)
                if rpc_address:
                    logger.info("Report events to %s", rpc_address)
                    channel = aiogrpc.insecure_channel(
                        rpc_address,
                        options=(("grpc.enable_http_proxy", 0), ))
                    return event_pb2_grpc.ReportEventServiceStub(channel)
            except Exception:
                logger.exception("Connect to dashboard failed.")
            # Reached both when the address is not set yet and after a
            # failed lookup: wait before the next attempt.
            await asyncio.sleep(
                event_consts.RETRY_CONNECT_TO_DASHBOARD_INTERVAL_SECONDS)

    @async_loop_forever(event_consts.EVENT_AGENT_REPORT_INTERVAL_SECONDS)
    async def report_events(self):
        """Send one batch taken from the cached events queue.

        A failed send triggers a reconnect to the dashboard followed by
        another attempt, up to EVENT_AGENT_RETRY_TIMES attempts in total. If
        every attempt fails, a truncated dump of the batch is logged as an
        error.

        The ``async_loop_forever`` decorator re-invokes this coroutine, so
        it never returns to the caller.
        """
        batch = await self._cached_events.get()
        reported = False
        for _attempt in range(event_consts.EVENT_AGENT_RETRY_TIMES):
            try:
                logger.info("Report %s events.", len(batch))
                request = event_pb2.ReportEventsRequest(event_strings=batch)
                await self._stub.ReportEvents(request)
            except Exception:
                logger.exception("Report event failed, reconnect to the "
                                 "dashboard.")
                self._stub = await self._connect_to_dashboard()
            else:
                reported = True
                break
        if reported:
            return
        # All attempts failed: log the batch, capped to the configured
        # length, with an ellipsis only when something was cut off.
        batch_repr = str(batch)
        limit = event_consts.LOG_ERROR_EVENT_STRING_LENGTH_LIMIT
        ellipsis = "..." if batch_repr[limit:] else ""
        logger.error("Report event failed: %s", batch_repr[:limit] + ellipsis)

    async def run(self, server):
        """Connect to the dashboard, start monitoring, then report forever."""
        # Connect to dashboard.
        self._stub = await self._connect_to_dashboard()

        def _enqueue(data):
            # Queue.put() is a coroutine, so schedule it as a task.
            return create_task(self._cached_events.put(data))

        # Start monitor task.
        self._monitor = monitor_events(
            self._event_dir,
            _enqueue,
            source_types=event_consts.EVENT_AGENT_MONITOR_SOURCE_TYPES)
        # Start reporting events.
        await self.report_events()
24 changes: 24 additions & 0 deletions dashboard/modules/event/event_consts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from ray.ray_constants import env_integer
from ray.core.generated import event_pb2

# Maximum number of characters of a failed event batch that the event agent
# includes in its error log.
LOG_ERROR_EVENT_STRING_LENGTH_LIMIT = 1000
# Seconds the event agent sleeps between attempts to look up the dashboard
# RPC address.
RETRY_CONNECT_TO_DASHBOARD_INTERVAL_SECONDS = 2
# Monitor events
# NOTE(review): the constants in this group are not referenced by the agent
# or head modules; presumably they are consumed by
# event_utils.monitor_events -- confirm there.
# Presumably overridable through the environment variable of the same name
# (default 2) -- confirm against ray_constants.env_integer.
SCAN_EVENT_DIR_INTERVAL_SECONDS = env_integer(
"SCAN_EVENT_DIR_INTERVAL_SECONDS", 2)
# Minus 30 minutes, expressed in seconds.
SCAN_EVENT_START_OFFSET_SECONDS = -30 * 60
CONCURRENT_READ_LIMIT = 50
READ_LINE_COUNT_LIMIT = 200
READ_LINE_LENGTH_LIMIT = 2 * 1024 * 1024 # 2MB
# Report events
# Interval passed to async_loop_forever for EventAgent.report_events.
EVENT_AGENT_REPORT_INTERVAL_SECONDS = 0.1
# Number of send attempts per batch before the agent logs an error.
EVENT_AGENT_RETRY_TIMES = 10
# Maximum size of the agent's asyncio.Queue of cached events.
EVENT_AGENT_CACHE_SIZE = 10240
# Event sources
# Source type names monitored by the dashboard head: only GCS.
EVENT_HEAD_MONITOR_SOURCE_TYPES = [
event_pb2.Event.SourceType.Name(event_pb2.Event.GCS)
]
# Source type names monitored by the agent: every source type except the
# head's. Built from a set difference, so the list order is not guaranteed.
EVENT_AGENT_MONITOR_SOURCE_TYPES = list(
set(event_pb2.Event.SourceType.keys()) -
set(EVENT_HEAD_MONITOR_SOURCE_TYPES))
# Names of all source types defined on event_pb2.Event.SourceType.
EVENT_SOURCE_ALL = event_pb2.Event.SourceType.keys()
86 changes: 86 additions & 0 deletions dashboard/modules/event/event_head.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import os
import asyncio
import logging
from typing import Union
from collections import OrderedDict, defaultdict

import aiohttp.web

import ray.new_dashboard.utils as dashboard_utils
from ray.new_dashboard.modules.event import event_consts
from ray.new_dashboard.modules.event.event_utils import (
parse_event_strings,
monitor_events,
)
from ray.core.generated import event_pb2
from ray.core.generated import event_pb2_grpc
from ray.new_dashboard.datacenter import DataSource

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Route table whose `get` decorator registers EventHead.get_event.
routes = dashboard_utils.ClassMethodRouteTable

# Container type for one job's events: {event id: event dict}.
JobEvents = OrderedDict
# NOTE(review): adds OrderedDict to a private collection in dashboard_utils --
# presumably so the dashboard's JSON conversion accepts JobEvents values;
# confirm against dashboard_utils.
dashboard_utils._json_compatible_types.add(JobEvents)


class EventHead(dashboard_utils.DashboardHeadModule,
                event_pb2_grpc.ReportEventServiceServicer):
    """Head-side event module.

    Collects events from two inputs -- the ``ReportEvents`` gRPC method and
    the ``monitor_events`` callback registered in ``run`` for the head's own
    ``events`` log directory -- stores them in ``DataSource.events`` and
    serves them on the ``/events`` HTTP route.
    """

    def __init__(self, dashboard_head):
        super().__init__(dashboard_head)
        self._event_dir = os.path.join(self._dashboard_head.log_dir, "events")
        os.makedirs(self._event_dir, exist_ok=True)
        # Populated in run().
        self._monitor: Union[asyncio.Task, None] = None

    @staticmethod
    def _update_events(event_list):
        """Merge parsed events into ``DataSource.events``.

        Args:
            event_list: Iterable of event dicts. Each must contain an
                "event_id" key. The job id is read from
                ``custom_fields["job_id"]``; events without custom fields,
                or with a missing/empty job id, are filed under "global".

        Raises:
            KeyError: If an event has no "event_id" key.
        """
        # {job_id: {event_id: event}}
        all_job_events = defaultdict(JobEvents)
        for event in event_list:
            event_id = event["event_id"]
            custom_fields = event.get("custom_fields") or {}
            job_id = custom_fields.get("job_id") or "global"
            all_job_events[job_id][event_id] = event
        # TODO(fyrestone): Limit the event count per job.
        for job_id, new_job_events in all_job_events.items():
            # Re-assign the merged mapping instead of only mutating it in
            # place, so DataSource.events sees one write per job.
            job_events = DataSource.events.get(job_id, JobEvents())
            job_events.update(new_job_events)
            DataSource.events[job_id] = job_events

    async def ReportEvents(self, request, context):
        """gRPC handler: parse and store the event strings in the request.

        Returns:
            A ReportEventsReply with send_success=True.
        """
        received_events = []
        if request.event_strings:
            received_events.extend(parse_event_strings(request.event_strings))
        logger.info("Received %d events", len(received_events))
        self._update_events(received_events)
        return event_pb2.ReportEventsReply(send_success=True)

    @routes.get("/events")
    @dashboard_utils.aiohttp_cache(2)
    async def get_event(self, req) -> aiohttp.web.Response:
        """Return events for one job, or for all jobs.

        With a ``job_id`` query parameter the response carries that job's
        events as a list (empty if the job is unknown). Without it, the
        response carries ``{job id: [events]}`` for every job.
        """
        job_id = req.query.get("job_id")
        if job_id is None:
            # Distinct loop names so the request's job_id is not shadowed.
            all_events = {
                each_job_id: list(each_job_events.values())
                for each_job_id, each_job_events in DataSource.events.items()
            }
            return dashboard_utils.rest_response(
                success=True, message="All events fetched.", events=all_events)

        job_events = DataSource.events.get(job_id, {})
        return dashboard_utils.rest_response(
            success=True,
            message="Job events fetched.",
            job_id=job_id,
            events=list(job_events.values()))

    async def run(self, server):
        """Register the gRPC servicer and start monitoring head events."""
        event_pb2_grpc.add_ReportEventServiceServicer_to_server(self, server)
        self._monitor = monitor_events(
            self._event_dir,
            lambda data: self._update_events(parse_event_strings(data)),
            source_types=event_consts.EVENT_HEAD_MONITOR_SOURCE_TYPES)
Loading