Skip to content

Commit

Permalink
need to update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
caila-marashaj committed Jul 6, 2023
1 parent 92b2674 commit 6181560
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 60 deletions.
24 changes: 18 additions & 6 deletions api/src/opentrons/hardware_control/backends/ot3controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
create_gripper_jaw_hold_group,
create_tip_action_group,
PipetteAction,
create_gear_motor_home_group,
motor_nodes,
LIMIT_SWITCH_OVERTRAVEL_DISTANCE,
)
Expand Down Expand Up @@ -242,7 +243,7 @@ def __init__(
FirmwareUpdate(),
)
self._position = self._get_home_position()
self._gear_motor_position = None
self._gear_motor_position: float
self._encoder_position = self._get_home_position()
self._motor_status = {}
self._check_updates = check_updates
Expand Down Expand Up @@ -596,20 +597,31 @@ def _filter_move_group(self, move_group: MoveGroup) -> MoveGroup:
)
return new_group

async def home_gear_motors(
self,
distance: float,
velocity: float,
) -> None:
move_group = create_gear_motor_home_group(distance, velocity)
runner = MoveGroupRunner(
move_groups=[move_group],
ignore_stalls=True if not ff.stall_detection_enabled() else False,
)
positions = await runner.run(can_messenger=self._messenger)
if NodeId.pipette_left in positions:
self._gear_motor_position = positions[NodeId.pipette_left][0]

async def tip_action(
self,
moves: List[Move],
moves: List[Move[OT3Axis]],
tip_action: str = "home",
accelerate_during_move: bool = True,
) -> None:
group = create_tip_action_group(moves, [NodeId.pipette_left], tip_action, accelerate_during_move)
move_group = group
move_group = create_tip_action_group(moves, [NodeId.pipette_left], tip_action)
runner = MoveGroupRunner(
move_groups=[move_group],
ignore_stalls=True if not ff.stall_detection_enabled() else False,
)
positions = await runner.run(can_messenger=self._messenger)
print(f"positions returned = {positions}")
if NodeId.pipette_left in positions:
self._gear_motor_position = positions[NodeId.pipette_left][0]

