Skip to content

Commit

Permalink
refactor(engine): do not run redundant cleanup on asyncio.CancelledEr…
Browse files Browse the repository at this point in the history
…ror (#8993)

Closes #8980
  • Loading branch information
mcous committed Dec 7, 2021
1 parent b61ef1d commit fec85cd
Show file tree
Hide file tree
Showing 13 changed files with 384 additions and 182 deletions.
8 changes: 6 additions & 2 deletions api/src/opentrons/protocol_engine/actions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@
PlayAction,
PauseAction,
StopAction,
FinishAction,
HardwareStoppedAction,
QueueCommandAction,
UpdateCommandAction,
FailCommandAction,
AddLabwareOffsetAction,
StopErrorDetails,
FinishErrorDetails,
)

__all__ = [
Expand All @@ -27,10 +29,12 @@
"PlayAction",
"PauseAction",
"StopAction",
"FinishAction",
"HardwareStoppedAction",
"QueueCommandAction",
"UpdateCommandAction",
"FailCommandAction",
"AddLabwareOffsetAction",
# action payload values
"StopErrorDetails",
"FinishErrorDetails",
]
28 changes: 23 additions & 5 deletions api/src/opentrons/protocol_engine/actions/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,35 @@ class PauseAction:


@dataclass(frozen=True)
class StopErrorDetails:
"""Error details for the payload of a StopAction."""
class StopAction:
"""Stop the current engine execution.
After a StopAction, the engine status will be marked as stopped.
"""


@dataclass(frozen=True)
class FinishErrorDetails:
"""Error details for the payload of a FinishAction."""

error: Exception
error_id: str
created_at: datetime


@dataclass(frozen=True)
class StopAction:
"""Stop processing commands in the engine, marking the engine status as done."""
class FinishAction:
"""Gracefully stop processing commands in the engine.
After a FinishAction, the engine status will be marked as succeeded or failed.
"""

error_details: Optional[FinishErrorDetails] = None

error_details: Optional[StopErrorDetails] = None

@dataclass(frozen=True)
class HardwareStoppedAction:
"""An action dispatched after hardware has successfully been stopped."""


@dataclass(frozen=True)
Expand Down Expand Up @@ -79,6 +95,8 @@ class AddLabwareOffsetAction:
PlayAction,
PauseAction,
StopAction,
FinishAction,
HardwareStoppedAction,
QueueCommandAction,
UpdateCommandAction,
FailCommandAction,
Expand Down
23 changes: 13 additions & 10 deletions api/src/opentrons/protocol_engine/protocol_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
PlayAction,
PauseAction,
StopAction,
StopErrorDetails,
FinishAction,
FinishErrorDetails,
QueueCommandAction,
AddLabwareOffsetAction,
HardwareStoppedAction,
)


Expand Down Expand Up @@ -126,16 +128,16 @@ async def add_and_execute_command(self, request: CommandCreate) -> Command:

return self._state_store.commands.get(command.id)

async def halt(self) -> None:
"""Halt execution, stopping all motion and cancelling future commands.
async def stop(self) -> None:
"""Stop execution immediately, halting all motion and cancelling future commands.
You should call `stop` after calling `halt` for cleanup and to allow
the engine to settle and recover.
After an engine has been `stop`'ed, it cannot be restarted.
"""
self._action_dispatcher.dispatch(StopAction())
self._queue_worker.cancel()
await self._hardware_api.halt()
await self._hardware_api.stop(home_after=False)
self._action_dispatcher.dispatch(HardwareStoppedAction())

async def wait_until_complete(self) -> None:
"""Wait until there are no more commands to execute.
Expand All @@ -146,11 +148,11 @@ async def wait_until_complete(self) -> None:
condition=self._state_store.commands.get_all_complete
)

