Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-3729: [C++][Parquet] Use logical annotations in Arrow Parquet reader/writer #4421

Closed
wants to merge 9 commits into from
Prev Previous commit
Next Next commit
Coerce Arrow second timestamps to Parquet millisecond timestamps by default
  • Loading branch information
tpboudreau committed Jun 18, 2019
commit fa0c4828f2599fe362e1821ae9d82cf0a16f012c
35 changes: 35 additions & 0 deletions cpp/src/parquet/arrow/arrow-reader-writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1507,6 +1507,41 @@ TEST(TestArrowReadWrite, CoerceTimestampsLosePrecision) {
default_writer_properties(), allow_truncation_to_micros));
}

// Round-trips a table holding a seconds-resolution timestamp column using only
// default properties (no explicit coercion instructions) and checks that the
// table read back has a milliseconds-resolution column whose values are the
// original ones multiplied by 1000.
TEST(TestArrowReadWrite, ImplicitSecondToMillisecondTimestampCoercion) {
  using ::arrow::ArrayFromVector;
  using ::arrow::field;
  using ::arrow::schema;

  // The same validity mask is used for both arrays; slot 3 is marked invalid.
  std::vector<bool> validity = {true, true, true, false, true, true};

  // Input fixture: a single "timestamp" column expressed in seconds.
  auto seconds_type = ::arrow::timestamp(TimeUnit::SECOND);
  std::vector<int64_t> seconds = {1489269, 1489270, 1489271,
                                  1489272, 1489272, 1489273};
  std::shared_ptr<Array> seconds_array;
  ArrayFromVector<::arrow::TimestampType, int64_t>(seconds_type, validity, seconds,
                                                   &seconds_array);
  auto input_schema = schema({field("timestamp", seconds_type)});
  auto input_column = std::make_shared<Column>("timestamp", seconds_array);
  auto input_table = Table::Make(input_schema, {input_column});

  // Expected fixture: the same column expressed in milliseconds.
  auto millis_type = ::arrow::timestamp(TimeUnit::MILLI);
  std::vector<int64_t> milliseconds = {1489269000, 1489270000, 1489271000,
                                       1489272000, 1489272000, 1489273000};
  std::shared_ptr<Array> millis_array;
  ArrayFromVector<::arrow::TimestampType, int64_t>(millis_type, validity, milliseconds,
                                                   &millis_array);
  auto expected_schema = schema({field("timestamp", millis_type)});
  auto expected_column = std::make_shared<Column>("timestamp", millis_array);
  auto expected_table = Table::Make(expected_schema, {expected_column});

  // Filled in by the roundtrip below.
  std::shared_ptr<Table> actual_table;

  // default properties (without explicit coercion instructions) used ...
  ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(input_table, false /* use_threads */,
                                            input_table->num_rows(), {}, &actual_table));

  // Both the schema and the data must match the millisecond expectation.
  ASSERT_NO_FATAL_FAILURE(
      ::arrow::AssertSchemaEqual(*expected_table->schema(), *actual_table->schema()));
  ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*expected_table, *actual_table));
}

