Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
Carlos-fernandez committed Feb 13, 2023
2 parents c3715bf + 47ba406 commit 07c68bc
Show file tree
Hide file tree
Showing 14 changed files with 210 additions and 234 deletions.
6 changes: 0 additions & 6 deletions api/src/opentrons/config/defaults_ot3.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@
sensor_threshold_pascals=100,
expected_liquid_height=110,
log_pressure=True,
home_plunger_at_start=False,
aspirate_while_sensing=False,
read_only=False,
)

DEFAULT_CALIBRATION_SETTINGS: Final[OT3CalibrationSettings] = OT3CalibrationSettings(
Expand Down Expand Up @@ -373,13 +371,9 @@ def _build_default_liquid_probe(
"expected_liquid_height", default.expected_liquid_height
),
log_pressure=from_conf.get("log_pressure", default.log_pressure),
home_plunger_at_start=from_conf.get(
"home_plunger_at_start", default.home_plunger_at_start
),
aspirate_while_sensing=from_conf.get(
"aspirate_while_sensing", default.aspirate_while_sensing
),
read_only=from_conf.get("read_only", default.read_only),
)


Expand Down
2 changes: 0 additions & 2 deletions api/src/opentrons/config/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,7 @@ class LiquidProbeSettings:
sensor_threshold_pascals: float
expected_liquid_height: float
log_pressure: bool
home_plunger_at_start: bool
aspirate_while_sensing: bool
read_only: bool


@dataclass(frozen=True)
Expand Down
38 changes: 4 additions & 34 deletions api/src/opentrons/hardware_control/backends/ot3controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
node_to_axis,
sub_system_to_node_id,
sensor_node_for_mount,
head_node_for_mount,
sensor_id_for_instrument,
create_gripper_jaw_grip_group,
create_gripper_jaw_home_group,
Expand Down Expand Up @@ -356,7 +355,7 @@ def _build_home_pipettes_runner(
)

distances_pipette = {
ax: -1 * self.phony_bounds[ax][1] - self.phony_bounds[ax][0]
ax: -1 * self.axis_bounds[ax][1] - self.axis_bounds[ax][0]
for ax in axes
if ax in OT3Axis.pipette_axes()
}
Expand Down Expand Up @@ -385,7 +384,7 @@ def _build_home_gantry_z_runner(
)

distances_gantry = {
ax: -1 * self.phony_bounds[ax][1] - self.phony_bounds[ax][0]
ax: -1 * self.axis_bounds[ax][1] - self.axis_bounds[ax][0]
for ax in axes
if ax in OT3Axis.gantry_axes() and ax not in OT3Axis.mount_axes()
}
Expand All @@ -395,7 +394,7 @@ def _build_home_gantry_z_runner(
if ax in OT3Axis.gantry_axes() and ax not in OT3Axis.mount_axes()
}
distances_z = {
ax: -1 * self.phony_bounds[ax][1] - self.phony_bounds[ax][0]
ax: -1 * self.axis_bounds[ax][1] - self.axis_bounds[ax][0]
for ax in axes
if ax in OT3Axis.mount_axes()
}
Expand Down Expand Up @@ -739,32 +738,8 @@ async def watch(self, loop: asyncio.AbstractEventLoop) -> None:
while can_watch and (not self._event_watcher.closed):
await self._handle_watch_event()

def get_slot_center_pos(self, slot_num: int) -> OT3AxisMap[float]:
"""Return the slot center."""
slot_width = self.axis_bounds[OT3Axis.X][1] / 3
slot_depth = self.axis_bounds[OT3Axis.Y][1] / 4

centers_x = [slot_width * 2.5, slot_width * 0.5, slot_width * 1.5]
center_y = (math.ceil(slot_num / 3) - 0.5) * slot_depth

return {OT3Axis.X: centers_x[slot_num % 3], OT3Axis.Y: center_y}

@property
def axis_bounds(self) -> OT3AxisMap[Tuple[float, float]]:
"""Get the axis bounds."""
# TODO (CM): gripper axis bounds need to be defined
return {
OT3Axis.Z_L: (0, 160),
OT3Axis.Z_R: (0, 160),
OT3Axis.P_L: (0, 110),
OT3Axis.P_R: (0, 110),
OT3Axis.X: (0, 455),
OT3Axis.Y: (0, 412),
OT3Axis.Z_G: (0, 1000),
}

@property
def phony_bounds(self) -> OT3AxisMap[Tuple[float, float]]:
"""Get the axis bounds."""
# TODO (AL, 2021-11-18): The bounds need to be defined
phony_bounds = (0, 10000)
Expand All @@ -779,9 +754,6 @@ def phony_bounds(self) -> OT3AxisMap[Tuple[float, float]]:
OT3Axis.Q: phony_bounds,
}

