-
Notifications
You must be signed in to change notification settings - Fork 175
/
hepa_uv_lifetime_test.py
324 lines (278 loc) · 10.3 KB
/
hepa_uv_lifetime_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
"""This is the life-time testing script for the Hepa/UV module."""
import asyncio
import argparse
import datetime
import logging
import logging.config
from typing import Optional, cast, Dict
from hardware_testing.opentrons_api import helpers_ot3
from opentrons.hardware_control.backends.ot3controller import OT3Controller
from opentrons.hardware_control.ot3api import OT3API
from opentrons.hardware_control.types import (
SubSystem,
HepaFanState,
HepaUVState,
DoorState,
)
# Default constants
# Fan duty cycle (percent) used as the default for --fan-duty-cycle and passed
# along with the final "fan off" command in run_hepa_fan.
DEFAULT_DUTY_CYCLE: int = 75
# Upper bound that run_hepa_fan clamps the requested duty cycle to.
MAX_DUTY_CYCLE: int = 100
# UV dosage duration in seconds; passed along with the final "UV off" command
# in run_hepa_uv.
DEFAULT_UV_DOSAGE_DURATION: int = 900  # 15m
# Upper bound, in seconds, that run_hepa_uv clamps the UV on-time to.
MAX_UV_DOSAGE: int = 60 * 60  # 1hr max dosage
# Default for --cycles.
DEFAULT_CYCLES: int = 1

# Module-level logger used by every task in this script.
log = logging.getLogger(__name__)
async def _turn_off_hepa_uv(api: OT3API) -> None:
    """Switch off the UV light and the Hepa fan, then verify both are off.

    The UV light is commanded off first, then the fan. Afterwards the state
    of each is read back; a state that is reported and still shows the device
    on fails an assertion. A missing (falsy) state is not treated as an error.
    """
    log.info("Turning off Hepa Fan and UV Light.")
    await api.set_hepa_uv_state(turn_on=False)
    await api.set_hepa_fan_state(turn_on=False)

    # Read each state back and check it, UV first and then the fan.
    uv_status: Optional[HepaUVState] = await api.get_hepa_uv_state()
    assert not (uv_status and uv_status.light_on), "Hepa UV did not turn OFF!"

    fan_status: Optional[HepaFanState] = await api.get_hepa_fan_state()
    assert not (fan_status and fan_status.fan_on), "Hepa Fan did not turn OFF!"
async def run_hepa_fan(
    api: OT3API,
    duty_cycle: int,
    on_time: int,
    off_time: int,
    cycles: int,
) -> None:
    """Coroutine that cycles the Hepa fan on and off.

    Args:
        api: Hardware API used to switch the fan.
        duty_cycle: Requested fan duty cycle; clamped to 0..MAX_DUTY_CYCLE.
        on_time: Seconds the fan stays on per cycle. -1 keeps the fan on
            until the task is cancelled; any other non-positive value is
            treated as 0.
        off_time: Seconds the fan stays off per cycle. Non-positive values
            are treated as 0, in which case the fan is not switched off
            between cycles. Ignored when on_time is -1.
        cycles: Number of cycles to run. Ignored when on_time is -1.

    The task returns immediately, without touching the fan, when there is
    neither an on-time nor an off-time and on_time is not -1. Otherwise the
    fan is always switched off before the coroutine returns, including when
    the task is cancelled.
    """
    fan_duty_cycle = max(0, min(duty_cycle, MAX_DUTY_CYCLE))
    run_forever = on_time == -1
    fan_on_time = max(on_time, 0)
    fan_off_time = max(off_time, 0)
    if run_forever and fan_off_time:
        # "Stay on until the program is stopped" has no off phase; with an
        # off-time the fan would be on for 0s and off for the rest.
        log.warning("Hepa Task: off_time is ignored when on_time is -1")
        fan_off_time = 0
    start_time = datetime.datetime.now()

    # Dont run task if there are no valid parameters. The run-forever request
    # is exempt: it normalises to fan_on_time == 0 and would otherwise be
    # rejected here before the fan was ever started.
    if not run_forever and not fan_on_time and not fan_off_time:
        return

    log.info(
        f"Hepa Task: Starting - duty_cycle={fan_duty_cycle}, "
        f"on_time={fan_on_time}s, off_time={fan_off_time}s, cycles={cycles}, "
        f"run_forever: {run_forever}"
    )

    fan_on: bool = False
    cycle: int = 1
    while True:
        try:
            if not run_forever and cycle > cycles:
                log.info(f"Hepa Task: Reached target cycles={cycles}")
                break

            # on time: only send the command when the fan is not already on
            if not fan_on:
                fan_on = True
                log.info(f"Hepa Task: cycle {cycle}")
                msg = "forever" if run_forever else f"for {fan_on_time} seconds"
                log.info(f"Hepa Task: Turning on fan {msg}")
                await api.set_hepa_fan_state(turn_on=True, duty_cycle=fan_duty_cycle)
            await asyncio.sleep(fan_on_time)

            # off time
            if fan_off_time:
                log.info(f"Hepa Task: Turning off fan for {fan_off_time} seconds")
                await api.set_hepa_fan_state(turn_on=False, duty_cycle=0)
                fan_on = False

            # sleep and increment the cycle; sleep at least 1s so a zero
            # off-time does not turn this into a busy loop
            await asyncio.sleep(fan_off_time or 1)
            if not run_forever:
                cycle += 1
        except asyncio.CancelledError:
            # Leave the loop so the fan is still switched off below.
            break

    log.info("Hepa Task: Finished - Turning off Fan")
    await api.set_hepa_fan_state(turn_on=False, duty_cycle=DEFAULT_DUTY_CYCLE)
    elapsed_time = datetime.datetime.now() - start_time
    log.info(f"Hepa Task: Elapsed time={elapsed_time}")
