Commit

Address review
OlivierBlanvillain authored and imarios committed Jan 30, 2018
1 parent ac047f9 commit bc49c8a
Showing 1 changed file with 73 additions and 36 deletions.
109 changes: 73 additions & 36 deletions dataset/src/main/scala/frameless/functions/NonAggregateFunctions.scala
@@ -12,10 +12,11 @@ trait NonAggregateFunctions {
* apache/spark
*/
def abs[A, B, T](column: AbstractTypedColumn[T, A])
(implicit
evAbs: CatalystAbsolute[A, B],
enc:TypedEncoder[B]): column.ThisType[T, B] =
column.typed(untyped.abs(column.untyped))(enc)
(implicit
i0: CatalystAbsolute[A, B],
i1: TypedEncoder[B]
): column.ThisType[T, B] =
column.typed(untyped.abs(column.untyped))(i1)
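
For illustration, a minimal usage sketch (a hypothetical Rec schema; assumes an implicit SparkSession, or SQLContext depending on the frameless version, plus the imports shown; CatalystAbsolute[Int, Int] keeps the result type at Int):

  import frameless.TypedDataset
  import frameless.functions.nonAggregate._

  case class Rec(i: Int)                               // hypothetical sample schema
  val ds  = TypedDataset.create(Seq(Rec(-2), Rec(3)))  // needs the implicit session
  val out = ds.select(abs(ds('i)))                     // TypedDataset[Int]
  // out.collect().run() should yield Seq(2, 3)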

/** Non-Aggregate function: returns the acos of a numeric column
*
@@ -24,9 +25,8 @@ trait NonAggregateFunctions {
* apache/spark
*/
def acos[A, T](column: AbstractTypedColumn[T, A])
(implicit
evCanBeDouble: CatalystCast[A, Double]): column.ThisType[T, Double] =
column.typed(untyped.acos(column.cast[Double].untyped))
(implicit i0: CatalystCast[A, Double]): column.ThisType[T, Double] =
column.typed(untyped.acos(column.cast[Double].untyped))

/** Non-Aggregate function: returns true if the value is contained within the array in the specified column
*
@@ -42,9 +42,8 @@
* apache/spark
*/
def atan[A, T](column: AbstractTypedColumn[T,A])
(implicit
evCanBeDouble: CatalystCast[A, Double]): column.ThisType[T, Double] =
column.typed(untyped.atan(column.cast[Double].untyped))
(implicit i0: CatalystCast[A, Double]): column.ThisType[T, Double] =
column.typed(untyped.atan(column.cast[Double].untyped))

/** Non-Aggregate function: returns the asin of a numeric column
*
@@ -53,9 +52,8 @@
* apache/spark
*/
def asin[A, T](column: AbstractTypedColumn[T, A])
(implicit
evCanBeDouble: CatalystCast[A, Double]): column.ThisType[T, Double] =
column.typed(untyped.asin(column.cast[Double].untyped))
(implicit i0: CatalystCast[A, Double]): column.ThisType[T, Double] =
column.typed(untyped.asin(column.cast[Double].untyped))
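
acos, asin and atan all follow the same shape: any input A with a CatalystCast[A, Double] is cast to Double before the Spark function is applied, so the result column is always Double. A minimal sketch (hypothetical Angle schema, same setup as the abs sketch above):

  case class Angle(x: Int)
  val angles = TypedDataset.create(Seq(Angle(0), Angle(1)))
  angles.select(asin(angles('x)))   // TypedDataset[Double]; Int is cast via CatalystCast[Int, Double]
  angles.select(atan(angles('x)))   // acos works the same way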

/** Non-Aggregate function: returns the angle theta from the conversion of rectangular coordinates (x, y) to
* polar coordinates (r, theta).
@@ -64,21 +62,42 @@
* [[https://github.com/apache/spark/blob/4a3c09601ba69f7d49d1946bb6f20f5cfe453031/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala#L67]]
* apache/spark
*/
def atan2[A, B, T](l: AbstractTypedColumn[T,A], r: AbstractTypedColumn[T, B])
def atan2[A, B, T](l: TypedColumn[T, A], r: TypedColumn[T, B])
(implicit
evCanBeDoubleL: CatalystCast[A, Double],
evCanBeDoubleR: CatalystCast[B, Double]
): r.ThisType[T, Double] =
r.typed(untyped.atan2(l.cast[Double].untyped, r.cast[Double].untyped))
i0: CatalystCast[A, Double],
i1: CatalystCast[B, Double]
): TypedColumn[T, Double] =
r.typed(untyped.atan2(l.cast[Double].untyped, r.cast[Double].untyped))

def atan2[B, T](l: Double, r: AbstractTypedColumn[T, B])
(implicit
evCanBeDoubleR: CatalystCast[B, Double]): r.ThisType[T, Double] = atan2(r.lit(l), r)
/** Non-Aggregate function: returns the angle theta from the conversion of rectangular coordinates (x, y) to
* polar coordinates (r, theta).
*
* Spark will expect a Double value for this expression. See:
* [[https://github.com/apache/spark/blob/4a3c09601ba69f7d49d1946bb6f20f5cfe453031/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala#L67]]
* apache/spark
*/
def atan2[A, B, T](l: TypedAggregate[T, A], r: TypedAggregate[T, B])
(implicit
i0: CatalystCast[A, Double],
i1: CatalystCast[B, Double]
): TypedAggregate[T, Double] =
r.typed(untyped.atan2(l.cast[Double].untyped, r.cast[Double].untyped))

def atan2[B, T](l: Double, r: TypedColumn[T, B])
(implicit i0: CatalystCast[B, Double]): TypedColumn[T, Double] =
atan2(r.lit(l), r)

def atan2[A, T](l: TypedColumn[T, A], r: Double)
(implicit i0: CatalystCast[A, Double]): TypedColumn[T, Double] =
atan2(l, l.lit(r))

def atan2[A, T](l: AbstractTypedColumn[T, A], r: Double)
(implicit
evCanBeDoubleL: CatalystCast[A, Double]): l.ThisType[T, Double] =
atan2(l, l.lit(r)).asInstanceOf[l.ThisType[T, Double]]
def atan2[B, T](l: Double, r: TypedAggregate[T, B])
(implicit i0: CatalystCast[B, Double]): TypedAggregate[T, Double] =
atan2(r.lit(l), r)

def atan2[A, T](l: TypedAggregate[T, A], r: Double)
(implicit i0: CatalystCast[A, Double]): TypedAggregate[T, Double] =
atan2(l, l.lit(r))
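
Taken together, these overloads cover column/column, Double/column and column/Double arguments for both TypedColumn and TypedAggregate; the Double variants simply lift the literal with lit and delegate to the column/column case. A minimal sketch of the plain-column form (hypothetical Point schema, same setup as above):

  case class Point(x: Int, y: Int)
  val pts = TypedDataset.create(Seq(Point(1, 1), Point(0, 2)))
  pts.select(atan2(pts('y), pts('x)))   // theta for (x, y); both sides are cast to Double
  pts.select(atan2(1.0, pts('x)))       // a Double on the left is lifted via r.lit(l)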

/** Non-Aggregate function: Returns the string representation of the binary value of the given long
* column. For example, bin("12") returns "1100".
@@ -128,9 +147,8 @@ trait NonAggregateFunctions {
private[functions] def this(condition: AbstractTypedColumn[T, Boolean], value: AbstractTypedColumn[T, A]) =
this(untyped.when(condition.untyped, value.untyped))

def when(condition: AbstractTypedColumn[T, Boolean], value: AbstractTypedColumn[T, A]): When[T, A] = new When[T, A](
untypedC.when(condition.untyped, value.untyped)
)
def when(condition: AbstractTypedColumn[T, Boolean], value: AbstractTypedColumn[T, A]): When[T, A] =
new When[T, A](untypedC.when(condition.untyped, value.untyped))

def otherwise(value: AbstractTypedColumn[T, A]): value.ThisType[T, A] =
value.typed(untypedC.otherwise(value.untyped))(value.uencoder)
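
The When builder is normally reached through the trait's top-level when(...) entry point (outside this hunk); a minimal sketch of the intended chaining, with a hypothetical Flagged schema and the same setup as above:

  case class Flagged(flag: Boolean, a: Int, b: Int)
  val flags = TypedDataset.create(Seq(Flagged(true, 1, 2), Flagged(false, 1, 2)))
  // selects 'a where flag is true, otherwise 'b
  flags.select(when(flags('flag), flags('a)).otherwise(flags('b)))
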
@@ -160,20 +178,32 @@ trait NonAggregateFunctions {
*
* apache/spark
*/
def concat[T](c1: AbstractTypedColumn[T, String],
rest: AbstractTypedColumn[T, String]*): c1.ThisType[T, String] =
c1.typed(untyped.concat((c1 +: rest).map(_.untyped): _*))
def concat[T](c1: TypedColumn[T, String], xs: TypedColumn[T, String]*): TypedColumn[T, String] =
c1.typed(untyped.concat((c1 +: xs).map(_.untyped): _*))

/** Non-Aggregate function: Concatenates multiple input string columns together into a single string column,
* using the given separator.
*
* apache/spark
*/
def concatWs[T](sep: String, c1: TypedColumn[T, String], xs: TypedColumn[T, String]*): TypedColumn[T, String] =
c1.typed(untyped.concat_ws(sep, (c1 +: xs).map(_.untyped): _*))
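
A minimal sketch of the two column variants (hypothetical Name schema, same setup as above):

  case class Name(first: String, last: String)
  val people = TypedDataset.create(Seq(Name("Ada", "Lovelace")))
  people.select(concat(people('first), people('last)))           // "AdaLovelace"
  people.select(concatWs(" ", people('first), people('last)))    // "Ada Lovelace"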

/** Non-Aggregate function: Concatenates multiple input string columns together into a single string column.
*
* apache/spark
*/
def concat[T](c1: TypedAggregate[T, String], xs: TypedAggregate[T, String]*): TypedAggregate[T, String] =
c1.typed(untyped.concat((c1 +: xs).map(_.untyped): _*))


/** Non-Aggregate function: Concatenates multiple input string columns together into a single string column,
* using the given separator.
*
* apache/spark
*/
def concatWs[T](sep: String,
c1: AbstractTypedColumn[T, String],
rest: AbstractTypedColumn[T, String]*): c1.ThisType[T, String] =
c1.typed(untyped.concat_ws(sep, (c1 +: rest).map(_.untyped): _*))
def concatWs[T](sep: String, c1: TypedAggregate[T, String], xs: TypedAggregate[T, String]*): TypedAggregate[T, String] =
c1.typed(untyped.concat_ws(sep, (c1 +: xs).map(_.untyped): _*))

/** Non-Aggregate function: Locates the position of the first occurrence of the substring column
* in the given string column
Expand All @@ -198,7 +228,14 @@ trait NonAggregateFunctions {
*
* apache/spark
*/
def levenshtein[T](l: AbstractTypedColumn[T, String], r: AbstractTypedColumn[T, String]): l.ThisType[T, Int] =
def levenshtein[T](l: TypedColumn[T, String], r: TypedColumn[T, String]): TypedColumn[T, Int] =
l.typed(untyped.levenshtein(l.untyped, r.untyped))
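
A minimal sketch of the column variant (hypothetical Pair schema, same setup as above); the Levenshtein distance between "kitten" and "sitting" is 3:

  case class Pair(a: String, b: String)
  val pairs = TypedDataset.create(Seq(Pair("kitten", "sitting")))
  pairs.select(levenshtein(pairs('a), pairs('b)))   // TypedDataset[Int]; 3 for this row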

/** Non-Aggregate function: Computes the Levenshtein distance of the two given string columns.
*
* apache/spark
*/
def levenshtein[T](l: TypedAggregate[T, String], r: TypedAggregate[T, String]): TypedAggregate[T, Int] =
l.typed(untyped.levenshtein(l.untyped, r.untyped))

/** Non-Aggregate function: Converts a string column to lower case.
