Merge pull request typelevel#125 from imarios/docs_update_0.3
Refining/updating Docs for 0.3.0
OlivierBlanvillain committed May 9, 2017
2 parents f1d9c69 + 18f2510 commit 0be9867
Showing 5 changed files with 162 additions and 46 deletions.
@@ -1,6 +1,6 @@
# TypedDataset: Feature Overview

This tutorial introduces `TypedDataset`s through a small toy example.
This tutorial introduces `TypedDataset` using a simple example.
The following imports are needed to make all code examples compile.

```tut:silent
@@ -19,13 +19,13 @@ import spark.implicits._

## Creating TypedDataset instances

We start by defining a simple case class that will be the basis of our examples.
We start by defining a case class:

```tut:silent
case class Apartment(city: String, surface: Int, price: Double)
```

And let's define a few `Apartment` instances:
And a few `Apartment` instances:

```tut:silent
val apartments = Seq(
@@ -41,110 +41,118 @@ val apartments = Seq(
We are now ready to instantiate a `TypedDataset[Apartment]`:

```tut:book
val apartmentsTypedDS = TypedDataset.create(apartments)
val aptTypedDs = TypedDataset.create(apartments)
```

We can also create it from an existing `Dataset`:
We can also create one from an existing Spark `Dataset`:

```tut:book
val apartmentsDS = spark.createDataset(apartments)
val apartmentsTypedDS = TypedDataset.create(apartmentsDS)
val aptDs = spark.createDataset(apartments)
val aptTypedDs = TypedDataset.create(aptDs)
```

Or use the frameless syntax:
Or use the Frameless syntax:

```tut:book
import frameless.syntax._
val apartmentsTypedDS2 = spark.createDataset(apartments).typed
val aptTypedDs2 = aptDs.typed
```

## Typesafe column referencing
This is how we select a particular column from a `TypedDataset`:

```tut:book
val cities: TypedDataset[String] = apartmentsTypedDS.select(apartmentsTypedDS('city))
val cities: TypedDataset[String] = aptTypedDs.select(aptTypedDs('city))
```

This is completely safe, for instance suppose we misspell `city`:
This is completely type-safe; for instance, suppose we misspell `city` as `citi`:

```tut:book:fail
apartmentsTypedDS.select(apartmentsTypedDS('citi))
aptTypedDs.select(aptTypedDs('citi))
```

This gets caught at compile-time, whereas with traditional Spark `Dataset` the error appears at run-time.
This is caught at compile time, whereas with the standard `Dataset` API the error appears at run time (enjoy the stack trace):

```tut:book:fail
apartmentsDS.select('citi)
aptDs.select('citi)
```

`select()` supports arbitrary column operations:

```tut:book
apartmentsTypedDS.select(apartmentsTypedDS('surface) * 10, apartmentsTypedDS('surface) + 2).show().run()
aptTypedDs.select(aptTypedDs('surface) * 10, aptTypedDs('surface) + 2).show().run()
```

*Note that unlike the standard Spark api, here `show()` is lazy. It requires to apply `run()` for the
`show` job to materialize.*
Note that unlike the standard Spark API, where some operations are lazy and some are not, **all operations on a `TypedDataset` are lazy.**
In the above example, `show()` is lazy: it requires calling `run()` for the `show` job to materialize.
A more detailed explanation of `Job` is given [here](Job.md).
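
As a quick, hedged sketch of what this laziness means in practice (reusing `aptTypedDs` from above), the value returned by `show()` can be held as a plain value and only executed when `run()` is called:

```scala
// Minimal sketch: show() returns a lazy Job; nothing is submitted to Spark yet.
val showJob = aptTypedDs.select(aptTypedDs('surface) * 10).show()

// Only now is the Spark job actually executed.
showJob.run()
```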


Let us now try to compute the price by surface unit:
Next we compute the price by surface unit:

```tut:book:fail
val priceBySurfaceUnit = apartmentsTypedDS.select(apartmentsTypedDS('price)/apartmentsTypedDS('surface))
val priceBySurfaceUnit = aptTypedDs.select(aptTypedDs('price) / aptTypedDs('surface))
```

Argh! Looks like we can't divide a `TypedColumn` of `Double` by `Int`.
Well, we can cast our `Int`s to `Double`s explicitly to proceed with the computation.
As the error suggests, we can't divide a `TypedColumn` of `Double` by `Int`.
For safety, Frameless only allows math operations between columns of the same type.
There are two ways to proceed here:

(a) Explicitly cast `Int` to `Double` (manual)

```tut:book
val priceBySurfaceUnit = apartmentsTypedDS.select(apartmentsTypedDS('price)/apartmentsTypedDS('surface).cast[Double])
val priceBySurfaceUnit = aptTypedDs.select(aptTypedDs('price) / aptTypedDs('surface).cast[Double])
priceBySurfaceUnit.collect().run()
```

Alternatively, we can perform the cast implicitly:
(b) Perform the cast implicitly (automated)

```tut:book
import frameless.implicits.widen._
val priceBySurfaceUnit = apartmentsTypedDS.select(apartmentsTypedDS('price)/apartmentsTypedDS('surface))
val priceBySurfaceUnit = aptTypedDs.select(aptTypedDs('price) / aptTypedDs('surface))
priceBySurfaceUnit.collect.run()
```

Looks like it worked, but that `cast` looks unsafe right? Actually it is safe.
Looks like it worked, but that `cast` seems unsafe, right? Actually, it is safe.
Let's try to cast a `TypedColumn` of `String` to `Double`:

```tut:book:fail
apartmentsTypedDS('city).cast[Double]
aptTypedDs('city).cast[Double]
```

The compile-time error tells us that to perform the cast, an evidence (in the form of `CatalystCast[String, Double]`) must be available.
The compile-time error tells us that to perform the cast, an evidence
(in the form of `CatalystCast[String, Double]`) must be available.
Since casting from `String` to `Double` is not allowed, this results
in a compilation error.

Check [here](https://github.com/adelbertc/frameless/blob/master/core/src/main/scala/frameless/CatalystCast.scala) for the set of available `CatalystCast`.
Check [here](https://github.com/typelevel/frameless/blob/master/core/src/main/scala/frameless/CatalystCast.scala)
for the set of available `CatalystCast` instances.
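
As a hedged illustration of how this evidence works (this snippet is not part of the original document), the implicit can be summoned directly:

```scala
// Minimal sketch: a cast compiles only when CatalystCast evidence exists for the type pair.
import frameless.CatalystCast

implicitly[CatalystCast[Int, Double]]      // evidence exists: Int can be cast to Double
// implicitly[CatalystCast[String, Double]] // would not compile: no such evidence
```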

## TypeSafe TypedDataset casting and projections

With `select()` the resulting TypedDataset is of type `TypedDataset[TupleN[...]]` (with N in `[1...10]`).
For example, if we select three columns with types `String`, `Int`, and `Boolean` the result will have type
`TypedDataset[(String, Int, Boolean)]`.
We often want to give more expressive types to the result of our computations.
`TypedDataset[(String, Int, Boolean)]`. To select more than ten columns use the `selectMany()` method.
`select()` has better IDE support than the macro-based `selectMany()`, so prefer `select()` for the general case.
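
For instance, here is a hedged sketch using the `Apartment` columns from above: selecting three columns yields a `TypedDataset` of a `Tuple3` whose element types follow the selected columns:

```scala
// Minimal sketch: the result type follows the types of the selected columns.
val asTuple: TypedDataset[(String, Int, Double)] =
  aptTypedDs.select(aptTypedDs('city), aptTypedDs('surface), aptTypedDs('price))
```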

We often want to give more expressive types to the result of our computations.
`as[T]` allows us to safely cast a `TypedDataset[U]` to another of type `TypedDataset[T]` as long
as the types in `U` and `T` align.

The cast is valid and the expression compiles:
When the cast is valid the expression compiles:

```tut:book
case class UpdatedSurface(city: String, surface: Int)
val updated = apartmentsTypedDS.select(apartmentsTypedDS('city), apartmentsTypedDS('surface) + 2).as[UpdatedSurface]
val updated = aptTypedDs.select(aptTypedDs('city), aptTypedDs('surface) + 2).as[UpdatedSurface]
updated.show(2).run()
```

Next we try to cast a `(String, String)` to an `UpdatedSurface` (which has types `String`, `Int`).
The cast is not valid and the expression does not compile:

```tut:book:fail
apartmentsTypedDS.select(apartmentsTypedDS('city), apartmentsTypedDS('city)).as[UpdatedSurface]
aptTypedDs.select(aptTypedDs('city), aptTypedDs('city)).as[UpdatedSurface]
```

### Projections
@@ -158,7 +166,7 @@ Here is an example using the `TypedDataset[Apartment]` with an additional column
```tut:book
import frameless.implicits.widen._
val aptds = apartmentsTypedDS // For shorter expressions
val aptds = aptTypedDs // For shorter expressions
case class ApartmentDetails(city: String, price: Double, surface: Int, ratio: Double)
val aptWithRatio = aptds.select(aptds('city), aptds('price), aptds('surface), aptds('price) / aptds('surface)).as[ApartmentDetails]
@@ -184,7 +192,7 @@ val priceInfo = aptWithRatio.project[PriceInfo]
priceInfo.show(2).run()
```

We see here that the order of the fields doesn't matter as long as the
We see that the order of the fields does not matter as long as the
names and the corresponding types agree. However, if we make a mistake in
any of the names and/or their types, then we get a compilation error.
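
As a hedged, hypothetical illustration (the `PriceInfoTypo` case class below is not part of the original document), a misspelled field name is rejected at compile time:

```scala
// Hypothetical sketch: `cty` does not match any field of ApartmentDetails,
// so the projection below would not compile.
case class PriceInfoTypo(cty: String, price: Double)
// aptWithRatio.project[PriceInfoTypo]
```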

@@ -218,9 +226,9 @@ context of a particular `TypedDataset`:
val priceModifier =
(name: String, price:Double) => if(name == "Paris") price * 2.0 else price
val udf = apartmentsTypedDS.makeUDF(priceModifier)
val udf = aptTypedDs.makeUDF(priceModifier)
val aptds = apartmentsTypedDS // For shorter expressions
val aptds = aptTypedDs // For shorter expressions
val adjustedPrice = aptds.select(aptds('city), udf(aptds('city), aptds('price)))
@@ -230,18 +238,18 @@ adjustedPrice.show().run()
## GroupBy and Aggregations
Let's suppose we wanted to retrieve the average apartment price in each city.
```tut:book
val priceByCity = apartmentsTypedDS.groupBy(apartmentsTypedDS('city)).agg(avg(apartmentsTypedDS('price)))
val priceByCity = aptTypedDs.groupBy(aptTypedDs('city)).agg(avg(aptTypedDs('price)))
priceByCity.collect().run()
```
Again, if we try to aggregate a column that can't be aggregated, we get a compilation error:
```tut:book:fail
apartmentsTypedDS.groupBy(apartmentsTypedDS('city)).agg(avg(apartmentsTypedDS('city)))
aptTypedDs.groupBy(aptTypedDs('city)).agg(avg(aptTypedDs('city)))
```

Next, we combine `select` and `groupBy` to calculate the average price/surface ratio per city:

```tut:book
val aptds = apartmentsTypedDS // For shorter expressions
val aptds = aptTypedDs // For shorter expressions
val cityPriceRatio = aptds.select(aptds('city), aptds('price) / aptds('surface))
@@ -265,7 +273,7 @@ val citiInfoTypedDS = TypedDataset.create(cityInfo)
Here is how to join the population information to the apartment's dataset.

```tut:book
val withCityInfo = apartmentsTypedDS.join(citiInfoTypedDS, apartmentsTypedDS('city), citiInfoTypedDS('name))
val withCityInfo = aptTypedDs.join(citiInfoTypedDS, aptTypedDs('city), citiInfoTypedDS('name))
withCityInfo.show().run()
```
11 changes: 9 additions & 2 deletions docs/src/main/tut/Injection.md
@@ -12,7 +12,7 @@ spark.sparkContext.setLogLevel("WARN")
import spark.implicits._
```
Injection lets us define encoders for types that do not have one, by injecting `A` into an encodable type `B`.
Injection lets us define encoders for types that do not have one by injecting `A` into an encodable type `B`.
This is the definition of the injection typeclass:
```scala
trait Injection[A, B] extends Serializable {
@@ -47,6 +47,13 @@ implicit val dateToLongInjection = new Injection[java.util.Date, Long] {
}
```

We can be less verbose using the `Injection.apply` function:

```tut:book
import frameless._
implicit val dateToLongInjection = Injection((_: java.util.Date).getTime(), new java.util.Date((_: Long)))
```

Now we can create our `TypedDataset`:

```tut:book
@@ -71,7 +78,7 @@ case class Person(age: Int, gender: Gender)
val people = Seq(Person(42, Male))
```

Again if we try to create a `TypedDataset`, we get an error.
Again, if we try to create a `TypedDataset`, we get a compilation error.

```tut:book:fail
val personDS = TypedDataset.create(people)
70 changes: 69 additions & 1 deletion docs/src/main/tut/Job.md
@@ -1 +1,69 @@
# Job
# Job\[A\]

All operations on `TypedDataset` are lazy. An operation either returns a new
transformed `TypedDataset` or a `Job[A]`, where `A` is the result of running a
non-lazy computation in Spark. `Job` serves several functions:

- Makes all operations on a `TypedDataset` lazy, which makes them more predictable compared to having
some operations lazy and others strict
- Allows the programmer to make expensive blocking operations explicit
- Allows for Spark jobs to be lazily sequenced using monadic composition via for-comprehension
- Provides an obvious place where you can annotate/name your Spark jobs to make it easier
to track different parts of your application in the Spark UI

The following toy example showcases the use of a for-comprehension to explicitly sequence Spark jobs.
First we calculate the size of the `TypedDataset` and then we collect to the driver
exactly 20% of its elements:

```tut:invisible
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import frameless.functions.aggregate._
import frameless.TypedDataset
val conf = new SparkConf().setMaster("local[*]").setAppName("frameless repl").set("spark.ui.enabled", "false")
val spark = SparkSession.builder().config(conf).appName("REPL").getOrCreate()
implicit val sqlContext = spark.sqlContext
spark.sparkContext.setLogLevel("WARN")
import spark.implicits._
```

```tut:book
val ds = TypedDataset.create(1 to 20)
val countAndTakeJob =
for {
count <- ds.count()
sample <- ds.take((count/5).toInt)
} yield sample
countAndTakeJob.run()
```

The `countAndTakeJob` can either be executed using `run()` (as we show above) or it can
be passed along to other parts of the program to be further composed into more complex sequences
of Spark jobs.

```tut:book
import frameless.Job
def computeMinOfSample(sample: Job[Seq[Int]]): Job[Int] = sample.map(_.min)
val finalJob = computeMinOfSample(countAndTakeJob)
```

Now we can execute this new job by specifying a [group-id](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext@setJobGroup(groupId:String,description:String,interruptOnCancel:Boolean):Unit) and a description.
This allows the programmer to see this information in the Spark UI and helps track down, say,
performance issues.

```tut:book
finalJob.
withGroupId("samplingJob").
withDescription("Samples 20% of elements and computes the min").
run()
```


```tut:invisible
spark.stop()
```
1 change: 1 addition & 0 deletions docs/src/main/tut/SUMMARY.md
@@ -2,5 +2,6 @@
- [Comparing TypedDatasets with Spark's Datasets](TypedDatasetVsSparkDataset.md)
- [Typed Encoders in Frameless](TypedEncoder.md)
- [Injection: Creating Custom Encoders](Injection.md)
- [Job\[A\]](Job.md)
- [Using Cats with RDDs](Cats.md)
- [Proof of Concept: TypedDataFrame](TypedDataFrame.md)
34 changes: 33 additions & 1 deletion docs/src/main/tut/TypedDatasetVsSparkDataset.md
@@ -131,8 +131,40 @@ And the compiler is our friend.
fds.filter( fds('i) === 10 ).select( fds('x) )
```

## Differences in Encoders

Encoders in Spark's `Datasets` are partially type-safe. If you try to create a `Dataset` using a type that is not
a Scala `Product` then you get a compilation error:

```tut:book
class Bar(i: Int)
```

`Bar` is neither a case class nor a `Product`, so the following correctly gives a compilation error in Spark:

```tut:fail
spark.createDataset(Seq(new Bar(1)))
```

However, the compile-time type guards implemented in Spark are not sufficient to detect non-encodable members.
For example, using the following case class leads to a runtime failure:

```tut:book
case class MyDate(jday: java.util.Date)
```

```tut:book:fail
val myDateDs = spark.createDataset(Seq(MyDate(new java.util.Date(System.currentTimeMillis))))
```

In comparison, a `TypedDataset` reports the encoding problem at compile time:

```tut:book:fail
TypedDataset.create(Seq(MyDate(new java.util.Date(System.currentTimeMillis))))
```


```tut:invisible
org.apache.commons.io.FileUtils.deleteDirectory(new java.io.File("/tmp/foo/"))
spark.stop()
```
