From 2108efa1a762acc9d7e83d432a72822441a7a8b5 Mon Sep 17 00:00:00 2001 From: Jamey Huffnagle Date: Sat, 2 Mar 2024 16:13:52 -0500 Subject: [PATCH 01/13] fix notifyrefetchbody syntax --- robot-server/robot_server/service/json_api/response.py | 3 ++- .../robot_server/service/notifications/notification_client.py | 4 ++-- robot-server/tests/service/json_api/test_response.py | 4 +++- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/robot-server/robot_server/service/json_api/response.py b/robot-server/robot_server/service/json_api/response.py index a43e6c11568..2a0d350c7fc 100644 --- a/robot-server/robot_server/service/json_api/response.py +++ b/robot-server/robot_server/service/json_api/response.py @@ -285,5 +285,6 @@ class ResponseList(BaseModel, Generic[ResponseDataT]): class NotifyRefetchBody(BaseResponseBody): - "A notification response that returns a flag for refetching via HTTP." + """A notification response that returns a flag for refetching via HTTP.""" + refetchUsingHTTP: bool = True diff --git a/robot-server/robot_server/service/notifications/notification_client.py b/robot-server/robot_server/service/notifications/notification_client.py index 1ca2703d031..221da1d9415 100644 --- a/robot-server/robot_server/service/notifications/notification_client.py +++ b/robot-server/robot_server/service/notifications/notification_client.py @@ -78,7 +78,7 @@ async def disconnect(self) -> None: await to_thread.run_sync(self.client.disconnect) async def publish_async( - self, topic: str, message: NotifyRefetchBody = NotifyRefetchBody() + self, topic: str, message: NotifyRefetchBody = NotifyRefetchBody.construct() ) -> None: """Asynchronously Publish a message on a specific topic to the MQTT broker. @@ -89,7 +89,7 @@ async def publish_async( await to_thread.run_sync(self.publish, topic, message) def publish( - self, topic: str, message: NotifyRefetchBody = NotifyRefetchBody() + self, topic: str, message: NotifyRefetchBody = NotifyRefetchBody.construct() ) -> None: """Publish a message on a specific topic to the MQTT broker. diff --git a/robot-server/tests/service/json_api/test_response.py b/robot-server/tests/service/json_api/test_response.py index 4424774140a..57b937b158c 100644 --- a/robot-server/tests/service/json_api/test_response.py +++ b/robot-server/tests/service/json_api/test_response.py @@ -115,7 +115,9 @@ class ResponseSpec(NamedTuple): "links": {"sibling": {"href": "/bar", "meta": None}}, }, ), - ResponseSpec(subject=NotifyRefetchBody(), expected={"refetchUsingHTTP": True}), + ResponseSpec( + subject=NotifyRefetchBody.construct(), expected={"refetchUsingHTTP": True} + ), ] From 4044db8e36c35dfad4e8ba09b936d3b18f32bc8f Mon Sep 17 00:00:00 2001 From: Jamey Huffnagle Date: Sat, 2 Mar 2024 17:06:53 -0500 Subject: [PATCH 02/13] refactor(robot-server): send unsubscribe flags from dynamic topics This is the robot-server groundwork required to refactor MQTT client subscription logic. Instead of relying on the app-shell's ConnectionManager to determine topic subscription status based on component interest, we greatly reduce the notification system's complexity and bug surface by requiring the robort-server to broadcast an unsubscribe flag for dynamic topics (ex, /runs/:runId). The tradeoff here is the app may be subscribed to topics it actually has no interest in, but the overhead for this is insigificant: occasional refetch flags sent by the robot-server that the client doesn't do anything with (in fact, no longer does the client need to send repeated unsubscribe then subsequent resubscribe packets). --- .../robot_server/service/json_api/__init__.py | 2 ++ .../robot_server/service/json_api/response.py | 10 ++++++ .../notifications/notification_client.py | 31 +++++++++++++------ .../publishers/runs_publisher.py | 7 +++-- .../tests/service/json_api/test_response.py | 5 ++- 5 files changed, 43 insertions(+), 12 deletions(-) diff --git a/robot-server/robot_server/service/json_api/__init__.py b/robot-server/robot_server/service/json_api/__init__.py index 8966763cb53..2680c99049f 100644 --- a/robot-server/robot_server/service/json_api/__init__.py +++ b/robot-server/robot_server/service/json_api/__init__.py @@ -16,6 +16,7 @@ PydanticResponse, ResponseList, NotifyRefetchBody, + NotifyUnsubscribeBody, ) @@ -46,4 +47,5 @@ "ResponseList", # notify models "NotifyRefetchBody", + "NotifyUnsubscribeBody", ] diff --git a/robot-server/robot_server/service/json_api/response.py b/robot-server/robot_server/service/json_api/response.py index 2a0d350c7fc..6fdfc77a9b4 100644 --- a/robot-server/robot_server/service/json_api/response.py +++ b/robot-server/robot_server/service/json_api/response.py @@ -288,3 +288,13 @@ class NotifyRefetchBody(BaseResponseBody): """A notification response that returns a flag for refetching via HTTP.""" refetchUsingHTTP: bool = True + + +class NotifyUnsubscribeBody(BaseResponseBody): + """A notification response. + + Returns flags for refetching via HTTP and unsubscribing from a topic. + """ + + refetchUsingHTTP: bool = True + unsubscribe: bool = True diff --git a/robot-server/robot_server/service/notifications/notification_client.py b/robot-server/robot_server/service/notifications/notification_client.py index 221da1d9415..e969a76c52e 100644 --- a/robot-server/robot_server/service/notifications/notification_client.py +++ b/robot-server/robot_server/service/notifications/notification_client.py @@ -3,10 +3,10 @@ import paho.mqtt.client as mqtt from anyio import to_thread from fastapi import Depends -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Union from enum import Enum -from ..json_api import NotifyRefetchBody +from ..json_api import NotifyRefetchBody, NotifyUnsubscribeBody from server_utils.fastapi_utils.app_state import ( AppState, AppStateAccessor, @@ -77,26 +77,28 @@ async def disconnect(self) -> None: self.client.loop_stop() await to_thread.run_sync(self.client.disconnect) - async def publish_async( - self, topic: str, message: NotifyRefetchBody = NotifyRefetchBody.construct() - ) -> None: + async def publish_async(self, topic: str, should_unsubscribe: bool = False) -> None: """Asynchronously Publish a message on a specific topic to the MQTT broker. Args: topic: The topic to publish the message on. - message: The message to be published, in the format of NotifyRefetchBody. + should_unsubscribe: Whether the client should unsubscribe from the topic. + """ - await to_thread.run_sync(self.publish, topic, message) + await to_thread.run_sync(self.publish, topic, should_unsubscribe) def publish( - self, topic: str, message: NotifyRefetchBody = NotifyRefetchBody.construct() + self, + topic: str, + should_unsubscribe: bool = False, ) -> None: """Publish a message on a specific topic to the MQTT broker. Args: topic: The topic to publish the message on. - message: The message to be published. + should_unsubscribe: Whether the client should unsubscribe from the topic. """ + message = self._create_message(should_unsubscribe) payload = message.json() self.client.publish( topic=topic, @@ -105,6 +107,17 @@ def publish( retain=self._retain_message, ) + def _create_message( + self, should_unsubscribe: bool + ) -> Union[NotifyRefetchBody, NotifyUnsubscribeBody]: + message: Union[NotifyRefetchBody, NotifyUnsubscribeBody] + message = ( + NotifyUnsubscribeBody.construct() + if should_unsubscribe + else NotifyRefetchBody.construct() + ) + return message + def _on_connect( self, client: mqtt.Client, diff --git a/robot-server/robot_server/service/notifications/publishers/runs_publisher.py b/robot-server/robot_server/service/notifications/publishers/runs_publisher.py index 11222005b05..c2bc494c990 100644 --- a/robot-server/robot_server/service/notifications/publishers/runs_publisher.py +++ b/robot-server/robot_server/service/notifications/publishers/runs_publisher.py @@ -66,14 +66,17 @@ async def stop_polling_engine_store(self) -> None: self._previous_state_summary_status = None await self._client.publish_async(topic=Topics.RUNS_CURRENT_COMMAND) - def publish_runs(self, run_id: str) -> None: + def publish_runs(self, run_id: str, should_unsubscribe: bool = False) -> None: """Publishes the equivalent of GET /runs and GET /runs/:runId. Args: run_id: ID of the current run. + should_unsubscribe: Whether the client should unsubscribe from the run_id topic. """ self._client.publish(topic=Topics.RUNS) - self._client.publish(topic=f"{Topics.RUNS}/{run_id}") + self._client.publish( + topic=f"{Topics.RUNS}/{run_id}", should_unsubscribe=should_unsubscribe + ) async def _poll_engine_store( self, diff --git a/robot-server/tests/service/json_api/test_response.py b/robot-server/tests/service/json_api/test_response.py index 57b937b158c..124d3288cda 100644 --- a/robot-server/tests/service/json_api/test_response.py +++ b/robot-server/tests/service/json_api/test_response.py @@ -13,6 +13,7 @@ MultiBody, MultiBodyMeta, NotifyRefetchBody, + NotifyUnsubscribeBody, DeprecatedResponseModel, DeprecatedMultiResponseModel, ) @@ -115,8 +116,10 @@ class ResponseSpec(NamedTuple): "links": {"sibling": {"href": "/bar", "meta": None}}, }, ), + ResponseSpec(subject=NotifyRefetchBody(), expected={"refetchUsingHTTP": True}), ResponseSpec( - subject=NotifyRefetchBody.construct(), expected={"refetchUsingHTTP": True} + subject=NotifyUnsubscribeBody(), + expected={"refetchUsingHTTP": True, "unsubscribe": True}, ), ] From 177f47d03090d4f44a5f356383b125e1a0b4be67 Mon Sep 17 00:00:00 2001 From: Jamey Huffnagle Date: Sat, 2 Mar 2024 17:28:58 -0500 Subject: [PATCH 03/13] refactor(robot-server): add the should_unsubscribe flag for /runs/:runId topic --- robot-server/robot_server/runs/run_store.py | 2 +- .../publishers/runs_publisher.py | 46 +++++++++++-------- robot-server/tests/runs/test_run_store.py | 13 +++++- 3 files changed, 40 insertions(+), 21 deletions(-) diff --git a/robot-server/robot_server/runs/run_store.py b/robot-server/robot_server/runs/run_store.py index aa65ce19704..e09a991c942 100644 --- a/robot-server/robot_server/runs/run_store.py +++ b/robot-server/robot_server/runs/run_store.py @@ -411,7 +411,7 @@ def remove(self, run_id: str) -> None: raise RunNotFoundError(run_id) self._clear_caches() - self._runs_publisher.publish_runs(run_id=run_id) + self._runs_publisher.publish_runs(run_id=run_id, should_unsubscribe=True) def _run_exists( self, run_id: str, connection: sqlalchemy.engine.Connection diff --git a/robot-server/robot_server/service/notifications/publishers/runs_publisher.py b/robot-server/robot_server/service/notifications/publishers/runs_publisher.py index c2bc494c990..569a0e15e12 100644 --- a/robot-server/robot_server/service/notifications/publishers/runs_publisher.py +++ b/robot-server/robot_server/service/notifications/publishers/runs_publisher.py @@ -56,7 +56,7 @@ async def begin_polling_engine_store( ) async def stop_polling_engine_store(self) -> None: - """Stops polling the engine store.""" + """Stops polling the engine store. Run-related topics will publish as the poller is cancelled.""" if self._poller is not None: self._run_data_manager_polling.set() self._poller.cancel() @@ -64,7 +64,6 @@ async def stop_polling_engine_store(self) -> None: self._run_data_manager_polling.clear() self._previous_current_command = None self._previous_state_summary_status = None - await self._client.publish_async(topic=Topics.RUNS_CURRENT_COMMAND) def publish_runs(self, run_id: str, should_unsubscribe: bool = False) -> None: """Publishes the equivalent of GET /runs and GET /runs/:runId. @@ -90,21 +89,25 @@ async def _poll_engine_store( get_current_command: Retrieves the engine store's current command. run_id: ID of the current run. """ - while not self._run_data_manager_polling.is_set(): - current_command = get_current_command(run_id) - current_state_summary = get_state_summary(run_id) - current_state_summary_status = ( - current_state_summary.status if current_state_summary else None - ) + try: + while not self._run_data_manager_polling.is_set(): + current_command = get_current_command(run_id) + current_state_summary = get_state_summary(run_id) + current_state_summary_status = ( + current_state_summary.status if current_state_summary else None + ) - if self._previous_current_command != current_command: - await self._publish_current_command() - self._previous_current_command = current_command + if self._previous_current_command != current_command: + await self._publish_current_command() + self._previous_current_command = current_command - if self._previous_state_summary_status != current_state_summary_status: - await self._publish_runs_async(run_id=run_id) - self._previous_state_summary_status = current_state_summary_status - await asyncio.sleep(1) + if self._previous_state_summary_status != current_state_summary_status: + await self._publish_runs_async(run_id=run_id) + self._previous_state_summary_status = current_state_summary_status + await asyncio.sleep(1) + except asyncio.CancelledError: + await self._publish_runs_async(run_id=run_id, should_unsubscribe=True) + await self._client.publish_async(topic=Topics.RUNS_CURRENT_COMMAND) async def _publish_current_command( self, @@ -112,14 +115,21 @@ async def _publish_current_command( """Publishes the equivalent of GET /runs/:runId/commands?cursor=null&pageLength=1.""" await self._client.publish_async(topic=Topics.RUNS_CURRENT_COMMAND) - async def _publish_runs_async(self, run_id: str) -> None: + async def _publish_runs_async( + self, run_id: str, should_unsubscribe: bool = False + ) -> None: """Asynchronously publishes the equivalent of GET /runs and GET /runs/:runId. Args: run_id: ID of the current run. + should_unsubscribe: Whether the client should unsubscribe from the run_id topic. """ - await self._client.publish_async(topic=Topics.RUNS) - await self._client.publish_async(topic=f"{Topics.RUNS}/{run_id}") + await self._client.publish_async( + topic=Topics.RUNS, should_unsubscribe=should_unsubscribe + ) + await self._client.publish_async( + topic=f"{Topics.RUNS}/{run_id}", should_unsubscribe=should_unsubscribe + ) _runs_publisher_accessor: AppStateAccessor[RunsPublisher] = AppStateAccessor[ diff --git a/robot-server/tests/runs/test_run_store.py b/robot-server/tests/runs/test_run_store.py index b807cbf1e18..b3c469523c7 100644 --- a/robot-server/tests/runs/test_run_store.py +++ b/robot-server/tests/runs/test_run_store.py @@ -5,6 +5,7 @@ import pytest from decoy import Decoy from sqlalchemy.engine import Engine +from unittest import mock from opentrons_shared_data.pipette.dev_types import PipetteNameType @@ -162,6 +163,7 @@ def test_update_run_state( subject: RunStore, state_summary: StateSummary, protocol_commands: List[pe_commands.Command], + mock_runs_publisher: mock.Mock, ) -> None: """It should be able to update a run state to the store.""" action = RunAction( @@ -197,6 +199,7 @@ def test_update_run_state( ) assert run_summary_result == state_summary assert commands_result.commands == protocol_commands + mock_runs_publisher.publish_runs.assert_called_once_with(run_id="run-id") def test_update_state_run_not_found( @@ -372,7 +375,7 @@ def test_get_all_runs( assert result == expected_result -def test_remove_run(subject: RunStore) -> None: +def test_remove_run(subject: RunStore, mock_runs_publisher: mock.Mock) -> None: """It can remove a previously stored run entry.""" action = RunAction( actionType=RunActionType.PLAY, @@ -389,6 +392,9 @@ def test_remove_run(subject: RunStore) -> None: subject.remove(run_id="run-id") assert subject.get_all(length=20) == [] + mock_runs_publisher.publish_runs.assert_called_once_with( + run_id="run-id", shouldUnsubscribe=True + ) def test_remove_run_missing_id(subject: RunStore) -> None: @@ -409,7 +415,9 @@ def test_insert_actions_no_run(subject: RunStore) -> None: subject.insert_action(run_id="run-id-996", action=action) -def test_get_state_summary(subject: RunStore, state_summary: StateSummary) -> None: +def test_get_state_summary( + subject: RunStore, state_summary: StateSummary, mock_runs_publisher: mock.Mock +) -> None: """It should be able to get store run data.""" subject.insert( run_id="run-id", @@ -419,6 +427,7 @@ def test_get_state_summary(subject: RunStore, state_summary: StateSummary) -> No subject.update_run_state(run_id="run-id", summary=state_summary, commands=[]) result = subject.get_state_summary(run_id="run-id") assert result == state_summary + mock_runs_publisher.publish_runs.assert_called_once_with(run_id="run-id") def test_get_state_summary_failure( From be7774be238733578cdf9997450e5c54cb49c63d Mon Sep 17 00:00:00 2001 From: Jamey Huffnagle Date: Sat, 2 Mar 2024 17:34:40 -0500 Subject: [PATCH 04/13] update docstring --- .../service/notifications/publishers/runs_publisher.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/robot-server/robot_server/service/notifications/publishers/runs_publisher.py b/robot-server/robot_server/service/notifications/publishers/runs_publisher.py index 569a0e15e12..acc515a1653 100644 --- a/robot-server/robot_server/service/notifications/publishers/runs_publisher.py +++ b/robot-server/robot_server/service/notifications/publishers/runs_publisher.py @@ -34,7 +34,8 @@ async def begin_polling_engine_store( """Continuously poll the engine store for the current_command. Args: - current_command: The currently executing command, if any. + get_current_command: Callback to get the currently executing command, if any. + get_state_summary: Callback to get the current run's state summary, if any. run_id: ID of the current run. """ if self._poller is None: From 64615d79e76e0e11c4f10c53f2fc3ff5a9447fa8 Mon Sep 17 00:00:00 2001 From: Jamey Huffnagle Date: Mon, 11 Mar 2024 11:47:32 -0400 Subject: [PATCH 05/13] move poll clean up logic to the cancelled exception --- .../notifications/publishers/runs_publisher.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/robot-server/robot_server/service/notifications/publishers/runs_publisher.py b/robot-server/robot_server/service/notifications/publishers/runs_publisher.py index acc515a1653..71c3c31ae4a 100644 --- a/robot-server/robot_server/service/notifications/publishers/runs_publisher.py +++ b/robot-server/robot_server/service/notifications/publishers/runs_publisher.py @@ -61,10 +61,6 @@ async def stop_polling_engine_store(self) -> None: if self._poller is not None: self._run_data_manager_polling.set() self._poller.cancel() - self._poller = None - self._run_data_manager_polling.clear() - self._previous_current_command = None - self._previous_state_summary_status = None def publish_runs(self, run_id: str, should_unsubscribe: bool = False) -> None: """Publishes the equivalent of GET /runs and GET /runs/:runId. @@ -107,6 +103,7 @@ async def _poll_engine_store( self._previous_state_summary_status = current_state_summary_status await asyncio.sleep(1) except asyncio.CancelledError: + self._clean_up_poller() await self._publish_runs_async(run_id=run_id, should_unsubscribe=True) await self._client.publish_async(topic=Topics.RUNS_CURRENT_COMMAND) @@ -132,6 +129,13 @@ async def _publish_runs_async( topic=f"{Topics.RUNS}/{run_id}", should_unsubscribe=should_unsubscribe ) + def _clean_up_poller(self) -> None: + """Cleans up the runs data manager poller.""" + self._poller = None + self._run_data_manager_polling.clear() + self._previous_current_command = None + self._previous_state_summary_status = None + _runs_publisher_accessor: AppStateAccessor[RunsPublisher] = AppStateAccessor[ RunsPublisher From d1536688aa0071d5b83809bd379362c2086f7d75 Mon Sep 17 00:00:00 2001 From: Jamey Huffnagle Date: Mon, 11 Mar 2024 13:02:29 -0400 Subject: [PATCH 06/13] add generic error logging to polling breaking --- .../service/notifications/publishers/runs_publisher.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/robot-server/robot_server/service/notifications/publishers/runs_publisher.py b/robot-server/robot_server/service/notifications/publishers/runs_publisher.py index 71c3c31ae4a..5cfc2267fee 100644 --- a/robot-server/robot_server/service/notifications/publishers/runs_publisher.py +++ b/robot-server/robot_server/service/notifications/publishers/runs_publisher.py @@ -1,5 +1,6 @@ from fastapi import Depends import asyncio +import logging from typing import Union, Callable, Optional from opentrons.protocol_engine import CurrentCommand, StateSummary, EngineStatus @@ -13,6 +14,9 @@ from ..topics import Topics +log: logging.Logger = logging.getLogger(__name__) + + class RunsPublisher: """Publishes protocol runs topics.""" @@ -106,6 +110,8 @@ async def _poll_engine_store( self._clean_up_poller() await self._publish_runs_async(run_id=run_id, should_unsubscribe=True) await self._client.publish_async(topic=Topics.RUNS_CURRENT_COMMAND) + except Exception as e: + log.error(f"Error within run data manager poller: {e}") async def _publish_current_command( self, From bff6d1101701e3e4845d82d696541b29cc7adc05 Mon Sep 17 00:00:00 2001 From: Jamey Huffnagle Date: Mon, 11 Mar 2024 16:26:16 -0400 Subject: [PATCH 07/13] update unsubscribe response --- robot-server/robot_server/service/json_api/response.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/robot-server/robot_server/service/json_api/response.py b/robot-server/robot_server/service/json_api/response.py index 6fdfc77a9b4..bac05c71daf 100644 --- a/robot-server/robot_server/service/json_api/response.py +++ b/robot-server/robot_server/service/json_api/response.py @@ -296,5 +296,4 @@ class NotifyUnsubscribeBody(BaseResponseBody): Returns flags for refetching via HTTP and unsubscribing from a topic. """ - refetchUsingHTTP: bool = True - unsubscribe: bool = True + refetchAndUnsubscribe: bool = True From 6bba0ce76d5d39b346ab60e71a23a4bd9405387c Mon Sep 17 00:00:00 2001 From: Jamey Huffnagle Date: Mon, 11 Mar 2024 16:46:03 -0400 Subject: [PATCH 08/13] separate publish logic into refetch or unsubscribe --- robot-server/robot_server/runs/run_store.py | 8 ++-- .../notifications/notification_client.py | 47 ++++++++++++------- .../publishers/maintenance_runs_publisher.py | 4 +- .../publishers/runs_publisher.py | 41 +++++++++------- 4 files changed, 59 insertions(+), 41 deletions(-) diff --git a/robot-server/robot_server/runs/run_store.py b/robot-server/robot_server/runs/run_store.py index e09a991c942..fc7e2d39f20 100644 --- a/robot-server/robot_server/runs/run_store.py +++ b/robot-server/robot_server/runs/run_store.py @@ -129,7 +129,7 @@ def update_run_state( action_rows = transaction.execute(select_actions).all() self._clear_caches() - self._runs_publisher.publish_runs(run_id=run_id) + self._runs_publisher.publish_runs_advise_refetch(run_id=run_id) return _convert_row_to_run(row=run_row, action_rows=action_rows) def insert_action(self, run_id: str, action: RunAction) -> None: @@ -152,7 +152,7 @@ def insert_action(self, run_id: str, action: RunAction) -> None: transaction.execute(insert) self._clear_caches() - self._runs_publisher.publish_runs(run_id=run_id) + self._runs_publisher.publish_runs_advise_refetch(run_id=run_id) def insert( self, @@ -194,7 +194,7 @@ def insert( raise ProtocolNotFoundError(protocol_id=run.protocol_id) self._clear_caches() - self._runs_publisher.publish_runs(run_id=run_id) + self._runs_publisher.publish_runs_advise_refetch(run_id=run_id) return run @lru_cache(maxsize=_CACHE_ENTRIES) @@ -411,7 +411,7 @@ def remove(self, run_id: str) -> None: raise RunNotFoundError(run_id) self._clear_caches() - self._runs_publisher.publish_runs(run_id=run_id, should_unsubscribe=True) + self._runs_publisher.publish_runs_advise_unsubscribe(run_id=run_id) def _run_exists( self, run_id: str, connection: sqlalchemy.engine.Connection diff --git a/robot-server/robot_server/service/notifications/notification_client.py b/robot-server/robot_server/service/notifications/notification_client.py index e969a76c52e..fd47f348e15 100644 --- a/robot-server/robot_server/service/notifications/notification_client.py +++ b/robot-server/robot_server/service/notifications/notification_client.py @@ -77,28 +77,32 @@ async def disconnect(self) -> None: self.client.loop_stop() await to_thread.run_sync(self.client.disconnect) - async def publish_async(self, topic: str, should_unsubscribe: bool = False) -> None: - """Asynchronously Publish a message on a specific topic to the MQTT broker. + async def publish_advise_refetch_async(self, topic: str) -> None: + """Asynchronously publish a refetch message on a specific topic to the MQTT broker. Args: topic: The topic to publish the message on. - should_unsubscribe: Whether the client should unsubscribe from the topic. + """ + await to_thread.run_sync(self.publish_advise_refetch, topic) + + async def publish_advise_unsubscribe_async(self, topic: str) -> None: + """Asynchronously publish an unsubscribe message on a specific topic to the MQTT broker. + Args: + topic: The topic to publish the message on. """ - await to_thread.run_sync(self.publish, topic, should_unsubscribe) + await to_thread.run_sync(self.publish_advise_unsubscribe, topic) - def publish( + def publish_advise_refetch( self, topic: str, - should_unsubscribe: bool = False, ) -> None: - """Publish a message on a specific topic to the MQTT broker. + """Publish a refetch message on a specific topic to the MQTT broker. Args: topic: The topic to publish the message on. - should_unsubscribe: Whether the client should unsubscribe from the topic. """ - message = self._create_message(should_unsubscribe) + message = NotifyRefetchBody.construct() payload = message.json() self.client.publish( topic=topic, @@ -107,16 +111,23 @@ def publish( retain=self._retain_message, ) - def _create_message( - self, should_unsubscribe: bool - ) -> Union[NotifyRefetchBody, NotifyUnsubscribeBody]: - message: Union[NotifyRefetchBody, NotifyUnsubscribeBody] - message = ( - NotifyUnsubscribeBody.construct() - if should_unsubscribe - else NotifyRefetchBody.construct() + def publish_advise_unsubscribe( + self, + topic: str, + ) -> None: + """Publish an unsubscribe message on a specific topic to the MQTT broker. + + Args: + topic: The topic to publish the message on. + """ + message = NotifyUnsubscribeBody.construct() + payload = message.json() + self.client.publish( + topic=topic, + payload=payload, + qos=self._default_qos, + retain=self._retain_message, ) - return message def _on_connect( self, diff --git a/robot-server/robot_server/service/notifications/publishers/maintenance_runs_publisher.py b/robot-server/robot_server/service/notifications/publishers/maintenance_runs_publisher.py index f6f146e11e4..8ef07fd7eac 100644 --- a/robot-server/robot_server/service/notifications/publishers/maintenance_runs_publisher.py +++ b/robot-server/robot_server/service/notifications/publishers/maintenance_runs_publisher.py @@ -20,7 +20,9 @@ async def publish_current_maintenance_run( self, ) -> None: """Publishes the equivalent of GET /maintenance_run/current_run""" - await self._client.publish_async(topic=Topics.MAINTENANCE_RUNS_CURRENT_RUN) + await self._client.publish_advise_refetch_async( + topic=Topics.MAINTENANCE_RUNS_CURRENT_RUN + ) _maintenance_runs_publisher_accessor: AppStateAccessor[ diff --git a/robot-server/robot_server/service/notifications/publishers/runs_publisher.py b/robot-server/robot_server/service/notifications/publishers/runs_publisher.py index 5cfc2267fee..de4994fa511 100644 --- a/robot-server/robot_server/service/notifications/publishers/runs_publisher.py +++ b/robot-server/robot_server/service/notifications/publishers/runs_publisher.py @@ -66,17 +66,23 @@ async def stop_polling_engine_store(self) -> None: self._run_data_manager_polling.set() self._poller.cancel() - def publish_runs(self, run_id: str, should_unsubscribe: bool = False) -> None: + def publish_runs_advise_refetch(self, run_id: str) -> None: """Publishes the equivalent of GET /runs and GET /runs/:runId. Args: run_id: ID of the current run. - should_unsubscribe: Whether the client should unsubscribe from the run_id topic. """ - self._client.publish(topic=Topics.RUNS) - self._client.publish( - topic=f"{Topics.RUNS}/{run_id}", should_unsubscribe=should_unsubscribe - ) + self._client.publish_advise_refetch(topic=Topics.RUNS) + self._client.publish_advise_refetch(topic=f"{Topics.RUNS}/{run_id}") + + def publish_runs_advise_unsubscribe(self, run_id: str) -> None: + """Publishes the equivalent of GET /runs and GET /runs/:runId. + + Args: + run_id: ID of the current run. + """ + self._client.publish_advise_unsubscribe(topic=Topics.RUNS) + self._client.publish_advise_unsubscribe(topic=f"{Topics.RUNS}/{run_id}") async def _poll_engine_store( self, @@ -108,8 +114,10 @@ async def _poll_engine_store( await asyncio.sleep(1) except asyncio.CancelledError: self._clean_up_poller() - await self._publish_runs_async(run_id=run_id, should_unsubscribe=True) - await self._client.publish_async(topic=Topics.RUNS_CURRENT_COMMAND) + await self._publish_runs_advise_unsubscribe_async(run_id=run_id) + await self._client.publish_advise_refetch_async( + topic=Topics.RUNS_CURRENT_COMMAND + ) except Exception as e: log.error(f"Error within run data manager poller: {e}") @@ -117,22 +125,19 @@ async def _publish_current_command( self, ) -> None: """Publishes the equivalent of GET /runs/:runId/commands?cursor=null&pageLength=1.""" - await self._client.publish_async(topic=Topics.RUNS_CURRENT_COMMAND) + await self._client.publish_advise_refetch_async( + topic=Topics.RUNS_CURRENT_COMMAND + ) - async def _publish_runs_async( - self, run_id: str, should_unsubscribe: bool = False - ) -> None: + async def _publish_runs_advise_unsubscribe_async(self, run_id: str) -> None: """Asynchronously publishes the equivalent of GET /runs and GET /runs/:runId. Args: run_id: ID of the current run. - should_unsubscribe: Whether the client should unsubscribe from the run_id topic. """ - await self._client.publish_async( - topic=Topics.RUNS, should_unsubscribe=should_unsubscribe - ) - await self._client.publish_async( - topic=f"{Topics.RUNS}/{run_id}", should_unsubscribe=should_unsubscribe + await self._client.publish_advise_unsubscribe_async(topic=Topics.RUNS) + await self._client.publish_advise_unsubscribe_async( + topic=f"{Topics.RUNS}/{run_id}" ) def _clean_up_poller(self) -> None: From dc24831c63d1b25c6e48d14f9a160d16b3fb4376 Mon Sep 17 00:00:00 2001 From: Jamey Huffnagle Date: Mon, 11 Mar 2024 16:49:52 -0400 Subject: [PATCH 09/13] make poll interval tunable --- .../service/notifications/publishers/runs_publisher.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/robot-server/robot_server/service/notifications/publishers/runs_publisher.py b/robot-server/robot_server/service/notifications/publishers/runs_publisher.py index de4994fa511..8cad6e3049e 100644 --- a/robot-server/robot_server/service/notifications/publishers/runs_publisher.py +++ b/robot-server/robot_server/service/notifications/publishers/runs_publisher.py @@ -16,6 +16,8 @@ log: logging.Logger = logging.getLogger(__name__) +POLL_INTERVAL = 1 + class RunsPublisher: """Publishes protocol runs topics.""" @@ -111,7 +113,7 @@ async def _poll_engine_store( if self._previous_state_summary_status != current_state_summary_status: await self._publish_runs_async(run_id=run_id) self._previous_state_summary_status = current_state_summary_status - await asyncio.sleep(1) + await asyncio.sleep(POLL_INTERVAL) except asyncio.CancelledError: self._clean_up_poller() await self._publish_runs_advise_unsubscribe_async(run_id=run_id) From d04968eb0c8d6592307abb18e257fd42552840da Mon Sep 17 00:00:00 2001 From: Jamey Huffnagle Date: Mon, 11 Mar 2024 17:10:39 -0400 Subject: [PATCH 10/13] break out the poll function --- .../publishers/runs_publisher.py | 62 ++++++++++++++----- 1 file changed, 46 insertions(+), 16 deletions(-) diff --git a/robot-server/robot_server/service/notifications/publishers/runs_publisher.py b/robot-server/robot_server/service/notifications/publishers/runs_publisher.py index 8cad6e3049e..94aed694e8f 100644 --- a/robot-server/robot_server/service/notifications/publishers/runs_publisher.py +++ b/robot-server/robot_server/service/notifications/publishers/runs_publisher.py @@ -96,24 +96,15 @@ async def _poll_engine_store( Args: get_current_command: Retrieves the engine store's current command. + get_state_summary: Retrieves the engine store's state summary. run_id: ID of the current run. """ try: - while not self._run_data_manager_polling.is_set(): - current_command = get_current_command(run_id) - current_state_summary = get_state_summary(run_id) - current_state_summary_status = ( - current_state_summary.status if current_state_summary else None - ) - - if self._previous_current_command != current_command: - await self._publish_current_command() - self._previous_current_command = current_command - - if self._previous_state_summary_status != current_state_summary_status: - await self._publish_runs_async(run_id=run_id) - self._previous_state_summary_status = current_state_summary_status - await asyncio.sleep(POLL_INTERVAL) + await self._poll_for_run_id_info( + get_current_command=get_current_command, + get_state_summary=get_state_summary, + run_id=run_id, + ) except asyncio.CancelledError: self._clean_up_poller() await self._publish_runs_advise_unsubscribe_async(run_id=run_id) @@ -123,6 +114,36 @@ async def _poll_engine_store( except Exception as e: log.error(f"Error within run data manager poller: {e}") + async def _poll_for_run_id_info( + self, + get_current_command: Callable[[str], Optional[CurrentCommand]], + get_state_summary: Callable[[str], Optional[StateSummary]], + run_id: str, + ): + """Poll the engine store for a specific run's state while the poll is active. + + Args: + get_current_command: Retrieves the engine store's current command. + get_state_summary: Retrieves the engine store's state summary. + run_id: ID of the current run. + """ + + while not self._run_data_manager_polling.is_set(): + current_command = get_current_command(run_id) + current_state_summary = get_state_summary(run_id) + current_state_summary_status = ( + current_state_summary.status if current_state_summary else None + ) + + if self._previous_current_command != current_command: + await self._publish_current_command() + self._previous_current_command = current_command + + if self._previous_state_summary_status != current_state_summary_status: + await self._publish_runs_advise_refetch_async(run_id=run_id) + self._previous_state_summary_status = current_state_summary_status + await asyncio.sleep(POLL_INTERVAL) + async def _publish_current_command( self, ) -> None: @@ -131,8 +152,17 @@ async def _publish_current_command( topic=Topics.RUNS_CURRENT_COMMAND ) + async def _publish_runs_advise_refetch_async(self, run_id: str) -> None: + """Asynchronously publishes the equivalent of GET /runs and GET /runs/:runId via a refetch message. + + Args: + run_id: ID of the current run. + """ + await self._client.publish_advise_refetch_async(topic=Topics.RUNS) + await self._client.publish_advise_refetch_async(topic=f"{Topics.RUNS}/{run_id}") + async def _publish_runs_advise_unsubscribe_async(self, run_id: str) -> None: - """Asynchronously publishes the equivalent of GET /runs and GET /runs/:runId. + """Asynchronously publishes the equivalent of GET /runs and GET /runs/:runId via an unsubscribe message. Args: run_id: ID of the current run. From b7f1835f313aebd48672d7cac3374cec1cdb357d Mon Sep 17 00:00:00 2001 From: Jamey Huffnagle Date: Mon, 11 Mar 2024 17:27:53 -0400 Subject: [PATCH 11/13] naming --- robot-server/robot_server/service/json_api/response.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/robot-server/robot_server/service/json_api/response.py b/robot-server/robot_server/service/json_api/response.py index bac05c71daf..a012df445cc 100644 --- a/robot-server/robot_server/service/json_api/response.py +++ b/robot-server/robot_server/service/json_api/response.py @@ -293,7 +293,7 @@ class NotifyRefetchBody(BaseResponseBody): class NotifyUnsubscribeBody(BaseResponseBody): """A notification response. - Returns flags for refetching via HTTP and unsubscribing from a topic. + Returns flags for unsubscribing from a topic. """ - refetchAndUnsubscribe: bool = True + unsubscribe: bool = True From f115d405ef040bfb0ef977f2306133e1131ee9dc Mon Sep 17 00:00:00 2001 From: Jamey Huffnagle Date: Mon, 11 Mar 2024 17:37:17 -0400 Subject: [PATCH 12/13] lint --- .../robot_server/service/notifications/notification_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/robot-server/robot_server/service/notifications/notification_client.py b/robot-server/robot_server/service/notifications/notification_client.py index fd47f348e15..568d161cf53 100644 --- a/robot-server/robot_server/service/notifications/notification_client.py +++ b/robot-server/robot_server/service/notifications/notification_client.py @@ -3,7 +3,7 @@ import paho.mqtt.client as mqtt from anyio import to_thread from fastapi import Depends -from typing import Any, Dict, Optional, Union +from typing import Any, Dict, Optional from enum import Enum from ..json_api import NotifyRefetchBody, NotifyUnsubscribeBody From 557149f2ff6c8dfe5426748c4a36587068809f9c Mon Sep 17 00:00:00 2001 From: Jamey Huffnagle Date: Mon, 11 Mar 2024 18:26:47 -0400 Subject: [PATCH 13/13] test updates --- robot-server/tests/runs/test_run_store.py | 12 ++++++++---- robot-server/tests/service/json_api/test_response.py | 2 +- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/robot-server/tests/runs/test_run_store.py b/robot-server/tests/runs/test_run_store.py index b3c469523c7..8c696426c76 100644 --- a/robot-server/tests/runs/test_run_store.py +++ b/robot-server/tests/runs/test_run_store.py @@ -199,7 +199,9 @@ def test_update_run_state( ) assert run_summary_result == state_summary assert commands_result.commands == protocol_commands - mock_runs_publisher.publish_runs.assert_called_once_with(run_id="run-id") + mock_runs_publisher.publish_runs_advise_refetch.assert_called_once_with( + run_id="run-id" + ) def test_update_state_run_not_found( @@ -392,8 +394,8 @@ def test_remove_run(subject: RunStore, mock_runs_publisher: mock.Mock) -> None: subject.remove(run_id="run-id") assert subject.get_all(length=20) == [] - mock_runs_publisher.publish_runs.assert_called_once_with( - run_id="run-id", shouldUnsubscribe=True + mock_runs_publisher.publish_runs_advise_unsubscribe.assert_called_once_with( + run_id="run-id" ) @@ -427,7 +429,9 @@ def test_get_state_summary( subject.update_run_state(run_id="run-id", summary=state_summary, commands=[]) result = subject.get_state_summary(run_id="run-id") assert result == state_summary - mock_runs_publisher.publish_runs.assert_called_once_with(run_id="run-id") + mock_runs_publisher.publish_runs_advise_refetch.assert_called_once_with( + run_id="run-id" + ) def test_get_state_summary_failure( diff --git a/robot-server/tests/service/json_api/test_response.py b/robot-server/tests/service/json_api/test_response.py index 124d3288cda..1429d88b5e0 100644 --- a/robot-server/tests/service/json_api/test_response.py +++ b/robot-server/tests/service/json_api/test_response.py @@ -119,7 +119,7 @@ class ResponseSpec(NamedTuple): ResponseSpec(subject=NotifyRefetchBody(), expected={"refetchUsingHTTP": True}), ResponseSpec( subject=NotifyUnsubscribeBody(), - expected={"refetchUsingHTTP": True, "unsubscribe": True}, + expected={"unsubscribe": True}, ), ]