From 08a2979c486a77ad043957cd263bae5d57a5a966 Mon Sep 17 00:00:00 2001 From: marios iliofotou Date: Fri, 1 Jan 2021 16:26:09 -0800 Subject: [PATCH] Making it easier to work with Optional columns. (#479) --- .../main/scala/frameless/TypedColumn.scala | 32 ++++++++++++++- .../test/scala/frameless/ColumnTests.scala | 14 +++++++ docs/src/main/tut/FeatureOverview.md | 41 +++++++++++++++++++ 3 files changed, 86 insertions(+), 1 deletion(-) diff --git a/dataset/src/main/scala/frameless/TypedColumn.scala b/dataset/src/main/scala/frameless/TypedColumn.scala index 32da7e06..c96550b6 100644 --- a/dataset/src/main/scala/frameless/TypedColumn.scala +++ b/dataset/src/main/scala/frameless/TypedColumn.scala @@ -1,6 +1,6 @@ package frameless -import frameless.functions.{lit => flit, litAggr} +import frameless.functions.{litAggr, lit => flit} import frameless.syntax._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types.DecimalType @@ -67,6 +67,36 @@ abstract class AbstractTypedColumn[T, U] type ThisType[A, B] <: AbstractTypedColumn[A, B] + /** A helper class to make to simplify working with Optional fields. + * + * {{{ + * val x: TypedColumn[Option[Int]] = _ + * x.opt.map(_*2) // This only compiles if the type of x is Option[X] (in this example X is of type Int) + * }}} + * + * @note Known issue: map() will NOT work when the applied function is a udf(). + * It will compile and then throw a runtime error. + **/ + trait Mapper[X] { + def map[G, OutputType[_,_]](u: ThisType[T, X] => OutputType[T,G]) + (implicit + ev: OutputType[T,G] <:< AbstractTypedColumn[T, G] + ): OutputType[T, Option[G]] = { + u(self.asInstanceOf[ThisType[T, X]]).asInstanceOf[OutputType[T, Option[G]]] + } + } + + /** Makes it easier to work with Optional columns. It returns an instance of `Mapper[X]` + * where `X` is type of the unwrapped Optional. E.g., in the case of `Option[Long]`, + * `X` is of type Long. + * + * {{{ + * val x: TypedColumn[Option[Int]] = _ + * x.opt.map(_*2) + * }}} + * */ + def opt[X](implicit x: U <:< Option[X]): Mapper[X] = new Mapper[X] {} + /** Fall back to an untyped Column */ def untyped: Column = new Column(expr) diff --git a/dataset/src/test/scala/frameless/ColumnTests.scala b/dataset/src/test/scala/frameless/ColumnTests.scala index 558be6a4..58ff98da 100644 --- a/dataset/src/test/scala/frameless/ColumnTests.scala +++ b/dataset/src/test/scala/frameless/ColumnTests.scala @@ -411,4 +411,18 @@ class ColumnTests extends TypedDatasetSuite with Matchers { "ds.select(!ds('_2))" shouldNot typeCheck "ds.select(!ds('_3))" shouldNot typeCheck } + + test("opt") { + val data = (Option(1L), Option(2L)) :: (None, None) :: Nil + val ds = TypedDataset.create(data) + val rs = ds.select(ds('_1).opt.map(_ * 2), ds('_1).opt.map(_ + 2)).collect().run() + val expected = data.map { case (x, y) => (x.map(_ * 2), y.map(_ + 1)) } + rs shouldEqual expected + } + + test("opt compiles only for columns of type Option[_]") { + val ds = TypedDataset.create((1, List(1,2,3)) :: Nil) + "ds.select(ds('_1).opt.map(x => x))" shouldNot typeCheck + "ds.select(ds('_2).opt.map(x => x))" shouldNot typeCheck + } } diff --git a/docs/src/main/tut/FeatureOverview.md b/docs/src/main/tut/FeatureOverview.md index 34107fdc..643bbb85 100644 --- a/docs/src/main/tut/FeatureOverview.md +++ b/docs/src/main/tut/FeatureOverview.md @@ -116,6 +116,47 @@ in a compilation error. Check [here](https://github.com/typelevel/frameless/blob/master/core/src/main/scala/frameless/CatalystCast.scala) for the set of available `CatalystCast.` +## Working with Optional columns + +When working with real data we have to deal with imperfections, such as missing fields. Columns that may have +missing data should be represented using `Options`. For this example, let's assume that the Apartments dataset +may have missing values. + +```tut:silent +case class ApartmentOpt(city: Option[String], surface: Option[Int], price: Option[Double], bedrooms: Option[Int]) +``` + +```tut:silent +val apartmentsOpt = Seq( + ApartmentOpt(Some("Paris"), Some(50), Some(300000.0), None), + ApartmentOpt(None, None, Some(450000.0), Some(3)) +) +``` + +```tut:book +val aptTypedDsOpt = TypedDataset.create(apartmentsOpt) +aptTypedDsOpt.show().run() +``` + +Unfortunately the syntax used above with `select()` will not work here: + +```tut:book:fail +aptTypedDsOpt.select(aptTypedDsOpt('surface) * 10, aptTypedDsOpt('surface) + 2).show().run() +``` + +This is because we cannot multiple an `Option` with an `Int`. In Scala, `Option` has a `map()` method to help address +exactly this (e.g., `Some(10).map(c => c * 2)`). Frameless follows a similar convention. By applying the `opt` method on +any `Option[X]` column you can then use `map()` to provide a function that works with the unwrapped type `X`. +This is best shown in the example bellow: + + ```tut:book + aptTypedDsOpt.select(aptTypedDsOpt('surface).opt.map(c => c * 10), aptTypedDsOpt('surface).opt.map(_ + 2)).show().run() + ``` + +**Known issue**: `map()` will throw a runtime exception when the applied function includes a `udf()`. If you want to +apply a `udf()` to an optional column, we recommend changing your `udf` to work directly with `Optional` fields. + + ## Casting and projections In the general case, `select()` returns a TypedDataset of type `TypedDataset[TupleN[...]]` (with N in `[1...10]`).