def single_boundary(self, boundary: int) -> OT3AxisMap[float]:
return {ax: bound[boundary] for ax, bound in self.phony_bounds.items()}

def engaged_axes(self) -> OT3AxisMap[bool]:
"""Get engaged axes."""
return {}
Expand Down Expand Up @@ -968,7 +940,7 @@ async def liquid_probe(
read_only: bool = False,
sensor_id: SensorId = SensorId.S0,
) -> None:
head_node = head_node_for_mount(OT3Mount(mount.value))
head_node = axis_to_node(OT3Axis.by_mount(mount))
tool = sensor_node_for_mount(OT3Mount(mount.value))
positions = await liquid_probe(
self._messenger,
Expand All @@ -984,11 +956,9 @@ async def liquid_probe(
read_only,
sensor_id,
)
pos_axes = dict()
for node, point in positions.items():
self._position.update({node: point[0]})
self._encoder_position.update({node: point[1]})
pos_axes[node_to_axis(node)] = point

async def capacitive_probe(
self,
Expand Down
31 changes: 2 additions & 29 deletions api/src/opentrons/hardware_control/backends/ot3simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,48 +454,21 @@ async def watch(self, loop: asyncio.AbstractEventLoop) -> None:
]
await self.module_controls.register_modules(new_mods_at_ports=new_mods_at_ports)

def get_slot_center_pos(self, slot_num: int) -> OT3AxisMap[float]:
"""Return the slot center."""
slot_width = self.axis_bounds[OT3Axis.X][1] / 3
slot_depth = self.axis_bounds[OT3Axis.Y][1] / 4

centers_x = [slot_width * 2.5, slot_width * 0.5, slot_width * 1.5]
center_y = (math.ceil(slot_num / 3) - 0.5) * slot_depth

return {OT3Axis.X: centers_x[slot_num % 3], OT3Axis.Y: center_y}

@property
def axis_bounds(self) -> OT3AxisMap[Tuple[float, float]]:
"""Get the axis bounds."""
# TODO (CM): gripper axis bounds need to be defined
return {
OT3Axis.Z_L: (0, 160),
OT3Axis.Z_R: (0, 160),
OT3Axis.P_L: (0, 110),
OT3Axis.P_R: (0, 110),
OT3Axis.X: (0, 455),
OT3Axis.Y: (0, 412),
OT3Axis.Z_G: (0, 1000),
}

@property
def phony_bounds(self) -> OT3AxisMap[Tuple[float, float]]:
"""Get the axis bounds."""
# TODO (AL, 2021-11-18): The bounds need to be defined
phony_bounds = (0, 10000)
return {
OT3Axis.Z_L: phony_bounds,
OT3Axis.Z_R: phony_bounds,
OT3Axis.Z_L: phony_bounds,
OT3Axis.P_L: phony_bounds,
OT3Axis.P_R: phony_bounds,
OT3Axis.X: phony_bounds,
OT3Axis.Y: phony_bounds,
OT3Axis.X: phony_bounds,
OT3Axis.Z_G: phony_bounds,
}

def single_boundary(self, boundary: int) -> OT3AxisMap[float]:
return {ax: bound[boundary] for ax, bound in self.phony_bounds.items()}

@property
def fw_version(self) -> Optional[str]:
"""Get the firmware version."""
Expand Down
7 changes: 0 additions & 7 deletions api/src/opentrons/hardware_control/backends/ot3utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
OT3Mount,
InstrumentProbeType,
)
from opentrons.types import Mount
import numpy as np

