ARROW-3729: [C++][Parquet] Use logical annotations in Arrow Parquet reader/writer #4421
Conversation
Marking this as WIP while I sort through a few issues.
Please ignore my earlier comment about temporal types and timezone issues. After reviewing analogous code in parquet-mr, I think I was incorrect in interpreting the fact that Arrow currently translates Arrow timestamps without timezones to Parquet TIMESTAMP_MILLIS/MICROS as implying that the Arrow timestamps were implicitly UTC. The simpler explanation is that Arrow just chose the closest available Parquet type (even though it was lossy), and the Parquet TIMESTAMP_* types probably didn't even imply "adjusted to UTC" until the parquet.thrift TimestampType LogicalType introduced the concept. So the type translations are much simpler.

For timestamps:
- arrow::timestamp($unit) -> parquet::TimestampAnnotation(false, $unit)
- parquet::TimestampAnnotation(false, $unit) -> arrow::timestamp($unit)

And for times:
- arrow::time32/64($unit) -> parquet::TimeAnnotation(false, $unit)
- parquet::TimeAnnotation(false, $unit) -> arrow::time32/64($unit)
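The unit-for-unit translation described above can be sketched as follows. The enum and struct names here are illustrative stand-ins, not the actual Arrow/Parquet C++ API:

```cpp
// Illustrative stand-ins for the Arrow/Parquet types discussed above.
enum class TimeUnit { SECOND, MILLI, MICRO, NANO };

struct TimestampAnnotation {
  bool is_adjusted_to_utc;  // always false under this scheme
  TimeUnit unit;
};

// arrow::timestamp($unit) -> parquet::TimestampAnnotation(false, $unit)
TimestampAnnotation ArrowToParquet(TimeUnit arrow_unit) {
  return TimestampAnnotation{false, arrow_unit};
}

// parquet::TimestampAnnotation(false, $unit) -> arrow::timestamp($unit)
TimeUnit ParquetToArrow(const TimestampAnnotation& annotation) {
  return annotation.unit;
}
```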
This PR is ready for review.
I don't think this is right. If an Arrow timestamp is tz-aware, then it is normalized to UTC. Otherwise isAdjustedToUTC should be false.
I'll do as you ask, but to me that doesn't seem like the natural interpretation of isAdjustedToUTC. I took it to mean that the timestamps were UTC and, for example, could be safely compared with any other timestamps that show isAdjustedToUTC==true. Your interpretation would have arrow::timestamp(..., "CET") and arrow::timestamp(..., "EST") both recorded as parquet::Timestamp(true, ...). A second practical problem would be that a round trip through Parquet would lose information. But I could certainly be missing something.

EDIT: small correction in the last sentence -- you couldn't lose timezone awareness (we'd never assign the empty string), but you would lose information by collapsing all original timezones to one timezone string.
Parquet and Arrow are semantically distinct in this regard. Parquet does not have a "with timezone" concept, so if we want to preserve the timezone through Parquet round trip we have to use custom schema metadata. So Arrow has 3 different cases
I think there is still some value in communicating UTC-normalization in the third case to other Parquet consumers, otherwise they may interpret the data as localtime which is not what we want.
cc to @xhochy for additional thoughts. If everyone disagrees with me then I suppose that's OK =)
I hadn't considered the possibility of using custom metadata to stash the original timezone -- that's a good idea. It still seems to me that the adjusted-to-UTC flag should be false in that third case, but I'll gladly set it however you all decide.
I've looked quickly into stashing Arrow timestamp timezones as key-value metadata and, while I think it's worthwhile, it's not a trivial addition. So I'm changing this PR back to WIP while I sort through it. (I'm not able to work on this immediately.)

My initial plan is to add keys like "arrow:FIELDNAME.timestamp.timezone"="..." (with crude namespacing) to the file metadata just before writing Parquet files and to strip those keys out of the metadata immediately after use on reading Parquet files. I'd do this to avoid having the user metadata get cluttered with internal, non-application junk and also hopefully to avoid any need to maintain type information in multiple places if, say, field types change, fields are renamed, maybe names reused, etc. I realize that's a fairly vague sketch, but if anything jumps out as objectionable, please let me know. Otherwise I'll update the PR when I can.
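The stash-and-strip idea might look roughly like this. This is a sketch only; the Metadata map and function names are hypothetical, but the key format is the one proposed above:

```cpp
#include <map>
#include <string>

// Hypothetical metadata container; Arrow's actual KeyValueMetadata differs.
using Metadata = std::map<std::string, std::string>;

// Crudely namespaced key, per the proposal above.
std::string TimezoneKey(const std::string& field_name) {
  return "arrow:" + field_name + ".timestamp.timezone";
}

// Added to the file metadata just before writing the Parquet file.
void StashTimezone(Metadata* md, const std::string& field, const std::string& tz) {
  (*md)[TimezoneKey(field)] = tz;
}

// Read and then strip the internal key so user metadata stays uncluttered.
std::string RestoreTimezone(Metadata* md, const std::string& field) {
  auto it = md->find(TimezoneKey(field));
  if (it == md->end()) return "";
  std::string tz = it->second;
  md->erase(it);
  return tz;
}
```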
This PR is ready for review (again, sorry). Two quick notes:
These timezone preservation changes are in an isolated commit, so they can be reverted if the committers decide not to include this functionality.
Thanks @tpboudreau -- I will review this and get you some feedback. The time zone issue is annoying; we might want to develop a codified solution for serializing Arrow time zones in the schema (so other Parquet producers can produce the same metadata).
Thanks @wesm. I think we've been talking past each other on the issue of setting the adjustedToUTC flag in Parquet, and I think I've finally figured out the source of my confusion: are you saying that users of the Arrow C++ library are always expected to present timestamp values that have been converted to UTC when they create timestamp-typed columns with any timezone (and therefore that when these values are stored they actually always are UTC values regardless of the timezone parameter)? This possibility had not occurred to me before because (a) AFAICT it's not mentioned in the C++ documentation; and (b) it seems to reduce the timestamp timezone parameter in the C++ API to a mere memo (about what the values were before conversion, or how they should be rendered on display, for example), not a parameter essential to correct interpretation of the data values in the column like all other Arrow type parameters.

If the answer to the question above is "yes", then of course you're correct about the proper setting of the isAdjustedToUTC flag. If so, I propose the following to move this patch along more quickly: (1) I'll revert the commit that adds code for preserving the Arrow timezone as Parquet file metadata -- since it's mostly an advisory parameter, its preservation across a Parquet roundtrip is probably not worth the cost in code complexity; (2) then I'll alter the remaining patch (still containing the vast bulk of the PR's benefit) to set the UTC flag to true if any timezone is present (in other words, your earlier direction). Does this sound right to you?
Yes. See comment in Schema.fbs https://github.com/apache/arrow/blob/master/format/Schema.fbs#L177 The plan you described sounds good to me. It seems that as a result we'll drop the timezone metadata in the round trip for now, but the data will be preserved. I think it's fine if it's returned to the user as UTC time zone for now |
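Under this decision, the flag computation reduces to checking for a non-empty timezone string. A minimal sketch, assuming the timezone is carried as a string as in arrow::timestamp (the function name is illustrative):

```cpp
#include <string>

// Sketch of the agreed-upon rule: any tz-aware Arrow timestamp is written
// with isAdjustedToUTC == true (its values are already normalized to UTC);
// a naive timestamp (empty timezone string) is written with false.
bool IsAdjustedToUTC(const std::string& arrow_timezone) {
  return !arrow_timezone.empty();
}
```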
Codecov Report
@@ Coverage Diff @@
## master #4421 +/- ##
==========================================
+ Coverage 88.27% 89.46% +1.18%
==========================================
Files 846 645 -201
Lines 103662 90009 -13653
Branches 1253 0 -1253
==========================================
- Hits 91512 80528 -10984
+ Misses 11903 9481 -2422
+ Partials 247 0 -247
Some small comments, but overall this looks OK. The major annoyance is that I think we need to make nanoseconds-as-int64 something that users opt into for the time being. If the default behavior is that files are written in a form that most production Parquet readers don't understand yet (because the new LogicalType facility hasn't trickled down everywhere) seems like recipe for people shooting their toes off.
LogicalType::type get_logical_type(const ::DataType& type) {
std::shared_ptr<const LogicalAnnotation> get_logical_annotation(const ::DataType& type,
                                                                int32_t precision,
                                                                int32_t scale) {
Seems like the precision/scale params here should come from the Arrow type object?
Yes, this was all pretty sloppy; thanks for pointing it out. I've cleaned it up.
if (ty.id() == ArrowId::DECIMAL) {
  const auto& dt = static_cast<const ::arrow::Decimal128Type&>(ty);
  pr = dt.precision();
  sc = dt.scale();
You use the type's precision/scale here but not in the next block
Cleaned up now.
// Result when coercing to nanoseconds
auto s4 = std::shared_ptr<::arrow::Schema>(
    new ::arrow::Schema({field("f_s", t_ns), field("f_ms", t_ns), field("f_us", t_ns),
                         field("f_ns", t_ns)}));
Can also use the ::arrow::schema factory function for tighter code.
I changed to it here and in a few other nearby places.
cpp/src/parquet/arrow/schema.cc
Outdated
std::shared_ptr<ArrowType> MakeDecimal128Type(const PrimitiveNode& node) {
  const auto& metadata = node.decimal_metadata();
  return ::arrow::decimal(metadata.precision, metadata.scale);
static Status MakeArrowDecimal(const std::shared_ptr<const LogicalAnnotation>& annotation,
It is good to get in the habit of passing const T& rather than const shared_ptr<T>& if the function does not need to transfer ownership. I just opened ARROW-5549 about documenting some of our unspoken guidelines around this.
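A minimal illustration of the guideline; the struct and function names here are hypothetical, not from the PR:

```cpp
#include <memory>

struct LogicalAnnotation {
  int id = 0;
};

// Prefer a plain const reference when the callee only reads the object --
// the caller need not even have a shared_ptr.
int ReadAnnotation(const LogicalAnnotation& annotation) { return annotation.id; }

// Take a shared_ptr (by value) only when the callee shares or stores
// ownership; the ownership participation is then explicit in the signature.
std::shared_ptr<LogicalAnnotation> StoreAnnotation(
    std::shared_ptr<LogicalAnnotation> annotation) {
  return annotation;
}
```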
Changed the signature here and for all similar functions added to this file.
"Only MILLI, MICRO, and NANOS units supported for Arrow timestamps with " | ||
"Parquet."); | ||
"Only MILLI, MICRO, and NANO units supported for Arrow timestamps with Parquet."); | ||
} |
How could this code path be reached (do we automatically cast to millis/micros)?
This code is reached if you have an Arrow timestamp in seconds and don't give any instructions on how to convert/coerce it.
Currently (before this patch), if you don't ask for int96 and you don't supply an explicit coercion instruction, you fall to here, where the converter is willing to truncate your Arrow nanoseconds to Parquet microseconds, but is unwilling (for some reason) to expand your Arrow seconds to, say, Parquet milliseconds. I don't know the history or reasoning for this latter behavior, so my bias for conserving existing behavior whenever in doubt kicked in, and I retained it.
But it would be both easy and safe (it seems to me) to default to converting seconds to milliseconds (rather than failing) and I've done that in commit c2ac4cc. If you don't approve of this behavior change, we can (more or less) just revert that commit.
Sounds fine to me
cpp/src/parquet/arrow/schema.cc
Outdated
break;
case ArrowTypeId::BOOL:
  type = ParquetType::BOOLEAN;
  break;
case ArrowTypeId::UINT8:
  type = ParquetType::INT32;
  logical_type = LogicalType::UINT_8;
  PARQUET_CATCH_NOT_OK(annotation = LogicalAnnotation::Int(8, false));
Does this need to be able to throw (versus making assertions with DCHECK)?
I added DCHECK's to the convenience creation methods for Int, Time, and Timestamp annotations, and removed the try/catch macro wrapper from the (sometimes indirect) call sites that have valid hard-coded arguments.
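The shape of that change can be sketched as follows, with assert standing in for Arrow's debug-only DCHECK macro and with illustrative names:

```cpp
#include <cassert>

struct IntAnnotation {
  int bit_width;
  bool is_signed;
};

// The hard-coded call sites always pass known-valid widths, so a debug
// assertion replaces the throwing path (and the PARQUET_CATCH_NOT_OK
// wrapper at those call sites becomes unnecessary).
IntAnnotation MakeIntAnnotation(int bit_width, bool is_signed) {
  assert(bit_width == 8 || bit_width == 16 || bit_width == 32 ||
         bit_width == 64);
  return IntAnnotation{bit_width, is_signed};
}
```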
cpp/src/parquet/arrow/schema.cc
Outdated
break;
case ArrowTypeId::TIME32:
  type = ParquetType::INT32;
  logical_type = LogicalType::TIME_MILLIS;
  PARQUET_CATCH_NOT_OK(annotation = LogicalAnnotation::Time(
      false, LogicalAnnotation::TimeUnit::MILLIS));
same, does this need to be able to throw?
See above.
cpp/src/parquet/arrow/writer.cc
Outdated
} else {
  DCHECK_EQ(TimeUnit::NANO, target_unit);
  RETURN_NOT_OK(MultiplyBy(INT64_C(1000000000)));
}
Would you mind refactoring this to use a conversion table like we have in
https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/kernels/cast.cc#L470
I did this and tidied up the method a bit.
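A conversion table along those lines might look like this. It is a sketch under the assumption that only upward (coarse-to-fine) conversions multiply; the names and layout are illustrative, not the actual cast.cc code:

```cpp
#include <cstdint>

enum TimeUnit { SECOND = 0, MILLI = 1, MICRO = 2, NANO = 3 };

// kFactors[from][to] is the multiplier when widening a timestamp value
// from a coarser unit to a finer one (entries below the diagonal unused).
constexpr int64_t kFactors[4][4] = {
    {1, 1000, 1000000, 1000000000},
    {0, 1, 1000, 1000000},
    {0, 0, 1, 1000},
    {0, 0, 0, 1},
};

// Replaces the if/else chain of MultiplyBy calls with a table lookup.
int64_t WidenTimestamp(int64_t value, TimeUnit from, TimeUnit to) {
  return value * kFactors[from][to];  // assumes to >= from
}
```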
with pytest.raises(ValueError):
    pq.write_table(tb, filename, coerce_timestamps='ms')
So this is a nuisance, but old Parquet readers (i.e. most deployed Big Data applications) aren't going to be able to read the new nanosecond metadata -- for example, most deployed versions of Spark! For version 1.0 files, I think the appropriate default behavior is to write in compatibility mode in millis or micros with the old ConvertedType set, while for version=2.0 I think writing the new Nanosecond annotation is acceptable.
cc @xhochy for any comments
OK, so I've reintroduced the old behavior for timestamps if you've got a v1.0 writer; mostly this means that (1) Arrow nanos timestamps are converted to micros by default, and (2) you can't coerce any timestamp to nanoseconds (unless you use int96's). I conformed this and other tests, and added some new tests to demonstrate and check these behaviors as well.
Can I ask for clarification on something: recall that (after PARQUET-1411) Thrift serialized Parquet schemas always contain both ConvertedType and LogicalType (aka annotation) information (modulo types that don't mutually convert). Are you saying here that for version 1.0 writers (in addition to the Arrow specific change I described above) that the Parquet library proper should not even serialize the LogicalType/annotation?
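Put together, the version-dependent defaults described in this exchange amount to something like the following illustrative sketch (not the actual writer code):

```cpp
enum class ParquetVersion { V1, V2 };
enum class TimeUnit { SECOND, MILLI, MICRO, NANO };

// Default target unit when the caller requests no explicit coercion:
// seconds widen to millis; v1.0 files downcast nanos to micros so that
// older readers (without LogicalType support) can still consume them.
TimeUnit DefaultTimestampUnit(ParquetVersion version, TimeUnit arrow_unit) {
  if (arrow_unit == TimeUnit::SECOND) return TimeUnit::MILLI;
  if (version == ParquetVersion::V1 && arrow_unit == TimeUnit::NANO) {
    return TimeUnit::MICRO;
  }
  return arrow_unit;
}
```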
One further note: the pre-existing design has us asking and answering the same non-trivial questions about timestamp typing twice: once when the schema is built (GetTimestampMetadata()) and again when the data arrays are built (ArrowColumnWriter::WriteTimestamps()). I retained this design, but it's straining under this patch. I believe that after a schema is built, we have all the information needed to provide direct instructions to the column/array writer, but the module doesn't provide any channel for that (at least not an obvious one -- I haven't spent too much time analyzing it). I think a refactor for this would be desirable, but is probably beyond the scope of what I can do right now (given the desire to get 0.14 out in a few weeks, and the "big rename" for ConvertedType still pending and blocked). Just wanted to flag this for you.
Agreed that the current structure isn't ideal, and a refactor is a good idea
The test I added in commit 26d7c71 is tripping over an apparently pre-existing bug that produces integer overflow caught by clang's UB sanitizer. More here: https://issues.apache.org/jira/browse/ARROW-5618. In order to ensure that the rest of the patch is clean, I've revised the new test to remove the validity bit masks that trigger the bug; they're not essential to the test, and can be replaced if appropriate after the bug is resolved.
This reverts commit f98a108.
@tpboudreau is this ready for another review?
Yes, thanks, @wesm.
+1. Thanks @tpboudreau for conscientious work on this
"Only MILLI, MICRO, and NANOS units supported for Arrow timestamps with " | ||
"Parquet."); | ||
"Only MILLI, MICRO, and NANO units supported for Arrow timestamps with Parquet."); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds fine to me
with pytest.raises(ValueError): | ||
pq.write_table(tb, filename, coerce_timestamps='ms') | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed that the current structure isn't ideal, and a refactor is a good idea
This PR causes the Arrow Parquet I/O facility to recognize Parquet logical annotations (parquet.thrift LogicalTypes) on read and to generate them (rather than converted types) on write.
Since Parquet logical annotations include nanosecond timestamps, this patch satisfies the feature requested in ARROW-3729.