Skip to content

Commit

Permalink
validation of conversions and enabling skipped test
Browse files Browse the repository at this point in the history
  • Loading branch information
hugopendlebury committed Feb 3, 2024
1 parent a68a32d commit d02d150
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 77 deletions.
40 changes: 11 additions & 29 deletions src/arrowutils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,42 +32,24 @@ using arrow::ListBuilder;

arrow::Result<std::vector<data_row>> ColumnarTableToVector(
const std::shared_ptr<arrow::Table>& table) {
// To convert an Arrow table back into the same row-wise representation as in the
// above section, we first will check that the table conforms to our expected
// schema and then will build up the vector of rows incrementally.
//
// For the check if the table is as expected, we can utilise solely its schema.
std::vector<std::shared_ptr<arrow::Field>> schema_vector = {
arrow::field("parameterId", arrow::int64()),
arrow::field("addition_value", arrow::float64()),
arrow::field("subtraction_value", arrow::float64()),
arrow::field("multiplication_value", arrow::float64()),
arrow::field("division_value", arrow::float64()),
arrow::field("ceiling_value", arrow::float64()),
};
auto expected_schema = std::make_shared<arrow::Schema>(schema_vector);

std::cout << "YOYO" << std::endl;

if (!expected_schema->Equals(*table->schema())) {
// The table doesn't have the expected schema thus we cannot directly
// convert it to our target representation.
std::cout << "Schemas are not matching" << std::endl;
return arrow::Status::Invalid("Schemas are not matching!");
}

//Converts the table to a row wise representation
//Note: the table's column names have already been validated and its columns cast to the expected data types,
//so the schema should be fine

//TODO - should we really be just using chunk(0) ?

auto parameterIds = std::static_pointer_cast<arrow::Int64Array>(table->column(0)->chunk(0));
auto parameterIds = std::static_pointer_cast<arrow::Int64Array>(table->GetColumnByName("parameterId")->chunk(0));
auto additionValues =
std::static_pointer_cast<arrow::DoubleArray>(table->column(1)->chunk(0));
std::static_pointer_cast<arrow::DoubleArray>(table->GetColumnByName("addition_value")->chunk(0));
auto subtractionValues =
std::static_pointer_cast<arrow::DoubleArray>(table->column(2)->chunk(0));
std::static_pointer_cast<arrow::DoubleArray>(table->GetColumnByName("subtraction_value")->chunk(0));
auto multiplicationValues =
std::static_pointer_cast<arrow::DoubleArray>(table->column(3)->chunk(0));
std::static_pointer_cast<arrow::DoubleArray>(table->GetColumnByName("multiplication_value")->chunk(0));
auto divisionValues =
std::static_pointer_cast<arrow::DoubleArray>(table->column(4)->chunk(0));
std::static_pointer_cast<arrow::DoubleArray>(table->GetColumnByName("division_value")->chunk(0));
auto ceilingValues =
std::static_pointer_cast<arrow::DoubleArray>(table->column(5)->chunk(0));
std::static_pointer_cast<arrow::DoubleArray>(table->GetColumnByName("ceiling_value")->chunk(0));

std::vector<data_row> rows;
for (int64_t i = 0; i < table->num_rows(); i++) {
Expand Down
2 changes: 1 addition & 1 deletion src/exceptions/invalidschemaexception.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ class InvalidSchemaException : public std::runtime_error
{
public:

InvalidSchemaException(std::string errorMsg) : std::runtime_error("Invalid Schema " + errorMsg) { }
InvalidSchemaException(std::string errorMsg) : std::runtime_error(errorMsg) { }

};
111 changes: 65 additions & 46 deletions src/gribreader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,7 @@ GribReader::GribReader(string filepath) : filepath(filepath) {
fin = fopen(filepath.c_str(), "rb");
if (!fin) {
throw NoSuchGribFileException(filepath);
cout << "Error: unable to open input file" << filepath << endl;
} else {
cout << "I'm ready file is " << fin << endl;

}
}
};

GribReader GribReader::withLocations(std::shared_ptr<arrow::Table> locations) {
Expand Down Expand Up @@ -86,45 +82,72 @@ void GribReader::validateConversionFields(std::shared_ptr<arrow::Table> location
}
}

void GribReader::castConversionFields(std::shared_ptr<arrow::Table> locations, std::string table_name) {
/// Casts every chunk of one column of `locations` to `fieldType`.
///
/// @param locations  table that holds the column to convert
/// @param colName    name of the column to cast
/// @param fieldType  Arrow data type each chunk is cast to
/// @return a heap-allocated vector holding one converted array per chunk, in
///         chunk order; the caller takes ownership of the returned pointer
/// @throws InvalidSchemaException if the column does not exist or if any
///         chunk cannot be cast to `fieldType`
arrow::ArrayVector* GribReader::castColumn(std::shared_ptr<arrow::Table> locations,
                                           std::string colName,
                                           std::shared_ptr<arrow::DataType> fieldType) {

    // GetColumnByName hands back a null pointer for an unknown column name,
    // so check before touching the chunks.
    const auto col = locations->GetColumnByName(colName);
    if (!col) {
        std::string errMsg = "Unable to cast conversion column " + colName + " : column not found";
        throw InvalidSchemaException(errMsg);
    }

    cp::CastOptions castOptions;
    castOptions.to_type = fieldType.get();

    // Held by a unique_ptr while it is being filled so that nothing leaks
    // when a failed cast makes us throw half way through the column.
    auto chunkVector = std::make_unique<arrow::ArrayVector>();
    chunkVector->reserve(col->chunks().size());

    for (const auto& chunk : col->chunks()) {
        auto result = cp::Cast(*chunk, fieldType.get(), castOptions);
        if (!result.ok()) {
            // Include Arrow's own explanation, as the column-creation error does.
            std::string errMsg = "Unable to cast conversion column " + colName + " " + result.status().message();
            throw InvalidSchemaException(errMsg);
        }
        chunkVector->emplace_back(result.ValueOrDie());
    }

    // The declared interface returns a raw owning pointer, so hand ownership over.
    return chunkVector.release();
}

std::vector<std::string> f64_cols = {"addition_value",
"subtraction_value",
"multiplication_value",
"division_value",
"ceiling_value"};
std::shared_ptr<arrow::Table> GribReader::castConversionFields(std::shared_ptr<arrow::Table> locations, std::string table_name) {

//Ok this is a PITA - Although there is a .swap method of a column it doesn't work if we want to
//swap the column with a new data type
//trying to remove and add wasn't working either so we basically create a new table

auto table = locations.get();

std::vector<std::shared_ptr<arrow::ChunkedArray>> resultsArray;
for (auto colName: f64_cols) {
auto col = table->GetColumnByName(colName).get();
arrow::FieldVector fieldVector;

cp::CastOptions castOptions;
castOptions.to_type = arrow::float64().get();

auto x = arrow::float64().get();
auto chunkVector = new arrow::ArrayVector();
for (auto chunk : col->chunks()) {
auto arr = chunk.get();
auto result = cp::Cast(*arr, arrow::float64().get(), castOptions);
if (result.ok()) {
auto converted = result.ValueOrDie();
chunkVector->emplace_back(converted);
} else{
std::string errMsg = "Unable to cast conversion column " + colName + " to float64";
throw new InvalidSchemaException(errMsg);
}
}
auto chunkedArrayResult = col->Make(*chunkVector, arrow::float64());
std::unordered_map<std::string, std::shared_ptr<arrow::DataType>> fieldTypes;
fieldTypes.emplace(make_pair("parameterId", arrow::int64()));
fieldTypes.emplace(make_pair("addition_value", arrow::float64()));
fieldTypes.emplace(make_pair("subtraction_value", arrow::float64()));
fieldTypes.emplace(make_pair("multiplication_value", arrow::float64()));
fieldTypes.emplace(make_pair("division_value", arrow::float64()));
fieldTypes.emplace(make_pair("ceiling_value", arrow::float64()));


for (auto colDetails: fieldTypes) {

auto colName = colDetails.first;
auto colType = colDetails.second;
fieldVector.push_back(arrow::field(colName, colType));

auto chunkVector = castColumn(locations, colName, colType);
auto col = table->GetColumnByName(colName).get();
auto chunkedArrayResult = col->Make(*chunkVector, colType);
if(chunkedArrayResult.ok()) {
resultsArray.push_back(chunkedArrayResult.ValueOrDie());
resultsArray.emplace_back(chunkedArrayResult.ValueOrDie());
}else {
std::string errMsg = "Unable to create arrow column for column " + colName;
throw new InvalidSchemaException(errMsg);
}

std::string errMsg = "Unable to create arrow column for column " + colName + " " + chunkedArrayResult.status().message();
throw InvalidSchemaException(errMsg);
}
}

auto schema = arrow::schema(fieldVector);
auto newTable = arrow::Table::Make(schema, resultsArray);

return newTable;

}

Expand Down Expand Up @@ -183,20 +206,20 @@ GribReader GribReader::withConversions(std::string conversionsPath) {

GribReader GribReader::withConversions(std::shared_ptr<arrow::Table> conversions) {
cout << "Setting conversions" << endl;
this->conversions = conversions.get();

auto conv = conversions.get();

//the table should contain the conversion columns: "parameterId" plus the five "*_value" columns
validateConversionFields(conversions, " passed conversions via arrow");

//TODO - Validate types ?

std::cout << "Fields validated" << std::endl;
conversions = castConversionFields(conversions, " passed conversions via arrow");
auto rowConversion = ColumnarTableToVector(conversions);

for (auto row : rowConversion.ValueOrDie()) {

cout << "adding conversions" << endl ;


vector<pair<conversionMethods, optional<double>>> methods {
make_pair(conversionMethods::Add, row.additionValue),
make_pair(conversionMethods::Subtract, row.subtractionValue),
Expand All @@ -214,11 +237,6 @@ GribReader GribReader::withConversions(std::shared_ptr<arrow::Table> conversions

}

//REMOVED THIS - Only present in newer compilers
//auto match = ranges::find_if(methods, [](const pair<conversionTypes, optional<double>>& v) {
// return v.second.has_value();
//});

if (match) {
cout << "Adding to cache for " << row.parameterId << endl;

Expand Down Expand Up @@ -250,8 +268,9 @@ GribReader GribReader::withConversions(std::shared_ptr<arrow::Table> conversions
auto converter = new Converter(conversionFunc, firstMatch.second.value());

conversion_funcs.emplace(row.parameterId, converter);

}

}

return *this;
Expand Down
5 changes: 4 additions & 1 deletion src/gribreader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ class GribReader
std::shared_ptr<arrow::Table> getTableFromCsv(std::string path, arrow::csv::ConvertOptions convertOptions);
arrow::Result<std::shared_ptr<arrow::Array>> createSurrogateKeyCol(long numberOfRows);
void validateConversionFields(std::shared_ptr<arrow::Table> locations, std::string table_name);
void castConversionFields(std::shared_ptr<arrow::Table> locations, std::string table_name) ;
std::shared_ptr<arrow::Table> castConversionFields(std::shared_ptr<arrow::Table> locations, std::string table_name);
// Casts each chunk of the column named colName in locations to fieldType.
// Returns a vector allocated with new, holding one converted array per chunk;
// the caller receives the raw pointer and is responsible for it.
// Throws InvalidSchemaException on failure.
arrow::ArrayVector* castColumn(std::shared_ptr<arrow::Table> locations,
std::string colName,
std::shared_ptr<arrow::DataType> fieldType) ;


};

0 comments on commit d02d150

Please sign in to comment.