Skip to content

Commit

Permalink
ARROW-3729: [C++][Parquet] Use logical annotations in Arrow Parquet r…
Browse files Browse the repository at this point in the history
…eader/writer

This PR causes the Arrow Parquet I/O facility to recognize Parquet logical annotations (the LogicalType values defined in parquet.thrift) on read and to generate them (rather than converted types) on write.

Since Parquet logical annotations include nanosecond timestamps, this patch satisfies the feature requested in ARROW-3729.

Author: TP Boudreau <[email protected]>

Closes apache#4421 from tpboudreau/ARROW-3729 and squashes the following commits:

464f348 <TP Boudreau> Fix failing python parquet tests
4cef109 <TP Boudreau> Temporarily remove validity masks from test
81dc742 <TP Boudreau> Reintroduce Parquet version 1.0 behavior for timestamps
fa0c482 <TP Boudreau> Coerce Arrow second timestamps to Parquet millisecond timestamps by default
be613c4 <TP Boudreau> Smallish code review fixes
b5ebdbd <TP Boudreau> Set Parquet isAdjustedToUTC to true for timestamps with non-empty timezones
c5de420 <TP Boudreau> Revert "Preserve Arrow timestamp timezones using Parquet file metadata"
0336eee <TP Boudreau> Preserve Arrow timestamp timezones using Parquet file metadata
f937943 <TP Boudreau> Use logical annotations in Arrow Parquet reader/writer
  • Loading branch information
tpboudreau authored and wesm committed Jun 19, 2019
1 parent 09c535c commit d54425d
Show file tree
Hide file tree
Showing 7 changed files with 1,017 additions and 292 deletions.
407 changes: 337 additions & 70 deletions cpp/src/parquet/arrow/arrow-reader-writer-test.cc

Large diffs are not rendered by default.

227 changes: 214 additions & 13 deletions cpp/src/parquet/arrow/arrow-schema-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ using arrow::Field;
using arrow::TimeUnit;

using ParquetType = parquet::Type;
using parquet::LogicalAnnotation;
using parquet::LogicalType;
using parquet::Repetition;
using parquet::schema::GroupNode;
Expand Down Expand Up @@ -115,12 +116,14 @@ TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) {
parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
ParquetType::INT64,
LogicalType::TIMESTAMP_MILLIS));
arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, false));
arrow_fields.push_back(std::make_shared<Field>(
"timestamp", ::arrow::timestamp(TimeUnit::MILLI, "UTC"), false));

parquet_fields.push_back(PrimitiveNode::Make("timestamp[us]", Repetition::REQUIRED,
ParquetType::INT64,
LogicalType::TIMESTAMP_MICROS));
arrow_fields.push_back(std::make_shared<Field>("timestamp[us]", TIMESTAMP_US, false));
arrow_fields.push_back(std::make_shared<Field>(
"timestamp[us]", ::arrow::timestamp(TimeUnit::MICRO, "UTC"), false));

parquet_fields.push_back(PrimitiveNode::Make("date", Repetition::REQUIRED,
ParquetType::INT32, LogicalType::DATE));
Expand Down Expand Up @@ -168,6 +171,103 @@ TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) {
ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
}

