
Fixing #135: Adding support for pivot #150

Merged
merged 3 commits into from
Sep 10, 2017
Merged

Conversation

imarios
Contributor

@imarios imarios commented Jun 10, 2017

More on pivot: https://databricks.com/blog/2016/02/09/reshaping-data-with-pivot-in-apache-spark.html

case class G(a: String, b: String, x: Int)
val t = TypedDataset.create(Seq(
   G("a","x", 1), G("a","y", 2), G("a","z", 2), 
   G("a","z", 1), G("a","z", 22), G("b","y", 20), G("b","x", 10))
)

 t.groupBy(t('a)).pivot(t('b), /*the values we want to pivot*/ "x"::"z"::"y"::HNil).agg(sum(t('x)))
res1: frameless.TypedDataset[(String, Option[Long], Option[Long], Option[Long])] 
   = [_1: string, _2: bigint ... 2 more fields]
// Since a missing value on a pivot is encoded as null in Spark, we encode the aggregated values of a pivot as Option[_].
// The resulting type comprises the groupBy column(s) followed by the aggregated values.

res1.collect().run().foreach(println)
> (b,Some(10),None,Some(20)) // "z" doesn't exist for group "b", so the result of sum is None
> (a,Some(1),Some(25),Some(2))

// More aggregations generate more columns. Each pivoted value (here x, z, y) gets one triplet of aggregated values, read from left to right. The return type is adjusted accordingly.
scala> t.groupBy(t('a)).pivot(t('b), "x"::"z"::"y"::HNil).agg(sum(t('x)), count[G], first(t('x)))
res10: frameless.TypedDataset[(String, Option[Long], Option[Long], Option[Int], Option[Long], Option[Long], Option[Int], Option[Long], Option[Long], Option[Int])] = [_1: string, _2: bigint ... 8 more fields]

import shapeless.ops.hlist.Prepend

/* By Jeremy Smith */
trait Repeat[L <: HList, N <: Nat] {
Contributor Author

@jeremyrsmith thank you for helping me with Repeat!
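The induction behind Repeat can be sketched without a shapeless dependency. Below is a hand-rolled, runnable approximation: HList, Nat, and Prepend are simplified stand-ins for the shapeless originals (not the library's actual definitions), and Out is built by induction on N with N = 1 as the base case.

```scala
// Simplified stand-ins for shapeless' HList and Nat.
sealed trait HList
final case class ::[+H, +T <: HList](head: H, tail: T) extends HList
sealed trait HNil extends HList
case object HNil extends HNil

sealed trait Nat
sealed trait _1 extends Nat
sealed trait Succ[N <: Nat] extends Nat

// Prepend: type-level concatenation of two HLists.
trait Prepend[A <: HList, B <: HList] { type Out <: HList }
object Prepend {
  type Aux[A <: HList, B <: HList, O <: HList] = Prepend[A, B] { type Out = O }
  implicit def nil[B <: HList]: Aux[HNil, B, B] =
    new Prepend[HNil, B] { type Out = B }
  implicit def cons[H, T <: HList, B <: HList, O <: HList](
    implicit rest: Aux[T, B, O]
  ): Aux[H :: T, B, H :: O] =
    new Prepend[H :: T, B] { type Out = H :: O }
}

// Repeat: L repeated N times, by induction on N (base case N = 1).
trait Repeat[L <: HList, N <: Nat] { type Out <: HList }
object Repeat {
  type Aux[L <: HList, N <: Nat, O <: HList] = Repeat[L, N] { type Out = O }
  implicit def base[L <: HList]: Aux[L, _1, L] =
    new Repeat[L, _1] { type Out = L }
  implicit def succ[L <: HList, P <: Nat, PO <: HList, O <: HList](
    implicit prev: Aux[L, P, PO],        // repeat L (N - 1) times
    prepend: Prepend.Aux[L, PO, O]       // then prepend one more copy of L
  ): Aux[L, Succ[P], O] =
    new Repeat[L, Succ[P]] { type Out = O }
}

// Compile-time check: (Int :: String :: HNil) repeated twice.
implicitly[Repeat.Aux[Int :: String :: HNil, Succ[_1],
                      Int :: String :: Int :: String :: HNil]]
println("Repeat resolves as expected")
```

This mirrors how pivot computes its output type: the aggregated-column types are repeated once per pivot value.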

@codecov-io

codecov-io commented Jun 10, 2017

Codecov Report

Merging #150 into master will increase coverage by 0.43%.
The diff coverage is 100%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #150      +/-   ##
==========================================
+ Coverage   93.52%   93.96%   +0.43%     
==========================================
  Files          28       29       +1     
  Lines         649      663      +14     
  Branches       11       11              
==========================================
+ Hits          607      623      +16     
+ Misses         42       40       -2
Impacted Files Coverage Δ
dataset/src/main/scala/frameless/ops/Repeat.scala 100% <100%> (ø)
...aset/src/main/scala/frameless/ops/GroupByOps.scala 98.14% <100%> (+2.91%) ⬆️
...ataset/src/main/scala/frameless/TypedDataset.scala 93.38% <0%> (+0.82%) ⬆️

Powered by Codecov. Last update 91acd29...174c01a.

@imarios imarios force-pushed the pivot branch 2 times, most recently from b2f038c to 1fb5949 Compare June 11, 2017 06:03
@imarios imarios closed this Jun 12, 2017
@imarios imarios reopened this Jun 12, 2017
@imarios
Contributor Author

imarios commented Jun 13, 2017

@OlivierBlanvillain @kanterov hey guys, let me know if you have any feedback on this new feature.

@OlivierBlanvillain
Contributor

OlivierBlanvillain commented Jun 13, 2017

I'll try to do the usual deep review this weekend, but it looks reasonable from a 60 sec glance 👍

@imarios imarios added this to the 0.4-release milestone Jun 16, 2017
@imarios
Contributor Author

imarios commented Jun 16, 2017

@OlivierBlanvillain thanks for taking the time to review this.

@@ -82,6 +82,11 @@ class GroupedByManyOps[T, TK <: HList, K <: HList, KT](
private def retainGroupColumns: Boolean = {
self.dataset.sqlContext.getConf("spark.sql.retainGroupColumns", "true").toBoolean
}

def pivot[P: CatalystPivotable, Values <: HList](pivotColumn: TypedColumn[T, P],
values: Values)(
Contributor

Is that IntelliJ autoformatting? It looks messed up :P

Contributor

I would rename Values to V just for consistency.

Contributor

Also I think we forgot the scaladoc in this file, but that could be for a subsequent PR.

Contributor Author

haha, yes, let me bring this to our standard formatting style.

Contributor Author

@imarios imarios Jun 19, 2017

V is already used class GroupedBy1Ops[K1, V]. That's why I used Values. It's also more expressive. We are using V usually to express the type of a dataset (TypedDataset[V]).

class GroupedBy1Ops[K1, V](
  self: TypedDataset[V],
  g1: TypedColumn[V, K1]
)

implicit def matchesAll[L <: HList, U, Rest <: HList]
(implicit
filtered: Filter.Aux[L, U, Rest],
remainedSame: Rest =:= L
Contributor

What about FilterNot.Aux[L, U, HNil] instead?

Contributor

Actually, if MatchesAll is equivalent to a FilterNot, you probably don't need a new type class at all.

Contributor Author

@imarios imarios Jun 19, 2017

I am thinking about this one ... So MatchesAll[A,B] exists only if A is an HList and all the elements of the HList are of type B. FilterNot doesn't exactly have the same semantics. I can probably implement MatchesAll in terms of FilterNot.

Essentially MatchesAll breaks down the complexity of Pivot's implicit resolution step. Just as at the value level you break complex functions into smaller reusable ones, here I broke that complex pivot type-level inductive step down into MatchesAll, Repeat, etc.
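The semantics under discussion can be sketched without shapeless. The following is an illustrative reconstruction (hand-rolled HList stand-ins, not the PR's exact code): MatchesAll[L, U] resolves only when every element of L has type U.

```scala
// Simplified stand-ins for shapeless' HList.
sealed trait HList
final case class ::[+H, +T <: HList](head: H, tail: T) extends HList
sealed trait HNil extends HList
case object HNil extends HNil

// MatchesAll[L, U]: evidence that every element of L has type U.
trait MatchesAll[L <: HList, U]
object MatchesAll {
  implicit def nilMatches[U]: MatchesAll[HNil, U] =
    new MatchesAll[HNil, U] {}
  implicit def consMatches[U, T <: HList](
    implicit tail: MatchesAll[T, U]  // inductive step over the tail
  ): MatchesAll[U :: T, U] =
    new MatchesAll[U :: T, U] {}
}

// Resolves: both elements are Strings.
implicitly[MatchesAll[String :: String :: HNil, String]]
// Does not compile: Int is not a String.
// implicitly[MatchesAll[String :: Int :: HNil, String]]
println("MatchesAll resolves for a homogeneous HList")
```

FilterNot.Aux[L, U, HNil] expresses the same constraint ("removing every U leaves nothing"), which is why the dedicated type class was eventually dropped.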

Contributor

@OlivierBlanvillain OlivierBlanvillain Jun 19, 2017

Filter[L, U]#Out =:= L means that selecting all elements of type U in L is the identity function (that is, all elements of L have type U).

FilterNot[L, U]#Out =:= HNil means that removing all elements of type U from L yields the empty list (that is, all elements of L have type U).

Am I missing something?

Contributor Author

You mean replace MatchesAll[A,B] with FilterNot.Aux[A, B].Out =:= HNil ?

Contributor Author

hmmm ....

Cannot prove that shapeless.ops.hlist.FilterNot[shapeless.::[Boolean,shapeless.::[Boolean,shapeless.HNil]],Boolean]#Out =:= shapeless.HNil

Contributor

@OlivierBlanvillain OlivierBlanvillain Jun 19, 2017

Yes, FilterNot.Aux[Values, P, HNil] (← proper way to write FilterNot[Values, P]#Out =:= HNil), that was my original suggestion 😄

Contributor Author

^ any ideas?

Contributor Author

FilterNot[Values, P]#Out =:= HNil doesn't work ... FilterNot.Aux[Values, P, HNil] works :/

Contributor Author

I did the replacement; you are right, this isn't worth the brain energy and real estate of another type class.

import shapeless.{HList, Nat, Succ}
import shapeless.ops.hlist.Prepend

/* By Jeremy Smith */
Contributor

/** Type class supporting repeating an HList of type L N times.
  *
  * Repeat[Int :: String :: HNil, Nat._2]#Out =:=
  *   Int :: String :: Int :: String :: HNil
  *
  * By Jeremy Smith. To be replaced by `shapeless.ops.hlist.Repeat`
  * once https://github.com/milessabin/shapeless/pull/730 is published.
  */

}

/** Represents a typed Pivot operation.
Contributor

Could you explain what a Pivot is and what it is supposed to do? I could try to reverse engineer it from your tests and types, but that would be a lot of effort 😄

Contributor

Oh sorry, I didn't see your text in the PR description. I think it would still be nice to have that info in a scaladoc here. I will continue the review with that info.

Contributor Author

I agree, let's add more scaladoc here as a separate PR.

private case class XString(s1: String, s2: String, s3: String)
private case class XLong(l1: Long, l2: Long, l3: Long)

class MatchesAllTest extends TypedDatasetSuite {
Contributor

I would just make this object MatchesAllTest { ... } since there is nothing happening at run time

Contributor Author

This is no longer needed since this type class was removed.

tc: AggregateTypes.Aux[T, AggrColumns, AggrColumnTypes],
tl1: ToList[GroupedColumns, Any],
tl2: ToList[AggrColumns, Any],
tl3: ToList[Values, Any],
Contributor

@OlivierBlanvillain OlivierBlanvillain Jun 18, 2017

I wouldn't ask for a type class to do this, just implement it by hand (hopefully it's both faster to compile and to run!):

def mapAny[X](h: HList)(f: Any => X): List[X] =
  h match {
    case HNil    => Nil
    case x :: xs => f(x) :: mapAny(xs)(f)
  }
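The snippet above assumes shapeless' HList in scope. Here is a self-contained, runnable version of the same idea, with hand-rolled stand-ins (the actual frameless code would use shapeless' types instead):

```scala
// Simplified stand-ins for shapeless' HList.
sealed trait HList
final case class ::[+H, +T <: HList](head: H, tail: T) extends HList
sealed trait HNil extends HList
case object HNil extends HNil

// Untyped map over an HList; the callers cast each element anyway,
// so losing the element types here is acceptable.
def mapAny[X](h: HList)(f: Any => X): List[X] =
  h match {
    case HNil    => Nil
    case x :: xs => f(x) :: mapAny(xs)(f)
  }

val hl: HList = ::(1, ::("two", ::(3.0, HNil)))
println(mapAny(hl)(_.toString)) // List(1, two, 3.0)
```

Compared with asking for three ToList instances, this trades a small amount of type safety for simpler (and likely faster) implicit resolution.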

Contributor

@OlivierBlanvillain left a comment

Very impressive work @imarios! This is one of those operations that I would think "yeah it's probably too hard to do on the type-level..." 😄

toTuple: Tupler.Aux[OutAsHList, Out],
encoder: TypedEncoder[Out]
): TypedDataset[Out] = {
val y: Seq[Column] = aggrColumns.toList[Any].map(_.asInstanceOf[TypedAggregate[_,_]].expr).map(i => new Column(i))
Contributor

Fuse the two maps?

@@ -173,4 +185,52 @@ class GroupedBy2Ops[K1, K2, V](
def flatMapGroups[U: TypedEncoder](f: ((K1, K2), Iterator[V]) => TraversableOnce[U]): TypedDataset[U] = {
underlying.flatMapGroups(f)
}

def pivot[P: CatalystPivotable, Values <: HList](pivotColumn: TypedColumn[V, P],
values: Values)(
Contributor

I think this would be the first time we ask users to write an HList. Could we avoid it using SingletonProductArgs? (We have an example of usage here.)

def withCustomGenX4: Gen[Vector[X4[String, String, Int, Boolean]]] = {
val kvPairGen: Gen[X4[String, String, Int, Boolean]] = for {
a <- Gen.oneOf(Seq("1", "2", "3", "4"))
b <- Gen.oneOf(Seq("a", "b", "c"))
Contributor

I would expect the custom gen to be only constrained on the pivot values, why do you need the other ones?

Contributor Author

Addressed

@imarios
Contributor Author

imarios commented Jun 19, 2017

@OlivierBlanvillain thank you very much for taking such a close look at this!! This PR took a lot of time :).

@imarios imarios force-pushed the pivot branch 2 times, most recently from bf91a03 to 4df65f7 Compare June 19, 2017 15:24
@imarios
Contributor Author

imarios commented Jun 19, 2017

@OlivierBlanvillain the only thing remaining is that pivot values are currently passed as an HList. In vanilla Spark, values are passed as a regular list. Since we want type information for all the values as well as the length of the list, we ask users to pass an HList, which sort of gives a nice analogy.

Nevertheless, I do recognize the benefit of replacing this with the SingletonProductArgs macro in terms of API complexity. Note though that this will come at the cost of the IDE not being able to infer the right types. In addition, the argument list would need its first element to be the pivot column and the remaining ones to be the pivot values, which also makes the inductive step a bit more complex.

How would you feel if we keep it as is, and investigate this in a separate PR?

@OlivierBlanvillain
Contributor

I think I would prefer not to expose HLists to users, mainly for consistency with the APIs that are already implemented. The solution for IntelliJ support is to special case the method with overloads for small arities (I think we currently have 1, 2, 3, 4, 5 in the code base?). Each of these small-arity methods then forwards to the generic one that's built using SingletonProductArgs.

You might also want to change the syntax a bit to ds.groupBy(t('a)).pivot(t('b))("x", "z", "y") to avoid messing with head and tail of the arguments, you could just have pivot(t('b)) return an instance of SingletonProductArgs.

@imarios
Contributor Author

imarios commented Jun 20, 2017

@OlivierBlanvillain ok, let me take another pass on this. My vacation is close to an end and I will be traveling back to the US today (I was staying home in Cyprus for the past month), so I will probably delay looking at this until (hopefully) the end of the week. The Pivot agg object is already using ProductArgs, so I have a good understanding of how to proceed there. I will probably use ProductArgs rather than SingletonProductArgs, since we don't care about lifting the actual values to the type level (we just want to make sure that their types match the pivot column type P).

@imarios
Contributor Author

imarios commented Jul 2, 2017

@OlivierBlanvillain: So the syntax you had ds.groupBy(t('a)).pivot(t('b))("x", "z", "y") will not work ... the compiler will try to apply ("x", "z", "y") as a second parameter list to the pivot method and it will fail with a "too many arguments for method pivot" ... this must be a common Scalac puzzler?

To get this working you can do ds.groupBy(t('a)).pivot(t('b)).apply("x", "z", "y")

Finally, I am having a very peculiar issue ...

For the intermediate class representing the pivot computation before any values are given, I have (for lack of a better name):

final case class PivotNotValues[T, GroupedColumns <: HList, PivotType](
  ds: TypedDataset[T],
  groupedBy: GroupedColumns,
  pivotedBy: TypedColumn[T, PivotType]) extends ProductArgs {

  def applyProduct[Values <: HList](values: Values)(
    implicit validValues: FilterNot.Aux[Values, PivotType, HNil]
  ): Pivot[T, GroupedColumns, PivotType, Values] = Pivot(ds, groupedBy, pivotedBy, values)
}

Now, this works with the .apply() syntax, but I cannot get the FilterNot.Aux[Values, PivotType, HNil] constraint to work :/

scala> t.groupBy(t('a)).pivot(t('b)).apply("x", "y")
<console>:24: error: could not find implicit value for parameter validValues: shapeless.ops.hlist.FilterNot.Aux[shapeless.::[String("x"),shapeless.::[String("y"),shapeless.HNil]],String,shapeless.HNil]
       t.groupBy(t('a)).pivot(t('b)).apply("x", "y")

I can keep trying to figure out what is going on ... but honestly I am not really sure the requirement of not exposing any HList in the API is worth the trouble. I think most of our users are already basic shapeless users ...

WDYT?

It also adds quite a bit of a complexity on the implementation ...

@imarios
Contributor Author

imarios commented Jul 6, 2017

@OlivierBlanvillain ping :)

@imarios imarios closed this Sep 4, 2017
@imarios imarios reopened this Sep 4, 2017
@imarios
Contributor Author

imarios commented Sep 5, 2017

@OlivierBlanvillain :)

@OlivierBlanvillain
Contributor

@imarios Hey, sorry for the long delay, forgot about this one ^^'

I would still be for a SingletonProductArgs solution, just for the sake of consistency with other APIs. Maybe with another name than apply, like t.groupBy(t('a)).pivot(t('b)).on("x", "y") if on makes sense here.

About your FilterNot error: it looks like scalac does not have the same precision for the HList (Values = "x" :: "y" :: HNil) and the filter (PivotType = String). Instead of a FilterNot you could maybe try asking for a ToList[Values, PivotType], which uses subtyping instead of type equality.
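The subtyping-versus-equality distinction can be sketched without shapeless. In the hand-rolled version below (illustrative stand-ins, not shapeless' actual definitions), ToList only demands evidence H <:< U for each head, so singleton types such as "x".type satisfy it where =:=-style equality with String would fail:

```scala
// Simplified stand-ins for shapeless' HList.
sealed trait HList
final case class ::[+H, +T <: HList](head: H, tail: T) extends HList
sealed trait HNil extends HList
case object HNil extends HNil

// ToList: evidence that every element of L conforms to U (subtyping,
// not type equality), plus the conversion itself.
trait ToList[L <: HList, U] { def apply(l: L): List[U] }
object ToList {
  implicit def nil[U]: ToList[HNil, U] =
    new ToList[HNil, U] { def apply(l: HNil): List[U] = Nil }
  implicit def cons[H, T <: HList, U](
    implicit ev: H <:< U,          // each head need only be a subtype of U
    rest: ToList[T, U]
  ): ToList[H :: T, U] =
    new ToList[H :: T, U] {
      def apply(l: H :: T): List[U] = ev(l.head) :: rest(l.tail)
    }
}

// With singleton-typed pivot values the heads would be "x".type and
// "y".type: subtypes of String, but not equal to it.
val values: String :: String :: HNil = ::("x", ::("y", HNil))
println(implicitly[ToList[String :: String :: HNil, String]].apply(values))
```

This is why FilterNot.Aux[Values, PivotType, HNil] fails once SingletonProductArgs narrows the value types, while ToList[Values, PivotType] still resolves.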

@imarios
Contributor Author

imarios commented Sep 10, 2017

@OlivierBlanvillain done! Thanks for pushing me to get this one right :). Great tip on ToList. Let me know if you notice anything else.

@OlivierBlanvillain
Contributor

LGTM
Thanks for the PR, this is great work!

@imarios
Contributor Author

imarios commented Sep 10, 2017

Thank you @OlivierBlanvillain ! squashing and merging.

@imarios imarios merged commit 14d2daf into typelevel:master Sep 10, 2017
@imarios imarios mentioned this pull request Sep 10, 2017