
allow creating TypedDataset from DFs with different column order #259

Merged
merged 2 commits into from
Feb 21, 2018

Conversation

mfelsche
Contributor

Such DataFrames can appear when loading partitioned datasets from e.g. Parquet.

Vanilla Spark is able to deserialize from DataFrames where the fields are in a different order.
The reshaping in TypedDataset.createUnsafe just renamed columns, so the types no longer aligned, although beforehand, in this particular case, it actually should have worked.

Given a DataFrame of ("a": A, "b": B, "c": C), we want a Dataset of case class X(b: B, c: C, a: A) from it. When using TypedDataset.createUnsafe, we get an encoder which will try to read the underlying relation/DataFrame as ("b": A, "c": B, "a": C), which will clearly fail to serialize/deserialize to X.

To work around this issue, we now check whether the target columns are a subset of the source DataFrame's columns and, if so, do a select instead of a .toDF(names), which only renames.
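The difference between renaming and selecting can be sketched without Spark. Below is a minimal, hypothetical Scala model (the names `ReorderSketch`, `selectColumns`, and `renameColumns` are illustrative, not part of frameless): a row is a name-to-value map; a select-style reorder looks each target column up by name, while a toDF-style rename keeps values in positional order and merely relabels them.

```scala
// Hypothetical sketch, not frameless code: models why a name-based select
// preserves column/value associations while a positional rename does not.
object ReorderSketch {
  type Row = Map[String, Any]

  // select-style reorder: each target column is looked up by name,
  // so values stay attached to their original columns
  def selectColumns(row: Row, targetColNames: Seq[String]): Seq[Any] =
    targetColNames.map(row)

  // toDF-style rename: values keep their positions and only the labels change,
  // so a value can end up under the wrong column name
  def renameColumns(valuesInSourceOrder: Seq[Any], targetColNames: Seq[String]): Row =
    targetColNames.zip(valuesInSourceOrder).toMap
}
```

With a row ("a" -> 1, "b" -> "x"), selecting Seq("b", "a") yields Seq("x", 1), whereas renaming the positional values Seq(1, "x") to ("b", "a") mislabels them as Map("b" -> 1, "a" -> "x"): the name/type mismatch this PR works around.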

as they can appear when loading partitioned datasets from e.g. parquet
@codecov-io

codecov-io commented Feb 19, 2018

Codecov Report

Merging #259 into master will increase coverage by <.01%.
The diff coverage is n/a.


@@            Coverage Diff             @@
##           master     #259      +/-   ##
==========================================
+ Coverage   96.56%   96.57%   +<.01%     
==========================================
  Files          51       51              
  Lines         874      876       +2     
  Branches       11       10       -1     
==========================================
+ Hits          844      846       +2     
  Misses         30       30
Impacted Files Coverage Δ
...ataset/src/main/scala/frameless/TypedDataset.scala 100% <ø> (ø) ⬆️
...ain/scala/frameless/functions/UnaryFunctions.scala 100% <0%> (ø) ⬆️
...ore/src/main/scala/frameless/CatalystOrdered.scala 100% <0%> (ø) ⬆️

Legend:
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 6aca77e...5bb8162.

@@ -1036,8 +1036,15 @@ object TypedDataset {
val shouldReshape = output.zip(targetColNames).exists {
case (expr, colName) => expr.name != colName
}

val reshaped = if (shouldReshape) df.toDF(targetColNames: _*) else df
val canSelect = targetColNames.toSet.subsetOf(output.map(_.name).toSet)
Contributor

Can you give an example where this would be false?

Contributor Author

If you removed the .toDF("b", "a") call from the test case, this would be such a case: the column names would still be _1 and _2 from the original DataFrame.
Frameless would succeed here if the types aligned.
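That scenario can be checked directly with the subsetOf condition from the diff. A tiny illustration (the object name `CanSelectCheck` is made up): with the tuple-default column names _1/_2 as the source, the target names b/a are not a subset, so canSelect is false and the select path cannot be taken.

```scala
// Illustrative check (object name is made up): the subsetOf condition from the
// diff, evaluated for the case where .toDF("b", "a") was never called.
object CanSelectCheck {
  val sourceCols = Set("_1", "_2") // tuple-default names from the original DataFrame
  val targetCols = Set("b", "a")   // names expected by the target case class
  val canSelect  = targetCols.subsetOf(sourceCols) // false: select by name impossible
}
```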

val canSelect = targetColNames.toSet.subsetOf(output.map(_.name).toSet)

val reshaped = if (shouldReshape && canSelect) {
df.select(targetColNames.head, targetColNames.tail:_*)
Contributor

It's not totally clear whether or not systematically doing a select would create a lot of additional work at runtime. What do you think?

Contributor Author

The thing is, I don't know how else to reshape the DataFrame into the correct form. I would suspect this is a rather cheap operation, as it only shifts references and is guaranteed to fetch only a subset of the existing fields.

Actually, the plan for the test case's collect() is:

Project [_1#2 AS b#11, _2#3 AS a#12]
+- LocalRelation [_1#2, _2#3]

which is just shifting the fields.

And the select is only done if the column names are not already in the right order.
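The two predicates from the diff can be exercised on plain name lists, no Spark required. This is a sketch (`ReshapeDecision` is a made-up name; the bodies mirror the shouldReshape and canSelect expressions from the diff):

```scala
// Sketch mirroring the diff's two conditions, applied to plain column-name lists.
object ReshapeDecision {
  // true when any column name differs positionally from the target
  def shouldReshape(outputNames: Seq[String], targetColNames: Seq[String]): Boolean =
    outputNames.zip(targetColNames).exists { case (out, target) => out != target }

  // true when every target name exists among the source columns,
  // i.e. a name-based select is possible
  def canSelect(outputNames: Seq[String], targetColNames: Seq[String]): Boolean =
    targetColNames.toSet.subsetOf(outputNames.toSet)
}
```

For source columns a, b, c and target b, c, a both predicates hold, so the new code takes the select branch; when the names already match positionally, shouldReshape is false and the DataFrame is left untouched.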

(a1: A, b1: B) => {
val ds = TypedDataset.create(
Vector((b1, a1))
).dataset.toDF("b", "a").as[X2[A, B]](TypedExpressionEncoder[X2[A, B]])
Contributor

Why do you have to pass this TypedExpressionEncoder explicitly?

Contributor Author

Otherwise DataFrame.as is going to use Spark's encoders.

TypedDataset.create(ds).collect().run().head ?= X2(a1, b1)
}
}
check(prop[Double, Double])
Contributor

Maybe change it to prop[X1[Double], X1[X1[Double]]] or something, because having both as Double does not check for proper column ordering :)

Contributor Author

Totally true, gonna change that.

@OlivierBlanvillain
Contributor

LGTM, thanks!

@OlivierBlanvillain OlivierBlanvillain merged commit 53fcf79 into typelevel:master Feb 21, 2018