Skip to content

Commit

Permalink
Making it easier to work with Optional columns. (#479)
Browse files Browse the repository at this point in the history
  • Loading branch information
imarios committed Jan 2, 2021
1 parent 0ed3d05 commit 08a2979
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 1 deletion.
32 changes: 31 additions & 1 deletion dataset/src/main/scala/frameless/TypedColumn.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package frameless

import frameless.functions.{lit => flit, litAggr}
import frameless.functions.{litAggr, lit => flit}
import frameless.syntax._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.DecimalType
Expand Down Expand Up @@ -67,6 +67,36 @@ abstract class AbstractTypedColumn[T, U]

type ThisType[A, B] <: AbstractTypedColumn[A, B]

/** A helper class to make to simplify working with Optional fields.
*
* {{{
* val x: TypedColumn[Option[Int]] = _
* x.opt.map(_*2) // This only compiles if the type of x is Option[X] (in this example X is of type Int)
* }}}
*
* @note Known issue: map() will NOT work when the applied function is a udf().
* It will compile and then throw a runtime error.
**/
trait Mapper[X] {
def map[G, OutputType[_,_]](u: ThisType[T, X] => OutputType[T,G])
(implicit
ev: OutputType[T,G] <:< AbstractTypedColumn[T, G]
): OutputType[T, Option[G]] = {
u(self.asInstanceOf[ThisType[T, X]]).asInstanceOf[OutputType[T, Option[G]]]
}
}

/** Makes it easier to work with Optional columns. It returns an instance of `Mapper[X]`
* where `X` is type of the unwrapped Optional. E.g., in the case of `Option[Long]`,
* `X` is of type Long.
*
* {{{
* val x: TypedColumn[Option[Int]] = _
* x.opt.map(_*2)
* }}}
* */
def opt[X](implicit x: U <:< Option[X]): Mapper[X] = new Mapper[X] {}

/** Fall back to an untyped Column */
def untyped: Column = new Column(expr)

Expand Down
14 changes: 14 additions & 0 deletions dataset/src/test/scala/frameless/ColumnTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -411,4 +411,18 @@ class ColumnTests extends TypedDatasetSuite with Matchers {
"ds.select(!ds('_2))" shouldNot typeCheck
"ds.select(!ds('_3))" shouldNot typeCheck
}

test("opt") {
val data = (Option(1L), Option(2L)) :: (None, None) :: Nil
val ds = TypedDataset.create(data)
val rs = ds.select(ds('_1).opt.map(_ * 2), ds('_1).opt.map(_ + 2)).collect().run()
val expected = data.map { case (x, y) => (x.map(_ * 2), y.map(_ + 1)) }
rs shouldEqual expected
}

test("opt compiles only for columns of type Option[_]") {
val ds = TypedDataset.create((1, List(1,2,3)) :: Nil)
"ds.select(ds('_1).opt.map(x => x))" shouldNot typeCheck
"ds.select(ds('_2).opt.map(x => x))" shouldNot typeCheck
}
}
41 changes: 41 additions & 0 deletions docs/src/main/tut/FeatureOverview.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,47 @@ in a compilation error.
Check [here](https://github.com/typelevel/frameless/blob/master/core/src/main/scala/frameless/CatalystCast.scala)
for the set of available `CatalystCast.`

## Working with Optional columns

When working with real data we have to deal with imperfections, such as missing fields. Columns that may have
missing data should be represented using `Options`. For this example, let's assume that the Apartments dataset
may have missing values.

```tut:silent
case class ApartmentOpt(city: Option[String], surface: Option[Int], price: Option[Double], bedrooms: Option[Int])
```

```tut:silent
val apartmentsOpt = Seq(
ApartmentOpt(Some("Paris"), Some(50), Some(300000.0), None),
ApartmentOpt(None, None, Some(450000.0), Some(3))
)
```

```tut:book
val aptTypedDsOpt = TypedDataset.create(apartmentsOpt)
aptTypedDsOpt.show().run()
```

Unfortunately the syntax used above with `select()` will not work here:

```tut:book:fail
aptTypedDsOpt.select(aptTypedDsOpt('surface) * 10, aptTypedDsOpt('surface) + 2).show().run()
```

This is because we cannot multiple an `Option` with an `Int`. In Scala, `Option` has a `map()` method to help address
exactly this (e.g., `Some(10).map(c => c * 2)`). Frameless follows a similar convention. By applying the `opt` method on
any `Option[X]` column you can then use `map()` to provide a function that works with the unwrapped type `X`.
This is best shown in the example bellow:

```tut:book
aptTypedDsOpt.select(aptTypedDsOpt('surface).opt.map(c => c * 10), aptTypedDsOpt('surface).opt.map(_ + 2)).show().run()
```

**Known issue**: `map()` will throw a runtime exception when the applied function includes a `udf()`. If you want to
apply a `udf()` to an optional column, we recommend changing your `udf` to work directly with `Optional` fields.


## Casting and projections

In the general case, `select()` returns a TypedDataset of type `TypedDataset[TupleN[...]]` (with N in `[1...10]`).
Expand Down

0 comments on commit 08a2979

Please sign in to comment.