-
Notifications
You must be signed in to change notification settings - Fork 176
/
conftest.py
107 lines (88 loc) · 3.19 KB
/
conftest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
"""Common fixtures for integration tests."""
from __future__ import annotations
import pytest
from typing import AsyncGenerator, Iterator, AsyncIterator, List, Dict
from _pytest.fixtures import FixtureRequest
from opentrons_hardware.drivers.can_bus.settings import DriverSettings
from opentrons_hardware.drivers.can_bus.build import build_driver
from opentrons_hardware.drivers.can_bus.abstract_driver import AbstractCanDriver
from opentrons_hardware.drivers.can_bus import CanMessenger, WaitableCallback
from opentrons_hardware.firmware_bindings import constants
@pytest.fixture
async def driver() -> AsyncGenerator[AbstractCanDriver, None]:
    """Yield a CAN driver built from default settings, then shut it down.

    The driver is meant to talk to the OT-3 emulator.
    """
    can_driver = await build_driver(DriverSettings())
    yield can_driver
    # Teardown: executed once the consumer of the fixture has finished.
    can_driver.shutdown()
@pytest.fixture
async def can_messenger(
    driver: AbstractCanDriver,
) -> AsyncIterator[CanMessenger]:
    """Yield a started CanMessenger that wraps the ``driver`` fixture.

    The messenger is stopped again during fixture teardown.
    """
    can_bus = CanMessenger(driver)
    can_bus.start()
    yield can_bus
    # Teardown: stop() is a coroutine, so it has to be awaited.
    await can_bus.stop()
@pytest.fixture
def can_messenger_queue(
    request: FixtureRequest,
    can_messenger: CanMessenger,
) -> Iterator[WaitableCallback]:
    """Yield a WaitableCallback bound to the CAN messenger.

    A test can opt into message filtering with the ``can_filter_func``
    marker: the marker's first positional argument is passed on as the
    callback's ``filter``. Without the marker, ``None`` is passed.
    """
    marker = request.node.get_closest_marker("can_filter_func")
    message_filter = marker.args[0] if marker else None
    with WaitableCallback(messenger=can_messenger, filter=message_filter) as queue:
        yield queue
@pytest.fixture(
    scope="session",
    params=[
        constants.NodeId.head,
        constants.NodeId.pipette_left,
        constants.NodeId.gantry_x,
        constants.NodeId.gantry_y,
    ],
)
def subsystem_node_id(request: FixtureRequest) -> Iterator[constants.NodeId]:
    """Parametrized fixture: yields one subsystem node id per parameter."""
    node_id: constants.NodeId = request.param  # type: ignore[attr-defined]
    yield node_id
# Node ids of the motor nodes. This list parametrizes the motor_node_id
# fixture and is returned as-is by the all_motor_nodes fixture.
_motor_nodes: List[constants.NodeId] = [
    constants.NodeId.head_l,
    constants.NodeId.head_r,
    constants.NodeId.pipette_left,
    constants.NodeId.gantry_x,
    constants.NodeId.gantry_y,
]
# These unfortunately need to be manually kept up to date with the c++
# configurations
# Maps a motor node id to its step size; handed to tests through the
# all_motor_node_step_sizes fixture.
# NOTE(review): units are presumably mm per microstep, judging by the
# pitch/diameter notes below -- confirm against the firmware configuration.
# NOTE(review): pipette_right has an entry here but is not listed in
# _motor_nodes -- confirm that this is intentional.
_motor_node_step_sizes: Dict[constants.NodeId, float] = {
    # for lead screw pitch 12, microstepping 16
    constants.NodeId.head_l: 0.00375,
    constants.NodeId.head_r: 0.00375,
    # for pulley diameter 12, microstepping 32
    constants.NodeId.gantry_x: 0.006234098,
    # for pulley diameter 12.7254, microstepping 32
    constants.NodeId.gantry_y: 0.006246566,
    # for lead screw pitch 3.03, microstepping 32
    constants.NodeId.pipette_left: 0.000473437,
    constants.NodeId.pipette_right: 0.000473437,
}
@pytest.fixture(
    scope="session",
    params=_motor_nodes,
)
def motor_node_id(request: FixtureRequest) -> Iterator[constants.NodeId]:
    """Parametrized fixture: yields one motor node id per parameter."""
    node_id: constants.NodeId = request.param  # type: ignore[attr-defined]
    yield node_id
@pytest.fixture(scope="session")
def all_motor_nodes() -> List[constants.NodeId]:
    """Return the list of every configured motor node id.

    The module-level list itself is returned; no copy is made.
    """
    configured_nodes = _motor_nodes
    return configured_nodes
@pytest.fixture(scope="session")
def all_motor_node_step_sizes() -> Dict[constants.NodeId, float]:
    """Return the step size of every configured motor node, keyed by node id.

    The module-level mapping itself is returned; no copy is made.
    """
    step_sizes = _motor_node_step_sizes
    return step_sizes