
ARROW-3729: [C++][Parquet] Use logical annotations in Arrow Parquet reader/writer #4421

Closed
wants to merge 9 commits

Conversation

tpboudreau
Contributor

@tpboudreau tpboudreau commented May 31, 2019

This PR causes the Arrow Parquet I/O facility to recognize Parquet logical annotations (parquet.thrift LogicalTypes) on read and to generate them (rather than converted types) on write.

Since Parquet logical annotations include nanosecond timestamps, this patch satisfies the feature requested in ARROW-3729.

@wesm wesm changed the title ARROW-3729: Use logical annotations in Arrow Parquet reader/writer ARROW-3729: [C++][Parquet] Use logical annotations in Arrow Parquet reader/writer May 31, 2019
@tpboudreau tpboudreau changed the title ARROW-3729: [C++][Parquet] Use logical annotations in Arrow Parquet reader/writer WIP: ARROW-3729: [C++][Parquet] Use logical annotations in Arrow Parquet reader/writer May 31, 2019
@tpboudreau
Contributor Author

Marking this as WIP while I sort through a few issues.

@tpboudreau
Contributor Author

Please ignore my earlier comment about temporal types and timezone issues.

After reviewing analogous code in parquet-mr, I think I was wrong to interpret the fact that Arrow currently translates Arrow timestamps without timezones to Parquet TIMESTAMP_MILLIS/MICROS as implying that those Arrow timestamps were implicitly UTC. The simpler explanation is that Arrow just chose the closest available Parquet type (even though it was lossy), and the Parquet TIMESTAMP_* types probably didn't even imply adjustment to UTC until the parquet.thrift TimestampType LogicalType introduced the concept.

So the type translations are much simpler:

For timestamps:

arrow::timestamp($unit) -> parquet::TimestampAnnotation(false, $unit)
arrow::timestamp($unit, "UTC") -> parquet::TimestampAnnotation(true, $unit)
arrow::timestamp($unit, "anything/else") -> parquet::TimestampAnnotation(false, $unit)

parquet::TimestampAnnotation(false, $unit) -> arrow::timestamp($unit)
parquet::TimestampAnnotation(true, $unit) -> arrow::timestamp($unit, "UTC")

And for times:

arrow::time32/64($unit) -> parquet::TimeAnnotation(false, $unit)

parquet::TimeAnnotation(false, $unit) -> arrow::time32/64($unit)
parquet::TimeAnnotation(true, $unit) -> arrow::time32/64($unit)
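
A minimal sketch of how the writer side of this timestamp mapping could look, assuming the LogicalAnnotation::Timestamp(is_adjusted_to_utc, unit) factory discussed in this PR and a hypothetical ToAnnotationUnit() helper for converting an arrow::TimeUnit to a LogicalAnnotation::TimeUnit:

// Sketch only: maps an Arrow timestamp type to the proposed Parquet annotation.
// ToAnnotationUnit() is a hypothetical helper, not an existing function.
std::shared_ptr<const LogicalAnnotation> TimestampAnnotationFromArrow(
    const ::arrow::TimestampType& type) {
  const bool adjusted_to_utc = (type.timezone() == "UTC");  // "anything/else" stays false
  return LogicalAnnotation::Timestamp(adjusted_to_utc, ToAnnotationUnit(type.unit()));
}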

@tpboudreau tpboudreau changed the title WIP: ARROW-3729: [C++][Parquet] Use logical annotations in Arrow Parquet reader/writer ARROW-3729: [C++][Parquet] Use logical annotations in Arrow Parquet reader/writer Jun 2, 2019
@tpboudreau
Contributor Author

This PR is ready for review.

@wesm
Member

wesm commented Jun 3, 2019

In

arrow::timestamp($unit, "UTC") -> parquet::TimestampAnnotation(true, $unit)
arrow::timestamp($unit, "anything/else") -> parquet::TimestampAnnotation(false, $unit)

I don't think this is right. If an Arrow timestamp is tz-aware, then it is normalized to UTC. Otherwise isAdjustedToUTC should be false

@tpboudreau
Contributor Author

tpboudreau commented Jun 3, 2019

I'll do as you ask, but to me that doesn't seem like the natural interpretation of isAdjustedToUTC. I took it to mean that the timestamps were UTC and, for example, could be safely compared with any other timestamps that show isAdjustedToUTC==true. Your interpretation would have arrow::timestamp(..., "CET") and arrow::timestamp(..., "EST") both recorded as parquet::Timestamp(true, ...).

A second practical problem would be that a round trip through Parquet would lose information:
arrow::timestamp(..., "CET") -> parquet::Timestamp(true, ...) -> arrow::timestamp(..., timezone?). Either you would lose timezone awareness (if timezone := "") or you'd collapse all timestamps to the same timezone (if, say, timezone := "UTC").

But I could certainly be missing something.

EDIT: small correction in the last sentence -- you couldn't lose timezone awareness (we'd never assign the empty string), but you would lose information by collapsing all original timezones to one timezone string.

@wesm
Member

wesm commented Jun 3, 2019

Parquet and Arrow are semantically distinct in this regard. Parquet does not have a "with timezone" concept, so if we want to preserve the timezone through Parquet round trip we have to use custom schema metadata.

So Arrow has 3 different cases

  • No time zone, no presumption of UTC normalization
  • UTC time zone, values UTC-normalized
  • Some other time zone, values UTC-normalized

I think there is still some value in communicating UTC-normalization in the third case to other Parquet consumers; otherwise they may interpret the data as local time, which is not what we want.
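
A minimal sketch of these three cases using the arrow::timestamp() factory (illustrative only; whenever a timezone string is present, the stored values are expected to already be UTC-normalized):

// No time zone, no presumption of UTC normalization
auto naive = ::arrow::timestamp(::arrow::TimeUnit::MICRO);
// UTC time zone, values UTC-normalized
auto utc = ::arrow::timestamp(::arrow::TimeUnit::MICRO, "UTC");
// Some other time zone, values still UTC-normalized on the Arrow side
auto zoned = ::arrow::timestamp(::arrow::TimeUnit::MICRO, "CET");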

@wesm
Member

wesm commented Jun 3, 2019

cc to @xhochy for additional thoughts. If everyone disagrees with me then I suppose that's OK =)

@tpboudreau
Contributor Author

I hadn't considered the possibility of using custom metadata to stash the original timezone -- that's a good idea. It still seems to me that the adjusted-to-UTC flag should be false in that third case, but I'll gladly set it however you all decide.

@tpboudreau
Contributor Author

tpboudreau commented Jun 4, 2019

I've looked quickly into stashing Arrow timestamp timezones as key-value metadata and, while I think it's worthwhile, it's not a trivial addition. So I'm changing this PR back to WIP while I sort through it. (I'm not able to work on this immediately.)

My initial plan is to add keys like "arrow:FIELDNAME.timestamp.timezone"="..." (with crude namespacing) to the file metadata just before writing parquet files and to strip those keys out of the metadata immediately after use on reading parquet files. I'd do this to avoid having the user metadata get cluttered with internal, non-application junk and also hopefully to avoid any need to maintain type information in multiple places if, say, field types change, fields are renamed, maybe names reused, etc.

I realize that's a fairly vague sketch, but if anything jumps out as objectionable, please let me know. Otherwise I'll update the PR when I can.
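
To make that sketch slightly more concrete, a hypothetical write-side version (the key format, helper name, and overall flow are illustrative, not the final scheme):

// Hypothetical sketch: copy the schema's key-value metadata and append one
// namespaced entry per timezone-bearing timestamp field just before writing.
std::shared_ptr<::arrow::KeyValueMetadata> StashTimezones(const ::arrow::Schema& schema) {
  auto metadata = schema.metadata() ? schema.metadata()->Copy()
                                    : std::make_shared<::arrow::KeyValueMetadata>();
  for (const auto& field : schema.fields()) {
    if (field->type()->id() == ::arrow::Type::TIMESTAMP) {
      const auto& ts = static_cast<const ::arrow::TimestampType&>(*field->type());
      if (!ts.timezone().empty()) {
        metadata->Append("arrow:" + field->name() + ".timestamp.timezone", ts.timezone());
      }
    }
  }
  return metadata;
}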

@tpboudreau tpboudreau changed the title ARROW-3729: [C++][Parquet] Use logical annotations in Arrow Parquet reader/writer WIP: ARROW-3729: [C++][Parquet] Use logical annotations in Arrow Parquet reader/writer Jun 4, 2019
@tpboudreau
Contributor Author

This PR is ready for review (again, sorry).

Two quick notes:

  1. Preserving timezones. The changes required to preserve Arrow timestamp timezone strings turned out to be imperfect and a bit intrusive:
  • Intrusive because, since Parquet doesn't have per-node key-value metadata, the only available option is to store the extra timezone information in the file-level metadata. This required transitively passing a map for accumulating the metadata as the Parquet schema is built on write (and similarly when building an Arrow schema on read), and thus slightly altering several internal function signatures and requiring the introduction of a new overload for the public ToParquetSchema().
  • Imperfect because, since there is no guarantee that Arrow field names are unique (correct?), the best I could do is use the full column path name for each field/node as part of its metadata key and stop processing if a key collision is detected.

These timezone preservation changes are in an isolated commit, so they can be reverted if the committers decide not to include this functionality.

  2. isAdjustedToUTC flag. Since I haven't heard any further word on it, for now I've left the code so that, when writing from Arrow to Parquet, the Parquet timestamp's isAdjustedToUTC flag is set to true only for Arrow timestamps that have a "UTC" timezone (and false if any other timezone or no timezone is present). I continue to think that's the correct behavior (otherwise any Parquet reader other than the Arrow reader could see timestamps that originally had several different timezones all uniformly showing isAdjustedToUTC==true, which IMHO would be misleading). But this decision is above my pay grade, so I'll make whatever changes you all decide on.

@tpboudreau tpboudreau changed the title WIP: ARROW-3729: [C++][Parquet] Use logical annotations in Arrow Parquet reader/writer ARROW-3729: [C++][Parquet] Use logical annotations in Arrow Parquet reader/writer Jun 8, 2019
@wesm
Member

wesm commented Jun 10, 2019

Thanks @tpboudreau -- I will review this and get you some feedback. The time zone issue is annoying; we might want to develop a codified solution for serializing Arrow time zones in the schema (so other Parquet producers can produce the same metadata).

@tpboudreau
Contributor Author

Thanks @wesm.

I think we've been talking past each other on the issue of setting the adjustedToUTC flag in Parquet, and I think I've finally figured out the source of my confusion: are you saying that users of the Arrow C++ library are always expected to present timestamp values that have been converted to UTC when they create timestamp-typed columns with any timezone (and therefore that when these values are stored they actually always are UTC values, regardless of the timezone parameter)?

This possibility had not occurred to me before because (a) AFAICT it's not mentioned in the C++ documentation; and (b) it seems to reduce the timestamp timezone parameter in the C++ API to a mere memo (about what the values were before conversion, or how they should be rendered on display, for example) rather than a parameter essential to the correct interpretation of the data values in the column, like all other Arrow type parameters.

If the answer to the question above is "yes", then of course you're correct about the proper setting of the isAdjustedToUTC flag. If so, I propose the following to move this patch along more quickly: (1) I'll revert the commit that adds code for preserving the Arrow timezone as Parquet file metadata -- since it's mostly an advisory parameter, its preservation across a Parquet round trip is probably not worth the cost in code complexity; (2) then I'll alter the remaining patch (still containing the vast bulk of the PR's benefit) to set the UTC flag to true if any timezone is present (in other words, your earlier direction). Does this sound right to you?

@wesm
Member

wesm commented Jun 10, 2019

are you saying that users of the Arrow c++ library are always expected to present timestamp values that have been converted to UTC when they create timestamp typed columns with any timezone (and therefore that when these values are stored they actually always are UTC values regardless of the timezone parameter)?

Yes. See comment in Schema.fbs

https://github.com/apache/arrow/blob/master/format/Schema.fbs#L177

The plan you described sounds good to me. It seems that as a result we'll drop the timezone metadata in the round trip for now, but the data will be preserved. I think it's fine if it's returned to the user with the UTC time zone for now.

@codecov-io

Codecov Report

Merging #4421 into master will increase coverage by 1.18%.
The diff coverage is 86.98%.


@@            Coverage Diff             @@
##           master    #4421      +/-   ##
==========================================
+ Coverage   88.27%   89.46%   +1.18%     
==========================================
  Files         846      645     -201     
  Lines      103662    90009   -13653     
  Branches     1253        0    -1253     
==========================================
- Hits        91512    80528   -10984     
+ Misses      11903     9481    -2422     
+ Partials      247        0     -247
Impacted Files Coverage Δ
cpp/src/parquet/arrow/reader.cc 85.61% <100%> (+0.03%) ⬆️
cpp/src/parquet/arrow/writer.cc 97% <100%> (+0.05%) ⬆️
cpp/src/parquet/arrow/arrow-schema-test.cc 99.55% <100%> (+0.03%) ⬆️
cpp/src/parquet/arrow/arrow-reader-writer-test.cc 95.21% <74.64%> (-0.71%) ⬇️
cpp/src/parquet/arrow/schema.cc 90% <83.78%> (-2.78%) ⬇️
python/pyarrow/tests/test_parquet.py 96.42% <96.42%> (+0.23%) ⬆️
cpp/src/parquet/schema.cc 90.24% <0%> (-0.38%) ⬇️
r/src/recordbatch.cpp
r/R/Table.R
go/arrow/math/uint64_amd64.go
... and 200 more

Continue to review full report at Codecov.

Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update bed52ed...59f144c. Read the comment docs.

Member

@wesm wesm left a comment

Some small comments, but overall this looks OK. The major annoyance is that I think we need to make nanoseconds-as-int64 something that users opt into for the time being. If the default behavior is that files are written in a form that most production Parquet readers don't understand yet (because the new LogicalType facility hasn't trickled down everywhere), that seems like a recipe for people shooting their toes off.

LogicalType::type get_logical_type(const ::DataType& type) {
std::shared_ptr<const LogicalAnnotation> get_logical_annotation(const ::DataType& type,
int32_t precision,
int32_t scale) {
Member

Seems like the precision/scale params here should come from the Arrow type object?

Contributor Author

Yes, this was all pretty sloppy; thanks for pointing it out. I've cleaned it up.

if (ty.id() == ArrowId::DECIMAL) {
const auto& dt = static_cast<const ::arrow::Decimal128Type&>(ty);
pr = dt.precision();
sc = dt.scale();
Member

You use the type's precision/scale here but not in the next block

Contributor Author

Cleaned up now.

// Result when coercing to nanoseconds
auto s4 = std::shared_ptr<::arrow::Schema>(
new ::arrow::Schema({field("f_s", t_ns), field("f_ms", t_ns), field("f_us", t_ns),
field("f_ns", t_ns)}));
Member

Can also use ::arrow::schema factory function for tighter code

Contributor Author

I changed to it here and in a few other nearby places.

std::shared_ptr<ArrowType> MakeDecimal128Type(const PrimitiveNode& node) {
const auto& metadata = node.decimal_metadata();
return ::arrow::decimal(metadata.precision, metadata.scale);
static Status MakeArrowDecimal(const std::shared_ptr<const LogicalAnnotation>& annotation,
Member

It is good to get in the habit of passing const T& rather than const shared_ptr<T>& if the function does not need to transfer ownership. I just opened ARROW-5549 about documenting some of our unspoken guidelines around this

Contributor Author

Changed the signature here and for all similar functions added to this file.

"Only MILLI, MICRO, and NANOS units supported for Arrow timestamps with "
"Parquet.");
"Only MILLI, MICRO, and NANO units supported for Arrow timestamps with Parquet.");
}
Member

How could this code path be reached (do we automatically cast to millis/micros)?

Contributor Author

This code is reached if you have an Arrow timestamp in seconds and don't give any instructions on how to convert/coerce it.

Currently (before this patch), if you don't ask for int96 and you don't supply an explicit coercion instruction, you fall through to here, where the converter is willing to truncate your Arrow nanoseconds to Parquet microseconds, but is unwilling (for some reason) to expand your Arrow seconds to, say, Parquet milliseconds. I don't know the history or reasoning behind this latter behavior, so my bias toward conserving existing behavior whenever in doubt kicked in, and I retained it.

But it would be both easy and safe (it seems to me) to default to converting seconds to milliseconds (rather than failing) and I've done that in commit c2ac4cc. If you don't approve of this behavior change, we can (more or less) just revert that commit.
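
For reference, the existing opt-in coercion path looks roughly like this on the C++ side (a sketch, assuming the ArrowWriterProperties builder in parquet/arrow/writer.h; check the exact method names in your version):

// Sketch: ask the writer to coerce all timestamps to a specific unit instead of
// relying on the default fall-through behavior discussed above.
auto arrow_properties = ::parquet::ArrowWriterProperties::Builder()
                            .coerce_timestamps(::arrow::TimeUnit::MILLI)
                            ->build();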

Member

Sounds fine to me

break;
case ArrowTypeId::BOOL:
type = ParquetType::BOOLEAN;
break;
case ArrowTypeId::UINT8:
type = ParquetType::INT32;
logical_type = LogicalType::UINT_8;
PARQUET_CATCH_NOT_OK(annotation = LogicalAnnotation::Int(8, false));
Member

Does this need to be able to throw (versus making assertions with DCHECK)?

Contributor Author

I added DCHECK's to the convenience creation methods for Int, Time, and Timestamp annotations, and removed the try/catch macro wrapper from the (sometimes indirect) call sites that have valid hard-coded arguments.
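
A sketch of that idea (the factory body and the Make() helper below are hypothetical; only the Int(bit_width, is_signed) call shape appears in the diff):

// Sketch: validate hard-coded arguments with a DCHECK inside the convenience factory,
// so call sites with known-valid constants no longer need PARQUET_CATCH_NOT_OK.
std::shared_ptr<const LogicalAnnotation> LogicalAnnotation::Int(int bit_width, bool is_signed) {
  DCHECK(bit_width == 8 || bit_width == 16 || bit_width == 32 || bit_width == 64);
  return IntAnnotation::Make(bit_width, is_signed);  // IntAnnotation::Make is hypothetical
}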

break;
case ArrowTypeId::TIME32:
type = ParquetType::INT32;
logical_type = LogicalType::TIME_MILLIS;
PARQUET_CATCH_NOT_OK(annotation = LogicalAnnotation::Time(
false, LogicalAnnotation::TimeUnit::MILLIS));
Member

same, does this need to be able to throw?

Contributor Author

See above.

} else {
DCHECK_EQ(TimeUnit::NANO, target_unit);
RETURN_NOT_OK(MultiplyBy(INT64_C(1000000000)));
}
Member

Would you mind refactoring this to use a conversion table like we have in

https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/kernels/cast.cc#L470

Contributor Author

I did this and tidied up the method a bit.
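
The table ends up looking roughly like the one in cast.cc (a sketch; the actual name and layout in the patch may differ):

// Sketch: factors[from][to] gives the multiplier (or divisor, when narrowing) between
// time units, indexed by arrow::TimeUnit (SECOND=0, MILLI, MICRO, NANO).
constexpr int64_t kTimeConversionFactors[4][4] = {
    {1LL, 1000LL, 1000000LL, 1000000000LL},   // from SECOND
    {1000LL, 1LL, 1000LL, 1000000LL},         // from MILLI
    {1000000LL, 1000LL, 1LL, 1000LL},         // from MICRO
    {1000000000LL, 1000000LL, 1000LL, 1LL}};  // from NANO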

with pytest.raises(ValueError):
pq.write_table(tb, filename, coerce_timestamps='ms')


Member

So this is a nuisance, but old Parquet readers (i.e. most deployed Big Data applications) aren't going to be able to read the new nanosecond metadata -- for example, most deployed versions of Spark! For version 1.0 files, I think the appropriate default behavior is to write in compatibility mode in millis or micros with the old ConvertedType set, while for version=2.0 I think writing the new Nanosecond annotation is acceptable.

cc @xhochy for any comments

Contributor Author

OK, so I've reintroduced the old behavior for timestamps if you've got a v1.0 writer; mostly this means that (1) Arrow nanosecond timestamps are converted to micros by default, and (2) you can't coerce any timestamp to nanoseconds (unless you use int96s). I've conformed this and other tests and added some new tests to demonstrate and check these behaviors as well.
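
A sketch of the version-dependent, opt-in write path (illustrative; exact builder and enum spellings may vary by release):

// Sketch: a v1.0 writer keeps the legacy ConvertedType-compatible behavior, while
// explicitly selecting the 2.0 format opts in to the newer annotations (e.g. nanoseconds).
auto file_properties = ::parquet::WriterProperties::Builder()
                           .version(::parquet::ParquetVersion::PARQUET_2_0)
                           ->build();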

Can I ask for clarification on something: recall that (after PARQUET-1411) Thrift serialized Parquet schemas always contain both ConvertedType and LogicalType (aka annotation) information (modulo types that don't mutually convert). Are you saying here that for version 1.0 writers (in addition to the Arrow specific change I described above) that the Parquet library proper should not even serialize the LogicalType/annotation?

Contributor Author

One further note: the pre-existing design has us asking and answering the same non-trivial questions about timestamp typing twice: once when the schema is built (GetTimestampMetadata()) and again when the data arrays are built (ArrowColumnWriter::WriteTimestamps()). I retained this design, but it's straining under this patch. I believe that after a schema is built, we have all the information needed to provide direct instructions to the column/array writer, but the module doesn't provide any channel for that (at least not an obvious one -- I haven't spent too much time analyzing it). I think a refactor for this would be desirable, but is probably beyond the scope of what I can do right now (given the desire to get 0.14 out in a few weeks, and the "big rename" for ConvertedType still pending and blocked). Just wanted to flag this for you.

Member

Agreed that the current structure isn't ideal, and a refactor is a good idea

@tpboudreau
Contributor Author

The test I added in commit 26d7c71 is tripping over an apparently pre-existing bug that produces integer overflow caught by clang's UB sanitizer. More here: https://issues.apache.org/jira/browse/ARROW-5618 .

In order to ensure that the rest of the patch is clean, I've revised the new test to remove the validity bit masks that trigger the bug; they're not essential to the test, and can be replaced if appropriate after the bug is resolved.

@wesm
Member

wesm commented Jun 18, 2019

@tpboudreau is this ready for another review?

@tpboudreau
Contributor Author

Yes, thanks, @wesm.

Member

@wesm wesm left a comment

+1. Thanks @tpboudreau for conscientious work on this

"Only MILLI, MICRO, and NANOS units supported for Arrow timestamps with "
"Parquet.");
"Only MILLI, MICRO, and NANO units supported for Arrow timestamps with Parquet.");
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds fine to me

with pytest.raises(ValueError):
pq.write_table(tb, filename, coerce_timestamps='ms')


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed that the current structure isn't ideal, and a refactor is a good idea

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants