diff --git a/BUILD.bazel b/BUILD.bazel index 2d13c7d093baf..4959ee06e130f 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -1956,6 +1956,7 @@ filegroup( "//src/ray/protobuf:agent_manager_py_proto", "//src/ray/protobuf:common_py_proto", "//src/ray/protobuf:core_worker_py_proto", + "//src/ray/protobuf:event_py_proto", "//src/ray/protobuf:gcs_py_proto", "//src/ray/protobuf:gcs_service_py_proto", "//src/ray/protobuf:job_agent_py_proto", diff --git a/dashboard/datacenter.py b/dashboard/datacenter.py index b15e678911021..7b45281769ae7 100644 --- a/dashboard/datacenter.py +++ b/dashboard/datacenter.py @@ -45,6 +45,8 @@ class DataSource: job_actors = Dict() # {worker id(str): core worker stats} core_worker_stats = Dict() + # {job id hex(str): {event id(str): event dict}} + events = Dict() # {node ip (str): log entries by pid # (dict from pid to list of latest log entries)} ip_and_pid_to_logs = Dict() diff --git a/dashboard/modules/event/__init__.py b/dashboard/modules/event/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/dashboard/modules/event/event_agent.py b/dashboard/modules/event/event_agent.py new file mode 100644 index 0000000000000..bc4651777ac6b --- /dev/null +++ b/dashboard/modules/event/event_agent.py @@ -0,0 +1,90 @@ +import os +import asyncio +import logging +from typing import Union +from grpc.experimental import aio as aiogrpc + +import ray.new_dashboard.utils as dashboard_utils +import ray.new_dashboard.consts as dashboard_consts +from ray.ray_constants import env_bool +from ray.new_dashboard.utils import async_loop_forever, create_task +from ray.new_dashboard.modules.event import event_consts +from ray.new_dashboard.modules.event.event_utils import monitor_events +from ray.core.generated import event_pb2 +from ray.core.generated import event_pb2_grpc + +logger = logging.getLogger(__name__) +routes = dashboard_utils.ClassMethodRouteTable + + +@dashboard_utils.dashboard_module( + enable=env_bool(event_consts.EVENT_MODULE_ENVIRONMENT_KEY, False)) +class EventAgent(dashboard_utils.DashboardAgentModule): + def __init__(self, dashboard_agent): + super().__init__(dashboard_agent) + self._event_dir = os.path.join(self._dashboard_agent.log_dir, "events") + os.makedirs(self._event_dir, exist_ok=True) + self._monitor: Union[asyncio.Task, None] = None + self._stub: Union[event_pb2_grpc.ReportEventServiceStub, None] = None + self._cached_events = asyncio.Queue( + event_consts.EVENT_AGENT_CACHE_SIZE) + logger.info("Event agent cache buffer size: %s", + self._cached_events.maxsize) + + async def _connect_to_dashboard(self): + """ Connect to the dashboard. If the dashboard is not started, then + this method will never returns. + + Returns: + The ReportEventServiceStub object. + """ + while True: + try: + aioredis = self._dashboard_agent.aioredis_client + dashboard_rpc_address = await aioredis.get( + dashboard_consts.REDIS_KEY_DASHBOARD_RPC) + if dashboard_rpc_address: + logger.info("Report events to %s", dashboard_rpc_address) + options = (("grpc.enable_http_proxy", 0), ) + channel = aiogrpc.insecure_channel( + dashboard_rpc_address, options=options) + return event_pb2_grpc.ReportEventServiceStub(channel) + except Exception: + logger.exception("Connect to dashboard failed.") + await asyncio.sleep( + event_consts.RETRY_CONNECT_TO_DASHBOARD_INTERVAL_SECONDS) + + @async_loop_forever(event_consts.EVENT_AGENT_REPORT_INTERVAL_SECONDS) + async def report_events(self): + """ Report events from cached events queue. Reconnect to dashboard if + report failed. Log error after retry EVENT_AGENT_RETRY_TIMES. + + This method will never returns. + """ + data = await self._cached_events.get() + for _ in range(event_consts.EVENT_AGENT_RETRY_TIMES): + try: + logger.info("Report %s events.", len(data)) + request = event_pb2.ReportEventsRequest(event_strings=data) + await self._stub.ReportEvents(request) + break + except Exception: + logger.exception("Report event failed, reconnect to the " + "dashboard.") + self._stub = await self._connect_to_dashboard() + else: + data_str = str(data) + limit = event_consts.LOG_ERROR_EVENT_STRING_LENGTH_LIMIT + logger.error("Report event failed: %s", + data_str[:limit] + (data_str[limit:] and "...")) + + async def run(self, server): + # Connect to dashboard. + self._stub = await self._connect_to_dashboard() + # Start monitor task. + self._monitor = monitor_events( + self._event_dir, + lambda data: create_task(self._cached_events.put(data)), + source_types=event_consts.EVENT_AGENT_MONITOR_SOURCE_TYPES) + # Start reporting events. + await self.report_events() diff --git a/dashboard/modules/event/event_consts.py b/dashboard/modules/event/event_consts.py new file mode 100644 index 0000000000000..c20579e71246b --- /dev/null +++ b/dashboard/modules/event/event_consts.py @@ -0,0 +1,26 @@ +from ray.ray_constants import env_integer +from ray.core.generated import event_pb2 + +EVENT_MODULE_ENVIRONMENT_KEY = "RAY_DASHBOARD_MODULE_EVENT" +LOG_ERROR_EVENT_STRING_LENGTH_LIMIT = 1000 +RETRY_CONNECT_TO_DASHBOARD_INTERVAL_SECONDS = 2 +# Monitor events +SCAN_EVENT_DIR_INTERVAL_SECONDS = env_integer( + "SCAN_EVENT_DIR_INTERVAL_SECONDS", 2) +SCAN_EVENT_START_OFFSET_SECONDS = -30 * 60 +CONCURRENT_READ_LIMIT = 50 +EVENT_READ_LINE_COUNT_LIMIT = 200 +EVENT_READ_LINE_LENGTH_LIMIT = env_integer("EVENT_READ_LINE_LENGTH_LIMIT", + 2 * 1024 * 1024) # 2MB +# Report events +EVENT_AGENT_REPORT_INTERVAL_SECONDS = 0.1 +EVENT_AGENT_RETRY_TIMES = 10 +EVENT_AGENT_CACHE_SIZE = 10240 +# Event sources +EVENT_HEAD_MONITOR_SOURCE_TYPES = [ + event_pb2.Event.SourceType.Name(event_pb2.Event.GCS) +] +EVENT_AGENT_MONITOR_SOURCE_TYPES = list( + set(event_pb2.Event.SourceType.keys()) - + set(EVENT_HEAD_MONITOR_SOURCE_TYPES)) +EVENT_SOURCE_ALL = event_pb2.Event.SourceType.keys() diff --git a/dashboard/modules/event/event_head.py b/dashboard/modules/event/event_head.py new file mode 100644 index 0000000000000..1e6eff0dfaae5 --- /dev/null +++ b/dashboard/modules/event/event_head.py @@ -0,0 +1,89 @@ +import os +import asyncio +import logging +from typing import Union +from collections import OrderedDict, defaultdict + +import aiohttp.web + +import ray.new_dashboard.utils as dashboard_utils +from ray.ray_constants import env_bool +from ray.new_dashboard.modules.event import event_consts +from ray.new_dashboard.modules.event.event_utils import ( + parse_event_strings, + monitor_events, +) +from ray.core.generated import event_pb2 +from ray.core.generated import event_pb2_grpc +from ray.new_dashboard.datacenter import DataSource + +logger = logging.getLogger(__name__) +routes = dashboard_utils.ClassMethodRouteTable + +JobEvents = OrderedDict +dashboard_utils._json_compatible_types.add(JobEvents) + + +@dashboard_utils.dashboard_module( + enable=env_bool(event_consts.EVENT_MODULE_ENVIRONMENT_KEY, False)) +class EventHead(dashboard_utils.DashboardHeadModule, + event_pb2_grpc.ReportEventServiceServicer): + def __init__(self, dashboard_head): + super().__init__(dashboard_head) + self._event_dir = os.path.join(self._dashboard_head.log_dir, "events") + os.makedirs(self._event_dir, exist_ok=True) + self._monitor: Union[asyncio.Task, None] = None + + @staticmethod + def _update_events(event_list): + # {job_id: {event_id: event}} + all_job_events = defaultdict(JobEvents) + for event in event_list: + event_id = event["event_id"] + custom_fields = event.get("custom_fields") + system_event = False + if custom_fields: + job_id = custom_fields.get("job_id", "global") or "global" + else: + job_id = "global" + if system_event is False: + all_job_events[job_id][event_id] = event + # TODO(fyrestone): Limit the event count per job. + for job_id, new_job_events in all_job_events.items(): + job_events = DataSource.events.get(job_id, JobEvents()) + job_events.update(new_job_events) + DataSource.events[job_id] = job_events + + async def ReportEvents(self, request, context): + received_events = [] + if request.event_strings: + received_events.extend(parse_event_strings(request.event_strings)) + logger.info("Received %d events", len(received_events)) + self._update_events(received_events) + return event_pb2.ReportEventsReply(send_success=True) + + @routes.get("/events") + @dashboard_utils.aiohttp_cache(2) + async def get_event(self, req) -> aiohttp.web.Response: + job_id = req.query.get("job_id") + if job_id is None: + all_events = { + job_id: list(job_events.values()) + for job_id, job_events in DataSource.events.items() + } + return dashboard_utils.rest_response( + success=True, message="All events fetched.", events=all_events) + + job_events = DataSource.events.get(job_id, {}) + return dashboard_utils.rest_response( + success=True, + message="Job events fetched.", + job_id=job_id, + events=list(job_events.values())) + + async def run(self, server): + event_pb2_grpc.add_ReportEventServiceServicer_to_server(self, server) + self._monitor = monitor_events( + self._event_dir, + lambda data: self._update_events(parse_event_strings(data)), + source_types=event_consts.EVENT_HEAD_MONITOR_SOURCE_TYPES) diff --git a/dashboard/tests/conftest.py b/dashboard/tests/conftest.py index ec893fbef252f..a821942579e73 100644 --- a/dashboard/tests/conftest.py +++ b/dashboard/tests/conftest.py @@ -10,6 +10,13 @@ def enable_test_module(): os.environ.pop("RAY_DASHBOARD_MODULE_TEST", None) +@pytest.fixture +def enable_event_module(): + os.environ["RAY_DASHBOARD_MODULE_EVENT"] = "true" + yield + os.environ.pop("RAY_DASHBOARD_MODULE_EVENT", None) + + @pytest.fixture def disable_aiohttp_cache(): os.environ["RAY_DASHBOARD_NO_CACHE"] = "true" @@ -38,3 +45,10 @@ def set_http_proxy(): os.environ["https_proxy"] = https_proxy else: del os.environ["https_proxy"] + + +@pytest.fixture +def small_event_line_limit(): + os.environ["EVENT_READ_LINE_LENGTH_LIMIT"] = "1024" + yield 1024 + os.environ.pop("EVENT_READ_LINE_LENGTH_LIMIT", None) diff --git a/src/ray/protobuf/BUILD b/src/ray/protobuf/BUILD index 3e639c5a09835..c31f2d6755e35 100644 --- a/src/ray/protobuf/BUILD +++ b/src/ray/protobuf/BUILD @@ -145,6 +145,11 @@ cc_proto_library( deps = [":event_proto"], ) +python_grpc_compile( + name = "event_py_proto", + deps = [":event_proto"], +) + # Job agent. proto_library( name = "job_agent_proto", diff --git a/src/ray/protobuf/event.proto b/src/ray/protobuf/event.proto index 7dfd61c270fc7..2edc202776f6b 100644 --- a/src/ray/protobuf/event.proto +++ b/src/ray/protobuf/event.proto @@ -40,3 +40,15 @@ message Event { // store custom key such as node_id, job_id, task_id map custom_fields = 9; } + +message ReportEventsReply { + bool send_success = 1; +} + +message ReportEventsRequest { + repeated string event_strings = 1; +} + +service ReportEventService { + rpc ReportEvents(ReportEventsRequest) returns (ReportEventsReply); +}