Skip to content

Commit

Permalink
Reintroduce Parquet version 1.0 behavior for timestamps
Browse files Browse the repository at this point in the history
  • Loading branch information
tpboudreau committed Jun 13, 2019
1 parent c2ac4cc commit 26d7c71
Show file tree
Hide file tree
Showing 5 changed files with 342 additions and 53 deletions.
226 changes: 204 additions & 22 deletions cpp/src/parquet/arrow/arrow-reader-writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,49 @@ void AssertChunkedEqual(const ChunkedArray& expected, const ChunkedArray& actual
}
}

// Writes `table` to the stream returned by CreateOutputStream() using the supplied
// Parquet and Arrow writer properties, finishes the stream into a buffer, and reads
// that buffer back into `*out` with the default reader properties.
//
// `row_group_size` is forwarded unchanged to WriteTable(). Every step is checked
// with ASSERT_OK_NO_THROW, so callers wrap this helper in ASSERT_NO_FATAL_FAILURE
// to notice a failed step.
void DoConfiguredRoundtrip(
    const std::shared_ptr<Table>& table, int64_t row_group_size,
    std::shared_ptr<Table>* out,
    const std::shared_ptr<::parquet::WriterProperties>& parquet_properties =
        default_writer_properties(),
    const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
        default_arrow_writer_properties()) {
  auto* pool = ::arrow::default_memory_pool();

  // Write phase: serialize the table and collect the bytes that were produced.
  auto out_stream = CreateOutputStream();
  ASSERT_OK_NO_THROW(WriteTable(*table, pool, out_stream, row_group_size,
                                parquet_properties, arrow_properties));
  std::shared_ptr<Buffer> serialized;
  ASSERT_OK_NO_THROW(out_stream->Finish(&serialized));

  // Read phase: open the serialized bytes and materialize the whole table.
  std::shared_ptr<BufferReader> source = std::make_shared<BufferReader>(serialized);
  std::unique_ptr<FileReader> file_reader;
  ASSERT_OK_NO_THROW(OpenFile(source, pool, ::parquet::default_reader_properties(),
                              nullptr, &file_reader));
  ASSERT_OK_NO_THROW(file_reader->ReadTable(out));
}

// Round-trips `input_table` through DoConfiguredRoundtrip() (as a single row group
// sized to the table's row count) and asserts that the table read back matches a
// reference table in both schema and contents.
//
// The reference is `expected_table` when one is supplied; otherwise the round trip
// is expected to reproduce `input_table` itself.
void CheckConfiguredRoundtrip(
    const std::shared_ptr<Table>& input_table,
    const std::shared_ptr<Table>& expected_table = nullptr,
    const std::shared_ptr<::parquet::WriterProperties>& parquet_properties =
        default_writer_properties(),
    const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
        default_arrow_writer_properties()) {
  std::shared_ptr<Table> roundtripped;
  ASSERT_NO_FATAL_FAILURE(DoConfiguredRoundtrip(input_table, input_table->num_rows(),
                                                &roundtripped, parquet_properties,
                                                arrow_properties));

  // Pick the table to compare against once instead of branching around the checks.
  const std::shared_ptr<Table>& reference =
      expected_table ? expected_table : input_table;

  ASSERT_NO_FATAL_FAILURE(
      ::arrow::AssertSchemaEqual(*roundtripped->schema(), *reference->schema()));
  ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*roundtripped, *reference));
}

void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, bool use_threads,
int64_t row_group_size, const std::vector<int>& column_subset,
std::shared_ptr<Table>* out,
Expand Down Expand Up @@ -1209,7 +1252,7 @@ TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) {
ASSERT_NO_FATAL_FAILURE(this->CheckSingleColumnRequiredTableRead(4));
}

