Skip to content

Commit

Permalink
[FLINK-5452] [table] Fix SortITCase which fails under cluster mode.
Browse files Browse the repository at this point in the history
This closes apache#3095.
  • Loading branch information
KurtYoung authored and fhueske committed Jan 19, 2017
1 parent 0c6e0ee commit 0ea996a
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,126 +18,151 @@

package org.apache.flink.table.api.scala.batch.sql

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.table.api.scala.batch.utils.SortTestUtils._
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala._
import org.apache.flink.types.Row
import org.apache.flink.table.api.scala.batch.utils.SortTestUtils._
import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.table.api.{TableEnvironment, TableException}
import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.types.Row
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized

import scala.collection.JavaConverters._

@RunWith(classOf[Parameterized])
class SortITCase(
configMode: TableConfigMode)
extends TableProgramsCollectionTestBase(configMode) {
class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBase(configMode) {

// Creates the execution environment that every test in this suite runs on.
private def getExecutionEnvironment = {
  val distributedEnv = ExecutionEnvironment.getExecutionEnvironment
  // the parallelism is fixed explicitly so that the query is really executed
  // in a distributed manner
  distributedEnv.setParallelism(3)
  distributedEnv
}

// Runs "ORDER BY _1 DESC, _2 DESC" through the SQL API and compares the rows,
// read back partition by partition, with the expected ordered text.
// NOTE(review): this span appears to be a rendered diff -- the removed and the
// added form of a changed line sit back to back (see the two `val env` lines
// and the two `implicit def` headers below).
@Test
def testOrderByMultipleFieldsWithSql(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val env = getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)

val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 DESC, _2 DESC"

// Ordering over the input tuples: both key fields are negated, so an ascending
// sort on the key gives descending order on the original values.
// NOTE(review): presumably consumed implicitly by sortExpectedly -- confirm in SortTestUtils.
implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
(- x.productElement(0).asInstanceOf[Int], - x.productElement(1).asInstanceOf[Long]))

val ds = CollectionDataSets.get3TupleDataSet(env)
tEnv.registerDataSet("MyTable", ds)

val expected = sortExpectedly(tupleDataSetStrings)
// squash all rows inside a partition into one element
val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()

def rowOrdering = Ordering.by((r : Row) => {
// The (Int, Long) key built here is itself a tuple, so it is compared with the
// tupleOrdering defined above; the fields only need to be returned in the
// order in which they are defined.
(r.getField(0).asInstanceOf[Int], r.getField(1).asInstanceOf[Long])
})

// Drop empty partitions, order the remaining ones by their first row and
// concatenate them into one sequence.
// NOTE(review): two sort variants are visible; the one keyed on `f.toString`
// is presumably the removed side of the diff.
val result = results
.filterNot(_.isEmpty)
.sortBy(_.head)(Ordering.by(f=> f.toString))
.reduceLeft(_ ++ _)
.filterNot(_.isEmpty)
// sort all partitions by their head element to verify the order across partitions
.sortBy(_.head)(rowOrdering)
.reduceLeft(_ ++ _)

TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
}

// Runs "ORDER BY _1 DESC OFFSET 2 ROWS" through the SQL API and compares the
// rows, read back partition by partition, with the expected ordered text.
// NOTE(review): this span appears to be a rendered diff -- the removed and the
// added form of a changed line sit back to back.
@Test
def testOrderByWithOffset(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val env = getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)

val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 DESC OFFSET 2 ROWS"

// Ordering over the input tuples: the first field is negated, so an ascending
// sort on the key gives descending order on the original value.
// NOTE(review): presumably consumed implicitly by sortExpectedly -- confirm in SortTestUtils.
implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
- x.productElement(0).asInstanceOf[Int] )

val ds = CollectionDataSets.get3TupleDataSet(env)
tEnv.registerDataSet("MyTable", ds)

// NOTE(review): 2 and 21 look like the start/end bounds of the expected slice
// (matching OFFSET 2) -- confirm against SortTestUtils.sortExpectedly.
val expected = sortExpectedly(tupleDataSetStrings, 2, 21)
// squash all rows inside a partition into one element
val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()

// Drop empty partitions, order the remaining ones by their first row and
// concatenate them into one sequence.
// NOTE(review): two sort variants are visible; the one keyed on `f.toString`
// is presumably the removed side of the diff.
val result = results.
filterNot(_.isEmpty)
.sortBy(_.head)(Ordering.by(f=> f.toString))
.reduceLeft(_ ++ _)
filterNot(_.isEmpty)
// sort all partitions by their head element to verify the order across partitions
.sortBy(_.head)(Ordering.by((r : Row) => -r.getField(0).asInstanceOf[Int]))
.reduceLeft(_ ++ _)

TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
}

