Skip to content

Commit

Permalink
Preserve Arrow timestamp timezones using Parquet file metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
tpboudreau committed Jun 18, 2019
1 parent f937943 commit 0336eee
Show file tree
Hide file tree
Showing 4 changed files with 596 additions and 82 deletions.
280 changes: 275 additions & 5 deletions cpp/src/parquet/arrow/arrow-schema-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,16 @@ class TestConvertParquetSchema : public ::testing::Test {
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes);
descr_.Init(schema);
return FromParquetSchema(&descr_, {}, key_value_metadata, &result_schema_);
return FromParquetSchema(&descr_, key_value_metadata, &result_schema_);
}

// Wraps `nodes` in a repeated root group named "schema", initializes the
// fixture's descriptor from it, and converts the columns selected by
// `column_indices` into result_schema_, forwarding `key_value_metadata`.
::arrow::Status ConvertSchema(
    const std::vector<NodePtr>& nodes, const std::vector<int>& column_indices,
    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
  const NodePtr root = GroupNode::Make("schema", Repetition::REPEATED, nodes);
  descr_.Init(root);
  ::arrow::Status status =
      FromParquetSchema(&descr_, column_indices, key_value_metadata, &result_schema_);
  return status;
}

protected:
Expand Down Expand Up @@ -311,7 +320,7 @@ TEST_F(TestConvertParquetSchema, ParquetKeyValueMetadata) {
auto key_value_metadata = std::make_shared<KeyValueMetadata>();
key_value_metadata->Append("foo", "bar");
key_value_metadata->Append("biz", "baz");
ASSERT_OK(ConvertSchema(parquet_fields, key_value_metadata));
ASSERT_OK(ConvertSchema(parquet_fields, {}, key_value_metadata));

auto arrow_metadata = result_schema_->metadata();
ASSERT_EQ("foo", arrow_metadata->key(0));
Expand All @@ -329,12 +338,178 @@ TEST_F(TestConvertParquetSchema, ParquetEmptyKeyValueMetadata) {
arrow_fields.push_back(std::make_shared<Field>("int32", INT32, false));

std::shared_ptr<KeyValueMetadata> key_value_metadata = nullptr;
ASSERT_OK(ConvertSchema(parquet_fields, key_value_metadata));
ASSERT_OK(ConvertSchema(parquet_fields, {}, key_value_metadata));

auto arrow_metadata = result_schema_->metadata();
ASSERT_EQ(arrow_metadata, nullptr);
}

// Checks Parquet -> Arrow conversion of three top-level millisecond timestamp
// nodes:
//   * "ts":     not UTC-adjusted, no file metadata  -> zone-less timestamp
//   * "ts:UTC": UTC-adjusted                        -> timestamp with zone "UTC"
//   * "ts:CET": not UTC-adjusted, with an
//               "org.apache.arrow.field[ts:CET].timestamp.timezone" entry
//               in the file metadata               -> timestamp with zone "CET"
// and that the resulting schema metadata keeps only the unrelated
// application entry.
TEST_F(TestConvertParquetSchema, ParquetSimpleTimezoneNodes) {
  std::vector<NodePtr> parquet_fields;
  std::vector<std::shared_ptr<Field>> arrow_fields;
  auto parquet_file_metadata = std::make_shared<KeyValueMetadata>();

  parquet_fields.push_back(PrimitiveNode::Make(
      "ts", Repetition::REQUIRED,
      LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::MILLIS),
      ParquetType::INT64));
  arrow_fields.push_back(
      std::make_shared<Field>("ts", ::arrow::timestamp(TimeUnit::MILLI), false));

  parquet_fields.push_back(PrimitiveNode::Make(
      "ts:UTC", Repetition::REQUIRED,
      LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MILLIS),
      ParquetType::INT64));
  arrow_fields.push_back(std::make_shared<Field>(
      "ts:UTC", ::arrow::timestamp(TimeUnit::MILLI, "UTC"), false));

  parquet_fields.push_back(PrimitiveNode::Make(
      "ts:CET", Repetition::REQUIRED,
      LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::MILLIS),
      ParquetType::INT64));
  parquet_file_metadata->Append("org.apache.arrow.field[ts:CET].timestamp.timezone",
                                "CET");
  arrow_fields.push_back(std::make_shared<Field>(
      "ts:CET", ::arrow::timestamp(TimeUnit::MILLI, "CET"), false));

  // Unrelated entry that must survive the conversion untouched.
  parquet_file_metadata->Append("application.key", "application.value");

  ASSERT_OK(ConvertSchema(parquet_fields, parquet_file_metadata));
  ASSERT_NO_FATAL_FAILURE(
      CheckFlatSchema(std::make_shared<::arrow::Schema>(arrow_fields)));
  // Confirm arrow timezone metadata removed, application metadata retained
  auto arrow_metadata = result_schema_->metadata();
  // The schema metadata may be null (see ParquetEmptyKeyValueMetadata); report
  // that as a test failure rather than dereferencing a null pointer below.
  ASSERT_NE(arrow_metadata, nullptr);
  ASSERT_EQ(1, arrow_metadata->size());
  ASSERT_EQ("application.key", arrow_metadata->key(0));
  ASSERT_EQ("application.value", arrow_metadata->value(0));
}

