Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Contributed python variant to the 1BRC. #1

Merged
merged 1 commit into from
Apr 21, 2024

Conversation

adamfarquhar
Copy link
Contributor

A python variant for the 1BRC. This makes some different design choices from Doug Mercer's versions. It reduces the amount of string creation and seems to perform best with buffers around 64mb on my test system.

A python variant for the 1BRC. This makes some different design choices from Doug Mercer's versions. It reduces the amount of string creation and seems to perform best with buffers around 64mb on my test system.
@adamfarquhar
Copy link
Contributor Author

Doug - I loved your recent (14-Apr-2024) video and how you drew attention to the 1BRC and pypy (which I've not used before). It was a great match for this problem and gave a substantial speedup with no work necessary. Indeed, almost all of the real speed improvements came from pypy and multiprocessing. I also enjoyed using hyperfine for performing a simple grid search to find good combinations of parameters and record behavior averaged over multiple runs.

My version reduces the amount of string creation and seems to perform best when each process handless multiple chunks (e.g. 10 chunks/process). I've adjusted it to match the CLI of your implementation by default. There are a couple of other command line options (e.g. for number of processes, chunk size, etc).

I would love to find ways to:

  • Do good quality performance monitoring with pypy when using multi-processing. I've tried vmprof, but would like a good visualization tool for it (the vmprof.com site has been down).
  • Reduce the amount of data shipped back from the sub-processes.
  • Reduce the number of strings required for the station names. Would it be cheating to read in the list first and create an optimized data structure for storing the min/max/sum/count values?

@dougmercer
Copy link
Collaborator

dougmercer commented Apr 17, 2024

Hi Adam -- Thanks for putting this together!

This is a neat variation on the problem, and you added some nice creature comforts to the code.

On my machine, your approach is nearly as fast, but not faster than the two "doug_booty4*" implementations.

python src/duckdb_1brc.py: 9.100 seconds
pypy3 src/doug_booty4.py: 9.798 seconds
pypy3 src/doug_booty4_alternate.py: 9.871 seconds
pypy3 src/farquhar_v6.py --workers 10 --chunk_size 536870912: 10.044 seconds
pypy3 src/farquhar_v6_tweaked.py --workers 10 --chunk_size 536870912: 10.072 seconds
pypy3 src/farquhar_v6.py --workers 10: 10.566 seconds
pypy3 src/farquhar_v6_tweaked.py --workers 10: 10.619 seconds
python src/polars_1brc.py: 11.704 seconds

I ran it..:

  • as-is and slightly tweaked in ways that I thought might speed it up (no major difference)
  • with 9 and 10 workers on a 10 core system (10 was faster)
  • with a few different chunk sizes (536870912) was the best I tried

That said, it still is probably worth merging this in. I think with a few minor tweaks + additional comments, this version of the code can show that mmap and some of the other tweaks I made (e.g., removing the global variables) didn't really have a major difference in performance.

Would you mind giving me a day or two to recommend a few changes, then we can merge it in?

Also, I may try searching a few more chunk_sizes. I'll have to give hyperfine a try!

For context, I tried this "tweaked" version and it didn't affect performance. I was really surprised!

import argparse
import os
from multiprocessing import Pool

# Adam Farquhar 2021-09-30
# Github: adamfarquhar


Record = list[int, int, int, int]


def rec_str(loc, rec) -> str:
    return f"{loc.decode('utf-8')}={rec[0]/10:.1f}/{0.1 * rec[2] / rec[3]:.1f}/{rec[1]/10:.1f}"


def parse_temp(buf: bytes, offset: int) -> int:
    # DASH: int = 45  # ord("-")
    # DOT: int = 46  # ord(".")

    if buf[offset] == 45:  # dash ord("-")
        neg = -1
        offset += 1
    else:
        neg = 1
    if buf[offset + 1] == 46:  # dot ord(".")
        # d.d
        val: int = (buf[offset] - 48) * 10 + (buf[offset + 2] - 48)
    else:
        # dd.d
        val: int = (buf[offset] - 48) * 100 + (buf[offset + 1] - 48) * 10 + (buf[offset + 3] - 48)
    return neg*val


def merge_data(target: dict[str, Record], source_data: list[dict[str, Record]]) -> dict[str, Record]:
    """Merge the source data into the target."""
    for result in source_data:
        for key, val in result.items():
            if key in target:
                rec = target[key]
                rec[0] = min(rec[0], val[0])
                rec[1] = max(rec[1], val[1])
                rec[2] += val[2]
                rec[3] += val[3]
            else:
                target[key] = val
    return target


def output_data(out_file: str, data: dict[str, Record]) -> None:
    with open(out_file, "w", encoding="utf-8") as file:
        print("{", ", ".join(rec_str(loc, val) for loc, val in sorted(data.items())), "}", sep="", file=file)


# For the chunk_size, we seem to bottom out at 64MB. 128MB, 32MB are slower - but not by much.
def get_chunk_info(
    file_name: str, chunk_size: int = 32 * 1024 * 1024, buf_size: int = 4096
) -> list[tuple[str, int, int, int]]:
    assert chunk_size > 0, "Chunk target_size must be positive"
    results = []
    buffer = bytearray(buf_size)

    with open(file_name, "rb") as file:
        # Get the file size
        file.seek(0, 2)
        file_size = file.tell()
        # Reset the file pointer
        file.seek(0, 0)
        id = 0
        offset = 0

        while offset < file_size:
            id += 1
            if offset + chunk_size >= file_size:
                results.append((file_name, id, offset, file_size - offset))
                break
            else:
                file.seek(offset + chunk_size)
                file.readinto(buffer)
                newline_loc = buffer.find(b"\n")
                assert newline_loc != -1, f"No end of line found following {offset=:,}"
                results.append((file_name, id, offset, chunk_size + newline_loc + 1))
                offset += chunk_size + newline_loc + 1
    return results


def process_chunk(chunk: bytes, id=int) -> dict[str, list]:
    result: dict[str, list] = {}
    size: int = len(chunk)
    line_start: int = 0
    while line_start < size:
        name_end = chunk.find(b";", line_start)
        # assert name_end != -1, f"Separator not found after offset {line_start}."
        # It seems to be OK(ish) to skip the decoding step here. We don't really need it until we write the output.
        station = chunk[line_start:name_end]  # .decode(encoding="utf-8")
        temp = parse_temp(chunk, name_end + 1)
        # Add data for this line
        if station not in result:
            # min, max, sum, count
            result[station] = [temp, temp, temp, 1]
        else:
            rec = result[station]
            rec[0] = min(rec[0], temp)
            rec[1] = max(rec[1], temp)
            rec[2] += temp
            rec[3] += 1
        # Find next line
        line_start = chunk.find(b"\n", name_end)  # could catch the end of temp and start there
        # assert line_start != -1, f"No newline at end of chunk {id} following {name_end=}"
        line_start += 1  # eat the newline
    return result


def read_and_process_chunk(in_file: str, id: int, offset: int, size: int) -> dict[str, list]:
    with open(in_file, "rb") as file:
        file.seek(offset)
        chunk = file.read(size)
        return process_chunk(chunk, id)


def main(input: str, output: str, workers: int, chunk_size: int, verbose: bool = False) -> None:
    chunk_info = get_chunk_info(input, chunk_size=chunk_size)
    with Pool(workers) as pool:
        results = pool.starmap(read_and_process_chunk, chunk_info)
    merged = merge_data(results[0], results[1:])

    print("{", ", ".join(rec_str(loc, val) for loc, val in sorted(merged.items())), "}", sep="")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Process some temperatures.")
    parser.add_argument(
        "--input",
        type=str,
        help="Input file path.",
        default="data/measurements.txt",
    )
    parser.add_argument("--output", type=str, help="Output file path", default="output.csv")
    parser.add_argument("--workers", type=int, help="Number of subprocesses", default=max(1, os.cpu_count() - 1))
    parser.add_argument("--chunk_size", type=int, help="Chunk size in bytes", default=64 * 1024 * 1024)
    parser.add_argument("--verbose", type=bool, help="Suppress extra output", default=False)

    args = parser.parse_args()
    main(args.input, args.output, args.workers, args.chunk_size, verbose=args.verbose)

Thanks again!
--Doug

@dougmercer dougmercer merged commit 65949b4 into dougmercer-yt:main Apr 21, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants