A lightweight tool to fetch tables from BigQuery as pandas DataFrames very fast using BigQuery Storage API combined with multiprocessing. This module also aims to fetch large tables that cannot fit into memory, by chunking the table in a smart and scalable way.
pip install bqfetch
pip install -r requirements.txt
- Fetch all distinct values from the given index
column
. - Divide these indices in
chunks
based on the available memory and the number of cores on the machine. If multiprocessing
:- Each chunk will be divided in multiple sub-chunks based on the parameter
nb_cores
and the available memory. - For each sub-chunk, create a temporary table containing all the matching rows in the whole table.
- Fetch these temporary tables using BigQuery Storage as dataframes.
- Merge the dataframes.
- Delete temporary tables.
- Each chunk will be divided in multiple sub-chunks based on the parameter
If !multiprocessing
:- Same process with only one temporary table and no parallel processes created.
id | Name | Age |
---|---|---|
187 | Bartolomé | 30 |
188 | Tristan | 22 |
... | ... | ... |
>>> table = BigQueryTable("PROJECT", "DATASET", "TABLE")
>>> fetcher = BigQueryFetcher('/path/to/service_account.json', table)
>>> chunks = fetcher.chunks('id', by_chunk_size_in_GB=5)
>>> for chunk in chunks:
df = fetcher.fetch(chunk, nb_cores=-1, parallel_backend='billiard')
# ...
- First, we have to create a
BigQueryTable
object which contains the path to the BigQuery table stored in GCP. - A fetcher is created, given in parameter the absolute path to the service_account.json file, the file is mandatory in order to do operations in GCP.
- Chunks the whole table, given the
column
name and the chunk size. In this case, choosing the id column is perfect because this each value of this column appears the same number of times: 1 time. Concerning the chunks size, if by_chunk_size_in_GB=5, each chunk that will be fetched on the machine will be of size 5GB. Thus it has to fit into memory. You need to save 1/3 more memory because the size of a DataFrame object is larger than the raw fetched data. - For each chunk, fetch it.
nb_cores
=-1 will use the number of cores available on the machine.parallel_backend
='billiard' | 'joblib' | 'multiprocessing' specify the backend framework to use.
It is also possible to use by_nb_chunks
instead of by_chunk_size_in_GB
. It will divided the table in N, so you cannot control more flexibly the size of each chunk.
>>> table = BigQueryTable("PROJECT", "DATASET", "TABLE")
>>> fetcher = BigQueryFetcher('/path/to/service_account.json', table)
>>> chunks = fetcher.chunks('id', by_nb_chunks=10)
>>> for chunk in chunks:
df = fetcher.fetch(chunk, nb_cores=-1, parallel_backend='billiard')
# ...
>>> chunks = fetcher.chunks(column='id', by_nb_chunks=1, verbose=True)
# Available memory on device: 7.04GB
# Size of table: 2.19GB
# Prefered size of chunk: 3GB
# Size per chunk: 3GB
# Nb chunks: 1
# Nb values in "id": 96
# Chunk size: 3GB
# Nb chunks: 1
>>> for chunk in chunks:
>>> df = fetcher.fetch(chunk=chunk, nb_cores=1, parallel_backend='joblib', verbose=True)
# Use multiprocessing : False
# Nb cores: 1
# Parallel backend: joblib
# Time to fetch: 43.21s
# Nb lines in dataframe: 3375875
# Size of dataframe: 2.83GB
We recommend to use this tool only in the case where the table to fetch contains a column that can be easily chunked (divided in small parts). Thus the perfect column to achieve this is a column containing distinct values or values that appear ~ the same number of time. If some values appear thousands of times and some only fews, then the chunking will not be reliable because we need to make the assumption that each chunk will be approximatively the same size in order to estimate the needed memory necessary to fetch in an optimize way the table.
This column contains distinct values so can be divided in chunks easily.
Card number |
---|
4390 3849 ... |
2903 1182 ... |
0562 7205 ... |
... |
This column can contains a lot of variance between values so the chunking will not be reliable.
Age |
---|
18 |
18 |
64 |
18 |
... |
I remind you that adding more cores to the fetching process will not necessarily gain performance and most of the time it will even be slower. The reason is that the fetching is directly dependent on the Internet bandwidth available on your network, not the number of working cores or the computer power. However, you can do your own tests and in some cases the multiprocessing can gain time (ex: in the case where cloud machines allow only an amount of bandwidth by core, multiplying the number of cores will also multiplying the bandwidth, ex: GCP compute engines).
bqfetch is open to new contributors, especially for bug fixing or implementation of new features. Do not hesitate to open an issue/pull request :)
Copyright (c) 2021-present, Tristan Bilot