// Table-driven check of Parquet -> Arrow schema conversion for annotated
// primitive columns. Every table row yields one OPTIONAL Parquet primitive node
// and one Arrow field of the expected type; the Parquet nodes are passed to the
// fixture's ConvertSchema and the result is compared against the expected Arrow
// schema through the fixture's CheckFlatSchema (both helpers are defined
// outside this block).
TEST_F(TestConvertParquetSchema, ParquetAnnotatedFields) {
  using PqUnit = LogicalAnnotation::TimeUnit;
  using ArUnit = ::arrow::TimeUnit;

  // A Parquet column description paired with the Arrow type expected for it.
  struct AnnotatedColumn {
    std::string label;
    std::shared_ptr<const LogicalAnnotation> logical;
    parquet::Type::type physical;
    int length;  // -1 on every row except the FIXED_LEN_BYTE_ARRAY ones
    std::shared_ptr<::arrow::DataType> arrow_type;
  };

  const std::vector<AnnotatedColumn> columns = {
      // Strings, enums and decimals.
      {"string", LogicalAnnotation::String(), ParquetType::BYTE_ARRAY, -1,
       ::arrow::utf8()},
      {"enum", LogicalAnnotation::Enum(), ParquetType::BYTE_ARRAY, -1,
       ::arrow::binary()},
      {"decimal(8, 2)", LogicalAnnotation::Decimal(8, 2), ParquetType::INT32, -1,
       ::arrow::decimal(8, 2)},
      {"decimal(16, 4)", LogicalAnnotation::Decimal(16, 4), ParquetType::INT64, -1,
       ::arrow::decimal(16, 4)},
      {"decimal(32, 8)", LogicalAnnotation::Decimal(32, 8),
       ParquetType::FIXED_LEN_BYTE_ARRAY, 16, ::arrow::decimal(32, 8)},

      // Dates and times; both values of the boolean Time() argument are
      // expected to produce the same Arrow time type.
      {"date", LogicalAnnotation::Date(), ParquetType::INT32, -1, ::arrow::date32()},
      {"time(ms)", LogicalAnnotation::Time(true, PqUnit::MILLIS), ParquetType::INT32,
       -1, ::arrow::time32(ArUnit::MILLI)},
      {"time(us)", LogicalAnnotation::Time(true, PqUnit::MICROS), ParquetType::INT64,
       -1, ::arrow::time64(ArUnit::MICRO)},
      {"time(ns)", LogicalAnnotation::Time(true, PqUnit::NANOS), ParquetType::INT64,
       -1, ::arrow::time64(ArUnit::NANO)},
      {"time(ms)", LogicalAnnotation::Time(false, PqUnit::MILLIS), ParquetType::INT32,
       -1, ::arrow::time32(ArUnit::MILLI)},
      {"time(us)", LogicalAnnotation::Time(false, PqUnit::MICROS), ParquetType::INT64,
       -1, ::arrow::time64(ArUnit::MICRO)},
      {"time(ns)", LogicalAnnotation::Time(false, PqUnit::NANOS), ParquetType::INT64,
       -1, ::arrow::time64(ArUnit::NANO)},

      // Timestamps: a `true` first argument is expected to yield a "UTC"
      // timezone, `false` a timestamp without timezone.
      {"timestamp(true, ms)", LogicalAnnotation::Timestamp(true, PqUnit::MILLIS),
       ParquetType::INT64, -1, ::arrow::timestamp(ArUnit::MILLI, "UTC")},
      {"timestamp(true, us)", LogicalAnnotation::Timestamp(true, PqUnit::MICROS),
       ParquetType::INT64, -1, ::arrow::timestamp(ArUnit::MICRO, "UTC")},
      {"timestamp(true, ns)", LogicalAnnotation::Timestamp(true, PqUnit::NANOS),
       ParquetType::INT64, -1, ::arrow::timestamp(ArUnit::NANO, "UTC")},
      {"timestamp(false, ms)", LogicalAnnotation::Timestamp(false, PqUnit::MILLIS),
       ParquetType::INT64, -1, ::arrow::timestamp(ArUnit::MILLI)},
      {"timestamp(false, us)", LogicalAnnotation::Timestamp(false, PqUnit::MICROS),
       ParquetType::INT64, -1, ::arrow::timestamp(ArUnit::MICRO)},
      {"timestamp(false, ns)", LogicalAnnotation::Timestamp(false, PqUnit::NANOS),
       ParquetType::INT64, -1, ::arrow::timestamp(ArUnit::NANO)},

      // Integers of each width, unsigned (false) and signed (true).
      {"int(8, false)", LogicalAnnotation::Int(8, false), ParquetType::INT32, -1,
       ::arrow::uint8()},
      {"int(8, true)", LogicalAnnotation::Int(8, true), ParquetType::INT32, -1,
       ::arrow::int8()},
      {"int(16, false)", LogicalAnnotation::Int(16, false), ParquetType::INT32, -1,
       ::arrow::uint16()},
      {"int(16, true)", LogicalAnnotation::Int(16, true), ParquetType::INT32, -1,
       ::arrow::int16()},
      {"int(32, false)", LogicalAnnotation::Int(32, false), ParquetType::INT32, -1,
       ::arrow::uint32()},
      {"int(32, true)", LogicalAnnotation::Int(32, true), ParquetType::INT32, -1,
       ::arrow::int32()},
      {"int(64, false)", LogicalAnnotation::Int(64, false), ParquetType::INT64, -1,
       ::arrow::uint64()},
      {"int(64, true)", LogicalAnnotation::Int(64, true), ParquetType::INT64, -1,
       ::arrow::int64()},

      // Annotations expected to surface as plain (fixed-size) binary.
      {"json", LogicalAnnotation::JSON(), ParquetType::BYTE_ARRAY, -1,
       ::arrow::binary()},
      {"bson", LogicalAnnotation::BSON(), ParquetType::BYTE_ARRAY, -1,
       ::arrow::binary()},
      {"interval", LogicalAnnotation::Interval(), ParquetType::FIXED_LEN_BYTE_ARRAY,
       12, ::arrow::fixed_size_binary(12)},
      {"uuid", LogicalAnnotation::UUID(), ParquetType::FIXED_LEN_BYTE_ARRAY, 16,
       ::arrow::fixed_size_binary(16)},

      // No annotation: the Arrow type follows the physical type alone.
      {"none", LogicalAnnotation::None(), ParquetType::BOOLEAN, -1,
       ::arrow::boolean()},
      {"none", LogicalAnnotation::None(), ParquetType::INT32, -1, ::arrow::int32()},
      {"none", LogicalAnnotation::None(), ParquetType::INT64, -1, ::arrow::int64()},
      {"none", LogicalAnnotation::None(), ParquetType::FLOAT, -1, ::arrow::float32()},
      {"none", LogicalAnnotation::None(), ParquetType::DOUBLE, -1,
       ::arrow::float64()},
      {"none", LogicalAnnotation::None(), ParquetType::BYTE_ARRAY, -1,
       ::arrow::binary()},
      {"none", LogicalAnnotation::None(), ParquetType::FIXED_LEN_BYTE_ARRAY, 64,
       ::arrow::fixed_size_binary(64)},

      {"null", LogicalAnnotation::Null(), ParquetType::BYTE_ARRAY, -1,
       ::arrow::null()},
  };

  std::vector<NodePtr> parquet_fields;
  std::vector<std::shared_ptr<Field>> arrow_fields;
  parquet_fields.reserve(columns.size());
  arrow_fields.reserve(columns.size());

  for (const auto& col : columns) {
    NodePtr node = PrimitiveNode::Make(col.label, Repetition::OPTIONAL, col.logical,
                                       col.physical, col.length);
    parquet_fields.push_back(node);
    arrow_fields.push_back(std::make_shared<Field>(col.label, col.arrow_type));
  }

  ASSERT_OK(ConvertSchema(parquet_fields));
  auto expected_schema = std::make_shared<::arrow::Schema>(arrow_fields);
  ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(expected_schema));
}

