Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-3729: [C++][Parquet] Use logical annotations in Arrow Parquet reader/writer #4421

Closed
wants to merge 9 commits into from
407 changes: 337 additions & 70 deletions cpp/src/parquet/arrow/arrow-reader-writer-test.cc

Large diffs are not rendered by default.

227 changes: 214 additions & 13 deletions cpp/src/parquet/arrow/arrow-schema-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ using arrow::Field;
using arrow::TimeUnit;

using ParquetType = parquet::Type;
using parquet::LogicalAnnotation;
using parquet::LogicalType;
using parquet::Repetition;
using parquet::schema::GroupNode;
Expand Down Expand Up @@ -115,12 +116,14 @@ TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) {
parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
ParquetType::INT64,
LogicalType::TIMESTAMP_MILLIS));
arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, false));
arrow_fields.push_back(std::make_shared<Field>(
"timestamp", ::arrow::timestamp(TimeUnit::MILLI, "UTC"), false));

parquet_fields.push_back(PrimitiveNode::Make("timestamp[us]", Repetition::REQUIRED,
ParquetType::INT64,
LogicalType::TIMESTAMP_MICROS));
arrow_fields.push_back(std::make_shared<Field>("timestamp[us]", TIMESTAMP_US, false));
arrow_fields.push_back(std::make_shared<Field>(
"timestamp[us]", ::arrow::timestamp(TimeUnit::MICRO, "UTC"), false));

parquet_fields.push_back(PrimitiveNode::Make("date", Repetition::REQUIRED,
ParquetType::INT32, LogicalType::DATE));
Expand Down Expand Up @@ -168,6 +171,103 @@ TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) {
ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
}