async def stop(self, error: Optional[Exception] = None) -> None:
"""Gracefully stop the ProtocolEngine, waiting for it to become idle.
async def finish(self, error: Optional[Exception] = None) -> None:
"""Gracefully finish using the ProtocolEngine, waiting for it to become idle.
The engine will finish executing its current command (if any),
and then shut down. After an engine has been `stop`'ed, it cannot
and then shut down. After an engine has been `finish`'ed, it cannot
be restarted.
This method should not raise, but if any exceptions happen during
Expand All @@ -161,21 +163,22 @@ async def stop(self, error: Optional[Exception] = None) -> None:
error: An error that caused the stop, if applicable.
"""
if error:
error_details: Optional[StopErrorDetails] = StopErrorDetails(
error_details: Optional[FinishErrorDetails] = FinishErrorDetails(
error_id=self._model_utils.generate_id(),
created_at=self._model_utils.get_timestamp(),
error=error,
)
else:
error_details = None

self._action_dispatcher.dispatch(StopAction(error_details=error_details))
self._action_dispatcher.dispatch(FinishAction(error_details=error_details))

try:
await self._queue_worker.join()
finally:
await self._hardware_api.stop(home_after=False)

self._action_dispatcher.dispatch(HardwareStoppedAction())
self._plugin_starter.stop()

def add_labware_offset(self, request: LabwareOffsetCreate) -> LabwareOffset:
Expand Down
137 changes: 88 additions & 49 deletions api/src/opentrons/protocol_engine/state/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
PlayAction,
PauseAction,
StopAction,
FinishAction,
HardwareStoppedAction,
)

from ..commands import Command, CommandStatus
Expand All @@ -27,10 +29,28 @@

@dataclass(frozen=True)
class CommandState:
"""State of all protocol engine command resources."""

is_running: bool
stop_requested: bool
"""State of all protocol engine command resources.
Attributes:
is_running_queue: Whether the engine is currently pulling new
commands off the queue to execute. A command may still be
executing, and the robot may still be in motion, even if False.
is_hardware_stopped: Whether the engine's hardware has ceased
motion. Once set, this flag cannot be unset.
should_stop: Whether a graceful finish or an ungraceful stop has
been requested. Once set, this flag cannot be unset.
should_report_result: Whether the engine should report a success or
failure status once stopped. If unset, the engine will simply
report "stopped." Once set, this flag cannot be unset.
commands_by_id: All command resources, in insertion order, mapped
by their unique IDs.
errors_by_id: All error occurrences, mapped by their unique IDs.
"""

is_running_queue: bool
is_hardware_stopped: bool
should_stop: bool
should_report_result: bool
# TODO(mc, 2021-06-16): OrderedDict is mutable. Switch to Sequence + Mapping
commands_by_id: OrderedDict[str, Command]
errors_by_id: Mapping[str, ErrorOccurrence]
Expand All @@ -44,13 +64,15 @@ class CommandStore(HasState[CommandState], HandlesActions):
def __init__(self) -> None:
"""Initialize a CommandStore and its state."""
self._state = CommandState(
is_running=True,
stop_requested=False,
is_running_queue=True,
is_hardware_stopped=False,
should_stop=False,
should_report_result=False,
commands_by_id=OrderedDict(),
errors_by_id={},
)

def handle_action(self, action: Action) -> None:
def handle_action(self, action: Action) -> None: # noqa: C901
"""Modify state in reaction to an action."""
errors_by_id: Mapping[str, ErrorOccurrence]

Expand Down Expand Up @@ -103,38 +125,57 @@ def handle_action(self, action: Action) -> None:
)

elif isinstance(action, PlayAction):
if not self._state.stop_requested:
self._state = replace(self._state, is_running=True)
if not self._state.should_stop:
self._state = replace(self._state, is_running_queue=True)

elif isinstance(action, PauseAction):
self._state = replace(self._state, is_running=False)
self._state = replace(self._state, is_running_queue=False)

elif isinstance(action, StopAction):
# any `ProtocolEngineError`'s will be captured by `FailCommandAction`,
# so only capture unknown errors here
if action.error_details and not isinstance(
action.error_details.error,
ProtocolEngineError,
):
errors_by_id = dict(self._state.errors_by_id)
error_id = action.error_details.error_id
created_at = action.error_details.created_at
error = action.error_details.error

errors_by_id[error_id] = ErrorOccurrence(
id=error_id,
createdAt=created_at,
errorType=type(error).__name__,
detail=str(error),
if not self._state.should_stop:
self._state = replace(
self._state,
is_running_queue=False,
should_report_result=False,
should_stop=True,
)

elif isinstance(action, FinishAction):
if not self._state.should_stop:
# any `ProtocolEngineError`'s will be captured by `FailCommandAction`,
# so only capture unknown errors here
if action.error_details and not isinstance(
action.error_details.error,
ProtocolEngineError,
):
errors_by_id = dict(self._state.errors_by_id)
error_id = action.error_details.error_id
created_at = action.error_details.created_at
error = action.error_details.error

errors_by_id[error_id] = ErrorOccurrence(
id=error_id,
createdAt=created_at,
errorType=type(error).__name__,
detail=str(error),
)
else:
errors_by_id = self._state.errors_by_id

self._state = replace(
self._state,
is_running_queue=False,
should_report_result=True,
should_stop=True,
errors_by_id=errors_by_id,
)
else:
errors_by_id = self._state.errors_by_id

elif isinstance(action, HardwareStoppedAction):
self._state = replace(
self._state,
is_running=False,
stop_requested=True,
errors_by_id=errors_by_id,
is_running_queue=False,
is_hardware_stopped=True,
should_stop=True,
)


Expand Down Expand Up @@ -176,10 +217,10 @@ def get_next_queued(self) -> Optional[str]:
Raises:
ProtocolEngineStoppedError: the engine has been stopped.
"""
if self._state.stop_requested:
if self._state.should_stop:
raise ProtocolEngineStoppedError("Engine was stopped")