Expand Down
16 changes: 14 additions & 2 deletions api/src/opentrons/hardware_control/backends/ot3simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ def __init__(
self._update_required = False
self._initialized = False
self._lights = {"button": False, "rails": False}
self._gear_motor_position = 0

def _sanitize_attached_instrument(
mount: OT3Mount, passed_ai: Optional[Dict[str, Optional[str]]] = None
Expand Down Expand Up @@ -197,6 +198,10 @@ def initialized(self) -> bool:
def initialized(self, value: bool) -> None:
self._initialized = value

@property
def gear_motor_position(self) -> Optional[float]:
return self._gear_motor_position

@property
def board_revision(self) -> BoardRevision:
"""Get the board revision"""
Expand Down Expand Up @@ -244,6 +249,14 @@ async def update_motor_status(self) -> None:
async def gear_motor_position_estimation(self) -> None:
return None

@ensure_yield
async def home_gear_motors(
self,
distance: float,
velocity: float,
) -> None:
return None

@ensure_yield
async def update_motor_estimation(self, axes: Sequence[OT3Axis]) -> None:
"""Update motor position estimation for commanded nodes, and update cache of data."""
Expand Down Expand Up @@ -374,9 +387,8 @@ async def get_tip_present(self, mount: OT3Mount, tip_state: TipStateType) -> Non

async def tip_action(
self,
moves: List[Move],
moves: List[Move[OT3Axis]],
tip_action: str = "home",
accelerate_during_move: bool = True,
) -> None:
pass

Expand Down
27 changes: 20 additions & 7 deletions api/src/opentrons/hardware_control/backends/ot3utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
"""Shared utilities for ot3 hardware control."""
from typing import Dict, Iterable, List, Set, Tuple, TypeVar, Sequence, Optional
from typing_extensions import Literal
from opentrons.config.defaults_ot3 import DEFAULT_CALIBRATION_AXIS_MAX_SPEED
from opentrons.config.defaults_ot3 import (
DEFAULT_CALIBRATION_AXIS_MAX_SPEED,
DEFAULT_MAX_SPEED_DISCONTINUITY,
)
from opentrons.config.types import OT3MotionSettings, OT3CurrentSettings, GantryLoad
from opentrons.hardware_control.types import (
OT3Axis,
Expand Down Expand Up @@ -372,18 +375,14 @@ def create_home_group(
def create_tip_action_group(
moves: List[Move[OT3Axis]],
present_nodes: Iterable[NodeId],
action: PipetteTipActionType,
accelerate_during_move: bool = True,
action: str,
) -> MoveGroup:
move_group: MoveGroup = []
for move in moves:
unit_vector = move.unit_vector
for block in move.blocks:
velocities = unit_vector_multiplication(unit_vector, block.initial_speed)
if accelerate_during_move:
accelerations = unit_vector_multiplication(unit_vector, block.acceleration)
else:
accelerations = {OT3Axis.Q: 0}
accelerations = unit_vector_multiplication(unit_vector, block.acceleration)
step = create_tip_action_step(
velocity=_convert_to_node_id_dict(velocities),
acceleration=_convert_to_node_id_dict(accelerations),
Expand All @@ -395,6 +394,20 @@ def create_tip_action_group(
return move_group


def create_gear_motor_home_group(
distance: float,
velocity: float,
) -> MoveGroup:
step = create_tip_action_step(
velocity={NodeId.pipette_left: np.float64(velocity)},
acceleration={NodeId.pipette_left: np.float64(0)},
duration=np.float64(distance / velocity),
present_nodes=[NodeId.pipette_left],
action=PipetteTipActionType.home,
)
return [step]


def create_gripper_jaw_grip_group(
duty_cycle: float,
stop_condition: MoveStopCondition = MoveStopCondition.none,
Expand Down
50 changes: 27 additions & 23 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -697,21 +697,29 @@ async def home_z(
axes = list(OT3Axis.mount_axes())
await self.home(axes)

async def home_gear_motors(self, accelerate_during_move: bool = False) -> None:
if accelerate_during_move:
current_pos = await self._backend.gear_motor_position_estimation()
if current_pos == 0:
async def home_gear_motors(self) -> None:
gear_motor_axis_bound = self._backend.axis_bounds[OT3Axis.Q][1]
position_estimation = self._backend.gear_motor_position
velocity = self._config.motion_settings.default_max_speed[
GantryLoad.HIGH_THROUGHPUT
][OT3AxisKind.Q]
if position_estimation is not None:
position_valid = abs(position_estimation) < gear_motor_axis_bound
if position_valid:
position_dict = {OT3Axis.Q: position_estimation}
safe_home_target = {OT3Axis.Q: self._config.safe_home_distance}
moves = self._build_moves(position_dict, safe_home_target)

await self._backend.tip_action(moves[0], "clamp")
distance = self._backend.gear_motor_position

await self._backend.home_gear_motors(distance, velocity)
return
current_pos_dict = {OT3Axis.Q: current_pos[0]}
else:
# start at the axis boundary, and move without acceleration
# to avoid crashing if the gear motors' positions are not available
axis_bound = self._backend.axis_bounds[OT3Axis.Q][1]
current_pos_dict = {OT3Axis.Q: axis_bound}

home_target = {OT3Axis.Q: 0}
home_moves = self._build_moves(current_pos_dict, home_target)
await self._backend.tip_action(home_moves[0], "home", accelerate_during_move)
# if gear motor position is not known:
# start at the axis boundary, and move without acceleration
# to avoid crashing if the gear motors' positions are not available
axis_bound = self._backend.axis_bounds[OT3Axis.Q][1]
await self._backend.home_gear_motors(axis_bound, velocity)

async def home_gripper_jaw(self) -> None:
"""
Expand Down Expand Up @@ -1666,12 +1674,7 @@ async def _motor_pick_up_tip(

await self._backend.tip_action(clamp_moves[0], "clamp")

# adjust move group for home so that only third tip action request
# expects to hit the limit switch
gear_motor_position = {OT3Axis.Q: self._backend.gear_motor_position}
home_target = {OT3Axis.Q: 0}
home_moves = self._build_moves(gear_motor_position, home_target)
await self._backend.tip_action(home_moves[0], "home")
await self.home_gear_motors()

async def pick_up_tip(
self,
Expand Down Expand Up @@ -1749,7 +1752,9 @@ async def drop_tip(
# The speed check is needed because speed can sometimes be None.
# Not sure why

gear_start_position = await self._backend.gear_motor_position_estimation()
gear_start_position = (
await self._backend.gear_motor_position_estimation()
)
gear_start_pos_dict = {OT3Axis.Q: gear_start_position[0]}
drop_target_dict = {OT3Axis.Q: move.target_position}
drop_moves = self._build_moves(gear_start_pos_dict, drop_target_dict)
Expand All @@ -1758,7 +1763,7 @@ async def drop_tip(

current_gear_pos = await self._backend.gear_motor_position_estimation()
gear_pos_dict = {OT3Axis.Q: current_gear_pos[0]}
home_target_dict = {OT3Axis.Q: 0}
home_target_dict = {OT3Axis.Q: 0.0}
home_moves = self._build_moves(gear_pos_dict, home_target_dict)

await self._backend.tip_action(home_moves[0], "home")
Expand Down Expand Up @@ -1796,7 +1801,6 @@ async def drop_tip(

_remove()


async def clean_up(self) -> None:
"""Get the API ready to stop cleanly."""
await self._backend.clean_up()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,9 @@ async def _parser_update_gear_motor_position_response(
reader: WaitableCallback, expected: NodeId
) -> Tuple[float, bool]:
async for response, arb_id in reader:
if isinstance(response, UpdateGearMotorPositionEstimationResponse): # or isinstance(response, TipActionResponse):
if isinstance(
response, UpdateGearMotorPositionEstimationResponse
): # or isinstance(response, TipActionResponse):
node = NodeId(arb_id.parts.originating_node_id)
if node == expected:
return (
Expand Down
26 changes: 5 additions & 21 deletions hardware/opentrons_hardware/hardware_control/move_group_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ def __init__(
"""
self._move_groups = move_groups
self._start_at_index = start_at_index
# self._ignore_stalls = ignore_stalls
self._ignore_stalls = True
self._ignore_stalls = ignore_stalls
self._is_prepped: bool = False

@staticmethod
Expand Down Expand Up @@ -164,30 +163,12 @@ async def run(

@staticmethod
def _accumulate_move_completions(
self,
completions: _Completions,
) -> NodeDict[Tuple[float, float, bool, bool]]:
position: NodeDict[
List[Tuple[Tuple[int, int], float, float, bool, bool]]
] = defaultdict(list)
for arbid, completion in completions:
if isinstance(completion, TipActionResponse):
# assuming the two gear motors finish at the same time in the case of
# the 96 channel, return their position
for move in self._expected_tip_action_motors:
if not any(self._expected_tip_action_motors):
# if the nested list is empty, we are done with all 3
# tip action moves and should return the current position

return {
arbid.parts.originating_node_id:
(
completion.payload.current_position_um.value / 1000.0,
completion.payload.encoder_position_um.value / 1000.0,
True,
True,
)
}
position[NodeId(arbid.parts.originating_node_id)].append(
(
(
Expand Down Expand Up @@ -374,7 +355,9 @@ def __init__(self, move_groups: MoveGroups, start_at_index: int = 0) -> None:
move_set.update(set((k.value, seq_id) for k in move.keys()))
duration += float(movesteps[0].duration_sec)
if any(isinstance(g, MoveGroupTipActionStep) for g in movesteps):
self._expected_tip_action_motors.append([GearMotorId.left, GearMotorId.right])
self._expected_tip_action_motors.append(
[GearMotorId.left, GearMotorId.right]
)
for step in move_group[seq_id]:
stop_cond.append(move_group[seq_id][step].stop_condition)

Expand Down Expand Up @@ -472,6 +455,7 @@ def __call__(
print(f"expected tip motors = {self._expected_tip_action_motors}")
if len(self._expected_tip_action_motors[seq_id]) == 0:
self._remove_move_group(message, arbitration_id)
self._handle_move_completed(message, arbitration_id)
elif isinstance(message, ErrorMessage):
self._handle_error(message, arbitration_id)

Expand Down

0 comments on commit 6181560

Please sign in to comment.