async def run_hepa_uv(api: OT3API, on_time: int, off_time: int, cycles: int) -> None:
    """Coroutine that cycles the Hepa UV light on and off.

    Args:
        api: Hardware API used to switch the UV light and read the door state.
        on_time: Seconds the light stays on per cycle; clamped to
            0..MAX_UV_DOSAGE.
        off_time: Seconds the light stays off per cycle. Non-positive values
            are treated as 0, in which case the light is not switched off
            explicitly between cycles.
        cycles: Number of cycles to run.

    The task returns immediately, without touching the light, when there is
    neither an on-time nor an off-time, or when the door is open. Otherwise
    the light is always commanded off before the coroutine returns, including
    when the task is cancelled.
    """
    light_on_time = max(0, min(on_time, MAX_UV_DOSAGE))
    light_off_time = max(off_time, 0)
    start_time = datetime.datetime.now()

    # Dont run task if there are no valid parameters
    if not light_on_time and not light_off_time:
        return

    if api.door_state == DoorState.OPEN:
        log.warning("UV Task: Flex Door must be closed to operate the UV light")
        return

    log.info(
        f"Hepa UV Task: Starting - on_time={light_on_time}s, "
        f"off_time={light_off_time}s, cycles={cycles}"
    )
    log.info("===========================================")

    cycle: int = 1
    while True:
        try:
            if cycle > cycles:
                log.info(f"UV Task: Reached target cycles={cycles}")
                break

            # on time: the light is re-armed on every cycle. With no
            # off-time the module is expected to switch the light off on its
            # own once uv_duration_s elapses (see the --uv-off-time help), so
            # skipping the command after the first cycle would leave every
            # later cycle dark.
            log.info(f"UV Task: cycle number={cycle}")
            log.info(f"UV Task: Turning on the UV Light for {light_on_time} seconds")
            await api.set_hepa_uv_state(turn_on=True, uv_duration_s=light_on_time)
            await asyncio.sleep(light_on_time)

            # off time
            if light_off_time:
                log.info(
                    f"UV Task: Turning off the UV Light for {light_off_time} seconds"
                )
                await api.set_hepa_uv_state(turn_on=False, uv_duration_s=0)

            # Sleep and increment the cycle; sleep at least 1s between cycles
            # even when there is no off-time
            await asyncio.sleep(light_off_time or 1)
            cycle += 1
        except asyncio.CancelledError:
            # Leave the loop so the light is still switched off below.
            break

    log.info("UV Task: Finished - Turning off UV Light ")
    await api.set_hepa_uv_state(turn_on=False, uv_duration_s=DEFAULT_UV_DOSAGE_DURATION)
    elapsed_time = datetime.datetime.now() - start_time
    log.info(f"UV Task: Elapsed time={elapsed_time}")
async def _control_task(
    api: OT3API, hepa_task: asyncio.Task, uv_task: asyncio.Task
) -> None:
    """Supervise the fan and UV tasks, polling once per second.

    While the UV task is still running, an open door cancels it. The
    supervisor returns once both the UV task and the fan task are done.
    """
    while True:
        # The UV light must not run with the door open.
        door_is_open = api.door_state == DoorState.OPEN
        if door_is_open and not uv_task.done():
            log.warning("Control Task: Flex Door Opened, stopping UV task")
            uv_task.cancel()

        # Nothing left to supervise once both workers have finished.
        if uv_task.done() and hepa_task.done():
            return

        await asyncio.sleep(1)
