Skip to content

Commit

Permalink
clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
acse-ww721 committed Sep 12, 2023
1 parent 2887fdf commit 7888db7
Show file tree
Hide file tree
Showing 8 changed files with 6,799 additions and 8,213 deletions.
148 changes: 129 additions & 19 deletions src/preprocessing/asos_preprocessing.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,39 @@
# Name: Wenqi Wang
# Github username: acse-ww721

import os
import gc
import numpy as np
import pandas as pd
import xarray as xr
import dask.dataframe as dd
from tqdm import tqdm
from datetime import datetime, timedelta
from utils import folder_utils
from era5_preprocessing import regrid

"""V15"""


def get_csv_list(country, data_folder, data_category, output_folder):
"""
Get a list of CSV files and their corresponding station names for a specific country.
This function searches for CSV files in the specified folder and returns a list of file paths along with their corresponding station names.
Args:
country (str): The country code or identifier.
data_folder (str): The path to the data folder.
data_category (str): The data category.
output_folder (str): The output folder name.
Returns:
tuple: A tuple containing two lists - the list of CSV file paths and the list of corresponding station names.
Example:
>>> country = "GB"
>>> data_folder = "data"
>>> data_category = "processed_data"
>>> output_folder = "ASOS_DATA"
>>> csv_file_paths, station_list = get_csv_list(country, data_folder, data_category, output_folder)
# Retrieves a list of CSV files and their corresponding station names for the specified country.
"""
# Specify the folder path
input_folder_path = folder_utils.find_folder(
country, data_folder, data_category, output_folder
Expand All @@ -39,6 +60,28 @@ def get_csv_list(country, data_folder, data_category, output_folder):


def extract_data_to_df(country, data_folder, data_category, output_folder):
"""
Extract and merge data from multiple CSV files into a single DataFrame for a specific country.
This function searches for CSV files in the specified folder, reads each file, and merges them into a single DataFrame.
Args:
country (str): The country code or identifier.
data_folder (str): The path to the data folder.
data_category (str): The data category.
output_folder (str): The output folder name.
Returns:
pandas.DataFrame: A DataFrame containing merged data from multiple CSV files.
Example:
>>> country = "GB"
>>> data_folder = "data"
>>> data_category = "processed_data"
>>> output_folder = "ASOS_DATA"
>>> merged_data_df = extract_data_to_df(country, data_folder, data_category, output_folder)
# Extracts and merges data from multiple CSV files into a single DataFrame for the specified country.
"""
# Specify the folder path
input_folder_path = folder_utils.find_folder(
country, data_folder, data_category, output_folder
Expand Down Expand Up @@ -69,9 +112,17 @@ def time_rearrange(df):
For the 50th minute of every hour (such as 01:50:00, 02:50:00, etc.),
check whether the next full hour exists; if it exists, delete the 50-minute data,
otherwise change its time to the next full hour.
Args:
df (pandas.DataFrame): A DataFrame containing a 'valid' column with time strings.
df is a DataFrame with a column named 'valid' containing time strings.
Return the processed DataFrame.
Returns:
pandas.DataFrame: The processed DataFrame with rearranged time values.
Example:
>>> import pandas as pd
>>> df = pd.DataFrame({'valid': ['2023-09-01 01:20', '2023-09-01 02:50', '2023-09-01 03:00']})
>>> rearranged_df = time_rearrange(df)
# Rearranges the time values in the DataFrame based on the specified rules.
"""
# Create an auxiliary column and convert time strings to datetime objects
df["valid_datetime"] = pd.to_datetime(df["valid"], format="%Y-%m-%d %H:%M")
Expand Down Expand Up @@ -135,11 +186,21 @@ def time_rearrange(df):

def process_asos_rawdata(raw_df):
"""
Unified variable unit based on era5
station: three or four character site identifier
valid: observation time in UTC
tmpc: Air Temperature in Celsius, typically @ 2 meters
Process raw ASOS weather data to unify variable units based on ERA5 format.
Args:
raw_df (pandas.DataFrame): Raw ASOS weather data DataFrame.
Returns:
pandas.DataFrame: Processed DataFrame with unified variable units.
Example:
>>> import pandas as pd
>>> raw_df = pd.DataFrame({'station': ['ABC', 'XYZ'],
... 'valid': ['2023-09-01 01:20:00', '2023-09-01 02:50:00'],
... 'tmpc': [20.5, 25.0]})
>>> processed_df = process_asos_rawdata(raw_df)
# Processes raw ASOS weather data to unify variable units and convert Celsius to Kelvin.
"""

# time preprocessing
Expand All @@ -156,6 +217,25 @@ def process_asos_rawdata(raw_df):
def save_asos_processed_data(
processed_df, station, country, data_folder, data_category, output_folder
):
"""
Save processed ASOS weather data to a CSV file.
Args:
processed_df (pandas.DataFrame): Processed ASOS weather data DataFrame.
station (str): Station identifier.
country (str): Country name.
data_folder (str): Data folder name.
data_category (str): Data category name.
output_folder (str): Output folder name.
Example:
>>> import pandas as pd
>>> processed_df = pd.DataFrame({'station': ['ABC', 'XYZ'],
... 'time': ['2023-09-01T01:20:00', '2023-09-01T02:50:00'],
... 't2m': [293.65, 298.15]})
>>> save_asos_processed_data(processed_df, 'ABC', 'USA', 'data', 'weather', 'output')
# Saves processed ASOS weather data to a CSV file.
"""
output_directory = folder_utils.create_folder(
country, data_folder, data_category, output_folder
)
Expand All @@ -166,8 +246,19 @@ def save_asos_processed_data(


def merge_csv_station(country, data_folder, data_category, output_folder):
"""Merge all csv files in the folder and add station latlon information.
"""
Merge all CSV files in the folder and add station latlon information.
The merged CSV files are saved by year.
Args:
country (str): Country name.
data_folder (str): Data folder name.
data_category (str): Data category name.
output_folder (str): Output folder name.
Example:
>>> merge_csv_station('GB', 'data', 'processed_data', 'ERA5_DATA')
# Merges CSV files in the specified folder, adds station latlon information, and saves the merged files by year.
"""

# Process station_network
Expand Down Expand Up @@ -402,7 +493,17 @@ def get_asos_year_file_list(country, data_folder, data_category, output_folder):

def filter_data(df):
"""
Filter data by deleting rows with missing values and wrong values
Filter data by deleting rows with missing values and incorrect values.
Args:
df (pd.DataFrame): Input DataFrame containing time, latitude, and longitude columns.
Returns:
pd.DataFrame: Filtered DataFrame with valid rows.
Example:
>>> filtered_df = filter_data(raw_data_df)
# Filters the DataFrame to remove rows with missing values and rows that don't meet specific conditions.
"""
# Delete rows with missing values
df = df.dropna()
Expand All @@ -427,7 +528,22 @@ def csv_to_nc4(
merged_csv_path, year, country, data_folder, data_category, output_folder
):
"""
Convert csv files to nc4 files by year
Convert CSV files to NetCDF4 (nc4) files by year.
Args:
merged_csv_path (str): Path to the merged CSV file.
year (int): The year for which the data is being converted.
country (str): Country code or name.
data_folder (str): Folder where the data is stored.
data_category (str): Category of the data.
output_folder (str): Folder where the NetCDF4 files will be saved.
Returns:
None
Example:
>>> csv_to_nc4("merged_data.csv", 2022, "UK", "data_folder", "data_category", "output_folder")
# Converts the CSV data to a NetCDF4 file for the specified year.
"""
# Read csv files
df = pd.read_csv(merged_csv_path)
Expand All @@ -440,12 +556,6 @@ def csv_to_nc4(
ds_adjusted = ds_in.transpose("time", "latitude", "longitude")
ds_adjusted["t2m"] = ds_adjusted["t2m"].astype("float32")

# ddeg_out_lat = 0.25
# ddeg_out_lon = 0.125
# regridded_ds = regrid(
# ds_in, ddeg_out_lat, ddeg_out_lon, method="bilinear", reuse_weights=False
# )

# Save to nc4 file

output_directory = folder_utils.find_folder(
Expand Down
104 changes: 31 additions & 73 deletions src/preprocessing/asos_regrid.py
Original file line number Diff line number Diff line change
@@ -1,82 +1,40 @@
# Name: Wenqi Wang
# Github username: acse-ww721

import os
import pandas as pd
import numpy as np
import xarray as xr
import gstools as gs
import geopandas as gpd
import matplotlib.pyplot as plt
from utils import folder_utils
from tqdm import tqdm
from asos_preprocessing import (
get_year,
get_asos_year_file_list,
get_year_from_filename,
filter_data,
)

# from asos_preprocessing import csv_to_nc4


def filter_data(df, lat_bounds=(50, 58), lon_bounds=(-6, 2)):
    """
    Filter data by deleting rows with missing values and out-of-range values.

    A row is kept only when it has no missing values, its ``time`` falls on a
    whole hour (minute and second both zero), and its ``latitude`` /
    ``longitude`` lie inside the given inclusive bounds.

    Args:
        df (pd.DataFrame): Input frame with ``time``, ``latitude`` and
            ``longitude`` columns. It is not modified.
        lat_bounds (tuple): Inclusive ``(min, max)`` latitude range.
            Defaults to ``(50, 58)``.
        lon_bounds (tuple): Inclusive ``(min, max)`` longitude range.
            Defaults to ``(-6, 2)``.

    Returns:
        pd.DataFrame: An independent copy of the surviving rows, with ``time``
        converted to datetime. The original index labels are preserved.
    """
    lat_min, lat_max = lat_bounds
    lon_min, lon_max = lon_bounds

    # Work on an explicit copy so that the column assignment below neither
    # touches the caller's frame nor triggers SettingWithCopyWarning.
    cleaned = df.dropna().copy()
    cleaned["time"] = pd.to_datetime(cleaned["time"])

    # Only observations that fall exactly on the hour are kept.
    timestamps = cleaned["time"].dt
    is_whole_hour = (timestamps.minute == 0) & (timestamps.second == 0)

    # Series.between is inclusive on both ends by default, matching >= / <=.
    in_lat_range = cleaned["latitude"].between(lat_min, lat_max)
    in_lon_range = cleaned["longitude"].between(lon_min, lon_max)

    # No separate not-null test on "time" is needed: dropna() already removed
    # every row with a missing value.
    keep = is_whole_hour & in_lat_range & in_lon_range

    # Return a real copy so callers can add or modify columns safely.
    return cleaned.loc[keep].copy()


def csv_to_nc4(
    merged_csv_path, year, country, data_folder, data_category, output_folder
):
    """
    Convert one merged yearly CSV file into a NetCDF file.

    The CSV is read, passed through ``filter_data``, indexed by
    (latitude, longitude, time), cropped to latitude 50..58 and
    longitude -6..2, reordered to (time, latitude, longitude) and written as
    ``{country}_ASOS_filter_{year}.nc`` into the directory returned by
    ``folder_utils.find_folder``.

    Args:
        merged_csv_path (str): Path of the merged CSV file to convert.
        year: Year label used in the output file name.
        country (str): Country identifier, used for the folder lookup and the
            output file name.
        data_folder (str): Passed through to ``folder_utils.find_folder``.
        data_category (str): Passed through to ``folder_utils.find_folder``.
        output_folder (str): Passed through to ``folder_utils.find_folder``.

    Returns:
        None
    """
    # Load the CSV and drop invalid rows
    frame = filter_data(pd.read_csv(merged_csv_path))

    # Build the dataset on a (latitude, longitude, time) index, then crop it
    indexed = frame.set_index(["latitude", "longitude", "time"])
    dataset = xr.Dataset.from_dataframe(indexed)
    dataset = dataset.sel(latitude=slice(50, 58), longitude=slice(-6, 2))

    # Put time first and store the temperature as 32-bit floats
    dataset = dataset.transpose("time", "latitude", "longitude")
    dataset["t2m"] = dataset["t2m"].astype("float32")

    # Save to nc4 file
    target_directory = folder_utils.find_folder(
        country, data_folder, data_category, output_folder
    )
    output_filename = f"{country}_ASOS_filter_{year}.nc"
    dataset.to_netcdf(os.path.join(target_directory, output_filename))
    print(f"{output_filename} done!")


def krige_regrid_poly(
year_df_path, year, country, data_folder, data_category, output_folder
):
"""
Perform kriging interpolation with polynomial drift modeling to regrid meteorological data.
Args:
year_df_path (str): Path to the input CSV file for the specified year.
year (int): The year for which the data is being regridded.
country (str): Country code or name.
data_folder (str): Folder where the data is stored.
data_category (str): Category of the data.
output_folder (str): Folder where the regridded data will be saved.
Returns:
None
Example:
>>> krige_regrid_poly("year_data.csv", 2022, "GB", "data_folder", "data_category", "output_folder")
# Performs kriging interpolation and polynomial drift modeling for the specified year.
"""
# 1. Load the data
df = pd.read_csv(year_df_path)
df = filter_data(df)
Expand All @@ -91,16 +49,16 @@ def krige_regrid_poly(
g_lat = np.linspace(50.0, 57.75, 32) # latitude
# gridx, gridy = np.meshgrid(gridx, gridy)

# 4. Drift term
def north_south_drift(lat, lon):
return lat

# 4. Drift term

def polynomial_drift(lat, lon):
return [1, lat, lon, lat**2, lon**2, lat * lon]
# # 4. Drift term
# def north_south_drift(lat, lon):
# return lat
#
# # 4. Drift term
#
# def polynomial_drift(lat, lon):
# return [1, lat, lon, lat**2, lon**2, lat * lon]

# 4. Drift terms
# 4. Drift terms

def drift_1(lat, lon):
return 1
Expand Down Expand Up @@ -197,6 +155,6 @@ def drift_lat_lon(lat, lon):
country, data_folder, data_save_category, output_folder
)
for year, csv_path in tqdm(zip(year_list, csv_paths)):
krige_regrid(
krige_regrid_poly(
csv_path, year, country, data_folder, data_save_category, output_folder
)
Loading

0 comments on commit 7888db7

Please sign in to comment.