TEST_F(TestConvertParquetSchema, DuplicateFieldNames) {
std::vector<NodePtr> parquet_fields;
std::vector<std::shared_ptr<Field>> arrow_fields;
Expand Down Expand Up @@ -586,6 +686,7 @@ TEST_F(TestConvertParquetSchema, ParquetNestedSchemaPartialOrdering) {

ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
}

TEST_F(TestConvertParquetSchema, ParquetRepeatedNestedSchema) {
std::vector<NodePtr> parquet_fields;
std::vector<std::shared_ptr<Field>> arrow_fields;
Expand Down Expand Up @@ -686,12 +787,14 @@ TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) {
parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
ParquetType::INT64,
LogicalType::TIMESTAMP_MILLIS));
arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, false));
arrow_fields.push_back(std::make_shared<Field>(
"timestamp", ::arrow::timestamp(TimeUnit::MILLI, "UTC"), false));

parquet_fields.push_back(PrimitiveNode::Make("timestamp[us]", Repetition::REQUIRED,
ParquetType::INT64,
LogicalType::TIMESTAMP_MICROS));
arrow_fields.push_back(std::make_shared<Field>("timestamp[us]", TIMESTAMP_US, false));
arrow_fields.push_back(std::make_shared<Field>(
"timestamp[us]", ::arrow::timestamp(TimeUnit::MICRO, "UTC"), false));

parquet_fields.push_back(
PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT));
Expand All @@ -714,6 +817,113 @@ TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) {
ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields));
}

