Skip to content

Commit

Permalink
modify tip action, debugging code still in there
Browse files Browse the repository at this point in the history
  • Loading branch information
caila-marashaj committed Jun 28, 2023
1 parent 9070819 commit 5856a53
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 184 deletions.
78 changes: 36 additions & 42 deletions api/src/opentrons/hardware_control/backends/ot3controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
create_gripper_jaw_home_group,
create_gripper_jaw_hold_group,
create_tip_action_group,
create_fast_tip_action_group,
PipetteAction,
motor_nodes,
LIMIT_SWITCH_OVERTRAVEL_DISTANCE,
Expand Down Expand Up @@ -74,7 +73,7 @@
from opentrons_hardware.hardware_control.motor_position_status import (
get_motor_position,
update_motor_position_estimation,
# update_gear_motor_position_estimation,
update_gear_motor_position_estimation,
)
from opentrons_hardware.hardware_control.limit_switches import get_limit_switches
from opentrons_hardware.hardware_control.tip_presence import get_tip_ejector_state
Expand Down Expand Up @@ -326,16 +325,16 @@ async def update_motor_status(self) -> None:
response = await get_motor_position(self._messenger, motor_nodes)
self._handle_motor_status_response(response)

# async def gear_motor_position_estimation(self) -> Tuple[float, bool]:
# motor_response = await update_gear_motor_position_estimation(self._messenger)
# updated_successfully = motor_response[1]
# if updated_successfully:
# self._gear_motor_position = motor_response[0]
# return self._gear_motor_position, True
# else:
# # if the gear motor didn't update successfully, return the most recent
# # position estimate
# return self._gear_motor_position, False
async def gear_motor_position_estimation(self) -> Tuple[float, bool]:
motor_response = await update_gear_motor_position_estimation(self._messenger)
updated_successfully = motor_response[1]
if updated_successfully:
self._gear_motor_position = motor_response[0]
return self._gear_motor_position, True
else:
# if the gear motor didn't update successfully, return the most recent
# position estimate
return self._gear_motor_position, False

async def update_motor_estimation(self, axes: Sequence[OT3Axis]) -> None:
"""Update motor position estimation for commanded nodes, and update cache of data."""
Expand Down Expand Up @@ -607,19 +606,14 @@ def _filter_move_group(self, move_group: MoveGroup) -> MoveGroup:
)
return new_group

