[FLINK-2207] Fix TableAPI conversion documentation and further renamings for consistency.

This closes apache#829
fhueske committed Jun 12, 2015
1 parent e45c5dc commit e4b5695
Showing 7 changed files with 19 additions and 19 deletions.
8 changes: 4 additions & 4 deletions docs/libs/table.md
@@ -52,7 +52,7 @@ import org.apache.flink.api.scala.table._
case class WC(word: String, count: Int)
val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
val expr = input.toTable
val result = expr.groupBy('word).select('word, 'count.sum as 'count).toSet[WC]
val result = expr.groupBy('word).select('word, 'count.sum as 'count).toDataSet[WC]
{% endhighlight %}

The expression DSL uses Scala symbols to refer to field names and we use code generation to
@@ -69,7 +69,7 @@ case class MyResult(a: String, d: Int)

val input1 = env.fromElements(...).toTable('a, 'b)
val input2 = env.fromElements(...).toTable('c, 'd)
val joined = input1.join(input2).where("b = a && d > 42").select("a, d").toSet[MyResult]
val joined = input1.join(input2).where("b = a && d > 42").select("a, d").toDataSet[MyResult]
{% endhighlight %}

Notice how a DataSet can be converted to a Table by using `as` and specifying new
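
As a purely illustrative sketch of that conversion (the `Visit` and `Stay` case classes and their field names are made up, the `as(...)` call is assumed to take the new field symbols as the sentence above suggests, and the same imports and `env` as in the examples above are assumed):

{% highlight scala %}
// Hypothetical input and result types for this sketch.
case class Visit(ip: String, duration: Int)
case class Stay(host: String, seconds: Int)

val visits = env.fromElements(Visit("10.0.0.1", 42), Visit("10.0.0.2", 7))

// Assumed: `as` turns the DataSet into a Table and assigns new field names.
val stays = visits.as('host, 'seconds)
  .select('host, 'seconds + 1 as 'seconds)
  .toDataSet[Stay]
{% endhighlight %}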
@@ -108,14 +108,14 @@ DataSet<WC> input = env.fromElements(
new WC("Ciao", 1),
new WC("Hello", 1));

Table table = tableEnv.toTable(input);
Table table = tableEnv.fromDataSet(input);

Table filtered = table
.groupBy("word")
.select("word.count as count, word")
.filter("count = 2");

DataSet<WC> result = tableEnv.toSet(filtered, WC.class);
DataSet<WC> result = tableEnv.toDataSet(filtered, WC.class);
{% endhighlight %}

When using Java, the embedded DSL for specifying expressions cannot be used. Only String expressions
@@ -33,14 +33,14 @@ class TableConversions(table: Table) {
/**
* Converts the [[Table]] to a [[DataSet]].
*/
def toSet[T: TypeInformation]: DataSet[T] = {
def toDataSet[T: TypeInformation]: DataSet[T] = {
new ScalaBatchTranslator().translate[T](table.operation)
}

/**
* Converts the [[Table]] to a [[DataStream]].
*/
def toStream[T: TypeInformation]: DataStream[T] = {
def toDataStream[T: TypeInformation]: DataStream[T] = {
new ScalaStreamingTranslator().translate[T](table.operation)
}
}
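
For context, a hedged usage sketch of the two renamed conversion methods; it reuses the `WC` and `CarEvent` types, the `input` DataSet, and `genCarStream()` from the documentation and example files in this commit, and assumes the `org.apache.flink.api.scala.table._` imports. It is illustrative only and not part of the diff:

{% highlight scala %}
// Batch: convert a Table back into a typed DataSet (previously `toSet`).
val wordCounts: DataSet[WC] = input.toTable
  .groupBy('word)
  .select('word, 'count.sum as 'count)
  .toDataSet[WC]

// Streaming: convert a Table back into a typed DataStream (previously `toStream`).
val fastCars: DataStream[CarEvent] = genCarStream().toTable
  .filter('carId === 0)
  .toDataStream[CarEvent]
{% endhighlight %}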
@@ -39,7 +39,7 @@ import org.apache.flink.api.table.plan._
* val table = set.toTable('a, 'b)
* ...
* val table2 = ...
* val set = table2.toSet[MyType]
* val set = table2.toDataSet[MyType]
* }}}
*/
case class Table(private[flink] val operation: PlanNode) {
@@ -101,7 +101,7 @@ object PageRankTable {
val newRanks = currentRanks.toTable
// distribute ranks to target pages
.join(adjacencyLists).where('pageId === 'sourceId)
.select('rank, 'targetIds).toSet[RankOutput]
.select('rank, 'targetIds).toDataSet[RankOutput]
.flatMap {
(in, out: Collector[(Long, Double)]) =>
val targets = in.targetIds
@@ -42,7 +42,7 @@ object StreamingTableFilter {
val cars = genCarStream().toTable
.filter('carId === 0)
.select('carId, 'speed, 'distance + 1000 as 'distance, 'time % 5 as 'time)
.toStream[CarEvent]
.toDataStream[CarEvent]

cars.print()

@@ -61,7 +61,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod

val filterDs = ds.filter( Literal(false) )

filterDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
filterDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
env.execute()
expected = "\n"
}
@@ -76,7 +76,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod

val filterDs = ds.filter( Literal(true) )

filterDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
filterDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
env.execute()
expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
"how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
@@ -109,7 +109,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod

val filterDs = ds.filter( 'a % 2 === 0 )

filterDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
filterDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
env.execute()
expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
"Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
@@ -57,7 +57,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)

val joinDs = ds1.join(ds2).where('b === 'e).select('c, 'g)

joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
env.execute()
expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
}
@@ -70,7 +70,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)

val joinDs = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 'g)

joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
env.execute()
expected = "Hi,Hallo\n"
}
@@ -83,7 +83,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)

val joinDs = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g)

joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
env.execute()
expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
"Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
@@ -97,7 +97,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)

val joinDs = ds1.join(ds2).where('foo === 'e).select('c, 'g)

joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
env.execute()
expected = ""
}
@@ -110,7 +110,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)

val joinDs = ds1.join(ds2).where('a === 'g).select('c, 'g)

joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
env.execute()
expected = ""
}
@@ -123,7 +123,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)

val joinDs = ds1.join(ds2).where('a === 'd).select('c, 'g)

joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
env.execute()
expected = ""
}
@@ -136,7 +136,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)

val joinDs = ds1.join(ds2).where('a === 'd).select('g.count)

joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
env.execute()
expected = "6"
}