Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-3729: [C++][Parquet] Use logical annotations in Arrow Parquet reader/writer #4421

Closed
wants to merge 9 commits into from
Prev Previous commit
Next Next commit
Reintroduce Parquet version 1.0 behavior for timestamps
  • Loading branch information
tpboudreau committed Jun 18, 2019
commit 81dc742731958e41ff65f63e4c66ab205f12e3f0
226 changes: 204 additions & 22 deletions cpp/src/parquet/arrow/arrow-reader-writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,49 @@ void AssertChunkedEqual(const ChunkedArray& expected, const ChunkedArray& actual
}
}

void DoConfiguredRoundtrip(
const std::shared_ptr<Table>& table, int64_t row_group_size,
std::shared_ptr<Table>* out,
const std::shared_ptr<::parquet::WriterProperties>& parquet_properties =
default_writer_properties(),
const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
default_arrow_writer_properties()) {
std::shared_ptr<Buffer> buffer;

auto sink = CreateOutputStream();
ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), sink,
row_group_size, parquet_properties, arrow_properties));
ASSERT_OK_NO_THROW(sink->Finish(&buffer));

std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool(),
::parquet::default_reader_properties(), nullptr, &reader));
ASSERT_OK_NO_THROW(reader->ReadTable(out));
}

void CheckConfiguredRoundtrip(
const std::shared_ptr<Table>& input_table,
const std::shared_ptr<Table>& expected_table = nullptr,
const std::shared_ptr<::parquet::WriterProperties>& parquet_properties =
default_writer_properties(),
const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
default_arrow_writer_properties()) {
std::shared_ptr<Table> actual_table;
ASSERT_NO_FATAL_FAILURE(DoConfiguredRoundtrip(input_table, input_table->num_rows(),
&actual_table, parquet_properties,
arrow_properties));
if (expected_table) {
ASSERT_NO_FATAL_FAILURE(
::arrow::AssertSchemaEqual(*actual_table->schema(), *expected_table->schema()));
ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*actual_table, *expected_table));
} else {
ASSERT_NO_FATAL_FAILURE(
::arrow::AssertSchemaEqual(*actual_table->schema(), *input_table->schema()));
ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*actual_table, *input_table));
}
}

void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, bool use_threads,
int64_t row_group_size, const std::vector<int>& column_subset,
std::shared_ptr<Table>* out,
Expand Down Expand Up @@ -1209,7 +1252,7 @@ TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) {
ASSERT_NO_FATAL_FAILURE(this->CheckSingleColumnRequiredTableRead(4));
}