static void MakeNestedTimezoneSchemas(std::vector<std::shared_ptr<Field>>& arrow_fields,
std::vector<NodePtr>& parquet_fields,
std::shared_ptr<KeyValueMetadata>& parquet_metadata,
bool unpacked_dictionaries = false) {
{
auto arrow_element = std::make_shared<Field>(
"timestamp:CET", ::arrow::timestamp(::arrow::TimeUnit::MILLI, "CET"), false);
auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
arrow_fields.push_back(std::make_shared<Field>("some_list", arrow_list, false));

auto parquet_element = PrimitiveNode::Make(
"timestamp:CET", Repetition::REQUIRED,
LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::MILLIS),
ParquetType::INT64);
auto parquet_list = GroupNode::Make("list", Repetition::REPEATED, {parquet_element});
parquet_fields.push_back(GroupNode::Make("some_list", Repetition::REQUIRED,
{parquet_list}, LogicalType::LIST));
parquet_metadata->Append(
"org.apache.arrow.field[some_list.list.timestamp:CET].timestamp.timezone", "CET");
}

{
auto arrow_element_1 = std::make_shared<Field>(
"timestamp", ::arrow::timestamp(::arrow::TimeUnit::MILLI), false);
auto arrow_element_2 = std::make_shared<Field>(
"timestamp:UTC", ::arrow::timestamp(::arrow::TimeUnit::MILLI, "UTC"), false);
auto arrow_element_3 = std::make_shared<Field>(
"timestamp:CET", ::arrow::timestamp(::arrow::TimeUnit::MILLI, "CET"), false);
auto arrow_elements = {arrow_element_1, arrow_element_2, arrow_element_3};
auto arrow_struct = std::make_shared<::arrow::StructType>(arrow_elements);
auto arrow_group = std::make_shared<Field>("some_struct", arrow_struct, false);
auto arrow_field = std::make_shared<Field>(
"timestamp:NZST", ::arrow::timestamp(::arrow::TimeUnit::MILLI, "NZST"), false);
arrow_fields.push_back(arrow_group);
arrow_fields.push_back(arrow_field);

auto parquet_element_1 = PrimitiveNode::Make(
"timestamp", Repetition::REQUIRED,
LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::MILLIS),
ParquetType::INT64);
auto parquet_element_2 = PrimitiveNode::Make(
"timestamp:UTC", Repetition::REQUIRED,
LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MILLIS),
ParquetType::INT64);
auto parquet_element_3 = PrimitiveNode::Make(
"timestamp:CET", Repetition::REQUIRED,
LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::MILLIS),
ParquetType::INT64);
auto parquet_group =
GroupNode::Make("some_struct", Repetition::REQUIRED,
{parquet_element_1, parquet_element_2, parquet_element_3});
auto parquet_field = PrimitiveNode::Make(
"timestamp:NZST", Repetition::REQUIRED,
LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::MILLIS),
ParquetType::INT64);
parquet_fields.push_back(parquet_group);
parquet_fields.push_back(parquet_field);
parquet_metadata->Append(
"org.apache.arrow.field[some_struct.timestamp:CET].timestamp.timezone", "CET");
parquet_metadata->Append("org.apache.arrow.field[timestamp:NZST].timestamp.timezone",
"NZST");
}

