Skip to content

Commit

Permalink
added covar_samp as well
Browse files Browse the repository at this point in the history
  • Loading branch information
GrafBlutwurst committed Jul 10, 2017
1 parent bfeb1e7 commit 83ec77f
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@ trait AggregateFunctions {
/**
* Aggregate function: returns the covariance of two collumns.
*
* @note In Spark corr always returns Double
* [[https://github.com/apache/spark/blob/4a3c09601ba69f7d49d1946bb6f20f5cfe453031/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Covariance.scala#L93]]
* @note In Spark covar_pop always returns Double
* [[https://github.com/apache/spark/blob/4a3c09601ba69f7d49d1946bb6f20f5cfe453031/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Covariance.scala#L82]]
*
* apache/spark
*/
Expand All @@ -221,4 +221,25 @@ trait AggregateFunctions {
untyped.covar_pop(column1.cast[Double].untyped, column2.cast[Double].untyped)
)
}

/**
* Aggregate function: returns the covariance of two collumns.
*
* @note In Spark covar_samp always returns Double
* [[https://github.com/apache/spark/blob/4a3c09601ba69f7d49d1946bb6f20f5cfe453031/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Covariance.scala#L93]]
*
* apache/spark
*/
def covar_samp[A, B, T](column1: TypedColumn[T, A], column2: TypedColumn[T, B])(
implicit
evCanBeDoubleA: CatalystCast[A, Double],
evCanBeDoubleB: CatalystCast[B, Double]
): TypedAggregate[T, Option[Double]] = {
implicit val c1 = column1.uencoder
implicit val c2 = column2.uencoder

new TypedAggregate[T, Option[Double]](
untyped.covar_samp(column1.cast[Double].untyped, column2.cast[Double].untyped)
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -439,4 +439,22 @@ class AggregateFunctionsTests extends TypedDatasetSuite {
check(forAll(prop[Short, Int] _))
check(forAll(prop[BigDecimal, Byte] _))
}

test("covar_samp") {
val spark = session
import spark.implicits._

def prop[A: TypedEncoder, B: TypedEncoder](xs: List[X3[Int, A, B]])(
implicit
encEv: Encoder[(Int, A, B)],
evCanBeDoubleA: CatalystCast[A, Double],
evCanBeDoubleB: CatalystCast[B, Double]
): Prop = biVariatePropTemplate(xs)(covar_samp[A,B,X3[Int, A, B]],org.apache.spark.sql.functions.covar_samp)

check(forAll(prop[Double, Double] _))
check(forAll(prop[Double, Int] _))
check(forAll(prop[Int, Int] _))
check(forAll(prop[Short, Int] _))
check(forAll(prop[BigDecimal, Byte] _))
}
}

0 comments on commit 83ec77f

Please sign in to comment.