Skip to content

Commit

Permalink
Coerce Arrow second timestamps to Parquet millisecond timestamps by d…
Browse files Browse the repository at this point in the history
…efault
  • Loading branch information
tpboudreau committed Jun 12, 2019
1 parent 94828ed commit c2ac4cc
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 9 deletions.
35 changes: 35 additions & 0 deletions cpp/src/parquet/arrow/arrow-reader-writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1507,6 +1507,41 @@ TEST(TestArrowReadWrite, CoerceTimestampsLosePrecision) {
default_writer_properties(), allow_truncation_to_micros));
}

TEST(TestArrowReadWrite, ImplicitSecondToMillisecondTimestampCoercion) {
using ::arrow::ArrayFromVector;
using ::arrow::field;
using ::arrow::schema;

std::vector<bool> is_valid = {true, true, true, false, true, true};

auto t_s = ::arrow::timestamp(TimeUnit::SECOND);
auto t_ms = ::arrow::timestamp(TimeUnit::MILLI);

std::vector<int64_t> s_values = {1489269, 1489270, 1489271, 1489272, 1489272, 1489273};
std::vector<int64_t> ms_values = {1489269000, 1489270000, 1489271000,
1489272000, 1489272000, 1489273000};

std::shared_ptr<Array> a_s, a_ms;
ArrayFromVector<::arrow::TimestampType, int64_t>(t_s, is_valid, s_values, &a_s);
ArrayFromVector<::arrow::TimestampType, int64_t>(t_ms, is_valid, ms_values, &a_ms);

auto si = schema({field("timestamp", t_s)});
auto sx = schema({field("timestamp", t_ms)});

auto ci = std::make_shared<Column>("timestamp", a_s);
auto cx = std::make_shared<Column>("timestamp", a_ms);

auto ti = Table::Make(si, {ci}); // input
auto tx = Table::Make(sx, {cx}); // expected output
std::shared_ptr<Table> to; // actual output

// default properties (without explicit coercion instructions) used ...
ASSERT_NO_FATAL_FAILURE(
DoSimpleRoundtrip(ti, false /* use_threads */, ti->num_rows(), {}, &to));
ASSERT_NO_FATAL_FAILURE(::arrow::AssertSchemaEqual(*tx->schema(), *to->schema()));
ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*tx, *to));
}

TEST(TestArrowReadWrite, ConvertedDateTimeTypes) {
using ::arrow::ArrayFromVector;

Expand Down
6 changes: 3 additions & 3 deletions cpp/src/parquet/arrow/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -570,10 +570,10 @@ static Status GetTimestampMetadata(const ::arrow::TimestampType& type,
}

// The user implicitly wants timestamp data to retain its original time units,
// however the Arrow seconds time unit can not be represented (annotated) in Parquet.
// however the Arrow seconds time unit can not be represented (annotated) in
// Parquet and must be converted to milliseconds.
if (type.unit() == ::arrow::TimeUnit::SECOND) {
return Status::NotImplemented(
"Only MILLI, MICRO, and NANO units supported for Arrow timestamps with Parquet.");
*annotation = TimestampAnnotationFromArrowTimestamp(type, ::arrow::TimeUnit::MILLI);
}

return Status::OK();
Expand Down
21 changes: 15 additions & 6 deletions cpp/src/parquet/arrow/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,8 @@ class ArrowColumnWriter {
const int16_t* rep_levels);

Status WriteTimestampsCoerce(const Array& data, int64_t num_levels,
const int16_t* def_levels, const int16_t* rep_levels);
const int16_t* def_levels, const int16_t* rep_levels,
const ArrowWriterProperties& properties);

template <typename ParquetType, typename ArrowType>
Status WriteNonNullableBatch(const ArrowType& type, int64_t num_values,
Expand Down Expand Up @@ -656,8 +657,15 @@ Status ArrowColumnWriter::WriteTimestamps(const Array& values, int64_t num_level
def_levels, rep_levels);
} else if (ctx_->properties->coerce_timestamps_enabled() &&
(source_type.unit() != ctx_->properties->coerce_timestamps_unit())) {
// Convert timestamps to requested unit
return WriteTimestampsCoerce(values, num_levels, def_levels, rep_levels);
// User explicitly requested conversion to specific units
return WriteTimestampsCoerce(values, num_levels, def_levels, rep_levels,
*(ctx_->properties));
} else if (source_type.unit() == TimeUnit::SECOND) {
// Absent superseding user instructions, timestamps in seconds are implicitly
// converted to milliseconds
std::shared_ptr<ArrowWriterProperties> properties =
(ArrowWriterProperties::Builder()).coerce_timestamps(TimeUnit::MILLI)->build();
return WriteTimestampsCoerce(values, num_levels, def_levels, rep_levels, *properties);
} else {
// No casting of timestamps is required, take the fast path
return TypedWriteBatch<Int64Type, ::arrow::TimestampType>(values, num_levels,
Expand Down Expand Up @@ -693,7 +701,8 @@ static std::pair<int, int64_t> kTimestampCoercionFactors[4][4] = {

Status ArrowColumnWriter::WriteTimestampsCoerce(const Array& array, int64_t num_levels,
const int16_t* def_levels,
const int16_t* rep_levels) {
const int16_t* rep_levels,
const ArrowWriterProperties& properties) {
int64_t* buffer;
RETURN_NOT_OK(ctx_->GetScratchData<int64_t>(num_levels, &buffer));

Expand All @@ -703,9 +712,9 @@ Status ArrowColumnWriter::WriteTimestampsCoerce(const Array& array, int64_t num_
const auto& source_type = static_cast<const ::arrow::TimestampType&>(*array.type());
auto source_unit = source_type.unit();

TimeUnit::type target_unit = ctx_->properties->coerce_timestamps_unit();
TimeUnit::type target_unit = properties.coerce_timestamps_unit();
auto target_type = ::arrow::timestamp(target_unit);
bool truncation_allowed = ctx_->properties->truncated_timestamps_allowed();
bool truncation_allowed = properties.truncated_timestamps_allowed();

auto DivideBy = [&](const int64_t factor) {
for (int64_t i = 0; i < array.length(); i++) {
Expand Down

0 comments on commit c2ac4cc

Please sign in to comment.