An open-source, dataframe-centric ETL framework for building lakehouses with a DataOps approach.
Laktory, the lakehouse factory, is an open-source framework designed for building, deploying, and executing data pipelines while adhering to essential DevOps best practices such as source control, code reviews, and CI/CD. Leveraging Apache Spark and Polars as its core data transformation engines, Laktory ensures robust and scalable data processing capabilities.
While a Laktory data pipeline can be run locally for small datasets or prototyping, it really starts to shine when deployed and orchestrated on a cloud data platform, such as Databricks. When combined with Delta Live Tables, it provides a simple, low-maintenance, fully managed solution.
Beyond just data pipelines, Laktory allows for the comprehensive definition and deployment of your entire data platform. This includes everything from cloud infrastructure to data tables, security, and quality monitoring systems, providing an all-in-one solution for modern data platform management.
See documentation for more details.
Install using
pip install laktory[{cloud_provider}]
where {cloud_provider} is azure, aws or gcp.
For more installation options, see the Install section in the documentation.
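As a small illustration of the install command above, the sketch below builds the pip requirement string for one of the three extras listed (azure, aws, gcp) and quotes it for use in a shell. The helper names `pip_requirement` and `pip_command` are hypothetical and not part of Laktory.

```python
import shlex

# Extras named in the install instructions above.
SUPPORTED_PROVIDERS = ("azure", "aws", "gcp")


def pip_requirement(cloud_provider: str) -> str:
    """Return the requirement string for a given cloud provider extra."""
    provider = cloud_provider.strip().lower()
    if provider not in SUPPORTED_PROVIDERS:
        raise ValueError(
            f"Unknown cloud provider {cloud_provider!r}; "
            f"expected one of {SUPPORTED_PROVIDERS}"
        )
    return f"laktory[{provider}]"


def pip_command(cloud_provider: str) -> str:
    """Return a shell-safe install command (hypothetical helper)."""
    # Square brackets are glob characters in some shells (zsh, for example),
    # so the requirement is quoted before being placed on a command line.
    return "pip install " + shlex.quote(pip_requirement(cloud_provider))


print(pip_command("azure"))  # pip install 'laktory[azure]'
```

Quoting matters mainly in shells such as zsh, where an unquoted `laktory[azure]` is interpreted as a filename pattern.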
from laktory import models

# Bronze node: read raw Parquet files and keep only the columns of interest.
node_brz = models.PipelineNode(
    name="brz_stock_prices",
    layer="BRONZE",
    source={
        "format": "PARQUET",
        "path": "./data/brz_stock_prices/"
    },
    transformer={
        "nodes": [
            {
                "func_name": "select",
                "func_args": [
                    "symbol",
                    "timestamp",
                    "open",
                    "close",
                ],
            },
        ]
    }
)

# Silver node: read the bronze node's output, drop duplicate rows and
# write the result to disk as Parquet.
node_slv = models.PipelineNode(
    name="slv_stock_prices",
    layer="SILVER",
    source={
        "node_name": "brz_stock_prices"
    },
    sink={
        "path": "./data/slv_stock_prices",
        "mode": "OVERWRITE",
        "format": "PARQUET",
    },
    transformer={
        "nodes": [
            {
                "func_name": "drop_duplicates",
                "func_kwargs": {
                    "subset": ["timestamp", "symbol"]
                }
            },
        ]
    }
)

# Assemble both nodes into a single pipeline.
pipeline = models.Pipeline(
    name="stock_prices",
    nodes=[node_brz, node_slv],
)

print(pipeline)
#> resource_name_=None options=ResourceOptions(variables={}, depends_on=[], provider=None, aliases=None, delete_before_replace=True, ignore_changes=None, import_=None, parent=None, replace_on_changes=None) variables={} databricks_job=None dlt=None name='stock_prices' nodes=[PipelineNode(variables={}, add_layer_columns=True, dlt_template='DEFAULT', description=None, drop_duplicates=None, drop_source_columns=False, transformer=SparkChain(variables={}, nodes=[SparkChainNode(variables={}, allow_missing_column_args=False, column=None, spark_func_args=[SparkFuncArg(variables={}, value='symbol'), SparkFuncArg(variables={}, value='timestamp'), SparkFuncArg(variables={}, value='open'), SparkFuncArg(variables={}, value='close')], spark_func_kwargs={}, spark_func_name='select', sql_expression=None)]), expectations=[], layer='BRONZE', name='brz_stock_prices', primary_key=None, sink=None, source=FileDataSource(variables={}, as_stream=False, broadcast=False, cdc=None, dataframe_type='SPARK', drops=None, filter=None, mock_df=None, renames=None, selects=None, watermark=None, format='PARQUET', header=True, multiline=False, path='./data/brz_stock_prices/', read_options={}, schema_location=None), timestamp_key=None), PipelineNode(variables={}, add_layer_columns=True, dlt_template='DEFAULT', description=None, drop_duplicates=None, drop_source_columns=True, transformer=SparkChain(variables={}, nodes=[SparkChainNode(variables={}, allow_missing_column_args=False, column=None, spark_func_args=[], spark_func_kwargs={'subset': SparkFuncArg(variables={}, value=['timestamp', 'symbol'])}, spark_func_name='drop_duplicates', sql_expression=None)]), expectations=[], layer='SILVER', name='slv_stock_prices', primary_key=None, sink=FileDataSink(variables={}, mode='OVERWRITE', checkpoint_location=None, format='PARQUET', path='./data/slv_stock_prices', write_options={}), source=PipelineNodeDataSource(variables={}, as_stream=False, broadcast=False, cdc=None, dataframe_type='SPARK', drops=None, 
#  filter=None, mock_df=None, renames=None, selects=None, watermark=None, node_name='brz_stock_prices', node=PipelineNode(variables={}, add_layer_columns=True, dlt_template='DEFAULT', description=None, drop_duplicates=None, drop_source_columns=False, transformer=SparkChain(variables={}, nodes=[SparkChainNode(variables={}, allow_missing_column_args=False, column=None, spark_func_args=[SparkFuncArg(variables={}, value='symbol'), SparkFuncArg(variables={}, value='timestamp'), SparkFuncArg(variables={}, value='open'), SparkFuncArg(variables={}, value='close')], spark_func_kwargs={}, spark_func_name='select', sql_expression=None)]), expectations=[], layer='BRONZE', name='brz_stock_prices', primary_key=None, sink=None, source=FileDataSource(variables={}, as_stream=False, broadcast=False, cdc=None, dataframe_type='SPARK', drops=None, filter=None, mock_df=None, renames=None, selects=None, watermark=None, format='PARQUET', header=True, multiline=False, path='./data/brz_stock_prices/', read_options={}, schema_location=None), timestamp_key=None)), timestamp_key=None)] orchestrator=None udfs=[]
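
# `spark` is expected to be an active SparkSession, for example one created
# with pyspark.sql.SparkSession.builder.getOrCreate().
pipeline.execute(spark=spark)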
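To make the declarative style of the example above more concrete, here is a plain-Python sketch of how a pipeline of this shape could be evaluated: nodes are ordered by their `node_name` references, and each transformer step is looked up by its `func_name` and called with its `func_args` and `func_kwargs`. This is not Laktory's implementation. The `run` function, the `FUNCS` registry, the list-of-dicts "dataframe" and the sample rows are all assumptions made up for illustration.

```python
from graphlib import TopologicalSorter


# Hypothetical stand-in for a dataframe: a list of row dictionaries.
def select(rows, *columns):
    """Keep only the requested columns."""
    return [{c: row[c] for c in columns} for row in rows]


def drop_duplicates(rows, subset):
    """Keep the first row seen for each combination of `subset` values."""
    seen, out = set(), []
    for row in rows:
        key = tuple(row[c] for c in subset)
        if key not in seen:
            seen.add(key)
            out.append(row)
    return out


# Registry mapping a step's "func_name" to a callable (assumption).
FUNCS = {"select": select, "drop_duplicates": drop_duplicates}


def run(nodes, inputs):
    """Run nodes in dependency order and return {node name: rows}."""
    by_name = {n["name"]: n for n in nodes}
    # Map each node to the set of upstream nodes it reads from.
    deps = {
        n["name"]: {n["source"]["node_name"]} if "node_name" in n["source"] else set()
        for n in nodes
    }
    results = {}
    for name in TopologicalSorter(deps).static_order():
        source = by_name[name]["source"]
        # Read from an upstream node's result or from the provided input rows.
        if "node_name" in source:
            rows = results[source["node_name"]]
        else:
            rows = inputs[source["path"]]
        for step in by_name[name].get("transformer", {}).get("nodes", []):
            func = FUNCS[step["func_name"]]
            rows = func(rows, *step.get("func_args", []), **step.get("func_kwargs", {}))
        results[name] = rows
    return results


# Made-up sample rows, including one exact duplicate.
raw = [
    {"symbol": "AAPL", "timestamp": "2024-01-01", "open": 1.0, "close": 2.0, "volume": 10},
    {"symbol": "AAPL", "timestamp": "2024-01-01", "open": 1.0, "close": 2.0, "volume": 10},
    {"symbol": "MSFT", "timestamp": "2024-01-01", "open": 3.0, "close": 4.0, "volume": 20},
]

# Same node definitions as above, deliberately listed out of order.
nodes = [
    {
        "name": "slv_stock_prices",
        "source": {"node_name": "brz_stock_prices"},
        "transformer": {"nodes": [
            {"func_name": "drop_duplicates", "func_kwargs": {"subset": ["timestamp", "symbol"]}},
        ]},
    },
    {
        "name": "brz_stock_prices",
        "source": {"path": "./data/brz_stock_prices/"},
        "transformer": {"nodes": [
            {"func_name": "select", "func_args": ["symbol", "timestamp", "open", "close"]},
        ]},
    },
]

results = run(nodes, {"./data/brz_stock_prices/": raw})
print(len(results["brz_stock_prices"]), len(results["slv_stock_prices"]))  # 3 2
```

The point of the sketch is that the pipeline definition is pure data: the order of execution and the operations applied are both derived from the node dictionaries rather than written as imperative code.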
To get started with a more useful example, jump into the Quickstart.
A comprehensive template on how to deploy a lakehouse as code using Laktory is maintained here: https://github.com/okube-ai/lakehouse-as-code.
In this template, four Pulumi projects are used:
{cloud_provider}_infra: Deploy the required resources on your cloud provider
unity-catalog: Set up users, groups, catalogs, schemas and Lakehouse grants
workspace-conf: Set up secrets, clusters and warehouses
workspace: The data workflows to build your lakehouse
Okube is dedicated to building open-source frameworks, known as the kubes, that empower businesses to build, deploy and operate highly scalable data platforms and AI models.