Building a Robust Data Lake with Python and DuckDB

Every morning, when I wake up, the weight of responsibility settles on my shoulders. It’s a weight that fuels me, driving me to provide for my customers and for myself. Today, that responsibility involves crafting a powerful data lake from scratch using Python and DuckDB.

First, let’s define the architecture. My plan is to store Parquet files in S3, using Dagster to orchestrate the Python application and the DuckDB engine.

I will immerse you in the world of data lakes, Python, and DuckDB, with a step-by-step, practical guide full of examples.
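
Before diving into each piece, here is the smallest possible version of that architecture in code: DuckDB querying a Parquet file that lives in S3, via its httpfs extension. This is a minimal sketch; the bucket name, region, and credentials below are placeholders.

import duckdb

con = duckdb.connect()

# httpfs lets DuckDB issue HTTP range requests against S3 objects
con.execute("INSTALL httpfs")
con.execute("LOAD httpfs")
con.execute("SET s3_region='your_region'")
con.execute("SET s3_access_key_id='your_key_id'")
con.execute("SET s3_secret_access_key='your_access_key'")

# Query the Parquet file in place, without downloading it first
df = con.execute("SELECT COUNT(*) FROM read_parquet('s3://mybucket/file.parquet')").fetch_df()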

I’ll use the HTTP Range header to perform random-access reads on the Parquet files stored in S3. This lets me fetch only the byte ranges I need without downloading entire files, saving both time and resources. Here, Python will be my tool of choice, a reliable ally in navigating the complexity of data manipulation.

Advanced Handling of Parquet Files

For efficient handling of large Parquet files in S3, we’ll use the HTTP Range header to read file chunks.

import boto3
from io import BytesIO
import pandas as pd
import pyarrow.parquet as pq

# Prefer credentials from the environment or an IAM role over hard-coding keys
s3 = boto3.client('s3', region_name='your_region',
                  aws_access_key_id='your_key_id',
                  aws_secret_access_key='your_access_key')

def read_parquet_file(bucket, key, start_range, end_range):
    # Fetch only the requested byte range via the HTTP Range header.
    # Note: pyarrow needs the Parquet footer, which sits at the end of the file,
    # so the range must span a complete Parquet file for read_table to parse it.
    response = s3.get_object(Bucket=bucket, Key=key, Range=f'bytes={start_range}-{end_range}')
    data_chunk = response['Body'].read()
    df = pq.read_table(source=BytesIO(data_chunk)).to_pandas()
    return df

# Read up to the first 10,001 bytes of the object (S3 clamps the range to the object size)
df = read_parquet_file('mybucket', 'file.parquet', 0, 10000)
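
If slicing byte ranges by hand feels fragile, pyarrow can issue the range requests for you: its S3 filesystem reads the Parquet footer first and then fetches only the row groups and columns you ask for. A minimal sketch, assuming the file contains columns named id and value:

import pyarrow.parquet as pq
from pyarrow import fs

# pyarrow performs HTTP range reads under the hood: footer first,
# then only the requested columns and row groups
s3_fs = fs.S3FileSystem(region='your_region')
table = pq.read_table('mybucket/file.parquet', filesystem=s3_fs, columns=['id', 'value'])
df = table.to_pandas()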

Here comes DuckDB, our analytical database, tasked with converting and processing the data. DuckDB, although an incredibly powerful tool, is not without its limitations. It is an in-process, single-node engine, so datasets that outgrow a single machine’s memory and disk will eventually hit a wall, and it lacks the multi-user, client-server features of a traditional warehouse. Yet, for our data lake, DuckDB’s column-oriented design and vectorized query execution make it a potent choice.

Sophisticated DuckDB Processing

Processing data in DuckDB can go beyond simple queries. We can handle advanced operations, including joins, aggregations, and window functions:

import duckdb

con = duckdb.connect('duckdb_file.db')

# Assume we have two tables, orders and customers, in our database.
# The window function counts each customer's orders without collapsing the rows.
query = """
    SELECT c.name, COUNT(o.id) OVER (PARTITION BY c.id) AS num_orders
    FROM customers c
    LEFT JOIN orders o ON c.id = o.customer_id
"""

df = con.execute(query).fetch_df()
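
In a data-lake setting, those tables don’t have to be loaded by hand: DuckDB can create them straight from Parquet files. A hypothetical sketch, assuming files named customers.parquet and orders.parquet sit alongside the application:

# Build the tables the query above assumes, directly from Parquet files
con.execute("CREATE TABLE customers AS SELECT * FROM read_parquet('customers.parquet')")
con.execute("CREATE TABLE orders AS SELECT * FROM read_parquet('orders.parquet')")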

For orchestration, I turn to Dagster, employing its Software-Defined Assets to model and manage the data. With Dagster, I create pipelines to run my Python application, allowing me to define dependencies and track the lineage of my data. This way, I know where everything is, and where it came from — a priceless asset in the ever-changing world of data.

Utilizing S3 for Advanced Storage Needs

When dealing with S3, we often need more than just simple file operations. Boto3 supports advanced features like multipart uploads for large files:

from boto3.s3.transfer import TransferConfig

def upload_large_file(bucket, key, local_file):
    s3 = boto3.client('s3')
    # Files larger than multipart_threshold are split and uploaded in parallel parts
    config = TransferConfig(multipart_threshold=64 * 1024 * 1024)
    s3.upload_file(local_file, bucket, key, Config=config)

upload_large_file('mybucket', 'largefile.parquet', '/path/to/largefile.parquet')

Deep Dive into DuckDB Engine Set-Up

Setting up the DuckDB engine can include advanced options such as configuring the number of threads, memory limit, and enabling or disabling specific optimizations:

con = duckdb.connect('my_database.duckdb')

# Configure settings
con.execute("PRAGMA threads=4") # Use 4 threads
con.execute("PRAGMA memory_limit='4GB'") # Limit memory usage to 4GB

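A quick way to confirm that these settings took effect is DuckDB’s current_setting() function:

# Check the values the engine is actually running with
print(con.execute("SELECT current_setting('threads'), current_setting('memory_limit')").fetchone())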

Orchestrating Complex Tasks with Dagster

Using Dagster, we can orchestrate complex, dependent tasks. Here’s an example where one asset prepares data and another analyzes it:

from dagster import asset, AssetIn

@asset
def prepare_data():
    # Some complex data preparation here
    data = load_data()  # Assume this function loads data
    prepared_data = data * 2
    return prepared_data

@asset(ins={"prepared_data": AssetIn("prepare_data")})
def analyze_data(prepared_data):
    # Some complex analysis here
    results = prepared_data.sum()
    return results
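
For a quick local test, these assets can be materialized in process with dagster.materialize (available in recent Dagster releases):

from dagster import materialize

# Materialize both assets in dependency order and check the run outcome
result = materialize([prepare_data, analyze_data])
print(result.success)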

Running an Advanced Python Application with Dagster

With Dagster, you can build more complex jobs that involve dependencies, conditionals, and more. Here is an example of a job built from ops (the successor to Dagster’s older solid API), where each op depends on the output of the previous one:

from dagster import job, op

@op
def extract_data(context):
    df = load_data()  # Assume this function loads data
    return df

@op
def process_data(context, df):
    df_processed = df * 2  # Some complex data processing
    return df_processed

@op
def analyze_data(context, df):
    result = df.sum()  # Some complex data analysis
    context.log.info(f"Result: {result}")

@job
def complex_job():
    analyze_data(process_data(extract_data()))

Integration of All Components

Let’s tie all the components together in a job that reads a Parquet file from S3, processes it with DuckDB, and uploads the result back to S3.

import duckdb
from dagster import job, op

@op
def extract_parquet(context):
    # Reuse the range-read helper defined earlier
    return read_parquet_file('mybucket', 'file.parquet', 0, 10000)

@op
def transform_with_duckdb(context, df):
    con = duckdb.connect()
    con.register('raw_data', df)  # Expose the DataFrame to DuckDB as a view
    return con.execute("SELECT * FROM raw_data WHERE value > 100").fetch_df()

@op
def load_result(context, df):
    df.to_parquet('/tmp/result.parquet')
    upload_large_file('mybucket', 'result.parquet', '/tmp/result.parquet')

@job
def data_lake_job():
    load_result(transform_with_duckdb(extract_parquet()))

result = data_lake_job.execute_in_process()

Monitor Your System

To ensure that our data lake runs smoothly, we need to incorporate monitoring. Let’s use AWS CloudWatch for logging and monitoring our application:

import time

import boto3

def log_to_cloudwatch(message):
    client = boto3.client('logs', region_name='your_region')
    # The log group and log stream must already exist in CloudWatch Logs
    log_group = 'my_log_group'
    log_stream = 'my_log_stream'
    response = client.put_log_events(
        logGroupName=log_group,
        logStreamName=log_stream,
        # CloudWatch expects the timestamp in milliseconds since the epoch
        logEvents=[{'timestamp': int(time.time() * 1000), 'message': message}]
    )
    return response

# Now you can log any event or error in your application
log_to_cloudwatch('Data processing started')

Scale Up Your System

As our data grows, our system should scale accordingly. We can run multiple DuckDB instances, each handling a portion of our data, and use Apache Spark or a similar system to distribute the work. First, Spark reads and partitions the data; a sketch of handing those partitions to DuckDB follows the snippet below:

from pyspark.sql import SparkSession

# Initialize Spark
spark = SparkSession.builder.getOrCreate()

# Read data into a Spark DataFrame (s3a:// is the Hadoop S3 connector scheme)
df = spark.read.parquet("s3a://mybucket/file.parquet")

# Create a temporary view for SQL queries
df.createOrReplaceTempView("my_data")

# Execute a query with Spark SQL and keep the result as a Spark DataFrame
result = spark.sql("SELECT * FROM my_data WHERE value > 100")
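
To actually put DuckDB to work in that distributed picture, each executor can hand its partitions to a local, in-process DuckDB instance. A minimal sketch, assuming PySpark 3.x and the same value column as above:

import duckdb

def filter_with_duckdb(batches):
    # Each Spark executor runs its own in-process DuckDB over its partitions
    for pdf in batches:
        con = duckdb.connect()
        con.register('chunk', pdf)  # Expose the pandas chunk to DuckDB as a view
        yield con.execute("SELECT * FROM chunk WHERE value > 100").fetch_df()

# Apply the per-partition DuckDB query across the cluster
result = df.mapInPandas(filter_with_duckdb, schema=df.schema)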

Remember that complexity is the essence of the game. It encourages us to explore, expand our knowledge, and reach new horizons. As in business, data science is not about making quick decisions. It’s about taking responsibility, striving for more, and gradually improving every day.
