allow creating TypedDataset from DFs with different column order #259
Conversation
as they can appear when loading partitioned datasets from e.g. parquet
Codecov Report
@@            Coverage Diff             @@
##           master     #259      +/-   ##
==========================================
+ Coverage   96.56%   96.57%    +<.01%
==========================================
  Files          51       51
  Lines         874      876        +2
  Branches       11       10        -1
==========================================
+ Hits          844      846        +2
  Misses         30       30
Continue to review full report at Codecov.
@@ -1036,8 +1036,15 @@ object TypedDataset {
    val shouldReshape = output.zip(targetColNames).exists {
      case (expr, colName) => expr.name != colName
    }

    val reshaped = if (shouldReshape) df.toDF(targetColNames: _*) else df
    val canSelect = targetColNames.toSet.subsetOf(output.map(_.name).toSet)
Can you give an example where this would be false?
If you'd remove the .toDF("b", "a") call from the test case, this would be such a case: the column names would still be _1 and _2 from the original dataframe. Frameless would succeed here if the types aligned.
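To make the condition concrete, here is a minimal sketch of the canSelect check in plain Scala (no Spark involved). The names "_1"/"_2" stand in for the default tuple column names a DataFrame gets when it is created without the .toDF("b", "a") rename; "a"/"b" stand in for the target case-class fields:

```scala
// Columns of a source dataframe built from tuples, before any rename.
val outputNames = Set("_1", "_2")
// Field names of the target case class.
val targetColNames = Set("b", "a")

// The check from the patch: can every target column be found by name?
val canSelect = targetColNames.subsetOf(outputNames)           // false here

// After a .toDF("b", "a") rename the check passes.
val canSelectAfterRename = targetColNames.subsetOf(Set("b", "a"))  // true
```

When canSelect is false, selecting by name is impossible and the code falls back to the old positional rename.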
    val canSelect = targetColNames.toSet.subsetOf(output.map(_.name).toSet)

    val reshaped = if (shouldReshape && canSelect) {
      df.select(targetColNames.head, targetColNames.tail: _*)
It's not totally clear whether or not systematically doing a select would create a lot of additional work at runtime. What do you think?
The thing is, I don't know how to otherwise reshape the dataframe into the correct form. I would suspect this is a rather cheap operation, as it only shifts references and is guaranteed to fetch only a subset of the existing fields.

Actually, the plan for the test case's collect() is:

Project [_1#2 AS b#11, _2#3 AS a#12]
+- LocalRelation [_1#2, _2#3]

which is just shifting the fields. And the select is only done if the column names are not already in the right order.
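The difference between the two reshapes can be sketched without Spark by modeling a row as an ordered list of (columnName, value) pairs. This is only an illustration of the semantics, not the Spark implementation:

```scala
// Source row in its original column order: column "b" first, then "a".
val row = List("b" -> 2, "a" -> 1)
// Order the target case class expects.
val targetColNames = List("a", "b")

// toDF-style reshape: renames positionally, values stay where they were,
// so the value of "b" ends up labeled "a" and vice versa.
val renamedOnly = targetColNames.zip(row.map(_._2))   // List((a,2), (b,1))

// select-style reshape: fetches each target column by name,
// so values follow their names into the new order.
val selected = targetColNames.map(n => n -> row.toMap(n))  // List((a,1), (b,2))
```

Like the Project node in the plan above, the select only relabels and reorders references; no data is copied.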
      (a1: A, b1: B) => {
        val ds = TypedDataset.create(
          Vector((b1, a1))
        ).dataset.toDF("b", "a").as[X2[A, B]](TypedExpressionEncoder[X2[A, B]])
Why do you have to pass this TypedExpressionEncoder explicitly?
Otherwise DataFrame.as is going to use Spark's encoders.
        TypedDataset.create(ds).collect().run().head ?= X2(a1, b1)
      }
    }
    check(prop[Double, Double])
Maybe change it to prop[X1[Double], X1[X1[Double]]] or something, because with both fields being Double there is no check for proper column ordering :)
Totally true, gonna change that.
LGTM, thanks!
as they can appear when loading partitioned datasets from e.g. Parquet.

Vanilla Spark is able to deserialize from dataframes where the fields are in a different order. The reshaping in TypedDataset.createUnsafe just renamed columns, so the types did not align anymore, although beforehand, in this particular case, it actually should have worked.

Given a dataframe of ("a": A, "b": B, "c": C), we want a Dataset of case class X(b: B, c: C, a: A) from it. When using TypedDataset.createUnsafe, we get an encoder which will try to read the underlying relation/dataframe as ("b": A, "c": B, "a": C), which will clearly fail to serialize/deserialize to X.

To work around this issue, we now check if the target columns are a subset of the source df columns and, if so, do a select instead of a .toDF(names), which only renames.
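The whole guard can be modeled in a few lines of plain Scala. This is a sketch only: the names mirror the patch, but the DataFrame is simplified to a list of (columnName, value) pairs, so the actual Spark behavior is only approximated:

```scala
// A toy "dataframe": ordered column-name/value pairs.
type Df = List[(String, Int)]

// Mirrors the patched logic: select by name when the target columns
// exist in the source; otherwise fall back to the positional rename.
def reshape(df: Df, targetColNames: List[String]): Df = {
  val shouldReshape = df.map(_._1) != targetColNames
  val canSelect     = targetColNames.toSet.subsetOf(df.map(_._1).toSet)
  if (shouldReshape && canSelect)
    targetColNames.map(n => n -> df.toMap.apply(n)) // select: values follow names
  else if (shouldReshape)
    targetColNames.zip(df.map(_._2))                // toDF: rename in place
  else df                                           // already in order
}
```

With matching names in the wrong order, reshape reorders the values; with the default "_1"/"_2" names it can only rename, which is exactly the pre-existing behavior this PR preserves.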