TEST_F(TestConvertParquetSchema, ParquetAnnotatedFields) {
struct FieldConstructionArguments {
std::string name;
std::shared_ptr<const LogicalAnnotation> annotation;
parquet::Type::type physical_type;
int physical_length;
std::shared_ptr<::arrow::DataType> datatype;
};

std::vector<FieldConstructionArguments> cases = {
{"string", LogicalAnnotation::String(), ParquetType::BYTE_ARRAY, -1,
::arrow::utf8()},
{"enum", LogicalAnnotation::Enum(), ParquetType::BYTE_ARRAY, -1, ::arrow::binary()},
{"decimal(8, 2)", LogicalAnnotation::Decimal(8, 2), ParquetType::INT32, -1,
::arrow::decimal(8, 2)},
{"decimal(16, 4)", LogicalAnnotation::Decimal(16, 4), ParquetType::INT64, -1,
::arrow::decimal(16, 4)},
{"decimal(32, 8)", LogicalAnnotation::Decimal(32, 8),
ParquetType::FIXED_LEN_BYTE_ARRAY, 16, ::arrow::decimal(32, 8)},
{"date", LogicalAnnotation::Date(), ParquetType::INT32, -1, ::arrow::date32()},
{"time(ms)", LogicalAnnotation::Time(true, LogicalAnnotation::TimeUnit::MILLIS),
ParquetType::INT32, -1, ::arrow::time32(::arrow::TimeUnit::MILLI)},
{"time(us)", LogicalAnnotation::Time(true, LogicalAnnotation::TimeUnit::MICROS),
ParquetType::INT64, -1, ::arrow::time64(::arrow::TimeUnit::MICRO)},
{"time(ns)", LogicalAnnotation::Time(true, LogicalAnnotation::TimeUnit::NANOS),
ParquetType::INT64, -1, ::arrow::time64(::arrow::TimeUnit::NANO)},
{"time(ms)", LogicalAnnotation::Time(false, LogicalAnnotation::TimeUnit::MILLIS),
ParquetType::INT32, -1, ::arrow::time32(::arrow::TimeUnit::MILLI)},
{"time(us)", LogicalAnnotation::Time(false, LogicalAnnotation::TimeUnit::MICROS),
ParquetType::INT64, -1, ::arrow::time64(::arrow::TimeUnit::MICRO)},
{"time(ns)", LogicalAnnotation::Time(false, LogicalAnnotation::TimeUnit::NANOS),
ParquetType::INT64, -1, ::arrow::time64(::arrow::TimeUnit::NANO)},
{"timestamp(true, ms)",
LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MILLIS),
ParquetType::INT64, -1, ::arrow::timestamp(::arrow::TimeUnit::MILLI, "UTC")},
{"timestamp(true, us)",
LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MICROS),
ParquetType::INT64, -1, ::arrow::timestamp(::arrow::TimeUnit::MICRO, "UTC")},
{"timestamp(true, ns)",
LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::NANOS),
ParquetType::INT64, -1, ::arrow::timestamp(::arrow::TimeUnit::NANO, "UTC")},
{"timestamp(false, ms)",
LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::MILLIS),
ParquetType::INT64, -1, ::arrow::timestamp(::arrow::TimeUnit::MILLI)},
{"timestamp(false, us)",
LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::MICROS),
ParquetType::INT64, -1, ::arrow::timestamp(::arrow::TimeUnit::MICRO)},
{"timestamp(false, ns)",
LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::NANOS),
ParquetType::INT64, -1, ::arrow::timestamp(::arrow::TimeUnit::NANO)},
{"int(8, false)", LogicalAnnotation::Int(8, false), ParquetType::INT32, -1,
::arrow::uint8()},
{"int(8, true)", LogicalAnnotation::Int(8, true), ParquetType::INT32, -1,
::arrow::int8()},
{"int(16, false)", LogicalAnnotation::Int(16, false), ParquetType::INT32, -1,
::arrow::uint16()},
{"int(16, true)", LogicalAnnotation::Int(16, true), ParquetType::INT32, -1,
::arrow::int16()},
{"int(32, false)", LogicalAnnotation::Int(32, false), ParquetType::INT32, -1,
::arrow::uint32()},
{"int(32, true)", LogicalAnnotation::Int(32, true), ParquetType::INT32, -1,
::arrow::int32()},
{"int(64, false)", LogicalAnnotation::Int(64, false), ParquetType::INT64, -1,
::arrow::uint64()},
{"int(64, true)", LogicalAnnotation::Int(64, true), ParquetType::INT64, -1,
::arrow::int64()},
{"json", LogicalAnnotation::JSON(), ParquetType::BYTE_ARRAY, -1, ::arrow::binary()},
{"bson", LogicalAnnotation::BSON(), ParquetType::BYTE_ARRAY, -1, ::arrow::binary()},
{"interval", LogicalAnnotation::Interval(), ParquetType::FIXED_LEN_BYTE_ARRAY, 12,
::arrow::fixed_size_binary(12)},
{"uuid", LogicalAnnotation::UUID(), ParquetType::FIXED_LEN_BYTE_ARRAY, 16,
::arrow::fixed_size_binary(16)},
{"none", LogicalAnnotation::None(), ParquetType::BOOLEAN, -1, ::arrow::boolean()},
{"none", LogicalAnnotation::None(), ParquetType::INT32, -1, ::arrow::int32()},
{"none", LogicalAnnotation::None(), ParquetType::INT64, -1, ::arrow::int64()},
{"none", LogicalAnnotation::None(), ParquetType::FLOAT, -1, ::arrow::float32()},
{"none", LogicalAnnotation::None(), ParquetType::DOUBLE, -1, ::arrow::float64()},
{"none", LogicalAnnotation::None(), ParquetType::BYTE_ARRAY, -1, ::arrow::binary()},
{"none", LogicalAnnotation::None(), ParquetType::FIXED_LEN_BYTE_ARRAY, 64,
::arrow::fixed_size_binary(64)},
{"null", LogicalAnnotation::Null(), ParquetType::BYTE_ARRAY, -1, ::arrow::null()},
};

std::vector<NodePtr> parquet_fields;
std::vector<std::shared_ptr<Field>> arrow_fields;

for (const FieldConstructionArguments& c : cases) {
parquet_fields.push_back(PrimitiveNode::Make(
c.name, Repetition::OPTIONAL, c.annotation, c.physical_type, c.physical_length));
arrow_fields.push_back(std::make_shared<Field>(c.name, c.datatype));
}

ASSERT_OK(ConvertSchema(parquet_fields));
auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
}

TEST_F(TestConvertParquetSchema, DuplicateFieldNames) {
std::vector<NodePtr> parquet_fields;
std::vector<std::shared_ptr<Field>> arrow_fields;
Expand Down Expand Up @@ -586,6 +686,7 @@ TEST_F(TestConvertParquetSchema, ParquetNestedSchemaPartialOrdering) {

ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
}

TEST_F(TestConvertParquetSchema, ParquetRepeatedNestedSchema) {
std::vector<NodePtr> parquet_fields;
std::vector<std::shared_ptr<Field>> arrow_fields;
Expand Down Expand Up @@ -686,12 +787,14 @@ TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) {
parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
ParquetType::INT64,
LogicalType::TIMESTAMP_MILLIS));
arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, false));
arrow_fields.push_back(std::make_shared<Field>(
"timestamp", ::arrow::timestamp(TimeUnit::MILLI, "UTC"), false));

