-
Notifications
You must be signed in to change notification settings - Fork 176
/
robot_context_tracker.py
137 lines (109 loc) · 4.87 KB
/
robot_context_tracker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
"""Module for tracking robot context and execution duration for different operations."""
import inspect
from pathlib import Path
import platform
from functools import partial, wraps
from time import perf_counter_ns
import typing
from .metrics_store import MetricsStore
from .data_shapes import RawContextData, MetricsMetadata
from .dev_types import SupportsTracking, RobotContextState
from .util import get_timing_function
_UnderlyingFunctionParameters = typing.ParamSpec("_UnderlyingFunctionParameters")
_UnderlyingFunctionReturn = typing.TypeVar("_UnderlyingFunctionReturn")
_UnderlyingFunction = typing.Callable[
_UnderlyingFunctionParameters, _UnderlyingFunctionReturn
]
_timing_function = get_timing_function()
class RobotContextTracker(SupportsTracking):
"""Tracks and stores robot context and execution duration for different operations."""
METADATA_NAME: typing.Final[
typing.Literal["robot_context_data"]
] = "robot_context_data"
def __init__(self, storage_location: Path, should_track: bool) -> None:
"""Initializes the RobotContextTracker with an empty storage list."""
self._store = MetricsStore[RawContextData](
MetricsMetadata(
name=self.METADATA_NAME,
storage_dir=storage_location,
headers=RawContextData.headers(),
)
)
self._should_track = should_track
if self._should_track:
self._store.setup()
def track(
self,
state: RobotContextState,
) -> typing.Callable[
[_UnderlyingFunction[_UnderlyingFunctionParameters, _UnderlyingFunctionReturn]],
_UnderlyingFunction[_UnderlyingFunctionParameters, _UnderlyingFunctionReturn],
]:
"""Tracks the given function and its execution duration.
If tracking is disabled, the function is called immediately and its result is returned.
Args:
func_to_track: The function to track.
state: The state of the robot context during the function execution.
*args: The arguments to pass to the function.
**kwargs: The keyword arguments to pass to the function.
Returns:
If the function executes successfully, its return value is returned.
If the function raises an exception, the exception the function raised is raised.
"""
def inner_decorator(
func_to_track: _UnderlyingFunction[
_UnderlyingFunctionParameters, _UnderlyingFunctionReturn
]
) -> _UnderlyingFunction[
_UnderlyingFunctionParameters, _UnderlyingFunctionReturn
]:
if not self._should_track:
return func_to_track
if inspect.iscoroutinefunction(func_to_track):
@wraps(func_to_track)
async def async_wrapper(
*args: _UnderlyingFunctionParameters.args,
**kwargs: _UnderlyingFunctionParameters.kwargs
) -> _UnderlyingFunctionReturn:
function_start_time = _timing_function()
duration_start_time = perf_counter_ns()
try:
result = await func_to_track(*args, **kwargs)
finally:
duration_end_time = perf_counter_ns()
self._store.add(
RawContextData(
func_start=function_start_time,
duration=duration_end_time - duration_start_time,
state=state,
)
)
return result # type: ignore
return async_wrapper # type: ignore
else:
@wraps(func_to_track)
def wrapper(
*args: _UnderlyingFunctionParameters.args,
**kwargs: _UnderlyingFunctionParameters.kwargs
) -> _UnderlyingFunctionReturn:
function_start_time = _timing_function()
duration_start_time = perf_counter_ns()
try:
result = func_to_track(*args, **kwargs)
finally:
duration_end_time = perf_counter_ns()
self._store.add(
RawContextData(
func_start=function_start_time,
duration=duration_end_time - duration_start_time,
state=state,
)
)
return result
return wrapper
return inner_decorator
def store(self) -> None:
"""Returns the stored context data and clears the storage list."""
if not self._should_track:
return
self._store.store()