-
Notifications
You must be signed in to change notification settings - Fork 177
/
robot_configs.py
executable file
·187 lines (135 loc) · 5.95 KB
/
robot_configs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
import json
import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Union, Optional, cast
from typing_extensions import Literal
from . import CONFIG, defaults_ot3, defaults_ot2, gripper_config, feature_flags as ff
from opentrons.hardware_control.types import BoardRevision
from .types import CurrentDict, RobotConfig, AxisDict, OT3Config
# Module-level logger named after this module; the functions in this file log through it.
log = logging.getLogger(__name__)
def current_for_revision(
    current_dict: CurrentDict, revision: BoardRevision
) -> AxisDict:
    """Select the current values that apply to a board revision.

    An ``UNKNOWN`` revision is treated as ``"2.1"`` when that entry exists.
    A revision without an entry of its own falls back to ``"default"``.

    :param current_dict: Mapping from revision name (or ``"default"``) to
        current values.
    :param revision: Board revision to look up.
    :returns: The entry of ``current_dict`` chosen for ``revision``.
    """
    if revision == BoardRevision.UNKNOWN:
        # The "default" entry is evaluated eagerly as the fallback argument,
        # so it has to be present even when "2.1" is found.
        return current_dict.get("2.1", current_dict["default"])

    revision_name = revision.real_name()
    if revision_name in current_dict:
        return current_dict[revision_name]  # type: ignore
    return current_dict["default"]
def build_config(robot_settings: Dict[str, Any]) -> Union[RobotConfig, OT3Config]:
    """Build the config object matching the robot model in ``robot_settings``.

    The model is read from the ``"model"`` key. When that key is absent, the
    model defaults to ``"OT-3 Standard"`` if the OT-3 hardware controller
    feature flag is enabled and to ``"OT-2 Standard"`` otherwise. Any model
    other than ``"OT-3 Standard"`` produces an OT-2 config.
    """
    fallback_model: Union[Literal["OT-3 Standard"], Literal["OT-2 Standard"]]
    if ff.enable_ot3_hardware_controller():
        fallback_model = "OT-3 Standard"
    else:
        fallback_model = "OT-2 Standard"

    if robot_settings.get("model", fallback_model) == "OT-3 Standard":
        return build_config_ot3(robot_settings)
    return build_config_ot2(robot_settings)
def build_config_ot2(robot_settings: Dict[str, Any]) -> RobotConfig:
    """Create an OT2 config, using defaults for anything not in ``robot_settings``."""
    ot2_config = defaults_ot2.build_with_defaults(robot_settings)
    return ot2_config
def build_config_ot3(robot_settings: Dict[str, Any]) -> OT3Config:
    """Create an OT3 config, using defaults for anything not in ``robot_settings``."""
    ot3_config = defaults_ot3.build_with_defaults(robot_settings)
    return ot3_config
def config_to_save(config: Union[RobotConfig, OT3Config]) -> Dict[str, Any]:
    """Convert a config object into a dictionary that can be serialized.

    Configs whose ``model`` is ``"OT-2 Standard"`` go through the OT-2
    serializer; every other config goes through the OT-3 serializer.
    """
    if config.model != "OT-2 Standard":
        return defaults_ot3.serialize(config)
    return defaults_ot2.serialize(config)
def _load_file() -> Dict[str, Any]:
    """Read the robot settings file named by ``CONFIG["robot_settings_file"]``.

    An empty dict is returned when the loaded content is falsy.
    """
    settings_path = CONFIG["robot_settings_file"]
    log.debug("Loading robot settings from {}".format(settings_path))
    loaded = _load_json(settings_path)
    return loaded if loaded else {}
def load() -> Union[RobotConfig, OT3Config]:
    """Read the saved robot settings and build the matching config object.

    The robot model is detected by ``build_config`` from the loaded settings.
    """
    stored_settings = _load_file()
    return build_config(stored_settings)
def load_ot2() -> RobotConfig:
    """Read the saved robot settings and build an OT2 config from them."""
    stored_settings = _load_file()
    return build_config_ot2(stored_settings)
def load_ot3() -> OT3Config:
    """Read the saved robot settings and build an OT3 config from them."""
    stored_settings = _load_file()
    return build_config_ot3(stored_settings)
def save_robot_settings(
    config: Union[RobotConfig, OT3Config],
    rs_filename: Optional[str] = None,
    tag: Optional[str] = None,
) -> Dict[str, Any]:
    """Serialize ``config`` and write it to disk as a JSON file.

    The destination is ``rs_filename`` when given, otherwise
    ``CONFIG["robot_settings_file"]``. A non-empty ``tag`` is inserted before
    the file extension, giving ``<root>-<tag><ext>``.

    :returns: The serializable dictionary built from ``config``.
    """
    serializable = config_to_save(config)
    payload = json_to_save(serializable)

    target = rs_filename or CONFIG["robot_settings_file"]
    if tag:
        stem, suffix = os.path.splitext(target)
        target = "{}-{}{}".format(stem, tag, suffix)

    _save_config_data(payload, filename=target)
    return serializable
def json_to_save(config: Dict[str, Any]) -> str:
    """Render ``config`` as JSON text with sorted keys and 4-space indentation."""
    encoded = json.dumps(config, indent=4, sort_keys=True)
    return encoded