void MakeDateTimeTypesTable(std::shared_ptr<Table>* out) {
void MakeDateTimeTypesTable(std::shared_ptr<Table>* out, bool expected = false) {
using ::arrow::ArrayFromVector;

std::vector<bool> is_valid = {true, true, true, false, true, true};
Expand All @@ -1219,12 +1262,13 @@ void MakeDateTimeTypesTable(std::shared_ptr<Table>* out) {
auto f1 = field("f1", ::arrow::timestamp(TimeUnit::MILLI));
auto f2 = field("f2", ::arrow::timestamp(TimeUnit::MICRO));
auto f3 = field("f3", ::arrow::timestamp(TimeUnit::NANO));
auto f3_x = field("f3", ::arrow::timestamp(TimeUnit::MICRO));
auto f4 = field("f4", ::arrow::time32(TimeUnit::MILLI));
auto f5 = field("f5", ::arrow::time64(TimeUnit::MICRO));
auto f6 = field("f6", ::arrow::time64(TimeUnit::NANO));

std::shared_ptr<::arrow::Schema> schema(
new ::arrow::Schema({f0, f1, f2, f3, f4, f5, f6}));
new ::arrow::Schema({f0, f1, f2, (expected ? f3_x : f3), f4, f5, f6}));

std::vector<int32_t> t32_values = {1489269000, 1489270000, 1489271000,
1489272000, 1489272000, 1489273000};
Expand All @@ -1235,22 +1279,27 @@ void MakeDateTimeTypesTable(std::shared_ptr<Table>* out) {
std::vector<int64_t> t64_ms_values = {1489269, 1489270, 1489271,
1489272, 1489272, 1489273};

std::shared_ptr<Array> a0, a1, a2, a3, a4, a5, a6;
std::shared_ptr<Array> a0, a1, a2, a3, a3_x, a4, a5, a6;
ArrayFromVector<::arrow::Date32Type, int32_t>(f0->type(), is_valid, t32_values, &a0);
ArrayFromVector<::arrow::TimestampType, int64_t>(f1->type(), is_valid, t64_ms_values,
&a1);
ArrayFromVector<::arrow::TimestampType, int64_t>(f2->type(), is_valid, t64_us_values,
&a2);
ArrayFromVector<::arrow::TimestampType, int64_t>(f3->type(), is_valid, t64_ns_values,
&a3);
ArrayFromVector<::arrow::TimestampType, int64_t>(f3_x->type(), is_valid, t64_us_values,
&a3_x);
ArrayFromVector<::arrow::Time32Type, int32_t>(f4->type(), is_valid, t32_values, &a4);
ArrayFromVector<::arrow::Time64Type, int64_t>(f5->type(), is_valid, t64_us_values, &a5);
ArrayFromVector<::arrow::Time64Type, int64_t>(f6->type(), is_valid, t64_ns_values, &a6);

std::vector<std::shared_ptr<::arrow::Column>> columns = {
std::make_shared<Column>("f0", a0), std::make_shared<Column>("f1", a1),
std::make_shared<Column>("f2", a2), std::make_shared<Column>("f3", a3),
std::make_shared<Column>("f4", a4), std::make_shared<Column>("f5", a5),
std::make_shared<Column>("f0", a0),
std::make_shared<Column>("f1", a1),
std::make_shared<Column>("f2", a2),
std::make_shared<Column>("f3", (expected ? a3_x : a3)),
std::make_shared<Column>("f4", a4),
std::make_shared<Column>("f5", a5),
std::make_shared<Column>("f6", a6)};

*out = Table::Make(schema, columns);
Expand All @@ -1263,7 +1312,7 @@ TEST(TestArrowReadWrite, DateTimeTypes) {
ASSERT_NO_FATAL_FAILURE(
DoSimpleRoundtrip(table, false /* use_threads */, table->num_rows(), {}, &result));

MakeDateTimeTypesTable(&table);
MakeDateTimeTypesTable(&table, true); // build expected result
ASSERT_NO_FATAL_FAILURE(
::arrow::AssertSchemaEqual(*table->schema(), *result->schema()));
ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result));
Expand Down Expand Up @@ -1399,21 +1448,6 @@ TEST(TestArrowReadWrite, CoerceTimestamps) {
ASSERT_NO_FATAL_FAILURE(
::arrow::AssertSchemaEqual(*ex_micro_result->schema(), *micro_result->schema()));
ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_micro_result, *micro_result));

// Result when coercing to nanoseconds
auto s4 = ::arrow::schema({field("f_s", t_ns), field("f_ms", t_ns), field("f_us", t_ns),
field("f_ns", t_ns)});
auto ex_nano_result = Table::Make(
s4,
{std::make_shared<Column>("f_s", a_ns), std::make_shared<Column>("f_ms", a_ns),
std::make_shared<Column>("f_us", a_ns), std::make_shared<Column>("f_ns", a_ns)});
std::shared_ptr<Table> nano_result;
ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
input, false /* use_threads */, input->num_rows(), {}, &nano_result,
ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::NANO)->build()));
ASSERT_NO_FATAL_FAILURE(
::arrow::AssertSchemaEqual(*ex_nano_result->schema(), *nano_result->schema()));
ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_nano_result, *nano_result));
}

TEST(TestArrowReadWrite, CoerceTimestampsLosePrecision) {
Expand Down Expand Up @@ -1542,6 +1576,154 @@ TEST(TestArrowReadWrite, ImplicitSecondToMillisecondTimestampCoercion) {
ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*tx, *to));
}