// Runs "ORDER BY _1 OFFSET 2 ROWS FETCH NEXT 5 ROWS ONLY" through the SQL API
// and compares the rows, read back partition by partition, with the expected
// ordered text.
// NOTE(review): this span appears to be a rendered diff -- the removed and the
// added form of a changed line sit back to back.
@Test
def testOrderByWithOffsetAndFetch(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val env = getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)

val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 OFFSET 2 ROWS FETCH NEXT 5 ROWS ONLY"

// Ordering over the input tuples: ascending on the first field.
// NOTE(review): presumably consumed implicitly by sortExpectedly -- confirm in SortTestUtils.
implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
x.productElement(0).asInstanceOf[Int] )

val ds = CollectionDataSets.get3TupleDataSet(env)
tEnv.registerDataSet("MyTable", ds)

// NOTE(review): 2 and 7 look like the start/end bounds of the expected slice
// (OFFSET 2, then 5 rows) -- confirm against SortTestUtils.sortExpectedly.
val expected = sortExpectedly(tupleDataSetStrings, 2, 7)
// squash all rows inside a partition into one element
val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()

// Drop empty partitions, order the remaining ones by their first row and
// concatenate them into one sequence.
// NOTE(review): two sort variants are visible; the one keyed on `f.toString`
// is presumably the removed side of the diff.
val result = results
.filterNot(_.isEmpty)
.sortBy(_.head)(Ordering.by(f=> f.toString))
// sort all partitions by their head element to verify the order across partitions
.sortBy(_.head)(Ordering.by((r : Row) => r.getField(0).asInstanceOf[Int]))
.reduceLeft(_ ++ _)

TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
}

// Runs "ORDER BY _2, _1 LIMIT 5" through the SQL API and compares the rows,
// read back partition by partition, with the expected ordered text.
// NOTE(review): this span appears to be a rendered diff -- the removed and the
// added form of a changed line sit back to back.
@Test
def testOrderByLimit(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val env = getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)

val sqlQuery = "SELECT * FROM MyTable ORDER BY _2, _1 LIMIT 5"

// Ordering over the input tuples: second field first, then first field, both
// ascending -- the same key sequence as the ORDER BY clause.
// NOTE(review): presumably consumed implicitly by sortExpectedly -- confirm in SortTestUtils.
implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
(x.productElement(1).asInstanceOf[Long], x.productElement(0).asInstanceOf[Int]) )

val ds = CollectionDataSets.get3TupleDataSet(env)
tEnv.registerDataSet("MyTable", ds)

// NOTE(review): 0 and 5 look like the start/end bounds of the expected slice
// (matching LIMIT 5) -- confirm against SortTestUtils.sortExpectedly.
val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
// squash all rows inside a partition into one element
val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()

def rowOrdering = Ordering.by((r : Row) => {
// The (Int, Long) key built here is itself a tuple, so it is compared with the
// tupleOrdering defined above (which looks at element 1 before element 0);
// the fields only need to be returned in the order in which they are defined.
(r.getField(0).asInstanceOf[Int], r.getField(1).asInstanceOf[Long])
})

// Drop empty partitions, order the remaining ones by their first row and
// concatenate them into one sequence.
// NOTE(review): two sort variants are visible; the one keyed on `f.toString`
// is presumably the removed side of the diff.
val result = results
.filterNot(_.isEmpty)
.sortBy(_.head)(Ordering.by(f=> f.toString))
.reduceLeft(_ ++ _)
.filterNot(_.isEmpty)
// sort all partitions by their head element to verify the order across partitions
.sortBy(_.head)(rowOrdering)
.reduceLeft(_ ++ _)

TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
}

@Test(expected = classOf[TableException])
def testLimitWithoutOrder(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val env = getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)

val sqlQuery = "SELECT * FROM MyTable LIMIT 5"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,29 @@

package org.apache.flink.table.api.scala.batch.table

import org.apache.flink.table.api.scala.batch.utils.SortTestUtils._
import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.types.Row
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.batch.utils.SortTestUtils._
import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.types.Row
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized

import scala.collection.JavaConverters._

