diff --git a/BUILD.bazel b/BUILD.bazel
index 33558f8ccb46c..8e96245cf5043 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -1892,6 +1892,7 @@ filegroup(
         "//src/ray/protobuf:agent_manager_py_proto",
         "//src/ray/protobuf:common_py_proto",
         "//src/ray/protobuf:core_worker_py_proto",
+        "//src/ray/protobuf:event_py_proto",
         "//src/ray/protobuf:gcs_py_proto",
         "//src/ray/protobuf:gcs_service_py_proto",
         "//src/ray/protobuf:job_agent_py_proto",
diff --git a/dashboard/datacenter.py b/dashboard/datacenter.py
index b15e678911021..7b45281769ae7 100644
--- a/dashboard/datacenter.py
+++ b/dashboard/datacenter.py
@@ -45,6 +45,8 @@ class DataSource:
     job_actors = Dict()
     # {worker id(str): core worker stats}
     core_worker_stats = Dict()
+    # {job id hex(str): {event id(str): event dict}}
+    events = Dict()
     # {node ip (str): log entries by pid
     # (dict from pid to list of latest log entries)}
     ip_and_pid_to_logs = Dict()
diff --git a/dashboard/modules/event/__init__.py b/dashboard/modules/event/__init__.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
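Note (illustration, not part of the patch): the module added below consumes event logs written as one JSON object per line by a rotating file handler. A minimal sketch of producing such a line, modeled on the _get_event and _test_logger helpers in the tests further down; every value here is a placeholder, not something Ray itself emits:

    import json
    import logging
    import logging.handlers
    import time

    handler = logging.handlers.RotatingFileHandler(
        "event_GCS.log", maxBytes=2**20, backupCount=10)
    handler.setFormatter(logging.Formatter("%(message)s"))
    event_logger = logging.getLogger("event_example")
    event_logger.setLevel(logging.INFO)
    event_logger.addHandler(handler)

    # One event == one JSON object on a single line.
    event_logger.info(json.dumps({
        "event_id": "ab" * 18,
        "source_type": "GCS",
        "host_name": "host-1",
        "pid": 1234,
        "label": "",
        "message": "example event",
        "time_stamp": time.time(),
        "severity": "INFO",
        "custom_fields": {"job_id": "0100", "node_id": "", "task_id": ""},
    }))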
+ """ + data = await self._cached_events.get() + for _ in range(event_consts.EVENT_AGENT_RETRY_TIMES): + try: + logger.info("Report %s events.", len(data)) + request = event_pb2.ReportEventsRequest(event_strings=data) + await self._stub.ReportEvents(request) + break + except Exception: + logger.exception("Report event failed, reconnect to the " + "dashboard.") + self._stub = await self._connect_to_dashboard() + else: + data_str = str(data) + limit = event_consts.LOG_ERROR_EVENT_STRING_LENGTH_LIMIT + logger.error("Report event failed: %s", + data_str[:limit] + (data_str[limit:] and "...")) + + async def run(self, server): + # Connect to dashboard. + self._stub = await self._connect_to_dashboard() + # Start monitor task. + self._monitor = monitor_events( + self._event_dir, + lambda data: create_task(self._cached_events.put(data)), + source_types=event_consts.EVENT_AGENT_MONITOR_SOURCE_TYPES) + # Start reporting events. + await self.report_events() diff --git a/dashboard/modules/event/event_consts.py b/dashboard/modules/event/event_consts.py new file mode 100644 index 0000000000000..0f0014935d171 --- /dev/null +++ b/dashboard/modules/event/event_consts.py @@ -0,0 +1,24 @@ +from ray.ray_constants import env_integer +from ray.core.generated import event_pb2 + +LOG_ERROR_EVENT_STRING_LENGTH_LIMIT = 1000 +RETRY_CONNECT_TO_DASHBOARD_INTERVAL_SECONDS = 2 +# Monitor events +SCAN_EVENT_DIR_INTERVAL_SECONDS = env_integer( + "SCAN_EVENT_DIR_INTERVAL_SECONDS", 2) +SCAN_EVENT_START_OFFSET_SECONDS = -30 * 60 +CONCURRENT_READ_LIMIT = 50 +READ_LINE_COUNT_LIMIT = 200 +READ_LINE_LENGTH_LIMIT = 2 * 1024 * 1024 # 2MB +# Report events +EVENT_AGENT_REPORT_INTERVAL_SECONDS = 0.1 +EVENT_AGENT_RETRY_TIMES = 10 +EVENT_AGENT_CACHE_SIZE = 10240 +# Event sources +EVENT_HEAD_MONITOR_SOURCE_TYPES = [ + event_pb2.Event.SourceType.Name(event_pb2.Event.GCS) +] +EVENT_AGENT_MONITOR_SOURCE_TYPES = list( + set(event_pb2.Event.SourceType.keys()) - + set(EVENT_HEAD_MONITOR_SOURCE_TYPES)) +EVENT_SOURCE_ALL = event_pb2.Event.SourceType.keys() diff --git a/dashboard/modules/event/event_head.py b/dashboard/modules/event/event_head.py new file mode 100644 index 0000000000000..f8a12754b7f93 --- /dev/null +++ b/dashboard/modules/event/event_head.py @@ -0,0 +1,86 @@ +import os +import asyncio +import logging +from typing import Union +from collections import OrderedDict, defaultdict + +import aiohttp.web + +import ray.new_dashboard.utils as dashboard_utils +from ray.new_dashboard.modules.event import event_consts +from ray.new_dashboard.modules.event.event_utils import ( + parse_event_strings, + monitor_events, +) +from ray.core.generated import event_pb2 +from ray.core.generated import event_pb2_grpc +from ray.new_dashboard.datacenter import DataSource + +logger = logging.getLogger(__name__) +routes = dashboard_utils.ClassMethodRouteTable + +JobEvents = OrderedDict +dashboard_utils._json_compatible_types.add(JobEvents) + + +class EventHead(dashboard_utils.DashboardHeadModule, + event_pb2_grpc.ReportEventServiceServicer): + def __init__(self, dashboard_head): + super().__init__(dashboard_head) + self._event_dir = os.path.join(self._dashboard_head.log_dir, "events") + os.makedirs(self._event_dir, exist_ok=True) + self._monitor: Union[asyncio.Task, None] = None + + @staticmethod + def _update_events(event_list): + # {job_id: {event_id: event}} + all_job_events = defaultdict(JobEvents) + for event in event_list: + event_id = event["event_id"] + custom_fields = event.get("custom_fields") + system_event = False + if custom_fields: + job_id = 
custom_fields.get("job_id", "global") or "global" + else: + job_id = "global" + if system_event is False: + all_job_events[job_id][event_id] = event + # TODO(fyrestone): Limit the event count per job. + for job_id, new_job_events in all_job_events.items(): + job_events = DataSource.events.get(job_id, JobEvents()) + job_events.update(new_job_events) + DataSource.events[job_id] = job_events + + async def ReportEvents(self, request, context): + received_events = [] + if request.event_strings: + received_events.extend(parse_event_strings(request.event_strings)) + logger.info("Received %d events", len(received_events)) + self._update_events(received_events) + return event_pb2.ReportEventsReply(send_success=True) + + @routes.get("/events") + @dashboard_utils.aiohttp_cache(2) + async def get_event(self, req) -> aiohttp.web.Response: + job_id = req.query.get("job_id") + if job_id is None: + all_events = { + job_id: list(job_events.values()) + for job_id, job_events in DataSource.events.items() + } + return dashboard_utils.rest_response( + success=True, message="All events fetched.", events=all_events) + + job_events = DataSource.events.get(job_id, {}) + return dashboard_utils.rest_response( + success=True, + message="Job events fetched.", + job_id=job_id, + events=list(job_events.values())) + + async def run(self, server): + event_pb2_grpc.add_ReportEventServiceServicer_to_server(self, server) + self._monitor = monitor_events( + self._event_dir, + lambda data: self._update_events(parse_event_strings(data)), + source_types=event_consts.EVENT_HEAD_MONITOR_SOURCE_TYPES) diff --git a/dashboard/modules/event/event_utils.py b/dashboard/modules/event/event_utils.py new file mode 100644 index 0000000000000..7db4c4043408e --- /dev/null +++ b/dashboard/modules/event/event_utils.py @@ -0,0 +1,190 @@ +import os +import time +import mmap +import json +import fnmatch +import asyncio +import itertools +import collections +import logging.handlers + +from ray.new_dashboard.modules.event import event_consts +from ray.new_dashboard.utils import async_loop_forever, create_task + +logger = logging.getLogger(__name__) + + +def _get_source_files(event_dir, source_types=None, event_file_filter=None): + event_log_names = os.listdir(event_dir) + source_files = {} + all_source_types = set(event_consts.EVENT_SOURCE_ALL) + for source_type in source_types or event_consts.EVENT_SOURCE_ALL: + assert source_type in all_source_types, \ + f"Invalid source type: {source_type}" + files = [] + for n in event_log_names: + if fnmatch.fnmatch(n, f"*{source_type}*"): + f = os.path.join(event_dir, n) + if event_file_filter is not None and not event_file_filter(f): + continue + files.append(f) + if files: + source_files[source_type] = files + return source_files + + +def _restore_newline(event_dict): + try: + event_dict["message"] = event_dict["message"]\ + .replace("\\n", "\n")\ + .replace("\\r", "\n") + except Exception: + logger.exception("Restore newline for event failed: %s", event_dict) + return event_dict + + +def _parse_line(event_str): + return _restore_newline(json.loads(event_str)) + + +def parse_event_strings(event_string_list): + events = [] + for data in event_string_list: + if not data: + continue + try: + event = _parse_line(data) + events.append(event) + except Exception: + logger.exception("Parse event line failed: %s", repr(data)) + return events + + +ReadFileResult = collections.namedtuple( + "ReadFileResult", ["fid", "size", "mtime", "position", "lines"]) + + +def _read_file(file, + pos, + 
diff --git a/dashboard/modules/event/event_utils.py b/dashboard/modules/event/event_utils.py
new file mode 100644
index 0000000000000..7db4c4043408e
--- /dev/null
+++ b/dashboard/modules/event/event_utils.py
@@ -0,0 +1,190 @@
+import os
+import time
+import mmap
+import json
+import fnmatch
+import asyncio
+import itertools
+import collections
+import logging.handlers
+
+from ray.new_dashboard.modules.event import event_consts
+from ray.new_dashboard.utils import async_loop_forever, create_task
+
+logger = logging.getLogger(__name__)
+
+
+def _get_source_files(event_dir, source_types=None, event_file_filter=None):
+    event_log_names = os.listdir(event_dir)
+    source_files = {}
+    all_source_types = set(event_consts.EVENT_SOURCE_ALL)
+    for source_type in source_types or event_consts.EVENT_SOURCE_ALL:
+        assert source_type in all_source_types, \
+            f"Invalid source type: {source_type}"
+        files = []
+        for n in event_log_names:
+            if fnmatch.fnmatch(n, f"*{source_type}*"):
+                f = os.path.join(event_dir, n)
+                if event_file_filter is not None and not event_file_filter(f):
+                    continue
+                files.append(f)
+        if files:
+            source_files[source_type] = files
+    return source_files
+
+
+def _restore_newline(event_dict):
+    try:
+        event_dict["message"] = event_dict["message"]\
+            .replace("\\n", "\n")\
+            .replace("\\r", "\n")
+    except Exception:
+        logger.exception("Restore newline for event failed: %s", event_dict)
+    return event_dict
+
+
+def _parse_line(event_str):
+    return _restore_newline(json.loads(event_str))
+
+
+def parse_event_strings(event_string_list):
+    events = []
+    for data in event_string_list:
+        if not data:
+            continue
+        try:
+            event = _parse_line(data)
+            events.append(event)
+        except Exception:
+            logger.exception("Parse event line failed: %s", repr(data))
+    return events
+
+
+ReadFileResult = collections.namedtuple(
+    "ReadFileResult", ["fid", "size", "mtime", "position", "lines"])
+
+
+def _read_file(file,
+               pos,
+               n_lines=event_consts.READ_LINE_COUNT_LIMIT,
+               closefd=True):
+    with open(file, "rb", closefd=closefd) as f:
+        # The ino may be 0 on Windows.
+        stat = os.stat(f.fileno())
+        fid = stat.st_ino or file
+        lines = []
+        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
+            start = pos
+            for _ in range(n_lines):
+                sep = mm.find(b"\n", start)
+                if sep == -1:
+                    break
+                if sep - start <= event_consts.READ_LINE_LENGTH_LIMIT:
+                    lines.append(mm[start:sep].decode("utf-8"))
+                else:
+                    truncated_size = min(100,
+                                         event_consts.READ_LINE_LENGTH_LIMIT)
+                    logger.warning(
+                        "Ignored long string: %s...(%s chars)",
+                        mm[start:start + truncated_size].decode("utf-8"),
+                        sep - start)
+                start = sep + 1
+        return ReadFileResult(fid, stat.st_size, stat.st_mtime, start, lines)
+
+
+def monitor_events(
+        event_dir,
+        callback,
+        scan_interval_seconds=event_consts.SCAN_EVENT_DIR_INTERVAL_SECONDS,
+        start_mtime=time.time() + event_consts.SCAN_EVENT_START_OFFSET_SECONDS,
+        monitor_files=None,
+        source_types=None):
+    """ Monitor event logs in a directory. New events are read and passed to
+    the callback.
+
+    Args:
+        event_dir (str): The event log directory.
+        callback (def callback(List[str]): pass): A callback that accepts a
+            list of event strings.
+        scan_interval_seconds (float): The interval in seconds between scans.
+        start_mtime (float): Only the event log files whose last modification
+            time is greater than start_mtime are monitored.
+        monitor_files (Dict[int, MonitorFile]): The map from event log file id
+            to a MonitorFile object. Monitor all files from the beginning
+            if the value is None.
+        source_types (List[str]): A list of source type names from
+            event_pb2.Event.SourceType.keys(). Monitor all source types if the
+            value is None.
+    """
+    loop = asyncio.get_event_loop()
+    if monitor_files is None:
+        monitor_files = {}
+
+    logger.info(
+        "Monitor events logs modified after %s on %s, "
+        "the source types are %s.", start_mtime, event_dir, "all"
+        if source_types is None else source_types)
+
+    MonitorFile = collections.namedtuple("MonitorFile",
+                                         ["size", "mtime", "position"])
+
+    def _source_file_filter(source_file):
+        stat = os.stat(source_file)
+        return stat.st_mtime > start_mtime
+
+    def _read_monitor_file(file, pos):
+        assert isinstance(file, str), \
+            f"File should be a str, but a {type(file)}({file}) found"
+        fd = os.open(file, os.O_RDONLY)
+        try:
+            stat = os.stat(fd)
+            # Check the file size to avoid raising the exception
+            # ValueError: cannot mmap an empty file
+            if stat.st_size <= 0:
+                return []
+            fid = stat.st_ino or file
+            monitor_file = monitor_files.get(fid)
+            if monitor_file:
+                if (monitor_file.position == monitor_file.size
+                        and monitor_file.size == stat.st_size
+                        and monitor_file.mtime == stat.st_mtime):
+                    logger.debug(
+                        "Skip reading the file because "
+                        "there is no change: %s", file)
+                    return []
+                position = monitor_file.position
+            else:
+                logger.info("Found new event log file: %s", file)
+                position = pos
+            # Close the fd in finally.
+            r = _read_file(fd, position, closefd=False)
+            # It should be fine to update the dict in executor thread.
+            monitor_files[r.fid] = MonitorFile(r.size, r.mtime, r.position)
+            loop.call_soon_threadsafe(callback, r.lines)
+        except Exception as e:
+            raise Exception(f"Read event file failed: {file}") from e
+        finally:
+            os.close(fd)
+
+    @async_loop_forever(scan_interval_seconds)
+    async def _scan_event_log_files():
+        # Scan event files.
+        source_files = await loop.run_in_executor(None, _get_source_files,
+                                                  event_dir, source_types,
+                                                  _source_file_filter)
+
+        # Limit concurrent read to avoid fd exhaustion.
+        semaphore = asyncio.Semaphore(event_consts.CONCURRENT_READ_LIMIT)
+
+        async def _concurrent_coro(filename):
+            async with semaphore:
+                return await loop.run_in_executor(None, _read_monitor_file,
+                                                  filename, 0)
+
+        # Read files.
+        await asyncio.gather(*[
+            _concurrent_coro(filename)
+            for filename in list(itertools.chain(*source_files.values()))
+        ])
+
+    return create_task(_scan_event_log_files())
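Note (illustration, not part of the patch): monitor_events and parse_event_strings can be wired together outside the dashboard much like event_head.py does above; test_monitor_events below exercises the same API. The events directory path is a placeholder:

    import asyncio

    from ray.new_dashboard.modules.event.event_utils import (
        monitor_events, parse_event_strings)

    async def tail_events(event_dir):
        def on_new_lines(lines):
            # Lines arrive as raw JSON strings; parse them into dicts.
            for event in parse_event_strings(lines):
                print(event["source_type"], event["message"])

        task = monitor_events(event_dir, on_new_lines)
        try:
            await asyncio.sleep(30)  # let a few scan intervals elapse
        finally:
            task.cancel()

    asyncio.run(tail_events("/tmp/ray/session_latest/logs/events"))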
diff --git a/dashboard/modules/event/tests/test_event.py b/dashboard/modules/event/tests/test_event.py
new file mode 100644
index 0000000000000..9ebc7247dc6c3
--- /dev/null
+++ b/dashboard/modules/event/tests/test_event.py
@@ -0,0 +1,254 @@
+import os
+import sys
+import time
+import json
+import copy
+import logging
+import requests
+import asyncio
+import random
+import tempfile
+
+import pytest
+import numpy as np
+
+import ray
+from ray._private.utils import binary_to_hex
+from ray.new_dashboard.tests.conftest import *  # noqa
+from ray.new_dashboard.modules.event import event_consts
+from ray.core.generated import event_pb2
+from ray.test_utils import (
+    format_web_url,
+    wait_until_server_available,
+    wait_for_condition,
+)
+from ray.new_dashboard.modules.event.event_utils import (
+    monitor_events, )
+
+logger = logging.getLogger(__name__)
+
+
+def _get_event(msg="empty message", job_id=None, source_type=None):
+    return {
+        "event_id": binary_to_hex(np.random.bytes(18)),
+        "source_type": random.choice(event_pb2.Event.SourceType.keys())
+        if source_type is None else source_type,
+        "host_name": "po-dev.inc.alipay.net",
+        "pid": random.randint(1, 65536),
+        "label": "",
+        "message": msg,
+        "time_stamp": time.time(),
+        "severity": "INFO",
+        "custom_fields": {
+            "job_id": ray.JobID.from_int(random.randint(1, 100)).hex()
+            if job_id is None else job_id,
+            "node_id": "",
+            "task_id": "",
+        }
+    }
+
+
+def _test_logger(name, log_file, max_bytes, backup_count):
+    handler = logging.handlers.RotatingFileHandler(
+        log_file, maxBytes=max_bytes, backupCount=backup_count)
+    formatter = logging.Formatter("%(message)s")
+    handler.setFormatter(formatter)
+
+    logger = logging.getLogger(name)
+    logger.propagate = False
+    logger.setLevel(logging.INFO)
+    logger.addHandler(handler)
+
+    return logger
+
+
+def test_event_basic(disable_aiohttp_cache, ray_start_with_dashboard):
+    assert (wait_until_server_available(ray_start_with_dashboard["webui_url"]))
+    webui_url = format_web_url(ray_start_with_dashboard["webui_url"])
+    session_dir = ray_start_with_dashboard["session_dir"]
+    event_dir = os.path.join(session_dir, "logs", "events")
+    job_id = ray.JobID.from_int(100).hex()
+
+    source_type_gcs = event_pb2.Event.SourceType.Name(event_pb2.Event.GCS)
+    source_type_raylet = event_pb2.Event.SourceType.Name(
+        event_pb2.Event.RAYLET)
+    test_count = 200
+
+    for source_type in [source_type_gcs, source_type_raylet]:
+        test_log_file = os.path.join(event_dir, f"event_{source_type}.log")
+        test_logger = _test_logger(
+            __name__ + str(random.random()),
+            test_log_file,
+            max_bytes=2000,
+            backup_count=1000)
+        for i in range(test_count):
+            sample_event = _get_event(
+                str(i), job_id=job_id, source_type=source_type)
+            test_logger.info("%s", json.dumps(sample_event))
+
+    def _check_events():
+        try:
+            resp = requests.get(f"{webui_url}/events")
+            resp.raise_for_status()
+            result = resp.json()
+            all_events = result["data"]["events"]
+            job_events = all_events[job_id]
+            assert len(job_events) >= test_count * 2
+            source_messages = {}
+            for e in job_events:
+                source_type = e["sourceType"]
+                message = e["message"]
+                source_messages.setdefault(source_type, set()).add(message)
+            assert len(source_messages[source_type_gcs]) >= test_count
+            assert len(source_messages[source_type_raylet]) >= test_count
+            data = {str(i) for i in range(test_count)}
+            assert data & source_messages[source_type_gcs] == data
+            assert data & source_messages[source_type_raylet] == data
+            return True
+        except Exception as ex:
+            logger.exception(ex)
+            return False
+
+    wait_for_condition(_check_events, timeout=15)
+
+
+def test_event_message_limit(disable_aiohttp_cache, ray_start_with_dashboard):
+    assert (wait_until_server_available(ray_start_with_dashboard["webui_url"]))
+    webui_url = format_web_url(ray_start_with_dashboard["webui_url"])
+    session_dir = ray_start_with_dashboard["session_dir"]
+    event_dir = os.path.join(session_dir, "logs", "events")
+    job_id = ray.JobID.from_int(100).hex()
+    events = []
+    # Sample events whose length equals the limit.
+    sample_event = _get_event("", job_id=job_id)
+    message_len = event_consts.READ_LINE_LENGTH_LIMIT - len(
+        json.dumps(sample_event))
+    for i in range(10):
+        sample_event = copy.deepcopy(sample_event)
+        sample_event["event_id"] = binary_to_hex(np.random.bytes(18))
+        sample_event["message"] = str(i) * message_len
+        assert len(
+            json.dumps(sample_event)) == event_consts.READ_LINE_LENGTH_LIMIT
+        events.append(sample_event)
+    # A sample event longer than the limit.
+    sample_event = copy.deepcopy(sample_event)
+    sample_event["event_id"] = binary_to_hex(np.random.bytes(18))
+    sample_event["message"] = "2" * (message_len + 1)
+    assert len(json.dumps(sample_event)) > event_consts.READ_LINE_LENGTH_LIMIT
+    events.append(sample_event)
+
+    for i in range(event_consts.READ_LINE_COUNT_LIMIT):
+        events.append(_get_event(str(i), job_id=job_id))
+
+    with open(os.path.join(event_dir, "tmp.log"), "w") as f:
+        f.writelines([(json.dumps(e) + "\n") for e in events])
+
+    try:
+        os.remove(os.path.join(event_dir, "event_GCS.log"))
+    except Exception:
+        pass
+    os.rename(
+        os.path.join(event_dir, "tmp.log"),
+        os.path.join(event_dir, "event_GCS.log"))
+
+    def _check_events():
+        try:
+            resp = requests.get(f"{webui_url}/events")
+            resp.raise_for_status()
+            result = resp.json()
+            all_events = result["data"]["events"]
+            assert len(
+                all_events[job_id]) >= event_consts.READ_LINE_COUNT_LIMIT + 10
+            messages = [e["message"] for e in all_events[job_id]]
+            for i in range(10):
+                assert str(i) * message_len in messages
+            assert "2" * (message_len + 1) not in messages
+            assert str(event_consts.READ_LINE_COUNT_LIMIT - 1) in messages
+            return True
+        except Exception as ex:
+            logger.exception(ex)
+            return False
+
+    wait_for_condition(_check_events, timeout=15)
+
+
+@pytest.mark.asyncio
+async def test_monitor_events():
+    with tempfile.TemporaryDirectory() as temp_dir:
+        common = event_pb2.Event.SourceType.Name(event_pb2.Event.COMMON)
+        common_log = os.path.join(temp_dir, f"event_{common}.log")
+        test_logger = _test_logger(
+            __name__ + str(random.random()),
+            common_log,
+            max_bytes=10,
+            backup_count=10)
+        read_events = []
+        monitor_task = monitor_events(
+            temp_dir,
+            lambda x: read_events.extend(x),
+            scan_interval_seconds=0.01)
+        assert not monitor_task.done()
+        count = 10
+
+        async def _writer(*args, spin=True):
+            for x in range(*args):
+                test_logger.info("%s", x)
+                if spin:
+                    while str(x) not in read_events:
+                        await asyncio.sleep(0.01)
+
+        async def _check_events(expect_events, timeout=5):
+            start_time = time.time()
+            while True:
+                sorted_events = sorted(int(i) for i in read_events)
+                sorted_events = [str(i) for i in sorted_events]
+                if time.time() - start_time > timeout:
+                    raise TimeoutError(
+                        f"Timeout, read events: {sorted_events}, "
+                        f"expect events: {expect_events}")
+                if len(sorted_events) == len(expect_events):
+                    if sorted_events == expect_events:
+                        break
+                await asyncio.sleep(1)
+
+        await asyncio.gather(
+            _writer(count), _check_events([str(i) for i in range(count)]))
+
+        read_events = []
+        monitor_task.cancel()
+        monitor_task = monitor_events(
+            temp_dir,
+            lambda x: read_events.extend(x),
+            scan_interval_seconds=0.1)
+
+        await _check_events([str(i) for i in range(count)])
+
+        await _writer(count, count * 2)
+        await _check_events([str(i) for i in range(count * 2)])
+
+        log_file_count = len(os.listdir(temp_dir))
+
+        test_logger = _test_logger(
+            __name__ + str(random.random()),
+            common_log,
+            max_bytes=1000,
+            backup_count=10)
+        assert len(os.listdir(temp_dir)) == log_file_count
+
+        await _writer(count * 2, count * 3, spin=False)
+        await _check_events([str(i) for i in range(count * 3)])
+        await _writer(count * 3, count * 4, spin=False)
+        await _check_events([str(i) for i in range(count * 4)])
+
+        # Test cancelling the monitor task.
+        monitor_task.cancel()
+        with pytest.raises(asyncio.CancelledError):
+            await monitor_task
+        assert monitor_task.done()
+
+        assert len(
+            os.listdir(temp_dir)) > 1, "Event log should have rollovers."
+
+
+if __name__ == "__main__":
+    sys.exit(pytest.main(["-v", __file__]))
diff --git a/src/ray/protobuf/BUILD b/src/ray/protobuf/BUILD
index d77e67c38fab8..81650350a1dae 100644
--- a/src/ray/protobuf/BUILD
+++ b/src/ray/protobuf/BUILD
@@ -141,6 +141,11 @@ cc_proto_library(
     deps = [":event_proto"],
 )
 
+python_grpc_compile(
+    name = "event_py_proto",
+    deps = [":event_proto"],
+)
+
 # Job agent.
 proto_library(
     name = "job_agent_proto",
diff --git a/src/ray/protobuf/event.proto b/src/ray/protobuf/event.proto
index 7dfd61c270fc7..2edc202776f6b 100644
--- a/src/ray/protobuf/event.proto
+++ b/src/ray/protobuf/event.proto
@@ -40,3 +40,15 @@ message Event {
   // store custom key such as node_id, job_id, task_id
   map<string, string> custom_fields = 9;
 }
+
+message ReportEventsReply {
+  bool send_success = 1;
+}
+
+message ReportEventsRequest {
+  repeated string event_strings = 1;
+}
+
+service ReportEventService {
+  rpc ReportEvents(ReportEventsRequest) returns (ReportEventsReply);
+}
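Note (illustration, not part of the patch): the ReportEventService defined above is what the event agent calls; reporting an event directly would look roughly like this, following the agent code. The dashboard gRPC address and the event payload are placeholders, and depending on the grpcio version the aio stack may need extra initialization beyond what is shown:

    import asyncio
    import json
    import time
    from grpc.experimental import aio as aiogrpc

    from ray.core.generated import event_pb2, event_pb2_grpc

    async def report_one_event(dashboard_rpc_address):
        channel = aiogrpc.insecure_channel(dashboard_rpc_address)
        stub = event_pb2_grpc.ReportEventServiceStub(channel)
        # Events travel as JSON strings, one per entry in event_strings.
        event_string = json.dumps({
            "event_id": "ab" * 18,
            "source_type": "RAYLET",
            "host_name": "host-1",
            "pid": 1234,
            "label": "",
            "message": "example event",
            "time_stamp": time.time(),
            "severity": "INFO",
            "custom_fields": {"job_id": "0100", "node_id": "", "task_id": ""},
        })
        reply = await stub.ReportEvents(
            event_pb2.ReportEventsRequest(event_strings=[event_string]))
        print("send_success:", reply.send_success)

    asyncio.run(report_one_event("127.0.0.1:40000"))  # placeholder address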