Skip to content

Commit

Permalink
feat(hardware-testing): DVT 96ch diagnostics script (#12736)
Browse files Browse the repository at this point in the history
  • Loading branch information
andySigler committed May 19, 2023
1 parent 5884a41 commit 0cdd135
Show file tree
Hide file tree
Showing 16 changed files with 1,457 additions and 49 deletions.
9 changes: 4 additions & 5 deletions api/src/opentrons/config/defaults_ot3.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
OT3AxisKind.X: 1000,
OT3AxisKind.Y: 1000,
OT3AxisKind.Z: 120,
OT3AxisKind.P: 10,
OT3AxisKind.P: 30,
OT3AxisKind.Z_G: 150,
OT3AxisKind.Q: 10,
},
Expand All @@ -116,9 +116,9 @@
OT3AxisKind.X: 10,
OT3AxisKind.Y: 10,
OT3AxisKind.Z: 5,
OT3AxisKind.P: 10,
OT3AxisKind.P: 5,
OT3AxisKind.Z_G: 10,
OT3AxisKind.Q: 10,
OT3AxisKind.Q: 5,
},
low_throughput={
OT3AxisKind.X: 10,
Expand Down Expand Up @@ -172,8 +172,7 @@
OT3AxisKind.X: 1.4,
OT3AxisKind.Y: 1.4,
OT3AxisKind.Z: 1.4,
# TODO: verify this value
OT3AxisKind.P: 2.0,
OT3AxisKind.P: 2.2,
OT3AxisKind.Z_G: 0.67,
OT3AxisKind.Q: 1.5,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ async def home(
await self.tip_action(
[OT3Axis.Q],
self.axis_bounds[OT3Axis.Q][1] - self.axis_bounds[OT3Axis.Q][0],
self._configuration.motion_settings.default_max_speed.high_throughput[
self._configuration.motion_settings.max_speed_discontinuity.high_throughput[
OT3Axis.to_kind(OT3Axis.Q)
],
)
Expand Down
1 change: 1 addition & 0 deletions hardware-testing/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ test-production-qc:
$(python) -m hardware_testing.production_qc.pipette_assembly_qc_ot3 --operator test --simulate
$(python) -m hardware_testing.production_qc.robot_assembly_qc_ot3 --simulate
$(python) -m hardware_testing.production_qc.gripper_assembly_qc_ot3 --simulate
$(python) -m hardware_testing.production_qc.ninety_six_assembly_qc_ot3 --simulate

.PHONY: test-examples
test-examples:
Expand Down
114 changes: 71 additions & 43 deletions hardware-testing/hardware_testing/opentrons_api/helpers_ot3.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,6 @@ async def build_async_ot3_hardware_api(
kwargs = {"config": config}
if is_simulating:
builder = OT3API.build_hardware_simulator
# TODO (andy s): add ability to simulate:
# - gripper
# - 96-channel
# - modules
sim_pips = _create_attached_instruments_dict(
pipette_left, pipette_right, gripper
)
Expand All @@ -146,7 +142,15 @@ async def build_async_ot3_hardware_api(
builder = OT3API.build_hardware_controller
stop_server_ot3()
restart_canbus_ot3()
return await builder(loop=loop, **kwargs) # type: ignore[arg-type]
kwargs["use_usb_bus"] = True # type: ignore[assignment]
try:
return await builder(loop=loop, **kwargs) # type: ignore[arg-type]
except Exception as e:
if is_simulating:
raise e
print(e)
kwargs["use_usb_bus"] = False # type: ignore[assignment]
return await builder(loop=loop, **kwargs) # type: ignore[arg-type]


def set_gantry_per_axis_setting_ot3(
Expand Down Expand Up @@ -414,6 +418,35 @@ async def move_plunger_absolute_ot3(
await _move_coro


async def move_tip_motor_relative_ot3(
api: OT3API,
distance: float,
motor_current: Optional[float] = None,
speed: Optional[float] = None,
) -> None:
"""Move 96ch tip-motor (Q) to an absolute position."""
if not api.hardware_pipettes[OT3Mount.LEFT.to_mount()]:
raise RuntimeError("No pipette found on LEFT mount")
if distance < 0:
action = "home"
else:
action = "clamp"
_move_coro = api._backend.tip_action(
axes=[OT3Axis.Q],
distance=distance,
speed=speed if speed else 5,
tip_action=action,
)
if motor_current is None:
await _move_coro
else:
async with api._backend.restore_current():
await api._backend.set_active_current(
{OT3Axis.Q: motor_current} # type: ignore[dict-item]
)
await _move_coro


async def move_plunger_relative_ot3(
api: OT3API,
mount: OT3Mount,
Expand Down Expand Up @@ -619,55 +652,38 @@ class SensorResponseBad(Exception):
pass


async def get_capacitance_ot3(
api: OT3API, mount: OT3Mount, channel: Optional[str] = None
) -> float:
"""Get the capacitance reading from the pipette."""
if api.is_simulator:
return 0.0
node_id = sensor_node_for_mount(mount)
if not channel or channel == "rear":
capacitive = sensor_types.CapacitiveSensor.build(SensorId.S0, node_id)
elif channel == "front":
capacitive = sensor_types.CapacitiveSensor.build(SensorId.S1, node_id)
else:
raise ValueError(f"unexpected channel for capacitance sensor: {channel}")
s_driver = sensor_driver.SensorDriver()
data = await s_driver.read(
api._backend._messenger, capacitive, offset=False, timeout=1 # type: ignore[union-attr]
)
if data is None:
raise SensorResponseBad("no response from sensor")
return data.to_float() # type: ignore[union-attr]


async def _get_temp_humidity(
messenger: CanMessenger, mount: OT3Mount
messenger: CanMessenger,
mount: OT3Mount,
sensor_id: SensorId = SensorId.S0,
) -> Tuple[float, float]:
node_id = sensor_node_for_mount(mount)
# FIXME: allow SensorId to specify which sensor on the device to read from
environment = sensor_types.EnvironmentSensor.build(SensorId.S0, node_id)
environment = sensor_types.EnvironmentSensor.build(sensor_id, node_id)
s_driver = sensor_driver.SensorDriver()
data = await s_driver.read(
messenger, environment, offset=False, timeout=1 # type: ignore[union-attr]
messenger, environment, offset=False, timeout=2 # type: ignore[union-attr]
)
if data is None:
raise SensorResponseBad("no response from sensor")
return data.temperature.to_float(), data.humidity.to_float() # type: ignore[union-attr]


async def get_temperature_humidity_ot3(
api: OT3API, mount: OT3Mount
api: OT3API,
mount: OT3Mount,
sensor_id: SensorId = SensorId.S0,
) -> Tuple[float, float]:
"""Get the temperature/humidity reading from the pipette."""
if api.is_simulator:
return 25.0, 50.0
messenger = api._backend._messenger # type: ignore[union-attr]
return await _get_temp_humidity(messenger, mount)
return await _get_temp_humidity(messenger, mount, sensor_id)


def get_temperature_humidity_outside_api_ot3(
mount: OT3Mount, is_simulating: bool = False
mount: OT3Mount,
is_simulating: bool = False,
sensor_id: SensorId = SensorId.S0,
) -> Tuple[float, float]:
"""Get the temperature/humidity reading from the pipette outside of a protocol."""
settings = DriverSettings(
Expand All @@ -684,7 +700,7 @@ async def _run() -> Tuple[float, float]:
async with build.driver(settings) as driver:
messenger = CanMessenger(driver=driver)
messenger.start()
ret = await _get_temp_humidity(messenger, mount)
ret = await _get_temp_humidity(messenger, mount, sensor_id)
await messenger.stop()
return ret

Expand All @@ -694,22 +710,34 @@ async def _run() -> Tuple[float, float]:
return task.result()


async def get_capacitance_ot3(
api: OT3API, mount: OT3Mount, sensor_id: SensorId = SensorId.S0
) -> float:
"""Get the capacitance reading from the pipette."""
if api.is_simulator:
return 0.0
node_id = sensor_node_for_mount(mount)
capacitive = sensor_types.CapacitiveSensor.build(sensor_id, node_id)
s_driver = sensor_driver.SensorDriver()
data = await s_driver.read(
api._backend._messenger, capacitive, offset=False, timeout=2 # type: ignore[union-attr]
)
if data is None:
raise SensorResponseBad("no response from sensor")
return data.to_float() # type: ignore[union-attr]


async def get_pressure_ot3(
api: OT3API, mount: OT3Mount, channel: Optional[str] = None
api: OT3API, mount: OT3Mount, sensor_id: SensorId = SensorId.S0
) -> float:
"""Get the pressure reading from the pipette."""
if api.is_simulator:
return 0.0
node_id = sensor_node_for_mount(mount)
if not channel or channel == "rear":
pressure = sensor_types.PressureSensor.build(SensorId.S0, node_id)
elif channel == "front":
pressure = sensor_types.PressureSensor.build(SensorId.S1, node_id)
else:
raise ValueError(f"unexpected channel for pressure sensor: {channel}")
pressure = sensor_types.PressureSensor.build(sensor_id, node_id)
s_driver = sensor_driver.SensorDriver()
data = await s_driver.read(
api._backend._messenger, pressure, offset=False, timeout=1 # type: ignore[union-attr]
api._backend._messenger, pressure, offset=False, timeout=2 # type: ignore[union-attr]
)
if data is None:
raise SensorResponseBad("no response from sensor")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""96 Channel Assembly QC OT3."""
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""96 Channel assembly QC OT3."""
import argparse
import asyncio
from pathlib import Path

from hardware_testing.data import ui, get_git_description
from hardware_testing.data.csv_report import RESULTS_OVERVIEW_TITLE
from hardware_testing.opentrons_api import helpers_ot3
from hardware_testing.opentrons_api.types import OT3Mount, OT3Axis

from .config import TestSection, TestConfig, build_report, TESTS


async def _main(cfg: TestConfig) -> None:
# BUILD API
api = await helpers_ot3.build_async_ot3_hardware_api(
is_simulating=cfg.simulate,
pipette_left="p1000_96_v3.4",
)
await api.home()
home_pos = await api.gantry_position(OT3Mount.LEFT)
mount = OT3Mount.LEFT
attach_pos = helpers_ot3.get_slot_calibration_square_position_ot3(5)
attach_pos = attach_pos._replace(z=home_pos.z)
if not api.hardware_pipettes[mount.to_mount()]:
# FIXME: Do not home the plunger using the normal home method.
# See section below where we use OT3Controller to home it.
await api.home()
await helpers_ot3.move_to_arched_ot3(api, mount, attach_pos)
while not api.hardware_pipettes[mount.to_mount()]:
ui.get_user_ready("attach a 96ch pipette")
await api.reset()
await api.home_z(OT3Mount.LEFT)

pipette = api.hardware_pipettes[mount.to_mount()]
assert pipette
pipette_id = str(pipette.pipette_id)

# FIXME: remove this once the "'L' format requires 0 <= number <= 4294967295" bug is gone
await api._backend.home([OT3Axis.P_L])
await api.refresh_positions()

# BUILD REPORT
test_name = Path(__file__).parent.name
ui.print_title(test_name.replace("_", " ").upper())
report = build_report(test_name.replace("_", "-"))
report.set_tag(pipette_id)
if not cfg.simulate:
report.set_operator(input("enter operator name: "))
else:
report.set_operator("simulation")
report.set_version(get_git_description())

# RUN TESTS
for section, test_run in cfg.tests.items():
ui.print_title(section.value)
await test_run(api, report, section.value)

# RELOAD PIPETTE
ui.print_title("DONE")
await helpers_ot3.move_to_arched_ot3(api, mount, attach_pos)

# SAVE REPORT
report_path = report.save_to_disk()
complete_msg = "complete" if report.completed else "incomplete"
print(f"done, {complete_msg} report -> {report_path}")
print("Overall Results:")
for line in report[RESULTS_OVERVIEW_TITLE].lines:
print(f" - {line.tag}: {line.result}")


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--simulate", action="store_true")
# add each test-section as a skippable argument (eg: --skip-gantry)
for s in TestSection:
parser.add_argument(
f"--skip-{s.value.lower()}".replace("_", "-"), action="store_true"
)
parser.add_argument(
f"--only-{s.value.lower()}".replace("_", "-"), action="store_true"
)
args = parser.parse_args()
_t_sections = {
s: f
for s, f in TESTS
if getattr(args, f"only_{s.value.lower().replace('-', '_')}")
}
if _t_sections:
assert (
len(list(_t_sections.keys())) < 2
), 'use "--only" for just one test, not multiple tests'
else:
_t_sections = {
s: f
for s, f in TESTS
if not getattr(args, f"skip_{s.value.lower().replace('-', '_')}")
}
_config = TestConfig(simulate=args.simulate, tests=_t_sections)
asyncio.run(_main(_config))
Loading

0 comments on commit 0cdd135

Please sign in to comment.