async def fast_tip_action(
self,
origin: Dict[OT3Axis, float],
moves: List[Move],
tip_action: str = "clamp",
async def tip_action(
self,
moves: List[Move],
tip_action: str = "home",
accelerate_during_move: bool = True,
) -> None:
if tip_action == "home":
stop_condition = MoveStopCondition.limit_switch
else:
stop_condition = MoveStopCondition.none
breakpoint()
group = create_move_group(origin, moves, [NodeId.pipette_left], stop_condition, tip_action)
move_group, _ = group
group = create_tip_action_group(moves, [NodeId.pipette_left], tip_action, accelerate_during_move)
move_group = group
runner = MoveGroupRunner(
move_groups=[move_group],
ignore_stalls=True if not ff.stall_detection_enabled() else False,
Expand All @@ -628,24 +622,24 @@ async def fast_tip_action(
# print(f"done w move group runner: {positions} at {time.time()}")
if NodeId.pipette_left in positions:
self._gear_motor_position = positions[NodeId.pipette_left][0]
# print(f"gear position is now {self._gear_motor_position}")

async def tip_action(
self,
axes: Sequence[OT3Axis],
distance: float,
speed: float,
acceleration: float = 0,
tip_action: str = "home",
) -> None:
if tip_action == "home":
speed = speed * -1
move_group = create_tip_action_group(
axes, distance, speed, acceleration, cast(PipetteAction, tip_action)
)
# print(f"about to create runner at {time.time()}")
runner = MoveGroupRunner(move_groups=[move_group])
positions = await runner.run(can_messenger=self._messenger)
print(f"gear position is now {self._gear_motor_position}")

# async def tip_action(
# self,
# axes: Sequence[OT3Axis],
# distance: float,
# speed: float,
# acceleration: float = 0,
# tip_action: str = "home",
# ) -> None:
# if tip_action == "home":
# speed = speed * -1
# move_group = create_tip_action_group(
# axes, distance, speed, acceleration, cast(PipetteAction, tip_action)
# )
# # print(f"about to create runner at {time.time()}")
# runner = MoveGroupRunner(move_groups=[move_group])
# positions = await runner.run(can_messenger=self._messenger)
# print(f"positions returned {positions}, at {time.time()}")
# await update_gear_motor_position_estimation(self._messenger)

Expand Down
79 changes: 39 additions & 40 deletions api/src/opentrons/hardware_control/backends/ot3utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,14 +304,9 @@ def create_move_group(
moves: List[Move[OT3Axis]],
present_nodes: Iterable[NodeId],
stop_condition: MoveStopCondition,
tip_action_type: Optional[PipetteTipActionType] = None,
) -> Tuple[MoveGroup, Dict[NodeId, float]]:
pos = _convert_to_node_id_dict(origin)
move_group: MoveGroup = []
if tip_action_type is not None:
tip_action_step = PipetteTipActionType[tip_action_type]
else:
tip_action_step = None
for move in moves:
unit_vector = move.unit_vector
for block in move.blocks:
Expand All @@ -326,7 +321,6 @@ def create_move_group(
duration=block.time,
present_nodes=present_nodes,
stop_condition=stop_condition,
tip_action_step=tip_action_step,
)
for ax in pos.keys():
pos[ax] += node_id_distances.get(ax, 0)
Expand All @@ -351,43 +345,48 @@ def create_home_group(
return move_group


def create_fast_tip_action_group(
axes: Sequence[OT3Axis],
move: Move,
action: str = "clamp",
def create_tip_action_group(
moves: List[Move[OT3Axis]],
present_nodes: Iterable[NodeId],
action: PipetteTipActionType,
accelerate_during_move: bool = True,
) -> MoveGroup:
current_nodes = [axis_to_node(ax) for ax in axes]
tip_action_group = []
for block in move.blocks:
if block.initial_speed == 0:
continue
step = create_tip_action_step(
velocity={node_id: np.float64(block.initial_speed) for node_id in current_nodes},
distance={node_id: np.float64(block.distance) for node_id in current_nodes},
acceleration={node_id: np.float64(block.acceleration) for node_id in current_nodes},
present_nodes=current_nodes,
action=PipetteTipActionType[action],
)
tip_action_group.append(step)
return tip_action_group
move_group: MoveGroup = []
for move in moves:
unit_vector = move.unit_vector
for block in move.blocks:
velocities = unit_vector_multiplication(unit_vector, block.initial_speed)
if accelerate_during_move:
accelerations = unit_vector_multiplication(unit_vector, block.acceleration)
else:
accelerations = {OT3Axis.Q: 0}
step = create_tip_action_step(
velocity=_convert_to_node_id_dict(velocities),
acceleration=_convert_to_node_id_dict(accelerations),
duration=block.time,
present_nodes=present_nodes,
action=PipetteTipActionType[action],
)
move_group.append(step)
return move_group


def create_tip_action_group(
axes: Sequence[OT3Axis],
distance: float,
velocity: float,
acceleration: float,
action: PipetteAction,
) -> MoveGroup:
current_nodes = [axis_to_node(ax) for ax in axes]
step = create_tip_action_step(
velocity={node_id: np.float64(velocity) for node_id in current_nodes},
distance={node_id: np.float64(distance) for node_id in current_nodes},
acceleration={node_id: np.float64(acceleration) for node_id in current_nodes},
present_nodes=current_nodes,
action=PipetteTipActionType[action],
)
return [step]
# def create_tip_action_group(
# axes: Sequence[OT3Axis],
# distance: float,
# velocity: float,
# acceleration: float,
# action: PipetteAction,
# ) -> MoveGroup:
# current_nodes = [axis_to_node(ax) for ax in axes]
# step = create_tip_action_step(
# velocity={node_id: np.float64(velocity) for node_id in current_nodes},
# distance={node_id: np.float64(distance) for node_id in current_nodes},
# acceleration={node_id: np.float64(acceleration) for node_id in current_nodes},
# present_nodes=current_nodes,
# action=PipetteTipActionType[action],
# )
# return [step]


def create_gripper_jaw_grip_group(
Expand Down
47 changes: 32 additions & 15 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1217,6 +1217,22 @@ async def _home(self, axes: Sequence[OT3Axis]) -> None:
await self._cache_current_position()
await self._cache_encoder_position()

async def home_gear_motors(self, accelerate_during_move: bool = False) -> None:
if accelerate_during_move:
current_pos = await self._backend.gear_motor_position_estimation()
if current_pos == 0:
return
print(f"got current gear position = {current_pos}")
current_pos_dict = {OT3Axis.Q: current_pos[0]}
# current_pos_dict = {OT3Axis.Q: 5}
# breakpoint()
else:
current_pos_dict = {OT3Axis.Q: 50}
home_target = {OT3Axis.Q: 0}
home_moves = self._build_moves(current_pos_dict, home_target)
await self._backend.tip_action(home_moves[0], "home", accelerate_during_move)


@ExecutionManagerProvider.wait_for_running
async def home(
self, axes: Optional[Union[List[Axis], List[OT3Axis]]] = None
Expand All @@ -1233,7 +1249,8 @@ async def home(
else:
checked_axes = [ax for ax in OT3Axis if ax != OT3Axis.Q]
if self.gantry_load == GantryLoad.HIGH_THROUGHPUT:
checked_axes.append(OT3Axis.Q)
# checked_axes.append(OT3Axis.Q)
await self.home_gear_motors()
self._log.info(f"Homing {axes}")

home_seq = [
Expand Down Expand Up @@ -1630,7 +1647,7 @@ async def _motor_pick_up_tip(
gear_target_dict = {OT3Axis.Q: gear_motor_target}
clamp_moves = self._build_moves(gear_origin_dict, gear_target_dict)

await self._backend.fast_tip_action(gear_origin_dict, clamp_moves[0], "clamp")
await self._backend.tip_action(clamp_moves[0], "clamp")

# else:
# await self._backend.tip_action(
Expand All @@ -1642,22 +1659,22 @@ async def _motor_pick_up_tip(
# )

gear_motor_position = {OT3Axis.Q: self._backend.gear_motor_position}
# print(f"gear motor pos = {gear_motor_position}")
home_target = {OT3Axis.Q: 5}
print(f"gear motor pos = {gear_motor_position}")
home_target = {OT3Axis.Q: 0}
# print(f"passing in gear motor position = {gear_motor_position}")
home_moves = self._build_moves(gear_motor_position, home_target)
# print(f"home moves = {home_moves[0]}")
await self._backend.fast_tip_action(gear_motor_position, home_moves[0], "clamp")
print(f"done almost homing at {time.time()}")

await self._backend.tip_action(
[OT3Axis.of_main_tool_actuator(mount)],
# float(pipette_spec.pick_up_distance + pipette_spec.home_buffer),
float(5.5),
float(pipette_spec.speed - 1),
float(0),
"home",
)
await self._backend.tip_action(home_moves[0], "home")
# print(f"done almost homing at {time.time()}")

# await self._backend.tip_action(
# [OT3Axis.of_main_tool_actuator(mount)],
# # float(pipette_spec.pick_up_distance + pipette_spec.home_buffer),
# float(5.5),
# float(pipette_spec.speed - 1),
# float(0),
# "home",
# )

async def pick_up_tip(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,11 @@ def serialize(self) -> bytes:
"""
string = self._get_format_string()
vals = [x.value for x in astuple(self)]
# breakpoint()
try:
return struct.pack(string, *vals)
except struct.error as e:
# breakpoint()
raise SerializationException(str(e))

@classmethod
Expand Down
Loading

0 comments on commit 5856a53

Please sign in to comment.