Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exec 597 performance metrics configuration #15571

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
clean up logic
  • Loading branch information
DerekMaggio committed Jun 28, 2024
commit eb1e5a4b13b424f139762c27440419265870efd7
7 changes: 0 additions & 7 deletions performance-metrics/src/performance_metrics/dev_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,6 @@
"ROBOT_SHUTTING_DOWN",
]

SystemResourceMetricName = typing.Literal[
"COMMAND_PATH",
"RUNNING_SINCE",
"CPU_PERCENT",
"MEMORY_PERCENT",
]


class SupportsTracking(typing.Protocol):
"""Protocol for classes that support tracking of robot context."""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
"""System resource tracker."""

import typing
import psutil
import dataclasses
import fnmatch
from time import sleep

from .util import get_timing_function, format_command

_timing_function = get_timing_function()


@dataclasses.dataclass(frozen=True)
class ProcessResourceUsageSnapshot:
"""Data for a tracked process."""

query_time: int
command: str
running_since: float
cpu_percent: float
memory_percent: float

@classmethod
def from_psutil_process(cls, process: psutil.Process) -> "ProcessResourceUsageSnapshot":
"""Create a ProcessData object from a psutil.Process object."""
return cls(
query_time=_timing_function(),
command=format_command(process.cmdline()),
running_since=process.create_time(),
cpu_percent=process.cpu_percent(),
memory_percent=process.memory_percent(),
)


class ProcessResourceTracker:
"""Tracks system resource usage."""

def __init__(
self,
process_filters: typing.List[str],
) -> None:
"""Initialize the tracker."""
self._process_filters = process_filters
self._processes: typing.List[psutil.Process] # intentionally not exposed as process.kill can be called
self._refresh_processes()

def _refresh_processes(self) -> None:
"""Filter processes by their command line path with globbing support.

Returns:
list of psutil.Process: List of processes that match the filters.
"""
# Note that psutil.process_iter caches the list of processes
# As long as the process is alive, it will be cached and reused on the next call to process_iter.

# Ensure that when calling process_iter you specify at least one attribute to the attr list.
# Otherwise all processes info will be retrieved which is slow.
# Ideally you will only specify the attributes that you want to filter on.

# See https://psutil.readthedocs.io/en/latest/#psutil.process_iter

processes = []
for process in psutil.process_iter(attrs=["cmdline"]):
try:
process_cmdline: typing.List[str] | None = process.info.get("cmdline")
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
continue

if not process_cmdline:
continue

formatted_cmdline: str = format_command(process_cmdline)

if not formatted_cmdline:
continue

if any(fnmatch.fnmatch(formatted_cmdline, pattern) for pattern in self._process_filters):
processes.append(process)

self._processes = processes

def query_process_data(self) -> typing.List[ProcessResourceUsageSnapshot]:
"""Query the tracked processes."""
self._refresh_processes()
return [ProcessResourceUsageSnapshot.from_psutil_process(process) for process in self._processes]
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from .metrics_store import MetricsStore
from .data_shapes import RawContextData, MetricsMetadata
from .dev_types import SupportsTracking, RobotContextState
from .util import get_timing_function

_UnderlyingFunctionParameters = typing.ParamSpec("_UnderlyingFunctionParameters")
_UnderlyingFunctionReturn = typing.TypeVar("_UnderlyingFunctionReturn")
Expand All @@ -19,24 +20,7 @@
]


def _get_timing_function() -> typing.Callable[[], int]:
"""Returns a timing function for the current platform."""
time_function: typing.Callable[[], int]
if platform.system() == "Linux":
from time import clock_gettime_ns, CLOCK_REALTIME

time_function = typing.cast(
typing.Callable[[], int], partial(clock_gettime_ns, CLOCK_REALTIME)
)
else:
from time import time_ns

time_function = time_ns

return time_function


timing_function = _get_timing_function()
_timing_function = get_timing_function()


class RobotContextTracker(SupportsTracking):
Expand Down Expand Up @@ -99,7 +83,7 @@ async def async_wrapper(
*args: _UnderlyingFunctionParameters.args,
**kwargs: _UnderlyingFunctionParameters.kwargs
) -> _UnderlyingFunctionReturn:
function_start_time = timing_function()
function_start_time = _timing_function()
duration_start_time = perf_counter_ns()
try:
result = await func_to_track(*args, **kwargs)
Expand All @@ -124,7 +108,7 @@ def wrapper(
*args: _UnderlyingFunctionParameters.args,
**kwargs: _UnderlyingFunctionParameters.kwargs
) -> _UnderlyingFunctionReturn:
function_start_time = timing_function()
function_start_time = _timing_function()
duration_start_time = perf_counter_ns()

try:
Expand Down
128 changes: 0 additions & 128 deletions performance-metrics/src/performance_metrics/system_resource_tracker.py

This file was deleted.

23 changes: 23 additions & 0 deletions performance-metrics/src/performance_metrics/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import typing
import platform
from functools import partial

def format_command(cmd_list: typing.List[str]) -> str:
"""Format the command line for the given process."""
return " ".join(cmd_list).strip()

def get_timing_function() -> typing.Callable[[], int]:
"""Returns a timing function for the current platform."""
time_function: typing.Callable[[], int]
if platform.system() == "Linux":
from time import clock_gettime_ns, CLOCK_REALTIME

time_function = typing.cast(
typing.Callable[[], int], partial(clock_gettime_ns, CLOCK_REALTIME)
)
else:
from time import time_ns

time_function = time_ns

return time_function
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import typing
import pytest
import psutil
from unittest.mock import patch, MagicMock
from performance_metrics.process_resource_tracker import ProcessResourceTracker
from performance_metrics.util import get_timing_function, format_command


def mock_process_iter(attrs=None) -> typing.List[psutil.Process]:
mock_procs = []

def create_mock_process(pid, cmdline):
mock_proc = MagicMock(spec=psutil.Process)
mock_proc.pid = pid
mock_proc.info = {"cmdline": cmdline}
mock_proc.cmdline.return_value = cmdline
return mock_proc

mock_procs.append(create_mock_process(1, ["python", "my_script.py"]))
mock_procs.append(create_mock_process(2, ["bash", "another_script.sh"]))
mock_procs.append(create_mock_process(3, ["python", "yet_another_script.py"]))
mock_procs.append(create_mock_process(4, ["java", "my_java_app.jar"]))

return mock_procs

@patch('psutil.process_iter', mock_process_iter)
def test_process_filtering():
tracker = ProcessResourceTracker(process_filters=["*my_script.py", "*another_script*"])

tracker._refresh_processes()
filtered_processes = tracker._processes

assert len(filtered_processes) == 3
assert filtered_processes[0].pid == 1
assert filtered_processes[1].pid == 2
assert filtered_processes[2].pid == 3
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ def error_prone_operation() -> None:


@patch(
"performance_metrics.robot_context_tracker._get_timing_function",
"performance_metrics.util.get_timing_function",
return_value=time_ns,
)
def test_using_non_linux_time_functions(tmp_path: Path) -> None:
Expand Down