// Table-driven check of Arrow -> Parquet schema conversion for flat primitive
// fields. Every table row yields one non-nullable Arrow field and the REQUIRED
// Parquet primitive node expected for it; the Arrow fields are passed to the
// fixture's ConvertSchema and the outcome is compared with the expected nodes
// through the fixture's CheckFlatSchema (both helpers are defined outside this
// block).
TEST_F(TestConvertArrowSchema, ArrowFields) {
  using PqUnit = LogicalAnnotation::TimeUnit;
  using ArUnit = ::arrow::TimeUnit;

  // An Arrow type paired with the Parquet column description expected for it.
  struct ExpectedColumn {
    std::string label;
    std::shared_ptr<::arrow::DataType> arrow_type;
    std::shared_ptr<const LogicalAnnotation> logical;
    parquet::Type::type physical;
    int length;  // -1 on every row except the FIXED_LEN_BYTE_ARRAY ones
  };

  const std::vector<ExpectedColumn> columns = {
      // Booleans and binary data carry no annotation.
      {"boolean", ::arrow::boolean(), LogicalAnnotation::None(), ParquetType::BOOLEAN,
       -1},
      {"binary", ::arrow::binary(), LogicalAnnotation::None(), ParquetType::BYTE_ARRAY,
       -1},
      {"fixed_size_binary", ::arrow::fixed_size_binary(64), LogicalAnnotation::None(),
       ParquetType::FIXED_LEN_BYTE_ARRAY, 64},

      // Integers.
      {"uint8", ::arrow::uint8(), LogicalAnnotation::Int(8, false),
       ParquetType::INT32, -1},
      {"int8", ::arrow::int8(), LogicalAnnotation::Int(8, true), ParquetType::INT32,
       -1},
      {"uint16", ::arrow::uint16(), LogicalAnnotation::Int(16, false),
       ParquetType::INT32, -1},
      {"int16", ::arrow::int16(), LogicalAnnotation::Int(16, true),
       ParquetType::INT32, -1},
      {"uint32", ::arrow::uint32(), LogicalAnnotation::None(), ParquetType::INT64,
       -1},  // Parquet 1.0
      {"int32", ::arrow::int32(), LogicalAnnotation::None(), ParquetType::INT32, -1},
      {"uint64", ::arrow::uint64(), LogicalAnnotation::Int(64, false),
       ParquetType::INT64, -1},
      {"int64", ::arrow::int64(), LogicalAnnotation::None(), ParquetType::INT64, -1},

      // Floating point and strings.
      {"float32", ::arrow::float32(), LogicalAnnotation::None(), ParquetType::FLOAT,
       -1},
      {"float64", ::arrow::float64(), LogicalAnnotation::None(), ParquetType::DOUBLE,
       -1},
      {"utf8", ::arrow::utf8(), LogicalAnnotation::String(), ParquetType::BYTE_ARRAY,
       -1},

      // Decimals: always FIXED_LEN_BYTE_ARRAY, with a length that grows with
      // the precision.
      {"decimal(1, 0)", ::arrow::decimal(1, 0), LogicalAnnotation::Decimal(1, 0),
       ParquetType::FIXED_LEN_BYTE_ARRAY, 1},
      {"decimal(8, 2)", ::arrow::decimal(8, 2), LogicalAnnotation::Decimal(8, 2),
       ParquetType::FIXED_LEN_BYTE_ARRAY, 4},
      {"decimal(16, 4)", ::arrow::decimal(16, 4), LogicalAnnotation::Decimal(16, 4),
       ParquetType::FIXED_LEN_BYTE_ARRAY, 7},
      {"decimal(32, 8)", ::arrow::decimal(32, 8), LogicalAnnotation::Decimal(32, 8),
       ParquetType::FIXED_LEN_BYTE_ARRAY, 14},

      // Times.
      {"time32", ::arrow::time32(ArUnit::MILLI),
       LogicalAnnotation::Time(false, PqUnit::MILLIS), ParquetType::INT32, -1},
      {"time64(microsecond)", ::arrow::time64(ArUnit::MICRO),
       LogicalAnnotation::Time(false, PqUnit::MICROS), ParquetType::INT64, -1},
      {"time64(nanosecond)", ::arrow::time64(ArUnit::NANO),
       LogicalAnnotation::Time(false, PqUnit::NANOS), ParquetType::INT64, -1},

      // Timestamps. A non-empty timezone ("UTC" or "CET") is expected to set
      // the first Timestamp() argument to true.
      // NOTE(review): nanosecond timestamps are expected as MICROS in all three
      // groups -- presumably the default (Parquet 1.0) writer coercion; confirm
      // against the writer properties used by the fixture.
      {"timestamp(millisecond)", ::arrow::timestamp(ArUnit::MILLI),
       LogicalAnnotation::Timestamp(false, PqUnit::MILLIS), ParquetType::INT64, -1},
      {"timestamp(microsecond)", ::arrow::timestamp(ArUnit::MICRO),
       LogicalAnnotation::Timestamp(false, PqUnit::MICROS), ParquetType::INT64, -1},
      {"timestamp(nanosecond)", ::arrow::timestamp(ArUnit::NANO),
       LogicalAnnotation::Timestamp(false, PqUnit::MICROS), ParquetType::INT64, -1},
      {"timestamp(millisecond, UTC)", ::arrow::timestamp(ArUnit::MILLI, "UTC"),
       LogicalAnnotation::Timestamp(true, PqUnit::MILLIS), ParquetType::INT64, -1},
      {"timestamp(microsecond, UTC)", ::arrow::timestamp(ArUnit::MICRO, "UTC"),
       LogicalAnnotation::Timestamp(true, PqUnit::MICROS), ParquetType::INT64, -1},
      {"timestamp(nanosecond, UTC)", ::arrow::timestamp(ArUnit::NANO, "UTC"),
       LogicalAnnotation::Timestamp(true, PqUnit::MICROS), ParquetType::INT64, -1},
      {"timestamp(millisecond, CET)", ::arrow::timestamp(ArUnit::MILLI, "CET"),
       LogicalAnnotation::Timestamp(true, PqUnit::MILLIS), ParquetType::INT64, -1},
      {"timestamp(microsecond, CET)", ::arrow::timestamp(ArUnit::MICRO, "CET"),
       LogicalAnnotation::Timestamp(true, PqUnit::MICROS), ParquetType::INT64, -1},
      {"timestamp(nanosecond, CET)", ::arrow::timestamp(ArUnit::NANO, "CET"),
       LogicalAnnotation::Timestamp(true, PqUnit::MICROS), ParquetType::INT64, -1},

      {"null", ::arrow::null(), LogicalAnnotation::Null(), ParquetType::INT32, -1}};

  std::vector<std::shared_ptr<Field>> arrow_fields;
  std::vector<NodePtr> parquet_fields;
  arrow_fields.reserve(columns.size());
  parquet_fields.reserve(columns.size());

  for (const auto& col : columns) {
    arrow_fields.push_back(std::make_shared<Field>(col.label, col.arrow_type, false));
    NodePtr node = PrimitiveNode::Make(col.label, Repetition::REQUIRED, col.logical,
                                       col.physical, col.length);
    parquet_fields.push_back(node);
  }

  ASSERT_OK(ConvertSchema(arrow_fields));
  ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields));
}

