-
Notifications
You must be signed in to change notification settings - Fork 2
/
device_json_to_timeseries.py
executable file
·103 lines (87 loc) · 2.79 KB
/
device_json_to_timeseries.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#!/usr/bin/env python3
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("input", help="Input single-line json file")
parser.add_argument(
"--drop-header",
action="store_true",
help="If enabled, the header line will be dropped",
)
parser.add_argument(
"--deglitch-pressure",
action="store_true",
help="If enabled, pass pressure through pressure_deglitch_smooth",
)
parser.add_argument(
"--start-time-at-zero",
action="store_true",
help="If enabled, the timeseries starts at zero",
)
args = parser.parse_args()
import json
import numpy
from processor.config import config
from processor import analysis
from processor.flow_calibrator import FlowCalibrator
caliber = FlowCalibrator()
flow_scale = config["device"]["flow"]["scale"].as_number()
flow_offset = config["device"]["flow"]["offset"].as_number()
pressure_scale = config["device"]["pressure"]["scale"].as_number()
pressure_offset = config["device"]["pressure"]["offset"].as_number()
time = []
pressure = []
flow = []
starttime = None
with open(args.input) as fin:
for line in fin:
try:
j = json.loads(line)
except json.JSONDecodeError:
continue
if j.get("v", None) != 1:
continue
if "t" not in j or not isinstance(j["t"], (int, float)):
continue
if "P" not in j or not isinstance(j["P"], (int, float)):
continue
if "F" not in j or not isinstance(j["F"], (int, float)):
continue
# FIXME: this will be a lookup function!
# I could not extract this correction from the main codebase; we'll
# have to fix it both here and there.
t = j["t"] / 1000.0
p = j["P"] * pressure_scale - pressure_offset
# this was the old flow calibration
# fO = math.copysign(abs(j["F"]) ** (4 / 7), j["F"]) * flow_scale - flow_offset
f = caliber.Q(j["F"])
if starttime is None:
starttime = t
if args.start_time_at_zero:
t -= starttime
time.append(t)
pressure.append(p)
flow.append(f)
if args.deglitch_pressure:
pressure = analysis.pressure_deglitch_smooth(numpy.array(pressure))
volume = analysis.flow_to_volume(
numpy.array(time),
None,
numpy.array(flow),
None,
critical_frequency=0.004,
)
volume -= numpy.min(volume)
minbias_volume = analysis.flow_to_volume(
numpy.array(time),
None,
numpy.array(flow),
None,
critical_frequency=0.0004,
)
minbias_volume -= numpy.min(minbias_volume)
if not args.drop_header:
print(
" time (sec), pressure (cm H2O), flow (L/min), volume (mL), minbias volume (mL)"
)
for t, p, f, v, mv in zip(time, pressure, flow, volume, minbias_volume):
print(f"{t:15.3f}, {p:17.4f}, {f:12.4f}, {v:11.4f}, {mv:19.4f}")