// Exercises how timestamp columns of every Arrow time unit are written and read back
// under the default writer properties versus writer properties built with
// ParquetVersion::PARQUET_2_0, both with and without an explicit coerce_timestamps()
// setting, and with deprecated Int96 timestamp storage enabled.
//
// The input table has four columns holding the same instants expressed in seconds,
// milliseconds, microseconds and nanoseconds, so an expected table for any coercion
// can be assembled from the same four columns.
TEST(TestArrowReadWrite, ParquetVersionTimestampDifferences) {
  using ::arrow::ArrayFromVector;
  using ::arrow::field;
  using ::arrow::schema;

  auto t_s = ::arrow::timestamp(TimeUnit::SECOND);
  auto t_ms = ::arrow::timestamp(TimeUnit::MILLI);
  auto t_us = ::arrow::timestamp(TimeUnit::MICRO);
  auto t_ns = ::arrow::timestamp(TimeUnit::NANO);

  const int N = 10;
  int64_t instant = INT64_C(1262304000);  // 2010-01-01T00:00:00 seconds offset
  std::vector<bool> mask;
  std::vector<int64_t> d_s, d_ms, d_us, d_ns;
  for (int i = 0; i < N; ++i) {
    // The same mask is shared by all four arrays, so they differ only in unit.
    mask.push_back(i % 3 == 1);
    d_s.push_back(instant);
    d_ms.push_back(instant * INT64_C(1000));
    d_us.push_back(instant * INT64_C(1000000));
    d_ns.push_back(instant * INT64_C(1000000000));
    instant += 1;
  }

  std::shared_ptr<Array> a_s, a_ms, a_us, a_ns;
  ArrayFromVector<::arrow::TimestampType, int64_t>(t_s, mask, d_s, &a_s);
  ArrayFromVector<::arrow::TimestampType, int64_t>(t_ms, mask, d_ms, &a_ms);
  ArrayFromVector<::arrow::TimestampType, int64_t>(t_us, mask, d_us, &a_us);
  ArrayFromVector<::arrow::TimestampType, int64_t>(t_ns, mask, d_ns, &a_ns);

  auto c_s = std::make_shared<Column>("ts:s", a_s);
  auto c_ms = std::make_shared<Column>("ts:ms", a_ms);
  auto c_us = std::make_shared<Column>("ts:us", a_us);
  auto c_ns = std::make_shared<Column>("ts:ns", a_ns);

  auto input_schema = schema({field("ts:s", t_s), field("ts:ms", t_ms),
                              field("ts:us", t_us), field("ts:ns", t_ns)});
  auto input_table = Table::Make(input_schema, {c_s, c_ms, c_us, c_ns});

  auto parquet_version_1_properties = default_writer_properties();
  auto parquet_version_2_properties = ::parquet::WriterProperties::Builder()
                                          .version(ParquetVersion::PARQUET_2_0)
                                          ->build();

  {
    // Using Parquet version 1.0 defaults, seconds should be coerced to milliseconds
    // and nanoseconds should be coerced to microseconds
    auto expected_schema = schema({field("ts:s", t_ms), field("ts:ms", t_ms),
                                   field("ts:us", t_us), field("ts:ns", t_us)});
    auto expected_table = Table::Make(expected_schema, {c_ms, c_ms, c_us, c_us});
    ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
                                                     parquet_version_1_properties));
  }
  {
    // Using Parquet version 2.0 defaults, seconds should be coerced to milliseconds
    // and nanoseconds should be retained
    auto expected_schema = schema({field("ts:s", t_ms), field("ts:ms", t_ms),
                                   field("ts:us", t_us), field("ts:ns", t_ns)});
    auto expected_table = Table::Make(expected_schema, {c_ms, c_ms, c_us, c_ns});
    ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
                                                     parquet_version_2_properties));
  }

  auto arrow_coerce_to_seconds_properties =
      ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::SECOND)->build();
  auto arrow_coerce_to_millis_properties =
      ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MILLI)->build();
  auto arrow_coerce_to_micros_properties =
      ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MICRO)->build();
  auto arrow_coerce_to_nanos_properties =
      ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::NANO)->build();
  {
    // Neither Parquet version 1.0 nor 2.0 allow coercing to seconds.
    // Only the write is attempted here; it is expected to fail, so nothing is read
    // back.
    auto sink = CreateOutputStream();
    ASSERT_RAISES(NotImplemented, WriteTable(*input_table, ::arrow::default_memory_pool(),
                                             sink, N, parquet_version_1_properties,
                                             arrow_coerce_to_seconds_properties));
    ASSERT_RAISES(NotImplemented, WriteTable(*input_table, ::arrow::default_memory_pool(),
                                             sink, N, parquet_version_2_properties,
                                             arrow_coerce_to_seconds_properties));
  }
  {
    // Using Parquet version 1.0, coercing to milliseconds or microseconds is allowed
    auto expected_schema = schema({field("ts:s", t_ms), field("ts:ms", t_ms),
                                   field("ts:us", t_ms), field("ts:ns", t_ms)});
    auto expected_table = Table::Make(expected_schema, {c_ms, c_ms, c_ms, c_ms});
    ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
                                                     parquet_version_1_properties,
                                                     arrow_coerce_to_millis_properties));

    expected_schema = schema({field("ts:s", t_us), field("ts:ms", t_us),
                              field("ts:us", t_us), field("ts:ns", t_us)});
    expected_table = Table::Make(expected_schema, {c_us, c_us, c_us, c_us});
    ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
                                                     parquet_version_1_properties,
                                                     arrow_coerce_to_micros_properties));
  }
  {
    // Using Parquet version 2.0, coercing to milliseconds or microseconds is allowed
    auto expected_schema = schema({field("ts:s", t_ms), field("ts:ms", t_ms),
                                   field("ts:us", t_ms), field("ts:ns", t_ms)});
    auto expected_table = Table::Make(expected_schema, {c_ms, c_ms, c_ms, c_ms});
    ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
                                                     parquet_version_2_properties,
                                                     arrow_coerce_to_millis_properties));

    expected_schema = schema({field("ts:s", t_us), field("ts:ms", t_us),
                              field("ts:us", t_us), field("ts:ns", t_us)});
    expected_table = Table::Make(expected_schema, {c_us, c_us, c_us, c_us});
    ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
                                                     parquet_version_2_properties,
                                                     arrow_coerce_to_micros_properties));
  }
  {
    // Using Parquet version 1.0, coercing to (int64) nanoseconds is not allowed.
    // As above, only the failing write is exercised.
    auto sink = CreateOutputStream();
    ASSERT_RAISES(NotImplemented, WriteTable(*input_table, ::arrow::default_memory_pool(),
                                             sink, N, parquet_version_1_properties,
                                             arrow_coerce_to_nanos_properties));
  }
  {
    // Using Parquet version 2.0, coercing to (int64) nanoseconds is allowed
    auto expected_schema = schema({field("ts:s", t_ns), field("ts:ms", t_ns),
                                   field("ts:us", t_ns), field("ts:ns", t_ns)});
    auto expected_table = Table::Make(expected_schema, {c_ns, c_ns, c_ns, c_ns});
    ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
                                                     parquet_version_2_properties,
                                                     arrow_coerce_to_nanos_properties));
  }

  auto arrow_enable_int96_properties =
      ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build();
  {
    // For either Parquet version, coercing to nanoseconds is allowed if Int96
    // storage is used
    auto expected_schema = schema({field("ts:s", t_ns), field("ts:ms", t_ns),
                                   field("ts:us", t_ns), field("ts:ns", t_ns)});
    auto expected_table = Table::Make(expected_schema, {c_ns, c_ns, c_ns, c_ns});
    ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
                                                     parquet_version_1_properties,
                                                     arrow_enable_int96_properties));
    ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
                                                     parquet_version_2_properties,
                                                     arrow_enable_int96_properties));
  }
}

