Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(api, performance-metrics): clean up performance-metrics tracking #15289

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
chore: way clearer typing, docs, and logic
  • Loading branch information
DerekMaggio committed May 30, 2024
commit b0944682d650c595e89cbf98c168a57ab7385166
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@
from pathlib import Path
import platform

from functools import wraps, partial
from functools import partial
from time import perf_counter_ns
from typing import Callable, TypeVar, cast, Literal, Final
from typing import Callable, cast, Literal, Final


from typing_extensions import ParamSpec
from performance_metrics.datashapes import (
RawContextData,
)
Expand All @@ -18,11 +17,11 @@
RobotContextState,
SupportsTracking,
MetricsMetadata,
UnderlyingFunction,
UnderlyingFunctionParams,
UnderlyingFunctionReturn,
)

P = ParamSpec("P")
R = TypeVar("R")


def _get_timing_function() -> Callable[[], int]:
"""Returns a timing function for the current platform."""
Expand Down Expand Up @@ -63,39 +62,54 @@ def __init__(self, storage_location: Path, should_track: bool) -> None:
if self._should_track:
self._store.setup()

async def track(self, func_to_track: Callable, state: RobotContextState, *args, **kwargs) -> Callable: # type: ignore
"""Decorator factory for tracking the execution duration and state of robot operations.
async def __call_function(
self,
func_to_track: UnderlyingFunction,
*args: UnderlyingFunctionParams.args,
**kwargs: UnderlyingFunctionParams.kwargs
) -> UnderlyingFunctionReturn:
"""Calls the given function and returns its result."""
if inspect.iscoroutinefunction(func_to_track):
return await func_to_track(*args, **kwargs) # type: ignore
else:
return func_to_track(*args, **kwargs) # type: ignore

async def track(
self,
func_to_track: UnderlyingFunction,
state: RobotContextState,
*args: UnderlyingFunctionParams.args,
**kwargs: UnderlyingFunctionParams.kwargs
) -> UnderlyingFunctionReturn:
"""Tracks the given function and its execution duration.

If tracking is disabled, the function is called immediately and its result is returned.

Args:
state: The state to track for the decorated function.
func_to_track: The function to track.
state: The state of the robot context during the function execution.
*args: The arguments to pass to the function.
**kwargs: The keyword arguments to pass to the function.