void MakeDateTimeTypesTable(std::shared_ptr<Table>* out) {
void MakeDateTimeTypesTable(std::shared_ptr<Table>* out, bool expected = false) {
using ::arrow::ArrayFromVector;

std::vector<bool> is_valid = {true, true, true, false, true, true};
Expand All @@ -1219,12 +1262,13 @@ void MakeDateTimeTypesTable(std::shared_ptr<Table>* out) {
auto f1 = field("f1", ::arrow::timestamp(TimeUnit::MILLI));
auto f2 = field("f2", ::arrow::timestamp(TimeUnit::MICRO));
auto f3 = field("f3", ::arrow::timestamp(TimeUnit::NANO));
auto f3_x = field("f3", ::arrow::timestamp(TimeUnit::MICRO));
auto f4 = field("f4", ::arrow::time32(TimeUnit::MILLI));
auto f5 = field("f5", ::arrow::time64(TimeUnit::MICRO));
auto f6 = field("f6", ::arrow::time64(TimeUnit::NANO));

std::shared_ptr<::arrow::Schema> schema(
new ::arrow::Schema({f0, f1, f2, f3, f4, f5, f6}));
new ::arrow::Schema({f0, f1, f2, (expected ? f3_x : f3), f4, f5, f6}));

std::vector<int32_t> t32_values = {1489269000, 1489270000, 1489271000,
1489272000, 1489272000, 1489273000};
Expand All @@ -1235,22 +1279,27 @@ void MakeDateTimeTypesTable(std::shared_ptr<Table>* out) {
std::vector<int64_t> t64_ms_values = {1489269, 1489270, 1489271,
1489272, 1489272, 1489273};

std::shared_ptr<Array> a0, a1, a2, a3, a4, a5, a6;
std::shared_ptr<Array> a0, a1, a2, a3, a3_x, a4, a5, a6;
ArrayFromVector<::arrow::Date32Type, int32_t>(f0->type(), is_valid, t32_values, &a0);
ArrayFromVector<::arrow::TimestampType, int64_t>(f1->type(), is_valid, t64_ms_values,
&a1);
ArrayFromVector<::arrow::TimestampType, int64_t>(f2->type(), is_valid, t64_us_values,
&a2);
ArrayFromVector<::arrow::TimestampType, int64_t>(f3->type(), is_valid, t64_ns_values,
&a3);
ArrayFromVector<::arrow::TimestampType, int64_t>(f3_x->type(), is_valid, t64_us_values,
&a3_x);
ArrayFromVector<::arrow::Time32Type, int32_t>(f4->type(), is_valid, t32_values, &a4);
ArrayFromVector<::arrow::Time64Type, int64_t>(f5->type(), is_valid, t64_us_values, &a5);
ArrayFromVector<::arrow::Time64Type, int64_t>(f6->type(), is_valid, t64_ns_values, &a6);

std::vector<std::shared_ptr<::arrow::Column>> columns = {
std::make_shared<Column>("f0", a0), std::make_shared<Column>("f1", a1),
std::make_shared<Column>("f2", a2), std::make_shared<Column>("f3", a3),
std::make_shared<Column>("f4", a4), std::make_shared<Column>("f5", a5),
std::make_shared<Column>("f0", a0),
std::make_shared<Column>("f1", a1),
std::make_shared<Column>("f2", a2),
std::make_shared<Column>("f3", (expected ? a3_x : a3)),
std::make_shared<Column>("f4", a4),
std::make_shared<Column>("f5", a5),
std::make_shared<Column>("f6", a6)};

*out = Table::Make(schema, columns);
Expand All @@ -1263,7 +1312,7 @@ TEST(TestArrowReadWrite, DateTimeTypes) {
ASSERT_NO_FATAL_FAILURE(
DoSimpleRoundtrip(table, false /* use_threads */, table->num_rows(), {}, &result));

MakeDateTimeTypesTable(&table);
MakeDateTimeTypesTable(&table, true); // build expected result
ASSERT_NO_FATAL_FAILURE(
::arrow::AssertSchemaEqual(*table->schema(), *result->schema()));
ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result));
Expand Down Expand Up @@ -1399,21 +1448,6 @@ TEST(TestArrowReadWrite, CoerceTimestamps) {
ASSERT_NO_FATAL_FAILURE(
::arrow::AssertSchemaEqual(*ex_micro_result->schema(), *micro_result->schema()));
ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_micro_result, *micro_result));

// Result when coercing to nanoseconds
auto s4 = ::arrow::schema({field("f_s", t_ns), field("f_ms", t_ns), field("f_us", t_ns),
field("f_ns", t_ns)});
auto ex_nano_result = Table::Make(
s4,
{std::make_shared<Column>("f_s", a_ns), std::make_shared<Column>("f_ms", a_ns),
std::make_shared<Column>("f_us", a_ns), std::make_shared<Column>("f_ns", a_ns)});
std::shared_ptr<Table> nano_result;
ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
input, false /* use_threads */, input->num_rows(), {}, &nano_result,
ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::NANO)->build()));
ASSERT_NO_FATAL_FAILURE(
::arrow::AssertSchemaEqual(*ex_nano_result->schema(), *nano_result->schema()));
ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_nano_result, *nano_result));
}

TEST(TestArrowReadWrite, CoerceTimestampsLosePrecision) {
Expand Down Expand Up @@ -1542,6 +1576,154 @@ TEST(TestArrowReadWrite, ImplicitSecondToMillisecondTimestampCoercion) {
ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*tx, *to));
}