// Each listed Arrow type, wrapped in a single-field schema, is expected to make
// the fixture's ConvertSchema return a NotImplemented status.
TEST_F(TestConvertArrowSchema, ArrowNonconvertibleFields) {
  // Name and Arrow type of a field whose conversion must be rejected.
  struct RejectedField {
    std::string label;
    std::shared_ptr<::arrow::DataType> arrow_type;
  };

  const std::vector<RejectedField> rejected = {
      {"float16", ::arrow::float16()},
  };

  for (const auto& entry : rejected) {
    auto field = std::make_shared<Field>(entry.label, entry.arrow_type);
    ASSERT_RAISES(NotImplemented, ConvertSchema({field}));
  }
}

TEST_F(TestConvertArrowSchema, ParquetFlatPrimitivesAsDictionaries) {
std::vector<NodePtr> parquet_fields;
std::vector<std::shared_ptr<Field>> arrow_fields;
Expand Down Expand Up @@ -809,15 +1019,6 @@ TEST_F(TestConvertArrowSchema, ParquetLists) {
ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields));
}

// Each listed Arrow field, converted on its own, is expected to make the
// fixture's ConvertSchema return a NotImplemented status.
TEST_F(TestConvertArrowSchema, UnsupportedTypes) {
  const std::vector<std::shared_ptr<Field>> rejected = {
      ::arrow::field("f0", ::arrow::time64(TimeUnit::NANO))};

  for (size_t i = 0; i < rejected.size(); ++i) {
    const std::shared_ptr<Field>& field = rejected[i];
    ASSERT_RAISES(NotImplemented, ConvertSchema({field}));
  }
}

TEST_F(TestConvertArrowSchema, ParquetFlatDecimals) {
std::vector<NodePtr> parquet_fields;
std::vector<std::shared_ptr<Field>> arrow_fields;
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/parquet/arrow/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1613,7 +1613,11 @@ Status PrimitiveImpl::NextBatch(int64_t records_to_read,
TRANSFER_DATA(::arrow::TimestampType, Int64Type);
} break;
case ::arrow::TimeUnit::NANO: {
TRANSFER_DATA(::arrow::TimestampType, Int96Type);
if (descr_->physical_type() == ::parquet::Type::INT96) {
TRANSFER_DATA(::arrow::TimestampType, Int96Type);
} else {
TRANSFER_DATA(::arrow::TimestampType, Int64Type);
}
} break;
default:
return Status::NotImplemented("TimeUnit not supported");
Expand Down
Loading

0 comments on commit d54425d

Please sign in to comment.