TEST(TestArrowReadWrite, ConvertedDateTimeTypes) {
using ::arrow::ArrayFromVector;

Expand Down
6 changes: 3 additions & 3 deletions cpp/src/parquet/arrow/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -570,10 +570,10 @@ static Status GetTimestampMetadata(const ::arrow::TimestampType& type,
}

// The user implicitly wants timestamp data to retain its original time units,
// however the Arrow seconds time unit can not be represented (annotated) in Parquet.
// however the Arrow seconds time unit can not be represented (annotated) in
// Parquet and must be converted to milliseconds.
if (type.unit() == ::arrow::TimeUnit::SECOND) {
return Status::NotImplemented(
"Only MILLI, MICRO, and NANO units supported for Arrow timestamps with Parquet.");
*annotation = TimestampAnnotationFromArrowTimestamp(type, ::arrow::TimeUnit::MILLI);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How could this code path be reached (do we automatically cast to millis/micros)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is reached if you have an Arrow timestamp in seconds and don't give any instructions on how to convert/coerce it.

Currently (before this patch), if you don't ask for int96 and you don't supply an explicit coercion instruction, you fall to here, where the converter is willing to truncate your Arrow nanoseconds to Parquet microseconds, but is unwilling (for some reason) to expand your Arrow seconds to, say, Parquet milliseconds. I don't know the history or reasoning for this latter behavior, so my bias for conserving existing behavior whenever in doubt kicked in, and I retained it.

But it would be both easy and safe (it seems to me) to default to converting seconds to milliseconds (rather than failing) and I've done that in commit c2ac4cc. If you don't approve of this behavior change, we can (more or less) just revert that commit.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds fine to me


return Status::OK();
Expand Down
21 changes: 15 additions & 6 deletions cpp/src/parquet/arrow/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,8 @@ class ArrowColumnWriter {
const int16_t* rep_levels);

Status WriteTimestampsCoerce(const Array& data, int64_t num_levels,
const int16_t* def_levels, const int16_t* rep_levels);
const int16_t* def_levels, const int16_t* rep_levels,
const ArrowWriterProperties& properties);

template <typename ParquetType, typename ArrowType>
Status WriteNonNullableBatch(const ArrowType& type, int64_t num_values,
Expand Down Expand Up @@ -657,8 +658,15 @@ Status ArrowColumnWriter::WriteTimestamps(const Array& values, int64_t num_level
def_levels, rep_levels);
} else if (ctx_->properties->coerce_timestamps_enabled() &&
(source_type.unit() != ctx_->properties->coerce_timestamps_unit())) {
// Convert timestamps to requested unit
return WriteTimestampsCoerce(values, num_levels, def_levels, rep_levels);
// User explicitly requested conversion to specific units
return WriteTimestampsCoerce(values, num_levels, def_levels, rep_levels,
*(ctx_->properties));
} else if (source_type.unit() == TimeUnit::SECOND) {
// Absent superseding user instructions, timestamps in seconds are implicitly
// converted to milliseconds
std::shared_ptr<ArrowWriterProperties> properties =
(ArrowWriterProperties::Builder()).coerce_timestamps(TimeUnit::MILLI)->build();
return WriteTimestampsCoerce(values, num_levels, def_levels, rep_levels, *properties);
} else {
// No casting of timestamps is required, take the fast path
return TypedWriteBatch<Int64Type, ::arrow::TimestampType>(values, num_levels,
Expand Down Expand Up @@ -694,7 +702,8 @@ static std::pair<int, int64_t> kTimestampCoercionFactors[4][4] = {

Status ArrowColumnWriter::WriteTimestampsCoerce(const Array& array, int64_t num_levels,
const int16_t* def_levels,
const int16_t* rep_levels) {
const int16_t* rep_levels,
const ArrowWriterProperties& properties) {
int64_t* buffer;
RETURN_NOT_OK(ctx_->GetScratchData<int64_t>(num_levels, &buffer));

Expand All @@ -704,9 +713,9 @@ Status ArrowColumnWriter::WriteTimestampsCoerce(const Array& array, int64_t num_
const auto& source_type = static_cast<const ::arrow::TimestampType&>(*array.type());
auto source_unit = source_type.unit();

TimeUnit::type target_unit = ctx_->properties->coerce_timestamps_unit();
TimeUnit::type target_unit = properties.coerce_timestamps_unit();
auto target_type = ::arrow::timestamp(target_unit);
bool truncation_allowed = ctx_->properties->truncated_timestamps_allowed();
bool truncation_allowed = properties.truncated_timestamps_allowed();

auto DivideBy = [&](const int64_t factor) {
for (int64_t i = 0; i < array.length(); i++) {
Expand Down