TEST(TestArrowReadWrite, ParquetVersionTimestampDifferences) {
using ::arrow::ArrayFromVector;
using ::arrow::field;
using ::arrow::schema;

auto t_s = ::arrow::timestamp(TimeUnit::SECOND);
auto t_ms = ::arrow::timestamp(TimeUnit::MILLI);
auto t_us = ::arrow::timestamp(TimeUnit::MICRO);
auto t_ns = ::arrow::timestamp(TimeUnit::NANO);

const int N = 10;
int64_t instant = INT64_C(1262304000); // 2010-01-01T00:00:00 seconds offset
std::vector<bool> mask;
std::vector<int64_t> d_s, d_ms, d_us, d_ns;
for (int i = 0; i < N; ++i) {
mask.push_back(i % 3 == 1);
d_s.push_back(instant);
d_ms.push_back(instant * INT64_C(1000));
d_us.push_back(instant * INT64_C(1000000));
d_ns.push_back(instant * INT64_C(1000000000));
instant += 1;
}

std::shared_ptr<Array> a_s, a_ms, a_us, a_ns;
ArrayFromVector<::arrow::TimestampType, int64_t>(t_s, mask, d_s, &a_s);
ArrayFromVector<::arrow::TimestampType, int64_t>(t_ms, mask, d_ms, &a_ms);
ArrayFromVector<::arrow::TimestampType, int64_t>(t_us, mask, d_us, &a_us);
ArrayFromVector<::arrow::TimestampType, int64_t>(t_ns, mask, d_ns, &a_ns);

auto c_s = std::make_shared<Column>("ts:s", a_s);
auto c_ms = std::make_shared<Column>("ts:ms", a_ms);
auto c_us = std::make_shared<Column>("ts:us", a_us);
auto c_ns = std::make_shared<Column>("ts:ns", a_ns);

auto input_schema = schema({field("ts:s", t_s), field("ts:ms", t_ms),
field("ts:us", t_us), field("ts:ns", t_ns)});
auto input_table = Table::Make(input_schema, {c_s, c_ms, c_us, c_ns});

auto parquet_version_1_properties = default_writer_properties();
auto parquet_version_2_properties = ::parquet::WriterProperties::Builder()
.version(ParquetVersion::PARQUET_2_0)
->build();

{
// Using Parquet version 1.0 defaults, seconds should be coerced to milliseconds
// and nanoseconds should be coerced to microseconds
auto expected_schema = schema({field("ts:s", t_ms), field("ts:ms", t_ms),
field("ts:us", t_us), field("ts:ns", t_us)});
auto expected_table = Table::Make(expected_schema, {c_ms, c_ms, c_us, c_us});
ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
parquet_version_1_properties));
}
{
// Using Parquet version 2.0 defaults, seconds should be coerced to milliseconds
// and nanoseconds should be retained
auto expected_schema = schema({field("ts:s", t_ms), field("ts:ms", t_ms),
field("ts:us", t_us), field("ts:ns", t_ns)});
auto expected_table = Table::Make(expected_schema, {c_ms, c_ms, c_us, c_ns});
ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
parquet_version_2_properties));
}

auto arrow_coerce_to_seconds_properties =
ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::SECOND)->build();
auto arrow_coerce_to_millis_properties =
ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MILLI)->build();
auto arrow_coerce_to_micros_properties =
ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MICRO)->build();
auto arrow_coerce_to_nanos_properties =
ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::NANO)->build();
{
// Neither Parquet version 1.0 nor 2.0 allow coercing to seconds
auto sink = CreateOutputStream();
std::shared_ptr<Table> actual_table;
ASSERT_RAISES(NotImplemented, WriteTable(*input_table, ::arrow::default_memory_pool(),
sink, N, parquet_version_1_properties,
arrow_coerce_to_seconds_properties));
ASSERT_RAISES(NotImplemented, WriteTable(*input_table, ::arrow::default_memory_pool(),
sink, N, parquet_version_2_properties,
arrow_coerce_to_seconds_properties));
}
{
// Using Parquet version 1.0, coercing to milliseconds or microseconds is allowed
auto expected_schema = schema({field("ts:s", t_ms), field("ts:ms", t_ms),
field("ts:us", t_ms), field("ts:ns", t_ms)});
auto expected_table = Table::Make(expected_schema, {c_ms, c_ms, c_ms, c_ms});
ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
parquet_version_1_properties,
arrow_coerce_to_millis_properties));