{
if (unpacked_dictionaries) {
// The Parquet nodes immediately below are converted to these non-dictionary Arrow
// fields
arrow_fields.push_back(std::make_shared<Field>(
"dictionary_timestamp", ::arrow::timestamp(::arrow::TimeUnit::MILLI), false));
arrow_fields.push_back(std::make_shared<Field>(
"dictionary_timestamp:UTC", ::arrow::timestamp(::arrow::TimeUnit::MILLI, "UTC"),
false));
arrow_fields.push_back(std::make_shared<Field>(
"dictionary_timestamp:CET", ::arrow::timestamp(::arrow::TimeUnit::MILLI, "CET"),
false));
} else {
// When converted, these Arrow dictionaries are unpacked and stored as the Parquet
// nodes immediately below
arrow_fields.push_back(std::make_shared<Field>(
"dictionary_timestamp",
::arrow::dictionary(::arrow::int16(), ::arrow::timestamp(TimeUnit::MILLI)),
false));
arrow_fields.push_back(std::make_shared<Field>(
"dictionary_timestamp:UTC",
::arrow::dictionary(::arrow::int16(),
::arrow::timestamp(TimeUnit::MILLI, "UTC")),
false));
arrow_fields.push_back(std::make_shared<Field>(
"dictionary_timestamp:CET",
::arrow::dictionary(::arrow::int16(),
::arrow::timestamp(TimeUnit::MILLI, "CET")),
false));
}

parquet_fields.push_back(PrimitiveNode::Make(
"dictionary_timestamp", Repetition::REQUIRED,
LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::MILLIS),
ParquetType::INT64));
parquet_fields.push_back(PrimitiveNode::Make(
"dictionary_timestamp:UTC", Repetition::REQUIRED,
LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MILLIS),
ParquetType::INT64));
parquet_fields.push_back(PrimitiveNode::Make(
"dictionary_timestamp:CET", Repetition::REQUIRED,
LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::MILLIS),
ParquetType::INT64));
parquet_metadata->Append(
"org.apache.arrow.field[dictionary_timestamp:CET].timestamp.timezone", "CET");
}

return;
}

// Converts the nested-timezone Parquet fixture, together with its file
// metadata, to Arrow and compares the result with the expected Arrow fields.
// The fixture is built with unpacked dictionaries, so the dictionary columns
// are expected back as plain timestamp fields.
TEST_F(TestConvertParquetSchema, ParquetNestedTimezoneNodes) {
  std::vector<std::shared_ptr<Field>> expected_fields;
  std::vector<NodePtr> parquet_nodes;
  auto file_metadata = std::make_shared<KeyValueMetadata>();

  MakeNestedTimezoneSchemas(expected_fields, parquet_nodes, file_metadata,
                            /*unpacked_dictionaries=*/true);

  ASSERT_OK(ConvertSchema(parquet_nodes, file_metadata));

  const auto expected_schema = std::make_shared<::arrow::Schema>(expected_fields);
  ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(expected_schema));
}

