-
Notifications
You must be signed in to change notification settings - Fork 0
/
reference.py
130 lines (110 loc) · 3.62 KB
/
reference.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# credit to https://github.com/ifnesi/1brc
# time python3 calculateAverage.py
import os
import multiprocessing as mp
def get_file_chunks(
    file_name: str,
    max_cpu: int = 8,
) -> tuple[int, list[tuple[str, int, int]]]:
    """Split the file into newline-aligned chunks for parallel processing.

    Args:
        file_name: Path to the measurements file.
        max_cpu: Upper bound on the number of workers (and thus chunks).

    Returns:
        A tuple ``(worker_count, spans)`` where ``spans`` is a list of
        ``(file_name, chunk_start, chunk_end)`` byte ranges.  Every range
        starts at offset 0 or immediately after a newline and ends
        immediately after a newline, so no line is split across chunks.
    """
    cpu_count = min(max_cpu, mp.cpu_count())
    file_size = os.path.getsize(file_name)
    chunk_size = file_size // cpu_count
    start_end: list[tuple[str, int, int]] = []
    # Open read-only in binary mode: we only inspect bytes near candidate
    # boundaries.  ("r+b" would needlessly request write access and fail
    # on read-only files.)
    with open(file_name, "rb") as f:

        def is_new_line(position: int) -> bool:
            # Offset 0 trivially starts a line; otherwise the preceding
            # byte must be a newline.
            if position == 0:
                return True
            f.seek(position - 1)
            return f.read(1) == b"\n"

        def next_line(position: int) -> int:
            # Return the offset just past the next newline at/after position.
            f.seek(position)
            f.readline()
            return f.tell()

        chunk_start = 0
        while chunk_start < file_size:
            chunk_end = min(file_size, chunk_start + chunk_size)
            # Pull the boundary back onto a line start...
            while not is_new_line(chunk_end):
                chunk_end -= 1
            # ...but if that collapsed the chunk to zero bytes (a single
            # line longer than chunk_size), push past that whole line so
            # the loop always makes progress.
            if chunk_start == chunk_end:
                chunk_end = next_line(chunk_end)
            start_end.append(
                (
                    file_name,
                    chunk_start,
                    chunk_end,
                )
            )
            chunk_start = chunk_end
    return (
        cpu_count,
        start_end,
    )
def _process_file_chunk(
file_name: str,
chunk_start: int,
chunk_end: int,
) -> dict:
"""Process each file chunk in a different process"""
result = dict()
with open(file_name, "rb") as f:
f.seek(chunk_start)
for line in f:
chunk_start += len(line)
if chunk_start > chunk_end:
break
location, measurement = line.split(b";")
measurement = float(measurement)
if location not in result:
result[location] = [
measurement,
measurement,
measurement,
1,
] # min, max, sum, count
else:
_result = result[location]
if measurement < _result[0]:
_result[0] = measurement
if measurement > _result[1]:
_result[1] = measurement
_result[2] += measurement
_result[3] += 1
return result
def process_file(
    cpu_count: int,
    start_end: list,
) -> dict:
    """Process all file chunks in parallel, merge, and print the results.

    Args:
        cpu_count: Number of worker processes to spawn.
        start_end: ``(file_name, chunk_start, chunk_end)`` argument tuples,
            one per chunk, as produced by get_file_chunks().

    Returns:
        The merged mapping of station name (bytes) to
        ``[min, max, sum, count]`` stats.
    """
    with mp.Pool(cpu_count) as p:
        # Run chunks in parallel; one dict of partial stats per chunk.
        chunk_results = p.starmap(
            _process_file_chunk,
            start_end,
        )
    # Combine all results from all chunks.
    result = dict()
    for chunk_result in chunk_results:
        for location, measurements in chunk_result.items():
            _result = result.get(location)
            if _result is None:
                result[location] = measurements
            else:
                if measurements[0] < _result[0]:
                    _result[0] = measurements[0]
                if measurements[1] > _result[1]:
                    _result[1] = measurements[1]
                _result[2] += measurements[2]
                _result[3] += measurements[3]
    # Print "{name=min/mean/max, ...} "; the "\b\b" backspaces erase the
    # trailing ", " before the closing brace.
    print("{", end="")
    for location, measurements in sorted(result.items()):
        mean = (measurements[2] / measurements[3]) if measurements[3] != 0 else 0
        print(
            f"{location.decode('utf8')}={measurements[0]:.1f}/{mean:.1f}/{measurements[1]:.1f}",
            end=", ",
        )
    print("\b\b} ")
    # BUG FIX: the signature promises "-> dict" but the function previously
    # fell off the end returning None; return the merged stats so the
    # annotation holds (callers that ignored the return are unaffected).
    return result
if __name__ == "__main__":
    # Entry point: chunk the data file, then aggregate it in parallel.
    worker_count, chunk_spans = get_file_chunks("data/measurements.txt")
    process_file(worker_count, chunk_spans)