Contributed python variant to the 1BRC. #1
Conversation
A Python variant for the 1BRC. This makes some different design choices from Doug Mercer's versions: it reduces the amount of string creation (sketched just below) and seems to perform best with buffers around 64 MB on my test system.
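To make the "less string creation" point concrete, here is a minimal sketch of the idea with made-up input (an illustration, not code from this PR): station names stay as raw bytes while scanning, and each name is decoded to str only once, at output time.
# Sketch (hypothetical data): defer str creation by keying stats on raw bytes.
data = b"Oslo;-1.2\nOslo;3.4\n"  # two made-up measurement lines
stats: dict[bytes, list[int]] = {}  # bytes name -> [min, max, sum, count]
for line in data.splitlines():
    name, _, temp = line.partition(b";")  # no str objects created here
    t = int(temp.replace(b".", b""))  # b"-1.2" -> -12 (tenths of a degree)
    rec = stats.setdefault(name, [t, t, 0, 0])
    rec[0] = min(rec[0], t)
    rec[1] = max(rec[1], t)
    rec[2] += t
    rec[3] += 1
for name, (lo, hi, total, n) in sorted(stats.items()):
    # Decode once per station, only when formatting output.
    print(f"{name.decode('utf-8')}={lo/10:.1f}/{total / (10 * n):.1f}/{hi/10:.1f}")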
Doug - I loved your recent (14-Apr-2024) video and how you drew attention to the 1BRC and PyPy (which I've not used before). It was a great match for this problem and gave a substantial speedup with no work necessary. Indeed, almost all of the real speed improvements came from PyPy and multiprocessing. I also enjoyed using hyperfine to perform a simple grid search for good parameter combinations and to record behavior averaged over multiple runs. My version reduces the amount of string creation (see the sketch above) and seems to perform best when each process handles multiple chunks (e.g. 10 chunks/process). I've adjusted it to match the CLI of your implementation by default. There are a couple of other command-line options (e.g. for the number of processes, chunk size, etc.). I would love to find ways to:
Hi Adam -- Thanks for putting this together! This is a neat variation on the problem, and you added some nice creature comforts to the code. On my machine, your approach is nearly as fast as, but not faster than, the two "doug_booty4*" implementations.
I ran it:
That said, it is probably still worth merging this in. I think that with a few minor tweaks plus some additional comments, this version of the code can show that mmap and some of the other tweaks I made (e.g., removing the global variables) didn't really make a major difference in performance (a rough sketch of an mmap-based read follows the code below). Would you mind giving me a day or two to recommend a few changes, and then we can merge it in? Also, I may try searching a few more chunk_sizes; I'll have to give that a try. For context, I tried this "tweaked" version and it didn't affect performance. I was really surprised!
import argparse
import os
from multiprocessing import Pool
# Adam Farquhar 2021-09-30
# Github: adamfarquhar
Record = list[int]  # four elements: min, max, sum, count (temperatures in tenths of a degree)
def rec_str(loc: bytes, rec: Record) -> str:
    return f"{loc.decode('utf-8')}={rec[0]/10:.1f}/{0.1 * rec[2] / rec[3]:.1f}/{rec[1]/10:.1f}"
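# parse_temp reads the ASCII temperature that follows the b";" separator and
# returns it as an int in tenths of a degree (e.g. b"-12.3" -> -123), so the
# per-line hot loop never creates floats or intermediate strings.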
def parse_temp(buf: bytes, offset: int) -> int:
    # DASH: int = 45 # ord("-")
    # DOT: int = 46 # ord(".")
    if buf[offset] == 45:  # dash ord("-")
        neg = -1
        offset += 1
    else:
        neg = 1
    if buf[offset + 1] == 46:  # dot ord(".")
        # d.d
        val: int = (buf[offset] - 48) * 10 + (buf[offset + 2] - 48)
    else:
        # dd.d
        val: int = (buf[offset] - 48) * 100 + (buf[offset + 1] - 48) * 10 + (buf[offset + 3] - 48)
    return neg * val
def merge_data(target: dict[bytes, Record], source_data: list[dict[bytes, Record]]) -> dict[bytes, Record]:
    """Merge the source data into the target."""
    for result in source_data:
        for key, val in result.items():
            if key in target:
                rec = target[key]
                rec[0] = min(rec[0], val[0])
                rec[1] = max(rec[1], val[1])
                rec[2] += val[2]
                rec[3] += val[3]
            else:
                target[key] = val
    return target
def output_data(out_file: str, data: dict[bytes, Record]) -> None:
    with open(out_file, "w", encoding="utf-8") as file:
        print("{", ", ".join(rec_str(loc, val) for loc, val in sorted(data.items())), "}", sep="", file=file)
# For the chunk_size, we seem to bottom out at 64MB. 128MB, 32MB are slower - but not by much.
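# Each chunk is extended to the next newline so that no input line straddles
# two chunks; workers can therefore parse their chunks independently.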
def get_chunk_info(
    file_name: str, chunk_size: int = 32 * 1024 * 1024, buf_size: int = 4096
) -> list[tuple[str, int, int, int]]:
    assert chunk_size > 0, "chunk_size must be positive"
    results = []
    buffer = bytearray(buf_size)
    with open(file_name, "rb") as file:
        # Get the file size
        file.seek(0, 2)
        file_size = file.tell()
        # Reset the file pointer
        file.seek(0, 0)
        id = 0
        offset = 0
        while offset < file_size:
            id += 1
            if offset + chunk_size >= file_size:
                results.append((file_name, id, offset, file_size - offset))
                break
            else:
                file.seek(offset + chunk_size)
                file.readinto(buffer)
                newline_loc = buffer.find(b"\n")
                assert newline_loc != -1, f"No end of line found following {offset=:,}"
                results.append((file_name, id, offset, chunk_size + newline_loc + 1))
                offset += chunk_size + newline_loc + 1
    return results
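# Example return value (hypothetical file and sizes):
#   get_chunk_info("data/measurements.txt")
#   -> [("data/measurements.txt", 1, 0, 33554440), ...]
# Sizes differ slightly from chunk_size because each chunk runs to a newline.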
def process_chunk(chunk: bytes, id: int) -> dict[bytes, Record]:
    result: dict[bytes, Record] = {}
    size: int = len(chunk)
    line_start: int = 0
    while line_start < size:
        name_end = chunk.find(b";", line_start)
        # assert name_end != -1, f"Separator not found after offset {line_start}."
        # It seems to be OK(ish) to skip the decoding step here. We don't really need it until we write the output.
        station = chunk[line_start:name_end]  # .decode(encoding="utf-8")
        temp = parse_temp(chunk, name_end + 1)
        # Add data for this line
        if station not in result:
            # min, max, sum, count
            result[station] = [temp, temp, temp, 1]
        else:
            rec = result[station]
            rec[0] = min(rec[0], temp)
            rec[1] = max(rec[1], temp)
            rec[2] += temp
            rec[3] += 1
        # Find next line
        line_start = chunk.find(b"\n", name_end)  # could catch the end of temp and start there
        # assert line_start != -1, f"No newline at end of chunk {id} following {name_end=}"
        line_start += 1  # eat the newline
    return result
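# Example (hypothetical two-line chunk):
#   process_chunk(b"Foo;1.2\nBar;-3.4\n", 1)
#   -> {b"Foo": [12, 12, 12, 1], b"Bar": [-34, -34, -34, 1]}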
def read_and_process_chunk(in_file: str, id: int, offset: int, size: int) -> dict[bytes, Record]:
    with open(in_file, "rb") as file:
        file.seek(offset)
        chunk = file.read(size)
    return process_chunk(chunk, id)
def main(input: str, output: str, workers: int, chunk_size: int, verbose: bool = False) -> None:
    # NOTE: output and verbose are accepted to match the reference CLI but are
    # currently unused; results go to stdout (see output_data for a file-writing variant).
    chunk_info = get_chunk_info(input, chunk_size=chunk_size)
    with Pool(workers) as pool:
        results = pool.starmap(read_and_process_chunk, chunk_info)
    merged = merge_data(results[0], results[1:])
    print("{", ", ".join(rec_str(loc, val) for loc, val in sorted(merged.items())), "}", sep="")
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Process some temperatures.")
    parser.add_argument(
        "--input",
        type=str,
        help="Input file path.",
        default="data/measurements.txt",
    )
    parser.add_argument("--output", type=str, help="Output file path", default="output.csv")
    parser.add_argument("--workers", type=int, help="Number of subprocesses", default=max(1, os.cpu_count() - 1))
    parser.add_argument("--chunk_size", type=int, help="Chunk size in bytes", default=64 * 1024 * 1024)
    parser.add_argument("--verbose", action="store_true", help="Enable extra output")
    args = parser.parse_args()
    main(args.input, args.output, args.workers, args.chunk_size, verbose=args.verbose)
Thanks again!
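For reference, since mmap comes up above: a minimal sketch of what an mmap-based chunk read could look like (an illustration of the idea, not Doug's actual implementation; read_and_process_chunk in the listing does the same job with a plain file.read()).
import mmap

def read_chunk_mmap(in_file: str, offset: int, size: int) -> bytes:
    # Map the whole file read-only and slice out this chunk. Note the slice
    # still copies the bytes, which is one reason mmap may not help here.
    with open(in_file, "rb") as f:
        with mmap.mmap(f.fileno(), length=0, access=mmap.ACCESS_READ) as mm:
            return mm[offset:offset + size]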