TEST_F(TestConvertParquetSchema, ParquetFlatDecimals) {
std::vector<NodePtr> parquet_fields;
std::vector<std::shared_ptr<Field>> arrow_fields;
Expand Down Expand Up @@ -733,7 +908,10 @@ class TestConvertArrowSchema : public ::testing::Test {
public:
virtual void SetUp() {}

void CheckFlatSchema(const std::vector<NodePtr>& nodes) {
void CheckFlatSchema(
const std::vector<NodePtr>& nodes,
const std::shared_ptr<const KeyValueMetadata>& expected_metadata = nullptr,
bool expect_strict_metadata_equality = true) {
NodePtr schema_node = GroupNode::Make("schema", Repetition::REPEATED, nodes);
const GroupNode* expected_schema_node =
static_cast<const GroupNode*>(schema_node.get());
Expand All @@ -746,18 +924,35 @@ class TestConvertArrowSchema : public ::testing::Test {
auto rhs = expected_schema_node->field(i);
EXPECT_TRUE(lhs->Equals(rhs.get()));
}

if (expected_metadata) {
if (expect_strict_metadata_equality) {
ASSERT_TRUE(result_metadata_->Equals(*expected_metadata));
} else {
// check for expected keys and values, indifferent to order in container
ASSERT_EQ(result_metadata_->size(), expected_metadata->size());
for (int i = 0; i < expected_metadata->size(); ++i) {
int index = result_metadata_->FindKey(expected_metadata->key(i));
ASSERT_NE(index, -1) << "key '" << expected_metadata->key(i)
<< "' unexpectedly missing from result metadata";
ASSERT_EQ(result_metadata_->value(index), expected_metadata->value(i));
}
}
}
}

// Builds an Arrow schema from `fields` and converts it to Parquet using the
// default writer properties. The resulting descriptor is written to
// result_schema_, and result_metadata_ is passed as an output as well so that
// CheckFlatSchema can compare it against the expected file metadata.
::arrow::Status ConvertSchema(const std::vector<std::shared_ptr<Field>>& fields) {
  arrow_schema_ = std::make_shared<::arrow::Schema>(fields);
  std::shared_ptr<::parquet::WriterProperties> properties =
      ::parquet::default_writer_properties();
  // Single exit point: an earlier three-argument call that returned first left
  // this metadata-capturing call unreachable and result_metadata_ unset.
  return ToParquetSchema(arrow_schema_.get(), *properties.get(), &result_metadata_,
                         &result_schema_);
}

protected:
std::shared_ptr<::arrow::Schema> arrow_schema_;
std::shared_ptr<SchemaDescriptor> result_schema_;
std::shared_ptr<const KeyValueMetadata> result_metadata_;
};

TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) {
Expand Down Expand Up @@ -924,6 +1119,72 @@ TEST_F(TestConvertArrowSchema, ArrowNonconvertibleFields) {
}
}

// Converts top-level Arrow millisecond timestamp fields to Parquet and checks
// both the produced nodes and the produced file metadata against the table
// below:
//   * no zone        -> node not UTC-adjusted, no metadata entry
//   * "UTC" / "utc"  -> node UTC-adjusted, no metadata entry
//   * any other zone -> node not UTC-adjusted, plus one
//     "org.apache.arrow.field[<name>].timestamp.timezone" metadata entry
// Metadata is compared without regard to entry order.
TEST_F(TestConvertArrowSchema, ArrowSimpleTimezoneFields) {
  struct TimestampCase {
    std::string field_name;
    std::shared_ptr<::arrow::DataType> arrow_type;
    bool utc_adjusted;
    bool expects_metadata;
    std::string metadata_key;
    std::string metadata_value;
  };

  const auto millis = ::arrow::TimeUnit::MILLI;
  const std::vector<TimestampCase> table = {
      {"timestamp", ::arrow::timestamp(millis), false, false, "", ""},
      {"timestamp:UTC", ::arrow::timestamp(millis, "UTC"), true, false, "", ""},
      {"timestamp:utc", ::arrow::timestamp(millis, "utc"), true, false, "", ""},
      {"timestamp:CET", ::arrow::timestamp(millis, "CET"), false, true,
       "org.apache.arrow.field[timestamp:CET].timestamp.timezone", "CET"},
      {"timestamp:NZST", ::arrow::timestamp(millis, "NZST"), false, true,
       "org.apache.arrow.field[timestamp:NZST].timestamp.timezone", "NZST"},
  };

  std::vector<std::shared_ptr<Field>> input_fields;
  std::vector<NodePtr> expected_nodes;
  auto expected_metadata = std::make_shared<KeyValueMetadata>();

  for (const auto& tc : table) {
    input_fields.push_back(std::make_shared<Field>(tc.field_name, tc.arrow_type, false));

    const auto annotation = LogicalAnnotation::Timestamp(
        tc.utc_adjusted, LogicalAnnotation::TimeUnit::MILLIS);
    expected_nodes.push_back(PrimitiveNode::Make(tc.field_name, Repetition::REQUIRED,
                                                 annotation, ParquetType::INT64));

    if (!tc.expects_metadata) {
      continue;
    }
    expected_metadata->Append(tc.metadata_key, tc.metadata_value);
  }

  ASSERT_OK(ConvertSchema(input_fields));
  ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(expected_nodes, expected_metadata, false));
}

// Converts the nested-timezone Arrow fixture (dictionary fields kept as
// dictionaries) to Parquet and checks the produced nodes and file metadata
// against the fixture's Parquet side, ignoring metadata entry order.
TEST_F(TestConvertArrowSchema, ArrowNestedTimezoneFields) {
  std::vector<NodePtr> expected_nodes;
  std::vector<std::shared_ptr<Field>> input_fields;
  auto expected_metadata = std::make_shared<KeyValueMetadata>();

  MakeNestedTimezoneSchemas(input_fields, expected_nodes, expected_metadata);

  ASSERT_OK(ConvertSchema(input_fields));
  ASSERT_NO_FATAL_FAILURE(
      CheckFlatSchema(expected_nodes, expected_metadata,
                      /*expect_strict_metadata_equality=*/false));
}

TEST_F(TestConvertArrowSchema, ParquetFlatPrimitivesAsDictionaries) {
std::vector<NodePtr> parquet_fields;
std::vector<std::shared_ptr<Field>> arrow_fields;
Expand All @@ -949,6 +1210,15 @@ TEST_F(TestConvertArrowSchema, ParquetFlatPrimitivesAsDictionaries) {
arrow_fields.push_back(std::make_shared<Field>(
"date64", ::arrow::dictionary(::arrow::int8(), ::arrow::date64()), false));

parquet_fields.push_back(PrimitiveNode::Make(
"timestamp", Repetition::REQUIRED,
LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MILLIS),
ParquetType::INT64));
arrow_fields.push_back(std::make_shared<Field>(
"timestamp",
::arrow::dictionary(::arrow::int8(), ::arrow::timestamp(TimeUnit::MILLI, "UTC")),
false));

parquet_fields.push_back(
PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT));
arrow_fields.push_back(std::make_shared<Field>(
Expand Down
Loading

0 comments on commit 0336eee

Please sign in to comment.