Skip to content

Commit

Permalink
refactor(hardware): Update appropriate revision (#12130)
Browse files Browse the repository at this point in the history
We have separate firmware for each revision of each device. We need to
update devices with the firmware for their appropriate revision.

To that end, let's add the revision to the device info cache in the
network subsystem so we can then pass it back down to the firmware
update machinery and select the appropriate firmware file for the
revision.

We also need to establish a map of default revisions because some
devices won't have them. For instance, devices built during the DVT
phase of ot-3 development were built in large numbers before we added
the revision tracking to the firmware, so they won't have a revision and
it's impractical to reflash them all with an ISP (since the revision is
captured in the bootloader as well).

This required some refactoring in utils to keep function complexity
down. Specifically, check_firmware_update was pretty complex (by mccabe
complexity rules) because it was a linear series of transforms with each
transform validity-checked. By turning this into a chained series of
iterators and pushing down the validity into the iterators themselves,
we can make each iterator independently testable.

Closes RET-1323
  • Loading branch information
sfoster1 committed Feb 13, 2023
1 parent 08d6620 commit cc7824c
Show file tree
Hide file tree
Showing 8 changed files with 326 additions and 119 deletions.
7 changes: 2 additions & 5 deletions api/src/opentrons/hardware_control/backends/ot3controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,13 +235,10 @@ async def update_firmware(

log.info("Firmware updates are available.")
self.update_required = True
update_details = {
node_id: str(update.filepath)
for node_id, update in firmware_updates.items()
}

updater = firmware_update.RunUpdate(
messenger=self._messenger,
update_details=update_details,
update_details=firmware_updates,
retry_count=3,
timeout_seconds=20,
erase=True,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
PipetteInformation,
GripperInformation,
)
from opentrons_hardware.hardware_control.types import PCBARevision


@pytest.fixture
Expand Down Expand Up @@ -152,33 +153,19 @@ def mock_tool_detector(controller: OT3Controller):


@pytest.fixture
def fw_update_info() -> Dict[FirmwareUpdateType, UpdateInfo]:
update_info1 = UpdateInfo(
FirmwareUpdateType.head,
2,
"abc12345",
{"rev1": "/some/path/head.hex"},
filepath="/some/path/head.hex",
)
update_info2 = UpdateInfo(
FirmwareUpdateType.gantry_x,
2,
"abc12345",
{"rev1": "/some/path/gantry.hex"},
filepath="/some/path/gantry.hex",
)

update_info = {
FirmwareUpdateType.head: update_info1,
FirmwareUpdateType.gantry_x: update_info2,
def fw_update_info() -> Dict[NodeId, str]:
return {
NodeId.head: "/some/path/head.hex",
NodeId.gantry_x: "/some/path/gantry.hex",
}
return update_info


@pytest.fixture
def fw_node_info() -> Dict[NodeId, DeviceInfoCache]:
node_cache1 = DeviceInfoCache(NodeId.head, 1, "12345678", None)
node_cache2 = DeviceInfoCache(NodeId.gantry_x, 1, "12345678", None)
node_cache1 = DeviceInfoCache(NodeId.head, 1, "12345678", None, PCBARevision(None))
node_cache2 = DeviceInfoCache(
NodeId.gantry_x, 1, "12345678", None, PCBARevision(None)
)
return {NodeId.head: node_cache1, NodeId.gantry_x: node_cache2}


Expand Down Expand Up @@ -864,26 +851,15 @@ async def fake_umpe(


async def test_update_firmware_update_required(
controller: OT3Controller, fw_update_info: Dict[FirmwareUpdateType, UpdateInfo]
controller: OT3Controller, fw_update_info: Dict[NodeId, str]
) -> None:
"""Test that updates are started when shortsha's dont match."""
node_cache1 = DeviceInfoCache(NodeId.head, 1, "12345678", None)
node_cache2 = DeviceInfoCache(NodeId.gantry_x, 1, "12345678", None)

device_info_cache = {NodeId.head: node_cache1, NodeId.gantry_x: node_cache2}
controller._network_info._device_info_cache = device_info_cache

expected_update_details = {
NodeId.head: fw_update_info[FirmwareUpdateType.head].filepath,
NodeId.gantry_x: fw_update_info[FirmwareUpdateType.gantry_x].filepath,
}

# no updates have been started, but lets set this to true so we can assert later on
controller.update_required = True

# test that nodes in NetworkInfo._device_info_cache are updated if they are out of date
with mock.patch(
"opentrons_hardware.firmware_update.utils.load_firmware_manifest",
"opentrons_hardware.firmware_update.check_firmware_updates",
mock.Mock(return_value=fw_update_info),
), mock.patch(
"opentrons_hardware.firmware_update.RunUpdate"
Expand All @@ -893,7 +869,7 @@ async def test_update_firmware_update_required(
await controller.update_firmware({})
run_updates.assert_called_with(
messenger=controller._messenger,
update_details=expected_update_details,
update_details=fw_update_info,
retry_count=mock.ANY,
timeout_seconds=mock.ANY,
erase=True,
Expand All @@ -904,23 +880,17 @@ async def test_update_firmware_update_required(


async def test_update_firmware_up_to_date(
controller: OT3Controller,
fw_node_info: Dict[NodeId, DeviceInfoCache],
fw_update_info: Dict[FirmwareUpdateType, UpdateInfo],
controller: OT3Controller, fw_update_info: Dict[NodeId, str]
):
"""Test that updates are not started if they are not out-of-date (shortsha's match)."""
for node, update_info in fw_update_info.items():
node_id = NodeId.__members__.get(node.name)
fw_node_info[node_id].shortsha = update_info.shortsha

"""Test that updates are not started if they are not required."""
with mock.patch(
"opentrons_hardware.firmware_update.utils.load_firmware_manifest",
mock.Mock(return_value=update_info),
), mock.patch(
"opentrons_hardware.firmware_update.RunUpdate.run_updates"
) as run_updates, mock.patch.object(
controller._network_info, "probe"
) as probe:
) as probe, mock.patch(
"opentrons_hardware.firmware_update.check_firmware_updates",
mock.Mock(return_value={}),
):
await controller.update_firmware({})
assert not controller.update_required
run_updates.assert_not_called()
Expand All @@ -930,29 +900,28 @@ async def test_update_firmware_up_to_date(
async def test_update_firmware_specified_nodes(
controller: OT3Controller,
fw_node_info: Dict[NodeId, DeviceInfoCache],
fw_update_info: Dict[FirmwareUpdateType, UpdateInfo],
fw_update_info: Dict[NodeId, str],
):
"""Test that updates are started if nodes are NOT out-of-date when nodes are specified."""
for node_cache in fw_node_info.values():
node_cache.shortsha = "978abcde"

controller._network_info._device_info_cache = fw_node_info
expected_update_details = {
NodeId.head: fw_update_info[FirmwareUpdateType.head].filepath,
NodeId.gantry_x: fw_update_info[FirmwareUpdateType.gantry_x].filepath,
}
with mock.patch(
"opentrons_hardware.firmware_update.utils.load_firmware_manifest",
"opentrons_hardware.firmware_update.check_firmware_updates",
mock.Mock(return_value=fw_update_info),
), mock.patch(
) as check_updates, mock.patch(
"opentrons_hardware.firmware_update.RunUpdate"
) as run_updates, mock.patch.object(
controller._network_info, "probe"
) as probe:
await controller.update_firmware({}, nodes={NodeId.head, NodeId.gantry_x})
check_updates.assert_called_with(
fw_node_info, {}, nodes={NodeId.head, NodeId.gantry_x}
)
run_updates.assert_called_with(
messenger=controller._messenger,
update_details=expected_update_details,
update_details=fw_update_info,
retry_count=mock.ANY,
timeout_seconds=mock.ANY,
erase=True,
Expand All @@ -968,12 +937,9 @@ async def test_update_firmware_invalid_specified_node(
fw_update_info: Dict[FirmwareUpdateType, UpdateInfo],
):
"""Test that only nodes in device_info_cache are updated when nodes are specified."""
expected_update_details = {
NodeId.head: fw_update_info[FirmwareUpdateType.head].filepath
}
controller._network_info._device_info_cache = fw_node_info
with mock.patch(
"opentrons_hardware.firmware_update.utils.load_firmware_manifest",
"opentrons_hardware.firmware_update.check_firmware_updates",
mock.Mock(return_value=fw_update_info),
), mock.patch(
"opentrons_hardware.firmware_update.RunUpdate"
Expand All @@ -983,7 +949,7 @@ async def test_update_firmware_invalid_specified_node(
await controller.update_firmware({}, nodes={NodeId.head})
run_updates.assert_called_with(
messenger=controller._messenger,
update_details=expected_update_details,
update_details=fw_update_info,
retry_count=mock.ANY,
timeout_seconds=mock.ANY,
erase=True,
Expand Down
Loading

0 comments on commit cc7824c

Please sign in to comment.