Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(robot-server, api): Wire up protocol engine event bubbling to robot server #14766

Merged
merged 11 commits into from
Apr 2, 2024

Conversation

mjhuff
Copy link
Contributor

@mjhuff mjhuff commented Apr 1, 2024

Closes EXEC-358

Overview

This PR serves to wire up PE event bubbling to the robot server for notifications as an alternative to the current polling that occurs. There are no functional changes.

PublisherNotifier is the new interface that handles event management for publishers, using a generic ChangeNotifier that is given to PE as a callback. When PE reports a change in state, the callback fires. PublisherNotifier then iterates through each callback, invoking them.

In the future, each publisher that requires access to PE state updates (eg, RunsPublisher) will add relevant callbacks during their initialization via register_publish_callbacks. Each callback will contain the conditional logic required for an MQTT publish to occur.

Test Plan

  • No functional changes. You can smoke test a protocol run to verify everything works.

Risk assessment

low

…notification client

Instead of having the robot-server notification publishers poll the engine constantly for state
updates, let's have protocol engine notify the notification client that a state update occurred.
Publishers can then diff the engine store for state changes of interest and emit publish events as
necessary.
@mjhuff mjhuff requested a review from a team April 1, 2024 17:17
@mjhuff mjhuff requested a review from a team as a code owner April 1, 2024 17:17
Copy link

codecov bot commented Apr 1, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 67.19%. Comparing base (c981105) to head (a00015c).
Report is 10 commits behind head on edge.

Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             edge   #14766      +/-   ##
==========================================
- Coverage   67.19%   67.19%   -0.01%     
==========================================
  Files        2495     2495              
  Lines       71518    71516       -2     
  Branches     9020     9020              
==========================================
- Hits        48054    48052       -2     
  Misses      21342    21342              
  Partials     2122     2122              
Flag Coverage Δ
g-code-testing 92.43% <ø> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

Files Coverage Δ
...pentrons/protocol_engine/create_protocol_engine.py 100.00% <ø> (ø)
api/src/opentrons/protocol_engine/state/state.py 100.00% <ø> (ø)
robot-server/robot_server/app_setup.py 100.00% <ø> (ø)
robot-server/robot_server/runs/engine_store.py 95.83% <ø> (-0.09%) ⬇️
...bot-server/robot_server/runs/router/base_router.py 96.19% <ø> (-0.04%) ⬇️

Copy link
Member

@sfoster1 sfoster1 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, very nice!

Comment on lines +22 to +56
@pytest.mark.parametrize("_test_repetition", range(10))
async def test_multiple_subscribers(_test_repetition: int) -> None:
"""Test that multiple subscribers can wait for a notification.

This test checks that the subscribers are awoken in the order they
subscribed. This may or may not be guarenteed according to the
implementations of both ChangeNotifier and the event loop.
This test functions as a canary, given that our code may relies
on this ordering for determinism.

This test runs multiple times to check for flakyness.
"""
subject = ChangeNotifier()
results = []

async def _do_task_1() -> None:
await subject.wait()
results.append(1)

async def _do_task_2() -> None:
await subject.wait()
results.append(2)

async def _do_task_3() -> None:
await subject.wait()
results.append(3)

task_1 = asyncio.create_task(_do_task_1())
task_2 = asyncio.create_task(_do_task_2())
task_3 = asyncio.create_task(_do_task_3())

asyncio.get_running_loop().call_soon(subject.notify)
await asyncio.gather(task_1, task_2, task_3)

assert results == [1, 2, 3]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good idea. I like it

@mjhuff mjhuff merged commit cf93d9c into edge Apr 2, 2024
22 checks passed
@mjhuff mjhuff deleted the bubble-pe-events branch April 2, 2024 13:53
Carlos-fernandez pushed a commit that referenced this pull request May 20, 2024
…o robot server (#14766)

Closes EXEC-358

Wire up PE event bubbling to the robot server for notifications as an alternative to the current polling that occurs. There are no functional changes.

PublisherNotifier is the new interface that handles event management for publishers, using a generic ChangeNotifier that is given to PE as a callback. When PE reports a change in state, the callback fires. PublisherNotifier then iterates through each callback, invoking them.

In the future, each publisher that requires access to PE state updates (eg, RunsPublisher) will add relevant callbacks during their initialization via register_publish_callbacks. Each callback will contain the conditional logic required for an MQTT publish to occur.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants