Change loading into BigQuery
flpmonteiro committed Apr 12, 2024
1 parent 0adac88 commit 2d2a931
Showing 1 changed file with 74 additions and 44 deletions.
118 changes: 74 additions & 44 deletions nyc_citibike/assets/rides.py
@@ -116,66 +116,74 @@ def bike_rides_to_duckdb(context, database: DuckDBResource) -> None:
             end_lng,
             member_casual,
             '{year_month}' as partition_date
-        from '{constants.BIKE_RIDES_FILE_PATH.format(year_month)}';
+        from '{constants.RAW_FILE_PATH.format(year_month)}';
     """

     with database.get_connection() as conn:
         conn.execute(query)

 @asset(
-    deps=["bike_rides_file"],
-    partitions_def=monthly_partition,
-    group_name="ingested",
+    deps=["download_extract_historic_ride_data"],
+    group_name="bigquery",
 )
-def bike_rides_to_bigquery(context, bigquery_resource: BigQueryResource):
-    partition_date_str = context.asset_partition_key_for_output()
-    year_month = partition_date_str
+def create_bigquery_table(bigquery_resource: BigQueryResource):
+    dataset = "nyc_citibike_data"
+    table_name = "rides_raw"
+    project_id = bigquery_resource.project
+    full_table_id = f"{project_id}.{dataset}.{table_name}"

-    csv_file = constants.BIKE_RIDES_FILE_PATH.format(year_month)
-    df = pd.read_csv(csv_file)
+    # Schema capable of handling current format and previous format
+    schema = [bigquery.SchemaField(k, v) for k, v in constants.SCHEMA.items()]

-    schema = "nyc_citibike_data"
-    table_name = "rides_raw"
+    table = bigquery.Table(full_table_id, schema=schema)

     with bigquery_resource.get_client() as client:
-        # Load data into a staging table
-        main_table = f"{schema}.{table_name}"
-        staging_table = f"{schema}.{table_name}_staging"

         # Ensure the main table exists
         try:
-            client.get_table(main_table)
+            client.get_table(full_table_id)
         except NotFound:
-            # If the table does not exist, create it or handle accordingly
-            job = client.load_table_from_dataframe(dataframe=df, destination=main_table)
-            job.result()
-            return None
+            client.create_table(table)
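Note: `create_bigquery_table` derives its schema from a `constants.SCHEMA` mapping. The column names below are the ones listed in this diff's MERGE statement; the value types are an assumption (everything as STRING is consistent with the `dtype=str` CSV reads elsewhere in this commit), so treat this as a sketch of the mapping's shape rather than the repository's actual definition:

from google.cloud import bigquery

# Assumed column -> BigQuery type mapping; the real constants.SCHEMA may differ.
SCHEMA = {
    "ride_id": "STRING",
    "rideable_type": "STRING",
    "started_at": "STRING",
    "ended_at": "STRING",
    "start_station_name": "STRING",
    "start_station_id": "STRING",
    "end_station_name": "STRING",
    "end_station_id": "STRING",
    "start_lat": "STRING",
    "start_lng": "STRING",
    "end_lat": "STRING",
    "end_lng": "STRING",
    "member_casual": "STRING",
}

# Same construction as in create_bigquery_table above.
schema = [bigquery.SchemaField(name, field_type) for name, field_type in SCHEMA.items()]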


+@asset(
+    deps=["download_extract_historic_ride_data"],
+    partitions_def=yearly_partition,
+)
+def convert_csv_to_parquet(context) -> None:
+    year = context.asset_partition_key_for_output()
+    data_dir = constants.RAW_FILE_PATH
+    pattern = os.path.join(data_dir, f"{year}-citibike-tripdata", "**", "*.csv")
+    csv_files = glob.glob(pattern, recursive=True)
+
+    for csv_file_path in csv_files:
+        df = pd.read_csv(csv_file_path, dtype=str)
+        df.to_parquet(os.path.splitext(csv_file_path)[0] + ".parquet", index=False)

+@asset(
+    deps=["create_bigquery_table", "convert_csv_to_parquet"],
+    partitions_def=yearly_partition,
+    group_name="bigquery",
+)
+def bike_rides_to_bigquery(context, bigquery_resource: BigQueryResource):
+    dataset = "nyc_citibike_data"
+    table_name = "rides_raw"
+    main_table = f"{bigquery_resource.project}.{dataset}.{table_name}"
+    staging_table = (
+        f"{bigquery_resource.project}.{dataset}.{table_name}_staging_parquet"
+    )

-        job = client.load_table_from_dataframe(dataframe=df, destination=staging_table)
-        job.result()
+    year = context.asset_partition_key_for_output()
+    data_dir = constants.RAW_FILE_PATH
+    pattern = os.path.join(data_dir, f"{year}-citibike-tripdata", "**", "*.parquet")
+    parquet_files = glob.glob(pattern, recursive=True)

-        # Use SQL to merge staging data into the main table, avoiding duplicates
-        merge_query = f"""
+    merge_query = f"""
     MERGE INTO `{main_table}` AS main
     USING `{staging_table}` AS staging
     ON main.ride_id = staging.ride_id
     WHEN NOT MATCHED BY TARGET THEN
-        INSERT (
-            ride_id,
-            rideable_type,
-            started_at,
-            ended_at,
-            start_station_name,
-            start_station_id,
-            end_station_name,
-            end_station_id,
-            start_lat,
-            start_lng,
-            end_lat,
-            end_lng,
-            member_casual
-        )
+        INSERT ({",".join(constants.SCHEMA.keys())})
         VALUES (
             staging.ride_id,
             staging.rideable_type,
@@ -192,8 +200,30 @@ def bike_rides_to_bigquery(context, bigquery_resource: BigQueryResource):
             staging.member_casual
         )
     """
-        query_job = client.query(merge_query)
-        query_job.result()

-        # Clean up the staging table after merge
-        client.delete_table(staging_table, not_found_ok=True)
+    job_config = bigquery.LoadJobConfig(
+        # schema=[bigquery.SchemaField(k, v) for k, v in constants.SCHEMA_NEW.items()],
+        # skip_leading_rows=1,  # Skip header row in CSV files
+        source_format=bigquery.SourceFormat.PARQUET,
+    )
+
+    with bigquery_resource.get_client() as client:
+        for file_path in parquet_files:
+            print(f"Begin loading data from {os.path.basename(file_path)}")
+            with open(file_path, "rb") as f:
+                # Load data into a staging table
+                job = client.load_table_from_file(
+                    file_obj=f, destination=staging_table, job_config=job_config
+                )
+                job.result()
+            print(
+                f"Done loading data from {os.path.basename(file_path)} into staging"
+            )
+
+        # Use SQL to merge staging data into the main table, avoiding duplicates
+        # query_job = client.query(merge_query)
+        # query_job.result()
+
+        # Clean up the staging table after merge
+        # client.delete_table(staging_table, not_found_ok=True)
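Note: each load job appends to the staging table (WRITE_APPEND is BigQuery's default write disposition for load jobs), so one year's Parquet files accumulate there. The MERGE that would de-duplicate them into `rides_raw` is left commented out in this commit; if re-enabled, it and the staging cleanup might look roughly like the sketch below (the project ID and abbreviated column list are assumptions; in the repository the columns come from `constants.SCHEMA`):

from google.cloud import bigquery

# Assumed identifiers; in the asset these come from bigquery_resource.project and constants.
PROJECT = "my-gcp-project"
MAIN_TABLE = f"{PROJECT}.nyc_citibike_data.rides_raw"
STAGING_TABLE = f"{PROJECT}.nyc_citibike_data.rides_raw_staging_parquet"
COLUMNS = ["ride_id", "rideable_type", "started_at", "ended_at", "member_casual"]  # abbreviated


def merge_and_cleanup(client: bigquery.Client) -> None:
    """Insert staged rows whose ride_id is not yet in the main table, then drop staging."""
    merge_query = f"""
    MERGE INTO `{MAIN_TABLE}` AS main
    USING `{STAGING_TABLE}` AS staging
    ON main.ride_id = staging.ride_id
    WHEN NOT MATCHED BY TARGET THEN
        INSERT ({", ".join(COLUMNS)})
        VALUES ({", ".join("staging." + c for c in COLUMNS)})
    """
    client.query(merge_query).result()  # run the merge and wait for completion
    client.delete_table(STAGING_TABLE, not_found_ok=True)  # remove the staging table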