async def _main(args: argparse.Namespace) -> None:
    """Build the hardware API and run the fan, UV and control tasks together.

    When not simulating, the network is probed first and a Hepa/UV subsystem
    must be attached. Fan and UV light are switched off before the tasks
    start and again once they end, whether they finish or raise.
    """
    simulating = args.is_simulating
    api = await helpers_ot3.build_async_ot3_hardware_api(is_simulating=simulating)

    # On real hardware, scan for subsystems and require a hepa/uv module.
    if not simulating:
        backend = cast(OT3Controller, api._backend)
        await backend.probe_network()
        assert (
            SubSystem.hepa_uv in api.attached_subsystems
        ), "No Hepa/UV module detected!"

    # Start from a known state: everything off.
    await _turn_off_hepa_uv(api)

    # Schedule the two workers, then the supervisor that watches them.
    fan_worker = asyncio.create_task(
        run_hepa_fan(
            api, args.fan_duty_cycle, args.fan_on_time, args.fan_off_time, args.cycles
        )
    )
    uv_worker = asyncio.create_task(
        run_hepa_uv(api, args.uv_on_time, args.uv_off_time, args.cycles)
    )
    supervisor = asyncio.create_task(_control_task(api, fan_worker, uv_worker))

    try:
        await asyncio.gather(supervisor, fan_worker, uv_worker)
    finally:
        # Always leave the fan and UV light off.
        await _turn_off_hepa_uv(api)
def log_config(log_level: int) -> Dict:
    """Build the logging configuration dictionary for this script.

    Args:
        log_level: Numeric logging level applied to both handlers and to the
            root logger.

    Returns:
        A dictionary in the `logging.config.dictConfig` schema. The rotating
        file handler is always attached to the root logger; the console
        handler is attached as well when log_level is INFO or lower.
    """
    file_handler = {
        "class": "logging.handlers.RotatingFileHandler",
        "formatter": "basic",
        "filename": "/var/log/hepauv_lifetime.log",
        "maxBytes": 5000000,
        "level": log_level,
        "backupCount": 3,
    }
    console_handler = {
        "class": "logging.StreamHandler",
        "formatter": "basic",
        "level": log_level,
    }

    # Levels above INFO log to the file only.
    root_handlers = ["main_log_handler"]
    if log_level <= logging.INFO:
        root_handlers.append("stream_handler")

    return {
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {
            "basic": {"format": "%(asctime)s %(name)s %(levelname)s %(message)s"},
            "production_trace": {"format": "%(asctime)s %(message)s"},
        },
        "handlers": {
            "main_log_handler": file_handler,
            "stream_handler": console_handler,
        },
        "loggers": {
            "": {"handlers": root_handlers, "level": log_level},
        },
    }
if __name__ == "__main__":
    # Command-line entry point: parse the test parameters, configure logging,
    # then run the lifetime test until it finishes or the user interrupts it.
    parser = argparse.ArgumentParser(
        prog="Hepa/UV Life-Time Test",
        description="Program to test the lifetime of the Hepa/UV module.",
    )
    parser.add_argument(
        "--log-level",
        # Matches log_config: the console handler is attached at INFO and
        # below, the file handler always.
        help=(
            "Developer logging level. At INFO or below, logs are written "
            "to console and to /var/log/hepauv_lifetime.log; above INFO, "
            "logs are only written to /var/log/hepauv_lifetime.log"
        ),
        type=str,
        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
        default="INFO",
    )
    parser.add_argument(
        "--is_simulating",
        action="store_true",
        help="Whether this is a simulation or not.",
    )
    parser.add_argument(
        "--fan-duty-cycle",
        type=int,
        default=DEFAULT_DUTY_CYCLE,
        help="The duty cycle of the hepa fan 0-100%%.",
    )
    parser.add_argument(
        "--fan-on-time",
        type=int,
        default=0,
        help="The time in seconds the fan should stay on for. "
        "0 turns off fan (default), -1 stays on until program is stopped",
    )
    parser.add_argument(
        "--fan-off-time",
        type=int,
        default=0,
        help="The time in seconds the fan should stay off for. ignored if not set",
    )
    parser.add_argument(
        "--uv-on-time",
        type=int,
        default=0,
        help="The time in seconds the UV light will be turned on for. "
        "0 turns off uv light (default), "
        f"The max value is {MAX_UV_DOSAGE} seconds.",
    )
    parser.add_argument(
        "--uv-off-time",
        type=int,
        default=0,
        help="The time in seconds the UV light will be turned off for. "
        "if 0 DONT turn off the uv light explicitly but wait for "
        "the hepa/uv to turn off on its own based on --uv-on-time.",
    )
    parser.add_argument(
        "--cycles",
        type=int,
        default=DEFAULT_CYCLES,
        help="The number of cycles to run.",
    )

    args = parser.parse_args()
    # --log-level is restricted to names that exist as logging.<NAME> levels.
    logging.config.dictConfig(log_config(getattr(logging, args.log_level)))

    try:
        asyncio.run(_main(args))
    except KeyboardInterrupt:
        log.warning("KeyBoard Interrupt")