def backup_configuration(
    config: Union[RobotConfig, OT3Config], tag: Optional[str] = None
) -> None:
    """Save a tagged backup copy of ``config`` via ``save_robot_settings``.

    When ``tag`` is missing or empty, the current time in whole milliseconds
    since the epoch is used as the tag.
    """
    import time

    backup_tag = tag if tag else str(int(time.time() * 1000))
    save_robot_settings(config, tag=backup_tag)
def get_legacy_gantry_calibration() -> Optional[List[List[float]]]:
    """
    Return the legacy gantry calibration, if one is stored.

    The value is read from the ``"gantry_calibration"`` key of the file named
    by ``CONFIG["deck_calibration_file"]``; ``None`` is returned when that key
    is absent.

    This should happen only if the new deck calibration file does not exist.
    The legacy calibration should then be migrated to the new format.
    """
    legacy_data = _load_json(CONFIG["deck_calibration_file"])
    if "gantry_calibration" not in legacy_data:
        return None
    return legacy_data["gantry_calibration"]  # type: ignore[no-any-return]
def clear() -> None:
    """Delete the robot settings file named by ``CONFIG["robot_settings_file"]``."""
    settings_path = CONFIG["robot_settings_file"]
    _clear_file(settings_path)
def _clear_file(filename: Union[str, Path]) -> None:
    """Remove ``filename`` from disk; do nothing when it does not exist.

    :param filename: Path of the file to delete.
    """
    log.debug("Deleting {}".format(filename))
    if not os.path.exists(filename):
        return
    os.remove(filename)
# TODO: move to util (write a default load, save JSON function)
def _load_json(filename: Union[str, Path]) -> Dict[str, Any]:
    """Read a JSON object from ``filename``, returning ``{}`` when unusable.

    An empty dict is returned, and a warning is logged, when the file does not
    exist, cannot be decoded as UTF-8 JSON, or holds a top-level value that is
    not a JSON object.

    :param filename: Path of the JSON file to read.
    :returns: The decoded object, or ``{}`` as described above.
    :raises OSError: For I/O failures other than a missing file.
    """
    try:
        # Read as UTF-8 explicitly so the result does not depend on the
        # locale's default encoding.
        with open(filename, "r", encoding="utf-8") as file:
            res = json.load(file)
    except FileNotFoundError:
        log.warning("{0} not found. Loading defaults".format(filename))
        return {}
    except (json.decoder.JSONDecodeError, UnicodeDecodeError):
        # Undecodable bytes are treated the same way as malformed JSON.
        log.warning("{0} is corrupt. Loading defaults".format(filename), exc_info=True)
        return {}

    if not isinstance(res, dict):
        # Valid JSON such as `null` or a list would break callers that use
        # the result as a mapping.
        log.warning(
            "{0} does not contain a JSON object. Loading defaults".format(filename)
        )
        return {}
    return cast(Dict[str, Any], res)
def _save_config_data(data: str, filename: Union[str, Path]) -> None:
    """Write ``data`` to ``filename``, creating parent directories as needed.

    The write is flushed and fsynced before the file is closed. Any
    ``OSError`` is logged and swallowed, so callers are not told about a
    failed write.

    :param data: Text to store.
    :param filename: Destination path; may be a bare file name with no
        directory component.
    """
    try:
        parent_dir = os.path.dirname(filename)
        # dirname is "" for a bare file name, and os.makedirs("") raises
        # FileNotFoundError, which would abort the write below.
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with open(filename, "w") as file:
            file.write(data)
            file.flush()
            os.fsync(file.fileno())
    except OSError:
        log.exception("Write failed with exception:")
def default_ot2_deck_calibration() -> List[List[float]]:
    """Return ``defaults_ot2.DEFAULT_DECK_CALIBRATION_V2``.

    The module-level object itself is returned, not a copy.
    """
    calibration = defaults_ot2.DEFAULT_DECK_CALIBRATION_V2
    return calibration
def default_ot3_deck_calibration() -> List[List[float]]:
    """Return ``defaults_ot3.DEFAULT_BELT_ATTITUDE`` as the OT3 deck calibration.

    The module-level object itself is returned, not a copy.
    """
    belt_attitude = defaults_ot3.DEFAULT_BELT_ATTITUDE
    return belt_attitude
def default_pipette_offset() -> List[float]:
    """Return the default pipette offset for the active hardware controller.

    The OT-3 default is used when the OT-3 hardware controller feature flag
    is enabled; otherwise the OT-2 default is used.
    """
    if not ff.enable_ot3_hardware_controller():
        return defaults_ot2.DEFAULT_PIPETTE_OFFSET
    return defaults_ot3.DEFAULT_PIPETTE_OFFSET
def default_gripper_calibration_offset() -> List[float]:
    """Return ``gripper_config.DEFAULT_GRIPPER_CALIBRATION_OFFSET``.

    The module-level object itself is returned, not a copy.
    """
    gripper_offset = gripper_config.DEFAULT_GRIPPER_CALIBRATION_OFFSET
    return gripper_offset
def default_module_calibration_offset() -> List[float]:
    """Return ``defaults_ot3.DEFAULT_MODULE_OFFSET``.

    The value always comes from the OT-3 defaults; no feature flag is checked.
    The module-level object itself is returned, not a copy.
    """
    module_offset = defaults_ot3.DEFAULT_MODULE_OFFSET
    return module_offset