from opentrons_hardware.firmware_bindings.constants import (
Expand Down Expand Up @@ -318,12 +317,6 @@ def axis_convert(
}


def head_node_for_mount(mount: Union[OT3Mount, Mount]) -> NodeId:
if isinstance(mount, Mount):
mount = OT3Mount.from_mount(mount)
return _head_node_lookup[mount]


def sensor_node_for_mount(mount: OT3Mount) -> ProbeTarget:
return _sensor_node_lookup[mount]

Expand Down
1 change: 1 addition & 0 deletions api/src/opentrons/hardware_control/motion_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ def deck_from_machine(
axis_enum = type(next(iter(machine_pos.keys())))
plunger_axes = {k: v for k, v in machine_pos.items() if k not in k.gantry_axes()}
mount_axes = {k: v for k, v in machine_pos.items() if k in k.mount_axes()}
# print(f"mount axes = {mount_axes}")
deck_positions_by_mount = {
axis_enum.to_mount(axis): deck_point_from_machine_point(
Point(machine_pos[axis_enum.X], machine_pos[axis_enum.Y], value),
Expand Down
26 changes: 15 additions & 11 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@
)
from opentrons_hardware.hardware_control.motion_planning.move_utils import (
MoveConditionNotMet,
ThresholdReachedTooEarly,
EarlyLiquidSenseTrigger,
)

mod_log = logging.getLogger(__name__)
Expand Down Expand Up @@ -633,6 +633,7 @@ async def home_plunger(self, mount: Union[top_types.Mount, OT3Mount]) -> None:

checked_mount = OT3Mount.from_mount(mount)
await self.home([OT3Axis.of_main_tool_actuator(checked_mount)])
print(f"pipette = self._pipette_handler.hardware_instruments")
instr = self._pipette_handler.hardware_instruments[checked_mount]
if instr:
target_pos = target_position_from_plunger(
Expand Down Expand Up @@ -944,7 +945,7 @@ async def _move(
self._transforms.deck_calibration.attitude,
self._transforms.carriage_offset,
)
bounds = self._backend.phony_bounds
bounds = self._backend.axis_bounds
to_check = {
ax: machine_pos[ax]
for ax in target_position.keys()
Expand Down Expand Up @@ -1687,7 +1688,7 @@ async def liquid_probe(
If the move is completed without the specified threshold being triggered, a
MoveConditionNotMet error will be thrown.
If the threshold is triggered before the minimum z distance has been traveled,
a ThresholdReachedTooEarly error will be thrown.
a EarlyLiquidSenseTrigger error will be thrown.
Otherwise, the function will stop moving once the threshold is triggered,
home the plunger to prepare to aspirate, and return the position of the
Expand All @@ -1707,10 +1708,7 @@ async def liquid_probe(

if not self._current_position:
await self.home()
if (
probe_settings.home_plunger_at_start
or probe_settings.aspirate_while_sensing
):
if probe_settings.aspirate_while_sensing:
await self.home_plunger(mount)

direction = -1 if probe_settings.aspirate_while_sensing else 1
Expand All @@ -1725,13 +1723,13 @@ async def liquid_probe(
probe_settings.starting_mount_height,
probe_settings.prep_move_speed,
probe_settings.log_pressure,
probe_settings.read_only,
)
except MoveConditionNotMet:
self._log.exception("Liquid Sensing failed- threshold never reached.")
raise
else:
machine_pos = await self._backend.update_position()
print(f"machine pos = {machine_pos}")
position = deck_from_machine(
machine_pos,
self._transforms.deck_calibration.attitude,
Expand All @@ -1751,11 +1749,17 @@ async def liquid_probe(
position[mount_axis] - probe_settings.starting_mount_height
)

print(f"z distance = {z_distance_traveled}")
if z_distance_traveled < probe_settings.min_z_distance:
self._log.exception(
"Liquid Sensing failed- threshold reached too early."
print("should raise")
raise EarlyLiquidSenseTrigger(
triggered_at=z_distance_traveled,
min_z=probe_settings.min_z_distance,
)
else:
print(
f"distance {z_distance_traveled} larger than min {probe_settings.min_z_distance}"
)
raise ThresholdReachedTooEarly

return position[mount_axis], encoder_position[mount_axis]

Expand Down
1 change: 0 additions & 1 deletion api/tests/opentrons/config/ot3_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@
"sensor_threshold_pascals": 17,
"expected_liquid_height": 90,
"log_pressure": True,
"home_plunger_at_start": False,
"aspirate_while_sensing": False,
"read_only": False,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,7 @@ def fake_liquid_settings() -> LiquidProbeSettings:
sensor_threshold_pascals=15,
expected_liquid_height=109,
log_pressure=True,
home_plunger_at_start=False,
aspirate_while_sensing=False,
read_only=False,
)


Expand Down Expand Up @@ -628,13 +626,11 @@ async def test_liquid_probe(
starting_mount_height=fake_liquid_settings.starting_mount_height,
prep_move_speed=fake_liquid_settings.prep_move_speed,
)
breakpoint()
for call in mock_move_group_run.call_args_list:
move_group_runner = call[0][0]
for move_group in move_group_runner._move_groups:
assert move_group # don't pass in empty groups
assert len(move_group) == 2
breakpoint()
step = move_group[0][NodeId.pipette_left]
assert step.stop_condition == MoveStopCondition.none

Expand Down
Loading

0 comments on commit 07c68bc

Please sign in to comment.