expected_schema = schema({field("ts:s", t_us), field("ts:ms", t_us),
field("ts:us", t_us), field("ts:ns", t_us)});
expected_table = Table::Make(expected_schema, {c_us, c_us, c_us, c_us});
ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
parquet_version_1_properties,
arrow_coerce_to_micros_properties));
}
{
// Using Parquet version 2.0, coercing to milliseconds or microseconds is allowed
auto expected_schema = schema({field("ts:s", t_ms), field("ts:ms", t_ms),
field("ts:us", t_ms), field("ts:ns", t_ms)});
auto expected_table = Table::Make(expected_schema, {c_ms, c_ms, c_ms, c_ms});
ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
parquet_version_2_properties,
arrow_coerce_to_millis_properties));

expected_schema = schema({field("ts:s", t_us), field("ts:ms", t_us),
field("ts:us", t_us), field("ts:ns", t_us)});
expected_table = Table::Make(expected_schema, {c_us, c_us, c_us, c_us});
ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
parquet_version_2_properties,
arrow_coerce_to_micros_properties));
}
{
// Using Parquet version 1.0, coercing to (int64) nanoseconds is not allowed
auto sink = CreateOutputStream();
std::shared_ptr<Table> actual_table;
ASSERT_RAISES(NotImplemented, WriteTable(*input_table, ::arrow::default_memory_pool(),
sink, N, parquet_version_1_properties,
arrow_coerce_to_nanos_properties));
}
{
// Using Parquet version 2.0, coercing to (int64) nanoseconds is allowed
auto expected_schema = schema({field("ts:s", t_ns), field("ts:ms", t_ns),
field("ts:us", t_ns), field("ts:ns", t_ns)});
auto expected_table = Table::Make(expected_schema, {c_ns, c_ns, c_ns, c_ns});
ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
parquet_version_2_properties,
arrow_coerce_to_nanos_properties));
}

auto arrow_enable_int96_properties =
ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build();
{
// For either Parquet version, coercing to nanoseconds is allowed if Int96
// storage is used
auto expected_schema = schema({field("ts:s", t_ns), field("ts:ms", t_ns),
field("ts:us", t_ns), field("ts:ns", t_ns)});
auto expected_table = Table::Make(expected_schema, {c_ns, c_ns, c_ns, c_ns});
ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
parquet_version_1_properties,
arrow_enable_int96_properties));
ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
parquet_version_2_properties,
arrow_enable_int96_properties));
}
}

TEST(TestArrowReadWrite, ConvertedDateTimeTypes) {
using ::arrow::ArrayFromVector;

Expand Down
6 changes: 3 additions & 3 deletions cpp/src/parquet/arrow/arrow-schema-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,7 @@ TEST_F(TestConvertArrowSchema, ArrowFields) {
LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::MICROS),
ParquetType::INT64, -1},
{"timestamp(nanosecond)", ::arrow::timestamp(::arrow::TimeUnit::NANO),
LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::NANOS),
LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::MICROS),
ParquetType::INT64, -1},
{"timestamp(millisecond, UTC)", ::arrow::timestamp(::arrow::TimeUnit::MILLI, "UTC"),
LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MILLIS),
Expand All @@ -882,7 +882,7 @@ TEST_F(TestConvertArrowSchema, ArrowFields) {
LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MICROS),
ParquetType::INT64, -1},
{"timestamp(nanosecond, UTC)", ::arrow::timestamp(::arrow::TimeUnit::NANO, "UTC"),
LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::NANOS),
LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MICROS),
ParquetType::INT64, -1},
{"timestamp(millisecond, CET)", ::arrow::timestamp(::arrow::TimeUnit::MILLI, "CET"),
LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MILLIS),
Expand All @@ -891,7 +891,7 @@ TEST_F(TestConvertArrowSchema, ArrowFields) {
LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MICROS),
ParquetType::INT64, -1},
{"timestamp(nanosecond, CET)", ::arrow::timestamp(::arrow::TimeUnit::NANO, "CET"),
LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::NANOS),
LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MICROS),
ParquetType::INT64, -1},
{"null", ::arrow::null(), LogicalAnnotation::Null(), ParquetType::INT32, -1}};

Expand Down
Loading