Skip to content

Commit

Permalink
Add function to download and extract historical data
Browse files Browse the repository at this point in the history
Historical data comes in zip files, partitioned by year.
Each zip file contains one directory for each month of the year,
and each month directory contains csv files with ride data.
If a month has more than 1000000 rides, the csv files are split such
that each file has at most 1000000 rides. (For example, some months have
data split into 4 csv files.)
  • Loading branch information
flpmonteiro committed Apr 12, 2024
1 parent 156d2ed commit 0adac88
Showing 1 changed file with 26 additions and 25 deletions.
51 changes: 26 additions & 25 deletions nyc_citibike/assets/rides.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import zipfile
import os
import glob
from dagster_gcp.bigquery.resources import bigquery_resource
import requests
import pandas as pd
from io import BytesIO
Expand All @@ -11,7 +14,7 @@
from dagster_gcp import BigQueryResource

from . import constants
from ..partitions import monthly_partition
from ..partitions import monthly_partition, yearly_partition


def download_and_extract(url: str, destination_path: str) -> bool:
Expand All @@ -38,39 +41,37 @@ def download_and_extract(url: str, destination_path: str) -> bool:


@asset(
    partitions_def=yearly_partition,
    group_name="raw_files",
)
def download_extract_historic_ride_data(context) -> None:
    """
    Download and extract one year of historic Citi Bike trip data.

    The asset is partitioned by year; the partition key selects which
    yearly zip archive to fetch from ``constants.HISTORIC_DOWNLOAD_URL``.
    Every CSV member found inside the archive is extracted into
    ``constants.RAW_FILE_PATH``; non-CSV members are ignored.

    Args:
        context: Dagster asset execution context, used to obtain the
            partition key (the year) for this run.

    Raises:
        requests.HTTPError: if the download request returns an error status.
        zipfile.BadZipFile: if the downloaded payload is not a valid zip.
    """
    year = context.asset_partition_key_for_output()  # partition date string

    url = constants.HISTORIC_DOWNLOAD_URL.format(year)
    raw_file_path = constants.RAW_FILE_PATH

    # Download the zip file into memory.
    print(f"Starting download from url {url}")
    response = requests.get(url)
    # Fail loudly on HTTP errors. Without this check, an error page would
    # be handed to ZipFile below and surface as a confusing BadZipFile.
    response.raise_for_status()
    zip_content = BytesIO(response.content)

    # Use zipfile to extract only the CSV members of the archive.
    with zipfile.ZipFile(zip_content) as zip_ref:
        for file_name in zip_ref.namelist():
            if file_name.endswith(".csv"):
                # Extract the file to the configured raw-data directory.
                zip_ref.extract(file_name, raw_file_path)


@asset(
deps=["bike_rides_file"],
partitions_def=monthly_partition,
group_name="ingested",
deps=["download_extract_historic_ride_data"],
partitions_def=yearly_partition,
group_name="duckdb",
)
def bike_rides_to_duckdb(context, database: DuckDBResource) -> None:
"""
Expand Down

0 comments on commit 0adac88

Please sign in to comment.