Returns:
Callable: A decorator that wraps a function to track its execution duration and state.
If the function executes successfully, its return value is returned.
If the function raises an exception, the exception the function raised is raised.
"""



if not self._should_track:
return func_to_track
return await self.__call_function(func_to_track, *args, **kwargs)

function_start_time = timing_function()
duration_start_time = perf_counter_ns()

if inspect.iscoroutinefunction(func_to_track):
try:
result = await func_to_track(*args, **kwargs)
except Exception as e:
result = e
finally:
duration_end_time = perf_counter_ns()
else:
try:
result = func_to_track(*args, **kwargs)
except Exception as e:
result = e
finally:
duration_end_time = perf_counter_ns()

result: UnderlyingFunctionReturn | Exception

try:
result = await self.__call_function(func_to_track, *args, **kwargs)
except Exception as e:
result = e

duration_end_time = perf_counter_ns()

self._store.add(
RawContextData(
func_start=function_start_time,
Expand All @@ -109,8 +123,6 @@ async def track(self, func_to_track: Callable, state: RobotContextState, *args,
else:
return result



def store(self) -> None:
"""Returns the stored context data and clears the storage list."""
if not self._should_track:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,15 @@ def calibrating_robot() -> None:
def analyzing_protocol() -> None:
sleep(ANALYZING_TIME)

await robot_context_tracker.track(starting_robot, state=RobotContextState.STARTING_UP)
await robot_context_tracker.track(calibrating_robot, state=RobotContextState.CALIBRATING)
await robot_context_tracker.track(analyzing_protocol, state=RobotContextState.ANALYZING_PROTOCOL)
await robot_context_tracker.track(
starting_robot, state=RobotContextState.STARTING_UP
)
await robot_context_tracker.track(
calibrating_robot, state=RobotContextState.CALIBRATING
)
await robot_context_tracker.track(
analyzing_protocol, state=RobotContextState.ANALYZING_PROTOCOL
)

robot_context_tracker.store()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ def robot_context_tracker(tmp_path: Path) -> RobotContextTracker:
return RobotContextTracker(storage_location=tmp_path, should_track=True)


async def test_robot_context_tracker(robot_context_tracker: RobotContextTracker) -> None:
async def test_robot_context_tracker(
robot_context_tracker: RobotContextTracker,
) -> None:
"""Tests the tracking of various robot context states through RobotContextTracker."""


def starting_robot() -> None:
sleep(STARTING_TIME)

Expand All @@ -46,12 +47,22 @@ def shutting_down_robot() -> None:
len(robot_context_tracker._store._data) == 0
), "Storage should be initially empty."

await robot_context_tracker.track(starting_robot, state=RobotContextState.STARTING_UP)
await robot_context_tracker.track(calibrating_robot, state=RobotContextState.CALIBRATING)
await robot_context_tracker.track(analyzing_protocol, state=RobotContextState.ANALYZING_PROTOCOL)
await robot_context_tracker.track(running_protocol, state=RobotContextState.RUNNING_PROTOCOL)
await robot_context_tracker.track(shutting_down_robot, state=RobotContextState.SHUTTING_DOWN)

await robot_context_tracker.track(
starting_robot, state=RobotContextState.STARTING_UP
)
await robot_context_tracker.track(
calibrating_robot, state=RobotContextState.CALIBRATING
)
await robot_context_tracker.track(
analyzing_protocol, state=RobotContextState.ANALYZING_PROTOCOL
)
await robot_context_tracker.track(
running_protocol, state=RobotContextState.RUNNING_PROTOCOL
)
await robot_context_tracker.track(
shutting_down_robot, state=RobotContextState.SHUTTING_DOWN
)

# Verify that all states were tracked
assert len(robot_context_tracker._store._data) == 5, "All states should be tracked."

Expand Down Expand Up @@ -83,8 +94,12 @@ def first_operation() -> None:
def second_operation() -> None:
sleep(RUNNING_TIME)

await robot_context_tracker.track(first_operation, state=RobotContextState.RUNNING_PROTOCOL)
await robot_context_tracker.track(second_operation, state=RobotContextState.RUNNING_PROTOCOL)
await robot_context_tracker.track(
first_operation, state=RobotContextState.RUNNING_PROTOCOL
)
await robot_context_tracker.track(
second_operation, state=RobotContextState.RUNNING_PROTOCOL
)

assert (
len(robot_context_tracker._store._data) == 2
Expand All @@ -106,7 +121,9 @@ async def error_prone_operation() -> None:
raise RuntimeError("Simulated operation failure")

with pytest.raises(RuntimeError):
await robot_context_tracker.track(error_prone_operation, state=RobotContextState.SHUTTING_DOWN)
await robot_context_tracker.track(
error_prone_operation, state=RobotContextState.SHUTTING_DOWN
)

assert (
len(robot_context_tracker._store._data) == 1
Expand All @@ -125,7 +142,9 @@ async def test_async_operation_tracking(
async def async_analyzing_operation() -> None:
await asyncio.sleep(ANALYZING_TIME)

await robot_context_tracker.track(async_analyzing_operation, state=RobotContextState.ANALYZING_PROTOCOL)
await robot_context_tracker.track(
async_analyzing_operation, state=RobotContextState.ANALYZING_PROTOCOL
)

assert (
len(robot_context_tracker._store._data) == 1
Expand All @@ -144,7 +163,11 @@ def test_sync_operation_timing_accuracy(
def running_operation() -> None:
sleep(RUNNING_TIME)

asyncio.run(robot_context_tracker.track(running_operation, state=RobotContextState.RUNNING_PROTOCOL))
asyncio.run(
robot_context_tracker.track(
running_operation, state=RobotContextState.RUNNING_PROTOCOL
)
)

duration_data = robot_context_tracker._store._data[0]
assert (
Expand All @@ -161,7 +184,9 @@ async def test_async_operation_timing_accuracy(
async def async_running_operation() -> None:
await asyncio.sleep(RUNNING_TIME)

await robot_context_tracker.track(async_running_operation, state=RobotContextState.RUNNING_PROTOCOL)
await robot_context_tracker.track(
async_running_operation, state=RobotContextState.RUNNING_PROTOCOL
)

duration_data = robot_context_tracker._store._data[0]
assert (
Expand All @@ -180,7 +205,9 @@ async def async_error_prone_operation() -> None:
raise RuntimeError("Simulated async operation failure")

with pytest.raises(RuntimeError):
await robot_context_tracker.track(async_error_prone_operation, state=RobotContextState.SHUTTING_DOWN)
await robot_context_tracker.track(
async_error_prone_operation, state=RobotContextState.SHUTTING_DOWN
)

assert (
len(robot_context_tracker._store._data) == 1
Expand All @@ -203,8 +230,12 @@ async def second_async_calibrating() -> None:
await asyncio.sleep(CALIBRATING_TIME)

await asyncio.gather(
robot_context_tracker.track(first_async_calibrating, state=RobotContextState.CALIBRATING),
robot_context_tracker.track(second_async_calibrating, state=RobotContextState.CALIBRATING)
robot_context_tracker.track(
first_async_calibrating, state=RobotContextState.CALIBRATING
),
robot_context_tracker.track(
second_async_calibrating, state=RobotContextState.CALIBRATING
),
)

assert (
Expand All @@ -223,13 +254,49 @@ def test_no_tracking(tmp_path: Path) -> None:
def operation_without_tracking() -> None:
sleep(STARTING_TIME)

asyncio.run(robot_context_tracker.track(operation_without_tracking, state=RobotContextState.STARTING_UP))
asyncio.run(
robot_context_tracker.track(
operation_without_tracking, state=RobotContextState.STARTING_UP
)
)

assert (
len(robot_context_tracker._store._data) == 0
), "Operation should not be tracked when tracking is disabled."


def test_async_exception_handling_when_not_tracking(tmp_path: Path) -> None:
"""Ensures exceptions in operations are still raised when tracking is disabled."""
robot_context_tracker = RobotContextTracker(tmp_path, should_track=False)

async def error_prone_operation() -> None:
sleep(SHUTTING_DOWN_TIME)
raise RuntimeError("Simulated operation failure")

with pytest.raises(RuntimeError):
asyncio.run(
robot_context_tracker.track(
error_prone_operation, state=RobotContextState.SHUTTING_DOWN
)
)


def test_sync_exception_handling_when_not_tracking(tmp_path: Path) -> None:
"""Ensures exceptions in operations are still raised when tracking is disabled."""
robot_context_tracker = RobotContextTracker(tmp_path, should_track=False)

def error_prone_operation() -> None:
sleep(SHUTTING_DOWN_TIME)
raise RuntimeError("Simulated operation failure")

with pytest.raises(RuntimeError):
asyncio.run(
robot_context_tracker.track(
error_prone_operation, state=RobotContextState.SHUTTING_DOWN
)
)


@patch(
"performance_metrics.robot_context_tracker._get_timing_function",
return_value=time_ns,
Expand All @@ -245,8 +312,14 @@ def starting_robot() -> None:
def calibrating_robot() -> None:
sleep(CALIBRATING_TIME)

asyncio.run(robot_context_tracker.track(starting_robot, state=RobotContextState.STARTING_UP))
asyncio.run(robot_context_tracker.track(calibrating_robot, state=RobotContextState.CALIBRATING))
asyncio.run(
robot_context_tracker.track(starting_robot, state=RobotContextState.STARTING_UP)
)
asyncio.run(
robot_context_tracker.track(
calibrating_robot, state=RobotContextState.CALIBRATING
)
)

storage = robot_context_tracker._store._data
assert all(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,31 @@
"""Type definitions for performance tracking."""
import typing
from dataclasses import dataclass
from typing import Protocol, Tuple, TypeVar, Callable, Any
from pathlib import Path
from enum import Enum

F = TypeVar("F", bound=Callable[..., Any])

UnderlyingFunctionReturn = typing.TypeVar("UnderlyingFunctionReturn")
UnderlyingFunctionParams = typing.ParamSpec("UnderlyingFunctionParams")
UnderlyingFunction = typing.TypeVar(
"UnderlyingFunction", bound=typing.Callable[..., typing.Any]
)

class SupportsTracking(Protocol):

class SupportsTracking(typing.Protocol):
"""Protocol for classes that support tracking of robot context."""

def __init__(
self, storage_location: Path, should_track: bool
) -> None:
def __init__(self, storage_location: Path, should_track: bool) -> None:
"""Initialize the tracker."""
...

def track(self, state: "RobotContextState") -> Callable[[F], F]:
async def track(
self,
func_to_track: UnderlyingFunction,
state: "RobotContextState",
*args: UnderlyingFunctionParams.args,
**kwargs: UnderlyingFunctionParams.kwargs,
) -> UnderlyingFunctionReturn:
"""Decorator to track the given state for the decorated function."""
...

Expand Down Expand Up @@ -64,7 +73,7 @@ class MetricsMetadata:

name: str
storage_dir: Path
headers: Tuple[str, ...]
headers: typing.Tuple[str, ...]

@property
def data_file_location(self) -> Path:
Expand Down
Loading