
added stddev_pop and stddev_samp
GrafBlutwurst committed Jul 10, 2017
1 parent 47e0c8f commit 8422dcb
Showing 2 changed files with 74 additions and 0 deletions.
@@ -137,6 +137,44 @@ trait AggregateFunctions {
new TypedAggregate[T, Double](untyped.stddev(column.untyped))
}

/**
* Aggregate function: returns the population standard deviation of a column.
*
* @note In Spark stddev_pop always returns Double
* [[https://github.com/apache/spark/blob/4a3c09601ba69f7d49d1946bb6f20f5cfe453031/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala#L137]]
*
* apache/spark
*/
def stddev_pop[A, T](column: TypedColumn[T, A])(
implicit
evCanBeDoubleA: CatalystCast[A, Double]
): TypedAggregate[T, Option[Double]] = {
implicit val c1 = column.uencoder

new TypedAggregate[T, Option[Double]](
untyped.stddev_pop(column.cast[Double].untyped)
)
}
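
A minimal usage sketch, not part of this commit: the Purchase case class, the purchases dataset and its price column are illustrative names. Assuming the frameless TypedDataset API, the new aggregate can be applied like any other typed aggregate:

import frameless.TypedDataset
import frameless.functions.aggregate.stddev_pop

case class Purchase(customer: String, price: Double)

// Returns a one-row TypedDataset holding the population standard deviation of `price`.
def pricePopStddev(purchases: TypedDataset[Purchase]): TypedDataset[Option[Double]] =
  purchases.agg(stddev_pop(purchases('price)))

The result is typed as Option[Double] because the underlying Spark aggregate can produce null (for example on empty input), which the typed API surfaces as None.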

/**
* Aggregate function: returns the sample standard deviation of a column.
*
* @note In Spark stddev_samp always returns Double
* [[https://github.com/apache/spark/blob/4a3c09601ba69f7d49d1946bb6f20f5cfe453031/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala#L154]]
*
* apache/spark
*/
def stddev_samp[A, T](column: TypedColumn[T, A])(
implicit
evCanBeDoubleA: CatalystCast[A, Double]
): TypedAggregate[T, Option[Double]] = {
implicit val c1 = column.uencoder

new TypedAggregate[T, Option[Double]](
untyped.stddev_samp(column.cast[Double].untyped)
)
}
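
For reference, not part of the commit: the two variants differ only in the divisor of the variance, n for the population estimator and n - 1 for the sample estimator. A plain-Scala rendering of what stddev_pop and stddev_samp are expected to compute for a non-empty sequence of doubles:

def stddevPop(xs: Seq[Double]): Double = {
  val n    = xs.size.toDouble
  val mean = xs.sum / n
  math.sqrt(xs.map(x => (x - mean) * (x - mean)).sum / n)        // population: divide by n
}

def stddevSamp(xs: Seq[Double]): Double = {
  val n    = xs.size.toDouble
  val mean = xs.sum / n
  math.sqrt(xs.map(x => (x - mean) * (x - mean)).sum / (n - 1))  // sample: divide by n - 1
}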

/** Aggregate function: returns the maximum value of the column in a group.
*
* apache/spark
@@ -529,6 +529,42 @@ class AggregateFunctionsTests extends TypedDatasetSuite {
): Prop = univariatePropTemplate(xs)(skewness[A,X2[Int, A]],org.apache.spark.sql.functions.skewness)


check(forAll(prop[Double] _))
check(forAll(prop[Int] _))
check(forAll(prop[Short] _))
check(forAll(prop[BigDecimal] _))
check(forAll(prop[Byte] _))
}

test("stddev_pop") {
val spark = session
import spark.implicits._

def prop[A: TypedEncoder](xs: List[X2[Int, A]])(
implicit
encEv: Encoder[(Int, A)],
evCanBeDoubleA: CatalystCast[A, Double]
): Prop = univariatePropTemplate(xs)(stddev_pop[A,X2[Int, A]],org.apache.spark.sql.functions.stddev_pop)


check(forAll(prop[Double] _))
check(forAll(prop[Int] _))
check(forAll(prop[Short] _))
check(forAll(prop[BigDecimal] _))
check(forAll(prop[Byte] _))
}
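
An illustrative, self-contained ScalaCheck property in the same spirit as these checks; it is not the suite's univariatePropTemplate (whose body is not shown here), and the names varianceWith and popVsSamp are invented for the sketch. It exercises the algebraic relation between the two estimators:

import org.scalacheck.Gen
import org.scalacheck.Prop.forAll

// Variance with an explicit divisor, so both estimators share one helper.
def varianceWith(xs: List[Double], divisor: Double): Double = {
  val mean = xs.sum / xs.size
  xs.map(x => (x - mean) * (x - mean)).sum / divisor
}

// For n >= 2, sample and population standard deviation differ by a factor of sqrt(n / (n - 1)).
val popVsSamp = forAll(Gen.listOfN(10, Gen.choose(-1000.0, 1000.0))) { xs =>
  val n    = xs.size.toDouble
  val pop  = math.sqrt(varianceWith(xs, n))
  val samp = math.sqrt(varianceWith(xs, n - 1))
  math.abs(samp - pop * math.sqrt(n / (n - 1))) < 1e-6
}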

test("stddev_samp") {
val spark = session
import spark.implicits._

def prop[A: TypedEncoder](xs: List[X2[Int, A]])(
implicit
encEv: Encoder[(Int, A)],
evCanBeDoubleA: CatalystCast[A, Double]
): Prop = univariatePropTemplate(xs)(stddev_samp[A,X2[Int, A]],org.apache.spark.sql.functions.stddev_samp)


check(forAll(prop[Double] _))
check(forAll(prop[Int] _))
check(forAll(prop[Short] _))
