Skip to content

Commit

Permalink
add validation and conversions of lat/lon to locations passed via arrow
Browse files Browse the repository at this point in the history
  • Loading branch information
hugopendlebury committed Feb 22, 2024
1 parent 6b2b57b commit e20e5cc
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 30 deletions.
89 changes: 60 additions & 29 deletions src/gribreader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ GribReader::GribReader(string filepath) : filepath(filepath) {
GribReader GribReader::withLocations(std::shared_ptr<arrow::Table> locations) {
//TODO - add some validation
validateLocationFields(locations, " passed conversions via arrow");
locations = GribReader::enrichLocationsWithSurrogateKey(locations);
locations = enrichLocationsWithSurrogateKey(locations);
locations = castTableFields(locations, " passed conversions via arrow", getLocationFieldDefinitions());
this->shared_locations = locations;
return *this;
Expand All @@ -77,24 +77,36 @@ void GribReader::validateLocationFields(std::shared_ptr<arrow::Table> locations,

std::shared_ptr<arrow::Table> GribReader::enrichLocationsWithSurrogateKey(std::shared_ptr<arrow::Table> locations) {
//Append an additional column to the table called surrogate key
auto numberOfRows = locations.get()->num_rows();
auto surrogate_columns = createSurrogateKeyCol(numberOfRows);
auto skField = arrow::field("surrogate_key", arrow::uint16());
auto chunkedArray = std::make_shared<arrow::ChunkedArray>(arrow::ChunkedArray(surrogate_columns.ValueOrDie()));
auto locationsResult = locations.get()->AddColumn(0, skField, chunkedArray);
if (!locationsResult.ok()) {
std::string errDetails = "Error adding surrogate key "
" " + locationsResult.status().message();
throw UnableToCreateArrowTableReaderException(errDetails);

}
return locationsResult.ValueOrDie();

auto locationsTable = locations.get();
auto columns = locations.get()->ColumnNames();
std::set<std::string> columnsSet(std::make_move_iterator(columns.begin()),
std::make_move_iterator(columns.end()));

const bool hasSurrogateKey = columnsSet.find("surrogate_key") != columnsSet.end();

if (!hasSurrogateKey) {
std::cout << "Enriching with surrogate_key field " << std::endl;
auto numberOfRows = locations.get()->num_rows();
auto surrogate_columns = createSurrogateKeyCol(numberOfRows);
auto skField = arrow::field("surrogate_key", arrow::uint16());
auto chunkedArray = std::make_shared<arrow::ChunkedArray>(arrow::ChunkedArray(surrogate_columns.ValueOrDie()));
auto locationsResult = locations.get()->AddColumn(0, skField, chunkedArray);
if (!locationsResult.ok()) {
std::string errDetails = "Error adding surrogate key "
" " + locationsResult.status().message();
throw UnableToCreateArrowTableReaderException(errDetails);

}
auto newColumns = locationsResult.ValueOrDie()->ColumnNames();
return locationsResult.ValueOrDie();
}
return locations;

}


GribReader GribReader::withRepeatableIterator(bool repeatable) {
//TODO - add some validation
this->isRepeatable = repeatable;
return *this;
}
Expand Down Expand Up @@ -157,26 +169,45 @@ std::shared_ptr<arrow::Table> GribReader::castTableFields(std::shared_ptr<arrow:
//trying to remove and add wasn't working either so we basically create a new table

auto table = arrow_table.get();


std::vector<std::shared_ptr<arrow::ChunkedArray>> resultsArray;
arrow::FieldVector fieldVector;

//We don't want to loose any columns which won't be cast
auto columnNames = table->ColumnNames();

for (auto colDetails: fieldTypes) {

auto colName = colDetails.first;
auto colType = colDetails.second;
fieldVector.push_back(arrow::field(colName, colType));
for (auto columnName : columnNames) {

auto it = fieldTypes.find(columnName);

auto chunkVector = castColumn(arrow_table, colName, colType);
auto col = table->GetColumnByName(colName).get();
auto chunkedArrayResult = col->Make(*chunkVector, colType);
if(chunkedArrayResult.ok()) {
resultsArray.emplace_back(chunkedArrayResult.ValueOrDie());
}else {
std::string errMsg = "Unable to create arrow column for column " + colName + " " + chunkedArrayResult.status().message();
throw InvalidSchemaException(errMsg);
}
if(it != fieldTypes.end()) {
auto colName = it->first;
auto colType = it->second;
fieldVector.push_back(arrow::field(colName, colType));

auto chunkVector = castColumn(arrow_table, colName, colType);
auto col = table->GetColumnByName(colName).get();
auto chunkedArrayResult = col->Make(*chunkVector, colType);
if(chunkedArrayResult.ok()) {
resultsArray.emplace_back(chunkedArrayResult.ValueOrDie());
}else {
std::string errMsg = "Unable to create arrow column for column " + colName + " " + chunkedArrayResult.status().message();
throw InvalidSchemaException(errMsg);
}
}
else {
//Make sure we add any columns which aren't cast to the table
//This whole thing seems to messy and complex
//TODO - Investigate issue with swap
//This is a small table so although not ideal this will do for now
auto col = table->GetColumnByName(columnName).get();
fieldVector.push_back(arrow::field(columnName, col->type()));
auto chunkedArrayResult = col->Make(col->chunks(), col->type());
if(chunkedArrayResult.ok()) {
resultsArray.emplace_back(chunkedArrayResult.ValueOrDie());
}
}
}

auto schema = arrow::schema(fieldVector);
Expand All @@ -191,7 +222,7 @@ GribReader GribReader::withLocations(std::string path){
//Reads a CSV with the location data and enriches it with a row_number / surrogate_key

std::shared_ptr<arrow::Table> locations = getTableFromCsv(path, arrow::csv::ConvertOptions::Defaults());
return GribReader::withLocations(locations);
return withLocations(locations);

}

Expand Down
2 changes: 1 addition & 1 deletion src/gribreader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class GribReader
std::shared_ptr<arrow::Table> getTableFromCsv(std::string path, arrow::csv::ConvertOptions convertOptions);
arrow::Result<std::shared_ptr<arrow::Array>> createSurrogateKeyCol(long numberOfRows);
void validateConversionFields(std::shared_ptr<arrow::Table> conversions, std::string table_name);
std::shared_ptr<arrow::Table> GribReader::castTableFields(std::shared_ptr<arrow::Table> arrow_table,
std::shared_ptr<arrow::Table> castTableFields(std::shared_ptr<arrow::Table> arrow_table,
std::string table_name,
std::unordered_map<std::string, std::shared_ptr<arrow::DataType>> fieldTypes);
void validateLocationFields(std::shared_ptr<arrow::Table> locations, std::string table_name) ;
Expand Down

0 comments on commit e20e5cc

Please sign in to comment.