The `DataLoader` script is designed to load data from a REST API into a destination database using DLT pipelines, driven by a configuration file. The script supports both full and incremental data loads, ensuring that data is consistently replaced with the latest available data.
- DataLoader Class: Handles the entire data loading process.
- DLT Pipeline: Uses DLT (Data Load Tool) to manage data pipelines.
- REST API Integration: Fetches data from a REST API that returns data in JSONL (JSON Lines) format.
- `dlt`: Data Load Tool for managing data pipelines.
- `requests`: For making HTTP requests to the REST API.
- `logging`: For logging information, warnings, and errors.
- `sys`, `base64`, `datetime`, `timedelta`: Standard Python libraries.
- `rest_api`, `token_retriever`, `toml_updater`: Custom modules for API configuration, token retrieval, and configuration updates.
```
python data_loader.py [full|incremental] [s3_url]
```

- `full` or `incremental`: Specifies whether to perform a full load or an incremental load.
- `s3_url`: The S3 URL where the data will be stored.
- `config_path`: Path to the JSON configuration file (`../configs/configs.json`).
- `secrets_path`: Path to the secrets configuration file (`./.dlt/secrets.toml`).
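For reference, a hypothetical `secrets.toml` fragment for an S3-backed destination might look like this (the bucket URL and key names are illustrative, not taken from the script):

```toml
# Illustrative sketch only; the script's actual secrets layout may differ.
[destination.filesystem]
bucket_url = "s3://example-bucket/data"

[destination.filesystem.credentials]
aws_access_key_id = "..."
aws_secret_access_key = "..."
```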
The `DataLoader` class manages the loading of data from a REST API into a destination database using DLT pipelines. It is initialized with the following attributes:

- `base_url`: The base URL of the REST API.
- `token`: Authorization token for the REST API.
- `dataset`: The name of the dataset.
- `config_path`: Path to the JSON configuration file.
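A minimal sketch of the initializer these attributes imply (the class body and argument order are assumptions, not the script's actual code):

```python
class DataLoader:
    """Loads data from a REST API into a destination database via DLT pipelines."""

    def __init__(self, base_url, token, dataset, config_path):
        self.base_url = base_url        # base URL of the REST API
        self.token = token              # authorization token for the API
        self.dataset = dataset          # name of the destination dataset
        self.config_path = config_path  # path to the JSON configuration file
```

For example: `loader = DataLoader("https://api.example.com", token, "analytics", "../configs/configs.json")`.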
Loads the configuration from a JSON file.
Creates and returns a DLT pipeline configured for a specific dataset.
Generates and returns the REST API configuration for a specified resource and date range. This configuration includes the endpoint parameters and headers required for the API request.
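As a rough illustration, such a configuration could be assembled like this (the endpoint path, parameter names, and Basic-auth scheme are all assumptions for the sketch, not the script's actual values):

```python
import base64

def get_api_config(resource, start_date, end_date, token, where="1=1"):
    """Build a hypothetical request configuration for one resource and date range."""
    return {
        "endpoint": f"/export/{resource}",  # illustrative endpoint path
        "params": {
            "from": start_date,
            "to": end_date,
            "where": where,
        },
        "headers": {
            # The script imports base64, so an encoded auth header is plausible;
            # the exact scheme is an assumption.
            "Authorization": "Basic " + base64.b64encode(f"api:{token}".encode()).decode(),
            "Accept": "application/x-ndjson",
        },
    }
```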
Creates a generator function to fetch data from an API that returns data in JSONL format.
The `fetch_api_data` method is decorated with `@dlt.resource()`, indicating that it is a data source for the DLT pipeline. This function makes an HTTP GET request to the specified API endpoint and yields each line of JSONL data as a JSON object.
Loads data for a specific resource and date range. It performs the following steps:
- Fetches the API configuration.
- Creates the fetch API data generator.
- Runs the DLT pipeline, replacing the existing data in the table with the new data.
Executes a full load of data up to the end of the previous day for all resources specified in the configuration file.
Executes an incremental load of data for the last day for all resources specified in the configuration file.
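The two date ranges described above can be sketched with the standard library (the exact boundaries and the full-load start date are assumptions; the real script derives them from its configuration):

```python
from datetime import date, timedelta

def full_load_range(today):
    """Full load: from an early lower bound up to the end of the previous day."""
    end = today - timedelta(days=1)
    start = date(1970, 1, 1)  # illustrative lower bound, not the script's actual value
    return start, end

def incremental_load_range(today):
    """Incremental load: just the previous day."""
    end = today - timedelta(days=1)
    return end, end
```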
The script accepts command-line arguments to determine the load type (`full` or `incremental`) and the S3 URL for data storage. Based on the load type, it runs either a full load or an incremental load.
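A minimal argument-parsing sketch consistent with this usage (the real script's validation and error messages may differ):

```python
import sys

def parse_args(argv):
    """Parse [full|incremental] [s3_url]; exits with a usage message on bad input."""
    if len(argv) != 3 or argv[1] not in ("full", "incremental"):
        raise SystemExit("usage: python data_loader.py [full|incremental] [s3_url]")
    return argv[1], argv[2]
```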
```json
{
  "resources": [
    {
      "name": "resource_1",
      "params": {
        "always_full": false,
        "where": "1=1"
      }
    },
    {
      "name": "resource_2",
      "params": {
        "always_full": true,
        "where": "status = 'active'"
      }
    }
  ]
}
```
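Reading this configuration and honoring the `always_full` flag could look like the following sketch (the flag's exact semantics are an assumption, inferred from its name):

```python
import json

def load_resources(config_path):
    """Load the resource definitions from the JSON configuration file."""
    with open(config_path) as f:
        config = json.load(f)
    return config["resources"]

def needs_full_load(resource, load_type):
    # Resources flagged always_full are fully reloaded even on incremental runs
    # (assumed behavior of the flag).
    return load_type == "full" or resource["params"].get("always_full", False)
```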
The `fetch_api_data` function handles JSONL responses from the API. JSONL (JSON Lines) is a convenient format for streaming JSON objects: each line in a JSONL file is a valid JSON object.
```python
import json

import dlt
import requests

@dlt.resource()
def fetch_api_data():
    # base_url, headers, and params are defined in the enclosing scope of the real script.
    # stream=True avoids buffering the whole response in memory.
    response = requests.get(base_url, headers=headers, params=params, stream=True)
    if response.status_code != 200:
        raise ValueError(f"API request failed with status: {response.status_code}")
    for line in response.iter_lines():
        if line:  # skip keep-alive blank lines
            yield json.loads(line.decode("utf-8"))
```
- `@dlt.resource()` decorator: This decorator indicates that `fetch_api_data` is a data source for the DLT pipeline.
- Streaming data: The `stream=True` parameter in the `requests.get` call enables streaming of the response, allowing the function to handle large datasets efficiently.
- Line-by-line processing: The function processes each line of the response, decoding it and yielding it as a JSON object.
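The line-by-line decoding can be illustrated in isolation, with raw bytes standing in for the streamed response:

```python
import json

def parse_jsonl_lines(raw_lines):
    """Yield one JSON object per non-empty JSONL line (bytes, as iter_lines yields)."""
    for line in raw_lines:
        if line:  # iter_lines may yield empty keep-alive lines
            yield json.loads(line.decode("utf-8"))
```

For example, `list(parse_jsonl_lines([b'{"id": 1}', b"", b'{"id": 2}']))` skips the blank line and returns the two decoded objects.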
The script includes error handling for configuration loading, API requests, and data loading operations. Errors are logged with appropriate messages to help diagnose issues.
The `DataLoader` script offers a solution for loading data from a REST API into a destination database using DLT pipelines. It ensures data consistency by replacing the existing data with each load, making it suitable for both full and incremental data loads. The use of the `@dlt.resource()` decorator and efficient handling of JSONL responses make it a scalable and efficient solution for data integration tasks.