Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): implement labware.set_offset in 2.18 #14940

Merged
merged 10 commits into from
Apr 25, 2024
1 change: 1 addition & 0 deletions api/src/opentrons/protocol_api/core/engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from .well import WellCore

ENGINE_CORE_API_VERSION: Final = APIVersion(2, 14)
SET_OFFSET_RESTORED_API_VERSION: Final = APIVersion(2, 18)

__all__ = [
"ENGINE_CORE_API_VERSION",
Expand Down
29 changes: 27 additions & 2 deletions api/src/opentrons/protocol_api/core/engine/labware.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,14 @@

from opentrons.protocol_engine.errors import LabwareNotOnDeckError, ModuleNotOnDeckError
from opentrons.protocol_engine.clients import SyncClient as ProtocolEngineClient
from opentrons.protocol_engine.types import (
LabwareOffsetCreate,
LabwareOffsetVector,
)
from opentrons.types import DeckSlotName, Point
from opentrons.hardware_control.nozzle_manager import NozzleMap


from ..labware import AbstractLabware, LabwareLoadParams
from .well import WellCore

Expand Down Expand Up @@ -92,8 +97,28 @@ def get_quirks(self) -> List[str]:
return self._definition.parameters.quirks or []

def set_calibration(self, delta: Point) -> None:
raise NotImplementedError(
"Setting a labware's calibration after it's been loaded is not supported."
"""Add a labware offset for this labware at its current location.

This will override any previous labware offsets for this definition URI and location,
even if the other labware offset was for a different specific labware instance.
"""
offset_location = self._engine_client.state.geometry.get_offset_location(
self._labware_id
)
if not offset_location:
raise LabwareNotOnDeckError(
message=f"Cannot set offset for {self.get_name()} as it is not currently in a deck slot.",
details={"kind": "labware-not-in-slot"},
)

request = LabwareOffsetCreate.construct(
definitionUri=self.get_uri(),
location=offset_location,
vector=LabwareOffsetVector(x=delta.x, y=delta.y, z=delta.z),
)
self._engine_client.add_labware_offset(request)
self._engine_client.reload_labware(
labware_id=self._labware_id,
)

def get_calibrated_offset(self) -> Point:
Expand Down
17 changes: 7 additions & 10 deletions api/src/opentrons/protocol_api/labware.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from ._liquid import Liquid
from ._types import OffDeckType
from .core import well_grid
from .core.engine import ENGINE_CORE_API_VERSION
from .core.engine import ENGINE_CORE_API_VERSION, SET_OFFSET_RESTORED_API_VERSION
from .core.labware import AbstractLabware
from .core.module import AbstractModuleCore
from .core.core_map import LoadedCoreMap
Expand Down Expand Up @@ -594,16 +594,13 @@ def set_offset(self, x: float, y: float, z: float) -> None:
Instead, use Labware Position Check in the app or on the touchscreen.

"""
if self._api_version >= ENGINE_CORE_API_VERSION:
# TODO(mm, 2023-02-13): See Jira RCORE-535.
#
# Until that issue is resolved, the only way to simulate or run a
# >=ENGINE_CORE_API_VERSION protocol is through the Opentrons App.
# Therefore, in >=ENGINE_CORE_API_VERSION protocols,
# there's no legitimate way to use this method.
if (
self._api_version >= ENGINE_CORE_API_VERSION
and self._api_version < SET_OFFSET_RESTORED_API_VERSION
):
raise APIVersionError(
"Labware.set_offset() is not supported when apiLevel is 2.14 or higher."
" Use a lower apiLevel"
"Labware.set_offset() is not supported when apiLevel is 2.14, 2.15, 2.16, or 2.17."
" Use apilevel 2.13 or below, or 2.18 or above to set offset,"
" or use the Opentrons App's Labware Position Check."
)
else:
Expand Down
5 changes: 5 additions & 0 deletions api/src/opentrons/protocol_engine/clients/sync_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
Liquid,
NozzleLayoutConfigurationType,
AddressableOffsetVector,
LabwareOffsetCreate,
)
from .transports import ChildThreadTransport

Expand Down Expand Up @@ -92,6 +93,10 @@ def reset_tips(self, labware_id: str) -> None:
labware_id=labware_id,
)

def add_labware_offset(self, request: LabwareOffsetCreate) -> None:
"""Add a labware offset."""
self._transport.call_method("add_labware_offset", request=request)

def set_pipette_movement_speed(
self, pipette_id: str, speed: Optional[float]
) -> None:
Expand Down
46 changes: 46 additions & 0 deletions api/src/opentrons/protocol_engine/state/geometry.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
AddressableAreaLocation,
AddressableOffsetVector,
StagingSlotLocation,
LabwareOffsetLocation,
)
from .config import Config
from .labware import LabwareView
Expand Down Expand Up @@ -1090,3 +1091,48 @@ def _labware_gripper_offsets(
return slot_based_offset or self._labware.get_labware_gripper_offsets(
labware_id=labware_id, slot_name=None
)

def get_offset_location(self, labware_id: str) -> Optional[LabwareOffsetLocation]:
"""Provide the LabwareOffsetLocation specifying the current position of the labware.

If the labware is in a location that cannot be specified by a LabwareOffsetLocation
(for instance, OFF_DECK) then return None.
"""
parent_location = self._labware.get_location(labware_id)

if isinstance(parent_location, DeckSlotLocation):
return LabwareOffsetLocation(
slotName=parent_location.slotName, moduleModel=None, definitionUri=None
)
elif isinstance(parent_location, ModuleLocation):
module_model = self._modules.get_requested_model(parent_location.moduleId)
module_location = self._modules.get_location(parent_location.moduleId)
return LabwareOffsetLocation(
slotName=module_location.slotName,
moduleModel=module_model,
definitionUri=None,
)
elif isinstance(parent_location, OnLabwareLocation):
non_labware_parent_location = self._labware.get_parent_location(labware_id)

parent_uri = self._labware.get_definition_uri(parent_location.labwareId)
if isinstance(non_labware_parent_location, DeckSlotLocation):
return LabwareOffsetLocation(
slotName=non_labware_parent_location.slotName,
moduleModel=None,
definitionUri=parent_uri,
)
elif isinstance(non_labware_parent_location, ModuleLocation):
module_model = self._modules.get_requested_model(
non_labware_parent_location.moduleId
)
module_location = self._modules.get_location(
non_labware_parent_location.moduleId
)
return LabwareOffsetLocation(
slotName=module_location.slotName,
moduleModel=module_model,
definitionUri=parent_uri,
)

return None
130 changes: 130 additions & 0 deletions api/tests/opentrons/protocol_api/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,131 @@
"""Tests for opentrons.protocol_api."""
from typing import List, overload, Optional

from opentrons.protocols.api_support.types import APIVersion
from opentrons.protocol_api import (
MAX_SUPPORTED_VERSION,
MIN_SUPPORTED_VERSION,
MIN_SUPPORTED_VERSION_FOR_FLEX,
)


def versions_at_or_above(from_version: APIVersion) -> List[APIVersion]:
"""Get a list of versions >= the specified one."""
return versions_between(
low_inclusive_bound=from_version, high_inclusive_bound=MAX_SUPPORTED_VERSION
)


def versions_at_or_below(
from_version: APIVersion, flex_only: bool = False
) -> List[APIVersion]:
"""Get a list of versions <= the specified one.

Since there are different minimum supported versions for Flex and OT-2, specify which you care about
with the second argument.
"""
if flex_only:
return versions_between(
low_inclusive_bound=MIN_SUPPORTED_VERSION_FOR_FLEX,
high_inclusive_bound=from_version,
)
else:
return versions_between(
low_inclusive_bound=MIN_SUPPORTED_VERSION, high_inclusive_bound=from_version
)


def versions_above(from_version: APIVersion) -> List[APIVersion]:
"""Get a list of versions > the specified one."""
return versions_between(
low_exclusive_bound=from_version, high_inclusive_bound=MAX_SUPPORTED_VERSION
)


def versions_below(from_version: APIVersion, flex_only: bool) -> List[APIVersion]:
"""Get a list of versions < the specified one.

Since there are different minimum supported versions for Flex and OT-2, specify which you care about
with the second argument.
"""
if flex_only:
return versions_between(
low_inclusive_bound=MIN_SUPPORTED_VERSION_FOR_FLEX,
high_exclusive_bound=from_version,
)
else:
return versions_between(
low_inclusive_bound=MIN_SUPPORTED_VERSION, high_exclusive_bound=from_version
)


@overload
def versions_between(
*,
low_inclusive_bound: APIVersion,
high_inclusive_bound: APIVersion,
) -> List[APIVersion]:
...


@overload
def versions_between(
*, low_inclusive_bound: APIVersion, high_exclusive_bound: APIVersion
) -> List[APIVersion]:
...


@overload
def versions_between(
*,
high_inclusive_bound: APIVersion,
low_exclusive_bound: APIVersion,
) -> List[APIVersion]:
...


@overload
def versions_between(
*, low_exclusive_bound: APIVersion, high_exclusive_bound: APIVersion
) -> List[APIVersion]:
...


def versions_between(
low_inclusive_bound: Optional[APIVersion] = None,
high_inclusive_bound: Optional[APIVersion] = None,
low_exclusive_bound: Optional[APIVersion] = None,
high_exclusive_bound: Optional[APIVersion] = None,
) -> List[APIVersion]:
"""Build a list of versions based on exclusive and inclusive constraints."""
if low_inclusive_bound and high_inclusive_bound:
assert (
low_inclusive_bound.major == high_inclusive_bound.major
), "You need to change this test when you add a new major version"
major = low_inclusive_bound.major
start = low_inclusive_bound.minor
stop = high_inclusive_bound.minor + 1
elif low_inclusive_bound and high_exclusive_bound:
assert (
low_inclusive_bound.major == high_exclusive_bound.major
), "You need to change this test when you add a new major version"
major = low_inclusive_bound.major
start = low_inclusive_bound.minor
stop = high_exclusive_bound.minor
elif low_exclusive_bound and high_inclusive_bound:
assert (
low_exclusive_bound.major == high_inclusive_bound.major
), "You need to change this test when you add a new major version"
major = low_exclusive_bound.major
start = low_exclusive_bound.minor + 1
stop = high_inclusive_bound.minor + 1
elif low_exclusive_bound and high_exclusive_bound:
assert (
low_exclusive_bound.major == high_exclusive_bound.major
), "You need to change this test when you add a new major version"
major = low_exclusive_bound.major
start = low_exclusive_bound.minor + 1
stop = high_exclusive_bound.minor
else:
raise ValueError("You must specify one low bound and one high bound")
return [APIVersion(major, minor) for minor in range(start, stop)]
Loading
Loading