parquet_fields.push_back(PrimitiveNode::Make("timestamp[us]", Repetition::REQUIRED,
ParquetType::INT64,
LogicalType::TIMESTAMP_MICROS));
arrow_fields.push_back(std::make_shared<Field>("timestamp[us]", TIMESTAMP_US, false));
arrow_fields.push_back(std::make_shared<Field>(
"timestamp[us]", ::arrow::timestamp(TimeUnit::MICRO, "UTC"), false));

parquet_fields.push_back(
PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT));
Expand All @@ -714,6 +817,113 @@ TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) {
ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields));
}

TEST_F(TestConvertArrowSchema, ArrowFields) {
struct FieldConstructionArguments {
std::string name;
std::shared_ptr<::arrow::DataType> datatype;
std::shared_ptr<const LogicalAnnotation> annotation;
parquet::Type::type physical_type;
int physical_length;
};

std::vector<FieldConstructionArguments> cases = {
{"boolean", ::arrow::boolean(), LogicalAnnotation::None(), ParquetType::BOOLEAN,
-1},
{"binary", ::arrow::binary(), LogicalAnnotation::None(), ParquetType::BYTE_ARRAY,
-1},
{"fixed_size_binary", ::arrow::fixed_size_binary(64), LogicalAnnotation::None(),
ParquetType::FIXED_LEN_BYTE_ARRAY, 64},
{"uint8", ::arrow::uint8(), LogicalAnnotation::Int(8, false), ParquetType::INT32,
-1},
{"int8", ::arrow::int8(), LogicalAnnotation::Int(8, true), ParquetType::INT32, -1},
{"uint16", ::arrow::uint16(), LogicalAnnotation::Int(16, false), ParquetType::INT32,
-1},
{"int16", ::arrow::int16(), LogicalAnnotation::Int(16, true), ParquetType::INT32,
-1},
{"uint32", ::arrow::uint32(), LogicalAnnotation::None(), ParquetType::INT64,
-1}, // Parquet 1.0
{"int32", ::arrow::int32(), LogicalAnnotation::None(), ParquetType::INT32, -1},
{"uint64", ::arrow::uint64(), LogicalAnnotation::Int(64, false), ParquetType::INT64,
-1},
{"int64", ::arrow::int64(), LogicalAnnotation::None(), ParquetType::INT64, -1},
{"float32", ::arrow::float32(), LogicalAnnotation::None(), ParquetType::FLOAT, -1},
{"float64", ::arrow::float64(), LogicalAnnotation::None(), ParquetType::DOUBLE, -1},
{"utf8", ::arrow::utf8(), LogicalAnnotation::String(), ParquetType::BYTE_ARRAY, -1},
{"decimal(1, 0)", ::arrow::decimal(1, 0), LogicalAnnotation::Decimal(1, 0),
ParquetType::FIXED_LEN_BYTE_ARRAY, 1},
{"decimal(8, 2)", ::arrow::decimal(8, 2), LogicalAnnotation::Decimal(8, 2),
ParquetType::FIXED_LEN_BYTE_ARRAY, 4},
{"decimal(16, 4)", ::arrow::decimal(16, 4), LogicalAnnotation::Decimal(16, 4),
ParquetType::FIXED_LEN_BYTE_ARRAY, 7},
{"decimal(32, 8)", ::arrow::decimal(32, 8), LogicalAnnotation::Decimal(32, 8),
ParquetType::FIXED_LEN_BYTE_ARRAY, 14},
{"time32", ::arrow::time32(::arrow::TimeUnit::MILLI),
LogicalAnnotation::Time(false, LogicalAnnotation::TimeUnit::MILLIS),
ParquetType::INT32, -1},
{"time64(microsecond)", ::arrow::time64(::arrow::TimeUnit::MICRO),
LogicalAnnotation::Time(false, LogicalAnnotation::TimeUnit::MICROS),
ParquetType::INT64, -1},
{"time64(nanosecond)", ::arrow::time64(::arrow::TimeUnit::NANO),
LogicalAnnotation::Time(false, LogicalAnnotation::TimeUnit::NANOS),
ParquetType::INT64, -1},
{"timestamp(millisecond)", ::arrow::timestamp(::arrow::TimeUnit::MILLI),
LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::MILLIS),
ParquetType::INT64, -1},
{"timestamp(microsecond)", ::arrow::timestamp(::arrow::TimeUnit::MICRO),
LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::MICROS),
ParquetType::INT64, -1},
{"timestamp(nanosecond)", ::arrow::timestamp(::arrow::TimeUnit::NANO),
LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::MICROS),
ParquetType::INT64, -1},
{"timestamp(millisecond, UTC)", ::arrow::timestamp(::arrow::TimeUnit::MILLI, "UTC"),
LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MILLIS),
ParquetType::INT64, -1},
{"timestamp(microsecond, UTC)", ::arrow::timestamp(::arrow::TimeUnit::MICRO, "UTC"),
LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MICROS),
ParquetType::INT64, -1},
{"timestamp(nanosecond, UTC)", ::arrow::timestamp(::arrow::TimeUnit::NANO, "UTC"),
LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MICROS),
ParquetType::INT64, -1},
{"timestamp(millisecond, CET)", ::arrow::timestamp(::arrow::TimeUnit::MILLI, "CET"),
LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MILLIS),
ParquetType::INT64, -1},
{"timestamp(microsecond, CET)", ::arrow::timestamp(::arrow::TimeUnit::MICRO, "CET"),
LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MICROS),
ParquetType::INT64, -1},
{"timestamp(nanosecond, CET)", ::arrow::timestamp(::arrow::TimeUnit::NANO, "CET"),
LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MICROS),
ParquetType::INT64, -1},
{"null", ::arrow::null(), LogicalAnnotation::Null(), ParquetType::INT32, -1}};