TEST(TestArrowReadWrite, ConvertedDateTimeTypes) {
using ::arrow::ArrayFromVector;

Expand Down
6 changes: 3 additions & 3 deletions cpp/src/parquet/arrow/arrow-schema-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,7 @@ TEST_F(TestConvertArrowSchema, ArrowFields) {
LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::MICROS),
ParquetType::INT64, -1},
{"timestamp(nanosecond)", ::arrow::timestamp(::arrow::TimeUnit::NANO),
LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::NANOS),
LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::MICROS),
ParquetType::INT64, -1},
{"timestamp(millisecond, UTC)", ::arrow::timestamp(::arrow::TimeUnit::MILLI, "UTC"),
LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MILLIS),
Expand All @@ -882,7 +882,7 @@ TEST_F(TestConvertArrowSchema, ArrowFields) {
LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MICROS),
ParquetType::INT64, -1},
{"timestamp(nanosecond, UTC)", ::arrow::timestamp(::arrow::TimeUnit::NANO, "UTC"),
LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::NANOS),
LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MICROS),
ParquetType::INT64, -1},
{"timestamp(millisecond, CET)", ::arrow::timestamp(::arrow::TimeUnit::MILLI, "CET"),
LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MILLIS),
Expand All @@ -891,7 +891,7 @@ TEST_F(TestConvertArrowSchema, ArrowFields) {
LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MICROS),
ParquetType::INT64, -1},
{"timestamp(nanosecond, CET)", ::arrow::timestamp(::arrow::TimeUnit::NANO, "CET"),
LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::NANOS),
LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MICROS),
ParquetType::INT64, -1},
{"null", ::arrow::null(), LogicalAnnotation::Null(), ParquetType::INT32, -1}};

Expand Down
Loading

0 comments on commit 26d7c71

Please sign in to comment.