if not self._state.is_running:
if not self._state.is_running_queue:
return None

for command_id, command in self._state.commands_by_id.items():
Expand All @@ -192,7 +233,7 @@ def get_next_queued(self) -> Optional[str]:

def get_is_running(self) -> bool:
"""Get whether the engine is running and queued commands should be executed."""
return self._state.is_running
return self._state.is_running_queue

def get_is_complete(self, command_id: str) -> bool:
"""Get whether a given command is completed.
Expand Down Expand Up @@ -236,15 +277,16 @@ def get_stop_requested(self) -> bool:
A command may still be executing while the engine is stopping.
"""
return self._state.stop_requested
return self._state.should_stop

def get_is_stopped(self) -> bool:
"""Get whether an engine stop has completed."""
return self._state.stop_requested and not any(
return self._state.should_stop and not any(
c.status == CommandStatus.RUNNING
for c in self._state.commands_by_id.values()
)

# TODO(mc, 2021-12-07): reject adding commands to a stopped engine
def validate_action_allowed(self, action: Union[PlayAction, PauseAction]) -> None:
"""Validate if a PlayAction or PauseAction is allowed, raising if not.
Expand All @@ -253,7 +295,7 @@ def validate_action_allowed(self, action: Union[PlayAction, PauseAction]) -> Non
Raises:
ProtocolEngineStoppedError: the engine has been stopped.
"""
if self._state.stop_requested:
if self._state.should_stop:
action_desc = "play" if isinstance(action, PlayAction) else "pause"
raise ProtocolEngineStoppedError(f"Cannot {action_desc} a stopped engine.")

Expand All @@ -263,20 +305,21 @@ def get_status(self) -> EngineStatus:
all_errors = self._state.errors_by_id.values()
all_statuses = [c.status for c in all_commands]

if self._state.stop_requested:
if any(all_errors):
if self._state.should_report_result:
if not self._state.is_hardware_stopped:
return EngineStatus.RUNNING
elif any(all_errors):
return EngineStatus.FAILED

if all(s == CommandStatus.SUCCEEDED for s in all_statuses):
else:
return EngineStatus.SUCCEEDED

elif any(s == CommandStatus.RUNNING for s in all_statuses):
elif self._state.should_stop:
if not self._state.is_hardware_stopped:
return EngineStatus.STOP_REQUESTED

else:
return EngineStatus.STOPPED

elif self._state.is_running:
elif self._state.is_running_queue:
any_running = any(s == CommandStatus.RUNNING for s in all_statuses)
any_queued = any(s == CommandStatus.QUEUED for s in all_statuses)

Expand All @@ -287,8 +330,4 @@ def get_status(self) -> EngineStatus:
return EngineStatus.IDLE

else:
if any(s == CommandStatus.RUNNING for s in all_statuses):
return EngineStatus.PAUSE_REQUESTED

else:
return EngineStatus.PAUSED
return EngineStatus.PAUSED
1 change: 0 additions & 1 deletion api/src/opentrons/protocol_engine/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ class EngineStatus(str, Enum):

IDLE = "idle"
RUNNING = "running"
PAUSE_REQUESTED = "pause-requested"
PAUSED = "paused"
STOP_REQUESTED = "stop-requested"
STOPPED = "stopped"
Expand Down
Loading

0 comments on commit fec85cd

Please sign in to comment.