@RunWith(classOf[Parameterized])
class SortITCase(
configMode: TableConfigMode)
extends TableProgramsCollectionTestBase(configMode) {
class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBase(configMode) {

// Creates the execution environment used by the tests of this suite.
// NOTE(review): this span appears to be a rendered diff -- both the public and
// the private header, and both the setParallelism(4) and setParallelism(3)
// calls, are shown; presumably only one of each belongs to the final file.
def getExecutionEnvironment = {
private def getExecutionEnvironment = {
val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(4)
// set the parallelism explicitly to make sure the query is executed in
// a distributed manner
env.setParallelism(3)
env
}

Expand All @@ -51,16 +51,18 @@ class SortITCase(

val ds = CollectionDataSets.get3TupleDataSet(env)
val t = ds.toTable(tEnv).orderBy('_1.desc)
implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
- x.productElement(0).asInstanceOf[Int] )

val expected = sortExpectedly(tupleDataSetStrings)
// squash all rows inside a partition into one element
val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()

val result = results
.filterNot(_.isEmpty)
.sortBy(_.head)(Ordering.by(f=> f.toString))
.reduceLeft(_ ++ _)
.filterNot(_.isEmpty)
// sort all partitions by their head element to verify the order across partitions
.sortBy(_.head)(Ordering.by((r : Row) => -r.getField(0).asInstanceOf[Int]))
.reduceLeft(_ ++ _)

TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
}
Expand All @@ -72,16 +74,18 @@ class SortITCase(

val ds = CollectionDataSets.get3TupleDataSet(env)
val t = ds.toTable(tEnv).orderBy('_1.asc)
implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
x.productElement(0).asInstanceOf[Int] )

val expected = sortExpectedly(tupleDataSetStrings)
// squash all rows inside a partition into one element
val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()

val result = results
.filterNot(_.isEmpty)
.sortBy(_.head)(Ordering.by(f=> f.toString))
.reduceLeft(_ ++ _)
.filterNot(_.isEmpty)
// sort all partitions by their head element to verify the order across partitions
.sortBy(_.head)(Ordering.by((r : Row) => r.getField(0).asInstanceOf[Int]))
.reduceLeft(_ ++ _)

TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
}
Expand All @@ -93,16 +97,24 @@ class SortITCase(

val ds = CollectionDataSets.get3TupleDataSet(env)
val t = ds.toTable(tEnv).orderBy('_2.asc, '_1.desc)
implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
(x.productElement(1).asInstanceOf[Long], - x.productElement(0).asInstanceOf[Int]) )

val expected = sortExpectedly(tupleDataSetStrings)
// squash all rows inside a partition into one element
val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()

def rowOrdering = Ordering.by((r : Row) => {
// ordering for this tuple will fall into the previous defined tupleOrdering,
// so we just need to return the field by their defining sequence
(r.getField(0).asInstanceOf[Int], r.getField(1).asInstanceOf[Long])
})

val result = results
.filterNot(_.isEmpty)
.sortBy(_.head)(Ordering.by(f=> f.toString))
.reduceLeft(_ ++ _)
.filterNot(_.isEmpty)
// sort all partitions by their head element to verify the order across partitions
.sortBy(_.head)(rowOrdering)
.reduceLeft(_ ++ _)

TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
}
Expand All @@ -114,16 +126,18 @@ class SortITCase(

val ds = CollectionDataSets.get3TupleDataSet(env)
val t = ds.toTable(tEnv).orderBy('_1.asc).limit(3)
implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
x.productElement(0).asInstanceOf[Int] )

val expected = sortExpectedly(tupleDataSetStrings, 3, 21)
// squash all rows inside a partition into one element
val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()

val result = results
.filterNot(_.isEmpty)
.sortBy(_.head)(Ordering.by(f=> f.toString))
.reduceLeft(_ ++ _)
.filterNot(_.isEmpty)
// sort all partitions by their head element to verify the order across partitions
.sortBy(_.head)(Ordering.by((r : Row) => r.getField(0).asInstanceOf[Int]))
.reduceLeft(_ ++ _)

TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
}
Expand All @@ -135,16 +149,18 @@ class SortITCase(

val ds = CollectionDataSets.get3TupleDataSet(env)
val t = ds.toTable(tEnv).orderBy('_1.desc).limit(3, 5)
implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
- x.productElement(0).asInstanceOf[Int] )

val expected = sortExpectedly(tupleDataSetStrings, 3, 8)
// squash all rows inside a partition into one element
val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()

val result = results
.filterNot(_.isEmpty)
.sortBy(_.head)(Ordering.by(f=> f.toString))
.reduceLeft(_ ++ _)
.filterNot(_.isEmpty)
// sort all partitions by their head element to verify the order across partitions
.sortBy(_.head)(Ordering.by((r : Row) => -r.getField(0).asInstanceOf[Int]))
.reduceLeft(_ ++ _)

TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
}
Expand All @@ -156,16 +172,20 @@ class SortITCase(

val ds = CollectionDataSets.get3TupleDataSet(env)
val t = ds.toTable(tEnv).orderBy('_1.asc).limit(0, 5)
implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
x.productElement(0).asInstanceOf[Int] )

val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
// squash all rows inside a partition into one element
val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()

implicit def rowOrdering = Ordering.by((r : Row) => r.getField(0).asInstanceOf[Int])

val result = results
.filterNot(_.isEmpty)
.sortBy(_.head)(Ordering.by(f=> f.toString))
.reduceLeft(_ ++ _)
.filterNot(_.isEmpty)
// sort all partitions by their head element to verify the order across partitions
.sortBy(_.head)
.reduceLeft(_ ++ _)

TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
}
Expand Down

0 comments on commit 0ea996a

Please sign in to comment.