std::vector<std::shared_ptr<Field>> arrow_fields;
std::vector<NodePtr> parquet_fields;

for (const FieldConstructionArguments& c : cases) {
arrow_fields.push_back(std::make_shared<Field>(c.name, c.datatype, false));
parquet_fields.push_back(PrimitiveNode::Make(
c.name, Repetition::REQUIRED, c.annotation, c.physical_type, c.physical_length));
}

ASSERT_OK(ConvertSchema(arrow_fields));
ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields));
}

TEST_F(TestConvertArrowSchema, ArrowNonconvertibleFields) {
struct FieldConstructionArguments {
std::string name;
std::shared_ptr<::arrow::DataType> datatype;
};

std::vector<FieldConstructionArguments> cases = {
{"float16", ::arrow::float16()},
};

for (const FieldConstructionArguments& c : cases) {
auto field = std::make_shared<Field>(c.name, c.datatype);
ASSERT_RAISES(NotImplemented, ConvertSchema({field}));
}
}

TEST_F(TestConvertArrowSchema, ParquetFlatPrimitivesAsDictionaries) {
std::vector<NodePtr> parquet_fields;
std::vector<std::shared_ptr<Field>> arrow_fields;
Expand Down Expand Up @@ -809,15 +1019,6 @@ TEST_F(TestConvertArrowSchema, ParquetLists) {
ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields));
}

TEST_F(TestConvertArrowSchema, UnsupportedTypes) {
std::vector<std::shared_ptr<Field>> unsupported_fields = {
::arrow::field("f0", ::arrow::time64(TimeUnit::NANO))};

for (const auto& field : unsupported_fields) {
ASSERT_RAISES(NotImplemented, ConvertSchema({field}));
}
}

TEST_F(TestConvertArrowSchema, ParquetFlatDecimals) {
std::vector<NodePtr> parquet_fields;
std::vector<std::shared_ptr<Field>> arrow_fields;
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/parquet/arrow/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1613,7 +1613,11 @@ Status PrimitiveImpl::NextBatch(int64_t records_to_read,
TRANSFER_DATA(::arrow::TimestampType, Int64Type);
} break;
case ::arrow::TimeUnit::NANO: {
TRANSFER_DATA(::arrow::TimestampType, Int96Type);
if (descr_->physical_type() == ::parquet::Type::INT96) {
TRANSFER_DATA(::arrow::TimestampType, Int96Type);
} else {
TRANSFER_DATA(::arrow::TimestampType, Int64Type);
}
} break;
default:
return Status::NotImplemented("TimeUnit not supported");
Expand Down
Loading